ADD logging feature #13

This commit is contained in:
Caffeine Fueled 2025-10-16 00:09:07 +02:00
parent 2a7bab59f3
commit 4ff65e8561
Signed by: cf7
GPG key ID: CA295D643074C68C
2 changed files with 119 additions and 16 deletions

View file

@ -21,6 +21,7 @@
- up- and download in CLI possible
- rate-limits
- optional auth token for paste creation
- logging
**Ideas**:
- integrated retention/purge function
@ -168,6 +169,8 @@ podman run --replace -d --restart=unless-stopped \
| `RATE_LIMIT` | Rate limit for uploads (format: "requests/timeframe") | `50/hour` | No |
| `URL_PATH_LENGTH` | Length of generated URL paths (number of characters) | `6` | No |
| `UPLOAD_TOKENS` | Comma-separated list of Bearer tokens for upload authentication (if set, uploads require valid token) | _(disabled)_ | No |
| `LOGGING_ENABLED` | Enable structured JSON logging to file and stdout | `false` | No |
| `LOG_LEVEL` | Logging level (INFO, WARNING, ERROR) | `INFO` | No |
Create a secure token with: `openssl rand -base64 32`.

132
main.py
View file

@ -10,6 +10,9 @@ import os
from pathlib import Path
import hashlib
from typing import Optional
import json
from datetime import datetime
import sys
BASEURL = os.getenv('BASEURL', 'http://127.0.0.1:8000')
@ -18,13 +21,80 @@ MAX_FILE_SIZE_MB = int(os.getenv('MAX_FILE_SIZE_MB', '50'))
RATE_LIMIT = os.getenv('RATE_LIMIT', '50/hour')
URL_PATH_LENGTH = int(os.getenv('URL_PATH_LENGTH', '6'))
UPLOAD_TOKENS = [t.strip() for t in os.getenv('UPLOAD_TOKENS', '').split(',') if t.strip()] if os.getenv('UPLOAD_TOKENS') else []
LOGGING_ENABLED = os.getenv('LOGGING_ENABLED', 'false').lower() in ['true', '1', 'yes']
LOG_LEVEL = os.getenv('LOG_LEVEL', 'INFO').upper()
LOG_FILE = 'logs/linedump.log'
limiter = Limiter(key_func=get_remote_address)
# Create logs directory and log file if logging is enabled
if LOGGING_ENABLED:
Path('logs').mkdir(exist_ok=True)
Path(LOG_FILE).touch(exist_ok=True)
def log(level: str, event: str, **kwargs):
"""Simple structured logging function"""
# Skip if logging is disabled
if not LOGGING_ENABLED:
return
# Skip logs based on level
if LOG_LEVEL == 'ERROR' and level in ['INFO', 'WARNING']:
return
if LOG_LEVEL == 'WARNING' and level == 'INFO':
return
log_entry = {
"timestamp": datetime.utcnow().isoformat() + "Z",
"level": level,
"event": event,
**kwargs
}
log_line = json.dumps(log_entry)
# Write to file
try:
with open(LOG_FILE, 'a') as f:
f.write(log_line + '\n')
except:
pass # Fail silently if logging fails
# Write to stdout
print(log_line, file=sys.stdout)
def get_real_ip(request: Request) -> str:
"""Get real client IP for rate limiting (supports reverse proxy)"""
# Check X-Real-IP header first (set by reverse proxy)
x_real_ip = request.headers.get("X-Real-IP")
if x_real_ip:
return x_real_ip.strip()
# Fallback to direct connection IP
return request.client.host
limiter = Limiter(key_func=get_real_ip)
app = FastAPI(title="linedump", version="1.0.0")
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
@app.exception_handler(RateLimitExceeded)
async def log_rate_limit(request: Request, exc: RateLimitExceeded):
"""Custom handler to log rate limit violations"""
log("WARNING", "rate_limit_exceeded",
client_ip=get_client_ip(request),
user_agent=request.headers.get("User-Agent", "unknown"),
endpoint=request.url.path)
return await _rate_limit_exceeded_handler(request, exc)
# Log startup
log("INFO", "application_started",
base_url=BASEURL,
max_file_size_mb=MAX_FILE_SIZE_MB,
auth_enabled=bool(UPLOAD_TOKENS))
UPLOAD_DIR = Path("uploads")
UPLOAD_DIR.mkdir(exist_ok=True)
@ -51,6 +121,10 @@ def validate_upload_token(request: Request) -> bool:
auth = request.headers.get("Authorization", "")
if not auth.startswith("Bearer "):
log("WARNING", "auth_failed",
client_ip=get_client_ip(request),
user_agent=request.headers.get("User-Agent", "unknown"),
reason="missing_bearer")
raise HTTPException(
status_code=401,
detail="Unauthorized",
@ -61,6 +135,10 @@ def validate_upload_token(request: Request) -> bool:
# Use constant-time comparison to prevent timing attacks
if not any(secrets.compare_digest(token, valid_token) for valid_token in UPLOAD_TOKENS):
log("WARNING", "auth_failed",
client_ip=get_client_ip(request),
user_agent=request.headers.get("User-Agent", "unknown"),
reason="invalid_token")
raise HTTPException(
status_code=401,
detail="Unauthorized",
@ -89,50 +167,72 @@ def validate_content(content: str) -> bool:
@app.post("/", response_class=PlainTextResponse)
@limiter.limit(RATE_LIMIT)
async def upload_text(request: Request, authorized: bool = Depends(validate_upload_token)):
client_ip = get_client_ip(request)
user_agent = request.headers.get("User-Agent", "unknown")
body = await request.body()
content = body.decode('utf-8', errors='ignore')
if not validate_content(content):
log("WARNING", "upload_failed",
client_ip=client_ip,
user_agent=user_agent,
reason="invalid_content",
size_bytes=len(content))
raise HTTPException(status_code=400, detail="Invalid content")
if not content.strip():
log("WARNING", "upload_failed",
client_ip=client_ip,
user_agent=user_agent,
reason="empty_content")
raise HTTPException(status_code=400, detail="Empty content")
random_path = generate_random_path()
while (UPLOAD_DIR / random_path).exists():
random_path = generate_random_path()
file_path = UPLOAD_DIR / random_path
try:
with open(file_path, 'w', encoding='utf-8') as f:
f.write(content)
log("INFO", "paste_created",
paste_id=random_path,
client_ip=client_ip,
user_agent=user_agent,
size_bytes=len(content))
return f"{BASEURL}/{random_path}\n"
except Exception as e:
# Log the actual error for debugging
import traceback
print(f"Error saving file: {e}")
print(traceback.format_exc())
log("ERROR", "upload_failed",
paste_id=random_path,
client_ip=client_ip,
user_agent=user_agent,
error=str(e))
raise HTTPException(status_code=500, detail="Failed to save file")
@app.get("/{file_path}", response_class=PlainTextResponse)
async def get_file(file_path: str):
async def get_file(file_path: str, request: Request):
if not file_path.isalnum():
raise HTTPException(status_code=404, detail="File not found")
file_location = UPLOAD_DIR / file_path
if not file_location.exists() or not file_location.is_file():
raise HTTPException(status_code=404, detail="File not found")
try:
with open(file_location, 'r', encoding='utf-8') as f:
content = f.read()
return content
except Exception:
except Exception as e:
log("ERROR", "download_failed",
paste_id=file_path,
error=str(e))
raise HTTPException(status_code=500, detail="Failed to read file")
@app.get("/", response_class=PlainTextResponse)