geo/backend/app/repositories/brand_repository.py

72 lines
2.2 KiB
Python

import uuid
from typing import Optional
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.brand import Brand
class BrandRepository:
def __init__(self, session: AsyncSession):
self.session = session
async def get_by_id(self, id: uuid.UUID) -> Optional[Brand]:
result = await self.session.execute(
select(Brand).where(Brand.id == id)
)
return result.scalar_one_or_none()
async def list_by_user(
self, user_id: uuid.UUID, *, skip: int = 0, limit: int = 100
) -> list[Brand]:
result = await self.session.execute(
select(Brand)
.where(Brand.user_id == user_id)
.order_by(Brand.created_at.desc())
.offset(skip)
.limit(limit)
)
return list(result.scalars().all())
async def count_by_user(self, user_id: uuid.UUID) -> int:
result = await self.session.execute(
select(func.count()).select_from(Brand).where(Brand.user_id == user_id)
)
return result.scalar_one()
async def get_by_name_and_organization(
self, name: str, organization_id: uuid.UUID
) -> Optional[Brand]:
result = await self.session.execute(
select(Brand).where(
Brand.name == name,
Brand.user_id == organization_id,
)
)
return result.scalar_one_or_none()
async def create(self, **kwargs) -> Brand:
instance = Brand(**kwargs)
self.session.add(instance)
await self.session.flush()
return instance
async def update(self, id: uuid.UUID, **kwargs) -> Optional[Brand]:
instance = await self.get_by_id(id)
if instance is None:
return None
for key, value in kwargs.items():
if hasattr(instance, key):
setattr(instance, key, value)
await self.session.flush()
return instance
async def delete(self, id: uuid.UUID) -> bool:
instance = await self.get_by_id(id)
if instance is None:
return False
await self.session.delete(instance)
await self.session.flush()
return True