Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions connectors/rocketmq-replicator/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ mvn clean install -Prelease-all -DskipTest -U
同步topic和消息
````
http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-replicator-name}
?config={"connector-class":"org.apache.rocketmq.replicator.RmqSourceReplicator","source-rocketmq":"xxxx:9876","target-rocketmq":"xxxxxxx:9876","replicator-store-topic":"replicatorTopic","taskDivideStrategy":"0","white-list":"TopicTest,TopicTest2","task-parallelism":"2","source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"}
?config={"connector-class":"org.apache.rocketmq.replicator.RmqSourceReplicator","source-rocketmq":"xxxx:9876","target-rocketmq":"xxxxxxx:9876","replicator-store-topic":"replicatorTopic","taskDivideStrategy":"0","white-list":"TopicTest,TopicTest2","task-parallelism":"2","source-record-converter":"org.apache.rocketmq.connect.runtime.converter.RocketMQConverter"}
````


Expand All @@ -44,7 +44,7 @@ http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-replicator-name}/stop
注:此功能尚不成熟还需要后续版本优化
````
http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-replicator-name}
?config={"connector-class":"org.apache.rocketmq.replicator.RmqMetaReplicator","source-rocketmq":"xxxx:9876","target-rocketmq":"xxxxxxx:9876","replicator-store-topic":"replicatorTopic","offset.sync.topic":"syncTopic","taskDivideStrategy":"0","white-list":"TopicTest,TopicTest2","task-parallelism":"2","source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"}
?config={"connector-class":"org.apache.rocketmq.replicator.RmqMetaReplicator","source-rocketmq":"xxxx:9876","target-rocketmq":"xxxxxxx:9876","target-cluster":"test1-rocketmq","source-cluster":"test1-rocketmq","replicator-store-topic":"replicatorTopic","offset.sync.topic":"syncTopic","taskDivideStrategy":"0","white-list":"TestGroup","task-parallelism":"2","source-record-converter":"org.apache.rocketmq.connect.runtime.converter.RocketMQMetaConverter"}
````


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,16 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.Set;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.OffsetWrapper;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.replicator.common.Utils;
import org.apache.rocketmq.replicator.config.ConfigUtil;
import org.apache.rocketmq.replicator.config.TaskConfig;
Expand All @@ -52,6 +57,7 @@ public class MetaSourceTask extends SourceTask {
private final String taskId;
private final TaskConfig config;
private DefaultMQAdminExt srcMQAdminExt;
private DefaultMQAdminExt tarMQAdminExt;
private volatile boolean started = false;

private OffsetSyncStore store;
Expand All @@ -77,6 +83,7 @@ public void start(SourceTaskContext sourceTaskContext) {

try {
this.srcMQAdminExt = Utils.startMQAdminTool(this.config);
this.tarMQAdminExt = Utils.startTarMQAdminTool(this.config);
} catch (MQClientException e) {
log.error("Replicator task start failed for `startMQAdminTool` exception.", e);
throw new IllegalStateException("Replicator task start failed for `startMQAdminTool` exception.");
Expand All @@ -92,6 +99,7 @@ public void stop() {
started = false;
}
srcMQAdminExt.shutdown();
tarMQAdminExt.shutdown();
}

@Override
Expand Down Expand Up @@ -120,28 +128,38 @@ public void resume() {
List<ConnectRecord> res = new ArrayList<>();
for (String group : groups) {
ConsumeStats stats;
String brokerAddresMaster="";
String brokerName="";
try {
stats = this.srcMQAdminExt.examineConsumeStats(group);
ClusterInfo clusterInfo = this.tarMQAdminExt.examineBrokerClusterInfo();
HashMap<String, Set<String>> clusterAddrTable = clusterInfo.getClusterAddrTable();
HashMap<String, BrokerData> brokerAddrTable = clusterInfo.getBrokerAddrTable();
Set<String> clusterNameSet = clusterAddrTable.get(this.config.getTargetCluster());
Iterator<String> it = clusterNameSet.iterator();
while (it.hasNext()){
String clusterName = it.next();
BrokerData brokerData = brokerAddrTable.get(clusterName);
HashMap<Long, String> brokerAddrs = brokerData.getBrokerAddrs();
brokerAddresMaster = brokerAddrs.get(new Long(0));
brokerName = brokerData.getBrokerName();
for (Map.Entry<MessageQueue, OffsetWrapper> offsetTable : stats.getOffsetTable().entrySet()) {
MessageQueue mq = offsetTable.getKey();
long srcOffset = offsetTable.getValue().getConsumerOffset();
long targetOffset = this.store.convertTargetOffset(mq, group, srcOffset);
try{
if (brokerName.equals(mq.getBrokerName())){
this.tarMQAdminExt.updateConsumeOffset(brokerAddresMaster,group,mq,targetOffset);
}
}catch (Exception e){
log.error("admin update consumer offset err", e);
}
}
}
} catch (Exception e) {
log.error("admin get consumer info failed for consumer groups: " + group, e);
continue;
}

for (Map.Entry<MessageQueue, OffsetWrapper> offsetTable : stats.getOffsetTable().entrySet()) {
MessageQueue mq = offsetTable.getKey();
long srcOffset = offsetTable.getValue().getConsumerOffset();
long targetOffset = this.store.convertTargetOffset(mq, group, srcOffset);

List<Field> fields = new ArrayList<Field>();
Schema schema = new Schema(SchemaEnum.OFFSET.name(), FieldType.INT64, fields);
schema.getFields().add(new Field(0, FieldName.OFFSET.getKey(), SchemaBuilder.string().build()));

JSONObject jsonObject = new JSONObject();
jsonObject.put(FieldName.OFFSET.getKey(), targetOffset);
ConnectRecord connectRecord = new ConnectRecord(Utils.offsetKey(mq),
Utils.offsetValue(srcOffset), System.currentTimeMillis(), schema, jsonObject.toJSONString());
res.add(connectRecord);
}
}
return res;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import io.openmessaging.KeyValue;
import io.openmessaging.internal.DefaultKeyValue;
import io.openmessaging.connector.api.component.task.source.SourceTask;
import io.openmessaging.connector.api.component.task.source.SourceTaskContext;
import io.openmessaging.connector.api.data.ConnectRecord;
Expand Down Expand Up @@ -178,6 +179,11 @@ private List<ConnectRecord> pollCommonMessage() {
final Map<String, String> properties = msg.getProperties();
final Set<String> keys = properties.keySet();
keys.forEach(key -> connectRecord.addExtension(key, properties.get(key)));
connectRecord.addExtension("topic",taskTopicConfig.getTargetTopic());
connectRecord.addExtension("brokerName",msg.getBrokerName());
KeyValue kv = new DefaultKeyValue();
kv.put("queueId",msg.getQueueId());
connectRecord.addExtension(kv);
res.add(connectRecord);
}
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,9 @@ public static List<KeyValue> groupPartitions(List<String> elements, RmqConnector
assigned++;
}
keyValue.put(TaskConfigEnum.TASK_STORE_ROCKETMQ.getKey(), tdc.getStoreTopic());
keyValue.put(TaskConfigEnum.TASK_TARGET_ROCKETMQ.getKey(), tdc.getTargetNamesrvs());
keyValue.put(TaskConfigEnum.TASK_SOURCE_ROCKETMQ.getKey(), tdc.getSrcNamesrvs());
keyValue.put(TaskConfigEnum.TASK_TARGET_CLUSTER.getKey(), tdc.getTargetCluster());
keyValue.put(TaskConfigEnum.TASK_SOURCE_CLUSTER.getKey(), tdc.getSrcCluster());
keyValue.put(TaskConfigEnum.TASK_OFFSET_SYNC_TOPIC.getKey(), tdc.getOffsetSyncTopic());
keyValue.put(TaskConfigEnum.TASK_DATA_TYPE.getKey(), DataType.OFFSET.ordinal());
Expand Down Expand Up @@ -194,14 +196,30 @@ public static DefaultMQAdminExt startTargetMQAdminTool(
}

public static DefaultMQAdminExt startMQAdminTool(TaskConfig taskConfig) throws MQClientException {
RPCHook rpcHook = null;
if (taskConfig.isSrcAclEnable()) {
rpcHook = new AclClientRPCHook(new SessionCredentials(taskConfig.getSrcAccessKey(), taskConfig.getSrcSecretKey()));
}
DefaultMQAdminExt sourceMQAdminExt = new DefaultMQAdminExt(rpcHook);
sourceMQAdminExt.setNamesrvAddr(taskConfig.getSourceRocketmq());
sourceMQAdminExt.setAdminExtGroup(ConstDefine.REPLICATOR_TASK_ADMIN_GROUP);
sourceMQAdminExt.setInstanceName(Utils.createUniqInstanceName(taskConfig.getSourceRocketmq()));

sourceMQAdminExt.start();
log.info("Source: RocketMQ sourceMQAdminExt started.");

return sourceMQAdminExt;
}

public static DefaultMQAdminExt startTarMQAdminTool(TaskConfig taskConfig) throws MQClientException {
RPCHook rpcHook = null;
if (taskConfig.isSrcAclEnable()) {
rpcHook = new AclClientRPCHook(new SessionCredentials(taskConfig.getSrcAccessKey(), taskConfig.getSrcSecretKey()));
}
DefaultMQAdminExt targetMQAdminExt = new DefaultMQAdminExt(rpcHook);
targetMQAdminExt.setNamesrvAddr(taskConfig.getSourceRocketmq());
targetMQAdminExt.setNamesrvAddr(taskConfig.getTargetRocketmq());
targetMQAdminExt.setAdminExtGroup(ConstDefine.REPLICATOR_TASK_ADMIN_GROUP);
targetMQAdminExt.setInstanceName(Utils.createUniqInstanceName(taskConfig.getSourceRocketmq()));
targetMQAdminExt.setInstanceName(Utils.createUniqInstanceName(taskConfig.getTargetRocketmq()));

targetMQAdminExt.start();
log.info("TARGET: RocketMQ targetMQAdminExt started.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
public class TaskConfig {

private String sourceCluster;
private String targetCluster;
private String storeTopic;
private String sourceGroup;
private String sourceRocketmq;
private String targetRocketmq;
private Integer dataType;
private Long nextPosition;
private String taskTopicList;
Expand Down Expand Up @@ -55,6 +57,14 @@ public void setSourceRocketmq(String sourceRocketmq) {
this.sourceRocketmq = sourceRocketmq;
}

public String getTargetRocketmq() {
return targetRocketmq;
}

public void setTargetRocketmq(String targetRocketmq) {
this.targetRocketmq = targetRocketmq;
}

public int getDataType() {
return dataType;
}
Expand Down Expand Up @@ -99,6 +109,14 @@ public String getSourceCluster() {
return this.sourceCluster;
}

public void setTargetCluster(String targetCluster) {
this.targetCluster = targetCluster;
}

public String getTargetCluster() {
return this.targetCluster;
}

public void setOffsetSyncTopic(String offsetSyncTopic) {
this.offsetSyncTopic = offsetSyncTopic;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ public enum TaskConfigEnum {
TASK_SOURCE_GROUP("sourceGroup"),
TASK_SOURCE_ROCKETMQ("sourceRocketmq"),
TASK_SOURCE_CLUSTER("sourceCluster"),
TASK_TARGET_ROCKETMQ("targetRocketmq"),
TASK_TARGET_CLUSTER("targetCluster"),
TASK_OFFSET_SYNC_TOPIC("offsetSyncTopic"),
TASK_SOURCE_TOPIC("sourceTopic"),
TASK_STORE_ROCKETMQ("storeTopic"),
Expand Down
Loading