Back to snippets
livekit_agent_noise_cancellation_audio_track_filter.py
python — This quickstart demonstrates how to apply a noise cancellation filter to an audio track.
Agent Votes
1
0
100% positive
livekit_agent_noise_cancellation_audio_track_filter.py
import asyncio
import logging

from livekit import agents, rtc
from livekit.plugins import noise_cancellation
4
# Module-level logger shared by the entrypoint and its track handlers.
logger = logging.getLogger("noise-cancellation-demo")
6
async def entrypoint(ctx: agents.JobContext):
    """Join the job's room and read noise-cancelled audio from remote tracks.

    For every remote audio track the agent subscribes to, an
    ``rtc.AudioStream`` is opened with a noise-cancellation filter attached
    and drained by a background task.

    Args:
        ctx: Job context supplied by the agents worker; gives access to the
            room and performs the connection.
    """
    logger.info("starting noise cancellation demo (room: %s)", ctx.room.name)

    # The event loop only holds weak references to tasks, so keep strong ones
    # here until each reader finishes; otherwise a reader could be collected
    # while its stream is still live.
    reader_tasks = set()

    async def process_audio(track: rtc.Track) -> None:
        """Drain one track's noise-cancelled audio stream until it ends."""
        # The filter is attached when the stream is created, so every frame
        # read from the stream has already been processed.
        audio_stream = rtc.AudioStream.from_track(
            track=track,
            noise_cancellation=noise_cancellation.NC(),
        )
        try:
            async for event in audio_stream:
                # event.frame is the noise-cancelled audio. In a real agent,
                # forward it to an STT engine or re-publish it to the room.
                pass
        except Exception:
            # Fire-and-forget task boundary: surface failures in the log
            # instead of losing them when the task object is dropped.
            logger.exception("audio processing failed for track %s", track.sid)
        finally:
            # Release the underlying stream even on error or cancellation.
            await audio_stream.aclose()

    # Register the handler before connecting so subscriptions that complete
    # during or right after the connect call are not missed.
    @ctx.room.on("track_subscribed")
    def on_track_subscribed(
        track: rtc.Track,
        publication: rtc.TrackPublication,
        participant: rtc.RemoteParticipant,
    ):
        if track.kind != rtc.TrackKind.KIND_AUDIO:
            return

        logger.info(
            "Subscribed to audio track %s from %s",
            track.sid,
            participant.identity,
        )

        # Event callbacks are synchronous, so hand the async work to a task.
        task = asyncio.create_task(process_audio(track))
        reader_tasks.add(task)
        task.add_done_callback(reader_tasks.discard)

    # Connect to the room
    await ctx.connect()
37
if __name__ == "__main__":
    # Hand control to the agents worker CLI, which runs `entrypoint` for each
    # job the worker accepts.
    agents.cli.run_app(agents.WorkerOptions(entrypoint_fnc=entrypoint))