lib/pitchfork/worker.rb in pitchfork-0.5.0 vs lib/pitchfork/worker.rb in pitchfork-0.6.0

- old
+ new

@@ -1,7 +1,7 @@ # -*- encoding: binary -*- -require "raindrops" +require 'pitchfork/shared_memory' module Pitchfork # This class and its members can be considered a stable interface # and will not change in a backwards-incompatible fashion between # releases of pitchfork. Knowledge of this class is generally not @@ -22,11 +22,12 @@ @mold = false @to_io = @master = nil @exiting = false @requests_count = 0 if nr - build_raindrops(nr) + @deadline_drop = SharedMemory.worker_deadline(nr) + self.deadline = 0 else promoted! end end @@ -45,11 +46,11 @@ def pending? @master.nil? end def outdated? - CURRENT_GENERATION_DROP[0] > @generation + SharedMemory.current_generation > @generation end def update(message) message.class.members.each do |member| send("#{member}=", message.public_send(member)) @@ -71,11 +72,11 @@ end def finish_promotion(control_socket) message = Message::MoldReady.new(@nr, @pid, generation) control_socket.sendmsg(message) - CURRENT_GENERATION_DROP[0] = @generation + SharedMemory.current_generation = @generation end def promote(generation) send_message_nonblock(Message::PromoteWorker.new(generation)) end @@ -90,12 +91,12 @@ end def promoted! @mold = true @nr = nil - @drop_offset = 0 - @deadline_drop = MOLD_DROP + @deadline_drop = SharedMemory.mold_deadline + self.deadline = 0 self end def mold? @mold @@ -171,24 +172,16 @@ self.deadline = Pitchfork.time_now(true) + timeout end # called in the worker process def deadline=(value) # :nodoc: - if mold? - MOLD_DROP[0] = value - else - @deadline_drop[@drop_offset] = value - end + @deadline_drop.value = value end # called in the master process def deadline # :nodoc: - if mold? - MOLD_DROP[0] - else - @deadline_drop[@drop_offset] - end + @deadline_drop.value end def reset @requests_count = 0 end @@ -197,10 +190,11 @@ @requests_count += by end # called in both the master (reaping worker) and worker (SIGQUIT handler) def close # :nodoc: + self.deadline = 0 @master.close if @master @to_io.close if @to_io end def create_socketpair! @@ -223,37 +217,12 @@ case @master.sendmsg_nonblock(message, exception: false) when :wait_writable else success = true end - rescue Errno::EPIPE, Errno::ECONNRESET, Errno::ECONNREFUSED + rescue Errno::EPIPE, Errno::ECONNRESET, Errno::ECONNREFUSED, Errno::ENOTCONN # worker will be reaped soon end success - end - - MOLD_DROP = Raindrops.new(1) - CURRENT_GENERATION_DROP = Raindrops.new(1) - PER_DROP = Raindrops::PAGE_SIZE / Raindrops::SIZE - TICK_DROPS = [] - - class << self - # Since workers are created from another process, we have to - # pre-allocate the drops so they are shared between everyone. - # - # However this doesn't account for TTIN signals that increase the - # number of workers, but we should probably remove that feature too. - def preallocate_drops(workers_count) - 0.upto(workers_count / PER_DROP) do |i| - TICK_DROPS[i] = Raindrops.new(PER_DROP) - end - end - end - - def build_raindrops(drop_nr) - drop_index = drop_nr / PER_DROP - @drop_offset = drop_nr % PER_DROP - @deadline_drop = TICK_DROPS[drop_index] ||= Raindrops.new(PER_DROP) - @deadline_drop[@drop_offset] = 0 end end end