lib/embulk/input_plugin.rb in embulk-0.3.2 vs lib/embulk/input_plugin.rb in embulk-0.4.0
- old
+ new
@@ -20,18 +20,22 @@
def initialize(task, schema, index, page_builder)
@task = task
@schema = schema
@index = index
@page_builder = page_builder
+ init
end
+ def init
+ end
+
def run
raise NotImplementedError, "InputPlugin#run must be implemented"
end
if Embulk.java?
- def self.java_object
+ def self.new_java
JavaAdapter.new(self)
end
class JavaAdapter
include Java::InputPlugin
@@ -39,56 +43,66 @@
def initialize(ruby_class)
@ruby_class = ruby_class
end
def transaction(java_config, java_control)
- config = DataSource.from_java_object(java_config)
- next_config_hash = @ruby_class.transaction(config) do |task_source_hash,columns,processor_count|
- java_task_source = DataSource.from_ruby_hash(task_source_hash).java_object
- java_schema = Schema.new(columns).java_object
+ config = DataSource.from_java(java_config)
+ config_diff_hash = @ruby_class.transaction(config) do |task_source_hash,columns,processor_count|
+ java_task_source = DataSource.from_ruby_hash(task_source_hash).to_java
+ java_schema = Schema.new(columns).to_java
java_commit_reports = java_control.run(java_task_source, java_schema, processor_count)
java_commit_reports.map {|java_commit_report|
- DataSource.from_java_object(java_commit_report)
+ DataSource.from_java(java_commit_report)
}
end
# TODO check return type of #transaction
- return DataSource.from_ruby_hash(next_config_hash).java_object
+ return DataSource.from_ruby_hash(config_diff_hash).to_java
end
def resume(java_task_source, java_schema, processor_count, java_control)
- task_source = DataSource.from_java_object(java_task_source)
- schema = Schema.from_java_object(java_schema)
- next_config_hash = @ruby_class.resume(task_source, schema, processor_count) do |task_source_hash,columns,processor_count|
- java_task_source = DataSource.from_ruby_hash(task_source_hash).java_object
- java_schema = Schema.new(columns).java_object
+ task_source = DataSource.from_java(java_task_source)
+ schema = Schema.from_java(java_schema)
+ config_diff_hash = @ruby_class.resume(task_source, schema, processor_count) do |task_source_hash,columns,processor_count|
+ java_task_source = DataSource.from_ruby_hash(task_source_hash).to_java
+ java_schema = Schema.new(columns).to_java
java_commit_reports = java_control.run(java_task_source, java_schema, processor_count)
java_commit_reports.map {|java_commit_report|
- DataSource.from_java_object(java_commit_report)
+ DataSource.from_java(java_commit_report)
}
end
# TODO check return type of #resume
- return DataSource.from_ruby_hash(next_config_hash).java_object
+ return DataSource.from_ruby_hash(config_diff_hash).to_java
end
def cleanup(java_task_source, java_schema, processor_count, java_commit_reports)
- task_source = DataSource.from_java_object(java_task_source)
- schema = Schema.from_java_object(java_schema)
- commit_reports = java_commit_reports.map {|c| DataSource.from_java_object(c) }
+ task_source = DataSource.from_java(java_task_source)
+ schema = Schema.from_java(java_schema)
+ commit_reports = java_commit_reports.map {|c| DataSource.from_java(c) }
@ruby_class.cleanup(task_source, schema, processor_count, commit_reports)
return nil
end
def run(java_task_source, java_schema, processor_index, java_output)
- task_source = DataSource.from_java_object(java_task_source)
- schema = Schema.from_java_object(java_schema)
+ task_source = DataSource.from_java(java_task_source)
+ schema = Schema.from_java(java_schema)
page_builder = PageBuilder.new(schema, java_output)
begin
commit_report_hash = @ruby_class.new(task_source, schema, processor_index, page_builder).run
- return DataSource.from_ruby_hash(commit_report_hash).java_object
+ return DataSource.from_ruby_hash(commit_report_hash).to_java
ensure
page_builder.close
end
end
+ end
+
+ def self.from_java(java_class)
+ JavaPlugin.ruby_adapter_class(java_class, InputPlugin, RubyAdapter)
+ end
+
+ module RubyAdapter
+ module ClassMethods
+ end
+ # TODO
end
end
end
end