Sha256: 60b94ee459a38b828fd7f2fc8802bdfd36840aae9aed0aa0f1a80e4414bf8dff

Contents?: true

Size: 1.79 KB

Versions: 9

Compression:

Stored size: 1.79 KB

Contents

require 'protobuf/rpc/servers/zmq/broker'
require 'protobuf/rpc/servers/zmq/worker'
require 'protobuf/rpc/servers/zmq/util'

module Protobuf
  module Rpc
    module Zmq
      ##
      # Process-wide entry point for the ZMQ RPC server.
      #
      # The class is used purely through class methods; all state is kept in
      # class-level instance variables:
      #
      #   @options - the options hash handed to the most recent +run+
      #   @broker  - the Broker for the current run (nil in workers-only mode)
      #   @running - flag polled by the main loop and by crashed workers
      #   @threads - every worker thread started by +start_worker+
      #
      # NOTE(review): +log_debug+, +log_error+ and +sign_message+ are not
      # defined in this class; presumably they are made available at class
      # level by the Util mixin -- confirm against util.rb.
      #
      class Server
        include ::Protobuf::Rpc::Zmq::Util

        # Seconds a crashed worker thread waits before building a replacement
        # worker, so a worker that fails immediately on every attempt cannot
        # spin in a hot retry loop and flood stderr/the log.
        WORKER_RESTART_DELAY = 0.5

        # Initialised before any method can touch it; `||=` keeps already
        # registered threads if this file is loaded a second time.
        @threads ||= []

        ##
        # Class Methods
        #

        # Start the server and block until +stop+ is called (or an exception
        # escapes the main loop).
        #
        # options - Hash passed unchanged to Broker.new and Worker.new.
        #           Keys read here:
        #           :workers_only - when truthy no broker is created and the
        #                           calling thread just idles while the worker
        #                           threads run.
        #           :threads      - number of worker threads to start
        #                           (required: +times+ is called on it).
        #
        # Whatever way the method exits, the running flag is cleared and the
        # broker created by this run is torn down and forgotten.
        def self.run(options = {})
          @options = options

          unless options[:workers_only]
            log_debug { sign_message("initializing broker") }
            @broker = ::Protobuf::Rpc::Zmq::Broker.new(options)
          end

          worker_count = options[:threads]
          log_debug { sign_message("starting server workers") }

          @running = true
          worker_count.times { start_worker }

          log_debug { sign_message("server started") }
          while running?
            if options[:workers_only]
              # Nothing to poll without a broker; sleeping already yields to
              # the worker threads, so no explicit Thread.pass is needed.
              sleep 5
            else
              @broker.poll
            end
          end
        ensure
          # Clear the flag even when we leave through an exception, otherwise
          # crashed workers would keep respawning against a dead broker.
          @running = false

          # Forget the broker before tearing it down so that a later
          # workers-only run can never tear down a stale broker again.
          broker, @broker = @broker, nil
          broker.teardown if broker
        end

        # True between the moment +run+ has flagged the server as started and
        # the moment +stop+ is called or +run+ exits.
        def self.running?
          !!@running
        end

        # Spawn one worker thread and register it in +threads+.
        #
        # The thread builds a Worker from the options of the current run and
        # calls its +run+ method. If that raises a StandardError the failure
        # is written to $stderr and to the error log, and -- as long as the
        # server is still running -- a fresh worker is created in the same
        # thread after a short pause.
        #
        # Returns the array of registered worker threads.
        def self.start_worker
          worker_thread = Thread.new(@options) do |worker_options|
            begin
              ::Protobuf::Rpc::Zmq::Worker.new(worker_options).run
            rescue => e
              # Array() guards against exceptions that carry no backtrace.
              message = "Worker Failed, spawning new worker: #{e.inspect}\n #{Array(e.backtrace).join($/)}"
              $stderr.puts message
              log_error { message }

              if ::Protobuf::Rpc::Zmq::Server.running?
                sleep WORKER_RESTART_DELAY
                # Re-check: the server may have been stopped during the pause.
                retry if ::Protobuf::Rpc::Zmq::Server.running?
              end
            end
          end

          @threads << worker_thread
        end

        # Ask the server to shut down: clear the running flag, then give each
        # worker thread up to five seconds to finish before killing it.
        # Finished threads are dropped from the registry so repeated
        # run/stop cycles do not accumulate dead threads.
        #
        # Returns the (now empty) array of registered worker threads.
        def self.stop
          @running = false

          @threads.each do |thread|
            thread.join(5) || thread.kill
          end
          @threads.clear
        end

        # The array of worker threads started by +start_worker+.
        def self.threads
          @threads
        end
      end
    end
  end
end

Version data entries

9 entries across 9 versions & 1 rubygems

Version Path
protobuf-2.7.12 lib/protobuf/rpc/servers/zmq/server.rb
protobuf-2.7.11-java lib/protobuf/rpc/servers/zmq/server.rb
protobuf-2.7.11 lib/protobuf/rpc/servers/zmq/server.rb
protobuf-2.7.10-java lib/protobuf/rpc/servers/zmq/server.rb
protobuf-2.7.10 lib/protobuf/rpc/servers/zmq/server.rb
protobuf-2.7.9-java lib/protobuf/rpc/servers/zmq/server.rb
protobuf-2.7.9 lib/protobuf/rpc/servers/zmq/server.rb
protobuf-2.7.8-java lib/protobuf/rpc/servers/zmq/server.rb
protobuf-2.7.8 lib/protobuf/rpc/servers/zmq/server.rb