Sha256: d31478c136cff3634d15583466243d81c146081f71307afe5e02cfede63f6906

Contents?: true

Size: 1.94 KB

Versions: 2

Compression:

Stored size: 1.94 KB

Contents

module Embulk

  require 'embulk/data_source'
  require 'embulk/schema'
  require 'embulk/page_builder'

  class InputPlugin
    def self.transaction(config, &control)
      raise NotImplementedError, "InputPlugin.transaction(config, &control) must be implemented"
    end

    def initialize(task, schema, index, page_builder)
      @task = task
      @schema = schema
      @index = index
      @page_builder = page_builder
    end

    def run
      raise NotImplementedError, "InputPlugin#run must be implemented"
    end

    if Embulk.java?
      def self.java_object
        JavaAdapter.new(self)
      end

      class JavaAdapter
        include Java::InputPlugin

        def initialize(ruby_class)
          @ruby_class = ruby_class
        end

        def transaction(java_config, java_control)
          config = DataSource.from_java_object(java_config)
          next_config_hash = @ruby_class.transaction(config) do |task_source_hash,columns,processor_count|
            java_task_source = DataSource.from_ruby_hash(task_source_hash).java_object
            java_schema = Schema.new(columns).java_object
            java_commit_reports = java_control.run(java_task_source, java_schema, processor_count)
            java_commit_reports.map {|java_commit_report|
              DataSource.from_java_object(java_commit_report)
            }
          end
          # TODO check return type of #transaction
          return DataSource.from_ruby_hash(next_config_hash).java_object
        end

        def run(java_task_source, java_schema, processor_index, java_output)
          task_source = DataSource.from_java_object(java_task_source)
          schema = Schema.from_java_object(java_schema)
          page_builder = PageBuilder.new(schema, java_output)
          commit_report_hash = @ruby_class.new(task_source, schema, processor_index, page_builder).run
          return DataSource.from_ruby_hash(commit_report_hash).java_object
        end
      end
    end
  end

end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
embulk-0.2.1 lib/embulk/input_plugin.rb
embulk-0.2.0 lib/embulk/input_plugin.rb