Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,26 @@ protected void periodicExecute(final Env env) {
// Failed procedures aren't persisted in WAL.
batchIds[batchCount++] = entry.getKey();
if (batchCount == batchIds.length) {
store.delete(batchIds, 0, batchCount);
batchCount = 0;
try {
store.delete(batchIds, 0, batchCount);
} catch (Exception e) {
LOG.error("Error deleting completed procedures {}.", proc, e);
// Do not remove from the completed map
continue;
} finally {
batchCount = 0;
}
}
it.remove();
LOG.trace("Evict completed {}", proc);
}
}
if (batchCount > 0) {
store.delete(batchIds, 0, batchCount);
try {
store.delete(batchIds, 0, batchCount);
} catch (Exception e) {
LOG.error("Error deleting completed procedures {}.", batchIds, e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ public void deserialize(ByteBuffer byteBuffer) {
byteBuffer.get(resultArr);
}
// has lock
if (byteBuffer.get() == 1) {
if (byteBuffer.get() == 1 && this.state != ProcedureState.ROLLEDBACK) {
this.lockedWhenLoading();
}
}
Expand Down Expand Up @@ -300,8 +300,12 @@ public final ProcedureLockState doAcquireLock(Env env, IProcedureStore store) {
}
ProcedureLockState state = acquireLock(env);
if (state == ProcedureLockState.LOCK_ACQUIRED) {
locked = true;
store.update(this);
try {
locked = true;
store.update(this);
} catch (Exception e) {
LOG.warn("pid={} Failed to persist lock state to store.", this.procId, e);
}
}
return state;
}
Expand All @@ -314,10 +318,16 @@ public final ProcedureLockState doAcquireLock(Env env, IProcedureStore store) {
*/
public final void doReleaseLock(Env env, IProcedureStore store) {
locked = false;
if (getState() != ProcedureState.ROLLEDBACK) {
if (getState() == ProcedureState.ROLLEDBACK) {
LOG.info("Force write unlock state to raft for pid={}", this.procId);
}
try {
store.update(this);
// do not release lock when consensus layer is not working
releaseLock(env);
} catch (Exception e) {
Comment on lines +324 to +328
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Who will release the lock then?

LOG.error("pid={} Failed to persist unlock state to store.", this.procId, e);
}
releaseLock(env);
}

public final void restoreLock(Env env) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -477,12 +477,17 @@ private void countDownChildren(RootProcedureStack rootProcStack, Procedure<Env>
}
if (parent != null && parent.tryRunnable()) {
// If success, means all its children have completed, move parent to front of the queue.
store.update(parent);
scheduler.addFront(parent);
LOG.info(
"Finished subprocedure pid={}, resume processing ppid={}",
proc.getProcId(),
parent.getProcId());
try {
store.update(parent);
// do not add this procedure when exception occurred
scheduler.addFront(parent);
LOG.info(
"Finished subprocedure pid={}, resume processing ppid={}",
proc.getProcId(),
parent.getProcId());
} catch (Exception e) {
LOG.warn("Failed to update parent on countdown", e);
}
Comment on lines +480 to +490
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is solely printing a log enough?

}
}

Expand All @@ -506,21 +511,38 @@ private void updateStoreOnExecution(
if (LOG.isDebugEnabled()) {
LOG.debug("Stored {}, children {}", proc, Arrays.toString(subprocs));
}
store.update(subprocs);
try {
store.update(subprocs);
} catch (Exception e) {
LOG.warn("Failed to update subprocs on execution", e);
}
} else {
LOG.debug("Store update {}", proc);
if (proc.isFinished() && !proc.hasParent()) {
final long[] childProcIds = rootProcStack.getSubprocedureIds();
if (childProcIds != null) {
store.delete(childProcIds);
for (long childProcId : childProcIds) {
procedures.remove(childProcId);
try {
store.delete(childProcIds);
// do not remove these procedures when exception occurred
for (long childProcId : childProcIds) {
procedures.remove(childProcId);
}
} catch (Exception e) {
LOG.warn("Failed to delete subprocedures on execution", e);
}
} else {
store.update(proc);
try {
store.update(proc);
} catch (Exception e) {
LOG.warn("Failed to update procedure on execution", e);
}
}
} else {
store.update(proc);
try {
store.update(proc);
} catch (Exception e) {
LOG.warn("Failed to update procedure on execution", e);
}
}
}
}
Expand Down Expand Up @@ -577,7 +599,13 @@ private ProcedureLockState executeRootStackRollback(
if (exception == null) {
exception = procedureStack.getException();
rootProcedure.setFailure(exception);
store.update(rootProcedure);
try {
store.update(rootProcedure);
} catch (Exception e) {
LOG.warn("Failed to update root procedure on rollback", e);
// roll back
rootProcedure.setFailure(null);
}
}
List<Procedure<Env>> subprocStack = procedureStack.getSubproceduresStack();
int stackTail = subprocStack.size();
Expand Down Expand Up @@ -653,18 +681,31 @@ private void cleanupAfterRollback(Procedure<Env> procedure) {
procedure.updateMetricsOnFinish(getEnvironment(), procedure.elapsedTime(), false);

if (procedure.hasParent()) {
store.delete(procedure.getProcId());
procedures.remove(procedure.getProcId());
try {
store.delete(procedure.getProcId());
// do not remove this procedure when exception occurred
procedures.remove(procedure.getProcId());
} catch (Exception e) {
LOG.warn("Failed to delete procedure on rollback", e);
}
} else {
final long[] childProcIds = rollbackStack.get(procedure.getProcId()).getSubprocedureIds();
if (childProcIds != null) {
store.delete(childProcIds);
} else {
store.update(procedure);
try {
if (childProcIds != null) {
store.delete(childProcIds);
} else {
store.update(procedure);
}
} catch (Exception e) {
LOG.warn("Failed to delete procedure on rollback", e);
}
}
} else {
store.update(procedure);
try {
store.update(procedure);
} catch (Exception e) {
LOG.warn("Failed to update procedure on rollback", e);
}
}
}

Expand Down Expand Up @@ -916,7 +957,11 @@ public long submitProcedure(Procedure<Env> procedure) {
procedure.setProcId(store.getNextProcId());
procedure.setProcRunnable();
// Commit the transaction
store.update(procedure);
try {
store.update(procedure);
} catch (Exception e) {
LOG.error("Failed to update store procedure {}", procedure, e);
}
LOG.debug("{} is stored.", procedure);
// Add the procedure to the executor
return pushProcedure(procedure);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@

package org.apache.iotdb.confignode.procedure;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class TimeoutExecutorThread<Env> extends StoppableThread {

private static final Logger LOGGER = LoggerFactory.getLogger(TimeoutExecutorThread.class);
private static final int DELAY_QUEUE_TIMEOUT = 20;
private final ProcedureExecutor<Env> executor;
private final DelayQueue<ProcedureDelayContainer<Env>> queue = new DelayQueue<>();
Expand Down Expand Up @@ -71,7 +75,11 @@ public void run() {
long rootProcId = executor.getRootProcedureId(procedure);
RootProcedureStack<Env> rollbackStack = executor.getRollbackStack(rootProcId);
rollbackStack.abort();
executor.getStore().update(procedure);
try {
executor.getStore().update(procedure);
} catch (Exception e) {
LOGGER.warn("Failed to update procedure {}", procedure, e);
}
executor.getScheduler().addFront(procedure);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,43 +89,51 @@ public long getNextProcId() {
}

@Override
public void update(Procedure<ConfigNodeProcedureEnv> procedure) {
public void update(Procedure<ConfigNodeProcedureEnv> procedure) throws Exception {
Objects.requireNonNull(ProcedureFactory.getProcedureType(procedure), "Procedure type is null");
final UpdateProcedurePlan updateProcedurePlan = new UpdateProcedurePlan(procedure);
try {
configManager.getConsensusManager().write(updateProcedurePlan);
} catch (ConsensusException e) {
LOG.warn("Failed in the write API executing the consensus layer due to: ", e);
LOG.warn(
"pid={} Failed in the write update API executing the consensus layer due to: ",
procedure.getProcId(),
e);
throw e;
}
}

@Override
public void update(Procedure[] subprocs) {
public void update(Procedure[] subprocs) throws Exception {
for (Procedure subproc : subprocs) {
update(subproc);
}
}

@Override
public void delete(long procId) {
public void delete(long procId) throws Exception {
DeleteProcedurePlan deleteProcedurePlan = new DeleteProcedurePlan();
deleteProcedurePlan.setProcId(procId);
try {
configManager.getConsensusManager().write(deleteProcedurePlan);
} catch (ConsensusException e) {
LOG.warn("Failed in the write API executing the consensus layer due to: ", e);
LOG.warn(
"pid={} Failed in the write delete API executing the consensus layer due to: ",
procId,
e);
throw e;
}
}

@Override
public void delete(long[] childProcIds) {
public void delete(long[] childProcIds) throws Exception {
for (long childProcId : childProcIds) {
delete(childProcId);
}
}

@Override
public void delete(long[] batchIds, int startIndex, int batchCount) {
public void delete(long[] batchIds, int startIndex, int batchCount) throws Exception {
for (int i = startIndex; i < batchCount; i++) {
delete(batchIds[i]);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,15 @@ public interface IProcedureStore<Env> {

long getNextProcId();

void update(Procedure<Env> procedure);
void update(Procedure<Env> procedure) throws Exception;

void update(Procedure<Env>[] subprocs);
void update(Procedure<Env>[] subprocs) throws Exception;

void delete(long procId);
void delete(long procId) throws Exception;

void delete(long[] childProcIds);
void delete(long[] childProcIds) throws Exception;

void delete(long[] batchIds, int startIndex, int batchCount);
void delete(long[] batchIds, int startIndex, int batchCount) throws Exception;

void cleanup();

Expand Down
Loading