lib/pushyd/consumer.rb in pushyd-0.25.0 vs lib/pushyd/consumer.rb in pushyd-0.26.0
- old
+ new
@@ -3,32 +3,25 @@
# class ShouterChannelClosed < StandardError; end
# class ShouterPreconditionFailed < StandardError; end
# class ShouterInterrupted < StandardError; end
class ConsumerError < StandardError; end
class ConsumerRuleMissing < StandardError; end
+ class ConsumerSubscribeError < StandardError; end
class Consumer < BmcDaemonLib::MqConsumer
#include ::NewRelic::Agent::Instrumentation::ControllerInstrumentation
include Shared::HmacSignature
- attr_accessor :logger
- def initialize(conn, rule_name, rule)
+ def initialize(channel, rule_name, rule)
+ # Init MqConsumer
+ log_pipe :consumer
+ super
+
# Init
@queue = nil
- @conn = conn
@rule = rule
@rule_name = rule_name
-
- # Prepare logger
- log_pipe :consumer
-
- # Create channel, prefetch only one message at a time
- @channel = @conn.create_channel
- @channel.prefetch(AMQP_PREFETCH)
-
- # OK
- log_info "Consumer initialized"
end
protected
def log_context
@@ -133,17 +126,16 @@
# rescue Bunny::ChannelAlreadyClosed => ex
# error "channel_ackit[#{@channel.id}.#{tag}]: exception: ChannelAlreadyClosed"
# rescue StandardError => ex
# log_debug "channel_ackit[#{@channel.id}.#{tag}]: exception: #{ex.inspect}"
- # # fail PushyDaemon::EndpointSubscribeError, "unhandled (#{e.inspect})"
+ # # fail PushyDaemon::ConsumerSubscribeError, "unhandled (#{e.inspect})"
# else
# log_debug "channel_ackit[#{@channel.id}.#{tag}]: done"
end
# NewRelic instrumentation
#add_transaction_tracer :receive, category: :task
#add_transaction_tracer :propagate, category: :task
-
end
end