lib/polyn/cli/schema_loader.rb in polyn-cli-0.1.9 vs lib/polyn/cli/schema_loader.rb in polyn-cli-0.2.0

- old
+ new

@@ -1,58 +1,58 @@ # frozen_string_literal: true module Polyn class Cli ## - # Loads the JSON schmea into the event registry. + # Loads the JSON schema into the schema registry. class SchemaLoader include Thor::Actions STORE_NAME = "POLYN_SCHEMAS" ## - # Loads the events from the event repository into the Polyn event registry. + # Loads the schemas from the schema repository into the Polyn schema registry. # @return [Bool] def self.load(cli) - new(cli).load_events + new(cli).load_schemas end def initialize(thor, **opts) @thor = thor @client = connect @store_name = opts.fetch(:store_name, STORE_NAME) @bucket = client.key_value(@store_name) @cloud_event_schema = Polyn::Cli::CloudEvent.to_h.freeze - @events_dir = opts.fetch(:events_dir, File.join(Dir.pwd, "events")) - @events = {} - @existing_events = {} + @schemas_dir = opts.fetch(:schemas_dir, File.join(Dir.pwd, "schemas")) + @schemas = {} + @existing_schemas = {} end - def load_events - thor.say "Loading events into the Polyn event registry from '#{events_dir}'" - read_events - load_existing_events + def load_schemas + thor.say "Loading schemas into the Polyn schema registry from '#{schemas_dir}'" + read_schemas + load_existing_schemas - events.each do |name, event| - bucket.put(name, JSON.generate(event)) + schemas.each do |name, schema| + bucket.put(name, JSON.generate(schema)) end - delete_missing_events + delete_missing_schemas true end private attr_reader :thor, - :events, + :schemas, :client, :bucket, :cloud_event_schema, - :events_dir, + :schemas_dir, :store_name, - :existing_events + :existing_schemas def connect opts = { max_reconnect_attempts: 5, reconnect_time_wait: 0.5, @@ -64,72 +64,72 @@ end NATS.connect(opts).jetstream end - def read_events - event_files = Dir.glob(File.join(events_dir, "/**/*.json")) - validate_unique_event_types!(event_files) + def read_schemas + schema_files = Dir.glob(File.join(schemas_dir, "/**/*.json")) + validate_unique_schema_names!(schema_files) - event_files.each do |event_file| - thor.say "Loading 'event #{event_file}'" - data_schema = JSON.parse(File.read(event_file)) - event_type = File.basename(event_file, ".json") - validate_schema!(event_type, data_schema) - Polyn::Cli::Naming.validate_event_type!(event_type) + schema_files.each do |schema_file| + thor.say "Loading 'schema #{schema_file}'" + data_schema = JSON.parse(File.read(schema_file)) + schema_name = File.basename(schema_file, ".json") + validate_schema!(schema_name, data_schema) + Polyn::Cli::Naming.validate_message_name!(schema_name) schema = compose_cloud_event(data_schema) - events[event_type] = schema + schemas[schema_name] = schema end end - def validate_unique_event_types!(event_files) - duplicates = find_duplicates(event_files) + def validate_unique_schema_names!(schema_files) + duplicates = find_duplicates(schema_files) return if duplicates.empty? - messages = duplicates.reduce([]) do |memo, (event_type, files)| - memo << [event_type, *files].join("\n") + messages = duplicates.reduce([]) do |memo, (schema_name, files)| + memo << [schema_name, *files].join("\n") end message = [ - "There can only be one of each event type. The following events were duplicated:", + "There can only be one of each schema name. The following schemas were duplicated:", *messages, ].join("\n") raise Polyn::Cli::ValidationError, message end - def find_duplicates(event_files) - event_types = event_files.group_by do |event_file| - File.basename(event_file, ".json") + def find_duplicates(schema_files) + schema_names = schema_files.group_by do |schema_file| + File.basename(schema_file, ".json") end - event_types.each_with_object({}) do |(event_type, files), hash| - hash[event_type] = files if files.length > 1 + schema_names.each_with_object({}) do |(schema_name, files), hash| + hash[schema_name] = files if files.length > 1 hash end end - def validate_schema!(event_type, schema) + def validate_schema!(schema_name, schema) JSONSchemer.schema(schema) rescue StandardError => e raise Polyn::Cli::ValidationError, - "Invalid JSON Schema document for event #{event_type}\n#{e.message}\n"\ + "Invalid JSON Schema document for event #{schema_name}\n#{e.message}\n"\ "#{JSON.pretty_generate(schema)}" end - def compose_cloud_event(event_schema) + def compose_cloud_event(data_schema) cloud_event_schema.merge({ "definitions" => cloud_event_schema["definitions"].merge({ - "datadef" => event_schema, + "datadef" => data_schema, }), }) end - def load_existing_events + def load_existing_schemas sub = client.subscribe("#{key_prefix}.>") loop do - msg = sub.next_msg - existing_events[msg.subject.gsub("#{key_prefix}.", "")] = msg.data unless msg.data.empty? + msg = sub.next_msg + existing_schemas[msg.subject.gsub("#{key_prefix}.", "")] = msg.data unless msg.data.empty? # A timeout is the only mechanism given to indicate there are no # more messages rescue NATS::IO::Timeout break end @@ -138,14 +138,14 @@ def key_prefix "$KV.#{store_name}" end - def delete_missing_events - missing_events = existing_events.keys - events.keys - missing_events.each do |event| - thor.say "Deleting event #{event}" - bucket.delete(event) + def delete_missing_schemas + missing_schemas = existing_schemas.keys - schemas.keys + missing_schemas.each do |schema| + thor.say "Deleting schema #{schema}" + bucket.delete(schema) end end end end end