Sha256: 0568f26d150434c461bfe8dca67aaca4567e5297e481b24b63d8a1b2055e7ff7

Contents?: true

Size: 835 Bytes

Versions: 2

Compression:

Stored size: 835 Bytes

Contents

module Kcl
  # Sends checkpoint requests through an IO handler and waits for the
  # matching reply.
  #
  # The handler must respond to +write_action+, +read_action+ and
  # +write_error+; nothing else about it is assumed here.
  class Checkpointer
    # @param io_handler [#write_action, #read_action, #write_error]
    #   object used to exchange actions with the other side of the protocol
    def initialize(io_handler)
      @io_handler = io_handler
    end

    # Writes a 'checkpoint' action and blocks until a reply action is read.
    #
    # @param sequence_number [Object, nil] value sent under the +checkpoint+
    #   key; +nil+ is sent as-is when omitted
    # @return [nil] when the reply is a 'checkpoint' action without an error
    # @raise [CheckpointError] with the reply's 'error' value when the reply
    #   carries one, or with 'InvalidStateException' when the reply is not a
    #   'checkpoint' action
    def checkpoint(sequence_number = nil)
      io_handler.write_action(action: 'checkpoint', checkpoint: sequence_number)

      reply = fetch_action
      raise CheckpointError, 'InvalidStateException' unless reply['action'] == 'checkpoint'
      raise CheckpointError, reply['error'] unless reply['error'].nil?
    end

    private

    attr_reader :io_handler

    # Keeps reading until an action is available, skipping reads that
    # produced nothing (including unreadable lines, see #read_action).
    def fetch_action
      loop do
        action = read_action

        return action unless action.nil?
      end
    end

    # Reads one action from the handler.
    #
    # @return [Object, nil] the action, or +nil+ when the line could not be
    #   understood (the problem is reported through +write_error+)
    def read_action
      io_handler.read_action
    rescue IOHandler::ReadError => e
      io_handler.write_error(
        "Could not understand line read from input: #{e.line}"
      )
      # Return nil explicitly: write_error's own return value (often an IO or
      # a byte count) must never be mistaken for an action by #fetch_action.
      nil
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
amazon-kinesis-client-ruby-0.0.3 lib/kcl/checkpointer.rb
amazon-kinesis-client-ruby-0.0.1 lib/kcl/checkpointer.rb