lib/sqewer/worker.rb in sqewer-4.2.0 vs lib/sqewer/worker.rb in sqewer-5.0.0

- old
+ new

@@ -25,13 +25,10 @@ attr_reader :execution_context_class # @return [Class] The class used to create the Submitter used by jobs to spawn other jobs attr_reader :submitter_class - # @return [#perform] The isolator to use when executing each job - attr_reader :isolator - # @return [Array<Thread>] all the currently running threads of the Worker attr_reader :threads # @return [Fixnum] the number of worker threads set up for this Worker attr_reader :num_threads @@ -51,28 +48,25 @@ # @param execution_context_class[Class] the class for the execution context (will be instantiated by # the worker for each job execution) # @param submitter_class[Class] the class used for submitting jobs (will be instantiated by the worker for each job execution) # @param middleware_stack[Sqewer::MiddlewareStack] the middleware stack that is going to be used # @param logger[Logger] the logger to log execution to and to pass to the jobs - # @param isolator[Sqewer::Isolator] the isolator to encapsulate job instantiation and execution, if desired # @param num_threads[Fixnum] how many worker threads to spawn def initialize(connection: Sqewer::Connection.default, serializer: Sqewer::Serializer.default, execution_context_class: Sqewer::ExecutionContext, submitter_class: Sqewer::Submitter, middleware_stack: Sqewer::MiddlewareStack.default, logger: Logger.new($stderr), - isolator: Sqewer::Isolator.default, num_threads: DEFAULT_NUM_THREADS) @logger = logger @connection = connection @serializer = serializer @middleware_stack = middleware_stack @execution_context_class = execution_context_class @submitter_class = submitter_class - @isolator = isolator @num_threads = num_threads @threads = [] raise ArgumentError, "num_threads must be > 0" unless num_threads > 0 @@ -187,13 +181,37 @@ @execution_queue.length < (@num_threads * THROTTLE_FACTOR) end def handle_message(message) return unless message.receipt_handle - return @connection.delete_message(message.receipt_handle) unless message.has_body? - @isolator.perform(self, message) - # The message delete happens within the Isolator + + # Create a messagebox that buffers all the calls to Connection, so that + # we can send out those commands in one go (without interfering with senders + # on other threads, as it seems the Aws::SQS::Client is not entirely + # thread-safe - or at least not it's HTTP client part). + box = Sqewer::ConnectionMessagebox.new(connection) + return box.delete_message(message.receipt_handle) unless message.has_body? + + job = middleware_stack.around_deserialization(serializer, message.receipt_handle, message.body) do + serializer.unserialize(message.body) + end + return unless job + + submitter = submitter_class.new(box, serializer) + context = execution_context_class.new(submitter, {'logger' => logger}) + + t = Time.now + middleware_stack.around_execution(job, context) do + job.method(:run).arity.zero? ? job.run : job.run(context) + end + box.delete_message(message.receipt_handle) + + delta = Time.now - t + logger.info { "[worker] Finished %s in %0.2fs" % [job.inspect, delta] } + ensure + n_flushed = box.flush! + logger.debug { "[worker] Flushed %d connection commands" % n_flushed } if n_flushed.nonzero? end def take_and_execute message = @execution_queue.pop(nonblock=true) handle_message(message) @@ -201,7 +219,38 @@ throw :goodbye if stopping? sleep SLEEP_SECONDS_ON_EMPTY_QUEUE rescue => e # anything else, at or below StandardError that does not need us to quit @logger.error { '[worker] Failed "%s..." with %s: %s' % [message.inspect[0..32], e.class, e.message] } e.backtrace.each { |s| @logger.error{"\t#{s}"} } + end + + def perform(message) + # Create a messagebox that buffers all the calls to Connection, so that + # we can send out those commands in one go (without interfering with senders + # on other threads, as it seems the Aws::SQS::Client is not entirely + # thread-safe - or at least not it's HTTP client part). + box = Sqewer::ConnectionMessagebox.new(connection) + + job = middleware_stack.around_deserialization(serializer, message.receipt_handle, message.body) do + serializer.unserialize(message.body) + end + return unless job + + submitter = submitter_class.new(box, serializer) + context = execution_context_class.new(submitter, {'logger' => logger}) + + t = Time.now + middleware_stack.around_execution(job, context) do + job.method(:run).arity.zero? ? job.run : job.run(context) + end + + # Perform two flushes, one for any possible jobs the job has spawned, + # and one for the job delete afterwards + box.delete_message(message.receipt_handle) + + delta = Time.now - t + logger.info { "[worker] Finished %s in %0.2fs" % [job.inspect, delta] } + ensure + n_flushed = box.flush! + logger.debug { "[worker] Flushed %d connection commands" % n_flushed } if n_flushed.nonzero? end end