lib/sqewer/worker.rb in sqewer-4.2.0 vs lib/sqewer/worker.rb in sqewer-5.0.0
- old
+ new
@@ -25,13 +25,10 @@
attr_reader :execution_context_class
# @return [Class] The class used to create the Submitter used by jobs to spawn other jobs
attr_reader :submitter_class
- # @return [#perform] The isolator to use when executing each job
- attr_reader :isolator
-
# @return [Array<Thread>] all the currently running threads of the Worker
attr_reader :threads
# @return [Fixnum] the number of worker threads set up for this Worker
attr_reader :num_threads
@@ -51,28 +48,25 @@
# @param execution_context_class[Class] the class for the execution context (will be instantiated by
# the worker for each job execution)
# @param submitter_class[Class] the class used for submitting jobs (will be instantiated by the worker for each job execution)
# @param middleware_stack[Sqewer::MiddlewareStack] the middleware stack that is going to be used
# @param logger[Logger] the logger to log execution to and to pass to the jobs
- # @param isolator[Sqewer::Isolator] the isolator to encapsulate job instantiation and execution, if desired
# @param num_threads[Fixnum] how many worker threads to spawn
def initialize(connection: Sqewer::Connection.default,
serializer: Sqewer::Serializer.default,
execution_context_class: Sqewer::ExecutionContext,
submitter_class: Sqewer::Submitter,
middleware_stack: Sqewer::MiddlewareStack.default,
logger: Logger.new($stderr),
- isolator: Sqewer::Isolator.default,
num_threads: DEFAULT_NUM_THREADS)
@logger = logger
@connection = connection
@serializer = serializer
@middleware_stack = middleware_stack
@execution_context_class = execution_context_class
@submitter_class = submitter_class
- @isolator = isolator
@num_threads = num_threads
@threads = []
raise ArgumentError, "num_threads must be > 0" unless num_threads > 0
@@ -187,13 +181,37 @@
@execution_queue.length < (@num_threads * THROTTLE_FACTOR)
end
def handle_message(message)
return unless message.receipt_handle
- return @connection.delete_message(message.receipt_handle) unless message.has_body?
- @isolator.perform(self, message)
- # The message delete happens within the Isolator
+
+ # Create a messagebox that buffers all the calls to Connection, so that
+ # we can send out those commands in one go (without interfering with senders
+ # on other threads, as it seems the Aws::SQS::Client is not entirely
+ # thread-safe - or at least not it's HTTP client part).
+ box = Sqewer::ConnectionMessagebox.new(connection)
+ return box.delete_message(message.receipt_handle) unless message.has_body?
+
+ job = middleware_stack.around_deserialization(serializer, message.receipt_handle, message.body) do
+ serializer.unserialize(message.body)
+ end
+ return unless job
+
+ submitter = submitter_class.new(box, serializer)
+ context = execution_context_class.new(submitter, {'logger' => logger})
+
+ t = Time.now
+ middleware_stack.around_execution(job, context) do
+ job.method(:run).arity.zero? ? job.run : job.run(context)
+ end
+ box.delete_message(message.receipt_handle)
+
+ delta = Time.now - t
+ logger.info { "[worker] Finished %s in %0.2fs" % [job.inspect, delta] }
+ ensure
+ n_flushed = box.flush!
+ logger.debug { "[worker] Flushed %d connection commands" % n_flushed } if n_flushed.nonzero?
end
def take_and_execute
message = @execution_queue.pop(nonblock=true)
handle_message(message)
@@ -201,7 +219,38 @@
throw :goodbye if stopping?
sleep SLEEP_SECONDS_ON_EMPTY_QUEUE
rescue => e # anything else, at or below StandardError that does not need us to quit
@logger.error { '[worker] Failed "%s..." with %s: %s' % [message.inspect[0..32], e.class, e.message] }
e.backtrace.each { |s| @logger.error{"\t#{s}"} }
+ end
+
+ def perform(message)
+ # Create a messagebox that buffers all the calls to Connection, so that
+ # we can send out those commands in one go (without interfering with senders
+ # on other threads, as it seems the Aws::SQS::Client is not entirely
+ # thread-safe - or at least not it's HTTP client part).
+ box = Sqewer::ConnectionMessagebox.new(connection)
+
+ job = middleware_stack.around_deserialization(serializer, message.receipt_handle, message.body) do
+ serializer.unserialize(message.body)
+ end
+ return unless job
+
+ submitter = submitter_class.new(box, serializer)
+ context = execution_context_class.new(submitter, {'logger' => logger})
+
+ t = Time.now
+ middleware_stack.around_execution(job, context) do
+ job.method(:run).arity.zero? ? job.run : job.run(context)
+ end
+
+ # Perform two flushes, one for any possible jobs the job has spawned,
+ # and one for the job delete afterwards
+ box.delete_message(message.receipt_handle)
+
+ delta = Time.now - t
+ logger.info { "[worker] Finished %s in %0.2fs" % [job.inspect, delta] }
+ ensure
+ n_flushed = box.flush!
+ logger.debug { "[worker] Flushed %d connection commands" % n_flushed } if n_flushed.nonzero?
end
end