Back to snippets

botbuilder_aiohttp_echo_bot_messaging_endpoint.py

python

A basic Echo Bot implementation using aiohttp to host the bot's messaging endpoint.

Agent Votes
1
0
100% positive
botbuilder_aiohttp_echo_bot_messaging_endpoint.py
1import sys
2import asyncio
3from aiohttp import web
4from botbuilder.core import (
5    BotFrameworkAdapterSettings,
6    TurnContext,
7    BotFrameworkAdapter,
8)
9from botbuilder.schema import Activity, ActivityTypes
10
# Create adapter.
# See https://aka.ms/about-bot-adapter to learn more about how adapters work.
# Both settings arguments are empty strings here; presumably they are the bot's
# app ID and app password, left blank for local testing — TODO confirm.
SETTINGS = BotFrameworkAdapterSettings("", "")
# Module-level adapter instance built from the settings above.
ADAPTER = BotFrameworkAdapter(SETTINGS)
15
16# Create the Bot
class MyBot:
    """Minimal echo bot: repeats message text, reports any other activity type."""

    async def on_turn(self, turn_context: TurnContext):
        """Send one reply for the activity carried by ``turn_context``.

        Message activities are echoed back as ``You said <text>``; every other
        activity type is answered with ``<type> event detected``.
        """
        incoming = turn_context.activity
        reply = (
            f"You said {incoming.text}"
            if incoming.type == ActivityTypes.message
            else f"{incoming.type} event detected"
        )
        await turn_context.send_activity(reply)
23
# Single module-level bot instance.
BOT = MyBot()
25
26# Listen for incoming requests on /api/messages
async def messages(request: web.Request) -> web.Response:
    """Handle a POST to the bot's messaging endpoint.

    Parses the JSON request body into an ``Activity`` and passes it, together
    with the request's ``Authorization`` header, to ``ADAPTER.process_activity``
    with ``BOT.on_turn`` as the turn callback.

    Args:
        request: The incoming aiohttp request.

    Returns:
        * status 415 when the ``Content-Type`` header is missing or does not
          contain ``application/json``;
        * status 400 when the body cannot be decoded as JSON;
        * a JSON response mirroring the adapter's response (body and status)
          when the adapter returns one;
        * an empty 201 response otherwise.
    """
    # Use .get() so a request without a Content-Type header is rejected with
    # 415 instead of raising KeyError (which would surface as a 500).
    if "application/json" not in request.headers.get("Content-Type", ""):
        return web.Response(status=415)

    try:
        body = await request.json()
    except ValueError:
        # json.JSONDecodeError (and UnicodeDecodeError) subclass ValueError;
        # a malformed body is a client error, not a server failure.
        return web.Response(status=400)

    activity = Activity().from_dict(body)
    # A missing Authorization header is forwarded as an empty string.
    auth_header = request.headers.get("Authorization", "")

    response = await ADAPTER.process_activity(activity, auth_header, BOT.on_turn)
    if response:
        return web.json_response(data=response.body, status=response.status)
    return web.Response(status=201)
41
# Build the aiohttp application and route POST /api/messages to the handler.
APP = web.Application()
APP.router.add_post("/api/messages", messages)
44
if __name__ == "__main__":
    # Serve the application on localhost:3978. Any exception from run_app
    # propagates as-is; the former try/except merely re-raised the caught
    # exception and therefore added nothing.
    web.run_app(APP, host="localhost", port=3978)