Make pydantic have a GraphQL-like assembly experience.
Project description
Pydantic-resolve
import asyncio
from random import random
from pydantic import BaseModel
from pydantic_resolve import resolve


class Human(BaseModel):
    """A person whose `lucky` field is computed asynchronously by `resolve`."""

    name: str
    lucky: bool = True

    async def resolve_lucky(self):
        # Simulated I/O-bound work; `resolve` awaits this to fill `lucky`.
        print('calculating...')
        await asyncio.sleep(1)  # mock i/o
        return random() > 0.5


async def main():
    person = Human(name="john wick's dog" )
    resolved = await resolve(person)
    print(resolved.json())


asyncio.run(main())
# calculating...
# {"name": "john wick's dog", "lucky": false}
- Helps you asynchronously and recursively resolve a pydantic object (or dataclass object)
- When used in conjunction with aiodataloader, allows you to easily generate nested data structures without worrying about generating N+1 queries.
- Say goodbye to contextvars when using dataloader.
- Inspired by GraphQL and graphene
Why create this package?
Install
pip install pydantic-resolve
imports
from pydantic_resolve import (
Resolver, LoaderDepend, # schema with DataLoader
resolve # simple resolve
)
Feature 1: Resolve asynchronously, recursively, and concurrently.
import asyncio
from random import random
from time import time
from pydantic import BaseModel
from pydantic_resolve import resolve
t = time()
class NodeB(BaseModel):
    """Leaf node: its `value_1` is filled with a random float after a mock 1s I/O wait."""

    value_1: int = 0

    async def resolve_value_1(self):
        # Print elapsed time since module start to demonstrate concurrency.
        print(f"resolve_value_1, {time() - t}")
        await asyncio.sleep(1)  # sleep 1
        return random()
class NodeA(BaseModel):
    """Mid-level node: resolves its child `NodeB` after a mock 1s I/O wait."""

    node_b_1: int = 0

    async def resolve_node_b_1(self):
        # Print elapsed time since module start to demonstrate concurrency.
        print(f"resolve_node_b_1, {time() - t}")
        await asyncio.sleep(1)
        return NodeB()
class Root(BaseModel):  # [!] sibling fields are resolved concurrently
    """Root node with three `NodeA` children, each resolved in parallel."""

    node_a_1: int = 0

    async def resolve_node_a_1(self):
        print(f"resolve_node_a_1, {time() - t}")
        await asyncio.sleep(1)
        return NodeA()

    node_a_2: int = 0

    async def resolve_node_a_2(self):
        print(f"resolve_node_a_2, {time() - t}")
        await asyncio.sleep(1)
        return NodeA()

    node_a_3: int = 0

    async def resolve_node_a_3(self):
        print(f"resolve_node_a_3, {time() - t}")
        await asyncio.sleep(1)
        return NodeA()
async def main():
    """Resolve the whole tree and report total wall-clock time."""
    tree = Root()
    resolved = await resolve(tree)
    print(resolved.json())
    print(f'total {time() - t}')


asyncio.run(main())
resolve_node_a_1, 0.002000093460083008
resolve_node_a_2, 0.002000093460083008
resolve_node_a_3, 0.002000093460083008
resolve_node_b_1, 1.0142452716827393
resolve_node_b_1, 1.0142452716827393
resolve_node_b_1, 1.0142452716827393
resolve_value_1, 2.0237653255462646
resolve_value_1, 2.0237653255462646
resolve_value_1, 2.0237653255462646
total 3.0269699096679688
{
"node_a_1": {"node_b_1": {"value_1": 0.912570826381839}},
"node_a_2": {"node_b_1": {"value_1": 0.41784985892912485}},
"node_a_3": {"node_b_1": {"value_1": 0.6148494329990393}}
}
Feature 2: Integrated with aiodataloader:
pydantic_resolve.Resolver
will handle the lifecycle and injection of loader instance, you don't need to manage it with contextvars any more.
- Define loaders
class FeedbackLoader(DataLoader):
    """Batch-loads feedbacks for many comment ids at once (avoids N+1 queries)."""

    async def batch_load_fn(self, comment_ids):
        async with async_session() as session:
            query = select(Feedback).where(Feedback.comment_id.in_(comment_ids))
            result = await session.execute(query)
            grouped = defaultdict(list)
            for feedback in result.scalars().all():
                grouped[feedback.comment_id].append(FeedbackSchema.from_orm(feedback))
            # Return one list per requested key, in request order; missing keys get [].
            return [grouped.get(cid, []) for cid in comment_ids]
class CommentLoader(DataLoader):
    """Batch-loads comments for many task ids at once (avoids N+1 queries)."""

    async def batch_load_fn(self, task_ids):
        async with async_session() as session:
            query = select(Comment).where(Comment.task_id.in_(task_ids))
            result = await session.execute(query)
            grouped = defaultdict(list)
            for comment in result.scalars().all():
                grouped[comment.task_id].append(CommentSchema.from_orm(comment))
            # Return one list per requested key, in request order; missing keys get [].
            return [grouped.get(tid, []) for tid in task_ids]
- Define schemas
class FeedbackSchema(BaseModel):
    """Pydantic view of a Feedback ORM row."""

    id: int
    comment_id: int
    content: str

    class Config:
        # Allow construction from ORM objects via `from_orm`.
        orm_mode = True
class CommentSchema(BaseModel):
    """Pydantic view of a Comment ORM row with lazily-resolved feedbacks."""

    id: int
    task_id: int
    content: str
    feedbacks: Tuple[FeedbackSchema, ...] = tuple()

    def resolve_feedbacks(self, feedback_loader=LoaderDepend(FeedbackLoader)):
        # LoaderDepend manages the contextvars plumbing for the loader instance.
        return feedback_loader.load(self.id)

    class Config:
        # Allow construction from ORM objects via `from_orm`.
        orm_mode = True
class TaskSchema(BaseModel):
    """Pydantic view of a Task ORM row with lazily-resolved comments."""

    id: int
    name: str
    comments: Tuple[CommentSchema, ...] = tuple()

    def resolve_comments(self, comment_loader=LoaderDepend(CommentLoader)):
        # The Resolver injects a per-run loader instance here.
        return comment_loader.load(self.id)

    class Config:
        # Allow construction from ORM objects via `from_orm`.
        orm_mode = True
- Resolve it
# Fetch ORM rows, convert each to a pydantic schema, then let the Resolver
# walk the schemas and fill nested fields via the registered DataLoaders.
tasks = (await session.execute(select(Task))).scalars().all()
tasks = [TaskSchema.from_orm(t) for t in tasks]
results = await Resolver().resolve(tasks) # <=== resolve schema with DataLoaders
# output
[
{
'id': 1,
'name': 'task-1 xyz',
'comments': [
{
'content': 'comment-1 for task 1 (changes)',
'feedbacks': [
{'comment_id': 1, 'content': 'feedback-1 for comment-1 (changes)', 'id': 1},
{'comment_id': 1, 'content': 'feedback-2 for comment-1', 'id': 2},
{'comment_id': 1, 'content': 'feedback-3 for comment-1', 'id': 3}
],
'id': 1,
'task_id': 1
},
{
'content': 'comment-2 for task 1',
'feedbacks': [
{'comment_id': 2, 'content': 'test', 'id': 4},
],
'id': 2,
'task_id': 1
}
]
}
]
For more examples, please explore examples
folder.
Unittest
poetry run python -m unittest # or
poetry run pytest # or
poetry run tox
Coverage
poetry run coverage run -m pytest
poetry run coverage report -m
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
pydantic_resolve-0.3.1.tar.gz
(5.7 kB
view hashes)
Built Distribution
Close
Hashes for pydantic_resolve-0.3.1-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | 468e20bac28f3d84729503e1498b5084b9e505fc5e9d5cba4a3c3fe0c4bb3fef |
|
MD5 | d6d6e8617296ab1aa47e04c7bd7d97ba |
|
BLAKE2b-256 | 9dd8700a42ee385f7c79d1655c59b0343d952fc728b9c41329d9f4a45d9f8ab4 |