lib/httpx/selector.rb in httpx-0.3.1 vs lib/httpx/selector.rb in httpx-0.4.0

- old
+ new

@@ -9,11 +9,11 @@ # # I/O monitor # class Monitor - attr_accessor :value, :interests, :readiness + attr_accessor :io, :interests, :readiness def initialize(io, interests, reactor) @io = io @interests = interests @reactor = reactor @@ -29,10 +29,11 @@ end # closes +@io+, deregisters from reactor (unless +deregister+ is false) def close(deregister = true) return if @closed + @closed = true @reactor.deregister(@io) if deregister end def closed? @@ -47,103 +48,95 @@ end def initialize @readers = {} @writers = {} - @lock = Mutex.new @__r__, @__w__ = IO.pipe @closed = false end # deregisters +io+ from selectables. def deregister(io) - @lock.synchronize do - rmonitor = @readers.delete(io) - wmonitor = @writers.delete(io) - monitor = rmonitor || wmonitor - monitor.close(false) if monitor - end + rmonitor = @readers.delete(io) + wmonitor = @writers.delete(io) + monitor = rmonitor || wmonitor + monitor.close(false) if monitor end # register +io+ for +interests+ events. def register(io, interests) readable = READABLE.include?(interests) writable = WRITABLE.include?(interests) - @lock.synchronize do - if readable - monitor = @readers[io] - if monitor - monitor.interests = interests - else - monitor = Monitor.new(io, interests, self) - end - @readers[io] = monitor - @writers.delete(io) unless writable + if readable + monitor = @readers[io] + if monitor + monitor.interests = interests + else + monitor = Monitor.new(io, interests, self) end - if writable - monitor = @writers[io] - if monitor - monitor.interests = interests - else - # reuse object - monitor = readable ? @readers[io] : Monitor.new(io, interests, self) - end - @writers[io] = monitor - @readers.delete(io) unless readable + @readers[io] = monitor + @writers.delete(io) unless writable + end + if writable + monitor = @writers[io] + if monitor + monitor.interests = interests + else + # reuse object + monitor = readable ? @readers[io] : Monitor.new(io, interests, self) end - monitor + @writers[io] = monitor + @readers.delete(io) unless readable end + monitor end # waits for read/write events for +interval+. Yields for monitors of # selected IO objects. # def select(interval) begin - r = nil - w = nil - @lock.synchronize do - r = @readers.keys - w = @writers.keys - end + r = @readers.keys + w = @writers.keys r.unshift(@__r__) readers, writers = IO.select(r, w, nil, interval) raise HTTPX::TimeoutError.new(interval, "timed out while waiting on select") if readers.nil? && writers.nil? rescue IOError, SystemCallError - @lock.synchronize do - @readers.reject! { |io, _| io.closed? } - @writers.reject! { |io, _| io.closed? } - end + @readers.reject! { |io, _| io.closed? } + @writers.reject! { |io, _| io.closed? } retry end readers.each do |io| if io == @__r__ # clean up wakeups @__r__.read(@__r__.stat.size) else monitor = io.closed? ? @readers.delete(io) : @readers[io] next unless monitor + monitor.readiness = writers.delete(io) ? :rw : :r yield monitor end end if readers writers.each do |io| monitor = io.closed? ? @writers.delete(io) : @writers[io] next unless monitor + # don't double run this, the last iteration might have run this task already monitor.readiness = :w yield monitor end if writers end # Closes the selector. # def close return if @closed + @__r__.close @__w__.close rescue IOError ensure @closed = true