diff --git a/bouncy-castle/bc/pom.xml b/bouncy-castle/bc/pom.xml
index ddca4e6494ada..e5120c0985e34 100644
--- a/bouncy-castle/bc/pom.xml
+++ b/bouncy-castle/bc/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
bouncy-castle-parent
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
..
diff --git a/bouncy-castle/bcfips-include-test/pom.xml b/bouncy-castle/bcfips-include-test/pom.xml
index fb84e6fe63d32..f5a13b6d21ec3 100644
--- a/bouncy-castle/bcfips-include-test/pom.xml
+++ b/bouncy-castle/bcfips-include-test/pom.xml
@@ -24,7 +24,7 @@
org.apache.pulsar
bouncy-castle-parent
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
..
diff --git a/bouncy-castle/bcfips/pom.xml b/bouncy-castle/bcfips/pom.xml
index 4fa3ead0eadd4..1c8dde67a9854 100644
--- a/bouncy-castle/bcfips/pom.xml
+++ b/bouncy-castle/bcfips/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
bouncy-castle-parent
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
..
diff --git a/bouncy-castle/pom.xml b/bouncy-castle/pom.xml
index 979803c96f58c..72cad121ef66e 100644
--- a/bouncy-castle/pom.xml
+++ b/bouncy-castle/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
..
diff --git a/buildtools/pom.xml b/buildtools/pom.xml
index eca2dadddf384..5bd5cc8a0874e 100644
--- a/buildtools/pom.xml
+++ b/buildtools/pom.xml
@@ -31,7 +31,7 @@
org.apache.pulsar
buildtools
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
jar
Pulsar Build Tools
diff --git a/deployment/terraform-ansible/deploy-pulsar.yaml b/deployment/terraform-ansible/deploy-pulsar.yaml
index cdde04d8aab5c..80c700c568b7d 100644
--- a/deployment/terraform-ansible/deploy-pulsar.yaml
+++ b/deployment/terraform-ansible/deploy-pulsar.yaml
@@ -39,7 +39,7 @@
zookeeper_servers: "{{ groups['zookeeper']|map('extract', hostvars, ['ansible_default_ipv4', 'address'])|map('regex_replace', '^(.*)$', '\\1:2181') | join(',') }}"
service_url: "{{ pulsar_service_url }}"
http_url: "{{ pulsar_web_url }}"
- pulsar_version: "2.9.2"
+ pulsar_version: "2.9.2-offload-SNAPSHOT"
- name: Download Pulsar binary package
unarchive:
src: https://www.apache.org/dyn/mirrors/mirrors.cgi?action=download&filename=pulsar/pulsar-{{ pulsar_version }}/apache-pulsar-{{ pulsar_version }}-bin.tar.gz
diff --git a/distribution/io/pom.xml b/distribution/io/pom.xml
index 155b7da8a080d..8e2c1f61ed21e 100644
--- a/distribution/io/pom.xml
+++ b/distribution/io/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
distribution
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
..
diff --git a/distribution/offloaders/pom.xml b/distribution/offloaders/pom.xml
index ff4055cafd909..49a5dd7537713 100644
--- a/distribution/offloaders/pom.xml
+++ b/distribution/offloaders/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
distribution
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
..
diff --git a/distribution/pom.xml b/distribution/pom.xml
index 72afff04ad6ab..f2cf803c7ab78 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
..
diff --git a/distribution/server/pom.xml b/distribution/server/pom.xml
index 5fa966d476187..de4f6695e2286 100644
--- a/distribution/server/pom.xml
+++ b/distribution/server/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
distribution
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
..
diff --git a/docker/grafana/pom.xml b/docker/grafana/pom.xml
index df9617f314ec9..e95132f59d53f 100644
--- a/docker/grafana/pom.xml
+++ b/docker/grafana/pom.xml
@@ -23,7 +23,7 @@
org.apache.pulsar
docker-images
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
4.0.0
grafana-docker-image
diff --git a/docker/pom.xml b/docker/pom.xml
index 94fabef4d82be..ae619a44353f0 100644
--- a/docker/pom.xml
+++ b/docker/pom.xml
@@ -26,7 +26,7 @@
org.apache.pulsar
pulsar
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
docker-images
Apache Pulsar :: Docker Images
diff --git a/docker/pulsar-all/pom.xml b/docker/pulsar-all/pom.xml
index a897b00b648cb..f3cb981988ae5 100644
--- a/docker/pulsar-all/pom.xml
+++ b/docker/pulsar-all/pom.xml
@@ -23,7 +23,7 @@
org.apache.pulsar
docker-images
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
4.0.0
pulsar-all-docker-image
diff --git a/docker/pulsar/pom.xml b/docker/pulsar/pom.xml
index 9ee0334651e1c..08669c3bde59c 100644
--- a/docker/pulsar/pom.xml
+++ b/docker/pulsar/pom.xml
@@ -23,7 +23,7 @@
org.apache.pulsar
docker-images
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
4.0.0
pulsar-docker-image
diff --git a/jclouds-shaded/pom.xml b/jclouds-shaded/pom.xml
index 3e51419304178..75ccaa75e673b 100644
--- a/jclouds-shaded/pom.xml
+++ b/jclouds-shaded/pom.xml
@@ -26,7 +26,7 @@
org.apache.pulsar
pulsar
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
..
diff --git a/kafka-connect-avro-converter-shaded/pom.xml b/kafka-connect-avro-converter-shaded/pom.xml
index 85d189f87b5b5..a1b6e4ed3d6f6 100644
--- a/kafka-connect-avro-converter-shaded/pom.xml
+++ b/kafka-connect-avro-converter-shaded/pom.xml
@@ -26,7 +26,7 @@
pulsar
org.apache.pulsar
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
..
diff --git a/managed-ledger/pom.xml b/managed-ledger/pom.xml
index 63525923b0003..9109c155a615a 100644
--- a/managed-ledger/pom.xml
+++ b/managed-ledger/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
..
@@ -60,6 +60,12 @@
${project.version}
+
+ ${project.groupId}
+ pulsar-broker-common
+ ${project.version}
+
+
${project.groupId}
pulsar-metadata
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java
index 7205b776a9753..e983a08e9a818 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java
@@ -143,6 +143,12 @@ interface OffloadCallback {
void offloadFailed(ManagedLedgerException exception, Object ctx);
}
+ interface OffloadServiceCallback {
+ void offloadComplete(Object ctx);
+
+ void offloadFailed(ManagedLedgerException exception, Object ctx);
+ }
+
interface UpdatePropertiesCallback {
void updatePropertiesComplete(Map properties, Object ctx);
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java
index c854996cd1685..73eedb387a28f 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java
@@ -20,9 +20,15 @@
import java.io.IOException;
import java.util.Map;
+import org.apache.bookkeeper.client.api.BookKeeper;
import org.apache.bookkeeper.common.annotation.InterfaceAudience.LimitedPrivate;
import org.apache.bookkeeper.common.annotation.InterfaceStability.Evolving;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.common.protocol.schema.SchemaStorage;
@@ -31,7 +37,7 @@
*/
@LimitedPrivate
@Evolving
-public interface LedgerOffloaderFactory {
+public interface LedgerOffloaderFactory {
/**
* Check whether the provided driver driverName is supported.
@@ -72,4 +78,17 @@ default T create(OffloadPoliciesImpl offloadPolicies,
throws IOException {
return create(offloadPolicies, userMetadata, scheduler);
}
+
+
+ default T create(ServiceConfiguration conf,
+ OffloadPoliciesImpl offloadPolicies,
+ Map userMetadata,
+ PulsarClient pulsarClient,
+ PulsarAdmin pulsarAdmin,
+ BookKeeper bkc,
+ OrderedExecutor executor,
+ OrderedScheduler scheduler,
+ StatsLogger statsLogger) throws IOException {
+ return create(offloadPolicies, userMetadata, scheduler);
+ }
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index cd39919a3b357..525a3cd14183a 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -503,6 +503,16 @@ ManagedCursor newNonDurableCursor(Position startPosition, String subscriptionNam
*/
void asyncOffloadPrefix(Position pos, OffloadCallback callback, Object ctx);
+ /**
+ * Start offload service.
+ * @param topicName
+ * @param operationType
+ * @param callback
+ * @param ctx
+ */
+ void asyncOffloadService(String topicName, String operationType,
+ AsyncCallbacks.OffloadServiceCallback callback, Object ctx);
+
/**
* Get the slowest consumer.
*
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
index 2f4e098d677e5..d68e6acba0563 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
@@ -33,6 +33,7 @@
import org.apache.bookkeeper.common.annotation.InterfaceStability;
import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
+import org.apache.bookkeeper.mledger.impl.NullOffloadService;
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
import org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet;
@@ -74,6 +75,7 @@ public class ManagedLedgerConfig {
private Class extends EnsemblePlacementPolicy> bookKeeperEnsemblePlacementPolicyClassName;
private Map bookKeeperEnsemblePlacementPolicyProperties;
private LedgerOffloader ledgerOffloader = NullLedgerOffloader.INSTANCE;
+ private OffloadService offloadService = NullOffloadService.INSTANCE;
private int newEntriesCheckDelayInMillis = 10;
private Clock clock = Clock.systemUTC();
private ManagedLedgerInterceptor managedLedgerInterceptor;
@@ -509,6 +511,28 @@ public ManagedLedgerConfig setLedgerOffloader(LedgerOffloader offloader) {
return this;
}
+ /**
+ * Get offload service which will be used to offload ledgers to longterm storage.
+ *
+ * The default offloader throws an exception on any attempt to offload.
+ *
+ * @return offloadService
+ */
+ public OffloadService getOffloadService() {
+ return offloadService;
+ }
+
+ /**
+ * Set offload service to use to offloading ledgers to long term storage.
+ *
+ * @param offloadService the offload service to use.
+ * @return
+ */
+ public ManagedLedgerConfig setOffloadService(OffloadService offloadService) {
+ this.offloadService = offloadService;
+ return this;
+ }
+
/**
* Get clock to use to time operations
*
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadService.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadService.java
new file mode 100644
index 0000000000000..50a76670fe956
--- /dev/null
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadService.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
+
+/**
+ * The offload service is used for offload the message from a topic.
+ * Each topic will have individual offload service.
+ */
+public interface OffloadService {
+
+ /**
+ * Initialize the offload service.
+ * @return
+ */
+ CompletableFuture initialize();
+
+ /**
+ * Trigger offload process for the topic. The offload process can keep running or stop in specific condition.
+ * @param topic
+ * @return
+ */
+ CompletableFuture offload(String topic);
+
+ /**
+ * Close the offload service.
+ * @return
+ */
+ CompletableFuture closeAsync();
+
+ String getOffloadDriverName();
+
+ /**
+ * Get offload policies of this LedgerOffloader
+ *
+ * @return offload policies
+ */
+ OffloadPoliciesImpl getOffloadPolicies();
+
+ /**
+ * Get related ledger's ReadHandle.
+ * @param ledgerId
+ * @return
+ */
+ CompletableFuture readOffloaded(long ledgerId, String managedLedgerName);
+
+ /**
+ * Get related ledger's ReadHandle.
+ * @param ledgerId
+ * @param executor
+ * @param conf
+ * @return
+ */
+ CompletableFuture readOffloaded(long ledgerId, String managedLedgerName,
+ OrderedExecutor executor, ServiceConfiguration conf);
+
+ void close();
+}
+
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 59b04fec8f81c..8d12765a262dd 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -140,6 +140,7 @@
public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
private static final long MegaByte = 1024 * 1024;
+ private static final String OFFLOAD_SUB_NAME = "__OFFLOAD";
protected static final int AsyncOperationTimeoutSeconds = 30;
@@ -1753,6 +1754,7 @@ CompletableFuture getLedgerHandle(long ledgerId) {
LedgerInfo info = ledgers.get(ledgerId);
CompletableFuture openFuture;
+ ManagedCursor offloadCursor = cursors.get(OFFLOAD_SUB_NAME);
if (config.getLedgerOffloader() != null
&& config.getLedgerOffloader().getOffloadPolicies() != null
@@ -1771,6 +1773,11 @@ CompletableFuture getLedgerHandle(long ledgerId) {
offloadDriverMetadata.put("ManagedLedgerName", name);
openFuture = config.getLedgerOffloader().readOffloaded(ledgerId, uid,
offloadDriverMetadata);
+ } else if (config.getOffloadService() != null
+ && !config.getOffloadService().equals(NullOffloadService.INSTANCE)
+ && info != null && offloadCursor != null
+ && ledgerId < offloadCursor.getMarkDeletedPosition().getLedgerId()) {
+ openFuture = config.getOffloadService().readOffloaded(ledgerId, name);
} else {
openFuture = bookKeeper.newOpenLedgerOp().withRecovery(!isReadOnly()).withLedgerId(ledgerId)
.withDigestType(config.getDigestType()).withPassword(config.getPassword()).execute();
@@ -2777,6 +2784,17 @@ public void offloadFailed(ManagedLedgerException e, Object ctx) {
}
}
+ @Override
+ public void asyncOffloadService(String topicName, String operationType, AsyncCallbacks.OffloadServiceCallback callback, Object ctx) {
+ config.getOffloadService().offload(topicName).whenComplete((ignore, e) -> {
+ if (e != null) {
+ callback.offloadFailed(ManagedLedgerException.getManagedLedgerException(e), null);
+ } else {
+ callback.offloadComplete(null);
+ }
+ });
+ }
+
@Override
public void asyncOffloadPrefix(Position pos, OffloadCallback callback, Object ctx) {
PositionImpl requestOffloadTo = (PositionImpl) pos;
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullOffloadService.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullOffloadService.java
new file mode 100644
index 0000000000000..e07ed60115419
--- /dev/null
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullOffloadService.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.mledger.OffloadService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
+
+public class NullOffloadService implements OffloadService {
+ public static NullOffloadService INSTANCE = new NullOffloadService();
+
+ @Override
+ public String getOffloadDriverName() {
+ return "NullOffloadService";
+ }
+
+ @Override
+ public CompletableFuture initialize() {
+ CompletableFuture promise = new CompletableFuture<>();
+ promise.completeExceptionally(new UnsupportedOperationException());
+ return promise;
+ }
+
+ @Override
+ public CompletableFuture offload(String topic) {
+ CompletableFuture promise = new CompletableFuture<>();
+ promise.completeExceptionally(new UnsupportedOperationException());
+ return promise;
+ }
+
+ @Override
+ public CompletableFuture closeAsync() {
+ CompletableFuture promise = new CompletableFuture<>();
+ promise.completeExceptionally(new UnsupportedOperationException());
+ return promise;
+ }
+
+ @Override
+ public OffloadPoliciesImpl getOffloadPolicies() {
+ return null;
+ }
+
+ @Override
+ public void close() {
+ return;
+ }
+
+ @Override
+ public CompletableFuture readOffloaded(long ledgerId, String managedLedgerName) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture readOffloaded(long ledgerId, String managedLedgerName,
+ OrderedExecutor executor,
+ ServiceConfiguration conf) {
+ return CompletableFuture.completedFuture(null);
+ }
+}
diff --git a/pom.xml b/pom.xml
index 30df8e64aadb3..8a6c28f760188 100644
--- a/pom.xml
+++ b/pom.xml
@@ -32,7 +32,7 @@
org.apache.pulsar
pulsar
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
Pulsar
Pulsar is a distributed pub-sub messaging platform with a very
diff --git a/pulsar-broker-auth-athenz/pom.xml b/pulsar-broker-auth-athenz/pom.xml
index c6ff0570e1bde..12334229eb6eb 100644
--- a/pulsar-broker-auth-athenz/pom.xml
+++ b/pulsar-broker-auth-athenz/pom.xml
@@ -26,7 +26,7 @@
org.apache.pulsar
pulsar
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
pulsar-broker-auth-athenz
diff --git a/pulsar-broker-auth-sasl/pom.xml b/pulsar-broker-auth-sasl/pom.xml
index 660097361abad..e36e979cb2849 100644
--- a/pulsar-broker-auth-sasl/pom.xml
+++ b/pulsar-broker-auth-sasl/pom.xml
@@ -26,7 +26,7 @@
org.apache.pulsar
pulsar
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
pulsar-broker-auth-sasl
diff --git a/pulsar-broker-common/pom.xml b/pulsar-broker-common/pom.xml
index 77757b42fa6d8..9d892dd69dd0c 100644
--- a/pulsar-broker-common/pom.xml
+++ b/pulsar-broker-common/pom.xml
@@ -26,7 +26,7 @@
org.apache.pulsar
pulsar
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
pulsar-broker-common
diff --git a/pulsar-broker-shaded/pom.xml b/pulsar-broker-shaded/pom.xml
index 289105f78f2f8..3169e37cc9628 100644
--- a/pulsar-broker-shaded/pom.xml
+++ b/pulsar-broker-shaded/pom.xml
@@ -26,7 +26,7 @@
org.apache.pulsar
pulsar
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
..
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index 60b175c3d4ecd..ebed261998bee 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
..
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 56d8b2b8ad907..fabdd282bf9e7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -73,9 +73,14 @@
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.LedgerOffloaderFactory;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.mledger.OffloadService;
import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
+import org.apache.bookkeeper.mledger.impl.NullOffloadService;
import org.apache.bookkeeper.mledger.offload.Offloaders;
import org.apache.bookkeeper.mledger.offload.OffloadersCache;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
@@ -108,6 +113,7 @@
import org.apache.pulsar.broker.stats.MetricsGenerator;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
+import org.apache.pulsar.broker.stats.prometheus.metrics.PrometheusMetricsProvider;
import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
@@ -207,7 +213,9 @@ public class PulsarService implements AutoCloseable, ShutdownService {
private OrderedScheduler offloaderScheduler;
private OffloadersCache offloadersCache = new OffloadersCache();
private LedgerOffloader defaultOffloader;
+ private OffloadService defaultOffloadService;
private Map ledgerOffloaderMap = new ConcurrentHashMap<>();
+ private Map offloadServiceMap = new ConcurrentHashMap<>();
private ScheduledFuture> loadReportTask = null;
private ScheduledFuture> loadSheddingTask = null;
private ScheduledFuture> loadResourceQuotaTask = null;
@@ -274,6 +282,8 @@ public enum State {
private Map advertisedListeners;
private NamespaceName heartbeatNamespaceV2;
+ private PrometheusMetricsProvider statsProvider;
+
public PulsarService(ServiceConfiguration config) {
this(config, Optional.empty(), (exitCode) -> {
LOG.info("Process termination requested with code {}. "
@@ -324,6 +334,8 @@ public PulsarService(ServiceConfiguration config,
this.ioEventLoopGroup = EventLoopUtil.newEventLoopGroup(config.getNumIOThreads(), config.isEnableBusyWait(),
new DefaultThreadFactory("pulsar-io"));
+
+ this.statsProvider = new PrometheusMetricsProvider();
}
public MetadataStore createConfigurationMetadataStore() throws MetadataStoreException {
@@ -383,6 +395,10 @@ public CompletableFuture closeAsync() {
this.resourceUsageTransportManager = null;
}
+ if (statsProvider != null) {
+ statsProvider.stop();
+ }
+
if (this.webService != null) {
try {
this.webService.close();
@@ -661,8 +677,22 @@ config, localMetadataStore, getZkClient(),
schemaRegistryService = SchemaRegistryService.create(
schemaStorage, config.getSchemaRegistryCompatibilityCheckers());
+ Configuration configuration = new BaseConfiguration();
+ configuration.setProperty(PrometheusMetricsProvider.CLUSTER_NAME, config.getClusterName());
+ this.statsProvider.start(configuration);
+
this.defaultOffloader = createManagedLedgerOffloader(
OffloadPoliciesImpl.create(this.getConfiguration().getProperties()));
+ this.defaultOffloadService = createOffloadService(config,
+ OffloadPoliciesImpl.create(this.getConfiguration().getProperties()),
+ client,
+ adminClient,
+ getBookKeeperClient(),
+ orderedExecutor,
+ offloaderScheduler,
+ statsProvider.getStatsLogger("offload_service"));
+ addPrometheusRawMetricsProvider(statsProvider);
+
this.brokerInterceptor = BrokerInterceptors.load(config);
brokerService.setInterceptor(getBrokerInterceptor());
this.brokerInterceptor.initialize(this);
@@ -1019,6 +1049,10 @@ public void waitUntilClosed() throws InterruptedException {
}
}
+ public PrometheusMetricsProvider getStatsProvider() {
+ return statsProvider;
+ }
+
protected void startNamespaceService() throws PulsarServerException {
LOG.info("Starting name space service, bootstrap namespaces=" + config.getBootstrapNamespaces());
@@ -1201,12 +1235,44 @@ public LedgerOffloader getManagedLedgerOffloader(NamespaceName namespaceName, Of
return createManagedLedgerOffloader(offloadPolicies);
}
} catch (PulsarServerException e) {
- LOG.error("create ledgerOffloader failed for namespace {}", namespaceName.toString(), e);
+ LOG.error("create ledger Offloader failed for namespace {}", namespaceName.toString(), e);
return new NullLedgerOffloader();
}
});
}
+ public OffloadService getOffloadService(NamespaceName namespaceName,
+ OffloadPoliciesImpl offloadPolicies,
+ ServiceConfiguration conf,
+ PulsarClient pulsarClient,
+ PulsarAdmin pulsarAdmin,
+ BookKeeper bkc,
+ OrderedExecutor executor,
+ OrderedScheduler scheduler,
+ StatsLogger statsLogger) {
+ if (offloadPolicies == null) {
+ return getDefaultOffloadService();
+ }
+
+ return offloadServiceMap.compute(namespaceName, (ns, offloadService) -> {
+ try {
+ if (offloadService != null && Objects.equals(offloadService.getOffloadPolicies(), offloadPolicies)) {
+ return offloadService;
+ } else {
+ if (offloadService != null) {
+ offloadService.close();
+ }
+
+ return createOffloadService(conf, offloadPolicies,
+ pulsarClient, pulsarAdmin, bkc, executor, scheduler, statsLogger);
+ }
+ } catch (PulsarServerException e) {
+ LOG.error("create offload service failed for namespace {}", namespaceName, e);
+ return new NullOffloadService();
+ }
+ });
+ }
+
public synchronized LedgerOffloader createManagedLedgerOffloader(OffloadPoliciesImpl offloadPolicies)
throws PulsarServerException {
try {
@@ -1219,8 +1285,9 @@ public synchronized LedgerOffloader createManagedLedgerOffloader(OffloadPolicies
LedgerOffloaderFactory offloaderFactory = offloaders.getOffloaderFactory(
offloadPolicies.getManagedLedgerOffloadDriver());
+
try {
- return offloaderFactory.create(
+ Object offloader = offloaderFactory.create(
offloadPolicies,
ImmutableMap.of(
LedgerOffloader.METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), PulsarVersion.getVersion(),
@@ -1228,16 +1295,71 @@ public synchronized LedgerOffloader createManagedLedgerOffloader(OffloadPolicies
),
schemaStorage,
getOffloaderScheduler(offloadPolicies));
+
+ if (offloader instanceof LedgerOffloader) {
+ return (LedgerOffloader) offloader;
+ }
} catch (IOException ioe) {
throw new PulsarServerException(ioe.getMessage(), ioe.getCause());
}
} else {
LOG.info("No ledger offloader configured, using NULL instance");
- return NullLedgerOffloader.INSTANCE;
}
} catch (Throwable t) {
throw new PulsarServerException(t);
}
+
+ return NullLedgerOffloader.INSTANCE;
+ }
+
+ public synchronized OffloadService createOffloadService(ServiceConfiguration conf,
+ OffloadPoliciesImpl offloadPolicies,
+ PulsarClient pulsarClient,
+ PulsarAdmin pulsarAdmin,
+ BookKeeper bkc,
+ OrderedExecutor executor,
+ OrderedScheduler scheduler,
+ StatsLogger statsLogger)
+ throws PulsarServerException {
+ try {
+ if (StringUtils.isNotBlank(offloadPolicies.getManagedLedgerOffloadDriver())) {
+ checkNotNull(offloadPolicies.getOffloadersDirectory(),
+ "Offloader driver is configured to be '%s' but no offloaders directory is configured.",
+ offloadPolicies.getManagedLedgerOffloadDriver());
+
+ Offloaders offloaders = offloadersCache.getOrLoadOffloaders(
+ offloadPolicies.getOffloadersDirectory(), config.getNarExtractionDirectory());
+
+ LedgerOffloaderFactory offloaderFactory = offloaders.getOffloaderFactory(
+ offloadPolicies.getManagedLedgerOffloadDriver());
+
+ try {
+ Object offloader = offloaderFactory.create(conf,
+ offloadPolicies,
+ ImmutableMap.of(
+ LedgerOffloader.METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), PulsarVersion.getVersion(),
+ LedgerOffloader.METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarVersion.getGitSha()
+ ),
+ pulsarClient,
+ pulsarAdmin,
+ bkc,
+ executor,
+ scheduler,
+ statsLogger);
+ if (offloader instanceof OffloadService) {
+ return (OffloadService) offloader;
+ }
+ } catch (IOException ioe) {
+ throw new PulsarServerException(ioe.getMessage(), ioe.getCause());
+ }
+ } else {
+ LOG.info("No ledger offloader configured, using NULL instance");
+ }
+ } catch (Throwable t) {
+ throw new PulsarServerException(t);
+ }
+
+ return NullOffloadService.INSTANCE;
}
private SchemaStorage createAndStartSchemaStorage() throws Exception {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index b124e404481cd..181bc7d6bb763 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -3547,6 +3547,21 @@ protected void internalTriggerOffload(boolean authoritative, MessageIdImpl messa
}
}
+ protected void internalTriggerOffloadService(boolean authoritative, String operationType) {
+ validateTopicOwnership(topicName, authoritative);
+ validateTopicOperation(topicName, TopicOperation.OFFLOAD);
+
+ PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
+ try {
+ topic.triggerOffloadService(operationType);
+ } catch (AlreadyRunningException e) {
+ throw new RestException(Status.CONFLICT, e.getMessage());
+ } catch (Exception e) {
+ log.warn("Unexpected error triggering offload", e);
+ throw new RestException(e);
+ }
+ }
+
protected OffloadProcessStatus internalOffloadStatus(boolean authoritative) {
validateTopicOwnership(topicName, authoritative);
validateTopicOperation(topicName, TopicOperation.OFFLOAD);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 3b2ba8d670ce6..d216d29fc083d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -43,6 +43,7 @@
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
@@ -2631,6 +2632,38 @@ public OffloadProcessStatus offloadStatus(
return internalOffloadStatus(authoritative);
}
+ @PUT
+ @Path("/{tenant}/{namespace}/{topic}/offloadService")
+ @ApiOperation(value = "Offload a topic to long term storage")
+ @ApiResponses(value = {
+ @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
+ @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or"
+ + "subscriber is not authorized to access this operation"),
+ @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Topic does not exist"),
+ @ApiResponse(code = 405, message = "Operation is not allowed on the persistent topic"),
+ @ApiResponse(code = 409, message = "Offload already running"),
+ @ApiResponse(code = 412, message = "Topic name is not valid"),
+ @ApiResponse(code = 500, message = "Internal server error"),
+ @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
+ public void triggerOffloadService(
+ @ApiParam(value = "Specify the tenant", required = true)
+ @PathParam("tenant") String tenant,
+ @ApiParam(value = "Specify the namespace", required = true)
+ @PathParam("namespace") String namespace,
+ @ApiParam(value = "Specify topic name", required = true)
+ @PathParam("topic") @Encoded String encodedTopic,
+ @ApiParam(value = "Is authentication required to perform this operation")
+ @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+ String operationType) {
+ if (StringUtils.isBlank(operationType)
+ || (!"start".equals(operationType) && !"stop".equals(operationType) && !"status".equals(operationType))) {
+ throw new RestException(Response.Status.BAD_REQUEST, "operation type is invalid");
+ }
+ validateTopicName(tenant, namespace, encodedTopic);
+ internalTriggerOffloadService(authoritative, operationType);
+ }
+
@GET
@Path("/{tenant}/{namespace}/{topic}/lastMessageId")
@ApiOperation(value = "Return the last commit message id of topic")
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index d37dfb6a08105..10425a144e4e1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -86,6 +86,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerNotFoundException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.mledger.OffloadService;
import org.apache.bookkeeper.mledger.util.Futures;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
@@ -1500,6 +1501,13 @@ public CompletableFuture getManagedLedgerConfig(TopicName t
LedgerOffloader topicLevelLedgerOffLoader =
pulsar().createManagedLedgerOffloader(offloadPolicies);
managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader);
+
+ OffloadService topicLevelOffloadService =
+ pulsar().createOffloadService(serviceConfig, offloadPolicies, pulsar.getClient(),
+ pulsar.getAdminClient(), pulsar.getBookKeeperClient(), pulsar.getOrderedExecutor(),
+ pulsar.getOffloaderScheduler(),
+ pulsar.getStatsProvider().getStatsLogger("offload_service"));
+ managedLedgerConfig.setOffloadService(topicLevelOffloadService);
} catch (PulsarServerException e) {
throw new RuntimeException(e);
}
@@ -1507,6 +1515,15 @@ public CompletableFuture getManagedLedgerConfig(TopicName t
//If the topic level policy is null, use the namespace level
managedLedgerConfig
.setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, offloadPolicies));
+ try {
+ managedLedgerConfig.setOffloadService(pulsar.getOffloadService(namespace, offloadPolicies,
+ serviceConfig, pulsar.getClient(), pulsar.getAdminClient(),
+ pulsar.getBookKeeperClient(),
+ pulsar.getOrderedExecutor(), pulsar.getOffloaderScheduler(),
+ pulsar.getStatsProvider().getStatsLogger("offload_service")));
+ } catch (PulsarServerException e) {
+ throw new RuntimeException(e);
+ }
}
managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index e6cf77fde3466..2242a2998ab93 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2832,6 +2832,25 @@ public void offloadFailed(ManagedLedgerException exception, Object ctx) {
}
}
+ public synchronized void triggerOffloadService(String operationType) throws AlreadyRunningException {
+ log.info("[{}] Starting offload operation {}", topic, operationType);
+ CompletableFuture promise = new CompletableFuture<>();
+ getManagedLedger().asyncOffloadService(topic, operationType,
+ new AsyncCallbacks.OffloadServiceCallback() {
+ @Override
+ public void offloadComplete(Object ctx) {
+ log.info("[{}] {} offload service succeed.", topic, operationType);
+ promise.complete(null);
+ }
+
+ @Override
+ public void offloadFailed(ManagedLedgerException exception, Object ctx) {
+ log.warn("[{}] {} offload service failed", topic, operationType, exception);
+ promise.completeExceptionally(exception);
+ }
+ }, null);
+ }
+
public synchronized OffloadProcessStatus offloadStatus() {
if (!currentOffload.isDone()) {
return OffloadProcessStatus.forStatus(LongRunningProcessStatus.Status.RUNNING);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java
index 0e59286861a40..b44a9d1e43b1d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java
@@ -34,11 +34,13 @@
import org.apache.bookkeeper.stats.StatsProvider;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
/**
* A Prometheus based {@link StatsProvider} implementation.
*/
-public class PrometheusMetricsProvider implements StatsProvider {
+public class PrometheusMetricsProvider implements StatsProvider, PrometheusRawMetricsProvider {
private ScheduledExecutorService executor;
public static final String PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS = "prometheusStatsLatencyRolloverSeconds";
@@ -129,4 +131,12 @@ void rotateLatencyCollection() {
metric.rotateLatencyCollection();
});
}
+
+ @Override
+ public void generate(SimpleTextOutputStream writer) {
+ gauges.forEach((sc, gauge) -> PrometheusTextFormatUtilV2.writeGauge(writer, sc, cluster, gauge));
+ counters.forEach((sc, counter) -> PrometheusTextFormatUtilV2.writeCounter(writer, sc, cluster, counter));
+ opStats.forEach((sc, opStatLogger) ->
+ PrometheusTextFormatUtilV2.writeOpStat(writer, sc, cluster, opStatLogger));
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormatUtilV2.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormatUtilV2.java
new file mode 100644
index 0000000000000..7a537b18be3ea
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormatUtilV2.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pulsar.broker.stats.prometheus.metrics;
+
+import io.prometheus.client.Collector;
+import io.prometheus.client.Collector.MetricFamilySamples;
+import io.prometheus.client.Collector.MetricFamilySamples.Sample;
+import io.prometheus.client.CollectorRegistry;
+import java.util.Enumeration;
+import java.util.Map;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
+
+/**
+ * Logic to write metrics in Prometheus text format.
+ */
+public class PrometheusTextFormatUtilV2 {
+ public static void writeGauge(SimpleTextOutputStream w, String name, String cluster, SimpleGauge extends Number> gauge) {
+ // Example:
+ // # TYPE bookie_storage_entries_count gauge
+ // bookie_storage_entries_count 519
+ w.write("# TYPE ").write(name).write(" gauge\n");
+ w.write(name);
+ w.write(' ').write(gauge.getSample().toString()).write('\n');
+
+ }
+
+ public static void writeCounter(SimpleTextOutputStream w, String name, String cluster, LongAdderCounter counter) {
+ // Example:
+ // # TYPE jvm_threads_started_total counter
+ // jvm_threads_started_total 59
+ w.write("# TYPE ").write(name).write(" counter\n");
+ w.write(name);
+ w.write(' ').write(counter.get().toString()).write('\n');
+ }
+
+ public static void writeOpStat(SimpleTextOutputStream w, String name, String cluster, DataSketchesOpStatsLogger opStat) {
+ // Example:
+ // # TYPE bookie_journal_JOURNAL_ADD_ENTRY summary
+ // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.5",} NaN
+ // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.75",} NaN
+ // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.95",} NaN
+ // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.99",} NaN
+ // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.999",} NaN
+ // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.9999",} NaN
+ // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="1.0",} NaN
+ // bookie_journal_JOURNAL_ADD_ENTRY_count{success="false",} 0.0
+ // bookie_journal_JOURNAL_ADD_ENTRY_sum{success="false",} 0.0
+ // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.5",} 1.706
+ // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.75",} 1.89
+ // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.95",} 2.121
+ // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.99",} 10.708
+ // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.999",} 10.902
+ // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.9999",} 10.902
+ // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="1.0",} 10.902
+ // bookie_journal_JOURNAL_ADD_ENTRY_count{success="true",} 658.0
+ // bookie_journal_JOURNAL_ADD_ENTRY_sum{success="true",} 1265.0800000000002
+ w.write("# TYPE ").write(name).write(" summary\n");
+ writeQuantile(w, opStat, name, false, 0.5);
+ writeQuantile(w, opStat, name, false, 0.75);
+ writeQuantile(w, opStat, name, false, 0.95);
+ writeQuantile(w, opStat, name, false, 0.99);
+ writeQuantile(w, opStat, name, false, 0.999);
+ writeQuantile(w, opStat, name, false, 0.9999);
+ writeQuantile(w, opStat, name, false, 1.0);
+ writeCount(w, opStat, name, false);
+ writeSum(w, opStat, name, false);
+
+ writeQuantile(w, opStat, name, true, 0.5);
+ writeQuantile(w, opStat, name, true, 0.75);
+ writeQuantile(w, opStat, name, true, 0.95);
+ writeQuantile(w, opStat, name, true, 0.99);
+ writeQuantile(w, opStat, name, true, 0.999);
+ writeQuantile(w, opStat, name, true, 0.9999);
+ writeQuantile(w, opStat, name, true, 1.0);
+ writeCount(w, opStat, name, true);
+ writeSum(w, opStat, name, true);
+ }
+
+ private static void writeQuantile(SimpleTextOutputStream w, DataSketchesOpStatsLogger opStat, String name,
+ Boolean success, double quantile) {
+ w.write(name)
+ .write("{success=\"").write(success.toString())
+ .write("\",quantile=\"").write(Double.toString(quantile));
+ w.write("\"");
+ w.write("} ")
+ .write(Double.toString(opStat.getQuantileValue(success, quantile))).write('\n');
+ }
+
+ private static void writeCount(SimpleTextOutputStream w, DataSketchesOpStatsLogger opStat, String name,
+ Boolean success) {
+ w.write(name).write("_count{success=\"").write(success.toString());
+ w.write("\"");
+ w.write("} ")
+ .write(Long.toString(opStat.getCount(success))).write('\n');
+ }
+
+ private static void writeSum(SimpleTextOutputStream w, DataSketchesOpStatsLogger opStat, String name,
+ Boolean success) {
+ w.write(name).write("_sum{success=\"").write(success.toString());
+ w.write("\"");
+ w.write("} ")
+ .write(Double.toString(opStat.getSum(success))).write('\n');
+ }
+
+ public static void writeMetricsCollectedByPrometheusClient(SimpleTextOutputStream w, CollectorRegistry registry) {
+ Enumeration metricFamilySamples = registry.metricFamilySamples();
+ while (metricFamilySamples.hasMoreElements()) {
+ MetricFamilySamples metricFamily = metricFamilySamples.nextElement();
+
+ for (int i = 0; i < metricFamily.samples.size(); i++) {
+ Sample sample = metricFamily.samples.get(i);
+ w.write(sample.name);
+ w.write('{');
+ for (int j = 0; j < sample.labelNames.size(); j++) {
+ if (j != 0) {
+ w.write(", ");
+ }
+ w.write(sample.labelNames.get(j));
+ w.write("=\"");
+ w.write(sample.labelValues.get(j));
+ w.write('"');
+ }
+
+ w.write("} ");
+ w.write(Collector.doubleToGoString(sample.value));
+ w.write('\n');
+ }
+ }
+ }
+
+ private static void writeLabels(SimpleTextOutputStream w, Map labels) {
+ if (labels.isEmpty()) {
+ return;
+ }
+
+ w.write('{');
+ writeLabelsNoBraces(w, labels);
+ w.write('}');
+ }
+
+ private static void writeLabelsNoBraces(SimpleTextOutputStream w, Map labels) {
+ if (labels.isEmpty()) {
+ return;
+ }
+
+ boolean isFirst = true;
+ for (Map.Entry e : labels.entrySet()) {
+ if (!isFirst) {
+ w.write(',');
+ }
+ isFirst = false;
+ w.write(e.getKey())
+ .write("=\"")
+ .write(e.getValue())
+ .write('"');
+ }
+ }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index 217dd5ccc8529..a12127439eb3a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -207,9 +207,10 @@ void messageReceived(MessageIdData messageId, int redeliveryCount,
log.debug("[{}][{}] Received raw message: {}/{}/{}", topic, subscription,
messageId.getEntryId(), messageId.getLedgerId(), messageId.getPartition());
}
+
incomingRawMessages.add(
- new RawMessageAndCnx(new RawMessageImpl(messageId, headersAndPayload), cnx));
- tryCompletePending();
+ new RawMessageAndCnx(new RawMessageImpl(messageId, headersAndPayload), cnx));
+ internalPinnedExecutor.execute(this::tryCompletePending);
}
}
diff --git a/pulsar-client-1x-base/pom.xml b/pulsar-client-1x-base/pom.xml
index 6078062dc1e69..ef2b93fcd6ad2 100644
--- a/pulsar-client-1x-base/pom.xml
+++ b/pulsar-client-1x-base/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
..
diff --git a/pulsar-client-1x-base/pulsar-client-1x/pom.xml b/pulsar-client-1x-base/pulsar-client-1x/pom.xml
index 59cf04834338d..bf803ce60eec2 100644
--- a/pulsar-client-1x-base/pulsar-client-1x/pom.xml
+++ b/pulsar-client-1x-base/pulsar-client-1x/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar-client-1x-base
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
..
diff --git a/pulsar-client-1x-base/pulsar-client-2x-shaded/pom.xml b/pulsar-client-1x-base/pulsar-client-2x-shaded/pom.xml
index 1f3346fd0da53..92f51c88a0585 100644
--- a/pulsar-client-1x-base/pulsar-client-2x-shaded/pom.xml
+++ b/pulsar-client-1x-base/pulsar-client-2x-shaded/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar-client-1x-base
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
..
diff --git a/pulsar-client-admin-api/pom.xml b/pulsar-client-admin-api/pom.xml
index 60840c9f37f44..69e601352b028 100644
--- a/pulsar-client-admin-api/pom.xml
+++ b/pulsar-client-admin-api/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
..
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index daab4094b1258..9e501464153c1 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -1575,7 +1575,22 @@ void createSubscription(String topic, String subscriptionName, MessageId message
void triggerOffload(String topic, MessageId messageId) throws PulsarAdminException;
/**
- * Trigger offloading messages in topic to longterm storage asynchronously.
+ * start, stop or get status of the offload service.
+ * @param topic
+ * @param operationType
+ * @throws PulsarAdminException
+ */
+ void offloadService(String topic, String operationType) throws PulsarAdminException;
+
+ /**
+ * start, stop or get status of the offload service asynchronously.
+ * @param topic
+ * @param operationType
+ * @return
+ */
+ CompletableFuture offloadServiceAsync(String topic, String operationType);
+ /**
+ * Trigger offloading messages in topic to long term storage asynchronously.
*
* @param topic the topic to offload
* @param messageId ID of maximum message which should be offloaded
diff --git a/pulsar-client-admin-shaded/pom.xml b/pulsar-client-admin-shaded/pom.xml
index faebb94d05622..c6e37558c232d 100644
--- a/pulsar-client-admin-shaded/pom.xml
+++ b/pulsar-client-admin-shaded/pom.xml
@@ -26,7 +26,7 @@
org.apache.pulsar
pulsar
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
..
diff --git a/pulsar-client-admin/pom.xml b/pulsar-client-admin/pom.xml
index f6db9d39ee693..accbd8f0a4770 100644
--- a/pulsar-client-admin/pom.xml
+++ b/pulsar-client-admin/pom.xml
@@ -26,7 +26,7 @@
org.apache.pulsar
pulsar
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
..
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 57f36995a8fdf..45c9348b1f4fc 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -1528,6 +1528,28 @@ public void failed(Throwable throwable) {
return future;
}
+ @Override
+ public void offloadService(String topic, String operationType) throws PulsarAdminException {
+ try {
+ offloadServiceAsync(topic, operationType).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture offloadServiceAsync(String topic, String operationType) {
+ TopicName tn = validateTopic(topic);
+ WebTarget path = topicPath(tn, "offloadService");
+ final CompletableFuture future = new CompletableFuture<>();
+ return asyncPutRequest(path, Entity.entity(operationType, MediaType.APPLICATION_JSON));
+ }
+
@Override
public OffloadProcessStatus offloadStatus(String topic)
throws PulsarAdminException {
diff --git a/pulsar-client-all/pom.xml b/pulsar-client-all/pom.xml
index 2722b42911976..bec37df85ffd0 100644
--- a/pulsar-client-all/pom.xml
+++ b/pulsar-client-all/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
..
diff --git a/pulsar-client-api/pom.xml b/pulsar-client-api/pom.xml
index c9ef5d87e4106..bf511f05d9a0a 100644
--- a/pulsar-client-api/pom.xml
+++ b/pulsar-client-api/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
..
diff --git a/pulsar-client-auth-athenz/pom.xml b/pulsar-client-auth-athenz/pom.xml
index 40f5f8407b8d5..0009f341f4353 100644
--- a/pulsar-client-auth-athenz/pom.xml
+++ b/pulsar-client-auth-athenz/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
..
diff --git a/pulsar-client-auth-sasl/pom.xml b/pulsar-client-auth-sasl/pom.xml
index 020af46f5f855..6ef2809172729 100644
--- a/pulsar-client-auth-sasl/pom.xml
+++ b/pulsar-client-auth-sasl/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
..
diff --git a/pulsar-client-messagecrypto-bc/pom.xml b/pulsar-client-messagecrypto-bc/pom.xml
index 9f9528b58e80e..6e91a29db3324 100644
--- a/pulsar-client-messagecrypto-bc/pom.xml
+++ b/pulsar-client-messagecrypto-bc/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
..
diff --git a/pulsar-client-shaded/pom.xml b/pulsar-client-shaded/pom.xml
index 97002332870b7..f952b0d85f3c4 100644
--- a/pulsar-client-shaded/pom.xml
+++ b/pulsar-client-shaded/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
..
diff --git a/pulsar-client-tools-test/pom.xml b/pulsar-client-tools-test/pom.xml
index 2397289149c1e..7e820ed2c7cc9 100644
--- a/pulsar-client-tools-test/pom.xml
+++ b/pulsar-client-tools-test/pom.xml
@@ -24,7 +24,7 @@
org.apache.pulsar
pulsar
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
..
diff --git a/pulsar-client-tools/pom.xml b/pulsar-client-tools/pom.xml
index 9252d603f32b4..8d4e13bec0390 100644
--- a/pulsar-client-tools/pom.xml
+++ b/pulsar-client-tools/pom.xml
@@ -24,7 +24,7 @@
org.apache.pulsar
pulsar
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
..
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java
index d91a880b3926d..7dd825bcd052f 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java
@@ -23,6 +23,7 @@
import java.util.Set;
import com.google.common.collect.Sets;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.MessageIdImpl;
@@ -63,6 +64,15 @@ static String validatePersistentTopic(List params) {
return topicName.toString();
}
+ static String validateOperationType(String type) {
+ if (StringUtils.isBlank(type) ||
+ (!"start".equals(type) && !"stop".equals(type) && !"status".equals(type))) {
+ throw new ParameterException("Invalid operation type. Only support start, stop or status");
+ }
+
+ return type;
+ }
+
static String validateNonPersistentTopic(List params) {
String topic = checkArgument(params);
TopicName topicName = TopicName.get(topic);
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index c541cc1a5752a..9160e91440301 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -122,6 +122,7 @@ public CmdTopics(Supplier admin) {
jcommander.addCommand("compact", new Compact());
jcommander.addCommand("compaction-status", new CompactionStatusCmd());
jcommander.addCommand("offload", new Offload());
+ jcommander.addCommand("offload-service", new OffloadService());
jcommander.addCommand("offload-status", new OffloadStatusCmd());
jcommander.addCommand("last-message-id", new GetLastMessageId());
jcommander.addCommand("get-backlog-quotas", new GetBacklogQuotaMap());
@@ -1130,6 +1131,25 @@ void run() throws PulsarAdminException {
}
}
+ @Parameters(commandDescription = "Start or stop offload service")
+ private class OffloadService extends CliCommand {
+ @Parameter(description = "persistent://tenent/namespace/topic", required = true)
+ private List params;
+
+ @Parameter(names = {"-t", "--type"},
+ description = "The operation type of offload service. (start, stop, status)")
+ private String type;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+ String operationType = validateOperationType(type);
+
+ getTopics().offloadService(persistentTopic, operationType);
+ System.out.println("Offload Service for " + persistentTopic + " operation " + operationType);
+ }
+ }
+
@Parameters(commandDescription = "Check the status of data offloading from a topic to long-term storage")
private class OffloadStatusCmd extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
diff --git a/pulsar-client/pom.xml b/pulsar-client/pom.xml
index 089ac49c5d579..da97a618463cf 100644
--- a/pulsar-client/pom.xml
+++ b/pulsar-client/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
..
diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml
index 42c823a7f8615..e2a1cfc3f0f02 100644
--- a/pulsar-common/pom.xml
+++ b/pulsar-common/pom.xml
@@ -26,7 +26,7 @@
org.apache.pulsar
pulsar
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
..
diff --git a/pulsar-config-validation/pom.xml b/pulsar-config-validation/pom.xml
index afbc14ec0eec1..bddce61201449 100644
--- a/pulsar-config-validation/pom.xml
+++ b/pulsar-config-validation/pom.xml
@@ -26,7 +26,7 @@
org.apache.pulsar
pulsar
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
..
diff --git a/pulsar-functions/api-java/pom.xml b/pulsar-functions/api-java/pom.xml
index 6e42f63c66acc..92b2e579be9a7 100644
--- a/pulsar-functions/api-java/pom.xml
+++ b/pulsar-functions/api-java/pom.xml
@@ -24,7 +24,7 @@
org.apache.pulsar
pulsar-functions
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
pulsar-functions-api
diff --git a/pulsar-functions/instance/pom.xml b/pulsar-functions/instance/pom.xml
index efbb8fb6229c4..9b836b8417cd0 100644
--- a/pulsar-functions/instance/pom.xml
+++ b/pulsar-functions/instance/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar-functions
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
pulsar-functions-instance
diff --git a/pulsar-functions/java-examples/pom.xml b/pulsar-functions/java-examples/pom.xml
index ad37c3588e0fe..f9a22841f02be 100644
--- a/pulsar-functions/java-examples/pom.xml
+++ b/pulsar-functions/java-examples/pom.xml
@@ -24,7 +24,7 @@
org.apache.pulsar
pulsar-functions
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
pulsar-functions-api-examples
diff --git a/pulsar-functions/localrun-shaded/pom.xml b/pulsar-functions/localrun-shaded/pom.xml
index 254ed1437d129..205bc369f31f4 100644
--- a/pulsar-functions/localrun-shaded/pom.xml
+++ b/pulsar-functions/localrun-shaded/pom.xml
@@ -26,7 +26,7 @@
org.apache.pulsar
pulsar-functions
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
..
diff --git a/pulsar-functions/localrun/pom.xml b/pulsar-functions/localrun/pom.xml
index b5c22b5cff2af..a33c2966ee24e 100644
--- a/pulsar-functions/localrun/pom.xml
+++ b/pulsar-functions/localrun/pom.xml
@@ -26,7 +26,7 @@
org.apache.pulsar
pulsar-functions
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
..
diff --git a/pulsar-functions/pom.xml b/pulsar-functions/pom.xml
index 36591e6f2f194..440b7d336d35b 100644
--- a/pulsar-functions/pom.xml
+++ b/pulsar-functions/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
pulsar-functions
diff --git a/pulsar-functions/proto/pom.xml b/pulsar-functions/proto/pom.xml
index 871a07a9658dc..10b23ae232113 100644
--- a/pulsar-functions/proto/pom.xml
+++ b/pulsar-functions/proto/pom.xml
@@ -27,7 +27,7 @@
org.apache.pulsar
pulsar-functions
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
pulsar-functions-proto
diff --git a/pulsar-functions/runtime-all/pom.xml b/pulsar-functions/runtime-all/pom.xml
index bf01c32367513..3687b5a90a4e0 100644
--- a/pulsar-functions/runtime-all/pom.xml
+++ b/pulsar-functions/runtime-all/pom.xml
@@ -26,7 +26,7 @@
org.apache.pulsar
pulsar-functions
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
..
diff --git a/pulsar-functions/runtime/pom.xml b/pulsar-functions/runtime/pom.xml
index 2037b5d3f6194..ef81360a79431 100644
--- a/pulsar-functions/runtime/pom.xml
+++ b/pulsar-functions/runtime/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar-functions
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
pulsar-functions-runtime
diff --git a/pulsar-functions/secrets/pom.xml b/pulsar-functions/secrets/pom.xml
index 30c8375a0c7b9..37903954d75e5 100644
--- a/pulsar-functions/secrets/pom.xml
+++ b/pulsar-functions/secrets/pom.xml
@@ -24,7 +24,7 @@
org.apache.pulsar
pulsar-functions
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
pulsar-functions-secrets
diff --git a/pulsar-functions/utils/pom.xml b/pulsar-functions/utils/pom.xml
index 676a983f20dc2..fc53d6c998661 100644
--- a/pulsar-functions/utils/pom.xml
+++ b/pulsar-functions/utils/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar-functions
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
pulsar-functions-utils
diff --git a/pulsar-functions/worker/pom.xml b/pulsar-functions/worker/pom.xml
index 8bc57600a8759..2dcafc0084ddf 100644
--- a/pulsar-functions/worker/pom.xml
+++ b/pulsar-functions/worker/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar-functions
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
..
diff --git a/pulsar-io/aerospike/pom.xml b/pulsar-io/aerospike/pom.xml
index 8bc73937e2029..2e47fca518bbf 100644
--- a/pulsar-io/aerospike/pom.xml
+++ b/pulsar-io/aerospike/pom.xml
@@ -24,7 +24,7 @@
org.apache.pulsar
pulsar-io
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
pulsar-io-aerospike
diff --git a/pulsar-io/aws/pom.xml b/pulsar-io/aws/pom.xml
index 176d920631e2f..e61ffab0dec9b 100644
--- a/pulsar-io/aws/pom.xml
+++ b/pulsar-io/aws/pom.xml
@@ -24,7 +24,7 @@
org.apache.pulsar
pulsar-io
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
pulsar-io-aws
diff --git a/pulsar-io/batch-data-generator/pom.xml b/pulsar-io/batch-data-generator/pom.xml
index 6ce7e8c415c03..c9980d8ebb555 100644
--- a/pulsar-io/batch-data-generator/pom.xml
+++ b/pulsar-io/batch-data-generator/pom.xml
@@ -24,7 +24,7 @@
org.apache.pulsar
pulsar-io
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
pulsar-io-batch-data-generator
diff --git a/pulsar-io/batch-discovery-triggerers/pom.xml b/pulsar-io/batch-discovery-triggerers/pom.xml
index b66f5bddd275b..3fd6f0c9421af 100644
--- a/pulsar-io/batch-discovery-triggerers/pom.xml
+++ b/pulsar-io/batch-discovery-triggerers/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar-io
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
pulsar-io-batch-discovery-triggerers
diff --git a/pulsar-io/canal/pom.xml b/pulsar-io/canal/pom.xml
index 4c5b57d67fbbd..10b10ef5f7c95 100644
--- a/pulsar-io/canal/pom.xml
+++ b/pulsar-io/canal/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar-io
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
4.0.0
diff --git a/pulsar-io/cassandra/pom.xml b/pulsar-io/cassandra/pom.xml
index 311197c9f2756..c4050e2d29f56 100644
--- a/pulsar-io/cassandra/pom.xml
+++ b/pulsar-io/cassandra/pom.xml
@@ -24,7 +24,7 @@
org.apache.pulsar
pulsar-io
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
pulsar-io-cassandra
diff --git a/pulsar-io/common/pom.xml b/pulsar-io/common/pom.xml
index d4a711ed01186..73c4401a04727 100644
--- a/pulsar-io/common/pom.xml
+++ b/pulsar-io/common/pom.xml
@@ -27,7 +27,7 @@
org.apache.pulsar
pulsar-io
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
pulsar-io-common
diff --git a/pulsar-io/core/pom.xml b/pulsar-io/core/pom.xml
index 917981f55658c..97c4765eda9b2 100644
--- a/pulsar-io/core/pom.xml
+++ b/pulsar-io/core/pom.xml
@@ -24,7 +24,7 @@
org.apache.pulsar
pulsar-io
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
pulsar-io-core
diff --git a/pulsar-io/data-generator/pom.xml b/pulsar-io/data-generator/pom.xml
index 1a49c3eb32b6e..d71ac961d0d82 100644
--- a/pulsar-io/data-generator/pom.xml
+++ b/pulsar-io/data-generator/pom.xml
@@ -24,7 +24,7 @@
org.apache.pulsar
pulsar-io
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
pulsar-io-data-generator
diff --git a/pulsar-io/debezium/core/pom.xml b/pulsar-io/debezium/core/pom.xml
index 5401ace104fc4..936c6f092b555 100644
--- a/pulsar-io/debezium/core/pom.xml
+++ b/pulsar-io/debezium/core/pom.xml
@@ -24,7 +24,7 @@
org.apache.pulsar
pulsar-io-debezium
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
pulsar-io-debezium-core
diff --git a/pulsar-io/debezium/mongodb/pom.xml b/pulsar-io/debezium/mongodb/pom.xml
index c6c8c58f95e27..6783c93b03809 100644
--- a/pulsar-io/debezium/mongodb/pom.xml
+++ b/pulsar-io/debezium/mongodb/pom.xml
@@ -24,7 +24,7 @@
org.apache.pulsar
pulsar-io-debezium
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
pulsar-io-debezium-mongodb
diff --git a/pulsar-io/debezium/mssql/pom.xml b/pulsar-io/debezium/mssql/pom.xml
index 0a0f831bef961..7e521db84e8a1 100644
--- a/pulsar-io/debezium/mssql/pom.xml
+++ b/pulsar-io/debezium/mssql/pom.xml
@@ -24,7 +24,7 @@
org.apache.pulsar
pulsar-io-debezium
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
pulsar-io-debezium-mssql
diff --git a/pulsar-io/debezium/mysql/pom.xml b/pulsar-io/debezium/mysql/pom.xml
index c963ae626c7e4..d7a3e92b851f4 100644
--- a/pulsar-io/debezium/mysql/pom.xml
+++ b/pulsar-io/debezium/mysql/pom.xml
@@ -24,7 +24,7 @@
org.apache.pulsar
pulsar-io-debezium
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
pulsar-io-debezium-mysql
diff --git a/pulsar-io/debezium/oracle/pom.xml b/pulsar-io/debezium/oracle/pom.xml
index cde53aeb6359b..ecba47f7294a4 100644
--- a/pulsar-io/debezium/oracle/pom.xml
+++ b/pulsar-io/debezium/oracle/pom.xml
@@ -24,7 +24,7 @@
org.apache.pulsar
pulsar-io-debezium
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
pulsar-io-debezium-oracle
diff --git a/pulsar-io/debezium/pom.xml b/pulsar-io/debezium/pom.xml
index 79b92b89999c2..5b7dcb0431421 100644
--- a/pulsar-io/debezium/pom.xml
+++ b/pulsar-io/debezium/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar-io
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
pulsar-io-debezium
diff --git a/pulsar-io/debezium/postgres/pom.xml b/pulsar-io/debezium/postgres/pom.xml
index 5bcf07941e921..fb7c955a2943e 100644
--- a/pulsar-io/debezium/postgres/pom.xml
+++ b/pulsar-io/debezium/postgres/pom.xml
@@ -24,7 +24,7 @@
org.apache.pulsar
pulsar-io-debezium
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
pulsar-io-debezium-postgres
diff --git a/pulsar-io/docs/pom.xml b/pulsar-io/docs/pom.xml
index 38c8b53a6bd0b..6f78ec8a2591e 100644
--- a/pulsar-io/docs/pom.xml
+++ b/pulsar-io/docs/pom.xml
@@ -24,7 +24,7 @@
org.apache.pulsar
pulsar-io
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
pulsar-io-docs
diff --git a/pulsar-io/dynamodb/pom.xml b/pulsar-io/dynamodb/pom.xml
index 0a173aa8650e8..8352b67f32d3e 100644
--- a/pulsar-io/dynamodb/pom.xml
+++ b/pulsar-io/dynamodb/pom.xml
@@ -24,7 +24,7 @@
org.apache.pulsar
pulsar-io
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
pulsar-io-dynamodb
diff --git a/pulsar-io/elastic-search/pom.xml b/pulsar-io/elastic-search/pom.xml
index 180e338099f7f..99d27638640f6 100644
--- a/pulsar-io/elastic-search/pom.xml
+++ b/pulsar-io/elastic-search/pom.xml
@@ -23,7 +23,7 @@
org.apache.pulsar
pulsar-io
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
pulsar-io-elastic-search
Pulsar IO :: ElasticSearch
diff --git a/pulsar-io/file/pom.xml b/pulsar-io/file/pom.xml
index 4a7a2a335e4b7..0c2b054836c30 100644
--- a/pulsar-io/file/pom.xml
+++ b/pulsar-io/file/pom.xml
@@ -23,7 +23,7 @@
org.apache.pulsar
pulsar-io
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
pulsar-io-file
diff --git a/pulsar-io/flume/pom.xml b/pulsar-io/flume/pom.xml
index b259cbb1de580..414be6050ec7b 100644
--- a/pulsar-io/flume/pom.xml
+++ b/pulsar-io/flume/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar-io
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
pulsar-io-flume
diff --git a/pulsar-io/hbase/pom.xml b/pulsar-io/hbase/pom.xml
index cde101487ff3e..b45ef84b3236b 100644
--- a/pulsar-io/hbase/pom.xml
+++ b/pulsar-io/hbase/pom.xml
@@ -25,7 +25,7 @@
pulsar-io
org.apache.pulsar
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
pulsar-io-hbase
Pulsar IO :: Hbase
diff --git a/pulsar-io/hdfs2/pom.xml b/pulsar-io/hdfs2/pom.xml
index 2f3e178a9de09..78c78daaea4a9 100644
--- a/pulsar-io/hdfs2/pom.xml
+++ b/pulsar-io/hdfs2/pom.xml
@@ -23,7 +23,7 @@
org.apache.pulsar
pulsar-io
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
pulsar-io-hdfs2
Pulsar IO :: Hdfs2
diff --git a/pulsar-io/hdfs3/pom.xml b/pulsar-io/hdfs3/pom.xml
index d6dabf60355ba..32f4b053363f5 100644
--- a/pulsar-io/hdfs3/pom.xml
+++ b/pulsar-io/hdfs3/pom.xml
@@ -23,7 +23,7 @@
org.apache.pulsar
pulsar-io
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
pulsar-io-hdfs3
Pulsar IO :: Hdfs3
diff --git a/pulsar-io/influxdb/pom.xml b/pulsar-io/influxdb/pom.xml
index 3fa60cd1807d7..61ed06052efc8 100644
--- a/pulsar-io/influxdb/pom.xml
+++ b/pulsar-io/influxdb/pom.xml
@@ -25,7 +25,7 @@
pulsar-io
org.apache.pulsar
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
pulsar-io-influxdb
diff --git a/pulsar-io/jdbc/clickhouse/pom.xml b/pulsar-io/jdbc/clickhouse/pom.xml
index b1fcbc010f996..0514565d9cb79 100644
--- a/pulsar-io/jdbc/clickhouse/pom.xml
+++ b/pulsar-io/jdbc/clickhouse/pom.xml
@@ -24,7 +24,7 @@
pulsar-io-jdbc
org.apache.pulsar
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
4.0.0
diff --git a/pulsar-io/jdbc/core/pom.xml b/pulsar-io/jdbc/core/pom.xml
index 2db6fb2ef7d99..7f5a95873952d 100644
--- a/pulsar-io/jdbc/core/pom.xml
+++ b/pulsar-io/jdbc/core/pom.xml
@@ -24,7 +24,7 @@
pulsar-io-jdbc
org.apache.pulsar
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
4.0.0
diff --git a/pulsar-io/jdbc/mariadb/pom.xml b/pulsar-io/jdbc/mariadb/pom.xml
index 483a855523e08..56ab5160ea057 100644
--- a/pulsar-io/jdbc/mariadb/pom.xml
+++ b/pulsar-io/jdbc/mariadb/pom.xml
@@ -24,7 +24,7 @@
pulsar-io-jdbc
org.apache.pulsar
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
4.0.0
diff --git a/pulsar-io/jdbc/pom.xml b/pulsar-io/jdbc/pom.xml
index 5f95ff1e50211..23f4a4197d634 100644
--- a/pulsar-io/jdbc/pom.xml
+++ b/pulsar-io/jdbc/pom.xml
@@ -32,7 +32,7 @@
org.apache.pulsar
pulsar-io
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
pulsar-io-jdbc
diff --git a/pulsar-io/jdbc/postgres/pom.xml b/pulsar-io/jdbc/postgres/pom.xml
index 127c6f72ef6fa..a933e64cff43d 100644
--- a/pulsar-io/jdbc/postgres/pom.xml
+++ b/pulsar-io/jdbc/postgres/pom.xml
@@ -24,7 +24,7 @@
pulsar-io-jdbc
org.apache.pulsar
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
4.0.0
diff --git a/pulsar-io/jdbc/sqlite/pom.xml b/pulsar-io/jdbc/sqlite/pom.xml
index e1d65e87302bf..154f08e2584d2 100644
--- a/pulsar-io/jdbc/sqlite/pom.xml
+++ b/pulsar-io/jdbc/sqlite/pom.xml
@@ -24,7 +24,7 @@
pulsar-io-jdbc
org.apache.pulsar
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
4.0.0
pulsar-io-jdbc-sqlite
diff --git a/pulsar-io/kafka-connect-adaptor-nar/pom.xml b/pulsar-io/kafka-connect-adaptor-nar/pom.xml
index af7f025330897..4e852c6604fb6 100644
--- a/pulsar-io/kafka-connect-adaptor-nar/pom.xml
+++ b/pulsar-io/kafka-connect-adaptor-nar/pom.xml
@@ -24,7 +24,7 @@
org.apache.pulsar
pulsar-io
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
pulsar-io-kafka-connect-adaptor-nar
diff --git a/pulsar-io/kafka-connect-adaptor/pom.xml b/pulsar-io/kafka-connect-adaptor/pom.xml
index 4df689dfff5d2..eac51953d6e0e 100644
--- a/pulsar-io/kafka-connect-adaptor/pom.xml
+++ b/pulsar-io/kafka-connect-adaptor/pom.xml
@@ -24,7 +24,7 @@
org.apache.pulsar
pulsar-io
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
pulsar-io-kafka-connect-adaptor
diff --git a/pulsar-io/kafka/pom.xml b/pulsar-io/kafka/pom.xml
index f3060d71166bc..9ffa4f2992e36 100644
--- a/pulsar-io/kafka/pom.xml
+++ b/pulsar-io/kafka/pom.xml
@@ -24,7 +24,7 @@
org.apache.pulsar
pulsar-io
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
pulsar-io-kafka
diff --git a/pulsar-io/kinesis/pom.xml b/pulsar-io/kinesis/pom.xml
index 5d6166e13ffad..b3694b27127ee 100644
--- a/pulsar-io/kinesis/pom.xml
+++ b/pulsar-io/kinesis/pom.xml
@@ -24,7 +24,7 @@
org.apache.pulsar
pulsar-io
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
pulsar-io-kinesis
diff --git a/pulsar-io/mongo/pom.xml b/pulsar-io/mongo/pom.xml
index 1678a9294dc89..807137e101c4b 100644
--- a/pulsar-io/mongo/pom.xml
+++ b/pulsar-io/mongo/pom.xml
@@ -26,7 +26,7 @@
org.apache.pulsar
pulsar-io
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
pulsar-io-mongo
diff --git a/pulsar-io/netty/pom.xml b/pulsar-io/netty/pom.xml
index 2bf4d0baf3452..d21fdf94d4b68 100644
--- a/pulsar-io/netty/pom.xml
+++ b/pulsar-io/netty/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar-io
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
pulsar-io-netty
diff --git a/pulsar-io/nsq/pom.xml b/pulsar-io/nsq/pom.xml
index bdf1081106964..5e166757d4913 100644
--- a/pulsar-io/nsq/pom.xml
+++ b/pulsar-io/nsq/pom.xml
@@ -24,7 +24,7 @@
org.apache.pulsar
pulsar-io
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
pulsar-io-nsq
diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml
index 5974289444586..bd2816ce1cc20 100644
--- a/pulsar-io/pom.xml
+++ b/pulsar-io/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
pulsar-io
diff --git a/pulsar-io/rabbitmq/pom.xml b/pulsar-io/rabbitmq/pom.xml
index e573833baba2d..f7add9141935e 100644
--- a/pulsar-io/rabbitmq/pom.xml
+++ b/pulsar-io/rabbitmq/pom.xml
@@ -24,7 +24,7 @@
org.apache.pulsar
pulsar-io
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
pulsar-io-rabbitmq
diff --git a/pulsar-io/redis/pom.xml b/pulsar-io/redis/pom.xml
index 7291f3e88f29e..d9e2dcdf76986 100644
--- a/pulsar-io/redis/pom.xml
+++ b/pulsar-io/redis/pom.xml
@@ -25,7 +25,7 @@
pulsar-io
org.apache.pulsar
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
pulsar-io-redis
diff --git a/pulsar-io/solr/pom.xml b/pulsar-io/solr/pom.xml
index 76fcf52c380fa..9437e63c86aea 100644
--- a/pulsar-io/solr/pom.xml
+++ b/pulsar-io/solr/pom.xml
@@ -25,7 +25,7 @@
pulsar-io
org.apache.pulsar
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
diff --git a/pulsar-io/twitter/pom.xml b/pulsar-io/twitter/pom.xml
index 780e216c96484..8cd19c28b11c2 100644
--- a/pulsar-io/twitter/pom.xml
+++ b/pulsar-io/twitter/pom.xml
@@ -24,7 +24,7 @@
org.apache.pulsar
pulsar-io
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
pulsar-io-twitter
diff --git a/pulsar-metadata/pom.xml b/pulsar-metadata/pom.xml
index 809def86ed1dc..497006b0d0033 100644
--- a/pulsar-metadata/pom.xml
+++ b/pulsar-metadata/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
..
diff --git a/pulsar-package-management/bookkeeper-storage/pom.xml b/pulsar-package-management/bookkeeper-storage/pom.xml
index e688a104a16f2..98789ae2ff8d3 100644
--- a/pulsar-package-management/bookkeeper-storage/pom.xml
+++ b/pulsar-package-management/bookkeeper-storage/pom.xml
@@ -25,7 +25,7 @@
pulsar-package-management
org.apache.pulsar
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
4.0.0
diff --git a/pulsar-package-management/core/pom.xml b/pulsar-package-management/core/pom.xml
index 168397dacb44d..c709d267eb116 100644
--- a/pulsar-package-management/core/pom.xml
+++ b/pulsar-package-management/core/pom.xml
@@ -25,7 +25,7 @@
pulsar-package-management
org.apache.pulsar
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
4.0.0
diff --git a/pulsar-package-management/pom.xml b/pulsar-package-management/pom.xml
index f6f776ce015a3..83bee31f49e37 100644
--- a/pulsar-package-management/pom.xml
+++ b/pulsar-package-management/pom.xml
@@ -25,7 +25,7 @@
pulsar
org.apache.pulsar
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
..
4.0.0
diff --git a/pulsar-proxy/pom.xml b/pulsar-proxy/pom.xml
index e132a5de6ace0..0964db509abbf 100644
--- a/pulsar-proxy/pom.xml
+++ b/pulsar-proxy/pom.xml
@@ -24,7 +24,7 @@
org.apache.pulsar
pulsar
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
pulsar-proxy
diff --git a/pulsar-sql/pom.xml b/pulsar-sql/pom.xml
index 31ec8436754ff..dc8dc6bc4c6d7 100644
--- a/pulsar-sql/pom.xml
+++ b/pulsar-sql/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
pulsar-sql
diff --git a/pulsar-sql/presto-distribution/pom.xml b/pulsar-sql/presto-distribution/pom.xml
index e3f0ae3b51475..c01f61f907cbb 100644
--- a/pulsar-sql/presto-distribution/pom.xml
+++ b/pulsar-sql/presto-distribution/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar-sql
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
pulsar-presto-distribution
diff --git a/pulsar-sql/presto-pulsar-plugin/pom.xml b/pulsar-sql/presto-pulsar-plugin/pom.xml
index b1f9486286769..cf79ac8d2e661 100644
--- a/pulsar-sql/presto-pulsar-plugin/pom.xml
+++ b/pulsar-sql/presto-pulsar-plugin/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar-sql
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
pulsar-presto-connector
diff --git a/pulsar-sql/presto-pulsar/pom.xml b/pulsar-sql/presto-pulsar/pom.xml
index 48376baa59270..64c95f827b00a 100644
--- a/pulsar-sql/presto-pulsar/pom.xml
+++ b/pulsar-sql/presto-pulsar/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar-sql
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
pulsar-presto-connector-original
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
index 9a64c055d07ac..e9d0e69723e29 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
@@ -165,7 +165,7 @@ private LedgerOffloader initManagedLedgerOffloader(OffloadPoliciesImpl offloadPo
offloadPolicies.getManagedLedgerOffloadDriver());
try {
- return offloaderFactory.create(
+ return (LedgerOffloader) offloaderFactory.create(
offloadPolicies,
ImmutableMap.of(
LedgerOffloader.METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), PulsarVersion.getVersion(),
diff --git a/pulsar-testclient/pom.xml b/pulsar-testclient/pom.xml
index c31872944da0d..227a6273083d8 100644
--- a/pulsar-testclient/pom.xml
+++ b/pulsar-testclient/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
..
diff --git a/pulsar-transaction/common/pom.xml b/pulsar-transaction/common/pom.xml
index ddc76bebb24e9..68e8b383f9564 100644
--- a/pulsar-transaction/common/pom.xml
+++ b/pulsar-transaction/common/pom.xml
@@ -27,7 +27,7 @@
org.apache.pulsar
pulsar-transaction-parent
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
pulsar-transaction-common
diff --git a/pulsar-transaction/coordinator/pom.xml b/pulsar-transaction/coordinator/pom.xml
index b4ddf32ae19de..8acb057ee4cba 100644
--- a/pulsar-transaction/coordinator/pom.xml
+++ b/pulsar-transaction/coordinator/pom.xml
@@ -27,7 +27,7 @@
org.apache.pulsar
pulsar-transaction-parent
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
pulsar-transaction-coordinator
diff --git a/pulsar-transaction/pom.xml b/pulsar-transaction/pom.xml
index e0b566a0c3434..d54ca2d60f2ba 100644
--- a/pulsar-transaction/pom.xml
+++ b/pulsar-transaction/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
pulsar-transaction-parent
diff --git a/pulsar-websocket/pom.xml b/pulsar-websocket/pom.xml
index 2fff29d1c0165..a28635e1dad3a 100644
--- a/pulsar-websocket/pom.xml
+++ b/pulsar-websocket/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
..
diff --git a/pulsar-zookeeper-utils/pom.xml b/pulsar-zookeeper-utils/pom.xml
index e2c29fe00ae0b..7c8b11c034cee 100644
--- a/pulsar-zookeeper-utils/pom.xml
+++ b/pulsar-zookeeper-utils/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
..
diff --git a/structured-event-log/pom.xml b/structured-event-log/pom.xml
index e52e5b7350f2c..b4e259663da4b 100644
--- a/structured-event-log/pom.xml
+++ b/structured-event-log/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
..
diff --git a/testmocks/pom.xml b/testmocks/pom.xml
index b18cd2eb75c9b..823c1bb88bcc8 100644
--- a/testmocks/pom.xml
+++ b/testmocks/pom.xml
@@ -25,7 +25,7 @@
pulsar
org.apache.pulsar
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
testmocks
diff --git a/tests/bc_2_0_0/pom.xml b/tests/bc_2_0_0/pom.xml
index 80f166bb0c575..7d041c69d510b 100644
--- a/tests/bc_2_0_0/pom.xml
+++ b/tests/bc_2_0_0/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar.tests
tests-parent
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
bc_2_0_0
diff --git a/tests/bc_2_0_1/pom.xml b/tests/bc_2_0_1/pom.xml
index 9a61cb41d4f6e..6498365f4a2fd 100644
--- a/tests/bc_2_0_1/pom.xml
+++ b/tests/bc_2_0_1/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar.tests
tests-parent
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
bc_2_0_1
diff --git a/tests/bc_2_6_0/pom.xml b/tests/bc_2_6_0/pom.xml
index 559bc5fe6f658..a6e5d22300886 100644
--- a/tests/bc_2_6_0/pom.xml
+++ b/tests/bc_2_6_0/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar.tests
tests-parent
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
4.0.0
diff --git a/tests/docker-images/java-test-functions/pom.xml b/tests/docker-images/java-test-functions/pom.xml
index 20e71e217ce44..14c8b5d15a6e0 100644
--- a/tests/docker-images/java-test-functions/pom.xml
+++ b/tests/docker-images/java-test-functions/pom.xml
@@ -23,7 +23,7 @@
org.apache.pulsar.tests
docker-images
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
4.0.0
java-test-functions
diff --git a/tests/docker-images/java-test-image/pom.xml b/tests/docker-images/java-test-image/pom.xml
index 46558ca492aa2..a82039bd7b735 100644
--- a/tests/docker-images/java-test-image/pom.xml
+++ b/tests/docker-images/java-test-image/pom.xml
@@ -23,7 +23,7 @@
org.apache.pulsar.tests
docker-images
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
4.0.0
java-test-image
diff --git a/tests/docker-images/latest-version-image/pom.xml b/tests/docker-images/latest-version-image/pom.xml
index c9e16e115f056..a2d6ac4d89d14 100644
--- a/tests/docker-images/latest-version-image/pom.xml
+++ b/tests/docker-images/latest-version-image/pom.xml
@@ -23,7 +23,7 @@
org.apache.pulsar.tests
docker-images
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
4.0.0
latest-version-image
diff --git a/tests/docker-images/pom.xml b/tests/docker-images/pom.xml
index f2681751f3eda..2d85307d6ed4a 100644
--- a/tests/docker-images/pom.xml
+++ b/tests/docker-images/pom.xml
@@ -27,7 +27,7 @@
org.apache.pulsar.tests
tests-parent
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
docker-images
Apache Pulsar :: Tests :: Docker Images
diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml
index 1314c15dcb45c..f1b54f920e4c7 100644
--- a/tests/integration/pom.xml
+++ b/tests/integration/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar.tests
tests-parent
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
integration
diff --git a/tests/pom.xml b/tests/pom.xml
index 512c1529b195f..8bae9e6b8445b 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -26,7 +26,7 @@
org.apache.pulsar
pulsar
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
org.apache.pulsar.tests
tests-parent
diff --git a/tests/pulsar-client-admin-shade-test/pom.xml b/tests/pulsar-client-admin-shade-test/pom.xml
index a84660e326864..a81ec359279d2 100644
--- a/tests/pulsar-client-admin-shade-test/pom.xml
+++ b/tests/pulsar-client-admin-shade-test/pom.xml
@@ -26,7 +26,7 @@
org.apache.pulsar.tests
tests-parent
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
pulsar-client-admin-shade-test
diff --git a/tests/pulsar-client-all-shade-test/pom.xml b/tests/pulsar-client-all-shade-test/pom.xml
index 9bdd8fea40493..f3566775a02f6 100644
--- a/tests/pulsar-client-all-shade-test/pom.xml
+++ b/tests/pulsar-client-all-shade-test/pom.xml
@@ -26,7 +26,7 @@
org.apache.pulsar.tests
tests-parent
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
pulsar-client-all-shade-test
diff --git a/tests/pulsar-client-shade-test/pom.xml b/tests/pulsar-client-shade-test/pom.xml
index 281cf4cd64c02..6937228698a68 100644
--- a/tests/pulsar-client-shade-test/pom.xml
+++ b/tests/pulsar-client-shade-test/pom.xml
@@ -27,7 +27,7 @@
org.apache.pulsar.tests
tests-parent
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
pulsar-client-shade-test
diff --git a/tiered-storage/file-system/pom.xml b/tiered-storage/file-system/pom.xml
index f839ad30810ee..89b880f8e3cc7 100644
--- a/tiered-storage/file-system/pom.xml
+++ b/tiered-storage/file-system/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
tiered-storage-parent
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
..
diff --git a/tiered-storage/jcloud/pom.xml b/tiered-storage/jcloud/pom.xml
index 716aed18b98f0..58a877d4cd1b6 100644
--- a/tiered-storage/jcloud/pom.xml
+++ b/tiered-storage/jcloud/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
tiered-storage-parent
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
..
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java
index 907aba67b9ef3..d0c9bff9af39e 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java
@@ -254,6 +254,12 @@ public void asyncOffloadPrefix(Position pos, AsyncCallbacks.OffloadCallback call
}
+ @Override
+ public void asyncOffloadService(String topic, String operationType,
+ AsyncCallbacks.OffloadServiceCallback callback, Object ctx) {
+
+ }
+
@Override
public ManagedCursor getSlowestConsumer() {
return null;
diff --git a/tiered-storage/pom.xml b/tiered-storage/pom.xml
index 5b8ae94a8e323..778f22474ebce 100644
--- a/tiered-storage/pom.xml
+++ b/tiered-storage/pom.xml
@@ -25,7 +25,7 @@
org.apache.pulsar
pulsar
- 2.9.2
+ 2.9.2-offload-SNAPSHOT
..