diff --git a/.gitignore b/.gitignore index 560d1a6..0431c10 100644 --- a/.gitignore +++ b/.gitignore @@ -16,3 +16,6 @@ tmp .yardoc _yardoc doc/ + +*.swp +.rvmrc diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..6ae3653 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,6 @@ +language: ruby +rvm: + - 2.1.0 + - 2.0.0 +install: bundle install +script: rake spec diff --git a/Gemfile b/Gemfile index 081baab..b4e2a20 100644 --- a/Gemfile +++ b/Gemfile @@ -1,6 +1,3 @@ source "https://rubygems.org" -gem "aws-sdk" -gem "git" -gem "httparty" -gem "safe_yaml" +gemspec diff --git a/Gemfile.lock b/Gemfile.lock index 6d7c79a..81f80d8 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,26 +1,47 @@ +PATH + remote: . + specs: + amalgam-worker (0.1.0) + aws-sdk (~> 1.33) + httparty (~> 0.12) + rest_client + safe_yaml (~> 1.0) + GEM remote: https://rubygems.org/ specs: - aws-sdk (1.8.5) + aws-sdk (1.34.0) json (~> 1.4) nokogiri (>= 1.4.4) uuidtools (~> 2.1) - git (1.2.5) - httparty (0.10.2) - multi_json (~> 1.0) + diff-lcs (1.2.5) + httparty (0.12.0) + json (~> 1.8) multi_xml (>= 0.5.2) - json (1.7.7) - multi_json (1.7.2) - multi_xml (0.5.3) - nokogiri (1.5.9) - safe_yaml (0.9.3) - uuidtools (2.1.3) + json (1.8.1) + mini_portile (0.5.2) + multi_xml (0.5.5) + netrc (0.7.7) + nokogiri (1.6.1) + mini_portile (~> 0.5.0) + rake (10.1.1) + rest_client (1.7.2) + netrc (~> 0.7.7) + rspec (2.14.1) + rspec-core (~> 2.14.0) + rspec-expectations (~> 2.14.0) + rspec-mocks (~> 2.14.0) + rspec-core (2.14.7) + rspec-expectations (2.14.5) + diff-lcs (>= 1.1.3, < 2.0) + rspec-mocks (2.14.5) + safe_yaml (1.0.1) + uuidtools (2.1.4) PLATFORMS ruby DEPENDENCIES - aws-sdk - git - httparty - safe_yaml + amalgam-worker! 
+ rake (~> 10.1) + rspec (~> 2.14) diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..96b0ddc --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2014 Christopher Kleynhans + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. 
\ No newline at end of file diff --git a/Rakefile b/Rakefile new file mode 100644 index 0000000..29771fa --- /dev/null +++ b/Rakefile @@ -0,0 +1,9 @@ +require 'rspec' +require 'rspec/core' +require 'rspec/core/rake_task' + +task :default => :spec + +RSpec::Core::RakeTask.new(:spec) do |t| + t.pattern = 'spec/**/*_spec.rb' +end diff --git a/amalgam-worker.gemspec b/amalgam-worker.gemspec new file mode 100644 index 0000000..1f49d7f --- /dev/null +++ b/amalgam-worker.gemspec @@ -0,0 +1,41 @@ +Gem::Specification.new do |s| + s.name = 'amalgam-worker' + s.version = '0.1.0' + s.date = '2014-02-04' + s.summary = 'Worker program for Team Amalgam' + s.description = 'Runs moolloy build jobs and test jobs' + s.authors = [ 'Chris Kleynhans' ] + s.email = 'chris@kleynhans.ca' + s.homepage = 'http://github.com/TeamAmalgam/worker' + s.license = 'MIT' + s.files = ['LICENSE', + 'lib/amalgam.rb', + 'lib/amalgam/worker.rb', + 'lib/amalgam/worker/manager.rb', + 'lib/amalgam/worker/runner.rb', + 'lib/amalgam/worker/configuration.rb', + 'lib/amalgam/worker/downloader.rb', + 'lib/amalgam/worker/downloader/s3_downloader.rb', + 'lib/amalgam/worker/downloader/test_downloader.rb', + 'lib/amalgam/worker/heartbeater.rb', + 'lib/amalgam/worker/heartbeater/http_heartbeater.rb', + 'lib/amalgam/worker/heartbeater/test_heartbeater.rb', + 'lib/amalgam/worker/uploader.rb', + 'lib/amalgam/worker/uploader/s3_uploader.rb', + 'lib/amalgam/worker/uploader/test_uploader.rb', + 'lib/amalgam/worker/queue.rb', + 'lib/amalgam/worker/queue/sqs_queue.rb', + 'lib/amalgam/worker/queue/test_queue.rb', + 'lib/amalgam/worker/job.rb', + 'lib/amalgam/worker/job/build_job.rb', + 'lib/amalgam/worker/job/run_job.rb'] + s.executables = ['amalgam-worker'] + + s.add_runtime_dependency 'aws-sdk', '~> 1.33' + s.add_runtime_dependency 'safe_yaml', '~> 1.0' + s.add_runtime_dependency 'httparty', '~> 0.12' + s.add_runtime_dependency 'rest_client' + + s.add_development_dependency 'rake', '~> 10.1' + s.add_development_dependency 
'rspec', '~> 2.14' +end diff --git a/bin/amalgam-worker b/bin/amalgam-worker new file mode 100755 index 0000000..e5fee17 --- /dev/null +++ b/bin/amalgam-worker @@ -0,0 +1,26 @@ +#!/usr/bin/env ruby + +require 'amalgam/worker' +require 'safe_yaml' + +SafeYAML::OPTIONS[:deserialize_symbols] = true + +config_file_path = ARGV[0] +raise "Configuration path must be specified." if config_file_path.nil? + +worker = Amalgam::Worker.new(config_file_path) +worker.run + +Signal.trap("USR1") do + worker.termination_request +end + +Signal.trap("USR2") do + worker.terminate_current_job +end + +Signal.trap("HUP") do + worker.update_configuration +end + +worker.join diff --git a/common.rb b/common.rb deleted file mode 100644 index 569a3ca..0000000 --- a/common.rb +++ /dev/null @@ -1,11 +0,0 @@ -def sha2_hash(filename) - digest = Digest::SHA2.new - - File.open(filename) do |file| - while not file.eof - digest << file.read(digest.block_length) - end - end - - digest.digest -end diff --git a/configuration.rb b/configuration.rb deleted file mode 100644 index 63ad81a..0000000 --- a/configuration.rb +++ /dev/null @@ -1,117 +0,0 @@ -require 'aws-sdk' -require 'yaml' - -class Configuration - - SETTINGS = [ - :access_key_id, - :secret_access_key, - :command, - :s3_bucket, - :sqs_queue_name, - :server_base_url, - :username, - :password, - :tmp_dir, - :git_repo, - :ssh_key, - :seed_repo_path, - :worker_timeout - ] - - MANDATORY_SETTINGS = [ - :access_key_id, - :secret_access_key, - :command, - :s3_bucket, - :sqs_queue_name, - :server_base_url, - :git_repo - ] - - SECONDS_PER_SECOND = 1 - SECONDS_PER_MINUTE = 60 - MINUTES_PER_HOUR = 60 - SECONDS_PER_HOUR = SECONDS_PER_MINUTE * MINUTES_PER_HOUR - - # Define a public accessor for each setting. - # We don't use attr_reader because we want to synchronize the read. 
- SETTINGS.each do |setting| - define_method(setting) do - @configuration_mutex.synchronize do - return instance_variable_get("@#{setting}") - end - - end - end - - def initialize(config_file_path) - @config_file_path = File.absolute_path(config_file_path) - @configuration_mutex = Mutex.new - - self.update - end - - def update - @configuration_mutex.synchronize do - load_configuration - update_global_objects - end - end - - # Atomically reads multiple setting from the configuration. - def read_multiple(settings) - values_to_return = {} - - # Grab the configuration mutex to ensure atomic execution - @configuration_mutex.synchronize do - settings.each do |setting| - unless SETTINGS.include?(setting) - raise "Setting #{setting} does not exist" - end - - values_to_return[setting] = self.instance_variable_get("@#{setting}") - end - end - - return values_to_return - end - - private - - def update_global_objects - AWS.config(:access_key_id => @access_key_id, - :secret_access_key => @secret_access_key) - end - - def load_configuration - conf = YAML.safe_load(File.read(@config_file_path)) - validate_configuration_hash(conf) - - conf.each do |key, value| - self.instance_variable_set("@#{key}", value) - end - - if @worker_timeout.is_a?(Hash) - @worker_timeout = SECONDS_PER_HOUR * (@worker_timeout[:hours] || 0) + - SECONDS_PER_MINUTE * (@worker_timeout[:minutes] || 0) + - SECONDS_PER_SECOND * (@worker_timeout[:seconds] || 0) - end - end - - def validate_configuration_hash(hash) - # All hash keys must be settings - hash.each_key do |key| - unless SETTINGS.include?(key) - raise "Unknown setting #{key} specified in the configuration file." - end - end - - # All mandatory settings must be provided. - MANDATORY_SETTINGS.each do |setting| - unless hash.has_key?(setting) - raise "#{setting} was not specified in the configuration file." 
- end - end - end -end diff --git a/lib/amalgam.rb b/lib/amalgam.rb new file mode 100644 index 0000000..9fd1104 --- /dev/null +++ b/lib/amalgam.rb @@ -0,0 +1,4 @@ +module Amalgam +end + +require_relative 'amalgam/worker' diff --git a/lib/amalgam/worker.rb b/lib/amalgam/worker.rb new file mode 100644 index 0000000..4ec2f52 --- /dev/null +++ b/lib/amalgam/worker.rb @@ -0,0 +1,49 @@ +require 'logger' +require_relative '../amalgam' + +class Amalgam::Worker + class << self + def logger=(value) + @logger = value + end + + def logger + @logger ||= Logger.new(STDERR) + return @logger + end + end + + def initialize(configuration_path) + @configuration = Amalgam::Worker::Configuration.new(configuration_path) + @manager = Amalgam::Worker::Manager.new(@configuration) + end + + def run + @manager.run + end + + def join + @manager.join + end + + def termination_request + @manager.request_termination + end + + def terminate_current_job + @manager.terminate_job + end + + def update_configuration + @manager.update_configuration + end +end + +require_relative 'worker/manager' +require_relative 'worker/runner' +require_relative 'worker/job' +require_relative 'worker/queue' +require_relative 'worker/configuration' +require_relative 'worker/heartbeater' +require_relative 'worker/downloader' +require_relative 'worker/uploader' diff --git a/lib/amalgam/worker/configuration.rb b/lib/amalgam/worker/configuration.rb new file mode 100644 index 0000000..87c44ee --- /dev/null +++ b/lib/amalgam/worker/configuration.rb @@ -0,0 +1,161 @@ +require 'aws-sdk' + +class Amalgam::Worker::Configuration + + SECONDS_PER_SECOND = 1 + SECONDS_PER_MINUTE = 60 + MINUTES_PER_HOUR = 60 + SECONDS_PER_HOUR = SECONDS_PER_MINUTE * MINUTES_PER_HOUR + + SETTINGS = [ + # AWS Credentials. + :access_key_id, + :secret_access_key, + + :tmp_dir, # The temporary directory to work in. + :git_repo, # URL of the git repo containing moolloy. + :ssh_key, # The ssh key to use when cloning the repo. 
+ :worker_timeout, # Maximum amount of time a worker will spend on a job. + :heartbeat_period, # Time between heartbeats. + :sleep_interval, # Time to sleep between worker checks. + :idle_timeout, # Time to spend polling for jobs before checking other state + :server_base_url, # Base url for jobs to talk to the server. + :username, # Username for jobs to talk to the server. + :password, # Password for jobs to talk to the server. + + :heartbeater_type, # The type of heartbeater to use. + :uploader_type, # The type of uploader to use. + :downloader_type, # The type of downloader to use. + :queue_type, # The type of queue to use. + + :heartbeater_options, # Options to pass to the heartbeater. + :uploader_options, # Options to pass to the uploader. + :downloader_options, # Options to pass to the downloader. + :queue_options # Options to pass to the queue. + ] + + MANDATORY_SETTINGS = [ + :access_key_id, + :secret_access_key, + :git_repo, + :server_base_url, + :heartbeater_type, + :uploader_type, + :downloader_type, + :queue_type, + :heartbeater_options, + :uploader_options, + :downloader_options, + :queue_options + ] + + NON_MANDATORY_SETTINGS = SETTINGS - MANDATORY_SETTINGS + + SETTING_DEFAULTS = { + :worker_timeout => 48 * SECONDS_PER_HOUR, + :heartbeat_period => 5 * SECONDS_PER_MINUTE, + :sleep_interval => 15 * SECONDS_PER_SECOND, + :tmp_dir => nil, + :idle_timeout => 2 * SECONDS_PER_MINUTE, + :ssh_key => nil, + :username => nil, + :password => nil + } + + # All Non-Mandatory settings must have an entry in the + # SETTING_DEFAULTS hash (even if they are nil by default) + # to ensure we have considered what happens by default + # for the Non-Mandatory settings.
+ NON_MANDATORY_SETTINGS.each do |name| + unless SETTING_DEFAULTS.has_key?(name) + raise "No default for non-mandatory setting #{name}" + end + end + + ATTRIBUTES = SETTINGS + [ + :uploader, + :downloader, + :queue, + :heartbeater + ] + + + ATTRIBUTES.each do |name| + define_method(name) do + @configuration_mutex.synchronize { + return instance_variable_get("@#{name}") + } + end + end + + def initialize(config_file_path) + ATTRIBUTES.each do |name| + default_value = SETTING_DEFAULTS[name] + instance_variable_set("@#{name}", default_value) + end + + + @config_file_path = File.absolute_path(config_file_path) + @configuration_mutex = Mutex.new + self.reload + end + + def reload + @configuration_mutex.synchronize { + load_configuration + update_global_objects + update_configuration_objects + } + end + + private + + def load_configuration + conf = YAML.safe_load(File.read(@config_file_path)) + validate_configuration_hash(conf) + + conf.each do |key, value| + self.instance_variable_set("@#{key}", value) + end + + if @worker_timeout.is_a?(Hash) + @worker_timeout = SECONDS_PER_HOUR * (@worker_timeout[:hours] || 0) + + SECONDS_PER_MINUTE * (@worker_timeout[:minutes] || 0) + + SECONDS_PER_SECOND * (@worker_timeout[:seconds] || 0) + end + end + + def update_global_objects + ::AWS.config(:access_key_id => @access_key_id, + :secret_access_key => @secret_access_key) + end + + def update_configuration_objects + @uploader = Amalgam::Worker::Uploader.create(@uploader_type, + @uploader_options, + @uploader) + @downloader = Amalgam::Worker::Downloader.create(@downloader_type, + @downloader_options, + @downloader) + @queue = Amalgam::Worker::Queue.create(@queue_type, @queue_options, @queue) + @heartbeater = Amalgam::Worker::Heartbeater.create(@heartbeater_type, + @heartbeater_options, + @heartbeater) + end + + def validate_configuration_hash(hash) + # All hash keys provided must be settings + hash.each_key do |key| + unless SETTINGS.include?(key) + raise "Unknown setting #{key} 
specified in the configuration file." + end + end + + # All mandatory settings must be provided + MANDATORY_SETTINGS.each do |setting| + unless hash.has_key?(setting) + raise "#{setting} was not specified in the configuration file." + end + end + end +end diff --git a/lib/amalgam/worker/downloader.rb b/lib/amalgam/worker/downloader.rb new file mode 100644 index 0000000..857750a --- /dev/null +++ b/lib/amalgam/worker/downloader.rb @@ -0,0 +1,38 @@ +class Amalgam::Worker::Downloader + def initialize + raise "Attempt to initialize abstract Downloader." + end + + def download(key, destination_path) + raise "Call to abstract Downloader download." + end + + class << self + def register_downloader(identifier, klass) + @downloaders ||= {} + + unless @downloaders[identifier].nil? + raise "Downloader with identifier '#{identifier}' already registered." + end + + @downloaders[identifier] = klass + end + + def unregister_downloader(identifier) + @downloaders[identifier] = nil + end + + def create(identifier, options, previous_downloader = nil) + downloaders = @downloaders || {} + + if downloaders[identifier].nil? + raise "No downloader type registered for identifier '#{identifier}'."
+ end + + return downloaders[identifier].new(options, previous_downloader) + end + end +end + +require_relative "downloader/test_downloader" +require_relative "downloader/s3_downloader" diff --git a/lib/amalgam/worker/downloader/s3_downloader.rb b/lib/amalgam/worker/downloader/s3_downloader.rb new file mode 100644 index 0000000..b93633c --- /dev/null +++ b/lib/amalgam/worker/downloader/s3_downloader.rb @@ -0,0 +1,20 @@ +class Amalgam::Worker::Downloader::S3Downloader + def initialize(options, old_downloader = nil) + s3_client = AWS::S3.new + @s3_bucket = s3_client.buckets[options[:s3_bucket]] + end + + def download(key, destination_path) + obj = @s3_bucket.objects[key] + File.open(destination_path, "w") do |f| + obj.read do |chunk| + f.write(chunk) + end + end + end +end + +Amalgam::Worker::Downloader.register_downloader( + :s3, + Amalgam::Worker::Downloader::S3Downloader +) \ No newline at end of file diff --git a/lib/amalgam/worker/downloader/test_downloader.rb b/lib/amalgam/worker/downloader/test_downloader.rb new file mode 100644 index 0000000..be81340 --- /dev/null +++ b/lib/amalgam/worker/downloader/test_downloader.rb @@ -0,0 +1,16 @@ +require 'fileutils' +class Amalgam::Worker::Downloader::TestDownloader + def initialize(options, old_downloader) + @source_directory = options[:source_directory] + end + + def download(key, destination_path) + source_path = File.join(@source_directory, key) + FileUtils.copy_file(source_path, destination_path) + end +end + +Amalgam::Worker::Downloader.register_downloader( + :test, + Amalgam::Worker::Downloader::TestDownloader +) \ No newline at end of file diff --git a/lib/amalgam/worker/heartbeater.rb b/lib/amalgam/worker/heartbeater.rb new file mode 100644 index 0000000..baeb862 --- /dev/null +++ b/lib/amalgam/worker/heartbeater.rb @@ -0,0 +1,58 @@ +class Amalgam::Worker::Heartbeater + + attr_reader :worker_id + + def initialize + raise "Attempt to initialize abstract Heartbeater." 
+ end + + + def register + raise "Attempt to invoke abstract method register." + end + + def heartbeat(current_job_id = nil) + raise "Attempt to invoke abstract method heartbeat." + end + + def signal_start(job_id) + raise "Attempt to invoke abstract method signal_start." + end + + def signal_completion(job_id, result_body) + raise "Attempt to invoke abstract method signal_completion." + end + + def unregister + raise "Attempt to invoke abstract method unregister." + end + + class << self + def register_heartbeater(identifier, klass) + @heartbeaters ||= {} + + unless @heartbeaters[identifier].nil? + raise "Heartbeater with identifier '#{identifier}' already registered." + end + + @heartbeaters[identifier] = klass + end + + def unregister_queue(identifier) + @heartbeaters[identifier] = nil + end + + def create(identifier, options, previous_queue = nil) + heartbeaters = @heartbeaters || {} + + if heartbeaters[identifier].nil? + raise "No heartbeater type registered for identifier '#{identifier}'." + end + + return heartbeaters[identifier].new(options, previous_queue) + end + end +end + +require_relative "heartbeater/http_heartbeater" +require_relative "heartbeater/test_heartbeater" diff --git a/lib/amalgam/worker/heartbeater/http_heartbeater.rb b/lib/amalgam/worker/heartbeater/http_heartbeater.rb new file mode 100644 index 0000000..eb13f05 --- /dev/null +++ b/lib/amalgam/worker/heartbeater/http_heartbeater.rb @@ -0,0 +1,120 @@ +require 'socket' +require 'httparty' + +class Amalgam::Worker::Heartbeater::HttpHeartbeater + + attr_reader :worker_id + + def initialize(options, old_heartbeater = nil) + @server_base_url = options[:server_base_url] + + @worker_id = nil + unless old_heartbeater.nil? + @worker_id = old_heartbeater.worker_id + end + + @auth_params = nil + unless options[:username].nil? && options[:password].nil? 
+ @auth_params = { + :username => options[:username], + :password => options[:password] + } + end + end + + + def register + raise "Already registered." unless @worker_id.nil? + + Amalgam::Worker.logger.info("Registering with server.") + + response = HTTParty.post(register_url, { + :body => { :hostname => hostname }.to_json, + :basic_auth => @auth_params + }) + + parsed_response = JSON.parse(response.parsed_response) + + # Validate the response. + + if parsed_response["worker_id"].nil? + raise "Response did not contain worker_id." + end + + @worker_id = parsed_response["worker_id"].to_i + end + + def heartbeat(current_job_id = nil) + raise "Not registered." if @worker_id.nil? + + begin + HTTParty.post(heartbeat_url, { + :body => { :job_id => current_job_id }.to_json, + :basic_auth => @auth_params + }) + rescue => err + Amalgam::Worker.logger.error("Manager Failed to Heartbeat") + Amalgam::Worker.logger.error(err.inspect) + Amalgam::Worker.logger.error(err.backtrace.join("\n")) + end + end + + def signal_start(job_id) + raise "Not registered." if @worker_id.nil? + Amalgam::Worker.logger.info("Signalling start to server.") + HTTParty.post(start_url(job_id), { + :basic_auth => @auth_params + }) + end + + def signal_completion(job_id, result_body) + raise "Not registered." if @worker_id.nil? + Amalgam::Worker.logger.info("Signalling completion to server.") + HTTParty.post(completion_url(job_id), { + :body => result_body.to_json, + :basic_auth => @auth_params + }) + end + + def unregister + raise "Not registered." if @worker_id.nil? 
+ Amalgam::Worker.logger.info("Unregistering with server.") + + HTTParty.post(unregister_url, { + :basic_auth => @auth_params + }) + end + + private + + def hostname + Socket.gethostname + end + + def register_url + "#{@server_base_url}/workers/register" + end + + def unregister_url + "#{@server_base_url}/workers/#{@worker_id}/unregister" + end + + def heartbeat_url + "#{@server_base_url}/workers/#{@worker_id}/heartbeat" + end + + def start_url(job_id) + "#{@server_base_url}/jobs/#{job_id}/start" + end + + def completion_url(job_id) + "#{@server_base_url}/jobs/#{job_id}/finish" + end + +end + + +Amalgam::Worker::Heartbeater.register_heartbeater( + :http, + Amalgam::Worker::Heartbeater::HttpHeartbeater +) diff --git a/lib/amalgam/worker/heartbeater/test_heartbeater.rb b/lib/amalgam/worker/heartbeater/test_heartbeater.rb new file mode 100644 index 0000000..e1d1ac6 --- /dev/null +++ b/lib/amalgam/worker/heartbeater/test_heartbeater.rb @@ -0,0 +1,35 @@ +class Amalgam::Worker::Heartbeater::TestHeartbeater + + attr_reader :worker_id + + def initialize(options, old_heartbeater = nil) + @worker_id = nil + + unless old_heartbeater.nil? + @worker_id = old_heartbeater.worker_id + end + end + + def register + @worker_id = 1234 + end + + def heartbeat(current_job_id = nil) + end + + def signal_start(job_id) + end + + def signal_completion(job_id, result_body) + end + + def unregister + @worker_id = nil + end + +end + +Amalgam::Worker::Heartbeater.register_heartbeater( + :test, + Amalgam::Worker::Heartbeater::TestHeartbeater +) \ No newline at end of file diff --git a/lib/amalgam/worker/job.rb b/lib/amalgam/worker/job.rb new file mode 100644 index 0000000..8e152ad --- /dev/null +++ b/lib/amalgam/worker/job.rb @@ -0,0 +1,50 @@ +class Amalgam::Worker::Job + + def initialize(description) + raise "Attempt to create abstract job." + end + + def run + raise "Attempt to run abstract job." + end + + def terminate + raise "Attempt to terminate abstract job." 
+ end + + class << self + def register_job(identifier, klass) + @jobs ||= {} + + unless @jobs[identifier].nil? + raise "Job with identifier '#{identifier}' already registered." + end + + @jobs[identifier] = klass + end + + def unregister_job(identifier) + @jobs[identifier] = nil + end + + def create(job_description, configuration) + jobs = @jobs || {} + + job_identifier = job_description[:job_type] + + if job_identifier.nil? + raise "Job description does not specify a job type." + end + + if jobs[job_identifier].nil? + raise "No job type registered for identifier '#{job_identifier}'" + end + + return jobs[job_identifier].new(job_description, configuration) + end + end + +end + +require_relative 'job/build_job' +require_relative 'job/run_job' diff --git a/lib/amalgam/worker/job/build_job.rb b/lib/amalgam/worker/job/build_job.rb new file mode 100644 index 0000000..592039c --- /dev/null +++ b/lib/amalgam/worker/job/build_job.rb @@ -0,0 +1,134 @@ +class Amalgam::Worker::Job::BuildJob < Amalgam::Worker::Job + def initialize(job_description, configuration) + Amalgam::Worker.logger.info("Creating BuildJob") + validate_job(job_description) + + @job_description = job_description + @configuration = configuration + end + + def run + run_main + end + + def terminate + # Need to implement + end + + private + + REQUIRED_PARAMETERS = [ + :commit + ] + + def validate_job(job_description) + REQUIRED_PARAMETERS.each do |param| + if job_description[param].nil? + raise "Job missing required parameter: #{param}" + end + end + end + + def run_with_ssh_key(cmd) + `ssh-agent bash -c 'ssh-add #{@configuration.ssh_key}; #{cmd}'` + end + + def run_main + Amalgam::Worker.logger.info("Running BuildJob") + working_directory = Dir.getwd + + # Clone the repo.
+ repo_url = @configuration.git_repo + Amalgam::Worker.logger.info("Cloning git repo.") + clone_results = run_with_ssh_key("git clone #{repo_url} moolloy 2>&1") + if $?.to_i != 0 + return { + :return_code => $?.to_i, + :error_message => "Failed to clone git repo.", + :error_details => clone_results + } + end + + begin + Dir.chdir(File.join(working_directory, "moolloy")) + + # Checkout the commit we want. + commit = @job_description[:commit] + Amalgam::Worker.logger.info("Checking out commit: #{commit}") + checkout_results = `git checkout #{commit} 2>&1` + if $?.to_i != 0 + return { + :return_code => $?.to_i, + :error_message => "Failed to checkout commit.", + :error_details => checkout_results + } + end + + # Git submodule init / update + + Amalgam::Worker.logger.info("Updating submodules.") + submodule_results = `git submodule init 2>&1` + if $?.to_i != 0 + return { + :return_code => $?.to_i, + :error_message => "Failed to submodule init.", + :error_details => submodule_results + } + end + + submodule_results = run_with_ssh_key('git submodule update 2>&1') + if $?.to_i != 0 + return { + :return_code => $?.to_i, + :error_message => "Failed to submodule update.", + :error_details => submodule_results + } + end + + # ant dist + + Amalgam::Worker.logger.info("Building...") + build_results = `ant deps 2>&1 && ant configure 2>&1 && ant dist 2>&1` + if $?.to_i != 0 + return { + :return_code => $?.to_i, + :error_message => "Failed to build.", + :error_details => build_results + } + end + Amalgam::Worker.logger.info("Build Complete.") + + # Sanity test + + alloy_path = File.join(working_directory, + "moolloy", + "dist", + "alloy-dev.jar") + unless File.exists?(alloy_path) + return { + :return_code => 255, + :error_message => "Build failed to produce moolloy.", + :error_details => "Unable to find moolloy at: #{alloy_path}" + } + end + + # Upload jar + + Amalgam::Worker.logger.info("Uploading jar.") + uploader = @configuration.uploader + uploader.upload(alloy_path, 
"builds/#{commit}.jar") + + # Generate result + + return { + :return_code => 0, + :result_s3_key => "builds/#{commit}.jar" + } + + ensure + Dir.chdir(working_directory) + end + end +end + +Amalgam::Worker::Job.register_job(:build, Amalgam::Worker::Job::BuildJob) diff --git a/lib/amalgam/worker/job/run_job.rb b/lib/amalgam/worker/job/run_job.rb new file mode 100644 index 0000000..e43082e --- /dev/null +++ b/lib/amalgam/worker/job/run_job.rb @@ -0,0 +1,233 @@ +require 'benchmark' +require 'fileutils' +require 'digest' +require 'rest_client' + +class Amalgam::Worker::Job::RunJob < Amalgam::Worker::Job + def initialize(job_description, configuration) + Amalgam::Worker.logger.info("Creating RunJob") + validate_job(job_description) + + @job_description = job_description + @configuration = configuration + end + + def run + run_main + end + + def terminate + # Need to implement + end + + private + + REQUIRED_PARAMETERS = [ + :jar_file_key, + :model_file_key + ] + + def validate_job(job_description) + REQUIRED_PARAMETERS.each do |param| + if job_description[param].nil? + raise "Job missing required parametter: #{param}" + end + end + end + + def sha2_hash(filename) + digest = Digest::SHA2.new + + File.open(filename) do |file| + while not file.eof + digest << file.read(digest.block_length) + end + end + + digest.digest + end + + def run_main + Amalgam::Worker.logger.info("Running RunJob") + working_directory = Dir.getwd + model_directory = File.join(working_directory, "model") + run_directory = File.join(working_directory, "run") + package_directory = File.join(working_directory, "package") + + Dir.mkdir(model_directory) + Dir.mkdir(run_directory) + Dir.mkdir(package_directory) + + begin + Dir.chdir(model_directory) + + # Download the model into the model directory. 
+ + model_tar_file_path = File.join(model_directory, + "model.tar.bz2") + downloader = @configuration.downloader + downloader.download(@job_description[:model_file_key], model_tar_file_path) + + # Unpack the model. + + tar_result = `tar -xf #{model_tar_file_path}` + if $?.to_i != 0 + return { + :return_code => $?.to_i, + :error_message => "Failed to unpack the model.", + :error_detail => tar_result + } + end + + model_als_path = File.join(model_directory, "model.als") + + Dir.chdir(run_directory) + + # Download the moolloy jar. + + jar_file_path = File.join(run_directory, + "moolloy.jar") + + downloader = @configuration.downloader + downloader.download(@job_description[:jar_file_key], jar_file_path) + + # Run moolloy. + stdout_path = File.join(run_directory, "stdout.out") + stderr_path = File.join(run_directory, "stderr.out") + + algorithm_string = nil + if @job_description[:algorithm] + algorithm_string = "--MooAlgorithm=#{@job_description[:algorithm]}" + end + + memory_string = "-Xss4m -Xms512m -Xmx8192m" + + benchmark_result = Benchmark.measure do + `java #{memory_string} -jar "#{jar_file_path}" #{algorithm_string} "#{model_als_path}" > #{stdout_path} 2> #{stderr_path}` + end + + return_code = $?.to_i + + Dir.chdir(model_directory) + + # If moolloy finished successfully and the solutions for this model have not been populated + # we will assume that this run was correct and populate the solutions. + if (return_code == 0) && File.exists?(File.join(model_directory, "solutions_not_populated.txt")) + + Amalgam::Worker.logger.info("The solutions were not populated for this model.") + Amalgam::Worker.logger.info("Populating the solutions from this run.") + + # Delete the solutions_not_populated.txt file. + FileUtils.rm_f(File.join(model_directory, "solutions_not_populated.txt")) + + # Find the solutions this run generated and copy them into the model_directory. 
+ test_solution_files = Dir[File.join(run_directory, "alloy_solutions_*.xml")] + FileUtils.cp(test_solution_files, model_directory) + + # Package the model and the solutions into a tarball. + tar_result = `tar -cjf "#{File.join(model_directory, "populated_model.tar.bz2")}" model.als alloy_solutions_*.xml` + if $?.to_i != 0 + return { + :return_code => $?.to_i, + :error_message => "Failed to package populated model.", + :error_details => tar_result + } + end + + # Upload the model to the dashboard. + model_id = @job_description[:model_id] + + RestClient.log = Amalgam::Worker.logger + + upload_request = RestClient::Request.new( + :method => :post, + :url => "#{@configuration.server_base_url}/models/#{model_id}/upload", + :user => @configuration.username, + :password => @configuration.password, + :payload => { + :file => File.new(File.join(model_directory, "populated_model.tar.bz2")) + } + ) + + upload_request.execute + end + + Dir.chdir(run_directory) + + # Determine correctness. + correct = (return_code == 0) + test_solution_files = Dir[File.join(run_directory, + "alloy_solutions_*.xml")] + model_solution_files = Dir[File.join(model_directory, + "alloy_solutions_*.xml")] + + if test_solution_files.count != model_solution_files.count + correct = false + end + + hash_to_model_solution = {} + model_solution_files.each do |model_file| + `tail -n +2 #{model_file} > #{model_file}.trimmed` + hash = sha2_hash(model_file + ".trimmed") + hash_to_model_solution[hash] ||= [] + hash_to_model_solution[hash] << model_file + end + + test_solution_files.each do |test_file| + `tail -n +2 #{test_file} > #{test_file}.trimmed` + hash = sha2_hash(test_file + ".trimmed") + + if !hash_to_model_solution[hash].nil? && + !hash_to_model_solution[hash].empty? 
+ + matching_file = hash_to_model_solution[hash].shift + Amalgam::Worker.logger.info("#{test_file} matches #{matching_file}.") + else + Amalgam::Worker.logger.info("#{test_file} has no match.") + correct = false + end + end + + if correct + Amalgam::Worker.logger.info("The solutions are correct.") + else + Amalgam::Worker.logger.info("The solutions are incorrect.") + end + + # Package results + FileUtils.mv(model_directory, package_directory) + FileUtils.mv(run_directory, package_directory) + + tarball_path = File.join(working_directory, "tarball.tar.bz2") + + Dir.chdir(package_directory) + + `hostname > ./hostname` + tar_result = `tar -cjf "#{tarball_path}" #{File.join(".", "*")}` + if $?.to_i != 0 + return { + :return_code => $?.to_i, + :error_message => "Failed to package results.", + :error_detail => tar_result + } + end + + # Upload the package to s3 + key = "results/#{@job_description[:job_id]}.tar.bz2" + uploader = @configuration.uploader + uploader.upload(tarball_path, key) + + return { + :return_code => return_code, + :correct => (correct ? 1 : 0), + :real_time_seconds => benchmark_result.real, + :cpu_time_seconds => benchmark_result.total, + :result_s3_key => key + } + ensure + Dir.chdir(working_directory) + end + end +end + +Amalgam::Worker::Job.register_job(:run, Amalgam::Worker::Job::RunJob) diff --git a/lib/amalgam/worker/manager.rb b/lib/amalgam/worker/manager.rb new file mode 100644 index 0000000..4f23341 --- /dev/null +++ b/lib/amalgam/worker/manager.rb @@ -0,0 +1,158 @@ +class Amalgam::Worker::Manager + + def initialize(configuration) + @configuration = configuration + @thread = nil + @time_of_last_heartbeat = nil + @configuration_update_requested = false + @termination_requested = false + end + + def run + @thread = Thread.new { + thread_main + } + end + + def join + unless @thread.nil? 
+ @thread.join + end + @thread = nil + end + + def request_termination + @termination_requested = true + end + + def terminate_current_job + @job_termination_requested = true + end + + private + + def thread_main + Amalgam::Worker.logger.info("Manager trying to register.") + begin + register + rescue => err + Amalgam::Worker.logger.error("Manager failed to register.") + Amalgam::Worker.logger.error(err.inspect) + Amalgam::Worker.logger.error(err.backtrace.join("\n")) + return + end + + Amalgam::Worker.logger.info("Manager registered successfully.") + + begin + while (!@termination_requested) + Amalgam::Worker.logger.info("Polling for job.") + job = poll_for_job + + unless job.nil? + Amalgam::Worker.logger.info("Got job.") + run_job(job) + end + end + rescue => err + Amalgam::Worker.logger.error("Manager encountered exception.") + Amalgam::Worker.logger.error(err.inspect) + Amalgam::Worker.logger.error(err.backtrace.join("\n")) + ensure + Amalgam::Worker.logger.info("Manager unregistering.") + unregister + end + end + + def register + heartbeater = @configuration.heartbeater + heartbeater.register + end + + def unregister + heartbeater = @configuration.heartbeater + heartbeater.unregister + end + + def poll_for_job + result = nil + + while (result.nil? && !@termination_requested) + queue = @configuration.queue + result = queue.poll + + maybe_do_heartbeat + maybe_update_configuration + end + + return result + end + + def maybe_do_heartbeat(job_id = nil) + if (@time_of_last_heartbeat.nil? 
|| + seconds_since_last_heartbeat >= @configuration.heartbeat_period) + + heartbeater = @configuration.heartbeater + heartbeater.heartbeat(job_id) + + @time_of_last_heartbeat = Time.now + end + end + + def maybe_update_configuration + if @configuration_update_requested + @configuration.reload + end + end + + def seconds_since_last_heartbeat + return (Time.now - @time_of_last_heartbeat) + end + + def run_job(job_description) + job_id = job_description[:job_id] + Amalgam::Worker.logger.info("Signalling start of job: #{job_id}") + @configuration.heartbeater.signal_start(job_id) + @configuration.heartbeater.heartbeat(job_id) + + # Ensure that a previous job termination will not carry-over + @job_termination_requested = false + + @runner = Amalgam::Worker::Runner.new(@configuration, job_description) + @runner.run + + @job_start = Time.now + + while (@runner.running?) + maybe_do_heartbeat(job_id) + maybe_update_configuration + maybe_terminate_job + + sleep(@configuration.sleep_interval) + end + + @runner.join + result = { + :secret_key => job_description[:secret_key] + }.merge(@runner.result) + + Amalgam::Worker.logger.info("Job Completed, Result: #{result}") + Amalgam::Worker.logger.info("Signalling compleion of job: #{job_id}") + @configuration.heartbeater.signal_completion(job_id, result) + @configuration.heartbeater.heartbeat(nil) + end + + def seconds_since_job_start + return (Time.now - @job_start) + end + + def maybe_terminate_job + if (seconds_since_job_start >= + @configuration.worker_timeout || + @job_termination_requested) + Amalgam::Worker.logger.info("Job has timed-out.") + @runner.terminate + @job_termination_requested = false + end + end +end diff --git a/lib/amalgam/worker/queue.rb b/lib/amalgam/worker/queue.rb new file mode 100644 index 0000000..402a3dd --- /dev/null +++ b/lib/amalgam/worker/queue.rb @@ -0,0 +1,34 @@ +class Amalgam::Worker::Queue + def poll + raise "Attempt to poll an abstract queue." 
+ end + + class << self + def register_queue(identifier, klass) + @queues ||= {} + + unless @queues[identifier].nil? + raise "Queue with identiier '#{identifier}' already registered." + end + + @queues[identifier] = klass + end + + def unregister_queue(identifier) + @queues[identifier] = nil + end + + def create(identifier, options, previous_queue = nil) + queues = @queues || {} + + if queues[identifier].nil? + raise "No queue type registered for identifier '#{identifier}'." + end + + return queues[identifier].new(options, previous_queue) + end + end +end + +require_relative 'queue/sqs_queue' +require_relative 'queue/test_queue' diff --git a/lib/amalgam/worker/queue/sqs_queue.rb b/lib/amalgam/worker/queue/sqs_queue.rb new file mode 100644 index 0000000..fae2401 --- /dev/null +++ b/lib/amalgam/worker/queue/sqs_queue.rb @@ -0,0 +1,27 @@ +class Amalgam::Worker::Queue::SqsQueue + def initialize(options, old_queue) + sqs_client = AWS::SQS.new + @sqs_queue = sqs_client.queues.named(options[:sqs_queue]) + end + + def poll + message = @sqs_queue.receive_message + + return nil if message.nil? + + message_body = YAML.safe_load(message.body) + + return nil if message_body[:version] != 2 + + message_body[:secret_key] = message.id + + message.delete + + return message_body + end +end + +Amalgam::Worker::Queue.register_queue( + :sqs, + Amalgam::Worker::Queue::SqsQueue +) diff --git a/lib/amalgam/worker/queue/test_queue.rb b/lib/amalgam/worker/queue/test_queue.rb new file mode 100644 index 0000000..075fcce --- /dev/null +++ b/lib/amalgam/worker/queue/test_queue.rb @@ -0,0 +1,34 @@ +class Amalgam::Worker::Queue::TestQueue + + def initialize(options, old_queue) + Amalgam::Worker.logger.info("Created test queue") + + options ||= {} + @items = [] + + unless options[:start_items].nil? 
+ Amalgam::Worker.logger.info("Start items are:") + options[:start_items].each do |item| + Amalgam::Worker.logger.info("\t" + item.inspect) + end + + options[:start_items].each do |item| + enqueue(item) + end + end + end + + def enqueue(job_description) + @items << job_description + end + + def poll + @items.shift + end + +end + +Amalgam::Worker::Queue.register_queue( + :test, + Amalgam::Worker::Queue::TestQueue +) \ No newline at end of file diff --git a/lib/amalgam/worker/runner.rb b/lib/amalgam/worker/runner.rb new file mode 100644 index 0000000..210ea20 --- /dev/null +++ b/lib/amalgam/worker/runner.rb @@ -0,0 +1,78 @@ +require 'fileutils' +require 'tmpdir' + +class Amalgam::Worker::Runner + + attr_reader :result + + def initialize(configuration, job_description) + @configuration = configuration + @job_description = job_description + @thread = nil + @result = nil + @job = nil + @running = false + end + + def run + @running = true + @thread = Thread.new { + thread_main + } + end + + def join + @thread.join + @thread = nil + end + + def terminate + unless @job.nil? + @job.terminate + end + end + + def running? + return @running + end + + private + + def thread_main + begin + original_working_dir = Dir.getwd + + temp_dir = Dir.mktmpdir(nil, @configuration.tmp_dir) + Dir.chdir(temp_dir) + + error_caught = false + + begin + @job = Amalgam::Worker::Job.create(@job_description, @configuration) + @result = @job.run + rescue => err + Amalgam::Worker.logger.error("Worker caught error.") + Amalgam::Worker.logger.error(err.inspect) + Amalgam::Worker.logger.error(err.backtrace.join("\n")) + + error_caught = true + @result = { :return_code => 255 } + end + + if !@result[:return_code].nil? 
&& @result[:return_code] != 0 + error_caught = true + end + + Dir.chdir(original_working_dir) + if error_caught + Amalgam::Worker.logger.error("Leaving the job directory behind.") + Amalgam::Worker.logger.error(temp_dir) + else + FileUtils.rm_rf(temp_dir) + end + ensure + @running = false + end + end + +end diff --git a/lib/amalgam/worker/uploader.rb b/lib/amalgam/worker/uploader.rb new file mode 100644 index 0000000..fd39a8e --- /dev/null +++ b/lib/amalgam/worker/uploader.rb @@ -0,0 +1,38 @@ +class Amalgam::Worker::Uploader + def initialize + raise "Attempt to initialize abstract Uploader." + end + + def upload(src_path, key) + raise "Attempt to invoke abstract method upload." + end + + class << self + def register_uploader(identifier, klass) + @uploaders ||= {} + + unless @uploaders[identifier].nil? + raise "Uploader with identifier '#{identifier}' already registered." + end + + @uploaders[identifier] = klass + end + + def unregister_uploader(identifier) + @uploaders[identifier] = nil + end + + def create(identifier, options, previous_uploader = nil) + uploaders = @uploaders || {} + + if uploaders[identifier].nil? + raise "No uploader type registered for identifier '#{identifier}'." 
+ end + + return uploaders[identifier].new(options, previous_uploader) + end + end +end + +require_relative "uploader/test_uploader" +require_relative "uploader/s3_uploader" diff --git a/lib/amalgam/worker/uploader/s3_uploader.rb b/lib/amalgam/worker/uploader/s3_uploader.rb new file mode 100644 index 0000000..1d4ce57 --- /dev/null +++ b/lib/amalgam/worker/uploader/s3_uploader.rb @@ -0,0 +1,15 @@ +class Amalgam::Worker::Uploader::S3Uploader + def initialize(options, old_uploader) + s3_client = AWS::S3.new + @s3_bucket = s3_client.buckets[options[:s3_bucket]] + end + + def upload(src_path, key) + @s3_bucket.objects[key].write(:file => src_path) + end +end + +Amalgam::Worker::Uploader.register_uploader( + :s3, + Amalgam::Worker::Uploader::S3Uploader +) \ No newline at end of file diff --git a/lib/amalgam/worker/uploader/test_uploader.rb b/lib/amalgam/worker/uploader/test_uploader.rb new file mode 100644 index 0000000..6406dd6 --- /dev/null +++ b/lib/amalgam/worker/uploader/test_uploader.rb @@ -0,0 +1,17 @@ +require 'fileutils' +class Amalgam::Worker::Uploader::TestUploader + def initialize(options, old_uploader = nil) + @destination_directory = options[:destination_directory] + end + + def upload(src_path, key) + dest_path = File.join(@destination_directory, key) + FileUtils.mkdir_p(File.dirname(dest_path)) + FileUtils.copy_file(src_path, dest_path) + end +end + +Amalgam::Worker::Uploader.register_uploader( + :test, + Amalgam::Worker::Uploader::TestUploader +) \ No newline at end of file diff --git a/main.rb b/main.rb deleted file mode 100755 index 48011fb..0000000 --- a/main.rb +++ /dev/null @@ -1,52 +0,0 @@ -#!/usr/bin/env ruby - -require 'aws-sdk' -require 'safe_yaml' -require 'digest' -require 'tmpdir' -require 'benchmark' -require 'json' -require 'httparty' - -# Configuration depends on deserializing symbols. -# Worst case is that we run out of memory. 
-SafeYAML::OPTIONS[:deserialize_symbols] = true - -require_relative 'common.rb' -require_relative 'configuration.rb' -require_relative 'runner.rb' -require_relative 'manager.rb' - -config_file_path = ARGV[0] -raise "Configuration file path must be specified." if config_file_path.nil? -configuration = Configuration.new(config_file_path) - -puts "Timeout is: #{configuration.worker_timeout}" - -manager = Manager.new(configuration) -manager.run - -int_count = 0 -Signal.trap("USR1") do - int_count += 1 - if int_count == 1 - puts "Requesting Terminate" - manager.request_termination - elsif int_count == 2 - puts "Forcing Terminate" - manager.terminate - else - Kernel.exit! - end -end - -Signal.trap("USR2") do - manager.terminate_job -end - -Signal.trap("HUP") do - puts "Requesting configuration update" - manager.update_configuration -end - -manager.join diff --git a/manager.rb b/manager.rb deleted file mode 100644 index a933b73..0000000 --- a/manager.rb +++ /dev/null @@ -1,142 +0,0 @@ -require 'socket' - -class Manager - - def initialize(configuration) - @configuration = configuration - - @worker = Runner.new(configuration) - end - - def run - puts "Running with PID: #{Process.pid}" - @thread = Thread.new { - register - - @worker.run(@worker_id) - catch(:terminate) do - while true - 16.times do - sleep(15) - if @termination_requested - @worker.request_termination - @termination_requested = false - end - - if @termination_required - @worker.terminate - @termination_required = false - end - - if @configuration_update_requested - @configuration.update - @configuration_update_requested = false - puts "Configuration updated" - end - - if !@worker.run_start_time.nil? && - ((Time.now - @worker.run_start_time) >= @configuration.worker_timeout) - terminate_job - end - - if @worker.terminated? 
- throw :terminate - end - end - - if @termination_requested - @worker.request_termination - @termination_requested = false - end - - heartbeat(@worker.current_test_result_id) - end - end - @worker.join - unregister - } - end - - def join - @thread.join - end - - def request_termination - @termination_requested = true - end - - def terminate - @termination_required = true - end - - def update_configuration - @configuration_update_requested = true - end - - def terminate_job - puts "Terminating the current job" - @worker.terminate_job - end - -private - - def register - settings = @configuration.read_multiple([:server_base_url, :username, :password]) - puts "Registering with #{register_url(settings[:server_base_url])}" - response = HTTParty.post(register_url(settings[:server_base_url]), { - :body => { :hostname => hostname }.to_json, - :basic_auth => http_auth_params(settings) - }) - - parsed_response = JSON.parse(response.parsed_response) - @worker_id = parsed_response["worker_id"].to_i - end - - def unregister - settings = @configuration.read_multiple([:server_base_url, :username, :password]) - HTTParty.post(unregister_url(settings[:server_base_url]), { - :basic_auth => http_auth_params(settings) - }) - end - - def heartbeat(current_test_result_id) - settings = @configuration.read_multiple([:server_base_url, :username, :password]) - begin - HTTParty.post(heartbeat_url(settings[:server_base_url]), { - :body => { :test_id => current_test_result_id }.to_json, - :basic_auth => http_auth_params(settings) - }) - rescue Exception => e - puts "Manager failed to heartbeat:" - puts e.inspect - end - end - - def hostname - Socket.gethostname - end - - def http_auth_params(params_hash) - if params_hash[:username] || params_hash[:password] - return { - :username => params_hash[:username], - :password => params_hash[:password] - } - else - return nil - end - end - - def register_url(base_url) - "#{base_url}/workers/register" - end - - def unregister_url(base_url) - 
"#{base_url}/workers/#{@worker_id}/unregister" - end - - def heartbeat_url(base_url) - "#{base_url}/workers/#{@worker_id}/heartbeat" - end - -end diff --git a/runner.rb b/runner.rb deleted file mode 100644 index 57842a4..0000000 --- a/runner.rb +++ /dev/null @@ -1,360 +0,0 @@ -require 'fileutils' -class Runner - - SIGTERM = -15 - - # Process.kill uses negative signals to specify that a process group - # should be acted on instead of a single process. - PGROUP_SIGTERM = -1 * SIGTERM - - def initialize(configuration) - @configuration = configuration - @termination_requested = false - @current_test_result_id = nil - @run_start_time = nil - @worker_process_group = nil - end - - def run(worker_id) - @worker_id = worker_id - @thread = Thread.new do - catch(:termination) do - while !@termination_requested - get_sqs_queue.poll(:idle_timeout => 2 * 60) do |message| - begin - process_message(message) - rescue Exception => e - puts e.inspect - raise e - end - throw :termination if @termination_requested - end - end - end - end - end - - def terminated? - !@thread.status - end - - def request_termination - @termination_requested = true - end - - def terminate - @termination_requested = true - terminate_job - @thread.exit - end - - def join - @thread.join - end - - def current_test_result_id - return @current_test_result_id - end - - def run_start_time - return @run_start_time - end - - def terminate_job - unless @worker_process_group.nil? 
- - Process.kill(PGROUP_SIGTERM, @worker_process_group) - end - end - -private - - def process_message(message) - puts "Received Job:" - job_description = YAML.safe_load(message.body) - puts job_description.inspect - - @current_test_result_id = job_description[:test_id] - - started_at = Time.now - - post_start(job_description[:test_id]) - - results = nil - tarball_s3_key = nil - - Dir.mktmpdir(nil, @configuration.tmp_dir) do |temp_dir| - puts "Using temporary directory #{temp_dir}" - Dir.chdir(temp_dir) do - download_model(temp_dir, job_description[:model_s3_key]) - - # Delete the message since we are about to do a lot of work - # and we don't want another worker to pick it up. - message.delete - - compile_moolloy(temp_dir, job_description[:commit]) - results = run_moolloy(temp_dir) - tarball_s3_key = upload_results(temp_dir, message.id) - end - end - - post_results(job_description[:test_id], - message.id, - started_at, - results, - tarball_s3_key) - - puts "Finished processing job: #{message.id}." - @current_test_result_id = nil - end - - def download_model(temporary_directory, s3_key) - obj = get_s3_bucket.objects[s3_key] - tarball_filename = File.basename(s3_key) - model_directory = File.join(temporary_directory, "model") - - Dir.mkdir(model_directory) - Dir.chdir(model_directory) do - # Download the model - puts "Downloading the model" - - tarball_path = File.join(model_directory, tarball_filename) - File.open(tarball_path, "w") do |f| - obj.read do |chunk| - f.write(chunk) - end - end - - # Unpack the model - puts "Unpacking the model" - `tar -xf #{tarball_path}` - raise "Failed to extract" unless $?.to_i == 0 - end - end - - def compile_moolloy(temporary_directory, commit) - puts "Cloning repo." - - # If we have a seed repo then we will copy it into place and pull - # instead of cloning. This saves us bandwidth since the alloy repo is quite - # large. By using a seed repo we only need to pull the latest commits. 
- seed_repo_path = @configuration.seed_repo_path - if seed_repo_path - # We use --reflink=auto to reduce disk usage, it performs a shallow copy - # with copy-on-write if the operating system supports it. Otherwise, it - # will perform a regular copy. - puts "cp -r #{seed_repo_path} ./moolloy" - `cp -r #{seed_repo_path} ./moolloy` - raise "Failed to copy seed repo." unless $?.to_i == 0 - else - # Clone the repo using the ssh key specified in the configuration. - # We accomplish this by spawning a new ssh agent for the command and - # loading the key into it. - puts "ssh-agent bash -c 'ssh-add #{@configuration.ssh_key}; git clone #{@configuration.git_repo} moolloy'" - `ssh-agent bash -c 'ssh-add #{@configuration.ssh_key}; git clone #{@configuration.git_repo} moolloy'` - raise "Failed to clone git repo." unless $?.to_i == 0 - end - - Dir.chdir(File.join(temporary_directory, "moolloy")) do - if seed_repo_path - # If we copied a seed we need to pull it to get the latest commits. - # Once again we use the key specified in the configuration. - puts "ssh-agent bash -c 'ssh-add #{@configuration.ssh_key}; git pull" - `ssh-agent bash -c 'ssh-add #{@configuration.ssh_key}; git pull'` - raise "Failed to pull git repo." unless $?.to_i == 0 - end - - # Checkout the specific commit referenced by the job. - puts "Checking out commit #{commit}" - `git checkout #{commit}` - raise "Commit checkout failed." unless $?.to_i == 0 - - # Assert that we have successfully checked out the commit. - actual_commit = `git rev-parse HEAD`.chomp - unless $?.to_i == 0 && commit == actual_commit - raise "Didn't checkout the correct commit." - end - - `git submodule init` - raise "Submodule init failed." unless $?.to_i == 0 - - commit_file = File.absolute_path(File.join(temporary_directory, "commit")) - `git rev-parse HEAD > #{commit_file}` - `git submodule foreach 'echo $path \`git rev-parse HEAD\` >> #{commit_file}'` - - # Update the submodules using the ssh key given by the configuration. 
- puts "ssh-agent bash -c 'ssh-add #{@configuration.ssh_key}; git submodule update'" - `ssh-agent bash -c 'ssh-add #{@configuration.ssh_key}; git submodule update'` - raise "Submodule update failed." unless $?.to_i == 0 - - # Build Moolloy - puts "Building moolloy" - `ant deps` - raise "Failed to download dependencies." unless $?.to_i == 0 - `ant configure` - raise "Failed to configure build." unless $?.to_i == 0 - `ant dist` - raise "Failed to build." unless $?.to_i == 0 - end - - - puts "Acquiring jar file" - dist_path = File.join(temporary_directory, - "moolloy", - "dist", - "alloy-dev.jar") - FileUtils.mv(dist_path, File.join(temporary_directory, "moolloy.jar")) - end - - def run_moolloy(temporary_directory) - model_directory = File.join(temporary_directory, "model") - - puts "Running moolloy." - - @run_start_time = Time.now - benchmark_result = Benchmark.measure do - pid = Process.spawn("#{@configuration.command} -jar #{File.join(temporary_directory, "moolloy.jar")} \"#{model_directory}/model.als\" > stdout.out 2> stderr.out", :pgroup => true) - @worker_process_group = Process.getpgid(pid) - Process.wait(pid) - @worker_process_group = nil - end - @run_start_time = nil - - return_code = $?.to_i - - # Determine if the solutions match the model solutions - correct = (return_code == 0) - test_solution_files = Dir[File.join(temporary_directory, "alloy_solutions_*.xml")] - model_solution_files = Dir[File.join(model_directory, "alloy_solutions_*.xml")] - - if test_solution_files.count != model_solution_files.count - puts "Wrong number of solutions generated." - correct = false - end - - hash_to_model_solution = {} - model_solution_files.each do |model_file| - `tail -n +2 #{model_file} > #{model_file}.trimmed` - raise "Failed to trim model solution #{model_file}." 
unless $?.to_i == 0 - hash = sha2_hash(model_file + ".trimmed") - hash_to_model_solution[hash] ||= [] - hash_to_model_solution[hash] << model_file - end - - test_solution_files.each do |test_file| - `tail -n +2 #{test_file} > #{test_file}.trimmed` - raise "Failed to trim test solution #{model_file}." unless $?.to_i == 0 - hash = sha2_hash(test_file + ".trimmed") - if !hash_to_model_solution[hash].nil? && - !hash_to_model_solution[hash].empty? - matching_file = hash_to_model_solution[hash].shift - puts "#{test_file} matches #{matching_file}." - else - puts "#{test_file} has no match." - correct = false - end - end - - if correct - puts "The solutions are correct." - else - puts "The solutions are incorrect." - end - - return { - :correct => correct, - :return_code => return_code, - :benchmark_result => benchmark_result - } - end - - def upload_results(temporary_directory, message_id) - package_directory = File.join(temporary_directory, "package") - stdout_path = File.join(temporary_directory, "stdout.out") - stderr_path = File.join(temporary_directory, "stderr.out") - model_path = File.join(temporary_directory, "model") - alloy_solutions_path = File.join(temporary_directory, "alloy_solutions_*.xml") - commit_path = File.join(temporary_directory, "commit") - - FileUtils.mkdir(package_directory) - FileUtils.mv(stdout_path, package_directory) - FileUtils.mv(stderr_path, package_directory) - FileUtils.mv(model_path, package_directory) - FileUtils.mv(commit_path, package_directory) - - Dir[alloy_solutions_path].each do |fpath| - FileUtils.mv(fpath, package_directory) - end - - # Tarball the entire temporary directory - tarball_path = File.join(temporary_directory, "tarball.tar.bz2") - Dir.chdir(package_directory) do - `hostname > ./hostname` - `tar -cjf "#{tarball_path}" #{File.join(".", "*")}` - raise "Failed to create package archive." 
unless $?.to_i == 0 - end - - # Upload the tarball to s3 - key = "results/" + message_id + ".tar.bz2" - get_s3_bucket.objects[key].write(:file => tarball_path) - - return key - end - - def post_start(test_id) - post_url = "#{@configuration.server_base_url}/workers/#{@worker_id}/start" - puts "Making post request to #{post_url}" - - body = { - :test_id => test_id - } - - HTTParty.post(post_url, { - :body => body.to_json, - :basic_auth => get_http_auth - }) - end - - def post_results(test_id, message_id, started_at, results, s3_key) - post_url = "#{@configuration.server_base_url}/workers/#{@worker_id}/result" - puts "Making post request to #{post_url}" - - completion_body = { - :test_id => test_id, - :secret_key => message_id, - :return_code => results[:return_code], - :correct => results[:correct] ? 1 : 0, - :started_at => started_at, - :runtime_seconds => results[:benchmark_result].real, - :cpu_time_seconds => results[:benchmark_result].total, - :tarball_s3_key => s3_key - } - - HTTParty.post(post_url, { - :body => completion_body.to_json, - :basic_auth => get_http_auth - }) - end - - def get_http_auth - auth = @configuration.read_multiple([:username, :password]) - if auth[:username] || auth[:password] - return auth - else - return nil - end - return auth - end - - def get_s3_bucket - s3_client = AWS::S3.new - s3_bucket = s3_client.buckets[@configuration.s3_bucket] - return s3_bucket - end - - def get_sqs_queue - sqs_client = AWS::SQS.new - sqs_queue = sqs_client.queues.named(@configuration.sqs_queue_name) - return sqs_queue - end -end diff --git a/spec/amalgam/worker/job_spec.rb b/spec/amalgam/worker/job_spec.rb new file mode 100644 index 0000000..488a76c --- /dev/null +++ b/spec/amalgam/worker/job_spec.rb @@ -0,0 +1,47 @@ +require 'amalgam/worker' +require 'logger' + +class SampleJob < Amalgam::Worker::Job + def initialize(job_description, configuration) + + end + + def run + end +end + +describe Amalgam::Worker::Job do + before :all do + Amalgam::Worker.logger = 
Logger.new('/dev/null') + Amalgam::Worker::Job.unregister_job('asdf') + end + + after :each do + Amalgam::Worker::Job.unregister_job('asdf') + end + + after :all do + Amalgam::Worker.logger = nil + end + + it "disallows duplicate identifier registrations" do + Amalgam::Worker::Job.register_job('asdf', SampleJob) + + expect { + Amalgam::Worker::Job.register_job('asdf', SampleJob) + }.to raise_error + end + + it "creates the correctly registered job" do + Amalgam::Worker::Job.register_job('asdf', SampleJob) + job = Amalgam::Worker::Job.create({:job_type => 'asdf'}, nil) + + expect(job).to be_an_instance_of(SampleJob) + end + + it "should raise an error when creating an unregistered job" do + expect { + Amalgam::Worker::Job.create({:job_type => 'asdf'}, nil) + }.to raise_error + end +end diff --git a/spec/amalgam/worker/queue/test_queue_spec.rb b/spec/amalgam/worker/queue/test_queue_spec.rb new file mode 100644 index 0000000..b814e97 --- /dev/null +++ b/spec/amalgam/worker/queue/test_queue_spec.rb @@ -0,0 +1,28 @@ +require 'amalgam/worker' +require 'logger' + +describe Amalgam::Worker::Queue::TestQueue do + + before :all do + Amalgam::Worker.logger = Logger.new('/dev/null') + @queue = Amalgam::Worker::Queue.create(:test, nil) + end + + after :all do + Amalgam::Worker.logger = nil + end + + it 'should return the job descriptions added in order' do + a = {:type => 'build'} + b = {:type => 'test'} + @queue.enqueue(a) + @queue.enqueue(b) + + expect(@queue.poll).to eq(a) + expect(@queue.poll).to eq(b) + end + + it 'should return nil if there is no job available' do + expect(@queue.poll).to be_nil + end +end