diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/CompletedProcedureRecycler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/CompletedProcedureRecycler.java index cea15a1bfb68..db3a599163ae 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/CompletedProcedureRecycler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/CompletedProcedureRecycler.java @@ -70,15 +70,26 @@ protected void periodicExecute(final Env env) { // Failed procedures aren't persisted in WAL. batchIds[batchCount++] = entry.getKey(); if (batchCount == batchIds.length) { - store.delete(batchIds, 0, batchCount); - batchCount = 0; + try { + store.delete(batchIds, 0, batchCount); + } catch (Exception e) { + LOG.error("Error deleting completed procedures {}.", proc, e); + // Do not remove from the completed map + continue; + } finally { + batchCount = 0; + } } it.remove(); LOG.trace("Evict completed {}", proc); } } if (batchCount > 0) { - store.delete(batchIds, 0, batchCount); + try { + store.delete(batchIds, 0, batchCount); + } catch (Exception e) { + LOG.error("Error deleting completed procedures {}.", batchIds, e); + } } } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java index 91af03d3971b..b660737bf3fa 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java @@ -196,7 +196,7 @@ public void deserialize(ByteBuffer byteBuffer) { byteBuffer.get(resultArr); } // has lock - if (byteBuffer.get() == 1) { + if (byteBuffer.get() == 1 && this.state != ProcedureState.ROLLEDBACK) { this.lockedWhenLoading(); } } @@ -300,8 +300,12 @@ public final ProcedureLockState doAcquireLock(Env env, IProcedureStore store) { } ProcedureLockState state = acquireLock(env); if (state == ProcedureLockState.LOCK_ACQUIRED) { - locked = true; - store.update(this); + try { + locked = true; + store.update(this); + } catch (Exception e) { + LOG.warn("pid={} Failed to persist lock state to store.", this.procId, e); + } } return state; } @@ -314,10 +318,16 @@ public final ProcedureLockState doAcquireLock(Env env, IProcedureStore store) { */ public final void doReleaseLock(Env env, IProcedureStore store) { locked = false; - if (getState() != ProcedureState.ROLLEDBACK) { + if (getState() == ProcedureState.ROLLEDBACK) { + LOG.info("Force write unlock state to raft for pid={}", this.procId); + } + try { store.update(this); + // do not release lock when consensus layer is not working + releaseLock(env); + } catch (Exception e) { + LOG.error("pid={} Failed to persist unlock state to store.", this.procId, e); } - releaseLock(env); } public final void restoreLock(Env env) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java index 0d8368583b4e..9e3cb5c48775 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java @@ -477,12 +477,17 @@ private void countDownChildren(RootProcedureStack rootProcStack, Procedure } if (parent != null && parent.tryRunnable()) { // If success, means all its children have completed, move parent to front of the queue. - store.update(parent); - scheduler.addFront(parent); - LOG.info( - "Finished subprocedure pid={}, resume processing ppid={}", - proc.getProcId(), - parent.getProcId()); + try { + store.update(parent); + // do not add this procedure when exception occurred + scheduler.addFront(parent); + LOG.info( + "Finished subprocedure pid={}, resume processing ppid={}", + proc.getProcId(), + parent.getProcId()); + } catch (Exception e) { + LOG.warn("Failed to update parent on countdown", e); + } } } @@ -506,21 +511,38 @@ private void updateStoreOnExecution( if (LOG.isDebugEnabled()) { LOG.debug("Stored {}, children {}", proc, Arrays.toString(subprocs)); } - store.update(subprocs); + try { + store.update(subprocs); + } catch (Exception e) { + LOG.warn("Failed to update subprocs on execution", e); + } } else { LOG.debug("Store update {}", proc); if (proc.isFinished() && !proc.hasParent()) { final long[] childProcIds = rootProcStack.getSubprocedureIds(); if (childProcIds != null) { - store.delete(childProcIds); - for (long childProcId : childProcIds) { - procedures.remove(childProcId); + try { + store.delete(childProcIds); + // do not remove these procedures when exception occurred + for (long childProcId : childProcIds) { + procedures.remove(childProcId); + } + } catch (Exception e) { + LOG.warn("Failed to delete subprocedures on execution", e); } } else { - store.update(proc); + try { + store.update(proc); + } catch (Exception e) { + LOG.warn("Failed to update procedure on execution", e); + } } } else { - store.update(proc); + try { + store.update(proc); + } catch (Exception e) { + LOG.warn("Failed to update procedure on execution", e); + } } } } @@ -577,7 +599,13 @@ private ProcedureLockState executeRootStackRollback( if (exception == null) { exception = procedureStack.getException(); rootProcedure.setFailure(exception); - store.update(rootProcedure); + try { + store.update(rootProcedure); + } catch (Exception e) { + LOG.warn("Failed to update root procedure on rollback", e); + // roll back + rootProcedure.setFailure(null); + } } List> subprocStack = procedureStack.getSubproceduresStack(); int stackTail = subprocStack.size(); @@ -653,18 +681,31 @@ private void cleanupAfterRollback(Procedure procedure) { procedure.updateMetricsOnFinish(getEnvironment(), procedure.elapsedTime(), false); if (procedure.hasParent()) { - store.delete(procedure.getProcId()); - procedures.remove(procedure.getProcId()); + try { + store.delete(procedure.getProcId()); + // do not remove this procedure when exception occurred + procedures.remove(procedure.getProcId()); + } catch (Exception e) { + LOG.warn("Failed to delete procedure on rollback", e); + } } else { final long[] childProcIds = rollbackStack.get(procedure.getProcId()).getSubprocedureIds(); - if (childProcIds != null) { - store.delete(childProcIds); - } else { - store.update(procedure); + try { + if (childProcIds != null) { + store.delete(childProcIds); + } else { + store.update(procedure); + } + } catch (Exception e) { + LOG.warn("Failed to delete procedure on rollback", e); } } } else { - store.update(procedure); + try { + store.update(procedure); + } catch (Exception e) { + LOG.warn("Failed to update procedure on rollback", e); + } } } @@ -916,7 +957,11 @@ public long submitProcedure(Procedure procedure) { procedure.setProcId(store.getNextProcId()); procedure.setProcRunnable(); // Commit the transaction - store.update(procedure); + try { + store.update(procedure); + } catch (Exception e) { + LOG.error("Failed to update store procedure {}", procedure, e); + } LOG.debug("{} is stored.", procedure); // Add the procedure to the executor return pushProcedure(procedure); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java index 5aaf9a623f52..0a1f5f8248b3 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java @@ -19,12 +19,16 @@ package org.apache.iotdb.confignode.procedure; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; public class TimeoutExecutorThread extends StoppableThread { + private static final Logger LOGGER = LoggerFactory.getLogger(TimeoutExecutorThread.class); private static final int DELAY_QUEUE_TIMEOUT = 20; private final ProcedureExecutor executor; private final DelayQueue> queue = new DelayQueue<>(); @@ -71,7 +75,11 @@ public void run() { long rootProcId = executor.getRootProcedureId(procedure); RootProcedureStack rollbackStack = executor.getRollbackStack(rootProcId); rollbackStack.abort(); - executor.getStore().update(procedure); + try { + executor.getStore().update(procedure); + } catch (Exception e) { + LOGGER.warn("Failed to update procedure {}", procedure, e); + } executor.getScheduler().addFront(procedure); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ConfigProcedureStore.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ConfigProcedureStore.java index 393c1e93740e..1cd6687b26ce 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ConfigProcedureStore.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ConfigProcedureStore.java @@ -89,43 +89,51 @@ public long getNextProcId() { } @Override - public void update(Procedure procedure) { + public void update(Procedure procedure) throws Exception { Objects.requireNonNull(ProcedureFactory.getProcedureType(procedure), "Procedure type is null"); final UpdateProcedurePlan updateProcedurePlan = new UpdateProcedurePlan(procedure); try { configManager.getConsensusManager().write(updateProcedurePlan); } catch (ConsensusException e) { - LOG.warn("Failed in the write API executing the consensus layer due to: ", e); + LOG.warn( + "pid={} Failed in the write update API executing the consensus layer due to: ", + procedure.getProcId(), + e); + throw e; } } @Override - public void update(Procedure[] subprocs) { + public void update(Procedure[] subprocs) throws Exception { for (Procedure subproc : subprocs) { update(subproc); } } @Override - public void delete(long procId) { + public void delete(long procId) throws Exception { DeleteProcedurePlan deleteProcedurePlan = new DeleteProcedurePlan(); deleteProcedurePlan.setProcId(procId); try { configManager.getConsensusManager().write(deleteProcedurePlan); } catch (ConsensusException e) { - LOG.warn("Failed in the write API executing the consensus layer due to: ", e); + LOG.warn( + "pid={} Failed in the write delete API executing the consensus layer due to: ", + procId, + e); + throw e; } } @Override - public void delete(long[] childProcIds) { + public void delete(long[] childProcIds) throws Exception { for (long childProcId : childProcIds) { delete(childProcId); } } @Override - public void delete(long[] batchIds, int startIndex, int batchCount) { + public void delete(long[] batchIds, int startIndex, int batchCount) throws Exception { for (int i = startIndex; i < batchCount; i++) { delete(batchIds[i]); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/IProcedureStore.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/IProcedureStore.java index 8e8e715fd84f..3dba6d29288d 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/IProcedureStore.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/IProcedureStore.java @@ -38,15 +38,15 @@ public interface IProcedureStore { long getNextProcId(); - void update(Procedure procedure); + void update(Procedure procedure) throws Exception; - void update(Procedure[] subprocs); + void update(Procedure[] subprocs) throws Exception; - void delete(long procId); + void delete(long procId) throws Exception; - void delete(long[] childProcIds); + void delete(long[] childProcIds) throws Exception; - void delete(long[] batchIds, int startIndex, int batchCount); + void delete(long[] batchIds, int startIndex, int batchCount) throws Exception; void cleanup();