lib/zmachine/connection.rb in zmachine-0.2.1 vs lib/zmachine/connection.rb in zmachine-0.3.0
- old
+ new
@@ -1,71 +1,75 @@
java_import java.io.IOException
java_import java.nio.ByteBuffer
java_import java.nio.channels.SelectionKey
+require 'zmachine'
+
module ZMachine
class Connection
extend Forwardable
attr_accessor :channel
- attr_accessor :args
- attr_accessor :block
- def self.new(*args, &block)
+ def self.new(*args)
allocate.instance_eval do
- initialize(*args, &block)
- @args, @block = args, block
+ initialize(*args)
+ @args = args
post_init
self
end
end
# channel type dispatch
- def bind(address, port_or_type)
+ def bind(address, port_or_type, &block)
ZMachine.logger.debug("zmachine:connection:#{__method__}", connection: self) if ZMachine.debug
- if address =~ %r{\w+://}
- @channel = ZMQChannel.new(port_or_type)
- @channel.bind(address)
- else
- @channel = TCPChannel.new
- @channel.bind(address, port_or_type)
- end
+ klass = (address =~ %r{\w+://}) ? ZMQChannel : TCPChannel
+ @channel = klass.new
+ @channel.bind(address, port_or_type)
+ @block = block
self
end
- def connect(address, port_or_type)
+ def connect(address, port_or_type, &block)
ZMachine.logger.debug("zmachine:connection:#{__method__}", connection: self) if ZMachine.debug
- if address.nil? or address =~ %r{\w+://}
- @channel = ZMQChannel.new(port_or_type)
- @channel.connect(address) if address
- else
- @channel = TCPChannel.new
- @channel.connect(address, port_or_type)
- end
- if @connect_timeout
- @timer = ZMachine.add_timer(@connect_timeout) do
- ZMachine.reactor.close_connection(self)
- end
- end
+ klass = (address.nil? || address =~ %r{\w+://}) ? ZMQChannel : TCPChannel
+ @channel = klass.new
+ @channel.connect(address, port_or_type) if address
+ yield self if block_given?
+ renew_timer
self
end
+ # callbacks
+ def connection_accepted
+ end
+
+ def connection_completed
+ end
+
+ def post_init
+ end
+
+ def receive_data(data)
+ end
+
+ def unbind
+ end
+
+ # EventMachine Connection API
+
def_delegator :@channel, :bound?
def_delegator :@channel, :closed?
def_delegator :@channel, :connected?
def_delegator :@channel, :connection_pending?
- # EventMachine Connection API
-
- def _not_implemented
- raise RuntimeError.new("API call not implemented! #{caller[0]}")
- end
-
def close_connection(after_writing = false)
- @channel.close(after_writing)
+ @channel.close(after_writing) do
+ ZMachine.close_connection(self)
+ end
end
alias :close :close_connection
def close_connection_after_writing
@@ -82,164 +86,55 @@
@inactivity_timeout = value
end
alias :set_comm_inactivity_timeout :comm_inactivity_timeout=
- def connection_accepted(channel)
- end
-
- def connection_completed
- end
-
- def detach
- _not_implemented
- end
-
- def error?
- _not_implemented
- end
-
def get_idle_time
(System.nano_time - @last_activity) / 1_000_000
end
- def get_peer_cert
- _not_implemented
- end
-
def get_peername
if peer = @channel.peer
::Socket.pack_sockaddr_in(*peer)
end
end
- def get_pid
- _not_implemented
- end
-
- def get_proxied_bytes
- _not_implemented
- end
-
- def get_sock_opt(level, option)
- _not_implemented
- end
-
- def get_sockname
- _not_implemented
- end
-
- def get_status
- _not_implemented
- end
-
- def notify_readable=(mode)
- _not_implemented
- end
-
def notify_readable?
true
end
- def notify_writable=(mode)
- _not_implemented
- end
-
def notify_writable?
@channel.can_send?
end
- def pause
- _not_implemented
- end
-
- def paused?
- _not_implemented
- end
-
def pending_connect_timeout=(value)
@connect_timeout = value
end
alias :set_pending_connect_timeout :pending_connect_timeout=
- def post_init
- end
-
- def proxy_completed
- _not_implemented
- end
-
- def proxy_incoming_to(conn, bufsize = 0)
- _not_implemented
- end
-
- def proxy_target_unbound
- _not_implemented
- end
-
- def receive_data(data)
- end
-
def reconnect(server, port_or_type)
ZMachine.reconnect(server, port_or_type, self)
end
- def resume
- _not_implemented
- end
-
def send_data(data)
ZMachine.logger.debug("zmachine:connection:#{__method__}", connection: self) if ZMachine.debug
+ data = data.to_java_bytes if data.is_a?(String) # EM compat
@channel.send_data(data)
update_events
end
- def send_datagram(data, recipient_address, recipient_port)
- _not_implemented
- end
-
- def send_file_data(filename)
- _not_implemented
- end
-
- def set_sock_opt(level, optname, optval)
- _not_implemented
- end
-
- def ssl_handshake_completed
- _not_implemented
- end
-
- def ssl_verify_peer(cert)
- _not_implemented
- end
-
- def start_tls(args = {})
- _not_implemented
- end
-
- def stop_proxying
- _not_implemented
- end
-
- def stream_file_data(filename, args = {})
- _not_implemented
- end
-
- def unbind
- end
-
# triggers
def acceptable!
client = @channel.accept
- connection_accepted(client) if client.connected?
ZMachine.logger.debug("zmachine:connection:#{__method__}", connection: self, client: client) if ZMachine.debug
- self.class.new(*@args, &@block).tap do |instance|
- instance.channel = client
- end
+ connection = self.class.new(*@args)
+ connection.channel = client
+ @block.call(connection) if @block
+ connection.connection_accepted
+ connection
end
def connectable!
ZMachine.logger.debug("zmachine:connection:#{__method__}", connection: self) if ZMachine.debug
@channel.finish_connecting
@@ -271,12 +166,17 @@
def register(selector)
ZMachine.logger.debug("zmachine:connection:#{__method__}", connection: self, fd: @channel.selectable_fd) if ZMachine.debug
@channel_key ||= @channel.selectable_fd.register(selector, current_events, self)
end
+ def valid?
+ @channel_key &&
+ @channel_key.valid?
+ end
+
def update_events
- return unless @channel_key
+ return unless valid?
ZMachine.logger.debug("zmachine:connection:#{__method__}", connection: self) if ZMachine.debug
@channel_key.interest_ops(current_events)
end
def current_events
@@ -299,30 +199,35 @@
return events
end
def process_events
- return unless @channel_key
+ return unless valid?
ZMachine.logger.debug("zmachine:connection:#{__method__}", connection: self) if ZMachine.debug
if @channel_key.connectable?
connectable!
elsif @channel_key.acceptable?
acceptable!
else
writable! if @channel_key.writable?
readable! if @channel_key.readable?
end
+ rescue Java::JavaNioChannels::CancelledKeyException
+ # channel may have been closed by write handler. ignore exception and
+ # wait for cleanup
end
def mark_active!
@last_activity = System.nano_time
renew_timer if @inactivity_timeout
end
def renew_timer
@timer.cancel if @timer
- @timer = ZMachine.add_timer(@inactivity_timeout) do
- ZMachine.reactor.close_connection(self)
+ if connection_pending? && @connect_timeout
+ @timer = ZMachine.add_timer(@connect_timeout) { ZMachine.close_connection(self, Errno::ETIMEDOUT) }
+ elsif @inactivity_timeout
+ @timer = ZMachine.add_timer(@inactivity_timeout) { ZMachine.close_connection(self, Errno::ETIMEDOUT) }
end
end
end
end