-
Notifications
You must be signed in to change notification settings - Fork 2
[Security] Unprotected POST /sessions #6
Copy link
Copy link
Open
Description
Security Finding Report
This issue was automatically created by TeeSec Security during a code security analysis.
Finding Details
| Field | Value |
|---|---|
| ID | POST-/sessions-41 |
| Severity | critical |
| CWE | N/A |
| OWASP | N/A |
| Endpoint | POST /sessions |
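Description
`POST /sessions` accepts requests without any authentication, so any client that can reach the gateway can create sessions.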
Proposed Fix (confidence: 0.4):
"""FastAPI gateway for microservice runtime control."""
from __future__ import annotations
- from fastapi import FastAPI, HTTPException
+ import os
+ import secrets
+
+ from fastapi import FastAPI, HTTPException, Security
+ from fastapi.security import APIKeyHeader
from pydantic import BaseModel
import uvicorn
from ..infra.config import get_redis_url, get_queue_backend
from ..services.controller import ControllerService
app = FastAPI(title="SecNode API Pentest Platform", version="0.2.0")
controller = ControllerService()
+
+ API_KEY_HEADER = APIKeyHeader(name="X-API-Key", auto_error=True)
+ _API_KEY = os.environ.get("SECNODE_API_KEY", "")
+
+
+ def verify_api_key(api_key: str = Security(API_KEY_HEADER)) -> str:
+     """Reject the request unless its API key header matches ``_API_KEY``.
+
+     Returns the presented key on success. Raises ``HTTPException`` with
+     status 500 when no key is configured and 401 on a mismatch.
+     """
+     if not _API_KEY:
+         raise HTTPException(status_code=500, detail="API key not configured")
+     # compare_digest() raises TypeError for str arguments that contain
+     # non-ASCII characters, which would surface as an unhandled 500 for a
+     # malformed header. Comparing the UTF-8 bytes keeps the comparison
+     # constant-time and turns such input into a normal 401.
+     if not secrets.compare_digest(api_key.encode("utf-8"), _API_KEY.encode("utf-8")):
+         raise HTTPException(status_code=401, detail="invalid or missing API key")
+     return api_key
class SessionCreateRequest(BaseModel):
    """Request body accepted by ``POST /sessions``."""

    # Value forwarded to ``controller.create_session``.
    target: str
@app.get("/healthz")
async def healthz():
    """Liveness probe: unconditionally reports ``{"status": "ok"}``."""
    body = {"status": "ok"}
    return body
@app.get("/readyz")
async def readyz():
    """Readiness probe reporting the state of each dependency check.

    Always records ``controller: True``. When the configured queue backend
    is ``"redis"``, a client is created from the configured URL and pinged;
    the outcome is recorded under ``"redis"``.

    Returns:
        ``{"ready": <all checks passed>, "checks": {<name>: <bool>, ...}}``.
    """
    checks: dict = {"controller": True}
    if get_queue_backend() == "redis":
        try:
            import redis.asyncio as aioredis

            r = aioredis.from_url(get_redis_url())
            try:
                await r.ping()
            finally:
                # Close the client even when the ping fails; otherwise every
                # failed probe leaves a client (and its connections) open.
                await r.aclose()
            checks["redis"] = True
        except Exception:
            # Best-effort probe: any failure (import, connect, ping, close)
            # is reported as "not ready" rather than raised to the caller.
            checks["redis"] = False
    ready = all(checks.values())
    return {"ready": ready, "checks": checks}
- @app.post("/sessions")
+ @app.post("/sessions", dependencies=[Security(verify_api_key)])
async def create_session(payload: SessionCreateRequest):
    """Create a session for ``payload.target`` and return its dumped model."""
    return controller.create_session(payload.target).model_dump()
- @app.post("/sessions/{session_id}/run")
+ @app.post("/sessions/{session_id}/run", dependencies=[Security(verify_api_key)])
async def run_session(session_id: str):
    """Run an existing session and return its findings.

    Raises:
        HTTPException: 404 when the controller has no session with this id.
    """
    # Existence check only; the looked-up session object is not used further.
    if controller.get_session(session_id) is None:
        raise HTTPException(status_code=404, detail="session not found")

    results = await controller.run_session(session_id)
    serialized = [finding.model_dump() for finding in results]
    return {"session_id": session_id, "findings": serialized}
- @app.get("/sessions/{session_id}")
+ @app.get("/sessions/{session_id}", dependencies=[Security(verify_api_key)])
async def get_session(session_id: str):
    """Return a session together with its timeline, findings and attack paths.

    Raises:
        HTTPException: 404 when the controller has no session with this id.
    """
    record = controller.get_session(session_id)
    if record is None:
        raise HTTPException(status_code=404, detail="session not found")

    # Built step by step in the same order as the response keys.
    response = {"session": record.model_dump()}
    response["timeline"] = controller.timeline(session_id)
    response["findings"] = [item.model_dump() for item in controller.findings(session_id)]
    response["attack_paths"] = [path.model_dump() for path in controller.attack_paths(session_id)]
    return response
def run() -> None:
    uvicorn.run("secnodeapi.api.server:app", host="0.0.0.0", port=8000, reload=False)

Responsible Disclosure
This finding was identified through automated security analysis. We are reporting it to help improve the security of this project. If you have questions or need additional context, please comment on this issue.
Reported by TeeSec Security — automated security analysis platform
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
No labels