geo/backend/app/repositories/query_repository.py

73 lines
2.3 KiB
Python

import uuid
from typing import Optional
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.query import Query
class QueryRepository:
def __init__(self, session: AsyncSession):
self.session = session
async def get_by_id(self, id: uuid.UUID) -> Optional[Query]:
result = await self.session.execute(
select(Query).where(Query.id == id)
)
return result.scalar_one_or_none()
async def list_by_user(
self, user_id: str, *, skip: int = 0, limit: int = 100
) -> list[Query]:
result = await self.session.execute(
select(Query)
.where(Query.user_id == user_id)
.order_by(Query.created_at.desc())
.offset(skip)
.limit(limit)
)
return list(result.scalars().all())
async def count_by_user(self, user_id: str) -> int:
result = await self.session.execute(
select(func.count()).select_from(Query).where(Query.user_id == user_id)
)
return result.scalar_one()
async def get_by_brand(self, brand_name: str) -> list[Query]:
result = await self.session.execute(
select(Query).where(Query.target_brand == brand_name)
)
return list(result.scalars().all())
async def get_active_queries(self) -> list[Query]:
result = await self.session.execute(
select(Query).where(Query.status == "active")
)
return list(result.scalars().all())
async def create(self, **kwargs) -> Query:
instance = Query(**kwargs)
self.session.add(instance)
await self.session.flush()
return instance
async def update(self, id: uuid.UUID, **kwargs) -> Optional[Query]:
instance = await self.get_by_id(id)
if instance is None:
return None
for key, value in kwargs.items():
if hasattr(instance, key):
setattr(instance, key, value)
await self.session.flush()
return instance
async def delete(self, id: uuid.UUID) -> bool:
instance = await self.get_by_id(id)
if instance is None:
return False
await self.session.delete(instance)
await self.session.flush()
return True