Sha256: 846eaebc3bd001b440b0e44878c6da363468e489609e51dcec48e450e2ecbfc8
Contents?: true
Size: 1.99 KB
Versions: 1
Compression:
Stored size: 1.99 KB
Contents
# typed: strict
# frozen_string_literal: true

module Workato
  module Connector
    module Sdk
      module Dsl
        # Stream helpers built around a collection of stream definitions and a
        # connection. Calling any method that is not defined here raises
        # UndefinedStdLibMethodError reported against 'workato.stream'.
        class StreamPackage
          extend T::Sig

          # @param streams [Streams] looked up by name in #out
          # @param connection [Connection] kept and exposed through a private reader
          sig { params(streams: Streams, connection: Connection).void }
          def initialize(streams:, connection:)
            @connection = connection
            @streams = streams
          end

          # Reads +stream+ through a Stream::Reader and passes every chunk to the block.
          #
          # A Hash that carries a truthy :__stream__ entry and has no :chunks is
          # turned into a proxy first, by calling #out with its :name and its
          # :input (an empty Hash when :input is missing or falsy). Any other
          # value is handed to the reader unchanged.
          #
          # @param stream [Stream::Proxy, String, Hash] what to read from
          # @param from [Integer, nil] forwarded as-is to Stream::Reader
          # @param frame_size [Integer, nil] forwarded as-is to Stream::Reader
          # @yield [chunk, byte_from, byte_to, eof, next_from] the five values the
          #   reader yields for each chunk, passed on in the same order
          # @return whatever Stream::Reader#each_chunk returns
          sig do
            params(
              stream: T.any(Stream::Proxy, String, T::Hash[T.untyped, T.untyped]),
              from: T.nilable(Integer),
              frame_size: T.nilable(Integer),
              blk: SorbetTypes::StreamInProc
            ).returns(T.untyped)
          end
          def in(stream, from: nil, frame_size: nil, &blk)
            # The Hash check stays inline in the condition so the hash lookups
            # below are only ever evaluated on an actual Hash.
            source =
              if stream.is_a?(Hash) && stream[:__stream__] && stream[:chunks].nil?
                out(stream[:name], stream[:input] || {})
              else
                stream
              end

            reader = Stream::Reader.new(stream: source, from: from, frame_size: frame_size)
            reader.each_chunk do |data, first_byte, last_byte, finished, resume_at|
              blk.call(data, first_byte, last_byte, finished, resume_at)
            end
          end

          # Builds a Stream::Proxy for the stream registered under +stream_name+.
          #
          # @param stream_name [String] key used to look the stream up in +streams+
          # @param input [Hash] input handed to the proxy; defaults to an empty Hash
          # @return [Stream::Proxy]
          sig { params(stream_name: String, input: SorbetTypes::StreamInputHash).returns(Stream::Proxy) }
          def out(stream_name, input = {})
            definition = streams[stream_name]
            Stream::Proxy.new(input: input, name: stream_name, stream: definition)
          end

          private

          sig { returns(Streams) }
          attr_reader :streams

          sig { returns(Connection) }
          attr_reader :connection

          # Every unknown method is rejected with an error naming the method and
          # the 'workato.stream' namespace.
          T::Sig::WithoutRuntime.sig { params(symbol: T.any(String, Symbol), _args: T.untyped).void }
          def method_missing(symbol, *_args)
            raise UndefinedStdLibMethodError.new(symbol.to_s, 'workato.stream')
          end

          # Always false: nothing is handled dynamically, method_missing only raises.
          T::Sig::WithoutRuntime.sig { params(_args: T.untyped).returns(T::Boolean) }
          def respond_to_missing?(*_args)
            false
          end
        end
      end
    end
  end
end
Version data entries
1 entry across 1 version & 1 rubygem
Version | Path |
---|---|
workato-connector-sdk-1.3.0 | lib/workato/connector/sdk/dsl/stream_package.rb |