Sha256: 9586fe0a9b23f07f6d25f3fcb475bc1dbf00e9f9720b84aaf8a5778d550b2ce7

Contents?: true

Size: 1.72 KB

Versions: 3

Compression:

Stored size: 1.72 KB

Contents

require File.join(File.dirname(__FILE__), "service")

module Charrington
  class Process
    # This service starts the process of attempting to insert a row.
    # It handles retries where applicable.

    include Service
    attr_reader :event, :connection, :opts, :max_retries, :schema, :retry_max_interval
    attr_accessor :retry_interval, :should_retry

    Error = Class.new(StandardError)
    ProcessFailed = Class.new(Error)
    EventNil = Class.new(Error)

    def initialize(connection, event, opts={})
      raise EventNil, "Event is nil" if event.nil?
      @connection = connection
      @event = event.to_hash
      @opts = opts

      @max_retries = opts[:max_retries] || 10
      @retry_max_interval = opts[:retry_max_interval] || 2
      @retry_interval = opts[:retry_initial_interval] || 2

      @attempts = 1
      @should_retry = true
    end

    def call
      while should_retry do
        transformed = Charrington::Transform.call(event)
        should_retry = Charrington::Insert.call(connection, transformed, opts)
        break if !should_retry

        @attempts += 1
        break if @attempts > max_retries

        # If we're retrying the action, sleep for the recommended interval
        # Double the interval for the next time through to achieve exponential backoff
        sleep_interval
      end
    rescue => e
      raise ProcessFailed, e.message
    ensure
      connection.close unless connection.nil?
      @event.clear if clearable(@event)
    end

    private

    def sleep_interval
      sleep(retry_interval)
      doubled = retry_interval * 2
      retry_interval = doubled > retry_max_interval ? retry_max_interval : doubled
    end

    def clearable(obj)
      obj.is_a? Hash or obj.is_a? Array
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
logstash-output-charrington-0.2.2 lib/logstash/outputs/charrington/process.rb
logstash-output-charrington-0.2.1 lib/logstash/outputs/charrington/process.rb
logstash-output-charrington-0.2.0 lib/logstash/outputs/charrington/process.rb