Sha256: e99731412d4fd7ebc1025fad757507280215e06cc2e060d2fcf321f23451cf29

Contents?: true

Size: 1.53 KB

Versions: 1

Compression:

Stored size: 1.53 KB

Contents

# frozen_string_literal: true

require 'legion/transport'
require 'legion/transport/messages/task_update'
require 'legion/transport/messages/task_log'

module Legion
  module Extensions
    module Helpers
      # Mixin with helpers for recording task state: publishing task log
      # entries and status updates through Legion::Transport messages, and
      # inserting task rows through Legion::Data::Model::Task.
      #
      # The including object must respond to +log+ (used by #task_update when
      # publishing fails).
      module Task
        # Builds a TaskLog message and publishes it.
        #
        # @param task_id task the log entry belongs to
        # @param runner_class defaults to +to_s+ of the object this module is
        #   mixed into
        # @param function function the entry relates to
        # @param payload every remaining keyword argument; sent as the
        #   message's +entry+ hash
        # @return whatever TaskLog#publish returns
        def generate_task_log(task_id:, runner_class: to_s, function:, **payload)
          Legion::Transport::Messages::TaskLog.new(
            task_id:      task_id,
            runner_class: runner_class,
            function:     function,
            entry:        payload
          ).publish
        end

        # Publishes a TaskUpdate message carrying a task's new status.
        #
        # @param task_id task to update; the call is a no-op when nil
        # @param status new status; the call is a no-op when nil
        # @param opts optional fields; only +:results+, +:payload+ and
        #   +:function_args+ are forwarded, all other keys are ignored
        # @return nil when task_id or status is nil, otherwise whatever
        #   TaskUpdate#publish returns
        # @raise [StandardError] any error raised while building or publishing
        #   the message is logged at fatal level and then re-raised
        def task_update(task_id, status, **opts)
          return if task_id.nil? || status.nil?

          update_hash = { task_id: task_id, status: status }
          # Forward an optional field whenever the caller supplied the key
          # (key? rather than truthiness, so explicit nil/false values survive).
          %i[results payload function_args].each do |column|
            update_hash[column] = opts[column] if opts.key?(column)
          end

          # Splat so the fields arrive as keywords, the same way TaskLog is
          # built above. A bare Hash positional does not bind to a keyword-only
          # initializer on Ruby 3+, while a splat still reaches an initializer
          # that takes a single options hash.
          Legion::Transport::Messages::TaskUpdate.new(**update_hash).publish
        rescue StandardError => e
          log.fatal e.message
          log.fatal e.backtrace
          raise
        end

        # Inserts a task row and returns a summary of what was stored.
        #
        # @param function_id stored in the row's +function_id+ column
        # @param status initial status, 'task.queued' by default
        # @param opts optional values: +:payload+ and +:args+ are serialized
        #   with Legion::JSON.dump (+:args+ is stored as +function_args+);
        #   +:master_id+, +:parent_id+, +:relationship_id+ and +:task_id+ are
        #   copied as given; all other keys are ignored
        # @return [Hash] +success: true+, +task_id:+ set to the return value of
        #   Legion::Data::Model::Task.insert (presumably the new row's id), and
        #   every inserted column
        def generate_task_id(function_id:, status: 'task.queued', **opts)
          insert = { status: status, function_id: function_id }
          insert[:payload] = Legion::JSON.dump(opts[:payload]) if opts.key?(:payload)
          insert[:function_args] = Legion::JSON.dump(opts[:args]) if opts.key?(:args)
          %i[master_id parent_id relationship_id task_id].each do |column|
            insert[column] = opts[column] if opts.key?(column)
          end

          new_id = Legion::Data::Model::Task.insert(insert)
          # NOTE(review): because **insert is merged last, a caller-supplied
          # :task_id replaces the freshly generated one in the returned hash.
          # Confirm whether that is intended before relying on :task_id here.
          { success: true, task_id: new_id, **insert }
        end
      end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
legionio-0.2.0 lib/legion/extensions/helpers/task.rb