Queue<Runnable> queue = entry.getValue();
if (queue == null)
continue;
Runnable task = this.peek(queue);
flag = flag ? flag : (queue.size() - 1 > 0);
if (task == null)
continue;
try {
this.threadPool.execute(task);
this.poll(queue);
c++;
} catch (RejectedExecutionException e) {
if (this.logger.isDebugEnabled())
this.logger.debug(e);
break;
}
pending client maybe offline or broken, add following clean logic
if(isOffline(entry.getKey)){
drop();
}
pending client maybe offline or broken, add following clean logic