geo/backend/app/repositories/alert_repository.py

78 lines
2.4 KiB
Python

import uuid
from typing import Optional
from sqlalchemy import select, func, update
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.alert import Alert
class AlertRepository:
def __init__(self, session: AsyncSession):
self.session = session
async def get_by_id(self, id: uuid.UUID) -> Optional[Alert]:
result = await self.session.execute(
select(Alert).where(Alert.id == id)
)
return result.scalar_one_or_none()
async def list_by_user(
self, user_id: uuid.UUID, *, skip: int = 0, limit: int = 100
) -> list[Alert]:
result = await self.session.execute(
select(Alert)
.where(Alert.user_id == user_id)
.order_by(Alert.created_at.desc())
.offset(skip)
.limit(limit)
)
return list(result.scalars().all())
async def count_by_user(self, user_id: uuid.UUID) -> int:
result = await self.session.execute(
select(func.count()).select_from(Alert).where(Alert.user_id == user_id)
)
return result.scalar_one()
async def get_unread_by_user(self, user_id: uuid.UUID) -> list[Alert]:
result = await self.session.execute(
select(Alert).where(
Alert.user_id == user_id,
Alert.is_read == False,
).order_by(Alert.created_at.desc())
)
return list(result.scalars().all())
async def mark_as_read(self, alert_id: uuid.UUID) -> bool:
instance = await self.get_by_id(alert_id)
if instance is None:
return False
instance.is_read = True
await self.session.flush()
return True
async def create(self, **kwargs) -> Alert:
instance = Alert(**kwargs)
self.session.add(instance)
await self.session.flush()
return instance
async def update(self, id: uuid.UUID, **kwargs) -> Optional[Alert]:
instance = await self.get_by_id(id)
if instance is None:
return None
for key, value in kwargs.items():
if hasattr(instance, key):
setattr(instance, key, value)
await self.session.flush()
return instance
async def delete(self, id: uuid.UUID) -> bool:
instance = await self.get_by_id(id)
if instance is None:
return False
await self.session.delete(instance)
await self.session.flush()
return True