lib/httpx/selector.rb in httpx-0.3.1 vs lib/httpx/selector.rb in httpx-0.4.0
- old
+ new
@@ -9,11 +9,11 @@
#
# I/O monitor
#
class Monitor
- attr_accessor :value, :interests, :readiness
+ attr_accessor :io, :interests, :readiness
def initialize(io, interests, reactor)
@io = io
@interests = interests
@reactor = reactor
@@ -29,10 +29,11 @@
end
# closes +@io+, deregisters from reactor (unless +deregister+ is false)
def close(deregister = true)
return if @closed
+
@closed = true
@reactor.deregister(@io) if deregister
end
def closed?
@@ -47,103 +48,95 @@
end
def initialize
@readers = {}
@writers = {}
- @lock = Mutex.new
@__r__, @__w__ = IO.pipe
@closed = false
end
# deregisters +io+ from selectables.
def deregister(io)
- @lock.synchronize do
- rmonitor = @readers.delete(io)
- wmonitor = @writers.delete(io)
- monitor = rmonitor || wmonitor
- monitor.close(false) if monitor
- end
+ rmonitor = @readers.delete(io)
+ wmonitor = @writers.delete(io)
+ monitor = rmonitor || wmonitor
+ monitor.close(false) if monitor
end
# register +io+ for +interests+ events.
def register(io, interests)
readable = READABLE.include?(interests)
writable = WRITABLE.include?(interests)
- @lock.synchronize do
- if readable
- monitor = @readers[io]
- if monitor
- monitor.interests = interests
- else
- monitor = Monitor.new(io, interests, self)
- end
- @readers[io] = monitor
- @writers.delete(io) unless writable
+ if readable
+ monitor = @readers[io]
+ if monitor
+ monitor.interests = interests
+ else
+ monitor = Monitor.new(io, interests, self)
end
- if writable
- monitor = @writers[io]
- if monitor
- monitor.interests = interests
- else
- # reuse object
- monitor = readable ? @readers[io] : Monitor.new(io, interests, self)
- end
- @writers[io] = monitor
- @readers.delete(io) unless readable
+ @readers[io] = monitor
+ @writers.delete(io) unless writable
+ end
+ if writable
+ monitor = @writers[io]
+ if monitor
+ monitor.interests = interests
+ else
+ # reuse object
+ monitor = readable ? @readers[io] : Monitor.new(io, interests, self)
end
- monitor
+ @writers[io] = monitor
+ @readers.delete(io) unless readable
end
+ monitor
end
# waits for read/write events for +interval+. Yields for monitors of
# selected IO objects.
#
def select(interval)
begin
- r = nil
- w = nil
- @lock.synchronize do
- r = @readers.keys
- w = @writers.keys
- end
+ r = @readers.keys
+ w = @writers.keys
r.unshift(@__r__)
readers, writers = IO.select(r, w, nil, interval)
raise HTTPX::TimeoutError.new(interval, "timed out while waiting on select") if readers.nil? && writers.nil?
rescue IOError, SystemCallError
- @lock.synchronize do
- @readers.reject! { |io, _| io.closed? }
- @writers.reject! { |io, _| io.closed? }
- end
+ @readers.reject! { |io, _| io.closed? }
+ @writers.reject! { |io, _| io.closed? }
retry
end
readers.each do |io|
if io == @__r__
# clean up wakeups
@__r__.read(@__r__.stat.size)
else
monitor = io.closed? ? @readers.delete(io) : @readers[io]
next unless monitor
+
monitor.readiness = writers.delete(io) ? :rw : :r
yield monitor
end
end if readers
writers.each do |io|
monitor = io.closed? ? @writers.delete(io) : @writers[io]
next unless monitor
+
# don't double run this, the last iteration might have run this task already
monitor.readiness = :w
yield monitor
end if writers
end
# Closes the selector.
#
def close
return if @closed
+
@__r__.close
@__w__.close
rescue IOError
ensure
@closed = true