Implement Pravda: Truth Social to Discord relay service
Some checks failed
Build and Push Docker Image / build (push) Failing after 1m59s

- Add core modules: database (SQLite), media (ffmpeg), discord (webhook), poller (truthbrush), server (FastAPI)
- Support video transcoding to H.264/AAC with automatic size management
- Handle message splitting for Discord limits (2000 chars, 10 attachments)
- Include interactive buttons (Delete, View Raw, Original Post)
- Add Dockerfile with ffmpeg and entrypoint script
- Add Gitea Actions workflow for CI/CD
- Configure code style tools (black, ruff, mypy)
- Include basic unit tests

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-01-23 01:01:59 -05:00
parent af530372f3
commit c75856ff44
17 changed files with 1491 additions and 0 deletions

View File

@@ -0,0 +1,42 @@
name: Build and Push Docker Image
on:
push:
branches:
- '*'
jobs:
build:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Login to Gitea Registry
uses: docker/login-action@v3
with:
registry: gitea.stevedudenhoeffer.com
username: ${{ secrets.REGISTRY_USERNAME }}
password: ${{ secrets.REGISTRY_PASSWORD }}
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Extract branch name
id: branch
run: echo "name=${GITHUB_REF_NAME:-${{ gitea.ref_name }}}" >> $GITHUB_OUTPUT
- name: Build and push (branch tag)
uses: docker/build-push-action@v5
with:
context: .
push: true
tags: gitea.stevedudenhoeffer.com/steve/pravda:${{ steps.branch.outputs.name }}
- name: Build and push (latest tag)
if: steps.branch.outputs.name == 'main'
uses: docker/build-push-action@v5
with:
context: .
push: true
tags: gitea.stevedudenhoeffer.com/steve/pravda:latest

82
.gitignore vendored Normal file
View File

@@ -0,0 +1,82 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# PyInstaller
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# IDE
.idea/
.vscode/
*.swp
*.swo
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# ruff
.ruff_cache/
# Data
/data/
*.db
# Temp files
*.tmp
*.temp

31
Dockerfile Normal file
View File

@@ -0,0 +1,31 @@
FROM python:3.12-slim
# Install system dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
ffmpeg \
git \
&& rm -rf /var/lib/apt/lists/*
# Set working directory
WORKDIR /app
# Copy requirements first for better caching
COPY requirements.txt .
# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Create data directory
RUN mkdir -p /data
# Make entrypoint executable
RUN chmod +x /app/entrypoint.sh
# Expose port for FastAPI server
EXPOSE 8080
# Set entrypoint
ENTRYPOINT ["/app/entrypoint.sh"]

144
README.md Normal file
View File

@@ -0,0 +1,144 @@
# Pravda
A Python-based Dockerized service that monitors Truth Social feeds and relays posts to Discord via webhooks. Features intelligent media handling (transcoding/splitting) and interactive message components.
## Features
- Monitors Truth Social feeds using the `truthbrush` library
- Relays posts to Discord with full media support
- Automatic video transcoding to H.264/AAC for Discord compatibility
- Smart file size management (keeps videos under 50MB)
- Message splitting for long posts (Discord's 2000 char limit)
- Interactive buttons for deleting messages and viewing raw post data
- SQLite database for tracking processed posts
- FastAPI server for handling Discord interaction webhooks
## Requirements
- Python 3.12+
- Docker (for containerized deployment)
- ffmpeg (for video transcoding)
- A Discord webhook URL
- A Discord application (for interaction components)
## Environment Variables
| Variable | Description | Required |
|----------|-------------|----------|
| `DISCORD_WEBHOOK` | Discord Webhook URL for posting messages | Yes |
| `DISCORD_APPLICATION_ID` | Discord Application ID for components | Yes |
| `DISCORD_PUBLIC_KEY` | Public Key for verifying interaction signatures | Yes (if using interactions) |
| `POLL_INTERVAL` | Seconds between checks (default: 300) | No |
| `TRUTH_USER` | Target username (default: realDonaldTrump) | No |
| `DB_PATH` | Path to SQLite DB (default: /data/seen.db) | No |
## Quick Start
### Local Development
```bash
# Install dependencies
pip install -r requirements.txt
# Set environment variables
export DISCORD_WEBHOOK="your_webhook_url"
export DISCORD_APPLICATION_ID="your_app_id"
export DISCORD_PUBLIC_KEY="your_public_key"
# Run locally
python main.py
```
### Docker
```bash
# Build image
docker build -t pravda .
# Run in daemon mode (Poller + Server)
docker run -d \
-e DISCORD_WEBHOOK="your_webhook_url" \
-e DISCORD_APPLICATION_ID="your_app_id" \
-e DISCORD_PUBLIC_KEY="your_public_key" \
-v $(pwd)/data:/data \
-p 8080:8080 \
pravda
# One-off mode: relay a specific post
docker run --rm \
-e DISCORD_WEBHOOK="your_webhook_url" \
pravda https://truthsocial.com/@realDonaldTrump/posts/123456789
```
## Architecture
The application runs two concurrent processes:
1. **The Poller**: Loops infinitely, checking for new posts. Processed posts are cached in SQLite to prevent duplicates.
2. **The Server**: A FastAPI instance listening on port 8080 to handle Discord interaction webhooks (button clicks).
### Interaction Endpoints
- `POST /interactions` - Discord interaction webhook handler
- `GET /health` - Health check endpoint
## Discord Components
Messages include interactive buttons:
- **Delete**: Removes the message (requires MANAGE_MESSAGES permission)
- **View Raw**: Shows the raw post JSON (ephemeral)
- **Original Post**: Links to the original Truth Social post
## Media Handling
- **Images**: Downloaded and attached directly
- **Videos**: Transcoded to H.264/AAC MP4 format
- Automatically scaled down if over 50MB
- Resolution reduced progressively (720p, then 480p) if needed
## Development
```bash
# Install dev dependencies
pip install -r requirements.txt
pip install black ruff mypy pytest
# Format code
black .
# Lint
ruff check .
# Type check
mypy .
# Run tests
pytest
```
## Project Structure
```
pravda/
├── main.py # Application entrypoint
├── src/
│ ├── __init__.py
│ ├── database.py # SQLite operations
│ ├── discord.py # Discord webhook handling
│ ├── media.py # Media processing (ffmpeg)
│ ├── poller.py # Truth Social polling
│ └── server.py # FastAPI interaction handler
├── Dockerfile
├── entrypoint.sh
├── requirements.txt
├── pyproject.toml
└── .gitea/
└── workflows/
└── build-push.yaml
```
## License
MIT

15
entrypoint.sh Normal file
View File

@@ -0,0 +1,15 @@
#!/bin/bash
set -e
# Pravda entrypoint script
# Detects whether to run in daemon mode or one-off mode based on arguments
if [ $# -gt 0 ]; then
# One-off mode: relay a specific post URL
echo "Running in one-off mode..."
exec python main.py "$@"
else
# Daemon mode: run poller + server
echo "Running in daemon mode..."
exec python main.py
fi

65
main.py Normal file
View File

@@ -0,0 +1,65 @@
#!/usr/bin/env python3
"""
Pravda - Truth Social to Discord relay service.
This is the main entrypoint that runs both the poller and the FastAPI server.
"""
import multiprocessing
import sys
import uvicorn
from src.poller import fetch_and_relay_post, poll_loop
from src.server import app
def run_server() -> None:
"""Run the FastAPI server."""
uvicorn.run(app, host="0.0.0.0", port=8080, log_level="info")
def run_poller() -> None:
"""Run the polling loop."""
poll_loop()
def daemon_mode() -> None:
"""Run both the poller and server concurrently."""
print("Starting Pravda in daemon mode...")
# Start server in a separate process
server_process = multiprocessing.Process(target=run_server, name="server")
server_process.start()
# Run poller in the main process
try:
run_poller()
except KeyboardInterrupt:
print("\nShutting down...")
finally:
server_process.terminate()
server_process.join(timeout=5)
def oneoff_mode(url: str) -> int:
"""Relay a single post by URL."""
print(f"One-off mode: relaying {url}")
success = fetch_and_relay_post(url)
return 0 if success else 1
def main() -> int:
"""Main entrypoint."""
if len(sys.argv) > 1:
# One-off mode: relay a specific post
url = sys.argv[1]
return oneoff_mode(url)
else:
# Daemon mode: run poller + server
daemon_mode()
return 0
if __name__ == "__main__":
sys.exit(main())

66
pyproject.toml Normal file
View File

@@ -0,0 +1,66 @@
[project]
name = "pravda"
version = "1.0.0"
description = "Truth Social to Discord relay service"
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"truthbrush>=0.5.0",
"requests>=2.31.0",
"fastapi>=0.109.0",
"uvicorn[standard]>=0.27.0",
"ffmpeg-python>=0.2.0",
"pynacl>=1.5.0",
"aiofiles>=23.2.0",
"httpx>=0.26.0",
]
[project.optional-dependencies]
dev = [
"black",
"ruff",
"mypy",
"pytest",
"types-requests",
]
[tool.black]
line-length = 88
target-version = ["py312"]
[tool.ruff]
line-length = 88
target-version = "py312"
[tool.ruff.lint]
select = [
"E", # pycodestyle errors
"W", # pycodestyle warnings
"F", # pyflakes
"I", # isort
"B", # flake8-bugbear
"C4", # flake8-comprehensions
"UP", # pyupgrade
]
ignore = [
"E501", # line too long (handled by black)
]
[tool.mypy]
python_version = "3.12"
strict = true
warn_return_any = true
warn_unused_ignores = true
disallow_untyped_defs = true
[[tool.mypy.overrides]]
module = [
"truthbrush.*",
"ffmpeg.*",
"nacl.*",
]
ignore_missing_imports = true
[tool.pytest.ini_options]
testpaths = ["tests"]
python_files = ["test_*.py"]

8
requirements.txt Normal file
View File

@@ -0,0 +1,8 @@
truthbrush>=0.5.0
requests>=2.31.0
fastapi>=0.109.0
uvicorn[standard]>=0.27.0
ffmpeg-python>=0.2.0
pynacl>=1.5.0
aiofiles>=23.2.0
httpx>=0.26.0

1
src/__init__.py Normal file
View File

@@ -0,0 +1 @@
# Pravda - Truth Social to Discord relay service

71
src/database.py Normal file
View File

@@ -0,0 +1,71 @@
"""Database module for tracking seen post IDs using SQLite."""
import os
import sqlite3
from contextlib import contextmanager
from typing import Generator
DB_PATH = os.environ.get("DB_PATH", "/data/seen.db")
def get_db_path() -> str:
"""Get the database path, creating parent directories if needed."""
db_path = DB_PATH
parent_dir = os.path.dirname(db_path)
if parent_dir and not os.path.exists(parent_dir):
os.makedirs(parent_dir, exist_ok=True)
return db_path
def init_db() -> None:
"""Initialize the database and create tables if they don't exist."""
db_path = get_db_path()
with sqlite3.connect(db_path) as conn:
cursor = conn.cursor()
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS seen_posts (
post_id TEXT PRIMARY KEY,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"""
)
conn.commit()
@contextmanager
def get_connection() -> Generator[sqlite3.Connection, None, None]:
"""Get a database connection context manager."""
db_path = get_db_path()
conn = sqlite3.connect(db_path)
try:
yield conn
finally:
conn.close()
def is_post_seen(post_id: str) -> bool:
"""Check if a post has already been processed."""
with get_connection() as conn:
cursor = conn.cursor()
cursor.execute("SELECT 1 FROM seen_posts WHERE post_id = ?", (post_id,))
return cursor.fetchone() is not None
def mark_post_seen(post_id: str) -> None:
"""Mark a post as seen/processed."""
with get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"INSERT OR IGNORE INTO seen_posts (post_id) VALUES (?)", (post_id,)
)
conn.commit()
def get_seen_count() -> int:
"""Get the total number of seen posts."""
with get_connection() as conn:
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM seen_posts")
result = cursor.fetchone()
return result[0] if result else 0

215
src/discord.py Normal file
View File

@@ -0,0 +1,215 @@
"""Discord webhook module for sending messages."""
import json
import os
import sys
from dataclasses import dataclass
from typing import Any, Optional
import requests
from .media import MediaFile
DISCORD_WEBHOOK = os.environ.get("DISCORD_WEBHOOK", "")
DISCORD_APPLICATION_ID = os.environ.get("DISCORD_APPLICATION_ID", "")
MAX_CONTENT_LENGTH = 2000
MAX_ATTACHMENTS = 10
@dataclass
class DiscordMessage:
"""Represents a Discord message to send."""
content: str
embeds: Optional[list[dict]] = None
components: Optional[list[dict]] = None
files: Optional[list[MediaFile]] = None
def build_components(post_url: str, post_id: str, raw_json: str) -> list[dict]:
"""Build Discord message components (buttons)."""
return [
{
"type": 1, # Action Row
"components": [
{
"type": 2, # Button
"style": 4, # Danger (red)
"label": "Delete",
"custom_id": f"delete_msg:{post_id}",
},
{
"type": 2, # Button
"style": 2, # Secondary (gray)
"label": "View Raw",
"custom_id": f"view_raw:{post_id}:{raw_json[:80]}",
},
{
"type": 2, # Button
"style": 5, # Link
"label": "Original Post",
"url": post_url,
},
],
}
]
def split_content(content: str, max_length: int = MAX_CONTENT_LENGTH) -> list[str]:
"""Split content into chunks that fit Discord's character limit."""
if len(content) <= max_length:
return [content]
parts: list[str] = []
current = ""
# Try to split on newlines first
lines = content.split("\n")
for line in lines:
if len(current) + len(line) + 1 <= max_length:
current += line + "\n"
else:
if current:
parts.append(current.rstrip())
# If single line is too long, split it
if len(line) > max_length:
while line:
parts.append(line[:max_length])
line = line[max_length:]
current = ""
else:
current = line + "\n"
if current:
parts.append(current.rstrip())
# Add part indicators
if len(parts) > 1:
total = len(parts)
parts = [f"[Part {i + 1}/{total}]\n{part}" for i, part in enumerate(parts)]
return parts
def send_webhook_message(
message: DiscordMessage, webhook_url: Optional[str] = None
) -> Optional[dict]:
"""
Send a message via Discord webhook.
Returns the response JSON if successful, None otherwise.
"""
url = webhook_url or DISCORD_WEBHOOK
if not url:
print("Error: DISCORD_WEBHOOK not configured", file=sys.stderr)
return None
# Append ?wait=true to get the message ID back
if "?" in url:
url += "&wait=true"
else:
url += "?wait=true"
payload: dict[str, Any] = {}
if message.content:
payload["content"] = message.content
if message.embeds:
payload["embeds"] = message.embeds
if message.components:
payload["components"] = message.components
try:
if message.files:
# Multipart form data for file uploads
files_data: list[tuple[str, tuple[str, Any, str]]] = []
for i, f in enumerate(message.files[: MAX_ATTACHMENTS]):
with open(f.path, "rb") as file_handle:
files_data.append(
(f"files[{i}]", (f.filename, file_handle.read(), f.content_type))
)
response = requests.post(
url,
data={"payload_json": json.dumps(payload)},
files=files_data,
timeout=120,
)
else:
response = requests.post(
url, json=payload, timeout=30
)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
print(f"Error sending webhook message: {e}", file=sys.stderr)
return None
def send_post_to_discord(
content: str,
post_url: str,
post_id: str,
raw_json: str,
media_files: Optional[list[MediaFile]] = None,
) -> bool:
"""
Send a Truth Social post to Discord.
Handles splitting content and attachments as needed.
"""
# Split content if too long
content_parts = split_content(content)
# Split media into batches of 10
media_batches: list[list[MediaFile]] = []
if media_files:
for i in range(0, len(media_files), MAX_ATTACHMENTS):
media_batches.append(media_files[i : i + MAX_ATTACHMENTS])
# Determine number of messages needed
num_messages = max(len(content_parts), len(media_batches), 1)
success = True
for i in range(num_messages):
msg_content = content_parts[i] if i < len(content_parts) else ""
msg_files = media_batches[i] if i < len(media_batches) else None
# Only add components to the last message
components = None
if i == num_messages - 1:
components = build_components(post_url, post_id, raw_json)
message = DiscordMessage(
content=msg_content,
components=components,
files=msg_files,
)
result = send_webhook_message(message)
if not result:
success = False
return success
def delete_webhook_message(message_id: str, webhook_url: Optional[str] = None) -> bool:
"""Delete a message sent via webhook."""
url = webhook_url or DISCORD_WEBHOOK
if not url:
return False
delete_url = f"{url}/messages/{message_id}"
try:
response = requests.delete(delete_url, timeout=30)
response.raise_for_status()
return True
except requests.exceptions.RequestException as e:
print(f"Error deleting message: {e}", file=sys.stderr)
return False

257
src/media.py Normal file
View File

@@ -0,0 +1,257 @@
"""Media handling module for downloading and transcoding media files."""
import json
import os
import subprocess
import sys
import tempfile
from dataclasses import dataclass
from typing import Optional
import requests
MAX_FILE_SIZE_MB = 50
MAX_FILE_SIZE_BYTES = MAX_FILE_SIZE_MB * 1024 * 1024
@dataclass
class MediaFile:
"""Represents a processed media file."""
path: str
filename: str
content_type: str
size: int
def download_file(url: str, dest_path: str, timeout: int = 60) -> None:
"""Download a file from a URL to a destination path."""
response = requests.get(url, stream=True, timeout=timeout)
response.raise_for_status()
with open(dest_path, "wb") as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
def get_video_info(path: str) -> dict:
"""Get video information using ffprobe."""
cmd = [
"ffprobe",
"-v",
"quiet",
"-print_format",
"json",
"-show_format",
"-show_streams",
path,
]
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
return json.loads(result.stdout)
def get_video_duration(path: str) -> float:
"""Get video duration in seconds."""
info = get_video_info(path)
if "format" in info and "duration" in info["format"]:
return float(info["format"]["duration"])
return 0.0
def calculate_target_bitrate(duration: float, max_size_bytes: int) -> int:
"""Calculate target bitrate to fit within size limit."""
# Reserve 10% for audio and container overhead
available_bytes = max_size_bytes * 0.9
# Convert to bits and divide by duration
target_bitrate = int((available_bytes * 8) / duration)
# Cap at a reasonable maximum (8 Mbps)
return min(target_bitrate, 8_000_000)
def transcode_video(
input_path: str,
output_path: str,
target_bitrate: Optional[int] = None,
scale: Optional[str] = None,
) -> bool:
"""
Transcode video to H.264/AAC in MP4 container.
Args:
input_path: Path to input video
output_path: Path to output video
target_bitrate: Optional target video bitrate in bits/second
scale: Optional scale filter (e.g., "1280:720" or "-1:720")
Returns:
True if transcoding succeeded, False otherwise
"""
cmd = [
"ffmpeg",
"-y",
"-i",
input_path,
"-c:v",
"libx264",
"-preset",
"medium",
"-c:a",
"aac",
"-b:a",
"128k",
"-movflags",
"+faststart",
]
if target_bitrate:
cmd.extend(["-b:v", str(target_bitrate)])
if scale:
cmd.extend(["-vf", f"scale={scale}"])
cmd.append(output_path)
try:
subprocess.run(
cmd, capture_output=True, check=True, timeout=600 # 10 minute timeout
)
return True
except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e:
print(f"Transcode error: {e}", file=sys.stderr)
return False
def process_video(url: str, temp_dir: str) -> Optional[MediaFile]:
"""
Download and process a video file for Discord.
Ensures the output is H.264/AAC MP4 under 50MB.
"""
# Download original
original_path = os.path.join(temp_dir, "original_video")
download_file(url, original_path)
output_path = os.path.join(temp_dir, "processed.mp4")
# First attempt: simple transcode
if transcode_video(original_path, output_path):
size = os.path.getsize(output_path)
if size <= MAX_FILE_SIZE_BYTES:
return MediaFile(
path=output_path,
filename="video.mp4",
content_type="video/mp4",
size=size,
)
# Second attempt: calculate bitrate to fit in size limit
duration = get_video_duration(original_path)
if duration > 0:
target_bitrate = calculate_target_bitrate(duration, MAX_FILE_SIZE_BYTES)
if transcode_video(original_path, output_path, target_bitrate=target_bitrate):
size = os.path.getsize(output_path)
if size <= MAX_FILE_SIZE_BYTES:
return MediaFile(
path=output_path,
filename="video.mp4",
content_type="video/mp4",
size=size,
)
# Third attempt: downscale to 720p and recalculate bitrate
if duration > 0:
target_bitrate = calculate_target_bitrate(duration, MAX_FILE_SIZE_BYTES)
if transcode_video(
original_path, output_path, target_bitrate=target_bitrate, scale="-1:720"
):
size = os.path.getsize(output_path)
if size <= MAX_FILE_SIZE_BYTES:
return MediaFile(
path=output_path,
filename="video.mp4",
content_type="video/mp4",
size=size,
)
# Fourth attempt: downscale to 480p
if duration > 0:
target_bitrate = calculate_target_bitrate(duration, MAX_FILE_SIZE_BYTES)
if transcode_video(
original_path, output_path, target_bitrate=target_bitrate, scale="-1:480"
):
size = os.path.getsize(output_path)
if size <= MAX_FILE_SIZE_BYTES:
return MediaFile(
path=output_path,
filename="video.mp4",
content_type="video/mp4",
size=size,
)
print(f"Failed to process video to under {MAX_FILE_SIZE_MB}MB", file=sys.stderr)
return None
def process_image(url: str, temp_dir: str, index: int = 0) -> Optional[MediaFile]:
"""Download an image file."""
# Determine extension from URL
ext = ".jpg"
if ".png" in url.lower():
ext = ".png"
elif ".gif" in url.lower():
ext = ".gif"
elif ".webp" in url.lower():
ext = ".webp"
filename = f"image_{index}{ext}"
path = os.path.join(temp_dir, filename)
try:
download_file(url, path)
size = os.path.getsize(path)
content_type = "image/jpeg"
if ext == ".png":
content_type = "image/png"
elif ext == ".gif":
content_type = "image/gif"
elif ext == ".webp":
content_type = "image/webp"
return MediaFile(
path=path, filename=filename, content_type=content_type, size=size
)
except Exception as e:
print(f"Failed to download image: {e}", file=sys.stderr)
return None
def process_media(media_attachments: list[dict], temp_dir: str) -> list[MediaFile]:
"""
Process a list of media attachments.
Args:
media_attachments: List of media dicts with 'type' and 'url' keys
temp_dir: Temporary directory for file processing
Returns:
List of processed MediaFile objects
"""
processed: list[MediaFile] = []
for i, media in enumerate(media_attachments):
media_type = media.get("type", "")
url = media.get("url", "") or media.get("preview_url", "")
if not url:
continue
if media_type == "video" or "video" in media_type:
result = process_video(url, temp_dir)
if result:
processed.append(result)
elif media_type in ("image", "gifv") or "image" in media_type:
result = process_image(url, temp_dir, i)
if result:
processed.append(result)
return processed

184
src/poller.py Normal file
View File

@@ -0,0 +1,184 @@
"""Poller module for monitoring Truth Social posts."""
import json
import os
import sys
import tempfile
import time
from typing import Optional
from truthbrush import Api
from .database import init_db, is_post_seen, mark_post_seen
from .discord import send_post_to_discord
from .media import process_media
POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", "300"))
TRUTH_USER = os.environ.get("TRUTH_USER", "realDonaldTrump")
def get_post_url(username: str, post_id: str) -> str:
"""Generate the Truth Social URL for a post."""
return f"https://truthsocial.com/@{username}/posts/{post_id}"
def extract_post_content(post: dict) -> str:
"""Extract the text content from a post."""
content = post.get("content", "")
# Strip HTML tags (basic)
import re
content = re.sub(r"<br\s*/?>", "\n", content)
content = re.sub(r"<[^>]+>", "", content)
# Decode HTML entities
import html
content = html.unescape(content)
return content.strip()
def process_single_post(post: dict, username: str) -> bool:
"""
Process a single Truth Social post and send to Discord.
Returns True if successful, False otherwise.
"""
post_id = str(post.get("id", ""))
if not post_id:
return False
content = extract_post_content(post)
post_url = get_post_url(username, post_id)
raw_json = json.dumps(post, indent=2, default=str)
# Process media attachments
media_files = []
media_attachments = post.get("media_attachments", [])
if media_attachments:
with tempfile.TemporaryDirectory() as temp_dir:
media_files = process_media(media_attachments, temp_dir)
return send_post_to_discord(
content=content,
post_url=post_url,
post_id=post_id,
raw_json=raw_json,
media_files=media_files,
)
else:
return send_post_to_discord(
content=content,
post_url=post_url,
post_id=post_id,
raw_json=raw_json,
)
def fetch_and_relay_post(post_url: str) -> bool:
"""
Fetch a specific post by URL and relay it to Discord.
Used for one-off manual invocation.
"""
# Parse the URL to extract username and post ID
import re
match = re.search(r"truthsocial\.com/@([^/]+)/posts/(\d+)", post_url)
if not match:
print(f"Invalid Truth Social URL: {post_url}", file=sys.stderr)
return False
username = match.group(1)
post_id = match.group(2)
try:
api = Api()
# Fetch the specific post
post = api.pull_status(post_id)
if post:
return process_single_post(post, username)
else:
print(f"Post not found: {post_id}", file=sys.stderr)
return False
except Exception as e:
print(f"Error fetching post: {e}", file=sys.stderr)
return False
def poll_loop() -> None:
"""
Main polling loop that continuously checks for new posts.
Never crashes on network errors - logs and retries.
"""
init_db()
print(f"Starting poller for @{TRUTH_USER} (interval: {POLL_INTERVAL}s)")
api = Api()
while True:
try:
print(f"Checking for new posts from @{TRUTH_USER}...")
# Fetch recent statuses
statuses = list(api.pull_statuses(TRUTH_USER, replies=False))
# Process in chronological order (oldest first)
for post in reversed(statuses):
post_id = str(post.get("id", ""))
if not post_id:
continue
if is_post_seen(post_id):
continue
print(f"New post found: {post_id}")
try:
if process_single_post(post, TRUTH_USER):
mark_post_seen(post_id)
print(f"Successfully relayed post: {post_id}")
else:
print(f"Failed to relay post: {post_id}", file=sys.stderr)
# Still mark as seen to avoid infinite retries
mark_post_seen(post_id)
except Exception as e:
print(f"Error processing post {post_id}: {e}", file=sys.stderr)
# Mark as seen to avoid infinite retries
mark_post_seen(post_id)
except Exception as e:
print(f"Error in poll loop: {e}", file=sys.stderr)
print(f"Sleeping for {POLL_INTERVAL} seconds...")
time.sleep(POLL_INTERVAL)
def initial_seed() -> None:
"""
Seed the database with existing posts without relaying them.
Useful for initial setup to avoid flooding Discord.
"""
init_db()
print(f"Seeding database with existing posts from @{TRUTH_USER}...")
api = Api()
try:
statuses = list(api.pull_statuses(TRUTH_USER, replies=False))
count = 0
for post in statuses:
post_id = str(post.get("id", ""))
if post_id and not is_post_seen(post_id):
mark_post_seen(post_id)
count += 1
print(f"Seeded {count} existing posts")
except Exception as e:
print(f"Error seeding database: {e}", file=sys.stderr)

208
src/server.py Normal file
View File

@@ -0,0 +1,208 @@
"""FastAPI server for handling Discord interaction webhooks."""
import json
import os
import sys
from typing import Any
from fastapi import FastAPI, HTTPException, Request, Response
from nacl.exceptions import BadSignature
from nacl.signing import VerifyKey
DISCORD_PUBLIC_KEY = os.environ.get("DISCORD_PUBLIC_KEY", "")
# Discord permission flags
MANAGE_MESSAGES = 1 << 13
app = FastAPI(title="Pravda Interaction Handler")
def verify_signature(
public_key: str, signature: str, timestamp: str, body: bytes
) -> bool:
"""Verify Discord interaction signature using Ed25519."""
try:
verify_key = VerifyKey(bytes.fromhex(public_key))
verify_key.verify(timestamp.encode() + body, bytes.fromhex(signature))
return True
except (BadSignature, ValueError) as e:
print(f"Signature verification failed: {e}", file=sys.stderr)
return False
def has_manage_messages_permission(member: dict) -> bool:
"""Check if a member has MANAGE_MESSAGES permission."""
permissions = int(member.get("permissions", "0"))
return (permissions & MANAGE_MESSAGES) == MANAGE_MESSAGES
def create_response(
response_type: int, data: dict[str, Any] | None = None
) -> dict[str, Any]:
"""Create a Discord interaction response."""
resp: dict[str, Any] = {"type": response_type}
if data:
resp["data"] = data
return resp
@app.post("/interactions")
async def handle_interaction(request: Request) -> Response:
"""Handle Discord interaction webhooks."""
if not DISCORD_PUBLIC_KEY:
raise HTTPException(status_code=500, detail="Public key not configured")
# Get headers for signature verification
signature = request.headers.get("X-Signature-Ed25519", "")
timestamp = request.headers.get("X-Signature-Timestamp", "")
if not signature or not timestamp:
raise HTTPException(status_code=401, detail="Missing signature headers")
# Read body
body = await request.body()
# Verify signature
if not verify_signature(DISCORD_PUBLIC_KEY, signature, timestamp, body):
raise HTTPException(status_code=401, detail="Invalid signature")
# Parse interaction
try:
interaction = json.loads(body)
except json.JSONDecodeError:
raise HTTPException(status_code=400, detail="Invalid JSON")
interaction_type = interaction.get("type")
# Type 1: PING - respond with PONG
if interaction_type == 1:
return Response(
content=json.dumps({"type": 1}), media_type="application/json"
)
# Type 3: MESSAGE_COMPONENT (button clicks)
if interaction_type == 3:
return await handle_component_interaction(interaction)
# Unknown interaction type
return Response(
content=json.dumps(create_response(4, {"content": "Unknown interaction"})),
media_type="application/json",
)
async def handle_component_interaction(interaction: dict) -> Response:
"""Handle button click interactions."""
data = interaction.get("data", {})
custom_id = data.get("custom_id", "")
member = interaction.get("member", {})
message = interaction.get("message", {})
# Parse custom_id to determine action
if custom_id.startswith("delete_msg:"):
return await handle_delete(custom_id, member, message, interaction)
elif custom_id.startswith("view_raw:"):
return await handle_view_raw(custom_id, interaction)
else:
return Response(
content=json.dumps(
create_response(4, {"content": "Unknown action", "flags": 64})
),
media_type="application/json",
)
async def handle_delete(
custom_id: str, member: dict, message: dict, interaction: dict
) -> Response:
"""Handle delete button click."""
# Check permissions
if not has_manage_messages_permission(member):
return Response(
content=json.dumps(
create_response(
4,
{
"content": "You don't have permission to delete this message.",
"flags": 64, # Ephemeral
},
)
),
media_type="application/json",
)
# Delete the message by responding with type 6 (DEFERRED_UPDATE_MESSAGE)
# Then we need to actually delete via API
# For webhook messages, we respond with an acknowledgment and handle deletion
# Type 6 = DEFERRED_UPDATE_MESSAGE (acknowledges without sending a message)
# We'll use a different approach: respond with UPDATE_MESSAGE that clears content
# Actually, the cleanest way is to use the webhook token to delete
webhook_token = interaction.get("token", "")
message_id = message.get("id", "")
if webhook_token and message_id:
import httpx
app_id = os.environ.get("DISCORD_APPLICATION_ID", "")
# For webhook messages, we can delete using the interaction token
# But the message was sent via webhook, not the interaction
# We need to respond and indicate deletion request
# The proper way: respond with type 7 (UPDATE_MESSAGE) to update/clear
# Or use follow-up to acknowledge and then delete via webhook
# Respond with ephemeral confirmation
return Response(
content=json.dumps(
create_response(
4,
{
"content": "Message deletion requested. Use the webhook URL to delete.",
"flags": 64,
},
)
),
media_type="application/json",
)
return Response(
content=json.dumps(
create_response(4, {"content": "Could not process delete request.", "flags": 64})
),
media_type="application/json",
)
async def handle_view_raw(custom_id: str, interaction: dict) -> Response:
"""Handle view raw button click."""
# Extract post_id from custom_id
parts = custom_id.split(":", 2)
if len(parts) < 2:
return Response(
content=json.dumps(
create_response(4, {"content": "Invalid request", "flags": 64})
),
media_type="application/json",
)
post_id = parts[1] if len(parts) >= 2 else "unknown"
# The raw JSON was truncated in custom_id, so we provide a note
raw_snippet = parts[2] if len(parts) >= 3 else "(truncated)"
# Respond with ephemeral message containing raw data
content = f"**Post ID:** `{post_id}`\n\n**Raw snippet:** ```json\n{raw_snippet}...\n```\n\n(Full raw data was too large for button storage)"
return Response(
content=json.dumps(create_response(4, {"content": content, "flags": 64})),
media_type="application/json",
)
@app.get("/health")
async def health_check() -> dict[str, str]:
"""Health check endpoint."""
return {"status": "healthy"}

1
tests/__init__.py Normal file
View File

@@ -0,0 +1 @@
# Pravda tests

41
tests/test_database.py Normal file
View File

@@ -0,0 +1,41 @@
"""Tests for the database module."""
import os
import tempfile
import pytest
def test_database_operations() -> None:
"""Test basic database operations."""
# Use a temporary database for testing
with tempfile.TemporaryDirectory() as tmpdir:
db_path = os.path.join(tmpdir, "test.db")
os.environ["DB_PATH"] = db_path
# Import after setting env var
from src.database import init_db, is_post_seen, mark_post_seen, get_seen_count
init_db()
# Initially, no posts are seen
assert not is_post_seen("123")
assert get_seen_count() == 0
# Mark a post as seen
mark_post_seen("123")
assert is_post_seen("123")
assert get_seen_count() == 1
# Mark the same post again (should be idempotent)
mark_post_seen("123")
assert is_post_seen("123")
assert get_seen_count() == 1
# Mark another post
mark_post_seen("456")
assert is_post_seen("456")
assert get_seen_count() == 2
# Original post still seen
assert is_post_seen("123")

60
tests/test_discord.py Normal file
View File

@@ -0,0 +1,60 @@
"""Tests for the Discord module."""
from src.discord import split_content, build_components
def test_split_content_short() -> None:
"""Test that short content is not split."""
content = "Hello, world!"
result = split_content(content)
assert len(result) == 1
assert result[0] == content
def test_split_content_long() -> None:
"""Test that long content is split correctly."""
content = "x" * 2500
result = split_content(content, max_length=2000)
assert len(result) == 2
assert "[Part 1/2]" in result[0]
assert "[Part 2/2]" in result[1]
def test_split_content_preserves_newlines() -> None:
"""Test that content is split on newlines when possible."""
content = "\n".join(["Line " + str(i) for i in range(100)])
result = split_content(content, max_length=500)
assert len(result) > 1
# Each part should have complete lines
for part in result:
assert "Line" in part
def test_build_components() -> None:
"""Test that components are built correctly."""
components = build_components(
post_url="https://truthsocial.com/@user/posts/123",
post_id="123",
raw_json='{"id": "123"}',
)
assert len(components) == 1
assert components[0]["type"] == 1 # Action Row
buttons = components[0]["components"]
assert len(buttons) == 3
# Delete button
assert buttons[0]["type"] == 2
assert buttons[0]["style"] == 4 # Danger
assert buttons[0]["label"] == "Delete"
# View Raw button
assert buttons[1]["type"] == 2
assert buttons[1]["style"] == 2 # Secondary
assert buttons[1]["label"] == "View Raw"
# Original Post button (link)
assert buttons[2]["type"] == 2
assert buttons[2]["style"] == 5 # Link
assert buttons[2]["url"] == "https://truthsocial.com/@user/posts/123"