From 8f1178c329bc6f8a59c792fd9f599c58c781db2c Mon Sep 17 00:00:00 2001
From: Pavel Berezhnoy
Date: Thu, 24 Sep 2015 20:19:30 +0300
Subject: [PATCH] Rima.lua: fast tasks statistics added
---
rima.lua | 115 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 115 insertions(+)
diff --git a/rima.lua b/rima.lua
index 7431fa5..66d00f1 100644
--- a/rima.lua
+++ b/rima.lua
@@ -30,8 +30,13 @@
-- Index 0: TREE { task_id }
-- Index 1: TREE { key }
--
+-- Space 4: Fast tasks monitoring
+-- Tuple: { key (NUM), added_tasks_count (NUM), deleted_tasks_count (NUM) }
+-- Index 0: TREE { key }
+--
local EXPIRATION_TIME = 30 * 60 -- seconds
+local FAST_TASKS_STAT_EXP_TIME = 60 * 60 * 24 * 365 -- seconds (1 year)
--
-- Put task to the queue.
@@ -72,11 +77,120 @@ function rima_put_sync(key, data, prio)
return rima_put_impl(key, data, prio, box.time())
end
+--
+-- Get a key for fast tasks monitoring
+--
+local function rima_get_monitoring_key(t)
+ if t == nil or t == 0 then
+ t = box.time()
+ end
+ return t - (t % 60)
+end
+
+--
+-- Increment fast tasks count
+--
+local function rima_inc_fast_tasks_count()
+ local key = rima_get_monitoring_key()
+ local update_result = box.update(4, key, '+p', 1, 1)
+
+ if update_result == nil then
+ -- no stat for current key
+ box.insert(4, key, 1, 0)
+ end
+
+ -- increment total number of fast tasks in queue
+ update_result = box.update(4, 0, '+p', 1, 1)
+ if update_result == nil then
+ box.insert(4, 0, 1)
+ end
+end
+
+--
+-- Decrement fast tasks count
+--
+local function rima_dec_fast_tasks_count()
+ local key = rima_get_monitoring_key()
+ local update_result = box.update(4, key, '+p', 2, 1)
+
+ if update_result == nil then
+ -- no stat for current key
+ box.insert(4, key, 0, 1)
+ end
+
+ -- decrement total number of fast tasks
+ box.update(4, 0, '-p', 1, 1) -- ignore case when no tasks was in queue
+end
+
+--
+-- Remove old records from tarantool
+--
+local function rima_clear_expired_stat()
+ local cur_key = rima_get_monitoring_key(box.time())
+ local expired_key = cur_key - FAST_TASKS_STAT_EXP_TIME
+ local iter = box.space[4].index[0]:iterator(box.index.LE, expired_key)
+
+ for tuple in iter do
+ box.delete(4, tuple[0])
+ end
+end
+
+--
+-- Get statistic from monitoring for given time interval
+--
+function rima_get_monitoring_statistics(start_time, end_time)
+ if start_time ~= nil then
+ start_time = box.unpack('i', start_time)
+ end
+ if end_time ~= nil then
+ end_time = box.unpack('i', end_time)
+ end
+
+ rima_clear_expired_stat()
+
+ if start_time == nil or start_time == 0 then
+ -- request all statistics, but 0 is global tasks counter
+ start_time = 1
+ else
+ start_time = rima_get_monitoring_key(start_time)
+ end
+
+ end_time = rima_get_monitoring_key(end_time)
+ if end_time < start_time then
+ end_time, start_time = start_time, end_time
+ end
+
+ local tuple = box.select(4, 0, 0)
+ local currently_in_queue = 0
+ if tuple ~= nil then
+ currently_in_queue = box.unpack('i', tuple[1])
+ end
+
+ local total_added, total_deleted = 0, 0
+ local iter = box.space[4].index[0]:iterator(box.index.GE, start_time)
+ for tuple in iter do
+ if box.unpack('i', tuple[0]) > end_time then
+ break
+ end
+
+ total_added = total_added + box.unpack('i', tuple[1])
+ total_deleted = total_deleted + box.unpack('i', tuple[2])
+ end
+
+ -- string will be returned
+ return box.cjson.encode({
+ fast_tasks_in_queue = currently_in_queue,
+ fast_tasks_added = total_added,
+ fast_tasks_deleted = total_deleted,
+ })
+end
+
--
-- Put fetch single mail task to the queue.
--
function rima_put_fetchmail(key, data)
box.auto_increment(3, key, data, box.time())
+ rima_inc_fast_tasks_count()
end
local function get_prio_key(prio, source)
@@ -132,6 +246,7 @@ function rima_get_fetchmail()
local tuples = { box.select_limit(3, 1, 0, 1000, key) }
for _, tuple in pairs(tuples) do
tuple = box.delete(3, box.unpack('l', tuple[0]))
+ rima_dec_fast_tasks_count()
if tuple ~= nil then
table.insert(result, { box.unpack('i', tuple[3]), tuple[2] })
n = 1