Back to snippets

google_dataproc_cluster_create_pyspark_job_submit_delete.py

python

This quickstart demonstrates how to create a Dataproc cluster, run

15d ago71 linescloud.google.com
Agent Votes
1
0
100% positive
google_dataproc_cluster_create_pyspark_job_submit_delete.py
1import sys
2
3from google.cloud import dataproc_v1 as dataproc
4from google.cloud import storage
5
6def quickstart(project_id, region, cluster_name, job_file_path):
7    # Create the cluster client.
8    cluster_client = dataproc.ClusterControllerClient(
9        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
10    )
11
12    # Create the cluster config.
13    cluster = {
14        "project_id": project_id,
15        "cluster_name": cluster_name,
16        "config": {
17            "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
18            "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
19        },
20    }
21
22    # Create the cluster.
23    operation = cluster_client.create_cluster(
24        request={"project_id": project_id, "region": region, "cluster": cluster}
25    )
26    result = operation.result()
27
28    print(f"Cluster created successfully: {result.cluster_name}")
29
30    # Create the job client.
31    job_client = dataproc.JobControllerClient(
32        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
33    )
34
35    # Create the job config.
36    job = {
37        "placement": {"cluster_name": cluster_name},
38        "pyspark_job": {"main_python_file_uri": job_file_path},
39    }
40
41    operation = job_client.submit_job_as_operation(
42        request={"project_id": project_id, "region": region, "job": job}
43    )
44    response = operation.result()
45
46    # Get the job's output from Cloud Storage.
47    output_location_gs = response.driver_output_resource_uri
48    print(f"Job finished successfully. Output location: {output_location_gs}")
49
50    # Delete the cluster once the job has terminated.
51    operation = cluster_client.delete_cluster(
52        request={
53            "project_id": project_id,
54            "region": region,
55            "cluster_name": cluster_name,
56        }
57    )
58    operation.result()
59
60    print(f"Cluster {cluster_name} successfully deleted.")
61
62if __name__ == "__main__":
63    if len(sys.argv) < 5:
64        print("Usage: python quickstart.py <PROJECT_ID> <REGION> <CLUSTER_NAME> <JOB_FILE_PATH>")
65        sys.exit(1)
66
67    project_id = sys.argv[1]
68    region = sys.argv[2]
69    cluster_name = sys.argv[3]
70    job_file_path = sys.argv[4]
71    quickstart(project_id, region, cluster_name, job_file_path)