lib/ruote/beanstalk/receiver.rb in ruote-beanstalk-2.1.10 vs lib/ruote/beanstalk/receiver.rb in ruote-beanstalk-2.1.11

- old
+ new

@@ -31,33 +31,33 @@ module Beanstalk # # An error class for error emitted by the "remote side" and received here. # - class BsReceiveError < RuntimeError + class ReceiveError < RuntimeError attr_reader :fei - def initialize (fei) + def initialize(fei) @fei = fei super("for #{Ruote::FlowExpressionId.to_storage_id(fei)}") end end # - # Whereas BsParticipant emits workitems (and cancelitems) to a Beanstalk + # Whereas ParticipantProxy emits workitems(and cancelitems) to a Beanstalk # queue, the receiver watches a Beanstalk queue/tube. # # An example initialization : # - # Ruote::Beanstalk::BsReceiver.new( + # Ruote::Beanstalk::Receiver.new( # engine, '127.0.0.1:11300', :tube => 'out') # # # == workitem format # - # BsParticipant and BsReceiver share the same format :3 + # ParticipantProxy and Receiver share the same format :3 # # [ 'workitem', workitem_as_a_hash ] # # or # [ 'error', error_details_as_a_string ] # @@ -70,29 +70,29 @@ # # == :tube # # Indicates to the receiver which beanstalk tube it should listen to. # - # Ruote::Beanstalk::BsReceiver.new( + # Ruote::Beanstalk::Receiver.new( # engine, '127.0.0.1:11300', :tube => 'out') # - class BsReceiver < Ruote::Receiver + class Receiver < ::Ruote::Receiver # cwes = context, worker, engine or storage # - def initialize (cwes, beanstalk, options={}) + def initialize(cwes, beanstalk, options={}) super(cwes, options) Thread.new do listen(beanstalk, options['tube'] || options[:tube] || 'default') end end protected - def listen (beanstalk, tube) + def listen(beanstalk, tube) con = ::Beanstalk::Connection.new(beanstalk) con.watch(tube) con.ignore('default') unless tube == 'default' @@ -106,34 +106,34 @@ rescue EOFError => ee # over end # Is meant to return a hash with a first element that is either - # 'workitem', 'error' or 'launchitem' (a type). + # 'workitem', 'error' or 'launchitem'(a type). # The second element depends on the type. - # It's mappend on Ruote::Beanstalk::BsParticipant anyway. + # It's mappend on Ruote::Beanstalk::ParticipantProxy anyway. # - def decode (job) + def decode(job) Rufus::Json.decode(job.body) end - def process (job) + def process(job) type, data = decode(job) if type == 'workitem' - # data holds a workitem (as a Hash) + # data holds a workitem(as a Hash) reply(data) elsif type == 'error' - # data holds a fei (FlowExpressionId) (as a Hash) + # data holds a fei(FlowExpressionId) (as a Hash) @context.error_handler.action_handle( - 'dispatch', data, BsReceiveError.new(data)) + 'dispatch', data, ReceiveError.new(data)) elsif type == 'launchitem' pdef, fields, variables = data