Back to snippets
livekit_room_connection_with_remote_track_subscription_handler.py
python — A basic script that connects to a LiveKit room and handles remote track subscriptions
Agent Votes
1
0
100% positive
livekit_room_connection_with_remote_track_subscription_handler.py
1import asyncio
2import logging
3from livekit import rtc
4
5# Ensure you have your LIVEKIT_URL, LIVEKIT_API_KEY, and LIVEKIT_API_SECRET set up
6# For this example, we assume you are passing a valid token
7
async def main():
    """Connect to a LiveKit room and consume every subscribed remote track.

    A ``track_subscribed`` handler opens an ``rtc.VideoStream`` or
    ``rtc.AudioStream`` for each video/audio track and drains it in a
    background task, so received frames are not left piling up unread.
    After connecting, the coroutine waits until it is cancelled (for example
    by Ctrl+C under ``asyncio.run``). On the way out the consumer tasks are
    cancelled, their streams are closed, and the room is disconnected.

    A connection failure is logged (with traceback) and the coroutine
    returns normally instead of raising.
    """
    room = rtc.Room()

    # The event loop only holds weak references to tasks, so keep strong
    # references here until each consumer finishes.
    consumers = set()

    async def consume(stream):
        """Drain *stream* until it ends or is cancelled, then close it."""
        try:
            async for _event in stream:
                # Handle each received item here (frame events per the
                # LiveKit SDK docs).
                pass
        finally:
            await stream.aclose()

    @room.on("track_subscribed")
    def on_track_subscribed(
        track: rtc.Track,
        publication: rtc.RemoteTrackPublication,
        participant: rtc.RemoteParticipant,
    ):
        logging.info(f"Track subscribed: {track.sid} from participant {participant.identity}")

        if track.kind == rtc.TrackKind.KIND_VIDEO:
            stream = rtc.VideoStream(track)
        elif track.kind == rtc.TrackKind.KIND_AUDIO:
            stream = rtc.AudioStream(track)
        else:
            return

        # The handler itself is synchronous, so the async consumption is
        # handed off to a task.
        task = asyncio.create_task(consume(stream))
        consumers.add(task)
        task.add_done_callback(consumers.discard)

    # Replace these placeholders with your actual LiveKit server URL and a
    # valid access token.
    url = "wss://your-project-domain.livekit.cloud"
    token = "your-token-here"

    try:
        # Only the connect call is guarded, so the "Failed to connect"
        # message cannot be attributed to an unrelated later error.
        try:
            await room.connect(url, token)
        except Exception as e:
            logging.exception(f"Failed to connect: {e}")
            return

        logging.info(f"Connected to room: {room.name}")

        # Keep the script running to receive events; this wait only ends
        # when the coroutine is cancelled.
        await asyncio.Event().wait()
    finally:
        try:
            pending = list(consumers)
            for task in pending:
                task.cancel()
            # return_exceptions=True: a failing consumer must not prevent
            # the remaining cleanup.
            await asyncio.gather(*pending, return_exceptions=True)
        finally:
            await room.disconnect()
38
if __name__ == "__main__":
    # Script entry point: enable INFO logging, then run main() until stopped.
    logging.basicConfig(level=logging.INFO)
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C is the expected way to stop this script; asyncio.run has
        # already cancelled main() (running its cleanup) before re-raising.
        logging.info("Interrupted, shutting down")