lib/sqs_buffer/client.rb in sqs_buffer-0.2.2 vs lib/sqs_buffer/client.rb in sqs_buffer-0.2.3
- old
+ new
@@ -7,23 +7,24 @@
def initialize(opts)
@queue_url = opts.fetch(:queue_url) { |k| missing_key!(k) }
client = opts.fetch(:client) { |k| missing_key!(k) }
@poller = Aws::SQS::QueuePoller.new(@queue_url, client: client)
- @skip_delete = opts.fetch(:skip_delete, true)
- @max_number_of_messages = opts.fetch(:max_number_of_messages, 10)
- @logger = opts.fetch(:logger, Logger.new(STDOUT))
- @process_block = Concurrent::MutexAtomicReference.new
+ @skip_delete = opts.fetch(:skip_delete, true)
+ @max_number_of_messages = opts.fetch(:max_number_of_messages, 10).to_i
+ @logger = opts.fetch(:logger, Logger.new(STDOUT))
+ @process_block = Concurrent::MutexAtomicReference.new
@before_request_block = Concurrent::MutexAtomicReference.new
- @process_block = Concurrent::MutexAtomicReference.new
- @message_queue = Concurrent::Array.new
- @last_process_time = Concurrent::AtomicFixnum.new(Time.now.to_i)
- @running = Concurrent::AtomicBoolean.new(false)
+ @process_block = Concurrent::MutexAtomicReference.new
+ @message_queue = Concurrent::Array.new
+ @last_process_time = Concurrent::AtomicFixnum.new(Time.now.to_i)
+ @running = Concurrent::AtomicBoolean.new(false)
+
@max_wait_time = Concurrent::AtomicFixnum.new(
- opts.fetch(:max_wait_time, 300)
+ opts.fetch(:max_wait_time, 300).to_i
)
@max_queue_threshold = Concurrent::AtomicFixnum.new(
- opts.fetch(:max_queue_threshold, 100)
+ opts.fetch(:max_queue_threshold, 100).to_i
)
configure_before_request_block
end
def start_polling