From e231e0087c792f3039e11d071b2b26a9408f3e22 Mon Sep 17 00:00:00 2001 From: olmax99 Date: Thu, 31 Oct 2019 18:40:46 +0100 Subject: [PATCH 01/12] initial run --- .gitignore | 3 ++ Makefile | 39 +++++++++++++++++++ examples/project/Makefile | 16 ++++++-- .../{ => cluster}/turbine-cluster.template | 4 ++ templates/cluster/turbine-sg.template | 7 ++++ templates/cluster/turbine-vpn.template | 7 ++++ .../services/turbine-codedeploy.template | 7 ++++ .../{ => services}/turbine-scheduler.template | 0 .../{ => services}/turbine-webserver.template | 0 .../{ => services}/turbine-workerset.template | 0 10 files changed, 79 insertions(+), 4 deletions(-) rename templates/{ => cluster}/turbine-cluster.template (99%) create mode 100644 templates/cluster/turbine-sg.template create mode 100644 templates/cluster/turbine-vpn.template create mode 100644 templates/services/turbine-codedeploy.template rename templates/{ => services}/turbine-scheduler.template (100%) rename templates/{ => services}/turbine-webserver.template (100%) rename templates/{ => services}/turbine-workerset.template (100%) diff --git a/.gitignore b/.gitignore index 5612a58c..ca7ca0f2 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,5 @@ # taskcat taskcat_outputs/ + +*.tgz +*.idea \ No newline at end of file diff --git a/Makefile b/Makefile index 1eb5852d..c981a5d0 100644 --- a/Makefile +++ b/Makefile @@ -1,3 +1,15 @@ +define message1 + Environment variable BASE_IP is required. Not set. 
+ Use following command: + "$$ my_ip=`curl ipinfo.io | jq .ip`;eval my_ip=$${my_ip[i]};my_ip="$$my_ip/32"; export BASE_IP=$$my_ip" + +endef + +ifndef BASE_IP +export message1 +$(error $(message1)) +endif + ifndef BRANCH BRANCH := $(shell git rev-parse --abbrev-ref HEAD) endif @@ -8,6 +20,11 @@ else BUCKET := s3://turbine-quickstart/quickstart-turbine-airflow-$(BRANCH) endif +# turbine-master +CURRENT_LOCAL_IP = $(BASE_IP) +# DELETE ME +AWS_REGION := eu-central-1 +PROJECT_NAME := eksairflow01-staging lint: cfn-lint templates/*.template @@ -17,3 +34,25 @@ test: sync: aws s3 sync --exclude '.*' --acl public-read . $(BUCKET) + +# DELETE ME +artifacts: + aws s3 cp --recursive submodules/quickstart-aws-vpc s3://${PROJECT_NAME}-${AWS_REGION}/${PROJECT_NAME}submodules/quickstart-aws-vpc/templates/ + aws s3 cp --recursive templates/cluster s3://${PROJECT_NAME}-${AWS_REGION}/${PROJECT_NAME}templates + aws s3 cp --recursive templates/services s3://${PROJECT_NAME}-${AWS_REGION}/${PROJECT_NAME}templates + +# DELETE ME +cluster: + aws cloudformation --region ${AWS_REGION} create-stack --stack-name ${PROJECT_NAME} \ + --template-body file://templates/turbine-master.template \ + --parameters \ + ParameterKey="AllowedWebBlock",ParameterValue="${CURRENT_LOCAL_IP}" \ + ParameterKey="DbMasterPassword",ParameterValue="super_secret" \ + ParameterKey="QSS3BucketName",ParameterValue="${PROJECT_NAME}-${AWS_REGION}" \ + ParameterKey="QSS3KeyPrefix",ParameterValue="${PROJECT_NAME}" \ + --capabilities CAPABILITY_NAMED_IAM + +# DELETE ME +clean: + aws cloudformation delete-stack --stack-name ${PROJECT_NAME} + diff --git a/examples/project/Makefile b/examples/project/Makefile index 9fc1af3f..1315bdf4 100644 --- a/examples/project/Makefile +++ b/examples/project/Makefile @@ -1,5 +1,12 @@ -ifndef stack-name -$(error stack-name is not set) +define message1 + Environment variable stack_name is required. Not set. 
+ Use following command: + "$$ export stack_name=" + +endef + +ifndef stack_name +$(error $(message1)) endif ifndef revision revision := $(shell date --utc +%Y%m%dT%H%M%SZ) @@ -8,16 +15,17 @@ endif define getRef $(shell aws cloudformation describe-stacks \ - --stack-name $(stack-name) \ + --stack-name $(stack_name) \ --query "Stacks[0].Outputs[?OutputKey=='$(1)'].OutputValue" \ --output text) endef + APPLICATION := $(call getRef,CodeDeployApplication) DEPLOYMENT_GROUP := $(call getRef,CodeDeployDeploymentGroup) DEPLOYMENTS_BUCKET := $(call getRef,DeploymentsBucket) -PACKAGE := $(stack-name)_$(revision).tgz +PACKAGE := $(stack_name)_$(revision).tgz package: diff --git a/templates/turbine-cluster.template b/templates/cluster/turbine-cluster.template similarity index 99% rename from templates/turbine-cluster.template rename to templates/cluster/turbine-cluster.template index e6e67774..efa0d095 100644 --- a/templates/turbine-cluster.template +++ b/templates/cluster/turbine-cluster.template @@ -169,6 +169,10 @@ Resources: ManagedPolicyArns: - 'arn:aws:iam::aws:policy/service-role/AWSCodeDeployRole' + + # ---------------------- Metric for Scaling Mechanism----------------------- + + # Allowing 'Metric Lambda' to access the cloudwatch metric 'Turbine' Logger: Type: AWS::IAM::Role Properties: diff --git a/templates/cluster/turbine-sg.template b/templates/cluster/turbine-sg.template new file mode 100644 index 00000000..9f9cb903 --- /dev/null +++ b/templates/cluster/turbine-sg.template @@ -0,0 +1,7 @@ +AWSTemplateFormatVersion: "2010-09-09" +Description: +Resources: + DummyServer: + Type: "AWS::EC2::Instance" + Properties: + ImageId: "" diff --git a/templates/cluster/turbine-vpn.template b/templates/cluster/turbine-vpn.template new file mode 100644 index 00000000..9f9cb903 --- /dev/null +++ b/templates/cluster/turbine-vpn.template @@ -0,0 +1,7 @@ +AWSTemplateFormatVersion: "2010-09-09" +Description: +Resources: + DummyServer: + Type: "AWS::EC2::Instance" + Properties: + 
ImageId: "" diff --git a/templates/services/turbine-codedeploy.template b/templates/services/turbine-codedeploy.template new file mode 100644 index 00000000..9f9cb903 --- /dev/null +++ b/templates/services/turbine-codedeploy.template @@ -0,0 +1,7 @@ +AWSTemplateFormatVersion: "2010-09-09" +Description: +Resources: + DummyServer: + Type: "AWS::EC2::Instance" + Properties: + ImageId: "" diff --git a/templates/turbine-scheduler.template b/templates/services/turbine-scheduler.template similarity index 100% rename from templates/turbine-scheduler.template rename to templates/services/turbine-scheduler.template diff --git a/templates/turbine-webserver.template b/templates/services/turbine-webserver.template similarity index 100% rename from templates/turbine-webserver.template rename to templates/services/turbine-webserver.template diff --git a/templates/turbine-workerset.template b/templates/services/turbine-workerset.template similarity index 100% rename from templates/turbine-workerset.template rename to templates/services/turbine-workerset.template From 4a214aa1ef06a86fc3c7278b93592cd9237b0207 Mon Sep 17 00:00:00 2001 From: olmax99 Date: Thu, 31 Oct 2019 19:12:04 +0100 Subject: [PATCH 02/12] adjust lint targets --- Makefile | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Makefile b/Makefile index c981a5d0..0ea18f1a 100644 --- a/Makefile +++ b/Makefile @@ -27,7 +27,8 @@ AWS_REGION := eu-central-1 PROJECT_NAME := eksairflow01-staging lint: - cfn-lint templates/*.template + cfn-lint templates/cluster/*.template + cfn-lint templates/services/*.template test: taskcat -c ./ci/taskcat.yaml From dd47cd8e72c0d13c1c1f0096b583e650eea381f3 Mon Sep 17 00:00:00 2001 From: olmax99 Date: Thu, 31 Oct 2019 19:20:40 +0100 Subject: [PATCH 03/12] adjust lint targets (1) --- .travis.yml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/.travis.yml b/.travis.yml index 05ba573d..a29b8334 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,6 +7,11 @@ install: git: 
submodules: false before_install: + # DELETE ME + - my_ip=`curl ipinfo.io | jq .ip`;eval my_ip=${my_ip[i]} + - my_ip="$my_ip/32" + - export BASE_IP=$my_ip + - sed -i 's/git@github.com:/https:\/\/github.com\//' .gitmodules - git submodule update --init --recursive - git checkout .gitmodules From 93b3894032414380ea3b6c542186205a78d53760 Mon Sep 17 00:00:00 2001 From: olmax99 Date: Fri, 1 Nov 2019 17:08:20 +0100 Subject: [PATCH 04/12] cfn auto deletion deploy bucket --- .travis.yml | 4 +- CHANGELOG.md | 30 + templates/cluster/turbine-cluster.template | 1211 +++++++++++--------- templates/cluster/turbine-vpn.template | 7 - 4 files changed, 712 insertions(+), 540 deletions(-) create mode 100644 CHANGELOG.md delete mode 100644 templates/cluster/turbine-vpn.template diff --git a/.travis.yml b/.travis.yml index a29b8334..c4d3b41d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -8,9 +8,7 @@ git: submodules: false before_install: # DELETE ME - - my_ip=`curl ipinfo.io | jq .ip`;eval my_ip=${my_ip[i]} - - my_ip="$my_ip/32" - - export BASE_IP=$my_ip + - export BASE_IP=0.0.0.0/0 - sed -i 's/git@github.com:/https:\/\/github.com\//' .gitmodules - git submodule update --init --recursive diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 00000000..3782e481 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,30 @@ +# CHANGELOG + +Fork: 31.09.2019 +Branch: develop +Contact: olmax99@gmail.com + +## Changes in Branch + +### 1. Comments + +Comments are good, we love comments!! + +### 2. Structure + +- More granular approach <- easier for debugging + * external services templates contains database and SQS + * Security groups have their own template +- Templates split in `cluster` and `services` + +### 3. 
Delete Buckets + +Incident: DELETE_FAILED with Logs and Deployment Bucket are not empty + +- Added custom cfn event + Lambda function for cleaning deployments bucket contents when delete-stack +- Retain Logs bucket for error investigation or dag data archiving + + + + + \ No newline at end of file diff --git a/templates/cluster/turbine-cluster.template b/templates/cluster/turbine-cluster.template index efa0d095..5f5ecdf2 100644 --- a/templates/cluster/turbine-cluster.template +++ b/templates/cluster/turbine-cluster.template @@ -129,12 +129,574 @@ Resources: DependsOn: - Meta + # ---------------------- Airflow EC2 cluster instance configuration----------------------------- + + Meta: + Type: AWS::CloudFormation::WaitConditionHandle + Properties: {} + Metadata: + AWS::CloudFormation::Init: + configSets: + default: + - filesys + - runtime + - secrets + - sysconf + - migrate + - service + - lchooks + - metahup + - cdagent + filesys: + commands: + mkdir: + test: test ! -d /airflow + command: | + mkdir /airflow + chown -R ec2-user /airflow + mount: + test: test ! 
-d /mnt/efs + command: !Sub | + mkdir /mnt/efs + fspec="${EfsFileSystem}.efs.${AWS::Region}.amazonaws.com:/" + param="nfsvers=4.1,rsize=1048576,wsize=1048576" + param="$param,hard,timeo=600,retrans=2,noresvport" + echo "$fspec /mnt/efs nfs $param,_netdev 0 0" >> /etc/fstab + mount /mnt/efs && chown -R ec2-user /mnt/efs + runtime: + packages: + yum: + git: [] + gcc: [] + gcc-c++: [] + jq: [] + lapack-devel: [] + libcurl-devel: [] + libxml2-devel: [] + libxslt-devel: [] + openssl-devel: [] + postgresql-devel: [] + python3: [] + python3-devel: [] + python3-pip: [] + python3-wheel: [] + commands: + install: + command: | + PYCURL_SSL_LIBRARY=openssl pip3 install \ + --no-cache-dir --compile --ignore-installed \ + pycurl + SLUGIFY_USES_TEXT_UNIDECODE=yes pip3 install \ + celery[sqs] \ + apache-airflow[celery,postgres,s3,crypto]==1.10.4 + secrets: + commands: + generate: + command: !Sub | + export $(cat /etc/environment | xargs) + + if [ "$TURBINE_MACHINE" != "SCHEDULER" ]; then + echo "Secret generation reserved for the scheduler" + exit 0 + fi + FERNET_KEY=$(aws ssm get-parameter \ + --name ${AWS::StackName}-fernet-key \ + --region '${AWS::Region}' \ + --query 'Parameter.Value') + if [ "$FERNET_KEY" = "" ]; then + FERNET_KEY=$(python3 -c "if True:# + from cryptography.fernet import Fernet + key = Fernet.generate_key().decode() + print(key)") + aws ssm put-parameter \ + --name ${AWS::StackName}-fernet-key \ + --region '${AWS::Region}' \ + --value $FERNET_KEY \ + --type SecureString + fi + retrieve: + command: !Sub | + while [ "$FERNET_KEY" = "" ]; do + echo "Waiting for Fernet key to be available..." 
+ sleep 1 + FERNET_KEY=$(aws ssm get-parameter \ + --name ${AWS::StackName}-fernet-key \ + --region '${AWS::Region}' \ + --with-decryption \ + --query 'Parameter.Value' \ + --output text) + done + echo "FERNET_KEY=$FERNET_KEY" >> /etc/environment + sysconf: + files: + /etc/sysconfig/airflow: + content: !Sub + - | + TURBINE_MACHINE=${!TURBINE_MACHINE} + AWS_DEFAULT_REGION=${AWS::Region} + AIRFLOW_HOME=/airflow + AIRFLOW__CORE__EXECUTOR=CeleryExecutor + AIRFLOW__CORE__FERNET_KEY=${!FERNET_KEY} + AIRFLOW__CORE__LOAD_EXAMPLES=${LoadExampleDags} + TURBINE__CORE__LOAD_DEFAULTS=${LoadDefaultConn} + AIRFLOW__CORE__SQL_ALCHEMY_CONN=${DbUri} + AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://${LogsBucket} + AIRFLOW__CORE__REMOTE_LOGGING=True + AIRFLOW__WEBSERVER__BASE_URL=http://${!HOSTNAME}:${WebserverPort} + AIRFLOW__WEBSERVER__WEB_SERVER_PORT=${WebserverPort} + AIRFLOW__CELERY__BROKER_URL=sqs:// + AIRFLOW__CELERY__DEFAULT_QUEUE=${QueueName} + AIRFLOW__CELERY__RESULT_BACKEND=db+${DbUri} + AIRFLOW__CELERY_BROKER_TRANSPORT_OPTIONS__REGION=${AWS::Region} + - QueueName: !GetAtt Tasks.QueueName + DbUri: !Join + - '' + - - postgresql:// + - !Ref DbMasterUsername + - ':' + - !Ref DbMasterPassword + - '@' + - !GetAtt Database.Endpoint.Address + - /airflow + commands: + envsubst: + command: | + export $(cat /etc/environment | xargs) + + PUBLIC=$(curl -s -o /dev/null -w "%{http_code}" \ + http://169.254.169.254/latest/meta-data/public-ipv4) + PUB_IPV4=$(ec2-metadata -v | awk '{print $2}') + LOC_IPV4=$(ec2-metadata -o | awk '{print $2}') + if [ $PUBLIC = "200" ] + then HOSTNAME=$PUB_IPV4 + else HOSTNAME=$LOC_IPV4 + fi + + echo "$(envsubst /etc/sysconfig/airflow + migrate: + commands: + migration: + command: | + export $(cat /etc/environment | xargs) + export $(cat /etc/sysconfig/airflow | xargs) + if [ "$TURBINE_MACHINE" != "SCHEDULER" ]; then + echo "Database setup reserved for the scheduler" + exit 0 + fi + if [ "$TURBINE__CORE__LOAD_DEFAULTS" == "True" ]; then + su -c 
'/usr/local/bin/airflow initdb' ec2-user + else + su -c '/usr/local/bin/airflow upgradedb' ec2-user + fi + service: + files: + /usr/bin/turbine: + mode: 755 + content: | + #!/bin/sh + if [ "$TURBINE_MACHINE" == "SCHEDULER" ] + then exec airflow scheduler + elif [ "$TURBINE_MACHINE" == "WEBSERVER" ] + then exec airflow webserver + elif [ "$TURBINE_MACHINE" == "WORKER" ] + then exec airflow worker + else echo "TURBINE_MACHINE value unknown" && exit 1 + fi + /usr/lib/tmpfiles.d/airflow.conf: + content: | + D /run/airflow 0755 ec2-user ec2-user + /usr/lib/systemd/system/airflow.service: + content: | + [Service] + EnvironmentFile=/etc/sysconfig/airflow + User=ec2-user + Group=ec2-user + ExecStart=/usr/bin/turbine + Restart=always + RestartSec=5s + KillMode=mixed + TimeoutStopSec=24h + [Install] + WantedBy=multi-user.target + /usr/lib/systemd/system/watcher.path: + content: | + [Unit] + After=airflow.service + PartOf=airflow.service + [Path] + PathModified=/etc/sysconfig/airflow + [Install] + WantedBy=airflow.service + /usr/lib/systemd/system/watcher.service: + content: | + [Service] + Type=oneshot + ExecStartPre=/usr/bin/systemctl daemon-reload + ExecStart=/usr/bin/systemctl restart airflow + # TODO: How does the HAS_DEPLOYMENT mechanism work here? + commands: + setup: + command: !Sub | + HAS_DEPLOYMENT=$(aws deploy list-deployments \ + --application-name ${AWS::StackName}-deployment-application \ + --deployment-group ${AWS::StackName}-deployment-group \ + --region ${AWS::Region} | \ + jq '.deployments | has(0)') + + systemctl enable airflow.service watcher.path + + if [ "$HAS_DEPLOYMENT" = "false" ]; then + systemctl start airflow + else + echo "Deployment pending, deferring service start" + fi + # TODO: What does lchooks stands for? How is it related to timer, beat? 
+ # Lifecycle hooks promoting graceful shutdown, delaying termination for task completion + lchooks: + files: + /usr/bin/lchkill: + mode: 755 + content: !Sub | + #!/bin/sh + INSTANCE_ID=$(ec2-metadata -i | awk '{print $2}') + TERMINATE_MESSAGE="Terminating EC2 instance <$INSTANCE_ID>" + TERMINATING=$(aws autoscaling describe-scaling-activities \ + --auto-scaling-group-name '${AWS::StackName}-scaling-group' \ + --max-items 100 \ + --region '${AWS::Region}' | \ + jq --arg TERMINATE_MESSAGE "$TERMINATE_MESSAGE" \ + '.Activities[] + | select(.Description + | test($TERMINATE_MESSAGE)) != []') + + if [ "$TERMINATING" = "true" ]; then + systemctl stop airflow + fi + /usr/lib/systemd/system/lchkill.timer: + content: | + [Timer] + OnCalendar=*:0/1 + [Install] + WantedBy=airflow.service + /usr/lib/systemd/system/lchkill.service: + content: | + [Service] + Type=oneshot + ExecStart=/usr/bin/lchkill + # WORKER ONLY <- lchbeat, lchbeat.timer + /usr/bin/lchbeat: + mode: 755 + content: !Sub | + #!/bin/sh + SERVICE_STATUS=$(systemctl is-active airflow) + + if [ "$SERVICE_STATUS" = "deactivating" ]; then + aws autoscaling record-lifecycle-action-heartbeat \ + --instance-id $(ec2-metadata -i | awk '{print $2}') \ + --lifecycle-hook-name '${AWS::StackName}-scaling-lfhook' \ + --auto-scaling-group-name '${AWS::StackName}-scaling-group' \ + --region '${AWS::Region}' + fi + /usr/lib/systemd/system/lchbeat.timer: + content: | + [Timer] + OnCalendar=*:0/1 + [Install] + WantedBy=airflow.service + /usr/lib/systemd/system/lchbeat.service: + content: | + [Service] + Type=oneshot + ExecStart=/usr/bin/lchbeat + commands: + setup: + command: | + if [ "$TURBINE_MACHINE" = "WORKER" ]; then + systemctl enable lchkill.timer lchbeat.timer + fi + metahup: + files: + /etc/cfn/cfn-hup.conf: + content: !Sub | + [main] + stack=${AWS::StackId} + region=${AWS::Region} + role=${AirflowRole} + interval=1 + /etc/cfn/hooks.d/cfn-auto-reloader.conf: + content: !Sub | + [cfn-auto-reloader-hook] + 
triggers=post.update + path=Resources.Meta.Metadata.AWS::CloudFormation::Init + action=/opt/aws/bin/cfn-init -v \ + --region ${AWS::Region} \ + --role ${AirflowRole} \ + --stack ${AWS::StackName} \ + --resource Meta + runas=root + /lib/systemd/system/cfn-hup.service: + content: | + [Service] + ExecStart=/opt/aws/bin/cfn-hup + Restart=always + [Install] + WantedBy=multi-user.target + commands: + setup: + command: | + systemctl enable cfn-hup.service + systemctl start cfn-hup.service + cdagent: + packages: + yum: + ruby: [] + wget: [] + commands: + install: + command: !Sub | + wget https://aws-codedeploy-${AWS::Region}.s3.amazonaws.com/latest/install + chmod +x ./install + ./install auto + + # IAM Profile and role for all EC2 cluster instances + AirflowProfile: + Type: AWS::IAM::InstanceProfile + Properties: + Roles: + - !Ref AirflowRole + + AirflowRole: + Type: AWS::IAM::Role + Properties: + AssumeRolePolicyDocument: + Version: 2012-10-17 + Statement: + - Effect: Allow + Principal: + Service: + - ec2.amazonaws.com + Action: + - sts:AssumeRole + ManagedPolicyArns: + - arn:aws:iam::aws:policy/service-role/AmazonEC2RoleforSSM + Policies: + - PolicyName: !Sub ${AWS::StackName}-cfn-describe + PolicyDocument: + Version: 2012-10-17 + Statement: + - Effect: Allow + Action: + - cloudformation:DescribeStackResource + Resource: !Sub arn:aws:cloudformation:${AWS::Region}:${AWS::AccountId}:stack/${AWS::StackName}/* + - PolicyName: !Sub ${AWS::StackName}-ssm-rw-policy + PolicyDocument: + Version: 2012-10-17 + Statement: + - Effect: Allow + Action: + - ssm:GetParameter + - ssm:PutParameter + Resource: + - !Sub arn:aws:ssm:*:${AWS::AccountId}:*/* + - PolicyName: !Sub ${AWS::StackName}-queue-rw-policy + PolicyDocument: + Version: 2012-10-17 + Statement: + - Effect: Allow + Action: + - sqs:ListQueues + Resource: + - !Sub arn:aws:sqs:*:${AWS::AccountId}:* + - Effect: Allow + Action: + - sqs:ChangeMessageVisibility + - sqs:DeleteMessage + - sqs:GetQueueAttributes + - sqs:GetQueueUrl + 
- sqs:ReceiveMessage + - sqs:SendMessage + Resource: !Sub + - arn:aws:sqs:*:${AWS::AccountId}:${queue} + - queue: !GetAtt + - Tasks + - QueueName + - PolicyName: !Sub ${AWS::StackName}-deployments-r-policy + PolicyDocument: + Version: 2012-10-17 + Statement: + - Effect: Allow + Action: + - s3:Get* + - s3:List* + Resource: !Sub arn:aws:s3:::${DeploymentsBucket}/* + - Effect: Allow + Action: + - codedeploy:List* + Resource: !Sub arn:aws:codedeploy:*:${AWS::AccountId}:deploymentgroup:* + - PolicyName: !Sub ${AWS::StackName}-logs-rw-policy + PolicyDocument: + Version: 2012-10-17 + Statement: + - Effect: Allow + Action: + - s3:Get* + - s3:Put* + Resource: !Sub arn:aws:s3:::${LogsBucket}/* + - PolicyName: !Sub ${AWS::StackName}-lifecycle-heartbeat + PolicyDocument: + Version: 2012-10-17 + Statement: + - Effect: Allow + Action: + - autoscaling:RecordLifecycleActionHeartbeat + - autoscaling:CompleteLifecycleAction + Resource: !Sub arn:aws:autoscaling:*:${AWS::AccountId}:autoScalingGroup:*:* + - Effect: Allow + Action: + - autoscaling:DescribeScalingActivities + Resource: '*' + + # --------------------- Buckets containing Logs and Deployment packages--------------------- + LogsBucket: Type: AWS::S3::Bucket + DeletionPolicy: Retain DeploymentsBucket: Type: AWS::S3::Bucket + # Custom cloudformation hooks that signal to target: Delete, Update, Create + CleanUpDeployments: + DependsOn: cleanupDeployBucket + Type: Custom::cleanupdeploy + Properties: + ServiceToken: + Fn::GetAtt: + - "cleanupDeployBucket" + - "Arn" + BucketName: !Ref DeploymentsBucket + + # Removes all objects from LogsBucket and DeploymentsBucket upon cfn Delete + # TODO: Use SQS for two events triggering the same lambda function + cleanupDeployBucket: + DependsOn: DeploymentsBucket + Type: "AWS::Lambda::Function" + Properties: + Code: + ZipFile: !Sub | + import boto3 + import json + import logging + # module cfnresponse does not exist for python3.7, use requests instead + # import cfnresponse + # Will yield a 
warning, but is currently the only solution for python3.7 + # since inline code cannot import third party packages + from botocore.vendored import requests + from botocore.exceptions import ClientError + + logger = logging.getLogger(__name__) + + def setup(level='DEBUG', boto_level=None, **kwargs): + logging.root.setLevel(level) + + if not boto_level: + boto_level = level + + logging.getLogger('boto').setLevel(boto_level) + logging.getLogger('boto3').setLevel(boto_level) + logging.getLogger('botocore').setLevel(boto_level) + logging.getLogger('urllib3').setLevel(boto_level) + + try: + setup('DEBUG', formatter_cls=None, boto_level='ERROR') + except Exception as e: + logger.error(e, exc_info=True) + + def clean_up_bucket(target_bucket): + logger.info(f"Clean content of bucket {target_bucket}.") + s3_resource = boto3.resource('s3') + try: + bucket_response = s3_resource.Bucket(target_bucket).load() + except ClientError as e: + logger.info(f"s3:://{target_bucket} not found. {e}") + return + else: + bucket_obj = s3_resource.Bucket(target_bucket) + bucket_obj.objects.all().delete() + + def handler(event, context): + # helper(event, context) + + response_data = {} + # NOTE: The status value sent by the custom resource provider must be either SUCCESS or FAILED!! + try: + bucket = event['ResourceProperties']['BucketName'] + if event['RequestType'] == 'Delete': + clean_up_bucket(bucket) + if event['RequestType'] == 'Update': + logger.info(f"custom::cleanupbucket update. Target bucket: {bucket}") + if event['RequestType'] == 'Create': + logger.info(f"custom::cleanupbucket create. 
Target bucket: {bucket}") + send_response_cfn(event, context, "SUCCESS") + except Exception as e: + logger.info(str(e)) + send_response_cfn(event, context, "FAILED") + + def send_response_cfn(event, context, response_status): + response_body = {'Status': response_status, + 'Reason': 'Log stream name: ' + context.log_stream_name, + 'PhysicalResourceId': context.log_stream_name, + 'StackId': event['StackId'], + 'RequestId': event['RequestId'], + 'LogicalResourceId': event['LogicalResourceId'], + 'Data': json.loads("{}")} + # Sends the response signal to the respective custom resource request + requests.put(event['ResponseURL'], data=json.dumps(response_body)) + Description: cleanup Bucket on Delete Lambda function. + Handler: index.handler + Role: !GetAtt CleanupS3ExecutionRole.Arn + Runtime: python3.7 + Timeout: 100 + + CleanupS3ExecutionRole: + Type: AWS::IAM::Role + Properties: + AssumeRolePolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Principal: + Service: + - lambda.amazonaws.com + Action: + - sts:AssumeRole + Path: "/" + + CleanupS3ExecutionPolicy: + DependsOn: + - CleanupS3ExecutionRole + Type: AWS::IAM::Policy + Properties: + PolicyName: DeleteS3BucketLogsRolePolicy + Roles: + - Ref: CleanupS3ExecutionRole + PolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - logs:* + Resource: + - arn:aws:logs:*:*:* + - Effect: Allow + Action: + - s3:* + Resource: + - "*" + + # TODO: Move to separate template if possible + # --------------------- CodeDeploy resources----------------------------------------------- + CodeDeployApplication: Type: AWS::CodeDeploy::Application Properties: @@ -169,7 +731,6 @@ Resources: ManagedPolicyArns: - 'arn:aws:iam::aws:policy/service-role/AWSCodeDeployRole' - # ---------------------- Metric for Scaling Mechanism----------------------- # Allowing 'Metric Lambda' to access the cloudwatch metric 'Turbine' @@ -287,547 +848,137 @@ Resources: else l = (ANOMV > 0) ? 
1.0 : 0.0; await cw.putMetricData({ - Namespace: 'Turbine', - MetricData: [{ MetricName: 'WorkerLoad', - Dimensions: [ { Name: 'StackName', - Value: '${AWS::StackName}' }], - Timestamp: prev, - Value: (l > 0) ? l : 0, - Unit: 'None' }], - }).promise(); - }; - - asgName: !Sub '${AWS::StackName}-scaling-group' - queueName: !GetAtt - - Tasks - - QueueName - Role: !GetAtt - - Logger - - Arn - Metadata: - 'AWS::CloudFormation::Designer': - id: 94c385fa-fb13-42cc-a292-7e68c10956f3 - - Timer: - Type: AWS::Events::Rule - Properties: - ScheduleExpression: rate(1 minute) - State: ENABLED - Targets: - - Arn: !GetAtt - - Metric - - Arn - Id: TargetFunction - - Invoke: - Type: AWS::Lambda::Permission - Properties: - FunctionName: !Ref Metric - Action: lambda:InvokeFunction - Principal: events.amazonaws.com - SourceArn: !GetAtt - - Timer - - Arn - - EfsFileSystem: - Type: AWS::EFS::FileSystem - Properties: - FileSystemTags: - - Key: Name - Value: !Sub ${AWS::StackName}-filesystem - - EfsMountTarget1A: - Type: AWS::EFS::MountTarget - Properties: - FileSystemId: !Ref EfsFileSystem - SubnetId: !Ref PrivateSubnet1AID - SecurityGroups: - - !Ref Access - - EfsMountTarget2A: - Type: AWS::EFS::MountTarget - Properties: - FileSystemId: !Ref EfsFileSystem - SubnetId: !Ref PrivateSubnet2AID - SecurityGroups: - - !Ref Access - - DBs: - Type: AWS::RDS::DBSubnetGroup - Properties: - DBSubnetGroupDescription: Associates the Database Instances with the selected VPC Subnets. 
- SubnetIds: - - !Ref PrivateSubnet1AID - - !Ref PrivateSubnet2AID - - Database: - Type: AWS::RDS::DBInstance - Properties: - AllocatedStorage: '20' - DBInstanceClass: db.t2.micro - DBName: airflow - Engine: postgres - MasterUsername: !Ref DbMasterUsername - MasterUserPassword: !Ref DbMasterPassword - Tags: - - Key: Name - Value: !Sub ${AWS::StackName}-database - DBSubnetGroupName: !Ref DBs - VPCSecurityGroups: - - !Ref Connection - - Tasks: - Type: AWS::SQS::Queue - Properties: {} - - Access: - Type: AWS::EC2::SecurityGroup - Properties: - GroupDescription: >- - Security Rules with permissions for the shared filesystem across Airflow - instances. - SecurityGroupIngress: - - CidrIp: !Ref VPCCIDR - IpProtocol: TCP - FromPort: 2049 - ToPort: 2049 - VpcId: !Ref VPCID - Tags: - - Key: Name - Value: !Sub '${AWS::StackName}-access' - - Connection: - Type: AWS::EC2::SecurityGroup - Properties: - GroupDescription: Security Rules with permissions for database connections for Airflow. - SecurityGroupIngress: - - CidrIp: !Ref VPCCIDR - IpProtocol: TCP - FromPort: 5432 - ToPort: 5432 - VpcId: !Ref VPCID - Tags: - - Key: Name - Value: !Sub ${AWS::StackName}-connection + Namespace: 'Turbine', + MetricData: [{ MetricName: 'WorkerLoad', + Dimensions: [ { Name: 'StackName', + Value: '${AWS::StackName}' }], + Timestamp: prev, + Value: (l > 0) ? 
l : 0, + Unit: 'None' }], + }).promise(); + }; + - asgName: !Sub '${AWS::StackName}-scaling-group' + queueName: !GetAtt + - Tasks + - QueueName + Role: !GetAtt + - Logger + - Arn + Metadata: + 'AWS::CloudFormation::Designer': + id: 94c385fa-fb13-42cc-a292-7e68c10956f3 - AirflowRole: - Type: AWS::IAM::Role + Timer: + Type: AWS::Events::Rule Properties: - AssumeRolePolicyDocument: - Version: 2012-10-17 - Statement: - - Effect: Allow - Principal: - Service: - - ec2.amazonaws.com - Action: - - sts:AssumeRole - ManagedPolicyArns: - - arn:aws:iam::aws:policy/service-role/AmazonEC2RoleforSSM - Policies: - - PolicyName: !Sub ${AWS::StackName}-cfn-describe - PolicyDocument: - Version: 2012-10-17 - Statement: - - Effect: Allow - Action: - - cloudformation:DescribeStackResource - Resource: !Sub arn:aws:cloudformation:${AWS::Region}:${AWS::AccountId}:stack/${AWS::StackName}/* - - PolicyName: !Sub ${AWS::StackName}-ssm-rw-policy - PolicyDocument: - Version: 2012-10-17 - Statement: - - Effect: Allow - Action: - - ssm:GetParameter - - ssm:PutParameter - Resource: - - !Sub arn:aws:ssm:*:${AWS::AccountId}:*/* - - PolicyName: !Sub ${AWS::StackName}-queue-rw-policy - PolicyDocument: - Version: 2012-10-17 - Statement: - - Effect: Allow - Action: - - sqs:ListQueues - Resource: - - !Sub arn:aws:sqs:*:${AWS::AccountId}:* - - Effect: Allow - Action: - - sqs:ChangeMessageVisibility - - sqs:DeleteMessage - - sqs:GetQueueAttributes - - sqs:GetQueueUrl - - sqs:ReceiveMessage - - sqs:SendMessage - Resource: !Sub - - arn:aws:sqs:*:${AWS::AccountId}:${queue} - - queue: !GetAtt - - Tasks - - QueueName - - PolicyName: !Sub ${AWS::StackName}-deployments-r-policy - PolicyDocument: - Version: 2012-10-17 - Statement: - - Effect: Allow - Action: - - s3:Get* - - s3:List* - Resource: !Sub arn:aws:s3:::${DeploymentsBucket}/* - - Effect: Allow - Action: - - codedeploy:List* - Resource: !Sub arn:aws:codedeploy:*:${AWS::AccountId}:deploymentgroup:* - - PolicyName: !Sub ${AWS::StackName}-logs-rw-policy - 
PolicyDocument: - Version: 2012-10-17 - Statement: - - Effect: Allow - Action: - - s3:Get* - - s3:Put* - Resource: !Sub arn:aws:s3:::${LogsBucket}/* - - PolicyName: !Sub ${AWS::StackName}-lifecycle-heartbeat - PolicyDocument: - Version: 2012-10-17 - Statement: - - Effect: Allow - Action: - - autoscaling:RecordLifecycleActionHeartbeat - - autoscaling:CompleteLifecycleAction - Resource: !Sub arn:aws:autoscaling:*:${AWS::AccountId}:autoScalingGroup:*:* - - Effect: Allow - Action: - - autoscaling:DescribeScalingActivities - Resource: '*' + ScheduleExpression: rate(1 minute) + State: ENABLED + Targets: + - Arn: !GetAtt + - Metric + - Arn + Id: TargetFunction - AirflowProfile: - Type: AWS::IAM::InstanceProfile + Invoke: + Type: AWS::Lambda::Permission Properties: - Roles: - - !Ref AirflowRole + FunctionName: !Ref Metric + Action: lambda:InvokeFunction + Principal: events.amazonaws.com + SourceArn: !GetAtt + - Timer + - Arn - Meta: - Type: AWS::CloudFormation::WaitConditionHandle - Properties: {} - Metadata: - AWS::CloudFormation::Init: - configSets: - default: - - filesys - - runtime - - secrets - - sysconf - - migrate - - service - - lchooks - - metahup - - cdagent - filesys: - commands: - mkdir: - test: test ! -d /airflow - command: | - mkdir /airflow - chown -R ec2-user /airflow - mount: - test: test ! 
-d /mnt/efs - command: !Sub | - mkdir /mnt/efs - fspec="${EfsFileSystem}.efs.${AWS::Region}.amazonaws.com:/" - param="nfsvers=4.1,rsize=1048576,wsize=1048576" - param="$param,hard,timeo=600,retrans=2,noresvport" - echo "$fspec /mnt/efs nfs $param,_netdev 0 0" >> /etc/fstab - mount /mnt/efs && chown -R ec2-user /mnt/efs - runtime: - packages: - yum: - git: [] - gcc: [] - gcc-c++: [] - jq: [] - lapack-devel: [] - libcurl-devel: [] - libxml2-devel: [] - libxslt-devel: [] - openssl-devel: [] - postgresql-devel: [] - python3: [] - python3-devel: [] - python3-pip: [] - python3-wheel: [] - commands: - install: - command: | - PYCURL_SSL_LIBRARY=openssl pip3 install \ - --no-cache-dir --compile --ignore-installed \ - pycurl - SLUGIFY_USES_TEXT_UNIDECODE=yes pip3 install \ - celery[sqs] \ - apache-airflow[celery,postgres,s3,crypto]==1.10.4 - secrets: - commands: - generate: - command: !Sub | - export $(cat /etc/environment | xargs) + # ---------------------------- Shared Storage Volumes (EFS)------------------------------ - if [ "$TURBINE_MACHINE" != "SCHEDULER" ]; then - echo "Secret generation reserved for the scheduler" - exit 0 - fi - FERNET_KEY=$(aws ssm get-parameter \ - --name ${AWS::StackName}-fernet-key \ - --region '${AWS::Region}' \ - --query 'Parameter.Value') - if [ "$FERNET_KEY" = "" ]; then - FERNET_KEY=$(python3 -c "if True:# - from cryptography.fernet import Fernet - key = Fernet.generate_key().decode() - print(key)") - aws ssm put-parameter \ - --name ${AWS::StackName}-fernet-key \ - --region '${AWS::Region}' \ - --value $FERNET_KEY \ - --type SecureString - fi - retrieve: - command: !Sub | - while [ "$FERNET_KEY" = "" ]; do - echo "Waiting for Fernet key to be available..." 
- sleep 1 - FERNET_KEY=$(aws ssm get-parameter \ - --name ${AWS::StackName}-fernet-key \ - --region '${AWS::Region}' \ - --with-decryption \ - --query 'Parameter.Value' \ - --output text) - done - echo "FERNET_KEY=$FERNET_KEY" >> /etc/environment - sysconf: - files: - /etc/sysconfig/airflow: - content: !Sub - - | - TURBINE_MACHINE=${!TURBINE_MACHINE} - AWS_DEFAULT_REGION=${AWS::Region} - AIRFLOW_HOME=/airflow - AIRFLOW__CORE__EXECUTOR=CeleryExecutor - AIRFLOW__CORE__FERNET_KEY=${!FERNET_KEY} - AIRFLOW__CORE__LOAD_EXAMPLES=${LoadExampleDags} - TURBINE__CORE__LOAD_DEFAULTS=${LoadDefaultConn} - AIRFLOW__CORE__SQL_ALCHEMY_CONN=${DbUri} - AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://${LogsBucket} - AIRFLOW__CORE__REMOTE_LOGGING=True - AIRFLOW__WEBSERVER__BASE_URL=http://${!HOSTNAME}:${WebserverPort} - AIRFLOW__WEBSERVER__WEB_SERVER_PORT=${WebserverPort} - AIRFLOW__CELERY__BROKER_URL=sqs:// - AIRFLOW__CELERY__DEFAULT_QUEUE=${QueueName} - AIRFLOW__CELERY__RESULT_BACKEND=db+${DbUri} - AIRFLOW__CELERY_BROKER_TRANSPORT_OPTIONS__REGION=${AWS::Region} - - QueueName: !GetAtt Tasks.QueueName - DbUri: !Join - - '' - - - postgresql:// - - !Ref DbMasterUsername - - ':' - - !Ref DbMasterPassword - - '@' - - !GetAtt Database.Endpoint.Address - - /airflow - commands: - envsubst: - command: | - export $(cat /etc/environment | xargs) + EfsFileSystem: + Type: AWS::EFS::FileSystem + Properties: + FileSystemTags: + - Key: Name + Value: !Sub ${AWS::StackName}-filesystem - PUBLIC=$(curl -s -o /dev/null -w "%{http_code}" \ - http://169.254.169.254/latest/meta-data/public-ipv4) - PUB_IPV4=$(ec2-metadata -v | awk '{print $2}') - LOC_IPV4=$(ec2-metadata -o | awk '{print $2}') - if [ $PUBLIC = "200" ] - then HOSTNAME=$PUB_IPV4 - else HOSTNAME=$LOC_IPV4 - fi + EfsMountTarget1A: + Type: AWS::EFS::MountTarget + Properties: + FileSystemId: !Ref EfsFileSystem + SubnetId: !Ref PrivateSubnet1AID + SecurityGroups: + - !Ref Access - echo "$(envsubst /etc/sysconfig/airflow - migrate: - commands: - migration: 
- command: | - export $(cat /etc/environment | xargs) - export $(cat /etc/sysconfig/airflow | xargs) - if [ "$TURBINE_MACHINE" != "SCHEDULER" ]; then - echo "Database setup reserved for the scheduler" - exit 0 - fi - if [ "$TURBINE__CORE__LOAD_DEFAULTS" == "True" ]; then - su -c '/usr/local/bin/airflow initdb' ec2-user - else - su -c '/usr/local/bin/airflow upgradedb' ec2-user - fi - service: - files: - /usr/bin/turbine: - mode: 755 - content: | - #!/bin/sh - if [ "$TURBINE_MACHINE" == "SCHEDULER" ] - then exec airflow scheduler - elif [ "$TURBINE_MACHINE" == "WEBSERVER" ] - then exec airflow webserver - elif [ "$TURBINE_MACHINE" == "WORKER" ] - then exec airflow worker - else echo "TURBINE_MACHINE value unknown" && exit 1 - fi - /usr/lib/tmpfiles.d/airflow.conf: - content: | - D /run/airflow 0755 ec2-user ec2-user - /usr/lib/systemd/system/airflow.service: - content: | - [Service] - EnvironmentFile=/etc/sysconfig/airflow - User=ec2-user - Group=ec2-user - ExecStart=/usr/bin/turbine - Restart=always - RestartSec=5s - KillMode=mixed - TimeoutStopSec=24h - [Install] - WantedBy=multi-user.target - /usr/lib/systemd/system/watcher.path: - content: | - [Unit] - After=airflow.service - PartOf=airflow.service - [Path] - PathModified=/etc/sysconfig/airflow - [Install] - WantedBy=airflow.service - /usr/lib/systemd/system/watcher.service: - content: | - [Service] - Type=oneshot - ExecStartPre=/usr/bin/systemctl daemon-reload - ExecStart=/usr/bin/systemctl restart airflow - commands: - setup: - command: !Sub | - HAS_DEPLOYMENT=$(aws deploy list-deployments \ - --application-name ${AWS::StackName}-deployment-application \ - --deployment-group ${AWS::StackName}-deployment-group \ - --region ${AWS::Region} | \ - jq '.deployments | has(0)') + EfsMountTarget2A: + Type: AWS::EFS::MountTarget + Properties: + FileSystemId: !Ref EfsFileSystem + SubnetId: !Ref PrivateSubnet2AID + SecurityGroups: + - !Ref Access - systemctl enable airflow.service watcher.path + # TODO: Move to Services 
as Database or External template + # ---------------------------- Airflow Postgres---------------------------------------- - if [ "$HAS_DEPLOYMENT" = "false" ]; then - systemctl start airflow - else - echo "Deployment pending, deferring service start" - fi - lchooks: - files: - /usr/bin/lchkill: - mode: 755 - content: !Sub | - #!/bin/sh - INSTANCE_ID=$(ec2-metadata -i | awk '{print $2}') - TERMINATE_MESSAGE="Terminating EC2 instance <$INSTANCE_ID>" - TERMINATING=$(aws autoscaling describe-scaling-activities \ - --auto-scaling-group-name '${AWS::StackName}-scaling-group' \ - --max-items 100 \ - --region '${AWS::Region}' | \ - jq --arg TERMINATE_MESSAGE "$TERMINATE_MESSAGE" \ - '.Activities[] - | select(.Description - | test($TERMINATE_MESSAGE)) != []') + DBs: + Type: AWS::RDS::DBSubnetGroup + Properties: + DBSubnetGroupDescription: Associates the Database Instances with the selected VPC Subnets. + SubnetIds: + - !Ref PrivateSubnet1AID + - !Ref PrivateSubnet2AID - if [ "$TERMINATING" = "true" ]; then - systemctl stop airflow - fi - /usr/lib/systemd/system/lchkill.timer: - content: | - [Timer] - OnCalendar=*:0/1 - [Install] - WantedBy=airflow.service - /usr/lib/systemd/system/lchkill.service: - content: | - [Service] - Type=oneshot - ExecStart=/usr/bin/lchkill - /usr/bin/lchbeat: - mode: 755 - content: !Sub | - #!/bin/sh - SERVICE_STATUS=$(systemctl is-active airflow) + Database: + Type: AWS::RDS::DBInstance + Properties: + AllocatedStorage: '20' + DBInstanceClass: db.t2.micro + DBName: airflow + Engine: postgres + MasterUsername: !Ref DbMasterUsername + MasterUserPassword: !Ref DbMasterPassword + Tags: + - Key: Name + Value: !Sub ${AWS::StackName}-database + DBSubnetGroupName: !Ref DBs + VPCSecurityGroups: + - !Ref Connection - if [ "$SERVICE_STATUS" = "deactivating" ]; then - aws autoscaling record-lifecycle-action-heartbeat \ - --instance-id $(ec2-metadata -i | awk '{print $2}') \ - --lifecycle-hook-name '${AWS::StackName}-scaling-lfhook' \ - 
--auto-scaling-group-name '${AWS::StackName}-scaling-group' \ - --region '${AWS::Region}' - fi - /usr/lib/systemd/system/lchbeat.timer: - content: | - [Timer] - OnCalendar=*:0/1 - [Install] - WantedBy=airflow.service - /usr/lib/systemd/system/lchbeat.service: - content: | - [Service] - Type=oneshot - ExecStart=/usr/bin/lchbeat - commands: - setup: - command: | - if [ "$TURBINE_MACHINE" = "WORKER" ]; then - systemctl enable lchkill.timer lchbeat.timer - fi - metahup: - files: - /etc/cfn/cfn-hup.conf: - content: !Sub | - [main] - stack=${AWS::StackId} - region=${AWS::Region} - role=${AirflowRole} - interval=1 - /etc/cfn/hooks.d/cfn-auto-reloader.conf: - content: !Sub | - [cfn-auto-reloader-hook] - triggers=post.update - path=Resources.Meta.Metadata.AWS::CloudFormation::Init - action=/opt/aws/bin/cfn-init -v \ - --region ${AWS::Region} \ - --role ${AirflowRole} \ - --stack ${AWS::StackName} \ - --resource Meta - runas=root - /lib/systemd/system/cfn-hup.service: - content: | - [Service] - ExecStart=/opt/aws/bin/cfn-hup - Restart=always - [Install] - WantedBy=multi-user.target - commands: - setup: - command: | - systemctl enable cfn-hup.service - systemctl start cfn-hup.service - cdagent: - packages: - yum: - ruby: [] - wget: [] - commands: - install: - command: !Sub | - wget https://aws-codedeploy-${AWS::Region}.s3.amazonaws.com/latest/install - chmod +x ./install - ./install auto + # TODO: Move to Database/External Services template + # ---------------------------- SQS Queuing--------------------------------------------- + Tasks: + Type: AWS::SQS::Queue + Properties: {} + + # TODO: Move to SG template + # ---------------------------- Security Groups----------------------------------------- + + Access: + Type: AWS::EC2::SecurityGroup + Properties: + GroupDescription: >- + Security Rules with permissions for the shared filesystem across Airflow + instances. 
+ SecurityGroupIngress: + - CidrIp: !Ref VPCCIDR + IpProtocol: TCP + FromPort: 2049 + ToPort: 2049 + VpcId: !Ref VPCID + Tags: + - Key: Name + Value: !Sub '${AWS::StackName}-access' + + Connection: + Type: AWS::EC2::SecurityGroup + Properties: + GroupDescription: Security Rules with permissions for database connections for Airflow. + SecurityGroupIngress: + - CidrIp: !Ref VPCCIDR + IpProtocol: TCP + FromPort: 5432 + ToPort: 5432 + VpcId: !Ref VPCID + Tags: + - Key: Name + Value: !Sub ${AWS::StackName}-connection Outputs: DeploymentsBucket: diff --git a/templates/cluster/turbine-vpn.template b/templates/cluster/turbine-vpn.template deleted file mode 100644 index 9f9cb903..00000000 --- a/templates/cluster/turbine-vpn.template +++ /dev/null @@ -1,7 +0,0 @@ -AWSTemplateFormatVersion: "2010-09-09" -Description: -Resources: - DummyServer: - Type: "AWS::EC2::Instance" - Properties: - ImageId: "" From fb3afd3b1585958168062630509083945402fea4 Mon Sep 17 00:00:00 2001 From: olmax99 Date: Fri, 1 Nov 2019 18:24:43 +0100 Subject: [PATCH 05/12] add SGStack --- CHANGELOG.md | 13 ++-- templates/cluster/turbine-cluster.template | 71 +++++++++++----------- templates/cluster/turbine-sg.template | 51 ++++++++++++++-- templates/turbine-master.template | 14 +++++ 4 files changed, 104 insertions(+), 45 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3782e481..ff2d6cdc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,16 +12,19 @@ Comments are good, we love comments!! ### 2. Structure -- More granular approach <- easier for debugging +- More granular approach <- easier for maintaining and debugging * external services templates contains database and SQS - * Security groups have their own template + * All security groups have a dedicated template - Templates split in `cluster` and `services` -### 3. Delete Buckets +### 3. 
Log and Deployment Bucket -Incident: DELETE_FAILED with Logs and Deployment Bucket are not empty +Incident: After dag run, cloudformation DELETE_FAILED with "Logs and Deployment +Bucket are not empty" -- Added custom cfn event + Lambda function for cleaning deployments bucket contents when delete-stack +- Private Buckets by default +- Added custom Cfn event + Lambda function for cleaning deployments bucket +content when delete-stack - Retain Logs bucket for error investigation or dag data archiving diff --git a/templates/cluster/turbine-cluster.template b/templates/cluster/turbine-cluster.template index 5f5ecdf2..95d08e51 100644 --- a/templates/cluster/turbine-cluster.template +++ b/templates/cluster/turbine-cluster.template @@ -51,6 +51,10 @@ Parameters: Type: String QSS3KeyPrefix: Type: String + EfsAccessSG: + Type: String + DbConnectionSG: + Type: String Resources: @@ -560,10 +564,36 @@ Resources: LogsBucket: Type: AWS::S3::Bucket + Properties: + AccessControl: Private + BucketEncryption: + ServerSideEncryptionConfiguration: + - ServerSideEncryptionByDefault: + SSEAlgorithm: AES256 + PublicAccessBlockConfiguration: + BlockPublicAcls: True + BlockPublicPolicy: True + IgnorePublicAcls: True + RestrictPublicBuckets: True + VersioningConfiguration: + Status: Enabled DeletionPolicy: Retain DeploymentsBucket: Type: AWS::S3::Bucket + Properties: + AccessControl: Private + BucketEncryption: + ServerSideEncryptionConfiguration: + - ServerSideEncryptionByDefault: + SSEAlgorithm: AES256 + PublicAccessBlockConfiguration: + BlockPublicAcls: True + BlockPublicPolicy: True + IgnorePublicAcls: True + RestrictPublicBuckets: True + VersioningConfiguration: + Status: Enabled # Custom cloudformation hooks that signal to target: Delete, Update, Create CleanUpDeployments: @@ -891,6 +921,7 @@ Resources: # ---------------------------- Shared Storage Volumes (EFS)------------------------------ + # TODO: Get EfsAccessSG from turbine-sg.template EfsFileSystem: Type: AWS::EFS::FileSystem 
Properties: @@ -904,7 +935,7 @@ Resources: FileSystemId: !Ref EfsFileSystem SubnetId: !Ref PrivateSubnet1AID SecurityGroups: - - !Ref Access + - !Ref EfsAccessSG EfsMountTarget2A: Type: AWS::EFS::MountTarget @@ -912,7 +943,7 @@ Resources: FileSystemId: !Ref EfsFileSystem SubnetId: !Ref PrivateSubnet2AID SecurityGroups: - - !Ref Access + - !Ref EfsAccessSG # TODO: Move to Services as Database or External template # ---------------------------- Airflow Postgres---------------------------------------- @@ -925,6 +956,7 @@ Resources: - !Ref PrivateSubnet1AID - !Ref PrivateSubnet2AID + # TODO: Get SG from turbine-sg.template Database: Type: AWS::RDS::DBInstance Properties: @@ -939,7 +971,7 @@ Resources: Value: !Sub ${AWS::StackName}-database DBSubnetGroupName: !Ref DBs VPCSecurityGroups: - - !Ref Connection + - !Ref DbConnectionSG # TODO: Move to Database/External Services template # ---------------------------- SQS Queuing--------------------------------------------- @@ -947,39 +979,6 @@ Resources: Type: AWS::SQS::Queue Properties: {} - # TODO: Move to SG template - # ---------------------------- Security Groups----------------------------------------- - - Access: - Type: AWS::EC2::SecurityGroup - Properties: - GroupDescription: >- - Security Rules with permissions for the shared filesystem across Airflow - instances. - SecurityGroupIngress: - - CidrIp: !Ref VPCCIDR - IpProtocol: TCP - FromPort: 2049 - ToPort: 2049 - VpcId: !Ref VPCID - Tags: - - Key: Name - Value: !Sub '${AWS::StackName}-access' - - Connection: - Type: AWS::EC2::SecurityGroup - Properties: - GroupDescription: Security Rules with permissions for database connections for Airflow. 
- SecurityGroupIngress: - - CidrIp: !Ref VPCCIDR - IpProtocol: TCP - FromPort: 5432 - ToPort: 5432 - VpcId: !Ref VPCID - Tags: - - Key: Name - Value: !Sub ${AWS::StackName}-connection - Outputs: DeploymentsBucket: Value: !Ref DeploymentsBucket diff --git a/templates/cluster/turbine-sg.template b/templates/cluster/turbine-sg.template index 9f9cb903..8c4984c7 100644 --- a/templates/cluster/turbine-sg.template +++ b/templates/cluster/turbine-sg.template @@ -1,7 +1,50 @@ AWSTemplateFormatVersion: "2010-09-09" -Description: + +Description: >- + All Security Groups for Turbine. + +Parameters: + VPCID: + Type: AWS::EC2::VPC::Id + VPCCIDR: + Type: String + AllowedPattern: >- + ^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])(\/([0-9]|[1-2][0-9]|3[0-2]))$ + Resources: - DummyServer: - Type: "AWS::EC2::Instance" + + EfsAccessSG: + Type: AWS::EC2::SecurityGroup Properties: - ImageId: "" + GroupDescription: >- + Security Rules with permissions for the shared filesystem across Airflow + instances. + SecurityGroupIngress: + - CidrIp: !Ref VPCCIDR + IpProtocol: TCP + FromPort: 2049 + ToPort: 2049 + VpcId: !Ref VPCID + Tags: + - Key: Name + Value: !Sub '${AWS::StackName}-access' + + DbConnectionSG: + Type: AWS::EC2::SecurityGroup + Properties: + GroupDescription: Security Rules with permissions for database connections for Airflow. 
+ SecurityGroupIngress: + - CidrIp: !Ref VPCCIDR + IpProtocol: TCP + FromPort: 5432 + ToPort: 5432 + VpcId: !Ref VPCID + Tags: + - Key: Name + Value: !Sub ${AWS::StackName}-connection + +Outputs: + EfsAccessSecurityGroup: + Value: !Ref EfsAccessSG + DbConnectionSecurityGroup: + Value: !Ref DbConnectionSG diff --git a/templates/turbine-master.template b/templates/turbine-master.template index a078f45d..6caebdf4 100644 --- a/templates/turbine-master.template +++ b/templates/turbine-master.template @@ -162,6 +162,18 @@ Resources: - 1 - !GetAZs + SGStack: + Type: AWS::CloudFormation::Stack + Properties: + TemplateURL: !Join + - '/' + - - !Sub https://${QSS3BucketName}.s3.amazonaws.com + - !Sub ${QSS3KeyPrefix}templates + - turbine-sg.template + Parameters: + VPCID: !GetAtt VPCStack.Outputs.VPCID + VPCCIDR: !Ref VPCCIDR + AirflowStack: Type: AWS::CloudFormation::Stack Properties: @@ -192,6 +204,8 @@ Resources: LoadDefaultConn: !Ref LoadDefaultConn QSS3BucketName: !Ref QSS3BucketName QSS3KeyPrefix: !Ref QSS3KeyPrefix + EfsAccessSG: !GetAtt SGStack.Outputs.EfsAccessSecurityGroup + DbConnectionSG: !GetAtt SGStack.Outputs.DbConnectionSecurityGroup Outputs: DeploymentsBucket: From e932d1dc690cbed4b45bdb1c74836addd8b9e107 Mon Sep 17 00:00:00 2001 From: olmax99 Date: Sat, 2 Nov 2019 10:29:11 +0100 Subject: [PATCH 06/12] private buckets --- .../turbine-codedeploy.template | 0 templates/cluster/turbine-cluster.template | 31 ++++++++++++------- templates/services/turbine-workerset.template | 3 +- 3 files changed, 21 insertions(+), 13 deletions(-) rename templates/{services => ci}/turbine-codedeploy.template (100%) diff --git a/templates/services/turbine-codedeploy.template b/templates/ci/turbine-codedeploy.template similarity index 100% rename from templates/services/turbine-codedeploy.template rename to templates/ci/turbine-codedeploy.template diff --git a/templates/cluster/turbine-cluster.template b/templates/cluster/turbine-cluster.template index 95d08e51..0c7e5f4c 100644 --- 
a/templates/cluster/turbine-cluster.template +++ b/templates/cluster/turbine-cluster.template @@ -58,6 +58,8 @@ Parameters: Resources: + # ------------------------ Create Cluster Instance Services--------------------------------- + SchedulerStack: Type: AWS::CloudFormation::Stack Properties: @@ -133,7 +135,7 @@ Resources: DependsOn: - Meta - # ---------------------- Airflow EC2 cluster instance configuration----------------------------- + # ---------------------- Turbine EC2 cluster instance configuration----------------------------- Meta: Type: AWS::CloudFormation::WaitConditionHandle @@ -569,14 +571,12 @@ Resources: BucketEncryption: ServerSideEncryptionConfiguration: - ServerSideEncryptionByDefault: - SSEAlgorithm: AES256 + SSEAlgorithm: AES256 PublicAccessBlockConfiguration: BlockPublicAcls: True BlockPublicPolicy: True IgnorePublicAcls: True RestrictPublicBuckets: True - VersioningConfiguration: - Status: Enabled DeletionPolicy: Retain DeploymentsBucket: @@ -586,16 +586,14 @@ Resources: BucketEncryption: ServerSideEncryptionConfiguration: - ServerSideEncryptionByDefault: - SSEAlgorithm: AES256 + SSEAlgorithm: AES256 PublicAccessBlockConfiguration: BlockPublicAcls: True BlockPublicPolicy: True IgnorePublicAcls: True RestrictPublicBuckets: True - VersioningConfiguration: - Status: Enabled - # Custom cloudformation hooks that signal to target: Delete, Update, Create + # Custom CloudFormation hook that signals to target: Delete, Update, Create CleanUpDeployments: DependsOn: cleanupDeployBucket Type: Custom::cleanupdeploy @@ -606,8 +604,7 @@ Resources: - "Arn" BucketName: !Ref DeploymentsBucket - # Removes all objects from LogsBucket and DeploymentsBucket upon cfn Delete - # TODO: Use SQS for two events triggering the same lambda function + # Removes all objects from Deployments Bucket upon cfn delete-stack cleanupDeployBucket: DependsOn: DeploymentsBucket Type: "AWS::Lambda::Function" @@ -761,10 +758,11 @@ Resources: ManagedPolicyArns: - 
'arn:aws:iam::aws:policy/service-role/AWSCodeDeployRole' + # TODO: Try moving to turbine-workerset.template # ---------------------- Metric for Scaling Mechanism----------------------- # Allowing 'Metric Lambda' to access the cloudwatch metric 'Turbine' - Logger: + MetricRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: @@ -785,6 +783,8 @@ Resources: - cloudwatch:GetMetric* - cloudwatch:PutMetricData + # How is the Metric Lambda connected to the ASG? + # Alarms are triggered above or below treshold based on Lambda Metric Metric: Type: AWS::Lambda::Function Properties: @@ -801,6 +801,8 @@ Resources: _d.setMinutes(d.getMinutes() + m); return _d; }; + + # TODO: Understand time series results (is it an array, a map or a table)? const getMetricAtTime = (ms, m, t) => { const m_idx = ms.MetricDataResults .map(_r => _r.Id) @@ -829,6 +831,8 @@ Resources: curr.setSeconds(0); curr.setMilliseconds(0); const prev = datePlusMinutes(curr, -5); const back = datePlusMinutes(prev, -5); + + # -------Step1: Get the relevant variables for worker load calculation-------- const metrics = await cw.getMetricData({ StartTime: back, EndTime: curr, ScanBy: 'TimestampDescending', @@ -866,6 +870,7 @@ Resources: Stat: 'Average', Unit: 'None', }}, ]}).promise(); + const ANOMV = getMetricAtTime(metrics, 'maxANOMV', prev); const NOER = getMetricAtTime(metrics, 'sumNOER', prev); const GISI = getMetricAtTime(metrics, 'avgGISI', prev); @@ -877,6 +882,8 @@ Resources: l = 1 - NOER / (M * 0.098444 * 300); else l = (ANOMV > 0) ? 
1.0 : 0.0; + + # Results are pushed into the Metric 'WorkerLoad' await cw.putMetricData({ Namespace: 'Turbine', MetricData: [{ MetricName: 'WorkerLoad', @@ -892,7 +899,7 @@ Resources: - Tasks - QueueName Role: !GetAtt - - Logger + - MetricRole - Arn Metadata: 'AWS::CloudFormation::Designer': diff --git a/templates/services/turbine-workerset.template b/templates/services/turbine-workerset.template index 01bcaaf8..07b94983 100644 --- a/templates/services/turbine-workerset.template +++ b/templates/services/turbine-workerset.template @@ -53,7 +53,7 @@ Resources: Type: AWS::EC2::SecurityGroup Properties: GroupDescription: >- - Security Rules with permissions for node itercommunication between + Security Rules with permissions for node intercommunication between Airflow worker instances. VpcId: !Ref VPCID SecurityGroupIngress: @@ -89,6 +89,7 @@ Resources: HeartbeatTimeout: 300 LifecycleTransition: autoscaling:EC2_INSTANCE_TERMINATING + # ShrinkAlarm and GrowthAlarm are triggered by the treshold [...] 
from Metric Workerload ShrinkAlarm: Type: AWS::CloudWatch::Alarm Properties: From 3f875825b5c2ec502634f2f7b16cebab1f71d632 Mon Sep 17 00:00:00 2001 From: olmax99 Date: Sat, 2 Nov 2019 13:17:21 +0100 Subject: [PATCH 07/12] create CI Stack template from CodeDeploy resources in cluster --- Makefile | 1 + templates/ci/turbine-codedeploy.template | 189 ++++++++++++++++++++- templates/cluster/turbine-cluster.template | 178 +------------------ templates/turbine-master.template | 21 ++- 4 files changed, 212 insertions(+), 177 deletions(-) diff --git a/Makefile b/Makefile index 0ea18f1a..7a11e9ca 100644 --- a/Makefile +++ b/Makefile @@ -41,6 +41,7 @@ artifacts: aws s3 cp --recursive submodules/quickstart-aws-vpc s3://${PROJECT_NAME}-${AWS_REGION}/${PROJECT_NAME}submodules/quickstart-aws-vpc/templates/ aws s3 cp --recursive templates/cluster s3://${PROJECT_NAME}-${AWS_REGION}/${PROJECT_NAME}templates aws s3 cp --recursive templates/services s3://${PROJECT_NAME}-${AWS_REGION}/${PROJECT_NAME}templates + aws s3 cp --recursive templates/ci s3://${PROJECT_NAME}-${AWS_REGION}/${PROJECT_NAME}templates # DELETE ME cluster: diff --git a/templates/ci/turbine-codedeploy.template b/templates/ci/turbine-codedeploy.template index 9f9cb903..860b8517 100644 --- a/templates/ci/turbine-codedeploy.template +++ b/templates/ci/turbine-codedeploy.template @@ -1,7 +1,188 @@ AWSTemplateFormatVersion: "2010-09-09" -Description: + +Description: >- + CI CodeDeploy resources for Airflow build. + + NOTE: The DeploymentsBucket is defined in turbine-cluster.template for + the cluster instance configuration dependency. 
+ +Parameters: + SchedulerScalingGroup: + Type: String + WebserverScalingGroup: + Type: String + WorkerSetScalingGroup: + Type: String + DeploymentsBucket: + Type: String + Resources: - DummyServer: - Type: "AWS::EC2::Instance" + + CodeDeployApplication: + Type: AWS::CodeDeploy::Application Properties: - ImageId: "" + ApplicationName: !Sub ${AWS::StackName}-deployment-application + ComputePlatform: Server + + CodeDeployDeploymentGroup: + Type: AWS::CodeDeploy::DeploymentGroup + Properties: + ApplicationName: !Ref CodeDeployApplication + DeploymentGroupName: !Sub ${AWS::StackName}-deployment-group + AutoScalingGroups: + - !Ref SchedulerScalingGroup + - !Ref WebserverScalingGroup + - !Ref WorkerSetScalingGroup + ServiceRoleArn: !GetAtt + - CodeDeployServiceRole + - Arn + + CodeDeployServiceRole: + Type: AWS::IAM::Role + Properties: + AssumeRolePolicyDocument: + Version: 2012-10-17 + Statement: + - Effect: Allow + Principal: + Service: + - codedeploy.amazonaws.com + Action: + - sts:AssumeRole + ManagedPolicyArns: + - 'arn:aws:iam::aws:policy/service-role/AWSCodeDeployRole' + + # Custom CloudFormation hook that signals to target: Delete, Update, Create + CleanUpDeployments: + DependsOn: cleanupDeployBucket + Type: Custom::cleanupdeploy + Properties: + ServiceToken: + Fn::GetAtt: + - "cleanupDeployBucket" + - "Arn" + BucketName: !Ref DeploymentsBucket + + # Removes all objects from Deployments Bucket upon cfn delete-stack + cleanupDeployBucket: + Type: "AWS::Lambda::Function" + Properties: + Code: + ZipFile: !Sub | + import boto3 + import json + import logging + # module cfnresponse does not exist for python3.7, use requests instead + # import cfnresponse + # Will yield a warning, but is currently the only solution for python3.7 + # since inline code cannot import third party packages + from botocore.vendored import requests + from botocore.exceptions import ClientError + + logger = logging.getLogger(__name__) + + def setup(level='DEBUG', boto_level=None, **kwargs): + 
logging.root.setLevel(level) + + if not boto_level: + boto_level = level + + logging.getLogger('boto').setLevel(boto_level) + logging.getLogger('boto3').setLevel(boto_level) + logging.getLogger('botocore').setLevel(boto_level) + logging.getLogger('urllib3').setLevel(boto_level) + + try: + setup('DEBUG', formatter_cls=None, boto_level='ERROR') + except Exception as e: + logger.error(e, exc_info=True) + + def clean_up_bucket(target_bucket): + logger.info(f"Clean content of bucket {target_bucket}.") + s3_resource = boto3.resource('s3') + try: + bucket_response = s3_resource.Bucket(target_bucket).load() + except ClientError as e: + logger.info(f"s3:://{target_bucket} not found. {e}") + return + else: + bucket_obj = s3_resource.Bucket(target_bucket) + bucket_obj.objects.all().delete() + + def handler(event, context): + # helper(event, context) + + response_data = {} + # NOTE: The status value sent by the custom resource provider must be either SUCCESS or FAILED!! + try: + bucket = event['ResourceProperties']['BucketName'] + if event['RequestType'] == 'Delete': + clean_up_bucket(bucket) + if event['RequestType'] == 'Update': + logger.info(f"custom::cleanupbucket update. Target bucket: {bucket}") + if event['RequestType'] == 'Create': + logger.info(f"custom::cleanupbucket create. 
Target bucket: {bucket}") + send_response_cfn(event, context, "SUCCESS") + except Exception as e: + logger.info(str(e)) + send_response_cfn(event, context, "FAILED") + + def send_response_cfn(event, context, response_status): + response_body = {'Status': response_status, + 'Reason': 'Log stream name: ' + context.log_stream_name, + 'PhysicalResourceId': context.log_stream_name, + 'StackId': event['StackId'], + 'RequestId': event['RequestId'], + 'LogicalResourceId': event['LogicalResourceId'], + 'Data': json.loads("{}")} + # Sends the response signal to the respective custom resource request + requests.put(event['ResponseURL'], data=json.dumps(response_body)) + Description: cleanup Bucket on Delete Lambda function. + Handler: index.handler + Role: !GetAtt CleanupS3ExecutionRole.Arn + Runtime: python3.7 + Timeout: 100 + + CleanupS3ExecutionRole: + Type: AWS::IAM::Role + Properties: + AssumeRolePolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Principal: + Service: + - lambda.amazonaws.com + Action: + - sts:AssumeRole + Path: "/" + + CleanupS3ExecutionPolicy: + DependsOn: + - CleanupS3ExecutionRole + Type: AWS::IAM::Policy + Properties: + PolicyName: DeleteS3BucketLogsRolePolicy + Roles: + - Ref: CleanupS3ExecutionRole + PolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - logs:* + Resource: + - arn:aws:logs:*:*:* + - Effect: Allow + Action: + - s3:* + Resource: + - "*" + +Outputs: + DeploymentsBucket: + Value: !Ref DeploymentsBucket + CodeDeployApplication: + Value: !Ref CodeDeployApplication + CodeDeployDeploymentGroup: + Value: !Ref CodeDeployDeploymentGroup diff --git a/templates/cluster/turbine-cluster.template b/templates/cluster/turbine-cluster.template index 0c7e5f4c..47505eed 100644 --- a/templates/cluster/turbine-cluster.template +++ b/templates/cluster/turbine-cluster.template @@ -579,6 +579,8 @@ Resources: RestrictPublicBuckets: True DeletionPolicy: Retain + # NOTE: Dependency from instance 
configuration requires Deployments Bucket to be + # defined here instead in turbine-codedeploy.template DeploymentsBucket: Type: AWS::S3::Bucket Properties: @@ -593,171 +595,6 @@ Resources: IgnorePublicAcls: True RestrictPublicBuckets: True - # Custom CloudFormation hook that signals to target: Delete, Update, Create - CleanUpDeployments: - DependsOn: cleanupDeployBucket - Type: Custom::cleanupdeploy - Properties: - ServiceToken: - Fn::GetAtt: - - "cleanupDeployBucket" - - "Arn" - BucketName: !Ref DeploymentsBucket - - # Removes all objects from Deployments Bucket upon cfn delete-stack - cleanupDeployBucket: - DependsOn: DeploymentsBucket - Type: "AWS::Lambda::Function" - Properties: - Code: - ZipFile: !Sub | - import boto3 - import json - import logging - # module cfnresponse does not exist for python3.7, use requests instead - # import cfnresponse - # Will yield a warning, but is currently the only solution for python3.7 - # since inline code cannot import third party packages - from botocore.vendored import requests - from botocore.exceptions import ClientError - - logger = logging.getLogger(__name__) - - def setup(level='DEBUG', boto_level=None, **kwargs): - logging.root.setLevel(level) - - if not boto_level: - boto_level = level - - logging.getLogger('boto').setLevel(boto_level) - logging.getLogger('boto3').setLevel(boto_level) - logging.getLogger('botocore').setLevel(boto_level) - logging.getLogger('urllib3').setLevel(boto_level) - - try: - setup('DEBUG', formatter_cls=None, boto_level='ERROR') - except Exception as e: - logger.error(e, exc_info=True) - - def clean_up_bucket(target_bucket): - logger.info(f"Clean content of bucket {target_bucket}.") - s3_resource = boto3.resource('s3') - try: - bucket_response = s3_resource.Bucket(target_bucket).load() - except ClientError as e: - logger.info(f"s3:://{target_bucket} not found. 
{e}") - return - else: - bucket_obj = s3_resource.Bucket(target_bucket) - bucket_obj.objects.all().delete() - - def handler(event, context): - # helper(event, context) - - response_data = {} - # NOTE: The status value sent by the custom resource provider must be either SUCCESS or FAILED!! - try: - bucket = event['ResourceProperties']['BucketName'] - if event['RequestType'] == 'Delete': - clean_up_bucket(bucket) - if event['RequestType'] == 'Update': - logger.info(f"custom::cleanupbucket update. Target bucket: {bucket}") - if event['RequestType'] == 'Create': - logger.info(f"custom::cleanupbucket create. Target bucket: {bucket}") - send_response_cfn(event, context, "SUCCESS") - except Exception as e: - logger.info(str(e)) - send_response_cfn(event, context, "FAILED") - - def send_response_cfn(event, context, response_status): - response_body = {'Status': response_status, - 'Reason': 'Log stream name: ' + context.log_stream_name, - 'PhysicalResourceId': context.log_stream_name, - 'StackId': event['StackId'], - 'RequestId': event['RequestId'], - 'LogicalResourceId': event['LogicalResourceId'], - 'Data': json.loads("{}")} - # Sends the response signal to the respective custom resource request - requests.put(event['ResponseURL'], data=json.dumps(response_body)) - Description: cleanup Bucket on Delete Lambda function. 
- Handler: index.handler - Role: !GetAtt CleanupS3ExecutionRole.Arn - Runtime: python3.7 - Timeout: 100 - - CleanupS3ExecutionRole: - Type: AWS::IAM::Role - Properties: - AssumeRolePolicyDocument: - Version: '2012-10-17' - Statement: - - Effect: Allow - Principal: - Service: - - lambda.amazonaws.com - Action: - - sts:AssumeRole - Path: "/" - - CleanupS3ExecutionPolicy: - DependsOn: - - CleanupS3ExecutionRole - Type: AWS::IAM::Policy - Properties: - PolicyName: DeleteS3BucketLogsRolePolicy - Roles: - - Ref: CleanupS3ExecutionRole - PolicyDocument: - Version: '2012-10-17' - Statement: - - Effect: Allow - Action: - - logs:* - Resource: - - arn:aws:logs:*:*:* - - Effect: Allow - Action: - - s3:* - Resource: - - "*" - - # TODO: Move to separate template if possible - # --------------------- CodeDeploy resources----------------------------------------------- - - CodeDeployApplication: - Type: AWS::CodeDeploy::Application - Properties: - ApplicationName: !Sub ${AWS::StackName}-deployment-application - ComputePlatform: Server - - CodeDeployDeploymentGroup: - Type: AWS::CodeDeploy::DeploymentGroup - Properties: - ApplicationName: !Ref CodeDeployApplication - DeploymentGroupName: !Sub ${AWS::StackName}-deployment-group - AutoScalingGroups: - - !GetAtt SchedulerStack.Outputs.AutoScalingGroup - - !GetAtt WebserverStack.Outputs.AutoScalingGroup - - !GetAtt WorkerSetStack.Outputs.AutoScalingGroup - ServiceRoleArn: !GetAtt - - CodeDeployServiceRole - - Arn - - CodeDeployServiceRole: - Type: AWS::IAM::Role - Properties: - AssumeRolePolicyDocument: - Version: 2012-10-17 - Statement: - - Effect: Allow - Principal: - Service: - - codedeploy.amazonaws.com - Action: - - sts:AssumeRole - ManagedPolicyArns: - - 'arn:aws:iam::aws:policy/service-role/AWSCodeDeployRole' - # TODO: Try moving to turbine-workerset.template # ---------------------- Metric for Scaling Mechanism----------------------- @@ -928,7 +765,6 @@ Resources: # ---------------------------- Shared Storage Volumes 
(EFS)------------------------------ - # TODO: Get EfsAccessSG from turbine-sg.template EfsFileSystem: Type: AWS::EFS::FileSystem Properties: @@ -989,10 +825,12 @@ Resources: Outputs: DeploymentsBucket: Value: !Ref DeploymentsBucket - CodeDeployApplication: - Value: !Ref CodeDeployApplication - CodeDeployDeploymentGroup: - Value: !Ref CodeDeployDeploymentGroup + SchedulerAutoScaling: + Value: !GetAtt SchedulerStack.Outputs.AutoScalingGroup + WebserverAutoScaling: + Value: !GetAtt WebserverStack.Outputs.AutoScalingGroup + WorkerSetAutoScaling: + Value: !GetAtt WorkerSetStack.Outputs.AutoScalingGroup Mappings: AWSAMIRegionMap: diff --git a/templates/turbine-master.template b/templates/turbine-master.template index 6caebdf4..2fe4beb2 100644 --- a/templates/turbine-master.template +++ b/templates/turbine-master.template @@ -207,13 +207,28 @@ Resources: EfsAccessSG: !GetAtt SGStack.Outputs.EfsAccessSecurityGroup DbConnectionSG: !GetAtt SGStack.Outputs.DbConnectionSecurityGroup + CIStack: + Type: AWS::CloudFormation::Stack + Properties: + TemplateURL: !Join + - '/' + - - !Sub https://${QSS3BucketName}.s3.amazonaws.com + - !Sub ${QSS3KeyPrefix}templates + - turbine-codedeploy.template + Parameters: + DeploymentsBucket: !GetAtt AirflowStack.Outputs.DeploymentsBucket + SchedulerScalingGroup: !GetAtt AirflowStack.Outputs.SchedulerAutoScaling + WebserverScalingGroup: !GetAtt AirflowStack.Outputs.WebserverAutoScaling + WorkerSetScalingGroup: !GetAtt AirflowStack.Outputs.WorkerSetAutoScaling + Outputs: + # NOTE: This is the context required by CodeDeploy create-deployment for 'example project airflow' DeploymentsBucket: - Value: !GetAtt AirflowStack.Outputs.DeploymentsBucket + Value: !GetAtt CIStack.Outputs.DeploymentsBucket CodeDeployApplication: - Value: !GetAtt AirflowStack.Outputs.CodeDeployApplication + Value: !GetAtt CIStack.Outputs.CodeDeployApplication CodeDeployDeploymentGroup: - Value: !GetAtt AirflowStack.Outputs.CodeDeployDeploymentGroup + Value: !GetAtt 
CIStack.Outputs.CodeDeployDeploymentGroup Metadata: AWS::CloudFormation::Interface: From 3cf5c0568254d17733d261cb177d43e48e0c3c5a Mon Sep 17 00:00:00 2001 From: olmax99 Date: Sun, 3 Nov 2019 14:04:24 +0100 Subject: [PATCH 08/12] template for support resources --- CHANGELOG.md | 18 ++-- templates/ci/turbine-codedeploy.template | 3 +- templates/cluster/turbine-cluster.template | 87 +++---------------- templates/services/turbine-resource.template | 88 ++++++++++++++++++++ templates/turbine-master.template | 25 +++++- 5 files changed, 134 insertions(+), 87 deletions(-) create mode 100644 templates/services/turbine-resource.template diff --git a/CHANGELOG.md b/CHANGELOG.md index ff2d6cdc..8dbd3732 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,9 +2,9 @@ Fork: 31.09.2019 Branch: develop -Contact: olmax99@gmail.com +Contact: olmighty99@gmail.com -## Changes in Branch +## Changes in Branch Develop ### 1. Comments @@ -12,17 +12,19 @@ Comments are good, we love comments!! ### 2. Structure -- More granular approach <- easier for maintaining and debugging - * external services templates contains database and SQS - * All security groups have a dedicated template -- Templates split in `cluster` and `services` +- More granular templates folder <- easier for maintaining and debugging + * `turbine-resource.template` contains all the Turbine support services + * The security groups have a dedicated template + * CI has its own sub folder +- Templates split in `cluster`, `services`, and `ci` ### 3. 
Log and Deployment Bucket -Incident: After dag run, cloudformation DELETE_FAILED with "Logs and Deployment +- Private Buckets by default + +Incident: After dag run, CloudFormation DELETE_FAILED with "Logs and Deployment Bucket are not empty" -- Private Buckets by default - Added custom Cfn event + Lambda function for cleaning deployments bucket content when delete-stack - Retain Logs bucket for error investigation or dag data archiving diff --git a/templates/ci/turbine-codedeploy.template b/templates/ci/turbine-codedeploy.template index 860b8517..e5874403 100644 --- a/templates/ci/turbine-codedeploy.template +++ b/templates/ci/turbine-codedeploy.template @@ -4,7 +4,8 @@ Description: >- CI CodeDeploy resources for Airflow build. NOTE: The DeploymentsBucket is defined in turbine-cluster.template for - the cluster instance configuration dependency. + the cluster instance configuration depends on it. This is in order to + avoid circular dependency between CIStack and AirflowStack. Parameters: SchedulerScalingGroup: diff --git a/templates/cluster/turbine-cluster.template b/templates/cluster/turbine-cluster.template index 47505eed..a5b4c3c3 100644 --- a/templates/cluster/turbine-cluster.template +++ b/templates/cluster/turbine-cluster.template @@ -38,11 +38,6 @@ Parameters: Type: Number GrowthThreshold: Type: Number - DbMasterUsername: - Type: String - DbMasterPassword: - Type: String - NoEcho: true LoadExampleDags: Type: String LoadDefaultConn: @@ -51,9 +46,16 @@ Parameters: Type: String QSS3KeyPrefix: Type: String - EfsAccessSG: + DbMasterUsername: + Type: String + DbMasterPassword: + Type: String + NoEcho: true + DbEndpointAddress: Type: String - DbConnectionSG: + TaskQueueName: + Type: String + EfsFileSystem: Type: String Resources: @@ -254,7 +256,7 @@ Resources: AIRFLOW__CELERY__DEFAULT_QUEUE=${QueueName} AIRFLOW__CELERY__RESULT_BACKEND=db+${DbUri} AIRFLOW__CELERY_BROKER_TRANSPORT_OPTIONS__REGION=${AWS::Region} - - QueueName: !GetAtt Tasks.QueueName + - QueueName: 
!Ref TaskQueueName DbUri: !Join - '' - - postgresql:// @@ -262,7 +264,7 @@ Resources: - ':' - !Ref DbMasterPassword - '@' - - !GetAtt Database.Endpoint.Address + - !Ref DbEndpointAddress - /airflow commands: envsubst: @@ -523,9 +525,7 @@ Resources: - sqs:SendMessage Resource: !Sub - arn:aws:sqs:*:${AWS::AccountId}:${queue} - - queue: !GetAtt - - Tasks - - QueueName + - queue: !Ref TaskQueueName - PolicyName: !Sub ${AWS::StackName}-deployments-r-policy PolicyDocument: Version: 2012-10-17 @@ -732,9 +732,7 @@ Resources: }).promise(); }; - asgName: !Sub '${AWS::StackName}-scaling-group' - queueName: !GetAtt - - Tasks - - QueueName + queueName: !Ref TaskQueueName Role: !GetAtt - MetricRole - Arn @@ -763,65 +761,6 @@ Resources: - Timer - Arn - # ---------------------------- Shared Storage Volumes (EFS)------------------------------ - - EfsFileSystem: - Type: AWS::EFS::FileSystem - Properties: - FileSystemTags: - - Key: Name - Value: !Sub ${AWS::StackName}-filesystem - - EfsMountTarget1A: - Type: AWS::EFS::MountTarget - Properties: - FileSystemId: !Ref EfsFileSystem - SubnetId: !Ref PrivateSubnet1AID - SecurityGroups: - - !Ref EfsAccessSG - - EfsMountTarget2A: - Type: AWS::EFS::MountTarget - Properties: - FileSystemId: !Ref EfsFileSystem - SubnetId: !Ref PrivateSubnet2AID - SecurityGroups: - - !Ref EfsAccessSG - - # TODO: Move to Services as Database or External template - # ---------------------------- Airflow Postgres---------------------------------------- - - DBs: - Type: AWS::RDS::DBSubnetGroup - Properties: - DBSubnetGroupDescription: Associates the Database Instances with the selected VPC Subnets. 
- SubnetIds: - - !Ref PrivateSubnet1AID - - !Ref PrivateSubnet2AID - - # TODO: Get SG from turbine-sg.template - Database: - Type: AWS::RDS::DBInstance - Properties: - AllocatedStorage: '20' - DBInstanceClass: db.t2.micro - DBName: airflow - Engine: postgres - MasterUsername: !Ref DbMasterUsername - MasterUserPassword: !Ref DbMasterPassword - Tags: - - Key: Name - Value: !Sub ${AWS::StackName}-database - DBSubnetGroupName: !Ref DBs - VPCSecurityGroups: - - !Ref DbConnectionSG - - # TODO: Move to Database/External Services template - # ---------------------------- SQS Queuing--------------------------------------------- - Tasks: - Type: AWS::SQS::Queue - Properties: {} - Outputs: DeploymentsBucket: Value: !Ref DeploymentsBucket diff --git a/templates/services/turbine-resource.template b/templates/services/turbine-resource.template new file mode 100644 index 00000000..9c3c14b7 --- /dev/null +++ b/templates/services/turbine-resource.template @@ -0,0 +1,88 @@ +AWSTemplateFormatVersion: "2010-09-09" + +Description: >- + Turbine resources for supporting the main Airflow cluster: + - RDS Airflow metadata database + - SQS Broker + - EFS shared directory + +Parameters: + PrivateSubnet1ID: + Type: String + PrivateSubnet2ID: + Type: String + EfsSecurityGroup: + Type: String + DbSecurityGroup: + Type: String + PostgresDbUser: + Type: String + PostgresMasterPasswd: + Type: String + +Resources: + + # ---------------------------- Shared Storage Volumes (EFS)------------------------------ + + EfsFileSystem: + Type: AWS::EFS::FileSystem + Properties: + FileSystemTags: + - Key: Name + Value: !Sub ${AWS::StackName}-filesystem + + EfsMountTarget1A: + Type: AWS::EFS::MountTarget + Properties: + FileSystemId: !Ref EfsFileSystem + SubnetId: !Ref PrivateSubnet1ID + SecurityGroups: + - !Ref EfsSecurityGroup + + EfsMountTarget2A: + Type: AWS::EFS::MountTarget + Properties: + FileSystemId: !Ref EfsFileSystem + SubnetId: !Ref PrivateSubnet2ID + SecurityGroups: + - !Ref EfsSecurityGroup + + # 
---------------------------- Airflow Postgres---------------------------------------- + + DBs: + Type: AWS::RDS::DBSubnetGroup + Properties: + DBSubnetGroupDescription: Associates the Database Instances with the selected VPC Subnets. + SubnetIds: + - !Ref PrivateSubnet1ID + - !Ref PrivateSubnet2ID + + Database: + Type: AWS::RDS::DBInstance + Properties: + AllocatedStorage: '20' + DBInstanceClass: db.t2.micro + DBName: airflow + Engine: postgres + MasterUsername: !Ref PostgresDbUser + MasterUserPassword: !Ref PostgresMasterPasswd + Tags: + - Key: Name + Value: !Sub ${AWS::StackName}-database + DBSubnetGroupName: !Ref DBs + VPCSecurityGroups: + - !Ref DbSecurityGroup + + # ---------------------------- SQS Queuing--------------------------------------------- + + Tasks: + Type: AWS::SQS::Queue + Properties: {} + +Outputs: + TaskQueueName: + Value: !GetAtt Tasks.QueueName + DatabaseAddress: + Value: !GetAtt Database.Endpoint.Address + EfsFS: + Value: !Ref EfsFileSystem diff --git a/templates/turbine-master.template b/templates/turbine-master.template index 2fe4beb2..d6a8c4bd 100644 --- a/templates/turbine-master.template +++ b/templates/turbine-master.template @@ -198,14 +198,31 @@ Resources: MaxGroupSize: !Ref MaxGroupSize GrowthThreshold: !Ref GrowthThreshold ShrinkThreshold: !Ref ShrinkThreshold - DbMasterUsername: !Ref DbMasterUsername - DbMasterPassword: !Ref DbMasterPassword LoadExampleDags: !Ref LoadExampleDags LoadDefaultConn: !Ref LoadDefaultConn QSS3BucketName: !Ref QSS3BucketName QSS3KeyPrefix: !Ref QSS3KeyPrefix - EfsAccessSG: !GetAtt SGStack.Outputs.EfsAccessSecurityGroup - DbConnectionSG: !GetAtt SGStack.Outputs.DbConnectionSecurityGroup + DbMasterUsername: !Ref DbMasterUsername + DbMasterPassword: !Ref DbMasterPassword + DbEndpointAddress: !GetAtt ResourceStack.Outputs.DatabaseAddress + TaskQueueName: !GetAtt ResourceStack.Outputs.TaskQueueName + EfsFileSystem: !GetAtt ResourceStack.Outputs.EfsFS + + ResourceStack: + Type: AWS::CloudFormation::Stack + 
Properties:
+      TemplateURL: !Join
+        - '/'
+        - - !Sub https://${QSS3BucketName}.s3.amazonaws.com
+          - !Sub ${QSS3KeyPrefix}templates
+          - turbine-resource.template
+      Parameters:
+        PrivateSubnet1ID: !GetAtt VPCStack.Outputs.PrivateSubnet1AID
+        PrivateSubnet2ID: !GetAtt VPCStack.Outputs.PrivateSubnet2AID
+        EfsSecurityGroup: !GetAtt SGStack.Outputs.EfsAccessSecurityGroup
+        DbSecurityGroup: !GetAtt SGStack.Outputs.DbConnectionSecurityGroup
+        PostgresDbUser: !Ref DbMasterUsername
+        PostgresMasterPasswd: !Ref DbMasterPassword
 
   CIStack:
     Type: AWS::CloudFormation::Stack

From c128914adc5131893d27fb7639e9ca29ebbcf082 Mon Sep 17 00:00:00 2001
From: olmax99 <olmax99@gmail.com>
Date: Mon, 4 Nov 2019 09:01:58 +0100
Subject: [PATCH 09/12] move Lambda metric to resource template

---
 CHANGELOG.md                                 |   4 +-
 templates/cluster/turbine-cluster.template   | 170 +-----------------
 templates/services/turbine-resource.template | 172 ++++++++++++++++++-
 3 files changed, 173 insertions(+), 173 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8dbd3732..b4772b0c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,7 +8,7 @@ Contact: olmighty99@gmail.com
 
 ### 1. Comments
 
-Comments are good, we love comments!!
+- Comments are good, we love comments!!
 
 ### 2. Structure
 
@@ -20,7 +20,7 @@ 
Log and Deployment Bucket -- Private Buckets by default +- Private Buckets by default (explicit) Incident: After dag run, CloudFormation DELETE_FAILED with "Logs and Deployment Bucket are not empty" diff --git a/templates/cluster/turbine-cluster.template b/templates/cluster/turbine-cluster.template index a5b4c3c3..23a0d797 100644 --- a/templates/cluster/turbine-cluster.template +++ b/templates/cluster/turbine-cluster.template @@ -139,6 +139,8 @@ Resources: # ---------------------- Turbine EC2 cluster instance configuration----------------------------- + # Loaded by every cluster instance's launch configuration respectively via + # {$ParentStack} --resource Meta Meta: Type: AWS::CloudFormation::WaitConditionHandle Properties: {} @@ -341,7 +343,6 @@ Resources: Type=oneshot ExecStartPre=/usr/bin/systemctl daemon-reload ExecStart=/usr/bin/systemctl restart airflow - # TODO: How does the HAS_DEPLOYMENT mechanism work here? commands: setup: command: !Sub | @@ -358,7 +359,6 @@ Resources: else echo "Deployment pending, deferring service start" fi - # TODO: What does lchooks stands for? How is it related to timer, beat? # Lifecycle hooks promoting graceful shutdown, delaying termination for task completion lchooks: files: @@ -595,172 +595,6 @@ Resources: IgnorePublicAcls: True RestrictPublicBuckets: True - # TODO: Try moving to turbine-workerset.template - # ---------------------- Metric for Scaling Mechanism----------------------- - - # Allowing 'Metric Lambda' to access the cloudwatch metric 'Turbine' - MetricRole: - Type: AWS::IAM::Role - Properties: - AssumeRolePolicyDocument: - Version: 2012-10-17 - Statement: - - Effect: Allow - Principal: - Service: lambda.amazonaws.com - Action: sts:AssumeRole - Policies: - - PolicyName: !Sub ${AWS::StackName}-cloudwatch-policy - PolicyDocument: - Version: 2012-10-17 - Statement: - - Effect: Allow - Resource: '*' - Action: - - cloudwatch:GetMetric* - - cloudwatch:PutMetricData - - # How is the Metric Lambda connected to the ASG? 
- # Alarms are triggered above or below treshold based on Lambda Metric - Metric: - Type: AWS::Lambda::Function - Properties: - Runtime: nodejs8.10 - Handler: index.handler - Code: - ZipFile: !Sub - - | - var AWS = require('aws-sdk'); - AWS.config.update({region: '${AWS::Region}'}); - var cw = new AWS.CloudWatch({apiVersion: '2010-08-01'}); - const datePlusMinutes = (d, m) => { - const _d = new Date(d); - _d.setMinutes(d.getMinutes() + m); - return _d; - }; - - # TODO: Understand time series results (is it an array, a map or a table)? - const getMetricAtTime = (ms, m, t) => { - const m_idx = ms.MetricDataResults - .map(_r => _r.Id) - .indexOf(m); - const t_idx = ms.MetricDataResults[m_idx] - .Timestamps - .map(_t => _t.toISOString()) - .indexOf(t.toISOString()); - return ms.MetricDataResults[m_idx] - .Values[t_idx]; - }; - const discount = (ms, m, t1, t2, ws) => { - let incs = 0, d = t1; - let v1 = getMetricAtTime(ms, m, d), v2; - for (let i = 0; d < t2 ; i++) { - d = datePlusMinutes(t1, i+1); - v2 = getMetricAtTime(ms, m, d); - if (v2 > v1) incs += ws[i]; - v1 = v2; - } - return incs; - }; - exports.handler = async function(event, context) { - let curr = new Date(); - curr.setMinutes(Math.floor(curr.getMinutes()/5)*5-5); - curr.setSeconds(0); curr.setMilliseconds(0); - const prev = datePlusMinutes(curr, -5); - const back = datePlusMinutes(prev, -5); - - # -------Step1: Get the relevant variables for worker load calculation-------- - const metrics = await cw.getMetricData({ - StartTime: back, EndTime: curr, - ScanBy: 'TimestampDescending', - MetricDataQueries: [ - { Id: 'maxANOMV', MetricStat: { - Metric: { Namespace: 'AWS/SQS', - MetricName: 'ApproximateNumberOfMessagesVisible', - Dimensions: [{ Name: 'QueueName', - Value: '${queueName}' }]}, - Period: 300, - Stat: 'Maximum', - Unit: 'Count' }}, - { Id: 'sumNOER', MetricStat: { - Metric: { Namespace: 'AWS/SQS', - MetricName: 'NumberOfEmptyReceives', - Dimensions: [{ Name: 'QueueName', - Value: '${queueName}' }]}, 
- Period: 300, - Stat: 'Sum', - Unit: 'Count', }}, - { Id: 'avgGISI', MetricStat: { - Metric: { Namespace: 'AWS/AutoScaling', - MetricName: 'GroupInServiceInstances', - Dimensions: [{ Name: 'AutoScalingGroupName', - Value: '${asgName}' }]}, - Period: 300, - Stat: 'Average', - Unit: 'None', }}, - { Id: 'uGISI', MetricStat: { - Metric: { Namespace: 'AWS/AutoScaling', - MetricName: 'GroupDesiredCapacity', - Dimensions: [{ Name: 'AutoScalingGroupName', - Value: '${asgName}' }]}, - Period: 60, - Stat: 'Average', - Unit: 'None', }}, - ]}).promise(); - - const ANOMV = getMetricAtTime(metrics, 'maxANOMV', prev); - const NOER = getMetricAtTime(metrics, 'sumNOER', prev); - const GISI = getMetricAtTime(metrics, 'avgGISI', prev); - const ws = [0, 0, 0, 0.1, 0.3, 0.3, 0.3, 0.3, 0.2]; - const dGISI = discount(metrics, 'uGISI', back, curr, ws); - const M = GISI - dGISI; - let l; - if (M > 0) - l = 1 - NOER / (M * 0.098444 * 300); - else - l = (ANOMV > 0) ? 1.0 : 0.0; - - # Results are pushed into the Metric 'WorkerLoad' - await cw.putMetricData({ - Namespace: 'Turbine', - MetricData: [{ MetricName: 'WorkerLoad', - Dimensions: [ { Name: 'StackName', - Value: '${AWS::StackName}' }], - Timestamp: prev, - Value: (l > 0) ? 
l : 0, - Unit: 'None' }], - }).promise(); - }; - - asgName: !Sub '${AWS::StackName}-scaling-group' - queueName: !Ref TaskQueueName - Role: !GetAtt - - MetricRole - - Arn - Metadata: - 'AWS::CloudFormation::Designer': - id: 94c385fa-fb13-42cc-a292-7e68c10956f3 - - Timer: - Type: AWS::Events::Rule - Properties: - ScheduleExpression: rate(1 minute) - State: ENABLED - Targets: - - Arn: !GetAtt - - Metric - - Arn - Id: TargetFunction - - Invoke: - Type: AWS::Lambda::Permission - Properties: - FunctionName: !Ref Metric - Action: lambda:InvokeFunction - Principal: events.amazonaws.com - SourceArn: !GetAtt - - Timer - - Arn - Outputs: DeploymentsBucket: Value: !Ref DeploymentsBucket diff --git a/templates/services/turbine-resource.template b/templates/services/turbine-resource.template index 9c3c14b7..a033c047 100644 --- a/templates/services/turbine-resource.template +++ b/templates/services/turbine-resource.template @@ -4,7 +4,8 @@ Description: >- Turbine resources for supporting the main Airflow cluster: - RDS Airflow metadata database - SQS Broker - - EFS shared directory + - EFS shared storage mount + - Custom Cluster Scaling Metric Parameters: PrivateSubnet1ID: @@ -73,16 +74,181 @@ Resources: VPCSecurityGroups: - !Ref DbSecurityGroup - # ---------------------------- SQS Queuing--------------------------------------------- + # ---------------------------- SQS Queue----------------------------------------------- Tasks: Type: AWS::SQS::Queue Properties: {} + # ---------------------------- Custom Cluster Scaling Metric--------------------------- + + # How is the Metric Lambda connected to the ASG? How is the threshold being calculated? + # Alarms are triggered above or below threshold based on Lambda Metric. ASG reacts to the Alarms. 
+ Metric: + Type: AWS::Lambda::Function + Properties: + Runtime: nodejs8.10 + Handler: index.handler + Code: + ZipFile: !Sub + - | + var AWS = require('aws-sdk'); + AWS.config.update({region: '${AWS::Region}'}); + var cw = new AWS.CloudWatch({apiVersion: '2010-08-01'}); + const datePlusMinutes = (d, m) => { + const _d = new Date(d); + _d.setMinutes(d.getMinutes() + m); + return _d; + }; + + const getMetricAtTime = (ms, m, t) => { + const m_idx = ms.MetricDataResults + .map(_r => _r.Id) + .indexOf(m); + const t_idx = ms.MetricDataResults[m_idx] + .Timestamps + .map(_t => _t.toISOString()) + .indexOf(t.toISOString()); + return ms.MetricDataResults[m_idx] + .Values[t_idx]; + }; + const discount = (ms, m, t1, t2, ws) => { + let incs = 0, d = t1; + let v1 = getMetricAtTime(ms, m, d), v2; + for (let i = 0; d < t2 ; i++) { + d = datePlusMinutes(t1, i+1); + v2 = getMetricAtTime(ms, m, d); + if (v2 > v1) incs += ws[i]; + v1 = v2; + } + return incs; + }; + exports.handler = async function(event, context) { + let curr = new Date(); + curr.setMinutes(Math.floor(curr.getMinutes()/5)*5-5); + curr.setSeconds(0); curr.setMilliseconds(0); + const prev = datePlusMinutes(curr, -5); + const back = datePlusMinutes(prev, -5); + + # ------- Step1: Get the relevant variables for worker load calculation-------- + const metrics = await cw.getMetricData({ + StartTime: back, EndTime: curr, + ScanBy: 'TimestampDescending', + MetricDataQueries: [ + { Id: 'maxANOMV', MetricStat: { + Metric: { Namespace: 'AWS/SQS', + MetricName: 'ApproximateNumberOfMessagesVisible', + Dimensions: [{ Name: 'QueueName', + Value: '${queueName}' }]}, + Period: 300, + Stat: 'Maximum', + Unit: 'Count' }}, + { Id: 'sumNOER', MetricStat: { + Metric: { Namespace: 'AWS/SQS', + MetricName: 'NumberOfEmptyReceives', + Dimensions: [{ Name: 'QueueName', + Value: '${queueName}' }]}, + Period: 300, + Stat: 'Sum', + Unit: 'Count', }}, + { Id: 'avgGISI', MetricStat: { + Metric: { Namespace: 'AWS/AutoScaling', + MetricName: 
'GroupInServiceInstances', + Dimensions: [{ Name: 'AutoScalingGroupName', + Value: '${asgName}' }]}, + Period: 300, + Stat: 'Average', + Unit: 'None', }}, + { Id: 'uGISI', MetricStat: { + Metric: { Namespace: 'AWS/AutoScaling', + MetricName: 'GroupDesiredCapacity', + Dimensions: [{ Name: 'AutoScalingGroupName', + Value: '${asgName}' }]}, + Period: 60, + Stat: 'Average', + Unit: 'None', }}, + ]}).promise(); + + # ---------Step2: Calculate Threshold--------------------------------------- + const ANOMV = getMetricAtTime(metrics, 'maxANOMV', prev); + const NOER = getMetricAtTime(metrics, 'sumNOER', prev); + const GISI = getMetricAtTime(metrics, 'avgGISI', prev); + const ws = [0, 0, 0, 0.1, 0.3, 0.3, 0.3, 0.3, 0.2]; + const dGISI = discount(metrics, 'uGISI', back, curr, ws); + const M = GISI - dGISI; + let l; + if (M > 0) + l = 1 - NOER / (M * 0.098444 * 300); + else + l = (ANOMV > 0) ? 1.0 : 0.0; + + # ---------Step3: Push results into the Metric 'WorkerLoad'----------------- + await cw.putMetricData({ + Namespace: 'Turbine', + MetricData: [{ MetricName: 'WorkerLoad', + Dimensions: [ { Name: 'StackName', + Value: '${AWS::StackName}' }], + Timestamp: prev, + Value: (l > 0) ? 
l : 0, + Unit: 'None' }], + }).promise(); + }; + - asgName: !Sub '${AWS::StackName}-scaling-group' + queueName: !GetAtt Tasks.QueueName + Role: !GetAtt + - MetricRole + - Arn + Metadata: + 'AWS::CloudFormation::Designer': + id: 94c385fa-fb13-42cc-a292-7e68c10956f3 + + Timer: + Type: AWS::Events::Rule + Properties: + ScheduleExpression: rate(1 minute) + State: ENABLED + Targets: + - Arn: !GetAtt + - Metric + - Arn + Id: TargetFunction + + Invoke: + Type: AWS::Lambda::Permission + Properties: + FunctionName: !Ref Metric + Action: lambda:InvokeFunction + Principal: events.amazonaws.com + SourceArn: !GetAtt + - Timer + - Arn + + # Allowing 'Metric Lambda' to access the CloudWatch metric 'Turbine' + MetricRole: + Type: AWS::IAM::Role + Properties: + AssumeRolePolicyDocument: + Version: 2012-10-17 + Statement: + - Effect: Allow + Principal: + Service: lambda.amazonaws.com + Action: sts:AssumeRole + Policies: + - PolicyName: !Sub ${AWS::StackName}-cloudwatch-policy + PolicyDocument: + Version: 2012-10-17 + Statement: + - Effect: Allow + Resource: '*' + Action: + - cloudwatch:GetMetric* + - cloudwatch:PutMetricData + Outputs: TaskQueueName: Value: !GetAtt Tasks.QueueName DatabaseAddress: Value: !GetAtt Database.Endpoint.Address EfsFS: - Value: !Ref EfsFileSystem + Value: !Ref EfsFileSystem \ No newline at end of file From f6bd5a5c06b89499b33061cd326960c4dde73e7d Mon Sep 17 00:00:00 2001 From: olmax99 Date: Mon, 4 Nov 2019 09:08:52 +0100 Subject: [PATCH 10/12] cleanup for PR --- Makefile | 49 +-------------------------------------- examples/project/Makefile | 34 +++++++++------------------ 2 files changed, 12 insertions(+), 71 deletions(-) diff --git a/Makefile b/Makefile index 7a11e9ca..9bfa58ed 100644 --- a/Makefile +++ b/Makefile @@ -1,15 +1,3 @@ -define message1 - Environment variable BASE_IP is required. Not set. 
- Use following command: - "$$ my_ip=`curl ipinfo.io | jq .ip`;eval my_ip=$${my_ip[i]};my_ip="$$my_ip/32"; export BASE_IP=$$my_ip" - -endef - -ifndef BASE_IP -export message1 -$(error $(message1)) -endif - ifndef BRANCH BRANCH := $(shell git rev-parse --abbrev-ref HEAD) endif @@ -20,41 +8,6 @@ else BUCKET := s3://turbine-quickstart/quickstart-turbine-airflow-$(BRANCH) endif -# turbine-master -CURRENT_LOCAL_IP = $(BASE_IP) -# DELETE ME -AWS_REGION := eu-central-1 -PROJECT_NAME := eksairflow01-staging lint: - cfn-lint templates/cluster/*.template - cfn-lint templates/services/*.template - -test: - taskcat -c ./ci/taskcat.yaml - -sync: - aws s3 sync --exclude '.*' --acl public-read . $(BUCKET) - -# DELETE ME -artifacts: - aws s3 cp --recursive submodules/quickstart-aws-vpc s3://${PROJECT_NAME}-${AWS_REGION}/${PROJECT_NAME}submodules/quickstart-aws-vpc/templates/ - aws s3 cp --recursive templates/cluster s3://${PROJECT_NAME}-${AWS_REGION}/${PROJECT_NAME}templates - aws s3 cp --recursive templates/services s3://${PROJECT_NAME}-${AWS_REGION}/${PROJECT_NAME}templates - aws s3 cp --recursive templates/ci s3://${PROJECT_NAME}-${AWS_REGION}/${PROJECT_NAME}templates - -# DELETE ME -cluster: - aws cloudformation --region ${AWS_REGION} create-stack --stack-name ${PROJECT_NAME} \ - --template-body file://templates/turbine-master.template \ - --parameters \ - ParameterKey="AllowedWebBlock",ParameterValue="${CURRENT_LOCAL_IP}" \ - ParameterKey="DbMasterPassword",ParameterValue="super_secret" \ - ParameterKey="QSS3BucketName",ParameterValue="${PROJECT_NAME}-${AWS_REGION}" \ - ParameterKey="QSS3KeyPrefix",ParameterValue="${PROJECT_NAME}" \ - --capabilities CAPABILITY_NAMED_IAM - -# DELETE ME -clean: - aws cloudformation delete-stack --stack-name ${PROJECT_NAME} - + cfn-lint templates/*.template diff --git a/examples/project/Makefile b/examples/project/Makefile index 1315bdf4..f238b623 100644 --- a/examples/project/Makefile +++ b/examples/project/Makefile @@ -1,43 +1,31 @@ -define 
message1 - Environment variable stack_name is required. Not set. - Use following command: - "$$ export stack_name=" - -endef - -ifndef stack_name -$(error $(message1)) +ifndef stack-name +$(error stack-name is not set) endif ifndef revision revision := $(shell date --utc +%Y%m%dT%H%M%SZ) endif +# deployment variables +REVISION := $(shell date --utc +%Y%m%dT%H%M%SZ) +PACKAGE = $(stack-name)_$(REVISION).tgz define getRef $(shell aws cloudformation describe-stacks \ - --stack-name $(stack_name) \ + --stack-name $(stack-name) \ --query "Stacks[0].Outputs[?OutputKey=='$(1)'].OutputValue" \ --output text) endef - APPLICATION := $(call getRef,CodeDeployApplication) DEPLOYMENT_GROUP := $(call getRef,CodeDeployDeploymentGroup) DEPLOYMENTS_BUCKET := $(call getRef,DeploymentsBucket) +# stack resources +APPLICATION = $(stack-name)-deployment-application +DEPLOYMENT_GROUP = $(stack-name)-deployment-group +DEPLOYMENTS_BUCKET = $(shell aws cloudformation describe-stack-resources --stack-name $(stack-name) --logical-resource-id DeploymentsBucket --query 'StackResources[0].PhysicalResourceId' --output text) -PACKAGE := $(stack_name)_$(revision).tgz +PACKAGE := $(stack-name)_$(revision).tgz package: cd airflow && tar czf ../$(PACKAGE) . 
- -upload: package - aws s3 cp $(PACKAGE) s3://$(DEPLOYMENTS_BUCKET) - -deploy: upload - aws deploy create-deployment \ - --application-name $(APPLICATION) \ - --deployment-group-name $(DEPLOYMENT_GROUP) \ - --s3-location bucket=$(DEPLOYMENTS_BUCKET),bundleType=tgz,key=$(PACKAGE) \ - --deployment-config-name CodeDeployDefault.AllAtOnce \ - --file-exists-behavior OVERWRITE From 31188a5881025469b2b56dff1ffe0b32c7c9f2fd Mon Sep 17 00:00:00 2001 From: olmax99 <38816637+olmax99@users.noreply.github.com> Date: Mon, 4 Nov 2019 09:18:16 +0100 Subject: [PATCH 11/12] Update Makefile --- Makefile | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/Makefile b/Makefile index 9bfa58ed..9788bcaf 100644 --- a/Makefile +++ b/Makefile @@ -11,3 +11,9 @@ endif lint: cfn-lint templates/*.template + +test: + taskcat -c ./ci/taskcat.yaml + +sync: + aws s3 sync --exclude '.*' --acl public-read . $(BUCKET) From b1875c3b60df65f48ce574cccc54c80150a1266a Mon Sep 17 00:00:00 2001 From: olmax99 Date: Mon, 4 Nov 2019 09:28:46 +0100 Subject: [PATCH 12/12] Reverting to prev head --- Makefile | 49 ++++++++++++++++++++++++++++++++++++++- examples/project/Makefile | 34 ++++++++++++++++++--------- 2 files changed, 71 insertions(+), 12 deletions(-) diff --git a/Makefile b/Makefile index 9bfa58ed..7a11e9ca 100644 --- a/Makefile +++ b/Makefile @@ -1,3 +1,15 @@ +define message1 + Environment variable BASE_IP is required. Not set. 
+ Use following command: + "$$ my_ip=`curl ipinfo.io | jq .ip`;eval my_ip=$${my_ip[i]};my_ip="$$my_ip/32"; export BASE_IP=$$my_ip" + +endef + +ifndef BASE_IP +export message1 +$(error $(message1)) +endif + ifndef BRANCH BRANCH := $(shell git rev-parse --abbrev-ref HEAD) endif @@ -8,6 +20,41 @@ else BUCKET := s3://turbine-quickstart/quickstart-turbine-airflow-$(BRANCH) endif +# turbine-master +CURRENT_LOCAL_IP = $(BASE_IP) +# DELETE ME +AWS_REGION := eu-central-1 +PROJECT_NAME := eksairflow01-staging lint: - cfn-lint templates/*.template + cfn-lint templates/cluster/*.template + cfn-lint templates/services/*.template + +test: + taskcat -c ./ci/taskcat.yaml + +sync: + aws s3 sync --exclude '.*' --acl public-read . $(BUCKET) + +# DELETE ME +artifacts: + aws s3 cp --recursive submodules/quickstart-aws-vpc s3://${PROJECT_NAME}-${AWS_REGION}/${PROJECT_NAME}submodules/quickstart-aws-vpc/templates/ + aws s3 cp --recursive templates/cluster s3://${PROJECT_NAME}-${AWS_REGION}/${PROJECT_NAME}templates + aws s3 cp --recursive templates/services s3://${PROJECT_NAME}-${AWS_REGION}/${PROJECT_NAME}templates + aws s3 cp --recursive templates/ci s3://${PROJECT_NAME}-${AWS_REGION}/${PROJECT_NAME}templates + +# DELETE ME +cluster: + aws cloudformation --region ${AWS_REGION} create-stack --stack-name ${PROJECT_NAME} \ + --template-body file://templates/turbine-master.template \ + --parameters \ + ParameterKey="AllowedWebBlock",ParameterValue="${CURRENT_LOCAL_IP}" \ + ParameterKey="DbMasterPassword",ParameterValue="super_secret" \ + ParameterKey="QSS3BucketName",ParameterValue="${PROJECT_NAME}-${AWS_REGION}" \ + ParameterKey="QSS3KeyPrefix",ParameterValue="${PROJECT_NAME}" \ + --capabilities CAPABILITY_NAMED_IAM + +# DELETE ME +clean: + aws cloudformation delete-stack --stack-name ${PROJECT_NAME} + diff --git a/examples/project/Makefile b/examples/project/Makefile index f238b623..1315bdf4 100644 --- a/examples/project/Makefile +++ b/examples/project/Makefile @@ -1,31 +1,43 @@ -ifndef 
stack-name -$(error stack-name is not set) +define message1 + Environment variable stack_name is required. Not set. + Use following command: + "$$ export stack_name=" + +endef + +ifndef stack_name +$(error $(message1)) endif ifndef revision revision := $(shell date --utc +%Y%m%dT%H%M%SZ) endif -# deployment variables -REVISION := $(shell date --utc +%Y%m%dT%H%M%SZ) -PACKAGE = $(stack-name)_$(REVISION).tgz define getRef $(shell aws cloudformation describe-stacks \ - --stack-name $(stack-name) \ + --stack-name $(stack_name) \ --query "Stacks[0].Outputs[?OutputKey=='$(1)'].OutputValue" \ --output text) endef + APPLICATION := $(call getRef,CodeDeployApplication) DEPLOYMENT_GROUP := $(call getRef,CodeDeployDeploymentGroup) DEPLOYMENTS_BUCKET := $(call getRef,DeploymentsBucket) -# stack resources -APPLICATION = $(stack-name)-deployment-application -DEPLOYMENT_GROUP = $(stack-name)-deployment-group -DEPLOYMENTS_BUCKET = $(shell aws cloudformation describe-stack-resources --stack-name $(stack-name) --logical-resource-id DeploymentsBucket --query 'StackResources[0].PhysicalResourceId' --output text) -PACKAGE := $(stack-name)_$(revision).tgz +PACKAGE := $(stack_name)_$(revision).tgz package: cd airflow && tar czf ../$(PACKAGE) . + +upload: package + aws s3 cp $(PACKAGE) s3://$(DEPLOYMENTS_BUCKET) + +deploy: upload + aws deploy create-deployment \ + --application-name $(APPLICATION) \ + --deployment-group-name $(DEPLOYMENT_GROUP) \ + --s3-location bucket=$(DEPLOYMENTS_BUCKET),bundleType=tgz,key=$(PACKAGE) \ + --deployment-config-name CodeDeployDefault.AllAtOnce \ + --file-exists-behavior OVERWRITE