@@ -86,10 +86,9 @@ class TaskMonitorService extends Logging {
8686 streamTasks.filter(shouldMonitor).foreach { streamTask =>
8787 val job = streamJobMapper.getJobById(streamTask.getJobId)
8888 if (! JobConf .SUPPORTED_MANAGEMENT_JOB_TYPES .getValue.contains(job.getJobType)) {
89- val userList = Sets .newHashSet(job.getSubmitUser, job.getCreateBy)
90- userList.addAll(getAlertUsers(job))
89+ val userList = getAlertUsers(job)
9190 val alertMsg = s " Spark Streaming应用[ ${job.getName}]已经超过 ${Utils .msDurationToString(System .currentTimeMillis - streamTask.getLastUpdateTime.getTime)} 没有更新状态, 请及时确认应用是否正常! "
92- alert(jobService.getAlertLevel(job), alertMsg, new util. ArrayList [ String ]( userList) , streamTask)
91+ alert(jobService.getAlertLevel(job), alertMsg, userList, streamTask)
9392 } else {
9493 streamTask.setLastUpdateTime(new Date )
9594 streamTaskMapper.updateTask(streamTask)
@@ -110,7 +109,6 @@ class TaskMonitorService extends Logging {
110109 } else {
111110 // 连续三次还是出现异常,说明Linkis的Manager已经不能正常提供服务,告警并不再尝试获取状态,等待下次尝试
112111 val users = getAlertUsers(job)
113- users.add(job.getCreateBy)
114112 alert(jobService.getAlertLevel(job), s " 请求LinkisManager失败,Linkis集群出现异常,请关注!影响任务[ ${job.getName}] " , users, streamTask)
115113 }
116114 }
@@ -138,9 +136,8 @@ class TaskMonitorService extends Logging {
138136 }
139137 case _ =>
140138 }
141- val userList = Sets .newHashSet(job.getSubmitUser, job.getCreateBy)
142- userList.addAll(getAlertUsers(job))
143- alert(jobService.getAlertLevel(job), alertMsg, new util.ArrayList [String ](userList), streamTask)
139+ val userList = getAlertUsers(job)
140+ alert(jobService.getAlertLevel(job), alertMsg, userList, streamTask)
144141 }
145142 }
146143 }
@@ -159,12 +156,19 @@ class TaskMonitorService extends Logging {
159156 }
160157
161158 protected def getAlertUsers (job : StreamJob ): util.List [String ] = {
162- var users = jobService.getAlertUsers(job)
163- if (users == null ) {
164- users = new util.ArrayList [String ]()
159+ val allUsers = new util.LinkedHashSet [String ]()
160+ val alertUsers = jobService.getAlertUsers(job)
161+ if (alertUsers!= null ) {
162+ alertUsers.foreach(user => {
163+ allUsers.add(user)
164+ })
165165 }
166- users.addAll(util.Arrays .asList(JobConf .STREAMIS_DEVELOPER .getValue.split(" ," ):_* ))
167- users
166+ allUsers.add(job.getSubmitUser)
167+ allUsers.add(job.getCreateBy)
168+ util.Arrays .asList(JobConf .STREAMIS_DEVELOPER .getValue.split(" ," ):_* ).foreach(user => {
169+ allUsers.add(user)
170+ })
171+ new util.ArrayList [String ](allUsers)
168172 }
169173
170174 protected def alert (alertLevel : AlertLevel , alertMsg : String , users : util.List [String ], streamTask: StreamTask ): Unit = alerters.foreach{ alerter =>
0 commit comments