Sha256: 0da4e03fb791a6ae2322b41db54474f65cb4bcc2456a01057bb98300b7cb682e
Contents?: true
Size: 930 Bytes
Versions: 2
Compression:
Stored size: 930 Bytes
Contents
module Dynflow module Executors class Parallel < Abstract class Worker < Actor def initialize(pool, transaction_adapter, telemetry_options = {}) @pool = Type! pool, Concurrent::Actor::Reference @transaction_adapter = Type! transaction_adapter, TransactionAdapters::Abstract @telemetry_options = telemetry_options end def on_message(work_item) ok = false Executors.run_user_code do work_item.execute ok = true end rescue Errors::PersistenceError => e @pool.tell([:handle_persistence_error, reference, e, work_item]) ok = false ensure Dynflow::Telemetry.with_instance { |t| t.increment_counter(:dynflow_worker_events, 1, @telemetry_options) } @pool.tell([:worker_done, reference, work_item]) if ok end end end end end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
dynflow-1.1.5 | lib/dynflow/executors/parallel/worker.rb |
dynflow-1.1.4 | lib/dynflow/executors/parallel/worker.rb |