Pydantic-resolve

Bring a GraphQL-like assembly experience to pydantic.

import asyncio
from random import random
from pydantic import BaseModel
from pydantic_resolve import resolve

class Human(BaseModel):
    name: str
    lucky: bool = True

    async def resolve_lucky(self):
        print('calculating...')
        await asyncio.sleep(1)  # mock i/o
        return random() > 0.5

async def main():
    humans = [Human(name=f'man-{i}') for i in range(10)]
    results = await resolve(humans)
    print(results)

asyncio.run(main())

# calculating... x 10
# [
#   Human(name='man-0', lucky=False),
#   Human(name='man-1', lucky=False),
#   ...
#   Human(name='man-8', lucky=False),
#   Human(name='man-9', lucky=True)
# ]
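
All ten resolve_lucky calls are awaited concurrently (see Feature 1 below), so resolving the whole list takes roughly one second rather than ten.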
  • Helps you asynchronously and recursively resolve a pydantic object (or dataclass object; see the dataclass sketch after this list)
  • When used together with aiodataloader, lets you easily build nested data structures without triggering N+1 queries
  • Say goodbye to contextvars when using DataLoader
  • Inspired by GraphQL and graphene
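
The dataclass support mentioned in the first bullet works the same way. A minimal sketch, assuming resolve treats dataclass instances like pydantic models (the Wizard class here is made up for illustration):

import asyncio
from dataclasses import dataclass
from pydantic_resolve import resolve

@dataclass
class Wizard:
    name: str
    power: int = 0

    async def resolve_power(self):
        await asyncio.sleep(1)  # mock i/o
        return 100

async def main():
    wizard = await resolve(Wizard(name='merlin'))
    print(wizard)  # Wizard(name='merlin', power=100)

asyncio.run(main())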

Why create this package?

Install

pip install pydantic-resolve
pip install "pydantic-resolve[dataloader]"  # install aiodataloader

Imports

from pydantic_resolve import (
    Resolver, LoaderDepend,      # handle schema resolver with DataLoader
    resolve                      # handle simple resolve
)

Feature 1: Resolve asynchronously, recursively, and concurrently.

import asyncio
from random import random
from time import time
from typing import Optional
from pydantic import BaseModel
from pydantic_resolve import resolve

t = time()

class NodeB(BaseModel):
    value_1: float = 0
    async def resolve_value_1(self):
        print(f"resolve_value_1, {time() - t}")
        await asyncio.sleep(1)  # mock 1s of i/o
        return random()

class NodeA(BaseModel):
    node_b_1: Optional[NodeB] = None
    async def resolve_node_b_1(self):
        print(f"resolve_node_b_1, {time() - t}")
        await asyncio.sleep(1)
        return NodeB()

class Root(BaseModel):  # [!] sibling fields resolve concurrently
    node_a_1: Optional[NodeA] = None
    async def resolve_node_a_1(self):
        print(f"resolve_node_a_1, {time() - t}")
        await asyncio.sleep(1)
        return NodeA()

    node_a_2: Optional[NodeA] = None
    async def resolve_node_a_2(self):
        print(f"resolve_node_a_2, {time() - t}")
        await asyncio.sleep(1)
        return NodeA()

    node_a_3: Optional[NodeA] = None
    async def resolve_node_a_3(self):
        print(f"resolve_node_a_3, {time() - t}")
        await asyncio.sleep(1)
        return NodeA()

async def main():
    root = Root()
    result = await resolve(root)
    print(result.json())
    print(f'total {time() - t}')

asyncio.run(main())
resolve_node_a_1, 0.002000093460083008
resolve_node_a_2, 0.002000093460083008
resolve_node_a_3, 0.002000093460083008

resolve_node_b_1, 1.0142452716827393
resolve_node_b_1, 1.0142452716827393
resolve_node_b_1, 1.0142452716827393

resolve_value_1, 2.0237653255462646
resolve_value_1, 2.0237653255462646
resolve_value_1, 2.0237653255462646

total 3.0269699096679688
{
    "node_a_1": {"node_b_1": {"value_1": 0.912570826381839}}, 
    "node_a_2": {"node_b_1": {"value_1": 0.41784985892912485}}, 
    "node_a_3": {"node_b_1": {"value_1": 0.6148494329990393}}
}
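
Fields at the same depth resolve concurrently: the three resolve_node_a_* methods start together, then the three resolve_node_b_1 calls, then the three resolve_value_1 calls. Three levels of one-second sleeps therefore complete in roughly 3 seconds instead of the 9 a sequential walk would take.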

Feature 2: Integration with aiodataloader.

pydantic_resolve.Resolver handles the lifecycle and injection of loader instances, so you no longer need to manage them with contextvars.

  1. Define loaders
from collections import defaultdict
from aiodataloader import DataLoader
from sqlalchemy import select

class FeedbackLoader(DataLoader):
    async def batch_load_fn(self, comment_ids):
        # async_session is your SQLAlchemy async session factory;
        # FeedbackSchema is defined in step 2 below
        async with async_session() as session:
            res = await session.execute(select(Feedback).where(Feedback.comment_id.in_(comment_ids)))
            rows = res.scalars().all()
            dct = defaultdict(list)
            for row in rows:
                dct[row.comment_id].append(FeedbackSchema.from_orm(row))
            return [dct.get(k, []) for k in comment_ids]


class CommentLoader(DataLoader):
    async def batch_load_fn(self, task_ids):
        async with async_session() as session:
            res = await session.execute(select(Comment).where(Comment.task_id.in_(task_ids)))
            rows = res.scalars().all()

            dct = defaultdict(list)
            for row in rows:
                dct[row.task_id].append(CommentSchema.from_orm(row))
            return [dct.get(k, []) for k in task_ids]
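
Note the batch_load_fn contract from aiodataloader: it receives every key collected by pending .load(key) calls and must return exactly one result per key, in the same order, which is why both loaders return dct.get(k, []) for each incoming key. A minimal, self-contained sketch of that batching behaviour (the DoubleLoader class is made up for illustration):

import asyncio
from aiodataloader import DataLoader

class DoubleLoader(DataLoader):
    async def batch_load_fn(self, keys):
        print(f'batched keys: {keys}')  # runs once for all pending loads
        return [k * 2 for k in keys]    # one result per key, same order

async def main():
    loader = DoubleLoader()
    # three separate load() calls are collected into a single batch
    results = await asyncio.gather(loader.load(1), loader.load(2), loader.load(3))
    print(results)  # batched keys: [1, 2, 3], then [2, 4, 6]

asyncio.run(main())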
  2. Define schemas
from typing import Tuple
from pydantic import BaseModel
from pydantic_resolve import LoaderDepend

class FeedbackSchema(BaseModel):
    id: int
    comment_id: int
    content: str

    class Config:
        orm_mode = True

class CommentSchema(BaseModel):
    id: int
    task_id: int
    content: str
    feedbacks: Tuple[FeedbackSchema, ...] = tuple()
    def resolve_feedbacks(self, feedback_loader=LoaderDepend(FeedbackLoader)):
        # LoaderDepend will manage contextvars for you
        return feedback_loader.load(self.id)

    class Config:
        orm_mode = True

class TaskSchema(BaseModel):
    id: int
    name: str
    comments: Tuple[CommentSchema, ...] = tuple()
    def resolve_comments(self, comment_loader=LoaderDepend(CommentLoader)):
        return comment_loader.load(self.id)

    class Config:
        orm_mode = True
  3. Resolve it
tasks = (await session.execute(select(Task))).scalars().all()
tasks = [TaskSchema.from_orm(t) for t in tasks]
results = await Resolver().resolve(tasks)  # <=== resolve schema with DataLoaders

# output
[
    {
        'id': 1,
        'name': 'task-1 xyz',
        'comments': [
            {
                'content': 'comment-1 for task 1 (changes)',
                'feedbacks': [
                    {'comment_id': 1, 'content': 'feedback-1 for comment-1 (changes)', 'id': 1},
                    {'comment_id': 1, 'content': 'feedback-2 for comment-1', 'id': 2},
                    {'comment_id': 1, 'content': 'feedback-3 for comment-1', 'id': 3}
                ],
                'id': 1,
                'task_id': 1
            },
            {
                'content': 'comment-2 for task 1',
                'feedbacks': [
                    {'comment_id': 2, 'content': 'test', 'id': 4},
                ],
                'id': 2,
                'task_id': 1
            }
        ]
    }
]
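
However many tasks are fetched, the code above issues exactly three queries: one SELECT for tasks, one batched SELECT for all their comments, and one for all feedbacks. This is the N+1 avoidance promised earlier.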

For more examples, please explore the examples folder.

Unittest

poetry run python -m unittest  # or
poetry run pytest  # or
poetry run tox

Coverage

poetry run coverage run -m pytest
poetry run coverage report -m
