lib/embulk/input_plugin.rb in embulk-0.3.2 vs lib/embulk/input_plugin.rb in embulk-0.4.0

- old
+ new

@@ -20,18 +20,22 @@ def initialize(task, schema, index, page_builder) @task = task @schema = schema @index = index @page_builder = page_builder + init end + def init + end + def run raise NotImplementedError, "InputPlugin#run must be implemented" end if Embulk.java? - def self.java_object + def self.new_java JavaAdapter.new(self) end class JavaAdapter include Java::InputPlugin @@ -39,56 +43,66 @@ def initialize(ruby_class) @ruby_class = ruby_class end def transaction(java_config, java_control) - config = DataSource.from_java_object(java_config) - next_config_hash = @ruby_class.transaction(config) do |task_source_hash,columns,processor_count| - java_task_source = DataSource.from_ruby_hash(task_source_hash).java_object - java_schema = Schema.new(columns).java_object + config = DataSource.from_java(java_config) + config_diff_hash = @ruby_class.transaction(config) do |task_source_hash,columns,processor_count| + java_task_source = DataSource.from_ruby_hash(task_source_hash).to_java + java_schema = Schema.new(columns).to_java java_commit_reports = java_control.run(java_task_source, java_schema, processor_count) java_commit_reports.map {|java_commit_report| - DataSource.from_java_object(java_commit_report) + DataSource.from_java(java_commit_report) } end # TODO check return type of #transaction - return DataSource.from_ruby_hash(next_config_hash).java_object + return DataSource.from_ruby_hash(config_diff_hash).to_java end def resume(java_task_source, java_schema, processor_count, java_control) - task_source = DataSource.from_java_object(java_task_source) - schema = Schema.from_java_object(java_schema) - next_config_hash = @ruby_class.resume(task_source, schema, processor_count) do |task_source_hash,columns,processor_count| - java_task_source = DataSource.from_ruby_hash(task_source_hash).java_object - java_schema = Schema.new(columns).java_object + task_source = DataSource.from_java(java_task_source) + schema = Schema.from_java(java_schema) + config_diff_hash = @ruby_class.resume(task_source, schema, processor_count) do |task_source_hash,columns,processor_count| + java_task_source = DataSource.from_ruby_hash(task_source_hash).to_java + java_schema = Schema.new(columns).to_java java_commit_reports = java_control.run(java_task_source, java_schema, processor_count) java_commit_reports.map {|java_commit_report| - DataSource.from_java_object(java_commit_report) + DataSource.from_java(java_commit_report) } end # TODO check return type of #resume - return DataSource.from_ruby_hash(next_config_hash).java_object + return DataSource.from_ruby_hash(config_diff_hash).to_java end def cleanup(java_task_source, java_schema, processor_count, java_commit_reports) - task_source = DataSource.from_java_object(java_task_source) - schema = Schema.from_java_object(java_schema) - commit_reports = java_commit_reports.map {|c| DataSource.from_java_object(c) } + task_source = DataSource.from_java(java_task_source) + schema = Schema.from_java(java_schema) + commit_reports = java_commit_reports.map {|c| DataSource.from_java(c) } @ruby_class.cleanup(task_source, schema, processor_count, commit_reports) return nil end def run(java_task_source, java_schema, processor_index, java_output) - task_source = DataSource.from_java_object(java_task_source) - schema = Schema.from_java_object(java_schema) + task_source = DataSource.from_java(java_task_source) + schema = Schema.from_java(java_schema) page_builder = PageBuilder.new(schema, java_output) begin commit_report_hash = @ruby_class.new(task_source, schema, processor_index, page_builder).run - return DataSource.from_ruby_hash(commit_report_hash).java_object + return DataSource.from_ruby_hash(commit_report_hash).to_java ensure page_builder.close end end + end + + def self.from_java(java_class) + JavaPlugin.ruby_adapter_class(java_class, InputPlugin, RubyAdapter) + end + + module RubyAdapter + module ClassMethods + end + # TODO end end end end