Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 90 additions & 80 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
const AWS = require("aws-sdk");
const initOptions = {/* initialization options */};
const initOptions = {/* initialization options */ };
const pgp = require('pg-promise')(initOptions);
const validate = require('jsonschema').validate;

AWS.config.getCredentials(function(err) {
if (err) console.error(err.stack);
// credentials not loaded
else {
console.log("AWS Configured");
}
AWS.config.getCredentials(function (err) {
if (err) console.error(err.stack);
// credentials not loaded
else {
console.log("AWS Configured");
}
});

// AWS bug that is not auto-updating region
AWS.config.update({region:'us-east-1'});
AWS.config.update({ region: 'us-east-1' });

const lambda = new AWS.Lambda();
const secretManager = new AWS.SecretsManager();
Expand All @@ -24,56 +24,56 @@ const secretManager = new AWS.SecretsManager();
*/
exports.handler = async (event) => {
console.log('Executor Running');

// Connect To Secret Manager & Database
const data = await secretManager.getSecretValue({SecretId: 'arn:aws:secretsmanager:us-east-1:788726710547:secret:postgres-q03L8P'}).promise()
const data = await secretManager.getSecretValue({ SecretId: 'arn:aws:secretsmanager:us-east-1:788726710547:secret:postgres-q03L8P' }).promise()

secret = JSON.parse(data.SecretString);
const cn = {
host: secret.host,
port: secret.port,
database: secret.dbname,
user: secret.username,
password: secret.password,
max: 1
max: 1
};

const db = pgp(cn);
await db.none('UPDATE functions SET state = $1 WHERE state = $2',['ACTIVE', 'Active'])
await db.none('UPDATE functions SET state = $1 WHERE state = $2',['FAILED', 'Failed'])
await db.none('UPDATE functions SET state = $1 WHERE state = $2', ['ACTIVE', 'Active'])
await db.none('UPDATE functions SET state = $1 WHERE state = $2', ['FAILED', 'Failed'])

const scheduleObj = await db.oneOrNone(`SELECT jobs.ID as jobid,`
+ ` jobs.STATE as jobstate,`
+ ` jobs.RETRY_COUNT as jobretry,`
+ ` jobs.AA_ID as useraa,`
+ ` jobs.REQUEST_PARAMS as fiupayload,`
+ ` jobs.FUNCTION_ID as fnid,`
+ ` functions.FUNCTION_NAME as fnname,`
+ ` functions.STATE as fnstate,`
+ ` functions.S3_LOCATION as fnloc,` // Don't need location in Scheduler Lambda
+ ` functions.RESULT_JSON_SCHEMA as fnjsonschema`
+ ` FROM jobs `
// + ` FROM jobs, functions`
// + ` WHERE `
// + ` AND jobs.FUNCTION_ID = functions.ID `
+ ` INNER JOIN functions ON functions.ID = jobs.FUNCTION_ID AND jobs.STATE = 'CREATED' AND ( functions.STATE = 'ACTIVE' OR functions.STATE = 'INACTIVE')`
+ ` ORDER BY jobs.created DESC LIMIT 1`)
+ ` jobs.STATE as jobstate,`
+ ` jobs.RETRY_COUNT as jobretry,`
+ ` jobs.AA_ID as useraa,`
+ ` jobs.REQUEST_PARAMS as fiupayload,`
+ ` jobs.FUNCTION_ID as fnid,`
+ ` functions.FUNCTION_NAME as fnname,`
+ ` functions.STATE as fnstate,`
+ ` functions.S3_LOCATION as fnloc,` // Don't need location in Scheduler Lambda
+ ` functions.RESULT_JSON_SCHEMA as fnjsonschema`
+ ` FROM jobs `
// + ` FROM jobs, functions`
// + ` WHERE `
// + ` AND jobs.FUNCTION_ID = functions.ID `
+ ` INNER JOIN functions ON functions.ID = jobs.FUNCTION_ID AND jobs.STATE = 'CREATED' AND ( functions.STATE = 'ACTIVE' OR functions.STATE = 'INACTIVE')`
+ ` ORDER BY jobs.created DESC LIMIT 1`)
console.log('Job', scheduleObj);

// Return if no object in queue
if (!scheduleObj) {
return "Nothing in Queue";
}

const updateJob = async (newState, jobResult='') => {
const updateJob = async (newState, jobResult = '') => {
await db.none('UPDATE jobs SET STATE=$2, LAST_UPDATED = NOW(), RESULT=$3 WHERE ID = $1', [scheduleObj.jobid, newState, jobResult])
}

const increaseRetry = async (jobResult) => {
if (scheduleObj.jobretry >= 3)
await db.none('UPDATE jobs SET STATE=$2, LAST_UPDATED = NOW(), RESULT=$3 , RETRY_COUNT=$4 WHERE ID = $1', [scheduleObj.jobid, 'FAILED', jobResult, scheduleObj.jobretry+1])
await db.none('UPDATE jobs SET STATE=$2, LAST_UPDATED = NOW(), RESULT=$3 , RETRY_COUNT=$4 WHERE ID = $1', [scheduleObj.jobid, 'FAILED', jobResult, scheduleObj.jobretry + 1])
else
await db.none('UPDATE jobs SET STATE=$2, LAST_UPDATED = NOW(), RESULT=$3 , RETRY_COUNT=$4 WHERE ID = $1', [scheduleObj.jobid, 'CREATED', jobResult, scheduleObj.jobretry+1])
await db.none('UPDATE jobs SET STATE=$2, LAST_UPDATED = NOW(), RESULT=$3 , RETRY_COUNT=$4 WHERE ID = $1', [scheduleObj.jobid, 'CREATED', jobResult, scheduleObj.jobretry + 1])

}

Expand All @@ -97,41 +97,51 @@ exports.handler = async (event) => {
FunctionName: `fiu_${scheduleObj.fnid}`,
Qualifier: '$LATEST',
// documentation bug https://github.com/aws/aws-sdk-js/issues/1876
Payload: JSON.stringify({
userAA: scheduleObj.useraa,
payload: scheduleObj.fiupayload
}),
Payload: JSON.stringify(scheduleObj.fiupayload)
// Payload: JSON.stringify({
// userAA: scheduleObj.useraa,
// payload: scheduleObj.fiupayload
// }),
}).promise();

if (dataRun.StatusCode === 200) {
// Function ran successfully 🎉

//check for validation
if (validate(dataRun.Payload, JSON.parse(scheduleObj.fnjsonschema)).valid) {
if ('errorMessage' in dataRun.Payload) {
result = {
data: dataRun.Payload,
data: null,
log: dataRun.LogResult,
error: null
error: dataRun.Payload
}
updateJob('FAILED', JSON.stringify(result))
} else {
result = {
data: dataRun.Payload,
log: dataRun.LogResult,
error: 'Validation Failed'
//check for validation
if (validate(dataRun.Payload, JSON.parse(scheduleObj.fnjsonschema)).valid) {
result = {
data: dataRun.Payload,
log: dataRun.LogResult,
error: null
}
} else {
result = {
data: dataRun.Payload,
log: dataRun.LogResult,
error: 'Validation Failed'
}
}
updateJob('SUCCESS', JSON.stringify(result))
}

} else {
// Function itself ran into an error
result = {
data: dataRun.Payload,
log: dataRun.LogResult,
error: dataRun.FunctionError
}
updateJob('FAILED', JSON.stringify(result))
}

updateJob('SUCCESS', JSON.stringify(result))

} catch (errRun) {
// update: State -> Created & Retry++ (if r >= 3) set failed

Expand All @@ -153,32 +163,32 @@ exports.handler = async (event) => {
console.error(`E4.2 Function pending creation for job ${scheduleObj.jobid}`);
return "E4.2";
}

return result;
}

// https://docs.aws.amazon.com/lambda/latest/dg/with-s3.html
// https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Lambda.html#createFunction-property
exports.creatorHandle = async (event) => {
console.log('Creator Running', event.Records[0].s3.object, event.Records[0].s3.bucket);

// Connect To Secret Manager & Database
const data = await secretManager.getSecretValue({SecretId: 'arn:aws:secretsmanager:us-east-1:788726710547:secret:postgres-q03L8P'}).promise()
const data = await secretManager.getSecretValue({ SecretId: 'arn:aws:secretsmanager:us-east-1:788726710547:secret:postgres-q03L8P' }).promise()

secret = JSON.parse(data.SecretString);
const cn = {
host: secret.host,
port: secret.port,
database: secret.dbname,
user: secret.username,
password: secret.password,
max: 1
max: 1
};
const db = pgp(cn);

const db = pgp(cn);
console.log('DB Connected');
let fn = "";
fn = await db.oneOrNone('SELECT * FROM functions WHERE S3_LOCATION=$1 OR S3_LOCATION=$2 LIMIT 1',[event.Records[0].s3.object.key,event.Records[0].s3.object.key.split('/')[1]]);
fn = await db.oneOrNone('SELECT * FROM functions WHERE S3_LOCATION=$1 OR S3_LOCATION=$2 LIMIT 1', [event.Records[0].s3.object.key, event.Records[0].s3.object.key.split('/')[1]]);
console.log('FN', fn);

if (!fn) {
Expand All @@ -190,22 +200,22 @@ exports.creatorHandle = async (event) => {
try {
console.log("Getting Function");
const dataFn = await lambda.getFunction({
FunctionName: `fiu_${fn.id}`,
FunctionName: `${fn.fiu_id}_${fn.id}`,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might probably break lambda's name limit (64 chars)

Qualifier: '$LATEST',
}).promise();
if (dataFn) {
try {
console.log("Updating Function")
resp = await lambda.updateFunctionCode({
FunctionName: dataFn.Configuration.FunctionName,
S3Bucket: event.Records[0].s3.bucket.name,
S3Bucket: event.Records[0].s3.bucket.name,
S3Key: event.Records[0].s3.object.key
}).promise();
if (fn.handler !== dataFn.Configuration.Handler || fn.runtime !== dataFn.Configuration.Runtime) {
console.log("Updating Handler & Runtime")
await lambda.updateFunctionConfiguration({
FunctionName: dataFn.Configuration.FunctionName,
Handler: fn.handler,
Handler: fn.handler,
Runtime: fn.runtime
}).promise();
}
Expand All @@ -221,30 +231,30 @@ exports.creatorHandle = async (event) => {
console.log("Creating Function");
resp = await lambda.createFunction({
Code: {
S3Bucket: event.Records[0].s3.bucket.name,
S3Bucket: event.Records[0].s3.bucket.name,
S3Key: event.Records[0].s3.object.key
},
Description: `Package ${fn.id} for FIU ${fn.fiu_id}`,
},
Description: `Package ${fn.id} for FIU ${fn.fiu_id}`,
Environment: {
Variables: {
"CREATION_DATE": (new Date()).toString(),
"CREATION_DATE": (new Date()).toString(),
}
},
FunctionName: `fiu_${fn.id}`,
Handler: fn.handler,
Role: "arn:aws:iam::788726710547:role/VDRFiuBinaryLambdaRole",
},
FunctionName: `${fn.fiu_id}_${fn.id}`,
Handler: fn.handler,
Role: "arn:aws:iam::788726710547:role/VDRFiuBinaryLambdaRole",
// Runtime: 'nodejs12.x',
Runtime: fn.runtime,
Runtime: fn.runtime,
Tags: {
"DEPARTMENT": "FIU_RES"
},
},
Timeout: 15,
VpcConfig: {
SecurityGroupIds: [ "sg-0dc27f2e5b0369bee" ],
SubnetIds: [ "subnet-08592b3f5a6fcebbc" ],
VpcConfig: {
SecurityGroupIds: ["sg-0dc27f2e5b0369bee"],
SubnetIds: ["subnet-08592b3f5a6fcebbc"],
// VpcId: "vpc-072adfdb316436fd6"
}

}).promise();
}
return resp;
Expand All @@ -255,23 +265,23 @@ exports.creatorHandle = async (event) => {

exports.stateCheckFn = async (event) => {
console.log('Scheduled State Check Fn Running');

// Connect To Secret Manager & Database
const data = await secretManager.getSecretValue({SecretId: 'arn:aws:secretsmanager:us-east-1:788726710547:secret:postgres-q03L8P'}).promise()
const data = await secretManager.getSecretValue({ SecretId: 'arn:aws:secretsmanager:us-east-1:788726710547:secret:postgres-q03L8P' }).promise()

secret = JSON.parse(data.SecretString);
const cn = {
host: secret.host,
port: secret.port,
database: secret.dbname,
user: secret.username,
password: secret.password,
max: 1
max: 1
};

const db = pgp(cn);
// const fnQueue = await db.manyOrNone(`SELECT * FROM functions WHERE fiu_id=$1`,['7ec79ac6-a8ec-4223-82a1-6547442d7f32'])
const fnQueue = await db.manyOrNone(`SELECT * FROM functions WHERE STATE=$1`,['PENDING'])
const fnQueue = await db.manyOrNone(`SELECT * FROM functions WHERE STATE=$1`, ['PENDING'])
console.log(fnQueue);
// return;
const result = [];
Expand All @@ -289,7 +299,7 @@ exports.stateCheckFn = async (event) => {

result.push(pgp.as.format('UPDATE functions SET state = $1 WHERE id = $2', [dataFn.Configuration.State.toUpperCase(), fn.id]))
} catch {
if (new Date(fn.created) < new Date() - 15*60*1000) {
if (new Date(fn.created) < new Date() - 15 * 60 * 1000) {
// This was created over 15 minutes ago.
result.push(pgp.as.format('UPDATE functions SET state = $1 WHERE id = $2', ['FAILED', fn.id]))
}
Expand Down