diff --git a/api/utils/pagination.py b/api/utils/pagination.py index 0fab89c9b..c6213aef5 100644 --- a/api/utils/pagination.py +++ b/api/utils/pagination.py @@ -69,8 +69,13 @@ def paginated_response( if filters and join is None: # Apply filters for attr, value in filters.items(): + if value is not None: - query = query.filter(getattr(model, attr).like(f"%{value}%")) + column = getattr(model, attr) + if isinstance(value, bool): + query = query.filter(column == value) + else: + query = query.filter(column.like(f"%{value}%")) elif filters and join is not None: # Apply filters @@ -84,10 +89,7 @@ def paginated_response( results = jsonable_encoder(query.offset(skip).limit(limit).all()) total_pages = int(total / limit) + (total % limit > 0) - return success_response( - status_code=200, - message="Successfully fetched items", - data={ + return { "pages": total_pages, "total": total, "skip": skip, @@ -102,4 +104,4 @@ def paginated_response( } ) } - ) + diff --git a/api/v1/routes/blog.py b/api/v1/routes/blog.py index fd5bc9afd..5a875d70b 100644 --- a/api/v1/routes/blog.py +++ b/api/v1/routes/blog.py @@ -42,7 +42,7 @@ def create_blog( if not current_user: raise HTTPException(status_code=401, detail="You are not Authorized") blog_service = BlogService(db) - new_blogpost = blog_service.create(db=db, schema=blog, author_id=current_user.id) + new_blogpost = blog_service.create(schema=blog, author_id=current_user.id) return success_response( message="Blog created successfully!", @@ -55,7 +55,18 @@ def create_blog( def get_all_blogs(db: Session = Depends(get_db), limit: int = 10, skip: int = 0): """Endpoint to get all blogs""" - return paginated_response( + blog = paginated_response( + db=db, + model=Blog, + limit=limit, + skip=skip, + ) + return success_response(200, message="Successfully fetched all blogs", data=blog) + +@blog.get("/active", response_model=success_response) +def get_all_active_blogs(db: Session = Depends(get_db), limit: int = 10, skip: int = 0): + + blog = paginated_response( db=db, model=Blog, limit=limit, @@ -63,6 +74,21 @@ def get_all_blogs(db: Session = Depends(get_db), limit: int = 10, skip: int = 0) filters={"is_deleted": False} #filter out soft-deleted blogs ) + return success_response(200, message="Successfully fetched active blogs", data=blog) + +@blog.get("/archive", response_model=success_response) +def get_all_blogs(db: Session = Depends(get_db), limit: int = 10, skip: int = 0): + + blog = paginated_response( + db=db, + model=Blog, + limit=limit, + skip=skip, + filters={"is_deleted": True} + ) + return success_response(200, message="Successfully fetched all archived blogs", data=blog) + + # blog search endpoint @blog.get("/search", response_model=BlogSearchResponse) def search_blogs( @@ -174,6 +200,8 @@ def search_blogs( "total_results": search_results["total"], "blogs": processed_blogs } + return success_response(200, message="Successfully fetched all blogs", data=blog) + @blog.get("/{id}", response_model=BlogPostResponse) def get_blog_by_id(id: str, db: Session = Depends(get_db)): @@ -361,11 +389,10 @@ async def archive_blog_post( db: Session = Depends(get_db), current_user: User = Depends(user_service.get_current_super_admin), ): - """Endpoint to archive/soft-delete a blog post""" blog_service = BlogService(db=db) - blog_post = blog_service.fetch(blog_id=id) + blog_post = blog_service.fetch(blog_id=blog_id) if not blog_post: raise HTTPException(status_code=404, detail="Post not found") #check if admin/ authorized user @@ -382,6 +409,34 @@ async def archive_blog_post( data=jsonable_encoder(blog_post), ) +@blog.put("/{blog_id}/restore") +async def restore_blog_post( + blog_id: str, + db: Session = Depends(get_db), + current_user: User = Depends(user_service.get_current_super_admin), +): + + """Endpoint to restore a soft-deleted blog post""" + + blog_service = BlogService(db=db) + blog_post = blog_service.fetch(blog_id=blog_id) + if not blog_post: + raise HTTPException(status_code=404, detail="Post not found") + #check if admin/ authorized user + if not (blog_post.author_id != current_user.id or current_user.is_superadmin): + raise HTTPException(status_code=403, detail="You don't have permission to perform this action") + if not blog_post.is_deleted: + raise HTTPException(status_code=400, detail="Blog post is already active") + blog_post.is_deleted = False + db.commit() + db.refresh(blog_post) + + return success_response( + message="Blog post restored successfully!", + status_code=200, + data=jsonable_encoder(blog_post), + ) + # Post a comment to a blog diff --git a/tests/v1/blog/get_all_blogs_test.py b/tests/v1/blog/get_all_blogs_test.py index 36bd414c2..a4d3c33d3 100644 --- a/tests/v1/blog/get_all_blogs_test.py +++ b/tests/v1/blog/get_all_blogs_test.py @@ -1,22 +1,14 @@ -from datetime import datetime, timedelta, timezone from unittest.mock import MagicMock - import pytest from fastapi.testclient import TestClient from sqlalchemy.orm import Session -from uuid_extensions import uuid7 - -from api.v1.models.blog import Blog from api.v1.routes.blog import get_db - from main import app - # Mock database dependency @pytest.fixture def db_session_mock(): - db_session = MagicMock(spec=Session) - return db_session + return MagicMock(spec=Session) @pytest.fixture def client(db_session_mock): @@ -26,60 +18,35 @@ def client(db_session_mock): app.dependency_overrides = {} def test_get_all_blogs_empty(client, db_session_mock): - # Mock data - mock_blog_data = [] - mock_query = MagicMock() mock_query.count.return_value = 0 mock_query.all.return_value = [] - + + mock_query.filter.return_value = mock_query + mock_query.offset.return_value = mock_query + mock_query.limit.return_value = mock_query db_session_mock.query.return_value = mock_query - db_session_mock.query.return_value.filter.return_value.offset.return_value.limit.return_value.all.return_value = mock_blog_data - - # Call the endpoint response = client.get("/api/v1/blogs") - # Assert the response assert response.status_code == 200 assert response.json()["data"]["items"] == [] def test_get_all_blogs_with_data(client, db_session_mock): - blog_id = str(uuid7()) - author_id = str(uuid7()) - timezone_offset = -8.0 - tzinfo = timezone(timedelta(hours=timezone_offset)) - timeinfo = datetime.now(tzinfo) - created_at = timeinfo - updated_at = timeinfo - - # Mock data mock_blog_data = [ - Blog( - id=blog_id, - author_id=author_id, - title="Test Blog", - content="Test Content", - image_url="http://example.com/image.png", - tags=["test", "blog"], - is_deleted=False, - excerpt="Test Excerpt", - created_at=created_at, - updated_at=updated_at - ) + {"id": "123", "title": "Test Blog", "content": "Test Content"} ] mock_query = MagicMock() mock_query.count.return_value = 1 - db_session_mock.query.return_value.filter.return_value.offset.return_value.limit.return_value.all.return_value = mock_blog_data mock_query.all.return_value = mock_blog_data + mock_query.filter.return_value = mock_query + mock_query.offset.return_value = mock_query + mock_query.limit.return_value = mock_query db_session_mock.query.return_value = mock_query - # Call the endpoint response = client.get("/api/v1/blogs") - # Assert the response assert response.status_code == 200 - assert len(response.json().get('data')) >= 1 - + assert len(response.json()["data"]["items"]) >= 1 \ No newline at end of file diff --git a/tests/v1/blog/get_archive_blogs_test.py b/tests/v1/blog/get_archive_blogs_test.py new file mode 100644 index 000000000..25400935f --- /dev/null +++ b/tests/v1/blog/get_archive_blogs_test.py @@ -0,0 +1,100 @@ +from datetime import datetime, timedelta, timezone +from unittest.mock import MagicMock + +import pytest +from fastapi.testclient import TestClient +from sqlalchemy.orm import Session +from uuid_extensions import uuid7 + +from api.v1.models.blog import Blog +from api.v1.routes.blog import get_db + +from main import app +@pytest.fixture +def mock_db_session(): + mock_db = MagicMock(spec=Session) + return mock_db + +@pytest.fixture +def client(db_session_mock): + app.dependency_overrides[get_db] = lambda: db_session_mock + client = TestClient(app) + yield client + app.dependency_overrides = {} + + +def test_get_archive_blogs(mock_db_session, monkeypatch): + def override_get_db(): + return mock_db_session + + monkeypatch.setattr("api.v1.routes.blog.get_db", override_get_db) + + mock_db_session.query.return_value.filter.return_value.offset.return_value.limit.return_value.all.return_value = [] + + response = client.get("/blog/archive?limit=5&skip=0") + + + assert response.status_code == 200 + json_data = response.json() + assert json_data["status"] == "success" + assert json_data["message"] == "Successfully fetched all archived blogs" + assert isinstance(json_data["data"], list) + + +# def test_get_all_active_blogs(mock_db_session): +# response = client.get("/api/v1/blogs/active") +# assert response.status_code == 200 +# assert response.json()["message"] == "Successfully fetched active blogs" +# assert isinstance(response.json()["data"], list) + + + +@pytest.fixture +def mock_db_session(): + return MagicMock(spec=Session) + +@pytest.fixture +def client(mock_db_session): + app.dependency_overrides[get_db] = lambda: mock_db_session + client = TestClient(app) + yield client + app.dependency_overrides = {} + +def test_get_archive_blogs(client, mock_db_session): + mock_db_session.query.return_value.filter.return_value.offset.return_value.limit.return_value.all.return_value = [] + + response = client.get("/api/v1/blogs/archive?limit=5&skip=0") + + assert response.status_code == 200 + json_data = response.json() + assert json_data["status"] == "success" + assert json_data["message"] == "Successfully fetched all archived blogs" + assert isinstance(json_data["data"]["items"], list) + + +from unittest.mock import MagicMock +import pytest +from fastapi.testclient import TestClient +from sqlalchemy.orm import Session +from api.v1.routes.blog import get_db +from main import app + +@pytest.fixture +def mock_db_session(): + return MagicMock(spec=Session) + +@pytest.fixture +def client(mock_db_session): + app.dependency_overrides[get_db] = lambda: mock_db_session + client = TestClient(app) + yield client + app.dependency_overrides = {} + +def test_get_all_active_blogs(client, mock_db_session): + mock_db_session.query.return_value.filter.return_value.offset.return_value.limit.return_value.all.return_value = [] + + response = client.get("/api/v1/blogs/active") + + assert response.status_code == 200 + assert response.json()["message"] == "Successfully fetched active blogs" + assert isinstance(response.json()["data"]["items"], list) \ No newline at end of file diff --git a/tests/v1/blog/test_restore_blog.py b/tests/v1/blog/test_restore_blog.py new file mode 100644 index 000000000..8aeb363f6 --- /dev/null +++ b/tests/v1/blog/test_restore_blog.py @@ -0,0 +1,67 @@ +from unittest.mock import MagicMock, patch + +import pytest +from fastapi.testclient import TestClient +from sqlalchemy.orm import Session +from uuid_extensions import uuid7 + +from api.db.database import get_db +from api.v1.services.user import user_service +from api.v1.models import User +from api.v1.models.blog import Blog +from main import app + + +def mock_get_db(): + db_session = MagicMock() + yield db_session + + +def mock_get_current_super_admin(): + return User(id="1", is_superadmin=True) + +@pytest.fixture +def db_session_mock(): + db_session = MagicMock(spec=Session) + return db_session + +@pytest.fixture +def client(db_session_mock): + app.dependency_overrides[get_db] = lambda: db_session_mock + client = TestClient(app) + yield client + + +def test_restore_blog_success(client, db_session_mock): + '''Test for success in blog archiving''' + + app.dependency_overrides[get_db] = lambda: db_session_mock + app.dependency_overrides[user_service.get_current_super_admin] = lambda: mock_get_current_super_admin() + blog_id = uuid7() + mock_blog = Blog(id=blog_id, title="Test Blog", + content="Test Content", is_deleted=True) + + db_session_mock.query(Blog).filter(Blog.id==blog_id).first.return_value = mock_blog + + response = client.put(f"/api/v1/blogs/{mock_blog.id}/restore", headers={'Authorization': 'Bearer token'}) + + assert response.status_code == 200 + assert response.json()["message"] == "Blog post restored successfully!" + + +def test_restore_blog_not_found(client, db_session_mock): + '''test for blog not found''' + + db_session_mock.query(Blog).filter(Blog.id == f'{uuid7()}').first.return_value = None + + app.dependency_overrides[get_db] = lambda: db_session_mock + app.dependency_overrides[user_service.get_current_super_admin] = lambda: mock_get_current_super_admin + + response = client.put(f"/api/v1/blogs/{uuid7()}/restore", headers={'Authorization': 'Bearer token'}) + + assert response.status_code == 404 + assert response.json()["message"] == "Post not found" + + +if __name__ == "__main__": + pytest.main()