lib/rzmq-enhancement.rb in rzmq-enhancement-0.0.26 vs lib/rzmq-enhancement.rb in rzmq-enhancement-0.0.27

- old
+ new

@@ -8,47 +8,56 @@ IPCDIR = '/tmp' Thread.abort_on_exception = true module ZeroMQ - + # This bit will seem confusing, but we must redo grand_pusher to + # to make this more sane. def zeromq_push name, endpoint = "ipc://#{IPCDIR}/#{name}.ipc", - ctx: :default, + ctx: :push, + payload: nil, &block - grand_pusher ZMQ::PUSH, name, endpoint, ctx: ctx, &block - end + if block_given? + grand_pusher ZMQ::PUSH, name, endpoint, ctx: ctx, payload: payload, &block + else + grand_pusher(ZMQ::PUSH, name, endpoint, ctx: ctx) { payload } + end + end - # this does an endless loop as a "server" + # this does an endless loop as a "server" def zeromq_pull_server name, endpoint = "ipc://#{IPCDIR}/#{name}.ipc", - ctx: :default, + ctx: :pull, &block grand_server ZMQ::PULL, name, endpoint, ctx: ctx, bind: true, &block end # we make the request and return the response def zeromq_request name, endpoint = "ipc://#{IPCDIR}/#{name}.ipc", **opts, &block h = grand_pusher ZMQ::REQ, name, endpoint, **opts, &block - end + end def zeromq_response_server name, endpoint = "ipc://#{IPCDIR}/#{name}.ipc", ctx: :default, &block grand_server ZMQ::REP, name, endpoint, bind: true, respond: true, ctx: ctx, &block end - - private + + private_class_method def ctx_name name, opts :"#{name}.#{opts[:ctx] || :default}" end - + # TODO: We don't handle the non-block req case at all. Do we want to? + # TODO: We need to rewrite this, it works as is, but is too tricky + # TODO: to get the semantics right. + private_class_method def grand_pusher type, name, endpoint, **opts, &block init_sys ctxname = ctx_name(name,opts) h = if @ctxh[ctxname].nil? h = (@ctxh[ctxname] ||= OpenStruct.new) @@ -59,11 +68,11 @@ error_check(rc) h else @ctxh[ctxname] end - + if block_given? unless opts[:payload] # here, we get the payload from the block payload = block.(h.ctx) rc = h.push_sock.send_string(JSON.generate(payload)) @@ -78,10 +87,11 @@ end end h end + private_class_method def grand_server type, name, endpoint, **opts, &block init_sys ctxname = ctx_name(name,opts) h = (@ctxh[ctxname] ||= OpenStruct.new) h.ctx = ZMQ::Context.create(1) @@ -93,29 +103,31 @@ else h.server_sock.connect(endpoint) end error_check(rc) - loop do + loop do rc = h.server_sock.recv_string payload = '' error_check(rc) - + result = block.(JSON.parse(payload)) if opts[:respond] - rc = h.server_sock.send_string JSON.generate(result) + rc = h.server_sock.send_string JSON.generate(result) end end if block_given? h end - + + private_class_method def init_sys @ctxh ||= {} end - + + private_class_method def error_check rc if ZMQ::Util.resultcode_ok?(rc) false else - raise "Operation failed, errno [#{ZMQ::Util.errno}] description [#{ZMQ::Util.error_string}]" + raise "ZeroMQ Operation failed, errno [#{ZMQ::Util.errno}] description [#{ZMQ::Util.error_string}]" end end end