# aukpad.py
"""aukpad: temporary live-collaboration notepad built on FastAPI websockets.

Pads are held in the in-memory ``rooms`` dict. When ``USE_VALKEY=true`` they are
also mirrored to Valkey/Redis under the key ``room:<doc_id>`` with a TTL of
``RETENTION_HOURS``.
"""
import asyncio
import hashlib
import hmac
import json
import os
import re
import secrets
import string
import threading
import time
from collections import defaultdict
from typing import Optional

from fastapi import FastAPI, HTTPException, Request, WebSocket, WebSocketDisconnect
from fastapi.responses import FileResponse, HTMLResponse, PlainTextResponse, RedirectResponse

app = FastAPI()
application = app  # alias if you prefer "application"

# Environment variables
USE_VALKEY = os.getenv("USE_VALKEY", "false").lower() == "true"
VALKEY_URL = os.getenv("VALKEY_URL", "redis://localhost:6379/0")
MAX_TEXT_SIZE = int(os.getenv("MAX_TEXT_SIZE", "5")) * 1024 * 1024  # env value is MB; 5MB default
MAX_CONNECTIONS_PER_IP = int(os.getenv("MAX_CONNECTIONS_PER_IP", "10"))
RETENTION_HOURS = int(os.getenv("RETENTION_HOURS", "48"))  # Default 48 hours
MAX_ROOMS = int(os.getenv("MAX_ROOMS", "10000"))
TRUST_PROXY = os.getenv("TRUST_PROXY", "false").lower() == "true"
DESCRIPTION = os.getenv("DESCRIPTION", "powered by aukpad.com")

DOC_ID_RE = re.compile(r"^[a-zA-Z0-9_-]{1,64}$")


def is_valid_doc_id(doc_id: str) -> bool:
    """Return True if *doc_id* is 1-64 characters drawn from ``[a-zA-Z0-9_-]``.

    ``fullmatch`` is required: with ``match``, the ``$`` anchor also accepts a
    string ending in ``"\\n"`` (e.g. a path segment ending in ``%0A``).
    """
    return bool(DOC_ID_RE.fullmatch(doc_id))


def get_client_ip(conn) -> str:
    """Return the client IP used for rate and connection limiting.

    *conn* is a Starlette ``Request`` or ``WebSocket``. Proxy headers are
    honored only when ``TRUST_PROXY`` is set — otherwise attackers could spoof
    them to bypass per-IP limits.
    """
    # NOTE(review): if the fronting proxy appends to X-Forwarded-For instead of
    # overwriting it, the left-most entry is client-controlled; confirm the
    # deployment's proxy configuration.
    if TRUST_PROXY:
        xff = conn.headers.get("x-forwarded-for")
        if xff:
            return xff.split(",")[0].strip()
        xri = conn.headers.get("x-real-ip")
        if xri:
            return xri.strip()
    return conn.client.host if conn.client else "unknown"


# Valkey/Redis client (initialized in init_valkey() if enabled)
redis_client = None

# In-memory rooms: {doc_id: {"text": str, "ver": int, "peers": set[WebSocket],
#   "authed_peers": set[WebSocket], "last_access": float, "pw_hash": bytes|None,
#   "pw_salt": bytes|None, "pw_iter": int|None}}
rooms: dict[str, dict] = {}

# Rate limiting: {ip: [timestamp, timestamp, ...]} (oldest first)
rate_limits: dict[str, list] = defaultdict(list)

# Failed password attempts per IP (for auth brute-force / CPU-DoS limiting)
failed_auth_attempts: dict[str, list] = defaultdict(list)

# Connection tracking: {ip: open websocket count}; only touched on the event loop.
connections_per_ip: dict[str, int] = defaultdict(int)

# Password hashing parameters
PBKDF2_ITERATIONS = 600_000  # OWASP 2023 recommendation for PBKDF2-SHA256
LEGACY_PBKDF2_ITERATIONS = 200_000  # backward-compat for pads hashed before the bump
MAX_AUTH_FAILURES = 10  # failed password attempts allowed per window
AUTH_FAILURE_WINDOW = 60  # seconds


def random_id(n: int = 8) -> str:
    """Return an *n*-character random ID from lowercase letters and digits."""
    alphabet = string.ascii_lowercase + string.digits
    return "".join(secrets.choice(alphabet) for _ in range(n))


def init_valkey():
    """Connect the module-level ``redis_client`` when ``USE_VALKEY`` is enabled.

    On import or connection failure, the error is printed and the app falls
    back to memory-only storage (``redis_client`` stays ``None``).
    """
    global redis_client
    if USE_VALKEY:
        try:
            import redis

            redis_client = redis.from_url(VALKEY_URL, decode_responses=True)
            redis_client.ping()  # Test connection
            print(f"Valkey/Redis connected: {VALKEY_URL}")
        except ImportError as e:
            print(f"Warning: redis package import failed ({e}), falling back to memory-only storage")
            redis_client = None
        except Exception as e:
            print(f"Warning: Failed to connect to Valkey/Redis: {e}")
            redis_client = None


# NOTE: redis.from_url() returns the synchronous client, so every cache call made
# from an async handler blocks the event loop for one network round-trip.


def get_room_data_from_cache(doc_id: str) -> Optional[dict]:
    """Return the cached room dict for *doc_id*, or None if absent/unavailable.

    ``pw_hash`` / ``pw_salt`` are stored as hex strings and converted back to bytes.
    """
    if redis_client:
        try:
            data = redis_client.get(f"room:{doc_id}")
            if data:
                cached = json.loads(data)
                # Convert hex strings back to bytes
                if cached.get("pw_hash"):
                    cached["pw_hash"] = bytes.fromhex(cached["pw_hash"])
                if cached.get("pw_salt"):
                    cached["pw_salt"] = bytes.fromhex(cached["pw_salt"])
                return cached
        except Exception as e:
            print(f"Cache read error for {doc_id}: {e}")
    return None


def save_room_data_to_cache(doc_id: str, room: dict):
    """Persist *room*'s durable fields to Valkey/Redis and reset its TTL (best effort)."""
    if redis_client:
        try:
            data = {
                "text": room["text"],
                "ver": room["ver"],
                "last_access": room.get("last_access", time.time()),
                "pw_hash": room["pw_hash"].hex() if room.get("pw_hash") else None,
                "pw_salt": room["pw_salt"].hex() if room.get("pw_salt") else None,
                "pw_iter": room.get("pw_iter"),
            }
            redis_client.setex(f"room:{doc_id}", RETENTION_HOURS * 3600, json.dumps(data))
        except Exception as e:
            print(f"Cache write error for {doc_id}: {e}")


def update_room_access_time(doc_id: str):
    """Mark *doc_id* as recently used: bump in-memory ``last_access`` and reset the cache TTL.

    Only the TTL is refreshed in the cache. Re-writing the whole cached payload
    from here (this runs on threadpool request threads) could overwrite a
    concurrent edit with stale text. The cached ``last_access`` field is not
    read back; loaders stamp ``time.time()`` instead.
    """
    room = rooms.get(doc_id)
    if room is not None:
        room["last_access"] = time.time()
    if redis_client:
        try:
            redis_client.expire(f"room:{doc_id}", RETENTION_HOURS * 3600)  # Reset TTL
        except Exception as e:
            print(f"Cache access update error for {doc_id}: {e}")


def _new_room(
    text: str = "",
    ver: int = 0,
    pw_hash: Optional[bytes] = None,
    pw_salt: Optional[bytes] = None,
    pw_iter: Optional[int] = None,
) -> dict:
    """Build a fresh room dict with no peers and ``last_access`` set to now."""
    return {
        "text": text,
        "ver": ver,
        "peers": set(),
        "authed_peers": set(),
        "last_access": time.time(),
        "pw_hash": pw_hash,
        "pw_salt": pw_salt,
        "pw_iter": pw_iter,
    }


def _get_or_load_room(doc_id: str) -> Optional[dict]:
    """Return the in-memory room for *doc_id*, hydrating it from the cache if needed.

    Returns None when the pad exists neither in memory nor in the cache.
    ``setdefault`` ensures a concurrent loader never replaces a room that
    already has peers attached.
    """
    room = rooms.get(doc_id)
    if room is None:
        cached = get_room_data_from_cache(doc_id)
        if cached:
            loaded = _new_room(
                text=cached.get("text", ""),
                ver=cached.get("ver", 0),
                pw_hash=cached.get("pw_hash"),
                pw_salt=cached.get("pw_salt"),
                pw_iter=cached.get("pw_iter"),
            )
            room = rooms.setdefault(doc_id, loaded)
    return room


def _password_params(room: dict) -> tuple:
    """Return ``(salt, expected_hash, iterations)`` for a password-protected room."""
    return room["pw_salt"], room["pw_hash"], room.get("pw_iter") or LEGACY_PBKDF2_ITERATIONS


def _password_matches(password: str, salt: bytes, expected: bytes, iterations: int) -> bool:
    """Hash *password* with PBKDF2-SHA256 and compare it to *expected* in constant time.

    CPU-heavy (hundreds of thousands of iterations); call it off the event loop.
    """
    candidate = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, iterations)
    return hmac.compare_digest(candidate, expected)


def _prune_stale_entries(table: dict, cutoff: float) -> None:
    """Drop per-IP timestamp lists whose newest entry is at or before *cutoff*.

    Lists are appended in time order, so ``[-1]`` is the newest entry. Such
    lists would be emptied on their next check anyway. Removing them bounds
    memory for IPs that never return. A racing append may land in a dropped
    list, losing at most one timestamp.
    """
    for ip, stamps in list(table.items()):
        if not stamps or stamps[-1] <= cutoff:
            table.pop(ip, None)


def cleanup_old_rooms():
    """Hourly loop (runs in a daemon thread): evict idle rooms and stale limiter state.

    A room is evicted when its ``last_access`` is older than ``RETENTION_HOURS``
    and it has no connected peers. Never returns.
    """
    while True:
        try:
            now = time.time()
            cutoff = now - (RETENTION_HOURS * 3600)  # Convert hours to seconds
            # Iterate a snapshot: request handlers add rooms concurrently, and
            # iterating the live dict can raise "dictionary changed size".
            for doc_id, room in list(rooms.items()):
                if room.get("last_access", 0) < cutoff and not room.get("peers"):
                    if rooms.pop(doc_id, None) is not None:
                        print(f"Cleaned up inactive room: {doc_id}")
            # Valkey/Redis has TTL, so it cleans up automatically
            _prune_stale_entries(rate_limits, now - 3600)
            _prune_stale_entries(failed_auth_attempts, now - AUTH_FAILURE_WINDOW)
        except Exception as e:
            print(f"Cleanup error: {e}")
        time.sleep(3600)  # Run every hour


def check_rate_limit(client_ip: str) -> bool:
    """Record one pad-creation request for *client_ip*; return False if over 50/hour."""
    now = time.time()
    hour_ago = now - 3600
    # Clean old entries
    rate_limits[client_ip] = [t for t in rate_limits[client_ip] if t > hour_ago]
    # Check limit (50 per hour)
    if len(rate_limits[client_ip]) >= 50:
        return False
    # Add current request
    rate_limits[client_ip].append(now)
    return True


def check_auth_rate_limit(client_ip: str) -> bool:
    """Return True if *client_ip* has fewer than MAX_AUTH_FAILURES failures in the window."""
    now = time.time()
    cutoff = now - AUTH_FAILURE_WINDOW
    failed_auth_attempts[client_ip] = [t for t in failed_auth_attempts[client_ip] if t > cutoff]
    return len(failed_auth_attempts[client_ip]) < MAX_AUTH_FAILURES


def record_auth_failure(client_ip: str):
    """Record one failed password attempt for *client_ip*."""
    failed_auth_attempts[client_ip].append(time.time())


# NOTE(review): in this copy the page template holds only text fragments; the
# markup/JS the client relies on appears to be missing — confirm against upstream.
HTML = """ aukpad
disconnected
+
1

This pad is password protected

"""


@app.get("/favicon.ico", include_in_schema=False)
def favicon():
    """Serve ``favicon.ico`` from the working directory."""
    return FileResponse("favicon.ico")


@app.get("/system/info", response_class=HTMLResponse)
def get_system_info():
    """Render an info page showing the instance description and active configuration."""
    max_text_size_mb = MAX_TEXT_SIZE // (1024 * 1024)
    html_content = f""" aukpad - System Info

System Information

Instance

{DESCRIPTION}

Simple temporary live collaboration notepad with websockets and FastAPI — pads expire automatically after a configurable retention period.

Features

Configuration

Valkey/Redis: {'Enabled' if USE_VALKEY else 'Disabled'}
Max text size: {max_text_size_mb} MB
Max connections per IP: {MAX_CONNECTIONS_PER_IP}
Retention time: {RETENTION_HOURS} hours

Open Source

Source Code: https://git.uphillsecurity.com/cf7/aukpad
License: Apache License Version 2.0, January 2004
License URL: http://www.apache.org/licenses/
"""
    return HTMLResponse(html_content)


@app.get("/", include_in_schema=False)
def root():
    """Redirect to a freshly generated pad ID."""
    return RedirectResponse(url=f"/{random_id()}/", status_code=307)


@app.post("/", include_in_schema=False)
async def create_pad_with_content(request: Request):
    """Create a pad from the raw request body and return its URL as plain text (CLI use).

    Raises HTTPException 429 (rate limit), 413 (too large), 400 (empty,
    non-UTF-8 or NUL-containing body), or 503 (room capacity reached).
    """
    client_ip = get_client_ip(request)

    if not check_rate_limit(client_ip):
        raise HTTPException(status_code=429, detail="Rate limit exceeded. Max 50 requests per hour.")

    # Stream the body so an oversized upload is rejected without buffering all of it.
    chunks: list[bytes] = []
    received = 0
    async for chunk in request.stream():
        received += len(chunk)
        if received > MAX_TEXT_SIZE:
            raise HTTPException(status_code=413, detail=f"Content too large. Max size: {MAX_TEXT_SIZE} bytes")
        chunks.append(chunk)
    content = b"".join(chunks)

    if not content:
        raise HTTPException(status_code=400, detail="Empty content not allowed")
    try:
        text_content = content.decode("utf-8")
    except UnicodeDecodeError:
        raise HTTPException(status_code=400, detail="Content must be valid UTF-8") from None
    if "\x00" in text_content:
        raise HTTPException(status_code=400, detail="Null bytes not allowed")

    # Same capacity rule as the websocket path.
    if len(rooms) >= MAX_ROOMS:
        raise HTTPException(status_code=503, detail="Server at capacity")

    doc_id = random_id()
    while doc_id in rooms:  # never overwrite an existing pad
        doc_id = random_id()
    rooms[doc_id] = _new_room(text=text_content, ver=1)

    save_room_data_to_cache(doc_id, rooms[doc_id])

    # Return URL instead of redirect for CLI usage
    base_url = str(request.base_url).rstrip("/")
    return PlainTextResponse(f"{base_url}/{doc_id}/\n")


@app.get("/{doc_id}/", response_class=HTMLResponse)
def pad(doc_id: str):
    """Serve the editor page for *doc_id* and refresh its access time."""
    if not is_valid_doc_id(doc_id):
        raise HTTPException(status_code=400, detail="Invalid pad ID")
    update_room_access_time(doc_id)
    return HTMLResponse(HTML)


@app.get("/{doc_id}/raw", response_class=PlainTextResponse)
def get_raw_pad_content(doc_id: str, request: Request, pw: str = ""):
    """Return the pad text as plain text; protected pads require ``?pw=``.

    Unknown pads return an empty body. Declared as a sync ``def``, so FastAPI
    runs it in its threadpool and the PBKDF2 check does not block the event loop.
    """
    if not is_valid_doc_id(doc_id):
        raise HTTPException(status_code=400, detail="Invalid pad ID")

    room = _get_or_load_room(doc_id)
    if room is None:
        return PlainTextResponse("")

    if room.get("pw_hash"):
        if not pw:
            raise HTTPException(status_code=403, detail="This pad is password protected. Use ?pw=")
        client_ip = get_client_ip(request)
        if not check_auth_rate_limit(client_ip):
            raise HTTPException(status_code=429, detail="Too many failed attempts. Try again later.")
        if not _password_matches(pw, *_password_params(room)):
            record_auth_failure(client_ip)
            raise HTTPException(status_code=403, detail="Wrong password")

    update_room_access_time(doc_id)
    return PlainTextResponse(room["text"])


async def _broadcast(doc_id: str, message: dict, exclude: WebSocket | None = None, authed_only: bool = False):
    """Send *message* as JSON to the room's peers (or only authed peers), skipping *exclude*.

    Peers whose send fails are dropped from both peer sets.
    """
    room = rooms.get(doc_id)
    if not room:
        return
    dead = []
    payload = json.dumps(message)
    targets = room["authed_peers"] if authed_only else room["peers"]
    for peer in list(targets):
        if peer is exclude:
            continue
        try:
            await peer.send_text(payload)
        except Exception:
            dead.append(peer)
    for d in dead:
        room["peers"].discard(d)
        room["authed_peers"].discard(d)


def _release_connection(client_ip: str) -> None:
    """Decrement *client_ip*'s open-connection count, dropping the key at zero."""
    remaining = connections_per_ip.get(client_ip, 0) - 1
    if remaining > 0:
        connections_per_ip[client_ip] = remaining
    else:
        connections_per_ip.pop(client_ip, None)


async def _handle_auth(room: dict, ws: WebSocket, password: str, client_ip: str) -> bool:
    """Handle an ``auth`` message; return True if *ws* is now authenticated.

    On success *ws* joins ``authed_peers`` and receives ``auth_ok`` plus a
    fresh ``init`` carrying the text. Failures send an ``error`` message, and
    wrong passwords count against *client_ip*'s auth rate limit.
    """
    if room["pw_hash"] is None:
        room["authed_peers"].add(ws)
        await ws.send_text(json.dumps({"type": "auth_ok"}))
        await ws.send_text(json.dumps({
            "type": "init",
            "text": room["text"],
            "ver": room["ver"],
            "protected": False,
        }))
        return True

    if not check_auth_rate_limit(client_ip):
        await ws.send_text(json.dumps({"type": "error", "message": "Too many failed attempts. Try again later."}))
        return False

    salt, expected, iters = _password_params(room)
    # PBKDF2 is deliberately slow; run it off the event loop so other pads stay live.
    ok = await asyncio.to_thread(_password_matches, password, salt, expected, iters)
    # Another peer may have changed the password while we were hashing.
    if ok and room["pw_hash"] is expected:
        room["authed_peers"].add(ws)
        await ws.send_text(json.dumps({"type": "auth_ok"}))
        await ws.send_text(json.dumps({
            "type": "init",
            "text": room["text"],
            "ver": room["ver"],
            "protected": True,
        }))
        return True

    record_auth_failure(client_ip)
    await ws.send_text(json.dumps({"type": "error", "message": "Wrong password"}))
    return False


async def _handle_edit(doc_id: str, room: dict, ws: WebSocket, data: dict) -> None:
    """Apply an ``edit`` (full-text replace), persist it, and broadcast to authed peers."""
    new_text = str(data.get("text", ""))
    if len(new_text.encode("utf-8")) > MAX_TEXT_SIZE:
        await ws.send_text(json.dumps({"type": "error", "message": f"Text too large. Max size: {MAX_TEXT_SIZE} bytes"}))
        return
    room["text"] = new_text
    room["ver"] += 1
    room["last_access"] = time.time()
    save_room_data_to_cache(doc_id, room)
    await _broadcast(doc_id, {
        "type": "update",
        "text": room["text"],
        "ver": room["ver"],
        "clientId": data.get("clientId"),
    }, authed_only=True)


async def _handle_set_password(doc_id: str, room: dict, password: str) -> None:
    """Set (non-empty *password*) or clear (empty) the pad password, then notify all peers."""
    if password:
        salt = os.urandom(16)
        # Hash off the event loop; assign all fields together afterwards.
        digest = await asyncio.to_thread(
            hashlib.pbkdf2_hmac, "sha256", password.encode(), salt, PBKDF2_ITERATIONS
        )
        room["pw_hash"] = digest
        room["pw_salt"] = salt
        room["pw_iter"] = PBKDF2_ITERATIONS
    else:
        room["pw_hash"] = None
        room["pw_salt"] = None
        room["pw_iter"] = None
    save_room_data_to_cache(doc_id, room)
    await _broadcast(doc_id, {"type": "protected_changed", "protected": room["pw_hash"] is not None})


@app.websocket("/ws/{doc_id}")
async def ws(doc_id: str, ws: WebSocket):
    """Serve one collaborative editing session for pad *doc_id*.

    Server -> client messages: ``init``, ``update``, ``peers_changed``,
    ``protected_changed``, ``auth_ok``, ``error``. Client -> server messages:
    ``auth``, ``edit``, ``set_password``. On protected pads, ``edit`` and
    ``set_password`` require a successful ``auth`` first.
    """
    if not is_valid_doc_id(doc_id):
        await ws.close(code=1008, reason="Invalid pad ID")
        return

    client_ip = get_client_ip(ws)
    # .get() so rejected connections do not create defaultdict entries.
    if connections_per_ip.get(client_ip, 0) >= MAX_CONNECTIONS_PER_IP:
        await ws.close(code=1008, reason="Too many connections from this IP")
        return

    await ws.accept()
    connections_per_ip[client_ip] += 1
    room: Optional[dict] = None
    # Everything after accept() sits inside try/finally so an early disconnect
    # cannot leak the peer entry or the per-IP connection count.
    try:
        room = _get_or_load_room(doc_id)
        if room is None:
            # Refuse to create new rooms when at capacity
            if len(rooms) >= MAX_ROOMS:
                await ws.close(code=1008, reason="Server at capacity")
                return
            room = rooms.setdefault(doc_id, _new_room())

        room["peers"].add(ws)
        update_room_access_time(doc_id)
        await _broadcast(doc_id, {"type": "peers_changed", "count": len(room["peers"])})

        # Per-connection auth state: already authed if pad has no password
        authed = room["pw_hash"] is None
        if authed:
            room["authed_peers"].add(ws)

        # Send init; withhold text if protected and not yet authed
        await ws.send_text(json.dumps({
            "type": "init",
            "text": room["text"] if authed else "",
            "ver": room["ver"],
            "protected": room["pw_hash"] is not None,
            "peers": len(room["peers"]),
        }))

        while True:
            msg = await ws.receive_text()
            try:
                data = json.loads(msg)
            except (ValueError, RecursionError):
                data = None
            if not isinstance(data, dict):
                await ws.send_text(json.dumps({"type": "error", "message": "Invalid message"}))
                continue

            msg_type = data.get("type")
            if msg_type == "auth":
                if await _handle_auth(room, ws, str(data.get("password", "")), client_ip):
                    authed = True
                continue

            if not authed:
                await ws.send_text(json.dumps({"type": "error", "message": "Authentication required"}))
                continue

            if msg_type == "edit":
                await _handle_edit(doc_id, room, ws, data)
            elif msg_type == "set_password":
                await _handle_set_password(doc_id, room, str(data.get("password", "")))
    except WebSocketDisconnect:
        pass
    finally:
        _release_connection(client_ip)
        if room is not None:
            room["peers"].discard(ws)
            room["authed_peers"].discard(ws)
            await _broadcast(doc_id, {"type": "peers_changed", "count": len(room["peers"])})


# Initialize Valkey/Redis and cleanup thread on startup
@app.on_event("startup")
async def startup_event():
    """Connect to Valkey/Redis (if enabled) and start the hourly cleanup thread."""
    init_valkey()
    cleanup_thread = threading.Thread(target=cleanup_old_rooms, daemon=True)
    cleanup_thread.start()
    print("Aukpad started with cleanup routine")

# Run locally: uvicorn aukpad:app --reload