import uuid
from typing import Optional

from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.knowledge import KnowledgeBase


class KnowledgeRepository:
    """Query and persistence helper for ``KnowledgeBase`` rows.

    Every method runs against the ``AsyncSession`` supplied at construction.
    Write methods flush the session but never commit; committing or rolling
    back the surrounding transaction is left to the caller.
    """

    def __init__(self, session: AsyncSession):
        """Store the session that all subsequent queries will use."""
        self.session = session

    async def get_by_id(self, id: uuid.UUID) -> Optional[KnowledgeBase]:
        """Return the knowledge base whose ``id`` matches, or ``None``."""
        result = await self.session.execute(
            select(KnowledgeBase).where(KnowledgeBase.id == id)
        )
        return result.scalar_one_or_none()

    async def list_by_organization(
        self,
        organization_id: uuid.UUID,
        *,
        skip: int = 0,
        limit: int = 100,
    ) -> list[KnowledgeBase]:
        """Return one page of an organization's knowledge bases, newest first.

        Args:
            organization_id: Organization whose rows are listed.
            skip: Number of leading rows to skip (SQL ``OFFSET``).
            limit: Maximum number of rows to return (SQL ``LIMIT``).

        Returns:
            The matching rows ordered by ``created_at`` descending, with
            ``id`` descending as a tie-breaker.
        """
        stmt = (
            select(KnowledgeBase)
            .where(KnowledgeBase.organization_id == organization_id)
            # created_at alone is not guaranteed to be unique. Without a
            # second, unique sort key the relative order of rows sharing a
            # timestamp is unspecified, so offset/limit pages could repeat or
            # skip rows. id is treated as unique here (get_by_id relies on
            # the same assumption).
            .order_by(
                KnowledgeBase.created_at.desc(),
                KnowledgeBase.id.desc(),
            )
            .offset(skip)
            .limit(limit)
        )
        result = await self.session.execute(stmt)
        return list(result.scalars().all())

    async def count_by_organization(self, organization_id: uuid.UUID) -> int:
        """Return how many knowledge bases belong to the organization."""
        stmt = (
            select(func.count())
            .select_from(KnowledgeBase)
            .where(KnowledgeBase.organization_id == organization_id)
        )
        result = await self.session.execute(stmt)
        return result.scalar_one()

    async def get_by_organization(
        self, organization_id: uuid.UUID
    ) -> list[KnowledgeBase]:
        """Return every knowledge base of the organization, unpaginated.

        No ``ORDER BY`` is applied, so the row order is whatever the database
        happens to return.
        """
        result = await self.session.execute(
            select(KnowledgeBase).where(
                KnowledgeBase.organization_id == organization_id
            )
        )
        return list(result.scalars().all())

    async def create(self, **kwargs) -> KnowledgeBase:
        """Build a ``KnowledgeBase`` from ``kwargs``, add it and flush.

        Args:
            **kwargs: Passed unchanged to the ``KnowledgeBase`` constructor.

        Returns:
            The newly added instance (flushed, not committed).
        """
        instance = KnowledgeBase(**kwargs)
        self.session.add(instance)
        await self.session.flush()
        return instance

    async def update(self, id: uuid.UUID, **kwargs) -> Optional[KnowledgeBase]:
        """Assign the given attribute values to an existing knowledge base.

        Args:
            id: Identifier of the row to modify.
            **kwargs: Attribute name/value pairs. Names for which the
                instance has no attribute are silently ignored.

        Returns:
            The modified instance (flushed, not committed), or ``None`` when
            no row with ``id`` exists.
        """
        instance = await self.get_by_id(id)
        if instance is None:
            return None
        for key, value in kwargs.items():
            # Skip unknown names rather than creating ad-hoc attributes.
            if hasattr(instance, key):
                setattr(instance, key, value)
        await self.session.flush()
        return instance

    async def delete(self, id: uuid.UUID) -> bool:
        """Delete the knowledge base with ``id``.

        Returns:
            ``True`` when a row was found and deleted (flushed, not
            committed), ``False`` when no row with ``id`` exists.
        """
        instance = await self.get_by_id(id)
        if instance is None:
            return False
        await self.session.delete(instance)
        await self.session.flush()
        return True