lib/pitchfork/worker.rb in pitchfork-0.5.0 vs lib/pitchfork/worker.rb in pitchfork-0.6.0
- old
+ new
@@ -1,7 +1,7 @@
# -*- encoding: binary -*-
-require "raindrops"
+require 'pitchfork/shared_memory'
module Pitchfork
# This class and its members can be considered a stable interface
# and will not change in a backwards-incompatible fashion between
# releases of pitchfork. Knowledge of this class is generally not
@@ -22,11 +22,12 @@
@mold = false
@to_io = @master = nil
@exiting = false
@requests_count = 0
if nr
- build_raindrops(nr)
+ @deadline_drop = SharedMemory.worker_deadline(nr)
+ self.deadline = 0
else
promoted!
end
end
@@ -45,11 +46,11 @@
def pending?
@master.nil?
end
def outdated?
- CURRENT_GENERATION_DROP[0] > @generation
+ SharedMemory.current_generation > @generation
end
def update(message)
message.class.members.each do |member|
send("#{member}=", message.public_send(member))
@@ -71,11 +72,11 @@
end
def finish_promotion(control_socket)
message = Message::MoldReady.new(@nr, @pid, generation)
control_socket.sendmsg(message)
- CURRENT_GENERATION_DROP[0] = @generation
+ SharedMemory.current_generation = @generation
end
def promote(generation)
send_message_nonblock(Message::PromoteWorker.new(generation))
end
@@ -90,12 +91,12 @@
end
def promoted!
@mold = true
@nr = nil
- @drop_offset = 0
- @deadline_drop = MOLD_DROP
+ @deadline_drop = SharedMemory.mold_deadline
+ self.deadline = 0
self
end
def mold?
@mold
@@ -171,24 +172,16 @@
self.deadline = Pitchfork.time_now(true) + timeout
end
# called in the worker process
def deadline=(value) # :nodoc:
- if mold?
- MOLD_DROP[0] = value
- else
- @deadline_drop[@drop_offset] = value
- end
+ @deadline_drop.value = value
end
# called in the master process
def deadline # :nodoc:
- if mold?
- MOLD_DROP[0]
- else
- @deadline_drop[@drop_offset]
- end
+ @deadline_drop.value
end
def reset
@requests_count = 0
end
@@ -197,10 +190,11 @@
@requests_count += by
end
# called in both the master (reaping worker) and worker (SIGQUIT handler)
def close # :nodoc:
+ self.deadline = 0
@master.close if @master
@to_io.close if @to_io
end
def create_socketpair!
@@ -223,37 +217,12 @@
case @master.sendmsg_nonblock(message, exception: false)
when :wait_writable
else
success = true
end
- rescue Errno::EPIPE, Errno::ECONNRESET, Errno::ECONNREFUSED
+ rescue Errno::EPIPE, Errno::ECONNRESET, Errno::ECONNREFUSED, Errno::ENOTCONN
# worker will be reaped soon
end
success
- end
-
- MOLD_DROP = Raindrops.new(1)
- CURRENT_GENERATION_DROP = Raindrops.new(1)
- PER_DROP = Raindrops::PAGE_SIZE / Raindrops::SIZE
- TICK_DROPS = []
-
- class << self
- # Since workers are created from another process, we have to
- # pre-allocate the drops so they are shared between everyone.
- #
- # However this doesn't account for TTIN signals that increase the
- # number of workers, but we should probably remove that feature too.
- def preallocate_drops(workers_count)
- 0.upto(workers_count / PER_DROP) do |i|
- TICK_DROPS[i] = Raindrops.new(PER_DROP)
- end
- end
- end
-
- def build_raindrops(drop_nr)
- drop_index = drop_nr / PER_DROP
- @drop_offset = drop_nr % PER_DROP
- @deadline_drop = TICK_DROPS[drop_index] ||= Raindrops.new(PER_DROP)
- @deadline_drop[@drop_offset] = 0
end
end
end