#!/usr/bin/env ruby require "bundler/setup" require "jflow" require "slop" opts = Slop.parse do |o| o.string '-f', '--file', 'worker configuration file' end configuration = JSON.parse(File.read(opts[:file])) configuration_validator = { "number_of_workers" => "integer", "domain" => "string", "tasklist" => "string", "activities_path" => "array" } validator = HashValidator.validate(configuration, configuration_validator) raise "configuration is invalid! #{validator.errors}" unless validator.valid? JFlow.configure do |c| c.load_paths = configuration["activities_path"] c.swf_client = Aws::SWF::Client.new end JFlow.load_activities threads = [] configuration["number_of_workers"].times do |i| threads << Thread.new do domain = configuration["domain"] tasklist = configuration["tasklist"] JFlow::Activity::Worker.new(domain, tasklist) .start! end end def shutdown(threads) JFlow.configuration.logger.info "Sending kill signal to running threads. Please wait for current polling to finish" kill_threads = [] threads.each do |thread| thread["do_exit"] = true if thread["state"] == :working kill_threads << Thread.new do sleep 60 if thread.alive? thread.raise("Workers are going down!") end end end end kill_threads.each { |thr| thr.join } end Signal.trap("INT") { shutdown(threads) } # Trap `Kill ` Signal.trap("TERM") { shutdown(threads) } threads.each { |thr| thr.join }