Sha256: f732d897d5cfc524675c5092c868aa219f0bf853930186a6181b87939a14a149

Contents?: true

Size: 1.32 KB

Versions: 1

Compression:

Stored size: 1.32 KB

Contents

# frozen_string_literal: true

require_relative 'event'

module CycloneLariat
  class EventsRepo
    attr_reader :dataset

    def initialize(dataset)
      @dataset = dataset
    end

    def create(event)
      dataset.insert(
        uuid: event.uuid,
        type: event.type,
        publisher: event.publisher,
        data: JSON.generate(event.data),
        error_message: event.error&.message,
        error_details: JSON.generate(event.error&.details),
        version: event.version,
        sent_at: event.sent_at
      )
    end

    def exists?(uuid:)
      dataset.where(uuid: uuid).limit(1).any?
    end

    def processed!(uuid:)
      !dataset.where(uuid: uuid).update(processed_at: Sequel.function(:NOW)).zero?
    end

    def find(uuid:)
      raw = dataset.where(uuid: uuid).first
      raw[:data]          = JSON.parse(raw[:data], symbolize_names: true)
      raw[:error_details] = JSON.parse(raw[:error_details], symbolize_names: true) if raw[:error_details]
      Event.wrap raw
    end

    def each_unprocessed
      dataset.where(processed_at: nil).each do |raw|
        raw[:data]          = JSON.parse(raw[:data], symbolize_names: true)
        raw[:error_details] = JSON.parse(raw[:error_details], symbolize_names: true) if raw[:error_details]
        event = Event.wrap(raw)
        yield(event)
      end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
cyclone_lariat-0.2.1 lib/cyclone_lariat/events_repo.rb