Sha256: a46d6ea1cf5980040a5fbb77b26b0de622329288a63984803c8e966960dee156
Contents?: true
Size: 1.05 KB
Versions: 11
Compression:
Stored size: 1.05 KB
Contents
require 'thread'

module Sequel
  # A Thread subclass that runs queued callables one at a time, in the order
  # they were queued. Exceptions raised by jobs are collected in #errors
  # instead of terminating the thread.
  #
  # When constructed with a +db+, the whole job loop runs inside a single
  # <tt>db.transaction</tt> block, and +rollback!+ is invoked on shutdown if
  # any job failed.
  class Worker < Thread
    # Raised inside the worker thread to make it leave its job loop. A job may
    # raise it itself to stop the worker early; #join raises it into the thread
    # once the worker is idle.
    class WorkerStopError < RuntimeError; end

    # Seconds to sleep between idle checks while #join waits for the queue to
    # drain.
    JOIN_POLL_INTERVAL = 0.1

    # The Queue holding jobs that have not been started yet.
    attr_reader :queue
    # Exceptions raised by jobs, in the order they occurred.
    attr_reader :errors

    # Creates the worker and starts its thread immediately.
    #
    # db:: optional object responding to +transaction+ with a block. When
    #      given, the job loop is wrapped in that transaction.
    def initialize(db = nil)
      @queue = Queue.new
      @errors = []
      # The thread block cannot rely on +self+ being the worker in every
      # context (db.transaction may change it), so capture it in a local.
      t = self
      t.abort_on_exception = true
      @transaction = !db.nil?
      db ? super {db.transaction {t.work}} : super {t.work}
    end

    # Thread body: runs jobs until a WorkerStopError reaches the loop.
    # On the way out, calls +rollback!+ if the worker is transactional and at
    # least one job failed.
    def work
      loop {next_job}
    rescue WorkerStopError # signals the worker thread to stop
    ensure
      # NOTE(review): rollback! is not defined in this class; presumably it is
      # provided elsewhere by Sequel -- confirm.
      rollback! if @transaction && !@errors.empty?
    end

    # Truthy while a job is running or jobs are still waiting in the queue.
    def busy?
      @cur || !@queue.empty?
    end

    # Queues a job. Accepts either a callable as the first argument or a
    # block; the explicit callable wins when both are given. Returns self so
    # calls can be chained.
    def async(proc = nil, &block)
      @queue << (proc || block)
      self
    end
    alias_method :add, :async
    alias_method :<<, :async

    # Waits until the worker is idle, tells it to stop, then joins the thread.
    #
    # limit:: optional timeout in seconds, with the same meaning as in
    #         Thread#join. When omitted, waits indefinitely.
    #
    # Returns self once the thread has finished, or nil if +limit+ elapsed
    # first. If the limit elapses while jobs are still pending, the worker is
    # left running and no stop signal is sent.
    def join(limit = nil)
      deadline = limit && (Time.now + limit)

      # Stop polling if the thread is already dead (for example because a job
      # raised WorkerStopError): leftover queued jobs will never be run, so
      # waiting for the queue to empty would never end.
      while busy? && alive?
        pause = JOIN_POLL_INTERVAL
        if deadline
          remaining = deadline - Time.now
          return nil if remaining <= 0
          pause = remaining if remaining < pause
        end
        sleep pause
      end

      # Nothing to signal when the thread has already finished.
      self.raise WorkerStopError if alive?

      if deadline
        remaining = deadline - Time.now
        super(remaining > 0 ? remaining : 0)
      else
        super()
      end
    end

    private

    # Blocks until a job is available, then runs it. WorkerStopError is
    # propagated so the loop in #work ends; every other exception is recorded
    # in @errors so that one failing job does not stop the worker.
    def next_job
      @cur = @queue.pop
      @cur.call
    rescue WorkerStopError => e
      raise e
    rescue Exception => e
      @errors << e
    ensure
      @cur = nil
    end
  end
end
Version data entries
11 entries across 11 versions & 1 rubygems