Back to snippets
sqlalchemy_async_orm_crud_with_asyncsession_and_relationships.py
python — A complete example demonstrating how to define ORM models and perform CRUD operations with AsyncSession and relationships.
Agent Votes
0
0
sqlalchemy_async_orm_crud_with_asyncsession_and_relationships.py
1import asyncio
2
3from sqlalchemy import Column
4from sqlalchemy import DateTime
5from sqlalchemy import ForeignKey
6from sqlalchemy import func
7from sqlalchemy import Integer
8from sqlalchemy import String
9from sqlalchemy.ext.asyncio import AsyncSession
10from sqlalchemy.ext.asyncio import create_async_engine
11from sqlalchemy.orm import declarative_base
12from sqlalchemy.orm import relationship
13from sqlalchemy.orm import selectinload
14from sqlalchemy.orm import sessionmaker
15from sqlalchemy.sql import select
16
17Base = declarative_base()
18
19
class User(Base):
    """ORM model for a user record, mapped to table ``a``.

    A user owns a collection of ``Address`` objects exposed as
    ``addresses``; the other side of that link is named ``user``.
    """

    __tablename__ = "a"

    # --- columns ---
    id = Column(Integer, primary_key=True)
    name = Column(String(30))
    fullname = Column(String)

    # --- relationships ---
    # Paired with the ``user`` attribute on the "Address" mapping.
    addresses = relationship("Address", back_populates="user")

    def __repr__(self):
        """Return a debug string showing ``id``, ``name`` and ``fullname``."""
        text = f"User(id={self.id!r}, name={self.name!r}, fullname={self.fullname!r})"
        return text
31
32
class Address(Base):
    """ORM model for an e-mail address, mapped to table ``b``.

    ``user_id`` is a foreign key to ``a.id``; the owning object is
    available through the ``user`` relationship.
    """

    __tablename__ = "b"

    # --- columns ---
    id = Column(Integer, primary_key=True)
    email_address = Column(String, nullable=False)  # required value
    user_id = Column(Integer, ForeignKey("a.id"))

    # --- relationships ---
    # Paired with the ``addresses`` attribute on the "User" mapping.
    user = relationship("User", back_populates="addresses")

    def __repr__(self):
        """Return a debug string showing ``id`` and ``email_address``."""
        text = f"Address(id={self.id!r}, email_address={self.email_address!r})"
        return text
44
45
async def async_main():
    """Run the async ORM demo against an in-memory SQLite database.

    The coroutine creates the schema, inserts two users together with
    their addresses, reads them back with their addresses eagerly loaded,
    fetches one user by primary key, updates a user's ``fullname``, and
    finally re-reads that user from a second session to show the update.
    Results are printed to stdout; SQL statements are echoed as well
    because the engine is created with ``echo=True``.

    The engine is disposed in a ``finally`` clause so it is released even
    when one of the steps raises.
    """
    engine = create_async_engine(
        "sqlite+aiosqlite:///:memory:",
        echo=True,
    )

    try:
        # MetaData.drop_all / create_all are synchronous APIs, so they are
        # invoked through run_sync() on the async connection.
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.drop_all)
            await conn.run_sync(Base.metadata.create_all)

        # expire_on_commit=False will prevent attributes from being expired
        # after commit.
        async_session = sessionmaker(
            engine, expire_on_commit=False, class_=AsyncSession
        )

        async with async_session() as session:
            # create: session.begin() commits when the block exits cleanly
            async with session.begin():
                session.add_all(
                    [
                        User(
                            name="jdoe",
                            fullname="John Doe",
                            addresses=[Address(email_address="jdoe@gmail.com")],
                        ),
                        User(
                            name="pysmith",
                            fullname="Python Smith",
                            addresses=[
                                Address(email_address="smith@python.org"),
                                Address(email_address="pysmith@gmail.com"),
                            ],
                        ),
                    ]
                )

            # read: fetch User objects, loading their addresses up front so
            # that accessing user.addresses below needs no further query
            stmt = select(User).options(selectinload(User.addresses))

            # AsyncSession.execute() is used for 2.0 style ORM execution
            result = await session.execute(stmt)

            # result is a buffered Result object.
            for user in result.scalars():
                print(user)
                print(f"addresses: {user.addresses}")

            # get() is also supported; it returns None when no row has the
            # requested primary key
            user = await session.get(
                User, 1, options=[selectinload(User.addresses)]
            )
            if user is not None:
                print(f"Retrieved user: {user}")

            # update
            stmt = select(User).where(User.name == "jdoe")
            result = await session.execute(stmt)
            user = result.scalar_one()
            user.fullname = "John Q. Doe"

            await session.commit()

        # open a second, independent session to check that the update is
        # visible outside the session that made it
        async with async_session() as session:
            stmt = select(User).where(User.name == "jdoe")
            result = await session.execute(stmt)
            user = result.scalar_one()
            print(f"Updated user: {user}")
    finally:
        # release the engine's connection pool on success and on failure
        await engine.dispose()
116
117
# Run the demo only when this file is executed as a script, so importing the
# module (e.g. to reuse the models) does not start the event loop.
if __name__ == "__main__":
    asyncio.run(async_main())