# frozen_string_literal: true

module Multiwoven
  # Explicit nesting (rather than `module Integrations::Core`) so this file can
  # be loaded even if Multiwoven::Integrations has not been defined yet.
  module Integrations
    module Core
      # Helper methods meant to be mixed into integration code: key
      # symbolization, JSON-schema generation from column definitions,
      # logging / exception reporting, and construction of protocol
      # Catalog / Stream objects from parsed catalog JSON.
      module Utils
        # Recursively converts hash keys to symbols, descending into nested
        # hashes and arrays.
        #
        # Keys that do not respond to +to_sym+ (e.g. Integer or nil keys) are
        # kept unchanged instead of raising NoMethodError.
        #
        # @param hash [Hash, Array, Object] value to convert
        # @return [Hash, Array, Object] a new structure with symbolized keys;
        #   any non-Hash, non-Array value is returned as is
        def keys_to_symbols(hash)
          case hash
          when Hash
            hash.each_with_object({}) do |(key, value), result|
              symbolized = key.respond_to?(:to_sym) ? key.to_sym : key
              result[symbolized] = keys_to_symbols(value)
            end
          when Array
            hash.map { |item| keys_to_symbols(item) }
          else
            hash
          end
        end

        # Builds a JSON-schema-like hash from a list of column definitions.
        #
        # @param column_definitions [Enumerable<Hash>] each entry is read via
        #   the symbol keys :column_name, :type and :optional
        # @return [Hash] { "type" => "object", "properties" => { name => { "type" => ... } } };
        #   an optional column gets the type [json_type, "null"]
        def convert_to_json_schema(column_definitions)
          json_schema = {
            "type" => "object",
            "properties" => {}
          }

          column_definitions.each do |column|
            json_type = map_type_to_json_schema(column[:type])
            json_type = [json_type, "null"] if column[:optional]
            json_schema["properties"][column[:column_name]] = { "type" => json_type }
          end

          json_schema
        end

        # Maps a source column type name to a JSON schema type name.
        #
        # @param type [String] source type; only the exact string "NUMBER" is
        #   special-cased
        # @return [String] "integer" for "NUMBER", otherwise "string"
        def map_type_to_json_schema(type)
          case type
          when "NUMBER"
            "integer"
          else
            "string" # Default type
          end
        end

        # @return [Object] whatever Integrations::Service.logger returns
        def logger
          Integrations::Service.logger
        end

        # Forwards the exception to Integrations::Service.exception_reporter.
        # Does nothing when no reporter is configured (reporter is nil).
        #
        # @param exception [Exception]
        # @param meta [Hash] extra context passed through to the reporter
        def report_exception(exception, meta = {})
          reporter = Integrations::Service.exception_reporter
          reporter&.report(exception, meta)
        end

        # Wraps an exception's message in a Protocol::LogMessage and returns
        # the result of calling +to_multiwoven_message+ on it.
        #
        # @param context [Object] passed as the log message +name+
        # @param type [Object] passed as the log message +level+
        # @param exception [#message]
        def create_log_message(context, type, exception)
          Integrations::Protocol::LogMessage.new(
            name: context,
            level: type,
            message: exception.message
          ).to_multiwoven_message
        end

        # Logs the exception at error level, reports it, and returns a log
        # message built from meta[:context] and meta[:type].
        #
        # @param exception [Exception]
        # @param meta [Hash] context for the log line and the reporter
        # @return [Object] the value returned by #create_log_message
        def handle_exception(exception, meta = {})
          logger.error(
            "#{hash_to_string(meta)}: #{exception.message}"
          )
          report_exception(exception, meta)
          create_log_message(meta[:context], meta[:type], exception)
        end

        # Renders a hash as "key = value" pairs joined by ", ".
        #
        # @param hash [Hash]
        # @return [String]
        def hash_to_string(hash)
          hash.map { |key, value| "#{key} = #{value}" }.join(", ")
        end

        # Keeps only the record attributes whose symbolized key is present in
        # +properties+.
        #
        # NOTE(review): relies on +with_indifferent_access+, which is not core
        # Ruby (presumably ActiveSupport) - confirm it is loaded by callers.
        #
        # @param record_object [Hash]
        # @param properties [Hash] looked up with symbolized keys
        # @return [Hash] the filtered attributes
        def extract_data(record_object, properties)
          data_attributes = record_object.with_indifferent_access
          data_attributes.select { |key, _| properties.key?(key.to_sym) }
        end

        # @param response [#code, nil]
        # @return [Boolean, nil] true when the response code is 200 or 201
        #   (compared as strings); nil/false when the response itself is
        #   nil/false
        def success?(response)
          response && %w[200 201].include?(response.code.to_s)
        end

        # Builds a Protocol::Catalog from parsed catalog JSON (string keys).
        # Missing values default to: request_rate_limit 60,
        # request_rate_limit_unit "minute", request_rate_concurrency 10,
        # schema_mode "schema".
        #
        # @param catalog_json [Hash] must contain a "streams" collection
        # @return [Multiwoven::Integrations::Protocol::Catalog]
        def build_catalog(catalog_json)
          streams = catalog_json["streams"].map { |stream_json| build_stream(stream_json) }
          Multiwoven::Integrations::Protocol::Catalog.new(
            streams: streams,
            request_rate_limit: catalog_json["request_rate_limit"] || 60,
            request_rate_limit_unit: catalog_json["request_rate_limit_unit"] || "minute",
            request_rate_concurrency: catalog_json["request_rate_concurrency"] || 10,
            schema_mode: catalog_json["schema_mode"] || "schema"
          )
        end

        # Builds a Protocol::Stream from one parsed stream entry (string keys).
        # Unlike #build_catalog, a missing request_rate_limit or
        # request_rate_concurrency becomes 0 here (nil.to_i).
        #
        # @param stream_json [Hash]
        # @return [Multiwoven::Integrations::Protocol::Stream]
        def build_stream(stream_json)
          Multiwoven::Integrations::Protocol::Stream.new(
            name: stream_json["name"],
            url: stream_json["url"],
            action: stream_json["action"],
            request_method: stream_json["method"],
            batch_support: stream_json["batch_support"] || false,
            batch_size: stream_json["batch_size"] || 1,
            json_schema: stream_json["json_schema"],
            request_rate_limit: stream_json["request_rate_limit"].to_i,
            request_rate_limit_unit: stream_json["request_rate_limit_unit"] || "minute",
            request_rate_concurrency: stream_json["request_rate_concurrency"].to_i,
            supported_sync_modes: stream_json["supported_sync_modes"]
          )
        end
      end
    end
  end
end