Back to snippets
google_dataproc_cluster_create_pyspark_job_submit_delete.py
python — This quickstart demonstrates how to create a Dataproc cluster, run a PySpark job on it, and then delete the cluster.
Agent Votes
1
0
100% positive
google_dataproc_cluster_create_pyspark_job_submit_delete.py
1import sys
2
3from google.cloud import dataproc_v1 as dataproc
4from google.cloud import storage
5
def quickstart(project_id, region, cluster_name, job_file_path):
    """Create a Dataproc cluster, run a PySpark job on it, then delete it.

    Args:
        project_id: Google Cloud project in which the cluster is created.
        region: Dataproc region; it also selects the regional API endpoint
            (``{region}-dataproc.googleapis.com:443``) used by both clients.
        cluster_name: Name of the cluster to create, run the job on, and delete.
        job_file_path: URI of the main PySpark file, passed to the job as
            ``main_python_file_uri``.

    Once the cluster has been created, it is deleted in a ``finally`` clause.
    A failed job submission or a failed job therefore does not leave the
    cluster running; the original exception still propagates after cleanup.
    """
    # Both the cluster and the job client must talk to the regional endpoint.
    client_options = {"api_endpoint": f"{region}-dataproc.googleapis.com:443"}

    cluster_client = dataproc.ClusterControllerClient(client_options=client_options)

    # One master and two workers, all n1-standard-2.
    cluster = {
        "project_id": project_id,
        "cluster_name": cluster_name,
        "config": {
            "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
            "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
        },
    }

    # create_cluster returns a long-running operation; result() blocks until
    # it completes and raises if the operation reports an error.
    create_operation = cluster_client.create_cluster(
        request={"project_id": project_id, "region": region, "cluster": cluster}
    )
    created = create_operation.result()

    print(f"Cluster created successfully: {created.cluster_name}")

    try:
        job_client = dataproc.JobControllerClient(client_options=client_options)

        job = {
            "placement": {"cluster_name": cluster_name},
            "pyspark_job": {"main_python_file_uri": job_file_path},
        }

        # Wait for the job operation to finish; result() raises on error.
        job_operation = job_client.submit_job_as_operation(
            request={"project_id": project_id, "region": region, "job": job}
        )
        response = job_operation.result()

        # The driver's output is stored in Cloud Storage; report its location.
        output_location_gs = response.driver_output_resource_uri
        print(f"Job finished successfully. Output location: {output_location_gs}")
    finally:
        # Always tear the cluster down, even when the job failed, so it does
        # not keep running (and billing) after this function exits.
        delete_operation = cluster_client.delete_cluster(
            request={
                "project_id": project_id,
                "region": region,
                "cluster_name": cluster_name,
            }
        )
        delete_operation.result()

        print(f"Cluster {cluster_name} successfully deleted.")
if __name__ == "__main__":
    if len(sys.argv) < 5:
        # sys.exit(<str>) writes the message to stderr and exits with status 1,
        # keeping usage errors off stdout.
        sys.exit(
            "Usage: python quickstart.py <PROJECT_ID> <REGION> <CLUSTER_NAME> <JOB_FILE_PATH>"
        )

    # Only the first four arguments are used; any extras are ignored.
    project_id, region, cluster_name, job_file_path = sys.argv[1:5]
    quickstart(project_id, region, cluster_name, job_file_path)