#!/usr/bin/env python3 """ Trac → Forgejo issue migrator (tickets, comments, labels, milestones, attachments-as-links) ✅ Features - Migrates Trac tickets (title, body, metadata) to Forgejo issues - Preserves reporter/owner via user mapping - Converts common Trac wiki markup → Markdown (lightweight, optional) - Replays comments with original author & timestamp banners - Recreates labels (component, type, priority, status/resolution) if missing - Recreates milestones (by name) if missing and assigns to issues - Closes issues that were closed in Trac and adds a resolution label - Adds links for attachments (keeps them downloadable from your Trac site) - Idempotent: keeps a trac_id ↔ forgejo_issue index mapping file - Dry‑run mode ⚙️ Requirements - Python 3.8+ - pip install requests pyyaml 🧰 Usage 1) Create a config YAML (example below) as config.yaml 2) Run: python3 trac_to_forgejo_migrator.py --config config.yaml --dry-run # preview python3 trac_to_forgejo_migrator.py --config config.yaml # migrate Example config.yaml ------------------- forgejo: base_url: "https://forgejo.example.org" # no trailing slash owner: "your-org" repo: "your-repo" token: "" # needs repo:write, admin:repo_hook for labels/milestones rate_limit_sleep_sec: 0.2 # politeness pause between API calls trac: # SQLite DB path of your Trac instance (read-only). For MySQL/PostgreSQL, see notes below. sqlite_path: "/var/lib/trac/project/db/trac.db" # Base URL of your Trac site (used to link back to original ticket & attachments) base_url: "https://trac.example.org" migrate: # restrict to a subset (omit or set to [] for all) include_ticket_ids: [] exclude_ticket_ids: [] # when True, prefix titles with "[Trac #ID]" for easy cross-reference prefix_trac_id_in_title: true # If a Forgejo issue already exists for a Trac id (mapping file), it will be skipped dry_run: false mapping_files: # JSON file created/updated by this script to remember Trac→Forgejo mapping id_map_path: "./trac_forgejo_id_map.json" users: # Map Trac usernames → Forgejo usernames (for assignees); reporter shown in body regardless # Unmapped users are left as plain text. "michael": "michael-niedermayer" "alice": "alice" labels: # Optional color overrides (6-hex without #). Missing labels get a stable hash color. colors: "component/avcodec": "0366d6" "resolution/fixed": "2ea043" markdown: # If you have pandoc installed and want better conversion, set to path like "/usr/bin/pandoc" pandoc_path: null # Or use the built-in lightweight converter (recommended default) use_lightweight_converter: true notes: # For MySQL/PostgreSQL backends, you can export SQLite first (or extend the code to use PyMySQL/psycopg2) # Attachments: this script links to Trac's attachment URLs; to *upload* into Forgejo later, see TODOs in code. """ from __future__ import annotations import argparse import dataclasses import datetime as dt import hashlib import json import os import re import sqlite3 import subprocess import sys import time from typing import Any, Dict, Iterable, List, Optional, Tuple import requests try: import yaml # type: ignore except Exception: # pragma: no cover print("ERROR: pyyaml is required. pip install pyyaml", file=sys.stderr) sys.exit(2) ISO = "%Y-%m-%d %H:%M:%S%z" # ---------------------------- Utilities ---------------------------- def ts_from_trac(microseconds_since_epoch: int) -> dt.datetime: """Trac stores timestamps as microseconds since epoch.""" # Trac uses microseconds epoch (int). Interpret as UTC. sec = microseconds_since_epoch / 1_000_000.0 return dt.datetime.fromtimestamp(sec, tz=dt.timezone.utc) def banner(author: str, when: dt.datetime) -> str: return f"_Originally posted by **{author or 'unknown'}** on {when.strftime('%Y-%m-%d %H:%M:%S %Z')}_\n\n" def stable_color(name: str) -> str: """Derive a 6-hex color from a name (no leading '#').""" h = hashlib.sha1(name.encode("utf-8")).hexdigest() return h[:6] # ----------------------- Lightweight Trac→MD ----------------------- TRIPLE_SINGLE = re.compile(r"'''(.*?)'''", re.DOTALL) DOUBLE_SINGLE = re.compile(r"''(.*?)''", re.DOTALL) HEADING = re.compile(r"^(=+)[ \t]*(.*?)[ \t]*=+$", re.MULTILINE) WIKI_LINK = re.compile(r"\[(?:wiki:)?([^\] ]+)(?:\s+([^\]]+))?\]") HTTP_LINK = re.compile(r"\[(https?://[^\] ]+)(?:\s+([^\]]+))?\]") BR = re.compile(r"\[\[BR\]\]") def trac_to_markdown(text: str) -> str: """Very small subset of Trac wiki → Markdown conversions. Keeps unknown markup as-is to avoid data loss. """ if not text: return "" out = text # line breaks out = BR.sub(" \n", out) # bold/italic out = TRIPLE_SINGLE.sub(r"**\1**", out) out = DOUBLE_SINGLE.sub(r"*\1*", out) # headings: '== Title ==' → '## Title' def _h(m: re.Match) -> str: level = len(m.group(1)) level = min(max(level, 1), 6) return f"{'#' * level} {m.group(2).strip()}" out = HEADING.sub(_h, out) # external links [http://url label] out = HTTP_LINK.sub(lambda m: f"[{m.group(2) or m.group(1)}]({m.group(1)})", out) # wiki-ish links [wiki:Page Label] or [Page Label] → [Label](Page) def _wiki(m: re.Match) -> str: target = m.group(1) label = m.group(2) or target # naive: leave as code-ish if it looks like CamelCase wiki page return f"[{label}]({target})" out = WIKI_LINK.sub(_wiki, out) return out def maybe_pandoc(text: str, pandoc_path: Optional[str]) -> str: if not text: return "" if not pandoc_path: return text try: p = subprocess.run( [pandoc_path, "-f", "trac", "-t", "gfm"], input=text.encode("utf-8"), stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True, ) return p.stdout.decode("utf-8") except Exception as e: # fall back silently return text # ----------------------------- Trac DB ----------------------------- @dataclasses.dataclass class TracTicket: id: int type: str time: dt.datetime changetime: dt.datetime component: Optional[str] severity: Optional[str] priority: Optional[str] owner: Optional[str] reporter: Optional[str] cc: Optional[str] version: Optional[str] milestone: Optional[str] status: str resolution: Optional[str] summary: str description: str @dataclasses.dataclass class TracChange: ticket: int time: dt.datetime author: Optional[str] field: str oldvalue: Optional[str] newvalue: Optional[str] @dataclasses.dataclass class TracAttachment: ticket: int filename: str author: Optional[str] time: dt.datetime description: Optional[str] size: Optional[int] class Trac: def __init__(self, sqlite_path: str): self.sqlite_path = sqlite_path self.conn = sqlite3.connect(sqlite_path) self.conn.row_factory = sqlite3.Row def fetch_ticket_ids(self) -> List[int]: cur = self.conn.execute("SELECT id FROM ticket ORDER BY id ASC") return [row[0] for row in cur.fetchall()] def fetch_ticket(self, tid: int) -> TracTicket: row = self.conn.execute( "SELECT id, type, time, changetime, component, severity, priority, owner, reporter, cc, version, milestone, status, resolution, summary, description FROM ticket WHERE id=?", (tid,), ).fetchone() if not row: raise KeyError(f"Ticket {tid} not found") return TracTicket( id=row["id"], type=row["type"] or "", time=ts_from_trac(row["time"]), changetime=ts_from_trac(row["changetime"]), component=row["component"], severity=row["severity"], priority=row["priority"], owner=row["owner"], reporter=row["reporter"], cc=row["cc"], version=row["version"], milestone=row["milestone"], status=row["status"] or "new", resolution=row["resolution"], summary=row["summary"] or "", description=row["description"] or "", ) def fetch_changes(self, tid: int) -> List[TracChange]: cur = self.conn.execute( "SELECT ticket, time, author, field, oldvalue, newvalue FROM ticket_change WHERE ticket=? ORDER BY time ASC, rowid ASC", (tid,), ) out: List[TracChange] = [] for r in cur.fetchall(): out.append( TracChange( ticket=r["ticket"], time=ts_from_trac(r["time"]), author=r["author"], field=r["field"], oldvalue=r["oldvalue"], newvalue=r["newvalue"], ) ) return out def fetch_attachments(self, tid: int) -> List[TracAttachment]: # Trac 'attachment' table schema: type, id, filename, size, time, description, author, ipnr cur = self.conn.execute( "SELECT filename, size, time, description, author FROM attachment WHERE type='ticket' AND id=? ORDER BY time ASC", (str(tid),), ) out: List[TracAttachment] = [] for r in cur.fetchall(): out.append( TracAttachment( ticket=tid, filename=r["filename"], author=r["author"], time=ts_from_trac(r["time"]), description=r["description"], size=r["size"], ) ) return out # --------------------------- Forgejo Client --------------------------- class Forgejo: def __init__(self, base_url: str, owner: str, repo: str, token: str, rate_sleep: float = 0.0): self.base = base_url.rstrip('/') self.owner = owner self.repo = repo self.session = requests.Session() self.session.headers.update({ 'Authorization': f'token {token}', 'Accept': 'application/json', }) self.rate_sleep = rate_sleep # ---- helpers ---- def _url(self, path: str) -> str: return f"{self.base}/api/v1/repos/{self.owner}/{self.repo}{path}" def _req(self, method: str, url: str, **kw) -> requests.Response: r = self.session.request(method, url, timeout=60, **kw) if self.rate_sleep: time.sleep(self.rate_sleep) if r.status_code >= 400: raise RuntimeError(f"HTTP {r.status_code} for {url}: {r.text[:400]}") return r # ---- labels ---- def list_labels(self) -> Dict[str, Dict[str, Any]]: url = self._url("/labels") r = self._req('GET', url) items = r.json() return {it['name']: it for it in items} def ensure_label(self, name: str, color: Optional[str] = None, description: str = "") -> int: existing = self.list_labels() if name in existing: return existing[name]['id'] url = self._url("/labels") payload = { 'name': name, 'color': f"#{(color or stable_color(name)).lstrip('#')}", 'description': description, } r = self._req('POST', url, json=payload) return r.json()['id'] # ---- milestones ---- def list_milestones(self) -> Dict[str, Dict[str, Any]]: url = self._url("/milestones") r = self._req('GET', url) items = r.json() return {it['title']: it for it in items} def ensure_milestone(self, title: str) -> Optional[int]: if not title: return None existing = self.list_milestones() if title in existing: return existing[title]['id'] url = self._url("/milestones") r = self._req('POST', url, json={'title': title}) return r.json()['id'] # ---- issues ---- def create_issue(self, title: str, body: str, labels: List[str], assignees: List[str], milestone_id: Optional[int]) -> Dict[str, Any]: # Ensure labels exist and collect ids label_ids: List[int] = [] for ln in labels: if not ln: continue lid = self.ensure_label(ln) label_ids.append(lid) payload: Dict[str, Any] = { 'title': title, 'body': body, 'assignees': assignees or [], 'labels': label_ids, } if milestone_id: payload['milestone'] = milestone_id url = self._url('/issues') r = self._req('POST', url, json=payload) return r.json() def close_issue(self, index: int) -> None: url = self._url(f'/issues/{index}') self._req('PATCH', url, json={'state': 'closed'}) def comment(self, index: int, body: str) -> Dict[str, Any]: url = self._url(f'/issues/{index}/comments') r = self._req('POST', url, json={'body': body}) return r.json() # ----------------------------- Migration ----------------------------- @dataclasses.dataclass class Config: forgejo_base: str forgejo_owner: str forgejo_repo: str forgejo_token: str rate_sleep: float trac_sqlite: str trac_base_url: str include_ids: List[int] exclude_ids: List[int] prefix_trac_id_in_title: bool id_map_path: str user_map: Dict[str, str] label_colors: Dict[str, str] pandoc_path: Optional[str] use_light_conv: bool dry_run: bool @staticmethod def from_yaml(d: Dict[str, Any]) -> 'Config': forgejo = d.get('forgejo', {}) trac = d.get('trac', {}) migrate = d.get('migrate', {}) mapping_files = d.get('mapping_files', {}) users = d.get('users', {}) labels = d.get('labels', {}) markdown = d.get('markdown', {}) return Config( forgejo_base=forgejo['base_url'], forgejo_owner=forgejo['owner'], forgejo_repo=forgejo['repo'], forgejo_token=forgejo['token'], rate_sleep=float(forgejo.get('rate_limit_sleep_sec', 0) or 0), trac_sqlite=trac['sqlite_path'], trac_base_url=trac['base_url'].rstrip('/'), include_ids=list(migrate.get('include_ticket_ids') or []), exclude_ids=list(migrate.get('exclude_ticket_ids') or []), prefix_trac_id_in_title=bool(migrate.get('prefix_trac_id_in_title', True)), id_map_path=mapping_files.get('id_map_path', './trac_forgejo_id_map.json'), user_map={str(k): str(v) for k, v in (users or {}).items()}, label_colors=labels.get('colors', {}), pandoc_path=markdown.get('pandoc_path'), use_light_conv=bool(markdown.get('use_lightweight_converter', True)), dry_run=bool(migrate.get('dry_run', False)), ) class Migrator: def __init__(self, cfg: Config): self.cfg = cfg self.trac = Trac(cfg.trac_sqlite) self.forge = Forgejo(cfg.forgejo_base, cfg.forgejo_owner, cfg.forgejo_repo, cfg.forgejo_token, rate_sleep=cfg.rate_sleep) self.id_map = self._load_id_map(cfg.id_map_path) # ---------- mapping persistence ---------- def _load_id_map(self, path: str) -> Dict[str, Any]: if os.path.exists(path): with open(path, 'r', encoding='utf-8') as f: return json.load(f) return {} def _save_id_map(self) -> None: tmp = self.cfg.id_map_path + ".tmp" with open(tmp, 'w', encoding='utf-8') as f: json.dump(self.id_map, f, indent=2, ensure_ascii=False) os.replace(tmp, self.cfg.id_map_path) def _map_set(self, trac_id: int, issue_index: int) -> None: self.id_map[str(trac_id)] = {'issue_index': issue_index} self._save_id_map() def _map_get(self, trac_id: int) -> Optional[int]: entry = self.id_map.get(str(trac_id)) if entry: return int(entry['issue_index']) return None # ---------- conversion helpers ---------- def _convert_body(self, t: TracTicket) -> str: header = [ f"**Trac ticket #{t.id}**", f"Reporter: `{t.reporter or ''}`", f"Owner: `{t.owner or ''}`", f"Created: {t.time.strftime('%Y-%m-%d %H:%M:%S %Z')}", f"Last change: {t.changetime.strftime('%Y-%m-%d %H:%M:%S %Z')}", f"Status: {t.status}" + (f" ({t.resolution})" if t.resolution else ""), f"Original: {self.cfg.trac_base_url}/ticket/{t.id}", "", "---", "", ] body_raw = t.description or "" if self.cfg.pandoc_path: body_md = maybe_pandoc(body_raw, self.cfg.pandoc_path) elif self.cfg.use_light_conv: body_md = trac_to_markdown(body_raw) else: body_md = body_raw return "\n".join(header) + body_md def _labels_for(self, t: TracTicket) -> List[str]: labs: List[str] = [] if t.component: labs.append(f"component/{t.component}") if t.type: labs.append(f"type/{t.type}") if t.priority: labs.append(f"priority/{t.priority}") if t.severity: labs.append(f"severity/{t.severity}") if t.version: labs.append(f"version/{t.version}") for kw in (t.keywords or '').replace(',', ' ').split(): labs.append(f"keyword/{kw}") # status/resolution as labels (state handled separately) labs.append(f"status/{t.status}") if t.resolution: labs.append(f"resolution/{t.resolution}") return labs def _ensure_labels(self, labels: Iterable[str]) -> None: for ln in labels: if not ln: continue color = self.cfg.label_colors.get(ln) desc = "Imported from Trac" if self.cfg.dry_run: print(f"DRY-RUN: ensure label {ln} ({color or stable_color(ln)})") else: self.forge.ensure_label(ln, color=color, description=desc) def _ensure_milestone(self, t: TracTicket) -> Optional[int]: if not t.milestone: return None if self.cfg.dry_run: print(f"DRY-RUN: ensure milestone {t.milestone}") return None return self.forge.ensure_milestone(t.milestone) def _assignees(self, t: TracTicket) -> List[str]: owner = (t.owner or '').strip() if not owner: return [] mapped = self.cfg.user_map.get(owner) return [mapped] if mapped else [] # ---------- comments & attachments ---------- def _emit_comments(self, idx: int, t: TracTicket, changes: List[TracChange]) -> None: # Only 'comment' field entries produce issue comments; other changes are summarized. # We'll also emit non-empty status/owner changes to preserve history. for ch in changes: if ch.field == 'comment' and (ch.newvalue or '').strip(): text = ch.newvalue or '' if self.cfg.pandoc_path: text_md = maybe_pandoc(text, self.cfg.pandoc_path) elif self.cfg.use_light_conv: text_md = trac_to_markdown(text) else: text_md = text body = banner(ch.author or 'unknown', ch.time) + text_md if self.cfg.dry_run: print(f"DRY-RUN: add comment to #{idx}: {body[:80]!r}...") else: self.forge.comment(idx, body) elif ch.field in {"status", "owner", "milestone", "priority", "component", "resolution"}: # summarize as small audit trail comment oldv = ch.oldvalue or "" newv = ch.newvalue or "" if oldv == newv: continue body = banner(ch.author or 'unknown', ch.time) + f"**Change**: `{ch.field}` → `{newv}` (was `{oldv}`)" if self.cfg.dry_run: print(f"DRY-RUN: add audit comment to #{idx}: {body[:80]!r}...") else: self.forge.comment(idx, body) def _emit_attachments(self, idx: int, t: TracTicket, atts: List[TracAttachment]) -> None: if not atts: return for a in atts: url = f"{self.cfg.trac_base_url}/attachment/ticket/{t.id}/{a.filename}" meta = f"Attachment: **{a.filename}** ({a.size or '?'} bytes) — {url}" body = banner(a.author or 'unknown', a.time) + meta if self.cfg.dry_run: print(f"DRY-RUN: add attachment note to #{idx}: {body}") else: self.forge.comment(idx, body) # ---------- main unit of work ---------- def migrate_one(self, tid: int) -> None: if self.cfg.exclude_ids and tid in self.cfg.exclude_ids: print(f"Skip ticket {tid} (excluded)") return mapped = self._map_get(tid) if mapped: print(f"Ticket {tid} already migrated as issue #{mapped}") return t = self.trac.fetch_ticket(tid) labels = self._labels_for(t) self._ensure_labels(labels) ms_id = self._ensure_milestone(t) title = t.summary if self.cfg.prefix_trac_id_in_title: title = f"[Trac #{t.id}] {title}" body = self._convert_body(t) assignees = self._assignees(t) if self.cfg.dry_run: print(f"DRY-RUN: would create issue for Trac {tid}: title={title!r}, labels={labels}, assignees={assignees}, milestone={ms_id}") idx = -1 else: issue = self.forge.create_issue(title, body, labels, assignees, ms_id) idx = int(issue['number']) if 'number' in issue else int(issue['index']) self._map_set(tid, idx) print(f"Created Forgejo issue #{idx} for Trac {tid}") # comments & attachments changes = self.trac.fetch_changes(tid) atts = self.trac.fetch_attachments(tid) if self.cfg.dry_run: print(f"DRY-RUN: would add {sum(1 for c in changes if c.field=='comment' and (c.newvalue or '').strip())} comments and {len(atts)} attachments to issue #{idx}") else: self._emit_comments(idx, t, changes) self._emit_attachments(idx, t, atts) # close if needed if t.status.lower() == 'closed': if self.cfg.dry_run: print(f"DRY-RUN: would close issue #{idx}") else: self.forge.close_issue(idx) print(f"Closed issue #{idx} (resolution={t.resolution})") def run(self) -> None: if self.cfg.include_ids: ids = self.cfg.include_ids else: ids = self.trac.fetch_ticket_ids() for tid in ids: try: self.migrate_one(tid) except Exception as e: print(f"ERROR migrating ticket {tid}: {e}", file=sys.stderr) # ----------------------------- CLI entry ----------------------------- def load_config(path: str) -> Config: with open(path, 'r', encoding='utf-8') as f: d = yaml.safe_load(f) return Config.from_yaml(d) def main() -> None: ap = argparse.ArgumentParser(description="Migrate Trac tickets to Forgejo issues") ap.add_argument('--config', required=True, help='Path to YAML config') ap.add_argument('--dry-run', action='store_true', help='Force dry-run (overrides config)') args = ap.parse_args() cfg = load_config(args.config) if args.dry_run: cfg.dry_run = True mig = Migrator(cfg) mig.run() if __name__ == '__main__': main()