548 lines
17 KiB
Python
548 lines
17 KiB
Python
from fastapi import FastAPI, HTTPException, Request, Depends
|
|
from fastapi.responses import PlainTextResponse
|
|
from slowapi import Limiter, _rate_limit_exceeded_handler
|
|
from slowapi.util import get_remote_address
|
|
from slowapi.errors import RateLimitExceeded
|
|
import bleach
|
|
import secrets
|
|
import string
|
|
import os
|
|
from pathlib import Path
|
|
import hashlib
|
|
from typing import Optional
|
|
import json
|
|
from datetime import datetime
|
|
import sys
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Runtime configuration, read once from the environment at import time.
# ---------------------------------------------------------------------------

# Public base URL used when building the links handed back to clients.
BASEURL = os.getenv('BASEURL', 'http://127.0.0.1:8000')

# Free-form description shown on the plain-text help page.
DESCRIPTION = os.getenv('DESCRIPTION', 'CLI-only pastebin powered by linedump.com')

# Upload size limit in megabytes.
MAX_FILE_SIZE_MB = int(os.getenv('MAX_FILE_SIZE_MB', '50'))

# Rate-limit expression passed to the limiter decorator on the upload route.
RATE_LIMIT = os.getenv('RATE_LIMIT', '50/hour')

# Number of characters in a generated paste ID.
URL_PATH_LENGTH = int(os.getenv('URL_PATH_LENGTH', '6'))

# Comma-separated bearer tokens; an unset or blank variable yields an empty
# list, which disables upload authentication.
UPLOAD_TOKENS = [
    candidate.strip()
    for candidate in os.getenv('UPLOAD_TOKENS', '').split(',')
    if candidate.strip()
]

LOGGING_ENABLED = os.getenv('LOGGING_ENABLED', 'false').lower() in ('true', '1', 'yes')
LOG_LEVEL = os.getenv('LOG_LEVEL', 'INFO').upper()
LOG_FILE = 'logs/linedump.log'

# Make sure the log directory and file exist before the first record is written.
if LOGGING_ENABLED:
    Path('logs').mkdir(exist_ok=True)
    Path(LOG_FILE).touch(exist_ok=True)
|
|
|
|
|
|
def log(level: str, event: str, **kwargs):
    """Write one structured (JSON) log record to the log file and to stdout.

    Nothing is emitted when ``LOGGING_ENABLED`` is false, or when ``level``
    is below the configured ``LOG_LEVEL`` threshold.

    Args:
        level: Severity label, e.g. ``"INFO"``, ``"WARNING"`` or ``"ERROR"``.
        event: Short machine-readable event name.
        **kwargs: Extra fields merged into the record. They are applied last,
            so a keyword named ``timestamp``, ``level`` or ``event`` overrides
            the built-in field. Values must be JSON-serializable.
    """
    if not LOGGING_ENABLED:
        return

    # Threshold filtering: ERROR hides INFO and WARNING, WARNING hides INFO.
    # Any other LOG_LEVEL value lets every record through.
    if LOG_LEVEL == 'ERROR' and level in ('INFO', 'WARNING'):
        return
    if LOG_LEVEL == 'WARNING' and level == 'INFO':
        return

    log_entry = {
        "timestamp": datetime.utcnow().isoformat() + "Z",
        "level": level,
        "event": event,
        **kwargs,
    }
    log_line = json.dumps(log_entry)

    # File logging is best-effort. Only I/O failures are swallowed; a bare
    # ``except`` would also hide KeyboardInterrupt/SystemExit and real bugs.
    try:
        with open(LOG_FILE, 'a', encoding='utf-8') as f:
            f.write(log_line + '\n')
    except OSError:
        pass

    # Mirror the record on stdout so it is also captured by the process output.
    print(log_line, file=sys.stdout)
|
|
|
|
|
|
def get_real_ip(request: Request) -> str:
    """Return the client IP used for rate limiting and logging.

    The ``X-Real-IP`` header wins when it carries a non-blank value;
    otherwise the address of the direct connection is used.

    NOTE(review): ``X-Real-IP`` is taken at face value. That is only safe
    when a trusted reverse proxy always sets or overwrites the header;
    confirm the deployment does so, otherwise clients can choose their own
    rate-limit key.

    Args:
        request: The incoming request.

    Returns:
        The client address, or ``"127.0.0.1"`` when the connection exposes
        no peer address.
    """
    # Strip before testing so a whitespace-only header does not become an
    # empty rate-limit key.
    forwarded_ip = request.headers.get("X-Real-IP", "").strip()
    if forwarded_ip:
        return forwarded_ip

    # ``request.client`` can be None (no peer information available);
    # fall back to a fixed key instead of raising AttributeError.
    client = request.client
    if client is None or not client.host:
        return "127.0.0.1"
    return client.host
|
|
|
|
# Rate limiter keyed by the (proxy-aware) client IP.
limiter = Limiter(key_func=get_real_ip)

app = FastAPI(title="linedump", version="1.0.0")

# The limiter is attached to the application state for slowapi to find.
app.state.limiter = limiter

# No explicit add_exception_handler(RateLimitExceeded, ...) call here: the
# ``@app.exception_handler(RateLimitExceeded)`` handler defined below
# registers itself for the same exception class and would simply replace it.
|
|
|
|
|
|
@app.exception_handler(RateLimitExceeded)
async def log_rate_limit(request: Request, exc: RateLimitExceeded):
    """Log a rate-limit violation, then return slowapi's standard response.

    Args:
        request: The request that exceeded its limit.
        exc: The exception raised by the limiter.

    Returns:
        The response built by slowapi's default handler.
    """
    import inspect

    log("WARNING", "rate_limit_exceeded",
        client_ip=get_real_ip(request),
        user_agent=request.headers.get("User-Agent", "unknown"),
        endpoint=request.url.path)

    # slowapi's default handler is a regular function that returns the
    # response directly; unconditionally awaiting that return value raises
    # TypeError and turns every rate-limit hit into a 500. Only await when
    # the result really is awaitable.
    response = _rate_limit_exceeded_handler(request, exc)
    if inspect.isawaitable(response):
        response = await response
    return response
|
|
|
|
|
|
# Record startup together with the effective configuration.
log(
    "INFO",
    "application_started",
    base_url=BASEURL,
    max_file_size_mb=MAX_FILE_SIZE_MB,
    auth_enabled=len(UPLOAD_TOKENS) > 0,
)

# Pastes and their ".meta" companions are stored side by side in this
# directory, which is created on import if it does not exist yet.
UPLOAD_DIR = Path("uploads")
UPLOAD_DIR.mkdir(exist_ok=True)

# Upload size limit converted from megabytes to bytes.
MAX_FILE_SIZE = MAX_FILE_SIZE_MB * 1024 ** 2
|
|
|
|
def generate_random_path(length: int = None) -> str:
    """Return a random identifier made of ASCII letters and digits.

    Args:
        length: Number of characters to generate. ``None`` selects the
            configured ``URL_PATH_LENGTH``.

    Returns:
        A string of exactly ``length`` characters drawn with ``secrets``.
    """
    size = URL_PATH_LENGTH if length is None else length
    symbols = string.ascii_letters + string.digits

    picked = []
    for _ in range(size):
        picked.append(secrets.choice(symbols))
    return ''.join(picked)
|
|
|
|
|
|
def generate_deletion_token() -> str:
    """Return a fresh URL-safe random token used to authorize paste deletion."""
    # 32 random bytes, rendered as URL-safe text by the ``secrets`` module.
    entropy_bytes = 32
    token = secrets.token_urlsafe(entropy_bytes)
    return token
|
|
|
|
|
|
def validate_paste_id(paste_id: str) -> bool:
    """Check that a paste ID is safe to use as a file name.

    A valid ID is non-empty, at most 64 characters long, and consists only
    of ASCII letters and digits, so it cannot contain path separators or
    dots.

    Args:
        paste_id: Candidate identifier, typically taken from the URL path.

    Returns:
        ``True`` when the ID is acceptable, ``False`` otherwise.
    """
    # Reject empty and unreasonably long IDs first.
    if not paste_id or len(paste_id) > 64:
        return False

    # ``str.isalnum()`` alone also accepts non-ASCII letters and digits
    # (e.g. "é", Arabic-Indic digits). Generated IDs only ever use ASCII
    # letters and digits, so anything else is never a real paste.
    return paste_id.isascii() and paste_id.isalnum()
|
|
|
|
|
|
def save_metadata(paste_id: str, deletion_token: str, client_ip: str) -> None:
    """Write the metadata file (``<paste_id>.meta``) for a paste.

    The file is JSON with the keys ``deletion_token``, ``created_at``
    (UTC, ISO 8601 with a trailing ``Z``) and ``client_ip``.

    Args:
        paste_id: Identifier of the paste; must pass ``validate_paste_id``.
        deletion_token: Secret that authorizes deleting the paste.
        client_ip: Address of the uploading client.

    Raises:
        ValueError: If the ID is invalid or resolves outside ``UPLOAD_DIR``.
        OSError: If the file cannot be written.
    """
    if not validate_paste_id(paste_id):
        raise ValueError("Invalid paste ID")

    meta_path = UPLOAD_DIR / f"{paste_id}.meta"

    # Defense in depth against path traversal. Compare path components, not
    # string prefixes: a sibling directory such as "uploads_backup" also
    # *starts with* "uploads" and would pass a startswith() test.
    if UPLOAD_DIR.resolve() not in meta_path.resolve().parents:
        raise ValueError("Invalid paste ID: path traversal detected")

    metadata = {
        "deletion_token": deletion_token,
        "created_at": datetime.utcnow().isoformat() + "Z",
        "client_ip": client_ip,
    }
    with open(meta_path, 'w', encoding='utf-8') as f:
        json.dump(metadata, f)
|
|
|
|
|
|
def load_metadata(paste_id: str) -> Optional[dict]:
    """Read the metadata file (``<paste_id>.meta``) of a paste.

    Args:
        paste_id: Identifier of the paste.

    Returns:
        The decoded metadata dictionary, or ``None`` when the ID is invalid,
        the file is missing or unreadable, or its content is not a JSON
        object.
    """
    if not validate_paste_id(paste_id):
        return None

    meta_path = UPLOAD_DIR / f"{paste_id}.meta"

    # Defense in depth against path traversal. Compare path components, not
    # string prefixes: a sibling directory such as "uploads_backup" also
    # *starts with* "uploads" and would pass a startswith() test.
    if UPLOAD_DIR.resolve() not in meta_path.resolve().parents:
        return None

    # Open directly instead of checking exists() first: the file may vanish
    # between the check and the read. A missing file is an OSError; invalid
    # JSON or invalid text encoding is a ValueError.
    try:
        with open(meta_path, 'r', encoding='utf-8') as f:
            metadata = json.load(f)
    except (OSError, ValueError):
        return None

    # Callers use ``.get`` on the result, so anything but a JSON object is
    # treated as unusable metadata.
    return metadata if isinstance(metadata, dict) else None
|
|
|
|
|
|
def delete_paste(paste_id: str) -> bool:
    """Remove a paste file and its metadata file from ``UPLOAD_DIR``.

    Args:
        paste_id: Identifier of the paste.

    Returns:
        ``True`` when the paste file existed and was removed. ``False`` when
        the ID is invalid, a path resolves outside ``UPLOAD_DIR``, or the
        paste file was not there. A leftover metadata file is removed in
        either case once the ID has passed validation.
    """
    if not validate_paste_id(paste_id):
        return False

    paste_path = UPLOAD_DIR / paste_id
    meta_path = UPLOAD_DIR / f"{paste_id}.meta"

    # Defense in depth against path traversal. Compare path components, not
    # string prefixes: a sibling directory such as "uploads_backup" also
    # *starts with* "uploads" and would pass a startswith() test.
    upload_root = UPLOAD_DIR.resolve()
    for candidate in (paste_path, meta_path):
        if upload_root not in candidate.resolve().parents:
            return False

    # Unlink directly rather than testing exists() first, so a concurrent
    # deletion of the same paste cannot raise FileNotFoundError here.
    try:
        paste_path.unlink()
        deleted = True
    except FileNotFoundError:
        deleted = False

    try:
        meta_path.unlink()
    except FileNotFoundError:
        pass

    return deleted
|
|
|
|
|
|
def _reject_upload(request: Request, reason: str) -> None:
    """Log a failed upload authentication attempt and raise HTTP 401."""
    log("WARNING", "auth_failed",
        client_ip=get_real_ip(request),
        user_agent=request.headers.get("User-Agent", "unknown"),
        reason=reason)
    raise HTTPException(
        status_code=401,
        detail="Unauthorized",
        headers={"WWW-Authenticate": "Bearer"},
    )


def validate_upload_token(request: Request) -> bool:
    """Check the bearer token of an upload request.

    Used as a FastAPI dependency on the upload route. When no upload tokens
    are configured, authentication is disabled and every request passes.

    Args:
        request: The incoming request.

    Returns:
        ``True`` when the request may proceed.

    Raises:
        HTTPException: 401 when the ``Authorization`` header is missing, is
            not a ``Bearer`` credential, or carries an unknown token.
    """
    if not UPLOAD_TOKENS:
        # No tokens configured: authentication is disabled.
        return True

    auth = request.headers.get("Authorization", "")
    if not auth.startswith("Bearer "):
        _reject_upload(request, "missing_bearer")

    # Compare as bytes: compare_digest() raises TypeError for str arguments
    # containing non-ASCII characters, which would surface as a 500 instead
    # of a 401 for a malformed token.
    presented = auth[len("Bearer "):].encode("utf-8")

    # Evaluate every comparison (list, not a short-circuiting generator) so
    # the response time does not depend on which configured token matched.
    matches = [
        secrets.compare_digest(presented, valid_token.encode("utf-8"))
        for valid_token in UPLOAD_TOKENS
    ]
    if not any(matches):
        _reject_upload(request, "invalid_token")

    return True
|
|
|
|
|
|
def validate_content(content: str) -> bool:
    """Check that paste content is within the size limit and storable.

    Args:
        content: Decoded text of the upload.

    Returns:
        ``True`` when the content contains no NUL characters, can be encoded
        as UTF-8, and its UTF-8 encoding is at most ``MAX_FILE_SIZE`` bytes;
        ``False`` otherwise.
    """
    # Cheap pre-check: every character encodes to at least one byte, so more
    # characters than the byte limit can never fit.
    if len(content) > MAX_FILE_SIZE:
        return False

    # NUL characters are rejected (file system attacks).
    if '\x00' in content:
        return False

    try:
        encoded = content.encode('utf-8')
    except UnicodeEncodeError:
        return False

    # The limit is expressed in bytes, but a character may take up to four
    # bytes in UTF-8, so the encoded size is what has to be compared.
    return len(encoded) <= MAX_FILE_SIZE
|
|
|
|
@app.post("/", response_class=PlainTextResponse)
@limiter.limit(RATE_LIMIT)
async def upload_text(request: Request, authorized: bool = Depends(validate_upload_token)):
    """Store the request body as a new paste.

    Args:
        request: The incoming request; its raw body is the paste content.
            Bytes that are not valid UTF-8 are dropped while decoding.
        authorized: Result of the ``validate_upload_token`` dependency. The
            value itself is unused; the dependency rejects unauthenticated
            requests before this function runs.

    Returns:
        Plain text with the paste URL and the URL that deletes it.

    Raises:
        HTTPException: 400 for invalid or empty content, 500 when the paste
            cannot be stored.
    """
    client_ip = get_real_ip(request)
    user_agent = request.headers.get("User-Agent", "unknown")
    body = await request.body()
    content = body.decode('utf-8', errors='ignore')

    if not validate_content(content):
        log("WARNING", "upload_failed",
            client_ip=client_ip,
            user_agent=user_agent,
            reason="invalid_content",
            size_bytes=len(content))
        raise HTTPException(status_code=400, detail="Invalid content")

    if not content.strip():
        log("WARNING", "upload_failed",
            client_ip=client_ip,
            user_agent=user_agent,
            reason="empty_content")
        raise HTTPException(status_code=400, detail="Empty content")

    random_path = None
    created_path = None  # set once this request has created the paste file

    try:
        deletion_token = generate_deletion_token()

        # Create the file in exclusive mode ('x'): creation fails when the ID
        # is already taken, so two concurrent uploads that draw the same ID
        # cannot overwrite each other (an exists()-then-write sequence can).
        while True:
            random_path = generate_random_path()
            candidate = UPLOAD_DIR / random_path
            try:
                with open(candidate, 'x', encoding='utf-8') as f:
                    created_path = candidate
                    f.write(content)
            except FileExistsError:
                continue  # ID collision: draw another one
            break

        # Save metadata with deletion token
        save_metadata(random_path, deletion_token, client_ip)

        log("INFO", "paste_created",
            paste_id=random_path,
            client_ip=client_ip,
            user_agent=user_agent,
            size_bytes=len(content))

        # Return URL and deletion token
        return f"{BASEURL}/{random_path}\nDelete with HTTP POST: {BASEURL}/{random_path}?token={deletion_token}\n"

    except Exception as e:
        # Do not leave a partially stored paste behind: without its metadata
        # it could be downloaded but never deleted. Only files created by
        # this request are removed.
        if created_path is not None:
            for leftover in (created_path, UPLOAD_DIR / f"{random_path}.meta"):
                try:
                    leftover.unlink()
                except OSError:
                    pass

        log("ERROR", "upload_failed",
            paste_id=random_path,
            client_ip=client_ip,
            user_agent=user_agent,
            error=str(e))
        raise HTTPException(status_code=500, detail="Failed to save file")
|
|
|
|
@app.get("/{paste_id}", response_class=PlainTextResponse)
async def get_file(paste_id: str, request: Request, token: Optional[str] = None):
    """Return the stored text of a paste.

    Args:
        paste_id: Identifier taken from the URL path.
        request: The incoming request (not used by this handler).
        token: Optional query parameter, accepted but not used here; pastes
            are deleted through the POST route for the same path.

    Returns:
        The paste content as plain text.

    Raises:
        HTTPException: 404 when the ID is malformed or no such paste exists,
            500 when the file cannot be read.
    """
    # Use the same ID rules as the storage helpers instead of a bare
    # isalnum() check.
    if not validate_paste_id(paste_id):
        raise HTTPException(status_code=404, detail="Paste not found")

    file_location = UPLOAD_DIR / paste_id

    # is_file() is already False for a path that does not exist.
    if not file_location.is_file():
        raise HTTPException(status_code=404, detail="Paste not found")

    try:
        with open(file_location, 'r', encoding='utf-8') as f:
            return f.read()
    except Exception as e:
        log("ERROR", "download_failed",
            paste_id=paste_id,
            error=str(e))
        raise HTTPException(status_code=500, detail="Failed to read file")
|
|
|
|
|
|
@app.post("/{paste_id}", response_class=PlainTextResponse)
async def delete_paste_endpoint(paste_id: str, request: Request, token: Optional[str] = None):
    """Delete a paste using its deletion token.

    The token is read from the ``token`` query parameter or, if that is
    absent, from the ``X-Delete-Token`` header.

    Args:
        paste_id: Identifier taken from the URL path.
        request: The incoming request.
        token: Deletion token supplied as a query parameter.

    Returns:
        A short plain-text confirmation.

    Raises:
        HTTPException: 404 for a malformed ID, a missing token or missing
            metadata; 403 for an over-long or wrong token; 500 when the
            paste file could not be removed.
    """
    client_ip = get_real_ip(request)
    user_agent = request.headers.get("User-Agent", "unknown")

    def deny(reason: str, status_code: int, detail: str) -> None:
        """Log the failed deletion attempt and abort with the given status."""
        log("WARNING", "deletion_failed",
            paste_id=paste_id,
            client_ip=client_ip,
            user_agent=user_agent,
            reason=reason)
        raise HTTPException(status_code=status_code, detail=detail)

    # Use the same ID rules as the storage helpers instead of a bare
    # isalnum() check.
    if not validate_paste_id(paste_id):
        raise HTTPException(status_code=404, detail="Paste not found")

    # Check if token is provided (query param or header)
    deletion_token = token or request.headers.get("X-Delete-Token")
    if not deletion_token:
        deny("missing_token", 404, "Not found")

    # Validate token length (prevent abuse with extremely long tokens)
    if len(deletion_token) > 128:
        deny("invalid_token_length", 403, "Deletion failed")

    metadata = load_metadata(paste_id)
    if not metadata or not isinstance(metadata, dict):
        deny("metadata_not_found", 404, "Deletion failed")

    # A stored token that is not text can never match.
    expected_token = metadata.get("deletion_token", "")
    if not isinstance(expected_token, str):
        expected_token = ""

    # Constant-time comparison on bytes: compare_digest() raises TypeError
    # for str arguments containing non-ASCII characters, which would turn a
    # bad token into a 500 instead of a 403.
    if not secrets.compare_digest(deletion_token.encode("utf-8"),
                                  expected_token.encode("utf-8")):
        deny("invalid_token", 403, "Deletion failed")

    # Delete the paste and metadata
    if delete_paste(paste_id):
        log("INFO", "paste_deleted",
            paste_id=paste_id,
            client_ip=client_ip,
            user_agent=user_agent,
            deletion_method="user_requested")
        return "Paste deleted successfully\n"

    log("ERROR", "deletion_failed",
        paste_id=paste_id,
        client_ip=client_ip,
        user_agent=user_agent,
        reason="file_not_found")
    raise HTTPException(status_code=500, detail="Failed to delete paste")
|
|
|
|
|
|
@app.get("/", response_class=PlainTextResponse)
async def root():
    """Return the plain-text help page.

    The page shows the configured description, limits and usage examples.
    When upload tokens are configured, the examples include the
    ``Authorization`` header and an extra authentication section is added.

    Returns:
        The help text as a single string.
    """
    # Build authentication notice and examples if tokens are configured
    auth_notice = "\n- Authentication: not required"
    auth_section = ""
    auth_header_curl = ""
    auth_header_wget = ""
    auth_header_ps = ""

    if UPLOAD_TOKENS:
        auth_notice = "\n- Authentication: REQUIRED (Bearer token)"
        auth_header_curl = '-H "Authorization: Bearer $LINEDUMP_TOKEN" '
        auth_header_wget = '--header="Authorization: Bearer $LINEDUMP_TOKEN" '
        auth_header_ps = ' -Headers @{"Authorization"="Bearer $env:LINEDUMP_TOKEN"}'
        auth_section = f"""

████ Authentication Examples ████

When authentication is enabled, include Bearer token in Authorization header:

Set token as environment variable (recommended):
export LINEDUMP_TOKEN="your-token-here"

█ curl:
curl -H "Authorization: Bearer $LINEDUMP_TOKEN" -X POST -d "Cheers" {BASEURL}/

█ wget:
wget --header="Authorization: Bearer $LINEDUMP_TOKEN" --post-data="Cheers" -O- {BASEURL}/

█ Powershell:
$env:LINEDUMP_TOKEN="your-token-here"
Invoke-RestMethod -Uri "{BASEURL}/" -Headers @{{"Authorization"="Bearer $env:LINEDUMP_TOKEN"}} -Method Post -Body "Cheers"
"""

    # In the text below, a shell line continuation is written as "\\" so a
    # literal backslash reaches the client. A single backslash before the
    # newline would be consumed by Python as a string line continuation and
    # the multi-line example would collapse into one line.
    return f"""LD {BASEURL}

████ General ████

{DESCRIPTION}

- File limit: {MAX_FILE_SIZE_MB} MB
- Rate limit: {RATE_LIMIT}
- text-only
- no server-side encryption, consider content public or use client-side encryption{auth_notice}
{auth_section}

████ Usage ████


█ Upload curl:

curl {auth_header_curl}-X POST -d "Cheers" {BASEURL}/ # string
curl {auth_header_curl}-X POST {BASEURL} --data-binary @- < file.txt # file
ip -br a | curl {auth_header_curl}-X POST {BASEURL} --data-binary @- # command output


█ Upload wget:

echo "Cheers" | wget {auth_header_wget}--post-data=@- -O- {BASEURL}/ # string
wget {auth_header_wget}--post-file=file.txt -O- {BASEURL}/ # file
ip -br a | wget {auth_header_wget}--post-data=@- -O- {BASEURL}/ # command output


█ Upload Powershell:

Invoke-RestMethod -Uri "{BASEURL}/"{auth_header_ps} -Method Post -Body "Cheers" # string
Invoke-RestMethod -Uri "{BASEURL}/"{auth_header_ps} -Method Post -InFile "file.txt" # file
ipconfig | Invoke-RestMethod -Uri "{BASEURL}/"{auth_header_ps} -Method Post -Body {{ $_ }} # command output


█ Download:

curl {BASEURL}/{{path}} # print to stdout
curl -o filename.txt {BASEURL}/{{path}} # save to file

wget -O- {BASEURL}/{{path}} # print to stdout
wget -O filename.txt {BASEURL}/{{path}} # save to file

Invoke-RestMethod -Uri "{BASEURL}/{{path}}" # print to stdout
Invoke-RestMethod -Uri "{BASEURL}/{{path}}" -OutFile "filename.txt" # save to file


█ Delete:

curl -X POST "{BASEURL}/{{path}}?token={{deletion_token}}" # delete paste



██ Encryption Examples with curl ██


█ Upload text:

echo 'Cheers' \\
| openssl enc -aes-256-cbc -pbkdf2 -salt -base64 -pass pass:yourkey \\
| curl {auth_header_curl}-X POST -d @- {BASEURL}/


█ Upload file:

openssl enc -aes-256-cbc -pbkdf2 -salt -pass pass:yourkey -base64 < file.txt \\
| curl {auth_header_curl}-sS -X POST {BASEURL} --data-binary @-


█ Upload command output:

ip -br a \\
| openssl enc -aes-256-cbc -pbkdf2 -salt -pass pass:yourkey -base64 \\
| curl {auth_header_curl}-sS -X POST {BASEURL} --data-binary @-


█ Download:

curl -s {BASEURL}/{{path}} \\
| base64 -d \\
| openssl enc -d -aes-256-cbc -pbkdf2 -pass pass:yourkey



██ Adv Examples ██


█ Multiple commands:

{{ cmd() {{ printf "\\n# %s\\n" "$*"; "$@"; }}; \\
cmd hostname; \\
cmd ip -br a; \\
}} 2>&1 | curl {auth_header_curl}-X POST {BASEURL} --data-binary @-


█ Continuous command:

(timeout --signal=INT --kill-after=5s 10s \\
ping 127.1; \\
echo "--- Terminated ---") | \\
curl {auth_header_curl}-X POST --data-binary @- {BASEURL}



████ Further Information ████


Powered by linedump

Source:
https://git.uphillsecurity.com/cf7/linedump

License:
Apache-2.0
https://git.uphillsecurity.com/cf7/linedump/src/branch/main/LICENSE

"""
|
|
|
|
if __name__ == "__main__":
    # Direct execution: serve the app with uvicorn on all interfaces,
    # port 8000. uvicorn is imported here, so it is only needed when the
    # module is run as a script.
    import uvicorn

    uvicorn.run(app, port=8000, host="0.0.0.0")
|