Sha256: 139d64e173b9ba6e87841892834999fdd1ed1b1a75b5e445c1db39ec79f4290f

Contents?: true

Size: 1.67 KB

Versions: 26

Compression:

Stored size: 1.67 KB

Contents

require 'protobuf/rpc/server'
require 'protobuf/rpc/servers/zmq/util'
module Protobuf
  module Rpc
    module Zmq

      class Worker
        include ::Protobuf::Rpc::Server
        include ::Protobuf::Rpc::Zmq::Util

        ##
        # Constructor
        #
        def initialize(options = {})
          host = options[:host]
          port = options[:port]

          @zmq_context = ::ZMQ::Context.new
          @socket = @zmq_context.socket(::ZMQ::REP)
          zmq_error_check(@socket.connect("tcp://#{resolve_ip(host)}:#{port}"))

          @poller = ::ZMQ::Poller.new
          @poller.register(@socket, ::ZMQ::POLLIN)
        end

        ##
        # Instance Methods
        #
        def handle_request(socket)
          @request_data = ''
          zmq_error_check(socket.recv_string(@request_data))
          log_debug { sign_message("handling request") } if !@request_data.nil?
        end

        def run
          while ::Protobuf::Rpc::Zmq::Server.running? do
            # poll for 1_000 milliseconds then continue looping
            # This lets us see whether we need to die
            @poller.poll(1_000)
            @poller.readables.each do |socket|
              initialize_request!
              handle_request(socket)
              handle_client unless @request_data.nil?
            end
          end
        ensure
          @socket.close
          @zmq_context.terminate
        end

        def send_data
          response_data = @response.is_a?(::Protobuf::Message) ? @response.serialize_to_string : @response.to_s
          @stats.response_size = response_data.size
          zmq_error_check(@socket.send_string(response_data))
        end
      end

    end
  end
end

Version data entries

26 entries across 26 versions & 1 rubygems

Version Path
protobuf-2.4.2-java lib/protobuf/rpc/servers/zmq/worker.rb
protobuf-2.4.2 lib/protobuf/rpc/servers/zmq/worker.rb
protobuf-2.4.1-java lib/protobuf/rpc/servers/zmq/worker.rb
protobuf-2.4.1 lib/protobuf/rpc/servers/zmq/worker.rb
protobuf-2.4.0-java lib/protobuf/rpc/servers/zmq/worker.rb
protobuf-2.4.0 lib/protobuf/rpc/servers/zmq/worker.rb
protobuf-2.3.2-java lib/protobuf/rpc/servers/zmq/worker.rb
protobuf-2.3.2 lib/protobuf/rpc/servers/zmq/worker.rb
protobuf-2.3.1-java lib/protobuf/rpc/servers/zmq/worker.rb
protobuf-2.3.1 lib/protobuf/rpc/servers/zmq/worker.rb
protobuf-2.3.0-java lib/protobuf/rpc/servers/zmq/worker.rb
protobuf-2.3.0 lib/protobuf/rpc/servers/zmq/worker.rb
protobuf-2.2.7-java lib/protobuf/rpc/servers/zmq/worker.rb
protobuf-2.2.7 lib/protobuf/rpc/servers/zmq/worker.rb
protobuf-2.2.6-java lib/protobuf/rpc/servers/zmq/worker.rb
protobuf-2.2.6 lib/protobuf/rpc/servers/zmq/worker.rb
protobuf-2.2.5-java lib/protobuf/rpc/servers/zmq/worker.rb
protobuf-2.2.5 lib/protobuf/rpc/servers/zmq/worker.rb
protobuf-2.2.4 lib/protobuf/rpc/servers/zmq/worker.rb
protobuf-2.2.3 lib/protobuf/rpc/servers/zmq/worker.rb