From edb72fddc03d7de166497666f5b2849c9a62ddfa Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 25 Apr 2024 14:32:59 +0300 Subject: [PATCH] [improve][meta] Fix race condition in closing metadata store --- .../metadata/impl/AbstractMetadataStore.java | 2 +- .../AbstractBatchedMetadataStore.java | 29 ++++++++++++++++--- 2 files changed, 26 insertions(+), 5 deletions(-) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java index 0a35664391455..5444c573e9089 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java @@ -517,7 +517,7 @@ protected void receivedSessionEvent(SessionEvent event) { } } - private boolean isClosed() { + protected boolean isClosed() { return isClosed.get(); } diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java index a164e4c246066..75d3085c8a466 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java @@ -87,9 +87,13 @@ public void close() throws Exception { // Fail all the pending items MetadataStoreException ex = new MetadataStoreException.AlreadyClosedException("Metadata store is getting closed"); - readOps.drain(op -> op.getFuture().completeExceptionally(ex)); - writeOps.drain(op -> op.getFuture().completeExceptionally(ex)); - + MetadataOp op; + while ((op = readOps.poll()) != null) { + op.getFuture().completeExceptionally(ex); + } + while ((op = writeOps.poll()) != null) { + op.getFuture().completeExceptionally(ex); + } scheduledTask.cancel(true); } super.close(); @@ -99,7 +103,13 @@ public void close() throws Exception { private void flush() { while (!readOps.isEmpty()) { List ops = new ArrayList<>(); - readOps.drain(ops::add, maxOperations); + for (int i = 0; i < maxOperations; i++) { + MetadataOp op = readOps.poll(); + if (op == null) { + break; + } + ops.add(op); + } internalBatchOperation(ops); } @@ -162,6 +172,11 @@ public Optional getMetadataEventSynchronizer() { } private void enqueue(MessagePassingQueue queue, MetadataOp op) { + if (isClosed()) { + MetadataStoreException ex = new MetadataStoreException.AlreadyClosedException(); + op.getFuture().completeExceptionally(ex); + return; + } if (enabled) { if (!queue.offer(op)) { // Execute individually if we're failing to enqueue @@ -177,6 +192,12 @@ private void enqueue(MessagePassingQueue queue, MetadataOp op) { } private void internalBatchOperation(List ops) { + if (isClosed()) { + MetadataStoreException ex = + new MetadataStoreException.AlreadyClosedException(); + ops.forEach(op -> op.getFuture().completeExceptionally(ex)); + return; + } long now = System.currentTimeMillis(); for (MetadataOp op : ops) { this.batchMetadataStoreStats.recordOpWaiting(now - op.created());