lib/polyn/cli/schema_loader.rb in polyn-cli-0.1.9 vs lib/polyn/cli/schema_loader.rb in polyn-cli-0.2.0
- old
+ new
@@ -1,58 +1,58 @@
# frozen_string_literal: true
module Polyn
class Cli
##
- # Loads the JSON schmea into the event registry.
+ # Loads the JSON schema into the schema registry.
class SchemaLoader
include Thor::Actions
STORE_NAME = "POLYN_SCHEMAS"
##
- # Loads the events from the event repository into the Polyn event registry.
+ # Loads the schemas from the schema repository into the Polyn schema registry.
# @return [Bool]
def self.load(cli)
- new(cli).load_events
+ new(cli).load_schemas
end
def initialize(thor, **opts)
@thor = thor
@client = connect
@store_name = opts.fetch(:store_name, STORE_NAME)
@bucket = client.key_value(@store_name)
@cloud_event_schema = Polyn::Cli::CloudEvent.to_h.freeze
- @events_dir = opts.fetch(:events_dir, File.join(Dir.pwd, "events"))
- @events = {}
- @existing_events = {}
+ @schemas_dir = opts.fetch(:schemas_dir, File.join(Dir.pwd, "schemas"))
+ @schemas = {}
+ @existing_schemas = {}
end
- def load_events
- thor.say "Loading events into the Polyn event registry from '#{events_dir}'"
- read_events
- load_existing_events
+ def load_schemas
+ thor.say "Loading schemas into the Polyn schema registry from '#{schemas_dir}'"
+ read_schemas
+ load_existing_schemas
- events.each do |name, event|
- bucket.put(name, JSON.generate(event))
+ schemas.each do |name, schema|
+ bucket.put(name, JSON.generate(schema))
end
- delete_missing_events
+ delete_missing_schemas
true
end
private
attr_reader :thor,
- :events,
+ :schemas,
:client,
:bucket,
:cloud_event_schema,
- :events_dir,
+ :schemas_dir,
:store_name,
- :existing_events
+ :existing_schemas
def connect
opts = {
max_reconnect_attempts: 5,
reconnect_time_wait: 0.5,
@@ -64,72 +64,72 @@
end
NATS.connect(opts).jetstream
end
- def read_events
- event_files = Dir.glob(File.join(events_dir, "/**/*.json"))
- validate_unique_event_types!(event_files)
+ def read_schemas
+ schema_files = Dir.glob(File.join(schemas_dir, "/**/*.json"))
+ validate_unique_schema_names!(schema_files)
- event_files.each do |event_file|
- thor.say "Loading 'event #{event_file}'"
- data_schema = JSON.parse(File.read(event_file))
- event_type = File.basename(event_file, ".json")
- validate_schema!(event_type, data_schema)
- Polyn::Cli::Naming.validate_event_type!(event_type)
+ schema_files.each do |schema_file|
+ thor.say "Loading 'schema #{schema_file}'"
+ data_schema = JSON.parse(File.read(schema_file))
+ schema_name = File.basename(schema_file, ".json")
+ validate_schema!(schema_name, data_schema)
+ Polyn::Cli::Naming.validate_message_name!(schema_name)
schema = compose_cloud_event(data_schema)
- events[event_type] = schema
+ schemas[schema_name] = schema
end
end
- def validate_unique_event_types!(event_files)
- duplicates = find_duplicates(event_files)
+ def validate_unique_schema_names!(schema_files)
+ duplicates = find_duplicates(schema_files)
return if duplicates.empty?
- messages = duplicates.reduce([]) do |memo, (event_type, files)|
- memo << [event_type, *files].join("\n")
+ messages = duplicates.reduce([]) do |memo, (schema_name, files)|
+ memo << [schema_name, *files].join("\n")
end
message = [
- "There can only be one of each event type. The following events were duplicated:",
+ "There can only be one of each schema name. The following schemas were duplicated:",
*messages,
].join("\n")
raise Polyn::Cli::ValidationError, message
end
- def find_duplicates(event_files)
- event_types = event_files.group_by do |event_file|
- File.basename(event_file, ".json")
+ def find_duplicates(schema_files)
+ schema_names = schema_files.group_by do |schema_file|
+ File.basename(schema_file, ".json")
end
- event_types.each_with_object({}) do |(event_type, files), hash|
- hash[event_type] = files if files.length > 1
+ schema_names.each_with_object({}) do |(schema_name, files), hash|
+ hash[schema_name] = files if files.length > 1
hash
end
end
- def validate_schema!(event_type, schema)
+ def validate_schema!(schema_name, schema)
JSONSchemer.schema(schema)
rescue StandardError => e
raise Polyn::Cli::ValidationError,
- "Invalid JSON Schema document for event #{event_type}\n#{e.message}\n"\
+ "Invalid JSON Schema document for event #{schema_name}\n#{e.message}\n"\
"#{JSON.pretty_generate(schema)}"
end
- def compose_cloud_event(event_schema)
+ def compose_cloud_event(data_schema)
cloud_event_schema.merge({
"definitions" => cloud_event_schema["definitions"].merge({
- "datadef" => event_schema,
+ "datadef" => data_schema,
}),
})
end
- def load_existing_events
+ def load_existing_schemas
sub = client.subscribe("#{key_prefix}.>")
loop do
- msg = sub.next_msg
- existing_events[msg.subject.gsub("#{key_prefix}.", "")] = msg.data unless msg.data.empty?
+ msg = sub.next_msg
+ existing_schemas[msg.subject.gsub("#{key_prefix}.", "")] = msg.data unless msg.data.empty?
# A timeout is the only mechanism given to indicate there are no
# more messages
rescue NATS::IO::Timeout
break
end
@@ -138,14 +138,14 @@
def key_prefix
"$KV.#{store_name}"
end
- def delete_missing_events
- missing_events = existing_events.keys - events.keys
- missing_events.each do |event|
- thor.say "Deleting event #{event}"
- bucket.delete(event)
+ def delete_missing_schemas
+ missing_schemas = existing_schemas.keys - schemas.keys
+ missing_schemas.each do |schema|
+ thor.say "Deleting schema #{schema}"
+ bucket.delete(schema)
end
end
end
end
end