lib/streamdal.rb in streamdal-0.0.1 vs lib/streamdal.rb in streamdal-0.0.2

- old
+ new

@@ -6,15 +6,15 @@ require 'wasmtime' require 'sp_sdk_pb' require 'sp_common_pb' require 'sp_info_pb' require 'sp_internal_pb' -require "sp_internal_services_pb" +require 'sp_internal_services_pb' require 'sp_pipeline_pb' -require "sp_wsm_pb" -require "steps/sp_steps_httprequest_pb" -require "steps/sp_steps_kv_pb" +require 'sp_wsm_pb' +require 'steps/sp_steps_httprequest_pb' +require 'steps/sp_steps_kv_pb' require 'timeout' require 'google/protobuf' require_relative 'audiences' require_relative 'hostfunc' require_relative 'kv' @@ -122,16 +122,16 @@ end end end def process(data, audience) - if data.length == 0 - raise "data is required" + if data.empty? + raise 'data is required' end if audience.nil? - raise "audience is required" + raise 'audience is required' end resp = Streamdal::Protos::SDKResponse.new resp.status = :EXEC_STATUS_TRUE resp.pipeline_status = Google::Protobuf::RepeatedField.new(:message, Streamdal::Protos::PipelineStatus, []) @@ -142,12 +142,12 @@ labels = { "service": @cfg[:service_name], "operation_type": aud.operation_type, "operation": aud.operation_name, "component": aud.component_name, - "pipeline_name": "", - "pipeline_id": "", + "pipeline_name": '', + "pipeline_id": '', } # TODO: metrics bytes_processed = Metrics::COUNTER_CONSUME_BYTES errors_counter = Metrics::COUNTER_CONSUME_ERRORS @@ -164,20 +164,20 @@ payload_size = data.length if payload_size > MAX_PAYLOAD_SIZE # TODO: add metrics resp.status = :EXEC_STATUS_ERROR - resp.error = "payload size exceeds maximum allowed size" + resp.error = 'payload size exceeds maximum allowed size' resp end # Needed for send_tail() original_data = data pipelines = _get_pipelines(aud) - if pipelines.length == 0 - _send_tail(aud, "", original_data, original_data) + if pipelines.empty? + _send_tail(aud, '', original_data, original_data) return resp end @metrics.incr(CounterEntry.new(bytes_processed, aud, labels, data.length)) @metrics.incr(CounterEntry.new(rate_processed, aud, labels, 1)) @@ -217,11 +217,11 @@ if @cfg[:dry_run] @log.debug "Running step '#{step.name}' in dry-run mode" end - if wasm_resp.output_payload.length > 0 + if wasm_resp.output_payload.length.positive? resp.data = wasm_resp.output_payload end _handle_schema(aud, step, wasm_resp) @@ -294,37 +294,37 @@ # Append pipeline status to the response resp.pipeline_status.push(pipeline_status) end # pipelines.each end # timeout - _send_tail(aud, "", original_data, resp.data) + _send_tail(aud, '', original_data, resp.data) if @cfg[:dry_run] - @log.debug "Dry-run, setting response data to original data" + @log.debug 'Dry-run, setting response data to original data' resp.data = original_data end resp end private def _validate_cfg(cfg) if cfg[:streamdal_url].nil? || cfg[:streamdal_url].empty? - raise "streamdal_url is required" + raise 'streamdal_url is required' end if cfg[:streamdal_token].nil? || cfg[:streamdal_token].empty? - raise "streamdal_token is required" + raise 'streamdal_token is required' end if cfg[:service_name].nil? || cfg[:streamdal_token].empty? - raise "service_name is required" + raise 'service_name is required' end if cfg[:log].nil? || cfg[:streamdal_token].empty? - logger = Logger.new(STDOUT) + logger = Logger.new($stdout) logger.level = Logger::ERROR cfg[:log] = logger end if cfg[:pipeline_timeout].nil? @@ -336,17 +336,17 @@ end end def _handle_command(cmd) case cmd.command.to_s - when "kv" + when 'kv' _handle_kv(cmd) - when "tail" + when 'tail' _handle_tail_request(cmd) - when "set_pipelines" + when 'set_pipelines' _set_pipelines(cmd) - when "keep_alive" + when 'keep_alive' # Do nothing else @log.error "unknown command type #{cmd.command}" end end @@ -376,17 +376,15 @@ end end end def _set_pipelines(cmd) - if cmd.nil? - raise "cmd is required" - end + raise 'cmd is required' if cmd.nil? cmd.set_pipelines.pipelines.each_with_index { |p, pIdx| p.steps.each_with_index { |step, idx| - if step._wasm_bytes == "" + if step._wasm_bytes == '' if cmd.set_pipelines.wasm_modules.has_key?(step._wasm_id) step._wasm_bytes = cmd.set_pipelines.wasm_modules[step._wasm_id].bytes cmd.set_pipelines.pipelines[pIdx].steps[idx] = step else @log.error "WASM module not found for step: #{step._wasm_id}" @@ -422,15 +420,15 @@ engine = Wasmtime::Engine.new mod = Wasmtime::Module.new(engine, step._wasm_bytes) linker = Wasmtime::Linker.new(engine, wasi: true) - linker.func_new("env", "httpRequest", [:i32, :i32], [:i64]) do |caller, ptr, len| + linker.func_new('env', 'httpRequest', %i[i32 i32], [:i64]) do |caller, ptr, len| @hostfunc.http_request(caller, ptr, len) end - linker.func_new("env", "kvExists", [:i32, :i32], [:i64]) do |caller, ptr, len| + linker.func_new('env', 'kvExists', %i[i32 i32], [:i64]) do |caller, ptr, len| @hostfunc.kv_exists(caller, ptr, len) end wasi_ctx = Wasmtime::WasiCtxBuilder.new .inherit_stdout @@ -440,12 +438,10 @@ .build store = Wasmtime::Store.new(engine, wasi_ctx: wasi_ctx) instance = linker.instantiate(store, mod) - # TODO: host funcs - # Store in cache func = WasmFunction.new func.instance = instance func.store = store @functions[step._wasm_id] = func @@ -453,15 +449,15 @@ func end def _call_wasm(step, data, isr) if step.nil? - raise "step is required" + raise 'step is required' end if data.nil? - raise "data is required" + raise 'data is required' end if isr.nil? isr = Streamdal::Protos::InterStepResult.new end @@ -478,11 +474,11 @@ end rescue => e resp = Streamdal::Protos::WASMResponse.new resp.exit_code = :WASM_EXIT_CODE_ERROR resp.exit_msg = "Failed to execute WASM: #{e}" - resp.output_payload = "" + resp.output_payload = '' return resp end end def _gen_register_request @@ -498,26 +494,26 @@ def _gen_client_info arch, os = RUBY_PLATFORM.split(/-/) ci = Streamdal::Protos::ClientInfo.new ci.client_type = :CLIENT_TYPE_SDK - ci.library_name = "ruby-sdk" - ci.library_version = "0.0.1" - ci.language = "ruby" + ci.library_name = 'ruby-sdk' + ci.library_version = '0.0.1' + ci.language = 'ruby' ci.arch = arch ci.os = os ci end # Returns metadata for gRPC requests to the internal gRPC API def _metadata - { "auth-token" => @cfg[:streamdal_token].to_s } + { 'auth-token' => @cfg[:streamdal_token].to_s } end def _register - @log.info("register started") + @log.info('register started') # Register with Streamdal External gRPC API resps = @stub.register(_gen_register_request, metadata: _metadata) resps.each do |r| if @exit @@ -525,26 +521,26 @@ end _handle_command(r) end - @log.info("register exited") + @log.info('register exited') end def _exec_wasm(req) wasm_func = _get_function(req.step) # Empty out _wasm_bytes, we don't need it anymore # TODO: does this actually update the original object? - req.step._wasm_bytes = "" + req.step._wasm_bytes = '' data = req.to_proto - memory = wasm_func.instance.export("memory").to_memory - alloc = wasm_func.instance.export("alloc").to_func - dealloc = wasm_func.instance.export("dealloc").to_func - f = wasm_func.instance.export("f").to_func + memory = wasm_func.instance.export('memory').to_memory + alloc = wasm_func.instance.export('alloc').to_func + dealloc = wasm_func.instance.export('dealloc').to_func + f = wasm_func.instance.export('f').to_func start_ptr = alloc.call(data.length) memory.write(start_ptr, data) @@ -579,11 +575,11 @@ until @exit req = Streamdal::Protos::HeartbeatRequest.new req.session_id = @session_id req.audiences = Google::Protobuf::RepeatedField.new(:message, Streamdal::Protos::Audience, []) - @audiences.each do |_, aud| + @audiences.each_value do |aud| req.audiences.push(aud) end req.client_info = _gen_client_info req.service_name = @cfg[:service_name] @@ -623,11 +619,11 @@ [] end def _send_tail(aud, pipeline_id, original_data, new_data) tails = _get_active_tails_for_audience(aud) - if tails.length == 0 + if tails.empty? return nil end tails.each do |tail| req = Streamdal::Protos::TailResponse.new @@ -650,11 +646,11 @@ if cond.notification.nil? return nil end - @log.debug "Notifying" + @log.debug 'Notifying' if @cfg[:dry_run] return nil end @@ -737,19 +733,19 @@ @tails[key][cmd.tail.request.id].stop_tail # Remove from active tails @tails[key].delete(cmd.tail.request.id) - if @tails[key].length == 0 + if @tails[key].empty? @tails.delete(key) end end if @paused_tails.key?(key) && @paused_tails[key].key?(cmd.tail.request.id) @paused_tails[key].delete(cmd.tail.request.id) - if @paused_tails[key].length == 0 + if @paused_tails[key].empty? @paused_tails.delete(key) end end end @@ -764,11 +760,11 @@ tails.each do |aud, aud_tails| aud_tails.each do |t| t.stop_tail tails[aud].delete(tail.request.id) - if tails[aud].length == 0 + if tails[aud].empty? tails.delete(aud) end end end end @@ -803,11 +799,11 @@ t = @tails[key][tail_id] t.stop_tail @tails[key].delete(tail_id) - if @tails[key].length == 0 + if @tails[key].empty? @tails.delete(key) end t end @@ -819,22 +815,22 @@ if @paused_tails.key?(key) && @paused_tails[key].key?(tail_id) t = @paused_tails[key][tail_id] @paused_tails[key].delete(tail_id) - if @paused_tails[key].length == 0 + if @paused_tails[key].empty? @paused_tails.delete(key) end t end end # Called by host functions to write memory to wasm instance so that # the wasm module can read the result of a host function call def write_to_memory(caller, res) - alloc = caller.export("alloc").to_func - memory = caller.export("memory").to_memory + alloc = caller.export('alloc').to_func + memory = caller.export('memory').to_memory # Serialize protobuf message resp = res.to_proto # Allocate memory for response \ No newline at end of file