lib/ruote/beanstalk/receiver.rb in ruote-beanstalk-2.1.10 vs lib/ruote/beanstalk/receiver.rb in ruote-beanstalk-2.1.11
- old
+ new
@@ -31,33 +31,33 @@
module Beanstalk
#
# An error class for error emitted by the "remote side" and received here.
#
- class BsReceiveError < RuntimeError
+ class ReceiveError < RuntimeError
attr_reader :fei
- def initialize (fei)
+ def initialize(fei)
@fei = fei
super("for #{Ruote::FlowExpressionId.to_storage_id(fei)}")
end
end
#
- # Whereas BsParticipant emits workitems (and cancelitems) to a Beanstalk
+ # Whereas ParticipantProxy emits workitems(and cancelitems) to a Beanstalk
# queue, the receiver watches a Beanstalk queue/tube.
#
# An example initialization :
#
- # Ruote::Beanstalk::BsReceiver.new(
+ # Ruote::Beanstalk::Receiver.new(
# engine, '127.0.0.1:11300', :tube => 'out')
#
#
# == workitem format
#
- # BsParticipant and BsReceiver share the same format :3
+ # ParticipantProxy and Receiver share the same format :3
#
# [ 'workitem', workitem_as_a_hash ]
# # or
# [ 'error', error_details_as_a_string ]
#
@@ -70,29 +70,29 @@
#
# == :tube
#
# Indicates to the receiver which beanstalk tube it should listen to.
#
- # Ruote::Beanstalk::BsReceiver.new(
+ # Ruote::Beanstalk::Receiver.new(
# engine, '127.0.0.1:11300', :tube => 'out')
#
- class BsReceiver < Ruote::Receiver
+ class Receiver < ::Ruote::Receiver
# cwes = context, worker, engine or storage
#
- def initialize (cwes, beanstalk, options={})
+ def initialize(cwes, beanstalk, options={})
super(cwes, options)
Thread.new do
listen(beanstalk, options['tube'] || options[:tube] || 'default')
end
end
protected
- def listen (beanstalk, tube)
+ def listen(beanstalk, tube)
con = ::Beanstalk::Connection.new(beanstalk)
con.watch(tube)
con.ignore('default') unless tube == 'default'
@@ -106,34 +106,34 @@
rescue EOFError => ee
# over
end
# Is meant to return a hash with a first element that is either
- # 'workitem', 'error' or 'launchitem' (a type).
+ # 'workitem', 'error' or 'launchitem'(a type).
# The second element depends on the type.
- # It's mappend on Ruote::Beanstalk::BsParticipant anyway.
+ # It's mappend on Ruote::Beanstalk::ParticipantProxy anyway.
#
- def decode (job)
+ def decode(job)
Rufus::Json.decode(job.body)
end
- def process (job)
+ def process(job)
type, data = decode(job)
if type == 'workitem'
- # data holds a workitem (as a Hash)
+ # data holds a workitem(as a Hash)
reply(data)
elsif type == 'error'
- # data holds a fei (FlowExpressionId) (as a Hash)
+ # data holds a fei(FlowExpressionId) (as a Hash)
@context.error_handler.action_handle(
- 'dispatch', data, BsReceiveError.new(data))
+ 'dispatch', data, ReceiveError.new(data))
elsif type == 'launchitem'
pdef, fields, variables = data