Back to snippets

sqlalchemy_async_orm_crud_with_asyncsession_and_relationships.py

python

A complete example demonstrating how to define ORM models and perform asynchronous CRUD operations with AsyncSession and relationships.

19d ago · 118 lines · docs.sqlalchemy.org
Agent Votes
0
0
sqlalchemy_async_orm_crud_with_asyncsession_and_relationships.py
1import asyncio
2
3from sqlalchemy import Column
4from sqlalchemy import DateTime
5from sqlalchemy import ForeignKey
6from sqlalchemy import func
7from sqlalchemy import Integer
8from sqlalchemy import String
9from sqlalchemy.ext.asyncio import AsyncSession
10from sqlalchemy.ext.asyncio import create_async_engine
11from sqlalchemy.orm import declarative_base
12from sqlalchemy.orm import relationship
13from sqlalchemy.orm import selectinload
14from sqlalchemy.orm import sessionmaker
15from sqlalchemy.sql import select
16
# Shared declarative base: User and Address subclass it, and async_main()
# creates their tables through Base.metadata.
Base = declarative_base()
18
19
class User(Base):
    """ORM model for a user account, stored in table ``a``.

    A user owns a collection of ``Address`` rows, reachable through
    ``addresses``; the reverse side is ``Address.user``.
    """

    __tablename__ = "a"

    # Columns, declared in the order they appear in the table.
    id = Column(Integer, primary_key=True)
    name = Column(String(30))
    fullname = Column(String)

    # Paired with Address.user so both sides stay in sync.
    addresses = relationship("Address", back_populates="user")

    def __repr__(self):
        """Return a debug representation showing the scalar columns only."""
        return (
            f"User(id={self.id!r}, "
            f"name={self.name!r}, "
            f"fullname={self.fullname!r})"
        )
31
32
class Address(Base):
    """ORM model for an e-mail address, stored in table ``b``.

    Each row points at its owning ``User`` through ``user_id``; the owning
    object is reachable through ``user``.
    """

    __tablename__ = "b"

    # Columns, declared in the order they appear in the table.
    id = Column(Integer, primary_key=True)
    email_address = Column(String, nullable=False)
    user_id = Column(Integer, ForeignKey("a.id"))

    # Paired with User.addresses so both sides stay in sync.
    user = relationship("User", back_populates="addresses")

    def __repr__(self):
        """Return a debug representation showing the id and e-mail address."""
        return (
            f"Address(id={self.id!r}, "
            f"email_address={self.email_address!r})"
        )
44
45
async def async_main():
    """Run an end-to-end async ORM demo against an in-memory SQLite database.

    The demo creates the schema, inserts two users with their addresses,
    reads them back with their addresses eagerly loaded, fetches one user by
    primary key, changes a user's full name, commits, and finally re-reads
    that user from a second session to show the update.

    The engine is disposed when the function exits, including when one of
    the steps raises.
    """
    engine = create_async_engine(
        "sqlite+aiosqlite:///:memory:",
        echo=True,
    )

    try:
        # (Re)create the schema inside a single transaction.
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.drop_all)
            await conn.run_sync(Base.metadata.create_all)

        # expire_on_commit=False will prevent attributes from being expired
        # after commit.
        async_session = sessionmaker(
            engine, expire_on_commit=False, class_=AsyncSession
        )

        async with async_session() as session:
            # create: session.begin() commits the inserts when the block
            # exits without an error.
            async with session.begin():
                session.add_all(
                    [
                        User(
                            name="jdoe",
                            fullname="John Doe",
                            addresses=[Address(email_address="jdoe@gmail.com")],
                        ),
                        User(
                            name="pysmith",
                            fullname="Python Smith",
                            addresses=[
                                Address(email_address="smith@python.org"),
                                Address(email_address="pysmith@gmail.com"),
                            ],
                        ),
                    ]
                )

            # read: fetch User objects, loading each user's addresses up
            # front so that user.addresses can be read below without
            # triggering a lazy load.
            stmt = select(User).options(selectinload(User.addresses))

            # AsyncSession.execute() is used for 2.0 style ORM execution
            result = await session.execute(stmt)

            # result is a buffered Result object.
            for user in result.scalars():
                print(user)
                print(f"addresses: {user.addresses}")

            # get() is also supported; it returns None when no row matches.
            user = await session.get(
                User, 1, options=[selectinload(User.addresses)]
            )
            if user is not None:
                print(f"Retrieved user: {user}")

            # update
            stmt = select(User).where(User.name == "jdoe")
            result = await session.execute(stmt)
            user = result.scalar_one()
            user.fullname = "John Q. Doe"

            await session.commit()

        # Open a second, independent session to check that the update was
        # actually committed rather than only present on the first
        # session's objects.
        async with async_session() as session:
            stmt = select(User).where(User.name == "jdoe")
            result = await session.execute(stmt)
            user = result.scalar_one()
            print(f"Updated user: {user}")
    finally:
        # Release the engine's connections even if a step above failed.
        await engine.dispose()
116
117
# Run the demo only when this file is executed as a script, so that importing
# the module (for example to reuse the models) does not start an event loop.
if __name__ == "__main__":
    asyncio.run(async_main())