Sha256: 8877ad1b819a935bed2f33c60bf329c306406ed43323f1f8eba86b05400d2b80

Contents?: true

Size: 1.92 KB

Versions: 1

Compression:

Stored size: 1.92 KB

Contents

module Chronicle
  module ETL
    class Job
      attr_accessor :name,
                    :extractor_klass,
                    :extractor_options,
                    :transformer_klass,
                    :transformer_options,
                    :loader_klass,
                    :loader_options

      # TODO: build a proper id system
      alias id name

      def initialize(definition)
        definition = definition.definition # FIXME
        @name = definition[:name]
        @extractor_klass = load_klass(:extractor, definition[:extractor][:name])
        @extractor_options = definition[:extractor][:options] || {}

        @transformer_klass = load_klass(:transformer, definition[:transformer][:name])
        @transformer_options = definition[:transformer][:options] || {}

        @loader_klass = load_klass(:loader, definition[:loader][:name])
        @loader_options = definition[:loader][:options] || {}

        set_continuation if load_continuation?
        yield self if block_given?
      end

      def instantiate_extractor
        instantiate_klass(:extractor)
      end

      def instantiate_transformer(data)
        instantiate_klass(:transformer, data)
      end

      def instantiate_loader
        instantiate_klass(:loader)
      end

      def save_log?
        # TODO: this needs more nuance
        return !id.nil?
      end

      private

      def instantiate_klass(phase, *args)
        options = self.send("#{phase.to_s}_options")
        args = args.unshift(options)
        klass = self.send("#{phase.to_s}_klass")
        klass.new(*args)
      end

      def load_klass phase, identifier
        Chronicle::ETL::Catalog.phase_and_identifier_to_klass(phase, identifier)
      end

      def set_continuation
        continuation = Chronicle::ETL::JobLogger.load_latest(@job_id)
        @extractor_options[:continuation] = continuation
      end

      def load_continuation?
        save_log?
      end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
chronicle-etl-0.2.4 lib/chronicle/etl/job.rb