diff --git a/smart-engine/src/main/java/org/smartdata/server/engine/CmdletManager.java b/smart-engine/src/main/java/org/smartdata/server/engine/CmdletManager.java index a03f6d6aea..dd05e23cc2 100644 --- a/smart-engine/src/main/java/org/smartdata/server/engine/CmdletManager.java +++ b/smart-engine/src/main/java/org/smartdata/server/engine/CmdletManager.java @@ -64,13 +64,11 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Queue; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; @@ -215,6 +213,13 @@ private void reloadCmdletsInDB() throws IOException{ cmdletInfos = metaStore.getCmdlets(CmdletState.DISPATCHED); if (cmdletInfos != null && cmdletInfos.size() != 0) { for (CmdletInfo cmdletInfo : cmdletInfos) { + //track reload cmdlets + CmdletDescriptor cmdletDescriptor = + CmdletDescriptor.fromCmdletString(cmdletInfo.getParameters()); + cmdletDescriptor.setRuleId(cmdletInfo.getRid()); + if (!tracker.contains(cmdletDescriptor)) { + tracker.track(cmdletInfo.getCid(), cmdletDescriptor); + } List actionInfos = getActions(cmdletInfo.getAids()); for (ActionInfo actionInfo: actionInfos) { actionInfo.setCreateTime(cmdletInfo.getGenerateTime()); @@ -227,6 +232,12 @@ private void reloadCmdletsInDB() throws IOException{ cmdletInfos = metaStore.getCmdlets(CmdletState.PENDING); if (cmdletInfos != null && cmdletInfos.size() != 0) { for (CmdletInfo cmdletInfo : cmdletInfos) { + CmdletDescriptor cmdletDescriptor = + CmdletDescriptor.fromCmdletString(cmdletInfo.getParameters()); + cmdletDescriptor.setRuleId(cmdletInfo.getRid()); + if (!tracker.contains(cmdletDescriptor)) { + tracker.track(cmdletInfo.getCid(), cmdletDescriptor); + } LOG.debug(String.format("Reload pending cmdlet: {}", cmdletInfo)); List actionInfos = getActions(cmdletInfo.getAids()); syncCmdAction(cmdletInfo, actionInfos); @@ -235,6 +246,9 @@ private void reloadCmdletsInDB() throws IOException{ } catch (MetaStoreException e) { LOG.error("DB connection error occurs when ssm is reloading cmdlets!"); return; + } catch (ParseException ie) { + LOG.error("Cmdlet parameters format is not correct when ssm is" + + "reloading cmdlets!", ie); } } @@ -478,7 +492,12 @@ private void batchSyncCmdAction() throws Exception { for (CmdletInfo cmdletInfo : cmdletFinished) { idToCmdlets.remove(cmdletInfo.getCid()); - tracker.untrack(cmdletInfo.getCid()); + try { + tracker.untrack(cmdletInfo.getCid()); + } catch (Exception e) { + LOG.warn("Untracking cmdlet failed.", e); + } + for (Long aid : cmdletInfo.getAids()) { idToActions.remove(aid); } @@ -489,9 +508,9 @@ private void batchSyncCmdAction() throws Exception { LOG.debug("Number of cmds {} to submit", cmdletInfos.size()); try { metaStore.insertActions( - actionInfos.toArray(new ActionInfo[actionInfos.size()])); + actionInfos.toArray(new ActionInfo[actionInfos.size()])); metaStore.insertCmdlets( - cmdletInfos.toArray(new CmdletInfo[cmdletInfos.size()])); + cmdletInfos.toArray(new CmdletInfo[cmdletInfos.size()])); } catch (MetaStoreException e) { LOG.error("CmdletIds -> [ {} ], submit to DB error", cmdletInfos, e); } @@ -1364,8 +1383,6 @@ private class DetectFailedActionTask implements Runnable { public void run() { try { - Set failedCmdlet = new HashSet<>(); - Set succeededCmdlet = new HashSet<>(); List cids = new ArrayList<>(); cids.addAll(idToLaunchCmdlet.keySet()); for (Long cid : cids) { @@ -1383,29 +1400,19 @@ public void run() { // For timeout action, speculate its status and set result // if needed. if (isSuccessfulBySpeculation(actionInfo)) { - succeededCmdlet.add(cmdletInfo); ActionStatus actionStatus = ActionStatusFactory.createSuccessActionStatus( cmdletInfo, actionInfo); - onActionStatusUpdate(actionStatus); + onStatusUpdate(actionStatus); } else { - failedCmdlet.add(cmdletInfo); ActionStatus actionStatus = ActionStatusFactory.createTimeoutActionStatus( cmdletInfo, actionInfo); - onActionStatusUpdate(actionStatus); + onStatusUpdate(actionStatus); } } } } - for (CmdletInfo cmdletInfo: failedCmdlet) { - cmdletInfo.setState(CmdletState.FAILED); - cmdletFinished(cmdletInfo.getCid()); - } - for (CmdletInfo cmdletInfo: succeededCmdlet) { - cmdletInfo.setState(CmdletState.DONE); - cmdletFinished(cmdletInfo.getCid()); - } } catch (ActionException e) { LOG.error(e.getMessage()); } catch (IOException e) { diff --git a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/scheduler/CompressionScheduler.java b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/scheduler/CompressionScheduler.java index 05b5ea914e..f3868b8f44 100644 --- a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/scheduler/CompressionScheduler.java +++ b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/scheduler/CompressionScheduler.java @@ -335,9 +335,8 @@ public void takeOverAccessCount(ActionInfo actionInfo) { long newFid = dfsClient.getFileInfo(filePath).getFileId(); metaStore.updateAccessCountTableFid(oldFid, newFid); } catch (Exception e) { - LOG.warn("Failed to take over file access count for all tables, " + - "which may make the measurement for data temperature inaccurate!", - e.getMessage()); + LOG.warn("Failed to take over file access count, which can make the " + + "measure for data temperature inaccurate!", e); } } diff --git a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/scheduler/ErasureCodingScheduler.java b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/scheduler/ErasureCodingScheduler.java index 418ffaa95a..46018f126a 100644 --- a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/scheduler/ErasureCodingScheduler.java +++ b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/scheduler/ErasureCodingScheduler.java @@ -324,9 +324,8 @@ public void takeOverAccessCount(ActionInfo actionInfo) { long newFid = dfsClient.getFileInfo(filePath).getFileId(); metaStore.updateAccessCountTableFid(oldFid, newFid); } catch (Exception e) { - LOG.warn("Failed to take over file access count for all tables, " + - "which may make the measurement for data temperature inaccurate!", - e.getMessage()); + LOG.warn("Failed to take over file access count, which can make the " + + "measure for data temperature inaccurate!", e); } }