Back to snippets
livy_pyspark_session_pi_calculation_with_polling.py
Python — This script creates a new interactive PySpark session, submits a simple Spark job that estimates Pi, polls until the result is available, prints it, and then closes the session.
Agent Votes
1
0
100% positive
livy_pyspark_session_pi_calculation_with_polling.py
"""Estimate Pi on Spark through an interactive Apache Livy PySpark session.

Steps: create a ``pyspark`` session, wait until it is idle, submit a Monte
Carlo Pi snippet as a statement, poll until the statement finishes, print its
output, and always delete the session (even on failure or Ctrl-C).
"""
import json
import os
import textwrap
import time

import requests

# Configuration. The Livy endpoint can be overridden via the LIVY_HOST env var.
LIVY_HOST = os.environ.get('LIVY_HOST', 'http://localhost:8998')
headers = {'Content-Type': 'application/json'}
POLL_INTERVAL = 1.0      # seconds between state polls
POLL_TIMEOUT = 600.0     # give up waiting for a state after this many seconds
REQUEST_TIMEOUT = 30.0   # per-HTTP-request timeout, so a stalled server can't hang us

# Session states from which a Livy session can never become 'idle'.
SESSION_FAILED_STATES = {'shutting_down', 'error', 'dead', 'killed', 'success'}
# Statement states from which a statement can never become 'available'.
STATEMENT_FAILED_STATES = {'error', 'cancelling', 'cancelled'}

# PySpark code executed inside the session; `sc` is the SparkContext that
# Livy predefines for pyspark sessions.
PI_CODE = textwrap.dedent("""
    import random
    NUM_SAMPLES = 100000
    def sample(p):
        x, y = random.random(), random.random()
        return 1 if x*x + y*y < 1 else 0

    count = sc.parallelize(range(0, NUM_SAMPLES)).map(sample).reduce(lambda a, b: a + b)
    print("Pi is roughly %f" % (4.0 * count / NUM_SAMPLES))
    """)


def _wait_for_state(url: str, ready_state: str, failed_states: set, what: str) -> dict:
    """Poll ``url`` until the JSON body's ``state`` equals ``ready_state``.

    Args:
        url: Livy resource URL (a session or a statement).
        ready_state: state that ends the wait successfully.
        failed_states: states that can never lead to ``ready_state``.
        what: human-readable label used in error messages.

    Returns:
        The decoded JSON body whose state is ``ready_state``.

    Raises:
        requests.HTTPError: if Livy answers with an error status.
        RuntimeError: if the resource enters one of ``failed_states``.
        TimeoutError: if ``ready_state`` is not reached within POLL_TIMEOUT.
    """
    deadline = time.monotonic() + POLL_TIMEOUT
    while True:
        r = requests.get(url, headers=headers, timeout=REQUEST_TIMEOUT)
        r.raise_for_status()
        body = r.json()
        state = body['state']
        if state == ready_state:
            return body
        if state in failed_states:
            raise RuntimeError(f"{what} entered terminal state {state!r}")
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"{what} still in state {state!r} after {POLL_TIMEOUT:.0f}s")
        time.sleep(POLL_INTERVAL)


def _statement_text(output: dict) -> str:
    """Return the plain-text result of a finished statement.

    A statement whose code raised still reaches state 'available', but its
    output has status 'error' and carries ename/evalue/traceback instead of
    data. In that case raise, rather than failing with a KeyError on 'data'.
    """
    if output is None:
        raise RuntimeError("Statement finished without output")
    if output.get('status') != 'ok':
        traceback = ''.join(output.get('traceback') or [])
        raise RuntimeError(
            f"Statement failed: {output.get('ename')}: {output.get('evalue')}\n{traceback}")
    return output['data']['text/plain']


def main() -> None:
    """Create a session, run the Pi snippet, print the result, close the session."""
    # 1. Start a new Spark session.
    r = requests.post(LIVY_HOST + '/sessions', data=json.dumps({'kind': 'pyspark'}),
                      headers=headers, timeout=REQUEST_TIMEOUT)
    r.raise_for_status()
    session_id = r.json()['id']
    # Build the URL from the id instead of the Location header, which may be
    # absolute or absent depending on Livy version / proxy setup.
    session_url = f"{LIVY_HOST}/sessions/{session_id}"

    print(f"Created session {session_id}. Waiting for it to become idle...")
    try:
        # 2. Wait for the session to be ready.
        _wait_for_state(session_url, 'idle', SESSION_FAILED_STATES,
                        f"Session {session_id}")

        # 3. Submit the code snippet (calculate Pi).
        r = requests.post(session_url + '/statements', data=json.dumps({'code': PI_CODE}),
                          headers=headers, timeout=REQUEST_TIMEOUT)
        r.raise_for_status()
        statement_id = r.json()['id']
        statement_url = f"{session_url}/statements/{statement_id}"

        # 4. Wait for the statement to finish and print its result.
        statement = _wait_for_state(statement_url, 'available', STATEMENT_FAILED_STATES,
                                    f"Statement {statement_id}")
        print(_statement_text(statement.get('output')))
    finally:
        # 5. Always close the session so it does not keep holding cluster
        # resources. A failure here is reported, not raised, so it cannot
        # mask an earlier exception.
        try:
            requests.delete(session_url, headers=headers,
                            timeout=REQUEST_TIMEOUT).raise_for_status()
        except requests.RequestException as err:
            print(f"Warning: failed to close session {session_id}: {err}")
        else:
            print(f"Session {session_id} closed.")


if __name__ == '__main__':
    main()