lib/streamdal.rb in streamdal-0.0.2 vs lib/streamdal.rb in streamdal-0.0.3
- old
+ new
@@ -29,19 +29,17 @@
DEFAULT_GRPC_TIMEOUT = 5 # 5 seconds
DEFAULT_HEARTBEAT_INTERVAL = 1 # 1 second
MAX_PAYLOAD_SIZE = 1024 * 1024 # 1 megabyte
module Streamdal
-
OPERATION_TYPE_PRODUCER = 2
OPERATION_TYPE_CONSUMER = 1
CLIENT_TYPE_SDK = 1
CLIENT_TYPE_SHIM = 2
# Data class to hold instantiated wasm functions
class WasmFunction
-
##
# Instance of an initialized wasm module and associated memory store
attr_accessor :instance, :store
@@ -64,11 +62,10 @@
service_name: service_name,
)
end
end
-
class Client
##
# Streamdal SDK Client
#
@@ -115,24 +112,18 @@
# Let loops exit
sleep(1)
# Exit any remaining threads
@workers.each do |w|
- if w.running?
- w.exit
- end
+ w.exit if w.running?
end
end
def process(data, audience)
- if data.empty?
- raise 'data is required'
- end
+ raise 'data is required' if data.empty?
- if audience.nil?
- raise 'audience is required'
- end
+ raise 'audience is required' if audience.nil?
resp = Streamdal::Protos::SDKResponse.new
resp.status = :EXEC_STATUS_TRUE
resp.pipeline_status = Google::Protobuf::RepeatedField.new(:message, Streamdal::Protos::PipelineStatus, [])
resp.data = data
@@ -213,17 +204,13 @@
step_status.error = e.to_s
pipeline_status.step_status.push(step_status)
break
end
- if @cfg[:dry_run]
- @log.debug "Running step '#{step.name}' in dry-run mode"
- end
+ @log.debug "Running step '#{step.name}' in dry-run mode" if @cfg[:dry_run]
- if wasm_resp.output_payload.length.positive?
- resp.data = wasm_resp.output_payload
- end
+ resp.data = wasm_resp.output_payload if wasm_resp.output_payload.length.positive?
_handle_schema(aud, step, wasm_resp)
isr = wasm_resp.inter_step_result
@@ -307,35 +294,25 @@
end
private
def _validate_cfg(cfg)
- if cfg[:streamdal_url].nil? || cfg[:streamdal_url].empty?
- raise 'streamdal_url is required'
- end
+ raise 'streamdal_url is required' if cfg[:streamdal_url].nil? || cfg[:streamdal_url].empty?
- if cfg[:streamdal_token].nil? || cfg[:streamdal_token].empty?
- raise 'streamdal_token is required'
- end
+ raise 'streamdal_token is required' if cfg[:streamdal_token].nil? || cfg[:streamdal_token].empty?
- if cfg[:service_name].nil? || cfg[:streamdal_token].empty?
- raise 'service_name is required'
- end
+ raise 'service_name is required' if cfg[:service_name].nil? || cfg[:streamdal_token].empty?
if cfg[:log].nil? || cfg[:streamdal_token].empty?
logger = Logger.new($stdout)
logger.level = Logger::ERROR
cfg[:log] = logger
end
- if cfg[:pipeline_timeout].nil?
- cfg[:pipeline_timeout] = DEFAULT_PIPELINE_TIMEOUT
- end
+ cfg[:pipeline_timeout] = DEFAULT_PIPELINE_TIMEOUT if cfg[:pipeline_timeout].nil?
- if cfg[:step_timeout].nil?
- cfg[:step_timeout] = DEFAULT_STEP_TIMEOUT
- end
+ cfg[:step_timeout] = DEFAULT_STEP_TIMEOUT if cfg[:step_timeout].nil?
end
def _handle_command(cmd)
case cmd.command.to_s
when 'kv'
@@ -412,13 +389,11 @@
end
def _get_function(step)
# We cache functions so we can eliminate the wasm bytes from steps to save on memory
# And also to avoid re-initializing the same function multiple times
- if @functions.key?(step._wasm_id)
- return @functions[step._wasm_id]
- end
+ return @functions[step._wasm_id] if @functions.key?(step._wasm_id)
engine = Wasmtime::Engine.new
mod = Wasmtime::Module.new(engine, step._wasm_bytes)
linker = Wasmtime::Linker.new(engine, wasi: true)
@@ -448,21 +423,15 @@
func
end
def _call_wasm(step, data, isr)
- if step.nil?
- raise 'step is required'
- end
+ raise 'step is required' if step.nil?
- if data.nil?
- raise 'data is required'
- end
+ raise 'data is required' if data.nil?
- if isr.nil?
- isr = Streamdal::Protos::InterStepResult.new
- end
+ isr = Streamdal::Protos::InterStepResult.new if isr.nil?
req = Streamdal::Protos::WASMRequest.new
req.step = step.clone
req.input_payload = data
req.inter_step_result = isr
@@ -475,11 +444,11 @@
rescue => e
resp = Streamdal::Protos::WASMResponse.new
resp.exit_code = :WASM_EXIT_CODE_ERROR
resp.exit_msg = "Failed to execute WASM: #{e}"
resp.output_payload = ''
- return resp
+ resp
end
end
def _gen_register_request
req = Streamdal::Protos::RegisterRequest.new
@@ -514,13 +483,11 @@
@log.info('register started')
# Register with Streamdal External gRPC API
resps = @stub.register(_gen_register_request, metadata: _metadata)
resps.each do |r|
- if @exit
- break
- end
+ break if @exit
_handle_command(r)
end
@log.info('register exited')
@@ -562,13 +529,11 @@
def _get_pipelines(aud)
aud_str = aud_to_str(aud)
_add_audience(aud)
- if @pipelines.key?(aud_str)
- return @pipelines[aud_str]
- end
+ return @pipelines[aud_str] if @pipelines.key?(aud_str)
[]
end
def _heartbeat
@@ -610,22 +575,18 @@
end
end
def _get_active_tails_for_audience(aud)
aud_str = aud_to_str(aud)
- if @tails.key?(aud_str)
- return @tails[aud_str].values
- end
+ return @tails[aud_str].values if @tails.key?(aud_str)
[]
end
def _send_tail(aud, pipeline_id, original_data, new_data)
tails = _get_active_tails_for_audience(aud)
- if tails.empty?
- return nil
- end
+ return nil if tails.empty?
tails.each do |tail|
req = Streamdal::Protos::TailResponse.new
req.type = :TAIL_RESPONSE_TYPE_PAYLOAD
req.audience = aud
@@ -638,23 +599,17 @@
tail.queue.push(req)
end
end
def _notify_condition(pipeline, step, aud, cond, data, cond_type)
- if cond.nil?
- return nil
- end
+ return nil if cond.nil?
- if cond.notification.nil?
- return nil
- end
+ return nil if cond.notification.nil?
@log.debug 'Notifying'
- if @cfg[:dry_run]
- return nil
- end
+ return nil if @cfg[:dry_run]
@metrics.incr(CounterEntry.new(Metrics::COUNTER_NOTIFY, aud, {
"service": @cfg[:service_name],
"component_name": aud.component_name,
"pipeline_name": pipeline.name,
@@ -700,29 +655,24 @@
)
t.start_tail_workers
_set_active_tail(t)
-
end
def _set_active_tail(tail)
key = aud_to_str(tail.request.audience)
- unless @tails.key?(key)
- @tails[key] = {}
- end
+ @tails[key] = {} unless @tails.key?(key)
@tails[key][tail.request.id] = tail
end
def _set_paused_tail(tail)
key = aud_to_str(tail.request.aud)
- unless @paused_tails.key?(key)
- @paused_tails[key] = {}
- end
+ @paused_tails[key] = {} unless @paused_tails.key?(key)
@paused_tails[key][tail.request.id] = tail
end
def _stop_tail(cmd)
@@ -733,22 +683,18 @@
@tails[key][cmd.tail.request.id].stop_tail
# Remove from active tails
@tails[key].delete(cmd.tail.request.id)
- if @tails[key].empty?
- @tails.delete(key)
- end
+ @tails.delete(key) if @tails[key].empty?
end
- if @paused_tails.key?(key) && @paused_tails[key].key?(cmd.tail.request.id)
- @paused_tails[key].delete(cmd.tail.request.id)
+ return unless @paused_tails.key?(key) && @paused_tails[key].key?(cmd.tail.request.id)
- if @paused_tails[key].empty?
- @paused_tails.delete(key)
- end
- end
+ @paused_tails[key].delete(cmd.tail.request.id)
+
+ @paused_tails.delete(key) if @paused_tails[key].empty?
end
def _stop_all_tails
# TODO: does this modify the instances variables or copy them?
_stop_tails(@tails)
@@ -760,13 +706,11 @@
tails.each do |aud, aud_tails|
aud_tails.each do |t|
t.stop_tail
tails[aud].delete(tail.request.id)
- if tails[aud].empty?
- tails.delete(aud)
- end
+ tails.delete(aud) if tails[aud].empty?
end
end
end
def _pause_tail(cmd)
@@ -793,56 +737,32 @@
end
def _remove_active_tail(aud, tail_id)
key = aud_to_str(aud)
- if @tails.key?(key) && @tails[key].key?(tail_id)
- t = @tails[key][tail_id]
- t.stop_tail
+ return unless @tails.key?(key) && @tails[key].key?(tail_id)
- @tails[key].delete(tail_id)
+ t = @tails[key][tail_id]
+ t.stop_tail
- if @tails[key].empty?
- @tails.delete(key)
- end
+ @tails[key].delete(tail_id)
- t
- end
+ @tails.delete(key) if @tails[key].empty?
+
+ t
end
def _remove_paused_tail(aud, tail_id)
key = aud_to_str(aud)
- if @paused_tails.key?(key) && @paused_tails[key].key?(tail_id)
- t = @paused_tails[key][tail_id]
+ return unless @paused_tails.key?(key) && @paused_tails[key].key?(tail_id)
- @paused_tails[key].delete(tail_id)
+ t = @paused_tails[key][tail_id]
- if @paused_tails[key].empty?
- @paused_tails.delete(key)
- end
+ @paused_tails[key].delete(tail_id)
- t
- end
- end
+ @paused_tails.delete(key) if @paused_tails[key].empty?
- # Called by host functions to write memory to wasm instance so that
- # the wasm module can read the result of a host function call
- def write_to_memory(caller, res)
- alloc = caller.export('alloc').to_func
- memory = caller.export('memory').to_memory
-
- # Serialize protobuf message
- resp = res.to_proto
-
- # Allocate memory for response
- resp_ptr = alloc.call(resp.length)
-
- # Write response to memory
- memory.write(resp_ptr, resp)
-
- # return 64bit integer where first 32 bits is the pointer, and the last 32 is the length
- resp_ptr << 32 | resp.length
+ t
end
-
end
-end
\ No newline at end of file
+end