Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
CREATE INDEX CONCURRENTLY IF NOT EXISTS "ix_memberOrganizations_memberId_emailDomain"
ON "memberOrganizations" ("memberId")
WHERE "source" = 'email-domain' AND "deletedAt" IS NULL;
2 changes: 1 addition & 1 deletion services/apps/cron_service/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"name": "@crowd/cron-service",
"private": true,
"scripts": {
"start": "SERVICE=cron-service tsx src/main.ts",
"start": "SERVICE=cron-service LOG_LEVEL=trace tsx src/main.ts",
Comment thread
skwowet marked this conversation as resolved.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove log level trace here.

"start:debug:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && SERVICE=cron-service LOG_LEVEL=trace tsx --inspect=0.0.0.0:9232 src/main.ts",
"start:debug": "SERVICE=cron-service LOG_LEVEL=trace tsx --inspect=0.0.0.0:9232 src/main.ts",
"dev:local": "nodemon --watch src --watch ../../libs --ext ts --exec pnpm run start:debug:local",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
import CronTime from 'cron-time-generator'

import {
MEMBER_ORG_STINT_CHANGES_DATES_PREFIX,
MEMBER_ORG_STINT_CHANGES_QUEUE,
inferMemberOrganizationStintChanges,
} from '@crowd/common_services'
import {
QueryExecutor,
changeMemberOrganizationAffiliationOverrides,
checkOrganizationAffiliationPolicy,
createMemberOrganization,
fetchMemberOrganizationsBySource,
updateMemberOrganization,
} from '@crowd/data-access-layer'
import { WRITE_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/database'
import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
import { REDIS_CONFIG, getRedisClient } from '@crowd/redis'
import { MemberOrgStintChange, OrganizationSource } from '@crowd/types'

import { IJobDefinition } from '../types'

const job: IJobDefinition = {
name: 'infer-member-organization-stint-changes',
cronTime: CronTime.every(5).minutes(),
timeout: 10 * 60,
process: async (ctx) => {
const redis = await getRedisClient(REDIS_CONFIG())
const db = await getDbConnection(WRITE_DB_CONFIG())
const qx = pgpQx(db)

ctx.log.info('Starting member organization stint inference job.')

const memberIds = await redis.sRandMemberCount(MEMBER_ORG_STINT_CHANGES_QUEUE, 500)
if (!memberIds?.length) return

ctx.log.info({ count: memberIds.length }, 'Processing members from queue.')

let processed = 0

for (const memberId of memberIds) {
try {
const datesKey = `${MEMBER_ORG_STINT_CHANGES_DATES_PREFIX}:${memberId}`
const hash = await redis.hGetAll(datesKey)

if (!hash || Object.keys(hash).length === 0) {
await redis.sRem(MEMBER_ORG_STINT_CHANGES_QUEUE, memberId)
continue
}

const { activityDates, orgIds } = parseMemberActivityHash(hash)

if (activityDates.length > 0) {
const existingOrgs = await fetchMemberOrganizationsBySource(
qx,
memberId,
OrganizationSource.EMAIL_DOMAIN,
)

const changes = inferMemberOrganizationStintChanges(memberId, existingOrgs, activityDates)

if (changes.length > 0) {
ctx.log.debug({ memberId, changes }, 'Stint changes identified.')
await applyStintChanges(qx, changes)
}
}
Comment thread
skwowet marked this conversation as resolved.
Comment thread
skwowet marked this conversation as resolved.
Comment thread
skwowet marked this conversation as resolved.

// Remove only the fields we actually read
await redis
.multi()
.hDel(datesKey, orgIds)
.sRem(MEMBER_ORG_STINT_CHANGES_QUEUE, memberId)
.exec()
Comment thread
skwowet marked this conversation as resolved.

processed++
} catch (err) {
ctx.log.error(err, { memberId }, 'Failed to process member stint inference.')
}
Comment thread
skwowet marked this conversation as resolved.
}

ctx.log.info({ processed }, 'Batch complete.')
},
}

/**
* Parses the Redis hash into a clean, typed list of activity dates.
*/
function parseMemberActivityHash(hash: Record<string, string>) {
const orgIds = Object.keys(hash)
const activityDates = orgIds.flatMap((organizationId) => {
try {
const dates = JSON.parse(hash[organizationId])
return Array.isArray(dates)
? dates
.filter((d): d is string => typeof d === 'string')
.map((date) => ({ organizationId, date }))
: []
} catch {
return []
}
})
return { activityDates, orgIds }
}

/**
* Applies the stint changes to the database.
*/
async function applyStintChanges(qx: QueryExecutor, changes: MemberOrgStintChange[]) {
for (const change of changes) {
if (change.type === 'insert') {
const memberOrganizationId = await createMemberOrganization(qx, change.memberId, {
organizationId: change.organizationId,
dateStart: change.dateStart,
dateEnd: change.dateEnd,
source: OrganizationSource.EMAIL_DOMAIN,
})

const isAffiliationBlocked = await checkOrganizationAffiliationPolicy(
qx,
change.organizationId,
)

if (memberOrganizationId && isAffiliationBlocked) {
await changeMemberOrganizationAffiliationOverrides(qx, [
{
memberId: change.memberId,
memberOrganizationId,
allowAffiliation: false,
},
])
}
} else {
await updateMemberOrganization(qx, change.memberId, change.id, {
dateStart: change.dateStart,
dateEnd: change.dateEnd,
})
}
}
}

export default job
4 changes: 1 addition & 3 deletions services/apps/data_sink_worker/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"name": "@crowd/data-sink-worker",
"private": true,
"scripts": {
"start": "SERVICE=data-sink-worker tsx src/main.ts",
"start": "SERVICE=data-sink-worker LOG_LEVEL=trace tsx src/main.ts",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove log level trace here.

"start:debug:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && SERVICE=data-sink-worker LOG_LEVEL=trace tsx --inspect=0.0.0.0:9233 src/main.ts",
"start:debug": "SERVICE=data-sink-worker LOG_LEVEL=trace tsx --inspect=0.0.0.0:9233 src/main.ts",
"dev:local": "nodemon --watch src --watch ../../libs --ext ts --exec pnpm run start:debug:local",
Expand All @@ -17,8 +17,6 @@
"script:restart-result": "SERVICE=script tsx src/bin/restart-result.ts",
"script:process-results": "SERVICE=script tsx src/bin/process-results.ts",
"script:trigger-results-for-tenant": "SERVICE=script tsx src/bin/trigger-results-for-tenant.ts",
"script:map-tenant-members-to-org": "SERVICE=script tsx src/bin/map-tenant-members-to-org.ts",
"script:map-member-to-org": "SERVICE=script tsx src/bin/map-member-to-org.ts",
"script:fix-activity-obj-member-data": "SERVICE=script tsx src/bin/fix-activity-obj-member-data.ts",
"script:fix-member-displayName": "SERVICE=script tsx src/bin/fix-member-displayName.ts",
"script:fix-members-joinedAt": "SERVICE=script tsx src/bin/fix-members-joinedAt.ts",
Expand Down
97 changes: 0 additions & 97 deletions services/apps/data_sink_worker/src/bin/map-member-to-org.ts

This file was deleted.

This file was deleted.

Loading
Loading