Sha256: 664f6d70b21bc3e452e7995954993a3ea8129ed23b5250e7236f35ac64797a0c

Contents?: true

Size: 1.97 KB

Versions: 7

Compression:

Stored size: 1.97 KB

Contents

require File.join(File.dirname(__FILE__), "service")

module Charrington
  # This service starts the process of attempting to insert a row.
  # It handles retries where applicable.
  #
  # A single instance drives one event through transform + insert, sleeping
  # between failed attempts. The connection is closed and the event hash is
  # cleared when #call finishes, whether it succeeded or raised.
  class Process
    # NOTE(review): Service is loaded from the sibling "service" file and is
    # not visible here; presumably it supplies the class-level entry point.
    include Service
    attr_reader :event, :connection, :opts, :max_retries, :schema, :retry_max_interval, :driver
    attr_accessor :retry_interval, :should_retry

    Error = Class.new(StandardError)
    ProcessFailed = Class.new(Error)
    EventNil = Class.new(Error)

    # @param connection [Object] handle passed to Charrington::Insert; it is
    #   closed at the end of #call unless nil
    # @param event [#to_hash] the event to insert; converted with #to_hash
    # @param opts [Hash] options, also forwarded untouched to Charrington::Insert.
    #   Keys read here:
    #   - :max_retries (default 10)
    #   - :retry_max_interval (default 2) upper bound for the sleep interval
    #   - :retry_initial_interval (default 2) first sleep interval
    #   - :driver ("redshift" selects the Redshift transform; anything else
    #     falls back to the Postgres transform)
    # @raise [EventNil] when event is nil
    def initialize(connection, event, opts={})
      raise EventNil, "Event is nil" if event.nil?
      @connection = connection
      @event = event.to_hash
      @opts = opts

      @max_retries = opts[:max_retries] || 10
      @retry_max_interval = opts[:retry_max_interval] || 2
      @retry_interval = opts[:retry_initial_interval] || 2
      @driver = opts[:driver]

      # Counts the attempt about to be made, so it starts at 1.
      @attempts = 1
      @should_retry = true
    end

    # Transforms the event and hands it to Charrington::Insert, repeating
    # while Insert returns a truthy value, up to max_retries insert attempts
    # in total. When the attempts run out the loop simply ends; no error is
    # raised for that case.
    #
    # @return [nil]
    # @raise [ProcessFailed] wrapping the message of any StandardError raised
    #   while transforming, inserting or sleeping
    def call
      while should_retry
        transformed = case driver
                      when "redshift"
                        Charrington::TransformRedshift.call(event)
                      else
                        Charrington::TransformPostgres.call(event)
                      end

        # Explicit receiver: a bare `should_retry = ...` would only create a
        # local variable and leave the accessor untouched.
        self.should_retry = Charrington::Insert.call(connection, transformed, opts)
        break unless should_retry

        @attempts += 1
        break if @attempts > max_retries

        # If we're retrying the action, sleep for the recommended interval.
        # The interval is doubled for the next time through to achieve
        # exponential backoff.
        sleep_interval
      end
    rescue => e
      raise ProcessFailed, e.message
    ensure
      connection.close unless connection.nil?
      @event.clear if clearable(@event)
    end

    private

    # Sleeps for the current retry interval, then doubles the interval for the
    # next retry, capped at retry_max_interval. With the default options both
    # values are 2, so the interval stays at 2.
    def sleep_interval
      sleep(retry_interval)
      # Explicit receiver so the setter is called; a bare assignment would
      # create a local and the interval would never grow.
      self.retry_interval = [retry_interval * 2, retry_max_interval].min
    end

    # @return [Boolean] true when obj is a Hash or an Array
    def clearable(obj)
      obj.is_a?(Hash) || obj.is_a?(Array)
    end
  end
end

Version data entries

7 entries across 7 versions & 1 rubygems

Version Path
logstash-output-charrington-0.3.6 lib/logstash/outputs/charrington/process.rb
logstash-output-charrington-0.3.5 lib/logstash/outputs/charrington/process.rb
logstash-output-charrington-0.3.4 lib/logstash/outputs/charrington/process.rb
logstash-output-charrington-0.3.3 lib/logstash/outputs/charrington/process.rb
logstash-output-charrington-0.3.2 lib/logstash/outputs/charrington/process.rb
logstash-output-charrington-0.3.1 lib/logstash/outputs/charrington/process.rb
logstash-output-charrington-0.3.0 lib/logstash/outputs/charrington/process.rb