diff --git a/lib/cli/local/index.js b/lib/cli/local/index.js index 5acc202e36..83e57c0b94 100644 --- a/lib/cli/local/index.js +++ b/lib/cli/local/index.js @@ -302,6 +302,16 @@ export const builder = function(yargs) { type: 'boolean', default: true }) + .option('time-to-device-cleanup', { + describe: 'Time in seconds after which connected devices should be deleted. 0 - do not delete', + type: 'number', + default: 0 + }) + .option('device-cleanup-interval', { + describe: 'Interval for checking devices for cleanup in minutes', + type: 'number', + default: 120_000 + }) .epilog('Each option can be be overwritten with an environment variable ' + 'by converting the option to uppercase, replacing dashes with ' + 'underscores and prefixing it with `STF_LOCAL_` (e.g. ' + @@ -343,7 +353,9 @@ export const handler = function(argv) { [ // reaper one 'reaper', 'reaper001', '--connect-push', argv.bindDevPull, - '--connect-sub', argv.bindDevPub + '--connect-sub', argv.bindDevPub, + '--time-to-device-cleanup', argv.timeToDeviceCleanup, + '--device-cleanup-interval', argv.deviceCleanupInterval ], [ // provider 'provider', diff --git a/lib/cli/reaper/index.js b/lib/cli/reaper/index.js index 651889b528..b741bc2214 100644 --- a/lib/cli/reaper/index.js +++ b/lib/cli/reaper/index.js @@ -30,6 +30,16 @@ export const builder = function(yargs) { type: 'string', default: os.hostname() }) + .option('time-to-device-cleanup', { + describe: 'Time in minutes after which connected devices should be deleted. 0 - do not delete', + type: 'number', + default: 0 + }) + .option('device-cleanup-interval', { + describe: 'Interval for checking devices for cleanup in minutes', + type: 'number', + default: 2 + }) .epilog('Each option can be be overwritten with an environment variable ' + 'by converting the option to uppercase, replacing dashes with ' + 'underscores and prefixing it with `STF_REAPER_` (e.g. ' + @@ -39,6 +49,8 @@ export const handler = function(argv) { return reaper({ name: argv.name, heartbeatTimeout: argv.heartbeatTimeout, + timeToDeviceCleanup: argv.timeToDeviceCleanup, + deviceCleanupInterval: argv.deviceCleanupInterval, endpoints: { push: argv.connectPush, sub: argv.connectSub diff --git a/lib/db/models/all/model.js b/lib/db/models/all/model.js index 1082680ac9..0dd60610b3 100644 --- a/lib/db/models/all/model.js +++ b/lib/db/models/all/model.js @@ -9,7 +9,7 @@ import {v4 as uuidv4} from 'uuid' import * as apiutil from '../../../util/apiutil.js' import GroupModel from '../group/index.js' import UserModel from '../user/index.js' - +import DeviceModel from '../device/index.js' import logger from '../../../util/logger.js' import {getRootGroup, getGroup} from '../group/model.js' @@ -887,16 +887,6 @@ export const loadStandardDevices = function(groups, fields) { }, fields) } -// dbapi.loadPresentDevices = function() { -export const loadPresentDevices = function() { - return db.devices.find({present: true}).toArray() -} - -// dbapi.loadDeviceBySerial = function(serial) { -export const loadDeviceBySerial = function(serial) { - return findDevice({serial: serial}) -} - // dbapi.loadDevice = function(groups, serial) { export const loadDevice = function(groups, serial) { return findDevice({ @@ -1186,7 +1176,7 @@ export const setAbsentDisconnectedDevices = function() { // dbapi.getInstalledApplications = function(message) { export const getInstalledApplications = function(message) { - return loadDeviceBySerial(message.serial) + return DeviceModel.loadDeviceBySerial(message.serial) } // dbapi.setDeviceType = function(serial, type) { diff --git a/lib/db/models/device/model.js b/lib/db/models/device/model.js index ef29f27e49..b00c715014 100644 --- a/lib/db/models/device/model.js +++ b/lib/db/models/device/model.js @@ -32,6 +32,10 @@ export const loadPresentDevices = function() { return db.devices.find({present: true}).toArray() } +export const loadDeviceBySerial = function(serial) { + return db.devices.findOne({serial}) +} + // dbapi.loadDevicesBySerials = function(serials) { export const loadDevicesBySerials = function(serials) { return db.devices.find({serial: {$in: serials}}).toArray() @@ -97,6 +101,15 @@ export const getDeviceType = function(serial) { }) } +export const getDeadDevice = function(time = 60_000) { + return db.devices.find( + { + present: false, + presenceChangedAt: {$lt: new Date(Date.now() - time)} + } + ).project({serial: 1, present: 1, presenceChangedAt: 1}).toArray() +} + // dbapi.generateIndexes = function() { export const generateIndexes = function() { db.devices.createIndex({serial: -1}).then((result) => { @@ -111,8 +124,6 @@ export const generateIndexes = function() { */ - - // dbapi.deleteDevice = function(serial) { export const deleteDevice = function(serial) { return db.devices.deleteOne({serial: serial}) diff --git a/lib/db/models/group/model.js b/lib/db/models/group/model.js index a68fd84eb8..045093bc58 100644 --- a/lib/db/models/group/model.js +++ b/lib/db/models/group/model.js @@ -568,7 +568,7 @@ export const deleteUserGroup = async(id) => { await db.groups.deleteOne({id}) await Promise.all( - group?.users.map(email => db.groups.updateOne( + group?.users.map(email => db.users.updateOne( {email} , { $pull: {'groups.subscribed': id} diff --git a/lib/units/processor/index.ts b/lib/units/processor/index.ts index e98df9ab83..0e4578293b 100644 --- a/lib/units/processor/index.ts +++ b/lib/units/processor/index.ts @@ -9,6 +9,7 @@ import lifecycle from '../../util/lifecycle.js' import srv from '../../util/srv.js' import * as zmqutil from '../../util/zmqutil.js' import UserModel from '../../db/models/user/index.js' +import DeviceModel from '../../db/models/device/index.js' import { UserChangeMessage, GroupChangeMessage, @@ -53,8 +54,9 @@ import { DeleteDevice, SetAbsentDisconnectedDevices, GetServicesAvailabilityMessage, - DeviceRegisteredMessage, GetPresentDevices, DeviceGetIsInOrigin + DeviceRegisteredMessage, GetPresentDevices, DeviceGetIsInOrigin, GetDeadDevices } from '../../wire/wire.js' +import {getDeadDevice} from "../../db/models/device/model.js"; interface Options { name: string @@ -228,8 +230,8 @@ export default db.ensureConnectivity(async(options: Options) => { appDealer.send([channel, data]) }) .on(DeviceGetIsInOrigin, async (channel, message) => { - const device = await dbapi.loadDeviceBySerial(message.serial) - const isInOrigin = device.group.id === device.group.origin + const device = await DeviceModel.loadDeviceBySerial(message.serial) + const isInOrigin = device ? device.group.id === device.group.origin : false devDealer.send([ channel, reply.okay('success', {isInOrigin}) @@ -289,7 +291,7 @@ export default db.ensureConnectivity(async(options: Options) => { appDealer.send([channel, data]) }) .on(GetPresentDevices, async (channel, message, data) => { - const devices = await dbapi.loadPresentDevices() + const devices = await DeviceModel.loadPresentDevices() .then(devices => devices.map(d => d.serial)) devDealer.send([ channel, @@ -299,6 +301,16 @@ export default db.ensureConnectivity(async(options: Options) => { .on(DeviceHeartbeatMessage, (channel, message, data) => { devDealer.send([ channel, data ]) }) + .on(GetDeadDevices, async(channel, message, data) => { + const deadDevices = await DeviceModel.getDeadDevice(message.time) + devDealer.send([ + channel, + reply.okay('success', {deadDevices}) + ]) + }) + .on(DeleteDevice, async(channel, message, data) => { + DeviceModel.deleteDevice(message.serial) + }) .handler(); devDealer.on('message', router) diff --git a/lib/units/reaper/index.ts b/lib/units/reaper/index.ts index 0db7898f36..2fde098619 100644 --- a/lib/units/reaper/index.ts +++ b/lib/units/reaper/index.ts @@ -1,8 +1,9 @@ import logger from '../../util/logger.js' import { + DeleteDevice, DeviceAbsentMessage, DeviceHeartbeatMessage, - DeviceIntroductionMessage, DevicePresentMessage, + DeviceIntroductionMessage, DevicePresentMessage, GetDeadDevices, GetPresentDevices } from '../../wire/wire.js' import wireutil from '../../wire/util.js' @@ -17,6 +18,8 @@ const log = logger.createLogger('reaper') interface Options { heartbeatTimeout: number + timeToDeviceCleanup: number // in minutes + deviceCleanupInterval: number // in minutes endpoints: { sub: string[] push: string[] @@ -87,34 +90,85 @@ export default (async(options: Options) => { ttlset.stop() }) - try { - log.info('Reaping devices with no heartbeat') + const router = new WireRouter() + .on(DeviceIntroductionMessage, (channel, message) => { + ttlset.drop(message.serial, TTLSet.SILENT) + ttlset.bump(message.serial, Date.now()) + }) + .on(DeviceHeartbeatMessage, (channel, message) => { + ttlset.bump(message.serial, Date.now()) + }) + .on(DeviceAbsentMessage, (channel, message) => { + ttlset.drop(message.serial, TTLSet.SILENT) + }) - const router = new WireRouter() - .on(DeviceIntroductionMessage, (channel, message) => { - ttlset.drop(message.serial, TTLSet.SILENT) - ttlset.bump(message.serial, Date.now()) - }) - .on(DeviceHeartbeatMessage, (channel, message) => { - ttlset.bump(message.serial, Date.now()) - }) - .on(DeviceAbsentMessage, (channel, message) => { - ttlset.drop(message.serial, TTLSet.SILENT) - }) + if (options.timeToDeviceCleanup) { + log.info('deviceCleanerLoop enabled') - // Listen to changes - sub.on('message', router.handler()) + // This functionality is implemented in the Reaper unit because this unit cannot be replicated + const deviceCleanerLoop = () => setTimeout(async() => { + log.info('Checking dead devices [interval: %s]', options.deviceCleanupInterval) + try { + const absenceDuration = options.timeToDeviceCleanup + const {deadDevices} = await runTransactionDev(wireutil.global, GetDeadDevices, { + time: options.timeToDeviceCleanup * 60 * 1000 + }, {sub, push, router}) - // Load initial state - const {devices} = await runTransactionDev(wireutil.global, GetPresentDevices, {}, {sub, push, router}) + for (const {serial, present} of deadDevices) { + if (present) { + continue + } - const now = Date.now() - devices.forEach((serial: string) => { - ttlset.bump(serial, now, TTLSet.SILENT) - }) + log.info( // @ts-ignore + 'Removing a dead device [serial: %s, absence_duration: %.1f %s]', + serial, + ... ( + absenceDuration >= 60 // if more 1 hour + ? [absenceDuration / 60, 'hrs'] + : [absenceDuration, 'min'] + ) + ) + + push.send([ + wireutil.global, + wireutil.pack(DeleteDevice, {serial}) + ]) + } + } catch (err: any) { + log.error('Dead device check failed with error: %s', err?.message) + } finally { + deviceCleanerLoop() + } + }, options.deviceCleanupInterval * 60 * 1000) + + deviceCleanerLoop() } - catch (err: any) { - log.fatal('Unable to load initial state: %s', err?.message) - lifecycle.fatal() + + const init = async() => { + try { + log.info('Reaping devices with no heartbeat') + + // Listen to changes + sub.on('message', router.handler()) + + // Load initial state + const {devices} = await runTransactionDev(wireutil.global, GetPresentDevices, {}, {sub, push, router}) + + const now = Date.now() + devices?.forEach((serial: string) => { + ttlset.bump(serial, now, TTLSet.SILENT) + }) + } + catch (err: any) { + if (err?.message === 'Timeout when running transaction') { + log.error('Load initial state error: Timeout when running transaction, retry') + setTimeout(init, 2000) + return + } + log.fatal('Unable to load initial state: %s', err?.message) + lifecycle.fatal() + } } + + init() }) diff --git a/lib/wire/wire.proto b/lib/wire/wire.proto index 53be0505e9..655b7138a8 100644 --- a/lib/wire/wire.proto +++ b/lib/wire/wire.proto @@ -909,3 +909,7 @@ message DeviceGetIsInOrigin { message GetPresentDevices { } + +message GetDeadDevices { + required uint32 time = 1; +} diff --git a/lib/wire/wire.ts b/lib/wire/wire.ts index 85edad04eb..438ab51313 100644 --- a/lib/wire/wire.ts +++ b/lib/wire/wire.ts @@ -2397,6 +2397,15 @@ export interface DeviceGetIsInOrigin { */ export interface GetPresentDevices { } +/** + * @generated from protobuf message GetDeadDevices + */ +export interface GetDeadDevices { + /** + * @generated from protobuf field: required uint32 time = 1 + */ + time: number; +} /** * @generated from protobuf enum DeviceStatus */ @@ -11744,3 +11753,50 @@ class GetPresentDevices$Type extends MessageType { * @generated MessageType for protobuf message GetPresentDevices */ export const GetPresentDevices = new GetPresentDevices$Type(); +// @generated message type with reflection information, may provide speed optimized methods +class GetDeadDevices$Type extends MessageType { + constructor() { + super("GetDeadDevices", [ + { no: 1, name: "time", kind: "scalar", T: 13 /*ScalarType.UINT32*/ } + ]); + } + create(value?: PartialMessage): GetDeadDevices { + const message = globalThis.Object.create((this.messagePrototype!)); + message.time = 0; + if (value !== undefined) + reflectionMergePartial(this, message, value); + return message; + } + internalBinaryRead(reader: IBinaryReader, length: number, options: BinaryReadOptions, target?: GetDeadDevices): GetDeadDevices { + let message = target ?? this.create(), end = reader.pos + length; + while (reader.pos < end) { + let [fieldNo, wireType] = reader.tag(); + switch (fieldNo) { + case /* required uint32 time */ 1: + message.time = reader.uint32(); + break; + default: + let u = options.readUnknownField; + if (u === "throw") + throw new globalThis.Error(`Unknown field ${fieldNo} (wire type ${wireType}) for ${this.typeName}`); + let d = reader.skip(wireType); + if (u !== false) + (u === true ? UnknownFieldHandler.onRead : u)(this.typeName, message, fieldNo, wireType, d); + } + } + return message; + } + internalBinaryWrite(message: GetDeadDevices, writer: IBinaryWriter, options: BinaryWriteOptions): IBinaryWriter { + /* required uint32 time = 1; */ + if (message.time !== 0) + writer.tag(1, WireType.Varint).uint32(message.time); + let u = options.writeUnknownFields; + if (u !== false) + (u == true ? UnknownFieldHandler.onWrite : u)(this.typeName, message, writer); + return writer; + } +} +/** + * @generated MessageType for protobuf message GetDeadDevices + */ +export const GetDeadDevices = new GetDeadDevices$Type();