diff --git a/services/login/models/status.py b/services/login/models/status.py index 4465ddb2a3db27be8c0c1b68f729a015e942ccdb..540480f09bb28f07bc4dbf935a2f08eaaf3caec8 100644 --- a/services/login/models/status.py +++ b/services/login/models/status.py @@ -15,3 +15,8 @@ from pydantic import BaseModel class StatusResponse(BaseModel): status: bool message: Optional[str] + + +class StatusResponseError(BaseModel): + error: Optional[str] = None + error_description: Optional[str] = None diff --git a/services/login/paths/revoke.py b/services/login/paths/revoke.py index 0e2b62b7d92c3642ae08c54e137ed369856943c3..8c47badf24d875a079a70e2a47c7767a07f1e58e 100644 --- a/services/login/paths/revoke.py +++ b/services/login/paths/revoke.py @@ -11,23 +11,31 @@ from fastapi import APIRouter # NOC modules from ..auth import revoke_token, get_user_from_jwt from ..models.revoke import RevokeRequest -from ..models.status import StatusResponse +from ..models.status import StatusResponseError, StatusResponse router = APIRouter() -@router.post("/api/login/revoke", response_model=StatusResponse, tags=["login"]) +@router.post("/api/login/revoke", tags=["login"]) async def revoke(req: RevokeRequest): if req.access_token: try: get_user_from_jwt(req.access_token, audience="auth") except ValueError: - return StatusResponse(status=False, message="Invalid access token") + return StatusResponseError( + error="unauthorized_client", error_description="Invalid access token" + ) revoke_token(req.access_token) if req.refresh_token: try: get_user_from_jwt(req.refresh_token, audience="auth") except ValueError: - return StatusResponse(status=False, message="Invalid refresh token") + return StatusResponseError( + error="invalid_request", error_description="Invalid refresh token" + ) revoke_token(req.refresh_token) + if not req.access_token and not req.refresh_token: + return StatusResponseError( + error="invalid_request", error_description="Invalid refresh token" + ) return StatusResponse(status=True, message="Ok") diff --git a/services/login/paths/token.py b/services/login/paths/token.py index 34f62119cdb01043f0ae873430a29de8d8f8f072..4d846adafb2fd408cfe73727688dca9e62e93c57 100644 --- a/services/login/paths/token.py +++ b/services/login/paths/token.py @@ -11,7 +11,8 @@ from typing import Optional, Dict import codecs # Third-party modules -from fastapi import APIRouter, Request, HTTPException, Header +from fastapi import APIRouter, Request, Header +from starlette.responses import JSONResponse # NOC modules from noc.config import config @@ -32,11 +33,20 @@ async def token( if req.grant_type == "refresh_token": # Refresh token if is_revoked(req.refresh_token): - raise HTTPException(detail="Token is expired", status_code=HTTPStatus.FORBIDDEN) + return JSONResponse( + content={"error": "invalid_grant", "error_description": "Token is expired"}, + status_code=HTTPStatus.FORBIDDEN, + ) try: user = get_user_from_jwt(req.refresh_token, audience="refresh") except ValueError as e: - raise HTTPException(detail="Access denied (%s)" % e, status_code=HTTPStatus.FORBIDDEN) + return JSONResponse( + content={ + "error": "unauthorized_client", + "error_description": "Access denied (%s)" % e, + }, + status_code=HTTPStatus.FORBIDDEN, + ) revoke_token(req.refresh_token) return get_token_response(user) elif req.grant_type == "password": @@ -46,24 +56,38 @@ async def token( # CCGrantRequest + Basic auth header schema, data = authorization.split(" ", 1) if schema != "Basic": - raise HTTPException( - detail="Basic authorization header required", status_code=HTTPStatus.BAD_REQUEST + return JSONResponse( + content={ + "error": "unsupported_grant_type", + "error_description": "Basic authorization header required", + }, + status_code=HTTPStatus.BAD_REQUEST, ) auth_data = smart_text(codecs.decode(smart_bytes(data), "base64")) if ":" not in auth_data: - raise HTTPException( - detail="Invalid basic auth header", status_code=HTTPStatus.BAD_REQUEST + return JSONResponse( + content={ + "error": "invalid_request", + "error_description": "Invalid basic auth header", + }, + status_code=HTTPStatus.BAD_REQUEST, ) user, password = auth_data.split(":", 1) auth_req = {"user": user, "password": password, "ip": request.client.host} else: - raise HTTPException(detail="Invalid grant type", status_code=HTTPStatus.BAD_REQUEST) + return JSONResponse( + content={"error": "unsupported_grant_type", "error_description": "Invalid grant type"}, + status_code=HTTPStatus.BAD_REQUEST, + ) # Authenticate if auth_req: user = authenticate(auth_req) if user: return get_token_response(user) - raise HTTPException(detail="Access denied", status_code=HTTPStatus.BAD_REQUEST) + return JSONResponse( + content={"error": "invalid_scope", "error_description": "Access denied"}, + status_code=HTTPStatus.BAD_REQUEST, + ) def get_token_response(user: str) -> TokenResponse: