PYTHONPython

tasks

real world projects / task api / app / routers

PYTHON
tasks.py🐍
"""
Task routes - CRUD operations for tasks.
"""

from fastapi import APIRouter, Depends, HTTPException, status, Query
from sqlalchemy.orm import Session
from typing import List, Optional

from ..database import get_db
from ..models import Task, User
from ..schemas import TaskCreate, TaskUpdate, TaskResponse, TaskPriority
from ..auth import get_current_active_user

router = APIRouter()


@router.post("/", response_model=TaskResponse, status_code=status.HTTP_201_CREATED)
def create_task(
    task: TaskCreate,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Create a new task owned by the authenticated user.

    - **title**: Task title (required)
    - **description**: Optional description
    - **priority**: low, medium, high, urgent
    - **category**: Optional category
    - **due_date**: Optional due date
    """
    # Ownership comes from the authenticated user rather than the request
    # body: owner_id is supplied here, alongside the validated payload.
    payload = task.model_dump()
    new_task = Task(**payload, owner_id=current_user.id)

    db.add(new_task)
    db.commit()
    # Reload the instance from the database after the commit before it is
    # serialized through the response model.
    db.refresh(new_task)
    return new_task


@router.get("/", response_model=List[TaskResponse])
def get_tasks(
    skip: int = Query(0, ge=0),
    limit: int = Query(100, ge=1, le=100),
    completed: Optional[bool] = None,
    priority: Optional[TaskPriority] = None,
    category: Optional[str] = None,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Get all tasks for the current user with optional filtering.

    Results are ordered newest first and paginated with skip/limit.

    - **skip**: Number of tasks to skip (pagination)
    - **limit**: Maximum tasks to return
    - **completed**: Filter by completion status
    - **priority**: Filter by priority level
    - **category**: Filter by category
    """
    # Always scope the listing to the authenticated user's own tasks.
    query = db.query(Task).filter(Task.owner_id == current_user.id)

    # Each filter is applied only when the caller supplied it; ``None`` means
    # "no constraint", which is why ``completed`` is compared against None
    # rather than tested for truthiness (False is a valid filter value).
    if completed is not None:
        query = query.filter(Task.completed == completed)
    if priority is not None:
        query = query.filter(Task.priority == priority.value)
    if category is not None:
        query = query.filter(Task.category == category)

    # Order by created_at descending (newest first). created_at alone is not
    # a unique sort key, so rows sharing a timestamp would have no defined
    # relative order and offset/limit pages could repeat or skip tasks.
    # Task.id is added as a tie-breaker to make paging deterministic.
    query = query.order_by(Task.created_at.desc(), Task.id.desc())

    return query.offset(skip).limit(limit).all()


@router.get("/{task_id}", response_model=TaskResponse)
def get_task(
    task_id: int,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """Get a single task by ID; responds with 404 if the current user has no such task."""
    # Matching on owner_id as well as id means a task owned by someone else
    # yields the same 404 as a task that does not exist.
    owned_task = (
        db.query(Task)
        .filter(Task.id == task_id, Task.owner_id == current_user.id)
        .first()
    )

    if owned_task:
        return owned_task

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Task not found"
    )


@router.put("/{task_id}", response_model=TaskResponse)
def update_task(
    task_id: int,
    task_update: TaskUpdate,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """Update a task. Fields omitted from the request are left unchanged."""
    # Look the task up by id and owner together, so only the current user's
    # own tasks can be modified.
    existing = (
        db.query(Task)
        .filter(Task.id == task_id, Task.owner_id == current_user.id)
        .first()
    )
    if not existing:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Task not found"
        )

    # exclude_unset keeps only the fields the client actually sent, so a
    # field explicitly sent as null is still applied while an absent field
    # is not touched.
    changes = task_update.model_dump(exclude_unset=True)
    for attr_name in changes:
        setattr(existing, attr_name, changes[attr_name])

    db.commit()
    db.refresh(existing)
    return existing


@router.delete("/{task_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_task(
    task_id: int,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """Delete a task; responds with 404 if the current user has no such task."""
    # Only tasks owned by the current user are eligible for deletion.
    doomed = (
        db.query(Task)
        .filter(Task.id == task_id, Task.owner_id == current_user.id)
        .first()
    )
    if not doomed:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Task not found"
        )

    db.delete(doomed)
    db.commit()
    # Falls through and returns None: the route is declared 204 No Content.


@router.patch("/{task_id}/complete", response_model=TaskResponse)
def mark_task_complete(
    task_id: int,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """Mark a task as completed and return the updated task."""
    # Restrict the lookup to the current user's tasks.
    target = (
        db.query(Task)
        .filter(Task.id == task_id, Task.owner_id == current_user.id)
        .first()
    )
    if not target:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Task not found"
        )

    # The flag is set unconditionally, so calling this on an already
    # completed task simply writes True again.
    target.completed = True
    db.commit()
    db.refresh(target)
    return target


@router.get("/stats/summary")
def get_task_stats(
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """Get task statistics (totals, completion rate, pending per priority) for the current user."""
    # Base query shared by every count below; each .filter() call returns a
    # new query, so this object is never modified by the derived ones.
    owned = db.query(Task).filter(Task.owner_id == current_user.id)

    total = owned.count()
    # "== True" / "== False" are intentional: they build SQL comparisons on
    # the column and must not be rewritten as Python "is" checks.
    completed = owned.filter(Task.completed == True).count()  # noqa: E712

    # One count per priority level, over tasks that are not completed.
    pending_by_priority = {
        level.value: owned.filter(
            Task.priority == level.value,
            Task.completed == False,  # noqa: E712
        ).count()
        for level in TaskPriority
    }

    # Guard against division by zero when the user has no tasks.
    if total > 0:
        completion_rate = round(completed / total * 100, 1)
    else:
        completion_rate = 0

    return {
        "total": total,
        "completed": completed,
        "pending": total - completed,
        "completion_rate": completion_rate,
        "pending_by_priority": pending_by_priority
    }
PreviousNext