diff --git a/lib/dialects/abstract/connection-manager.js b/lib/dialects/abstract/connection-manager.js index 17c0f3d026b6..15597d7cc67b 100644 --- a/lib/dialects/abstract/connection-manager.js +++ b/lib/dialects/abstract/connection-manager.js @@ -256,6 +256,10 @@ class ConnectionManager { _options.logging.__testLoggingFn = true; return this.sequelize.databaseVersion(_options).then(version => { + if (this.dialectName === 'postgres' && _.isString(version) && + !version.match(/\.\d+\./)) { + version += '.0'; + } this.sequelize.options.databaseVersion = semver.valid(version) ? version : this.defaultVersion; this.versionPromise = null; diff --git a/lib/dialects/abstract/query-generator.js b/lib/dialects/abstract/query-generator.js index 12eedd58685d..d57223b2cf41 100755 --- a/lib/dialects/abstract/query-generator.js +++ b/lib/dialects/abstract/query-generator.js @@ -128,7 +128,30 @@ class QueryGenerator { emptyQuery += ' VALUES ()'; } - if (this._dialect.supports.returnValues && options.returning) { + if (this.dialect === 'postgres') { + if (options.onConflict && semver.gte(this.sequelize.options.databaseVersion, '9.5.0')) { + const constraint = options.conflict.constraint; + const target = Array.isArray(options.conflict.target) ? options.conflict.target : [options.conflict.target]; + if (constraint) { + valueQuery += ' ON CONFLICT ON CONSTRAINT ' + this.quoteIdentifier(constraint) + options.onConflict; + } else if (target) { + const conflictTarget = target.map(e => this.quoteIdentifier(e)).join(', '); + valueQuery += ' ON CONFLICT (' + conflictTarget + ') '; + + const conflictPredicate = options.conflict.update.where; + if (conflictPredicate) { + valueQuery += 'WHERE ' + this.whereItemsQuery(conflictPredicate, options) + ' '; + } + + valueQuery += + options.onConflict; + } + valueQuery += ' RETURNING ' + options.returning; + } else { + options.onConflictFallback = true; + } + } + + if (this._dialect.supports.returnValues && options.returning && !options.isPgUpsert) { if (this._dialect.supports.returnValues.returning) { valueQuery += ' RETURNING *'; emptyQuery += ' RETURNING *'; @@ -251,11 +274,12 @@ class QueryGenerator { options = options || {}; fieldMappedAttributes = fieldMappedAttributes || {}; - const query = 'INSERT<%= ignoreDuplicates %> INTO <%= table %> (<%= attributes %>) VALUES <%= tuples %><%= onDuplicateKeyUpdate %><%= returning %>;'; + const query = 'INSERT<%= ignoreDuplicates %> INTO <%= table %> (<%= attributes %>) VALUES <%= tuples %><%= onDuplicateKeyUpdate %><%= onConflictKeyUpdate %><%= returning %>;'; const tuples = []; const serials = {}; const allAttributes = []; let onDuplicateKeyUpdate = ''; + let onConflictKeyUpdate = ''; for (const fieldValueHash of fieldValueHashes) { _.forOwn(fieldValueHash, (value, key) => { @@ -293,12 +317,49 @@ class QueryGenerator { }).join(','); } + if (this._dialect.supports.updateOnConflict && options.updateOnConflict) { + const { primaryKeyField, rawAttributes } = options.model; + options.updateOnConflict = _.extend({ + constraint: '', + target: primaryKeyField, + update: {} + }, options.updateOnConflict || {}); + if (!options.updateOnConflict.update.columns) { + options.updateOnConflict.update.columns = allAttributes.filter(value => value !== primaryKeyField); + } + + const { constraint, target, update } = options.updateOnConflict; + let conflict = ''; + if (constraint) { + conflict = ' ON CONFLICT ON CONSTRAINT ' + this.quoteIdentifier(constraint); + } else { + const targetArr = Array.isArray(target) ? target : [target]; + const conflictTarget = targetArr.map(e => this.quoteIdentifier(e)).join(', '); + conflict = ' ON CONFLICT (' + conflictTarget + ')'; + + if (update.where) { + conflict += ' WHERE ' + this.whereItemsQuery(update.where, options); + } + } + + if (update === false) { + onConflictKeyUpdate += ' DO NOTHING '; + } else { + onConflictKeyUpdate += conflict + ' DO UPDATE SET ' + update.columns.map(attr => { + const field = rawAttributes[attr] ? rawAttributes[attr].fieldName : attr; + const key = this.quoteIdentifier(field); + return key + ' = EXCLUDED.' + key; + }).join(','); + } + } + const replacements = { ignoreDuplicates: options.ignoreDuplicates ? this._dialect.supports.ignoreDuplicates : '', table: this.quoteTable(tableName), attributes: allAttributes.map(attr => this.quoteIdentifier(attr)).join(','), tuples: tuples.join(','), onDuplicateKeyUpdate, + onConflictKeyUpdate, returning: this._dialect.supports.returnValues && options.returning ? ' RETURNING *' : '' }; diff --git a/lib/dialects/postgres/index.js b/lib/dialects/postgres/index.js index 3bb09e4f98e7..e3afb2c6d587 100644 --- a/lib/dialects/postgres/index.js +++ b/lib/dialects/postgres/index.js @@ -49,6 +49,7 @@ PostgresDialect.prototype.supports = _.merge(_.cloneDeep(AbstractDialect.prototy JSONB: true, HSTORE: true, deferrableConstraints: true, + updateOnConflict: true, searchPath: true }); diff --git a/lib/dialects/postgres/query-generator.js b/lib/dialects/postgres/query-generator.js index 66b0acc3a74c..96218d013a0b 100755 --- a/lib/dialects/postgres/query-generator.js +++ b/lib/dialects/postgres/query-generator.js @@ -342,21 +342,83 @@ class PostgresQueryGenerator extends AbstractQueryGenerator { } upsertQuery(tableName, insertValues, updateValues, where, model, options) { - const primaryField = this.quoteIdentifier(model.primaryKeyField); + const rawAttributes = model.rawAttributes; + const indexes = model.options.indexes; + const uniqueKeys = Object.keys(rawAttributes).filter(e => rawAttributes[e].unique); + if (!uniqueKeys.length) { + uniqueKeys.push(model.primaryKeyField); + } + const uniqueIndexes = indexes.filter(e => e.unique).map(e => e.name); + const uniqueCount = uniqueKeys.length + uniqueIndexes.length; + const fields = Object.keys(model.fieldAttributeMap); + const fieldMap = {}; + fields.forEach(field => { + fieldMap[model.fieldAttributeMap[field]] = field; + }); + + if (uniqueCount > 1 && (!options.conflict || options.conflict && !options.conflict.constraint) || + semver.lt(this.sequelize.options.databaseVersion, '9.5.0')) { + options.onConflictFallback = true; + const primaryField = this.quoteIdentifier(model.primaryKeyField); + + let insert = this.insertQuery(tableName, insertValues, model.rawAttributes, options); + let update = this.updateQuery(tableName, updateValues, where, options, model.rawAttributes); + + insert = insert.replace('RETURNING *', `RETURNING ${primaryField} INTO primary_key`); + update = update.replace('RETURNING *', `RETURNING ${primaryField} INTO primary_key`); + + return this.exceptionFn( + 'sequelize_upsert', + tableName, + 'OUT created boolean, OUT primary_key text', + `${insert} created := true;`, + `${update}; created := false` + ); + } + + const getUpdateArray = (values, target) => + Object.keys(values).map(key => fieldMap[key] || key).filter(value => target.indexOf(value) === -1); + + options.isPgUpsert = true; + if (!options.conflict) { + options.conflict = { + target: uniqueKeys + }; + } + if (!options.conflict.update) { + options.conflict.update = { + columns: getUpdateArray(updateValues, uniqueKeys) + } + } + + if (options.conflict.update.columns === false) { + options.onConflict = ' DO NOTHING'; + } else { + options.conflict.target = options.conflict.target || uniqueKeys; + if (!Array.isArray(options.conflict.target)) { + options.conflict.target = [options.conflict.target]; + } + options.conflict.target = options.conflict.target.map(key => fieldMap[key] || key); + if (!options.conflict.update.columns) { + options.conflict.update.columns = getUpdateArray(updateValues, options.conflict.target); + } - let insert = this.insertQuery(tableName, insertValues, model.rawAttributes, options); - let update = this.updateQuery(tableName, updateValues, where, options, model.rawAttributes); + if (rawAttributes.updatedAt) { + insertValues.updatedAt = new Date(); + if (options.conflict.update.columns.indexOf('updatedAt') === -1) { + options.conflict.update.columns.push('updatedAt'); + } + } + + options.onConflict = 'DO UPDATE SET ' + options.conflict.update.columns.map(key => { + key = this.quoteIdentifier(key); + return key + ' = EXCLUDED.' + key; + }).join(', '); + } - insert = insert.replace('RETURNING *', `RETURNING ${primaryField} INTO primary_key`); - update = update.replace('RETURNING *', `RETURNING ${primaryField} INTO primary_key`); + options.returning = '*, (xmax = 0)::bool AS xmax'; - return this.exceptionFn( - 'sequelize_upsert', - tableName, - 'OUT created boolean, OUT primary_key text', - `${insert} created := true;`, - `${update}; created := false` - ); + return this.insertQuery(tableName, insertValues, rawAttributes, options); } deleteQuery(tableName, where, options, model) { diff --git a/lib/dialects/postgres/query.js b/lib/dialects/postgres/query.js index 8e517d7768d7..058e15f85eca 100644 --- a/lib/dialects/postgres/query.js +++ b/lib/dialects/postgres/query.js @@ -248,7 +248,13 @@ class Query extends AbstractQuery { } else if (QueryTypes.BULKDELETE === this.options.type) { return parseInt(rowCount, 10); } else if (this.isUpsertQuery()) { - return rows[0]; + if (this.options.onConflictFallback) { + return rows[0]; + } else { + this.options.fieldMap = this.options.model.fieldAttributeMap; + const items = this.options.plain ? rows : this.handleSelectQuery(rows); + return [items[0], rows[0].xmax]; + } } else if (this.isInsertQuery() || this.isUpdateQuery()) { if (this.instance && this.instance.dataValues) { for (const key in rows[0]) { diff --git a/lib/model.js b/lib/model.js index fbfe8a4cc613..bc1655441968 100644 --- a/lib/model.js +++ b/lib/model.js @@ -2285,6 +2285,9 @@ class Model { }).then(() => { return this.QueryInterface.upsert(this.getTableName(options), insertValues, updateValues, instance.where(), this, options); }).spread((created, primaryKey) => { + if (primaryKey === true || primaryKey === false) { + return options.returning === true ? [created, primaryKey] : primaryKey; + } if (options.returning === true && primaryKey) { return this.findById(primaryKey, options).then(record => [record, created]); } diff --git a/lib/query-interface.js b/lib/query-interface.js index e1eaf291c050..2036817da9ab 100644 --- a/lib/query-interface.js +++ b/lib/query-interface.js @@ -959,13 +959,13 @@ class QueryInterface { where = { [Op.or]: wheres }; options.type = QueryTypes.UPSERT; - options.raw = true; + options.raw = options.isPgUpsert || false; const sql = this.QueryGenerator.upsertQuery(tableName, insertValues, updateValues, where, model, options); return this.sequelize.query(sql, options).then(result => { switch (this.sequelize.options.dialect) { case 'postgres': - return [result.created, result.primary_key]; + return options.isPgUpsert ? result : [result.created, result.primary_key]; case 'mssql': return [ diff --git a/test/integration/model/upsert.test.js b/test/integration/model/upsert.test.js index 2b32426f37cd..b37fd377d1de 100644 --- a/test/integration/model/upsert.test.js +++ b/test/integration/model/upsert.test.js @@ -86,7 +86,8 @@ describe(Support.getTestDialectTeaser('Model'), () => { }); it('works with upsert on a composite key', function() { - return this.User.upsert({ foo: 'baz', bar: 19, username: 'john' }).bind(this).then(function(created) { + const options = { conflict: { constraint: 'users_foo_bar_key' } }; + return this.User.upsert({ foo: 'baz', bar: 19, username: 'john' }, options).bind(this).then(function(created) { if (dialect === 'sqlite') { expect(created).to.be.undefined; } else { @@ -94,7 +95,7 @@ describe(Support.getTestDialectTeaser('Model'), () => { } this.clock.tick(1000); - return this.User.upsert({ foo: 'baz', bar: 19, username: 'doe' }); + return this.User.upsert({ foo: 'baz', bar: 19, username: 'doe' }, options); }).then(function(created) { if (dialect === 'sqlite') { expect(created).to.be.undefined; @@ -142,12 +143,13 @@ describe(Support.getTestDialectTeaser('Model'), () => { }, username: DataTypes.STRING }); + const options = { conflict: { constraint: 'users_pkey' } }; return User.sync({ force: true }).bind(this).then(() => { return Promise.all([ // Create two users - User.upsert({ a: 'a', b: 'b', username: 'john' }), - User.upsert({ a: 'a', b: 'a', username: 'curt' }) + User.upsert({ a: 'a', b: 'b', username: 'john' }, options), + User.upsert({ a: 'a', b: 'a', username: 'curt' }, options), ]); }).spread(function(created1, created2) { if (dialect === 'sqlite') { @@ -160,7 +162,7 @@ describe(Support.getTestDialectTeaser('Model'), () => { this.clock.tick(1000); // Update the first one - return User.upsert({ a: 'a', b: 'b', username: 'doe' }); + return User.upsert({ a: 'a', b: 'b', username: 'doe' }, options); }).then(created => { if (dialect === 'sqlite') { expect(created).to.be.undefined; @@ -268,7 +270,8 @@ describe(Support.getTestDialectTeaser('Model'), () => { }); it('works with primary key using .field', function() { - return this.ModelWithFieldPK.upsert({ userId: 42, foo: 'first' }).bind(this).then(function(created) { + const options = { conflict: { target: 'userId' } }; + return this.ModelWithFieldPK.upsert({ userId: 42, foo: 'first' }, options).bind(this).then(function(created) { if (dialect === 'sqlite') { expect(created).to.be.undefined; } else { @@ -276,7 +279,7 @@ describe(Support.getTestDialectTeaser('Model'), () => { } this.clock.tick(1000); - return this.ModelWithFieldPK.upsert({ userId: 42, foo: 'second' }); + return this.ModelWithFieldPK.upsert({ userId: 42, foo: 'second' }, options); }).then(function(created) { if (dialect === 'sqlite') { expect(created).to.be.undefined; @@ -464,14 +467,15 @@ describe(Support.getTestDialectTeaser('Model'), () => { }); return User.sync({ force: true }).then(() => { - return User.upsert({ name: 'user1', address: 'address', city: 'City' }) + const options = { conflict: { target: ['name', 'address'] } }; + return User.upsert({ name: 'user1', address: 'address', city: 'City' }, options) .then(created => { if (dialect === 'sqlite') { expect(created).to.be.undefined; } else { expect(created).to.be.ok; } - return User.upsert({ name: 'user1', address: 'address', city: 'New City' }); + return User.upsert({ name: 'user1', address: 'address', city: 'New City' }, options); }).then(created => { if (dialect === 'sqlite') { expect(created).to.be.undefined; @@ -537,7 +541,8 @@ describe(Support.getTestDialectTeaser('Model'), () => { // this record is soft deleted User.create({ name: 'user3', deletedAt: -Infinity }) ]).then(() => { - return User.upsert({ name: 'user1', address: 'address' }); + const options = { conflict: { target: ['name', 'deletedAt'] } }; + return User.upsert({ name: 'user1', address: 'address' }, options); }).then(() => { return User.findAll({ where: { address: null } @@ -547,6 +552,35 @@ describe(Support.getTestDialectTeaser('Model'), () => { }); }); }); + + it('works with bulkCreate', function() { + const User = this.sequelize.define('User', { + name: { + type: DataTypes.STRING, + primaryKey: true + }, + address: DataTypes.STRING + }); + + return User.sync({ force: true }).then(() => { + return User.bulkCreate([ + { name: 'user1', address: 'user1' }, + { name: 'user2' } + ]).then(() => { + return User.bulkCreate([ + { name: 'user1' }, + { name: 'user2', address: 'user2' } + ], { updateOnConflict: true }); + }).then(() => { + return User.findAll({ + where: { address: null } + }); + }).then(users => { + expect(users).to.have.lengthOf(1); + expect(users[0].name).to.equal('user1'); + }); + }); + }); } if (current.dialect.supports.returnValues) {