Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1924,7 +1924,7 @@ protected final int entryCountMod() {
/**
* Returns true if this entry may have been created by this transaction.
*/
protected final boolean wasCreatedByTX() {
public final boolean wasCreatedByTX() {
return Token.isRemoved(this.originalVersionId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
/**
* Extends GFE {@link TXStateProxy} to add GFXD specific artifacts including
* DBSynchronizer message list to be sent out at commit time.
*
*
* @author swale
* @since 7.0
*/
Expand Down Expand Up @@ -433,7 +433,7 @@ final void cleanupIndexEntryForDestroy(Object container,
false /* isPutDML */);

if (GemFireXDUtils.TraceIndex | GemFireXDUtils.TraceQuery) {
GfxdIndexManager.traceIndex("SortedMap2Index cleanup: "
GfxdIndexManager.traceIndex("GfxdTXStateProxy cleanupIndexEntryForDestroy: "
+ "rolled back key=%s to value=(%s) in %s", indexKey,
GemFireXDUtils.TraceIndex ? oldRowLocation : ArrayUtils
.objectRefString(oldRowLocation), indexContainer);
Expand Down Expand Up @@ -476,7 +476,7 @@ public final boolean execute(final Object k1, final Object k2,
}
});
}

if (unaffectedIndexInfo != null) {
unaffectedIndexInfo.forEachEntry(new TObjectObjectObjectProcedure() {
@Override
Expand Down Expand Up @@ -663,12 +663,12 @@ private void updateIndexAtCommitAbortNoThrow(GfxdTXEntryState sqle,
throws StandardException {
boolean deleted;
Object valueBytesBeingReplaced = sqle.getPendingValue();
//TODO:Asif: This will cause a memory leak if an existing entry is

//TODO:Asif: This will cause a memory leak if an existing entry is
//updated in txn, such a new index key is introduced, and then again
// a txn update happens such that byte [] changes . Then
// a txn update happens such that byte [] changes . Then
// the firts indexkey's byte[] will not match the pending value

try {
if (rollback) {
if (!indexContainer.isGlobalIndex()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public static boolean existsKey(TXStateInterface tx, LocalRegion lr,
if (rl != null && (rlTXId = rl.getTXId()) != null) {
rl = SortedMap2IndexScanController.AbstractRowLocationIterator
.isRowLocationValidForTransaction(rl, rlTXId, tx,
GfxdConstants.SCAN_OPENMODE_FOR_READONLY_LOCK);
GfxdConstants.SCAN_OPENMODE_FOR_READONLY_LOCK, indexContainer.isUniqueIndex());
}
final Object ckey = fetch.getCurrentKey();
final CompactCompositeIndexKey mapKey = ckey != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,11 @@ public final void init(GemFireTransaction tran, MemConglomerate conglomerate,
if (GemFireXDUtils.TraceIndex) {
GfxdIndexManager.traceIndex("Opening MemIndexScanController for index "
+ "container=%s, in mode=0x%s with scanColumnList=%s startKey=%s "
+ "startOp=%s stopKey=%s stopOp=%s qualifier=%s tx=%s",
+ "startOp=%s stopKey=%s stopOp=%s qualifier=%s",
conglomerate.getGemFireContainer(), Integer.toHexString(openMode),
scanColumnList, ArrayUtils.objectString(startKeyValue),
startSearchOperator, ArrayUtils.objectString(stopKeyValue),
stopSearchOperator, ArrayUtils.objectString(qualifier),
tran.getActiveTXState());
stopSearchOperator, ArrayUtils.objectString(qualifier));
}

this.openMode = openMode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -863,7 +863,7 @@ static abstract class AbstractRowLocationIterator {

public static final RowLocation isRowLocationValidForTransaction(
final RowLocation rl, final TXId rlTXId, final TXStateInterface tx,
final int openMode) {
final int openMode, final boolean isUniqIndex) {
RowLocation ret = null;
final boolean sameTransaction = tx != null
&& rlTXId.equals(tx.getTransactionId());
Expand Down Expand Up @@ -904,6 +904,10 @@ public static final RowLocation isRowLocationValidForTransaction(
}
if (sameTransaction) {
ret = rl;
// check if it is a case of unique index and then see if the underlying region entry
// should be returned or not.
} else if (isUniqIndex) {
ret = (RowLocation) ((GfxdTXEntryState)rl).getCommittedEntry();
}
}
return ret;
Expand All @@ -912,7 +916,7 @@ public static final RowLocation isRowLocationValidForTransaction(
public static final RowLocation isRowLocationValid(RowLocation rowloc,
final LanguageConnectionContext lcc, final TXStateInterface tx,
final GemFireContainer container, final boolean[] localBucketSet,
final int openMode) {
final int openMode, final OpenMemIndex openConglom) {
final TXId rlTXId;

if (GemFireXDUtils.TraceIndex) {
Expand All @@ -927,7 +931,7 @@ public static final RowLocation isRowLocationValid(RowLocation rowloc,
// below will tell whether RowLocation has to be skipped for this
// transaction and also unwrap a TX deleted row if required
rowloc = isRowLocationValidForTransaction(rowloc, rlTXId, tx,
openMode);
openMode, openConglom.isUnique());
if (rowloc == null) {
if (GfxdTXStateProxy.LOG_FINEST | GemFireXDUtils.TraceIndex) {
SanityManager.DEBUG_PRINT(GfxdConstants.TRACE_INDEX,
Expand Down Expand Up @@ -1012,7 +1016,7 @@ public RowLocation next() throws StandardException {
for (;;) {
if (this.index < this.rlArray.length) {
RowLocation rl = isRowLocationValid(this.rlArray[this.index], lcc,
txState, this.container, this.localBucketSet, openMode);
txState, this.container, this.localBucketSet, openMode, openConglom);
this.index++;
if (rl != null) {
return rl;
Expand Down Expand Up @@ -1063,7 +1067,7 @@ public RowLocation next() throws StandardException {
while (itrPos < itrLen) {
RowLocation rl = (RowLocation)itrCache[itrPos++];
rl = isRowLocationValid(rl, lcc, txState, this.container,
this.localBucketSet, openMode);
this.localBucketSet, openMode, openConglom);
if (rl != null) {
return rl;
}
Expand All @@ -1075,7 +1079,7 @@ public RowLocation next() throws StandardException {
else if ((itr = this.itr) != null) {
while (itr.hasNext()) {
RowLocation rl = isRowLocationValid((RowLocation)itr.next(), lcc,
txState, this.container, this.localBucketSet, openMode);
txState, this.container, this.localBucketSet, openMode, openConglom);
if (rl != null) {
return rl;
}
Expand Down Expand Up @@ -1265,7 +1269,7 @@ else if (valueCls == ConcurrentTHashSet.class) {
this.rlIterator = null;
if ((rl = AbstractRowLocationIterator.isRowLocationValid(
(RowLocation)nextValue, lcc, this.txState, this.baseContainer,
this.localBucketSet, this.openMode)) != null) {
this.localBucketSet, this.openMode, openConglom)) != null) {
//this.movedToNextIndexKey = true;
++this.scanKeyGroupID;
return rl;
Expand Down Expand Up @@ -1343,7 +1347,7 @@ private final RowLocation scanHashSet(final ConcurrentTHashSet<?> set)
if (set.contains(this.initRowLocation)) {
return AbstractRowLocationIterator.isRowLocationValid(
this.initRowLocation, lcc, this.txState, this.baseContainer,
this.localBucketSet, this.openMode);
this.localBucketSet, this.openMode, openConglom);
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,7 @@
import com.gemstone.gemfire.cache.ConflictException;
import com.gemstone.gemfire.cache.query.IndexMaintenanceException;
import com.gemstone.gemfire.i18n.LogWriterI18n;
import com.gemstone.gemfire.internal.cache.AbstractRegionEntry;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.cache.ObjectEqualsHashingStrategy;
import com.gemstone.gemfire.internal.cache.OffHeapRegionEntry;
import com.gemstone.gemfire.internal.cache.TXEntryState;
import com.gemstone.gemfire.internal.cache.TXId;
import com.gemstone.gemfire.internal.cache.TXStateInterface;
import com.gemstone.gemfire.internal.cache.*;
import com.gemstone.gemfire.internal.cache.locks.ExclusiveSharedSynchronizer;
import com.gemstone.gemfire.internal.cache.locks.LockMode;
import com.gemstone.gemfire.internal.cache.locks.LockingPolicy;
Expand Down Expand Up @@ -197,80 +191,100 @@ else if (oldValue != null) {
final TXId txId, rlTXId;
final RowLocation rl;
if (tx != null && oldValue instanceof RowLocation
&& (rlTXId = (rl = (RowLocation)oldValue).getTXId()) != null
&& !rlTXId.equals(txId = tx.getTransactionId())) {
final LockingPolicy lockPolicy = tx.getLockingPolicy();
if (lockTimeout < 0) { // not set
final AbstractRegionEntry re = (AbstractRegionEntry)rl
.getUnderlyingRegionEntry();
final LockMode currentMode;
if (re == null || (currentMode = ExclusiveSharedSynchronizer
.getLockModeFromState(re.getState())) == null) {
throw GemFireXDUtils.newDuplicateKeyViolation("unique constraint",
container.getQualifiedTableName(), "key=" + key.toString()
+ ", row=" + value, rl, null, null);
}
lockTimeout = lockPolicy.getTimeout(rl,
lockPolicy.getWriteLockMode(), currentMode, 0, 0);
waitThreshold = re.getWaitThreshold() * 1000;
if (lockTimeout < 0) {
lockTimeout = ExclusiveSharedSynchronizer.getMaxMillis();
}
continue;
}
else if (lockTimeout > 0) { // wait before throwing conflict
final long sleepTime = lockTimeout < 5 ? lockTimeout : 5;
final long start = System.nanoTime();
final GemFireCacheImpl cache = Misc.getGemFireCache();
try {
Thread.sleep(sleepTime);
long elapsed = (System.nanoTime() - start + 500000) / 1000000;
if (elapsed <= 0) {
elapsed = 1;
&& (rlTXId = (rl = (RowLocation) oldValue).getTXId()) != null) {
if (!rlTXId.equals(txId = tx.getTransactionId())) {
final LockingPolicy lockPolicy = tx.getLockingPolicy();
if (lockTimeout < 0) { // not set
final AbstractRegionEntry re = (AbstractRegionEntry) rl
.getUnderlyingRegionEntry();
final LockMode currentMode;
if (re == null || (currentMode = ExclusiveSharedSynchronizer
.getLockModeFromState(re.getState())) == null) {
throw GemFireXDUtils.newDuplicateKeyViolation("unique constraint",
container.getQualifiedTableName(), "key=" + key.toString()
+ ", row=" + value, rl, null, null);
}
final long origLockTimeout = lockTimeout;
if (lockTimeout < elapsed) {
lockTimeout = 0;
lockTimeout = lockPolicy.getTimeout(rl,
lockPolicy.getWriteLockMode(), currentMode, 0, 0);
waitThreshold = re.getWaitThreshold() * 1000;
if (lockTimeout < 0) {
lockTimeout = ExclusiveSharedSynchronizer.getMaxMillis();
}
else {
lockTimeout -= elapsed;
}
if (waitThreshold > 0) {
// check if there is a factor of waitThreshold between
// origLockTimeout and lockTimeout
final long origQuotient = origLockTimeout / waitThreshold;
final long quotient = lockTimeout / waitThreshold;
if (origQuotient > quotient
|| (origLockTimeout % waitThreshold) == 0) {
final LogWriterI18n logger = cache.getLoggerI18n();
if (logger.warningEnabled()) {
logger.warning(LocalizedStrings.LocalLock_Waiting,
new Object[] { "SortedMap2IndexInsertOperation",
Double.toString(waitThreshold / 1000.0),
lockPolicy.getWriteLockMode().toString(),
"index key with oldValue=" + ArrayUtils
.objectStringNonRecursive(oldValue) + ", owner="
+ rlTXId, ArrayUtils.objectString(key),
lockTimeout });
continue;
} else if (lockTimeout > 0) { // wait before throwing conflict
final long sleepTime = lockTimeout < 5 ? lockTimeout : 5;
final long start = System.nanoTime();
final GemFireCacheImpl cache = Misc.getGemFireCache();
try {
Thread.sleep(sleepTime);
long elapsed = (System.nanoTime() - start + 500000) / 1000000;
if (elapsed <= 0) {
elapsed = 1;
}
final long origLockTimeout = lockTimeout;
if (lockTimeout < elapsed) {
lockTimeout = 0;
} else {
lockTimeout -= elapsed;
}
if (waitThreshold > 0) {
// check if there is a factor of waitThreshold between
// origLockTimeout and lockTimeout
final long origQuotient = origLockTimeout / waitThreshold;
final long quotient = lockTimeout / waitThreshold;
if (origQuotient > quotient
|| (origLockTimeout % waitThreshold) == 0) {
final LogWriterI18n logger = cache.getLoggerI18n();
if (logger.warningEnabled()) {
logger.warning(LocalizedStrings.LocalLock_Waiting,
new Object[]{"SortedMap2IndexInsertOperation",
Double.toString(waitThreshold / 1000.0),
lockPolicy.getWriteLockMode().toString(),
"index key with oldValue=" + ArrayUtils
.objectStringNonRecursive(oldValue) + ", owner="
+ rlTXId, ArrayUtils.objectString(key),
lockTimeout});
}
// increase waitThreshold by a factor of 2 for next iteration
waitThreshold <<= 1;
}
// increase waitThreshold by a factor of 2 for next iteration
waitThreshold <<= 1;
}
} catch (InterruptedException ie) {
cache.getCancelCriterion().checkCancelInProgress(ie);
}
continue;
}
final ConflictException ce = new ConflictException(
LocalizedStrings.TX_CONFLICT_ON_OBJECT.toLocalizedString("index="
+ container + "; indexKey=" + ArrayUtils.objectString(key)
+ "; having oldValue=" + ArrayUtils.objectStringNonRecursive(
oldValue) + "; owner TX=" + rlTXId
+ "; requested for TX=" + txId,
lockPolicy.getWriteLockMode().toString()));
throw StandardException.newException(
SQLState.GFXD_OPERATION_CONFLICT, ce, ce.getMessage());
} else {
// This else part is mainly added to handle SNAP-2620
if (oldValue instanceof WrapperRowLocationForTxn) {
GfxdTXEntryState gfxdtxentry = (GfxdTXEntryState)value;
gfxdtxentry.setCommittedRegionEntry(
((WrapperRowLocationForTxn)oldValue).getUnderlyingRegionEntry());
if (skipListMap.replace(key, oldValue, value)) {
// remove txentry, wrapper pair for reinstated map
// which is with TXRegionState
TXRegionState txrs = tx.readRegion((gfxdtxentry).getDataRegion());
if (txrs.getToBeReinstatedIndexMap() != null) {
Object wrappedRL = ((WrapperRowLocationForTxn)oldValue).getWrappedRowLocation();
txrs.getToBeReinstatedIndexMap().removeKeyPair(wrappedRL, container);
gfxdtxentry.updateIndexInfos(container, key);
}
break;
}
else {
continue;
}
} catch (InterruptedException ie) {
cache.getCancelCriterion().checkCancelInProgress(ie);
}
continue;
}
final ConflictException ce = new ConflictException(
LocalizedStrings.TX_CONFLICT_ON_OBJECT.toLocalizedString("index="
+ container + "; indexKey=" + ArrayUtils.objectString(key)
+ "; having oldValue=" + ArrayUtils.objectStringNonRecursive(
oldValue) + "; owner TX=" + rlTXId
+ "; requested for TX=" + txId,
lockPolicy.getWriteLockMode().toString()));
throw StandardException.newException(
SQLState.GFXD_OPERATION_CONFLICT, ce, ce.getMessage());
}
if (wrapperToReplaceUniqEntry != null) {
// if existing value is already a wrapper then no need to replace;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -701,13 +701,13 @@ public WrapperRowLocationForTxn wrapperForRollback(
final boolean isOpDestroy = isOpDestroy();
// in case of op = destroy or op = update/put of an existing entry
// we need to reinstate the old index during rollback
if (!isOpPut && !isOpDestroy) {
SanityManager.DEBUG_PRINT(GfxdConstants.TRACE_TRAN, "ERROR: Unexpected op = " +
this.op + ", opToString returns: " + this.opToString() + " for " + toString() +
", event: " + event);
throw new IllegalTransactionStateException("TXEntryState.wrapperForRollback: unexpected " +
"operation " + opToString() + " for event: " + event + ", on entry: " + toString());
}
// if (!isOpPut && !isOpDestroy) {
// SanityManager.DEBUG_PRINT(GfxdConstants.TRACE_TRAN, "ERROR: Unexpected op = " +
// this.op + ", opToString returns: " + this.opToString() + " for " + toString() +
// ", event: " + event);
// throw new IllegalTransactionStateException("TXEntryState.wrapperForRollback: unexpected " +
// "operation " + opToString() + " for event: " + event + ", on entry: " + toString());
// }

final TXRegionState txrs = this.txRegionState;
txrs.lock();
Expand Down Expand Up @@ -1715,4 +1715,14 @@ public void endIndexKeyUpdate() {
public Version[] getSerializationVersions() {
return null;
}

private volatile RegionEntry committedRegionEntry = null;

public void setCommittedRegionEntry(RegionEntry committedRegionEntry) {
this.committedRegionEntry = committedRegionEntry;
}

public RegionEntry getCommittedEntry() {
return this.committedRegionEntry;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,12 @@
import com.pivotal.gemfirexd.internal.engine.GemFireXDQueryObserverAdapter;
import com.pivotal.gemfirexd.internal.engine.GemFireXDQueryObserverHolder;
import com.pivotal.gemfirexd.internal.engine.Misc;
import com.pivotal.gemfirexd.internal.engine.access.index.GfxdIndexManager;
import com.pivotal.gemfirexd.internal.engine.distributed.GfxdConnectionHolder;
import com.pivotal.gemfirexd.internal.engine.distributed.GfxdConnectionWrapper;
import com.pivotal.gemfirexd.internal.engine.distributed.utils.GemFireXDUtils;
import com.pivotal.gemfirexd.internal.engine.store.GemFireContainer;
import com.pivotal.gemfirexd.internal.engine.store.ServerGroupUtils;
import com.pivotal.gemfirexd.internal.iapi.error.StandardException;
import com.pivotal.gemfirexd.internal.impl.jdbc.EmbedConnection;
import com.pivotal.gemfirexd.internal.impl.jdbc.EmbedPreparedStatement;
Expand Down
Loading