Sha256: d66b3da0694ba29e6507eadde11e9b65619e3889313aae3936d23d1e12b6ddb1

Contents?: true

Size: 1.71 KB

Versions: 18

Compression:

Stored size: 1.71 KB

Contents

require 'protobuf/rpc/servers/zmq/broker'
require 'protobuf/rpc/servers/zmq/worker'
require 'protobuf/rpc/servers/zmq/util'

module Protobuf
  module Rpc
    module Zmq
      # Class-level driver for the ZMQ RPC server: it builds one Broker,
      # spawns a set of Worker threads, and polls the broker until stopped.
      # All state (@broker, @worker_options, @threads, @running) is kept in
      # class-level instance variables.
      #
      # NOTE(review): log_debug, log_error and sign_message are not defined in
      # this file; presumably they come from the Util mixin -- confirm.
      class Server
        include ::Protobuf::Rpc::Zmq::Util

        # Worker threads spawned by start_worker. `||=` keeps an existing list
        # if this class body is evaluated again.
        @threads ||= []

        ##
        # Class Methods
        #

        # Starts the broker and the worker threads, then blocks polling the
        # broker for as long as running? is true.
        #
        # options - Hash handed to Broker.new. It must provide :threads (the
        #           number of worker threads to spawn) and :port (a number);
        #           workers receive the same options with :port raised by one.
        #
        # On the way out -- normally or through an exception -- the running
        # flag is cleared and the broker, if one was created, is torn down.
        def self.run(options = {})
          log_debug { sign_message("initializing broker") }
          @broker = ::Protobuf::Rpc::Zmq::Broker.new(options)

          # Workers get the broker's options with the port bumped by one.
          @worker_options = options.merge(:port => options[:port] + 1)

          log_debug { sign_message("starting server workers") }
          options[:threads].times { start_worker }

          @running = true
          log_debug { sign_message("server started") }

          @broker.poll while running?
        ensure
          # Clear the flag even when leaving through an exception; otherwise
          # running? keeps reporting true and failed workers keep being
          # restarted after the broker has been torn down.
          @running = false
          @broker.teardown if @broker
        end

        # Whether the poll loop in run is (still) meant to be active.
        #
        # Returns true or false.
        def self.running?
          !!@running
        end

        # Spawns one worker thread and appends it to @threads.
        #
        # failed_worker - when true, threads that are no longer alive are
        #                 dropped from @threads before the new one is added.
        #                 A worker thread passes true when it replaces itself
        #                 after an exception.
        #
        # If the worker's run raises a StandardError while running? is true,
        # the error is logged and a replacement worker is started. If the
        # server is not running, the error is dropped without logging.
        #
        # NOTE(review): @threads is mutated here from worker threads without
        # any lock -- confirm that is acceptable on the targeted runtimes.
        def self.start_worker(failed_worker = false)
          @threads.select!(&:alive?) if failed_worker

          worker_thread = Thread.new(self, @worker_options) do |parent_server, worker_options|
            begin
              ::Protobuf::Rpc::Zmq::Worker.new(worker_options).run
            rescue => e
              if parent_server.running?
                log_error { parent_server.sign_message("Restart Worker on Exception: #{e.inspect}\n #{e.backtrace}") }
                parent_server.start_worker(true)
              end
            end
          end

          @threads << worker_thread
        end

        # Clears the running flag, which ends the poll loop in run, and then
        # waits for every tracked worker thread to finish.
        #
        # NOTE(review): how a worker notices the cleared flag is not visible
        # in this file -- presumably Worker#run checks Server.running?.
        def self.stop
          @running = false
          @threads.each(&:join)
        end
      end
    end
  end
end

Version data entries

18 entries across 18 versions & 1 rubygem

Version Path
protobuf-2.7.0-java lib/protobuf/rpc/servers/zmq/server.rb
protobuf-2.7.0 lib/protobuf/rpc/servers/zmq/server.rb
protobuf-2.7.0.rc1-java lib/protobuf/rpc/servers/zmq/server.rb
protobuf-2.7.0.rc1 lib/protobuf/rpc/servers/zmq/server.rb
protobuf-2.6.6-java lib/protobuf/rpc/servers/zmq/server.rb
protobuf-2.6.6 lib/protobuf/rpc/servers/zmq/server.rb
protobuf-2.6.5-java lib/protobuf/rpc/servers/zmq/server.rb
protobuf-2.6.5 lib/protobuf/rpc/servers/zmq/server.rb
protobuf-2.6.4-java lib/protobuf/rpc/servers/zmq/server.rb
protobuf-2.6.4 lib/protobuf/rpc/servers/zmq/server.rb
protobuf-2.6.3-java lib/protobuf/rpc/servers/zmq/server.rb
protobuf-2.6.3 lib/protobuf/rpc/servers/zmq/server.rb
protobuf-2.6.2-java lib/protobuf/rpc/servers/zmq/server.rb
protobuf-2.6.2 lib/protobuf/rpc/servers/zmq/server.rb
protobuf-2.6.1-java lib/protobuf/rpc/servers/zmq/server.rb
protobuf-2.6.1 lib/protobuf/rpc/servers/zmq/server.rb
protobuf-2.6.0-java lib/protobuf/rpc/servers/zmq/server.rb
protobuf-2.6.0 lib/protobuf/rpc/servers/zmq/server.rb