Sha256: 2042d28e1f45952a79f5ec2ac846609889e31ba10ede9a69fd6a33adb303a050

Contents?: true

Size: 1.7 KB

Versions: 21

Compression:

Stored size: 1.7 KB

Contents

require 'concurrent/executor/executor_service'

module Concurrent

  # @!macro single_thread_executor
  # @!macro thread_pool_options
  # @!macro abstract_executor_service_public_api
  # @!visibility private
  class RubySingleThreadExecutor < RubyExecutorService
    include SerialExecutorService

    # @!macro single_thread_executor_method_initialize
    def initialize(opts = {})
      super
    end

    protected

    def ns_initialize(opts)
      @queue = Queue.new
      @thread = nil
      @fallback_policy = opts.fetch(:fallback_policy, :discard)
      raise ArgumentError.new("#{@fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.include?(@fallback_policy)
      self.auto_terminate = opts.fetch(:auto_terminate, true)
    end

    # @!visibility private
    def execute(*args, &task)
      supervise
      @queue << [args, task]
    end

    # @!visibility private
    def shutdown_execution
      @queue << :stop
      stopped_event.set unless alive?
    end

    # @!visibility private
    def kill_execution
      @queue.clear
      @thread.kill if alive?
    end

    # @!visibility private
    def alive?
      @thread && @thread.alive?
    end

    # @!visibility private
    def supervise
      @thread = new_worker_thread unless alive?
    end

    # @!visibility private
    def new_worker_thread
      Thread.new do
        Thread.current.abort_on_exception = false
        work
      end
    end

    # @!visibility private
    def work
      loop do
        task = @queue.pop
        break if task == :stop
        begin
          task.last.call(*task.first)
        rescue => ex
          # let it fail
          log DEBUG, ex
        end
      end
      stopped_event.set
    end
  end
end

Version data entries

21 entries across 19 versions & 5 rubygems

Version Path
logstash-filter-zabbix-0.1.2 vendor/bundle/jruby/1.9/gems/concurrent-ruby-0.9.2-java/lib/concurrent/executor/ruby_single_thread_executor.rb
logstash-filter-zabbix-0.1.1 vendor/bundle/jruby/1.9/gems/concurrent-ruby-0.9.2-java/lib/concurrent/executor/ruby_single_thread_executor.rb
ivanvc-logstash-input-s3-3.1.1.4 vendor/local/gems/concurrent-ruby-0.9.2-java/lib/concurrent/executor/ruby_single_thread_executor.rb
ivanvc-logstash-input-s3-3.1.1.3 vendor/local/gems/concurrent-ruby-0.9.2-java/lib/concurrent/executor/ruby_single_thread_executor.rb
ivanvc-logstash-input-s3-3.1.1.2 vendor/local/gems/concurrent-ruby-0.9.2-java/lib/concurrent/executor/ruby_single_thread_executor.rb
logstash-input-beats-2.0.2 vendor/jruby/1.9/gems/concurrent-ruby-0.9.2-java/lib/concurrent/executor/ruby_single_thread_executor.rb
logstash-input-beats-2.0.2 vendor/jruby/1.9/gems/logstash-codec-json-2.0.3/vendor/gems/concurrent-ruby-0.9.1-java/lib/concurrent/executor/ruby_single_thread_executor.rb
logstash-input-beats-2.0.2 vendor/jruby/1.9/gems/concurrent-ruby-0.9.1-java/lib/concurrent/executor/ruby_single_thread_executor.rb
logstash-codec-json-2.0.3 vendor/gems/concurrent-ruby-0.9.1-java/lib/concurrent/executor/ruby_single_thread_executor.rb
concurrent-ruby-0.9.2-java lib/concurrent/executor/ruby_single_thread_executor.rb
concurrent-ruby-0.9.2 lib/concurrent/executor/ruby_single_thread_executor.rb
logstash-input-beats-0.9.2 vendor/jruby/1.9/gems/concurrent-ruby-0.9.1-java/lib/concurrent/executor/ruby_single_thread_executor.rb
logstash-input-beats-0.9.1 vendor/jruby/1.9/gems/concurrent-ruby-0.9.1-java/lib/concurrent/executor/ruby_single_thread_executor.rb
concurrent-ruby-0.9.1 lib/concurrent/executor/ruby_single_thread_executor.rb
concurrent-ruby-0.9.1-java lib/concurrent/executor/ruby_single_thread_executor.rb
concurrent-ruby-0.9.0 lib/concurrent/executor/ruby_single_thread_executor.rb
concurrent-ruby-0.9.0-java lib/concurrent/executor/ruby_single_thread_executor.rb
concurrent-ruby-0.9.0.pre3-java lib/concurrent/executor/ruby_single_thread_executor.rb
concurrent-ruby-0.9.0.pre3 lib/concurrent/executor/ruby_single_thread_executor.rb
concurrent-ruby-0.9.0.pre2 lib/concurrent/executor/ruby_single_thread_executor.rb