Sha256: 58525e7baae91b6d5d29c95831cad2cc7be70f176b447fff3e8b53a527cf84f1

Contents?: true

Size: 1.7 KB

Versions: 1

Compression:

Stored size: 1.7 KB

Contents

require "socket"

module JFlow
  module Activity
    # Polls an SWF task list for activity tasks and runs them on the calling
    # thread.
    #
    # The worker communicates with its host thread through thread-local keys:
    # it writes Thread.current["state"] (:polling or :working) and keeps
    # looping until Thread.current["do_exit"] is set to exactly `true`.
    class Worker
      attr_reader :domain, :tasklist

      # @param domain   value sent as `domain` in every poll request
      # @param tasklist value sent as the task list `name` in every poll request
      def initialize(domain, tasklist)
        @domain = domain
        @tasklist = tasklist
      end

      # Polls in a loop until the current thread is flagged to exit.
      # Errors raised by the SWF client are not rescued here and propagate
      # to the caller.
      def start!
        while should_be_working?
          log "Polling for task on #{domain} - #{tasklist}"
          poll
        end
        log "Thread is marked as exiting, stopping the poll"
      end

      # Performs a single poll. When the response carries a task token the
      # task is processed, unless the thread was flagged to exit while the
      # poll was in flight, in which case the task is failed immediately.
      def poll
        Thread.current["state"] = :polling
        response = JFlow.configuration.swf_client.poll_for_activity_task(poll_params)
        return log("Got no task") unless response.task_token

        log "Got task #{response.task_token}"
        task = JFlow::Activity::Task.new(response)
        if should_be_working?
          process(task)
        else
          # The worker is shutting down, so we don't want to start working on
          # anything; we fail the task and let the decider queue it up for
          # retry later.
          task.failed!(Exception.new("Worker is going down!"))
        end
      end

      # Runs the task, reporting any StandardError it raises via
      # `task.failed!` instead of letting it escape.
      #
      # @param task object responding to `run!` and `failed!(exception)`
      def process(task)
        Thread.current["state"] = :working
        task.run!
      rescue => exception
        task.failed!(exception)
      end

      private

      # Identity string sent with each poll: "<hostname>-<thread object id>".
      # Memoized on first use, so it reflects the thread that made the first
      # call. The host name comes from Socket.gethostname rather than the
      # `hostname` executable, so no subprocess is spawned and no binary on
      # PATH is required.
      def identity
        @identity ||= "#{Socket.gethostname}-#{Thread.current.object_id}"
      end

      # Logs at info level, prefixed with the current thread's object id.
      def log(str)
        JFlow.configuration.logger.info "[#{Thread.current.object_id}] #{str}"
      end

      # Parameters passed to `poll_for_activity_task`.
      def poll_params
        {
          domain: domain,
          task_list: {
            name: tasklist,
          },
          identity: identity,
        }
      end

      # Only a literal `true` in Thread.current["do_exit"] stops the worker;
      # any other value (including other truthy values) keeps it running.
      def should_be_working?
        Thread.current["do_exit"] != true
      end
    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
jflow-0.2.6 lib/jflow/activity/worker.rb