68 lines
2.3 KiB
Python
68 lines
2.3 KiB
Python
import uuid
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
from sqlalchemy import Boolean, DateTime, ForeignKey, Index, String, Uuid, func
|
|
from sqlalchemy.orm import Mapped, mapped_column
|
|
|
|
from app.database import Base, JSONType
|
|
|
|
# Interval added to a task's base time for each supported frequency name.
FREQUENCY_DELTAS: dict[str, timedelta] = {
    "hourly": timedelta(hours=1),
    "daily": timedelta(days=1),
    "weekly": timedelta(weeks=1),
}

# Accepted frequency names, taken from the keys of the interval table above so
# the two can never drift apart.
VALID_FREQUENCIES: set[str] = set(FREQUENCY_DELTAS)
|
|
|
|
|
|
class DetectionTask(Base):
    """ORM model for rows of the ``detection_tasks`` table.

    Each row holds a named task tied to a brand and a user, a ``frequency``
    name (looked up in ``FREQUENCY_DELTAS`` when scheduling), JSON-stored
    ``engines``, ``queries`` and optional ``competitor_names`` lists, an
    ``is_active`` flag, and last/next run timestamps.
    """

    __tablename__ = "detection_tasks"

    # Primary key; generated client-side with uuid4 when not supplied.
    id: Mapped[uuid.UUID] = mapped_column(
        Uuid(as_uuid=True),
        primary_key=True,
        default=uuid.uuid4,
    )
    # Deleting the referenced ``brands`` row cascades to this row.
    brand_id: Mapped[uuid.UUID] = mapped_column(
        Uuid(as_uuid=True),
        ForeignKey("brands.id", ondelete="CASCADE"),
        nullable=False,
    )
    # No foreign-key constraint is declared for the user reference.
    # NOTE(review): ``index=True`` here and the explicit
    # ``idx_detection_tasks_user_id`` entry in ``__table_args__`` each create an
    # index on this column, so one looks redundant. Removing either changes the
    # schema and needs a migration, so both are kept as-is.
    user_id: Mapped[uuid.UUID] = mapped_column(
        Uuid(as_uuid=True),
        nullable=False,
        index=True,
    )
    name: Mapped[str] = mapped_column(String(200), nullable=False)
    # Not constrained at the column level; unknown values fall back to a daily
    # interval in ``compute_next_run_at``.
    frequency: Mapped[str] = mapped_column(String(20), nullable=False)
    # ``default=list`` is a callable, so every row gets its own empty list.
    engines: Mapped[list] = mapped_column(JSONType, default=list, nullable=False)
    queries: Mapped[list] = mapped_column(JSONType, default=list, nullable=False)
    competitor_names: Mapped[list | None] = mapped_column(JSONType, nullable=True)
    is_active: Mapped[bool] = mapped_column(Boolean, default=True, nullable=False)
    last_run_at: Mapped[datetime | None] = mapped_column(
        DateTime(timezone=True),
        nullable=True,
    )
    # Defaults to the current UTC time at insert.
    next_run_at: Mapped[datetime | None] = mapped_column(
        DateTime(timezone=True),
        nullable=True,
        default=lambda: datetime.now(timezone.utc),
    )
    # Set by the database on insert.
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
        nullable=False,
    )
    # Set by the database on insert and refreshed on every ORM UPDATE.
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
        onupdate=func.now(),
        nullable=False,
    )

    __table_args__ = (
        Index("idx_detection_tasks_brand_id", "brand_id"),
        Index("idx_detection_tasks_user_id", "user_id"),
        Index("idx_detection_tasks_is_active", "is_active"),
    )

    def compute_next_run_at(self) -> datetime:
        """Return the next run time derived from ``last_run_at`` and ``frequency``.

        The base time is ``last_run_at``, or the current UTC time when
        ``last_run_at`` is unset. The interval comes from ``FREQUENCY_DELTAS``;
        an unrecognised ``frequency`` falls back to one day. This method only
        computes the value; it does not assign ``next_run_at``.

        Returns:
            A timezone-aware datetime equal to the base time plus the interval.
        """
        delta = FREQUENCY_DELTAS.get(self.frequency, timedelta(days=1))
        base = self.last_run_at or datetime.now(timezone.utc)
        # ``last_run_at`` may be naive (e.g. loaded from a backend without
        # native timezone storage). Treat it as UTC, matching the UTC default
        # used for ``next_run_at``, so the result is always aware and can be
        # compared with other aware datetimes.
        if base.tzinfo is None:
            base = base.replace(tzinfo=timezone.utc)
        return base + delta
|