Sha256: f732d897d5cfc524675c5092c868aa219f0bf853930186a6181b87939a14a149
Contents?: true
Size: 1.32 KB
Versions: 1
Compression:
Stored size: 1.32 KB
Contents
# frozen_string_literal: true require_relative 'event' module CycloneLariat class EventsRepo attr_reader :dataset def initialize(dataset) @dataset = dataset end def create(event) dataset.insert( uuid: event.uuid, type: event.type, publisher: event.publisher, data: JSON.generate(event.data), error_message: event.error&.message, error_details: JSON.generate(event.error&.details), version: event.version, sent_at: event.sent_at ) end def exists?(uuid:) dataset.where(uuid: uuid).limit(1).any? end def processed!(uuid:) !dataset.where(uuid: uuid).update(processed_at: Sequel.function(:NOW)).zero? end def find(uuid:) raw = dataset.where(uuid: uuid).first raw[:data] = JSON.parse(raw[:data], symbolize_names: true) raw[:error_details] = JSON.parse(raw[:error_details], symbolize_names: true) if raw[:error_details] Event.wrap raw end def each_unprocessed dataset.where(processed_at: nil).each do |raw| raw[:data] = JSON.parse(raw[:data], symbolize_names: true) raw[:error_details] = JSON.parse(raw[:error_details], symbolize_names: true) if raw[:error_details] event = Event.wrap(raw) yield(event) end end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
cyclone_lariat-0.2.1 | lib/cyclone_lariat/events_repo.rb |