lib/streamdal.rb in streamdal-0.0.2 vs lib/streamdal.rb in streamdal-0.0.3

- old
+ new

@@ -29,19 +29,17 @@ DEFAULT_GRPC_TIMEOUT = 5 # 5 seconds DEFAULT_HEARTBEAT_INTERVAL = 1 # 1 second MAX_PAYLOAD_SIZE = 1024 * 1024 # 1 megabyte module Streamdal - OPERATION_TYPE_PRODUCER = 2 OPERATION_TYPE_CONSUMER = 1 CLIENT_TYPE_SDK = 1 CLIENT_TYPE_SHIM = 2 # Data class to hold instantiated wasm functions class WasmFunction - ## # Instance of an initialized wasm module and associated memory store attr_accessor :instance, :store @@ -64,11 +62,10 @@ service_name: service_name, ) end end - class Client ## # Streamdal SDK Client # @@ -115,24 +112,18 @@ # Let loops exit sleep(1) # Exit any remaining threads @workers.each do |w| - if w.running? - w.exit - end + w.exit if w.running? end end def process(data, audience) - if data.empty? - raise 'data is required' - end + raise 'data is required' if data.empty? - if audience.nil? - raise 'audience is required' - end + raise 'audience is required' if audience.nil? resp = Streamdal::Protos::SDKResponse.new resp.status = :EXEC_STATUS_TRUE resp.pipeline_status = Google::Protobuf::RepeatedField.new(:message, Streamdal::Protos::PipelineStatus, []) resp.data = data @@ -213,17 +204,13 @@ step_status.error = e.to_s pipeline_status.step_status.push(step_status) break end - if @cfg[:dry_run] - @log.debug "Running step '#{step.name}' in dry-run mode" - end + @log.debug "Running step '#{step.name}' in dry-run mode" if @cfg[:dry_run] - if wasm_resp.output_payload.length.positive? - resp.data = wasm_resp.output_payload - end + resp.data = wasm_resp.output_payload if wasm_resp.output_payload.length.positive? _handle_schema(aud, step, wasm_resp) isr = wasm_resp.inter_step_result @@ -307,35 +294,25 @@ end private def _validate_cfg(cfg) - if cfg[:streamdal_url].nil? || cfg[:streamdal_url].empty? - raise 'streamdal_url is required' - end + raise 'streamdal_url is required' if cfg[:streamdal_url].nil? || cfg[:streamdal_url].empty? - if cfg[:streamdal_token].nil? || cfg[:streamdal_token].empty? - raise 'streamdal_token is required' - end + raise 'streamdal_token is required' if cfg[:streamdal_token].nil? || cfg[:streamdal_token].empty? - if cfg[:service_name].nil? || cfg[:streamdal_token].empty? - raise 'service_name is required' - end + raise 'service_name is required' if cfg[:service_name].nil? || cfg[:streamdal_token].empty? if cfg[:log].nil? || cfg[:streamdal_token].empty? logger = Logger.new($stdout) logger.level = Logger::ERROR cfg[:log] = logger end - if cfg[:pipeline_timeout].nil? - cfg[:pipeline_timeout] = DEFAULT_PIPELINE_TIMEOUT - end + cfg[:pipeline_timeout] = DEFAULT_PIPELINE_TIMEOUT if cfg[:pipeline_timeout].nil? - if cfg[:step_timeout].nil? - cfg[:step_timeout] = DEFAULT_STEP_TIMEOUT - end + cfg[:step_timeout] = DEFAULT_STEP_TIMEOUT if cfg[:step_timeout].nil? end def _handle_command(cmd) case cmd.command.to_s when 'kv' @@ -412,13 +389,11 @@ end def _get_function(step) # We cache functions so we can eliminate the wasm bytes from steps to save on memory # And also to avoid re-initializing the same function multiple times - if @functions.key?(step._wasm_id) - return @functions[step._wasm_id] - end + return @functions[step._wasm_id] if @functions.key?(step._wasm_id) engine = Wasmtime::Engine.new mod = Wasmtime::Module.new(engine, step._wasm_bytes) linker = Wasmtime::Linker.new(engine, wasi: true) @@ -448,21 +423,15 @@ func end def _call_wasm(step, data, isr) - if step.nil? - raise 'step is required' - end + raise 'step is required' if step.nil? - if data.nil? - raise 'data is required' - end + raise 'data is required' if data.nil? - if isr.nil? - isr = Streamdal::Protos::InterStepResult.new - end + isr = Streamdal::Protos::InterStepResult.new if isr.nil? req = Streamdal::Protos::WASMRequest.new req.step = step.clone req.input_payload = data req.inter_step_result = isr @@ -475,11 +444,11 @@ rescue => e resp = Streamdal::Protos::WASMResponse.new resp.exit_code = :WASM_EXIT_CODE_ERROR resp.exit_msg = "Failed to execute WASM: #{e}" resp.output_payload = '' - return resp + resp end end def _gen_register_request req = Streamdal::Protos::RegisterRequest.new @@ -514,13 +483,11 @@ @log.info('register started') # Register with Streamdal External gRPC API resps = @stub.register(_gen_register_request, metadata: _metadata) resps.each do |r| - if @exit - break - end + break if @exit _handle_command(r) end @log.info('register exited') @@ -562,13 +529,11 @@ def _get_pipelines(aud) aud_str = aud_to_str(aud) _add_audience(aud) - if @pipelines.key?(aud_str) - return @pipelines[aud_str] - end + return @pipelines[aud_str] if @pipelines.key?(aud_str) [] end def _heartbeat @@ -610,22 +575,18 @@ end end def _get_active_tails_for_audience(aud) aud_str = aud_to_str(aud) - if @tails.key?(aud_str) - return @tails[aud_str].values - end + return @tails[aud_str].values if @tails.key?(aud_str) [] end def _send_tail(aud, pipeline_id, original_data, new_data) tails = _get_active_tails_for_audience(aud) - if tails.empty? - return nil - end + return nil if tails.empty? tails.each do |tail| req = Streamdal::Protos::TailResponse.new req.type = :TAIL_RESPONSE_TYPE_PAYLOAD req.audience = aud @@ -638,23 +599,17 @@ tail.queue.push(req) end end def _notify_condition(pipeline, step, aud, cond, data, cond_type) - if cond.nil? - return nil - end + return nil if cond.nil? - if cond.notification.nil? - return nil - end + return nil if cond.notification.nil? @log.debug 'Notifying' - if @cfg[:dry_run] - return nil - end + return nil if @cfg[:dry_run] @metrics.incr(CounterEntry.new(Metrics::COUNTER_NOTIFY, aud, { "service": @cfg[:service_name], "component_name": aud.component_name, "pipeline_name": pipeline.name, @@ -700,29 +655,24 @@ ) t.start_tail_workers _set_active_tail(t) - end def _set_active_tail(tail) key = aud_to_str(tail.request.audience) - unless @tails.key?(key) - @tails[key] = {} - end + @tails[key] = {} unless @tails.key?(key) @tails[key][tail.request.id] = tail end def _set_paused_tail(tail) key = aud_to_str(tail.request.aud) - unless @paused_tails.key?(key) - @paused_tails[key] = {} - end + @paused_tails[key] = {} unless @paused_tails.key?(key) @paused_tails[key][tail.request.id] = tail end def _stop_tail(cmd) @@ -733,22 +683,18 @@ @tails[key][cmd.tail.request.id].stop_tail # Remove from active tails @tails[key].delete(cmd.tail.request.id) - if @tails[key].empty? - @tails.delete(key) - end + @tails.delete(key) if @tails[key].empty? end - if @paused_tails.key?(key) && @paused_tails[key].key?(cmd.tail.request.id) - @paused_tails[key].delete(cmd.tail.request.id) + return unless @paused_tails.key?(key) && @paused_tails[key].key?(cmd.tail.request.id) - if @paused_tails[key].empty? - @paused_tails.delete(key) - end - end + @paused_tails[key].delete(cmd.tail.request.id) + + @paused_tails.delete(key) if @paused_tails[key].empty? end def _stop_all_tails # TODO: does this modify the instances variables or copy them? _stop_tails(@tails) @@ -760,13 +706,11 @@ tails.each do |aud, aud_tails| aud_tails.each do |t| t.stop_tail tails[aud].delete(tail.request.id) - if tails[aud].empty? - tails.delete(aud) - end + tails.delete(aud) if tails[aud].empty? end end end def _pause_tail(cmd) @@ -793,56 +737,32 @@ end def _remove_active_tail(aud, tail_id) key = aud_to_str(aud) - if @tails.key?(key) && @tails[key].key?(tail_id) - t = @tails[key][tail_id] - t.stop_tail + return unless @tails.key?(key) && @tails[key].key?(tail_id) - @tails[key].delete(tail_id) + t = @tails[key][tail_id] + t.stop_tail - if @tails[key].empty? - @tails.delete(key) - end + @tails[key].delete(tail_id) - t - end + @tails.delete(key) if @tails[key].empty? + + t end def _remove_paused_tail(aud, tail_id) key = aud_to_str(aud) - if @paused_tails.key?(key) && @paused_tails[key].key?(tail_id) - t = @paused_tails[key][tail_id] + return unless @paused_tails.key?(key) && @paused_tails[key].key?(tail_id) - @paused_tails[key].delete(tail_id) + t = @paused_tails[key][tail_id] - if @paused_tails[key].empty? - @paused_tails.delete(key) - end + @paused_tails[key].delete(tail_id) - t - end - end + @paused_tails.delete(key) if @paused_tails[key].empty? - # Called by host functions to write memory to wasm instance so that - # the wasm module can read the result of a host function call - def write_to_memory(caller, res) - alloc = caller.export('alloc').to_func - memory = caller.export('memory').to_memory - - # Serialize protobuf message - resp = res.to_proto - - # Allocate memory for response - resp_ptr = alloc.call(resp.length) - - # Write response to memory - memory.write(resp_ptr, resp) - - # return 64bit integer where first 32 bits is the pointer, and the last 32 is the length - resp_ptr << 32 | resp.length + t end - end -end \ No newline at end of file +end