lib/streamdal.rb in streamdal-0.0.1 vs lib/streamdal.rb in streamdal-0.0.2
- old
+ new
@@ -6,15 +6,15 @@
require 'wasmtime'
require 'sp_sdk_pb'
require 'sp_common_pb'
require 'sp_info_pb'
require 'sp_internal_pb'
-require "sp_internal_services_pb"
+require 'sp_internal_services_pb'
require 'sp_pipeline_pb'
-require "sp_wsm_pb"
-require "steps/sp_steps_httprequest_pb"
-require "steps/sp_steps_kv_pb"
+require 'sp_wsm_pb'
+require 'steps/sp_steps_httprequest_pb'
+require 'steps/sp_steps_kv_pb'
require 'timeout'
require 'google/protobuf'
require_relative 'audiences'
require_relative 'hostfunc'
require_relative 'kv'
@@ -122,16 +122,16 @@
end
end
end
def process(data, audience)
- if data.length == 0
- raise "data is required"
+ if data.empty?
+ raise 'data is required'
end
if audience.nil?
- raise "audience is required"
+ raise 'audience is required'
end
resp = Streamdal::Protos::SDKResponse.new
resp.status = :EXEC_STATUS_TRUE
resp.pipeline_status = Google::Protobuf::RepeatedField.new(:message, Streamdal::Protos::PipelineStatus, [])
@@ -142,12 +142,12 @@
labels = {
"service": @cfg[:service_name],
"operation_type": aud.operation_type,
"operation": aud.operation_name,
"component": aud.component_name,
- "pipeline_name": "",
- "pipeline_id": "",
+ "pipeline_name": '',
+ "pipeline_id": '',
}
# TODO: metrics
bytes_processed = Metrics::COUNTER_CONSUME_BYTES
errors_counter = Metrics::COUNTER_CONSUME_ERRORS
@@ -164,20 +164,20 @@
payload_size = data.length
if payload_size > MAX_PAYLOAD_SIZE
# TODO: add metrics
resp.status = :EXEC_STATUS_ERROR
- resp.error = "payload size exceeds maximum allowed size"
+ resp.error = 'payload size exceeds maximum allowed size'
resp
end
# Needed for send_tail()
original_data = data
pipelines = _get_pipelines(aud)
- if pipelines.length == 0
- _send_tail(aud, "", original_data, original_data)
+ if pipelines.empty?
+ _send_tail(aud, '', original_data, original_data)
return resp
end
@metrics.incr(CounterEntry.new(bytes_processed, aud, labels, data.length))
@metrics.incr(CounterEntry.new(rate_processed, aud, labels, 1))
@@ -217,11 +217,11 @@
if @cfg[:dry_run]
@log.debug "Running step '#{step.name}' in dry-run mode"
end
- if wasm_resp.output_payload.length > 0
+ if wasm_resp.output_payload.length.positive?
resp.data = wasm_resp.output_payload
end
_handle_schema(aud, step, wasm_resp)
@@ -294,37 +294,37 @@
# Append pipeline status to the response
resp.pipeline_status.push(pipeline_status)
end # pipelines.each
end # timeout
- _send_tail(aud, "", original_data, resp.data)
+ _send_tail(aud, '', original_data, resp.data)
if @cfg[:dry_run]
- @log.debug "Dry-run, setting response data to original data"
+ @log.debug 'Dry-run, setting response data to original data'
resp.data = original_data
end
resp
end
private
def _validate_cfg(cfg)
if cfg[:streamdal_url].nil? || cfg[:streamdal_url].empty?
- raise "streamdal_url is required"
+ raise 'streamdal_url is required'
end
if cfg[:streamdal_token].nil? || cfg[:streamdal_token].empty?
- raise "streamdal_token is required"
+ raise 'streamdal_token is required'
end
if cfg[:service_name].nil? || cfg[:streamdal_token].empty?
- raise "service_name is required"
+ raise 'service_name is required'
end
if cfg[:log].nil? || cfg[:streamdal_token].empty?
- logger = Logger.new(STDOUT)
+ logger = Logger.new($stdout)
logger.level = Logger::ERROR
cfg[:log] = logger
end
if cfg[:pipeline_timeout].nil?
@@ -336,17 +336,17 @@
end
end
def _handle_command(cmd)
case cmd.command.to_s
- when "kv"
+ when 'kv'
_handle_kv(cmd)
- when "tail"
+ when 'tail'
_handle_tail_request(cmd)
- when "set_pipelines"
+ when 'set_pipelines'
_set_pipelines(cmd)
- when "keep_alive"
+ when 'keep_alive'
# Do nothing
else
@log.error "unknown command type #{cmd.command}"
end
end
@@ -376,17 +376,15 @@
end
end
end
def _set_pipelines(cmd)
- if cmd.nil?
- raise "cmd is required"
- end
+ raise 'cmd is required' if cmd.nil?
cmd.set_pipelines.pipelines.each_with_index { |p, pIdx|
p.steps.each_with_index { |step, idx|
- if step._wasm_bytes == ""
+ if step._wasm_bytes == ''
if cmd.set_pipelines.wasm_modules.has_key?(step._wasm_id)
step._wasm_bytes = cmd.set_pipelines.wasm_modules[step._wasm_id].bytes
cmd.set_pipelines.pipelines[pIdx].steps[idx] = step
else
@log.error "WASM module not found for step: #{step._wasm_id}"
@@ -422,15 +420,15 @@
engine = Wasmtime::Engine.new
mod = Wasmtime::Module.new(engine, step._wasm_bytes)
linker = Wasmtime::Linker.new(engine, wasi: true)
- linker.func_new("env", "httpRequest", [:i32, :i32], [:i64]) do |caller, ptr, len|
+ linker.func_new('env', 'httpRequest', %i[i32 i32], [:i64]) do |caller, ptr, len|
@hostfunc.http_request(caller, ptr, len)
end
- linker.func_new("env", "kvExists", [:i32, :i32], [:i64]) do |caller, ptr, len|
+ linker.func_new('env', 'kvExists', %i[i32 i32], [:i64]) do |caller, ptr, len|
@hostfunc.kv_exists(caller, ptr, len)
end
wasi_ctx = Wasmtime::WasiCtxBuilder.new
.inherit_stdout
@@ -440,12 +438,10 @@
.build
store = Wasmtime::Store.new(engine, wasi_ctx: wasi_ctx)
instance = linker.instantiate(store, mod)
- # TODO: host funcs
-
# Store in cache
func = WasmFunction.new
func.instance = instance
func.store = store
@functions[step._wasm_id] = func
@@ -453,15 +449,15 @@
func
end
def _call_wasm(step, data, isr)
if step.nil?
- raise "step is required"
+ raise 'step is required'
end
if data.nil?
- raise "data is required"
+ raise 'data is required'
end
if isr.nil?
isr = Streamdal::Protos::InterStepResult.new
end
@@ -478,11 +474,11 @@
end
rescue => e
resp = Streamdal::Protos::WASMResponse.new
resp.exit_code = :WASM_EXIT_CODE_ERROR
resp.exit_msg = "Failed to execute WASM: #{e}"
- resp.output_payload = ""
+ resp.output_payload = ''
return resp
end
end
def _gen_register_request
@@ -498,26 +494,26 @@
def _gen_client_info
arch, os = RUBY_PLATFORM.split(/-/)
ci = Streamdal::Protos::ClientInfo.new
ci.client_type = :CLIENT_TYPE_SDK
- ci.library_name = "ruby-sdk"
- ci.library_version = "0.0.1"
- ci.language = "ruby"
+ ci.library_name = 'ruby-sdk'
+ ci.library_version = '0.0.1'
+ ci.language = 'ruby'
ci.arch = arch
ci.os = os
ci
end
# Returns metadata for gRPC requests to the internal gRPC API
def _metadata
- { "auth-token" => @cfg[:streamdal_token].to_s }
+ { 'auth-token' => @cfg[:streamdal_token].to_s }
end
def _register
- @log.info("register started")
+ @log.info('register started')
# Register with Streamdal External gRPC API
resps = @stub.register(_gen_register_request, metadata: _metadata)
resps.each do |r|
if @exit
@@ -525,26 +521,26 @@
end
_handle_command(r)
end
- @log.info("register exited")
+ @log.info('register exited')
end
def _exec_wasm(req)
wasm_func = _get_function(req.step)
# Empty out _wasm_bytes, we don't need it anymore
# TODO: does this actually update the original object?
- req.step._wasm_bytes = ""
+ req.step._wasm_bytes = ''
data = req.to_proto
- memory = wasm_func.instance.export("memory").to_memory
- alloc = wasm_func.instance.export("alloc").to_func
- dealloc = wasm_func.instance.export("dealloc").to_func
- f = wasm_func.instance.export("f").to_func
+ memory = wasm_func.instance.export('memory').to_memory
+ alloc = wasm_func.instance.export('alloc').to_func
+ dealloc = wasm_func.instance.export('dealloc').to_func
+ f = wasm_func.instance.export('f').to_func
start_ptr = alloc.call(data.length)
memory.write(start_ptr, data)
@@ -579,11 +575,11 @@
until @exit
req = Streamdal::Protos::HeartbeatRequest.new
req.session_id = @session_id
req.audiences = Google::Protobuf::RepeatedField.new(:message, Streamdal::Protos::Audience, [])
- @audiences.each do |_, aud|
+ @audiences.each_value do |aud|
req.audiences.push(aud)
end
req.client_info = _gen_client_info
req.service_name = @cfg[:service_name]
@@ -623,11 +619,11 @@
[]
end
def _send_tail(aud, pipeline_id, original_data, new_data)
tails = _get_active_tails_for_audience(aud)
- if tails.length == 0
+ if tails.empty?
return nil
end
tails.each do |tail|
req = Streamdal::Protos::TailResponse.new
@@ -650,11 +646,11 @@
if cond.notification.nil?
return nil
end
- @log.debug "Notifying"
+ @log.debug 'Notifying'
if @cfg[:dry_run]
return nil
end
@@ -737,19 +733,19 @@
@tails[key][cmd.tail.request.id].stop_tail
# Remove from active tails
@tails[key].delete(cmd.tail.request.id)
- if @tails[key].length == 0
+ if @tails[key].empty?
@tails.delete(key)
end
end
if @paused_tails.key?(key) && @paused_tails[key].key?(cmd.tail.request.id)
@paused_tails[key].delete(cmd.tail.request.id)
- if @paused_tails[key].length == 0
+ if @paused_tails[key].empty?
@paused_tails.delete(key)
end
end
end
@@ -764,11 +760,11 @@
tails.each do |aud, aud_tails|
aud_tails.each do |t|
t.stop_tail
tails[aud].delete(tail.request.id)
- if tails[aud].length == 0
+ if tails[aud].empty?
tails.delete(aud)
end
end
end
end
@@ -803,11 +799,11 @@
t = @tails[key][tail_id]
t.stop_tail
@tails[key].delete(tail_id)
- if @tails[key].length == 0
+ if @tails[key].empty?
@tails.delete(key)
end
t
end
@@ -819,22 +815,22 @@
if @paused_tails.key?(key) && @paused_tails[key].key?(tail_id)
t = @paused_tails[key][tail_id]
@paused_tails[key].delete(tail_id)
- if @paused_tails[key].length == 0
+ if @paused_tails[key].empty?
@paused_tails.delete(key)
end
t
end
end
# Called by host functions to write memory to wasm instance so that
# the wasm module can read the result of a host function call
def write_to_memory(caller, res)
- alloc = caller.export("alloc").to_func
- memory = caller.export("memory").to_memory
+ alloc = caller.export('alloc').to_func
+ memory = caller.export('memory').to_memory
# Serialize protobuf message
resp = res.to_proto
# Allocate memory for response
\ No newline at end of file