Sha256: 4cdbc4d00b1495132c81b0133e193fcd02ad29debde4a90d5b7baf01c18c93e2
Contents?: true
Size: 1.9 KB
Versions: 2
Compression:
Stored size: 1.9 KB
Contents
module BarnyardHarvester
  # Uniform push/pop interface over three interchangeable queue backends,
  # selected with the :queueing argument:
  #
  #   :sqs      - Amazon SQS through the "aws-sdk" gem (AWS::SQS)
  #   :rabbitmq - RabbitMQ through the "bunny" gem
  #   :hash     - in-process arrays kept in a Hash; every change is also
  #               written to "<queue_name>.yml" in the current directory
  #
  # Backend libraries are required lazily so only the selected one has to
  # be installed.
  class GenericQueue
    # @param args [Hash] configuration
    #   :queueing          [Symbol]  required; :sqs, :rabbitmq or :hash
    #   :sqs_settings      [Hash]    required for :sqs; passed to AWS::SQS.new
    #   :rabbitmq_settings [Hash]    required for :rabbitmq; passed to Bunny.new
    #   :debug             [Boolean] optional (default false); when true,
    #                                sets :logging in the RabbitMQ settings
    # @raise [RuntimeError] when a required setting is missing or the
    #   :queueing value is not recognised
    def initialize(args)
      @queueing = args.fetch(:queueing) { raise "You must provide :queueing" }
      # @debug used to be read without ever being assigned, so RabbitMQ
      # logging could never be turned on. Take it from an optional argument.
      @debug = args.fetch(:debug, false)

      case @queueing
      when :sqs
        require "aws-sdk"
        @sqs_settings = args.fetch(:sqs_settings) { raise "You must provide :sqs_settings" }
        @sqs = AWS::SQS.new(@sqs_settings)
      when :rabbitmq
        require "bunny"
        @rabbitmq_settings = args.fetch(:rabbitmq_settings) { raise "You must provide :rabbitmq_settings" }
        @rabbitmq_settings[:logging] = true if @debug
        @bunny = Bunny.new(@rabbitmq_settings)
        @bunny.start
      when :hash
        # to_yaml is only defined once the YAML library has been loaded.
        require "yaml"
        @queues = {}
      else
        raise "Unknown queueing method. #{@queueing}"
      end
    end

    # Appends +message+ to the queue called +queue_name+, creating the
    # queue if it does not exist yet.
    #
    # @param queue_name [String] name of the destination queue
    # @param message [Object] payload handed to the backend as-is
    def push(queue_name, message)
      case @queueing
      when :sqs
        @sqs.queues.create(queue_name).send_message(message)
      when :rabbitmq
        @bunny.queue(queue_name).publish(message)
      when :hash
        (@queues[queue_name] ||= []) << message
        persist_queue(queue_name)
      end
    end

    # Removes one message from +queue_name+ and returns it.
    #
    # @param queue_name [String] name of the queue to read from
    # @return [Object, nil] the message, or nil when nothing is available
    def pop(queue_name)
      case @queueing
      when :sqs
        msg = @sqs.queues.create(queue_name).receive_message
        return nil if msg.nil?

        # Delete the received message before handing its body back.
        msg.delete
        msg.body
      when :rabbitmq
        msg = @bunny.queue(queue_name).pop[:payload]
        # The client reports an empty queue with the :queue_empty sentinel.
        msg == :queue_empty ? nil : msg
      when :hash
        queue = @queues[queue_name]
        # A queue nobody has pushed to is simply empty.
        return nil if queue.nil?

        # The original read from the never-assigned @queue and always raised.
        # Take the oldest entry so the in-memory backend drains in insertion
        # order.
        msg = queue.shift
        persist_queue(queue_name)
        msg
      end
    end

    # Drains +queue_name+ by popping until pop returns nil (or false).
    #
    # @param queue_name [String] name of the queue to drain
    # @return [nil]
    def empty(queue_name)
      while pop(queue_name)
        # Discard each message; the loop ends once the queue reports empty.
      end
    end

    private

    # Writes the current contents of an in-memory queue to
    # "<queue_name>.yml", replacing any previous snapshot.
    def persist_queue(queue_name)
      File.open("#{queue_name}.yml", "w") { |file| file.puts(@queues[queue_name].to_yaml) }
    end
  end
end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
barnyard_harvester-0.0.13 | lib/barnyard_harvester/generic_queue.rb |
barnyard_harvester-0.0.12 | lib/barnyard_harvester/generic_queue.rb |