lib/polyn/cli/schema_loader.rb in polyn-cli-0.1.2 vs lib/polyn/cli/schema_loader.rb in polyn-cli-0.1.3

- old
+ new

@@ -17,30 +17,42 @@ end def initialize(thor, **opts) @thor = thor @client = NATS.connect(Polyn::Cli.configuration.nats_servers).jetstream - @bucket = client.key_value(opts.fetch(:store_name, STORE_NAME)) + @store_name = opts.fetch(:store_name, STORE_NAME) + @bucket = client.key_value(@store_name) @cloud_event_schema = Polyn::Cli::CloudEvent.to_h.freeze @events_dir = opts.fetch(:events_dir, File.join(Dir.pwd, "events")) @events = {} + @existing_events = {} end def load_events thor.say "Loading events into the Polyn event registry from '#{events_dir}'" read_events + load_existing_events events.each do |name, event| bucket.put(name, JSON.generate(event)) end + delete_missing_events + true end private - attr_reader :thor, :events, :client, :bucket, :cloud_event_schema, :events_dir + attr_reader :thor, + :events, + :client, + :bucket, + :cloud_event_schema, + :events_dir, + :store_name, + :existing_events def read_events event_files = Dir.glob(File.join(events_dir, "/**/*.json")) validate_unique_event_types!(event_files) @@ -92,9 +104,35 @@ cloud_event_schema.merge({ "definitions" => cloud_event_schema["definitions"].merge({ "datadef" => event_schema, }), }) + end + + def load_existing_events + sub = client.subscribe("#{key_prefix}.>") + + loop do + msg = sub.next_msg + existing_events[msg.subject.gsub("#{key_prefix}.", "")] = msg.data + # A timeout is the only mechanism given to indicate there are no + # more messages + rescue NATS::IO::Timeout + break + end + sub.unsubscribe + end + + def key_prefix + "$KV.#{store_name}" + end + + def delete_missing_events + missing_events = existing_events.keys - events.keys + missing_events.each do |event| + thor.say "Deleting event #{event}" + bucket.delete(event) + end end end end end