lib/polyn/cli/schema_loader.rb in polyn-cli-0.1.2 vs lib/polyn/cli/schema_loader.rb in polyn-cli-0.1.3
- old
+ new
@@ -17,30 +17,42 @@
end
def initialize(thor, **opts)
@thor = thor
@client = NATS.connect(Polyn::Cli.configuration.nats_servers).jetstream
- @bucket = client.key_value(opts.fetch(:store_name, STORE_NAME))
+ @store_name = opts.fetch(:store_name, STORE_NAME)
+ @bucket = client.key_value(@store_name)
@cloud_event_schema = Polyn::Cli::CloudEvent.to_h.freeze
@events_dir = opts.fetch(:events_dir, File.join(Dir.pwd, "events"))
@events = {}
+ @existing_events = {}
end
def load_events
thor.say "Loading events into the Polyn event registry from '#{events_dir}'"
read_events
+ load_existing_events
events.each do |name, event|
bucket.put(name, JSON.generate(event))
end
+ delete_missing_events
+
true
end
private
- attr_reader :thor, :events, :client, :bucket, :cloud_event_schema, :events_dir
+ attr_reader :thor,
+ :events,
+ :client,
+ :bucket,
+ :cloud_event_schema,
+ :events_dir,
+ :store_name,
+ :existing_events
def read_events
event_files = Dir.glob(File.join(events_dir, "/**/*.json"))
validate_unique_event_types!(event_files)
@@ -92,9 +104,35 @@
cloud_event_schema.merge({
"definitions" => cloud_event_schema["definitions"].merge({
"datadef" => event_schema,
}),
})
+ end
+
+ def load_existing_events
+ sub = client.subscribe("#{key_prefix}.>")
+
+ loop do
+ msg = sub.next_msg
+ existing_events[msg.subject.gsub("#{key_prefix}.", "")] = msg.data
+ # A timeout is the only mechanism given to indicate there are no
+ # more messages
+ rescue NATS::IO::Timeout
+ break
+ end
+ sub.unsubscribe
+ end
+
+ def key_prefix
+ "$KV.#{store_name}"
+ end
+
+ def delete_missing_events
+ missing_events = existing_events.keys - events.keys
+ missing_events.each do |event|
+ thor.say "Deleting event #{event}"
+ bucket.delete(event)
+ end
end
end
end
end