Sha256: 9d1a36b5663f365e173b45992cfcb4de07699a46116a877bc7bcfa79bfcf05a6

Contents?: true

Size: 1.67 KB

Versions: 3

Compression:

Stored size: 1.67 KB

Contents

# frozen_string_literal: true

require 'cyclone_lariat/messages/v1/event'
require 'cyclone_lariat/messages/v1/command'
require 'cyclone_lariat/repo/messages_mapper'
require 'cyclone_lariat/messages/builder'

module CycloneLariat
  module Repo
    module Sequel
      # Repository that stores and reads messages through a dataset object.
      #
      # Rows are converted with MessagesMapper (+to_row+ / +from_row+) and read
      # rows are turned into message objects by CycloneLariat::Messages::Builder.
      # The dataset is presumably a Sequel::Dataset (it must respond to
      # +insert+, +where+, +update+, +first+ and +each+) - confirm with callers.
      class Messages
        attr_reader :dataset

        # @param dataset [Object, nil] dataset used by every query; +nil+ makes
        #   the repository report itself as disabled
        def initialize(dataset)
          @dataset = dataset
        end

        # @return [Boolean] true when a dataset was supplied
        def enabled?
          !disabled?
        end

        # @return [Boolean] true when no dataset was supplied
        def disabled?
          dataset.nil?
        end

        # Inserts the row produced by +MessagesMapper.to_row(msg)+.
        #
        # @param msg [Object] message accepted by +MessagesMapper.to_row+
        # @return [Object] whatever +dataset.insert+ returns
        def create(msg)
          dataset.insert MessagesMapper.to_row(msg)
        end

        # @param uuid [String] message identifier
        # @return [Boolean] true when at least one row with this uuid exists
        def exists?(uuid:)
          dataset.where(uuid: uuid).limit(1).any?
        end

        # Stamps the message as processed (+processed_at+ is set to the SQL
        # +NOW+ function) and, when +error+ is given, records its message and
        # its JSON-encoded details.
        #
        # @param uuid [String] message identifier
        # @param error [#message, #details, nil] optional client error to store
        # @return [Boolean] false when the update reported zero affected rows
        def processed!(uuid:, error: nil)
          data = { processed_at: ::Sequel.function(:NOW) }
          if error
            data[:client_error_message] = error.message
            data[:client_error_details] = JSON.generate(error.details)
          end

          !dataset.where(uuid: uuid).update(data).zero?
        end

        # @param uuid [String] message identifier
        # @return [Object, nil] the built message, or nil when no row matches
        def find(uuid:)
          row = dataset.where(uuid: uuid).first
          return if row.nil?

          build MessagesMapper.from_row(row)
        end

        # Iterates over messages whose +processed_at+ is NULL.
        #
        # @yieldparam msg [Object] message built from each matching row
        # @return [Enumerator] when called without a block
        # @return [Object] otherwise, the result of the dataset's +each+
        def each_unprocessed(&block)
          # Without a block a bare +yield+ would raise LocalJumpError.
          return to_enum(__method__) unless block

          each_message(dataset.where(processed_at: nil), &block)
        end

        # Iterates over messages that have both +processed_at+ and
        # +client_error_message+ set (non-NULL).
        #
        # @yieldparam msg [Object] message built from each matching row
        # @return [Enumerator] when called without a block
        # @return [Object] otherwise, the result of the dataset's +each+
        def each_with_client_errors(&block)
          # Without a block a bare +yield+ would raise LocalJumpError.
          return to_enum(__method__) unless block

          failed = dataset.where { (processed_at !~ nil) & (client_error_message !~ nil) }
          each_message(failed, &block)
        end

        private

        # Maps every row of +rows+ to a built message and yields it.
        def each_message(rows)
          rows.each { |row| yield build(MessagesMapper.from_row(row)) }
        end

        # Builds a message object from the raw attributes of a mapped row.
        def build(raw)
          CycloneLariat::Messages::Builder.new(raw_message: raw).call
        end
      end
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
cyclone_lariat-1.0.0.rc5 lib/cyclone_lariat/repo/sequel/messages.rb
cyclone_lariat-1.0.0.rc4 lib/cyclone_lariat/repo/sequel/messages.rb
cyclone_lariat-1.0.0.rc3 lib/cyclone_lariat/repo/sequel/messages.rb