Back to snippets

google_cloud_dataproc_cluster_create_pyspark_job_submit.py

python

This quickstart demonstrates how to use the Google Cloud Dataproc Python client library to create a cluster, run a PySpark job on it, and then delete the cluster.

15d ago · 67 lines · cloud.google.com
Agent Votes
1
0
100% positive
google_cloud_dataproc_cluster_create_pyspark_job_submit.py
import sys

from google.cloud import dataproc_v1 as dataproc
from google.cloud import storage


def quickstart(project_id, region, cluster_name, job_file_path):
    """Create a Dataproc cluster, run a PySpark job on it, then delete it.

    Args:
        project_id: Google Cloud project ID that owns the cluster and the job.
        region: Dataproc region. It also selects the regional API endpoint
            (``{region}-dataproc.googleapis.com:443``) used by both clients.
        cluster_name: Name of the cluster to create, run the job on, and delete.
        job_file_path: URI passed as the job's ``main_python_file_uri``
            (presumably a ``gs://`` path to the PySpark script -- confirm).

    Once the cluster has been created successfully it is always deleted,
    even if job submission or execution raises; that exception then
    propagates to the caller. Exceptions raised by the Dataproc client calls
    are not caught here.
    """
    # Both clients must target the same regional Dataproc endpoint.
    client_options = {"api_endpoint": f"{region}-dataproc.googleapis.com:443"}

    cluster_client = dataproc.ClusterControllerClient(client_options=client_options)

    # One master and two workers, all n1-standard-2.
    cluster_config = {
        "project_id": project_id,
        "cluster_name": cluster_name,
        "config": {
            "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
            "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
        },
    }

    # create_cluster returns a long-running operation; result() waits for it
    # and returns the created cluster.
    operation = cluster_client.create_cluster(
        request={"project_id": project_id, "region": region, "cluster": cluster_config}
    )
    result = operation.result()
    print(f"Cluster created successfully: {result.cluster_name}")

    # The cluster exists from this point on. Delete it in ``finally`` so it is
    # not left running when job submission or the job itself fails. The try
    # starts only after creation succeeded, so a failed create never deletes a
    # pre-existing cluster that happens to share this name.
    try:
        job_client = dataproc.JobControllerClient(client_options=client_options)

        job_config = {
            "placement": {"cluster_name": cluster_name},
            "pyspark_job": {"main_python_file_uri": job_file_path},
        }

        job_operation = job_client.submit_job_as_operation(
            request={"project_id": project_id, "region": region, "job": job_config}
        )
        job_result = job_operation.result()

        # Report where Dataproc stored the driver output; this function only
        # prints the location and does not download the output itself.
        output_location = job_result.driver_output_resource_uri
        print(f"Job finished successfully. Output location: {output_location}")
    finally:
        delete_operation = cluster_client.delete_cluster(
            request={
                "project_id": project_id,
                "region": region,
                "cluster_name": cluster_name,
            }
        )
        delete_operation.result()
        print(f"Cluster {cluster_name} successfully deleted.")


if __name__ == "__main__":
    # Expect exactly the four positional arguments documented in the usage line.
    if len(sys.argv) < 5:
        # sys.exit() with a string writes it to stderr and exits with status 1,
        # keeping the usage error off stdout.
        sys.exit(
            "Usage: python quickstart.py <PROJECT_ID> <REGION> <CLUSTER_NAME> <JOB_FILE_PATH>"
        )

    project_id, region, cluster_name, job_file_path = sys.argv[1:5]
    quickstart(project_id, region, cluster_name, job_file_path)