PYTHON
tasks.py🐍python
"""
Task routes - CRUD operations for tasks.
"""
from fastapi import APIRouter, Depends, HTTPException, status, Query
from sqlalchemy.orm import Session
from typing import List, Optional
from ..database import get_db
from ..models import Task, User
from ..schemas import TaskCreate, TaskUpdate, TaskResponse, TaskPriority
from ..auth import get_current_active_user
router = APIRouter()
@router.post("/", response_model=TaskResponse, status_code=status.HTTP_201_CREATED)
def create_task(
task: TaskCreate,
current_user: User = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
"""
Create a new task.
- **title**: Task title (required)
- **description**: Optional description
- **priority**: low, medium, high, urgent
- **category**: Optional category
- **due_date**: Optional due date
"""
db_task = Task(
**task.model_dump(),
owner_id=current_user.id
)
db.add(db_task)
db.commit()
db.refresh(db_task)
return db_task
@router.get("/", response_model=List[TaskResponse])
def get_tasks(
skip: int = Query(0, ge=0),
limit: int = Query(100, ge=1, le=100),
completed: Optional[bool] = None,
priority: Optional[TaskPriority] = None,
category: Optional[str] = None,
current_user: User = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
"""
Get all tasks for the current user with optional filtering.
- **skip**: Number of tasks to skip (pagination)
- **limit**: Maximum tasks to return
- **completed**: Filter by completion status
- **priority**: Filter by priority level
- **category**: Filter by category
"""
query = db.query(Task).filter(Task.owner_id == current_user.id)
# Apply filters
if completed is not None:
query = query.filter(Task.completed == completed)
if priority is not None:
query = query.filter(Task.priority == priority.value)
if category is not None:
query = query.filter(Task.category == category)
# Order by created_at descending (newest first)
query = query.order_by(Task.created_at.desc())
return query.offset(skip).limit(limit).all()
@router.get("/{task_id}", response_model=TaskResponse)
def get_task(
task_id: int,
current_user: User = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
"""Get a specific task by ID."""
task = db.query(Task).filter(
Task.id == task_id,
Task.owner_id == current_user.id
).first()
if not task:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Task not found"
)
return task
@router.put("/{task_id}", response_model=TaskResponse)
def update_task(
task_id: int,
task_update: TaskUpdate,
current_user: User = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
"""Update a task. Only provided fields will be updated."""
task = db.query(Task).filter(
Task.id == task_id,
Task.owner_id == current_user.id
).first()
if not task:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Task not found"
)
# Update only provided fields
update_data = task_update.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(task, field, value)
db.commit()
db.refresh(task)
return task
@router.delete("/{task_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_task(
task_id: int,
current_user: User = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
"""Delete a task."""
task = db.query(Task).filter(
Task.id == task_id,
Task.owner_id == current_user.id
).first()
if not task:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Task not found"
)
db.delete(task)
db.commit()
return None
@router.patch("/{task_id}/complete", response_model=TaskResponse)
def mark_task_complete(
task_id: int,
current_user: User = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
"""Mark a task as completed."""
task = db.query(Task).filter(
Task.id == task_id,
Task.owner_id == current_user.id
).first()
if not task:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Task not found"
)
task.completed = True
db.commit()
db.refresh(task)
return task
@router.get("/stats/summary")
def get_task_stats(
current_user: User = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
"""Get task statistics for the current user."""
total = db.query(Task).filter(Task.owner_id == current_user.id).count()
completed = db.query(Task).filter(
Task.owner_id == current_user.id,
Task.completed == True
).count()
pending = total - completed
# Count by priority
by_priority = {}
for priority in TaskPriority:
count = db.query(Task).filter(
Task.owner_id == current_user.id,
Task.priority == priority.value,
Task.completed == False
).count()
by_priority[priority.value] = count
return {
"total": total,
"completed": completed,
"pending": pending,
"completion_rate": round(completed / total * 100, 1) if total > 0 else 0,
"pending_by_priority": by_priority
}