Sha256: 472e11c17ead9fbfcb467c2d7c8a049dbd5e5dce62b4a9fd5c9f8a03a8942cb1

Contents?: true

Size: 1.5 KB

Versions: 1

Compression:

Stored size: 1.5 KB

Contents

#!/usr/bin/env ruby

require "bundler/setup"
require "jflow"
require "slop"

# Parse the command line; -f/--file names the JSON worker configuration file.
opts = Slop.parse do |o|
  o.string '-f', '--file', 'worker configuration file'
end

# Fail with an actionable message when the option is missing, instead of
# the TypeError that File.read(nil) would raise.
config_file = opts[:file]
raise ArgumentError, "missing required option -f/--file (worker configuration file)" unless config_file

configuration = JSON.parse(File.read(config_file))

# Type rule for each configuration key, in the form handed to HashValidator.
configuration_validator = {
  "number_of_workers" => "integer",
  "domain"            => "string",
  "tasklist"          => "string",
  "activities_path"   => "array"
}

# Stop before any worker is started if the configuration does not match the rules.
validator = HashValidator.validate(configuration, configuration_validator)
raise "configuration is invalid! #{validator.errors}" unless validator.valid?

# Hand JFlow the configured activity paths and a default-constructed SWF client.
JFlow.configure do |jflow_config|
  jflow_config.load_paths = configuration["activities_path"]
  jflow_config.swf_client = Aws::SWF::Client.new
end

JFlow.load_activities

# Start "number_of_workers" threads, each running one activity worker on
# the configured domain and task list, and keep the threads for later use.
threads = configuration["number_of_workers"].times.map do
  Thread.new do
    worker = JFlow::Activity::Worker.new(configuration["domain"], configuration["tasklist"])
    worker.start!
  end
end

# Asks every worker thread to stop and interrupts the ones that stay busy.
#
# Each thread gets its "do_exit" thread-local set to true. A thread whose
# "state" thread-local is :working is then given up to +grace_period+
# seconds to finish on its own; if it is still running after that, a
# RuntimeError ("Workers are going down!") is raised inside it. The method
# returns as soon as every busy worker has either finished or been
# interrupted, rather than always waiting the full grace period.
#
# @param threads [Array<Thread>] worker threads to stop
# @param grace_period [Numeric] seconds a busy worker may keep running
#   before it is interrupted (60 by default)
# @return [Array<Thread>] the watchdog threads that were started, all finished
def shutdown(threads, grace_period: 60)
  JFlow.configuration.logger.info "Sending kill signal to running threads. Please wait for current polling to finish"

  watchdogs = threads.each_with_object([]) do |thread, started|
    thread["do_exit"] = true
    next unless thread["state"] == :working

    started << Thread.new do
      finished = begin
        # Thread#join(limit) returns nil when the limit expires first.
        thread.join(grace_period)
      rescue StandardError
        # join re-raises the worker's own failure here; the worker is
        # already dead, so there is nothing left to interrupt.
        true
      end
      thread.raise("Workers are going down!") unless finished
    end
  end

  watchdogs.each(&:join)
end

# On Ctrl-C (INT) or `kill` (TERM), begin a graceful shutdown of the workers.
#
# shutdown runs in its own thread rather than inside the handler itself:
# code running in a trap handler cannot lock a Mutex (ThreadError "can't be
# called from trap context"), so a logger that synchronizes its writes, as
# the stdlib Logger does, is unable to emit the shutdown message from there.
%w[INT TERM].each do |signal|
  Signal.trap(signal) { Thread.new { shutdown(threads) } }
end

# Keep the main thread alive until every worker thread has finished.
threads.each(&:join)

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
jflow-0.2.6 bin/jflow_worker