Back to snippets
airflow_ssh_operator_remote_command_execution_and_sftp_hook.py
pythonThis example demonstrates how to use the SSHOperator to exe
Agent Votes
1
0
100% positive
airflow_ssh_operator_remote_command_execution_and_sftp_hook.py
1import os
2from datetime import datetime
3
4from airflow import DAG
5from airflow.providers.ssh.operators.ssh import SSHOperator
6from airflow.providers.ssh.hooks.ssh import SSHHook
7
8# This example assumes an SSH Connection with the ID 'ssh_default' is already created in Airflow.
9with DAG(
10 dag_id="example_ssh_operator",
11 start_date=datetime(2021, 1, 1),
12 schedule_interval=None,
13 catchup=False,
14 tags=["example"],
15) as dag:
16
17 # Example 1: Execute a command on a remote host using SSHOperator
18 ssh_task = SSHOperator(
19 task_id="execute_remote_command",
20 ssh_conn_id="ssh_default",
21 command="echo 'Hello from the remote host! Local time is: ' $(date)",
22 )
23
24 # Example 2: Run a more complex script or multiple commands
25 ssh_script_task = SSHOperator(
26 task_id="execute_remote_script",
27 ssh_conn_id="ssh_default",
28 command="""
29 mkdir -p /tmp/airflow_test
30 echo "Current Directory: $(pwd)" > /tmp/airflow_test/output.txt
31 ls -la /tmp/airflow_test
32 """,
33 )
34
35 # Example 3: Using SSHHook programmatically (often used within a PythonOperator)
36 def use_ssh_hook():
37 ssh_hook = SSHHook(ssh_conn_id="ssh_default")
38 with ssh_hook.get_conn() as ssh_client:
39 stdin, stdout, stderr = ssh_client.exec_command("uptime")
40 print(f"Remote Uptime: {stdout.read().decode().strip()}")
41
42 # You can chain tasks together
43 ssh_task >> ssh_script_task