diff --git a/scripts/hail_wdl_test/hail_submit.py b/scripts/hail_wdl_test/hail_submit.py index 5d8a6f3..da78a84 100644 --- a/scripts/hail_wdl_test/hail_submit.py +++ b/scripts/hail_wdl_test/hail_submit.py @@ -3,21 +3,28 @@ import google.auth from google.cloud import storage import uuid - -# functions -def create_cluster(dataproc, project, region, cluster_name, - master_machine_type, master_boot_disk_size, - worker_num_instances, worker_machine_type, worker_boot_disk_size, worker_num_ssd, worker_preemptible): - print "Creating cluster {} in project: {}".format(cluster_name, project) - +from pprint import pprint + +# functions +def create_cluster( + dataproc, project, region, cluster_name, + master_machine_type, master_boot_disk_size, + worker_num_instances, worker_machine_type, + worker_boot_disk_size, worker_num_ssd, + preemptible_num_instances, preemptible_boot_disk_size): + + print("Creating cluster {} in project: {}".format(cluster_name, project)) + # https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters#Cluster + # https://cloud.google.com/dataproc/docs/reference/rest/v1beta2/ClusterConfig?authuser=1#InstanceGroupConfig + cluster_data = { 'projectId': project, 'clusterName': cluster_name, 'config': { 'gceClusterConfig': { "serviceAccountScopes": [ - 'https://www.googleapis.com/auth/userinfo.profile', + 'https://www.googleapis.com/auth/userinfo.profile', 'https://www.googleapis.com/auth/userinfo.email' ] }, @@ -34,19 +41,26 @@ def create_cluster(dataproc, project, region, cluster_name, "diskConfig": { "bootDiskSizeGb": worker_boot_disk_size, "numLocalSsds": worker_num_ssd, + } + }, + 'secondaryWorkerConfig': { + "numInstances": preemptible_num_instances, + "machineTypeUri": worker_machine_type, + "diskConfig": { + "bootDiskSizeGb": preemptible_boot_disk_size, }, - "isPreemptible": "true" if worker_preemptible else "false" + "isPreemptible": "true" }, "softwareConfig": { "imageVersion": "1.1", "properties": { - "spark:spark.driver.extraJavaOptions":"-Xss4M", - "spark:spark.executor.extraJavaOptions":"-Xss4M", - "spark:spark.driver.memory":"45g", + "spark:spark.driver.extraJavaOptions": "-Xss4M", + "spark:spark.executor.extraJavaOptions": "-Xss4M", + "spark:spark.driver.memory": "45g", "spark:spark.driver.maxResultSize": "30g", - "spark:spark.task.maxFailures":"20", - "spark:spark.kryoserializer.buffer.max":"1g", - "hdfs:dfs.replication":"1" + "spark:spark.task.maxFailures": "20", + "spark:spark.kryoserializer.buffer.max": "1g", + "hdfs:dfs.replication": "1" } }, "initializationActions": [ @@ -60,7 +74,8 @@ def create_cluster(dataproc, project, region, cluster_name, region=region, body=cluster_data).execute() return result - + + def wait_for_cluster_creation(dataproc, project_id, region, cluster_name): print('Waiting for cluster creation...') @@ -78,21 +93,22 @@ def wait_for_cluster_creation(dataproc, project_id, region, cluster_name): print("Cluster created.") break + def submit_pyspark_job(dataproc, project, region, cluster_name, bucket_name, hail_script_path, script_arguments): - + storage_client = storage.Client() bucket = storage_client.get_bucket("hail-common") blob = bucket.blob("latest-hash.txt" ) # get the hash from this text file, removing any trailing newline - hail_hash = blob.download_as_string().rstrip() - + hail_hash = blob.download_as_string().rstrip().decode() + hail_jar_file="hail-hail-is-master-all-spark2.0.2-{}.jar".format(hail_hash) hail_jar_path="gs://hail-common/{}".format(hail_jar_file) - - # upload the hail script to this dataproc 
staging bucket + + # upload the hail script to this dataproc staging bucket upload_blob(bucket_name, hail_script_path, "script.py") - + """Submits the Pyspark job to the cluster, assuming `filename` has already been uploaded to `bucket_name`""" job_details = { @@ -106,26 +122,26 @@ def submit_pyspark_job(dataproc, project, region, "mainPythonFileUri": "gs://{}/{}".format(bucket_name, "script.py"), "args": script_arguments, "pythonFileUris": [ - "gs://hail-common/pyhail-hail-is-master-{}.zip".format(hail_hash), - "gs://{}/{}".format(bucket_name, "script.py") + "gs://hail-common/pyhail-hail-is-master-{}.zip".format(hail_hash), + "gs://{}/{}".format(bucket_name, "script.py") ], "jarFileUris": [ - hail_jar_path + hail_jar_path ], # "fileUris": [ -# string +# string # ], # "archiveUris": [ -# string +# string # ], "properties": { - "spark.driver.extraClassPath":"./{}".format(hail_jar_file), - "spark.executor.extraClassPath":"./{}".format(hail_jar_file) + "spark.driver.extraClassPath":"./{}".format(hail_jar_file), + "spark.executor.extraClassPath":"./{}".format(hail_jar_file) }, # "loggingConfig": { -# object(LoggingConfig) +# object(LoggingConfig) # }, - } + } } } result = dataproc.projects().regions().jobs().submit( @@ -136,6 +152,7 @@ def submit_pyspark_job(dataproc, project, region, print('Submitted job ID {}'.format(job_id)) return job_id + def wait_for_job(dataproc, project, region, job_id): print('Waiting for job to finish...') while True: @@ -149,7 +166,8 @@ def wait_for_job(dataproc, project, region, job_id): elif result['status']['state'] == 'DONE': print('Job finished.') return result - + + def delete_cluster(dataproc, project, region, cluster): print('Tearing down cluster') result = dataproc.projects().regions().clusters().delete( @@ -158,6 +176,7 @@ def delete_cluster(dataproc, project, region, cluster): clusterName=cluster).execute() return result + def upload_blob(bucket_name, source_file_name, destination_blob_name): """Uploads a file to the bucket.""" storage_client = storage.Client() @@ -168,82 +187,105 @@ def upload_blob(bucket_name, source_file_name, destination_blob_name): print('File {} uploaded to {}.'.format( source_file_name, - destination_blob_name)) + destination_blob_name)) + def list_clusters(dataproc, project, region): - result = dataproc.projects().regions().clusters().list( - projectId=project, - region=region).execute() - return result + result = dataproc.projects().regions().clusters().list( + projectId=project, + region=region).execute() + return result + def get_client(): - """Builds a client to the dataproc API.""" - dataproc = googleapiclient.discovery.build('dataproc', 'v1') - return dataproc + """Builds a client to the dataproc API.""" + dataproc = googleapiclient.discovery.build('dataproc', 'v1') + return dataproc + if __name__ == "__main__": - parser = argparse.ArgumentParser(description='') - # required dataproc arguments - parser.add_argument('--dataprocMasterMachType', required=True, - help='Machine type to use for the master machine, e.g. n1-standard-8.') - parser.add_argument('--dataprocMasterBootDiskSize', required=True, - help='Size of the boot disk to use for the master machine in GB, e.g. 100') - - parser.add_argument('--dataprocNumWorkers', required=True, - help='Number of worker nodes, e.g. 2') - parser.add_argument('--dataprocWorkerMachType', required=True, - help='Machine type to use for the worker nodes, e.g. 
n1-standard-8.') - parser.add_argument('--dataprocWorkerBootDiskSize', required=True, - help='Size of the boot disk to use for the worker nodes in GB, e.g. 100') - parser.add_argument('--dataprocWorkerNumSSD', required=True, - help='Number of SSD disks for use in the worker nodes, e.g. 2') - parser.add_argument('--dataprocWorkerPreemptible', required=False, action='store_true', default=False, - help='Number of preemptible instances to use for worker nodes, e.g. 2') + parser = argparse.ArgumentParser( + description='Run a hail script on dataproc') # the python hail script to execute on the dataproc cluster parser.add_argument('hailScript', nargs=1) - # optional dataproc arguments - parser.add_argument('--dataprocRegion', dest='dataprocRegion', required=False, default="us-central1", - help='Optional region for use in choosing a region to create the cluster. Defaults to us-central1.') - parser.add_argument('--project', required=False, - help='Project to create the Dataproc cluster within. Defaults to the current project in the gcloud config.') + # optional dataproc arguments + parser.add_argument( + '--dataprocMasterMachType', default='n1-standard-8', + help='Machine type to use for the master machine, e.g. n1-standard-8.') + parser.add_argument( + '--dataprocMasterBootDiskSize', default=100, type=int, + help='Size of the boot disk to use for the master machine in GB, e.g. 100') + parser.add_argument( + '--dataprocNumWorkers', default=2, type=int, + help='Number of worker nodes, e.g. 2') + parser.add_argument( + '--dataprocWorkerMachType', default='n1-standard-8', + help='Machine type to use for the worker nodes, e.g. n1-standard-8.') + parser.add_argument( + '--dataprocWorkerBootDiskSize', default=100, type=int, + help='Size of the boot disk to use for the worker nodes in GB, e.g. 100') + parser.add_argument( + '--dataprocWorkerNumSSD', default=0, type=int, + help='Number of SSD disks for use in the worker nodes, e.g. 2') + parser.add_argument( + '--dataprocNumWorkerPreemptible', default=0, type=int, + help='Number of preemptible instances to use for worker nodes, e.g. 2') + parser.add_argument( + '--dataprocPreemptibleBootDiskSize', default=40, type=int, + help='Disk size of preemptible machines, in GB (default: %(default)s.)' + ) + parser.add_argument( + '--dataprocRegion', dest='dataprocRegion', required=False, + default="us-central1", + help='Optional region for use in choosing a region to create the cluster. Defaults to us-central1.') + parser.add_argument( + '--project', required=False, + help='Project to create the Dataproc cluster within. Defaults to the current project in the gcloud config.') + + # parse the args above as well user defined arguments (unknown below) that + # will get passed to the hail script. the user defined arguments are + # whatever arguments that are needed by the python hail script. - # parse the args above as well user defined arguments (unknown below) that will get passed to the hail script. - # the user defined arguments are whatever arguments that are needed by the python hail script. 
args, script_args = parser.parse_known_args() dataproc = get_client() try: # get the current project from gcloud config - project = args.project if args.project else google.auth.default()[1] + project = args.project if args.project else google.auth.default()[1] cluster_name = "firecloud-hail-{}".format(uuid.uuid4()) - - print "Creating cluster {} in project: {}".format(cluster_name, project) - - cluster_info = create_cluster(dataproc, project, args.dataprocRegion, cluster_name, - args.dataprocMasterMachType, args.dataprocMasterBootDiskSize, args.dataprocNumWorkers, - args.dataprocWorkerMachType, args.dataprocWorkerBootDiskSize, args.dataprocWorkerNumSSD, - args.dataprocWorkerPreemptible) + + cluster_info = create_cluster( + dataproc, project, args.dataprocRegion, cluster_name, + args.dataprocMasterMachType, args.dataprocMasterBootDiskSize, + args.dataprocNumWorkers, args.dataprocWorkerMachType, + args.dataprocWorkerBootDiskSize, args.dataprocWorkerNumSSD, + args.dataprocNumWorkerPreemptible, + args.dataprocPreemptibleBootDiskSize) cluster_uuid = cluster_info["metadata"]["clusterUuid"] - active_clusters = wait_for_cluster_creation(dataproc, project, args.dataprocRegion, cluster_name) + active_clusters = wait_for_cluster_creation( + dataproc, project, args.dataprocRegion, cluster_name) clusters = list_clusters(dataproc, project, args.dataprocRegion) for cluster in clusters["clusters"]: if cluster["clusterUuid"] == cluster_uuid: cluster_staging_bucket = cluster["config"]["configBucket"] - - job_id = submit_pyspark_job(dataproc, project, args.dataprocRegion, - cluster_name, cluster_staging_bucket, args.hailScript[0], script_args) - - job_result = wait_for_job(dataproc, project, args.dataprocRegion, job_id) - + + job_id = submit_pyspark_job( + dataproc, project, args.dataprocRegion, + cluster_name, cluster_staging_bucket, args.hailScript[0], + script_args) + + job_result = wait_for_job( + dataproc, project, args.dataprocRegion, job_id) + # TODO: what do we need to do to handle successful jobs? - print job_result + print(job_result) break except Exception as e: - print e + print(e) raise finally: - delete_cluster(dataproc, project, args.dataprocRegion, cluster_name) \ No newline at end of file + delete_cluster(dataproc, project, args.dataprocRegion, cluster_name)
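
Note on usage: `parser.parse_known_args()` means any flags the submitter itself does not define are forwarded, unchanged, to the uploaded Hail script (they arrive as the job's `args`). A minimal companion script for this runner might look like the sketch below; the `--vcf`/`--out` flags, the file name, and the Hail 0.1 calls (`HailContext`, `import_vcf`, `write`) are illustrative assumptions based on the Hail 0.1 API implied by the pinned spark2.0.2 jar, not part of this change.

    # example_hail_script.py -- hypothetical companion script for hail_submit.py.
    # hail_submit.py uploads this file to the cluster's staging bucket as script.py
    # and forwards any command-line flags it does not recognize itself.
    import argparse
    from hail import HailContext  # Hail 0.1, matching the hail-is-master-all-spark2.0.2 jar above

    parser = argparse.ArgumentParser(description='Example Hail job')
    parser.add_argument('--vcf', required=True,
                        help='Input VCF on GCS, e.g. gs://my-bucket/sample.vcf.bgz (illustrative)')
    parser.add_argument('--out', required=True,
                        help='Output VDS path, e.g. gs://my-bucket/sample.vds (illustrative)')
    args = parser.parse_args()

    hc = HailContext()             # attaches to the cluster's Spark context
    vds = hc.import_vcf(args.vcf)  # load the VCF into a variant dataset
    vds.write(args.out)            # persist as a VDS for downstream analysis

which would be submitted with something like:

    python hail_submit.py example_hail_script.py \
        --dataprocNumWorkers 4 --dataprocNumWorkerPreemptible 2 \
        --vcf gs://my-bucket/sample.vcf.bgz --out gs://my-bucket/sample.vds

Everything other than the script path and the recognized --dataproc*/--project flags is handed through to the script itself.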