Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -213,20 +213,23 @@ public void processRequest(Request request) {
switch (request.type) {
case OpCode.ping: {
lastOp = "PING";
incrementOpCount(ServerMetrics.getMetrics().PING_OP_COUNT);
updateStats(request, lastOp, lastZxid);

responseSize = cnxn.sendResponse(new ReplyHeader(ClientCnxn.PING_XID, lastZxid, 0), null, "response");
return;
}
case OpCode.createSession: {
lastOp = "SESS";
incrementOpCount(ServerMetrics.getMetrics().CREATE_SESSION_OP_COUNT);
updateStats(request, lastOp, lastZxid);

zks.finishSessionInit(request.cnxn, true);
return;
}
case OpCode.multi: {
lastOp = "MULT";
incrementOpCount(ServerMetrics.getMetrics().MULTI_OP_COUNT);
rsp = new MultiResponse();

for (ProcessTxnResult subTxnResult : rc.multiResult) {
Expand Down Expand Up @@ -269,6 +272,7 @@ public void processRequest(Request request) {
}
case OpCode.multiRead: {
lastOp = "MLTR";
incrementOpCount(ServerMetrics.getMetrics().MULTI_READ_OP_COUNT);
MultiOperationRecord multiReadRecord = request.readRequestRecord(MultiOperationRecord::new);
rsp = new MultiResponse();
OpResult subResult;
Expand Down Expand Up @@ -297,6 +301,7 @@ public void processRequest(Request request) {
}
case OpCode.create: {
lastOp = "CREA";
incrementOpCount(ServerMetrics.getMetrics().CREATE_OP_COUNT);
rsp = new CreateResponse(rc.path);
err = Code.get(rc.err);
requestPathMetricsCollector.registerRequest(request.type, rc.path);
Expand All @@ -306,6 +311,7 @@ public void processRequest(Request request) {
case OpCode.createTTL:
case OpCode.createContainer: {
lastOp = "CREA";
incrementOpCount(ServerMetrics.getMetrics().CREATE_OP_COUNT);
rsp = new Create2Response(rc.path, rc.stat);
err = Code.get(rc.err);
requestPathMetricsCollector.registerRequest(request.type, rc.path);
Expand All @@ -314,19 +320,22 @@ public void processRequest(Request request) {
case OpCode.delete:
case OpCode.deleteContainer: {
lastOp = "DELE";
incrementOpCount(ServerMetrics.getMetrics().DELETE_OP_COUNT);
err = Code.get(rc.err);
requestPathMetricsCollector.registerRequest(request.type, rc.path);
break;
}
case OpCode.setData: {
lastOp = "SETD";
incrementOpCount(ServerMetrics.getMetrics().SET_DATA_OP_COUNT);
rsp = new SetDataResponse(rc.stat);
err = Code.get(rc.err);
requestPathMetricsCollector.registerRequest(request.type, rc.path);
break;
}
case OpCode.reconfig: {
lastOp = "RECO";
incrementOpCount(ServerMetrics.getMetrics().RECONFIG_OP_COUNT);
rsp = new GetDataResponse(
((QuorumZooKeeperServer) zks).self.getQuorumVerifier().toString().getBytes(UTF_8),
rc.stat);
Expand All @@ -335,31 +344,36 @@ public void processRequest(Request request) {
}
case OpCode.setACL: {
lastOp = "SETA";
incrementOpCount(ServerMetrics.getMetrics().SET_ACL_OP_COUNT);
rsp = new SetACLResponse(rc.stat);
err = Code.get(rc.err);
requestPathMetricsCollector.registerRequest(request.type, rc.path);
break;
}
case OpCode.closeSession: {
lastOp = "CLOS";
incrementOpCount(ServerMetrics.getMetrics().CLOSE_SESSION_OP_COUNT);
err = Code.get(rc.err);
break;
}
case OpCode.sync: {
lastOp = "SYNC";
incrementOpCount(ServerMetrics.getMetrics().SYNC_OP_COUNT);
SyncRequest syncRequest = request.readRequestRecord(SyncRequest::new);
rsp = new SyncResponse(syncRequest.getPath());
requestPathMetricsCollector.registerRequest(request.type, syncRequest.getPath());
break;
}
case OpCode.check: {
lastOp = "CHEC";
incrementOpCount(ServerMetrics.getMetrics().CHECK_OP_COUNT);
rsp = new SetDataResponse(rc.stat);
err = Code.get(rc.err);
break;
}
case OpCode.exists: {
lastOp = "EXIS";
incrementOpCount(ServerMetrics.getMetrics().EXISTS_OP_COUNT);
ExistsRequest existsRequest = request.readRequestRecord(ExistsRequest::new);
path = existsRequest.getPath();
if (path.indexOf('\0') != -1) {
Expand All @@ -382,6 +396,7 @@ public void processRequest(Request request) {
}
case OpCode.getData: {
lastOp = "GETD";
incrementOpCount(ServerMetrics.getMetrics().GET_DATA_OP_COUNT);
GetDataRequest getDataRequest = request.readRequestRecord(GetDataRequest::new);
path = getDataRequest.getPath();
rsp = handleGetDataRequest(getDataRequest, cnxn, request.authInfo);
Expand All @@ -390,6 +405,7 @@ public void processRequest(Request request) {
}
case OpCode.setWatches: {
lastOp = "SETW";
incrementOpCount(ServerMetrics.getMetrics().SET_WATCHES_OP_COUNT);
SetWatches setWatches = request.readRequestRecord(SetWatches::new);
long relativeZxid = setWatches.getRelativeZxid();
zks.getZKDatabase()
Expand All @@ -405,6 +421,7 @@ public void processRequest(Request request) {
}
case OpCode.setWatches2: {
lastOp = "STW2";
incrementOpCount(ServerMetrics.getMetrics().SET_WATCHES_OP_COUNT);
SetWatches2 setWatches = request.readRequestRecord(SetWatches2::new);
long relativeZxid = setWatches.getRelativeZxid();
zks.getZKDatabase().setWatches(relativeZxid,
Expand All @@ -418,13 +435,15 @@ public void processRequest(Request request) {
}
case OpCode.addWatch: {
lastOp = "ADDW";
incrementOpCount(ServerMetrics.getMetrics().ADD_WATCH_OP_COUNT);
AddWatchRequest addWatcherRequest = request.readRequestRecord(AddWatchRequest::new);
zks.getZKDatabase().addWatch(addWatcherRequest.getPath(), cnxn, addWatcherRequest.getMode());
rsp = new ErrorResponse(0);
break;
}
case OpCode.getACL: {
lastOp = "GETA";
incrementOpCount(ServerMetrics.getMetrics().GET_ACL_OP_COUNT);
GetACLRequest getACLRequest = request.readRequestRecord(GetACLRequest::new);
path = getACLRequest.getPath();
DataNode n = zks.getZKDatabase().getNode(path);
Expand Down Expand Up @@ -467,6 +486,7 @@ public void processRequest(Request request) {
}
case OpCode.getChildren: {
lastOp = "GETC";
incrementOpCount(ServerMetrics.getMetrics().GET_CHILDREN_OP_COUNT);
GetChildrenRequest getChildrenRequest = request.readRequestRecord(GetChildrenRequest::new);
path = getChildrenRequest.getPath();
rsp = handleGetChildrenRequest(getChildrenRequest, cnxn, request.authInfo);
Expand All @@ -475,6 +495,7 @@ public void processRequest(Request request) {
}
case OpCode.getAllChildrenNumber: {
lastOp = "GETACN";
incrementOpCount(ServerMetrics.getMetrics().GET_ALL_CHILDREN_NUMBER_OP_COUNT);
GetAllChildrenNumberRequest getAllChildrenNumberRequest = request.readRequestRecord(GetAllChildrenNumberRequest::new);
path = getAllChildrenNumberRequest.getPath();
DataNode n = zks.getZKDatabase().getNode(path);
Expand All @@ -494,6 +515,7 @@ public void processRequest(Request request) {
}
case OpCode.getChildren2: {
lastOp = "GETC";
incrementOpCount(ServerMetrics.getMetrics().GET_CHILDREN_OP_COUNT);
GetChildren2Request getChildren2Request = request.readRequestRecord(GetChildren2Request::new);
Stat stat = new Stat();
path = getChildren2Request.getPath();
Expand All @@ -515,6 +537,7 @@ public void processRequest(Request request) {
}
case OpCode.checkWatches: {
lastOp = "CHKW";
incrementOpCount(ServerMetrics.getMetrics().CHECK_WATCHES_OP_COUNT);
CheckWatchesRequest checkWatches = request.readRequestRecord(CheckWatchesRequest::new);
WatcherType type = WatcherType.fromInt(checkWatches.getType());
path = checkWatches.getPath();
Expand All @@ -528,6 +551,7 @@ public void processRequest(Request request) {
}
case OpCode.removeWatches: {
lastOp = "REMW";
incrementOpCount(ServerMetrics.getMetrics().REMOVE_WATCHES_OP_COUNT);
RemoveWatchesRequest removeWatches = request.readRequestRecord(RemoveWatchesRequest::new);
WatcherType type = WatcherType.fromInt(removeWatches.getType());
path = removeWatches.getPath();
Expand All @@ -541,11 +565,13 @@ public void processRequest(Request request) {
}
case OpCode.whoAmI: {
lastOp = "HOMI";
incrementOpCount(ServerMetrics.getMetrics().WHO_AM_I_OP_COUNT);
rsp = new WhoAmIResponse(AuthUtil.getClientInfos(request.authInfo));
break;
}
case OpCode.getEphemerals: {
lastOp = "GETE";
incrementOpCount(ServerMetrics.getMetrics().GET_EPHEMERALS_OP_COUNT);
GetEphemeralsRequest getEphemerals = request.readRequestRecord(GetEphemeralsRequest::new);
String prefixPath = getEphemerals.getPrefixPath();
Set<String> allEphems = zks.getZKDatabase().getDataTree().getEphemerals(request.sessionId);
Expand Down Expand Up @@ -677,4 +703,9 @@ private void updateStats(Request request, String lastOp, long lastZxid) {
request.cnxn.updateStatsForResponse(request.cxid, lastZxid, lastOp, request.createTime, currentTime);
}

private void incrementOpCount(org.apache.zookeeper.metrics.Counter specificCounter) {
ServerMetrics metrics = ServerMetrics.getMetrics();
specificCounter.add(1);
metrics.TOTAL_OP_COUNT.add(1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,36 @@ private ServerMetrics(MetricsProvider metricsProvider) {
QUOTA_EXCEEDED_ERROR_PER_NAMESPACE = metricsContext.getCounterSet(QuotaMetricsUtils.QUOTA_EXCEEDED_ERROR_PER_NAMESPACE);

TTL_NODE_DELETED_COUNT = metricsContext.getCounter("ttl_node_deleted_count");

/**
* Operation count metrics
*/
TOTAL_OP_COUNT = metricsContext.getCounter("total_op_count");
PING_OP_COUNT = metricsContext.getCounter("ping_op_count");
CREATE_SESSION_OP_COUNT = metricsContext.getCounter("create_session_op_count");
CLOSE_SESSION_OP_COUNT = metricsContext.getCounter("close_session_op_count");
MULTI_OP_COUNT = metricsContext.getCounter("multi_op_count");
CREATE_OP_COUNT = metricsContext.getCounter("create_op_count");
DELETE_OP_COUNT = metricsContext.getCounter("delete_op_count");
SET_DATA_OP_COUNT = metricsContext.getCounter("set_data_op_count");
EXISTS_OP_COUNT = metricsContext.getCounter("exists_op_count");
GET_DATA_OP_COUNT = metricsContext.getCounter("get_data_op_count");
GET_CHILDREN_OP_COUNT = metricsContext.getCounter("get_children_op_count");
SYNC_OP_COUNT = metricsContext.getCounter("sync_op_count");
GET_ACL_OP_COUNT = metricsContext.getCounter("get_acl_op_count");
SET_ACL_OP_COUNT = metricsContext.getCounter("set_acl_op_count");
CHECK_OP_COUNT = metricsContext.getCounter("check_op_count");
SET_WATCHES_OP_COUNT = metricsContext.getCounter("set_watches_op_count");
MULTI_READ_OP_COUNT = metricsContext.getCounter("multi_read_op_count");
RECONFIG_OP_COUNT = metricsContext.getCounter("reconfig_op_count");
ADD_WATCH_OP_COUNT = metricsContext.getCounter("add_watch_op_count");
CHECK_WATCHES_OP_COUNT = metricsContext.getCounter("check_watches_op_count");
REMOVE_WATCHES_OP_COUNT = metricsContext.getCounter("remove_watches_op_count");
WHO_AM_I_OP_COUNT = metricsContext.getCounter("who_am_i_op_count");
GET_EPHEMERALS_OP_COUNT = metricsContext.getCounter("get_ephemerals_op_count");
GET_ALL_CHILDREN_NUMBER_OP_COUNT = metricsContext.getCounter("get_all_children_number_op_count");
SASL_OP_COUNT = metricsContext.getCounter("sasl_op_count");
AUTH_OP_COUNT = metricsContext.getCounter("auth_op_count");
}

/**
Expand Down Expand Up @@ -554,6 +584,36 @@ private ServerMetrics(MetricsProvider metricsProvider) {
*/
public final Counter TTL_NODE_DELETED_COUNT;

/**
* Operation count metrics
*/
public final Counter TOTAL_OP_COUNT;
public final Counter PING_OP_COUNT;
public final Counter CREATE_SESSION_OP_COUNT;
public final Counter CLOSE_SESSION_OP_COUNT;
public final Counter MULTI_OP_COUNT;
public final Counter CREATE_OP_COUNT;
public final Counter DELETE_OP_COUNT;
public final Counter SET_DATA_OP_COUNT;
public final Counter EXISTS_OP_COUNT;
public final Counter GET_DATA_OP_COUNT;
public final Counter GET_CHILDREN_OP_COUNT;
public final Counter SYNC_OP_COUNT;
public final Counter GET_ACL_OP_COUNT;
public final Counter SET_ACL_OP_COUNT;
public final Counter CHECK_OP_COUNT;
public final Counter SET_WATCHES_OP_COUNT;
public final Counter MULTI_READ_OP_COUNT;
public final Counter RECONFIG_OP_COUNT;
public final Counter ADD_WATCH_OP_COUNT;
public final Counter CHECK_WATCHES_OP_COUNT;
public final Counter REMOVE_WATCHES_OP_COUNT;
public final Counter WHO_AM_I_OP_COUNT;
public final Counter GET_EPHEMERALS_OP_COUNT;
public final Counter GET_ALL_CHILDREN_NUMBER_OP_COUNT;
public final Counter SASL_OP_COUNT;
public final Counter AUTH_OP_COUNT;

private final MetricsProvider metricsProvider;

public void resetAll() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1697,6 +1697,7 @@ public void processPacket(ServerCnxn cnxn, RequestHeader h, RequestRecord reques

if (h.getType() == OpCode.auth) {
LOG.info("got auth packet {}", cnxn.getRemoteSocketAddress());
ServerMetrics.getMetrics().AUTH_OP_COUNT.add(1);
AuthPacket authPacket = request.readRecord(AuthPacket::new);
String scheme = authPacket.getScheme();
ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(scheme);
Expand Down Expand Up @@ -1737,6 +1738,7 @@ public void processPacket(ServerCnxn cnxn, RequestHeader h, RequestRecord reques
}
return;
} else if (h.getType() == OpCode.sasl) {
ServerMetrics.getMetrics().SASL_OP_COUNT.add(1);
processSasl(request, cnxn, h);
} else {
if (!authHelper.enforceAuthentication(cnxn, h.getXid())) {
Expand Down
Loading