diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java index 5b45530d2e20e..4fa1c6aca0fee 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java @@ -86,9 +86,13 @@ public void close() throws Exception { // Fail all the pending items MetadataStoreException ex = new MetadataStoreException.AlreadyClosedException("Metadata store is getting closed"); - readOps.drain(op -> op.getFuture().completeExceptionally(ex)); - writeOps.drain(op -> op.getFuture().completeExceptionally(ex)); - + MetadataOp op; + while ((op = readOps.poll()) != null) { + op.getFuture().completeExceptionally(ex); + } + while ((op = writeOps.poll()) != null) { + op.getFuture().completeExceptionally(ex); + } scheduledTask.cancel(true); } super.close(); @@ -98,7 +102,13 @@ public void close() throws Exception { private void flush() { while (!readOps.isEmpty()) { List ops = new ArrayList<>(); - readOps.drain(ops::add, maxOperations); + for (int i = 0; i < maxOperations; i++) { + MetadataOp op = readOps.poll(); + if (op == null) { + break; + } + ops.add(op); + } internalBatchOperation(ops); } @@ -167,6 +177,11 @@ public void updateMetadataEventSynchronizer(MetadataEventSynchronizer synchroniz } private void enqueue(MessagePassingQueue queue, MetadataOp op) { + if (isClosed()) { + MetadataStoreException ex = new MetadataStoreException.AlreadyClosedException(); + op.getFuture().completeExceptionally(ex); + return; + } if (enabled) { if (!queue.offer(op)) { // Execute individually if we're failing to enqueue @@ -182,6 +197,12 @@ private void enqueue(MessagePassingQueue queue, MetadataOp op) { } private void internalBatchOperation(List ops) { + if (isClosed()) { + MetadataStoreException ex = + new MetadataStoreException.AlreadyClosedException(); + ops.forEach(op -> op.getFuture().completeExceptionally(ex)); + return; + } long now = System.currentTimeMillis(); for (MetadataOp op : ops) { this.batchMetadataStoreStats.recordOpWaiting(now - op.created());