Skip to content

Commit bcd6770

Browse files
committed
Handle multiple transactions in parallel
Instead of assuming one global transaction ID, track knex transaction IDs with Aurora Data API transaction IDs so multiple transactions can occur in parallel.
1 parent 9cc6d52 commit bcd6770

File tree

4 files changed

+110
-17
lines changed

4 files changed

+110
-17
lines changed

index.js

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ class Client_AuroraDataMySQL extends Client_MySQL { // eslint-disable-line camel
103103
promise: Promise.resolve({
104104
client: this.driver,
105105
parameters,
106+
transactions: {},
106107
__knexUid: this.knexUid++
107108
})
108109
}),
@@ -207,13 +208,19 @@ class Client_AuroraDataMySQL extends Client_MySQL { // eslint-disable-line camel
207208
}
208209

209210
async _query (connection, obj) {
211+
const params = {
212+
...connection.parameters,
213+
includeResultMetadata: true,
214+
sql: obj.sql,
215+
parameters: obj.bindings
216+
};
217+
218+
if ('__knexTxId' in connection) {
219+
params.transactionId = connection.transactions[connection.__knexTxId];
220+
}
221+
210222
obj.data = await connection.client
211-
.executeStatement({
212-
...connection.parameters,
213-
includeResultMetadata: true,
214-
sql: obj.sql,
215-
parameters: obj.bindings
216-
})
223+
.executeStatement(params)
217224
.promise();
218225

219226
return obj;

tests/constants.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2178,5 +2178,9 @@ module.exports = {
21782178
},
21792179
ROLLBACK_TRANSACTION_DATA: {
21802180
transactionStatus: 'Rollback Complete'
2181+
},
2182+
2183+
BEGIN_TRANSACTION_DATA_2: {
2184+
transactionId: 'lksdjfojf_2'
21812185
}
21822186
};

tests/test.js

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -627,4 +627,80 @@ describe('Query statement tests', () => {
627627
expect(mockExecuteStatement).toHaveBeenCalledTimes(0);
628628
expect(mockCommitTransaction).toHaveBeenCalledTimes(0);
629629
});
630+
631+
test('Two transactions in parallel', async () => {
632+
mockBeginTransactionPromise
633+
.mockResolvedValueOnce(constants.BEGIN_TRANSACTION_DATA)
634+
.mockResolvedValueOnce(constants.BEGIN_TRANSACTION_DATA_2);
635+
mockExecuteStatementPromise.mockResolvedValue(constants.ALL_QUERY_RESPONSE_DATA);
636+
mockCommitTransactionPromise.mockResolvedValue(constants.COMMIT_TRANSACTION_DATA);
637+
638+
let firstTrxInProgress = false;
639+
640+
const [rows, rows2] = await Promise.all([
641+
knex.transaction(async trx => {
642+
firstTrxInProgress = true;
643+
await new Promise(resolve => setTimeout(resolve, 100));
644+
const rows = trx.select('*').from('foo');
645+
firstTrxInProgress = false;
646+
return rows;
647+
}),
648+
knex.transaction(async trx => {
649+
if (!firstTrxInProgress) {
650+
throw new Error('First transaction not concurrently in progress');
651+
}
652+
await new Promise(resolve => setTimeout(resolve, 100));
653+
return trx.select('*').from('foo');
654+
})
655+
]);
656+
657+
expect(mockBeginTransaction).toHaveBeenCalledTimes(2);
658+
expect(mockBeginTransaction).toHaveBeenNthCalledWith(1, {
659+
resourceArn: constants.AURORA_CLUSTER_ARN,
660+
secretArn: constants.SECRET_ARN,
661+
database: constants.DATABASE
662+
});
663+
expect(mockBeginTransaction).toHaveBeenNthCalledWith(2, {
664+
resourceArn: constants.AURORA_CLUSTER_ARN,
665+
secretArn: constants.SECRET_ARN,
666+
database: constants.DATABASE
667+
});
668+
669+
expect(mockExecuteStatement).toHaveBeenCalledTimes(2);
670+
expect(mockExecuteStatement).toHaveBeenNthCalledWith(1, {
671+
resourceArn: constants.AURORA_CLUSTER_ARN,
672+
secretArn: constants.SECRET_ARN,
673+
database: constants.DATABASE,
674+
transactionId: constants.BEGIN_TRANSACTION_DATA.transactionId,
675+
sql: 'select * from `foo`',
676+
parameters: [],
677+
includeResultMetadata: true
678+
});
679+
expect(mockExecuteStatement).toHaveBeenNthCalledWith(2, {
680+
resourceArn: constants.AURORA_CLUSTER_ARN,
681+
secretArn: constants.SECRET_ARN,
682+
database: constants.DATABASE,
683+
transactionId: constants.BEGIN_TRANSACTION_DATA_2.transactionId,
684+
sql: 'select * from `foo`',
685+
parameters: [],
686+
includeResultMetadata: true
687+
});
688+
689+
expect(mockCommitTransaction).toHaveBeenCalledTimes(2);
690+
expect(mockCommitTransaction).toHaveBeenNthCalledWith(1, {
691+
resourceArn: constants.AURORA_CLUSTER_ARN,
692+
secretArn: constants.SECRET_ARN,
693+
transactionId: constants.BEGIN_TRANSACTION_DATA.transactionId
694+
});
695+
expect(mockCommitTransaction).toHaveBeenNthCalledWith(2, {
696+
resourceArn: constants.AURORA_CLUSTER_ARN,
697+
secretArn: constants.SECRET_ARN,
698+
transactionId: constants.BEGIN_TRANSACTION_DATA_2.transactionId
699+
});
700+
701+
expect(mockRollbackTransaction).toHaveBeenCalledTimes(0);
702+
703+
expect(rows).toEqual(constants.ALL_QUERY_RESPONSE_ROWS);
704+
expect(rows2).toEqual(constants.ALL_QUERY_RESPONSE_ROWS);
705+
});
630706
});

transaction.js

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@ class Transaction_AuroraDataMySQL extends Transaction { // eslint-disable-line c
1414

1515
async begin (conn) {
1616
/* istanbul ignore next */
17-
if (conn.parameters.transactionId) {
17+
if (conn.__knexTxId in conn.transactions) {
1818
throw new Error(
19-
`Attempted to begin a new transaction for connection with existing transaction ${conn.parameters.transactionId}`
19+
`Attempted to begin a new transaction for connection ${conn.__knexUid} transaction ${conn.__knexTxId} with existing Aurora Data API transaction ID ${conn.transactions[conn.__knexTxId]}`
2020
);
2121
}
2222

@@ -25,49 +25,55 @@ class Transaction_AuroraDataMySQL extends Transaction { // eslint-disable-line c
2525
.promise();
2626
debug(`Transaction begun with id ${transactionId}`);
2727

28-
conn.parameters.transactionId = transactionId;
28+
conn.transactions[conn.__knexTxId] = transactionId;
2929
}
3030

3131
async commit (conn, value) {
3232
// When a transaction is explicitly rolled back this method is still called
3333
// at the end of the transaction block after the transaction no longer
3434
// exists.
35-
if ('transactionId' in conn.parameters) {
36-
const params = { ...conn.parameters };
35+
if (conn.__knexTxId in conn.transactions) {
36+
const params = {
37+
...conn.parameters,
38+
transactionId: conn.transactions[conn.__knexTxId]
39+
};
3740
delete params.database;
3841

3942
const { transactionStatus } = await conn.client
4043
.commitTransaction(params)
4144
.promise();
4245
debug(
43-
`Transaction ${conn.parameters.transactionId} commit status: ${transactionStatus}`
46+
`Transaction ${conn.transactions[conn.__knexTxId]} commit status: ${transactionStatus}`
4447
);
4548

46-
delete conn.parameters.transactionId;
49+
delete conn.transactions[conn.__knexTxId];
4750
}
4851

4952
this._resolver(value);
5053
}
5154

5255
async rollback (conn, error) {
5356
/* istanbul ignore next */
54-
if (!('transactionId' in conn.parameters)) {
57+
if (!(conn.__knexTxId in conn.transactions)) {
5558
throw new Error(
5659
'Attempted to rollback a transaction when one is not in progress'
5760
);
5861
}
5962

60-
const params = { ...conn.parameters };
63+
const params = {
64+
...conn.parameters,
65+
transactionId: conn.transactions[conn.__knexTxId]
66+
};
6167
delete params.database;
6268

6369
const { transactionStatus } = await conn.client
6470
.rollbackTransaction(params)
6571
.promise();
6672
debug(
67-
`Transaction ${conn.parameters.transactionId} rollback status: ${transactionStatus}`
73+
`Transaction ${conn.transactions[conn.__knexTxId]} rollback status: ${transactionStatus}`
6874
);
6975

70-
delete conn.parameters.transactionId;
76+
delete conn.transactions[conn.__knexTxId];
7177

7278
if (error === undefined) {
7379
if (this.doNotRejectOnRollback) {

0 commit comments

Comments
 (0)