Sha256: bd10e7ff48d5fd246f24ad534ba5e0f4a8618b17966087e7ade7fdcd4e06d315

Contents?: true

Size: 1.08 KB

Versions: 5

Compression:

Stored size: 1.08 KB

Contents

module Larva
  class MessageReplayer

    def self.reprocess_failed(topic_name, count=1)
      new(topic_name).reprocess_failed(count)
    end

    def initialize(topic_name)
      @topic_name = topic_name
    end

    def reprocess_failed(count)
      Filum.logger.info "Reprocessing #{count} message(s) for topic: #{@topic_name}"

      subscription = Propono::QueueSubscription.create(@topic_name)
      original_url = subscription.queue.url
      failed_url = subscription.failed_queue.url

      sqs = Fog::AWS::SQS.new(Propono.aws_options)
      response = sqs.receive_message( failed_url, {'MaxNumberOfMessages' => count.to_i} )
      messages = response.body['Message']
      if messages.empty?
        raise StandardError.new "Message empty"
      else
        messages.each do |msg|
          sqs_message = Propono::SqsMessage.new(msg)
          puts "Message : #{sqs_message}"
          sqs.send_message(original_url, sqs_message.to_json_with_exception(StandardError.new "Fake Exception"))
          sqs.delete_message(failed_url, msg['ReceiptHandle'])
        end
      end
    end
  end
end

Version data entries

5 entries across 5 versions & 1 rubygems

Version Path
larva-0.9.2 lib/larva/message_replayer.rb
larva-0.9.1 lib/larva/message_replayer.rb
larva-0.9.0 lib/larva/message_replayer.rb
larva-0.8.0 lib/larva/message_replayer.rb
larva-0.7.3 lib/larva/message_replayer.rb