Back to snippets

livekit_agent_noise_cancellation_audio_track_filter.py

python

This quickstart demonstrates how to apply a noise cancellation filter to an audio track in a LiveKit agent.

15d ago · 39 lines · livekit/agents
Agent Votes
1
0
100% positive
livekit_agent_noise_cancellation_audio_track_filter.py
import asyncio
import logging

from livekit import agents, rtc
from livekit.plugins import noise_cancellation
4
# Module-level logger; the name identifies this demo's records in log output.
logger = logging.getLogger("noise-cancellation-demo")
6
async def entrypoint(ctx: agents.JobContext):
    """Run the noise-cancellation demo for a single agent job.

    Registers a ``track_subscribed`` handler, connects to the room, and reads
    every subscribed remote audio track through an ``rtc.AudioStream`` that
    has a noise-cancellation filter attached.

    Args:
        ctx: Job context supplied by the agents worker; provides the room
            object and the ``connect()`` call.
    """
    logger.info("starting noise cancellation demo (room: %s)", ctx.room.name)

    # Strong references to the per-track reader tasks. The event loop only
    # keeps weak references to tasks, so an otherwise unreferenced task may be
    # garbage-collected before it finishes.
    reader_tasks = set()

    def _on_reader_done(task):
        # Drop the finished task and surface failures instead of letting them
        # disappear silently.
        reader_tasks.discard(task)
        if not task.cancelled() and task.exception() is not None:
            logger.error("audio reader task failed", exc_info=task.exception())

    async def process_audio(track: rtc.Track) -> None:
        # The filter is attached when the stream is created; frames read from
        # the stream are already processed.
        # NOTE(review): BVC is the enhanced noise-cancellation model and, per
        # the LiveKit docs, requires LiveKit Cloud - confirm for your setup.
        stream = rtc.AudioStream.from_track(
            track=track,
            noise_cancellation=noise_cancellation.BVC(),
        )
        try:
            async for event in stream:
                # The stream yields AudioFrameEvent objects; ``event.frame``
                # is the noise-cancelled audio frame. In a real agent, you
                # might forward it to an STT engine or re-publish it to the
                # room.
                pass
        finally:
            # Release the underlying stream resources once the track ends or
            # the task is cancelled.
            await stream.aclose()

    # Register the handler before connecting so that subscriptions announced
    # right after the connection is established are not missed.
    @ctx.room.on("track_subscribed")
    def on_track_subscribed(
        track: rtc.Track,
        publication: rtc.TrackPublication,
        participant: rtc.RemoteParticipant,
    ):
        if track.kind != rtc.TrackKind.KIND_AUDIO:
            return

        logger.info(
            "Subscribed to audio track %s from %s", track.sid, participant.identity
        )

        # Event callbacks are synchronous, so the async reader runs as a task.
        task = asyncio.create_task(process_audio(track))
        reader_tasks.add(task)
        task.add_done_callback(_on_reader_done)

    # Connect to the room
    await ctx.connect()
37
if __name__ == "__main__":
    # Start the agents worker CLI; ``run_app`` is provided by the ``cli``
    # submodule of ``livekit.agents``.
    agents.cli.run_app(agents.WorkerOptions(entrypoint_fnc=entrypoint))