Sha256: 7b2c317333f0895684956a155b3e10e5f4798b1e00b1b1bd60bf54df876d75e0

Contents?: true

Size: 1.72 KB

Versions: 1

Compression:

Stored size: 1.72 KB

Contents

# frozen_string_literal: true

require 'securerandom'

module ActiveConcurrency
  module Base
    # Queue-backed worker. Callers enqueue blocks with #schedule; the private
    # #perform loop pops them one at a time and runs them, optionally while
    # holding the shared #mutex.
    #
    # This class does not define +join+, which #shutdown calls.
    # NOTE(review): presumably the concrete worker subclasses provide it and
    # decide where #perform runs — confirm against those classes.
    class Worker

      # Prefix used when the class name has no second namespace segment
      # (a top-level subclass, or an anonymous class whose name is nil).
      DEFAULT_PREFIX = 'base'

      # Failures raised by a job that the run loop reports and then survives.
      # SystemExit, SignalException (including Interrupt) and NoMemoryError
      # are deliberately not listed so that they propagate to the caller.
      JOB_ERRORS = [StandardError, ScriptError, SystemStackError].freeze

      private_constant :DEFAULT_PREFIX, :JOB_ERRORS

      # @return [String] "<prefix>_worker_<name or generated UUID>"
      attr_reader :name

      # @return [Mutex, nil] lock held while a job runs; nil disables locking.
      #   Not assigned by #initialize, so it is nil until a caller sets it.
      attr_accessor :mutex

      # @param name [#to_s, nil] suffix for the worker name; a random UUID is
      #   generated when nil (or false).
      def initialize(name: nil)
        @name = "#{prefix}_worker_#{name || SecureRandom.uuid}"
        @queue = Queue.new
      end

      # Removes every pending job from the queue.
      def clear
        @queue.clear
      end

      # Closes the queue; later #schedule calls raise ClosedQueueError.
      def close
        @queue.close
      end

      # @return [Boolean] whether the queue has been closed
      def closed?
        @queue.closed?
      end

      # @return [Boolean] whether no jobs are pending
      def empty?
        @queue.empty?
      end

      # Enqueues a job that throws :exit, which #perform catches to stop its
      # loop. Note that this shadows Kernel#exit for calls made on the worker.
      def exit
        schedule { throw :exit }
      end

      # Acquires #mutex unless there is nothing to acquire.
      #
      # @return [true, Mutex] true when this is a process worker, no mutex is
      #   assigned, or the mutex is already locked (by any thread); otherwise
      #   the mutex itself, after locking it.
      def lock
        return true if process? || mutex.nil? || mutex.locked?

        mutex.lock
      end

      # Enqueues a job.
      #
      # @param args [Array] arguments passed to the block when it runs
      # @yield the work to perform; called as block.call(*args)
      # @return [Queue] the underlying queue
      def schedule(*args, &block)
        @queue << [block, args]
      end

      # Stops the worker: enqueue the exit job, take the lock, join, close the
      # queue, then call +exit!+.
      #
      # NOTE(review): +join+ is not defined in this class, and +exit!+ resolves
      # to Kernel#exit! (immediate process termination) unless a subclass
      # overrides it — confirm against the concrete workers.
      #
      # The cop is disabled presumably because RuboCop reads the bare +exit+
      # below as Kernel#exit and treats the following lines as unreachable.
      # rubocop:disable Lint/UnreachableCode
      def shutdown
        exit
        lock
        join
        close
        exit!
      end
      # rubocop:enable Lint/UnreachableCode

      # @return [Integer] number of pending jobs
      def size
        @queue.size
      end

      private

      # Pops the next job (blocking while the queue is open and empty) and
      # runs it, holding #mutex for the duration unless this is a process
      # worker or no mutex is assigned.
      def execute
        job, args = @queue.pop

        if mutex.nil? || process?
          job.call(*args)
        else
          mutex.synchronize { job.call(*args) }
        end
      end

      # Lower-cased second segment of the class name, e.g. "threads" for
      # ActiveConcurrency::Threads::Worker. Falls back to DEFAULT_PREFIX when
      # that segment does not exist instead of raising NoMethodError on nil.
      def prefix
        @prefix ||= begin
          segment = self.class.name.to_s.split('::')[1]
          (segment || DEFAULT_PREFIX).downcase
        end
      end

      # Runs queued jobs until the queue is closed or empty, or until a job
      # throws :exit. A job failure listed in JOB_ERRORS is printed and the
      # loop moves on to the next job; anything else propagates.
      def perform
        catch(:exit) do
          loop do
            break if closed? || empty?

            begin
              execute
            rescue *JOB_ERRORS => e
              # Array() guards against exceptions whose backtrace is nil.
              puts "#{e.class.name}: #{e.message}\n#{Array(e.backtrace).join("\n")}"
            end
          end
        end
      end

      # @return [Boolean] whether the class-name prefix is "processes"
      def process?
        prefix == 'processes'
      end

      # @return [Boolean] whether the class-name prefix is "threads"
      def thread?
        prefix == 'threads'
      end

    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
active_concurrency-0.1.0 lib/active_concurrency/base/worker.rb