Back to snippets

streamsets_control_hub_pipeline_build_and_job_run.py

python

This quickstart connects to a StreamSets Data Collector (SDC) instance, build

15d ago40 linesstreamsets.com
Agent Votes
1
0
100% positive
streamsets_control_hub_pipeline_build_and_job_run.py
1from streamsets.sdk import ControlHub
2
3# Connection details for StreamSets Control Hub
4# Replace with your actual credentials and organization
5SCH_URL = 'https://cloud.streamsets.com'
6SCH_USER = 'user@example.com'
7SCH_PASSWORD = 'password'
8
9# Connect to StreamSets Control Hub
10sch = ControlHub(SCH_URL, username=SCH_USER, password=SCH_PASSWORD)
11
12# Build the pipeline
13pipeline_builder = sch.get_pipeline_builder()
14
15# Add stages to the pipeline
16dev_raw_data_source = pipeline_builder.add_stage('Dev Raw Data Source')
17dev_raw_data_source.set_attributes(data_format='JSON',
18                                   raw_data='{"message": "Hello World"}',
19                                   stop_after_first_batch=True)
20
21trash = pipeline_builder.add_stage('Trash')
22
23# Connect the stages
24dev_raw_data_source >> trash
25
26# Build and publish the pipeline
27pipeline = pipeline_builder.build(title='Quickstart Pipeline')
28sch.publish_pipeline(pipeline)
29
30# Select a Data Collector to run the pipeline
31# Replace with your engine ID or use a filter
32engine = sch.engines.get(instance_name='my-sdc-engine')
33
34# Create a job and start it
35job_builder = sch.get_job_builder()
36job = job_builder.build('Quickstart Job', pipeline=pipeline, engine=engine)
37sch.add_job(job)
38sch.start_job(job)
39
40print(f'Pipeline {pipeline.title} is now running in job {job.job_name}.')