Sha256: 2e0feb1aa59187db2ec654f300fe75cebd61c1f2913a198e28357f4622af1cad

Contents?: true

Size: 1.98 KB

Versions: 1

Compression:

Stored size: 1.98 KB

Contents

module JFlow
  # Worker that polls an SWF task list for activity tasks and runs the Ruby
  # code mapped to each task it receives.
  #
  # The code to run is looked up in the global $activity_map, indexed by
  # activity name, then activity version, then :class. The class is
  # instantiated with no arguments and called with the YAML-decoded task input.
  # The outcome (result or exception) is reported back through
  # JFlow.configuration.swf_client.
  class ActivityWorker

    # Maximum lengths SWF accepts for the fields of RespondActivityTaskFailed.
    # Longer values are truncated so that reporting a failure cannot itself be
    # rejected because of its size.
    MAX_REASON_LENGTH = 256
    MAX_DETAILS_LENGTH = 32_768

    # Method invoked on the activity class when the activity name has no
    # ".method" suffix.
    DEFAULT_METHOD = "process"

    attr_reader :domain, :tasklist

    # @param domain [String] SWF domain to poll
    # @param tasklist [String] name of the task list to poll
    def initialize(domain, tasklist)
      @domain = domain
      @tasklist = tasklist
    end

    # Polls for activity tasks forever; this method does not return.
    def start!
      loop do
        log "Polling for task on #{domain} - #{tasklist}"
        poll
      end
    end

    private

    # Performs one poll and processes the task if the response carries a
    # task token.
    def poll
      response = JFlow.configuration.swf_client.poll_for_activity_task(poll_params)
      if response.task_token
        process_task(response)
      else
        log "Got no task"
      end
    end

    # Runs the code mapped to the task and reports completion or failure.
    #
    # Every StandardError raised while resolving or running the activity
    # (including an unknown activity name/version) is reported with
    # respond_activity_task_failed instead of propagating, so a bad task
    # cannot terminate the polling loop in #start!.
    #
    # @param response [#task_token, #activity_type, #input] polled task
    def process_task(response)
      log "Got task #{response.task_token}"

      begin
        result = run_activity(response)
        JFlow.configuration.swf_client.respond_activity_task_completed({
          task_token: response.task_token,
          result: result,
        })
      rescue => e
        JFlow.configuration.swf_client.respond_activity_task_failed({
          task_token: response.task_token,
          reason: e.message.to_s[0, MAX_REASON_LENGTH],
          # Array() guards against exceptions that carry no backtrace.
          details: Array(e.backtrace).join("\n")[0, MAX_DETAILS_LENGTH],
        })
      end
    end

    # Resolves the class and method for the task, decodes its input and
    # invokes the activity.
    #
    # @return [Object] the activity's return value, or true when it returned
    #   nil or false
    # @raise [RuntimeError] when no class is mapped to the activity type
    def run_activity(response)
      activity_type = response.activity_type
      klass = class_for_activity(activity_type)
      raise "Could not find code to run for given activity" unless klass

      method = method_for_activity(activity_type)
      # NOTE(review): YAML.load can instantiate arbitrary objects on older
      # Psych versions; if task input may come from an untrusted party,
      # consider YAML.safe_load. Left unchanged to keep accepted inputs as-is.
      input = YAML.load(response.input)

      JFlow.configuration.logger.debug "Started #{klass}##{method} with #{input}"
      # NOTE(review): SWF documents `result` as a string; a nil/false return is
      # reported as `true` here - confirm the configured client accepts that.
      result = klass.new.send(method, input) || true
      JFlow.configuration.logger.debug "Done #{klass}##{method}"
      result
    end

    # Name of the method to call: the segment after the last "." of the
    # activity name, or DEFAULT_METHOD when the name contains no ".".
    def method_for_activity(activity_type)
      segments = activity_type.name.split('.')
      segments.size > 1 ? segments.last : DEFAULT_METHOD
    end

    # Identity sent with each poll: output of the `hostname` command plus the
    # current thread's object_id. Memoized on first use.
    def identity
      @identity ||= "#{`hostname`.chomp}-#{Thread.current.object_id}"
    end

    # Logs at info level, prefixed with the current thread's object_id.
    def log(str)
      JFlow.configuration.logger.info "[#{Thread.current.object_id}] #{str}"
    end

    # Parameters passed to poll_for_activity_task.
    def poll_params
      {
        domain: domain,
        task_list: {
          name: tasklist,
        },
        identity: identity,
      }
    end

    # Looks up the class mapped to the activity's name and version in
    # $activity_map.
    #
    # @return [Class, nil] nil when the map, the name or the version is missing
    def class_for_activity(activity_type)
      versions = $activity_map && $activity_map[activity_type.name]
      entry = versions && versions[activity_type.version]
      entry && entry[:class]
    end

  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
jflow-0.1.0 lib/jflow/activity_worker.rb