lib/pheme/queue_poller.rb in pheme-0.0.3 vs lib/pheme/queue_poller.rb in pheme-0.0.4
- old
+ new
@@ -1,41 +1,67 @@
module Pheme
class QueuePoller
- attr_accessor :queue_url, :queue_poller, :connection_pool_block, :poller_configuration
+ attr_accessor :queue_url, :queue_poller, :connection_pool_block, :format, :max_messages, :poller_configuration
- def initialize(queue_url:, connection_pool_block: false, poller_configuration: {})
+ def initialize(queue_url:, connection_pool_block: false, max_messages: nil, format: :json, poller_configuration: {})
raise ArgumentError, "must specify non-nil queue_url" unless queue_url.present?
@queue_url = queue_url
@queue_poller = Aws::SQS::QueuePoller.new(queue_url)
@connection_pool_block = connection_pool_block
+ @format = format
+ @max_messages = max_messages
@poller_configuration = {
wait_time_seconds: 10, # amount of time a long polling receive call can wait for a mesage before receiving a empty response (which will trigger another polling request)
idle_timeout: 20, # disconnects poller after 20 seconds of idle time
skip_delete: true, # manually delete messages
}.merge(poller_configuration || {})
+
+ if max_messages
+ queue_poller.before_request do |stats|
+ throw :stop_polling if stats.received_message_count >= max_messages
+ end
+ end
end
def poll
Pheme.log(:info, "Long-polling for messages on #{queue_url}")
with_optional_connection_pool_block do
queue_poller.poll(poller_configuration) do |message|
+ data = parse_message(message)
begin
- handle(parse_message(message))
+ handle(data)
queue_poller.delete_message(message)
rescue => e
Pheme.log(:error, "Exception: #{e.inspect}")
Pheme.log(:error, e.backtrace.join("\n"))
+ Pheme.rollbar(e, "#{self.class} failed to process message", data)
end
end
end
Pheme.log(:info, "Finished long-polling after #{@poller_configuration[:idle_timeout]} seconds.")
end
def parse_message(message)
Pheme.log(:info, "Received JSON payload: #{message.body}")
body = JSON.parse(message.body)
- parsed_body = JSON.parse(body['Message'])
- RecursiveOpenStruct.new(parsed_body, recurse_over_arrays: true)
+ case format
+ when :csv
+ parse_csv(body['Message'])
+ when :json
+ parse_json(body['Message'])
+ else
+ raise ArgumentError.new("Unknown format #{format}. Valid formats: :csv, :json.")
+ end
+ end
+
+ def parse_csv(message_contents)
+ parsed_body = SmarterCSV.process(StringIO.new(message_contents))
+ parsed_body.map{ |item| RecursiveOpenStruct.new(item, recurse_over_arrays: true) }
+ end
+
+ def parse_json(message_contents)
+ parsed_body = JSON.parse(message_contents)
+ RecursiveOpenStruct.new({wrapper: parsed_body}, recurse_over_arrays: true).wrapper
end
def handle(message)
raise NotImplementedError
end