Sha256: d2b1a72952c63267a30e2407dc63680b40591ecaaab449833fdfede968e37531

Contents?: true

Size: 973 Bytes

Versions: 14

Compression:

Stored size: 973 Bytes

Contents

module Dynflow
  module Executors
    class Parallel < Abstract
      class Worker < Actor

        def initialize(pool, transaction_adapter)
          @pool                = Type! pool, Concurrent::Actor::Reference
          @transaction_adapter = Type! transaction_adapter, TransactionAdapters::Abstract
        end

        def on_message(message)
          match message,
                (on Work::Step.(step: ~any) |
                        Work::Event.(step: ~any, event: Parallel::Event.(event: ~any)) do |step, event|
                  step.execute event
                end),
                (on Work::Finalize.(~any, any) do |sequential_manager|
                  sequential_manager.finalize
                 end)
        rescue Errors::PersistenceError => e
          @pool.tell([:handle_persistence_error, e])
        ensure
          @pool.tell([:worker_done, reference, message])
          @transaction_adapter.cleanup
        end
      end
    end
  end
end

Version data entries

14 entries across 14 versions & 1 rubygems

Version Path
dynflow-0.8.13 lib/dynflow/executors/parallel/worker.rb
dynflow-0.8.12 lib/dynflow/executors/parallel/worker.rb
dynflow-0.8.11 lib/dynflow/executors/parallel/worker.rb
dynflow-0.8.10 lib/dynflow/executors/parallel/worker.rb
dynflow-0.8.9 lib/dynflow/executors/parallel/worker.rb
dynflow-0.8.8 lib/dynflow/executors/parallel/worker.rb
dynflow-0.8.7 lib/dynflow/executors/parallel/worker.rb
dynflow-0.8.6 lib/dynflow/executors/parallel/worker.rb
dynflow-0.8.5 lib/dynflow/executors/parallel/worker.rb
dynflow-0.8.4 lib/dynflow/executors/parallel/worker.rb
dynflow-0.8.3 lib/dynflow/executors/parallel/worker.rb
dynflow-0.8.2 lib/dynflow/executors/parallel/worker.rb
dynflow-0.8.1 lib/dynflow/executors/parallel/worker.rb
dynflow-0.8.0 lib/dynflow/executors/parallel/worker.rb