bazario – обзор библиотеки
В традиционной архитектуре приложения одни и те же методы используются как для чтения данных, так и для их изменения. Это удобно, но со временем усложняет код:
- Запросы на чтение и команды на изменение смешиваются, что делает код менее предсказуемым.
- Логика обработки данных становится сложной и трудно расширяемой.
- Производительность страдает, так как одни и те же модели данных используются для разных целей.
Command Query Responsibility Segregation (CQRS) — это архитектурный паттерн, который разделяет операции чтения (Query) и изменения данных (Command).
- Команды (Commands) изменяют состояние системы и не возвращают значений.
- Запросы (Queries) запрашивают данные, но не изменяют их.
Такой подход делает систему более масштабируемой, понятной и удобной в сопровождении. Но его реализация требует правильного инструмента для маршрутизации команд и запросов.
bazario — это легковесная библиотека маршрутизации команд и событий, которую можно использовать для удобного внедрения CQRS. Она помогает четко разделить:
- Запросы (Request) – команды или операции, которые ожидают результат.
- События (Notification) – уведомления о произошедших изменениях или операции, которые не требуют ответа.
Использование bazario позволяет:
- Централизовать обработку запросов и событий, упрощая архитектуру приложения.
- Гибко управлять обработчиками, легко добавляя или заменяя их без изменения остального кода.
- Подключать промежуточные этапы обработки (Pipeline Behaviors) для валидации, логирования, кеширования и других задач.
- Использовать DI-контейнеры для удобного управления зависимостями обработчиков.
В следующих разделах мы подробно разберем, как использовать bazario для обработки запросов и событий на практике.
Запросы
Начнем разбираться с библиотекой с объявления класса для запроса на получение курса валют:
from dataclasses import dataclass
from bazario import Request
@dataclass
class RetrieveCurrencyPriceRequest(Request[float]):
from_currency: str
to_currency: str- Для удобства использования импортируем dataclass, использовать датакласс или pydantic модель не обязательно, но возможно.
- Импортируем
Requestизbazario, все запросы должны наследоваться от этого класса. В типе параметра указываемfloat, как тип, который должен вернуться при исполнении этого запроса. - Затем указываем валюты,
from_currency– базовую валюту, по отношению к которой мы хотим получить ценуto_currency, например цену USD/EUR.
Нам необходимо объявить обработчик, который будет заниматься обработкой этого запроса.
import httpx
from bazario import RequestHandler
class RetrieveCurrencyPriceHandler(RequestHandler[RetrieveCurrencyPriceRequest, float]):
def handle(self, request: RetrieveCurrencyPriceRequest) -> float:
response = httpx.get(
url="https://api.frankfurter.app/latest",
params={
"from": request.from_currency,
"to": request.to_currency
}
).json()
return response["rates"][request.to_currency]- Мы импортируем
httpxкак библиотеку через которую мы будем совершать запросы к сервису, который вернет курс валют. - Класс обработчика должен наследоваться от
RequestHandlerдля того, мы его импортируем изbazario. - Мы объявляем класс
RetrieveCurrencyPriceHandlerи в параметрах типа вRequestHandlerуказываем класс запроса, который будет обрабатывать этот обработчик и тип ответа, который будет возвращаться после исполнения запроса.
Если вы используете какой-либо DI провайдер, вы можете написать __init__ функцию в обработчике и принимать дополнительные параметры при инициализации обработчика.
У каждого обработчика обязателен метод handle, который вызывается при исполнении запроса и в котором непосредственно происходит вся обработка запроса.
Итого, мы совершаем запрос в API frankfurter.app, передавая туда параметры с валютами, курс которых мы этим получить и возвращаем курс валюты.
В библиотеке на данный момент по умолчанию есть 2 резолвера: DishkaResolver и PunqResolver, с помощью которых можно использовать DI при работе с обработчиками, но в текущем примере мы не используем DI, и нам необходимо написать свой небольшой резолвер в виде SimpleResolver.
Затем мы создаем registry и resolver и создаем dispatcher через который мы можем посылать запросы на исполнение.
from bazario import Registry, Dispatcher, Resolver
from bazario.abc.resolver import TDependency
class SimpleResolver(Resolver):
def resolve(self, dependency_type: type[TDependency]) -> TDependency:
return dependency_type()
def main():
registry = Registry()
resolver = SimpleResolver()
dispatcher = Dispatcher(
registry=registry,
resolver=resolver
)
registry.add_request_handler(RetrieveCurrencyPriceRequest, RetrieveCurrencyPriceHandler)
from_currency = "EUR"
to_currency = "USD"
request = RetrieveCurrencyPriceRequest(
from_currency=from_currency,
to_currency=to_currency
)
currency_price = dispatcher.send(request)
print(f"{from_currency}/{to_currency} price: {currency_price}")
if __name__ == "__main__":
main()
Полный код примера с запросами.
from dataclasses import dataclass
import httpx
from bazario import Request, RequestHandler, Registry, Dispatcher, Resolver
from bazario.abc.resolver import TDependency
class SimpleResolver(Resolver):
def resolve(self, dependency_type: type[TDependency]) -> TDependency:
return dependency_type()
@dataclass
class RetrieveCurrencyPriceRequest(Request[float]):
from_currency: str
to_currency: str
class RetrieveCurrencyPriceHandler(RequestHandler[RetrieveCurrencyPriceRequest, float]):
def handle(self, request: RetrieveCurrencyPriceRequest) -> float:
response = httpx.get(
url="https://api.frankfurter.app/latest",
params={
"from": request.from_currency,
"to": request.to_currency
}
).json()
return response["rates"][request.to_currency]
def main():
registry = Registry()
resolver = SimpleResolver()
dispatcher = Dispatcher(
registry=registry,
resolver=resolver
)
registry.add_request_handler(RetrieveCurrencyPriceRequest, RetrieveCurrencyPriceHandler)
from_currency = "EUR"
to_currency = "USD"
request = RetrieveCurrencyPriceRequest(
from_currency=from_currency,
to_currency=to_currency
)
currency_price = dispatcher.send(request)
print(f"{from_currency}/{to_currency} price: {currency_price}")
if __name__ == "__main__":
main()
Уведомления
С запросами мы разобрались, с уведомлениями отличий от запросов и обработчиков запросов не так много, и мы рассмотрим это далее.
from dataclasses import dataclass
from bazario import Notification
@dataclass
class PostAdded(Notification):
post_id: int
user_id: int- Для удобства использования также импортируем dataclass.
- Импортируем
Notificationизbazario, все уведомления должны наследоваться от этого класса. Параметр типа здесь не требуется, так как при вызове уведомлений в ответе ничего не возвращается. - Затем указываем параметры идентификаторы поста, который создался и автора поста.
Далее необходимо также написать обработчик для текущего типа уведомлений. Библиотека позволяет использовать несколько обработчиков для каждого типа уведомлений.
from bazario import NotificationHandler
class PostAddedFirstHandler(NotificationHandler[PostAdded]):
def handle(self, notification: PostAdded) -> None:
print(f"Post first added: post_id={notification.post_id}, user_id={notification.user_id}")
class PostAddedSecondHandler(NotificationHandler[PostAdded]):
def handle(self, notification: PostAdded) -> None:
print(f"Post second added: post_id={notification.post_id}, user_id={notification.user_id}")- Импортируем
NotificationHandlerи указываемPostAddedкак тип параметра, что принимают данные обработчики уведомлений. Они исполняются последовательно друг за другом согласно очередности регистрации вRegistry. - Обработчики уведомлений не отличаются от обработчиков запросов в том плане, что метод
handleтакже здесь обязателен, и в нем происходит вся обработка уведомления.
Здесь также есть поддержка дополнительных параметров при инициализации обработчика, но в этом примере дополнительных параметров не используется и поэтому у нас нет __init__ метода.
Полный код примера с обработчиками уведомлений.
from dataclasses import dataclass
from bazario import Registry, Dispatcher, Resolver, Notification, NotificationHandler
from bazario.abc.resolver import TDependency
class SimpleResolver(Resolver):
def resolve(self, dependency_type: type[TDependency]) -> TDependency:
return dependency_type()
@dataclass
class PostAdded(Notification):
post_id: int
user_id: int
class PostAddedFirstHandler(NotificationHandler[PostAdded]):
def handle(self, notification: PostAdded) -> None:
print(f"Post first added: post_id={notification.post_id}, user_id={notification.user_id}")
class PostAddedSecondHandler(NotificationHandler[PostAdded]):
def handle(self, notification: PostAdded) -> None:
print(f"Post second added: post_id={notification.post_id}, user_id={notification.user_id}")
def main():
registry = Registry()
resolver = SimpleResolver()
dispatcher = Dispatcher(
registry=registry,
resolver=resolver
)
registry.add_notification_handlers(PostAdded, PostAddedFirstHandler, PostAddedSecondHandler)
notification = PostAdded(post_id=1, user_id=1)
dispatcher.publish(notification)
if __name__ == "__main__":
main()
Для регистрации обработчиков уведомлений используется уже другой метод: registry.add_notification_handlers – первым аргументом туда передается тип уведомления, и затем неограниченное количество обработчиков для указанного уведомления.
Для публикации метода необходимо вызвать dispatcher.publish передавая туда инстанс уведомления.
Pipelines
Pipeline behaviors в Bazario позволяют добавлять логику предварительной и последующей обработки запросов и уведомлений.
Эти механизмы образуют цепочку вокруг основной логики обработчика, предоставляя возможность модифицировать или расширять процесс обработки данных.
Можно сказать, что библиотека позволяет подключать мидлвари к запросам и событиям на любом уровне. Их можно добавлять и к самим классам Request и Notification и в более частных случаях – к самим запросам/уведомлениям.
Разберем на примере с запросами.
from typing import Any
from bazario import Request, PipelineBehavior, HandleNext
class RequestLogBehavior(PipelineBehavior[Request, Any]):
def handle(self, request: Request, handle_next: HandleNext[Request, Any]) -> Any:
print(f"Received request: {request}")
return handle_next(request)- Каждый пайплайн должен наследоваться от
PipelineBehavior. Параметрами типов указывается класс запроса или уведомления с которым оно взаимодействует, и тип возвращаемого ответа. - В качестве параметров метод
handleпринимаетrequest– сам запрос или уведомление, иhandle_next– то, что вызывается следующим по очереди.
Этот пайплайн делает самую простую функцию: выводит какой запрос пришел для исполнения.
Для регистрации пайплайна в регистри необходимо добавить следующее:
registry.add_pipeline_behaviors(RetrieveCurrencyPriceRequest, RequestLogBehavior)
Полный код примера (запрос и добавленный к нему пайплайн).
from dataclasses import dataclass
from typing import Any
import httpx
from bazario import Request, RequestHandler, Registry, Dispatcher, Resolver, PipelineBehavior, HandleNext
from bazario.abc.resolver import TDependency
class SimpleResolver(Resolver):
def resolve(self, dependency_type: type[TDependency]) -> TDependency:
return dependency_type()
class RequestLogBehavior(PipelineBehavior[Request, Any]):
def handle(self, request: Request, handle_next: HandleNext[Request, Any]) -> Any:
print(f"Received request: {request}")
return handle_next(request)
@dataclass
class RetrieveCurrencyPriceRequest(Request[float]):
from_currency: str
to_currency: str
class RetrieveCurrencyPriceHandler(RequestHandler[RetrieveCurrencyPriceRequest, float]):
def handle(self, request: RetrieveCurrencyPriceRequest) -> float:
response = httpx.get(
url="https://api.frankfurter.app/latest",
params={
"from": request.from_currency,
"to": request.to_currency
}
).json()
return response["rates"][request.to_currency]
def main():
registry = Registry()
resolver = SimpleResolver()
dispatcher = Dispatcher(
registry=registry,
resolver=resolver
)
registry.add_pipeline_behaviors(RetrieveCurrencyPriceRequest, RequestLogBehavior)
registry.add_request_handler(RetrieveCurrencyPriceRequest, RetrieveCurrencyPriceHandler)
from_currency = "EUR"
to_currency = "USD"
request = RetrieveCurrencyPriceRequest(
from_currency=from_currency,
to_currency=to_currency
)
currency_price = dispatcher.send(request)
print(f"{from_currency}/{to_currency} price: {currency_price}")
if __name__ == "__main__":
main()
На этом обзор библиотеки и кончился ;)
Буду рад обратной связи, ее можно оставить в чате моего канала – тык сюда.
Репозиторий библиотеки: https://github.com/chessenjoyer17/bazario
Документация: https://chessenjoyer17.github.io/bazario/
Мой Telegram канал: https://t.me/goduniblog – подписывайтесь ;)