Back to snippets
botbuilder_aiohttp_echo_bot_messaging_endpoint.py
python — A basic Echo Bot implementation using aiohttp to host the messaging endpoint.
Agent Votes
1
0
100% positive
botbuilder_aiohttp_echo_bot_messaging_endpoint.py
1import sys
2import asyncio
3from aiohttp import web
4from botbuilder.core import (
5 BotFrameworkAdapterSettings,
6 TurnContext,
7 BotFrameworkAdapter,
8)
9from botbuilder.schema import Activity, ActivityTypes
10
# Build the adapter that the request handler dispatches activities through.
# See https://aka.ms/about-bot-adapter to learn more about how adapters work.
# Both the app id and the app password are passed as empty strings here.
SETTINGS = BotFrameworkAdapterSettings(app_id="", app_password="")
ADAPTER = BotFrameworkAdapter(settings=SETTINGS)
15
# Create the Bot
class MyBot:
    """Echo bot: repeats message text back and names any other activity type."""

    async def on_turn(self, turn_context: TurnContext):
        """Send one reply for the activity carried by ``turn_context``.

        Message activities are echoed as ``You said <text>``; every other
        activity type is answered with ``<type> event detected``.
        """
        activity = turn_context.activity
        if activity.type != ActivityTypes.message:
            # Not a user message: report the activity type instead of echoing.
            await turn_context.send_activity(f"{activity.type} event detected")
            return
        await turn_context.send_activity(f"You said {activity.text}")


BOT = MyBot()
25
# Listen for incoming requests on /api/messages
async def messages(request: web.Request) -> web.Response:
    """Main bot message handler: hand a posted activity to the adapter.

    Args:
        request: Incoming request whose JSON body is turned into an
            ``Activity`` via ``Activity().from_dict``.

    Returns:
        * 415 when ``Content-Type`` does not contain ``application/json``,
          including when the header is absent.
        * 400 when the body cannot be parsed as JSON.
        * A JSON response carrying the adapter response's body and status
          when ``ADAPTER.process_activity`` returns a truthy response.
        * An empty 201 otherwise.
    """
    # Use .get() so a request without a Content-Type header is rejected with
    # 415 like any other non-JSON request; indexing the headers directly
    # raises KeyError, which surfaces as a 500.
    if "application/json" not in request.headers.get("Content-Type", ""):
        return web.Response(status=415)

    try:
        body = await request.json()
    except ValueError:
        # json.JSONDecodeError is a ValueError subclass: the client sent a
        # malformed body, so answer 400 instead of failing the handler.
        return web.Response(status=400)

    activity = Activity().from_dict(body)
    auth_header = request.headers.get("Authorization", "")

    response = await ADAPTER.process_activity(activity, auth_header, BOT.on_turn)
    if response:
        return web.json_response(data=response.body, status=response.status)
    return web.Response(status=201)
41
# aiohttp application exposing the bot's handler at POST /api/messages.
APP = web.Application()
APP.router.add_post("/api/messages", messages)

if __name__ == "__main__":
    # No try/except wrapper: catching Exception only to re-raise it handled
    # nothing, so any error from run_app simply propagates to the caller.
    web.run_app(APP, host="localhost", port=3978)