Sha256: 9d1a36b5663f365e173b45992cfcb4de07699a46116a877bc7bcfa79bfcf05a6
Contents?: true
Size: 1.67 KB
Versions: 3
Compression:
Stored size: 1.67 KB
Contents
# frozen_string_literal: true require 'cyclone_lariat/messages/v1/event' require 'cyclone_lariat/messages/v1/command' require 'cyclone_lariat/repo/messages_mapper' require 'cyclone_lariat/messages/builder' module CycloneLariat module Repo module Sequel class Messages attr_reader :dataset def initialize(dataset) @dataset = dataset end def enabled? !dataset.nil? end def disabled? dataset.nil? end def create(msg) dataset.insert MessagesMapper.to_row(msg) end def exists?(uuid:) dataset.where(uuid: uuid).limit(1).any? end def processed!(uuid:, error: nil) data = { processed_at: ::Sequel.function(:NOW) } data.merge!(client_error_message: error.message, client_error_details: JSON.generate(error.details)) if error !dataset.where(uuid: uuid).update(data).zero? end def find(uuid:) row = dataset.where(uuid: uuid).first return if row.nil? build MessagesMapper.from_row(row) end def each_unprocessed dataset.where(processed_at: nil).each do |row| msg = build MessagesMapper.from_row(row) yield(msg) end end def each_with_client_errors dataset.where { (processed_at !~ nil) & (client_error_message !~ nil) }.each do |row| msg = build MessagesMapper.from_row(row) yield(msg) end end private def build(raw) CycloneLariat::Messages::Builder.new(raw_message: raw).call end end end end end
Version data entries
3 entries across 3 versions & 1 rubygems