Sha256: 41f5e3f80be3c65ce378e96427dce23164769e97bb8faa35b3fbbc9664a4cd99

Contents?: true

Size: 1.65 KB

Versions: 22

Compression:

Stored size: 1.65 KB

Contents

require 'protobuf/rpc/server'
require 'protobuf/rpc/servers/zmq/util'
module Protobuf
  module Rpc
    module Zmq

      ##
      # ZeroMQ RPC worker.
      #
      # Connects a REP socket to the configured host/port and, while the ZMQ
      # server reports that it is running, polls that socket for requests.
      # Each request is read into @request_data and then passed on through
      # #handle_client (not defined in this file; presumably supplied by the
      # included Server module).
      #
      class Worker
        include ::Protobuf::Rpc::Server
        include ::Protobuf::Rpc::Zmq::Util

        ##
        # Constructor
        #
        # Creates a ZMQ context, connects a REP socket to
        # tcp://<resolved host>:<port> and registers that socket with a poller
        # for readability.
        #
        # options - Hash; the :host and :port entries are read.
        #
        # Any error raised while setting up (for example by zmq_error_check
        # when the connect call reports failure) is re-raised after the socket
        # and context created so far have been released.
        #
        def initialize(options = {})
          host = options[:host]
          port = options[:port]

          @zmq_context = ::ZMQ::Context.new
          @socket = @zmq_context.socket(::ZMQ::REP)
          zmq_error_check(@socket.connect("tcp://#{resolve_ip(host)}:#{port}"))

          @poller = ::ZMQ::Poller.new
          @poller.register(@socket, ::ZMQ::POLLIN)
        rescue
          # #run (and therefore its ensure block) never executes for an object
          # whose constructor failed, so clean up here before propagating.
          release_zmq_resources
          raise
        end

        ##
        # Instance Methods
        #

        ##
        # Reads one request from +socket+ into @request_data.
        #
        # socket - the readable socket handed out by the poller.
        #
        # The return code of recv_string is passed to zmq_error_check.
        #
        def handle_request(socket)
          # recv_string fills the String it is given in place, so
          # @request_data always references a String from here on.
          @request_data = ''
          zmq_error_check(socket.recv_string(@request_data))
          log_debug { sign_message("handling request") }
        end

        ##
        # Main loop: serves requests until the ZMQ server stops running, then
        # closes the socket and terminates the context.
        #
        def run
          while ::Protobuf::Rpc::Zmq::Server.running?
            # Poll for at most 1_000 milliseconds, then loop again.
            # This lets us see whether we need to die.
            triggered = @poller.poll(1_000)

            # Nothing to read when the poll timed out (0). A negative value is
            # the poller's error indication; in that case do not act on
            # whatever the readable list currently holds.
            next unless triggered > 0

            @poller.readables.each do |socket|
              initialize_request!
              handle_request(socket)
              # handle_request above always leaves a String here; the guard
              # only matters if that method is overridden.
              handle_client unless @request_data.nil?
            end
          end
        ensure
          release_zmq_resources
        end

        ##
        # Serializes @response, records its size on @stats and sends it back
        # over the worker socket. The send return code is passed to
        # zmq_error_check.
        #
        def send_data
          response_data = @response.to_s # to_s is aliased to serialize_to_string in Message
          @stats.response_size = response_data.size
          zmq_error_check(@socket.send_string(response_data))
        end

        private

        ##
        # Closes the socket and terminates the context, skipping whichever of
        # the two was never created. The context is terminated even when
        # closing the socket raises.
        #
        def release_zmq_resources
          @socket.close if @socket
        ensure
          @zmq_context.terminate if @zmq_context
        end
      end

    end
  end
end

Version data entries

22 entries across 22 versions & 1 rubygem

Version Path
protobuf-2.6.6-java lib/protobuf/rpc/servers/zmq/worker.rb
protobuf-2.6.6 lib/protobuf/rpc/servers/zmq/worker.rb
protobuf-2.6.5-java lib/protobuf/rpc/servers/zmq/worker.rb
protobuf-2.6.5 lib/protobuf/rpc/servers/zmq/worker.rb
protobuf-2.6.4-java lib/protobuf/rpc/servers/zmq/worker.rb
protobuf-2.6.4 lib/protobuf/rpc/servers/zmq/worker.rb
protobuf-2.6.3-java lib/protobuf/rpc/servers/zmq/worker.rb
protobuf-2.6.3 lib/protobuf/rpc/servers/zmq/worker.rb
protobuf-2.6.2-java lib/protobuf/rpc/servers/zmq/worker.rb
protobuf-2.6.2 lib/protobuf/rpc/servers/zmq/worker.rb
protobuf-2.6.1-java lib/protobuf/rpc/servers/zmq/worker.rb
protobuf-2.6.1 lib/protobuf/rpc/servers/zmq/worker.rb
protobuf-2.6.0-java lib/protobuf/rpc/servers/zmq/worker.rb
protobuf-2.6.0 lib/protobuf/rpc/servers/zmq/worker.rb
protobuf-2.5.5-java lib/protobuf/rpc/servers/zmq/worker.rb
protobuf-2.5.5 lib/protobuf/rpc/servers/zmq/worker.rb
protobuf-2.5.4-java lib/protobuf/rpc/servers/zmq/worker.rb
protobuf-2.5.4 lib/protobuf/rpc/servers/zmq/worker.rb
protobuf-2.5.3 lib/protobuf/rpc/servers/zmq/worker.rb
protobuf-2.5.2-java lib/protobuf/rpc/servers/zmq/worker.rb