Sha256: 65879ccdec5a21d6a0f469ec533fd76a7e98854c826be9855146616fc465aee3

Contents?: true

Size: 746 Bytes

Versions: 1

Compression:

Stored size: 746 Bytes

Contents

# frozen_string_literal: true

require 'redis'

# :nodoc:
module Jstreams
  ##
  # A Redis streams Consumer Group
  class ConsumerGroup
    ##
    # @param [String] name Consumer group name
    # @param [String] stream Stream name
    # @param [Redis] redis Redis connection
    def initialize(name:, stream:, redis:)
      @name = name
      @stream = stream
      @redis = redis
    end

    ##
    # Returns true if the group was created and false if it already existed
    def create_if_not_exists(start_id: 0)
      @redis.xgroup(:create, @stream, @name, start_id, mkstream: true)
      true
    rescue ::Redis::CommandError => e
      raise e unless /BUSYGROUP/ =~ e.message
      false
    end
  end

  private_constant :ConsumerGroup
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
jstreams-0.1.0.alpha lib/jstreams/consumer_group.rb