diff --git a/index.js b/index.js index be3c76f..9fa69fd 100644 --- a/index.js +++ b/index.js @@ -1,18 +1,18 @@ const AWS = require("aws-sdk"); -const initOptions = {/* initialization options */}; +const initOptions = {/* initialization options */ }; const pgp = require('pg-promise')(initOptions); const validate = require('jsonschema').validate; -AWS.config.getCredentials(function(err) { - if (err) console.error(err.stack); - // credentials not loaded - else { - console.log("AWS Configured"); - } +AWS.config.getCredentials(function (err) { + if (err) console.error(err.stack); + // credentials not loaded + else { + console.log("AWS Configured"); + } }); // AWS bug that is not auto-updating region -AWS.config.update({region:'us-east-1'}); +AWS.config.update({ region: 'us-east-1' }); const lambda = new AWS.Lambda(); const secretManager = new AWS.SecretsManager(); @@ -24,10 +24,10 @@ const secretManager = new AWS.SecretsManager(); */ exports.handler = async (event) => { console.log('Executor Running'); - + // Connect To Secret Manager & Database - const data = await secretManager.getSecretValue({SecretId: 'arn:aws:secretsmanager:us-east-1:788726710547:secret:postgres-q03L8P'}).promise() - + const data = await secretManager.getSecretValue({ SecretId: 'arn:aws:secretsmanager:us-east-1:788726710547:secret:postgres-q03L8P' }).promise() + secret = JSON.parse(data.SecretString); const cn = { host: secret.host, @@ -35,29 +35,29 @@ exports.handler = async (event) => { database: secret.dbname, user: secret.username, password: secret.password, - max: 1 + max: 1 }; - + const db = pgp(cn); - await db.none('UPDATE functions SET state = $1 WHERE state = $2',['ACTIVE', 'Active']) - await db.none('UPDATE functions SET state = $1 WHERE state = $2',['FAILED', 'Failed']) + await db.none('UPDATE functions SET state = $1 WHERE state = $2', ['ACTIVE', 'Active']) + await db.none('UPDATE functions SET state = $1 WHERE state = $2', ['FAILED', 'Failed']) const scheduleObj = await db.oneOrNone(`SELECT jobs.ID as jobid,` - + ` jobs.STATE as jobstate,` - + ` jobs.RETRY_COUNT as jobretry,` - + ` jobs.AA_ID as useraa,` - + ` jobs.REQUEST_PARAMS as fiupayload,` - + ` jobs.FUNCTION_ID as fnid,` - + ` functions.FUNCTION_NAME as fnname,` - + ` functions.STATE as fnstate,` - + ` functions.S3_LOCATION as fnloc,` // Don't need location in Scheduler Lambda - + ` functions.RESULT_JSON_SCHEMA as fnjsonschema` - + ` FROM jobs ` - // + ` FROM jobs, functions` - // + ` WHERE ` - // + ` AND jobs.FUNCTION_ID = functions.ID ` - + ` INNER JOIN functions ON functions.ID = jobs.FUNCTION_ID AND jobs.STATE = 'CREATED' AND ( functions.STATE = 'ACTIVE' OR functions.STATE = 'INACTIVE')` - + ` ORDER BY jobs.created DESC LIMIT 1`) + + ` jobs.STATE as jobstate,` + + ` jobs.RETRY_COUNT as jobretry,` + + ` jobs.AA_ID as useraa,` + + ` jobs.REQUEST_PARAMS as fiupayload,` + + ` jobs.FUNCTION_ID as fnid,` + + ` functions.FUNCTION_NAME as fnname,` + + ` functions.STATE as fnstate,` + + ` functions.S3_LOCATION as fnloc,` // Don't need location in Scheduler Lambda + + ` functions.RESULT_JSON_SCHEMA as fnjsonschema` + + ` FROM jobs ` + // + ` FROM jobs, functions` + // + ` WHERE ` + // + ` AND jobs.FUNCTION_ID = functions.ID ` + + ` INNER JOIN functions ON functions.ID = jobs.FUNCTION_ID AND jobs.STATE = 'CREATED' AND ( functions.STATE = 'ACTIVE' OR functions.STATE = 'INACTIVE')` + + ` ORDER BY jobs.created DESC LIMIT 1`) console.log('Job', scheduleObj); // Return if no object in queue @@ -65,15 +65,15 @@ exports.handler = async (event) => { return "Nothing in Queue"; } - const updateJob = async (newState, jobResult='') => { + const updateJob = async (newState, jobResult = '') => { await db.none('UPDATE jobs SET STATE=$2, LAST_UPDATED = NOW(), RESULT=$3 WHERE ID = $1', [scheduleObj.jobid, newState, jobResult]) } const increaseRetry = async (jobResult) => { if (scheduleObj.jobretry >= 3) - await db.none('UPDATE jobs SET STATE=$2, LAST_UPDATED = NOW(), RESULT=$3 , RETRY_COUNT=$4 WHERE ID = $1', [scheduleObj.jobid, 'FAILED', jobResult, scheduleObj.jobretry+1]) + await db.none('UPDATE jobs SET STATE=$2, LAST_UPDATED = NOW(), RESULT=$3 , RETRY_COUNT=$4 WHERE ID = $1', [scheduleObj.jobid, 'FAILED', jobResult, scheduleObj.jobretry + 1]) else - await db.none('UPDATE jobs SET STATE=$2, LAST_UPDATED = NOW(), RESULT=$3 , RETRY_COUNT=$4 WHERE ID = $1', [scheduleObj.jobid, 'CREATED', jobResult, scheduleObj.jobretry+1]) + await db.none('UPDATE jobs SET STATE=$2, LAST_UPDATED = NOW(), RESULT=$3 , RETRY_COUNT=$4 WHERE ID = $1', [scheduleObj.jobid, 'CREATED', jobResult, scheduleObj.jobretry + 1]) } @@ -97,30 +97,41 @@ exports.handler = async (event) => { FunctionName: `fiu_${scheduleObj.fnid}`, Qualifier: '$LATEST', // documentation bug https://github.com/aws/aws-sdk-js/issues/1876 - Payload: JSON.stringify({ - userAA: scheduleObj.useraa, - payload: scheduleObj.fiupayload - }), + Payload: JSON.stringify(scheduleObj.fiupayload) + // Payload: JSON.stringify({ + // userAA: scheduleObj.useraa, + // payload: scheduleObj.fiupayload + // }), }).promise(); if (dataRun.StatusCode === 200) { // Function ran successfully 🎉 - //check for validation - if (validate(dataRun.Payload, JSON.parse(scheduleObj.fnjsonschema)).valid) { + if ('errorMessage' in dataRun.Payload) { result = { - data: dataRun.Payload, + data: null, log: dataRun.LogResult, - error: null + error: dataRun.Payload } + updateJob('FAILED', JSON.stringify(result)) } else { - result = { - data: dataRun.Payload, - log: dataRun.LogResult, - error: 'Validation Failed' + //check for validation + if (validate(dataRun.Payload, JSON.parse(scheduleObj.fnjsonschema)).valid) { + result = { + data: dataRun.Payload, + log: dataRun.LogResult, + error: null + } + } else { + result = { + data: dataRun.Payload, + log: dataRun.LogResult, + error: 'Validation Failed' + } } + updateJob('SUCCESS', JSON.stringify(result)) } - + } else { // Function itself ran into an error result = { @@ -128,10 +139,9 @@ exports.handler = async (event) => { log: dataRun.LogResult, error: dataRun.FunctionError } + updateJob('FAILED', JSON.stringify(result)) } - updateJob('SUCCESS', JSON.stringify(result)) - } catch (errRun) { // update: State -> Created & Retry++ (if r >= 3) set failed @@ -153,7 +163,7 @@ exports.handler = async (event) => { console.error(`E4.2 Function pending creation for job ${scheduleObj.jobid}`); return "E4.2"; } - + return result; } @@ -161,10 +171,10 @@ exports.handler = async (event) => { // https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Lambda.html#createFunction-property exports.creatorHandle = async (event) => { console.log('Creator Running', event.Records[0].s3.object, event.Records[0].s3.bucket); - + // Connect To Secret Manager & Database - const data = await secretManager.getSecretValue({SecretId: 'arn:aws:secretsmanager:us-east-1:788726710547:secret:postgres-q03L8P'}).promise() - + const data = await secretManager.getSecretValue({ SecretId: 'arn:aws:secretsmanager:us-east-1:788726710547:secret:postgres-q03L8P' }).promise() + secret = JSON.parse(data.SecretString); const cn = { host: secret.host, @@ -172,13 +182,13 @@ exports.creatorHandle = async (event) => { database: secret.dbname, user: secret.username, password: secret.password, - max: 1 + max: 1 }; - - const db = pgp(cn); + + const db = pgp(cn); console.log('DB Connected'); let fn = ""; - fn = await db.oneOrNone('SELECT * FROM functions WHERE S3_LOCATION=$1 OR S3_LOCATION=$2 LIMIT 1',[event.Records[0].s3.object.key,event.Records[0].s3.object.key.split('/')[1]]); + fn = await db.oneOrNone('SELECT * FROM functions WHERE S3_LOCATION=$1 OR S3_LOCATION=$2 LIMIT 1', [event.Records[0].s3.object.key, event.Records[0].s3.object.key.split('/')[1]]); console.log('FN', fn); if (!fn) { @@ -190,7 +200,7 @@ exports.creatorHandle = async (event) => { try { console.log("Getting Function"); const dataFn = await lambda.getFunction({ - FunctionName: `fiu_${fn.id}`, + FunctionName: `${fn.fiu_id}_${fn.id}`, Qualifier: '$LATEST', }).promise(); if (dataFn) { @@ -198,14 +208,14 @@ exports.creatorHandle = async (event) => { console.log("Updating Function") resp = await lambda.updateFunctionCode({ FunctionName: dataFn.Configuration.FunctionName, - S3Bucket: event.Records[0].s3.bucket.name, + S3Bucket: event.Records[0].s3.bucket.name, S3Key: event.Records[0].s3.object.key }).promise(); if (fn.handler !== dataFn.Configuration.Handler || fn.runtime !== dataFn.Configuration.Runtime) { console.log("Updating Handler & Runtime") await lambda.updateFunctionConfiguration({ FunctionName: dataFn.Configuration.FunctionName, - Handler: fn.handler, + Handler: fn.handler, Runtime: fn.runtime }).promise(); } @@ -221,30 +231,30 @@ exports.creatorHandle = async (event) => { console.log("Creating Function"); resp = await lambda.createFunction({ Code: { - S3Bucket: event.Records[0].s3.bucket.name, + S3Bucket: event.Records[0].s3.bucket.name, S3Key: event.Records[0].s3.object.key - }, - Description: `Package ${fn.id} for FIU ${fn.fiu_id}`, + }, + Description: `Package ${fn.id} for FIU ${fn.fiu_id}`, Environment: { Variables: { - "CREATION_DATE": (new Date()).toString(), + "CREATION_DATE": (new Date()).toString(), } - }, - FunctionName: `fiu_${fn.id}`, - Handler: fn.handler, - Role: "arn:aws:iam::788726710547:role/VDRFiuBinaryLambdaRole", + }, + FunctionName: `${fn.fiu_id}_${fn.id}`, + Handler: fn.handler, + Role: "arn:aws:iam::788726710547:role/VDRFiuBinaryLambdaRole", // Runtime: 'nodejs12.x', - Runtime: fn.runtime, + Runtime: fn.runtime, Tags: { "DEPARTMENT": "FIU_RES" - }, + }, Timeout: 15, - VpcConfig: { - SecurityGroupIds: [ "sg-0dc27f2e5b0369bee" ], - SubnetIds: [ "subnet-08592b3f5a6fcebbc" ], + VpcConfig: { + SecurityGroupIds: ["sg-0dc27f2e5b0369bee"], + SubnetIds: ["subnet-08592b3f5a6fcebbc"], // VpcId: "vpc-072adfdb316436fd6" } - + }).promise(); } return resp; @@ -255,10 +265,10 @@ exports.creatorHandle = async (event) => { exports.stateCheckFn = async (event) => { console.log('Scheduled State Check Fn Running'); - + // Connect To Secret Manager & Database - const data = await secretManager.getSecretValue({SecretId: 'arn:aws:secretsmanager:us-east-1:788726710547:secret:postgres-q03L8P'}).promise() - + const data = await secretManager.getSecretValue({ SecretId: 'arn:aws:secretsmanager:us-east-1:788726710547:secret:postgres-q03L8P' }).promise() + secret = JSON.parse(data.SecretString); const cn = { host: secret.host, @@ -266,12 +276,12 @@ exports.stateCheckFn = async (event) => { database: secret.dbname, user: secret.username, password: secret.password, - max: 1 + max: 1 }; - + const db = pgp(cn); // const fnQueue = await db.manyOrNone(`SELECT * FROM functions WHERE fiu_id=$1`,['7ec79ac6-a8ec-4223-82a1-6547442d7f32']) - const fnQueue = await db.manyOrNone(`SELECT * FROM functions WHERE STATE=$1`,['PENDING']) + const fnQueue = await db.manyOrNone(`SELECT * FROM functions WHERE STATE=$1`, ['PENDING']) console.log(fnQueue); // return; const result = []; @@ -289,7 +299,7 @@ exports.stateCheckFn = async (event) => { result.push(pgp.as.format('UPDATE functions SET state = $1 WHERE id = $2', [dataFn.Configuration.State.toUpperCase(), fn.id])) } catch { - if (new Date(fn.created) < new Date() - 15*60*1000) { + if (new Date(fn.created) < new Date() - 15 * 60 * 1000) { // This was created over 15 minutes ago. result.push(pgp.as.format('UPDATE functions SET state = $1 WHERE id = $2', ['FAILED', fn.id])) }