lib/concurrent/reactor/tcp_sync_demux.rb in concurrent-ruby-0.2.1 vs lib/concurrent/reactor/tcp_sync_demux.rb in concurrent-ruby-0.2.2
- old
+ new
@@ -1,131 +1,131 @@
-require 'socket'
-require 'drb/acl'
-require 'functional'
-require 'concurrent/reactor'
-require 'concurrent/supervisor'
-
-module Concurrent
- class Reactor
-
- class TcpSyncDemux
-
- behavior(:sync_event_demux)
-
- DEFAULT_HOST = '127.0.0.1'
- DEFAULT_PORT = 12345
- DEFAULT_ACL = %w{deny all allow 127.0.0.1}
-
- attr_reader :host
- attr_reader :port
- attr_reader :acl
-
- def initialize(opts = {})
- @host = opts[:host] || DEFAULT_HOST
- @port = opts[:port] || DEFAULT_PORT
- @acl = ACL.new(opts[:acl] || DEFAULT_ACL)
- end
-
- def run
- raise StandardError.new('already running') if running?
- begin
- @server = TCPServer.new(@host, @port)
- return true
- rescue Exception => ex
- return false
- end
- end
-
- def stop
- begin
- @socket.close unless @socket.nil?
- rescue Exception => ex
- # suppress
- end
-
- begin
- @server.close unless @server.nil?
- rescue Exception => ex
- # suppress
- end
-
- @server = @socket = nil
- return true
- end
-
- def reset
- stop
- sleep(1)
- run
- end
-
- def running?
- return ! @server.nil?
- end
-
- def accept
- @socket = @server.accept if @socket.nil?
- return nil unless @acl.allow_socket?(@socket)
- event, args = get_message(@socket)
- return nil if event.nil?
- return Reactor::EventContext.new(event, args)
- rescue Exception => ex
- reset
- return nil
- end
-
- def respond(result, message)
- return nil if @socket.nil?
- @socket.puts(format_message(result, message))
- rescue Exception => ex
- reset
- end
-
- def self.format_message(event, *args)
- event = event.to_s.strip
- raise ArgumentError.new('nil or empty event') if event.empty?
- args = args.reduce('') do |memo, arg|
- memo << "#{arg}\r\n"
- end
- return "#{event}\r\n#{args}\r\n"
- end
-
- def format_message(*args)
- self.class.format_message(*args)
- end
-
- def self.parse_message(message)
- message = message.lines.map(&:chomp) if message.is_a?(String)
- return [nil, []] if message.nil?
- event = message.first.match(/^:?(\w+)/)
- event = event[1].to_s.downcase.to_sym unless event.nil?
- args = message.slice(1, message.length) || []
- args.pop if args.last.nil? || args.last.empty?
- return [event, args]
- end
-
- def parse_message(*args)
- self.class.parse_message(*args)
- end
-
- def self.get_message(socket)
- message = []
- while line = socket.gets
- if line.nil? || (line = line.strip).empty?
- break
- else
- message << line
- end
- end
-
- if message.empty?
- return nil
- else
- return parse_message(message)
- end
- end
- def get_message(*args) self.class.get_message(*args); end
- end
-
- TcpSyncDemultiplexer = TcpSyncDemux
- end
-end
+require 'socket'
+require 'drb/acl'
+require 'functional'
+require 'concurrent/reactor'
+require 'concurrent/supervisor'
+
+module Concurrent
+ class Reactor
+
+ class TcpSyncDemux
+
+ behavior(:sync_event_demux)
+
+ DEFAULT_HOST = '127.0.0.1'
+ DEFAULT_PORT = 12345
+ DEFAULT_ACL = %w{deny all allow 127.0.0.1}
+
+ attr_reader :host
+ attr_reader :port
+ attr_reader :acl
+
+ def initialize(opts = {})
+ @host = opts[:host] || DEFAULT_HOST
+ @port = opts[:port] || DEFAULT_PORT
+ @acl = ACL.new(opts[:acl] || DEFAULT_ACL)
+ end
+
+ def run
+ raise StandardError.new('already running') if running?
+ begin
+ @server = TCPServer.new(@host, @port)
+ return true
+ rescue Exception => ex
+ return false
+ end
+ end
+
+ def stop
+ begin
+ @socket.close unless @socket.nil?
+ rescue Exception => ex
+ # suppress
+ end
+
+ begin
+ @server.close unless @server.nil?
+ rescue Exception => ex
+ # suppress
+ end
+
+ @server = @socket = nil
+ return true
+ end
+
+ def reset
+ stop
+ sleep(1)
+ run
+ end
+
+ def running?
+ return ! @server.nil?
+ end
+
+ def accept
+ @socket = @server.accept if @socket.nil?
+ return nil unless @acl.allow_socket?(@socket)
+ event, args = get_message(@socket)
+ return nil if event.nil?
+ return Reactor::EventContext.new(event, args)
+ rescue Exception => ex
+ reset
+ return nil
+ end
+
+ def respond(result, message)
+ return nil if @socket.nil?
+ @socket.puts(format_message(result, message))
+ rescue Exception => ex
+ reset
+ end
+
+ def self.format_message(event, *args)
+ event = event.to_s.strip
+ raise ArgumentError.new('nil or empty event') if event.empty?
+ args = args.reduce('') do |memo, arg|
+ memo << "#{arg}\r\n"
+ end
+ return "#{event}\r\n#{args}\r\n"
+ end
+
+ def format_message(*args)
+ self.class.format_message(*args)
+ end
+
+ def self.parse_message(message)
+ message = message.lines.map(&:chomp) if message.is_a?(String)
+ return [nil, []] if message.nil?
+ event = message.first.match(/^:?(\w+)/)
+ event = event[1].to_s.downcase.to_sym unless event.nil?
+ args = message.slice(1, message.length) || []
+ args.pop if args.last.nil? || args.last.empty?
+ return [event, args]
+ end
+
+ def parse_message(*args)
+ self.class.parse_message(*args)
+ end
+
+ def self.get_message(socket)
+ message = []
+ while line = socket.gets
+ if line.nil? || (line = line.strip).empty?
+ break
+ else
+ message << line
+ end
+ end
+
+ if message.empty?
+ return nil
+ else
+ return parse_message(message)
+ end
+ end
+ def get_message(*args) self.class.get_message(*args); end
+ end
+
+ TcpSyncDemultiplexer = TcpSyncDemux
+ end
+end