diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEntryState.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEntryState.java index 0155ad553..58ad192cd 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEntryState.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEntryState.java @@ -1924,7 +1924,7 @@ protected final int entryCountMod() { /** * Returns true if this entry may have been created by this transaction. */ - protected final boolean wasCreatedByTX() { + public final boolean wasCreatedByTX() { return Token.isRemoved(this.originalVersionId); } diff --git a/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/access/GfxdTXStateProxy.java b/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/access/GfxdTXStateProxy.java index 109e87aed..673dd9285 100644 --- a/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/access/GfxdTXStateProxy.java +++ b/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/access/GfxdTXStateProxy.java @@ -67,7 +67,7 @@ /** * Extends GFE {@link TXStateProxy} to add GFXD specific artifacts including * DBSynchronizer message list to be sent out at commit time. - * + * * @author swale * @since 7.0 */ @@ -433,7 +433,7 @@ final void cleanupIndexEntryForDestroy(Object container, false /* isPutDML */); if (GemFireXDUtils.TraceIndex | GemFireXDUtils.TraceQuery) { - GfxdIndexManager.traceIndex("SortedMap2Index cleanup: " + GfxdIndexManager.traceIndex("GfxdTXStateProxy cleanupIndexEntryForDestroy: " + "rolled back key=%s to value=(%s) in %s", indexKey, GemFireXDUtils.TraceIndex ? oldRowLocation : ArrayUtils .objectRefString(oldRowLocation), indexContainer); @@ -476,7 +476,7 @@ public final boolean execute(final Object k1, final Object k2, } }); } - + if (unaffectedIndexInfo != null) { unaffectedIndexInfo.forEachEntry(new TObjectObjectObjectProcedure() { @Override @@ -663,12 +663,12 @@ private void updateIndexAtCommitAbortNoThrow(GfxdTXEntryState sqle, throws StandardException { boolean deleted; Object valueBytesBeingReplaced = sqle.getPendingValue(); - - //TODO:Asif: This will cause a memory leak if an existing entry is + + //TODO:Asif: This will cause a memory leak if an existing entry is //updated in txn, such a new index key is introduced, and then again - // a txn update happens such that byte [] changes . Then + // a txn update happens such that byte [] changes . Then // the firts indexkey's byte[] will not match the pending value - + try { if (rollback) { if (!indexContainer.isGlobalIndex()) { diff --git a/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/access/index/ContainsUniqueKeyExecutorMessage.java b/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/access/index/ContainsUniqueKeyExecutorMessage.java index f956cdb62..c606996f2 100644 --- a/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/access/index/ContainsUniqueKeyExecutorMessage.java +++ b/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/access/index/ContainsUniqueKeyExecutorMessage.java @@ -143,7 +143,7 @@ public static boolean existsKey(TXStateInterface tx, LocalRegion lr, if (rl != null && (rlTXId = rl.getTXId()) != null) { rl = SortedMap2IndexScanController.AbstractRowLocationIterator .isRowLocationValidForTransaction(rl, rlTXId, tx, - GfxdConstants.SCAN_OPENMODE_FOR_READONLY_LOCK); + GfxdConstants.SCAN_OPENMODE_FOR_READONLY_LOCK, indexContainer.isUniqueIndex()); } final Object ckey = fetch.getCurrentKey(); final CompactCompositeIndexKey mapKey = ckey != null diff --git a/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/access/index/MemIndexScanController.java b/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/access/index/MemIndexScanController.java index 3d989aef8..7470b5fa3 100644 --- a/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/access/index/MemIndexScanController.java +++ b/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/access/index/MemIndexScanController.java @@ -170,12 +170,11 @@ public final void init(GemFireTransaction tran, MemConglomerate conglomerate, if (GemFireXDUtils.TraceIndex) { GfxdIndexManager.traceIndex("Opening MemIndexScanController for index " + "container=%s, in mode=0x%s with scanColumnList=%s startKey=%s " - + "startOp=%s stopKey=%s stopOp=%s qualifier=%s tx=%s", + + "startOp=%s stopKey=%s stopOp=%s qualifier=%s", conglomerate.getGemFireContainer(), Integer.toHexString(openMode), scanColumnList, ArrayUtils.objectString(startKeyValue), startSearchOperator, ArrayUtils.objectString(stopKeyValue), - stopSearchOperator, ArrayUtils.objectString(qualifier), - tran.getActiveTXState()); + stopSearchOperator, ArrayUtils.objectString(qualifier)); } this.openMode = openMode; diff --git a/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/access/index/SortedMap2IndexScanController.java b/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/access/index/SortedMap2IndexScanController.java index c02ab9b21..fa96a3652 100644 --- a/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/access/index/SortedMap2IndexScanController.java +++ b/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/access/index/SortedMap2IndexScanController.java @@ -863,7 +863,7 @@ static abstract class AbstractRowLocationIterator { public static final RowLocation isRowLocationValidForTransaction( final RowLocation rl, final TXId rlTXId, final TXStateInterface tx, - final int openMode) { + final int openMode, final boolean isUniqIndex) { RowLocation ret = null; final boolean sameTransaction = tx != null && rlTXId.equals(tx.getTransactionId()); @@ -904,6 +904,10 @@ public static final RowLocation isRowLocationValidForTransaction( } if (sameTransaction) { ret = rl; + // check if it is a case of unique index and then see if the underlying region entry + // should be returned or not. + } else if (isUniqIndex) { + ret = (RowLocation) ((GfxdTXEntryState)rl).getCommittedEntry(); } } return ret; @@ -912,7 +916,7 @@ public static final RowLocation isRowLocationValidForTransaction( public static final RowLocation isRowLocationValid(RowLocation rowloc, final LanguageConnectionContext lcc, final TXStateInterface tx, final GemFireContainer container, final boolean[] localBucketSet, - final int openMode) { + final int openMode, final OpenMemIndex openConglom) { final TXId rlTXId; if (GemFireXDUtils.TraceIndex) { @@ -927,7 +931,7 @@ public static final RowLocation isRowLocationValid(RowLocation rowloc, // below will tell whether RowLocation has to be skipped for this // transaction and also unwrap a TX deleted row if required rowloc = isRowLocationValidForTransaction(rowloc, rlTXId, tx, - openMode); + openMode, openConglom.isUnique()); if (rowloc == null) { if (GfxdTXStateProxy.LOG_FINEST | GemFireXDUtils.TraceIndex) { SanityManager.DEBUG_PRINT(GfxdConstants.TRACE_INDEX, @@ -1012,7 +1016,7 @@ public RowLocation next() throws StandardException { for (;;) { if (this.index < this.rlArray.length) { RowLocation rl = isRowLocationValid(this.rlArray[this.index], lcc, - txState, this.container, this.localBucketSet, openMode); + txState, this.container, this.localBucketSet, openMode, openConglom); this.index++; if (rl != null) { return rl; @@ -1063,7 +1067,7 @@ public RowLocation next() throws StandardException { while (itrPos < itrLen) { RowLocation rl = (RowLocation)itrCache[itrPos++]; rl = isRowLocationValid(rl, lcc, txState, this.container, - this.localBucketSet, openMode); + this.localBucketSet, openMode, openConglom); if (rl != null) { return rl; } @@ -1075,7 +1079,7 @@ public RowLocation next() throws StandardException { else if ((itr = this.itr) != null) { while (itr.hasNext()) { RowLocation rl = isRowLocationValid((RowLocation)itr.next(), lcc, - txState, this.container, this.localBucketSet, openMode); + txState, this.container, this.localBucketSet, openMode, openConglom); if (rl != null) { return rl; } @@ -1265,7 +1269,7 @@ else if (valueCls == ConcurrentTHashSet.class) { this.rlIterator = null; if ((rl = AbstractRowLocationIterator.isRowLocationValid( (RowLocation)nextValue, lcc, this.txState, this.baseContainer, - this.localBucketSet, this.openMode)) != null) { + this.localBucketSet, this.openMode, openConglom)) != null) { //this.movedToNextIndexKey = true; ++this.scanKeyGroupID; return rl; @@ -1343,7 +1347,7 @@ private final RowLocation scanHashSet(final ConcurrentTHashSet set) if (set.contains(this.initRowLocation)) { return AbstractRowLocationIterator.isRowLocationValid( this.initRowLocation, lcc, this.txState, this.baseContainer, - this.localBucketSet, this.openMode); + this.localBucketSet, this.openMode, openConglom); } return null; } diff --git a/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/access/operations/SortedMap2IndexInsertOperation.java b/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/access/operations/SortedMap2IndexInsertOperation.java index c37975d3c..86b41be2f 100644 --- a/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/access/operations/SortedMap2IndexInsertOperation.java +++ b/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/access/operations/SortedMap2IndexInsertOperation.java @@ -27,13 +27,7 @@ import com.gemstone.gemfire.cache.ConflictException; import com.gemstone.gemfire.cache.query.IndexMaintenanceException; import com.gemstone.gemfire.i18n.LogWriterI18n; -import com.gemstone.gemfire.internal.cache.AbstractRegionEntry; -import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; -import com.gemstone.gemfire.internal.cache.ObjectEqualsHashingStrategy; -import com.gemstone.gemfire.internal.cache.OffHeapRegionEntry; -import com.gemstone.gemfire.internal.cache.TXEntryState; -import com.gemstone.gemfire.internal.cache.TXId; -import com.gemstone.gemfire.internal.cache.TXStateInterface; +import com.gemstone.gemfire.internal.cache.*; import com.gemstone.gemfire.internal.cache.locks.ExclusiveSharedSynchronizer; import com.gemstone.gemfire.internal.cache.locks.LockMode; import com.gemstone.gemfire.internal.cache.locks.LockingPolicy; @@ -197,80 +191,100 @@ else if (oldValue != null) { final TXId txId, rlTXId; final RowLocation rl; if (tx != null && oldValue instanceof RowLocation - && (rlTXId = (rl = (RowLocation)oldValue).getTXId()) != null - && !rlTXId.equals(txId = tx.getTransactionId())) { - final LockingPolicy lockPolicy = tx.getLockingPolicy(); - if (lockTimeout < 0) { // not set - final AbstractRegionEntry re = (AbstractRegionEntry)rl - .getUnderlyingRegionEntry(); - final LockMode currentMode; - if (re == null || (currentMode = ExclusiveSharedSynchronizer - .getLockModeFromState(re.getState())) == null) { - throw GemFireXDUtils.newDuplicateKeyViolation("unique constraint", - container.getQualifiedTableName(), "key=" + key.toString() - + ", row=" + value, rl, null, null); - } - lockTimeout = lockPolicy.getTimeout(rl, - lockPolicy.getWriteLockMode(), currentMode, 0, 0); - waitThreshold = re.getWaitThreshold() * 1000; - if (lockTimeout < 0) { - lockTimeout = ExclusiveSharedSynchronizer.getMaxMillis(); - } - continue; - } - else if (lockTimeout > 0) { // wait before throwing conflict - final long sleepTime = lockTimeout < 5 ? lockTimeout : 5; - final long start = System.nanoTime(); - final GemFireCacheImpl cache = Misc.getGemFireCache(); - try { - Thread.sleep(sleepTime); - long elapsed = (System.nanoTime() - start + 500000) / 1000000; - if (elapsed <= 0) { - elapsed = 1; + && (rlTXId = (rl = (RowLocation) oldValue).getTXId()) != null) { + if (!rlTXId.equals(txId = tx.getTransactionId())) { + final LockingPolicy lockPolicy = tx.getLockingPolicy(); + if (lockTimeout < 0) { // not set + final AbstractRegionEntry re = (AbstractRegionEntry) rl + .getUnderlyingRegionEntry(); + final LockMode currentMode; + if (re == null || (currentMode = ExclusiveSharedSynchronizer + .getLockModeFromState(re.getState())) == null) { + throw GemFireXDUtils.newDuplicateKeyViolation("unique constraint", + container.getQualifiedTableName(), "key=" + key.toString() + + ", row=" + value, rl, null, null); } - final long origLockTimeout = lockTimeout; - if (lockTimeout < elapsed) { - lockTimeout = 0; + lockTimeout = lockPolicy.getTimeout(rl, + lockPolicy.getWriteLockMode(), currentMode, 0, 0); + waitThreshold = re.getWaitThreshold() * 1000; + if (lockTimeout < 0) { + lockTimeout = ExclusiveSharedSynchronizer.getMaxMillis(); } - else { - lockTimeout -= elapsed; - } - if (waitThreshold > 0) { - // check if there is a factor of waitThreshold between - // origLockTimeout and lockTimeout - final long origQuotient = origLockTimeout / waitThreshold; - final long quotient = lockTimeout / waitThreshold; - if (origQuotient > quotient - || (origLockTimeout % waitThreshold) == 0) { - final LogWriterI18n logger = cache.getLoggerI18n(); - if (logger.warningEnabled()) { - logger.warning(LocalizedStrings.LocalLock_Waiting, - new Object[] { "SortedMap2IndexInsertOperation", - Double.toString(waitThreshold / 1000.0), - lockPolicy.getWriteLockMode().toString(), - "index key with oldValue=" + ArrayUtils - .objectStringNonRecursive(oldValue) + ", owner=" - + rlTXId, ArrayUtils.objectString(key), - lockTimeout }); + continue; + } else if (lockTimeout > 0) { // wait before throwing conflict + final long sleepTime = lockTimeout < 5 ? lockTimeout : 5; + final long start = System.nanoTime(); + final GemFireCacheImpl cache = Misc.getGemFireCache(); + try { + Thread.sleep(sleepTime); + long elapsed = (System.nanoTime() - start + 500000) / 1000000; + if (elapsed <= 0) { + elapsed = 1; + } + final long origLockTimeout = lockTimeout; + if (lockTimeout < elapsed) { + lockTimeout = 0; + } else { + lockTimeout -= elapsed; + } + if (waitThreshold > 0) { + // check if there is a factor of waitThreshold between + // origLockTimeout and lockTimeout + final long origQuotient = origLockTimeout / waitThreshold; + final long quotient = lockTimeout / waitThreshold; + if (origQuotient > quotient + || (origLockTimeout % waitThreshold) == 0) { + final LogWriterI18n logger = cache.getLoggerI18n(); + if (logger.warningEnabled()) { + logger.warning(LocalizedStrings.LocalLock_Waiting, + new Object[]{"SortedMap2IndexInsertOperation", + Double.toString(waitThreshold / 1000.0), + lockPolicy.getWriteLockMode().toString(), + "index key with oldValue=" + ArrayUtils + .objectStringNonRecursive(oldValue) + ", owner=" + + rlTXId, ArrayUtils.objectString(key), + lockTimeout}); + } + // increase waitThreshold by a factor of 2 for next iteration + waitThreshold <<= 1; } - // increase waitThreshold by a factor of 2 for next iteration - waitThreshold <<= 1; } + } catch (InterruptedException ie) { + cache.getCancelCriterion().checkCancelInProgress(ie); + } + continue; + } + final ConflictException ce = new ConflictException( + LocalizedStrings.TX_CONFLICT_ON_OBJECT.toLocalizedString("index=" + + container + "; indexKey=" + ArrayUtils.objectString(key) + + "; having oldValue=" + ArrayUtils.objectStringNonRecursive( + oldValue) + "; owner TX=" + rlTXId + + "; requested for TX=" + txId, + lockPolicy.getWriteLockMode().toString())); + throw StandardException.newException( + SQLState.GFXD_OPERATION_CONFLICT, ce, ce.getMessage()); + } else { + // This else part is mainly added to handle SNAP-2620 + if (oldValue instanceof WrapperRowLocationForTxn) { + GfxdTXEntryState gfxdtxentry = (GfxdTXEntryState)value; + gfxdtxentry.setCommittedRegionEntry( + ((WrapperRowLocationForTxn)oldValue).getUnderlyingRegionEntry()); + if (skipListMap.replace(key, oldValue, value)) { + // remove txentry, wrapper pair for reinstated map + // which is with TXRegionState + TXRegionState txrs = tx.readRegion((gfxdtxentry).getDataRegion()); + if (txrs.getToBeReinstatedIndexMap() != null) { + Object wrappedRL = ((WrapperRowLocationForTxn)oldValue).getWrappedRowLocation(); + txrs.getToBeReinstatedIndexMap().removeKeyPair(wrappedRL, container); + gfxdtxentry.updateIndexInfos(container, key); + } + break; + } + else { + continue; } - } catch (InterruptedException ie) { - cache.getCancelCriterion().checkCancelInProgress(ie); } - continue; } - final ConflictException ce = new ConflictException( - LocalizedStrings.TX_CONFLICT_ON_OBJECT.toLocalizedString("index=" - + container + "; indexKey=" + ArrayUtils.objectString(key) - + "; having oldValue=" + ArrayUtils.objectStringNonRecursive( - oldValue) + "; owner TX=" + rlTXId - + "; requested for TX=" + txId, - lockPolicy.getWriteLockMode().toString())); - throw StandardException.newException( - SQLState.GFXD_OPERATION_CONFLICT, ce, ce.getMessage()); } if (wrapperToReplaceUniqEntry != null) { // if existing value is already a wrapper then no need to replace; diff --git a/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/store/entry/GfxdTXEntryState.java b/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/store/entry/GfxdTXEntryState.java index 4683019bf..828af077e 100644 --- a/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/store/entry/GfxdTXEntryState.java +++ b/gemfirexd/core/src/main/java/com/pivotal/gemfirexd/internal/engine/store/entry/GfxdTXEntryState.java @@ -701,13 +701,13 @@ public WrapperRowLocationForTxn wrapperForRollback( final boolean isOpDestroy = isOpDestroy(); // in case of op = destroy or op = update/put of an existing entry // we need to reinstate the old index during rollback - if (!isOpPut && !isOpDestroy) { - SanityManager.DEBUG_PRINT(GfxdConstants.TRACE_TRAN, "ERROR: Unexpected op = " + - this.op + ", opToString returns: " + this.opToString() + " for " + toString() + - ", event: " + event); - throw new IllegalTransactionStateException("TXEntryState.wrapperForRollback: unexpected " + - "operation " + opToString() + " for event: " + event + ", on entry: " + toString()); - } +// if (!isOpPut && !isOpDestroy) { +// SanityManager.DEBUG_PRINT(GfxdConstants.TRACE_TRAN, "ERROR: Unexpected op = " + +// this.op + ", opToString returns: " + this.opToString() + " for " + toString() + +// ", event: " + event); +// throw new IllegalTransactionStateException("TXEntryState.wrapperForRollback: unexpected " + +// "operation " + opToString() + " for event: " + event + ", on entry: " + toString()); +// } final TXRegionState txrs = this.txRegionState; txrs.lock(); @@ -1715,4 +1715,14 @@ public void endIndexKeyUpdate() { public Version[] getSerializationVersions() { return null; } + + private volatile RegionEntry committedRegionEntry = null; + + public void setCommittedRegionEntry(RegionEntry committedRegionEntry) { + this.committedRegionEntry = committedRegionEntry; + } + + public RegionEntry getCommittedEntry() { + return this.committedRegionEntry; + } } diff --git a/gemfirexd/tools/src/dunit/java/com/pivotal/gemfirexd/transactions/TransactionDUnit.java b/gemfirexd/tools/src/dunit/java/com/pivotal/gemfirexd/transactions/TransactionDUnit.java index 31937cea3..9c5d4654a 100644 --- a/gemfirexd/tools/src/dunit/java/com/pivotal/gemfirexd/transactions/TransactionDUnit.java +++ b/gemfirexd/tools/src/dunit/java/com/pivotal/gemfirexd/transactions/TransactionDUnit.java @@ -52,10 +52,12 @@ import com.pivotal.gemfirexd.internal.engine.GemFireXDQueryObserverAdapter; import com.pivotal.gemfirexd.internal.engine.GemFireXDQueryObserverHolder; import com.pivotal.gemfirexd.internal.engine.Misc; +import com.pivotal.gemfirexd.internal.engine.access.index.GfxdIndexManager; import com.pivotal.gemfirexd.internal.engine.distributed.GfxdConnectionHolder; import com.pivotal.gemfirexd.internal.engine.distributed.GfxdConnectionWrapper; import com.pivotal.gemfirexd.internal.engine.distributed.utils.GemFireXDUtils; import com.pivotal.gemfirexd.internal.engine.store.GemFireContainer; +import com.pivotal.gemfirexd.internal.engine.store.ServerGroupUtils; import com.pivotal.gemfirexd.internal.iapi.error.StandardException; import com.pivotal.gemfirexd.internal.impl.jdbc.EmbedConnection; import com.pivotal.gemfirexd.internal.impl.jdbc.EmbedPreparedStatement; diff --git a/gemfirexd/tools/src/dunit/java/com/pivotal/gemfirexd/transactions/TransactionUniqIndexesMixDUnit.java b/gemfirexd/tools/src/dunit/java/com/pivotal/gemfirexd/transactions/TransactionUniqIndexesMixDUnit.java new file mode 100644 index 000000000..2e630e93d --- /dev/null +++ b/gemfirexd/tools/src/dunit/java/com/pivotal/gemfirexd/transactions/TransactionUniqIndexesMixDUnit.java @@ -0,0 +1,546 @@ +package com.pivotal.gemfirexd.transactions; + +import com.gemstone.gemfire.cache.Region; +import com.gemstone.gemfire.internal.cache.DistributedRegion; +import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; +import com.gemstone.gemfire.internal.cache.TXManagerImpl; +import com.gemstone.gemfire.internal.cache.TXStateProxy; +import com.pivotal.gemfirexd.DistributedSQLTestBase; +import com.pivotal.gemfirexd.TestUtil; +import com.pivotal.gemfirexd.internal.engine.GemFireXDQueryObserverHolder; +import com.pivotal.gemfirexd.internal.engine.Misc; +import com.pivotal.gemfirexd.internal.engine.access.index.GfxdIndexManager; +import com.pivotal.gemfirexd.internal.engine.distributed.utils.GemFireXDUtils; +import com.pivotal.gemfirexd.internal.engine.store.ServerGroupUtils; +import io.snappydata.test.dunit.SerializableRunnable; +import io.snappydata.test.dunit.VM; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.List; + +public class TransactionUniqIndexesMixDUnit extends DistributedSQLTestBase { + public TransactionUniqIndexesMixDUnit(String name) { + super(name); + } + + protected String reduceLogging() { + return "config"; + } + + protected int getIsolationLevel() { + return Connection.TRANSACTION_READ_COMMITTED; + } + + private void createDDLs() throws Exception { + java.sql.Connection conn = TestUtil.jdbcConn; + conn.setAutoCommit(false); + Statement st = conn.createStatement(); + st.execute("Create table app.t1 (c1 int not null , c2 int not null," + + " c3 int not null , c4 int not null, " + + "primary key(c4)) replicate"); + st.execute("create index localindex on app.t1(c3)"); + st.execute("create unique index uniqindex on app.t1(c1)"); + } + // In all test create a normal index and a uniq index + // Test Case 1: + // Insert a row + // + // + public void testTransactionalInsertDeleteOnReplicatedTable_1() throws Exception { + createDDLs(); + java.sql.Connection conn = TestUtil.jdbcConn; + conn.commit(); + conn.setTransactionIsolation(getIsolationLevel()); + ResultSet rs = null; + Statement st = conn.createStatement(); + + st.execute("insert into app.t1 values (10, 10, 10, 10)"); + conn.commit(); + + // craete another connection + Connection conn2 = TestUtil.getConnection(); + conn2.setTransactionIsolation(getIsolationLevel()); + Statement st2 = conn2.createStatement(); + // st.execute("insert into app.t1 values (10, 10, 10, 10)"); + st.execute("delete from app.t1 where c4 = 10"); + // Insert on another connection should fail + try { + st2.execute("insert into app.t1 values (10, 10, 10, 10)"); + fail("above should have failed with unique key violation exception"); + } catch (SQLException sqle) { + System.out.println("KN: sqlstate = " + sqle.getSQLState()); + assertTrue("23505".equalsIgnoreCase(sqle.getSQLState()) || + "X0Z02".equalsIgnoreCase(sqle.getSQLState())); + } + // Since an insert is being done after a delete on transaction one + // this should pass + st.execute("insert into app.t1 values (10, 10, 10, 10)"); + + conn.commit(); + invokeInEveryVM(new SerializableRunnable() { + @Override + public void run() { + if (GemFireCacheImpl.getInstance() != null && + ServerGroupUtils.isDataStore()) { + String regionPath = "/APP/T1"; + Region r = Misc.getRegionByPath(regionPath); + DistributedRegion dr = (DistributedRegion) r; + dr.getLogWriterI18n().fine("KN: AFTER FINAL COMMIT"); + //pr.dumpAllBuckets(false, Misc.getI18NLogWriter()); + GfxdIndexManager idxmgr = (GfxdIndexManager)dr.getIndexUpdater(); + idxmgr.dumpAllIndexes(); + } + } + }); + conn.close(); + } + + public void testTransactionalInsertDeleteOnReplicatedTable() throws Exception { + java.sql.Connection conn = TestUtil.jdbcConn; + conn.setAutoCommit(false); + Statement st = conn.createStatement(); + st.execute("Create table app.t1 (c1 int not null , c2 int not null," + + " c3 int not null , c4 int not null, " + + "primary key(c4)) replicate"); + st.execute("create index localindex on app.t1(c3)"); + st.execute("create unique index uniqindex on app.t1(c1)"); + conn.commit(); + conn.setTransactionIsolation(getIsolationLevel()); + ResultSet rs = null; + st.execute("insert into app.t1 values (10, 10, 10, 10)"); + st.execute("insert into app.t1 values (20, 20, 20, 20)"); + conn.commit(); + invokeInEveryVM(new SerializableRunnable() { + @Override + public void run() { + if (GemFireCacheImpl.getInstance() != null && + ServerGroupUtils.isDataStore()) { + String regionPath = "/APP/T1"; + Region r = Misc.getRegionByPath(regionPath); + DistributedRegion dr = (DistributedRegion) r; + dr.getLogWriterI18n().fine("KN: AFTER FIRST TWO INSERT"); + //pr.dumpAllBuckets(false, Misc.getI18NLogWriter()); + GfxdIndexManager idxmgr = (GfxdIndexManager)dr.getIndexUpdater(); + idxmgr.dumpAllIndexes(); + } + } + }); + st.execute("select * from app.t1"); + st.execute("delete from app.t1 where c4 = 10"); + st.execute("insert into app.t1 values (10, 10, 10, 10)"); + // do few updates again + st.execute("update app.t1 set c3=50 where c4=10"); + invokeInEveryVM(new SerializableRunnable() { + @Override + public void run() { + if (GemFireCacheImpl.getInstance() != null && + ServerGroupUtils.isDataStore()) { + String regionPath = "/APP/T1"; + Region r = Misc.getRegionByPath(regionPath); + DistributedRegion dr = (DistributedRegion) r; + dr.getLogWriterI18n().fine("KN: BEFORE UPDATE -- ROLLBACK"); + //pr.dumpAllBuckets(false, Misc.getI18NLogWriter()); + GfxdIndexManager idxmgr = (GfxdIndexManager)dr.getIndexUpdater(); + idxmgr.dumpAllIndexes(); + } + } + }); + st.execute("update app.t1 set c1=50 where c4=10"); + invokeInEveryVM(new SerializableRunnable() { + @Override + public void run() { + if (GemFireCacheImpl.getInstance() != null && + ServerGroupUtils.isDataStore()) { + String regionPath = "/APP/T1"; + Region r = Misc.getRegionByPath(regionPath); + DistributedRegion dr = (DistributedRegion) r; + dr.getLogWriterI18n().fine("KN: AFTER UPDATE -- ROLLBACK"); + //pr.dumpAllBuckets(false, Misc.getI18NLogWriter()); + GfxdIndexManager idxmgr = (GfxdIndexManager)dr.getIndexUpdater(); + idxmgr.dumpAllIndexes(); + } + } + }); + invokeInEveryVM(new SerializableRunnable() { + @Override + public void run() { + if (GemFireCacheImpl.getInstance() != null && + ServerGroupUtils.isDataStore()) { + String regionPath = "/APP/T1"; + Region r = Misc.getRegionByPath(regionPath); + DistributedRegion dr = (DistributedRegion) r; + dr.getLogWriterI18n().fine("KN: JUST BEFORE ROLLBACK"); + //pr.dumpAllBuckets(false, Misc.getI18NLogWriter()); + GfxdIndexManager idxmgr = (GfxdIndexManager)dr.getIndexUpdater(); + idxmgr.dumpAllIndexes(); + } + } + }); + conn.rollback(); + invokeInEveryVM(new SerializableRunnable() { + @Override + public void run() { + if (GemFireCacheImpl.getInstance() != null && + ServerGroupUtils.isDataStore()) { + String regionPath = "/APP/T1"; + Region r = Misc.getRegionByPath(regionPath); + DistributedRegion dr = (DistributedRegion) r; + dr.getLogWriterI18n().fine("KN: AFTER FIRST ROLLBACK"); + //pr.dumpAllBuckets(false, Misc.getI18NLogWriter()); + GfxdIndexManager idxmgr = (GfxdIndexManager)dr.getIndexUpdater(); + idxmgr.dumpAllIndexes(); + } + } + }); + st.execute("delete from app.t1 where c4 = 10"); + st.execute("insert into app.t1 values (10, 10, 10, 10)"); + conn.commit(); + invokeInEveryVM(new SerializableRunnable() { + @Override + public void run() { + if (GemFireCacheImpl.getInstance() != null && + ServerGroupUtils.isDataStore()) { + String regionPath = "/APP/T1"; + Region r = Misc.getRegionByPath(regionPath); + DistributedRegion dr = (DistributedRegion) r; + dr.getLogWriterI18n().fine("KN: AFTER FINAL COMMIT"); + //pr.dumpAllBuckets(false, Misc.getI18NLogWriter()); + GfxdIndexManager idxmgr = (GfxdIndexManager)dr.getIndexUpdater(); + idxmgr.dumpAllIndexes(); + } + } + }); + rs = st.executeQuery("select count(*) from app.t1"); + assertTrue(rs.next()); + assertEquals(rs.getInt(1), 2); + assertFalse(rs.next()); + conn.commit(); + conn.close(); + } + + public void testTransactionalScanWhenUpdateOnNonUniqueIndexedColumn() throws Exception { + java.sql.Connection conn = TestUtil.jdbcConn; + conn.setAutoCommit(false); + Statement st = conn.createStatement(); + st.execute("Create table app.t1 (c1 int not null , c2 int not null," + + " c3 int not null , c4 int not null, " + + "primary key(c4)) replicate"); + st.execute("create index localindex on app.t1(c3)"); + st.execute("create unique index uniqindex on app.t1(c1)"); + conn.commit(); + conn.setTransactionIsolation(getIsolationLevel()); + ResultSet rs = null; + st.execute("insert into app.t1 values (10, 10, 10, 10)"); + st.execute("insert into app.t1 values (20, 20, 20, 20)"); + conn.commit(); + st.execute("update app.t1 set c3=50 where c4=10"); + st.execute("select c3 from app.t1 --GEMFIREXD-PROPERTIES index=uniqindex\n where c1 = 10"); + rs = st.getResultSet(); + assertTrue(rs.next()); + assertEquals(rs.getInt(1), 50); + conn.commit(); + st.execute("select c3 from app.t1 where c1 = 10"); + rs = st.getResultSet(); + assertTrue(rs.next()); + assertEquals(rs.getInt(1), 50); + conn.commit(); + conn.close(); + } + + public void testOtherTxnalScanWhenDelAndInsertOnUniqueIndexedColumnByOtherTxn() throws Exception { + java.sql.Connection conn = TestUtil.jdbcConn; + conn.setAutoCommit(false); + Statement st = conn.createStatement(); + st.execute("Create table app.t1 (c1 int not null , c2 int not null," + + " c3 int not null , c4 int not null, " + + "primary key(c4)) replicate"); + st.execute("create index localindex on app.t1(c3)"); + st.execute("create unique index uniqindex on app.t1(c1)"); + conn.commit(); + conn.setTransactionIsolation(getIsolationLevel()); + ResultSet rs = null; + st.execute("insert into app.t1 values (10, 10, 10, 10)"); + st.execute("insert into app.t1 values (20, 20, 20, 20)"); + conn.commit(); + invokeInEveryVM(new SerializableRunnable() { + @Override + public void run() { + if (GemFireCacheImpl.getInstance() != null && + ServerGroupUtils.isDataStore()) { + String regionPath = "/APP/T1"; + Region r = Misc.getRegionByPath(regionPath); + DistributedRegion dr = (DistributedRegion) r; + dr.getLogWriterI18n().fine("KN: AFTER FIRST TWO INSERT"); + //pr.dumpAllBuckets(false, Misc.getI18NLogWriter()); + GfxdIndexManager idxmgr = (GfxdIndexManager)dr.getIndexUpdater(); + idxmgr.dumpAllIndexes(); + } + } + }); + st.execute("delete from app.t1 where c4=10"); + st.execute("insert into app.t1 values (10, 70, 70, 70)"); + st.execute("select * from app.t1 --GEMFIREXD-PROPERTIES index=uniqindex\n where c1 = 10"); + rs = st.getResultSet(); + assertTrue(rs.next()); + assertEquals(rs.getInt(1), 10); + assertEquals(rs.getInt(2), 70); + assertEquals(rs.getInt(3), 70); + assertEquals(rs.getInt(4), 70); + invokeInEveryVM(new SerializableRunnable() { + @Override + public void run() { + if (GemFireCacheImpl.getInstance() != null && + ServerGroupUtils.isDataStore()) { + String regionPath = "/APP/T1"; + Region r = Misc.getRegionByPath(regionPath); + DistributedRegion dr = (DistributedRegion) r; + dr.getLogWriterI18n().fine("KN: DELETE INSERT BEFORE COMMIT TX 1"); + //pr.dumpAllBuckets(false, Misc.getI18NLogWriter()); + GfxdIndexManager idxmgr = (GfxdIndexManager)dr.getIndexUpdater(); + idxmgr.dumpAllIndexes(); + } + } + }); + // Take another connection and fire the same query ... that should see the + // committed value + Connection conn2 = TestUtil.getConnection(); + conn2.setTransactionIsolation(getIsolationLevel()); + Statement st2 = conn2.createStatement(); + st2.execute("select * from app.t1 --GEMFIREXD-PROPERTIES index=uniqindex\n where c1 = 10"); + rs = st2.getResultSet(); + assertTrue(rs.next()); + assertEquals(rs.getInt(1), 10); + assertEquals(rs.getInt(2), 10); + assertEquals(rs.getInt(3), 10); + assertEquals(rs.getInt(4), 10); + conn2.commit(); + conn.commit(); + invokeInEveryVM(new SerializableRunnable() { + @Override + public void run() { + if (GemFireCacheImpl.getInstance() != null && + ServerGroupUtils.isDataStore()) { + String regionPath = "/APP/T1"; + Region r = Misc.getRegionByPath(regionPath); + DistributedRegion dr = (DistributedRegion) r; + dr.getLogWriterI18n().fine("KN: AFTER COMMIT TX 1"); + //pr.dumpAllBuckets(false, Misc.getI18NLogWriter()); + GfxdIndexManager idxmgr = (GfxdIndexManager)dr.getIndexUpdater(); + idxmgr.dumpAllIndexes(); + } + } + }); + // Now both the connection should see same committed row corresponding to 70 + st.execute("select * from app.t1 --GEMFIREXD-PROPERTIES index=uniqindex\n where c1 = 10"); + rs = st.getResultSet(); + assertTrue(rs.next()); + assertEquals(rs.getInt(1), 10); + assertEquals(rs.getInt(2), 70); + assertEquals(rs.getInt(3), 70); + assertEquals(rs.getInt(4), 70); + conn.commit(); + st2.execute("select * from app.t1 --GEMFIREXD-PROPERTIES index=uniqindex\n where c1 = 10"); + rs = st2.getResultSet(); + assertTrue(rs.next()); + assertEquals(rs.getInt(1), 10); + assertEquals(rs.getInt(2), 70); + assertEquals(rs.getInt(3), 70); + assertEquals(rs.getInt(4), 70); + conn2.commit(); + conn.close(); + conn2.close(); + } + + public void testOtherTxnalScanWhenDelAndInsertOnUpdateOfUniqueIndexedColumn() throws Exception { + java.sql.Connection conn = TestUtil.jdbcConn; + conn.setAutoCommit(false); + Statement st = conn.createStatement(); + st.execute("Create table app.t1 (c1 int not null , c2 int not null," + + " c3 int not null , c4 int not null, " + + "primary key(c4)) replicate"); + st.execute("create index localindex on app.t1(c3)"); + st.execute("create unique index uniqindex on app.t1(c1)"); + conn.commit(); + conn.setTransactionIsolation(getIsolationLevel()); + ResultSet rs = null; + st.execute("insert into app.t1 values (10, 10, 10, 10)"); + st.execute("insert into app.t1 values (20, 20, 20, 20)"); + conn.commit(); + invokeInEveryVM(new SerializableRunnable() { + @Override + public void run() { + if (GemFireCacheImpl.getInstance() != null && + ServerGroupUtils.isDataStore()) { + String regionPath = "/APP/T1"; + Region r = Misc.getRegionByPath(regionPath); + DistributedRegion dr = (DistributedRegion) r; + dr.getLogWriterI18n().fine("KN: AFTER FIRST TWO INSERT"); + //pr.dumpAllBuckets(false, Misc.getI18NLogWriter()); + GfxdIndexManager idxmgr = (GfxdIndexManager)dr.getIndexUpdater(); + idxmgr.dumpAllIndexes(); + } + } + }); + st.execute("delete from app.t1 where c4=10"); + st.execute("insert into app.t1 values (70, 70, 70, 70)"); + st.execute("select * from app.t1 --GEMFIREXD-PROPERTIES index=uniqindex\n where c1 = 70"); + rs = st.getResultSet(); + assertTrue(rs.next()); + assertEquals(rs.getInt(1), 70); + assertEquals(rs.getInt(2), 70); + assertEquals(rs.getInt(3), 70); + assertEquals(rs.getInt(4), 70); + st.execute("select * from app.t1 --GEMFIREXD-PROPERTIES index=uniqindex\n where c1 = 10"); + rs = st.getResultSet(); + assertFalse(rs.next()); + invokeInEveryVM(new SerializableRunnable() { + @Override + public void run() { + if (GemFireCacheImpl.getInstance() != null && + ServerGroupUtils.isDataStore()) { + String regionPath = "/APP/T1"; + Region r = Misc.getRegionByPath(regionPath); + DistributedRegion dr = (DistributedRegion) r; + dr.getLogWriterI18n().fine("KN: DELETE INSERT BEFORE COMMIT TX 1"); + //pr.dumpAllBuckets(false, Misc.getI18NLogWriter()); + GfxdIndexManager idxmgr = (GfxdIndexManager)dr.getIndexUpdater(); + idxmgr.dumpAllIndexes(); + } + } + }); + // Take another connection and fire the same query ... that should see the + // committed value + Connection conn2 = TestUtil.getConnection(); + conn2.setTransactionIsolation(getIsolationLevel()); + Statement st2 = conn2.createStatement(); + st2.execute("select * from app.t1 --GEMFIREXD-PROPERTIES index=uniqindex\n where c1 = 10"); + rs = st2.getResultSet(); + assertTrue(rs.next()); + assertEquals(rs.getInt(1), 10); + assertEquals(rs.getInt(2), 10); + assertEquals(rs.getInt(3), 10); + assertEquals(rs.getInt(4), 10); + conn2.commit(); + conn.commit(); + // Now both the connection should see same committed row corresponding to 70 + st.execute("select * from app.t1 --GEMFIREXD-PROPERTIES index=uniqindex\n where c1 = 10"); + rs = st.getResultSet(); + assertFalse(rs.next()); + st.execute("select * from app.t1 --GEMFIREXD-PROPERTIES index=uniqindex\n where c1 = 70"); + rs = st.getResultSet(); + assertTrue(rs.next()); + assertEquals(rs.getInt(1), 70); + assertEquals(rs.getInt(2), 70); + assertEquals(rs.getInt(3), 70); + assertEquals(rs.getInt(4), 70); + conn.commit(); + st2.execute("select * from app.t1 --GEMFIREXD-PROPERTIES index=uniqindex\n where c1 = 10"); + rs = st2.getResultSet(); + assertFalse(rs.next()); + st2.execute("select * from app.t1 --GEMFIREXD-PROPERTIES index=uniqindex\n where c1 = 70"); + rs = st2.getResultSet(); + assertTrue(rs.next()); + assertEquals(rs.getInt(1), 70); + assertEquals(rs.getInt(2), 70); + assertEquals(rs.getInt(3), 70); + assertEquals(rs.getInt(4), 70); + conn2.commit(); + conn.close(); + conn2.close(); + invokeInEveryVM(new SerializableRunnable() { + @Override + public void run() { + if (GemFireCacheImpl.getInstance() != null && + ServerGroupUtils.isDataStore()) { + String regionPath = "/APP/T1"; + Region r = Misc.getRegionByPath(regionPath); + DistributedRegion dr = (DistributedRegion) r; + dr.getLogWriterI18n().fine("KN: AFTER COMMIT TX 1"); + //pr.dumpAllBuckets(false, Misc.getI18NLogWriter()); + GfxdIndexManager idxmgr = (GfxdIndexManager)dr.getIndexUpdater(); + idxmgr.dumpAllIndexes(); + } + } + }); + } + // static lists to transfer clientVMs/serverVMs between tests since each test + // creates a new test object + private static List globalClientVMs; + private static List globalServerVMs; + + @Override + public void beforeClass() throws Exception { + globalClientVMs = null; + globalServerVMs = null; + super.beforeClass(); + super.baseShutDownAll(); + deleteAllOplogFiles(); + getLogWriter().info(getClass() + ".beforeClass: starting 1+3 VMs"); + startVMs(1, 3); + getLogWriter().info(getClass() + ".beforeClass: started 1+3 VMs: " + + clientVMs + " ; " + serverVMs); + currentUserName = GemFireXDUtils.getRandomString(true); + } + + @Override + public void setUp() throws Exception { + super.commonSetUp(); + super.baseSetUp(); + if (globalClientVMs != null) { + clientVMs.clear(); + clientVMs.addAll(globalClientVMs); + serverVMs.clear(); + serverVMs.addAll(globalServerVMs); + } + resetObservers(); + invokeInEveryVM(TransactionDUnit.class, "resetObservers"); + String userName = currentUserName; + setupConnection(userName); + invokeInEveryVM(TransactionDUnit.class, "setupConnection", + new Object[]{userName}); + getLogWriter().info(getClass() + "." + getTestName() + ".setUp VMs: " + + clientVMs + " ; " + serverVMs); + } + + public static void setupConnection(String userName) throws SQLException { + resetConnection(); + TestUtil.currentUserName = userName; + TestUtil.currentUserPassword = userName; + if (GemFireCacheImpl.getInstance() != null) { + TestUtil.setupConnection(); + } + } + + @Override + protected void baseShutDownAll() throws Exception { + TestUtil.stopNetServer(); + invokeInEveryVM(TestUtil.class, "stopNetServer"); + globalClientVMs = clientVMs; + globalServerVMs = serverVMs; + } + + @Override + public void afterClass() throws Exception { + globalClientVMs = null; + globalServerVMs = null; + super.baseShutDownAll(); + super.afterClass(); + } + + public static void resetObservers() { + final GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); + if (cache != null) { + TXManagerImpl txMgr = cache.getCacheTransactionManager(); + for (TXStateProxy tx : txMgr.getHostedTransactionsInProgress()) { + tx.setObserver(null); + } + txMgr.setObserver(null); + GemFireXDQueryObserverHolder.clearInstance(); + } + } + + protected static String currentUserName; +} diff --git a/gemfirexd/tools/src/dunit/java/com/pivotal/gemfirexd/transactions/TransactionUniqIndexesMixRRDUnit.java b/gemfirexd/tools/src/dunit/java/com/pivotal/gemfirexd/transactions/TransactionUniqIndexesMixRRDUnit.java new file mode 100644 index 000000000..fb8c9f0c5 --- /dev/null +++ b/gemfirexd/tools/src/dunit/java/com/pivotal/gemfirexd/transactions/TransactionUniqIndexesMixRRDUnit.java @@ -0,0 +1,15 @@ +package com.pivotal.gemfirexd.transactions; + +import java.sql.Connection; + +public class TransactionUniqIndexesMixRRDUnit extends TransactionUniqIndexesMixDUnit { + + public TransactionUniqIndexesMixRRDUnit(String name) { + super(name); + } + + @Override + protected int getIsolationLevel() { + return Connection.TRANSACTION_REPEATABLE_READ; + } +}