diff --git a/api/utils/community_base_service.py b/api/utils/community_base_service.py new file mode 100644 index 000000000..2fb32dd6e --- /dev/null +++ b/api/utils/community_base_service.py @@ -0,0 +1,66 @@ +from sqlalchemy.orm import Session +from fastapi import HTTPException, status +from sqlalchemy.exc import SQLAlchemyError +from typing import Optional, Any, Dict, Type, List + +class BaseService: + """Base service class to handle common CRUD operations.""" + def __init__(self,model: Type[Any]) -> None: + self.model = model + + def fetch_all(self, db: Session, **query_params: Optional[Any]) -> List[Any]: + """Fetch all records with optional filtering.""" + query = db.query(self.model) + if query_params: + for column, value in query_params.items(): + if hasattr(self.model, column) and value: + query = query.filter(getattr(self.model, column).ilike(f"%{value}%")) + return query.all() + + def fetch_by_id(self, db: Session, item_id: str) -> Any: + """Fetch a record by its ID.""" + item = db.query(self.model).filter(self.model.id == item_id).first() + if not item: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"{self.model.__name__} with ID {item_id} not found" + ) + return item + + def fetch_by_column(self, db: Session, column: str, value: Any) -> List[Any]: + """Fetch records by a specific column value.""" + if hasattr(self.model, column): + return db.query(self.model).filter(getattr(self.model, column) == value).all() + return [] + + def update(self, db: Session, item_id: str, update_data: Dict[str, Any]) -> Any: + """Update a record with the provided data.""" + item = self.fetch_by_id(db, item_id) + try: + for key, value in update_data.items(): + if hasattr(item, key): + setattr(item, key, value) + db.commit() + db.refresh(item) + return item + except SQLAlchemyError as e: + db.rollback() + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to update {self.model.__name__}: {str(e)}" + ) + + def delete(self, db: Session, item_id: str) -> Dict[str, str]: + """Delete a record by its ID.""" + item = self.fetch_by_id(db, item_id) + try: + db.delete(item) + db.commit() + return {"status": "success", "detail": f"{self.model.__name__} with ID {item_id} deleted successfully"} + except SQLAlchemyError as e: + db.rollback() + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to delete {self.model.__name__}: {str(e)}" + ) + diff --git a/api/v1/models/__init__.py b/api/v1/models/__init__.py index a39ec4bae..a240424d3 100644 --- a/api/v1/models/__init__.py +++ b/api/v1/models/__init__.py @@ -30,7 +30,7 @@ from api.v1.models.terms import TermsAndConditions from api.v1.models.reset_password_token import ResetPasswordToken from api.v1.models.faq_inquiries import FAQInquiries +from api.v1.models.community import CommunityQuestion, CommunityAnswer from api.v1.models.wishlist import Wishlist from api.v1.models.totp_device import TOTPDevice -from api.v1.models.bookmark import Bookmark - +from api.v1.models.bookmark import Bookmark \ No newline at end of file diff --git a/api/v1/models/community.py b/api/v1/models/community.py new file mode 100644 index 000000000..8685070a9 --- /dev/null +++ b/api/v1/models/community.py @@ -0,0 +1,38 @@ +from sqlalchemy import Column, String, DateTime, Text, ForeignKey, Boolean +from sqlalchemy.orm import relationship +from sqlalchemy.sql import func +from api.v1.models.base_model import BaseTableModel + +class CommunityQuestion(BaseTableModel): + __tablename__ = "community_questions" + + title = Column(String, nullable=False) + message = Column(Text, nullable=False) + user_id = Column(String, ForeignKey("users.id", ondelete="CASCADE"), nullable=False) + timestamp = Column(DateTime(timezone=True), server_default=func.now()) + is_resolved = Column(Boolean, default=False) + + # Relationships + user = relationship("User", back_populates="questions") + answers = relationship("CommunityAnswer", back_populates="question", cascade="all, delete-orphan") + + def __repr__(self): + return f"" + + +class CommunityAnswer(BaseTableModel): + __tablename__ = "community_answers" + + message = Column(Text, nullable=False) + user_id = Column(String, ForeignKey("users.id", ondelete="CASCADE"), nullable=False) + question_id = Column(String, ForeignKey("community_questions.id", ondelete="CASCADE"), nullable=False) + timestamp = Column(DateTime(timezone=True), server_default=func.now()) + is_accepted = Column(Boolean, default=False) + + # Relationships + user = relationship("User", back_populates="answers") + question = relationship("CommunityQuestion", back_populates="answers") + + def __repr__(self): + return f"" + diff --git a/api/v1/models/user.py b/api/v1/models/user.py index 85ad5fa12..73f0e227b 100644 --- a/api/v1/models/user.py +++ b/api/v1/models/user.py @@ -103,20 +103,23 @@ class User(BaseTableModel): "Reply", back_populates="user", cascade="all, delete-orphan" ) + reset_password_token = relationship("ResetPasswordToken", back_populates="user", cascade="all, delete-orphan") - - wishlist = relationship("Wishlist", - back_populates="user", - cascade="all, delete-orphan") + questions = relationship("CommunityQuestion", back_populates="user", cascade="all, delete-orphan") + answers = relationship("CommunityAnswer", back_populates="user", cascade="all, delete-orphan") + wishlist = relationship("Wishlist", + back_populates="user", + cascade="all, delete-orphan") + totp_device = relationship("TOTPDevice", back_populates="user", cascade="all, delete-orphan") bookmarks = relationship( - "Bookmark", back_populates="user", cascade="delete" + "Bookmark", back_populates="user", cascade="delete" ) - + def to_dict(self): obj_dict = super().to_dict() obj_dict.pop("password") diff --git a/api/v1/routes/__init__.py b/api/v1/routes/__init__.py index fdd167d85..835e2ddc1 100644 --- a/api/v1/routes/__init__.py +++ b/api/v1/routes/__init__.py @@ -46,8 +46,9 @@ from api.v1.routes.settings import settings from api.v1.routes.terms_and_conditions import terms_and_conditions from api.v1.routes.stripe import subscription_ +from api.v1.routes.community import community_questions +from api.v1.routes.community import community_answers from api.v1.routes.wishlist import wishlist - api_version_one = APIRouter(prefix="/api/v1") api_version_one.include_router(api_status) @@ -97,4 +98,6 @@ api_version_one.include_router(terms_and_conditions) api_version_one.include_router(product_comment) api_version_one.include_router(subscription_) +api_version_one.include_router(community_questions) +api_version_one.include_router(community_answers) api_version_one.include_router(wishlist) diff --git a/api/v1/routes/community.py b/api/v1/routes/community.py new file mode 100644 index 000000000..ac49cae91 --- /dev/null +++ b/api/v1/routes/community.py @@ -0,0 +1,364 @@ +from fastapi import APIRouter, Depends, status, HTTPException, Query +from fastapi.encoders import jsonable_encoder +from sqlalchemy.orm import Session +from typing import Optional, List + +from api.v1.models.user import User +from api.v1.schemas.community import ( + CommunityQuestionCreate, + CommunityQuestionResponse, + CommunityAnswerCreate, + CommunityAnswerResponse, + CommunityQuestionWithAnswers +) +from api.v1.services.community import community_question_service, community_answer_service +from api.v1.services.user import user_service +from api.db.database import get_db +from api.utils.success_response import success_response + +# Router for Questions +community_questions = APIRouter(prefix="/community/questions", tags=["Community Questions"]) + +@community_questions.post("/create", status_code=status.HTTP_201_CREATED) +async def create_question( + question: CommunityQuestionCreate, + current_user: User = Depends(user_service.get_current_user), + db: Session = Depends(get_db) +): + """Create a new community question""" + + new_question = community_question_service.create_question( + db=db, + title=question.title, + message=question.message, + user_id=current_user.id # Use the authenticated user's ID + ) + + return success_response( + status_code=201, + message="Question created successfully", + data=jsonable_encoder(new_question) + ) + +@community_questions.get("", response_model=List[CommunityQuestionResponse]) +async def get_all_questions( + title: Optional[str] = Query(None, description="Filter by title"), + is_resolved: Optional[bool] = Query(None, description="Filter by resolution status"), + db: Session = Depends(get_db) +): + """Get all community questions with optional filters""" + + query_params = {} + if title: + query_params["title"] = title + if is_resolved is not None: + query_params["is_resolved"] = is_resolved + + questions = community_question_service.fetch_all(db=db, **query_params) + + return success_response( + status_code=200, + message="Questions retrieved successfully", + data=jsonable_encoder(questions) + ) + +from fastapi import HTTPException + +@community_questions.get("/{question_id}", response_model=CommunityQuestionWithAnswers) +async def get_question_with_answers( + question_id: str, + db: Session = Depends(get_db) +): + """Get a specific question with all its answers""" + + question = community_question_service.fetch_by_id(db=db, question_id=question_id) + + if question is None: + raise HTTPException( + status_code=404, + detail=f"Question with ID {question_id} not found" + ) + + answers = community_answer_service.fetch_by_question_id(db=db, question_id=question_id) + + question_data = jsonable_encoder(question) + question_data["answers"] = jsonable_encoder(answers) + question_data["answer_count"] = len(answers) + + return success_response( + status_code=200, + message="Question and answers retrieved successfully", + data=question_data + ) + + +@community_questions.get("/user/{user_id}", response_model=List[CommunityQuestionResponse]) +async def get_user_questions( + user_id: str, + current_user: User = Depends(user_service.get_current_user), + db: Session = Depends(get_db) +): + """Get all questions asked by a specific user""" + + # Check if user is requesting their own questions or is an admin + if current_user.id != user_id and not current_user.is_admin: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="You can only view your own questions unless you're an admin" + ) + + questions = community_question_service.fetch_by_user_id(db=db, user_id=user_id) + + return success_response( + status_code=200, + message=f"Questions for user {user_id} retrieved successfully", + data=jsonable_encoder(questions) + ) + +@community_questions.put("/{question_id}", response_model=CommunityQuestionResponse) +async def update_question( + question_id: str, + question_update: CommunityQuestionCreate, + current_user: User = Depends(user_service.get_current_user), + db: Session = Depends(get_db) +): + """Update a specific question""" + + # Fetch the question first to check ownership + existing_question = community_question_service.fetch_by_id(db=db, question_id=question_id) + + # Check if user is the owner or an admin + if existing_question.user_id != current_user.id and not current_user.is_admin: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="You can only update your own questions" + ) + + update_data = { + "title": question_update.title, + "message": question_update.message + } + + updated_question = community_question_service.update_question( + db=db, + question_id=question_id, + update_data=update_data + ) + + return success_response( + status_code=200, + message="Question updated successfully", + data=jsonable_encoder(updated_question) + ) + +@community_questions.patch("/{question_id}/resolve", response_model=CommunityQuestionResponse) +async def mark_question_resolved( + question_id: str, + is_resolved: bool = True, + current_user: User = Depends(user_service.get_current_user), + db: Session = Depends(get_db) +): + """Mark a question as resolved or unresolved""" + + # Fetch the question first to check ownership + existing_question = community_question_service.fetch_by_id(db=db, question_id=question_id) + + # Check if user is the owner or an admin + if existing_question.user_id != current_user.id and not current_user.is_admin: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="You can only mark your own questions as resolved" + ) + + updated_question = community_question_service.mark_as_resolved( + db=db, + question_id=question_id, + is_resolved=is_resolved + ) + + return success_response( + status_code=200, + message=f"Question marked as {'resolved' if is_resolved else 'unresolved'} successfully", + data=jsonable_encoder(updated_question) + ) + +@community_questions.delete("/{question_id}", status_code=status.HTTP_200_OK) +async def delete_question( + question_id: str, + current_user: User = Depends(user_service.get_current_user), + db: Session = Depends(get_db) +): + """Delete a specific question""" + + # Fetch the question first to check ownership + existing_question = community_question_service.fetch_by_id(db=db, question_id=question_id) + + # Check if user is the owner or an admin + if existing_question.user_id != current_user.id and not current_user.is_admin: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="You can only delete your own questions" + ) + + result = community_question_service.delete_question(db=db, question_id=question_id) + + return success_response( + status_code=200, + message=result["detail"] + ) + +# Router for Answers +community_answers = APIRouter(prefix="/community/answers", tags=["Community Answers"]) + +@community_answers.post("/create", status_code=status.HTTP_201_CREATED) +async def create_answer( + answer: CommunityAnswerCreate, + current_user: User = Depends(user_service.get_current_user), + db: Session = Depends(get_db) +): + """Create a new answer to a community question""" + + new_answer = community_answer_service.create_answer( + db=db, + message=answer.message, + user_id=current_user.id, # Use the authenticated user's ID + question_id=answer.question_id + ) + + return success_response( + status_code=201, + message="Answer created successfully", + data=jsonable_encoder(new_answer) + ) + +@community_answers.get("/question/{question_id}", response_model=List[CommunityAnswerResponse]) +async def get_question_answers( + question_id: str, + db: Session = Depends(get_db) +): + """Get all answers for a specific question""" + + answers = community_answer_service.fetch_by_question_id(db=db, question_id=question_id) + + return success_response( + status_code=200, + message="Answers retrieved successfully", + data=jsonable_encoder(answers) + ) + +@community_answers.get("/user/{user_id}", response_model=List[CommunityAnswerResponse]) +async def get_user_answers( + user_id: str, + current_user: User = Depends(user_service.get_current_user), + db: Session = Depends(get_db) +): + """Get all answers provided by a specific user""" + + # Check if user is requesting their own answers or is an admin + if current_user.id != user_id and not current_user.is_admin: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="You can only view your own answers unless you're an admin" + ) + + answers = community_answer_service.fetch_by_user_id(db=db, user_id=user_id) + + return success_response( + status_code=200, + message=f"Answers for user {user_id} retrieved successfully", + data=jsonable_encoder(answers) + ) + +@community_answers.put("/{answer_id}", response_model=CommunityAnswerResponse) +async def update_answer( + answer_id: str, + answer_update: CommunityAnswerCreate, + current_user: User = Depends(user_service.get_current_user), + db: Session = Depends(get_db) +): + """Update a specific answer""" + + # Fetch the answer first to check ownership + existing_answer = community_answer_service.fetch_by_id(db=db, answer_id=answer_id) + + # Check if user is the owner or an admin + if existing_answer.user_id != current_user.id and not current_user.is_admin: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="You can only update your own answers" + ) + + update_data = { + "message": answer_update.message + } + + updated_answer = community_answer_service.update_answer( + db=db, + answer_id=answer_id, + update_data=update_data + ) + + return success_response( + status_code=200, + message="Answer updated successfully", + data=jsonable_encoder(updated_answer) + ) + +@community_answers.patch("/{answer_id}/accept", response_model=CommunityAnswerResponse) +async def mark_answer_accepted( + answer_id: str, + is_accepted: bool = True, + current_user: User = Depends(user_service.get_current_user), + db: Session = Depends(get_db) +): + """Mark an answer as accepted or not accepted""" + + # Fetch the answer + answer = community_answer_service.fetch_by_id(db=db, answer_id=answer_id) + + # Fetch the question to check ownership + question = community_question_service.fetch_by_id(db=db, question_id=answer.question_id) + + # Only the question owner or an admin can mark an answer as accepted + if question.user_id != current_user.id and not current_user.is_admin: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Only the question owner can mark an answer as accepted" + ) + + updated_answer = community_answer_service.mark_as_accepted( + db=db, + answer_id=answer_id, + is_accepted=is_accepted + ) + + return success_response( + status_code=200, + message=f"Answer marked as {'accepted' if is_accepted else 'not accepted'} successfully", + data=jsonable_encoder(updated_answer) + ) + +@community_answers.delete("/{answer_id}", status_code=status.HTTP_200_OK) +async def delete_answer( + answer_id: str, + current_user: User = Depends(user_service.get_current_user), + db: Session = Depends(get_db) +): + """Delete a specific answer""" + + # Fetch the answer first to check ownership + existing_answer = community_answer_service.fetch_by_id(db=db, answer_id=answer_id) + + # Check if user is the owner or an admin + if existing_answer.user_id != current_user.id and not current_user.is_admin: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="You can only delete your own answers" + ) + + result = community_answer_service.delete_answer(db=db, answer_id=answer_id) + + return success_response( + status_code=200, + message=result["detail"] + ) \ No newline at end of file diff --git a/api/v1/schemas/community.py b/api/v1/schemas/community.py new file mode 100644 index 000000000..932c7646f --- /dev/null +++ b/api/v1/schemas/community.py @@ -0,0 +1,53 @@ +from datetime import datetime +from typing import List, Optional, Union +from pydantic import BaseModel, Field + + +# Question models +class CommunityQuestionCreate(BaseModel): + title: str + message: str + user_id: str + timestamp: Optional[datetime] = Field(default_factory=datetime.now) + + +class CommunityQuestionResponse(BaseModel): + id: str + title: str + message: str + user_id: str + timestamp: datetime + is_resolved: bool + + model_config = {"from_attributes": True} + + +# Answer models +class CommunityAnswerCreate(BaseModel): + message: str + user_id: str + question_id: str + timestamp: Optional[datetime] = Field(default_factory=datetime.now) + + +class CommunityAnswerResponse(BaseModel): + id: str + message: str + user_id: str + question_id: str + timestamp: datetime + is_accepted: bool + + model_config = {"from_attributes": True} + + +# Extended response models for nested data +class CommunityAnswerWithUser(CommunityAnswerResponse): + user_name: str # Assuming you have a user_name field + + +class CommunityQuestionWithAnswers(CommunityQuestionResponse): + answers: List[CommunityAnswerWithUser] = [] + answer_count: int = 0 + + model_config = {"from_attributes": True} \ No newline at end of file diff --git a/api/v1/services/community.py b/api/v1/services/community.py new file mode 100644 index 000000000..00b380c9b --- /dev/null +++ b/api/v1/services/community.py @@ -0,0 +1,142 @@ +from sqlalchemy.orm import Session +from fastapi import HTTPException, status +from sqlalchemy.exc import SQLAlchemyError +from typing import Optional, Any, List, Dict +from api.utils.community_base_service import BaseService +from api.v1.models.community import CommunityQuestion, CommunityAnswer + + +class CommunityQuestionService(BaseService): + """Community Question service""" + def __init__(self,model) -> Any: + super().__init__(model) + + def create_question(self, db: Session, title: str, message: str, user_id: str): + """Creates a new community question""" + try: + question = CommunityQuestion( + title=title, + message=message, + user_id=user_id + ) + db.add(question) + db.commit() + db.refresh(question) + return question + except SQLAlchemyError as e: + db.rollback() + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to create question: {str(e)}" + ) + + def fetch_all(self, db: Session, **query_params: Optional[Any]): + """Fetch all questions with option to search using query parameters""" + return super().fetch_all(db, **query_params) + + def fetch_by_id(self, db: Session, question_id: str): + """Fetch a question by its ID""" + return super().fetch_by_id(db, question_id) + + def fetch_by_user_id(self, db: Session, user_id: str): + """Fetch all questions by a specific user""" + return db.query(CommunityQuestion).filter(CommunityQuestion.user_id == user_id).all() + + def update_question(self, db: Session, question_id: str, update_data: Dict[str, Any]): + """Update a question with the provided data""" + return super().update(db, question_id, update_data) + + def mark_as_resolved(self, db: Session, question_id: str, is_resolved: bool = True): + """Mark a question as resolved or unresolved""" + return self.update_question(db, question_id, {"is_resolved": is_resolved}) + + def delete_question(self, db: Session, question_id: str): + """Delete a question by its ID""" + return super().delete(db, question_id) + +class CommunityAnswerService(BaseService): + """Community Answer service""" + def __init__(self,model): + super().__init__(model) + + def create_answer(self, db: Session, message: str, user_id: str, question_id: str): + """Creates a new answer to a community question""" + # First verify the question exists + question = db.query(CommunityQuestion).filter(CommunityQuestion.id == question_id).first() + if not question: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Question with ID {question_id} not found" + ) + + try: + answer = CommunityAnswer( + message=message, + user_id=user_id, + question_id=question_id + ) + db.add(answer) + db.commit() + db.refresh(answer) + return answer + except SQLAlchemyError as e: + db.rollback() + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to create answer: {str(e)}" + ) + + def fetch_all(self, db: Session, **query_params: Optional[Any]): + """Fetch all answers with option to search using query parameters""" + return super().fetch_all(db, **query_params) + + def fetch_by_id(self, db: Session, answer_id: str): + """Fetch an answer by its ID""" + return super().fetch_by_id(db, answer_id) + + def fetch_by_question_id(self, db: Session, question_id: str): + """Fetch all answers for a specific question""" + return db.query(CommunityAnswer).filter(CommunityAnswer.question_id == question_id).all() + + def fetch_by_user_id(self, db: Session, user_id: str): + """Fetch all answers by a specific user""" + return db.query(CommunityAnswer).filter(CommunityAnswer.user_id == user_id).all() + + def update_answer(self, db: Session, answer_id: str, update_data: Dict[str, Any]): + """Update an answer with the provided data""" + return super().update(db, answer_id, update_data) + def mark_as_accepted(self, db: Session, answer_id: str, is_accepted: bool = True): + """Mark an answer as accepted or not accepted""" + answer = self.fetch_by_id(db, answer_id) + + try: + # If marking as accepted, unmark any previously accepted answers for this question + if is_accepted: + previously_accepted = db.query(CommunityAnswer).filter( + CommunityAnswer.question_id == answer.question_id, + CommunityAnswer.is_accepted == True, + CommunityAnswer.id != answer_id + ).all() + + for prev_answer in previously_accepted: + prev_answer.is_accepted = False + + # Mark the current answer + answer.is_accepted = is_accepted + db.commit() + db.refresh(answer) + return answer + except SQLAlchemyError as e: + db.rollback() + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to update answer acceptance status: {str(e)}" + ) + + def delete_answer(self, db: Session, answer_id: str): + """Delete an answer by its ID""" + return super().delete(db, answer_id) + +# Create service instances +community_question_service = CommunityQuestionService(CommunityQuestion) +community_answer_service = CommunityAnswerService(CommunityAnswer) \ No newline at end of file diff --git a/tests/v1/community/base.py b/tests/v1/community/base.py new file mode 100644 index 000000000..a669baa7e --- /dev/null +++ b/tests/v1/community/base.py @@ -0,0 +1,71 @@ +import pytest +from unittest.mock import patch, MagicMock +from api.v1.models.user import User +from api.v1.services.user import user_service +from main import app +from uuid_extensions import uuid7 +from api.db.database import get_db +from api.v1.models.community import CommunityQuestion +from fastapi import status +from datetime import datetime, timezone, timedelta + + +CREATE_QUESTION_ENDPOINT = '/api/v1/community/questions/create' +GET_ALL_QUESTIONS_ENDPOINT = '/api/v1/community/questions' +GET_QUESTION_BY_ID_ENDPOINT = '/api/v1/community/questions/{question_id}' +GET_QUESTIONS_BY_USER_ENDPOINT = '/api/v1/community/questions/user/{user_id}' +UPDATE_QUESTION_ENDPOINT = '/api/v1/community/questions/{question_id}' +MARK_RESOLVED_ENDPOINT = '/api/v1/community/questions/{question_id}/resolve' +DELETE_QUESTION_ENDPOINT = '/api/v1/community/questions/{question_id}' + +@pytest.fixture +def mock_db_session(): + """Fixture to create a mock database session.""" + with patch("api.v1.services.user.get_db", autospec=True) as mock_get_db: + mock_db = MagicMock() + app.dependency_overrides[get_db] = lambda: mock_db + yield mock_db + app.dependency_overrides = {} + +@pytest.fixture +def mock_user_service(): + """Fixture to create a mock user service.""" + + with patch("api.v1.services.user.user_service", autospec=True) as mock_service: + yield mock_service + + +def create_mock_user(mock_user_service, mock_db_session, is_superadmin=True): + """Create a mock user in the mock database session.""" + mock_user = User( + id=str(uuid7()), + email="testuser@gmail.com", + password=user_service.hash_password("Testpassword@123"), + first_name='Test', + last_name='User', + is_active=True, + is_superadmin=is_superadmin, + created_at=datetime.now(timezone.utc), + updated_at=datetime.now(timezone.utc) + ) + mock_db_session.query.return_value.filter.return_value.first.return_value = mock_user + return mock_user + +@pytest.fixture +def mock_question_service(): + """Fixture to create a mock activity log service.""" + with patch("api.v1.services.community.community_question_service",autospec=True) as mock_service: + yield mock_service + +def create_mock_question(mock_db_session, question_id="1", title="Test Question", message="Test message", user_id="101", is_resolved=False): + """Create a mock community question in the mock database session.""" + mock_question = CommunityQuestion( + id=question_id, + title=title, + message=message, + user_id=user_id, + is_resolved=is_resolved, + timestamp="2023-01-01T00:00:00" + ) + mock_db_session.query.return_value.filter.return_value.first.return_value = mock_question + return mock_question diff --git a/tests/v1/community/test_answers.py b/tests/v1/community/test_answers.py new file mode 100644 index 000000000..ee5398a9a --- /dev/null +++ b/tests/v1/community/test_answers.py @@ -0,0 +1,207 @@ +import pytest +from fastapi.testclient import TestClient +from unittest.mock import patch, MagicMock +from main import app +from api.v1.models.community import CommunityAnswer, CommunityQuestion +from api.v1.services.community import community_answer_service, community_question_service +from api.v1.services.user import user_service +from api.db.database import get_db +from fastapi import status +from uuid_extensions import uuid7 +from tests.v1.community.base import create_mock_user,mock_user_service +client = TestClient(app) + +# API Endpoints +CREATE_ANSWER_ENDPOINT = '/api/v1/community/answers/create' +GET_ALL_ANSWERS_ENDPOINT = '/api/v1/community/answers' +GET_ANSWER_BY_ID_ENDPOINT = '/api/v1/community/answers/{answer_id}' +GET_ANSWERS_BY_QUESTION_ENDPOINT = '/api/v1/community/answers/question/{question_id}' +GET_ANSWERS_BY_USER_ENDPOINT = '/api/v1/community/answers/user/{user_id}' +UPDATE_ANSWER_ENDPOINT = '/api/v1/community/answers/{answer_id}' +MARK_ACCEPTED_ENDPOINT = '/api/v1/community/answers/{answer_id}/accept' +DELETE_ANSWER_ENDPOINT = '/api/v1/community/answers/{answer_id}' + +@pytest.fixture +def mock_db_session(): + """Fixture to create a mock database session.""" + with patch("api.v1.services.user.get_db", autospec=True) as mock_get_db: + mock_db = MagicMock() + app.dependency_overrides[get_db] = lambda: mock_db + yield mock_db + app.dependency_overrides = {} + +@pytest.fixture +def mock_answer_service(): + """Fixture to create a mock community answer service.""" + with patch("api.v1.services.community.community_answer_service", autospec=True) as mock_service: + yield mock_service + +@pytest.fixture +def mock_question_service(): + """Fixture to create a mock community question service.""" + with patch("api.v1.services.community.community_question_service", autospec=True) as mock_service: + yield mock_service + +@pytest.fixture +def mock_auth_user(): + """Fixture to create a mock authenticated user.""" + mock_user = MagicMock() + mock_user.id = "101" + mock_user.is_admin = True + + app.dependency_overrides[user_service.get_current_user] = lambda: mock_user + yield mock_user + if user_service.get_current_user in app.dependency_overrides: + app.dependency_overrides.pop(user_service.get_current_user) + +def create_mock_answer(mock_db_session, answer_id="1", message="Test answer", user_id="101", question_id="201", is_accepted=False): + """Create a mock community answer in the mock database session.""" + mock_answer = CommunityAnswer( + id=answer_id, + message=message, + user_id=user_id, + question_id=question_id, + is_accepted=is_accepted, + timestamp="2023-01-01T00:00:00" + ) + mock_db_session.query.return_value.filter.return_value.first.return_value = mock_answer + return mock_answer + +def create_mock_question(mock_db_session, question_id="201", title="Test Question", message="Test message", user_id="101"): + """Create a mock community question for testing answers.""" + mock_question = CommunityQuestion( + id=question_id, + title=title, + message=message, + user_id=user_id, + timestamp="2023-01-01T00:00:00" + ) + return mock_question + +# Test cases for each service method +@pytest.mark.usefixtures("mock_db_session", "mock_answer_service", "mock_user_service") +def test_create_answer(mock_answer_service, mock_db_session, mock_user_service): + """Test for creating a community answer.""" + mock_message = "This is a test answer" + mock_question_id = "201" + mock_user = create_mock_user(mock_user_service, mock_db_session) + mock_answer = create_mock_answer( + mock_db_session, + message=mock_message, + user_id=mock_user.id, + question_id=mock_question_id + ) + mock_answer_service.create_answer.return_value = mock_answer + access_token = user_service.create_access_token(user_id=str(uuid7())) + + response = client.post( + CREATE_ANSWER_ENDPOINT, + headers={'Authorization': f'Bearer {access_token}'}, + json={"message": mock_message,"user_id": mock_user.id,"question_id": mock_question_id} + ) + + + assert response.status_code == status.HTTP_201_CREATED + assert response.json()["status_code"] == 201 + assert response.json()["status"] == "success" + assert "data" in response.json() + assert response.json()["data"]["message"] == mock_message + assert response.json()["data"]["question_id"] == mock_question_id + + +@pytest.mark.usefixtures("mock_db_session", "mock_answer_service") +def test_get_answers_by_question(mock_answer_service, mock_db_session): + """Test for fetching community answers by question ID.""" + question_id = "201" + mock_answers = [ + create_mock_answer(mock_db_session, answer_id="1", question_id=question_id), + create_mock_answer(mock_db_session, answer_id="2", message="Another answer", question_id=question_id) + ] + mock_answer_service.fetch_by_question_id.return_value = mock_answers + + response = client.get(GET_ANSWERS_BY_QUESTION_ENDPOINT.format(question_id=question_id)) + assert response.status_code == status.HTTP_200_OK + assert response.json()["status"] == "success" + +@pytest.mark.usefixtures("mock_db_session", "mock_answer_service", "mock_auth_user") +def test_get_answers_by_user(mock_answer_service, mock_db_session, mock_auth_user): + """Test for fetching community answers by user ID.""" + user_id = mock_auth_user.id + mock_answers = [ + create_mock_answer(mock_db_session, answer_id="1", user_id=user_id), + create_mock_answer(mock_db_session, answer_id="2", message="Another answer", user_id=user_id) + ] + mock_answer_service.fetch_by_user_id.return_value = mock_answers + + response = client.get(GET_ANSWERS_BY_USER_ENDPOINT.format(user_id=user_id)) + + assert response.status_code == status.HTTP_200_OK + assert response.json()["status"] == "success" + +@pytest.mark.usefixtures("mock_db_session", "mock_answer_service", "mock_auth_user") +def test_update_answer(mock_answer_service, mock_db_session, mock_user_service): + """Test for updating a community answer.""" + answer_id = "1" + mock_user = create_mock_user(mock_user_service, mock_db_session) + update_data = { + "message": "Updated answer content", + "user_id": mock_user.id, # Include this since the route requires it + "question_id": "201" # Include this since the route requires it + } + + mock_answer = create_mock_answer(mock_db_session, answer_id=answer_id, user_id=mock_user.id) + mock_answer.message = update_data["message"] + + # Mock fetch_by_id to return the answer for ownership check + mock_answer_service.fetch_by_id.return_value = mock_answer + mock_answer_service.update_answer.return_value = mock_answer + + response = client.put( + UPDATE_ANSWER_ENDPOINT.format(answer_id=answer_id), + json=update_data + ) + assert response.status_code == status.HTTP_200_OK + assert response.json()["data"]["message"] == update_data["message"] + +@pytest.mark.usefixtures("mock_db_session", "mock_answer_service", "mock_question_service", "mock_auth_user") +def test_mark_answer_accepted(mock_answer_service, mock_db_session, mock_question_service, mock_auth_user): + """Test for marking a community answer as accepted.""" + answer_id = "1" + question_id = "201" + is_accepted = True + + # Create mock answer and question + mock_answer = create_mock_answer(mock_db_session, answer_id=answer_id, question_id=question_id) + mock_question = create_mock_question(mock_db_session, question_id=question_id, user_id=mock_auth_user.id) + + # Mock fetch_by_id for both answer and question + mock_answer_service.fetch_by_id.return_value = mock_answer + mock_question_service.fetch_by_id.return_value = mock_question + + # Mock mark_as_accepted + mock_answer.is_accepted = is_accepted + mock_answer_service.mark_as_accepted.return_value = mock_answer + + response = client.patch( + MARK_ACCEPTED_ENDPOINT.format(answer_id=answer_id), + json={"is_accepted": is_accepted} + ) + + assert response.status_code == status.HTTP_200_OK + assert response.json()["data"]["is_accepted"] == is_accepted + +@pytest.mark.usefixtures("mock_db_session", "mock_answer_service", "mock_auth_user") +def test_delete_answer(mock_answer_service, mock_db_session, mock_auth_user): + """Test for deleting a community answer.""" + answer_id = "1" + + # Create mock answer for ownership check + mock_answer = create_mock_answer(mock_db_session, answer_id=answer_id, user_id=mock_auth_user.id) + mock_answer_service.fetch_by_id.return_value = mock_answer + + delete_result = {"status": "success", "detail": f"Answer with ID {answer_id} deleted successfully"} + mock_answer_service.delete_answer.return_value = delete_result + + response = client.delete(DELETE_ANSWER_ENDPOINT.format(answer_id=answer_id)) + + assert response.status_code == status.HTTP_200_OK \ No newline at end of file diff --git a/tests/v1/community/test_create_question_community.py b/tests/v1/community/test_create_question_community.py new file mode 100644 index 000000000..ef6177b01 --- /dev/null +++ b/tests/v1/community/test_create_question_community.py @@ -0,0 +1,38 @@ +from api.v1.services.community import community_question_service +from fastapi import status +from fastapi.testclient import TestClient +import pytest +from main import app +from uuid_extensions import uuid7 +from api.v1.services.user import user_service +from tests.v1.community.base import CREATE_QUESTION_ENDPOINT,mock_db_session,mock_question_service,create_mock_question,create_mock_user,mock_user_service +client = TestClient(app) + + + +@pytest.mark.usefixtures("mock_db_session", "mock_question_service", "mock_user_service") +def test_create_question(mock_question_service, mock_db_session,mock_user_service): + """Test for creating a community question.""" + mock_title = "Test Question" + mock_message = "This is a test question" + mock_user = create_mock_user(mock_user_service, mock_db_session) + mock_user_id = mock_user.id + mock_question = create_mock_question(mock_db_session, title=mock_title, message=mock_message, user_id=mock_user_id) + mock_question_service.create_question.return_value = mock_question + access_token = user_service.create_access_token(user_id=str(uuid7())) + response = client.post( + CREATE_QUESTION_ENDPOINT, + headers={'Authorization': f'Bearer {access_token}'}, + json={"title": mock_title, "message": mock_message, "user_id": mock_user_id} + ) + assert response.status_code == status.HTTP_201_CREATED + assert response.json() == { + "status_code": 201, + "message": "Question created successfully", + "status": "success", + "data": { + "title": mock_question.title, + "message": mock_question.message, + "user_id": mock_question.id + } + } diff --git a/tests/v1/community/test_delete_question_community.py b/tests/v1/community/test_delete_question_community.py new file mode 100644 index 000000000..9925cfeb0 --- /dev/null +++ b/tests/v1/community/test_delete_question_community.py @@ -0,0 +1,41 @@ +from fastapi import status +from fastapi.testclient import TestClient +import pytest +from main import app +from api.v1.services.user import user_service +from uuid_extensions import uuid7 +from tests.v1.community.base import DELETE_QUESTION_ENDPOINT,mock_question_service,mock_db_session,create_mock_user,mock_user_service,create_mock_question,CREATE_QUESTION_ENDPOINT +client = TestClient(app) + + +@pytest.mark.usefixtures("mock_db_session", "mock_question_service", "mock_user_service") +def test_delete_question(mock_question_service, mock_db_session, mock_user_service): + # Create mock user and question + mock_user = create_mock_user(mock_user_service, mock_db_session) + mock_question = create_mock_question( + mock_db_session, + title="Test Question", + message="This is a test question", + user_id=mock_user.id + ) + + # Set up the mock service to return success for delete operation + mock_question_service.delete_question.return_value = { + "status": "success", + "detail": f"Question with ID {mock_question.id} deleted successfully" + } + app.dependency_overrides[user_service.get_current_user] = lambda: mock_user + # Create an access token for authorization + access_token = user_service.create_access_token(user_id=str(uuid7())) + + # Make the DELETE request + response = client.delete( + DELETE_QUESTION_ENDPOINT.format(question_id=mock_question.id), + headers={'Authorization': f'Bearer {access_token}'} + ) + + # Assert response + assert response.status_code == status.HTTP_200_OK + assert response.json()["status"] == "success" + + app.dependency_overrides.pop(user_service.get_current_user, None) \ No newline at end of file diff --git a/tests/v1/community/test_get_questions_endpoint.py b/tests/v1/community/test_get_questions_endpoint.py new file mode 100644 index 000000000..c1ff8f4c2 --- /dev/null +++ b/tests/v1/community/test_get_questions_endpoint.py @@ -0,0 +1,86 @@ +import pytest +from fastapi import status +from fastapi.testclient import TestClient +import pytest +from api.v1.services.user import user_service +from api.v1.services.community import community_question_service +from uuid_extensions import uuid7 +from main import app +from tests.v1.community.base import create_mock_user,mock_user_service,create_mock_question,GET_ALL_QUESTIONS_ENDPOINT,GET_QUESTION_BY_ID_ENDPOINT,GET_QUESTIONS_BY_USER_ENDPOINT,mock_db_session,mock_question_service +from api.db.database import get_db +client = TestClient(app) + +@pytest.mark.usefixtures("mock_db_session", "mock_question_service","mock_user_service") +def test_get_all_questions(mock_question_service, mock_db_session): + """Test for fetching all community questions.""" + mock_user = create_mock_user(mock_user_service, mock_db_session) + mock_questions = [ + create_mock_question(mock_db_session, question_id="1"), + create_mock_question(mock_db_session, question_id="2", title="Another Question") + ] + mock_question_service.fetch_all.return_value = mock_questions + access_token = user_service.create_access_token(user_id=str(uuid7())) + response = client.get(GET_ALL_QUESTIONS_ENDPOINT, + headers={'Authorization': f'Bearer {access_token}'}) + assert response.status_code == status.HTTP_200_OK + assert response.json()["status"] == "success" + +@pytest.mark.usefixtures("mock_db_session", "mock_question_service",) +def test_get_question_by_id(mock_question_service, mock_db_session): + """Test for fetching a community question by ID.""" + mock_user = create_mock_user(mock_user_service, mock_db_session) + mock_question = create_mock_question( + mock_db_session, + title="Test Question", + message="This is a test question", + user_id=mock_user.id + ) + response = client.get(GET_QUESTION_BY_ID_ENDPOINT.format(question_id=mock_question.id)) + + assert response.status_code == status.HTTP_200_OK + assert response.json()["status"] == "success" + +@pytest.mark.usefixtures("mock_db_session", "mock_question_service") +def test_get_questions_by_user(mock_question_service, mock_db_session): + """Test for fetching community questions by user ID.""" + mock_user = create_mock_user(mock_user_service, mock_db_session) + user_id = mock_user.id + mock_questions = [ + create_mock_question(mock_db_session, question_id="1", user_id=user_id), + create_mock_question(mock_db_session, question_id="2", title="Another Question", user_id=user_id) + ] + + mock_question_service.fetch_by_user_id.return_value = mock_questions + app.dependency_overrides[user_service.get_current_user] = lambda: mock_user + access_token = user_service.create_access_token(user_id=str(uuid7())) + response = client.get(GET_QUESTIONS_BY_USER_ENDPOINT.format(user_id=mock_questions[0].user_id), + headers={'Authorization': f'Bearer {access_token}'}) + assert response.status_code == status.HTTP_200_OK + assert mock_questions[0].user_id == mock_questions[1].user_id + assert response.json()["status"] == "success" + app.dependency_overrides.pop(user_service.get_current_user, None) + +@pytest.mark.usefixtures("mock_db_session") +def test_get_question_not_found(mock_db_session): + """Test for fetching a non-existent community question.""" + question_id = "3f3iiuefuin3" + + # Properly override the dependency function + def override_get_db(): + return mock_db_session + + app.dependency_overrides[get_db] = override_get_db + + # Properly mock the service function + original_fetch_by_id = community_question_service.fetch_by_id + community_question_service.fetch_by_id = lambda db, question_id: None + + try: + response = client.get(GET_QUESTION_BY_ID_ENDPOINT.format(question_id=question_id)) + # Assert the expected 404 response + assert response.status_code == status.HTTP_404_NOT_FOUND + assert response.json()["status"] == False + finally: + # Restore the original function + community_question_service.fetch_by_id = original_fetch_by_id + app.dependency_overrides.pop(get_db, None) \ No newline at end of file diff --git a/tests/v1/community/test_patch_question_community.py b/tests/v1/community/test_patch_question_community.py new file mode 100644 index 000000000..0201095b9 --- /dev/null +++ b/tests/v1/community/test_patch_question_community.py @@ -0,0 +1,46 @@ +from api.v1.services.community import community_question_service +from fastapi import status +from fastapi.testclient import TestClient +import pytest +from main import app +from api.v1.services.user import user_service +from tests.v1.community.base import create_mock_question,MARK_RESOLVED_ENDPOINT,mock_db_session,mock_question_service +client = TestClient(app) + + +@pytest.mark.usefixtures("mock_db_session", "mock_question_service") +def test_mark_question_resolved(mock_question_service, mock_db_session, mocker): + """Test for marking a community question as resolved.""" + question_id = "1" + is_resolved = True + + # Create mock user with admin privileges to bypass permission check + mock_user = mocker.Mock() + mock_user.id = "user1" + mock_user.is_admin = True + + # Create mock question + mock_question = create_mock_question(mock_db_session, question_id=question_id) + mock_question.user_id = mock_user.id # Make the mock user the owner + mock_question.is_resolved = is_resolved + + # Mock the fetch_by_id method + mock_question_service.fetch_by_id.return_value = mock_question + + # Mock the mark_as_resolved method + mock_question_service.mark_as_resolved.return_value = mock_question + + # Mock the user authentication dependency + app.dependency_overrides[user_service.get_current_user] = lambda: mock_user + + try: + response = client.patch( + MARK_RESOLVED_ENDPOINT.format(question_id=question_id), + json={"is_resolved": is_resolved} + ) + + assert response.status_code == status.HTTP_200_OK + assert response.json()["data"]["is_resolved"] == True + finally: + # Clean up dependency overrides + app.dependency_overrides.pop(user_service.get_current_user, None) \ No newline at end of file diff --git a/tests/v1/community/test_update_questions.py b/tests/v1/community/test_update_questions.py new file mode 100644 index 000000000..b1d1d6b1f --- /dev/null +++ b/tests/v1/community/test_update_questions.py @@ -0,0 +1,51 @@ +from api.v1.services.community import community_question_service +from fastapi import status +from fastapi.testclient import TestClient +import pytest +from api.v1.services.user import user_service +from main import app +from tests.v1.community.base import create_mock_question,UPDATE_QUESTION_ENDPOINT,mock_db_session,mock_question_service +client = TestClient(app) + +@pytest.mark.usefixtures("mock_db_session", "mock_question_service") +def test_update_question(mock_question_service, mock_db_session, mocker): + """Test for updating a community question.""" + question_id = "1" + update_data = { + "title": "Updated Title", + "message": "Updated message content", + "user_id": "user1" + } + + # Create mock user with appropriate permissions + mock_user = mocker.Mock() + mock_user.id = "user1" + mock_user.is_admin = True + + # Create and configure mock question + mock_question = create_mock_question(mock_db_session, question_id=question_id) + mock_question.user_id = mock_user.id # Make the user the owner + mock_question.title = update_data["title"] + mock_question.message = update_data["message"] + + # Mock the fetch_by_id method to return our question for ownership check + mock_question_service.fetch_by_id.return_value = mock_question + + # Mock the update_question method + mock_question_service.update_question.return_value = mock_question + + # Mock the user authentication dependency + app.dependency_overrides[user_service.get_current_user] = lambda: mock_user + + try: + response = client.put( + UPDATE_QUESTION_ENDPOINT.format(question_id=question_id), + json=update_data + ) + + assert response.status_code == status.HTTP_200_OK + assert response.json()["status"] == "success" + + finally: + # Clean up dependency overrides + app.dependency_overrides.pop(user_service.get_current_user, None) \ No newline at end of file