lib/mortar/templates/project/luigiscripts/luigiscript.py in mortar-0.15.29 vs lib/mortar/templates/project/luigiscripts/luigiscript.py in mortar-0.15.30

- old
+ new

@@ -2,133 +2,146 @@ from luigi import configuration from luigi.s3 import S3Target, S3PathTask from mortar.luigi import mortartask +import logging + """ - Luigi is a powerful, easy-to-use framework for building data pipelines. +Luigi is a powerful, easy-to-use framework for building data pipelines. - This is an example Luigi script to get you started. This script has a - 'fill in the blank' interaction. Feel free to alter it to build your pipeline. +This is an example Luigi script to get you started. This script has a +'fill in the blank' interaction. Feel free to alter it to build your pipeline. - In this example we will run a Pig script, and then shutdown any clusters associated - with that script. We will do that by running the ShutdownClusters Task, - which is dependent on RunMyExamplePigScript Task. This means the cluster will only - shutdown after RunMyExamplePigScript (where the data transformation happens) - Task is completed. +In this example we will run a Pig script, and then shutdown any clusters associated +with that script. We will do that by running the ShutdownClusters Task, +which is dependent on RunMyExamplePigScript Task. This means the cluster will only +shutdown after RunMyExamplePigScript (where the data transformation happens) +Task is completed. - For full tutorials and in-depth Luigi documentation, visit: - https://help.mortardata.com/technologies/luigi +For full tutorials and in-depth Luigi documentation, visit: +https://help.mortardata.com/technologies/luigi - To Run: - mortar luigi luigiscripts/<%= project_name %>_luigi.py \ +To Run: +mortar luigi luigiscripts/<%= project_name %>_luigi.py \ --output-base-path "s3://mortar-example-output-data/<your_username_here>/<%= project_name %>" """ MORTAR_PROJECT = '<%= project_name %>' +""" +This logger outputs logs to Mortar Logs. An example of it's usage can be seen +in the ShutdownClusters function. +""" +logger = logging.getLogger('luigi-interface') + # helper function def create_full_path(base_path, sub_path): - return '%s/%s' % (base_path, sub_path) + return '%s/%s' % (base_path, sub_path) class RunMyExamplePigScript(mortartask.MortarProjectPigscriptTask): - """ - This is a Luigi Task that extends MortarProjectPigscriptTask to run a Pig - script on Mortar. - """ - - """ - The location in S3 where the output of the Mortar job will be written. - """ - output_base_path = luigi.Parameter() - - """ - Default cluster size to use for running Mortar jobs. A cluster size of 0 - will run in Mortar's local mode. This is a fast (and free!) way to run jobs - on small data samples. Cluster sizes >= 2 will run on a Hadoop cluster. - """ - cluster_size = luigi.IntParameter(default=0) - - """ - Path to input data being analyzed using the Pig script. - """ - input_path = luigi.Parameter(default ='s3://mortar-example-data/tutorial/excite.log.bz2') - - - def requires(self): """ - The requires method specifies a list of other Tasks that must be complete - for this Task to run. In this case, we want to require that our input - exists on S3 before we run the script. S3PathTask validates that the - specified file or directory exists on S3. + This is a Luigi Task that extends MortarProjectPigscriptTask to run a Pig + script on Mortar. """ - return [S3PathTask(self.input_path)] - def project(self): """ - Name of Mortar Project to run. + The location in S3 where the output of the Mortar job will be written. """ - return MORTAR_PROJECT + output_base_path = luigi.Parameter() - def script_output(self): """ - The script_output method is how you define where the output from this task - will be stored. Luigi will check this output location before starting any - tasks that depend on this task. + Default cluster size to use for running Mortar jobs. A cluster size of 0 + will run in Mortar's local mode. This is a fast (and free!) way to run jobs + on small data samples. Cluster sizes >= 2 will run on a Hadoop cluster. """ - return[S3Target(self.output_base_path + '/pigoutput')] - - def token_path(self): + cluster_size = luigi.IntParameter(default=0) + """ - Luigi manages dependencies between tasks by checking for the existence of - files. When one task finishes it writes out a 'token' file that will - trigger the next task in the dependency graph. This is the base path for - where those tokens will be written. + Path to input data being analyzed using the Pig script. """ - return self.output_base_path - - def parameters(self): - """ - Parameters for this pig job. - """ - return {'INPUT_PATH': self.input_path, + input_path = luigi.Parameter(default ='s3://mortar-example-data/tutorial/excite.log.bz2') + + + def requires(self): + """ + The requires method specifies a list of other Tasks that must be complete + for this Task to run. In this case, we want to require that our input + exists on S3 before we run the script. S3PathTask validates that the + specified file or directory exists on S3. + """ + return [S3PathTask(self.input_path)] + + def project(self): + """ + Name of Mortar Project to run. + """ + return MORTAR_PROJECT + + def script_output(self): + """ + The script_output method is how you define where the output from this task + will be stored. Luigi will check this output location before starting any + tasks that depend on this task. + """ + return[S3Target(self.output_base_path + '/pigoutput')] + + def token_path(self): + """ + Luigi manages dependencies between tasks by checking for the existence of + files. When one task finishes it writes out a 'token' file that will + trigger the next task in the dependency graph. This is the base path for + where those tokens will be written. + """ + return self.output_base_path + + def parameters(self): + """ + Parameters for this pig job. + """ + return {'INPUT_PATH': self.input_path, 'OUTPUT_PATH': self.output_base_path + '/pigoutput'} - - def script(self): - """ - Name of Pig script to run. - """ - return '<%= project_name %>' + def script(self): + """ + Name of Pig script to run. + """ + return '<%= project_name %>' -class ShutdownClusters(mortartask.MortarClusterShutdownTask): - """ - When the pipeline is completed, this task shuts down all active clusters not - currently running jobs. As this task is only shutting down clusters and not - generating any output data, this S3 location is used to store a 'token' file - indicating when the task has been completed. - """ - output_base_path = luigi.Parameter() - def requires(self): +class ShutdownClusters(mortartask.MortarClusterShutdownTask): """ - The ShutdownClusters task is dependent on RunMyExamplePigScript because a - cluster should not shut down until all the tasks are completed. You can - think of this as saying 'shut down my cluster after running my task'. + When the pipeline is completed, this task shuts down all active clusters not + currently running jobs. As this task is only shutting down clusters and not + generating any output data, this S3 location is used to store a 'token' file + indicating when the task has been completed. """ - return RunMyExamplePigScript(output_base_path = self.output_base_path) + output_base_path = luigi.Parameter() - def output(self): - """ - This output statement is needed because ShutdownClusters has no actual - output. We write a token with the class name to S3 to know that this task - has completed and it does not need to be run again. - """ - return [S3Target((create_full_path(self.output_base_path, 'ShutdownClusters')))] + def requires(self): + """ + The ShutdownClusters task is dependent on RunMyExamplePigScript because a + cluster should not shut down until all the tasks are completed. You can + think of this as saying 'shut down my cluster after running my task'. + """ + + # This is an example of emitting log messages. + logger.info('My Log Message!') + + return RunMyExamplePigScript(output_base_path = self.output_base_path) + + def output(self): + """ + This output statement is needed because ShutdownClusters has no actual + output. We write a token with the class name to S3 to know that this task + has completed and it does not need to be run again. + """ + return [S3Target((create_full_path(self.output_base_path, 'ShutdownClusters')))] + if __name__ == "__main__": - """ - The final task in your pipeline, which will in turn pull in any dependencies - need to be run should be called in the main method. In this case ShutdownClusters - is being called. - """ - luigi.run(main_task_cls= ShutdownClusters) + """ + The final task in your pipeline, which will in turn pull in any dependencies + need to be run should be called in the main method. In this case ShutdownClusters + is being called. + """ + luigi.run(main_task_cls= ShutdownClusters)