diff --git a/pom.xml b/pom.xml
index 15a96e413..95b5ec950 100644
--- a/pom.xml
+++ b/pom.xml
@@ -59,7 +59,7 @@
${project.basedir}
2.2.0
2.3.3
- 2.4.3
+ 2.4.0
1.3
0.6.3
UTF-8
@@ -92,9 +92,9 @@
1.2.58
3.3.1
0.9
- 2.2
-
- 3.2.11
+ 2.4
+ 3.5.3
+
19.0
jacoco
diff --git a/rider/conf/application.conf b/rider/conf/application.conf
index e23b6aa2b..137dcc809 100644
--- a/rider/conf/application.conf
+++ b/rider/conf/application.conf
@@ -3,7 +3,7 @@ akka.http.server.request-timeout = 120s
wormholeServer {
cluster.id = "" #optional global uuid
- host = "localhost"
+ host = "master"
port = 8989
ui.default.language = "Chinese"
token.timeout = 1
@@ -18,8 +18,8 @@ mysql = {
db = {
driver = "com.mysql.jdbc.Driver"
user = "root"
- password = "root"
- url = "jdbc:mysql://localhost:3306/wormhole?useUnicode=true&characterEncoding=UTF-8&useSSL=false"
+ password = "Your@pwd123"
+ url = "jdbc:mysql://master:3306/wormhole?useUnicode=true&characterEncoding=UTF-8&useSSL=false"
numThreads = 4
minConnections = 4
maxConnections = 10
@@ -42,11 +42,11 @@ ldap = {
}
spark = {
- spark.home = "/usr/local/spark"
+ spark.home = "/usr"
yarn.queue.name = "default" #WormholeServer submit spark streaming/job queue
- wormhole.hdfs.root.path = "hdfs://nn1/wormhole" #WormholeServer hdfslog data default hdfs root path
- yarn.rm1.http.url = "localhost:8088" #Yarn ActiveResourceManager address
- yarn.rm2.http.url = "localhost:8088" #Yarn StandbyResourceManager address
+ wormhole.hdfs.root.path = "hdfs://master:8020/wormhole" #WormholeServer hdfslog data default hdfs root path
+ yarn.rm1.http.url = "slave01:8088" #Yarn ActiveResourceManager address
+ #yarn.rm2.http.url = "slave01:8088" #Yarn StandbyResourceManager address
#yarn.web-proxy.port = 8888 #Yarn web proxy port, just set if yarn service set yarn.web-proxy.address config
}
@@ -64,13 +64,13 @@ flink = {
}
zookeeper = {
- connection.url = "localhost:2181" #WormholeServer stream and flow interaction channel
+ connection.url = "slave01:2181,master:2181,slave02:2181" #WormholeServer stream and flow interaction channel
wormhole.root.path = "/wormhole" #zookeeper
}
kafka = {
- brokers.url = "localhost:6667" #WormholeServer feedback data store
- zookeeper.url = "localhost:2181"
+ brokers.url = "master:9092,slave01:9092,slave02:9092" #WormholeServer feedback data store
+ zookeeper.url = "slave01:2181,master:2181,slave02:2181"
topic.refactor = 3
using.cluster.suffix = false #if true, _${cluster.id} will be concatenated to consumer.feedback.topic
consumer = {
diff --git a/rider/conf/log4j.properties b/rider/conf/log4j.properties
index 483c88a01..c289519d9 100644
--- a/rider/conf/log4j.properties
+++ b/rider/conf/log4j.properties
@@ -1,4 +1,4 @@
-log4j.rootLogger=INFO, FILE
+log4j.rootLogger=INFO, FILE, CONSOLE
## for console
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
diff --git a/rider/rider-server/src/main/scala/edp/rider/common/RiderConfig.scala b/rider/rider-server/src/main/scala/edp/rider/common/RiderConfig.scala
index 1c65f4351..33ab5fa62 100644
--- a/rider/rider-server/src/main/scala/edp/rider/common/RiderConfig.scala
+++ b/rider/rider-server/src/main/scala/edp/rider/common/RiderConfig.scala
@@ -159,6 +159,7 @@ case class Monitor(databaseType: String)
object RiderConfig {
lazy val riderRootPath = s"${System.getProperty("WORMHOLE_HOME")}"
+ //lazy val riderRootPath ="D:\\workspaces\\gaoji\\wormhole\\rider"
lazy val riderServer = RiderServer(
getStringConfig("wormholeServer.cluster.id", ""),
diff --git a/sparkextension/spark_extension_2_4/pom.xml b/sparkextension/spark_extension_2_4/pom.xml
index a2d244643..f42cd54c7 100644
--- a/sparkextension/spark_extension_2_4/pom.xml
+++ b/sparkextension/spark_extension_2_4/pom.xml
@@ -20,11 +20,13 @@
+
edp.wormhole
wormhole-hadoop
${project.version}
+
org.apache.spark
spark-core_2.11
@@ -48,6 +50,7 @@
+
org.apache.spark
spark-streaming_2.11
diff --git a/sparkx/pom.xml b/sparkx/pom.xml
index e8b72ce75..00c5bfb2c 100644
--- a/sparkx/pom.xml
+++ b/sparkx/pom.xml
@@ -28,6 +28,10 @@
junit
junit
+
+ net.jpountz.lz4
+ lz4
+
diff --git a/sparkx/src/main/scala/edp/wormhole/sparkx/batchflow/BatchflowDirective.scala b/sparkx/src/main/scala/edp/wormhole/sparkx/batchflow/BatchflowDirective.scala
index 057a8f5c2..3fe44be62 100644
--- a/sparkx/src/main/scala/edp/wormhole/sparkx/batchflow/BatchflowDirective.scala
+++ b/sparkx/src/main/scala/edp/wormhole/sparkx/batchflow/BatchflowDirective.scala
@@ -40,6 +40,7 @@ import scala.collection.mutable
object BatchflowDirective extends Directive {
private def registerFlowStartDirective(flowDirectiveConfig: FlowDirectiveConfig): String = {
+
val consumptionDataMap = mutable.HashMap.empty[String, Boolean]
val consumption = JSON.parseObject(flowDirectiveConfig.consumptionDataStr)
val initial = consumption.getString(InputDataProtocolBaseType.INITIAL.toString).trim.toLowerCase.toBoolean
@@ -176,16 +177,82 @@ object BatchflowDirective extends Directive {
}
override def flowStartProcess(ums: Ums): String = {
+ """
+ {
+ "protocol": {
+ "type": "directive_flow_start"
+ },
+ "schema": {
+ "namespace": "kafka.hdp-kafka.test_source.test_table.*.*.*",
+ "fields": [{
+ "name": "directive_id",
+ "type": "long",
+ "nullable": false
+ }, {
+ "name": "stream_id",
+ "type": "long",
+ "nullable": false
+ }, {
+ "name": "flow_id",
+ "type": "long",
+ "nullable": false
+ }, {
+ "name": "source_increment_topic",
+ "type": "string",
+ "nullable": false
+ }, {
+ "name": "ums_ts_",
+ "type": "datetime",
+ "nullable": false
+ }, {
+ "name": "data_type",
+ "type": "string",
+ "nullable": false
+ }, {
+ "name": "data_parse",
+ "type": "string",
+ "nullable": true
+ }, {
+ "name": "sink_namespace",
+ "type": "string",
+ "nullable": false
+ }, {
+ "name": "consumption_protocol",
+ "type": "string",
+ "nullable": false
+ }, {
+ "name": "sinks",
+ "type": "string",
+ "nullable": false
+ }, {
+ "name": "swifts",
+ "type": "string",
+ "nullable": true
+ }, {
+ "name": "kerberos",
+ "type": "boolean",
+ "nullable": true
+ }, {
+ "name": "priority_id",
+ "type": "long",
+ "nullable": true
+ }]
+ },
+ "payload": [{
+ "tuple": [35, 1, 2, "test_source", "2020-05-14 11:53:31.000000", "ums_extension", "eyJmaWVsZHMiOlt7Im5hbWUiOiJpZCIsInR5cGUiOiJsb25nIiwibnVsbGFibGUiOnRydWV9LHsibmFtZSI6Im5hbWUiLCJ0eXBlIjoic3RyaW5nIiwibnVsbGFibGUiOnRydWV9LHsibmFtZSI6InBob25lIiwidHlwZSI6InN0cmluZyIsIm51bGxhYmxlIjp0cnVlfSx7Im5hbWUiOiJjaXR5IiwidHlwZSI6InN0cmluZyIsIm51bGxhYmxlIjp0cnVlfSx7Im5hbWUiOiJ0aW1lIiwidHlwZSI6ImRhdGV0aW1lIiwibnVsbGFibGUiOnRydWV9LHsibmFtZSI6InRpbWUiLCJ0eXBlIjoiZGF0ZXRpbWUiLCJudWxsYWJsZSI6dHJ1ZSwicmVuYW1lIjoidW1zX3RzXyJ9XX0=", "mysql.hdp-mysql.testdb.user.*.*.*", "eyJpbml0aWFsIjogdHJ1ZSwgImluY3JlbWVudCI6IHRydWUsICJiYXRjaCI6IGZhbHNlfQ==", "ew0ic2lua19jb25uZWN0aW9uX3VybCI6ICJqZGJjOm15c3FsOi8vbWFzdGVyOjMzMDYvdGVzdGRiIiwNInNpbmtfY29ubmVjdGlvbl91c2VybmFtZSI6ICJyb290IiwNInNpbmtfY29ubmVjdGlvbl9wYXNzd29yZCI6ICJZb3VyQHB3ZDEyMyIsDSJzaW5rX3RhYmxlX2tleXMiOiAiaWQiLA0ic2lua19vdXRwdXQiOiAiIiwNInNpbmtfY29ubmVjdGlvbl9jb25maWciOiAiIiwNInNpbmtfcHJvY2Vzc19jbGFzc19mdWxsbmFtZSI6ICJlZHAud29ybWhvbGUuc2lua3MuZGJzaW5rLkRhdGEyRGJTaW5rIiwNInNpbmtfc3BlY2lmaWNfY29uZmlnIjogeyJtdXRhdGlvbl90eXBlIjoiaSJ9LA0ic2lua19yZXRyeV90aW1lcyI6ICIzIiwNInNpbmtfcmV0cnlfc2Vjb25kcyI6ICIzMDAiDX0=", 
"eyJwdXNoZG93bl9jb25uZWN0aW9uIjpbeyJwYXNzd29yZCI6IllvdXJAcHdkMTIzIiwibmFtZV9zcGFjZSI6Im15c3FsLmhkcC1teXNxbC5sb29rdXAiLCJjb25uZWN0aW9uX2NvbmZpZyI6W10sImpkYmNfdXJsIjoiamRiYzpteXNxbDovL21hc3RlcjozMzA2L2xvb2t1cD91c2VVbmljb2RlPXRydWUmY2hhcmFjdGVyRW5jb2Rpbmc9dXRmOCZhdXRvUmVjb25uZWN0PXRydWUmZmFpbE92ZXJSZWFkT25seT1mYWxzZSZub0FjY2Vzc1RvUHJvY2VkdXJlQm9kaWVzPXRydWUmemVyb0RhdGVUaW1lQmVoYXZpb3I9Y29udmVydFRvTnVsbCZ0aW55SW50MWlzQml0PWZhbHNlIiwidXNlcm5hbWUiOiJyb290In1dLCJkYXRhZnJhbWVfc2hvdyI6InRydWUiLCJhY3Rpb24iOiJjSFZ6YUdSdmQyNWZjM0ZzSUd4bFpuUWdhbTlwYmlCM2FYUm9JRzE1YzNGc0xtaGtjQzF0ZVhOeGJDNXNiMjlyZFhBZ1BTQnpaV3hsXG5ZM1FnYVdRZ1lYTWdhV1F4TEdOaGNtUkNZVzVySUdaeWIyMGdkWE5sY2tOaGNtUWdkMmhsY21VZ0tHbGtLU0JwYmlBb2EyRm1hMkV1XG5hR1J3TFd0aFptdGhMblJsYzNSZmMyOTFjbU5sTG5SbGMzUmZkR0ZpYkdVdWFXUXBPM053WVhKclgzTnhiQ0E5SUhObGJHVmpkQ0JwXG5aQ3h1WVcxbExHTmhjbVJDWVc1ckxIQm9iMjVsTEdOcGRIa2dabkp2YlNCMFpYTjBYM1JoWW14bE93PT0iLCJkYXRhZnJhbWVfc2hvd19udW0iOjEwfQ==", "false", "1"]
+ }]
+ }
+ """
val payloads = ums.payload_get
val schemas = ums.schema.fields_get
val sourceNamespace = ums.schema.namespace.toLowerCase
- val tuple = payloads.head
+ val tuple = payloads.head // 取第一条
val streamId = UmsFieldType.umsFieldValue(tuple.tuple, schemas, "stream_id").toString.toLong
val directiveId = UmsFieldType.umsFieldValue(tuple.tuple, schemas, "directive_id").toString.toLong
val flowId = UmsFieldType.umsFieldValue(tuple.tuple, schemas, "flow_id").toString.toLong
try {
- val swiftsEncoded = UmsFieldType.umsFieldValue(tuple.tuple, schemas, "swifts")
+ val swiftsEncoded = UmsFieldType.umsFieldValue(tuple.tuple, schemas, "swifts") // base64编码的
val swiftsStr = if (swiftsEncoded != null && !swiftsEncoded.toString.isEmpty) new String(new sun.misc.BASE64Decoder().decodeBuffer(swiftsEncoded.toString)) else null
logInfo("swiftsStr:" + swiftsStr)
@@ -206,8 +273,9 @@ object BatchflowDirective extends Directive {
if(null != sourceIncrementTopic) sourceIncrementTopic.toString.split(",").toList
else null
+ // 定义config对象
val flowDirectiveConfig = FlowDirectiveConfig(sourceNamespace, fullSinkNamespace, streamId, flowId, directiveId, swiftsStr, sinksStr, consumptionDataStr, dataType, dataParseStr, kerberos, priorityId, sourceIncrementTopicList)
-
+ // 初始化各种配置
registerFlowStartDirective(flowDirectiveConfig)
} catch {
case e: Throwable =>
diff --git a/sparkx/src/main/scala/edp/wormhole/sparkx/batchflow/BatchflowMainProcess.scala b/sparkx/src/main/scala/edp/wormhole/sparkx/batchflow/BatchflowMainProcess.scala
index 03df1847b..36b0f3f0e 100644
--- a/sparkx/src/main/scala/edp/wormhole/sparkx/batchflow/BatchflowMainProcess.scala
+++ b/sparkx/src/main/scala/edp/wormhole/sparkx/batchflow/BatchflowMainProcess.scala
@@ -95,8 +95,9 @@ object BatchflowMainProcess extends EdpLogging {
UdfDirective.registerUdfProcess(config.kafka_output.feedback_topic_name, config.kafka_output.brokers, session)
logInfo("start create classifyRdd")
+ //将rdd中的row进行分类:mainNamespace, lookupNamespace, otherNamespace
val classifyRdd: RDD[(ListBuffer[((UmsProtocolType, String), Seq[UmsTuple])], ListBuffer[((UmsProtocolType, String), Seq[UmsTuple])], ListBuffer[String], Array[((UmsProtocolType, String), Seq[UmsField])])] = getClassifyRdd(dataRepartitionRdd).cache()
- val distinctSchema: mutable.Map[(UmsProtocolType, String), (Seq[UmsField], Long)] = getDistinctSchema(classifyRdd)
+ val distinctSchema: mutable.Map[(UmsProtocolType, String), (Seq[UmsField], Long)] = getDistinctSchema(classifyRdd) // 对所有的namespace去重
logInfo("start doStreamLookupData")
doStreamLookupData(session, classifyRdd, config, distinctSchema)
@@ -156,27 +157,27 @@ object BatchflowMainProcess extends EdpLogging {
)
}
-
+ // 将batch中的rdd数据,按照namespace的不同,进行区分,生成不同的rdd
private def getClassifyRdd(dataRepartitionRdd: RDD[(String, String)]): RDD[(ListBuffer[((UmsProtocolType, String), Seq[UmsTuple])], ListBuffer[((UmsProtocolType, String), Seq[UmsTuple])], ListBuffer[String], Array[((UmsProtocolType, String), Seq[UmsField])])] = {
val streamLookupNamespaceSet = ConfMemoryStorage.getAllLookupNamespaceSet
- val mainNamespaceSet = ConfMemoryStorage.getAllMainNamespaceSet
- val jsonSourceParseMap: Map[(UmsProtocolType, String), (Seq[UmsField], Seq[FieldInfo], ArrayBuffer[(String, String)])] = ConfMemoryStorage.getAllSourceParseMap
+ val mainNamespaceSet = ConfMemoryStorage.getAllMainNamespaceSet // 获取source的命名空间
+ val jsonSourceParseMap: Map[(UmsProtocolType, String), (Seq[UmsField], Seq[FieldInfo], ArrayBuffer[(String, String)])] = ConfMemoryStorage.getAllSourceParseMap // flow的source的两种处理协议 increment和initial,页面上配置即可
//log.info(s"streamLookupNamespaceSet: $streamLookupNamespaceSet, mainNamespaceSet $mainNamespaceSet, jsonSourceParseMap $jsonSourceParseMap")
- dataRepartitionRdd.mapPartitions(partition => {
+ dataRepartitionRdd.mapPartitions(partition => { // 遍历rdd的分区
val mainDataList = ListBuffer.empty[((UmsProtocolType, String), Seq[UmsTuple])]
val lookupDataList = ListBuffer.empty[((UmsProtocolType, String), Seq[UmsTuple])]
val otherList = ListBuffer.empty[String]
- val nsSchemaMap = mutable.HashMap.empty[(UmsProtocolType, String), Seq[UmsField]]
- partition.foreach(row => {
+ val nsSchemaMap = mutable.HashMap.empty[(UmsProtocolType, String), Seq[UmsField]] // 记录所有的(protocalType,namesapce)和其type的映射
+ partition.foreach(row => { // 遍历分区的每一个row
try {
- val (protocolType, namespace) = UmsCommonUtils.getTypeNamespaceFromKafkaKey(row._1)
+ val (protocolType, namespace) = UmsCommonUtils.getTypeNamespaceFromKafkaKey(row._1) // row 是个tuple _1是 data_increment_data.kafka.hdp-kafka.test_source.test_table.*.*.*
if (protocolType == UmsProtocolType.DATA_INCREMENT_DATA || protocolType == UmsProtocolType.DATA_BATCH_DATA || protocolType == UmsProtocolType.DATA_INITIAL_DATA) {
- if (ConfMemoryStorage.existNamespace(mainNamespaceSet, namespace)) {
+ if (ConfMemoryStorage.existNamespace(mainNamespaceSet, namespace)) { // 如果当前的namespace属于mainNameSpace
val schemaValueTuple: (Seq[UmsField], Seq[UmsTuple]) = SparkxUtils.jsonGetValue(namespace, protocolType, row._2, jsonSourceParseMap)
if (!nsSchemaMap.contains((protocolType, namespace))) nsSchemaMap((protocolType, namespace)) = schemaValueTuple._1.map(f => UmsField(f.name.toLowerCase, f.`type`, f.nullable))
mainDataList += (((protocolType, namespace), schemaValueTuple._2))
}
- if (ConfMemoryStorage.existNamespace(streamLookupNamespaceSet, namespace)) {
+ if (ConfMemoryStorage.existNamespace(streamLookupNamespaceSet, namespace)) { // 如果当前的namespace属于lookupNameSpace
//todo change if back to if, efficiency
val schemaValueTuple: (Seq[UmsField], Seq[UmsTuple]) = SparkxUtils.jsonGetValue(namespace, protocolType, row._2, jsonSourceParseMap)
if (!nsSchemaMap.contains((protocolType, namespace))) nsSchemaMap((protocolType, namespace)) = schemaValueTuple._1.map(f => UmsField(f.name.toLowerCase, f.`type`, f.nullable))
@@ -188,7 +189,7 @@ object BatchflowMainProcess extends EdpLogging {
case e1: Throwable => logAlert("do classifyRdd,one data has error,row:" + row, e1)
}
})
- List((mainDataList, lookupDataList, otherList, nsSchemaMap.toArray)).toIterator
+ List((mainDataList, lookupDataList, otherList, nsSchemaMap.toArray)).toIterator // 统一成tuple,返回rdd的迭代
})
}
@@ -238,8 +239,8 @@ object BatchflowMainProcess extends EdpLogging {
val umsRdd: RDD[(UmsProtocolType, String, ArrayBuffer[Seq[String]])] = formatRdd(allDataRdd, "lookup")
distinctSchema.foreach(schema => {
val namespace = schema._1._2
- val matchLookupNamespace = ConfMemoryStorage.getMatchLookupNamespaceRule(namespace)
- if (matchLookupNamespace != null) {
+ val matchLookupNamespace = ConfMemoryStorage.getMatchLookupNamespaceRule(namespace) // 查看下是否是look up的namespace
+ if (matchLookupNamespace != null) { // 如果不为空,则为是
val protocolType: UmsProtocolType = schema._1._1
val lookupDf = createSourceDf(session, namespace, schema._2._1, umsRdd.filter(row => {
row._1 == protocolType && row._2 == namespace
@@ -321,7 +322,7 @@ object BatchflowMainProcess extends EdpLogging {
val flowConfig: FlowConfig = flowConfigMap(sinkNamespace)
if (swiftsProcessConfig.nonEmpty && swiftsProcessConfig.get.swiftsSql.nonEmpty) {
-
+ // 如果有swift计算,则进入
val (returnUmsFields, tuplesRDD, unionDf) = swiftsProcess(protocolType, flowConfig, swiftsProcessConfig, uuid, session, sourceTupleRDD, config, sourceNamespace, sinkNamespace, minTs, maxTs, count, sinkFields, batchId, topicPartitionOffset)
sinkFields = returnUmsFields
sinkRDD = tuplesRDD
@@ -429,7 +430,7 @@ object BatchflowMainProcess extends EdpLogging {
if (dataSetShow.get) {
sourceDf.show(swiftsProcessConfig.get.datasetShowNum.get)
}
-
+ //是否需要对hdfs上的parquet rdd进行union
val afterUnionDf = unionParquetNonTimeoutDf(swiftsProcessConfig, uuid, session, sourceDf, config, sourceNamespace, sinkNamespace).cache
try {
@@ -470,7 +471,7 @@ object BatchflowMainProcess extends EdpLogging {
private def getDistinctSchema(umsRdd: RDD[(ListBuffer[((UmsProtocolType, String), Seq[UmsTuple])], ListBuffer[((UmsProtocolType, String), Seq[UmsTuple])], ListBuffer[String], Array[((UmsProtocolType, String), Seq[UmsField])])]): mutable.Map[(UmsProtocolType.UmsProtocolType, String), (Seq[UmsField], Long)] = {
val schemaMap = mutable.HashMap.empty[(UmsProtocolType, String), (Seq[UmsField], Long)]
- umsRdd.map(_._4).collect().foreach(_.foreach {
+ umsRdd.map(_._4).collect().foreach(_.foreach { // _4 是所有(protocol,ns)和field type的map关系
case ((protocol, ns), schema) =>
if (!schemaMap.contains((protocol, ns))) {
val matchSourceNs = ConfMemoryStorage.getMatchSourceNamespaceRule(ns)
diff --git a/sparkx/src/main/scala/edp/wormhole/sparkx/batchflow/BatchflowStarter.scala b/sparkx/src/main/scala/edp/wormhole/sparkx/batchflow/BatchflowStarter.scala
index 7d353baa4..79103846e 100644
--- a/sparkx/src/main/scala/edp/wormhole/sparkx/batchflow/BatchflowStarter.scala
+++ b/sparkx/src/main/scala/edp/wormhole/sparkx/batchflow/BatchflowStarter.scala
@@ -39,7 +39,11 @@ object BatchflowStarter extends App with EdpLogging {
SparkContextUtils.setLoggerLevel()
logInfo("swiftsConfig:" + args(0))
- val config: WormholeConfig = JsonUtils.json2caseClass[WormholeConfig](args(0))
+ //val config: WormholeConfig = JsonUtils.json2caseClass[WormholeConfig](args(0))
+
+ System.setProperty("hadoop.home.dir", "D:\\hadoop-common-2.2.0-bin-master")
var s="""{"kafka_input":{"group_id":"wormhole_demo_test_stream","batch_duration_seconds":30,"brokers":"master:9092,slave01:9092,slave02:9092","kerberos":false,"max.partition.fetch.bytes":10485760,"session.timeout.ms":30000,"group.max.session.timeout.ms":60000,"auto.offset.reset":"earliest","key.deserializer":"org.apache.kafka.common.serialization.StringDeserializer","value.deserializer":"org.apache.kafka.common.serialization.StringDeserializer","enable.auto.commit":false},"kafka_output":{"feedback_topic_name":"wormhole_feedback","brokers":"master:9092,slave01:9092,slave02:9092","kerberos":false},"spark_config":{"stream_id":1,"stream_name":"wormhole_demo_test_stream","master":"local[1]","spark.sql.shuffle.partitions":3},"rdd_partition_number":3,"zookeeper_address":"slave01:2181,master:2181,slave02:2181","zookeeper_path":"/wormhole","kafka_persistence_config_isvalid":false,"stream_hdfs_address":"hdfs://master:8020/wormhole","kerberos":false}"""
+ val config: WormholeConfig = JsonUtils.json2caseClass[WormholeConfig](s)
val appId = SparkUtils.getAppId
WormholeKafkaProducer.initWithoutAcksAll(config.kafka_output.brokers, config.kafka_output.config,config.kafka_output.kerberos)
val sparkConf = new SparkConf()
diff --git a/sparkx/src/main/scala/edp/wormhole/sparkx/common/SparkContextUtils.scala b/sparkx/src/main/scala/edp/wormhole/sparkx/common/SparkContextUtils.scala
index a1d286b72..82ce40a8b 100644
--- a/sparkx/src/main/scala/edp/wormhole/sparkx/common/SparkContextUtils.scala
+++ b/sparkx/src/main/scala/edp/wormhole/sparkx/common/SparkContextUtils.scala
@@ -93,12 +93,15 @@ object SparkContextUtils extends EdpLogging{
val locationStrategy: LocationStrategy = LocationStrategies.PreferConsistent
+ // 消费策略
val perConfig: WormholePerPartitionConfig = new WormholePerPartitionConfig(partitionRateMap)
val consumerStrategy: ConsumerStrategy[String, String] = if(kafkaInput.inWatch){
ConsumerStrategies.Subscribe[String, String](topicList, kafkaInput.inputBrokers, partitionOffsetMap.toMap)
}else{
ConsumerStrategies.Subscribe[String, String](topicList, kafkaInput.inputBrokers)
}
+
+ // 同时订阅多个topic, topic中的数据不是单纯的数据,而是带有namespace的
WormholeKafkaUtils.createWormholeDirectStream[String, String](ssc, locationStrategy, consumerStrategy, perConfig)
}
diff --git a/sparkx/src/main/scala/edp/wormhole/sparkx/directive/DirectiveFlowWatch.scala b/sparkx/src/main/scala/edp/wormhole/sparkx/directive/DirectiveFlowWatch.scala
index 3fd2563da..2cb4fcce2 100644
--- a/sparkx/src/main/scala/edp/wormhole/sparkx/directive/DirectiveFlowWatch.scala
+++ b/sparkx/src/main/scala/edp/wormhole/sparkx/directive/DirectiveFlowWatch.scala
@@ -41,6 +41,7 @@ object DirectiveFlowWatch extends EdpLogging {
def initFlow(config: WormholeConfig, appId: String): Unit = {
logInfo("init flow,appId=" + appId)
val watchPath = config.zookeeper_path + "/" + config.spark_config.stream_id + flowRelativePath
+ // 如果zk中不存在此路径,则创建
if (!WormholeZkClient.checkExist(config.zookeeper_address, watchPath)) WormholeZkClient.createPath(config.zookeeper_address, watchPath)
val flowList = WormholeZkClient.getChildren(config.zookeeper_address, watchPath)
flowList.toArray.foreach(flow => {
diff --git a/sparkx/src/main/scala/edp/wormhole/sparkx/memorystorage/OffsetPersistenceManager.scala b/sparkx/src/main/scala/edp/wormhole/sparkx/memorystorage/OffsetPersistenceManager.scala
index a824e046b..343cd3f0e 100644
--- a/sparkx/src/main/scala/edp/wormhole/sparkx/memorystorage/OffsetPersistenceManager.scala
+++ b/sparkx/src/main/scala/edp/wormhole/sparkx/memorystorage/OffsetPersistenceManager.scala
@@ -39,7 +39,7 @@ import scala.collection.mutable.ListBuffer
object OffsetPersistenceManager extends EdpLogging {
- val directiveList = new ConcurrentLinkedQueue[(Ums, Ums)]
+ val directiveList = new ConcurrentLinkedQueue[(Ums, Ums)]
val topicTypePath = "topictype"
val rateRelativePath = "rate"
diff --git a/sparkx/src/main/scala/edp/wormhole/sparkx/swifts/transform/DataFrameTransform.scala b/sparkx/src/main/scala/edp/wormhole/sparkx/swifts/transform/DataFrameTransform.scala
index 21d49d5a8..e89d60d87 100644
--- a/sparkx/src/main/scala/edp/wormhole/sparkx/swifts/transform/DataFrameTransform.scala
+++ b/sparkx/src/main/scala/edp/wormhole/sparkx/swifts/transform/DataFrameTransform.scala
@@ -133,22 +133,22 @@ object DataFrameTransform extends EdpLogging {
def getDbJoinOrUnionDf(session: SparkSession, currentDf: DataFrame, sourceTableFields: Array[String], lookupTableFields: Array[String], sql: String, connectionConfig: ConnectionConfig, schemaStr: String, operate: SwiftsSql, sqlType: UmsDataSystem.Value, batchSize: Option[Int] = None): DataFrame = {
var index = -1
- val dbOutPutSchemaMap: Map[String, (String, Int)] = schemaStr.split(",").map(str => {
+ val dbOutPutSchemaMap: Map[String, (String, Int)] = schemaStr.split(",").map(str => { // 将sql中涉及的字段,分装成输出的rdd字段
val arr = str.split(":")
index += 1
(arr(0), (arr(1), index))
}).toMap //order is not same as input order !!!
val inputDfSchema = currentDf.schema
- val resultSchema: StructType = SqlOptType.toSqlOptType(operate.optType) match {
+ val resultSchema: StructType = SqlOptType.toSqlOptType(operate.optType) match { // 根据opt type的不同,判定最终rdd的字段类型
case SqlOptType.JOIN | SqlOptType.INNER_JOIN | SqlOptType.LEFT_JOIN =>
var afterJoinSchema: StructType = inputDfSchema
- val addColumnType = dbOutPutSchemaMap.map { case (name, (dataType, _)) => StructField(name, ums2sparkType(umsFieldType(dataType))) }
+ val addColumnType = dbOutPutSchemaMap.map { case (name, (dataType, _)) => StructField(name, ums2sparkType(umsFieldType(dataType))) } // join inner join left join 以sql的字段为主
addColumnType.foreach(column => afterJoinSchema = afterJoinSchema.add(column))
afterJoinSchema
case SqlOptType.UNION => inputDfSchema
}
-
+ // 真正执行join sql的地方
val joinedRow: RDD[Row] = currentDf.rdd.mapPartitions(partition => {
val originalDatas: ListBuffer[Row] = partition.to[ListBuffer]
if (batchSize.nonEmpty)
diff --git a/sparkx/src/main/scala/edp/wormhole/sparkx/swifts/transform/DataframeObtain.scala b/sparkx/src/main/scala/edp/wormhole/sparkx/swifts/transform/DataframeObtain.scala
index da3564e35..fc3b65f05 100644
--- a/sparkx/src/main/scala/edp/wormhole/sparkx/swifts/transform/DataframeObtain.scala
+++ b/sparkx/src/main/scala/edp/wormhole/sparkx/swifts/transform/DataframeObtain.scala
@@ -79,15 +79,16 @@ object DataframeObtain extends EdpLogging {
}
val lookupTableFieldsAlias = operate.lookupTableFieldsAlias.get
- if (sourceTableFields.length == 1) {
+ if (sourceTableFields.length == 1) { // 如果是一个join的列
tmpLastDf = tmpLastDf.join(df1, tmpLastDf(sourceTableFields(0)) === df1(lookupTableFieldsAlias(0)), joinType)
} else {
+ // 如果是多个join的列
val tmpLength = sourceTableFields.length
var condition = tmpLastDf(sourceTableFields(0)) === df1(lookupTableFieldsAlias(0))
for (i <- 1 until tmpLength) {
condition = condition && tmpLastDf(sourceTableFields(i)) === df1(lookupTableFieldsAlias(i))
- }
- tmpLastDf = tmpLastDf.join(df1, condition, joinType)
+ }// 拼接到一起作为条件
+ tmpLastDf = tmpLastDf.join(df1, condition, joinType) // 执行两个df的join
}
tmpLastDf
}
diff --git a/sparkx/src/main/scala/edp/wormhole/sparkx/swifts/transform/SwiftsTransform.scala b/sparkx/src/main/scala/edp/wormhole/sparkx/swifts/transform/SwiftsTransform.scala
index e42916100..cb58fd146 100644
--- a/sparkx/src/main/scala/edp/wormhole/sparkx/swifts/transform/SwiftsTransform.scala
+++ b/sparkx/src/main/scala/edp/wormhole/sparkx/swifts/transform/SwiftsTransform.scala
@@ -56,7 +56,7 @@ object SwiftsTransform extends EdpLogging {
if (swiftsSqlArr.isDefined) {
val swiftsArr: Array[SwiftsSql] = swiftsSqlArr.get
- swiftsArr.foreach(f = operate => {
+ swiftsArr.foreach(f = operate => { // 每一个sql
val lookupNamespace = if (operate.lookupNamespace.isDefined) operate.lookupNamespace.get else null
val sourceTableFields = if (operate.sourceTableFields.isDefined) operate.sourceTableFields.get else null
val lookupTableFields = if (operate.lookupTableFields.isDefined) operate.lookupTableFields.get else null
@@ -83,10 +83,12 @@ object SwiftsTransform extends EdpLogging {
if (ConfMemoryStorage.existStreamLookup(matchSourceNamespace, sinkNamespace, lookupNamespace)) {
// lookup Namespace is also match rule format .*.*
val path = config.stream_hdfs_address.get + "/" + "swiftsparquet" + "/" + config.spark_config.stream_id + "/" + matchSourceNamespace.replaceAll("\\*", "-") + "/" + sinkNamespace + "/streamLookupNamespace"
+ // 查看lookup表的数据在hdfs上是否存在,如果存在则通过spark sql生成临时表,表名由md5生成。
val (tableNameArrMD5, allTableReady) = DataframeObtain.createDfAndViewFromParquet(matchSourceNamespace, lookupNamespace, sinkNamespace, session, path)
if (allTableReady) {
try {
tmpTableNameList ++= tableNameArrMD5
+ // 获取真正要执行的sql
val newSql = SqlBinding.getSlidingUnionSql(session, currentDf, sourceTableFields, lookupTableFields, sql)
logInfo(uuid + ",lookupStreamMap JOIN newSql@:" + newSql)
val df1 = session.sql(newSql)
@@ -94,7 +96,7 @@ object SwiftsTransform extends EdpLogging {
} catch {
case e: Throwable =>
logError("getJoinDf", e)
- tmpTableNameList.foreach(name => session.sqlContext.dropTempTable(name))
+ tmpTableNameList.foreach(name => session.sqlContext.dropTempTable(name)) // 如果出错,则删除已创建的临时表
throw e
}
} else {
diff --git a/ums/src/main/scala/edp/wormhole/ums/UmsCommonUtils.scala b/ums/src/main/scala/edp/wormhole/ums/UmsCommonUtils.scala
index a9d8197e2..340c3e209 100644
--- a/ums/src/main/scala/edp/wormhole/ums/UmsCommonUtils.scala
+++ b/ums/src/main/scala/edp/wormhole/ums/UmsCommonUtils.scala
@@ -132,16 +132,16 @@ object UmsCommonUtils extends Serializable {
} else key
}
- def checkAndGetProtocolNamespace(key: String, umsStr: String): (String,String) = {
+ def checkAndGetProtocolNamespace(key: String, umsStr: String): (String, String) = {
if (key == null || key.trim.isEmpty) {
val protocolType = getProtocolTypeFromUms(umsStr)
val namespace = getFieldContentFromJson(umsStr, "namespace")
if (protocolType == null)
- (UmsProtocolType.DATA_INCREMENT_DATA.toString , getFieldContentFromJson(umsStr, "namespace"))
- else (protocolType , namespace)
+ (UmsProtocolType.DATA_INCREMENT_DATA.toString, getFieldContentFromJson(umsStr, "namespace"))
+ else (protocolType, namespace)
} else {
val typeNamespace = UmsCommonUtils.getTypeNamespaceFromKafkaKey(key)
- (typeNamespace._1.toString,typeNamespace._2)
+ (typeNamespace._1.toString, typeNamespace._2)
}
}
}