Sha256: b7c041fdf073de1e6ebc221a3210c6d22a11e88f0f0e46e4df2a798ff6ef33e6
Contents?: true
Size: 1.04 KB
Versions: 2
Compression:
Stored size: 1.04 KB
Contents
require "thread"

module Sequel
  # A Thread subclass that runs queued jobs one at a time in the background.
  #
  # Jobs are callables (anything responding to +call+) added with #async,
  # #add or #<<. Exceptions raised by a job are collected in #errors
  # instead of terminating the worker. When a database object is given, the
  # whole job loop runs inside <tt>db.transaction</tt>, and
  # Sequel::Error::Rollback is raised at the end if any job failed.
  #
  # NOTE: inside this class a bare +raise+ resolves to Thread#raise (self is
  # a Thread), not Kernel#raise. That is harmless when self is the running
  # thread, but matters for methods invoked from other threads.
  class Worker < Thread
    # The Queue of jobs that have not been started yet.
    attr_reader :queue

    # Exceptions raised by jobs, in the order they were rescued.
    attr_reader :errors

    # Creates the worker and immediately starts its job loop.
    #
    # db:: optional object responding to +transaction+ (taking a block).
    #      When given, the job loop runs inside that transaction.
    def initialize(db = nil)
      @queue = Queue.new
      @errors = []
      @cur = nil # job currently being executed, if any
      @transaction = !db.nil?
      self.abort_on_exception = true
      if db
        super { db.transaction { work } }
      else
        super { work }
      end
    end

    # Body of the worker thread: runs jobs until Sequel::Error::WorkerStop
    # is raised in the thread (see #join).
    #
    # Raises Sequel::Error::Rollback on the way out when the worker is
    # transactional and at least one job failed.
    def work
      loop { next_job }
    rescue Sequel::Error::WorkerStop
      # Signals the worker thread to stop; nothing else to do.
    ensure
      raise Sequel::Error::Rollback if @transaction && !@errors.empty?
    end

    # Returns a truthy value while a job is executing or jobs are waiting
    # in the queue.
    def busy?
      @cur || !@queue.empty?
    end

    # Queues a job for execution and returns self, so calls can be chained.
    #
    # proc::  a callable to run; takes precedence over the block.
    # block:: used as the job when +proc+ is not given.
    #
    # Raises ArgumentError when neither is given. Previously +nil+ was
    # queued and only surfaced later as a NoMethodError in #errors.
    def async(proc = nil, &block)
      job = proc || block
      # Kernel.raise on purpose: a bare +raise+ here would be Thread#raise
      # and would hit the worker thread instead of the caller.
      Kernel.raise ArgumentError, "no job given (pass a callable or a block)" unless job
      @queue << job
      self
    end
    alias_method :add, :async
    alias_method :<<, :async

    # Waits until the worker is idle (polling #busy? every 0.1 seconds),
    # then raises Error::WorkerStop in the worker thread to end its loop
    # and joins the thread. Returns the result of Thread#join.
    #
    # NOTE(review): #busy? can briefly report idle between Queue#pop
    # returning and @cur being assigned in #next_job; a job picked up in
    # that window could be interrupted before it runs. Confirm whether
    # callers can hit this in practice.
    def join
      sleep 0.1 while busy?
      self.raise Error::WorkerStop
      super
    end

    private

    # Pops the next job (blocking while the queue is empty) and calls it.
    # Error::WorkerStop is re-raised so it reaches #work; any other
    # exception is appended to #errors.
    def next_job
      @cur = @queue.pop
      @cur.call
    rescue Error::WorkerStop => e
      raise e
    rescue Exception => e
      # Deliberate catch-all: a failing job is recorded, not fatal. This
      # also captures non-StandardError exceptions raised by a job.
      @errors << e
    ensure
      @cur = nil
    end
  end
end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
sequel_core-1.5.0 | lib/sequel_core/worker.rb |
sequel_core-1.5.1 | lib/sequel_core/worker.rb |