Make pydantic have a GraphQL-like assembly experience.
Project description
Pydantic-resolve
import asyncio
from random import random
from pydantic import BaseModel
from pydantic_resolve import resolve
class Human(BaseModel):
name: str
lucky: bool = True
async def resolve_lucky(self):
print('calculating...')
await asyncio.sleep(1) # mock i/o
return random() > 0.5
async def main():
humans = [Human(name=f'man-{i}') for i in range(10)]
results = await resolve(humans)
print(results)
asyncio.run(main())
# calculating... x 10
# [
# Human(name='man-0', lucky=False),
# Human(name='man-1', lucky=False),
# ...
# Human(name='man-8', lucky=False),
# Human(name='man-9', lucky=True)
# ]
- Helps you asynchoronously, resursively resolve a pydantic object (or dataclass object)
- When used in conjunction with aiodataloader, allows you to easily generate nested data structures without worrying about generating N+1 queries.
- say byebye to contextvars when using dataloader.
- Inspired by GraphQL and graphene
Why create this package?
Install
pip install pydantic-resolve
pip install "pydantic-resolve[dataloader]" # install aiodataloader
imports
from pydantic_resolve import (
Resolver, LoaderDepend, # handle schema resolver with DataLoader
resolve # handle simple resolve
)
Feature 1, Resolve asynchoronously, recursiverly, concurrently.
import asyncio
from random import random
from time import time
from pydantic import BaseModel
from pydantic_resolve import resolve
t = time()
class NodeB(BaseModel):
value_1: int = 0
async def resolve_value_1(self):
print(f"resolve_value_1, {time() - t}")
await asyncio.sleep(1) # sleep 1
return random()
class NodeA(BaseModel):
node_b_1: int = 0
async def resolve_node_b_1(self):
print(f"resolve_node_b_1, {time() - t}")
await asyncio.sleep(1)
return NodeB()
class Root(BaseModel): # [!] resolve fields concurrently
node_a_1: int = 0
async def resolve_node_a_1(self):
print(f"resolve_node_a_1, {time() - t}")
await asyncio.sleep(1)
return NodeA()
node_a_2: int = 0
async def resolve_node_a_2(self):
print(f"resolve_node_a_2, {time() - t}")
await asyncio.sleep(1)
return NodeA()
node_a_3: int = 0
async def resolve_node_a_3(self):
print(f"resolve_node_a_3, {time() - t}")
await asyncio.sleep(1)
return NodeA()
async def main():
root = Root()
result = await resolve(root)
print(result.json())
print(f'total {time() - t}')
asyncio.run(main())
resolve_node_a_1, 0.002000093460083008
resolve_node_a_2, 0.002000093460083008
resolve_node_a_3, 0.002000093460083008
resolve_node_b_1, 1.0142452716827393
resolve_node_b_1, 1.0142452716827393
resolve_node_b_1, 1.0142452716827393
resolve_value_1, 2.0237653255462646
resolve_value_1, 2.0237653255462646
resolve_value_1, 2.0237653255462646
total 3.0269699096679688
{
"node_a_1": {"node_b_1": {"value_1": 0.912570826381839}},
"node_a_2": {"node_b_1": {"value_1": 0.41784985892912485}},
"node_a_3": {"node_b_1": {"value_1": 0.6148494329990393}}
}
Feature 2: Integrated with aiodataloader:
pydantic_resolve.Resolver
will handle the lifecycle and injection of loader instance, you don't need to manage it with contextvars any more.
- Define loaders
class FeedbackLoader(DataLoader):
async def batch_load_fn(self, comment_ids):
async with async_session() as session:
res = await session.execute(select(Feedback).where(Feedback.comment_id.in_(comment_ids)))
rows = res.scalars().all()
dct = defaultdict(list)
for row in rows:
dct[row.comment_id].append(FeedbackSchema.from_orm(row))
return [dct.get(k, []) for k in comment_ids]
class CommentLoader(DataLoader):
async def batch_load_fn(self, task_ids):
async with async_session() as session:
res = await session.execute(select(Comment).where(Comment.task_id.in_(task_ids)))
rows = res.scalars().all()
dct = defaultdict(list)
for row in rows:
dct[row.task_id].append(CommentSchema.from_orm(row))
return [dct.get(k, []) for k in task_ids]
- Define schemas
class FeedbackSchema(BaseModel):
id: int
comment_id: int
content: str
class Config:
orm_mode = True
class CommentSchema(BaseModel):
id: int
task_id: int
content: str
feedbacks: Tuple[FeedbackSchema, ...] = tuple()
def resolve_feedbacks(self, feedback_loader = LoaderDepend(FeedbackLoader)):
# LoaderDepend will manage contextvars for you
return feedback_loader.load(self.id)
class Config:
orm_mode = True
class TaskSchema(BaseModel):
id: int
name: str
comments: Tuple[CommentSchema, ...] = tuple()
def resolve_comments(self, comment_loader = LoaderDepend(CommentLoader)):
return comment_loader.load(self.id)
class Config:
orm_mode = True
- Resolve it
tasks = (await session.execute(select(Task))).scalars().all()
tasks = [TaskSchema.from_orm(t) for t in tasks]
results = await Resolver().resolve(tasks) # <=== resolve schema with DataLoaders
# output
[
{
'id': 1,
'name': 'task-1 xyz',
'comments': [
{
'content': 'comment-1 for task 1 (changes)',
'feedbacks': [
{'comment_id': 1, 'content': 'feedback-1 for comment-1 (changes)', 'id': 1},
{'comment_id': 1, 'content': 'feedback-2 for comment-1', 'id': 2},
{'comment_id': 1, 'content': 'feedback-3 for comment-1', 'id': 3}
],
'id': 1,
'task_id': 1
},
{
'content': 'comment-2 for task 1',
'feedbacks': [
{'comment_id': 2, 'content': 'test', 'id': 4},
],
'id': 2,
'task_id': 1
}
]
}
]
For more examples, please explore examples
folder.
Unittest
poetry run python -m unittest # or
poetry run pytest # or
poetry run tox
Coverage
poetry run coverage run -m pytest
poetry run coverage report -m
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
pydantic_resolve-0.4.0.tar.gz
(6.3 kB
view hashes)
Built Distribution
Close
Hashes for pydantic_resolve-0.4.0-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | e33f6cceefb7943e9624611c6aa40f13dd40df1c703c1990b3d9de58cd8a2843 |
|
MD5 | 9f8e2d92501f98fbdec42d8db8d9bda8 |
|
BLAKE2b-256 | fd166c9cd920493b06fbd7c474d8fd3f03f06494118f17c92aa6a88db2876d59 |