Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion lib/cli/local/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,16 @@ export const builder = function(yargs) {
type: 'boolean',
default: true
})
.option('time-to-device-cleanup', {
describe: 'Time in seconds after which connected devices should be deleted. 0 - do not delete',
type: 'number',
default: 0
})
.option('device-cleanup-interval', {
describe: 'Interval for checking devices for cleanup in minutes',
type: 'number',
default: 120_000
})
.epilog('Each option can be be overwritten with an environment variable ' +
'by converting the option to uppercase, replacing dashes with ' +
'underscores and prefixing it with `STF_LOCAL_` (e.g. ' +
Expand Down Expand Up @@ -343,7 +353,9 @@ export const handler = function(argv) {
[ // reaper one
'reaper', 'reaper001',
'--connect-push', argv.bindDevPull,
'--connect-sub', argv.bindDevPub
'--connect-sub', argv.bindDevPub,
'--time-to-device-cleanup', argv.timeToDeviceCleanup,
'--device-cleanup-interval', argv.deviceCleanupInterval
],
[ // provider
'provider',
Expand Down
12 changes: 12 additions & 0 deletions lib/cli/reaper/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,16 @@ export const builder = function(yargs) {
type: 'string',
default: os.hostname()
})
.option('time-to-device-cleanup', {
describe: 'Time in minutes after which connected devices should be deleted. 0 - do not delete',
type: 'number',
default: 0
})
.option('device-cleanup-interval', {
describe: 'Interval for checking devices for cleanup in minutes',
type: 'number',
default: 2
})
.epilog('Each option can be be overwritten with an environment variable ' +
'by converting the option to uppercase, replacing dashes with ' +
'underscores and prefixing it with `STF_REAPER_` (e.g. ' +
Expand All @@ -39,6 +49,8 @@ export const handler = function(argv) {
return reaper({
name: argv.name,
heartbeatTimeout: argv.heartbeatTimeout,
timeToDeviceCleanup: argv.timeToDeviceCleanup,
deviceCleanupInterval: argv.deviceCleanupInterval,
endpoints: {
push: argv.connectPush,
sub: argv.connectSub
Expand Down
14 changes: 2 additions & 12 deletions lib/db/models/all/model.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {v4 as uuidv4} from 'uuid'
import * as apiutil from '../../../util/apiutil.js'
import GroupModel from '../group/index.js'
import UserModel from '../user/index.js'

import DeviceModel from '../device/index.js'
import logger from '../../../util/logger.js'
import {getRootGroup, getGroup} from '../group/model.js'

Expand Down Expand Up @@ -887,16 +887,6 @@ export const loadStandardDevices = function(groups, fields) {
}, fields)
}

// dbapi.loadPresentDevices = function() {
export const loadPresentDevices = function() {
return db.devices.find({present: true}).toArray()
}

// dbapi.loadDeviceBySerial = function(serial) {
export const loadDeviceBySerial = function(serial) {
return findDevice({serial: serial})
}

// dbapi.loadDevice = function(groups, serial) {
export const loadDevice = function(groups, serial) {
return findDevice({
Expand Down Expand Up @@ -1186,7 +1176,7 @@ export const setAbsentDisconnectedDevices = function() {

// dbapi.getInstalledApplications = function(message) {
export const getInstalledApplications = function(message) {
return loadDeviceBySerial(message.serial)
return DeviceModel.loadDeviceBySerial(message.serial)
}

// dbapi.setDeviceType = function(serial, type) {
Expand Down
15 changes: 13 additions & 2 deletions lib/db/models/device/model.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ export const loadPresentDevices = function() {
return db.devices.find({present: true}).toArray()
}

export const loadDeviceBySerial = function(serial) {
return db.devices.findOne({serial})
}

// dbapi.loadDevicesBySerials = function(serials) {
export const loadDevicesBySerials = function(serials) {
return db.devices.find({serial: {$in: serials}}).toArray()
Expand Down Expand Up @@ -97,6 +101,15 @@ export const getDeviceType = function(serial) {
})
}

export const getDeadDevice = function(time = 60_000) {
return db.devices.find(
{
present: false,
presenceChangedAt: {$lt: new Date(Date.now() - time)}
}
).project({serial: 1, present: 1, presenceChangedAt: 1}).toArray()
}

// dbapi.generateIndexes = function() {
export const generateIndexes = function() {
db.devices.createIndex({serial: -1}).then((result) => {
Expand All @@ -111,8 +124,6 @@ export const generateIndexes = function() {
*/




// dbapi.deleteDevice = function(serial) {
export const deleteDevice = function(serial) {
return db.devices.deleteOne({serial: serial})
Expand Down
2 changes: 1 addition & 1 deletion lib/db/models/group/model.js
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,7 @@ export const deleteUserGroup = async(id) => {
await db.groups.deleteOne({id})

await Promise.all(
group?.users.map(email => db.groups.updateOne(
group?.users.map(email => db.users.updateOne(
{email}
, {
$pull: {'groups.subscribed': id}
Expand Down
20 changes: 16 additions & 4 deletions lib/units/processor/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import lifecycle from '../../util/lifecycle.js'
import srv from '../../util/srv.js'
import * as zmqutil from '../../util/zmqutil.js'
import UserModel from '../../db/models/user/index.js'
import DeviceModel from '../../db/models/device/index.js'
import {
UserChangeMessage,
GroupChangeMessage,
Expand Down Expand Up @@ -53,8 +54,9 @@ import {
DeleteDevice,
SetAbsentDisconnectedDevices,
GetServicesAvailabilityMessage,
DeviceRegisteredMessage, GetPresentDevices, DeviceGetIsInOrigin
DeviceRegisteredMessage, GetPresentDevices, DeviceGetIsInOrigin, GetDeadDevices
} from '../../wire/wire.js'
import {getDeadDevice} from "../../db/models/device/model.js";

interface Options {
name: string
Expand Down Expand Up @@ -228,8 +230,8 @@ export default db.ensureConnectivity(async(options: Options) => {
appDealer.send([channel, data])
})
.on(DeviceGetIsInOrigin, async (channel, message) => {
const device = await dbapi.loadDeviceBySerial(message.serial)
const isInOrigin = device.group.id === device.group.origin
const device = await DeviceModel.loadDeviceBySerial(message.serial)
const isInOrigin = device ? device.group.id === device.group.origin : false
devDealer.send([
channel,
reply.okay('success', {isInOrigin})
Expand Down Expand Up @@ -289,7 +291,7 @@ export default db.ensureConnectivity(async(options: Options) => {
appDealer.send([channel, data])
})
.on(GetPresentDevices, async (channel, message, data) => {
const devices = await dbapi.loadPresentDevices()
const devices = await DeviceModel.loadPresentDevices()
.then(devices => devices.map(d => d.serial))
devDealer.send([
channel,
Expand All @@ -299,6 +301,16 @@ export default db.ensureConnectivity(async(options: Options) => {
.on(DeviceHeartbeatMessage, (channel, message, data) => {
devDealer.send([ channel, data ])
})
.on(GetDeadDevices, async(channel, message, data) => {
const deadDevices = await DeviceModel.getDeadDevice(message.time)
devDealer.send([
channel,
reply.okay('success', {deadDevices})
])
})
.on(DeleteDevice, async(channel, message, data) => {
DeviceModel.deleteDevice(message.serial)
})
.handler();

devDealer.on('message', router)
Expand Down
104 changes: 79 additions & 25 deletions lib/units/reaper/index.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import logger from '../../util/logger.js'
import {
DeleteDevice,
DeviceAbsentMessage,
DeviceHeartbeatMessage,
DeviceIntroductionMessage, DevicePresentMessage,
DeviceIntroductionMessage, DevicePresentMessage, GetDeadDevices,
GetPresentDevices
} from '../../wire/wire.js'
import wireutil from '../../wire/util.js'
Expand All @@ -17,6 +18,8 @@ const log = logger.createLogger('reaper')

interface Options {
heartbeatTimeout: number
timeToDeviceCleanup: number // in minutes
deviceCleanupInterval: number // in minutes
endpoints: {
sub: string[]
push: string[]
Expand Down Expand Up @@ -87,34 +90,85 @@ export default (async(options: Options) => {
ttlset.stop()
})

try {
log.info('Reaping devices with no heartbeat')
const router = new WireRouter()
.on(DeviceIntroductionMessage, (channel, message) => {
ttlset.drop(message.serial, TTLSet.SILENT)
ttlset.bump(message.serial, Date.now())
})
.on(DeviceHeartbeatMessage, (channel, message) => {
ttlset.bump(message.serial, Date.now())
})
.on(DeviceAbsentMessage, (channel, message) => {
ttlset.drop(message.serial, TTLSet.SILENT)
})

const router = new WireRouter()
.on(DeviceIntroductionMessage, (channel, message) => {
ttlset.drop(message.serial, TTLSet.SILENT)
ttlset.bump(message.serial, Date.now())
})
.on(DeviceHeartbeatMessage, (channel, message) => {
ttlset.bump(message.serial, Date.now())
})
.on(DeviceAbsentMessage, (channel, message) => {
ttlset.drop(message.serial, TTLSet.SILENT)
})
if (options.timeToDeviceCleanup) {
log.info('deviceCleanerLoop enabled')

// Listen to changes
sub.on('message', router.handler())
// This functionality is implemented in the Reaper unit because this unit cannot be replicated
const deviceCleanerLoop = () => setTimeout(async() => {
log.info('Checking dead devices [interval: %s]', options.deviceCleanupInterval)
try {
const absenceDuration = options.timeToDeviceCleanup
const {deadDevices} = await runTransactionDev(wireutil.global, GetDeadDevices, {
time: options.timeToDeviceCleanup * 60 * 1000
}, {sub, push, router})

// Load initial state
const {devices} = await runTransactionDev(wireutil.global, GetPresentDevices, {}, {sub, push, router})
for (const {serial, present} of deadDevices) {
if (present) {
continue
}

const now = Date.now()
devices.forEach((serial: string) => {
ttlset.bump(serial, now, TTLSet.SILENT)
})
log.info( // @ts-ignore
'Removing a dead device [serial: %s, absence_duration: %.1f %s]',
serial,
... (
absenceDuration >= 60 // if more 1 hour
? [absenceDuration / 60, 'hrs']
: [absenceDuration, 'min']
)
)

push.send([
wireutil.global,
wireutil.pack(DeleteDevice, {serial})
])
}
} catch (err: any) {
log.error('Dead device check failed with error: %s', err?.message)
} finally {
deviceCleanerLoop()
}
}, options.deviceCleanupInterval * 60 * 1000)

deviceCleanerLoop()
}
catch (err: any) {
log.fatal('Unable to load initial state: %s', err?.message)
lifecycle.fatal()

const init = async() => {
try {
log.info('Reaping devices with no heartbeat')

// Listen to changes
sub.on('message', router.handler())

// Load initial state
const {devices} = await runTransactionDev(wireutil.global, GetPresentDevices, {}, {sub, push, router})

const now = Date.now()
devices?.forEach((serial: string) => {
ttlset.bump(serial, now, TTLSet.SILENT)
})
}
catch (err: any) {
if (err?.message === 'Timeout when running transaction') {
log.error('Load initial state error: Timeout when running transaction, retry')
setTimeout(init, 2000)
return
}
log.fatal('Unable to load initial state: %s', err?.message)
lifecycle.fatal()
}
}

init()
})
4 changes: 4 additions & 0 deletions lib/wire/wire.proto
Original file line number Diff line number Diff line change
Expand Up @@ -909,3 +909,7 @@ message DeviceGetIsInOrigin {

message GetPresentDevices {
}

message GetDeadDevices {
required uint32 time = 1;
}
56 changes: 56 additions & 0 deletions lib/wire/wire.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2397,6 +2397,15 @@ export interface DeviceGetIsInOrigin {
*/
export interface GetPresentDevices {
}
/**
* @generated from protobuf message GetDeadDevices
*/
export interface GetDeadDevices {
/**
* @generated from protobuf field: required uint32 time = 1
*/
time: number;
}
/**
* @generated from protobuf enum DeviceStatus
*/
Expand Down Expand Up @@ -11744,3 +11753,50 @@ class GetPresentDevices$Type extends MessageType<GetPresentDevices> {
* @generated MessageType for protobuf message GetPresentDevices
*/
export const GetPresentDevices = new GetPresentDevices$Type();
// @generated message type with reflection information, may provide speed optimized methods
class GetDeadDevices$Type extends MessageType<GetDeadDevices> {
constructor() {
super("GetDeadDevices", [
{ no: 1, name: "time", kind: "scalar", T: 13 /*ScalarType.UINT32*/ }
]);
}
create(value?: PartialMessage<GetDeadDevices>): GetDeadDevices {
const message = globalThis.Object.create((this.messagePrototype!));
message.time = 0;
if (value !== undefined)
reflectionMergePartial<GetDeadDevices>(this, message, value);
return message;
}
internalBinaryRead(reader: IBinaryReader, length: number, options: BinaryReadOptions, target?: GetDeadDevices): GetDeadDevices {
let message = target ?? this.create(), end = reader.pos + length;
while (reader.pos < end) {
let [fieldNo, wireType] = reader.tag();
switch (fieldNo) {
case /* required uint32 time */ 1:
message.time = reader.uint32();
break;
default:
let u = options.readUnknownField;
if (u === "throw")
throw new globalThis.Error(`Unknown field ${fieldNo} (wire type ${wireType}) for ${this.typeName}`);
let d = reader.skip(wireType);
if (u !== false)
(u === true ? UnknownFieldHandler.onRead : u)(this.typeName, message, fieldNo, wireType, d);
}
}
return message;
}
internalBinaryWrite(message: GetDeadDevices, writer: IBinaryWriter, options: BinaryWriteOptions): IBinaryWriter {
/* required uint32 time = 1; */
if (message.time !== 0)
writer.tag(1, WireType.Varint).uint32(message.time);
let u = options.writeUnknownFields;
if (u !== false)
(u == true ? UnknownFieldHandler.onWrite : u)(this.typeName, message, writer);
return writer;
}
}
/**
* @generated MessageType for protobuf message GetDeadDevices
*/
export const GetDeadDevices = new GetDeadDevices$Type();