Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,11 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
Expand Down Expand Up @@ -215,6 +213,13 @@ private void reloadCmdletsInDB() throws IOException{
cmdletInfos = metaStore.getCmdlets(CmdletState.DISPATCHED);
if (cmdletInfos != null && cmdletInfos.size() != 0) {
for (CmdletInfo cmdletInfo : cmdletInfos) {
//track reload cmdlets
CmdletDescriptor cmdletDescriptor =
CmdletDescriptor.fromCmdletString(cmdletInfo.getParameters());
cmdletDescriptor.setRuleId(cmdletInfo.getRid());
if (!tracker.contains(cmdletDescriptor)) {
tracker.track(cmdletInfo.getCid(), cmdletDescriptor);
}
List<ActionInfo> actionInfos = getActions(cmdletInfo.getAids());
for (ActionInfo actionInfo: actionInfos) {
actionInfo.setCreateTime(cmdletInfo.getGenerateTime());
Expand All @@ -227,6 +232,12 @@ private void reloadCmdletsInDB() throws IOException{
cmdletInfos = metaStore.getCmdlets(CmdletState.PENDING);
if (cmdletInfos != null && cmdletInfos.size() != 0) {
for (CmdletInfo cmdletInfo : cmdletInfos) {
CmdletDescriptor cmdletDescriptor =
CmdletDescriptor.fromCmdletString(cmdletInfo.getParameters());
cmdletDescriptor.setRuleId(cmdletInfo.getRid());
if (!tracker.contains(cmdletDescriptor)) {
tracker.track(cmdletInfo.getCid(), cmdletDescriptor);
}
LOG.debug(String.format("Reload pending cmdlet: {}", cmdletInfo));
List<ActionInfo> actionInfos = getActions(cmdletInfo.getAids());
syncCmdAction(cmdletInfo, actionInfos);
Expand All @@ -235,6 +246,9 @@ private void reloadCmdletsInDB() throws IOException{
} catch (MetaStoreException e) {
LOG.error("DB connection error occurs when ssm is reloading cmdlets!");
return;
} catch (ParseException ie) {
LOG.error("Cmdlet parameters format is not correct when ssm is"
+ "reloading cmdlets!", ie);
}
}

Expand Down Expand Up @@ -478,7 +492,12 @@ private void batchSyncCmdAction() throws Exception {

for (CmdletInfo cmdletInfo : cmdletFinished) {
idToCmdlets.remove(cmdletInfo.getCid());
tracker.untrack(cmdletInfo.getCid());
try {
tracker.untrack(cmdletInfo.getCid());
} catch (Exception e) {
LOG.warn("Untracking cmdlet failed.", e);
}

for (Long aid : cmdletInfo.getAids()) {
idToActions.remove(aid);
}
Expand All @@ -489,9 +508,9 @@ private void batchSyncCmdAction() throws Exception {
LOG.debug("Number of cmds {} to submit", cmdletInfos.size());
try {
metaStore.insertActions(
actionInfos.toArray(new ActionInfo[actionInfos.size()]));
actionInfos.toArray(new ActionInfo[actionInfos.size()]));
metaStore.insertCmdlets(
cmdletInfos.toArray(new CmdletInfo[cmdletInfos.size()]));
cmdletInfos.toArray(new CmdletInfo[cmdletInfos.size()]));
} catch (MetaStoreException e) {
LOG.error("CmdletIds -> [ {} ], submit to DB error", cmdletInfos, e);
}
Expand Down Expand Up @@ -1364,8 +1383,6 @@ private class DetectFailedActionTask implements Runnable {

public void run() {
try {
Set<CmdletInfo> failedCmdlet = new HashSet<>();
Set<CmdletInfo> succeededCmdlet = new HashSet<>();
List<Long> cids = new ArrayList<>();
cids.addAll(idToLaunchCmdlet.keySet());
for (Long cid : cids) {
Expand All @@ -1383,29 +1400,19 @@ public void run() {
// For timeout action, speculate its status and set result
// if needed.
if (isSuccessfulBySpeculation(actionInfo)) {
succeededCmdlet.add(cmdletInfo);
ActionStatus actionStatus =
ActionStatusFactory.createSuccessActionStatus(
cmdletInfo, actionInfo);
onActionStatusUpdate(actionStatus);
onStatusUpdate(actionStatus);
} else {
failedCmdlet.add(cmdletInfo);
ActionStatus actionStatus =
ActionStatusFactory.createTimeoutActionStatus(
cmdletInfo, actionInfo);
onActionStatusUpdate(actionStatus);
onStatusUpdate(actionStatus);
}
}
}
}
for (CmdletInfo cmdletInfo: failedCmdlet) {
cmdletInfo.setState(CmdletState.FAILED);
cmdletFinished(cmdletInfo.getCid());
}
for (CmdletInfo cmdletInfo: succeededCmdlet) {
cmdletInfo.setState(CmdletState.DONE);
cmdletFinished(cmdletInfo.getCid());
}
} catch (ActionException e) {
LOG.error(e.getMessage());
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,9 +335,8 @@ public void takeOverAccessCount(ActionInfo actionInfo) {
long newFid = dfsClient.getFileInfo(filePath).getFileId();
metaStore.updateAccessCountTableFid(oldFid, newFid);
} catch (Exception e) {
LOG.warn("Failed to take over file access count for all tables, " +
"which may make the measurement for data temperature inaccurate!",
e.getMessage());
LOG.warn("Failed to take over file access count, which can make the " +
"measure for data temperature inaccurate!", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,9 +324,8 @@ public void takeOverAccessCount(ActionInfo actionInfo) {
long newFid = dfsClient.getFileInfo(filePath).getFileId();
metaStore.updateAccessCountTableFid(oldFid, newFid);
} catch (Exception e) {
LOG.warn("Failed to take over file access count for all tables, " +
"which may make the measurement for data temperature inaccurate!",
e.getMessage());
LOG.warn("Failed to take over file access count, which can make the " +
"measure for data temperature inaccurate!", e);
}
}

Expand Down