Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions lib/dialects/abstract/connection-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,10 @@ class ConnectionManager {
_options.logging.__testLoggingFn = true;

return this.sequelize.databaseVersion(_options).then(version => {
if (this.dialectName === 'postgres' && _.isString(version) &&
!version.match(/\.\d+\./)) {
version += '.0';
}
this.sequelize.options.databaseVersion = semver.valid(version) ? version : this.defaultVersion;
this.versionPromise = null;

Expand Down
65 changes: 63 additions & 2 deletions lib/dialects/abstract/query-generator.js
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,30 @@ class QueryGenerator {
emptyQuery += ' VALUES ()';
}

if (this._dialect.supports.returnValues && options.returning) {
if (this.dialect === 'postgres') {
if (options.onConflict && semver.gte(this.sequelize.options.databaseVersion, '9.5.0')) {
const constraint = options.conflict.constraint;
const target = Array.isArray(options.conflict.target) ? options.conflict.target : [options.conflict.target];
if (constraint) {
valueQuery += ' ON CONFLICT ON CONSTRAINT ' + this.quoteIdentifier(constraint) + options.onConflict;
} else if (target) {
const conflictTarget = target.map(e => this.quoteIdentifier(e)).join(', ');
valueQuery += ' ON CONFLICT (' + conflictTarget + ') ';

const conflictPredicate = options.conflict.update.where;
if (conflictPredicate) {
valueQuery += 'WHERE ' + this.whereItemsQuery(conflictPredicate, options) + ' ';
}

valueQuery += + options.onConflict;
}
valueQuery += ' RETURNING ' + options.returning;
} else {
options.onConflictFallback = true;
}
}

if (this._dialect.supports.returnValues && options.returning && !options.isPgUpsert) {
if (this._dialect.supports.returnValues.returning) {
valueQuery += ' RETURNING *';
emptyQuery += ' RETURNING *';
Expand Down Expand Up @@ -251,11 +274,12 @@ class QueryGenerator {
options = options || {};
fieldMappedAttributes = fieldMappedAttributes || {};

const query = 'INSERT<%= ignoreDuplicates %> INTO <%= table %> (<%= attributes %>) VALUES <%= tuples %><%= onDuplicateKeyUpdate %><%= returning %>;';
const query = 'INSERT<%= ignoreDuplicates %> INTO <%= table %> (<%= attributes %>) VALUES <%= tuples %><%= onDuplicateKeyUpdate %><%= onConflictKeyUpdate %><%= returning %>;';
const tuples = [];
const serials = {};
const allAttributes = [];
let onDuplicateKeyUpdate = '';
let onConflictKeyUpdate = '';

for (const fieldValueHash of fieldValueHashes) {
_.forOwn(fieldValueHash, (value, key) => {
Expand Down Expand Up @@ -293,12 +317,49 @@ class QueryGenerator {
}).join(',');
}

if (this._dialect.supports.updateOnConflict && options.updateOnConflict) {
const { primaryKeyField, rawAttributes } = options.model;
options.updateOnConflict = _.extend({
constraint: '',
target: primaryKeyField,
update: {}
}, options.updateOnConflict || {});
if (!options.updateOnConflict.update.columns) {
options.updateOnConflict.update.columns = allAttributes.filter(value => value !== primaryKeyField);
}

const { constraint, target, update } = options.updateOnConflict;
let conflict = '';
if (constraint) {
conflict = ' ON CONFLICT ON CONSTRAINT ' + this.quoteIdentifier(constraint);
} else {
const targetArr = Array.isArray(target) ? target : [target];
const conflictTarget = targetArr.map(e => this.quoteIdentifier(e)).join(', ');
conflict = ' ON CONFLICT (' + conflictTarget + ')';

if (update.where) {
conflict += ' WHERE ' + this.whereItemsQuery(update.where, options);
}
}

if (update === false) {
onConflictKeyUpdate += ' DO NOTHING ';
} else {
onConflictKeyUpdate += conflict + ' DO UPDATE SET ' + update.columns.map(attr => {
const field = rawAttributes[attr] ? rawAttributes[attr].fieldName : attr;
const key = this.quoteIdentifier(field);
return key + ' = EXCLUDED.' + key;
}).join(',');
}
}

const replacements = {
ignoreDuplicates: options.ignoreDuplicates ? this._dialect.supports.ignoreDuplicates : '',
table: this.quoteTable(tableName),
attributes: allAttributes.map(attr => this.quoteIdentifier(attr)).join(','),
tuples: tuples.join(','),
onDuplicateKeyUpdate,
onConflictKeyUpdate,
returning: this._dialect.supports.returnValues && options.returning ? ' RETURNING *' : ''
};

Expand Down
1 change: 1 addition & 0 deletions lib/dialects/postgres/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ PostgresDialect.prototype.supports = _.merge(_.cloneDeep(AbstractDialect.prototy
JSONB: true,
HSTORE: true,
deferrableConstraints: true,
updateOnConflict: true,
searchPath: true
});

Expand Down
86 changes: 74 additions & 12 deletions lib/dialects/postgres/query-generator.js
Original file line number Diff line number Diff line change
Expand Up @@ -342,21 +342,83 @@ class PostgresQueryGenerator extends AbstractQueryGenerator {
}

upsertQuery(tableName, insertValues, updateValues, where, model, options) {
const primaryField = this.quoteIdentifier(model.primaryKeyField);
const rawAttributes = model.rawAttributes;
const indexes = model.options.indexes;
const uniqueKeys = Object.keys(rawAttributes).filter(e => rawAttributes[e].unique);
if (!uniqueKeys.length) {
uniqueKeys.push(model.primaryKeyField);
}
const uniqueIndexes = indexes.filter(e => e.unique).map(e => e.name);
const uniqueCount = uniqueKeys.length + uniqueIndexes.length;
const fields = Object.keys(model.fieldAttributeMap);
const fieldMap = {};
fields.forEach(field => {
fieldMap[model.fieldAttributeMap[field]] = field;
});

if (uniqueCount > 1 && (!options.conflict || options.conflict && !options.conflict.constraint) ||
semver.lt(this.sequelize.options.databaseVersion, '9.5.0')) {
options.onConflictFallback = true;
const primaryField = this.quoteIdentifier(model.primaryKeyField);

let insert = this.insertQuery(tableName, insertValues, model.rawAttributes, options);
let update = this.updateQuery(tableName, updateValues, where, options, model.rawAttributes);

insert = insert.replace('RETURNING *', `RETURNING ${primaryField} INTO primary_key`);
update = update.replace('RETURNING *', `RETURNING ${primaryField} INTO primary_key`);

return this.exceptionFn(
'sequelize_upsert',
tableName,
'OUT created boolean, OUT primary_key text',
`${insert} created := true;`,
`${update}; created := false`
);
}

const getUpdateArray = (values, target) =>
Object.keys(values).map(key => fieldMap[key] || key).filter(value => target.indexOf(value) === -1);

options.isPgUpsert = true;
if (!options.conflict) {
options.conflict = {
target: uniqueKeys
};
}
if (!options.conflict.update) {
options.conflict.update = {
columns: getUpdateArray(updateValues, uniqueKeys)
}
}

if (options.conflict.update.columns === false) {
options.onConflict = ' DO NOTHING';
} else {
options.conflict.target = options.conflict.target || uniqueKeys;
if (!Array.isArray(options.conflict.target)) {
options.conflict.target = [options.conflict.target];
}
options.conflict.target = options.conflict.target.map(key => fieldMap[key] || key);
if (!options.conflict.update.columns) {
options.conflict.update.columns = getUpdateArray(updateValues, options.conflict.target);
}

let insert = this.insertQuery(tableName, insertValues, model.rawAttributes, options);
let update = this.updateQuery(tableName, updateValues, where, options, model.rawAttributes);
if (rawAttributes.updatedAt) {
insertValues.updatedAt = new Date();
if (options.conflict.update.columns.indexOf('updatedAt') === -1) {
options.conflict.update.columns.push('updatedAt');
}
}

options.onConflict = 'DO UPDATE SET ' + options.conflict.update.columns.map(key => {
key = this.quoteIdentifier(key);
return key + ' = EXCLUDED.' + key;
}).join(', ');
}

insert = insert.replace('RETURNING *', `RETURNING ${primaryField} INTO primary_key`);
update = update.replace('RETURNING *', `RETURNING ${primaryField} INTO primary_key`);
options.returning = '*, (xmax = 0)::bool AS xmax';

return this.exceptionFn(
'sequelize_upsert',
tableName,
'OUT created boolean, OUT primary_key text',
`${insert} created := true;`,
`${update}; created := false`
);
return this.insertQuery(tableName, insertValues, rawAttributes, options);
}

deleteQuery(tableName, where, options, model) {
Expand Down
8 changes: 7 additions & 1 deletion lib/dialects/postgres/query.js
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,13 @@ class Query extends AbstractQuery {
} else if (QueryTypes.BULKDELETE === this.options.type) {
return parseInt(rowCount, 10);
} else if (this.isUpsertQuery()) {
return rows[0];
if (this.options.onConflictFallback) {
return rows[0];
} else {
this.options.fieldMap = this.options.model.fieldAttributeMap;
const items = this.options.plain ? rows : this.handleSelectQuery(rows);
return [items[0], rows[0].xmax];
}
} else if (this.isInsertQuery() || this.isUpdateQuery()) {
if (this.instance && this.instance.dataValues) {
for (const key in rows[0]) {
Expand Down
3 changes: 3 additions & 0 deletions lib/model.js
Original file line number Diff line number Diff line change
Expand Up @@ -2285,6 +2285,9 @@ class Model {
}).then(() => {
return this.QueryInterface.upsert(this.getTableName(options), insertValues, updateValues, instance.where(), this, options);
}).spread((created, primaryKey) => {
if (primaryKey === true || primaryKey === false) {
return options.returning === true ? [created, primaryKey] : primaryKey;
}
if (options.returning === true && primaryKey) {
return this.findById(primaryKey, options).then(record => [record, created]);
}
Expand Down
4 changes: 2 additions & 2 deletions lib/query-interface.js
Original file line number Diff line number Diff line change
Expand Up @@ -959,13 +959,13 @@ class QueryInterface {
where = { [Op.or]: wheres };

options.type = QueryTypes.UPSERT;
options.raw = true;
options.raw = options.isPgUpsert || false;

const sql = this.QueryGenerator.upsertQuery(tableName, insertValues, updateValues, where, model, options);
return this.sequelize.query(sql, options).then(result => {
switch (this.sequelize.options.dialect) {
case 'postgres':
return [result.created, result.primary_key];
return options.isPgUpsert ? result : [result.created, result.primary_key];

case 'mssql':
return [
Expand Down
54 changes: 44 additions & 10 deletions test/integration/model/upsert.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,16 @@ describe(Support.getTestDialectTeaser('Model'), () => {
});

it('works with upsert on a composite key', function() {
return this.User.upsert({ foo: 'baz', bar: 19, username: 'john' }).bind(this).then(function(created) {
const options = { conflict: { constraint: 'users_foo_bar_key' } };
return this.User.upsert({ foo: 'baz', bar: 19, username: 'john' }, options).bind(this).then(function(created) {
if (dialect === 'sqlite') {
expect(created).to.be.undefined;
} else {
expect(created).to.be.ok;
}

this.clock.tick(1000);
return this.User.upsert({ foo: 'baz', bar: 19, username: 'doe' });
return this.User.upsert({ foo: 'baz', bar: 19, username: 'doe' }, options);
}).then(function(created) {
if (dialect === 'sqlite') {
expect(created).to.be.undefined;
Expand Down Expand Up @@ -142,12 +143,13 @@ describe(Support.getTestDialectTeaser('Model'), () => {
},
username: DataTypes.STRING
});
const options = { conflict: { constraint: 'users_pkey' } };

return User.sync({ force: true }).bind(this).then(() => {
return Promise.all([
// Create two users
User.upsert({ a: 'a', b: 'b', username: 'john' }),
User.upsert({ a: 'a', b: 'a', username: 'curt' })
User.upsert({ a: 'a', b: 'b', username: 'john' }, options),
User.upsert({ a: 'a', b: 'a', username: 'curt' }, options),
]);
}).spread(function(created1, created2) {
if (dialect === 'sqlite') {
Expand All @@ -160,7 +162,7 @@ describe(Support.getTestDialectTeaser('Model'), () => {

this.clock.tick(1000);
// Update the first one
return User.upsert({ a: 'a', b: 'b', username: 'doe' });
return User.upsert({ a: 'a', b: 'b', username: 'doe' }, options);
}).then(created => {
if (dialect === 'sqlite') {
expect(created).to.be.undefined;
Expand Down Expand Up @@ -268,15 +270,16 @@ describe(Support.getTestDialectTeaser('Model'), () => {
});

it('works with primary key using .field', function() {
return this.ModelWithFieldPK.upsert({ userId: 42, foo: 'first' }).bind(this).then(function(created) {
const options = { conflict: { target: 'userId' } };
return this.ModelWithFieldPK.upsert({ userId: 42, foo: 'first' }, options).bind(this).then(function(created) {
if (dialect === 'sqlite') {
expect(created).to.be.undefined;
} else {
expect(created).to.be.ok;
}

this.clock.tick(1000);
return this.ModelWithFieldPK.upsert({ userId: 42, foo: 'second' });
return this.ModelWithFieldPK.upsert({ userId: 42, foo: 'second' }, options);
}).then(function(created) {
if (dialect === 'sqlite') {
expect(created).to.be.undefined;
Expand Down Expand Up @@ -464,14 +467,15 @@ describe(Support.getTestDialectTeaser('Model'), () => {
});

return User.sync({ force: true }).then(() => {
return User.upsert({ name: 'user1', address: 'address', city: 'City' })
const options = { conflict: { target: ['name', 'address'] } };
return User.upsert({ name: 'user1', address: 'address', city: 'City' }, options)
.then(created => {
if (dialect === 'sqlite') {
expect(created).to.be.undefined;
} else {
expect(created).to.be.ok;
}
return User.upsert({ name: 'user1', address: 'address', city: 'New City' });
return User.upsert({ name: 'user1', address: 'address', city: 'New City' }, options);
}).then(created => {
if (dialect === 'sqlite') {
expect(created).to.be.undefined;
Expand Down Expand Up @@ -537,7 +541,8 @@ describe(Support.getTestDialectTeaser('Model'), () => {
// this record is soft deleted
User.create({ name: 'user3', deletedAt: -Infinity })
]).then(() => {
return User.upsert({ name: 'user1', address: 'address' });
const options = { conflict: { target: ['name', 'deletedAt'] } };
return User.upsert({ name: 'user1', address: 'address' }, options);
}).then(() => {
return User.findAll({
where: { address: null }
Expand All @@ -547,6 +552,35 @@ describe(Support.getTestDialectTeaser('Model'), () => {
});
});
});

it('works with bulkCreate', function() {
const User = this.sequelize.define('User', {
name: {
type: DataTypes.STRING,
primaryKey: true
},
address: DataTypes.STRING
});

return User.sync({ force: true }).then(() => {
return User.bulkCreate([
{ name: 'user1', address: 'user1' },
{ name: 'user2' }
]).then(() => {
return User.bulkCreate([
{ name: 'user1' },
{ name: 'user2', address: 'user2' }
], { updateOnConflict: true });
}).then(() => {
return User.findAll({
where: { address: null }
});
}).then(users => {
expect(users).to.have.lengthOf(1);
expect(users[0].name).to.equal('user1');
});
});
});
}

if (current.dialect.supports.returnValues) {
Expand Down