January 28

bazario – обзор библиотеки

В традиционной архитектуре приложения одни и те же методы используются как для чтения данных, так и для их изменения. Это удобно, но со временем усложняет код:

  1. Запросы на чтение и команды на изменение смешиваются, что делает код менее предсказуемым.
  2. Логика обработки данных становится сложной и трудно расширяемой.
  3. Производительность страдает, так как одни и те же модели данных используются для разных целей.

Command Query Responsibility Segregation (CQRS) — это архитектурный паттерн, который разделяет операции чтения (Query) и изменения данных (Command).

  1. Команды (Commands) изменяют состояние системы и не возвращают значений.
  2. Запросы (Queries) запрашивают данные, но не изменяют их.

Такой подход делает систему более масштабируемой, понятной и удобной в сопровождении. Но его реализация требует правильного инструмента для маршрутизации команд и запросов.

bazario — это легковесная библиотека маршрутизации команд и событий, которую можно использовать для удобного внедрения CQRS. Она помогает четко разделить:

  1. Запросы (Request) – команды или операции, которые ожидают результат.
  2. События (Notification) – уведомления о произошедших изменениях или операции, которые не требуют ответа.

Использование bazario позволяет:

  1. Централизовать обработку запросов и событий, упрощая архитектуру приложения.
  2. Гибко управлять обработчиками, легко добавляя или заменяя их без изменения остального кода.
  3. Подключать промежуточные этапы обработки (Pipeline Behaviors) для валидации, логирования, кеширования и других задач.
  4. Использовать DI-контейнеры для удобного управления зависимостями обработчиков.

В следующих разделах мы подробно разберем, как использовать bazario для обработки запросов и событий на практике.


Запросы

Начнем разбираться с библиотекой с объявления класса для запроса на получение курса валют:

from dataclasses import dataclass
from bazario import Request

@dataclass
class RetrieveCurrencyPriceRequest(Request[float]):
    from_currency: str
    to_currency: str

  1. Для удобства использования импортируем dataclass, использовать датакласс или pydantic модель не обязательно, но возможно.
  2. Импортируем Request из bazario, все запросы должны наследоваться от этого класса. В типе параметра указываем float, как тип, который должен вернуться при исполнении этого запроса.
  3. Затем указываем валюты, from_currency – базовую валюту, по отношению к которой мы хотим получить цену to_currency, например цену USD/EUR.


Нам необходимо объявить обработчик, который будет заниматься обработкой этого запроса.

import httpx
from bazario import RequestHandler

class RetrieveCurrencyPriceHandler(RequestHandler[RetrieveCurrencyPriceRequest, float]):
    def handle(self, request: RetrieveCurrencyPriceRequest) -> float:
        response = httpx.get(
            url="https://api.frankfurter.app/latest",
            params={
                "from": request.from_currency,
                "to": request.to_currency
            }
        ).json()

        return response["rates"][request.to_currency]

  1. Мы импортируем httpx как библиотеку через которую мы будем совершать запросы к сервису, который вернет курс валют.
  2. Класс обработчика должен наследоваться от RequestHandler для того, мы его импортируем из bazario.
  3. Мы объявляем класс RetrieveCurrencyPriceHandler и в параметрах типа в RequestHandler указываем класс запроса, который будет обрабатывать этот обработчик и тип ответа, который будет возвращаться после исполнения запроса.


Если вы используете какой-либо DI провайдер, вы можете написать __init__ функцию в обработчике и принимать дополнительные параметры при инициализации обработчика.

У каждого обработчика обязателен метод handle, который вызывается при исполнении запроса и в котором непосредственно происходит вся обработка запроса.

Итого, мы совершаем запрос в API frankfurter.app, передавая туда параметры с валютами, курс которых мы этим получить и возвращаем курс валюты.

В библиотеке на данный момент по умолчанию есть 2 резолвера: DishkaResolver и PunqResolver, с помощью которых можно использовать DI при работе с обработчиками, но в текущем примере мы не используем DI, и нам необходимо написать свой небольшой резолвер в виде SimpleResolver.

Затем мы создаем registry и resolver и создаем dispatcher через который мы можем посылать запросы на исполнение.

from bazario import Registry, Dispatcher, Resolver
from bazario.abc.resolver import TDependency

class SimpleResolver(Resolver):
    def resolve(self, dependency_type: type[TDependency]) -> TDependency:
        return dependency_type()

def main():
    registry = Registry()
    resolver = SimpleResolver()
    dispatcher = Dispatcher(
        registry=registry,
        resolver=resolver
    )

    registry.add_request_handler(RetrieveCurrencyPriceRequest, RetrieveCurrencyPriceHandler)

    from_currency = "EUR"
    to_currency = "USD"
    request = RetrieveCurrencyPriceRequest(
        from_currency=from_currency,
        to_currency=to_currency
    )
    currency_price = dispatcher.send(request)
    print(f"{from_currency}/{to_currency} price: {currency_price}")


if __name__ == "__main__":
    main()


Полный код примера с запросами.

from dataclasses import dataclass

import httpx
from bazario import Request, RequestHandler, Registry, Dispatcher, Resolver
from bazario.abc.resolver import TDependency


class SimpleResolver(Resolver):
    def resolve(self, dependency_type: type[TDependency]) -> TDependency:
        return dependency_type()


@dataclass
class RetrieveCurrencyPriceRequest(Request[float]):
    from_currency: str
    to_currency: str


class RetrieveCurrencyPriceHandler(RequestHandler[RetrieveCurrencyPriceRequest, float]):
    def handle(self, request: RetrieveCurrencyPriceRequest) -> float:
        response = httpx.get(
            url="https://api.frankfurter.app/latest",
            params={
                "from": request.from_currency,
                "to": request.to_currency
            }
        ).json()

        return response["rates"][request.to_currency]


def main():
    registry = Registry()
    resolver = SimpleResolver()
    dispatcher = Dispatcher(
        registry=registry,
        resolver=resolver
    )

    registry.add_request_handler(RetrieveCurrencyPriceRequest, RetrieveCurrencyPriceHandler)

    from_currency = "EUR"
    to_currency = "USD"
    request = RetrieveCurrencyPriceRequest(
        from_currency=from_currency,
        to_currency=to_currency
    )
    currency_price = dispatcher.send(request)
    print(f"{from_currency}/{to_currency} price: {currency_price}")


if __name__ == "__main__":
    main()


Уведомления

С запросами мы разобрались, с уведомлениями отличий от запросов и обработчиков запросов не так много, и мы рассмотрим это далее.

from dataclasses import dataclass
from bazario import Notification

@dataclass
class PostAdded(Notification):
    post_id: int
    user_id: int

  1. Для удобства использования также импортируем dataclass.
  2. Импортируем Notification из bazario, все уведомления должны наследоваться от этого класса. Параметр типа здесь не требуется, так как при вызове уведомлений в ответе ничего не возвращается.
  3. Затем указываем параметры идентификаторы поста, который создался и автора поста.


Далее необходимо также написать обработчик для текущего типа уведомлений. Библиотека позволяет использовать несколько обработчиков для каждого типа уведомлений.

from bazario import NotificationHandler


class PostAddedFirstHandler(NotificationHandler[PostAdded]):
    def handle(self, notification: PostAdded) -> None:
        print(f"Post first added: post_id={notification.post_id}, user_id={notification.user_id}")


class PostAddedSecondHandler(NotificationHandler[PostAdded]):
    def handle(self, notification: PostAdded) -> None:
        print(f"Post second added: post_id={notification.post_id}, user_id={notification.user_id}")

  1. Импортируем NotificationHandler и указываем PostAdded как тип параметра, что принимают данные обработчики уведомлений. Они исполняются последовательно друг за другом согласно очередности регистрации в Registry.
  2. Обработчики уведомлений не отличаются от обработчиков запросов в том плане, что метод handle также здесь обязателен, и в нем происходит вся обработка уведомления.


Здесь также есть поддержка дополнительных параметров при инициализации обработчика, но в этом примере дополнительных параметров не используется и поэтому у нас нет __init__ метода.

Полный код примера с обработчиками уведомлений.

from dataclasses import dataclass

from bazario import Registry, Dispatcher, Resolver, Notification, NotificationHandler
from bazario.abc.resolver import TDependency


class SimpleResolver(Resolver):
    def resolve(self, dependency_type: type[TDependency]) -> TDependency:
        return dependency_type()


@dataclass
class PostAdded(Notification):
    post_id: int
    user_id: int


class PostAddedFirstHandler(NotificationHandler[PostAdded]):
    def handle(self, notification: PostAdded) -> None:
        print(f"Post first added: post_id={notification.post_id}, user_id={notification.user_id}")


class PostAddedSecondHandler(NotificationHandler[PostAdded]):
    def handle(self, notification: PostAdded) -> None:
        print(f"Post second added: post_id={notification.post_id}, user_id={notification.user_id}")


def main():
    registry = Registry()
    resolver = SimpleResolver()
    dispatcher = Dispatcher(
        registry=registry,
        resolver=resolver
    )

    registry.add_notification_handlers(PostAdded, PostAddedFirstHandler, PostAddedSecondHandler)

    notification = PostAdded(post_id=1, user_id=1)
    dispatcher.publish(notification)


if __name__ == "__main__":
    main()


Для регистрации обработчиков уведомлений используется уже другой метод: registry.add_notification_handlers – первым аргументом туда передается тип уведомления, и затем неограниченное количество обработчиков для указанного уведомления.

Для публикации метода необходимо вызвать dispatcher.publish передавая туда инстанс уведомления.


Pipelines

Pipeline behaviors в Bazario позволяют добавлять логику предварительной и последующей обработки запросов и уведомлений.

Эти механизмы образуют цепочку вокруг основной логики обработчика, предоставляя возможность модифицировать или расширять процесс обработки данных.

Можно сказать, что библиотека позволяет подключать мидлвари к запросам и событиям на любом уровне. Их можно добавлять и к самим классам Request и Notification и в более частных случаях – к самим запросам/уведомлениям.

Разберем на примере с запросами.

from typing import Any
from bazario import Request, PipelineBehavior, HandleNext

class RequestLogBehavior(PipelineBehavior[Request, Any]):
    def handle(self, request: Request, handle_next: HandleNext[Request, Any]) -> Any:
        print(f"Received request: {request}")
        return handle_next(request)

  1. Каждый пайплайн должен наследоваться от PipelineBehavior. Параметрами типов указывается класс запроса или уведомления с которым оно взаимодействует, и тип возвращаемого ответа.
  2. В качестве параметров метод handle принимает request – сам запрос или уведомление, и handle_next – то, что вызывается следующим по очереди.


Этот пайплайн делает самую простую функцию: выводит какой запрос пришел для исполнения.

Для регистрации пайплайна в регистри необходимо добавить следующее:

registry.add_pipeline_behaviors(RetrieveCurrencyPriceRequest, RequestLogBehavior)

Полный код примера (запрос и добавленный к нему пайплайн).

from dataclasses import dataclass
from typing import Any

import httpx
from bazario import Request, RequestHandler, Registry, Dispatcher, Resolver, PipelineBehavior, HandleNext
from bazario.abc.resolver import TDependency


class SimpleResolver(Resolver):
    def resolve(self, dependency_type: type[TDependency]) -> TDependency:
        return dependency_type()


class RequestLogBehavior(PipelineBehavior[Request, Any]):
    def handle(self, request: Request, handle_next: HandleNext[Request, Any]) -> Any:
        print(f"Received request: {request}")
        return handle_next(request)


@dataclass
class RetrieveCurrencyPriceRequest(Request[float]):
    from_currency: str
    to_currency: str


class RetrieveCurrencyPriceHandler(RequestHandler[RetrieveCurrencyPriceRequest, float]):
    def handle(self, request: RetrieveCurrencyPriceRequest) -> float:
        response = httpx.get(
            url="https://api.frankfurter.app/latest",
            params={
                "from": request.from_currency,
                "to": request.to_currency
            }
        ).json()

        return response["rates"][request.to_currency]


def main():
    registry = Registry()
    resolver = SimpleResolver()
    dispatcher = Dispatcher(
        registry=registry,
        resolver=resolver
    )

    registry.add_pipeline_behaviors(RetrieveCurrencyPriceRequest, RequestLogBehavior)
    registry.add_request_handler(RetrieveCurrencyPriceRequest, RetrieveCurrencyPriceHandler)

    from_currency = "EUR"
    to_currency = "USD"
    request = RetrieveCurrencyPriceRequest(
        from_currency=from_currency,
        to_currency=to_currency
    )
    currency_price = dispatcher.send(request)
    print(f"{from_currency}/{to_currency} price: {currency_price}")


if __name__ == "__main__":
    main()


На этом обзор библиотеки и кончился ;)

Буду рад обратной связи, ее можно оставить в чате моего канала – тык сюда.

Репозиторий библиотеки: https://github.com/chessenjoyer17/bazario

Документация: https://chessenjoyer17.github.io/bazario/

Мой Telegram канал: https://t.me/goduniblog – подписывайтесь ;)