From 17e9ea020388ee459dd5697522354ecefa169a73 Mon Sep 17 00:00:00 2001 From: Christopher Kleynhans Date: Tue, 4 Feb 2014 22:58:26 -0500 Subject: [PATCH 01/42] Clean slate for Workers V2. --- Gemfile | 5 +- Gemfile.lock | 47 ++-- Rakefile | 7 + amalgam-worker.gemspec | 26 ++ bin/amalgam-worker | 3 + common.rb | 11 - configuration.rb | 117 --------- lib/amalgam.rb | 0 lib/amalgam/worker.rb | 0 lib/amalgam/worker/job.rb | 0 lib/amalgam/worker/jobs/build_job.rb | 0 lib/amalgam/worker/jobs/run_job.rb | 0 lib/amalgam/worker/manager.rb | 0 lib/amalgam/worker/runner.rb | 0 main.rb | 52 ---- manager.rb | 142 ----------- runner.rb | 360 --------------------------- 17 files changed, 70 insertions(+), 700 deletions(-) create mode 100644 Rakefile create mode 100644 amalgam-worker.gemspec create mode 100755 bin/amalgam-worker delete mode 100644 common.rb delete mode 100644 configuration.rb create mode 100644 lib/amalgam.rb create mode 100644 lib/amalgam/worker.rb create mode 100644 lib/amalgam/worker/job.rb create mode 100644 lib/amalgam/worker/jobs/build_job.rb create mode 100644 lib/amalgam/worker/jobs/run_job.rb create mode 100644 lib/amalgam/worker/manager.rb create mode 100644 lib/amalgam/worker/runner.rb delete mode 100755 main.rb delete mode 100644 manager.rb delete mode 100644 runner.rb diff --git a/Gemfile b/Gemfile index 081baab..b4e2a20 100644 --- a/Gemfile +++ b/Gemfile @@ -1,6 +1,3 @@ source "https://rubygems.org" -gem "aws-sdk" -gem "git" -gem "httparty" -gem "safe_yaml" +gemspec diff --git a/Gemfile.lock b/Gemfile.lock index 6d7c79a..d21153d 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,26 +1,45 @@ +PATH + remote: . + specs: + amalgam-worker (0.1.0) + aws-sdk (~> 1.33.0) + httparty (~> 0.12.0) + rugged (~> 0.19.0) + safe_yaml (~> 1.0.1) + GEM remote: https://rubygems.org/ specs: - aws-sdk (1.8.5) + aws-sdk (1.33.0) json (~> 1.4) nokogiri (>= 1.4.4) uuidtools (~> 2.1) - git (1.2.5) - httparty (0.10.2) - multi_json (~> 1.0) + diff-lcs (1.2.5) + httparty (0.12.0) + json (~> 1.8) multi_xml (>= 0.5.2) - json (1.7.7) - multi_json (1.7.2) - multi_xml (0.5.3) - nokogiri (1.5.9) - safe_yaml (0.9.3) - uuidtools (2.1.3) + json (1.8.1) + mini_portile (0.5.2) + multi_xml (0.5.5) + nokogiri (1.6.1) + mini_portile (~> 0.5.0) + rake (10.1.1) + rspec (2.14.1) + rspec-core (~> 2.14.0) + rspec-expectations (~> 2.14.0) + rspec-mocks (~> 2.14.0) + rspec-core (2.14.7) + rspec-expectations (2.14.5) + diff-lcs (>= 1.1.3, < 2.0) + rspec-mocks (2.14.5) + rugged (0.19.0) + safe_yaml (1.0.1) + uuidtools (2.1.4) PLATFORMS ruby DEPENDENCIES - aws-sdk - git - httparty - safe_yaml + amalgam-worker! + rake (~> 10.1.1) + rspec (~> 2.14.1) diff --git a/Rakefile b/Rakefile new file mode 100644 index 0000000..39481a1 --- /dev/null +++ b/Rakefile @@ -0,0 +1,7 @@ +require 'rspec' +require 'rspec/core' +require 'rspec/core/rake_task' + +RSpec::Core::RakeTask.new(:spec) do |t| + t.pattern = 'spec/**/*_spec.rb' +end diff --git a/amalgam-worker.gemspec b/amalgam-worker.gemspec new file mode 100644 index 0000000..2656bea --- /dev/null +++ b/amalgam-worker.gemspec @@ -0,0 +1,26 @@ +Gem::Specification.new do |s| + s.name = 'amalgam-worker' + s.version = '0.1.0' + s.date = '2014-02-04' + s.summary = 'Worker program for Team Amalgam' + s.description = 'Runs moolloy build jobs and test jobs' + s.authors = [ 'Chris Kleynhans' ] + s.email = 'chris@kleynhans.ca' + s.homepage = 'http://github.com/TeamAmalgam/worker' + s.files = ['lib/amalgam.rb', + 'lib/amalgam/worker.rb', + 'lib/amalgam/worker/manager.rb', + 'lib/amalgam/worker/runner.rb', + 'lib/amalgam/worker/job.rb', + 'lib/amalgam/worker/jobs/build_job.rb', + 'lib/amalgam/worker/jobs/run_job.rb'] + s.executables = ['amalgam-worker'] + + s.add_runtime_dependency 'aws-sdk', '~> 1.33.0' + s.add_runtime_dependency 'safe_yaml', '~> 1.0.1' + s.add_runtime_dependency 'rugged', '~> 0.19.0' + s.add_runtime_dependency 'httparty', '~> 0.12.0' + + s.add_development_dependency 'rake', '~> 10.1.1' + s.add_development_dependency 'rspec', '~> 2.14.1' +end diff --git a/bin/amalgam-worker b/bin/amalgam-worker new file mode 100755 index 0000000..da08c74 --- /dev/null +++ b/bin/amalgam-worker @@ -0,0 +1,3 @@ +#!/usr/bin/env ruby + +require 'amalgam/worker' diff --git a/common.rb b/common.rb deleted file mode 100644 index 569a3ca..0000000 --- a/common.rb +++ /dev/null @@ -1,11 +0,0 @@ -def sha2_hash(filename) - digest = Digest::SHA2.new - - File.open(filename) do |file| - while not file.eof - digest << file.read(digest.block_length) - end - end - - digest.digest -end diff --git a/configuration.rb b/configuration.rb deleted file mode 100644 index 63ad81a..0000000 --- a/configuration.rb +++ /dev/null @@ -1,117 +0,0 @@ -require 'aws-sdk' -require 'yaml' - -class Configuration - - SETTINGS = [ - :access_key_id, - :secret_access_key, - :command, - :s3_bucket, - :sqs_queue_name, - :server_base_url, - :username, - :password, - :tmp_dir, - :git_repo, - :ssh_key, - :seed_repo_path, - :worker_timeout - ] - - MANDATORY_SETTINGS = [ - :access_key_id, - :secret_access_key, - :command, - :s3_bucket, - :sqs_queue_name, - :server_base_url, - :git_repo - ] - - SECONDS_PER_SECOND = 1 - SECONDS_PER_MINUTE = 60 - MINUTES_PER_HOUR = 60 - SECONDS_PER_HOUR = SECONDS_PER_MINUTE * MINUTES_PER_HOUR - - # Define a public accessor for each setting. - # We don't use attr_reader because we want to synchronize the read. - SETTINGS.each do |setting| - define_method(setting) do - @configuration_mutex.synchronize do - return instance_variable_get("@#{setting}") - end - - end - end - - def initialize(config_file_path) - @config_file_path = File.absolute_path(config_file_path) - @configuration_mutex = Mutex.new - - self.update - end - - def update - @configuration_mutex.synchronize do - load_configuration - update_global_objects - end - end - - # Atomically reads multiple setting from the configuration. - def read_multiple(settings) - values_to_return = {} - - # Grab the configuration mutex to ensure atomic execution - @configuration_mutex.synchronize do - settings.each do |setting| - unless SETTINGS.include?(setting) - raise "Setting #{setting} does not exist" - end - - values_to_return[setting] = self.instance_variable_get("@#{setting}") - end - end - - return values_to_return - end - - private - - def update_global_objects - AWS.config(:access_key_id => @access_key_id, - :secret_access_key => @secret_access_key) - end - - def load_configuration - conf = YAML.safe_load(File.read(@config_file_path)) - validate_configuration_hash(conf) - - conf.each do |key, value| - self.instance_variable_set("@#{key}", value) - end - - if @worker_timeout.is_a?(Hash) - @worker_timeout = SECONDS_PER_HOUR * (@worker_timeout[:hours] || 0) + - SECONDS_PER_MINUTE * (@worker_timeout[:minutes] || 0) + - SECONDS_PER_SECOND * (@worker_timeout[:seconds] || 0) - end - end - - def validate_configuration_hash(hash) - # All hash keys must be settings - hash.each_key do |key| - unless SETTINGS.include?(key) - raise "Unknown setting #{key} specified in the configuration file." - end - end - - # All mandatory settings must be provided. - MANDATORY_SETTINGS.each do |setting| - unless hash.has_key?(setting) - raise "#{setting} was not specified in the configuration file." - end - end - end -end diff --git a/lib/amalgam.rb b/lib/amalgam.rb new file mode 100644 index 0000000..e69de29 diff --git a/lib/amalgam/worker.rb b/lib/amalgam/worker.rb new file mode 100644 index 0000000..e69de29 diff --git a/lib/amalgam/worker/job.rb b/lib/amalgam/worker/job.rb new file mode 100644 index 0000000..e69de29 diff --git a/lib/amalgam/worker/jobs/build_job.rb b/lib/amalgam/worker/jobs/build_job.rb new file mode 100644 index 0000000..e69de29 diff --git a/lib/amalgam/worker/jobs/run_job.rb b/lib/amalgam/worker/jobs/run_job.rb new file mode 100644 index 0000000..e69de29 diff --git a/lib/amalgam/worker/manager.rb b/lib/amalgam/worker/manager.rb new file mode 100644 index 0000000..e69de29 diff --git a/lib/amalgam/worker/runner.rb b/lib/amalgam/worker/runner.rb new file mode 100644 index 0000000..e69de29 diff --git a/main.rb b/main.rb deleted file mode 100755 index 48011fb..0000000 --- a/main.rb +++ /dev/null @@ -1,52 +0,0 @@ -#!/usr/bin/env ruby - -require 'aws-sdk' -require 'safe_yaml' -require 'digest' -require 'tmpdir' -require 'benchmark' -require 'json' -require 'httparty' - -# Configuration depends on deserializing symbols. -# Worst case is that we run out of memory. -SafeYAML::OPTIONS[:deserialize_symbols] = true - -require_relative 'common.rb' -require_relative 'configuration.rb' -require_relative 'runner.rb' -require_relative 'manager.rb' - -config_file_path = ARGV[0] -raise "Configuration file path must be specified." if config_file_path.nil? -configuration = Configuration.new(config_file_path) - -puts "Timeout is: #{configuration.worker_timeout}" - -manager = Manager.new(configuration) -manager.run - -int_count = 0 -Signal.trap("USR1") do - int_count += 1 - if int_count == 1 - puts "Requesting Terminate" - manager.request_termination - elsif int_count == 2 - puts "Forcing Terminate" - manager.terminate - else - Kernel.exit! - end -end - -Signal.trap("USR2") do - manager.terminate_job -end - -Signal.trap("HUP") do - puts "Requesting configuration update" - manager.update_configuration -end - -manager.join diff --git a/manager.rb b/manager.rb deleted file mode 100644 index a933b73..0000000 --- a/manager.rb +++ /dev/null @@ -1,142 +0,0 @@ -require 'socket' - -class Manager - - def initialize(configuration) - @configuration = configuration - - @worker = Runner.new(configuration) - end - - def run - puts "Running with PID: #{Process.pid}" - @thread = Thread.new { - register - - @worker.run(@worker_id) - catch(:terminate) do - while true - 16.times do - sleep(15) - if @termination_requested - @worker.request_termination - @termination_requested = false - end - - if @termination_required - @worker.terminate - @termination_required = false - end - - if @configuration_update_requested - @configuration.update - @configuration_update_requested = false - puts "Configuration updated" - end - - if !@worker.run_start_time.nil? && - ((Time.now - @worker.run_start_time) >= @configuration.worker_timeout) - terminate_job - end - - if @worker.terminated? - throw :terminate - end - end - - if @termination_requested - @worker.request_termination - @termination_requested = false - end - - heartbeat(@worker.current_test_result_id) - end - end - @worker.join - unregister - } - end - - def join - @thread.join - end - - def request_termination - @termination_requested = true - end - - def terminate - @termination_required = true - end - - def update_configuration - @configuration_update_requested = true - end - - def terminate_job - puts "Terminating the current job" - @worker.terminate_job - end - -private - - def register - settings = @configuration.read_multiple([:server_base_url, :username, :password]) - puts "Registering with #{register_url(settings[:server_base_url])}" - response = HTTParty.post(register_url(settings[:server_base_url]), { - :body => { :hostname => hostname }.to_json, - :basic_auth => http_auth_params(settings) - }) - - parsed_response = JSON.parse(response.parsed_response) - @worker_id = parsed_response["worker_id"].to_i - end - - def unregister - settings = @configuration.read_multiple([:server_base_url, :username, :password]) - HTTParty.post(unregister_url(settings[:server_base_url]), { - :basic_auth => http_auth_params(settings) - }) - end - - def heartbeat(current_test_result_id) - settings = @configuration.read_multiple([:server_base_url, :username, :password]) - begin - HTTParty.post(heartbeat_url(settings[:server_base_url]), { - :body => { :test_id => current_test_result_id }.to_json, - :basic_auth => http_auth_params(settings) - }) - rescue Exception => e - puts "Manager failed to heartbeat:" - puts e.inspect - end - end - - def hostname - Socket.gethostname - end - - def http_auth_params(params_hash) - if params_hash[:username] || params_hash[:password] - return { - :username => params_hash[:username], - :password => params_hash[:password] - } - else - return nil - end - end - - def register_url(base_url) - "#{base_url}/workers/register" - end - - def unregister_url(base_url) - "#{base_url}/workers/#{@worker_id}/unregister" - end - - def heartbeat_url(base_url) - "#{base_url}/workers/#{@worker_id}/heartbeat" - end - -end diff --git a/runner.rb b/runner.rb deleted file mode 100644 index 57842a4..0000000 --- a/runner.rb +++ /dev/null @@ -1,360 +0,0 @@ -require 'fileutils' -class Runner - - SIGTERM = -15 - - # Process.kill uses negative signals to specify that a process group - # should be acted on instead of a single process. - PGROUP_SIGTERM = -1 * SIGTERM - - def initialize(configuration) - @configuration = configuration - @termination_requested = false - @current_test_result_id = nil - @run_start_time = nil - @worker_process_group = nil - end - - def run(worker_id) - @worker_id = worker_id - @thread = Thread.new do - catch(:termination) do - while !@termination_requested - get_sqs_queue.poll(:idle_timeout => 2 * 60) do |message| - begin - process_message(message) - rescue Exception => e - puts e.inspect - raise e - end - throw :termination if @termination_requested - end - end - end - end - end - - def terminated? - !@thread.status - end - - def request_termination - @termination_requested = true - end - - def terminate - @termination_requested = true - terminate_job - @thread.exit - end - - def join - @thread.join - end - - def current_test_result_id - return @current_test_result_id - end - - def run_start_time - return @run_start_time - end - - def terminate_job - unless @worker_process_group.nil? - - Process.kill(PGROUP_SIGTERM, @worker_process_group) - end - end - -private - - def process_message(message) - puts "Received Job:" - job_description = YAML.safe_load(message.body) - puts job_description.inspect - - @current_test_result_id = job_description[:test_id] - - started_at = Time.now - - post_start(job_description[:test_id]) - - results = nil - tarball_s3_key = nil - - Dir.mktmpdir(nil, @configuration.tmp_dir) do |temp_dir| - puts "Using temporary directory #{temp_dir}" - Dir.chdir(temp_dir) do - download_model(temp_dir, job_description[:model_s3_key]) - - # Delete the message since we are about to do a lot of work - # and we don't want another worker to pick it up. - message.delete - - compile_moolloy(temp_dir, job_description[:commit]) - results = run_moolloy(temp_dir) - tarball_s3_key = upload_results(temp_dir, message.id) - end - end - - post_results(job_description[:test_id], - message.id, - started_at, - results, - tarball_s3_key) - - puts "Finished processing job: #{message.id}." - @current_test_result_id = nil - end - - def download_model(temporary_directory, s3_key) - obj = get_s3_bucket.objects[s3_key] - tarball_filename = File.basename(s3_key) - model_directory = File.join(temporary_directory, "model") - - Dir.mkdir(model_directory) - Dir.chdir(model_directory) do - # Download the model - puts "Downloading the model" - - tarball_path = File.join(model_directory, tarball_filename) - File.open(tarball_path, "w") do |f| - obj.read do |chunk| - f.write(chunk) - end - end - - # Unpack the model - puts "Unpacking the model" - `tar -xf #{tarball_path}` - raise "Failed to extract" unless $?.to_i == 0 - end - end - - def compile_moolloy(temporary_directory, commit) - puts "Cloning repo." - - # If we have a seed repo then we will copy it into place and pull - # instead of cloning. This saves us bandwidth since the alloy repo is quite - # large. By using a seed repo we only need to pull the latest commits. - seed_repo_path = @configuration.seed_repo_path - if seed_repo_path - # We use --reflink=auto to reduce disk usage, it performs a shallow copy - # with copy-on-write if the operating system supports it. Otherwise, it - # will perform a regular copy. - puts "cp -r #{seed_repo_path} ./moolloy" - `cp -r #{seed_repo_path} ./moolloy` - raise "Failed to copy seed repo." unless $?.to_i == 0 - else - # Clone the repo using the ssh key specified in the configuration. - # We accomplish this by spawning a new ssh agent for the command and - # loading the key into it. - puts "ssh-agent bash -c 'ssh-add #{@configuration.ssh_key}; git clone #{@configuration.git_repo} moolloy'" - `ssh-agent bash -c 'ssh-add #{@configuration.ssh_key}; git clone #{@configuration.git_repo} moolloy'` - raise "Failed to clone git repo." unless $?.to_i == 0 - end - - Dir.chdir(File.join(temporary_directory, "moolloy")) do - if seed_repo_path - # If we copied a seed we need to pull it to get the latest commits. - # Once again we use the key specified in the configuration. - puts "ssh-agent bash -c 'ssh-add #{@configuration.ssh_key}; git pull" - `ssh-agent bash -c 'ssh-add #{@configuration.ssh_key}; git pull'` - raise "Failed to pull git repo." unless $?.to_i == 0 - end - - # Checkout the specific commit referenced by the job. - puts "Checking out commit #{commit}" - `git checkout #{commit}` - raise "Commit checkout failed." unless $?.to_i == 0 - - # Assert that we have successfully checked out the commit. - actual_commit = `git rev-parse HEAD`.chomp - unless $?.to_i == 0 && commit == actual_commit - raise "Didn't checkout the correct commit." - end - - `git submodule init` - raise "Submodule init failed." unless $?.to_i == 0 - - commit_file = File.absolute_path(File.join(temporary_directory, "commit")) - `git rev-parse HEAD > #{commit_file}` - `git submodule foreach 'echo $path \`git rev-parse HEAD\` >> #{commit_file}'` - - # Update the submodules using the ssh key given by the configuration. - puts "ssh-agent bash -c 'ssh-add #{@configuration.ssh_key}; git submodule update'" - `ssh-agent bash -c 'ssh-add #{@configuration.ssh_key}; git submodule update'` - raise "Submodule update failed." unless $?.to_i == 0 - - # Build Moolloy - puts "Building moolloy" - `ant deps` - raise "Failed to download dependencies." unless $?.to_i == 0 - `ant configure` - raise "Failed to configure build." unless $?.to_i == 0 - `ant dist` - raise "Failed to build." unless $?.to_i == 0 - end - - - puts "Acquiring jar file" - dist_path = File.join(temporary_directory, - "moolloy", - "dist", - "alloy-dev.jar") - FileUtils.mv(dist_path, File.join(temporary_directory, "moolloy.jar")) - end - - def run_moolloy(temporary_directory) - model_directory = File.join(temporary_directory, "model") - - puts "Running moolloy." - - @run_start_time = Time.now - benchmark_result = Benchmark.measure do - pid = Process.spawn("#{@configuration.command} -jar #{File.join(temporary_directory, "moolloy.jar")} \"#{model_directory}/model.als\" > stdout.out 2> stderr.out", :pgroup => true) - @worker_process_group = Process.getpgid(pid) - Process.wait(pid) - @worker_process_group = nil - end - @run_start_time = nil - - return_code = $?.to_i - - # Determine if the solutions match the model solutions - correct = (return_code == 0) - test_solution_files = Dir[File.join(temporary_directory, "alloy_solutions_*.xml")] - model_solution_files = Dir[File.join(model_directory, "alloy_solutions_*.xml")] - - if test_solution_files.count != model_solution_files.count - puts "Wrong number of solutions generated." - correct = false - end - - hash_to_model_solution = {} - model_solution_files.each do |model_file| - `tail -n +2 #{model_file} > #{model_file}.trimmed` - raise "Failed to trim model solution #{model_file}." unless $?.to_i == 0 - hash = sha2_hash(model_file + ".trimmed") - hash_to_model_solution[hash] ||= [] - hash_to_model_solution[hash] << model_file - end - - test_solution_files.each do |test_file| - `tail -n +2 #{test_file} > #{test_file}.trimmed` - raise "Failed to trim test solution #{model_file}." unless $?.to_i == 0 - hash = sha2_hash(test_file + ".trimmed") - if !hash_to_model_solution[hash].nil? && - !hash_to_model_solution[hash].empty? - matching_file = hash_to_model_solution[hash].shift - puts "#{test_file} matches #{matching_file}." - else - puts "#{test_file} has no match." - correct = false - end - end - - if correct - puts "The solutions are correct." - else - puts "The solutions are incorrect." - end - - return { - :correct => correct, - :return_code => return_code, - :benchmark_result => benchmark_result - } - end - - def upload_results(temporary_directory, message_id) - package_directory = File.join(temporary_directory, "package") - stdout_path = File.join(temporary_directory, "stdout.out") - stderr_path = File.join(temporary_directory, "stderr.out") - model_path = File.join(temporary_directory, "model") - alloy_solutions_path = File.join(temporary_directory, "alloy_solutions_*.xml") - commit_path = File.join(temporary_directory, "commit") - - FileUtils.mkdir(package_directory) - FileUtils.mv(stdout_path, package_directory) - FileUtils.mv(stderr_path, package_directory) - FileUtils.mv(model_path, package_directory) - FileUtils.mv(commit_path, package_directory) - - Dir[alloy_solutions_path].each do |fpath| - FileUtils.mv(fpath, package_directory) - end - - # Tarball the entire temporary directory - tarball_path = File.join(temporary_directory, "tarball.tar.bz2") - Dir.chdir(package_directory) do - `hostname > ./hostname` - `tar -cjf "#{tarball_path}" #{File.join(".", "*")}` - raise "Failed to create package archive." unless $?.to_i == 0 - end - - # Upload the tarball to s3 - key = "results/" + message_id + ".tar.bz2" - get_s3_bucket.objects[key].write(:file => tarball_path) - - return key - end - - def post_start(test_id) - post_url = "#{@configuration.server_base_url}/workers/#{@worker_id}/start" - puts "Making post request to #{post_url}" - - body = { - :test_id => test_id - } - - HTTParty.post(post_url, { - :body => body.to_json, - :basic_auth => get_http_auth - }) - end - - def post_results(test_id, message_id, started_at, results, s3_key) - post_url = "#{@configuration.server_base_url}/workers/#{@worker_id}/result" - puts "Making post request to #{post_url}" - - completion_body = { - :test_id => test_id, - :secret_key => message_id, - :return_code => results[:return_code], - :correct => results[:correct] ? 1 : 0, - :started_at => started_at, - :runtime_seconds => results[:benchmark_result].real, - :cpu_time_seconds => results[:benchmark_result].total, - :tarball_s3_key => s3_key - } - - HTTParty.post(post_url, { - :body => completion_body.to_json, - :basic_auth => get_http_auth - }) - end - - def get_http_auth - auth = @configuration.read_multiple([:username, :password]) - if auth[:username] || auth[:password] - return auth - else - return nil - end - return auth - end - - def get_s3_bucket - s3_client = AWS::S3.new - s3_bucket = s3_client.buckets[@configuration.s3_bucket] - return s3_bucket - end - - def get_sqs_queue - sqs_client = AWS::SQS.new - sqs_queue = sqs_client.queues.named(@configuration.sqs_queue_name) - return sqs_queue - end -end From ae7cd75e71b5456f1544a9342fa447d58c855bdd Mon Sep 17 00:00:00 2001 From: Christopher Kleynhans Date: Tue, 4 Feb 2014 23:32:38 -0500 Subject: [PATCH 02/42] Initial skeleton. --- lib/amalgam.rb | 4 +++ lib/amalgam/worker.rb | 9 ++++++ lib/amalgam/worker/job.rb | 46 ++++++++++++++++++++++++++++ lib/amalgam/worker/job/build_job.rb | 5 +++ lib/amalgam/worker/job/run_job.rb | 5 +++ lib/amalgam/worker/jobs/build_job.rb | 0 lib/amalgam/worker/jobs/run_job.rb | 0 spec/amalgam/worker/job_spec.rb | 41 +++++++++++++++++++++++++ 8 files changed, 110 insertions(+) create mode 100644 lib/amalgam/worker/job/build_job.rb create mode 100644 lib/amalgam/worker/job/run_job.rb delete mode 100644 lib/amalgam/worker/jobs/build_job.rb delete mode 100644 lib/amalgam/worker/jobs/run_job.rb create mode 100644 spec/amalgam/worker/job_spec.rb diff --git a/lib/amalgam.rb b/lib/amalgam.rb index e69de29..9fd1104 100644 --- a/lib/amalgam.rb +++ b/lib/amalgam.rb @@ -0,0 +1,4 @@ +module Amalgam +end + +require_relative 'amalgam/worker' diff --git a/lib/amalgam/worker.rb b/lib/amalgam/worker.rb index e69de29..5f0ac4f 100644 --- a/lib/amalgam/worker.rb +++ b/lib/amalgam/worker.rb @@ -0,0 +1,9 @@ +require_relative '../amalgam' + +module Amalgam::Worker + +end + +require_relative 'worker/manager' +require_relative 'worker/runner' +require_relative 'worker/job' diff --git a/lib/amalgam/worker/job.rb b/lib/amalgam/worker/job.rb index e69de29..1b3bacb 100644 --- a/lib/amalgam/worker/job.rb +++ b/lib/amalgam/worker/job.rb @@ -0,0 +1,46 @@ +class Amalgam::Worker::Job + + def initialize(description) + raise "Attempt to create abstract job." + end + + def run + raise "Attempt to run abstract job." + end + + class << self + def register_job(identifier, klass) + @jobs ||= {} + + unless @jobs[identifier].nil? + raise "Job with identifier '#{identifier}' already registered." + end + + @jobs[identifier] = klass + end + + def unregister_job(identifier) + @jobs[identifier] = nil + end + + def create(job_description) + jobs = @jobs || {} + + job_identifier = job_description[:type] + + if job_identifier.nil? + raise "Job description does not specify a job type." + end + + if @jobs[job_identifier].nil? + raise "No job type registered for identifier '#{identifier}'" + end + + return @jobs[job_identifier].new(job_description) + end + end + +end + +require_relative 'job/build_job' +require_relative 'job/run_job' diff --git a/lib/amalgam/worker/job/build_job.rb b/lib/amalgam/worker/job/build_job.rb new file mode 100644 index 0000000..5f60750 --- /dev/null +++ b/lib/amalgam/worker/job/build_job.rb @@ -0,0 +1,5 @@ +class Amalgam::Worker::Job::BuildJob + +end + +Amalgam::Worker::Job.register_job('build', Amalgam::Worker::Job::BuildJob) diff --git a/lib/amalgam/worker/job/run_job.rb b/lib/amalgam/worker/job/run_job.rb new file mode 100644 index 0000000..eb6bb30 --- /dev/null +++ b/lib/amalgam/worker/job/run_job.rb @@ -0,0 +1,5 @@ +class Amalgam::Worker::Job::RunJob < Amalgam::Worker::Job + +end + +Amalgam::Worker::Job.register_job('run', Amalgam::Worker::Job::RunJob) diff --git a/lib/amalgam/worker/jobs/build_job.rb b/lib/amalgam/worker/jobs/build_job.rb deleted file mode 100644 index e69de29..0000000 diff --git a/lib/amalgam/worker/jobs/run_job.rb b/lib/amalgam/worker/jobs/run_job.rb deleted file mode 100644 index e69de29..0000000 diff --git a/spec/amalgam/worker/job_spec.rb b/spec/amalgam/worker/job_spec.rb new file mode 100644 index 0000000..abadb86 --- /dev/null +++ b/spec/amalgam/worker/job_spec.rb @@ -0,0 +1,41 @@ +require 'amalgam/worker' + +class SampleJob < Amalgam::Worker::Job + def initialize(job_description) + + end + + def run + end +end + +describe Amalgam::Worker::Job do + before :all do + Amalgam::Worker::Job.unregister_job('asdf') + end + + after :each do + Amalgam::Worker::Job.unregister_job('asdf') + end + + it "disallows duplicate identifier registrations" do + Amalgam::Worker::Job.register_job('asdf', SampleJob) + + expect { + Amalgam::Worker::Job.register_job('asdf', SampleJob) + }.to raise_error + end + + it "creates the correctly registered job" do + Amalgam::Worker::Job.register_job('asdf', SampleJob) + job = Amalgam::Worker::Job.create({:type => 'asdf'}) + + expect(job).to be_an_instance_of(SampleJob) + end + + it "should raise an error when creating an unregistered job" do + expect { + Amalgam::Worker::Job.create({:type => 'asdf'}) + }.to raise_error + end +end From 20fecb1cd515064046cb58dcbec1c973aeece74a Mon Sep 17 00:00:00 2001 From: Christopher Kleynhans Date: Tue, 4 Feb 2014 23:34:54 -0500 Subject: [PATCH 03/42] Forgot to update the files array. --- amalgam-worker.gemspec | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/amalgam-worker.gemspec b/amalgam-worker.gemspec index 2656bea..c9cc6e0 100644 --- a/amalgam-worker.gemspec +++ b/amalgam-worker.gemspec @@ -12,8 +12,8 @@ Gem::Specification.new do |s| 'lib/amalgam/worker/manager.rb', 'lib/amalgam/worker/runner.rb', 'lib/amalgam/worker/job.rb', - 'lib/amalgam/worker/jobs/build_job.rb', - 'lib/amalgam/worker/jobs/run_job.rb'] + 'lib/amalgam/worker/job/build_job.rb', + 'lib/amalgam/worker/job/run_job.rb'] s.executables = ['amalgam-worker'] s.add_runtime_dependency 'aws-sdk', '~> 1.33.0' From 0e6c12d2fecf45e61584500ff01ec09b8a707730 Mon Sep 17 00:00:00 2001 From: Christopher Kleynhans Date: Tue, 4 Feb 2014 23:40:20 -0500 Subject: [PATCH 04/42] First try at getting Travis set up. --- .travis.yml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .travis.yml diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..85387a3 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,5 @@ +language: ruby +rvm: + - 2.1.0 + - 2.0.0 +script: rake spec From dfb2af4c82d3b28da7a61564d4938ae1ea886fe9 Mon Sep 17 00:00:00 2001 From: Christopher Kleynhans Date: Tue, 4 Feb 2014 23:44:55 -0500 Subject: [PATCH 05/42] Need to install the development group to run tests. --- .travis.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.travis.yml b/.travis.yml index 85387a3..980226f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,4 +2,5 @@ language: ruby rvm: - 2.1.0 - 2.0.0 +bundler_args: --with development script: rake spec From 23b8f17c869696b112371da91e397ec787d373c8 Mon Sep 17 00:00:00 2001 From: Christopher Kleynhans Date: Tue, 4 Feb 2014 23:48:39 -0500 Subject: [PATCH 06/42] Specify our own install script for travis. --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 980226f..6ae3653 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,5 +2,5 @@ language: ruby rvm: - 2.1.0 - 2.0.0 -bundler_args: --with development +install: bundle install script: rake spec From c6186f44b784039745f4b0df3c6a49a048274214 Mon Sep 17 00:00:00 2001 From: Christopher Kleynhans Date: Tue, 4 Feb 2014 23:56:55 -0500 Subject: [PATCH 07/42] Made :spec the default Rake task. --- Rakefile | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Rakefile b/Rakefile index 39481a1..29771fa 100644 --- a/Rakefile +++ b/Rakefile @@ -2,6 +2,8 @@ require 'rspec' require 'rspec/core' require 'rspec/core/rake_task' +task :default => :spec + RSpec::Core::RakeTask.new(:spec) do |t| t.pattern = 'spec/**/*_spec.rb' end From b7b8a0171ea6f951985a763f49ee0684d5155212 Mon Sep 17 00:00:00 2001 From: Christopher Kleynhans Date: Wed, 5 Feb 2014 00:09:48 -0500 Subject: [PATCH 08/42] Added Queue class and TestQueue implementation. --- lib/amalgam/worker.rb | 1 + lib/amalgam/worker/queue.rb | 8 +++++++ lib/amalgam/worker/queue/sqs_queue.rb | 3 +++ lib/amalgam/worker/queue/test_queue.rb | 15 +++++++++++++ spec/amalgam/worker/queue/test_queue_spec.rb | 22 ++++++++++++++++++++ 5 files changed, 49 insertions(+) create mode 100644 lib/amalgam/worker/queue.rb create mode 100644 lib/amalgam/worker/queue/sqs_queue.rb create mode 100644 lib/amalgam/worker/queue/test_queue.rb create mode 100644 spec/amalgam/worker/queue/test_queue_spec.rb diff --git a/lib/amalgam/worker.rb b/lib/amalgam/worker.rb index 5f0ac4f..2d919bb 100644 --- a/lib/amalgam/worker.rb +++ b/lib/amalgam/worker.rb @@ -7,3 +7,4 @@ module Amalgam::Worker require_relative 'worker/manager' require_relative 'worker/runner' require_relative 'worker/job' +require_relative 'worker/queue' diff --git a/lib/amalgam/worker/queue.rb b/lib/amalgam/worker/queue.rb new file mode 100644 index 0000000..c0a0744 --- /dev/null +++ b/lib/amalgam/worker/queue.rb @@ -0,0 +1,8 @@ +class Amalgam::Worker::Queue + def poll + raise "Attempt to poll an abstract queue." + end +end + +require_relative 'queue/sqs_queue' +require_relative 'queue/test_queue' diff --git a/lib/amalgam/worker/queue/sqs_queue.rb b/lib/amalgam/worker/queue/sqs_queue.rb new file mode 100644 index 0000000..cf5dd08 --- /dev/null +++ b/lib/amalgam/worker/queue/sqs_queue.rb @@ -0,0 +1,3 @@ +class Amalgam::Worker::Queue::SqsQueue + +end diff --git a/lib/amalgam/worker/queue/test_queue.rb b/lib/amalgam/worker/queue/test_queue.rb new file mode 100644 index 0000000..57df5cb --- /dev/null +++ b/lib/amalgam/worker/queue/test_queue.rb @@ -0,0 +1,15 @@ +class Amalgam::Worker::Queue::TestQueue + + def initialize + @items = [] + end + + def enqueue(job_description) + @items << job_description + end + + def poll + @items.shift + end + +end diff --git a/spec/amalgam/worker/queue/test_queue_spec.rb b/spec/amalgam/worker/queue/test_queue_spec.rb new file mode 100644 index 0000000..66ce003 --- /dev/null +++ b/spec/amalgam/worker/queue/test_queue_spec.rb @@ -0,0 +1,22 @@ +require 'amalgam/worker' + +describe Amalgam::Worker::Queue::TestQueue do + + before :all do + @queue = Amalgam::Worker::Queue::TestQueue.new + end + + it 'should return the job descriptions added in order' do + a = {:type => 'build'} + b = {:type => 'test'} + @queue.enqueue(a) + @queue.enqueue(b) + + expect(@queue.poll).to eq(a) + expect(@queue.poll).to eq(b) + end + + it 'should return nil if there is no job available' do + expect(@queue.poll).to be_nil + end +end From dc32868e75745b87d96e4859288f8bc51a9969b8 Mon Sep 17 00:00:00 2001 From: Christopher Kleynhans Date: Wed, 5 Feb 2014 00:11:18 -0500 Subject: [PATCH 09/42] Added *.swp to gitignore. --- .gitignore | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.gitignore b/.gitignore index 560d1a6..62422b2 100644 --- a/.gitignore +++ b/.gitignore @@ -16,3 +16,5 @@ tmp .yardoc _yardoc doc/ + +*.swp From bfca834058a33979f2945139306a84721194396a Mon Sep 17 00:00:00 2001 From: Christopher Kleynhans Date: Wed, 5 Feb 2014 00:34:44 -0500 Subject: [PATCH 10/42] Added some placeholder methods to Worker. --- bin/amalgam-worker | 3 +++ lib/amalgam/worker.rb | 8 +++++++- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/bin/amalgam-worker b/bin/amalgam-worker index da08c74..be0ae87 100755 --- a/bin/amalgam-worker +++ b/bin/amalgam-worker @@ -1,3 +1,6 @@ #!/usr/bin/env ruby require 'amalgam/worker' + +worker = Amalgam::Worker.new +worker.run diff --git a/lib/amalgam/worker.rb b/lib/amalgam/worker.rb index 2d919bb..d56bac1 100644 --- a/lib/amalgam/worker.rb +++ b/lib/amalgam/worker.rb @@ -1,7 +1,13 @@ require_relative '../amalgam' -module Amalgam::Worker +class Amalgam::Worker + def initialize + end + + def run + + end end require_relative 'worker/manager' From acd035f67cd8820703faedcac1228887f94aa25e Mon Sep 17 00:00:00 2001 From: Christopher Kleynhans Date: Wed, 5 Feb 2014 00:56:03 -0500 Subject: [PATCH 11/42] More skeleton. --- lib/amalgam/worker.rb | 2 ++ lib/amalgam/worker/configuration.rb | 3 +++ lib/amalgam/worker/heartbeater.rb | 3 +++ 3 files changed, 8 insertions(+) create mode 100644 lib/amalgam/worker/configuration.rb create mode 100644 lib/amalgam/worker/heartbeater.rb diff --git a/lib/amalgam/worker.rb b/lib/amalgam/worker.rb index d56bac1..dd7df87 100644 --- a/lib/amalgam/worker.rb +++ b/lib/amalgam/worker.rb @@ -14,3 +14,5 @@ def run require_relative 'worker/runner' require_relative 'worker/job' require_relative 'worker/queue' +require_relative 'worker/configuration' +require_relative 'worker/heartbeater' diff --git a/lib/amalgam/worker/configuration.rb b/lib/amalgam/worker/configuration.rb new file mode 100644 index 0000000..5d44bc7 --- /dev/null +++ b/lib/amalgam/worker/configuration.rb @@ -0,0 +1,3 @@ +class Amalgam::Worker::Configuration + +end diff --git a/lib/amalgam/worker/heartbeater.rb b/lib/amalgam/worker/heartbeater.rb new file mode 100644 index 0000000..4f1fccc --- /dev/null +++ b/lib/amalgam/worker/heartbeater.rb @@ -0,0 +1,3 @@ +class Amalgam::Worker::Heartbeater + +end From f00aa28d31f39838c95485d4410c2c06b240ba47 Mon Sep 17 00:00:00 2001 From: Christopher Kleynhans Date: Wed, 5 Feb 2014 18:31:57 -0500 Subject: [PATCH 12/42] More skeleton work. --- amalgam-worker.gemspec | 4 ++ lib/amalgam/worker.rb | 2 + lib/amalgam/worker/configuration.rb | 105 ++++++++++++++++++++++++++++ lib/amalgam/worker/downloader.rb | 5 ++ lib/amalgam/worker/heartbeater.rb | 18 +++++ lib/amalgam/worker/manager.rb | 23 ++++++ lib/amalgam/worker/runner.rb | 22 ++++++ lib/amalgam/worker/uploader.rb | 5 ++ 8 files changed, 184 insertions(+) create mode 100644 lib/amalgam/worker/downloader.rb create mode 100644 lib/amalgam/worker/uploader.rb diff --git a/amalgam-worker.gemspec b/amalgam-worker.gemspec index c9cc6e0..c2db745 100644 --- a/amalgam-worker.gemspec +++ b/amalgam-worker.gemspec @@ -11,6 +11,10 @@ Gem::Specification.new do |s| 'lib/amalgam/worker.rb', 'lib/amalgam/worker/manager.rb', 'lib/amalgam/worker/runner.rb', + 'lib/amalgam/worker/configuration.rb', + 'lib/amalgam/worker/downloader.rb', + 'lib/amalgam/worker/heartbeater.rb', + 'lib/amalgam/worker/uploader.rb', 'lib/amalgam/worker/job.rb', 'lib/amalgam/worker/job/build_job.rb', 'lib/amalgam/worker/job/run_job.rb'] diff --git a/lib/amalgam/worker.rb b/lib/amalgam/worker.rb index dd7df87..b31e63e 100644 --- a/lib/amalgam/worker.rb +++ b/lib/amalgam/worker.rb @@ -16,3 +16,5 @@ def run require_relative 'worker/queue' require_relative 'worker/configuration' require_relative 'worker/heartbeater' +require_relative 'worker/downloader' +require_relative 'worker/uploader' diff --git a/lib/amalgam/worker/configuration.rb b/lib/amalgam/worker/configuration.rb index 5d44bc7..5253e83 100644 --- a/lib/amalgam/worker/configuration.rb +++ b/lib/amalgam/worker/configuration.rb @@ -1,3 +1,108 @@ class Amalgam::Worker::Configuration + SETTINGS = [ + :access_key_id, + :secret_access_key, + :s3_bucket, + :sqs_queue_name, + :server_base_url, + :username, + :password, + :tmp_dir, + :git_repo, + :ssh_key, + :worker_timeout + ] + + MANDATORY_SETTINGS = [ + :access_key_id, + :secret_access_key, + :s3_bucket, + :sqs_queue_name, + :server_base_url, + :git_repo + ] + + ATTRIBUTES = SETTINGS + [ + :uploader, + :downloader, + :queue, + :heartbeater + ] + + SECONDS_PER_SECOND = 1 + SECONDS_PER_MINUTE = 60 + MINUTES_PER_HOUR = 60 + SECONDS_PER_HOUR = SECONDS_PER_MINUTE * MINUTES_PER_HOUR + + ATTRIBUTES.each do |name| + define_method(name) do + @configuration_mutex.synchronize { + return instance_variable_get("@#{name}") + } + end + end + + def initialize(config_file_path) + @attributes.each do |name| + instance_variable_set("@#{name}", nil) + end + + @config_file_path = File.absolute_path(config_file_path) + @configuration_mutex = Mutex.new + self.reload + end + + def reload + @configuration_mutex.synchronize { + load_configuration + update_global_objects + update_configuration_objects + } + end + + private + + def load_configuration + conf = YAML.safe_load(File.read(@config_file_path)) + validate_configuration_hash(conf) + + conf.each do |key, value| + self.instance_variable_set("@#{key}", value) + end + + if @worker_timeout.is_a?(Hash) + @worker_timeout = SECONDS_PER_HOUR * (@worker_timeout[:hours] || 0) + + SECONDS_PER_MINUTE * (@worker_timeout[:minutes] || 0) + + SECONDS_PER_SECOND * (@worker_timeout[:seconds] || 0) + end + end + + def update_global_objects + AWS.config(:access_key_id => @access_key_id, + :secret_access_key => @secret_access_key) + end + + def update_configuration_objects + @uploader = Amalgam::Worker::Uploader.new(@s3_bucket) + @downloader = Amalgam::Worker::Downloader.new(@s3_bucket) + @queue = Amalgam::Worker::Queue::SqsQueue.new(@sqs_queue_name) + @heartbeater = Amalgam::Worker::Heartbeater.new(@server_base_url, @username, @password) + end + + def validate_configuration_hash(hash) + # All hash keys provided must be settings + hash.each_key do |key| + unless SETTINGS.include?(key) + raise "Unknown setting #{key} specified in the configuration file." + end + end + + # All mandatory settings must be provided + MANDATORY_SETTINGS.each do |setting| + unless hash.has_key?(setting) + raise "#{setting} was not specified in the configuration file." + end + end + end end diff --git a/lib/amalgam/worker/downloader.rb b/lib/amalgam/worker/downloader.rb new file mode 100644 index 0000000..e98d73a --- /dev/null +++ b/lib/amalgam/worker/downloader.rb @@ -0,0 +1,5 @@ +class Amalgam::Worker::Downloader + def initialize(s3_bucket_name) + + end +end diff --git a/lib/amalgam/worker/heartbeater.rb b/lib/amalgam/worker/heartbeater.rb index 4f1fccc..b11c59e 100644 --- a/lib/amalgam/worker/heartbeater.rb +++ b/lib/amalgam/worker/heartbeater.rb @@ -1,3 +1,21 @@ class Amalgam::Worker::Heartbeater + def initialize(server_base_url, username, password) + end + + def register + + end + + def heartbeat + + end + + def signal_completion + + end + + def unregister + + end end diff --git a/lib/amalgam/worker/manager.rb b/lib/amalgam/worker/manager.rb index e69de29..279939e 100644 --- a/lib/amalgam/worker/manager.rb +++ b/lib/amalgam/worker/manager.rb @@ -0,0 +1,23 @@ +class Amalgam::Worker::Manager + + def initialize + @thread = nil + end + + def run + @thread = Thread.new { + thread_main + } + end + + def join + @thread.join + @thread = nil + end + + private + + def thread_main + + end +end diff --git a/lib/amalgam/worker/runner.rb b/lib/amalgam/worker/runner.rb index e69de29..0e6eb3d 100644 --- a/lib/amalgam/worker/runner.rb +++ b/lib/amalgam/worker/runner.rb @@ -0,0 +1,22 @@ +class Amalgam::Worker::Runner + def initialize(job_description) + @thread = nil + end + + def run + @thread = Thread.new { + thread_main + } + end + + def join + @thread.join + @thread = nil + end + + private + + def thread_main + + end +end diff --git a/lib/amalgam/worker/uploader.rb b/lib/amalgam/worker/uploader.rb new file mode 100644 index 0000000..949f707 --- /dev/null +++ b/lib/amalgam/worker/uploader.rb @@ -0,0 +1,5 @@ +class Amalgam::Worker::Uploader + def initialize(s3_bucket_name) + + end +end From 7835e29db5d1fe03f9e0d9a3f60681716dbc1caa Mon Sep 17 00:00:00 2001 From: Christopher Kleynhans Date: Wed, 5 Feb 2014 21:19:55 -0500 Subject: [PATCH 13/42] Filled out how we want the executable to work. --- bin/amalgam-worker | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/bin/amalgam-worker b/bin/amalgam-worker index be0ae87..c5ee4e5 100755 --- a/bin/amalgam-worker +++ b/bin/amalgam-worker @@ -2,5 +2,24 @@ require 'amalgam/worker' -worker = Amalgam::Worker.new +SafeYAML::OPTIONS[:deserialize_symbols] = true + +config_file_path = ARGV[0] +raise "Configuration path must be specified." if config_file_path.nil? + +worker = Amalgam::Worker.new(config_file_path) worker.run + +Signal.trap("USR1") do + worker.termination_request +end + +Signal.trap("USR2") do + worker.terminate_current_job +end + +Signal.trap("HUP") do + worker.update_configuration +end + +worker.join \ No newline at end of file From bd05bc45d9c517be917126e4f6d1f77b983381fe Mon Sep 17 00:00:00 2001 From: Christopher Kleynhans Date: Wed, 5 Feb 2014 21:20:54 -0500 Subject: [PATCH 14/42] Add .rvmrc to the gitignore. --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 62422b2..0431c10 100644 --- a/.gitignore +++ b/.gitignore @@ -18,3 +18,4 @@ _yardoc doc/ *.swp +.rvmrc From 9c89cde9c1d76a5bf9ad50a19ce586f7b5dc3792 Mon Sep 17 00:00:00 2001 From: Christopher Kleynhans Date: Wed, 5 Feb 2014 21:21:13 -0500 Subject: [PATCH 15/42] Fill out how the heartbeater will work. --- lib/amalgam/worker/heartbeater.rb | 97 ++++++++++++++++++++++++++++++- 1 file changed, 94 insertions(+), 3 deletions(-) diff --git a/lib/amalgam/worker/heartbeater.rb b/lib/amalgam/worker/heartbeater.rb index b11c59e..1b65b3f 100644 --- a/lib/amalgam/worker/heartbeater.rb +++ b/lib/amalgam/worker/heartbeater.rb @@ -1,21 +1,112 @@ +require 'socket' + class Amalgam::Worker::Heartbeater - def initialize(server_base_url, username, password) + + attr_reader :worker_id + + def initialize(server_base_url, username, password, old_heartbeater = nil) + @server_base_url = server_base_url + + @worker_id = nil + unless old_heartbeater.nil? + @worker_id = old_heartbeater.worker_id + end + @auth_params = nil + unless username.nil? && password.nil? + @auth_params = { + :username => username, + :password => password + } + end end + def register + raise "Already registered." unless @worker_id.nil? + + Amalgam::Worker.logger.info("Registering with server.") + + response = HTTParty.post(register_url, { + :body => { :hostname => hostname }.to_json, + :basic_auth => @auth_params + }) + parsed_response = JSON.parse(response.parsed_response) + + # Validate the response. + + if parsed_response["worker_id"].nil? + raise "Response did not contain worker_id." + end + + @worker_id = parsed_response["worker_id"].to_i end - def heartbeat + def heartbeat(current_job_id) + raise "Not registered." if @worker_id.nil? + begin + HTTTParty.post(heartbeat_url, { + :body => { :job_id => current_job_id }.to_json, + :basic_auth => @auth_params + }) + rescue => err + Amalgam::Worker.logger.error("Manager Failed to Heartbeat") + Amalgam::Worker.logger.error(err.inspect) + end end - def signal_completion + def signal_start(job_id) + raise "Not registered." if @worker_id.nil? + Amalgam::Worker.logger.info("Signalling start to server.") + HTTParty.post(start_url(job_id), { + :basic_auth => @auth_params + }) + end + def signal_completion(job_id, result_body) + raise "Not registered." if @worker_id.nil? + Amalgam::Worker.logger.info("Signalling completion to server.") + HTTParty.post(completion_url(job_id), { + :body => result_body.to_json, + :basic_auth => @auth_params + }) end def unregister + raise "Not registered." if @worker_id.nil? + Amalgam::Worker.logger.info("Unregistering with server.") + + HTTParty.post(unregister_url, { + :basic_auth => @auth_params + }) + end + private + + def hostname + Socket.gethostname + end + + def register_url + "#{@server_base_url}/workers/register" + end + + def unregister_url + "#{@server_base_url}/workers/#{@worker_id}/unregister" + end + + def heartbeat_url + "#{@server_base_url}/workers/#{@worker_id}/heartbeat" + end + + def start_url(job_id) + "#{@server_base_url}/jobs/#{job_id}/start" end + + def completion_url(job_id) + "#{@server_base_url}/jobs/#{job_id}/complete" + end + end From 5e4adec9ae2f0eaae08ae20bbcac7a96e108db0c Mon Sep 17 00:00:00 2001 From: Christopher Kleynhans Date: Wed, 5 Feb 2014 22:04:22 -0500 Subject: [PATCH 16/42] Need to pass old heartbeater to the constructor. --- lib/amalgam/worker/configuration.rb | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/lib/amalgam/worker/configuration.rb b/lib/amalgam/worker/configuration.rb index 5253e83..e332bdb 100644 --- a/lib/amalgam/worker/configuration.rb +++ b/lib/amalgam/worker/configuration.rb @@ -87,7 +87,10 @@ def update_configuration_objects @uploader = Amalgam::Worker::Uploader.new(@s3_bucket) @downloader = Amalgam::Worker::Downloader.new(@s3_bucket) @queue = Amalgam::Worker::Queue::SqsQueue.new(@sqs_queue_name) - @heartbeater = Amalgam::Worker::Heartbeater.new(@server_base_url, @username, @password) + @heartbeater = Amalgam::Worker::Heartbeater.new(@server_base_url, + @username, + @password, + @heartbeater) end def validate_configuration_hash(hash) From 3d6f641d17535d9c536e654f10a27db73a4b6c7b Mon Sep 17 00:00:00 2001 From: Christopher Kleynhans Date: Wed, 5 Feb 2014 22:16:25 -0500 Subject: [PATCH 17/42] Filled out the manager. --- lib/amalgam/worker/configuration.rb | 38 +++++++-- lib/amalgam/worker/heartbeater.rb | 2 +- lib/amalgam/worker/manager.rb | 118 +++++++++++++++++++++++++++- lib/amalgam/worker/runner.rb | 2 +- 4 files changed, 150 insertions(+), 10 deletions(-) diff --git a/lib/amalgam/worker/configuration.rb b/lib/amalgam/worker/configuration.rb index e332bdb..2716f7b 100644 --- a/lib/amalgam/worker/configuration.rb +++ b/lib/amalgam/worker/configuration.rb @@ -1,5 +1,10 @@ class Amalgam::Worker::Configuration + SECONDS_PER_SECOND = 1 + SECONDS_PER_MINUTE = 60 + MINUTES_PER_HOUR = 60 + SECONDS_PER_HOUR = SECONDS_PER_MINUTE * MINUTES_PER_HOUR + SETTINGS = [ :access_key_id, :secret_access_key, @@ -11,7 +16,9 @@ class Amalgam::Worker::Configuration :tmp_dir, :git_repo, :ssh_key, - :worker_timeout + :worker_timeout, + :heartbeat_period, + :sleep_interval ] MANDATORY_SETTINGS = [ @@ -20,9 +27,32 @@ class Amalgam::Worker::Configuration :s3_bucket, :sqs_queue_name, :server_base_url, - :git_repo + :git_repo, + :heartbeat_period, ] + NON_MANDATORY_SETTINGS = SETTINGS - MANDATORY_SETTINGS + + SETTING_DEFAULTS = { + :worker_timeout => 48 * SECONDS_PER_HOUR, + :heartbeat_period => 5 * SECONDS_PER_MINUTE, + :sleep_interval => 15 * SECONDS_PER_SECOND, + :tmp_dir => "/tmp", + :username => nil, + :password => nil, + :ssh_key => nil + } + + # All Non-Mandatory settings must have an entry in the + # SETTING_DEFAULTS hash (even if they are nil by default) + # to ensure we have considered what happens by default + # for the Non-Mandatory settings. + NON_MANDATORY_SETTINGS.each do |name| + unless SETTING_DEFAULTS.has_key?(name) + raise "No default for non-mandatory setting #{name}" + end + end + ATTRIBUTES = SETTINGS + [ :uploader, :downloader, @@ -30,10 +60,6 @@ class Amalgam::Worker::Configuration :heartbeater ] - SECONDS_PER_SECOND = 1 - SECONDS_PER_MINUTE = 60 - MINUTES_PER_HOUR = 60 - SECONDS_PER_HOUR = SECONDS_PER_MINUTE * MINUTES_PER_HOUR ATTRIBUTES.each do |name| define_method(name) do diff --git a/lib/amalgam/worker/heartbeater.rb b/lib/amalgam/worker/heartbeater.rb index 1b65b3f..c77aa7a 100644 --- a/lib/amalgam/worker/heartbeater.rb +++ b/lib/amalgam/worker/heartbeater.rb @@ -43,7 +43,7 @@ def register @worker_id = parsed_response["worker_id"].to_i end - def heartbeat(current_job_id) + def heartbeat(current_job_id = nil) raise "Not registered." if @worker_id.nil? begin diff --git a/lib/amalgam/worker/manager.rb b/lib/amalgam/worker/manager.rb index 279939e..6f67fc4 100644 --- a/lib/amalgam/worker/manager.rb +++ b/lib/amalgam/worker/manager.rb @@ -1,7 +1,11 @@ class Amalgam::Worker::Manager - def initialize + def initialize(configuration) + @configuration = configuration @thread = nil + @time_of_last_heartbeat = nil + @configuration_update_requested = false + @termination_requested = false end def run @@ -14,10 +18,120 @@ def join @thread.join @thread = nil end - + + def request_termination + @termination_requested = true + end + + def terminate_current_job + @job_termination_requested = true + end + private def thread_main + begin + register + rescue => err + Amalgam::Worker.logger.error("Manager failed to register.") + Amalgam::Worker.logger.error(err.inspect) + end + + begin + while (!@termination_requested) + job = poll_for_job + + unless job.nil? + run_job(job) + end + end + rescue => err + Amalgam::Worker.logger.error("Manager encountered exception.") + Amalgam::Worker.logger.error(err.inspect) + ensure + unregister + end + end + + def register + heartbeater = @configuration.heartbeater + heartbeater.register + end + + def unregister + heartbeater = @configuration.heartbeater + heartbeater.unregister + end + + def poll_for_job + result = nil + + while (result.nil? && !@termination_requested) + queue = @configuration.queue + result = queue.poll + + maybe_do_heartbeat + maybe_update_configuration + end + + return result + end + + def maybe_do_heartbeat(job_id) + if (@time_of_last_heartbeat.nil? || + seconds_since_last_heartbeat >= @configuration.heartbeat_period) + + heartbeater = @configuration.heartbeater + heartbeater.heartbeat(job_id) + + @time_of_last_heartbeat = Time.now + end + end + + def maybe_update_configuration + if @configuration_update_requested + @configuration.reload + end + end + + def seconds_since_last_heartbeat + return (Time.now - @time_of_last_heartbeat) + end + + def run_job(job_description) + job_id = job_description[:job_id] + @configuration.heartbeater.signal_start(job_id) + + # Ensure that a previous job termination will not carry-over + @job_termination_requested = false + + @runner = Amalgam::Worker::Runner.new(@configuration, job_description) + @runner.run + + @job_start = Time.now + + while (@runner.running) + maybe_do_heartbeat(job_id) + maybe_update_configuration + maybe_terminate_job + + sleep(@configuration.sleep_interval) + end + + result = @runner.result + @configuration.heartbeater.signal_completion(job_id, result) + end + + def seconds_since_job_start + return (Time.now - @job_start) + end + def maybe_terminate_job + if (seconds_since_job_start >= + @configuration.worker_timeout || + @job_termination_requested) + @runner.terminate + @job_termination_requested = false + end end end diff --git a/lib/amalgam/worker/runner.rb b/lib/amalgam/worker/runner.rb index 0e6eb3d..ea3a74d 100644 --- a/lib/amalgam/worker/runner.rb +++ b/lib/amalgam/worker/runner.rb @@ -1,5 +1,5 @@ class Amalgam::Worker::Runner - def initialize(job_description) + def initialize(configuration, job_description) @thread = nil end From a8a924a906c937adb6de8487d93c9d3249e68c88 Mon Sep 17 00:00:00 2001 From: Christopher Kleynhans Date: Wed, 5 Feb 2014 23:06:24 -0500 Subject: [PATCH 18/42] Started filling out the worker. --- lib/amalgam/worker/configuration.rb | 2 +- lib/amalgam/worker/runner.rb | 32 +++++++++++++++++++++++++++++ 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/lib/amalgam/worker/configuration.rb b/lib/amalgam/worker/configuration.rb index 2716f7b..7eaf954 100644 --- a/lib/amalgam/worker/configuration.rb +++ b/lib/amalgam/worker/configuration.rb @@ -37,7 +37,7 @@ class Amalgam::Worker::Configuration :worker_timeout => 48 * SECONDS_PER_HOUR, :heartbeat_period => 5 * SECONDS_PER_MINUTE, :sleep_interval => 15 * SECONDS_PER_SECOND, - :tmp_dir => "/tmp", + :tmp_dir => nil, :username => nil, :password => nil, :ssh_key => nil diff --git a/lib/amalgam/worker/runner.rb b/lib/amalgam/worker/runner.rb index ea3a74d..6fcc334 100644 --- a/lib/amalgam/worker/runner.rb +++ b/lib/amalgam/worker/runner.rb @@ -1,6 +1,14 @@ +require 'fileutils' + class Amalgam::Worker::Runner + + attr_reader :result + def initialize(configuration, job_description) + @configuration = configuration + @job_description = job_description @thread = nil + @result = nil end def run @@ -14,9 +22,33 @@ def join @thread = nil end + def terminate + + end + private def thread_main + original_working_dir = Dir.getwd + temp_dir = Dir.mktmpdir(configuration.tmp_dir) + error_caught = fale + begin + @job = Amalgam::Worker::Job.create(@job_description) + @result = @job.run + rescue => err + Amalgam::Worker.logger.error("Worker caught error.") + Amalgam::Worker.logger.error(err.inspect) + error_caught = true + @result = { :return_code => 255 } + end + + Dir.chdir(original_working_dir) + unless error_caught + Amalgam::Worker.logger.error("Leaving the job directory behind.") + Amalgam::Worker.logger.error(temp_dir) + FileUtils.rm_rf(temp_dir) + end end + end From b3f047d305d623a80a3bd34a724b915916d425d9 Mon Sep 17 00:00:00 2001 From: Christopher Kleynhans Date: Thu, 6 Feb 2014 00:21:20 -0500 Subject: [PATCH 19/42] Use the MIT license. --- LICENSE | 21 +++++++++++++++++++++ amalgam-worker.gemspec | 1 + 2 files changed, 22 insertions(+) create mode 100644 LICENSE diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..96b0ddc --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2014 Christopher Kleynhans + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file diff --git a/amalgam-worker.gemspec b/amalgam-worker.gemspec index c2db745..ec4c154 100644 --- a/amalgam-worker.gemspec +++ b/amalgam-worker.gemspec @@ -7,6 +7,7 @@ Gem::Specification.new do |s| s.authors = [ 'Chris Kleynhans' ] s.email = 'chris@kleynhans.ca' s.homepage = 'http://github.com/TeamAmalgam/worker' + s.license = 'MIT' s.files = ['lib/amalgam.rb', 'lib/amalgam/worker.rb', 'lib/amalgam/worker/manager.rb', From 26583237f0ce49af82f2c13d03beb04d0f80eafc Mon Sep 17 00:00:00 2001 From: Christopher Kleynhans Date: Thu, 6 Feb 2014 00:22:32 -0500 Subject: [PATCH 20/42] Include the LICENSE file in the gem. --- amalgam-worker.gemspec | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/amalgam-worker.gemspec b/amalgam-worker.gemspec index ec4c154..5c305f1 100644 --- a/amalgam-worker.gemspec +++ b/amalgam-worker.gemspec @@ -8,7 +8,8 @@ Gem::Specification.new do |s| s.email = 'chris@kleynhans.ca' s.homepage = 'http://github.com/TeamAmalgam/worker' s.license = 'MIT' - s.files = ['lib/amalgam.rb', + s.files = ['LICENSE', + 'lib/amalgam.rb', 'lib/amalgam/worker.rb', 'lib/amalgam/worker/manager.rb', 'lib/amalgam/worker/runner.rb', From aedaa39264a99d875423664f817e1211767c860e Mon Sep 17 00:00:00 2001 From: Christopher Kleynhans Date: Thu, 6 Feb 2014 00:27:53 -0500 Subject: [PATCH 21/42] Fill out terminate method. --- lib/amalgam/worker/job.rb | 4 ++++ lib/amalgam/worker/runner.rb | 5 ++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/lib/amalgam/worker/job.rb b/lib/amalgam/worker/job.rb index 1b3bacb..1c2056b 100644 --- a/lib/amalgam/worker/job.rb +++ b/lib/amalgam/worker/job.rb @@ -8,6 +8,10 @@ def run raise "Attempt to run abstract job." end + def terminate + raise "Attempt to terminate abstract job." + end + class << self def register_job(identifier, klass) @jobs ||= {} diff --git a/lib/amalgam/worker/runner.rb b/lib/amalgam/worker/runner.rb index 6fcc334..29de08d 100644 --- a/lib/amalgam/worker/runner.rb +++ b/lib/amalgam/worker/runner.rb @@ -9,6 +9,7 @@ def initialize(configuration, job_description) @job_description = job_description @thread = nil @result = nil + @job = nil end def run @@ -23,7 +24,9 @@ def join end def terminate - + unless @job.nil? + @job.terminate + end end private From 499d086d40d2fed5e0bce4d9f67d3a6f834922f0 Mon Sep 17 00:00:00 2001 From: Christopher Kleynhans Date: Thu, 6 Feb 2014 00:58:29 -0500 Subject: [PATCH 22/42] Implemented the SqsQueue type. --- lib/amalgam/worker/queue/sqs_queue.rb | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/lib/amalgam/worker/queue/sqs_queue.rb b/lib/amalgam/worker/queue/sqs_queue.rb index cf5dd08..7751faf 100644 --- a/lib/amalgam/worker/queue/sqs_queue.rb +++ b/lib/amalgam/worker/queue/sqs_queue.rb @@ -1,3 +1,14 @@ class Amalgam::Worker::Queue::SqsQueue + def initialize(sqs_queue_name, idle_timeout) + sqs_client = AWS::SQS.new + @sqs_queue = sqs_client.queues.named(sqs_queue_name) + @idle_timeout = idle_timeout + end + def poll + message = @sqs_queue.poll(:idle_timeout => @idle_timeout) + + return nil if message.nil? + return YAML.safe_load(message.body) + end end From 649fae3fab28c406498ad8f5a833b72d46037763 Mon Sep 17 00:00:00 2001 From: Christopher Kleynhans Date: Thu, 6 Feb 2014 00:59:00 -0500 Subject: [PATCH 23/42] Worker and some fixes. --- bin/amalgam-worker | 3 ++- lib/amalgam/worker.rb | 15 ++++++++++++++- lib/amalgam/worker/configuration.rb | 20 ++++++++++++-------- lib/amalgam/worker/heartbeater.rb | 1 + 4 files changed, 29 insertions(+), 10 deletions(-) diff --git a/bin/amalgam-worker b/bin/amalgam-worker index c5ee4e5..e5fee17 100755 --- a/bin/amalgam-worker +++ b/bin/amalgam-worker @@ -1,6 +1,7 @@ #!/usr/bin/env ruby require 'amalgam/worker' +require 'safe_yaml' SafeYAML::OPTIONS[:deserialize_symbols] = true @@ -22,4 +23,4 @@ Signal.trap("HUP") do worker.update_configuration end -worker.join \ No newline at end of file +worker.join diff --git a/lib/amalgam/worker.rb b/lib/amalgam/worker.rb index b31e63e..005a831 100644 --- a/lib/amalgam/worker.rb +++ b/lib/amalgam/worker.rb @@ -1,12 +1,25 @@ +require 'logger' require_relative '../amalgam' class Amalgam::Worker - def initialize + class << self + def logger + @logger ||= Logger.new(STDERR) + return @logger + end + end + def initialize(configuration_path) + @configuration = Amalgam::Worker::Configuration.new(configuration_path) + @manager = Amalgam::Worker::Manager.new(@configuration) end def run + @manager.run + end + def join + @manager.join end end diff --git a/lib/amalgam/worker/configuration.rb b/lib/amalgam/worker/configuration.rb index 7eaf954..c1e0d48 100644 --- a/lib/amalgam/worker/configuration.rb +++ b/lib/amalgam/worker/configuration.rb @@ -1,3 +1,5 @@ +require 'aws-sdk' + class Amalgam::Worker::Configuration SECONDS_PER_SECOND = 1 @@ -18,7 +20,8 @@ class Amalgam::Worker::Configuration :ssh_key, :worker_timeout, :heartbeat_period, - :sleep_interval + :sleep_interval, + :idle_timeout ] MANDATORY_SETTINGS = [ @@ -27,8 +30,7 @@ class Amalgam::Worker::Configuration :s3_bucket, :sqs_queue_name, :server_base_url, - :git_repo, - :heartbeat_period, + :git_repo ] NON_MANDATORY_SETTINGS = SETTINGS - MANDATORY_SETTINGS @@ -40,7 +42,8 @@ class Amalgam::Worker::Configuration :tmp_dir => nil, :username => nil, :password => nil, - :ssh_key => nil + :ssh_key => nil, + :idle_timeout => 2 * SECONDS_PER_MINUTE } # All Non-Mandatory settings must have an entry in the @@ -70,7 +73,7 @@ class Amalgam::Worker::Configuration end def initialize(config_file_path) - @attributes.each do |name| + ATTRIBUTES.each do |name| instance_variable_set("@#{name}", nil) end @@ -105,14 +108,15 @@ def load_configuration end def update_global_objects - AWS.config(:access_key_id => @access_key_id, - :secret_access_key => @secret_access_key) + ::AWS.config(:access_key_id => @access_key_id, + :secret_access_key => @secret_access_key) end def update_configuration_objects @uploader = Amalgam::Worker::Uploader.new(@s3_bucket) @downloader = Amalgam::Worker::Downloader.new(@s3_bucket) - @queue = Amalgam::Worker::Queue::SqsQueue.new(@sqs_queue_name) + @queue = Amalgam::Worker::Queue::SqsQueue.new(@sqs_queue_name, + @idle_timeout) @heartbeater = Amalgam::Worker::Heartbeater.new(@server_base_url, @username, @password, diff --git a/lib/amalgam/worker/heartbeater.rb b/lib/amalgam/worker/heartbeater.rb index c77aa7a..bbfb6db 100644 --- a/lib/amalgam/worker/heartbeater.rb +++ b/lib/amalgam/worker/heartbeater.rb @@ -1,4 +1,5 @@ require 'socket' +require 'httparty' class Amalgam::Worker::Heartbeater From 9d204f0548f912bdee8baebbb074ba62ecc2923e Mon Sep 17 00:00:00 2001 From: Christopher Kleynhans Date: Thu, 6 Feb 2014 01:32:55 -0500 Subject: [PATCH 24/42] Fix some issues. --- lib/amalgam/worker.rb | 12 ++++++++++++ lib/amalgam/worker/configuration.rb | 4 +++- lib/amalgam/worker/heartbeater.rb | 3 ++- lib/amalgam/worker/manager.rb | 8 ++++++-- lib/amalgam/worker/runner.rb | 2 ++ 5 files changed, 25 insertions(+), 4 deletions(-) diff --git a/lib/amalgam/worker.rb b/lib/amalgam/worker.rb index 005a831..7bddb03 100644 --- a/lib/amalgam/worker.rb +++ b/lib/amalgam/worker.rb @@ -21,6 +21,18 @@ def run def join @manager.join end + + def termination_request + @manager.request_termination + end + + def terminate_current_job + @manager.terminate_job + end + + def update_configuration + @manager.update_configuration + end end require_relative 'worker/manager' diff --git a/lib/amalgam/worker/configuration.rb b/lib/amalgam/worker/configuration.rb index c1e0d48..0be590d 100644 --- a/lib/amalgam/worker/configuration.rb +++ b/lib/amalgam/worker/configuration.rb @@ -74,9 +74,11 @@ class Amalgam::Worker::Configuration def initialize(config_file_path) ATTRIBUTES.each do |name| - instance_variable_set("@#{name}", nil) + default_value = SETTING_DEFAULTS[name] + instance_variable_set("@#{name}", default_value) end + @config_file_path = File.absolute_path(config_file_path) @configuration_mutex = Mutex.new self.reload diff --git a/lib/amalgam/worker/heartbeater.rb b/lib/amalgam/worker/heartbeater.rb index bbfb6db..51f6d76 100644 --- a/lib/amalgam/worker/heartbeater.rb +++ b/lib/amalgam/worker/heartbeater.rb @@ -48,13 +48,14 @@ def heartbeat(current_job_id = nil) raise "Not registered." if @worker_id.nil? begin - HTTTParty.post(heartbeat_url, { + HTTParty.post(heartbeat_url, { :body => { :job_id => current_job_id }.to_json, :basic_auth => @auth_params }) rescue => err Amalgam::Worker.logger.error("Manager Failed to Heartbeat") Amalgam::Worker.logger.error(err.inspect) + Amalgam::Worker.logger.error(err.backtrace.join("\n")) end end diff --git a/lib/amalgam/worker/manager.rb b/lib/amalgam/worker/manager.rb index 6f67fc4..f1ddd79 100644 --- a/lib/amalgam/worker/manager.rb +++ b/lib/amalgam/worker/manager.rb @@ -15,7 +15,9 @@ def run end def join - @thread.join + unless @thread.nil? + @thread.join + end @thread = nil end @@ -35,6 +37,7 @@ def thread_main rescue => err Amalgam::Worker.logger.error("Manager failed to register.") Amalgam::Worker.logger.error(err.inspect) + Amalgam::Worker.logger.error(err.backtrace.join("\n")) end begin @@ -48,6 +51,7 @@ def thread_main rescue => err Amalgam::Worker.logger.error("Manager encountered exception.") Amalgam::Worker.logger.error(err.inspect) + Amalgam::Worker.logger.error(err.backtrace.join("\n")) ensure unregister end @@ -77,7 +81,7 @@ def poll_for_job return result end - def maybe_do_heartbeat(job_id) + def maybe_do_heartbeat(job_id = nil) if (@time_of_last_heartbeat.nil? || seconds_since_last_heartbeat >= @configuration.heartbeat_period) diff --git a/lib/amalgam/worker/runner.rb b/lib/amalgam/worker/runner.rb index 29de08d..908079f 100644 --- a/lib/amalgam/worker/runner.rb +++ b/lib/amalgam/worker/runner.rb @@ -42,6 +42,8 @@ def thread_main rescue => err Amalgam::Worker.logger.error("Worker caught error.") Amalgam::Worker.logger.error(err.inspect) + Amalgam::Worker.logger.error(err.backtrace.join("\n")) + error_caught = true @result = { :return_code => 255 } end From 041873eb357db3b605d3d72bcdc1346661fda7c5 Mon Sep 17 00:00:00 2001 From: Christopher Kleynhans Date: Thu, 6 Feb 2014 15:31:54 -0500 Subject: [PATCH 25/42] Make components more modular. --- amalgam-worker.gemspec | 6 + lib/amalgam/worker/configuration.rb | 68 ++++++---- lib/amalgam/worker/downloader.rb | 35 ++++- .../worker/downloader/s3_downloader.rb | 14 ++ .../worker/downloader/test_downloader.rb | 18 +++ lib/amalgam/worker/heartbeater.rb | 112 ++++------------ .../worker/heartbeater/http_heartbeater.rb | 120 ++++++++++++++++++ .../worker/heartbeater/test_heartbeater.rb | 35 +++++ lib/amalgam/worker/manager.rb | 6 + lib/amalgam/worker/queue.rb | 26 ++++ lib/amalgam/worker/queue/sqs_queue.rb | 5 + lib/amalgam/worker/queue/test_queue.rb | 7 +- lib/amalgam/worker/uploader.rb | 35 ++++- lib/amalgam/worker/uploader/s3_uploader.rb | 14 ++ lib/amalgam/worker/uploader/test_uploader.rb | 17 +++ spec/amalgam/worker/queue/test_queue_spec.rb | 2 +- 16 files changed, 404 insertions(+), 116 deletions(-) create mode 100644 lib/amalgam/worker/downloader/s3_downloader.rb create mode 100644 lib/amalgam/worker/downloader/test_downloader.rb create mode 100644 lib/amalgam/worker/heartbeater/http_heartbeater.rb create mode 100644 lib/amalgam/worker/heartbeater/test_heartbeater.rb create mode 100644 lib/amalgam/worker/uploader/s3_uploader.rb create mode 100644 lib/amalgam/worker/uploader/test_uploader.rb diff --git a/amalgam-worker.gemspec b/amalgam-worker.gemspec index 5c305f1..c7ada73 100644 --- a/amalgam-worker.gemspec +++ b/amalgam-worker.gemspec @@ -15,8 +15,14 @@ Gem::Specification.new do |s| 'lib/amalgam/worker/runner.rb', 'lib/amalgam/worker/configuration.rb', 'lib/amalgam/worker/downloader.rb', + 'lib/amalgam/worker/downloader/s3_downloader.rb', + 'lib/amalgam/worker/downloader/test_downloader.rb', 'lib/amalgam/worker/heartbeater.rb', + 'lib/amalgam/worker/heartbeater/http_heartbeater.rb', + 'lib/amalgam/worker/heartbeater/test_heartbeater.rb', 'lib/amalgam/worker/uploader.rb', + 'lib/amalgam/worker/uploader/s3_uploader.rb', + 'lib/amalgam/worker/uploader/test_uploader.rb', 'lib/amalgam/worker/job.rb', 'lib/amalgam/worker/job/build_job.rb', 'lib/amalgam/worker/job/run_job.rb'] diff --git a/lib/amalgam/worker/configuration.rb b/lib/amalgam/worker/configuration.rb index 0be590d..4bef8de 100644 --- a/lib/amalgam/worker/configuration.rb +++ b/lib/amalgam/worker/configuration.rb @@ -8,29 +8,41 @@ class Amalgam::Worker::Configuration SECONDS_PER_HOUR = SECONDS_PER_MINUTE * MINUTES_PER_HOUR SETTINGS = [ + # AWS Credentials. :access_key_id, :secret_access_key, - :s3_bucket, - :sqs_queue_name, - :server_base_url, - :username, - :password, - :tmp_dir, - :git_repo, - :ssh_key, - :worker_timeout, - :heartbeat_period, - :sleep_interval, - :idle_timeout + + :tmp_dir, # The temporary directory to work in. + :git_repo, # URL of the git repo containing moolloy. + :ssh_key, # The ssh key to use when cloning the repo. + :worker_timeout, # Maximum amount of time a worker will spec on a job. + :heartbeat_period, # Time between heartbeats. + :sleep_interval, # Time to sleep between worker checks. + :idle_timeout, # Time to spend polling for jobs before checking other state + + :heartbeater_type, # The type of heartbeater to use. + :uploader_type, # The type of uploader to use. + :downloader_type, # The type of downloader to use. + :queue_type, # The type of queue to use. + + :heartbeater_options, # Options to pass to the heartbeater. + :uploader_options, # Options to pass to the uploader. + :downloader_options, # Options to pass to the downloader. + :queue_options # Options to pass to the queue. ] MANDATORY_SETTINGS = [ :access_key_id, :secret_access_key, - :s3_bucket, - :sqs_queue_name, - :server_base_url, - :git_repo + :git_repo, + :heartbeater_type, + :uploader_type, + :downloader_type, + :queue_type, + :heartbeater_options, + :uploader_options, + :downloader_options, + :queue_options ] NON_MANDATORY_SETTINGS = SETTINGS - MANDATORY_SETTINGS @@ -40,10 +52,8 @@ class Amalgam::Worker::Configuration :heartbeat_period => 5 * SECONDS_PER_MINUTE, :sleep_interval => 15 * SECONDS_PER_SECOND, :tmp_dir => nil, - :username => nil, - :password => nil, - :ssh_key => nil, - :idle_timeout => 2 * SECONDS_PER_MINUTE + :idle_timeout => 2 * SECONDS_PER_MINUTE, + :ssh_key => nil } # All Non-Mandatory settings must have an entry in the @@ -115,14 +125,16 @@ def update_global_objects end def update_configuration_objects - @uploader = Amalgam::Worker::Uploader.new(@s3_bucket) - @downloader = Amalgam::Worker::Downloader.new(@s3_bucket) - @queue = Amalgam::Worker::Queue::SqsQueue.new(@sqs_queue_name, - @idle_timeout) - @heartbeater = Amalgam::Worker::Heartbeater.new(@server_base_url, - @username, - @password, - @heartbeater) + @uploader = Amalgam::Worker::Uploader.create(@uploader_type, + @uploader_options, + @uploader) + @downloader = Amalgam::Worker::Downloader.create(@downloader_type, + @downloader_options, + @downloader) + @queue = Amalgam::Worker::Queue.create(@queue_type, @queue_options, @queue) + @heartbeater = Amalgam::Worker::Heartbeater.create(@heartbeater_type, + @heartbeater_options, + @heartbeater) end def validate_configuration_hash(hash) diff --git a/lib/amalgam/worker/downloader.rb b/lib/amalgam/worker/downloader.rb index e98d73a..857750a 100644 --- a/lib/amalgam/worker/downloader.rb +++ b/lib/amalgam/worker/downloader.rb @@ -1,5 +1,38 @@ class Amalgam::Worker::Downloader - def initialize(s3_bucket_name) + def initialize + raise "Attempt to initialize abstract Downloader." + end + + def download(key, destination_path) + raise "Call to abstract Downloader download." + end + + class << self + def register_downloader(identifier, klass) + @downloaders ||= {} + + unless @downloaders[identifier].nil? + raise "Downloader with identifier '#{identifier} already registered." + end + @downloaders[identifier] = klass + end + + def unregister_downloader(identifier) + @downloaders[identifier] = nil + end + + def create(identifier, options, previous_downloader = nil) + downloaders = @downloaders || {} + + if downloaders[identifier].nil? + raise "No downloader type registered for identifier '#{identifier}." + end + + return downloaders[identifier].new(options, previous_downloader) + end end end + +require_relative "downloader/test_downloader" +require_relative "downloader/s3_downloader" diff --git a/lib/amalgam/worker/downloader/s3_downloader.rb b/lib/amalgam/worker/downloader/s3_downloader.rb new file mode 100644 index 0000000..dedd172 --- /dev/null +++ b/lib/amalgam/worker/downloader/s3_downloader.rb @@ -0,0 +1,14 @@ +class Amalgam::Worker::Downloader::S3Downloader + def initialize(options, old_downloader = nil) + @s3_bucket = options[:s3_bucket] + end + + def download(key, destination_path) + raise "Not implemented yet." + end +end + +Amalgam::Worker::Downloader.register_downloader( + :s3, + Amalgam::Worker::Downloader::S3Downloader +) \ No newline at end of file diff --git a/lib/amalgam/worker/downloader/test_downloader.rb b/lib/amalgam/worker/downloader/test_downloader.rb new file mode 100644 index 0000000..b3029e7 --- /dev/null +++ b/lib/amalgam/worker/downloader/test_downloader.rb @@ -0,0 +1,18 @@ +require 'fileutils' +class Amalgam::Worker::Downloader::TestDownloader + def initialize(file_map, old_downloader) + @file_map = file_map || {} + end + + def download(key, destination_path) + raise "No such key." if @file_map[key].nil? + + source_path = @file_map[key] + FileUtils.copy_file(source_path, destination_path) + end +end + +Amalgam::Worker::Downloader.register_downloader( + :test, + Amalgam::Worker::Downloader::TestDownloader +) \ No newline at end of file diff --git a/lib/amalgam/worker/heartbeater.rb b/lib/amalgam/worker/heartbeater.rb index 51f6d76..baeb862 100644 --- a/lib/amalgam/worker/heartbeater.rb +++ b/lib/amalgam/worker/heartbeater.rb @@ -1,114 +1,58 @@ -require 'socket' -require 'httparty' - class Amalgam::Worker::Heartbeater attr_reader :worker_id - def initialize(server_base_url, username, password, old_heartbeater = nil) - @server_base_url = server_base_url - - @worker_id = nil - unless old_heartbeater.nil? - @worker_id = old_heartbeater.worker_id - end - - @auth_params = nil - unless username.nil? && password.nil? - @auth_params = { - :username => username, - :password => password - } - end + def initialize + raise "Attempt to initialize abstract Heartbeater." end def register - raise "Already registered." unless @worker_id.nil? - - Amalgam::Worker.logger.info("Registering with server.") - - response = HTTParty.post(register_url, { - :body => { :hostname => hostname }.to_json, - :basic_auth => @auth_params - }) - - parsed_response = JSON.parse(response.parsed_response) - - # Validate the response. - - if parsed_response["worker_id"].nil? - raise "Response did not contain worker_id." - end - - @worker_id = parsed_response["worker_id"].to_i + raise "Attempt to invoke abstract method register." end def heartbeat(current_job_id = nil) - raise "Not registered." if @worker_id.nil? - - begin - HTTParty.post(heartbeat_url, { - :body => { :job_id => current_job_id }.to_json, - :basic_auth => @auth_params - }) - rescue => err - Amalgam::Worker.logger.error("Manager Failed to Heartbeat") - Amalgam::Worker.logger.error(err.inspect) - Amalgam::Worker.logger.error(err.backtrace.join("\n")) - end + raise "Attempt to invoke abstract method heartbeat." end def signal_start(job_id) - raise "Not registered." if @worker_id.nil? - Amalgam::Worker.logger.info("Signalling start to server.") - HTTParty.post(start_url(job_id), { - :basic_auth => @auth_params - }) + raise "Attempt to invoke abstract method signal_start." end def signal_completion(job_id, result_body) - raise "Not registered." if @worker_id.nil? - Amalgam::Worker.logger.info("Signalling completion to server.") - HTTParty.post(completion_url(job_id), { - :body => result_body.to_json, - :basic_auth => @auth_params - }) + raise "Attempt to invoke abstract method signal_completion." end def unregister - raise "Not registered." if @worker_id.nil? - Amalgam::Worker.logger.info("Unregistering with server.") - - HTTParty.post(unregister_url, { - :basic_auth => @auth_params - }) + raise "Attempt to invoke abstract method unregister." end - private + class << self + def register_heartbeater(identifier, klass) + @heartbeaters ||= {} - def hostname - Socket.gethostname - end + unless @heartbeaters[identifier].nil? + raise "Heartbeater with identifier '#{identifier}' already registered." + end - def register_url - "#{@server_base_url}/workers/register" - end + @heartbeaters[identifier] = klass + end - def unregister_url - "#{@server_base_url}/workers/#{@worker_id}/unregister" - end + def unregister_queue(identifier) + @heartbeaters[identifier] = nil + end - def heartbeat_url - "#{@server_base_url}/workers/#{@worker_id}/heartbeat" - end + def create(identifier, options, previous_queue = nil) + heartbeaters = @heartbeaters || {} - def start_url(job_id) - "#{@server_base_url}/jobs/#{job_id}/start" - end + if heartbeaters[identifier].nil? + raise "No heartbeater type registered for identifier '#{identifier}'." + end - def completion_url(job_id) - "#{@server_base_url}/jobs/#{job_id}/complete" + return heartbeaters[identifier].new(options, previous_queue) + end end - end + +require_relative "heartbeater/http_heartbeater" +require_relative "heartbeater/test_heartbeater" diff --git a/lib/amalgam/worker/heartbeater/http_heartbeater.rb b/lib/amalgam/worker/heartbeater/http_heartbeater.rb new file mode 100644 index 0000000..72d9db3 --- /dev/null +++ b/lib/amalgam/worker/heartbeater/http_heartbeater.rb @@ -0,0 +1,120 @@ +require 'socket' +require 'httparty' + +class Amalgam::Worker::Heartbeater::HttpHeartbeater + + attr_reader :worker_id + + def initialize(options, old_heartbeater = nil) + @server_base_url = options[:server_base_url] + + @worker_id = nil + unless old_heartbeater.nil? + @worker_id = old_heartbeater.worker_id + end + + @auth_params = nil + unless options[:username].nil? && options[:password].nil? + @auth_params = { + :username => options[:username], + :password => options[:password] + } + end + end + + + def register + raise "Already registered." unless @worker_id.nil? + + Amalgam::Worker.logger.info("Registering with server.") + + response = HTTParty.post(register_url, { + :body => { :hostname => hostname }.to_json, + :basic_auth => @auth_params + }) + + parsed_response = JSON.parse(response.parsed_response) + + # Validate the response. + + if parsed_response["worker_id"].nil? + raise "Response did not contain worker_id." + end + + @worker_id = parsed_response["worker_id"].to_i + end + + def heartbeat(current_job_id = nil) + raise "Not registered." if @worker_id.nil? + + begin + HTTParty.post(heartbeat_url, { + :body => { :job_id => current_job_id }.to_json, + :basic_auth => @auth_params + }) + rescue => err + Amalgam::Worker.logger.error("Manager Failed to Heartbeat") + Amalgam::Worker.logger.error(err.inspect) + Amalgam::Worker.logger.error(err.backtrace.join("\n")) + end + end + + def signal_start(job_id) + raise "Not registered." if @worker_id.nil? + Amalgam::Worker.logger.info("Signalling start to server.") + HTTParty.post(start_url(job_id), { + :basic_auth => @auth_params + }) + end + + def signal_completion(job_id, result_body) + raise "Not registered." if @worker_id.nil? + Amalgam::Worker.logger.info("Signalling completion to server.") + HTTParty.post(completion_url(job_id), { + :body => result_body.to_json, + :basic_auth => @auth_params + }) + end + + def unregister + raise "Not registered." if @worker_id.nil? + Amalgam::Worker.logger.info("Unregistering with server.") + + HTTParty.post(unregister_url, { + :basic_auth => @auth_params + }) + end + + private + + def hostname + Socket.gethostname + end + + def register_url + "#{@server_base_url}/workers/register" + end + + def unregister_url + "#{@server_base_url}/workers/#{@worker_id}/unregister" + end + + def heartbeat_url + "#{@server_base_url}/workers/#{@worker_id}/heartbeat" + end + + def start_url(job_id) + "#{@server_base_url}/jobs/#{job_id}/start" + end + + def completion_url(job_id) + "#{@server_base_url}/jobs/#{job_id}/complete" + end + +end + + +Amalgam::Worker::Heartbeater.register_heartbeater( + :http, + Amalgam::Worker::Heartbeater::HttpHeartbeater +) \ No newline at end of file diff --git a/lib/amalgam/worker/heartbeater/test_heartbeater.rb b/lib/amalgam/worker/heartbeater/test_heartbeater.rb new file mode 100644 index 0000000..e1d1ac6 --- /dev/null +++ b/lib/amalgam/worker/heartbeater/test_heartbeater.rb @@ -0,0 +1,35 @@ +class Amalgam::Worker::Heartbeater::TestHeartbeater + + attr_reader :worker_id + + def initialize(options, old_heartbeater = nil) + @worker_id = nil + + unless old_heartbeater.nil? + @worker_id = old_heartbeater.worker_id + end + end + + def register + @worker_id = 1234 + end + + def heartbeat(current_job_id = nil) + end + + def signal_start(job_id) + end + + def signal_completion(job_id, result_body) + end + + def unregister + @worker_id = nil + end + +end + +Amalgam::Worker::Heartbeater.register_heartbeater( + :test, + Amalgam::Worker::Heartbeater::TestHeartbeater +) \ No newline at end of file diff --git a/lib/amalgam/worker/manager.rb b/lib/amalgam/worker/manager.rb index f1ddd79..a23aac8 100644 --- a/lib/amalgam/worker/manager.rb +++ b/lib/amalgam/worker/manager.rb @@ -32,19 +32,24 @@ def terminate_current_job private def thread_main + Amalgam::Worker.logger.info("Manager trying to register.") begin register rescue => err Amalgam::Worker.logger.error("Manager failed to register.") Amalgam::Worker.logger.error(err.inspect) Amalgam::Worker.logger.error(err.backtrace.join("\n")) + return end + Amalgam::Worker.logger.info("Manager registered successfully.") + begin while (!@termination_requested) job = poll_for_job unless job.nil? + Amalgam::Worker.logger.info("Got job.") run_job(job) end end @@ -53,6 +58,7 @@ def thread_main Amalgam::Worker.logger.error(err.inspect) Amalgam::Worker.logger.error(err.backtrace.join("\n")) ensure + Amalgam::Worker.logger.info("Manager unregistering.") unregister end end diff --git a/lib/amalgam/worker/queue.rb b/lib/amalgam/worker/queue.rb index c0a0744..402a3dd 100644 --- a/lib/amalgam/worker/queue.rb +++ b/lib/amalgam/worker/queue.rb @@ -2,6 +2,32 @@ class Amalgam::Worker::Queue def poll raise "Attempt to poll an abstract queue." end + + class << self + def register_queue(identifier, klass) + @queues ||= {} + + unless @queues[identifier].nil? + raise "Queue with identiier '#{identifier}' already registered." + end + + @queues[identifier] = klass + end + + def unregister_queue(identifier) + @queues[identifier] = nil + end + + def create(identifier, options, previous_queue = nil) + queues = @queues || {} + + if queues[identifier].nil? + raise "No queue type registered for identifier '#{identifier}'." + end + + return queues[identifier].new(options, previous_queue) + end + end end require_relative 'queue/sqs_queue' diff --git a/lib/amalgam/worker/queue/sqs_queue.rb b/lib/amalgam/worker/queue/sqs_queue.rb index 7751faf..b9bbfc1 100644 --- a/lib/amalgam/worker/queue/sqs_queue.rb +++ b/lib/amalgam/worker/queue/sqs_queue.rb @@ -12,3 +12,8 @@ def poll return YAML.safe_load(message.body) end end + +Amalgam::Worker::Queue.register_queue( + :sqs, + Amalgam::Worker::Queue::SqsQueue +) \ No newline at end of file diff --git a/lib/amalgam/worker/queue/test_queue.rb b/lib/amalgam/worker/queue/test_queue.rb index 57df5cb..59bf6d1 100644 --- a/lib/amalgam/worker/queue/test_queue.rb +++ b/lib/amalgam/worker/queue/test_queue.rb @@ -1,6 +1,6 @@ class Amalgam::Worker::Queue::TestQueue - def initialize + def initialize(options, old_queue) @items = [] end @@ -13,3 +13,8 @@ def poll end end + +Amalgam::Worker::Queue.register_queue( + :test, + Amalgam::Worker::Queue::TestQueue +) \ No newline at end of file diff --git a/lib/amalgam/worker/uploader.rb b/lib/amalgam/worker/uploader.rb index 949f707..fd39a8e 100644 --- a/lib/amalgam/worker/uploader.rb +++ b/lib/amalgam/worker/uploader.rb @@ -1,5 +1,38 @@ class Amalgam::Worker::Uploader - def initialize(s3_bucket_name) + def initialize + raise "Attempt to initialize abstract Uploader." + end + + def upload(src_path, key) + raise "Attempt to invoke abstract method upload." + end + + class << self + def register_uploader(identifier, klass) + @uploaders ||= {} + + unless @uploaders[identifier].nil? + raise "Uploader with identifier '#{identifier}' already registered." + end + @uploaders[identifier] = klass + end + + def unregister_uploader(identifier) + @uploaders[identifier] = nil + end + + def create(identifier, options, previous_uploader = nil) + uploaders = @uploaders || {} + + if uploaders[identifier].nil? + raise "No uploader type registered for identifier '#{identifier}'." + end + + return uploaders[identifier].new(options, previous_uploader) + end end end + +require_relative "uploader/test_uploader" +require_relative "uploader/s3_uploader" diff --git a/lib/amalgam/worker/uploader/s3_uploader.rb b/lib/amalgam/worker/uploader/s3_uploader.rb new file mode 100644 index 0000000..fe7845a --- /dev/null +++ b/lib/amalgam/worker/uploader/s3_uploader.rb @@ -0,0 +1,14 @@ +class Amalgam::Worker::Uploader::S3Uploader + def initialize(options, old_uploader) + @s3_bucket = options[:s3_bucket] + end + + def upload(src_path, key) + raise "Not implemented yet." + end +end + +Amalgam::Worker::Uploader.register_uploader( + :s3, + Amalgam::Worker::Uploader::S3Uploader +) \ No newline at end of file diff --git a/lib/amalgam/worker/uploader/test_uploader.rb b/lib/amalgam/worker/uploader/test_uploader.rb new file mode 100644 index 0000000..6406dd6 --- /dev/null +++ b/lib/amalgam/worker/uploader/test_uploader.rb @@ -0,0 +1,17 @@ +require 'fileutils' +class Amalgam::Worker::Uploader::TestUploader + def initialize(options, old_uploader = nil) + @destination_directory = options[:destination_directory] + end + + def upload(src_path, key) + dest_path = File.join(@destination_directory, key) + FileUtils.mkdir_p(File.dirname(dest_path)) + FileUtils.copy_file(src_path, dest_path) + end +end + +Amalgam::Worker::Uploader.register_uploader( + :test, + Amalgam::Worker::Uploader::TestUploader +) \ No newline at end of file diff --git a/spec/amalgam/worker/queue/test_queue_spec.rb b/spec/amalgam/worker/queue/test_queue_spec.rb index 66ce003..7916b21 100644 --- a/spec/amalgam/worker/queue/test_queue_spec.rb +++ b/spec/amalgam/worker/queue/test_queue_spec.rb @@ -3,7 +3,7 @@ describe Amalgam::Worker::Queue::TestQueue do before :all do - @queue = Amalgam::Worker::Queue::TestQueue.new + @queue = Amalgam::Worker::Queue.create(:test, nil) end it 'should return the job descriptions added in order' do From 144a27438b0188d07a50b525afc2a3aa30e6c802 Mon Sep 17 00:00:00 2001 From: Christopher Kleynhans Date: Thu, 6 Feb 2014 16:31:01 -0500 Subject: [PATCH 26/42] Everything except for jobs seems to work. --- lib/amalgam/worker/job.rb | 2 +- lib/amalgam/worker/job/build_job.rb | 10 ++++-- lib/amalgam/worker/job/run_job.rb | 8 ++++- lib/amalgam/worker/manager.rb | 9 ++++- lib/amalgam/worker/queue/test_queue.rb | 12 +++++++ lib/amalgam/worker/runner.rb | 50 ++++++++++++++++---------- 6 files changed, 67 insertions(+), 24 deletions(-) diff --git a/lib/amalgam/worker/job.rb b/lib/amalgam/worker/job.rb index 1c2056b..ece1f24 100644 --- a/lib/amalgam/worker/job.rb +++ b/lib/amalgam/worker/job.rb @@ -37,7 +37,7 @@ def create(job_description) end if @jobs[job_identifier].nil? - raise "No job type registered for identifier '#{identifier}'" + raise "No job type registered for identifier '#{job_identifier}'" end return @jobs[job_identifier].new(job_description) diff --git a/lib/amalgam/worker/job/build_job.rb b/lib/amalgam/worker/job/build_job.rb index 5f60750..745d033 100644 --- a/lib/amalgam/worker/job/build_job.rb +++ b/lib/amalgam/worker/job/build_job.rb @@ -1,5 +1,11 @@ -class Amalgam::Worker::Job::BuildJob +class Amalgam::Worker::Job::BuildJob < Amalgam::Worker::Job + def initialize(job_description) + Amalgam::Worker.logger.info("Creating BuildJob") + end + def run + Amalgam::Worker.logger.info("Running BuildJob") + end end -Amalgam::Worker::Job.register_job('build', Amalgam::Worker::Job::BuildJob) +Amalgam::Worker::Job.register_job(:build, Amalgam::Worker::Job::BuildJob) diff --git a/lib/amalgam/worker/job/run_job.rb b/lib/amalgam/worker/job/run_job.rb index eb6bb30..0105e8a 100644 --- a/lib/amalgam/worker/job/run_job.rb +++ b/lib/amalgam/worker/job/run_job.rb @@ -1,5 +1,11 @@ class Amalgam::Worker::Job::RunJob < Amalgam::Worker::Job + def initialize(job_description) + Amalgam::Worker.logger.info("Creating RunJob") + end + def run + Amalgam::Worker.logger.info("Running RunJob") + end end -Amalgam::Worker::Job.register_job('run', Amalgam::Worker::Job::RunJob) +Amalgam::Worker::Job.register_job(:run, Amalgam::Worker::Job::RunJob) diff --git a/lib/amalgam/worker/manager.rb b/lib/amalgam/worker/manager.rb index a23aac8..3e0a416 100644 --- a/lib/amalgam/worker/manager.rb +++ b/lib/amalgam/worker/manager.rb @@ -46,6 +46,7 @@ def thread_main begin while (!@termination_requested) + Amalgam::Worker.logger.info("Polling for job.") job = poll_for_job unless job.nil? @@ -110,6 +111,7 @@ def seconds_since_last_heartbeat def run_job(job_description) job_id = job_description[:job_id] + Amalgam::Worker.logger.info("Signalling start of job: #{job_id}") @configuration.heartbeater.signal_start(job_id) # Ensure that a previous job termination will not carry-over @@ -120,7 +122,7 @@ def run_job(job_description) @job_start = Time.now - while (@runner.running) + while (@runner.running?) maybe_do_heartbeat(job_id) maybe_update_configuration maybe_terminate_job @@ -128,7 +130,11 @@ def run_job(job_description) sleep(@configuration.sleep_interval) end + @runner.join result = @runner.result + + Amalgam::Worker.logger.info("Job Completed, Result: #{result}") + Amalgam::Worker.logger.info("Signalling compleion of job: #{job_id}") @configuration.heartbeater.signal_completion(job_id, result) end @@ -137,6 +143,7 @@ def seconds_since_job_start end def maybe_terminate_job + Amalgam::Worker.logger.info("Job has timed-out.") if (seconds_since_job_start >= @configuration.worker_timeout || @job_termination_requested) diff --git a/lib/amalgam/worker/queue/test_queue.rb b/lib/amalgam/worker/queue/test_queue.rb index 59bf6d1..1cf4f6e 100644 --- a/lib/amalgam/worker/queue/test_queue.rb +++ b/lib/amalgam/worker/queue/test_queue.rb @@ -1,7 +1,19 @@ class Amalgam::Worker::Queue::TestQueue def initialize(options, old_queue) + Amalgam::Worker.logger.info("Created test queue") @items = [] + + unless options[:start_items].nil? + Amalgam::Worker.logger.info("Start items are:") + options[:start_items].each do |item| + Amalgam::Worker.logger.info("\t" + item.inspect) + end + + options[:start_items].each do |item| + enqueue(item) + end + end end def enqueue(job_description) diff --git a/lib/amalgam/worker/runner.rb b/lib/amalgam/worker/runner.rb index 908079f..b2b6a88 100644 --- a/lib/amalgam/worker/runner.rb +++ b/lib/amalgam/worker/runner.rb @@ -1,4 +1,5 @@ require 'fileutils' +require 'tmpdir' class Amalgam::Worker::Runner @@ -10,6 +11,7 @@ def initialize(configuration, job_description) @thread = nil @result = nil @job = nil + @running = false end def run @@ -29,30 +31,40 @@ def terminate end end + def running? + return @running + end + private def thread_main - original_working_dir = Dir.getwd - temp_dir = Dir.mktmpdir(configuration.tmp_dir) - error_caught = fale - begin - @job = Amalgam::Worker::Job.create(@job_description) - @result = @job.run - rescue => err - Amalgam::Worker.logger.error("Worker caught error.") - Amalgam::Worker.logger.error(err.inspect) - Amalgam::Worker.logger.error(err.backtrace.join("\n")) - - error_caught = true - @result = { :return_code => 255 } - end + @running = true + + original_working_dir = Dir.getwd + temp_dir = Dir.mktmpdir(@configuration.tmp_dir) + error_caught = false + + begin + @job = Amalgam::Worker::Job.create(@job_description) + @result = @job.run + rescue => err + Amalgam::Worker.logger.error("Worker caught error.") + Amalgam::Worker.logger.error(err.inspect) + Amalgam::Worker.logger.error(err.backtrace.join("\n")) + + error_caught = true + @result = { :return_code => 255 } + end - Dir.chdir(original_working_dir) - unless error_caught - Amalgam::Worker.logger.error("Leaving the job directory behind.") - Amalgam::Worker.logger.error(temp_dir) - FileUtils.rm_rf(temp_dir) + Dir.chdir(original_working_dir) + if error_caught + Amalgam::Worker.logger.error("Leaving the job directory behind.") + Amalgam::Worker.logger.error(temp_dir) + FileUtils.rm_rf(temp_dir) + end + ensure + @running = false end end From 9a723d11a2d3c92dd6b1b32026f3c07a190f1c1d Mon Sep 17 00:00:00 2001 From: Christopher Kleynhans Date: Thu, 6 Feb 2014 16:45:28 -0500 Subject: [PATCH 27/42] Fix some things. --- lib/amalgam/worker.rb | 4 ++++ lib/amalgam/worker/job.rb | 4 ++-- lib/amalgam/worker/job/build_job.rb | 4 +++- lib/amalgam/worker/job/run_job.rb | 4 +++- lib/amalgam/worker/queue/test_queue.rb | 2 ++ spec/amalgam/worker/job_spec.rb | 12 +++++++++--- spec/amalgam/worker/queue/test_queue_spec.rb | 6 ++++++ 7 files changed, 29 insertions(+), 7 deletions(-) diff --git a/lib/amalgam/worker.rb b/lib/amalgam/worker.rb index 7bddb03..4ec2f52 100644 --- a/lib/amalgam/worker.rb +++ b/lib/amalgam/worker.rb @@ -3,6 +3,10 @@ class Amalgam::Worker class << self + def logger=(value) + @logger = value + end + def logger @logger ||= Logger.new(STDERR) return @logger diff --git a/lib/amalgam/worker/job.rb b/lib/amalgam/worker/job.rb index ece1f24..21f99f5 100644 --- a/lib/amalgam/worker/job.rb +++ b/lib/amalgam/worker/job.rb @@ -27,7 +27,7 @@ def unregister_job(identifier) @jobs[identifier] = nil end - def create(job_description) + def create(job_description, configuration) jobs = @jobs || {} job_identifier = job_description[:type] @@ -40,7 +40,7 @@ def create(job_description) raise "No job type registered for identifier '#{job_identifier}'" end - return @jobs[job_identifier].new(job_description) + return @jobs[job_identifier].new(job_description, configuration) end end diff --git a/lib/amalgam/worker/job/build_job.rb b/lib/amalgam/worker/job/build_job.rb index 745d033..18fb83d 100644 --- a/lib/amalgam/worker/job/build_job.rb +++ b/lib/amalgam/worker/job/build_job.rb @@ -1,6 +1,8 @@ class Amalgam::Worker::Job::BuildJob < Amalgam::Worker::Job - def initialize(job_description) + def initialize(job_description, configuration) Amalgam::Worker.logger.info("Creating BuildJob") + @job_description = job_description + @configuration = configuration end def run diff --git a/lib/amalgam/worker/job/run_job.rb b/lib/amalgam/worker/job/run_job.rb index 0105e8a..ef4f2b4 100644 --- a/lib/amalgam/worker/job/run_job.rb +++ b/lib/amalgam/worker/job/run_job.rb @@ -1,6 +1,8 @@ class Amalgam::Worker::Job::RunJob < Amalgam::Worker::Job - def initialize(job_description) + def initialize(job_description, configuration) Amalgam::Worker.logger.info("Creating RunJob") + @job_description = job_description + @configuration = configuration end def run diff --git a/lib/amalgam/worker/queue/test_queue.rb b/lib/amalgam/worker/queue/test_queue.rb index 1cf4f6e..075fcce 100644 --- a/lib/amalgam/worker/queue/test_queue.rb +++ b/lib/amalgam/worker/queue/test_queue.rb @@ -2,6 +2,8 @@ class Amalgam::Worker::Queue::TestQueue def initialize(options, old_queue) Amalgam::Worker.logger.info("Created test queue") + + options ||= {} @items = [] unless options[:start_items].nil? diff --git a/spec/amalgam/worker/job_spec.rb b/spec/amalgam/worker/job_spec.rb index abadb86..7a75a89 100644 --- a/spec/amalgam/worker/job_spec.rb +++ b/spec/amalgam/worker/job_spec.rb @@ -1,7 +1,8 @@ require 'amalgam/worker' +require 'logger' class SampleJob < Amalgam::Worker::Job - def initialize(job_description) + def initialize(job_description, configuration) end @@ -11,6 +12,7 @@ def run describe Amalgam::Worker::Job do before :all do + Amalgam::Worker.logger = Logger.new('/dev/null') Amalgam::Worker::Job.unregister_job('asdf') end @@ -18,6 +20,10 @@ def run Amalgam::Worker::Job.unregister_job('asdf') end + after :all do + Amalgam::Worker.logger = nil + end + it "disallows duplicate identifier registrations" do Amalgam::Worker::Job.register_job('asdf', SampleJob) @@ -28,14 +34,14 @@ def run it "creates the correctly registered job" do Amalgam::Worker::Job.register_job('asdf', SampleJob) - job = Amalgam::Worker::Job.create({:type => 'asdf'}) + job = Amalgam::Worker::Job.create({:type => 'asdf'}, nil) expect(job).to be_an_instance_of(SampleJob) end it "should raise an error when creating an unregistered job" do expect { - Amalgam::Worker::Job.create({:type => 'asdf'}) + Amalgam::Worker::Job.create({:type => 'asdf'}, nil) }.to raise_error end end diff --git a/spec/amalgam/worker/queue/test_queue_spec.rb b/spec/amalgam/worker/queue/test_queue_spec.rb index 7916b21..b814e97 100644 --- a/spec/amalgam/worker/queue/test_queue_spec.rb +++ b/spec/amalgam/worker/queue/test_queue_spec.rb @@ -1,11 +1,17 @@ require 'amalgam/worker' +require 'logger' describe Amalgam::Worker::Queue::TestQueue do before :all do + Amalgam::Worker.logger = Logger.new('/dev/null') @queue = Amalgam::Worker::Queue.create(:test, nil) end + after :all do + Amalgam::Worker.logger = nil + end + it 'should return the job descriptions added in order' do a = {:type => 'build'} b = {:type => 'test'} From a6f1180b2ead7703eb099963f92896c6c76640cc Mon Sep 17 00:00:00 2001 From: Christopher Kleynhans Date: Thu, 6 Feb 2014 17:44:17 -0500 Subject: [PATCH 28/42] Building works. --- lib/amalgam/worker/job/build_job.rb | 115 ++++++++++++++++++++++++++++ lib/amalgam/worker/job/run_job.rb | 4 + lib/amalgam/worker/runner.rb | 10 ++- 3 files changed, 128 insertions(+), 1 deletion(-) diff --git a/lib/amalgam/worker/job/build_job.rb b/lib/amalgam/worker/job/build_job.rb index 18fb83d..a357543 100644 --- a/lib/amalgam/worker/job/build_job.rb +++ b/lib/amalgam/worker/job/build_job.rb @@ -1,12 +1,127 @@ class Amalgam::Worker::Job::BuildJob < Amalgam::Worker::Job def initialize(job_description, configuration) Amalgam::Worker.logger.info("Creating BuildJob") + validate_job(job_description) + @job_description = job_description @configuration = configuration end def run + run_main + end + + def terminate + # Need to implement + end + + private + + REQUIRED_PARAMETERS = [ + :commit + ] + + def validate_job(job_description) + REQUIRED_PARAMETERS.each do |param| + if job_description[param].nil? + raise "Job missing required parameter: #{param}" + end + end + end + + def run_with_ssh_key(cmd) + `ssh-agent bash -c 'ssh-add #{@configuration.ssh_key}; #{cmd}'` + end + + def run_main Amalgam::Worker.logger.info("Running BuildJob") + working_directory = Dir.getwd + + # Clone the repo. + repo_url = @configuration.git_repo + clone_results = run_with_ssh_key("git clone #{repo_url} moolloy") + if $? != 0 + return { + :return_code => $?, + :error_message => "Failed to clone git repo.", + :error_details => clone_results + } + end + + begin + Dir.chdir(File.join(working_directory, "moolloy")) + + # Checkout the commit we want. + commit = @job_description[:commit] + checkout_results = `git checkout #{commit}` + if $? != 0 + return { + :return_code => $?, + :error_message => "Failed to checkout commit.", + :error_details => checkout_results + } + end + + # Git submodule init / update + + submodule_results = `git submodule init` + if $? != 0 + return { + :return_code => $?, + :error_message => "Failed to submodule init.", + :error_details => submodule_results + } + end + + submodule_resuls = run_with_ssh_key('git submodule update') + if $? != 0 + return { + :return_code => $?, + :error_message => "Failed to submodule update.", + :error_details => submodule_results + } + end + + # ant dist + + build_results = `ant deps && ant configure && ant dist` + if $? != 0 + return { + :return_code => $?, + :error_message => "Failed to build.", + :error_details => build_results + } + end + + # Sanity test + + alloy_path = File.join(working_directory, + "moolloy", + "dist", + "alloy-dev.jar") + unless File.exists?(alloy_path) + return { + :return_code => 255, + :error_message => "Build failed to produce moolloy.", + :error_details => "Unable to find moolloy at: #{alloy_path}" + } + end + + # Upload jar + + uploader = @configuration.uploader + uploader.upload(alloy_path, "builds/#{commit}.jar") + + # Generate result + + return { + :return_code => 0, + :build_key => "builds/#{commit}.jar" + } + + ensure + Dir.chdir(working_directory) + end end end diff --git a/lib/amalgam/worker/job/run_job.rb b/lib/amalgam/worker/job/run_job.rb index ef4f2b4..7cf0c91 100644 --- a/lib/amalgam/worker/job/run_job.rb +++ b/lib/amalgam/worker/job/run_job.rb @@ -7,6 +7,10 @@ def initialize(job_description, configuration) def run Amalgam::Worker.logger.info("Running RunJob") + + return { + :return_code => 255 + } end end diff --git a/lib/amalgam/worker/runner.rb b/lib/amalgam/worker/runner.rb index b2b6a88..ac950ac 100644 --- a/lib/amalgam/worker/runner.rb +++ b/lib/amalgam/worker/runner.rb @@ -42,11 +42,14 @@ def thread_main @running = true original_working_dir = Dir.getwd + temp_dir = Dir.mktmpdir(@configuration.tmp_dir) + Dir.chdir(temp_dir) + error_caught = false begin - @job = Amalgam::Worker::Job.create(@job_description) + @job = Amalgam::Worker::Job.create(@job_description, @configuration) @result = @job.run rescue => err Amalgam::Worker.logger.error("Worker caught error.") @@ -57,10 +60,15 @@ def thread_main @result = { :return_code => 255 } end + if !@result[:return_code].nil? && @result[:return_code] != 0 + error_caught = true + end + Dir.chdir(original_working_dir) if error_caught Amalgam::Worker.logger.error("Leaving the job directory behind.") Amalgam::Worker.logger.error(temp_dir) + else FileUtils.rm_rf(temp_dir) end ensure From ca91dbf974fd73d0642e58390aab46b721b83cab Mon Sep 17 00:00:00 2001 From: Christopher Kleynhans Date: Thu, 6 Feb 2014 19:24:26 -0500 Subject: [PATCH 29/42] Build and Run both work with test interfaces. --- .../worker/downloader/test_downloader.rb | 8 +- lib/amalgam/worker/job/build_job.rb | 16 +- lib/amalgam/worker/job/run_job.rb | 169 +++++++++++++++++- 3 files changed, 180 insertions(+), 13 deletions(-) diff --git a/lib/amalgam/worker/downloader/test_downloader.rb b/lib/amalgam/worker/downloader/test_downloader.rb index b3029e7..be81340 100644 --- a/lib/amalgam/worker/downloader/test_downloader.rb +++ b/lib/amalgam/worker/downloader/test_downloader.rb @@ -1,13 +1,11 @@ require 'fileutils' class Amalgam::Worker::Downloader::TestDownloader - def initialize(file_map, old_downloader) - @file_map = file_map || {} + def initialize(options, old_downloader) + @source_directory = options[:source_directory] end def download(key, destination_path) - raise "No such key." if @file_map[key].nil? - - source_path = @file_map[key] + source_path = File.join(@source_directory, key) FileUtils.copy_file(source_path, destination_path) end end diff --git a/lib/amalgam/worker/job/build_job.rb b/lib/amalgam/worker/job/build_job.rb index a357543..e74a2af 100644 --- a/lib/amalgam/worker/job/build_job.rb +++ b/lib/amalgam/worker/job/build_job.rb @@ -39,7 +39,8 @@ def run_main # Clone the repo. repo_url = @configuration.git_repo - clone_results = run_with_ssh_key("git clone #{repo_url} moolloy") + Amalgam::Worker.logger.info("Cloning git repo.") + clone_results = run_with_ssh_key("git clone #{repo_url} moolloy 2>&1") if $? != 0 return { :return_code => $?, @@ -53,7 +54,8 @@ def run_main # Checkout the commit we want. commit = @job_description[:commit] - checkout_results = `git checkout #{commit}` + Amalgam::Worker.logger.info("Checking out commit: #{commit}") + checkout_results = `git checkout #{commit} 2>&1` if $? != 0 return { :return_code => $?, @@ -64,7 +66,8 @@ def run_main # Git submodule init / update - submodule_results = `git submodule init` + Amalgam::Worker.logger.info("Updating submodules.") + submodule_results = `git submodule init 2>&1` if $? != 0 return { :return_code => $?, @@ -73,7 +76,7 @@ def run_main } end - submodule_resuls = run_with_ssh_key('git submodule update') + submodule_results = run_with_ssh_key('git submodule update 2>&1') if $? != 0 return { :return_code => $?, @@ -84,7 +87,8 @@ def run_main # ant dist - build_results = `ant deps && ant configure && ant dist` + Amalgam::Worker.logger.info("Building...") + build_results = `ant deps 2>&1 && ant configure 2>&1 && ant dist 2>&1` if $? != 0 return { :return_code => $?, @@ -92,6 +96,7 @@ def run_main :error_details => build_results } end + Amalgam::Worker.logger.info("Build Complete.") # Sanity test @@ -109,6 +114,7 @@ def run_main # Upload jar + Amalgam::Worker.logger.info("Uploading jar.") uploader = @configuration.uploader uploader.upload(alloy_path, "builds/#{commit}.jar") diff --git a/lib/amalgam/worker/job/run_job.rb b/lib/amalgam/worker/job/run_job.rb index 7cf0c91..fbfd738 100644 --- a/lib/amalgam/worker/job/run_job.rb +++ b/lib/amalgam/worker/job/run_job.rb @@ -1,16 +1,179 @@ +require 'benchmark' +require 'fileutils' +require 'digest' + class Amalgam::Worker::Job::RunJob < Amalgam::Worker::Job def initialize(job_description, configuration) Amalgam::Worker.logger.info("Creating RunJob") + validate_job(job_description) + @job_description = job_description @configuration = configuration end def run + run_main + end + + def terminate + # Need to implement + end + + private + + REQUIRED_PARAMETERS = [ + :jar_file_key, + :model_file_key + ] + + def validate_job(job_description) + REQUIRED_PARAMETERS.each do |param| + if job_description[param].nil? + raise "Job missing required parametter: #{param}" + end + end + end + + def sha2_hash(filename) + digest = Digest::SHA2.new + + File.open(filename) do |file| + while not file.eof + digest << file.read(digest.block_length) + end + end + + digest.digest + end + + def run_main Amalgam::Worker.logger.info("Running RunJob") + working_directory = Dir.getwd + model_directory = File.join(working_directory, "model") + run_directory = File.join(working_directory, "run") + package_directory = File.join(working_directory, "package") + + Dir.mkdir(model_directory) + Dir.mkdir(run_directory) + Dir.mkdir(package_directory) + + begin + Dir.chdir(model_directory) + + # Download the model into the model directory. + + model_tar_file_path = File.join(model_directory, + "model.tar.bz2") + downloader = @configuration.downloader + downloader.download(@job_description[:model_file_key], model_tar_file_path) + + # Unpack the model. + + tar_result = `tar -xf #{model_tar_file_path}` + if $? != 0 + return { + :return_code => $?, + :error_message => "Failed to unpack the model.", + :error_detail => tar_result + } + end + + model_als_path = File.join(model_directory, "model.als") + + Dir.chdir(run_directory) + + # Download the moolloy jar. + + jar_file_path = File.join(run_directory, + "moolloy.jar") + + downloader = @configuration.downloader + downloader.download(@job_description[:jar_file_key], jar_file_path) + + # Run moolloy. + stdout_path = File.join(run_directory, "stdout.out") + stderr_path = File.join(run_directory, "stderr.out") + + benchmark_result = Benchmark.measure do + `java -jar "#{jar_file_path}" "#{model_als_path}" > #{stdout_path} 2> #{stderr_path}` + end + + return_code = $? + + # Determine correctness. + + correct = (return_code == 0) + test_solution_files = Dir[File.join(run_directory, + "alloy_solutions_*.xml")] + model_solution_files = Dir[File.join(model_directory, + "alloy_solutions_*.xml")] + + if test_solution_files.count != model_solution_files.count + correct = false + end + + hash_to_model_solution = {} + model_solution_files.each do |model_file| + `tail -n +2 #{model_file} > #{model_file}.trimmed` + hash = sha2_hash(model_file + ".trimmed") + hash_to_model_solution[hash] ||= [] + hash_to_model_solution[hash] << model_file + end + + test_solution_files.each do |test_file| + `tail -n +2 #{test_file} > #{test_file}.trimmed` + hash = sha2_hash(test_file + ".trimmed") + + if !hash_to_model_solution[hash].nil? && + !hash_to_model_solution[hash].empty? + + matching_file = hash_to_model_solution[hash].shift + Amalgam::Worker.logger.info("#{test_file} matches #{matching_file}.") + else + Amalgam::Worker.logger.info("#{test_file} has no match.") + correct = false + end + end + + if correct + Amalgam::Worker.logger.info("The solutions are correct.") + else + Amalgam::Worker.logger.info("The solutions are incorrect.") + end + + # Package results + FileUtils.mv(model_directory, package_directory) + FileUtils.mv(run_directory, package_directory) + + tarball_path = File.join(working_directory, "tarball.tar.bz2") + + Dir.chdir(package_directory) + + `hostname > ./hostname` + tar_result = `tar -cjf "#{tarball_path}" #{File.join(".", "*")}` + if $? != 0 + return { + :return_code => $?, + :error_message => "Failed to package results.", + :error_detail => tar_result + } + end + + # Upload the package to s3 + key = "results/#{@job_description[:job_id]}.tar.bz2" + uploader = @configuration.uploader + uploader.upload(tarball_path, key) - return { - :return_code => 255 - } + return { + :return_code => return_code, + :correct => (correct ? 1 : 0), + :runtime_seconds => benchmark_result.real, + :cpu_time_seconds => benchmark_result.total, + :tarball_s3_key => key + } + ensure + Dir.chdir(working_directory) + end end end From 05da900d19952c7c304072426e0014bec583856d Mon Sep 17 00:00:00 2001 From: Christopher Kleynhans Date: Thu, 6 Feb 2014 19:31:56 -0500 Subject: [PATCH 30/42] Filled out the S3 uploader and downloader. --- lib/amalgam/worker/downloader/s3_downloader.rb | 10 ++++++++-- lib/amalgam/worker/uploader/s3_uploader.rb | 5 +++-- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/lib/amalgam/worker/downloader/s3_downloader.rb b/lib/amalgam/worker/downloader/s3_downloader.rb index dedd172..b93633c 100644 --- a/lib/amalgam/worker/downloader/s3_downloader.rb +++ b/lib/amalgam/worker/downloader/s3_downloader.rb @@ -1,10 +1,16 @@ class Amalgam::Worker::Downloader::S3Downloader def initialize(options, old_downloader = nil) - @s3_bucket = options[:s3_bucket] + s3_client = AWS::S3.new + @s3_bucket = s3_client.buckets[options[:s3_bucket]] end def download(key, destination_path) - raise "Not implemented yet." + obj = @s3_bucket.objects[key] + File.open(destination_path, "w") do |f| + obj.read do |chunk| + f.write(chunk) + end + end end end diff --git a/lib/amalgam/worker/uploader/s3_uploader.rb b/lib/amalgam/worker/uploader/s3_uploader.rb index fe7845a..1d4ce57 100644 --- a/lib/amalgam/worker/uploader/s3_uploader.rb +++ b/lib/amalgam/worker/uploader/s3_uploader.rb @@ -1,10 +1,11 @@ class Amalgam::Worker::Uploader::S3Uploader def initialize(options, old_uploader) - @s3_bucket = options[:s3_bucket] + s3_client = AWS::S3.new + @s3_bucket = s3_client.buckets[options[:s3_bucket]] end def upload(src_path, key) - raise "Not implemented yet." + @s3_bucket.objects[key].write(:file => src_path) end end From 05a3ec73b6a8c5c2b97640e997e9774fb2a78d00 Mon Sep 17 00:00:00 2001 From: Christopher Kleynhans Date: Sat, 8 Feb 2014 21:44:17 -0500 Subject: [PATCH 31/42] Make the worker work with the dashboard properly. --- .../worker/heartbeater/http_heartbeater.rb | 4 ++-- lib/amalgam/worker/job.rb | 2 +- lib/amalgam/worker/job/build_job.rb | 2 +- lib/amalgam/worker/job/run_job.rb | 4 ++-- lib/amalgam/worker/manager.rb | 4 +++- lib/amalgam/worker/queue/sqs_queue.rb | 22 +++++++++++++------ 6 files changed, 24 insertions(+), 14 deletions(-) diff --git a/lib/amalgam/worker/heartbeater/http_heartbeater.rb b/lib/amalgam/worker/heartbeater/http_heartbeater.rb index 72d9db3..eb13f05 100644 --- a/lib/amalgam/worker/heartbeater/http_heartbeater.rb +++ b/lib/amalgam/worker/heartbeater/http_heartbeater.rb @@ -108,7 +108,7 @@ def start_url(job_id) end def completion_url(job_id) - "#{@server_base_url}/jobs/#{job_id}/complete" + "#{@server_base_url}/jobs/#{job_id}/finish" end end @@ -117,4 +117,4 @@ def completion_url(job_id) Amalgam::Worker::Heartbeater.register_heartbeater( :http, Amalgam::Worker::Heartbeater::HttpHeartbeater -) \ No newline at end of file +) diff --git a/lib/amalgam/worker/job.rb b/lib/amalgam/worker/job.rb index 21f99f5..8e152ad 100644 --- a/lib/amalgam/worker/job.rb +++ b/lib/amalgam/worker/job.rb @@ -30,7 +30,7 @@ def unregister_job(identifier) def create(job_description, configuration) jobs = @jobs || {} - job_identifier = job_description[:type] + job_identifier = job_description[:job_type] if job_identifier.nil? raise "Job description does not specify a job type." diff --git a/lib/amalgam/worker/job/build_job.rb b/lib/amalgam/worker/job/build_job.rb index e74a2af..9dcf4a1 100644 --- a/lib/amalgam/worker/job/build_job.rb +++ b/lib/amalgam/worker/job/build_job.rb @@ -122,7 +122,7 @@ def run_main return { :return_code => 0, - :build_key => "builds/#{commit}.jar" + :result_s3_key => "builds/#{commit}.jar" } ensure diff --git a/lib/amalgam/worker/job/run_job.rb b/lib/amalgam/worker/job/run_job.rb index fbfd738..f559363 100644 --- a/lib/amalgam/worker/job/run_job.rb +++ b/lib/amalgam/worker/job/run_job.rb @@ -167,9 +167,9 @@ def run_main return { :return_code => return_code, :correct => (correct ? 1 : 0), - :runtime_seconds => benchmark_result.real, + :real_time_seconds => benchmark_result.real, :cpu_time_seconds => benchmark_result.total, - :tarball_s3_key => key + :result_s3_key => key } ensure Dir.chdir(working_directory) diff --git a/lib/amalgam/worker/manager.rb b/lib/amalgam/worker/manager.rb index 3e0a416..a4ac031 100644 --- a/lib/amalgam/worker/manager.rb +++ b/lib/amalgam/worker/manager.rb @@ -131,7 +131,9 @@ def run_job(job_description) end @runner.join - result = @runner.result + result = { + :secret_key => job_description[:secret_key] + }.merge(@runner.result) Amalgam::Worker.logger.info("Job Completed, Result: #{result}") Amalgam::Worker.logger.info("Signalling compleion of job: #{job_id}") diff --git a/lib/amalgam/worker/queue/sqs_queue.rb b/lib/amalgam/worker/queue/sqs_queue.rb index b9bbfc1..fae2401 100644 --- a/lib/amalgam/worker/queue/sqs_queue.rb +++ b/lib/amalgam/worker/queue/sqs_queue.rb @@ -1,19 +1,27 @@ class Amalgam::Worker::Queue::SqsQueue - def initialize(sqs_queue_name, idle_timeout) + def initialize(options, old_queue) sqs_client = AWS::SQS.new - @sqs_queue = sqs_client.queues.named(sqs_queue_name) - @idle_timeout = idle_timeout + @sqs_queue = sqs_client.queues.named(options[:sqs_queue]) end def poll - message = @sqs_queue.poll(:idle_timeout => @idle_timeout) - + message = @sqs_queue.receive_message + return nil if message.nil? - return YAML.safe_load(message.body) + + message_body = YAML.safe_load(message.body) + + return nil if message_body[:version] != 2 + + message_body[:secret_key] = message.id + + message.delete + + return message_body end end Amalgam::Worker::Queue.register_queue( :sqs, Amalgam::Worker::Queue::SqsQueue -) \ No newline at end of file +) From 778680bb8f6f255f9642b851238c6838d97ebbd3 Mon Sep 17 00:00:00 2001 From: Christopher Kleynhans Date: Wed, 12 Feb 2014 14:36:39 -0500 Subject: [PATCH 32/42] Add the Queue files to the gemspec. --- amalgam-worker.gemspec | 3 +++ 1 file changed, 3 insertions(+) diff --git a/amalgam-worker.gemspec b/amalgam-worker.gemspec index c7ada73..00da78a 100644 --- a/amalgam-worker.gemspec +++ b/amalgam-worker.gemspec @@ -23,6 +23,9 @@ Gem::Specification.new do |s| 'lib/amalgam/worker/uploader.rb', 'lib/amalgam/worker/uploader/s3_uploader.rb', 'lib/amalgam/worker/uploader/test_uploader.rb', + 'lib/amalgam/worker/queue.rb', + 'lib/amalgam/worker/queue/sqs_queue.rb', + 'lib/amalgam/worker/queue/test_queue.rb', 'lib/amalgam/worker/job.rb', 'lib/amalgam/worker/job/build_job.rb', 'lib/amalgam/worker/job/run_job.rb'] From e4377dd71fed7f4a0798696c9cb68c684c2f2782 Mon Sep 17 00:00:00 2001 From: Christopher Kleynhans Date: Wed, 12 Feb 2014 14:37:20 -0500 Subject: [PATCH 33/42] Fix creation of tmp dir. --- lib/amalgam/worker/runner.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/amalgam/worker/runner.rb b/lib/amalgam/worker/runner.rb index ac950ac..8da4461 100644 --- a/lib/amalgam/worker/runner.rb +++ b/lib/amalgam/worker/runner.rb @@ -43,7 +43,7 @@ def thread_main original_working_dir = Dir.getwd - temp_dir = Dir.mktmpdir(@configuration.tmp_dir) + temp_dir = Dir.mktmpdir(nil, @configuration.tmp_dir) Dir.chdir(temp_dir) error_caught = false From 56cb27cab84f55e30fa1301ceb0c55dc3687777b Mon Sep 17 00:00:00 2001 From: Christopher Kleynhans Date: Wed, 12 Feb 2014 15:05:50 -0500 Subject: [PATCH 34/42] Fix the broken job specs. --- spec/amalgam/worker/job_spec.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spec/amalgam/worker/job_spec.rb b/spec/amalgam/worker/job_spec.rb index 7a75a89..488a76c 100644 --- a/spec/amalgam/worker/job_spec.rb +++ b/spec/amalgam/worker/job_spec.rb @@ -34,14 +34,14 @@ def run it "creates the correctly registered job" do Amalgam::Worker::Job.register_job('asdf', SampleJob) - job = Amalgam::Worker::Job.create({:type => 'asdf'}, nil) + job = Amalgam::Worker::Job.create({:job_type => 'asdf'}, nil) expect(job).to be_an_instance_of(SampleJob) end it "should raise an error when creating an unregistered job" do expect { - Amalgam::Worker::Job.create({:type => 'asdf'}, nil) + Amalgam::Worker::Job.create({:job_type => 'asdf'}, nil) }.to raise_error end end From c31e843f4b839128c94204b9949d930545097824 Mon Sep 17 00:00:00 2001 From: Christopher Kleynhans Date: Wed, 12 Feb 2014 15:06:21 -0500 Subject: [PATCH 35/42] Make the gem requirements less strict. --- Gemfile.lock | 14 ++++++-------- amalgam-worker.gemspec | 11 +++++------ 2 files changed, 11 insertions(+), 14 deletions(-) diff --git a/Gemfile.lock b/Gemfile.lock index d21153d..b88119e 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -2,15 +2,14 @@ PATH remote: . specs: amalgam-worker (0.1.0) - aws-sdk (~> 1.33.0) - httparty (~> 0.12.0) - rugged (~> 0.19.0) - safe_yaml (~> 1.0.1) + aws-sdk (~> 1.33) + httparty (~> 0.12) + safe_yaml (~> 1.0) GEM remote: https://rubygems.org/ specs: - aws-sdk (1.33.0) + aws-sdk (1.34.0) json (~> 1.4) nokogiri (>= 1.4.4) uuidtools (~> 2.1) @@ -32,7 +31,6 @@ GEM rspec-expectations (2.14.5) diff-lcs (>= 1.1.3, < 2.0) rspec-mocks (2.14.5) - rugged (0.19.0) safe_yaml (1.0.1) uuidtools (2.1.4) @@ -41,5 +39,5 @@ PLATFORMS DEPENDENCIES amalgam-worker! - rake (~> 10.1.1) - rspec (~> 2.14.1) + rake (~> 10.1) + rspec (~> 2.14) diff --git a/amalgam-worker.gemspec b/amalgam-worker.gemspec index 00da78a..100dca7 100644 --- a/amalgam-worker.gemspec +++ b/amalgam-worker.gemspec @@ -31,11 +31,10 @@ Gem::Specification.new do |s| 'lib/amalgam/worker/job/run_job.rb'] s.executables = ['amalgam-worker'] - s.add_runtime_dependency 'aws-sdk', '~> 1.33.0' - s.add_runtime_dependency 'safe_yaml', '~> 1.0.1' - s.add_runtime_dependency 'rugged', '~> 0.19.0' - s.add_runtime_dependency 'httparty', '~> 0.12.0' + s.add_runtime_dependency 'aws-sdk', '~> 1.33' + s.add_runtime_dependency 'safe_yaml', '~> 1.0' + s.add_runtime_dependency 'httparty', '~> 0.12' - s.add_development_dependency 'rake', '~> 10.1.1' - s.add_development_dependency 'rspec', '~> 2.14.1' + s.add_development_dependency 'rake', '~> 10.1' + s.add_development_dependency 'rspec', '~> 2.14' end From eb66b26c64c45cf4b703ed59431ec2173a09314e Mon Sep 17 00:00:00 2001 From: Christopher Kleynhans Date: Thu, 13 Feb 2014 14:02:46 -0500 Subject: [PATCH 36/42] Populate solutions for models without solutions. --- Gemfile.lock | 4 ++ amalgam-worker.gemspec | 1 + lib/amalgam/worker/configuration.rb | 8 +++- lib/amalgam/worker/job/build_job.rb | 20 +++++----- lib/amalgam/worker/job/run_job.rb | 58 ++++++++++++++++++++++++++--- 5 files changed, 74 insertions(+), 17 deletions(-) diff --git a/Gemfile.lock b/Gemfile.lock index b88119e..81f80d8 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -4,6 +4,7 @@ PATH amalgam-worker (0.1.0) aws-sdk (~> 1.33) httparty (~> 0.12) + rest_client safe_yaml (~> 1.0) GEM @@ -20,9 +21,12 @@ GEM json (1.8.1) mini_portile (0.5.2) multi_xml (0.5.5) + netrc (0.7.7) nokogiri (1.6.1) mini_portile (~> 0.5.0) rake (10.1.1) + rest_client (1.7.2) + netrc (~> 0.7.7) rspec (2.14.1) rspec-core (~> 2.14.0) rspec-expectations (~> 2.14.0) diff --git a/amalgam-worker.gemspec b/amalgam-worker.gemspec index 100dca7..1f49d7f 100644 --- a/amalgam-worker.gemspec +++ b/amalgam-worker.gemspec @@ -34,6 +34,7 @@ Gem::Specification.new do |s| s.add_runtime_dependency 'aws-sdk', '~> 1.33' s.add_runtime_dependency 'safe_yaml', '~> 1.0' s.add_runtime_dependency 'httparty', '~> 0.12' + s.add_runtime_dependency 'rest_client' s.add_development_dependency 'rake', '~> 10.1' s.add_development_dependency 'rspec', '~> 2.14' diff --git a/lib/amalgam/worker/configuration.rb b/lib/amalgam/worker/configuration.rb index 4bef8de..87c44ee 100644 --- a/lib/amalgam/worker/configuration.rb +++ b/lib/amalgam/worker/configuration.rb @@ -19,6 +19,9 @@ class Amalgam::Worker::Configuration :heartbeat_period, # Time between heartbeats. :sleep_interval, # Time to sleep between worker checks. :idle_timeout, # Time to spend polling for jobs before checking other state + :server_base_url, # Base url for jobs to talk to the server. + :username, # Username for jobs to talk to the server. + :password, # Password for jobs to talk to the server. :heartbeater_type, # The type of heartbeater to use. :uploader_type, # The type of uploader to use. @@ -35,6 +38,7 @@ class Amalgam::Worker::Configuration :access_key_id, :secret_access_key, :git_repo, + :server_base_url, :heartbeater_type, :uploader_type, :downloader_type, @@ -53,7 +57,9 @@ class Amalgam::Worker::Configuration :sleep_interval => 15 * SECONDS_PER_SECOND, :tmp_dir => nil, :idle_timeout => 2 * SECONDS_PER_MINUTE, - :ssh_key => nil + :ssh_key => nil, + :username => nil, + :password => nil } # All Non-Mandatory settings must have an entry in the diff --git a/lib/amalgam/worker/job/build_job.rb b/lib/amalgam/worker/job/build_job.rb index 9dcf4a1..592039c 100644 --- a/lib/amalgam/worker/job/build_job.rb +++ b/lib/amalgam/worker/job/build_job.rb @@ -41,9 +41,9 @@ def run_main repo_url = @configuration.git_repo Amalgam::Worker.logger.info("Cloning git repo.") clone_results = run_with_ssh_key("git clone #{repo_url} moolloy 2>&1") - if $? != 0 + if $?.to_i != 0 return { - :return_code => $?, + :return_code => $?.to_i, :error_message => "Failed to clone git repo.", :error_details => clone_results } @@ -56,9 +56,9 @@ def run_main commit = @job_description[:commit] Amalgam::Worker.logger.info("Checking out commit: #{commit}") checkout_results = `git checkout #{commit} 2>&1` - if $? != 0 + if $?.to_i != 0 return { - :return_code => $?, + :return_code => $?.to_i, :error_message => "Failed to checkout commit.", :error_details => checkout_results } @@ -68,18 +68,18 @@ def run_main Amalgam::Worker.logger.info("Updating submodules.") submodule_results = `git submodule init 2>&1` - if $? != 0 + if $?.to_i != 0 return { - :return_code => $?, + :return_code => $?.to_i, :error_message => "Failed to submodule init.", :error_details => submodule_results } end submodule_results = run_with_ssh_key('git submodule update 2>&1') - if $? != 0 + if $?.to_i != 0 return { - :return_code => $?, + :return_code => $?.to_i, :error_message => "Failed to submodule update.", :error_details => submodule_results } @@ -89,9 +89,9 @@ def run_main Amalgam::Worker.logger.info("Building...") build_results = `ant deps 2>&1 && ant configure 2>&1 && ant dist 2>&1` - if $? != 0 + if $?.to_i != 0 return { - :return_code => $?, + :return_code => $?.to_i, :error_message => "Failed to build.", :error_details => build_results } diff --git a/lib/amalgam/worker/job/run_job.rb b/lib/amalgam/worker/job/run_job.rb index f559363..241cfae 100644 --- a/lib/amalgam/worker/job/run_job.rb +++ b/lib/amalgam/worker/job/run_job.rb @@ -1,6 +1,7 @@ require 'benchmark' require 'fileutils' require 'digest' +require 'rest_client' class Amalgam::Worker::Job::RunJob < Amalgam::Worker::Job def initialize(job_description, configuration) @@ -70,9 +71,9 @@ def run_main # Unpack the model. tar_result = `tar -xf #{model_tar_file_path}` - if $? != 0 + if $?.to_i != 0 return { - :return_code => $?, + :return_code => $?.to_i, :error_message => "Failed to unpack the model.", :error_detail => tar_result } @@ -98,10 +99,55 @@ def run_main `java -jar "#{jar_file_path}" "#{model_als_path}" > #{stdout_path} 2> #{stderr_path}` end - return_code = $? + return_code = $?.to_i - # Determine correctness. + Dir.chdir(model_directory) + + # If moolloy finished successfully and the solutions for this model have not been populated + # we will assume that this run was correct and populate the solutions. + if (return_code == 0) && File.exists?(File.join(model_directory, "solutions_not_populated.txt")) + + Amalgam::Worker.logger.info("The solutions were not populated for this model.") + Amalgam::Worker.logger.info("Populating the solutions from this run.") + + # Delete the solutions_not_populated.txt file. + FileUtils.rm_f(File.join(model_directory, "solutions_not_populated.txt")) + + # Find the solutions this run generated and copy them into the model_directory. + test_solution_files = Dir[File.join(run_directory, "alloy_solutions_*.xml")] + FileUtils.cp(test_solution_files, model_directory) + # Package the model and the solutions into a tarball. + tar_result = `tar -cjf "#{File.join(model_directory, "populated_model.tar.bz2")}" model.als alloy_solutions_*.xml` + if $?.to_i != 0 + return { + :return_code => $?.to_i, + :error_message => "Failed to package populated model.", + :error_details => tar_result + } + end + + # Upload the model to the dashboard. + model_id = @job_description[:model_id] + + RestClient.logger = Amalgam::Worker.logger + + upload_request = RestClient::Request.new( + :method => :post, + :url => "#{@configuration.server_base_url}/models/#{model_id}/upload", + :username => @configuration.username, + :password => @configuration.password, + :payload => { + :file => File.new(File.join(model_directory, "populated_model.tar.bz2")) + } + ) + + upload_request.execute + end + + Dir.chdir(run_directory) + + # Determine correctness. correct = (return_code == 0) test_solution_files = Dir[File.join(run_directory, "alloy_solutions_*.xml")] @@ -151,9 +197,9 @@ def run_main `hostname > ./hostname` tar_result = `tar -cjf "#{tarball_path}" #{File.join(".", "*")}` - if $? != 0 + if $?.to_i != 0 return { - :return_code => $?, + :return_code => $?.to_i, :error_message => "Failed to package results.", :error_detail => tar_result } From 872523dcb9bdffbcf8c83f6066fa444f170ed046 Mon Sep 17 00:00:00 2001 From: Christopher Kleynhans Date: Fri, 14 Feb 2014 14:09:09 -0500 Subject: [PATCH 37/42] log not logger. --- lib/amalgam/worker/job/run_job.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/amalgam/worker/job/run_job.rb b/lib/amalgam/worker/job/run_job.rb index 241cfae..76fe85a 100644 --- a/lib/amalgam/worker/job/run_job.rb +++ b/lib/amalgam/worker/job/run_job.rb @@ -130,7 +130,7 @@ def run_main # Upload the model to the dashboard. model_id = @job_description[:model_id] - RestClient.logger = Amalgam::Worker.logger + RestClient.log = Amalgam::Worker.logger upload_request = RestClient::Request.new( :method => :post, From bc00e282ef222dabc01a29a5aa26e54dcd73d30e Mon Sep 17 00:00:00 2001 From: Christopher Kleynhans Date: Mon, 10 Mar 2014 16:50:09 -0400 Subject: [PATCH 38/42] Add the MooAlgorithm flag to the command line arguments. --- lib/amalgam/worker/job/run_job.rb | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/lib/amalgam/worker/job/run_job.rb b/lib/amalgam/worker/job/run_job.rb index 76fe85a..9229878 100644 --- a/lib/amalgam/worker/job/run_job.rb +++ b/lib/amalgam/worker/job/run_job.rb @@ -95,8 +95,13 @@ def run_main stdout_path = File.join(run_directory, "stdout.out") stderr_path = File.join(run_directory, "stderr.out") + algorithm_string = nil + if @job_description[:algorithm] + algorithm_string = "--MooAlgorithm=#{@job_description[:algorithm]}" + end + benchmark_result = Benchmark.measure do - `java -jar "#{jar_file_path}" "#{model_als_path}" > #{stdout_path} 2> #{stderr_path}` + `java -jar "#{jar_file_path}" #{algorithm_string} "#{model_als_path}" > #{stdout_path} 2> #{stderr_path}` end return_code = $?.to_i From a9ba32c8550bff326ea80191555e511dd3d75389 Mon Sep 17 00:00:00 2001 From: Christopher Kleynhans Date: Mon, 10 Mar 2014 16:52:50 -0400 Subject: [PATCH 39/42] Fixed the HTTP auth parameters. --- lib/amalgam/worker/job/run_job.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/amalgam/worker/job/run_job.rb b/lib/amalgam/worker/job/run_job.rb index 9229878..410378c 100644 --- a/lib/amalgam/worker/job/run_job.rb +++ b/lib/amalgam/worker/job/run_job.rb @@ -140,7 +140,7 @@ def run_main upload_request = RestClient::Request.new( :method => :post, :url => "#{@configuration.server_base_url}/models/#{model_id}/upload", - :username => @configuration.username, + :user => @configuration.username, :password => @configuration.password, :payload => { :file => File.new(File.join(model_directory, "populated_model.tar.bz2")) From 0f5e3c511056c4844f1377e5224f0ff8f4e4bff1 Mon Sep 17 00:00:00 2001 From: Christopher Kleynhans Date: Mon, 10 Mar 2014 18:30:57 -0400 Subject: [PATCH 40/42] Need to set the running flag outside of the thread. Otherwise, it is likely that the manager will not see that the worker is running and join immediately. --- lib/amalgam/worker/runner.rb | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/amalgam/worker/runner.rb b/lib/amalgam/worker/runner.rb index 8da4461..210ea20 100644 --- a/lib/amalgam/worker/runner.rb +++ b/lib/amalgam/worker/runner.rb @@ -15,6 +15,7 @@ def initialize(configuration, job_description) end def run + @running = true @thread = Thread.new { thread_main } @@ -39,8 +40,6 @@ def running? def thread_main begin - @running = true - original_working_dir = Dir.getwd temp_dir = Dir.mktmpdir(nil, @configuration.tmp_dir) From 2965af9fe95b2b9e6e8d01b1369aa0ba951da2f0 Mon Sep 17 00:00:00 2001 From: Christopher Kleynhans Date: Mon, 10 Mar 2014 19:08:22 -0400 Subject: [PATCH 41/42] Hard code the memory options for now. --- lib/amalgam/worker/job/run_job.rb | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/amalgam/worker/job/run_job.rb b/lib/amalgam/worker/job/run_job.rb index 410378c..e43082e 100644 --- a/lib/amalgam/worker/job/run_job.rb +++ b/lib/amalgam/worker/job/run_job.rb @@ -100,8 +100,10 @@ def run_main algorithm_string = "--MooAlgorithm=#{@job_description[:algorithm]}" end + memory_string = "-Xss4m -Xms512m -Xmx8192m" + benchmark_result = Benchmark.measure do - `java -jar "#{jar_file_path}" #{algorithm_string} "#{model_als_path}" > #{stdout_path} 2> #{stderr_path}` + `java #{memory_string} -jar "#{jar_file_path}" #{algorithm_string} "#{model_als_path}" > #{stdout_path} 2> #{stderr_path}` end return_code = $?.to_i From 3e1e4b0d5149d02b43e20e39eda9e43b1c8b3624 Mon Sep 17 00:00:00 2001 From: Christopher Kleynhans Date: Mon, 10 Mar 2014 19:20:26 -0400 Subject: [PATCH 42/42] Fix heartbeating and job termination output. --- lib/amalgam/worker/manager.rb | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/amalgam/worker/manager.rb b/lib/amalgam/worker/manager.rb index a4ac031..4f23341 100644 --- a/lib/amalgam/worker/manager.rb +++ b/lib/amalgam/worker/manager.rb @@ -113,6 +113,7 @@ def run_job(job_description) job_id = job_description[:job_id] Amalgam::Worker.logger.info("Signalling start of job: #{job_id}") @configuration.heartbeater.signal_start(job_id) + @configuration.heartbeater.heartbeat(job_id) # Ensure that a previous job termination will not carry-over @job_termination_requested = false @@ -138,6 +139,7 @@ def run_job(job_description) Amalgam::Worker.logger.info("Job Completed, Result: #{result}") Amalgam::Worker.logger.info("Signalling compleion of job: #{job_id}") @configuration.heartbeater.signal_completion(job_id, result) + @configuration.heartbeater.heartbeat(nil) end def seconds_since_job_start @@ -145,10 +147,10 @@ def seconds_since_job_start end def maybe_terminate_job - Amalgam::Worker.logger.info("Job has timed-out.") if (seconds_since_job_start >= @configuration.worker_timeout || @job_termination_requested) + Amalgam::Worker.logger.info("Job has timed-out.") @runner.terminate @job_termination_requested = false end