Sha256: 846eaebc3bd001b440b0e44878c6da363468e489609e51dcec48e450e2ecbfc8

Contents?: true

Size: 1.99 KB

Versions: 1

Compression:

Stored size: 1.99 KB

Contents

# typed: strict
# frozen_string_literal: true

module Workato
  module Connector
    module Sdk
      module Dsl
        class StreamPackage
          extend T::Sig

          sig { params(streams: Streams, connection: Connection).void }
          def initialize(streams:, connection:)
            @streams = streams
            @connection = connection
          end

          sig do
            params(
              stream: T.any(Stream::Proxy, String, T::Hash[T.untyped, T.untyped]),
              from: T.nilable(Integer),
              frame_size: T.nilable(Integer),
              blk: SorbetTypes::StreamInProc
            ).returns(T.untyped)
          end
          def in(stream, from: nil, frame_size: nil, &blk)
            if stream.is_a?(Hash) && stream[:__stream__] && stream[:chunks].nil?
              stream = out(stream[:name], stream[:input] || {})
            end

            Stream::Reader.new(
              stream: stream,
              from: from,
              frame_size: frame_size
            ).each_chunk do |chunk, byte_from, byte_to, eof, next_from|
              blk.call(chunk, byte_from, byte_to, eof, next_from)
            end
          end

          sig { params(stream_name: String, input: SorbetTypes::StreamInputHash).returns(Stream::Proxy) }
          def out(stream_name, input = {})
            Stream::Proxy.new(input: input, name: stream_name, stream: streams[stream_name])
          end

          private

          T::Sig::WithoutRuntime.sig { params(symbol: T.any(String, Symbol), _args: T.untyped).void }
          def method_missing(symbol, *_args)
            raise UndefinedStdLibMethodError.new(symbol.to_s, 'workato.stream')
          end

          T::Sig::WithoutRuntime.sig { params(_args: T.untyped).returns(T::Boolean) }
          def respond_to_missing?(*_args)
            false
          end

          sig { returns(Connection) }
          attr_reader :connection

          sig { returns(Streams) }
          attr_reader :streams
        end
      end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
workato-connector-sdk-1.3.0 lib/workato/connector/sdk/dsl/stream_package.rb