# frozen_string_literal: true require File.join(File.dirname(__FILE__), 'service') module Charrington # This service starts the process of attempting to insert a row. # It handles retries where applicable. class Process include Service include LogStash::Util::Loggable attr_reader :event, :connection, :opts, :max_retries, :schema, :retry_max_interval, :driver, :transformer attr_accessor :retry_interval, :should_retry Error = Class.new(StandardError) ProcessFailed = Class.new(Error) EventNil = Class.new(Error) def initialize(connection, event, opts = {}) raise EventNil, 'Event is nil' if event.nil? @connection = connection @event = event.to_hash @opts = opts @max_retries = opts[:max_retries] || 10 @retry_max_interval = opts[:retry_max_interval] || 2 @retry_interval = opts[:retry_initial_interval] || 2 @driver = opts[:driver] @transformer = opts[:transformer] @attempts = 1 @should_retry = true end def call while should_retry logger.info "Found transformer of #{transformer} for driver of #{driver} with event of: #{event}" transformed = case transformer when 'redshift' Charrington::TransformRedshift.call(event) else Charrington::TransformPostgres.call(event) end logger.info "Transformed event into: #{transformed}" should_retry = Charrington::Insert.call(connection, transformed, opts) break unless should_retry @attempts += 1 break if @attempts > max_retries # If we're retrying the action, sleep for the recommended interval # Double the interval for the next time through to achieve exponential backoff sleep_interval end rescue StandardError => e raise ProcessFailed, e.message ensure connection&.close @event.clear if clearable(@event) end private def sleep_interval sleep(retry_interval) doubled = retry_interval * 2 # rubocop:disable Lint/UselessAssignment retry_interval = doubled > retry_max_interval ? retry_max_interval : doubled # rubocop:enable Lint/UselessAssignment end def clearable(obj) obj.is_a?(Hash) || obj.is_a?(Array) end end end