lib/sqs_buffer/client.rb in sqs_buffer-0.2.2 vs lib/sqs_buffer/client.rb in sqs_buffer-0.2.3

- old
+ new

@@ -7,23 +7,24 @@ def initialize(opts) @queue_url = opts.fetch(:queue_url) { |k| missing_key!(k) } client = opts.fetch(:client) { |k| missing_key!(k) } @poller = Aws::SQS::QueuePoller.new(@queue_url, client: client) - @skip_delete = opts.fetch(:skip_delete, true) - @max_number_of_messages = opts.fetch(:max_number_of_messages, 10) - @logger = opts.fetch(:logger, Logger.new(STDOUT)) - @process_block = Concurrent::MutexAtomicReference.new + @skip_delete = opts.fetch(:skip_delete, true) + @max_number_of_messages = opts.fetch(:max_number_of_messages, 10).to_i + @logger = opts.fetch(:logger, Logger.new(STDOUT)) + @process_block = Concurrent::MutexAtomicReference.new @before_request_block = Concurrent::MutexAtomicReference.new - @process_block = Concurrent::MutexAtomicReference.new - @message_queue = Concurrent::Array.new - @last_process_time = Concurrent::AtomicFixnum.new(Time.now.to_i) - @running = Concurrent::AtomicBoolean.new(false) + @process_block = Concurrent::MutexAtomicReference.new + @message_queue = Concurrent::Array.new + @last_process_time = Concurrent::AtomicFixnum.new(Time.now.to_i) + @running = Concurrent::AtomicBoolean.new(false) + @max_wait_time = Concurrent::AtomicFixnum.new( - opts.fetch(:max_wait_time, 300) + opts.fetch(:max_wait_time, 300).to_i ) @max_queue_threshold = Concurrent::AtomicFixnum.new( - opts.fetch(:max_queue_threshold, 100) + opts.fetch(:max_queue_threshold, 100).to_i ) configure_before_request_block end def start_polling