From 6971da0d92be4c049102d906dd67211437b65bb7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B5=B5?= Date: Tue, 24 May 2022 16:36:20 +0800 Subject: [PATCH 1/3] =?UTF-8?q?1=E3=80=81readme=20=E6=96=87=E6=A1=A3?= =?UTF-8?q?=E6=9C=89=E8=AF=AF=202=E3=80=81=E5=90=8C=E6=AD=A5=E6=B6=88?= =?UTF-8?q?=E6=81=AF=E5=88=B0=E7=99=BD=E5=90=8D=E5=8D=95=E8=AE=BE=E7=BD=AE?= =?UTF-8?q?=E7=9A=84topic?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- connectors/rocketmq-replicator/README.md | 2 +- .../java/org/apache/rocketmq/replicator/RmqSourceTask.java | 1 + .../connect/runtime/connectorwrapper/WorkerSourceTask.java | 4 ++++ 3 files changed, 6 insertions(+), 1 deletion(-) diff --git a/connectors/rocketmq-replicator/README.md b/connectors/rocketmq-replicator/README.md index 16b01c5e..cc1c7060 100644 --- a/connectors/rocketmq-replicator/README.md +++ b/connectors/rocketmq-replicator/README.md @@ -28,7 +28,7 @@ mvn clean install -Prelease-all -DskipTest -U 同步topic和消息 ```` http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-replicator-name} -?config={"connector-class":"org.apache.rocketmq.replicator.RmqSourceReplicator","source-rocketmq":"xxxx:9876","target-rocketmq":"xxxxxxx:9876","replicator-store-topic":"replicatorTopic","taskDivideStrategy":"0","white-list":"TopicTest,TopicTest2","task-parallelism":"2","source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"} +?config={"connector-class":"org.apache.rocketmq.replicator.RmqSourceReplicator","source-rocketmq":"xxxx:9876","target-rocketmq":"xxxxxxx:9876","replicator-store-topic":"replicatorTopic","taskDivideStrategy":"0","white-list":"TopicTest,TopicTest2","task-parallelism":"2","source-record-converter":"org.apache.rocketmq.connect.runtime.converter.RocketMQConverter"} ```` diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java index a8fc7d54..60e6c8d3 100644 --- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java +++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java @@ -178,6 +178,7 @@ private List pollCommonMessage() { final Map properties = msg.getProperties(); final Set keys = properties.keySet(); keys.forEach(key -> connectRecord.addExtension(key, properties.get(key))); + connectRecord.addExtension("topic",taskTopicConfig.getTargetTopic()); res.add(connectRecord); } break; diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java index 8e62ca3d..43b782c7 100644 --- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java +++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java @@ -307,6 +307,10 @@ private void sendRecord() throws InterruptedException, RemotingException, MQClie log.error("Send record, message size is greater than {} bytes, sourceDataEntry: {}", RuntimeConfigDefine.MAX_MESSAGE_SIZE, JSON.toJSONString(sourceDataEntry)); continue; } + String targetTopic = sourceDataEntry.getExtension("topic"); + if (targetTopic != null){ + sourceMessage.setTopic(targetTopic); + } sourceMessage.setBody(messageBody); } } else { From 86c1f77e63d749f8f7a55e96c5e5c370a6242548 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B5=B5?= Date: Fri, 27 May 2022 13:56:22 +0800 Subject: [PATCH 2/3] =?UTF-8?q?1=E3=80=81fix=E5=90=8C=E6=AD=A5=E6=B6=88?= =?UTF-8?q?=E6=81=AF=EF=BC=8C=E4=BF=9D=E6=8C=81=E6=AF=8F=E6=9D=A1=E6=B6=88?= =?UTF-8?q?=E6=81=AF=E6=89=80=E5=9C=A8=E7=9A=84=E9=98=9F=E5=88=97=E4=B8=80?= =?UTF-8?q?=E8=87=B4=202=E3=80=81=E4=BC=98=E5=8C=96=E5=90=8C=E6=AD=A5?= =?UTF-8?q?=E6=B6=88=E8=B4=B9=E8=BF=9B=E5=BA=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../rocketmq/replicator/MetaSourceTask.java | 50 +++++--- .../rocketmq/replicator/RmqSourceTask.java | 5 + .../rocketmq/replicator/common/Utils.java | 22 +++- .../replicator/config/TaskConfig.java | 18 +++ .../replicator/config/TaskConfigEnum.java | 2 + .../connectorwrapper/WorkerSourceTask.java | 119 ++++++++++++------ .../converter/RocketMQMetaConverter.java | 34 +++++ 7 files changed, 196 insertions(+), 54 deletions(-) create mode 100644 rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/RocketMQMetaConverter.java diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/MetaSourceTask.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/MetaSourceTask.java index 3f4f2da8..a8f2f2cb 100644 --- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/MetaSourceTask.java +++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/MetaSourceTask.java @@ -30,11 +30,16 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.HashMap; +import java.util.Set; +import java.util.Iterator; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.common.admin.OffsetWrapper; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.body.ClusterInfo; +import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.replicator.common.Utils; import org.apache.rocketmq.replicator.config.ConfigUtil; import org.apache.rocketmq.replicator.config.TaskConfig; @@ -52,6 +57,7 @@ public class MetaSourceTask extends SourceTask { private final String taskId; private final TaskConfig config; private DefaultMQAdminExt srcMQAdminExt; + private DefaultMQAdminExt tarMQAdminExt; private volatile boolean started = false; private OffsetSyncStore store; @@ -77,6 +83,7 @@ public void start(SourceTaskContext sourceTaskContext) { try { this.srcMQAdminExt = Utils.startMQAdminTool(this.config); + this.tarMQAdminExt = Utils.startTarMQAdminTool(this.config); } catch (MQClientException e) { log.error("Replicator task start failed for `startMQAdminTool` exception.", e); throw new IllegalStateException("Replicator task start failed for `startMQAdminTool` exception."); @@ -92,6 +99,7 @@ public void stop() { started = false; } srcMQAdminExt.shutdown(); + tarMQAdminExt.shutdown(); } @Override @@ -120,28 +128,38 @@ public void resume() { List res = new ArrayList<>(); for (String group : groups) { ConsumeStats stats; + String brokerAddresMaster=""; + String brokerName=""; try { stats = this.srcMQAdminExt.examineConsumeStats(group); + ClusterInfo clusterInfo = this.tarMQAdminExt.examineBrokerClusterInfo(); + HashMap> clusterAddrTable = clusterInfo.getClusterAddrTable(); + HashMap brokerAddrTable = clusterInfo.getBrokerAddrTable(); + Set clusterNameSet = clusterAddrTable.get(this.config.getTargetCluster()); + Iterator it = clusterNameSet.iterator(); + while (it.hasNext()){ + String clusterName = it.next(); + BrokerData brokerData = brokerAddrTable.get(clusterName); + HashMap brokerAddrs = brokerData.getBrokerAddrs(); + brokerAddresMaster = brokerAddrs.get(new Long(0)); + brokerName = brokerData.getBrokerName(); + for (Map.Entry offsetTable : stats.getOffsetTable().entrySet()) { + MessageQueue mq = offsetTable.getKey(); + long srcOffset = offsetTable.getValue().getConsumerOffset(); + long targetOffset = this.store.convertTargetOffset(mq, group, srcOffset); + try{ + if (brokerName.equals(mq.getBrokerName())){ + this.tarMQAdminExt.updateConsumeOffset(brokerAddresMaster,group,mq,targetOffset); + } + }catch (Exception e){ + log.error("admin update consumer offset err", e); + } + } + } } catch (Exception e) { log.error("admin get consumer info failed for consumer groups: " + group, e); continue; } - - for (Map.Entry offsetTable : stats.getOffsetTable().entrySet()) { - MessageQueue mq = offsetTable.getKey(); - long srcOffset = offsetTable.getValue().getConsumerOffset(); - long targetOffset = this.store.convertTargetOffset(mq, group, srcOffset); - - List fields = new ArrayList(); - Schema schema = new Schema(SchemaEnum.OFFSET.name(), FieldType.INT64, fields); - schema.getFields().add(new Field(0, FieldName.OFFSET.getKey(), SchemaBuilder.string().build())); - - JSONObject jsonObject = new JSONObject(); - jsonObject.put(FieldName.OFFSET.getKey(), targetOffset); - ConnectRecord connectRecord = new ConnectRecord(Utils.offsetKey(mq), - Utils.offsetValue(srcOffset), System.currentTimeMillis(), schema, jsonObject.toJSONString()); - res.add(connectRecord); - } } return res; } diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java index 60e6c8d3..4e28f5fa 100644 --- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java +++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java @@ -19,6 +19,7 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import io.openmessaging.KeyValue; +import io.openmessaging.internal.DefaultKeyValue; import io.openmessaging.connector.api.component.task.source.SourceTask; import io.openmessaging.connector.api.component.task.source.SourceTaskContext; import io.openmessaging.connector.api.data.ConnectRecord; @@ -179,6 +180,10 @@ private List pollCommonMessage() { final Set keys = properties.keySet(); keys.forEach(key -> connectRecord.addExtension(key, properties.get(key))); connectRecord.addExtension("topic",taskTopicConfig.getTargetTopic()); + connectRecord.addExtension("brokerName",msg.getBrokerName()); + KeyValue kv = new DefaultKeyValue(); + kv.put("queueId",msg.getQueueId()); + connectRecord.addExtension(kv); res.add(connectRecord); } break; diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/common/Utils.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/common/Utils.java index 290dbff4..e2bf90e5 100644 --- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/common/Utils.java +++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/common/Utils.java @@ -145,7 +145,9 @@ public static List groupPartitions(List elements, RmqConnector assigned++; } keyValue.put(TaskConfigEnum.TASK_STORE_ROCKETMQ.getKey(), tdc.getStoreTopic()); + keyValue.put(TaskConfigEnum.TASK_TARGET_ROCKETMQ.getKey(), tdc.getTargetNamesrvs()); keyValue.put(TaskConfigEnum.TASK_SOURCE_ROCKETMQ.getKey(), tdc.getSrcNamesrvs()); + keyValue.put(TaskConfigEnum.TASK_TARGET_CLUSTER.getKey(), tdc.getTargetCluster()); keyValue.put(TaskConfigEnum.TASK_SOURCE_CLUSTER.getKey(), tdc.getSrcCluster()); keyValue.put(TaskConfigEnum.TASK_OFFSET_SYNC_TOPIC.getKey(), tdc.getOffsetSyncTopic()); keyValue.put(TaskConfigEnum.TASK_DATA_TYPE.getKey(), DataType.OFFSET.ordinal()); @@ -194,14 +196,30 @@ public static DefaultMQAdminExt startTargetMQAdminTool( } public static DefaultMQAdminExt startMQAdminTool(TaskConfig taskConfig) throws MQClientException { + RPCHook rpcHook = null; + if (taskConfig.isSrcAclEnable()) { + rpcHook = new AclClientRPCHook(new SessionCredentials(taskConfig.getSrcAccessKey(), taskConfig.getSrcSecretKey())); + } + DefaultMQAdminExt sourceMQAdminExt = new DefaultMQAdminExt(rpcHook); + sourceMQAdminExt.setNamesrvAddr(taskConfig.getSourceRocketmq()); + sourceMQAdminExt.setAdminExtGroup(ConstDefine.REPLICATOR_TASK_ADMIN_GROUP); + sourceMQAdminExt.setInstanceName(Utils.createUniqInstanceName(taskConfig.getSourceRocketmq())); + + sourceMQAdminExt.start(); + log.info("Source: RocketMQ sourceMQAdminExt started."); + + return sourceMQAdminExt; + } + + public static DefaultMQAdminExt startTarMQAdminTool(TaskConfig taskConfig) throws MQClientException { RPCHook rpcHook = null; if (taskConfig.isSrcAclEnable()) { rpcHook = new AclClientRPCHook(new SessionCredentials(taskConfig.getSrcAccessKey(), taskConfig.getSrcSecretKey())); } DefaultMQAdminExt targetMQAdminExt = new DefaultMQAdminExt(rpcHook); - targetMQAdminExt.setNamesrvAddr(taskConfig.getSourceRocketmq()); + targetMQAdminExt.setNamesrvAddr(taskConfig.getTargetRocketmq()); targetMQAdminExt.setAdminExtGroup(ConstDefine.REPLICATOR_TASK_ADMIN_GROUP); - targetMQAdminExt.setInstanceName(Utils.createUniqInstanceName(taskConfig.getSourceRocketmq())); + targetMQAdminExt.setInstanceName(Utils.createUniqInstanceName(taskConfig.getTargetRocketmq())); targetMQAdminExt.start(); log.info("TARGET: RocketMQ targetMQAdminExt started."); diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/TaskConfig.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/TaskConfig.java index 79215851..0208c6f2 100644 --- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/TaskConfig.java +++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/TaskConfig.java @@ -19,9 +19,11 @@ public class TaskConfig { private String sourceCluster; + private String targetCluster; private String storeTopic; private String sourceGroup; private String sourceRocketmq; + private String targetRocketmq; private Integer dataType; private Long nextPosition; private String taskTopicList; @@ -55,6 +57,14 @@ public void setSourceRocketmq(String sourceRocketmq) { this.sourceRocketmq = sourceRocketmq; } + public String getTargetRocketmq() { + return targetRocketmq; + } + + public void setTargetRocketmq(String targetRocketmq) { + this.targetRocketmq = targetRocketmq; + } + public int getDataType() { return dataType; } @@ -99,6 +109,14 @@ public String getSourceCluster() { return this.sourceCluster; } + public void setTargetCluster(String targetCluster) { + this.targetCluster = targetCluster; + } + + public String getTargetCluster() { + return this.targetCluster; + } + public void setOffsetSyncTopic(String offsetSyncTopic) { this.offsetSyncTopic = offsetSyncTopic; } diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/TaskConfigEnum.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/TaskConfigEnum.java index 520c31f1..0763a1c2 100644 --- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/TaskConfigEnum.java +++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/TaskConfigEnum.java @@ -22,6 +22,8 @@ public enum TaskConfigEnum { TASK_SOURCE_GROUP("sourceGroup"), TASK_SOURCE_ROCKETMQ("sourceRocketmq"), TASK_SOURCE_CLUSTER("sourceCluster"), + TASK_TARGET_ROCKETMQ("targetRocketmq"), + TASK_TARGET_CLUSTER("targetCluster"), TASK_OFFSET_SYNC_TOPIC("offsetSyncTopic"), TASK_SOURCE_TOPIC("sourceTopic"), TASK_STORE_ROCKETMQ("storeTopic"), diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java index 43b782c7..079afd56 100644 --- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java +++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java @@ -46,10 +46,12 @@ import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue; import org.apache.rocketmq.connect.runtime.common.LoggerName; import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine; import org.apache.rocketmq.connect.runtime.converter.RocketMQConverter; +import org.apache.rocketmq.connect.runtime.converter.RocketMQMetaConverter; import org.apache.rocketmq.connect.runtime.service.PositionManagementService; import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager; import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService; @@ -265,6 +267,9 @@ public void cleanup() { */ private void sendRecord() throws InterruptedException, RemotingException, MQClientException { for (ConnectRecord sourceDataEntry : toSendRecord) { + if (recordConverter instanceof RocketMQMetaConverter){ + return; + } RecordPosition position = sourceDataEntry.getPosition(); RecordOffset offset = position.getOffset(); @@ -312,6 +317,48 @@ private void sendRecord() throws InterruptedException, RemotingException, MQClie sourceMessage.setTopic(targetTopic); } sourceMessage.setBody(messageBody); + int queueId = sourceDataEntry.getExtensions().getInt("queueId"); + String brokerName = sourceDataEntry.getExtension("brokerName"); + MessageQueue mq = new MessageQueue(targetTopic,brokerName,queueId); + try { + producer.send(sourceMessage,mq, new SendCallback() { + @Override public void onSuccess(org.apache.rocketmq.client.producer.SendResult result) { + log.info("Successful send message to RocketMQ:{}, Topic {}", result.getMsgId(), result.getMessageQueue().getTopic()); + connectStatsManager.incSourceRecordWriteTotalNums(); + connectStatsManager.incSourceRecordWriteNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID)); + RecordPartition partition = position.getPartition(); + try { + if (null != partition && null != position) { + Map offsetMap = (Map) offset.getOffset(); + offsetMap.put(RuntimeConfigDefine.UPDATE_TIMESTAMP, String.valueOf(sourceDataEntry.getTimestamp())); + positionStorageWriter.putPosition(partition, offset); + } + + } catch (Exception e) { + log.error("Source task save position info failed. partition {}, offset {}", JSON.toJSONString(partition), JSON.toJSONString(offset), e); + } + } + + @Override public void onException(Throwable throwable) { + log.error("Source task send record failed ,error msg {}. message {}", throwable.getMessage(), JSON.toJSONString(sourceMessage), throwable); + connectStatsManager.incSourceRecordWriteTotalFailNums(); + connectStatsManager.incSourceRecordWriteFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID)); + } + }); + } catch (MQClientException e) { + log.error("Send message MQClientException. message: {}, error info: {}.", sourceMessage, e); + connectStatsManager.incSourceRecordWriteTotalFailNums(); + connectStatsManager.incSourceRecordWriteFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID)); + } catch (RemotingException e) { + log.error("Send message RemotingException. message: {}, error info: {}.", sourceMessage, e); + connectStatsManager.incSourceRecordWriteTotalFailNums(); + connectStatsManager.incSourceRecordWriteFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID)); + } catch (InterruptedException e) { + log.error("Send message InterruptedException. message: {}, error info: {}.", sourceMessage, e); + connectStatsManager.incSourceRecordWriteTotalFailNums(); + connectStatsManager.incSourceRecordWriteFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID)); + throw e; + } } } else { final byte[] messageBody = JSON.toJSONString(sourceDataEntry).getBytes(); @@ -320,45 +367,45 @@ private void sendRecord() throws InterruptedException, RemotingException, MQClie continue; } sourceMessage.setBody(messageBody); - } - try { - producer.send(sourceMessage, new SendCallback() { - @Override public void onSuccess(org.apache.rocketmq.client.producer.SendResult result) { - log.info("Successful send message to RocketMQ:{}, Topic {}", result.getMsgId(), result.getMessageQueue().getTopic()); - connectStatsManager.incSourceRecordWriteTotalNums(); - connectStatsManager.incSourceRecordWriteNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID)); - RecordPartition partition = position.getPartition(); - try { - if (null != partition && null != position) { - Map offsetMap = (Map) offset.getOffset(); - offsetMap.put(RuntimeConfigDefine.UPDATE_TIMESTAMP, String.valueOf(sourceDataEntry.getTimestamp())); - positionStorageWriter.putPosition(partition, offset); + try { + producer.send(sourceMessage, new SendCallback() { + @Override public void onSuccess(org.apache.rocketmq.client.producer.SendResult result) { + log.info("Successful send message to RocketMQ:{}, Topic {}", result.getMsgId(), result.getMessageQueue().getTopic()); + connectStatsManager.incSourceRecordWriteTotalNums(); + connectStatsManager.incSourceRecordWriteNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID)); + RecordPartition partition = position.getPartition(); + try { + if (null != partition && null != position) { + Map offsetMap = (Map) offset.getOffset(); + offsetMap.put(RuntimeConfigDefine.UPDATE_TIMESTAMP, String.valueOf(sourceDataEntry.getTimestamp())); + positionStorageWriter.putPosition(partition, offset); + } + + } catch (Exception e) { + log.error("Source task save position info failed. partition {}, offset {}", JSON.toJSONString(partition), JSON.toJSONString(offset), e); } - - } catch (Exception e) { - log.error("Source task save position info failed. partition {}, offset {}", JSON.toJSONString(partition), JSON.toJSONString(offset), e); } - } - @Override public void onException(Throwable throwable) { - log.error("Source task send record failed ,error msg {}. message {}", throwable.getMessage(), JSON.toJSONString(sourceMessage), throwable); - connectStatsManager.incSourceRecordWriteTotalFailNums(); - connectStatsManager.incSourceRecordWriteFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID)); - } - }); - } catch (MQClientException e) { - log.error("Send message MQClientException. message: {}, error info: {}.", sourceMessage, e); - connectStatsManager.incSourceRecordWriteTotalFailNums(); - connectStatsManager.incSourceRecordWriteFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID)); - } catch (RemotingException e) { - log.error("Send message RemotingException. message: {}, error info: {}.", sourceMessage, e); - connectStatsManager.incSourceRecordWriteTotalFailNums(); - connectStatsManager.incSourceRecordWriteFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID)); - } catch (InterruptedException e) { - log.error("Send message InterruptedException. message: {}, error info: {}.", sourceMessage, e); - connectStatsManager.incSourceRecordWriteTotalFailNums(); - connectStatsManager.incSourceRecordWriteFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID)); - throw e; + @Override public void onException(Throwable throwable) { + log.error("Source task send record failed ,error msg {}. message {}", throwable.getMessage(), JSON.toJSONString(sourceMessage), throwable); + connectStatsManager.incSourceRecordWriteTotalFailNums(); + connectStatsManager.incSourceRecordWriteFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID)); + } + }); + } catch (MQClientException e) { + log.error("Send message MQClientException. message: {}, error info: {}.", sourceMessage, e); + connectStatsManager.incSourceRecordWriteTotalFailNums(); + connectStatsManager.incSourceRecordWriteFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID)); + } catch (RemotingException e) { + log.error("Send message RemotingException. message: {}, error info: {}.", sourceMessage, e); + connectStatsManager.incSourceRecordWriteTotalFailNums(); + connectStatsManager.incSourceRecordWriteFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID)); + } catch (InterruptedException e) { + log.error("Send message InterruptedException. message: {}, error info: {}.", sourceMessage, e); + connectStatsManager.incSourceRecordWriteTotalFailNums(); + connectStatsManager.incSourceRecordWriteFailNums(taskConfig.getString(RuntimeConfigDefine.TASK_ID)); + throw e; + } } } toSendRecord = null; diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/RocketMQMetaConverter.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/RocketMQMetaConverter.java new file mode 100644 index 00000000..c9de14e3 --- /dev/null +++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/RocketMQMetaConverter.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.runtime.converter; + +import io.openmessaging.connector.api.data.Converter; + +/** + * This converter does nothing, only for marking + */ +public class RocketMQMetaConverter implements Converter { + + @Override public byte[] objectToByte(Object o) { + return null; + } + + @Override public Object byteToObject(byte[] bytes) { + return null; + } +} From 219dec8c6966e2272efbcd26a56a96c1de01d2f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B5=B5?= Date: Fri, 27 May 2022 14:03:30 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=E4=BC=98=E5=8C=96ReadMe?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- connectors/rocketmq-replicator/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/connectors/rocketmq-replicator/README.md b/connectors/rocketmq-replicator/README.md index cc1c7060..e39b183d 100644 --- a/connectors/rocketmq-replicator/README.md +++ b/connectors/rocketmq-replicator/README.md @@ -44,7 +44,7 @@ http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-replicator-name}/stop 注:此功能尚不成熟还需要后续版本优化 ```` http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-replicator-name} -?config={"connector-class":"org.apache.rocketmq.replicator.RmqMetaReplicator","source-rocketmq":"xxxx:9876","target-rocketmq":"xxxxxxx:9876","replicator-store-topic":"replicatorTopic","offset.sync.topic":"syncTopic","taskDivideStrategy":"0","white-list":"TopicTest,TopicTest2","task-parallelism":"2","source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"} +?config={"connector-class":"org.apache.rocketmq.replicator.RmqMetaReplicator","source-rocketmq":"xxxx:9876","target-rocketmq":"xxxxxxx:9876","target-cluster":"test1-rocketmq","source-cluster":"test1-rocketmq","replicator-store-topic":"replicatorTopic","offset.sync.topic":"syncTopic","taskDivideStrategy":"0","white-list":"TestGroup","task-parallelism":"2","source-record-converter":"org.apache.rocketmq.connect.runtime.converter.RocketMQMetaConverter"} ````