Skip to content
4 changes: 4 additions & 0 deletions lib/dialects/abstract/connection-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,10 @@ class ConnectionManager {
_options.logging.__testLoggingFn = true;

return this.sequelize.databaseVersion(_options).then(version => {
if (this.dialectName === 'postgres' && _.isString(version) &&
!version.match(/\.\d+\./)) {
version += '.0';
}
this.sequelize.options.databaseVersion = semver.valid(version) ? version : this.defaultVersion;
this.versionPromise = null;

Expand Down
71 changes: 69 additions & 2 deletions lib/dialects/abstract/query-generator.js
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,30 @@ class QueryGenerator {
emptyQuery += ' VALUES ()';
}

if (this._dialect.supports.returnValues && options.returning) {
if (this.dialect === 'postgres') {
if (options.onConflict && semver.gte(this.sequelize.options.databaseVersion, '9.5.0')) {
const constraint = options.conflict.constraint;
const target = Array.isArray(options.conflict.target) ? options.conflict.target : [options.conflict.target];
if (constraint) {
valueQuery += ' ON CONFLICT ON CONSTRAINT ' + this.quoteIdentifier(constraint) + options.onConflict;
} else if (target) {
const conflictTarget = target.map(e => this.quoteIdentifier(e)).join(', ');
valueQuery += ' ON CONFLICT (' + conflictTarget + ') ';

const indexPredicate = options.conflict.indexPredicate;
if (indexPredicate) {
valueQuery += 'WHERE ' + this.whereItemsQuery(indexPredicate, options) + ' ';
}

valueQuery += + options.onConflict;
}
valueQuery += ' RETURNING ' + options.returning;
} else {
options.onConflictFallback = true;
}
}

if (this._dialect.supports.returnValues && options.returning && !options.isPgUpsert) {
if (this._dialect.supports.returnValues.returning) {
valueQuery += ' RETURNING *';
emptyQuery += ' RETURNING *';
Expand Down Expand Up @@ -274,11 +297,12 @@ class QueryGenerator {
options = options || {};
fieldMappedAttributes = fieldMappedAttributes || {};

const query = 'INSERT<%= ignoreDuplicates %> INTO <%= table %> (<%= attributes %>) VALUES <%= tuples %><%= onDuplicateKeyUpdate %><%= onConflictDoNothing %><%= returning %>;';
const query = 'INSERT<%= ignoreDuplicates %> INTO <%= table %> (<%= attributes %>) VALUES <%= tuples %><%= onDuplicateKeyUpdate %><%= onConflictDoNothing %><%= onConflictKeyUpdate %><%= returning %>;';
const tuples = [];
const serials = {};
const allAttributes = [];
let onDuplicateKeyUpdate = '';
let onConflictKeyUpdate = '';

for (const fieldValueHash of fieldValueHashes) {
_.forOwn(fieldValueHash, (value, key) => {
Expand Down Expand Up @@ -316,12 +340,55 @@ class QueryGenerator {
}).join(',');
}

if (this._dialect.supports.updateOnConflict && options.updateOnConflict) {
const { primaryKeyField, rawAttributes } = options.model;
options.updateOnConflict = _.extend({
constraint: '',
target: options.updateOnConflict.update === false ? null : primaryKeyField,
update: {}
}, options.updateOnConflict || {});
if (options.updateOnConflict.update !== false && !options.updateOnConflict.update.columns) {
options.updateOnConflict.update.columns = allAttributes.filter(value => value !== primaryKeyField);
}

const { constraint, target, update, indexPredicate } = options.updateOnConflict;
if (constraint) {
onConflictKeyUpdate += ' ON CONFLICT ON CONSTRAINT ' + this.quoteIdentifier(constraint);
} else {
onConflictKeyUpdate += ' ON CONFLICT ';
if (target) {
const targetArr = Array.isArray(target) ? target : [target];
const conflictTarget = targetArr.map(e => this.quoteIdentifier(e)).join(', ');
onConflictKeyUpdate += '(' + conflictTarget + ') ';
}
if (indexPredicate) {
onConflictKeyUpdate += ' WHERE ' + this.whereItemsQuery(indexPredicate, options);
}
}

if (update === false) {
onConflictKeyUpdate += ' DO NOTHING ';
} else {
onConflictKeyUpdate += ' DO UPDATE SET ' + update.columns.map(attr => {
const field = rawAttributes[attr] ? rawAttributes[attr].field : attr;
const key = this.quoteIdentifier(field);
return key + ' = EXCLUDED.' + key;
}).join(',');

if (update.where) {
onConflictKeyUpdate += ' WHERE ' + this.getWhereConditions(update.where, tableName, options.model, options);
}

}
}

const replacements = {
ignoreDuplicates: options.ignoreDuplicates ? this._dialect.supports.inserts.ignoreDuplicates : '',
table: this.quoteTable(tableName),
attributes: allAttributes.map(attr => this.quoteIdentifier(attr)).join(','),
tuples: tuples.join(','),
onDuplicateKeyUpdate,
onConflictKeyUpdate,
returning: this._dialect.supports.returnValues && options.returning ? ' RETURNING *' : '',
onConflictDoNothing: options.ignoreDuplicates ? this._dialect.supports.inserts.onConflictDoNothing : ''
};
Expand Down
1 change: 1 addition & 0 deletions lib/dialects/postgres/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ PostgresDialect.prototype.supports = _.merge(_.cloneDeep(AbstractDialect.prototy
JSONB: true,
HSTORE: true,
deferrableConstraints: true,
updateOnConflict: true,
searchPath: true
});

Expand Down
94 changes: 78 additions & 16 deletions lib/dialects/postgres/query-generator.js
Original file line number Diff line number Diff line change
Expand Up @@ -347,22 +347,84 @@ class PostgresQueryGenerator extends AbstractQueryGenerator {
}

upsertQuery(tableName, insertValues, updateValues, where, model, options) {
const primaryField = this.quoteIdentifier(model.primaryKeyField);

const upsertOptions = _.defaults({ bindParam: false }, options);
const insert = this.insertQuery(tableName, insertValues, model.rawAttributes, upsertOptions);
const update = this.updateQuery(tableName, updateValues, where, upsertOptions, model.rawAttributes);

insert.query = insert.query.replace('RETURNING *', `RETURNING ${primaryField} INTO primary_key`);
update.query = update.query.replace('RETURNING *', `RETURNING ${primaryField} INTO primary_key`);

return this.exceptionFn(
'sequelize_upsert',
tableName,
'OUT created boolean, OUT primary_key text',
`${insert.query} created := true;`,
`${update.query}; created := false`
);
const rawAttributes = model.rawAttributes;
const indexes = model.options.indexes;
const uniqueKeys = Object.keys(rawAttributes).filter(e => rawAttributes[e].unique);
if (!uniqueKeys.length) {
uniqueKeys.push(model.primaryKeyField);
}
const uniqueIndexes = indexes.filter(e => e.unique).map(e => e.name);
const uniqueCount = uniqueKeys.length + uniqueIndexes.length;
const fields = Object.keys(model.fieldAttributeMap);
const fieldMap = {};
fields.forEach(field => {
fieldMap[model.fieldAttributeMap[field]] = field;
});

if (uniqueCount > 1 && (!options.conflict || options.conflict && !options.conflict.constraint) ||
semver.lt(this.sequelize.options.databaseVersion, '9.5.0')) {
options.onConflictFallback = true;
const primaryField = this.quoteIdentifier(model.primaryKeyField);

const upsertOptions = _.defaults({ bindParam: false }, options);
const insert = this.insertQuery(tableName, insertValues, model.rawAttributes, upsertOptions);
const update = this.updateQuery(tableName, updateValues, where, upsertOptions, model.rawAttributes);

insert.query = insert.query.replace('RETURNING *', `RETURNING ${primaryField} INTO primary_key`);
update.query = update.query.replace('RETURNING *', `RETURNING ${primaryField} INTO primary_key`);

return this.exceptionFn(
'sequelize_upsert',
tableName,
'OUT created boolean, OUT primary_key text',
`${insert.query} created := true;`,
`${update.query}; created := false`
);
}

const getUpdateArray = (values, target) =>
Object.keys(values).map(key => fieldMap[key] || key).filter(value => target.indexOf(value) === -1);

options.isPgUpsert = true;
if (!options.conflict) {
options.conflict = {
target: uniqueKeys
};
}
if (!options.conflict.update) {
options.conflict.update = {
columns: getUpdateArray(updateValues, uniqueKeys)
};
}

if (options.conflict.update.columns === false) {
options.onConflict = ' DO NOTHING';
} else {
options.conflict.target = options.conflict.target || uniqueKeys;
if (!Array.isArray(options.conflict.target)) {
options.conflict.target = [options.conflict.target];
}
options.conflict.target = options.conflict.target.map(key => fieldMap[key] || key);
if (!options.conflict.update.columns) {
options.conflict.update.columns = getUpdateArray(updateValues, options.conflict.target);
}

if (rawAttributes.updatedAt) {
insertValues.updatedAt = new Date();
if (options.conflict.update.columns.indexOf('updatedAt') === -1) {
options.conflict.update.columns.push('updatedAt');
}
}

options.onConflict = 'DO UPDATE SET ' + options.conflict.update.columns.map(key => {
key = this.quoteIdentifier(key);
return key + ' = EXCLUDED.' + key;
}).join(', ');
}

options.returning = '*, (xmax = 0)::bool AS xmax';

return this.insertQuery(tableName, insertValues, rawAttributes, options);
}

truncateTableQuery(tableName, options = {}) {
Expand Down
8 changes: 7 additions & 1 deletion lib/dialects/postgres/query.js
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,13 @@ class Query extends AbstractQuery {
} else if (QueryTypes.BULKDELETE === this.options.type) {
return parseInt(rowCount, 10);
} else if (this.isUpsertQuery()) {
return rows[0];
if (this.options.onConflictFallback) {
return rows[0];
} else {
this.options.fieldMap = this.options.model.fieldAttributeMap;
const items = this.options.plain ? rows : this.handleSelectQuery(rows);
return [items[0], rows[0].xmax];
}
} else if (this.isInsertQuery() || this.isUpdateQuery()) {
if (this.instance && this.instance.dataValues) {
for (const key in rows[0]) {
Expand Down
22 changes: 22 additions & 0 deletions lib/model.js
Original file line number Diff line number Diff line change
Expand Up @@ -2452,6 +2452,9 @@ class Model {
}).then(() => {
return this.QueryInterface.upsert(this.getTableName(options), insertValues, updateValues, instance.where(), this, options);
}).spread((created, primaryKey) => {
if (primaryKey === true || primaryKey === false) {
return options.returning === true ? [created, primaryKey] : primaryKey;
}
if (options.returning === true && primaryKey) {
return this.findById(primaryKey, options).then(record => [record, created]);
}
Expand Down Expand Up @@ -2512,6 +2515,9 @@ class Model {
if (options.updateOnDuplicate && dialect !== 'mysql') {
return Promise.reject(new Error(`${dialect} does not support the updateOnDuplicate option.`));
}
if (options.updateOnConflict && dialect !== 'postgres') {
return Promise.reject(new Error(`${dialect} does not support the updateOnConflict option.`));
}

if (options.updateOnDuplicate !== undefined) {
if (_.isArray(options.updateOnDuplicate) && options.updateOnDuplicate.length) {
Expand All @@ -2524,6 +2530,22 @@ class Model {
}
}

if (options.updateOnConflict !== undefined && options.updateOnConflict.update) {
const where = options.updateOnConflict.update.where;
const columns = options.updateOnConflict.update.columns;
if (columns && _.isArray(columns) && columns.length) {
options.updateOnConflict.update.columns = _.intersection(
_.without(Object.keys(this.tableAttributes), this._timestampAttributes.createdAt),
columns
);
} else {
return Promise.reject(new Error('updateOnConflict.update.columns option only supports non-empty array.'));
}
if (where) {
options.updateOnConflict.update.where = Utils.mapWhereFieldNames(where, this);
}
}

options.model = this;

const createdAtAttr = this._timestampAttributes.createdAt;
Expand Down
4 changes: 2 additions & 2 deletions lib/query-interface.js
Original file line number Diff line number Diff line change
Expand Up @@ -895,13 +895,13 @@ class QueryInterface {
where = { [Op.or]: wheres };

options.type = QueryTypes.UPSERT;
options.raw = true;
options.raw = options.isPgUpsert || false;

const sql = this.QueryGenerator.upsertQuery(tableName, insertValues, updateValues, where, model, options);
return this.sequelize.query(sql, options).then(result => {
switch (this.sequelize.options.dialect) {
case 'postgres':
return [result.created, result.primary_key];
return options.isPgUpsert ? result : [result.created, result.primary_key];

case 'mssql':
return [
Expand Down
Loading