#
# The main Ductr module.
module Ductr
  VERSION: String

  # The adapter classes registry, all declared adapters are in the registry.
  #
  # _@return_ — The registry instance
  def self.adapter_registry: () -> Registry

  # The trigger classes registry, all declared triggers are in the registry.
  #
  # _@return_ — The registry instance
  def self.trigger_registry: () -> Registry

  # The Ductr current environment, "development" by default.
  # You can change it by setting the `DUCTR_ENV` environment variable.
  #
  # _@return_ — The Ductr environment
  def self.env: () -> String

  # Determines if Ductr is in development mode.
  #
  # _@return_ — True if DUCTR_ENV is set to "development" or nil
  def self.development?: () -> bool

  # Determines if Ductr is in production mode.
  #
  # _@return_ — True if DUCTR_ENV is set to "production"
  def self.production?: () -> bool

  # The configure block allows you to configure Ductr internals.
  # You must call this method once and only once to use the framework.
  def self.configure: () ?{ (Configuration config) -> void } -> void

  # The Ductr main logger instance.
  #
  # _@return_ — The logger instance
  def self.logger: () -> Log::Logger

  # sord warn - ActiveSupport::Cache::Store wasn't able to be resolved to a constant in this project
  # The Ductr store, used to share information across different instances.
  #
  # _@return_ — The store instance
  def self.store: () -> ActiveSupport::Cache::Store

  # Contains all the Ductr configuration.
  #
  # _@return_ — The configuration instance
  def self.config: () -> Configuration

  class AdapterNotFoundError < StandardError
  end

  class ControlNotFoundError < StandardError
  end

  class InconsistentPaginationError < StandardError
  end

  #
  # The base class for any job; you can use it directly if you don't need an ETL job.
  class Job < ActiveJob::Base
    include Ductr::JobStatus
    extend Annotable
    extend Forwardable

    # sord omit - no YARD type given for "*_", using untyped
    # The active job's perform method. DO NOT override it, implement the #run method instead.
    def perform: (*untyped _) -> void

    # The configured adapter instance for the given name.
    #
    # _@param_ `name` — The adapter name
    #
    # _@return_ — The adapter corresponding to the given name
    def adapter: (Symbol name) -> Adapter

    # The job's logger instance.
    #
    # _@return_ — The logger instance
    def logger: () -> Ductr::Log::Logger

    # The entry point of jobs.
    def run: () -> void

    # Writes the job's status into Ductr's store.
    #
    # _@param_ `status` — The status of the job
    def status=: (Symbol status) -> void

    # Determines whether the job has a `completed` or `failed` status.
    #
    # _@return_ — True when the status is `completed` or `failed`
    def stopped?: () -> bool

    # _@return_ — The error that occurred, if any
    attr_reader error: Exception

    # _@return_ — The job's status, one of `:queued`, `:working`, `:completed` and `:failed`
    attr_reader status: Symbol
  end
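  # A minimal sketch of a plain (non-ETL) job built on the API above. The adapter
  # name (:my_db), the Sequel-style connection and the :items table are assumptions
  # made for the example, not part of Ductr's API; jobs are enqueued through the
  # regular ActiveJob interface inherited from ActiveJob::Base.
  #
  # ```ruby
  # class CleanupJob < Ductr::Job
  #   def run
  #     my_db = adapter(:my_db)
  #     db = my_db.open!
  #     logger.info("Removing obsolete items")
  #     db[:items].where(obsolete: true).delete
  #   ensure
  #     my_db&.close!
  #   end
  # end
  #
  # CleanupJob.perform_later
  # ```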
  #
  # Store interaction helpers for internal usage.
  module Store
    extend Ductr::Store::JobStore
    extend Ductr::Store::PipelineStore
    EXPIRATION_INTERVAL: Integer

    # Get all known job instances for the given registry_key and job's key_prefix.
    #
    # _@param_ `registry_key` — The registry key in which job keys will be read
    #
    # _@param_ `key_prefix` — The cache key prefix for the registry's job keys
    #
    # _@return_ — The job instances
    def self.all: (String registry_key, String key_prefix) -> ::Array[Job]

    # Read all given jobs in the given key_prefix.
    #
    # _@param_ `key_prefix` — The cache key prefix for the job_id
    #
    # _@param_ `*jobs` — The jobs to read
    #
    # _@return_ — The read jobs
    def self.read: (String key_prefix, *::Array[Job] jobs) -> ::Array[Job]

    # sord omit - no YARD type given for "key_prefix", using untyped
    # Update the given job in the given key_prefix.
    #
    # _@param_ `job` — The job to update in the store
    def self.write: (untyped key_prefix, Job job) -> void

    # sord omit - no YARD type given for "registry_key", using untyped
    # Add the given job to the store's job registry. This method is NOT thread-safe.
    #
    # _@param_ `job` — The job to register
    def self.register: (untyped registry_key, Job job) -> void

    # Determines whether all tracked jobs have either a completed or failed status.
    #
    # _@return_ — `true` when all jobs are done
    def self.all_done?: () -> bool

    # Get all known job instances.
    #
    # _@return_ — The job instances
    def self.all_jobs: () -> ::Array[Job]

    # Read all given jobs.
    #
    # _@param_ `*jobs` — The jobs to read
    #
    # _@return_ — The read jobs
    def self.read_jobs: (*::Array[Job] jobs) -> ::Array[Job]

    # Update the given job.
    #
    # _@param_ `job` — The job to update in the store
    def self.write_job: (Job job) -> void

    # Add the given job to the store's job registry. This method is NOT thread-safe.
    #
    # _@param_ `job` — The job to register
    def self.register_job: (Job job) -> void

    # Convert the given job into a `SerializedJob` struct.
    #
    # _@param_ `job` — The job to serialize
    #
    # _@return_ — The job converted into a struct
    def self.serialize_job: (Job job) -> SerializedJob

    # sord infer - SerializedPipeline was resolved to Ductr::Store::PipelineSerializer::SerializedPipeline
    # Get all known pipeline instances.
    #
    # _@return_ — The pipeline instances
    def self.all_pipelines: () -> ::Array[Ductr::Store::PipelineSerializer::SerializedPipeline]

    # Update the given pipeline.
    #
    # _@param_ `pipeline` — The pipeline to update in the store
    def self.write_pipeline: (Pipeline pipeline) -> void

    # Add the given pipeline to the store's pipeline registry. This method is NOT thread-safe.
    #
    # _@param_ `pipeline` — The pipeline to register
    def self.register_pipeline: (Pipeline pipeline) -> void

    # Convert the given pipeline and its steps into
    # `SerializedPipeline` and `SerializedPipelineStep` structs.
    #
    # _@param_ `pipeline` — The pipeline to serialize
    #
    # _@return_ — The pipeline converted into a struct
    def self.serialize_pipeline: (Pipeline pipeline) -> SerializedPipeline

    #
    # Job's level store interactions.
    module JobStore
      include Ductr::Store::JobSerializer
      JOB_KEY_PREFIX: String
      JOB_REGISTRY_KEY: String

      # Get all known job instances.
      #
      # _@return_ — The job instances
      def all_jobs: () -> ::Array[Job]

      # Read all given jobs.
      #
      # _@param_ `*jobs` — The jobs to read
      #
      # _@return_ — The read jobs
      def read_jobs: (*::Array[Job] jobs) -> ::Array[Job]

      # Update the given job.
      #
      # _@param_ `job` — The job to update in the store
      def write_job: (Job job) -> void

      # Add the given job to the store's job registry. This method is NOT thread-safe.
      #
      # _@param_ `job` — The job to register
      def register_job: (Job job) -> void

      # Convert the given job into a `SerializedJob` struct.
      #
      # _@param_ `job` — The job to serialize
      #
      # _@return_ — The job converted into a struct
      def serialize_job: (Job job) -> SerializedJob
    end
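    # These helpers are internal, but the sketch below shows roughly how a
    # monitoring loop could use the module-level accessors. The polling loop and
    # the MyJob class are illustrative assumptions, not Ductr's actual runner code.
    #
    # ```ruby
    # job = MyJob.perform_later
    # Ductr::Store.register_job(job)
    #
    # sleep(0.5) until Ductr::Store.all_done?
    # failed = Ductr::Store.all_jobs.select { |j| j.status == :failed }
    # ```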
    #
    # Convert jobs into active job serializable structs.
    module JobSerializer
      # Convert the given job into a `SerializedJob` struct.
      #
      # _@param_ `job` — The job to serialize
      #
      # _@return_ — The job converted into a struct
      def serialize_job: (Job job) -> SerializedJob

      #
      # @!parse
      #   #
      #   # The job representation as a struct.
      #   #
      #   # @!attribute [r] job_id
      #   #   @return [String] The active job's job id
      #   #
      #   # @!attribute [r] status
      #   #   @return [Symbol] The job's status
      #   #
      #   # @!attribute [r] error
      #   #   @return [Exception, nil] The job's error if any
      #   #
      #   class SerializedJob < Struct
      #     #
      #     # @param [String] job_id Active job's job id
      #     # @param [Symbol] status Job's status
      #     # @param [Exception, nil] error Job's error
      #     #
      #     def initialize(job_id, status, error)
      #       @job_id = job_id
      #       @status = status
      #       @error = error
      #     end
      #   end
      class SerializedJob < Struct
        # _@param_ `job_id` — Active job's job id
        #
        # _@param_ `status` — Job's status
        #
        # _@param_ `error` — Job's error
        def initialize: (String job_id, Symbol status, Exception? error) -> void

        # Determines whether the job has a `completed` or `failed` status.
        #
        # _@return_ — True when the status is `completed` or `failed`
        def stopped?: () -> bool

        # _@return_ — The active job's job id
        attr_reader job_id: String

        # _@return_ — The job's status
        attr_reader status: Symbol

        # _@return_ — The job's error if any
        attr_reader error: Exception?
      end
    end
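    # What the serializer yields, roughly — the values below are illustrative,
    # not actual output:
    #
    # ```ruby
    # serialized = Ductr::Store.serialize_job(job)
    # serialized.job_id   # => the ActiveJob job id (a String)
    # serialized.status   # => :completed
    # serialized.stopped? # => true
    # ```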
    #
    # Pipeline's level store interactions.
    module PipelineStore
      include Ductr::Store::PipelineSerializer
      PIPELINE_KEY_PREFIX: String
      PIPELINE_REGISTRY_KEY: String

      # sord infer - SerializedPipeline was resolved to Ductr::Store::PipelineSerializer::SerializedPipeline
      # Get all known pipeline instances.
      #
      # _@return_ — The pipeline instances
      def all_pipelines: () -> ::Array[Ductr::Store::PipelineSerializer::SerializedPipeline]

      # Update the given pipeline.
      #
      # _@param_ `pipeline` — The pipeline to update in the store
      def write_pipeline: (Pipeline pipeline) -> void

      # Add the given pipeline to the store's pipeline registry. This method is NOT thread-safe.
      #
      # _@param_ `pipeline` — The pipeline to register
      def register_pipeline: (Pipeline pipeline) -> void

      # Convert the given pipeline and its steps into
      # `SerializedPipeline` and `SerializedPipelineStep` structs.
      #
      # _@param_ `pipeline` — The pipeline to serialize
      #
      # _@return_ — The pipeline converted into a struct
      def serialize_pipeline: (Pipeline pipeline) -> SerializedPipeline

      # Convert the given job into a `SerializedJob` struct.
      #
      # _@param_ `job` — The job to serialize
      #
      # _@return_ — The job converted into a struct
      def serialize_job: (Job job) -> SerializedJob
    end

    #
    # Convert pipelines and steps into active job serializable structs.
    module PipelineSerializer
      include Ductr::Store::JobSerializer

      # Convert the given pipeline and its steps into
      # `SerializedPipeline` and `SerializedPipelineStep` structs.
      #
      # _@param_ `pipeline` — The pipeline to serialize
      #
      # _@return_ — The pipeline converted into a struct
      def serialize_pipeline: (Pipeline pipeline) -> SerializedPipeline

      # Convert the given job into a `SerializedJob` struct.
      #
      # _@param_ `job` — The job to serialize
      #
      # _@return_ — The job converted into a struct
      def serialize_job: (Job job) -> SerializedJob

      #
      # @!parse
      #   #
      #   # The pipeline representation as a struct.
      #   #
      #   # @!attribute [r] job_id
      #   #   @return [String] The active job's job id
      #   #
      #   # @!attribute [r] status
      #   #   @return [Symbol] The pipeline job status
      #   #
      #   # @!attribute [r] error
      #   #   @return [Exception, nil] The pipeline job error if any
      #   #
      #   # @!attribute [r] steps
      #   #   @return [Array] The pipeline steps as struct
      #   #
      #   class SerializedPipeline < Struct
      #     #
      #     # @param [String] job_id Pipeline job id
      #     # @param [Symbol] status Pipeline status
      #     # @param [Exception, nil] error Pipeline error
      #     # @param [Array] steps Pipeline steps as struct
      #     #
      #     def initialize(job_id, status, error, steps)
      #       @job_id = job_id
      #       @status = status
      #       @error = error
      #       @steps = steps
      #     end
      #   end
      class SerializedPipeline < Struct
        # _@param_ `job_id` — Pipeline job id
        #
        # _@param_ `status` — Pipeline status
        #
        # _@param_ `error` — Pipeline error
        #
        # _@param_ `steps` — Pipeline steps as struct
        def initialize: (String job_id, Symbol status, Exception? error, ::Array[SerializedPipelineStep] steps) -> void

        # Determines whether the pipeline has a `completed` or `failed` status.
        #
        # _@return_ — True when the status is `completed` or `failed`
        def stopped?: () -> bool

        # _@return_ — The active job's job id
        attr_reader job_id: String

        # _@return_ — The pipeline job status
        attr_reader status: Symbol

        # _@return_ — The pipeline job error if any
        attr_reader error: Exception?

        # _@return_ — The pipeline steps as struct
        attr_reader steps: ::Array[SerializedPipelineStep]
      end

      #
      # @!parse
      #   #
      #   # The pipeline step representation as a struct.
      #   #
      #   # @!attribute [r] jobs
      #   #   @return [Array] The step's jobs
      #   #
      #   # @!attribute [r] done
      #   #   @return [Boolean] The step's fiber state
      #   #
      #   class SerializedPipelineStep < Struct
      #     #
      #     # @param [Array] jobs The step's jobs
      #     # @param [Boolean] done The step's fiber state
      #     #
      #     def initialize(jobs, done)
      #       @jobs = jobs
      #       @done = done
      #     end
      #   end
      class SerializedPipelineStep < Struct
        # _@param_ `jobs` — The step's jobs
        #
        # _@param_ `done` — The step's fiber state
        def initialize: (::Array[Job] jobs, bool done) -> void

        # Check if the step is done.
        #
        # _@return_ — True if the step is done
        def done?: () -> bool

        # _@return_ — The step's jobs
        attr_reader jobs: ::Array[Job]

        # _@return_ — The step's fiber state
        attr_reader done: bool
      end
    end
  end

  #
  # Base adapter class, your adapter should inherit from this class.
  class Adapter
    # All the sources declared for this adapter go here.
    #
    # _@return_ — The registry instance
    def self.source_registry: () -> Registry

    # All the lookups declared for this adapter go here.
    #
    # _@return_ — The registry instance
    def self.lookup_registry: () -> Registry

    # All the destinations declared for this adapter go here.
    #
    # _@return_ — The registry instance
    def self.destination_registry: () -> Registry

    # All the triggers declared for this adapter go here.
    #
    # _@return_ — The registry instance
    def self.trigger_registry: () -> Registry

    # Creates a new adapter instance.
    #
    # _@param_ `name` — The adapter instance name, mandatory, must be unique
    #
    # _@param_ `**config` — The adapter configuration hash
    def initialize: (Symbol name, **Object config) -> void

    # Allows using the adapter with block syntax; the adapter is automatically closed on block exit.
    def open: () -> void

    # Opens the adapter before using it, e.g. opening a connection, authenticating to an HTTP
    # endpoint, opening a file...
    # This method may return something, such as a connection object.
    def open!: () -> void

    # Closes the adapter when finished, e.g. closing the connection, dropping the HTTP session,
    # closing the file...
    def close!: () -> void

    # _@return_ — the adapter instance name
    attr_reader name: Symbol

    # _@return_ — the adapter configuration hash
    attr_reader config: ::Hash[Symbol, Object]
  end
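  # A rough sketch of a custom adapter built on Sequel. The :database config key
  # and the Sequel connection are assumptions made for the example; real adapters
  # also register their sources, lookups, destinations and triggers, which is
  # omitted here.
  #
  # ```ruby
  # class MyAdapter < Ductr::Adapter
  #   def open!
  #     @db = Sequel.connect(config[:database])
  #   end
  #
  #   def close!
  #     @db&.disconnect
  #     @db = nil
  #   end
  # end
  # ```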
  #
  # Base class for ETL jobs using the experimental fiber runner.
  # Usage example:
  #
  #   class MyETLJob < Ductr::ETLJob
  #     source :first_db, :basic
  #     send_to :the_transform, :the_other_transform
  #     def the_source(db)
  #       # ...
  #     end
  #
  #     transform
  #     send_to :the_destination
  #     def the_transform(row)
  #       # ...
  #     end
  #
  #     destination :first_db, :basic
  #     def the_destination(row, db)
  #       # ...
  #     end
  #
  #     transform
  #     send_to :the_other_destination
  #     def the_other_transform(row)
  #       # ...
  #     end
  #
  #     destination :second_db, :basic
  #     def the_other_destination(row, db)
  #       # ...
  #     end
  #   end
  class ETLJob < Ductr::Job
    include Ductr::JobETLRunner
    include Ductr::ETL::Parser
    ETL_RUNNER_CLASS: Class

    # Annotation to define a source method
    #
    # _@param_ `adapter_name` — The adapter the source is running on
    #
    # _@param_ `source_type` — The type of source to run
    #
    # _@param_ `**source_options` — The options to pass to the source
    #
    # Source with Sequel SQLite adapter
    # ```ruby
    # source :my_adapter, :paginated, page_size: 42
    # def my_source(db, offset, limit)
    #   db[:items].offset(offset).limit(limit)
    # end
    # ```
    #
    # _@see_ — The chosen adapter documentation for further information on sources usage.
    def self.source: (Symbol adapter_name, Symbol source_type, **Object source_options) -> void

    # Annotation to define a transform method
    #
    # _@param_ `transform_class` — The class the transform is running on
    #
    # _@param_ `**transform_options` — The options to pass to the transform
    #
    # Transform without params
    # ```ruby
    # transform
    # def rename_keys(row)
    #   row[:new_name] = row.delete(:old_name)
    #   row[:new_email] = row.delete(:old_email)
    # end
    # ```
    #
    # Transform with params
    # ```ruby
    # class RenameTransform < Ductr::ETL::Transform
    #   def process(row)
    #     call_method.each do |actual_name, new_name|
    #       new_key = "#{options[:prefix]}#{new_name}".to_sym
    #
    #       row[new_key] = row.delete(actual_name)
    #     end
    #   end
    # end
    #
    # transform RenameTransform, prefix: "some_"
    # def rename
    #   { old_name: :new_name, old_email: :new_email }
    # end
    # ```
    def self.transform: (Class? transform_class, **Object transform_options) -> void
    # Annotation to define a lookup method
    #
    # _@param_ `adapter_name` — The adapter the lookup is running on
    #
    # _@param_ `lookup_type` — The type of lookup to run
    #
    # _@param_ `**lookup_options` — The options to pass to the lookup
    #
    # Lookup with Sequel SQLite adapter
    # ```ruby
    # lookup :my_other_adapter, :match, merge: [:id, :item], buffer_size: 4
    # def joining_different_adapters(db, ids)
    #   db[:items_bis].select(:id, :item, :name).where(item: ids)
    # end
    # ```
    #
    # _@see_ — The chosen adapter documentation for further information on lookups usage.
    def self.lookup: (Symbol adapter_name, Symbol lookup_type, **Object lookup_options) -> void

    # Annotation to define a destination method
    #
    # _@param_ `adapter_name` — The adapter the destination is running on
    #
    # _@param_ `destination_type` — The type of destination to run
    #
    # _@param_ `**destination_options` — The options to pass to the destination
    #
    # Destination with Sequel SQLite adapter
    # ```ruby
    # destination :my_other_adapter, :basic
    # def my_destination(row, db)
    #   db[:new_items].insert(name: row[:name], new_name: row[:new_name])
    # end
    # ```
    #
    # _@see_ — The chosen adapter documentation for further information on destinations usage.
    def self.destination: (Symbol adapter_name, Symbol destination_type, **Object destination_options) -> void

    # Annotation to define which methods will follow the current one
    #
    # _@param_ `*methods` — The names of the following methods
    #
    # Source with Sequel SQLite adapter sending rows to two transforms
    # ```ruby
    # source :my_adapter, :paginated, page_size: 42
    # send_to :my_first_transform, :my_second_transform
    # def my_source(db, offset, limit)
    #   db[:items].offset(offset).limit(limit)
    # end
    #
    # transform
    # def my_first_transform(row)
    #   # ...
    # end
    #
    # transform
    # def my_second_transform(row)
    #   # ...
    # end
    # ```
    def self.send_to: (*::Array[Symbol] methods) -> void

    # Handles source, transform and destination controls.
    # Handles send_to directives, used to do the plumbing between controls.
    # Used for both kiba and fiber runners initialization.
    #
    # _@return_ — The job's controls
    def parse_annotations: () -> ::Array[(Source | Transform | Destination | ::Hash[Symbol, ::Array[Symbol]])]

    # Currently used adapters set.
    #
    # _@return_ — The current adapters
    def adapters: () -> ::Set[untyped]

    # Finds the method(s) associated to the given annotation names in the job class.
    #
    # _@param_ `*annotation_names` — The annotation names of the searched methods
    #
    # _@return_ — Returns a mapped array containing the block's returned values
    def find_method: (*::Array[Symbol] annotation_names) ?{ (Method) -> untyped } -> ::Array[untyped]

    # Initializes adapter controls for the given type.
    #
    # _@param_ `control_type` — The adapter control type, one of :source or :destination
    #
    # _@return_ — The initialized adapter controls
    def init_adapter_controls: (Symbol control_type) -> ::Array[(Source | Destination)]

    # Initializes transform controls for the given types.
    #
    # _@param_ `*control_types` — The transform control types, :transform and/or :lookup
    #
    # _@return_ — The initialized transform controls
    def init_transform_controls: (*::Array[Symbol] control_types) -> ::Array[Transform]

    # sord warn - Annotable::Method wasn't able to be resolved to a constant in this project
    # Initializes an adapter control (source, lookup or destination) based on the given annotated method.
    #
    # _@param_ `annotated_method` — The control's method
    #
    # _@return_ — The adapter control instance
    def adapter_control: (Annotable::Method annotated_method) -> Control

    # sord warn - Annotable::Method wasn't able to be resolved to a constant in this project
    # Initializes a transform control.
    #
    # _@param_ `annotated_method` — The transform's method
    #
    # _@return_ — The transform control instance
    def transform_control: (Annotable::Method annotated_method) -> Transform

    # Parse job's annotations and create the runner instance.
    def initialize: () -> void

    # Opens adapters, executes the runner and then closes the adapters.
    def run: () -> void
  end
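  # A sketch stitching the annotations above into one fiber-runner job. The adapter
  # names (:my_db, :my_other_db) and the table/column names are assumptions; the
  # control types (:paginated, :basic) follow the Sequel examples above.
  #
  # ```ruby
  # class ImportItemsJob < Ductr::ETLJob
  #   source :my_db, :paginated, page_size: 42
  #   send_to :rename
  #   def items(db, offset, limit)
  #     db[:items].offset(offset).limit(limit)
  #   end
  #
  #   transform
  #   send_to :save
  #   def rename(row)
  #     row[:new_name] = row.delete(:name)
  #     row
  #   end
  #
  #   destination :my_other_db, :basic
  #   def save(row, db)
  #     db[:new_items].insert(row)
  #   end
  # end
  # ```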
  #
  # The base class for any trigger, can be initialized by passing it its adapter, if any.
  # A trigger must implement the #add method, which is called for each trigger declaration.
  # Depending on what your trigger does, you may have to implement the #start and #stop methods.
  # #start is called when the scheduler relying on the trigger is started. #stop does the opposite:
  # it is called when the scheduler relying on the trigger is stopped.
  class Trigger
    # Creates a new trigger instance, called by the scheduler.
    #
    # _@param_ `adapter` — The trigger's adapter, if any
    def initialize: (?Adapter? adapter) -> void

    # Adds a new trigger, called by a scheduler when a trigger is declared.
    #
    # _@param_ `_method` — The scheduler method to be called by the trigger
    #
    # _@param_ `_options` — The options of the trigger declaration
    def add: (Method _method, ::Hash[Symbol, Object] _options) -> void

    # Called when the scheduler relying on the trigger is started.
    def start: () -> void

    # Called when the scheduler relying on the trigger is stopped.
    def stop: () -> void

    # sord omit - no YARD type given for :adapter, using untyped
    # Returns the value of attribute adapter.
    attr_reader adapter: untyped
  end
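  # A rough sketch of a custom trigger that calls its scheduler methods on a fixed
  # interval. The :interval option and the thread-based loop are assumptions made
  # for the example, not an API documented here.
  #
  # ```ruby
  # class IntervalTrigger < Ductr::Trigger
  #   def add(method, options)
  #     (@entries ||= []) << [method, options.fetch(:interval, 60)]
  #   end
  #
  #   def start
  #     @threads = (@entries || []).map do |method, interval|
  #       Thread.new do
  #         loop do
  #           sleep(interval)
  #           method.call
  #         end
  #       end
  #     end
  #   end
  #
  #   def stop
  #     @threads&.each(&:kill)
  #   end
  # end
  # ```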
  module CLI
    #
    # The main CLI is started when used inside a ductr project folder.
    # It exposes scheduling and monitoring tasks.
    class Main < Thor
      # sord omit - no YARD type given for "job_name", using untyped
      # sord omit - no YARD return type given, using untyped
      def perform: (untyped job_name) -> untyped

      # sord omit - no YARD type given for "*scheduler_names", using untyped
      # sord omit - no YARD return type given, using untyped
      def schedule: (*untyped scheduler_names) -> untyped

      # Keeps the thread alive until Ctrl-C is pressed.
      def sleep_until_interrupt: () -> void
    end

    #
    # The default CLI is started when no project folder is found.
    # It exposes project and adapter generation tasks.
    class Default < Thor
      # Generates a new project
      #
      # _@param_ `name` — The project's name
      def new: (?String? name) -> void
    end

    #
    # Thor generator to create a new project
    class NewProjectGenerator < Thor::Group
      include Thor::Actions

      # The templates source used to create a new project
      #
      # _@return_ — the templates source absolute path
      def self.source_root: () -> String

      # Does some setup before generating files:
      # creates the project directory and sets it as the destination for the generator
      def init: () -> void

      # Creates files in the project's root
      def gen_root: () -> void

      # Creates the bin file for the project
      def gen_bin: () -> void

      # Creates files in the `config` folder
      def gen_config: () -> void
    end
  end

  #
  # Base class for ETL jobs using kiba's streaming runner.
  # Example using the SQLite adapter:
  #
  #   class MyKibaJob < Ductr::KibaJob
  #     source :some_adapter, :paginated, page_size: 4
  #     def select_some_stuff(db, offset, limit)
  #       db[:items].offset(offset).limit(limit)
  #     end
  #
  #     lookup :some_adapter, :match, merge: [:id, :item], buffer_size: 4
  #     def merge_with_stuff(db, ids)
  #       db[:items_bis].select(:id, Sequel.as(:name, :name_bis), :item).where(item: ids)
  #     end
  #
  #     transform
  #     def generate_more_stuff(row)
  #       { name: "#{row[:name]}_#{row[:name_bis]}" }
  #     end
  #
  #     destination :some_other_adapter, :basic
  #     def my_destination(row, db)
  #       logger.trace("Hello destination: #{row}")
  #       db[:new_items].insert(name: row[:name])
  #     end
  #   end
  #
  # @see The chosen adapter documentation for further information on controls usage.
  class KibaJob < Ductr::Job
    include Ductr::JobETLRunner
    include Ductr::ETL::Parser
    ETL_RUNNER_CLASS: Class

    # Annotation to define a source method
    #
    # _@param_ `adapter_name` — The adapter the source is running on
    #
    # _@param_ `source_type` — The type of source to run
    #
    # _@param_ `**source_options` — The options to pass to the source
    #
    # Source with Sequel SQLite adapter
    # ```ruby
    # source :my_adapter, :paginated, page_size: 42
    # def my_source(db, offset, limit)
    #   db[:items].offset(offset).limit(limit)
    # end
    # ```
    #
    # _@see_ — The chosen adapter documentation for further information on sources usage.
    def self.source: (Symbol adapter_name, Symbol source_type, **Object source_options) -> void

    # Annotation to define a transform method
    #
    # _@param_ `transform_class` — The class the transform is running on
    #
    # _@param_ `**transform_options` — The options to pass to the transform
    #
    # Transform without params
    # ```ruby
    # transform
    # def rename_keys(row)
    #   row[:new_name] = row.delete(:old_name)
    #   row[:new_email] = row.delete(:old_email)
    # end
    # ```
    #
    # Transform with params
    # ```ruby
    # class RenameTransform < Ductr::ETL::Transform
    #   def process(row)
    #     call_method.each do |actual_name, new_name|
    #       new_key = "#{options[:prefix]}#{new_name}".to_sym
    #
    #       row[new_key] = row.delete(actual_name)
    #     end
    #   end
    # end
    #
    # transform RenameTransform, prefix: "some_"
    # def rename
    #   { old_name: :new_name, old_email: :new_email }
    # end
    # ```
    def self.transform: (Class? transform_class, **Object transform_options) -> void

    # Annotation to define a lookup method
    #
    # _@param_ `adapter_name` — The adapter the lookup is running on
    #
    # _@param_ `lookup_type` — The type of lookup to run
    #
    # _@param_ `**lookup_options` — The options to pass to the lookup
    #
    # Lookup with Sequel SQLite adapter
    # ```ruby
    # lookup :my_other_adapter, :match, merge: [:id, :item], buffer_size: 4
    # def joining_different_adapters(db, ids)
    #   db[:items_bis].select(:id, :item, :name).where(item: ids)
    # end
    # ```
    #
    # _@see_ — The chosen adapter documentation for further information on lookups usage.
    def self.lookup: (Symbol adapter_name, Symbol lookup_type, **Object lookup_options) -> void
    # Annotation to define a destination method
    #
    # _@param_ `adapter_name` — The adapter the destination is running on
    #
    # _@param_ `destination_type` — The type of destination to run
    #
    # _@param_ `**destination_options` — The options to pass to the destination
    #
    # Destination with Sequel SQLite adapter
    # ```ruby
    # destination :my_other_adapter, :basic
    # def my_destination(row, db)
    #   db[:new_items].insert(name: row[:name], new_name: row[:new_name])
    # end
    # ```
    #
    # _@see_ — The chosen adapter documentation for further information on destinations usage.
    def self.destination: (Symbol adapter_name, Symbol destination_type, **Object destination_options) -> void

    # Handles source, transform and destination controls.
    # Handles send_to directives, used to do the plumbing between controls.
    # Used for both kiba and fiber runners initialization.
    #
    # _@return_ — The job's controls
    def parse_annotations: () -> ::Array[(Source | Transform | Destination | ::Hash[Symbol, ::Array[Symbol]])]

    # Currently used adapters set.
    #
    # _@return_ — The current adapters
    def adapters: () -> ::Set[untyped]

    # Finds the method(s) associated to the given annotation names in the job class.
    #
    # _@param_ `*annotation_names` — The annotation names of the searched methods
    #
    # _@return_ — Returns a mapped array containing the block's returned values
    def find_method: (*::Array[Symbol] annotation_names) ?{ (Method) -> untyped } -> ::Array[untyped]

    # Initializes adapter controls for the given type.
    #
    # _@param_ `control_type` — The adapter control type, one of :source or :destination
    #
    # _@return_ — The initialized adapter controls
    def init_adapter_controls: (Symbol control_type) -> ::Array[(Source | Destination)]

    # Initializes transform controls for the given types.
    #
    # _@param_ `*control_types` — The transform control types, :transform and/or :lookup
    #
    # _@return_ — The initialized transform controls
    def init_transform_controls: (*::Array[Symbol] control_types) -> ::Array[Transform]

    # sord warn - Annotable::Method wasn't able to be resolved to a constant in this project
    # Initializes an adapter control (source, lookup or destination) based on the given annotated method.
    #
    # _@param_ `annotated_method` — The control's method
    #
    # _@return_ — The adapter control instance
    def adapter_control: (Annotable::Method annotated_method) -> Control

    # sord warn - Annotable::Method wasn't able to be resolved to a constant in this project
    # Initializes a transform control.
    #
    # _@param_ `annotated_method` — The transform's method
    #
    # _@return_ — The transform control instance
    def transform_control: (Annotable::Method annotated_method) -> Transform

    # Parse job's annotations and create the runner instance.
    def initialize: () -> void

    # Opens adapters, executes the runner and then closes the adapters.
    def run: () -> void
  end

  #
  # Pipelines allow you to easily declare rich data pipelines.
  #
  # By using the `after` annotation, you can define the steps' execution hierarchy.
  #
  # `sync` and `async` are useful to define job sequences inside step methods.
  #
  # `Pipeline` inherits from `Job`, which means that pipelines are enqueued like any other job.
  # Pipelines are enqueued in the :ductr_pipelines queue.
  #
  #   class MyPipeline < Ductr::Pipeline
  #     def first_step
  #       sync(MyJob, 1)
  #       async(SomeJob) # Executed when `MyJob` is done
  #     end
  #
  #     after :first_step
  #     def first_parallel_step # Returns when all three `HelloJob` are done
  #       async(HelloJob, :one)
  #       async(HelloJob, :two)
  #       async(HelloJob, :three)
  #     end
  #
  #     after :first_step
  #     def second_parallel_step # Executed concurrently with :first_parallel_step
  #       async(SomeJob)
  #       async(SomeOtherJob)
  #       sync(HelloJob, :one) # Executed when `SomeJob` and `SomeOtherJob` are done
  #     end
  #
  #     after :first_parallel_step, :second_parallel_step
  #     def last_step # Executed when `first_parallel_step` and `second_parallel_step` jobs are done
  #       sync(ByeJob)
  #     end
  #   end
  #
  # You can define pipelines with only one step by using the `after` annotation without parameters:
  #
  #   class MonoStepPipeline < Ductr::Pipeline
  #     after
  #     def unique_step
  #       async(MyJob)
  #       async(MyJob)
  #     end
  #   end
  #
  # A pipeline can inherit from another, allowing you to overload and add steps to the parent pipeline:
  #
  #   class InheritPipeline < MonoStepPipeline
  #     after :unique_step
  #     def not_that_unique
  #       async(MyJob)
  #     end
  #   end
  class Pipeline < Ductr::Job
    # Annotation to define preceding steps on a pipeline step method.
    #
    # ```ruby
    # after :some_step_method, :some_other_step_method
    # def my_step
    #   # ...
    # end
    # ```
    def self.after: () -> void

    # Starts the pipeline runner.
    def run: () -> void

    # Initializes the pipeline runner
    def initialize: () -> void

    # Puts the given job in the queue and waits for it to be done.
    #
    # _@param_ `job_class` — The job to enqueue
    #
    # _@param_ `*params` — The job's params
    def sync: (singleton(Job) job_class, *::Array[Object] params) -> void

    # Enqueues the given job.
    #
    # _@param_ `job_class` — The job to enqueue
    #
    # _@param_ `*params` — The job's params
    def async: (singleton(Job) job_class, *::Array[Object] params) -> void

    # Writes the pipeline's status into Ductr's store.
    #
    # _@param_ `status` — The status of the job
    def status=: (Symbol status) -> void

    # _@return_ — The pipeline's runner instance
    attr_reader runner: PipelineRunner
  end
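  # Pipelines are regular jobs, so they are enqueued the same way; the store
  # structs above can then be used to observe their progress. The monitoring
  # snippet below is an illustrative sketch, not part of the pipeline API itself.
  #
  # ```ruby
  # MyPipeline.perform_later
  #
  # Ductr::Store.all_pipelines.each do |pipeline|
  #   done = pipeline.steps.count(&:done?)
  #   puts "#{pipeline.job_id}: #{pipeline.status} (#{done}/#{pipeline.steps.size} steps done)"
  # end
  # ```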
  class NotFoundInRegistryError < StandardError
  end

  #
  # The registry pattern to store adapters, controls and triggers.
  class Registry
    extend Forwardable
  end
end