Skip Navigation

...

The instance where I was using it changed it's rules to prevent bots from posting in it and I didn't care enough to search for another instance.

https://lemm.ee/c/issue_tracker?dataType=Post&page=1&sort=Active

`config_template.py

LEMMY_INSTANCE_URL = ""
LEMMY_COMMUNITY_NAME = ""
LEMMY_USERNAME = ""
LEMMY_PASSWORD = ""
GITHUB_API_BASE = "https://api.github.com"
GITHUB_URL_BASE = "https://github.com"
REPOSITORIES = ["LemmyNet/lemmy", "LemmyNet/lemmy-ui"]
DB_FILE = "lemmy_github.db"
DELAY = 1
MAX_BACKOFF_TIME = 300
PERSONAL_ACCESS_TOKEN = ""

github_lemmy_issue_reposter.py

import backoff
import datetime
import logging
import requests
import schedule
import sqlite3
import time

from config import *
from pythorhead import Lemmy
from typing import Any, Dict, Generator, List, Optional, Tuple, Callable, TypeVar

T = TypeVar('T')

# "[%(levelname)s]:%(asctime)s:%(name)s [%(filename)s:%(lineno)s - %(funcName)s()] %(message)s"
FORMAT = "%(message)s"
logging.basicConfig(
    level=logging.INFO,
    format=FORMAT,
    handlers=[logging.FileHandler("debug.log", mode="w"), logging.StreamHandler()],
)


def on_giveup(details: Dict[str, int]) -> None:
    logging.error(f"Failed to fetch issues after {details['tries']} attempts", exc_info=True)


def handle_errors(message: Optional[str] = None) -> Callable[[Callable[..., T]], Callable[..., T]]:
    def decorator(function: Callable[..., T]) -> Callable[..., T]:
        def wrapper(*args: Tuple[Any], **kwargs: Dict[str, Any]) -> T:
            try:
                return function(*args, **kwargs)
            except Exception as e:
                if message:
                    logging.exception(f"{message} - Error in {function.__name__}:\n{e}")
                else:
                    logging.exception(f"Error in {function.__name__}:\n{e}")
                raise

        return wrapper

    return decorator


class GitHubIssue:
    def __init__(self, issue_dict: dict[str, Any], github_repo: str) -> None:
        try:
            self.url = issue_dict["html_url"]
            logging.info(f"Creating issue {self.url}")
            self.state = issue_dict["state"]
            self.state_fmt = "[Closed]" if issue_dict["state"] == "closed" else ""
            self.repo_abbr = "[UI]" if "lemmy-ui" in github_repo else "[BE]"
            self.title = f"{self.state_fmt}{self.repo_abbr} {issue_dict['title']} #{issue_dict['number']}"
            self.title = self.title[:200]
            self.body = issue_dict["body"]
            if self.body is not None:
                self.body = self.body[:30000]
            self.user = issue_dict["user"]["login"]
            self.user_url = issue_dict["user"]["html_url"]
            self.updated_at = datetime.datetime.strptime(issue_dict["updated_at"], '%Y-%m-%dT%H:%M:%SZ')
        except Exception as e:
            log_message: str = (
                f"Formatted issue:\n"
                f"  - Repo: {github_repo}\n"
                f"  - Issue State: {self.state}\n"
                f"  - Repo Abbreviation: {self.repo_abbr}\n"
                f"  - Title: {self.title}\n"
                f"  - URL: {self.url}\n"
                f"  - User: {self.user}\n"
                f"  - User URL: {self.user_url}\n"
                f"  - Updated At: {self.updated_at}\n"
            )
            logging.exception(log_message)
            logging.exception(e)

    @property
    def formatted_body(self) -> str:
        formatted_body: str = self.body
        try:
            if self.body is not None:
                formatted_body = self.body.replace("\n", "\n> ")
                formatted_body = f"> {formatted_body}\n> \n> *Originally posted by [{self.user}]({self.user_url}) in [#{self.number}]({self.url})*"
        except Exception as e:
            logging.exception(f"Error formatting body for {self.url}\n{e}")
        return formatted_body

    @property
    def number(self) -> int:
        return int(self.url.split("/")[-1])


class GitHubComment:
    def __init__(self, comment_dict: dict[str, Any], issue_number: int) -> None:
        self.id = comment_dict["id"]
        self.body = comment_dict["body"]
        self.user = comment_dict["user"]["login"]
        self.user_url = comment_dict["user"]["html_url"]
        self.url = comment_dict["html_url"]
        self.issue_number = issue_number

    @property
    def formatted_comment(self) -> str:
        formatted_body:str = self.body.replace("\n", "\n> ")
        formatted_body = f"> {formatted_body}\n> \n> *Originally posted by [{self.user}]({self.user_url}) in [#{self.issue_number}]({self.url})*"
        return formatted_body


@handle_errors("Error initializing database")
def initialize_database() -> sqlite3.Connection:
    logging.info("Initializing database")
    conn: sqlite3.Connection = sqlite3.connect(DB_FILE)
    cursor: sqlite3.Cursor = conn.cursor()
    cursor.execute(
        """
        CREATE TABLE IF NOT EXISTS posts (
            issue_number INTEGER PRIMARY KEY,
            lemmy_post_id INTEGER NOT NULL UNIQUE,
            issue_title TEXT,
            issue_body TEXT,
            updated_at TIMESTAMP DEFAULT NULL
        )
    """
    )
    cursor.execute(
        """
        CREATE TABLE IF NOT EXISTS comments (
            github_comment_id INTEGER PRIMARY KEY,
            lemmy_comment_id INTEGER NOT NULL UNIQUE,
            comment_user TEXT,
            comment_body TEXT
            updated_at TIMESTAMP DEFAULT NULL
        )
    """
    )
    cursor.execute(
    """
    CREATE TABLE IF NOT EXISTS last_updated (
        id INTEGER PRIMARY KEY,
        last_updated_time TIMESTAMP
    );
    """
    )
    conn.commit()
    return conn


def get_last_updated_time() -> str:
    conn = sqlite3.connect(DB_FILE)
    cursor = conn.cursor()
    cursor.execute("SELECT last_updated_time FROM last_updated WHERE id = 1")
    last_updated_time: str = cursor.fetchone()[0]
    conn.close()

    return last_updated_time


def update_last_updated_time() -> None:
    conn = sqlite3.connect(DB_FILE)
    cursor = conn.cursor()
    current_time = datetime.datetime.utcnow().isoformat()
    
    cursor.execute("UPDATE last_updated SET last_updated_time = ? WHERE id = 1", (current_time,))
    if cursor.rowcount == 0:
        cursor.execute("INSERT INTO last_updated (id, last_updated_time) VALUES (1, ?)", (current_time,))
    
    conn.commit()
    conn.close()
    logging.info("Updated last updated time")


def update_post_time(post_id: int, updated_at: datetime.datetime) -> None:
    conn: sqlite3.Connection = sqlite3.connect(DB_FILE)
    cursor: sqlite3.Cursor = conn.cursor()
    time_formatted = updated_at.strftime('%Y-%m-%d %H:%M:%S')
    SQL = "UPDATE posts SET updated_at = ? WHERE lemmy_post_id = ?"
    cursor.execute(SQL, (time_formatted, post_id))
    conn.commit()
    conn.close()


def check_updated_at(issue_number: int) -> Optional[Tuple[int, str, str, Optional[str]]]:
    logging.info(f"Checking last post update for {issue_number}")
    conn: sqlite3.Connection = sqlite3.connect(DB_FILE)
    cursor: sqlite3.Cursor = conn.cursor()
    SQL = "SELECT lemmy_post_id, issue_title, issue_body, updated_at FROM posts WHERE issue_number = ?"
    cursor.execute(SQL, (issue_number,))
    result: Tuple[int, str, str, Optional[str]] = cursor.fetchone()
    conn.close()

    if result is None:
        logging.info(f"No post found for {issue_number}")
        return None
    else:
        logging.info(f"Found post for {issue_number}")
        return result


@handle_errors("Error initializing Lemmy instance")
def initialize_lemmy_instance() -> Lemmy:
    logging.info("Initializing Lemmy instance")
    lemmy = Lemmy(LEMMY_INSTANCE_URL)
    logging.info(f"Initialized Lemmy instance in {LEMMY_INSTANCE_URL}")
    lemmy.log_in(LEMMY_USERNAME, LEMMY_PASSWORD)
    logging.info(f"Logged in to Lemmy instance with user {LEMMY_USERNAME}")
    return lemmy


@backoff.on_exception(
    backoff.expo,
    (requests.exceptions.RequestException, TypeError),
    max_time=MAX_BACKOFF_TIME,
    on_giveup=on_giveup,
)
def fetch_github_data(url: str) -> List[Dict[str, Any]]:
    global LAST_REQUEST_TIME
    try:
        headers = {
            "Accept": "application/vnd.github+json",
            "Authorization": f"Bearer {PERSONAL_ACCESS_TOKEN}",
            "X-GitHub-Api-Version": "2022-11-28",
        }
        time_elapsed = time.time() - LAST_REQUEST_TIME
        required_delay = max(0, DELAY - time_elapsed)
        time.sleep(required_delay)
        response = requests.get(url, headers=headers)
        LAST_REQUEST_TIME = time.time()
        logging.info(f"Fetched data from {url}")
        res: List[Dict[str, Any]] = response.json()
        return res
    except requests.exceptions.RequestException as e:
        logging.exception(f"Error fetching data from {url}\n{e}")
        raise


def check_existing_post(issue_number: str) -> Optional[int]:
    conn: sqlite3.Connection = sqlite3.connect(DB_FILE)
    cursor: sqlite3.Cursor = conn.cursor()
    SQL = "SELECT lemmy_post_id FROM posts WHERE issue_number=?"
    cursor.execute(SQL, (issue_number,))
    post_id: Optional[tuple[int]] = cursor.fetchone()
    if post_id:
        return post_id[0]
    return None


def insert_post_to_db(issue: GitHubIssue, lemmy_post_id: Optional[int]) -> None:
    try:
        conn: sqlite3.Connection = sqlite3.connect(DB_FILE)
        cursor: sqlite3.Cursor = conn.cursor()
        SQL = "INSERT INTO posts (issue_number, lemmy_post_id, issue_title, issue_body, updated_at) VALUES (?, ?, ?, ?, ?)"
        cursor.execute(SQL, (issue.number, lemmy_post_id, issue.title, issue.formatted_body, issue.updated_at))
        conn.commit()
        logging.info(f"Inserted new Lemmy post {lemmy_post_id} into the database")
    except sqlite3.Error as e:
        logging.exception(f"Error inserting post into the database for issue {issue.title} with url {issue.url}\n{e}")
        raise


def insert_comment_to_database(cursor: sqlite3.Cursor, github_comment_id: int, lemmy_comment_id: int, comment: GitHubComment) -> None:
    try:
        SQL = "INSERT INTO comments (github_comment_id, lemmy_comment_id, comment_user, comment_body) VALUES (?, ?, ?, ?)"
        cursor.execute(SQL, (github_comment_id, lemmy_comment_id, comment.user, comment.formatted_comment,))
        logging.info(f"Inserted comment {github_comment_id} into the database")
    except Exception as e:
        logging.exception(f"Error encountered while inserting comment {github_comment_id} to database\n{e}")


@backoff.on_exception(
    backoff.expo,
    (requests.exceptions.RequestException, TypeError),
    max_time=MAX_BACKOFF_TIME,
    on_giveup=on_giveup,
)
def create_lemmy_post(lemmy: Any, community_id: int, issue: GitHubIssue) -> Optional[int]:
    lemmy_post_id: Optional[int] = None
    lemmy_post_id = lemmy.post.create(community_id, issue.title, url=issue.url, body=issue.body)["post_view"]["post"]["id"]
    lemmy_url = f"{LEMMY_INSTANCE_URL}/post/{lemmy_post_id}"
    logging.info(f"Posted issue {lemmy_url}")

    return lemmy_post_id


@backoff.on_exception(
    backoff.expo,
    (requests.exceptions.RequestException, TypeError),
    max_time=MAX_BACKOFF_TIME,
    on_giveup=on_giveup,
)
def create_lemmy_comment(lemmy: Any, post_id: Optional[int], comment: GitHubComment) -> Optional[int]:
    logging.info(f"Creating new Lemmy comment in {LEMMY_INSTANCE_URL}/post/{post_id}")

    if not post_id:
        logging.warning("Post ID is empty. Skipping comment creation")
        return None

    response = lemmy.comment.create(post_id, comment.formatted_comment)
    lemmy_comment_id:int = response["comment_view"]["comment"]["id"]
    logging.info(f"Successfully created Lemmy comment {LEMMY_INSTANCE_URL}/comment/{lemmy_comment_id}")

    return lemmy_comment_id


def get_total_issues(github_repo: str) -> int:
    url: str = f"https://api.github.com/repos/{github_repo}"
    data: List[Dict[str, Any]] = fetch_github_data(url)
    total_issues: int = data["open_issues_count"]
    return total_issues


def fetch_issues(github_repo: str, last_updated_time: str) -> Generator[Dict[str, Any], None, None]:
    page = 1
    per_page = 100
    issues_url = (f"{GITHUB_API_BASE}/repos/{github_repo}/issues?state=all&since={last_updated_time}&per_page={per_page}")

    while True:
        page_url = f"{issues_url}&page={page}"
        issues: List[Dict[str, Any]] = fetch_github_data(page_url)

        if not issues:
            break

        for issue_dict in issues:
            yield issue_dict

        page += 1


@backoff.on_exception(
    backoff.expo,
    (requests.exceptions.RequestException, TypeError),
    max_time=MAX_BACKOFF_TIME,
    on_giveup=on_giveup,
)
def edit_lemmy_post(lemmy: Any, lemmy_post_id: int, issue: GitHubIssue) -> None:
    lemmy.post.edit(lemmy_post_id, name=issue.title, url=issue.url, body=issue.body)


def process_issues(lemmy: Any, community_id: int, github_repo: str) -> None:
    last_updated_time = get_last_updated_time()
    update_last_updated_time()
    for issue_dict in fetch_issues(github_repo, last_updated_time):
        process_issue(lemmy, community_id, github_repo, issue_dict)


def process_issue(lemmy: Any, community_id: int, github_repo: str, issue_dict: dict[str, Any]) -> None:
    issue: GitHubIssue = GitHubIssue(issue_dict, github_repo)
    res: Optional[Tuple[int, str, str, Optional[str]]] = check_updated_at(issue.number)

    if res is None:
        create_new_lemmy_post(lemmy, community_id, github_repo, issue)
    else:
        lemmy_post_id, existing_title, existing_body, updated_at = res
        if updated_at is None or has_enough_time_passed(updated_at, issue.updated_at):
            update_issue_if_needed(lemmy, lemmy_post_id, existing_title, existing_body, issue)
            process_comments(lemmy, lemmy_post_id, github_repo, issue)
            update_post_time(lemmy_post_id, issue.updated_at)


def has_enough_time_passed(old_updated_at_str: str, new_updated_at: datetime.datetime) -> bool:
    old_updated_at = datetime.datetime.strptime(old_updated_at_str, '%Y-%m-%d %H:%M:%S')
    time_difference: datetime.timedelta = new_updated_at - old_updated_at
    return time_difference >= datetime.timedelta(hours=2)


def update_issue_if_needed(lemmy: Any, lemmy_post_id: int, existing_title: str, existing_body: str, issue: GitHubIssue) -> None:
    if existing_title != issue.title or existing_body != issue.formatted_body:
        edit_lemmy_post(lemmy, lemmy_post_id, issue)


def create_new_lemmy_post(lemmy: Any, community_id: int, github_repo: str, issue: GitHubIssue) -> None:
    lemmy_post_id: Optional[int] = post_issue_to_lemmy(lemmy, community_id, issue)
    insert_post_to_db(issue, lemmy_post_id)
    process_comments(lemmy, lemmy_post_id, github_repo, issue)


def post_issue_to_lemmy(lemmy: Any, community_id: int, issue: GitHubIssue) -> Optional[int]:
    try:
        logging.info(f"Start posting issue {issue.title} to community {community_id}")
        lemmy_post_id: Optional[int] = create_lemmy_post(lemmy, community_id, issue)
        return lemmy_post_id
    except Exception as e:
        logging.exception(f"Error posting issue {issue.title} to community {community_id}\n{e}")
        return None


def process_comments(lemmy: Any, post_id: Optional[int], github_repo: str, issue: GitHubIssue) -> None:
    try:
        logging.info(f"Posting comments from issue #{issue.number} to Lemmy post {LEMMY_INSTANCE_URL}/post/{post_id}")
        comments_url: str = f"{GITHUB_API_BASE}/repos/{github_repo}/issues/{issue.number}/comments"
        comments: Dict[str, Any] = fetch_github_data(comments_url)
        for comment_data in comments:
            if isinstance(comment_data, str):
                logging.warning(f"Skipping comment {comment_data}")
                continue
            process_comment(lemmy, github_repo, comment_data, post_id, issue.number)
    except Exception as e:
        logging.exception(f"Error posting comments to lemmy post {post_id}\n{e}")

def process_comment(lemmy: Any, github_repo: str, comment_data: Dict[str, Any], post_id: Optional[int], issue_number: int) -> None:
    conn: sqlite3.Connection = sqlite3.connect(DB_FILE)
    cursor: sqlite3.Cursor = conn.cursor()
    comment = GitHubComment(comment_data, issue_number)

    existing_comment_id: Optional[int] = get_existing_comment_id(cursor, comment.id)
    if existing_comment_id:
        logging.info(f"Skipping existing comment with GitHub comment ID: {comment.id}")
        return

    post_comment_to_lemmy(cursor, lemmy, github_repo, comment, post_id, issue_number)
    conn.commit()


def post_comment_to_lemmy(cursor: sqlite3.Cursor, lemmy: Any, github_repo: str, comment: GitHubComment, post_id: Optional[int], issue_number: int) -> None:
    lemmy_post_url = f"{LEMMY_INSTANCE_URL}/post/{post_id}"
    comment_url = f"{GITHUB_URL_BASE}/{github_repo}/issues/{issue_number}#issuecomment-{comment.id}"
    logging.info(f"Posting comment {comment.url} to Lemmy post {lemmy_post_url}")
    lemmy_comment_id: Optional[int] = create_lemmy_comment(lemmy, post_id, comment)

    if not lemmy_comment_id:
        logging.exception(f"Error creating Lemmy comment {lemmy_comment_id} to {lemmy_post_url} from Github comment {comment.url}")
        return

    logging.info(f"Posted comment {comment_url} to Lemmy post {lemmy_post_url}")
    insert_comment_to_database(cursor, comment.id, lemmy_comment_id, comment)


def get_existing_comment_id(cursor: sqlite3.Cursor, github_comment_id: int) -> Optional[int]:
    logging.info(f"Checking if comment with GitHub comment ID: {github_comment_id} exists")
    cursor.execute("SELECT lemmy_comment_id FROM comments WHERE github_comment_id=?", (github_comment_id,))
    existing_comment = cursor.fetchone()
    if existing_comment is not None:
        logging.info(f"Found existing comment with GitHub comment ID: {github_comment_id}")
        existing_comment_id: int = existing_comment[0]
        return existing_comment_id
    else:
        logging.info(f"No existing comment found with GitHub comment ID: {github_comment_id}")
        return None


def fetch_issue_data(github_repo: str) -> List[Tuple[str, Optional[int]]]:
    logging.info("Fetching the GitHub issue number and Lemmy post ID for all issues")
    conn: sqlite3.Connection = sqlite3.connect(DB_FILE)
    cursor: sqlite3.Cursor = conn.cursor()
    SQL = "SELECT issue_url, lemmy_post_id FROM posts WHERE issue_url LIKE ?"
    issues_url = f"https://github.com/{github_repo}/issues/%"
    issue_data = cursor.execute(SQL, (issues_url,)).fetchall()
    logging.info(f"Fetched {len(issue_data)} issues")
    return issue_data


def process_repo(lemmy: Any, community_id: int, github_repo: str) -> None:
    try:
        logging.info(f"Processing repository {github_repo}")
        process_issues(lemmy, community_id, github_repo)
    except Exception as e:
        logging.exception(f"Error occurred while processing repository {github_repo}\n{e}")


def main() -> None:
    logging.info("Running main function")
    initialize_database()
    lemmy = initialize_lemmy_instance()
    community_id = lemmy.discover_community(LEMMY_COMMUNITY_NAME)

    for github_repo in REPOSITORIES:
        process_repo(lemmy, community_id, github_repo)


def run_periodically() -> None:
    logging.info("Starting periodic run")
    schedule.every(1).hours.do(main)

    while True:
        try:
            schedule.run_pending()
        except Exception as e:
            logging.exception(f"Error occurred during scheduling\n{e}")
        time.sleep(60)


if __name__ == "__main__":
    try:
        logging.info("Starting script")
        main()
        run_periodically()
    except Exception as e:
        logging.exception(f"Error occurred during script execution\n{e}")

requirements.txt

pythorhead==0.12.3
schedule==1.2.0
backoff==2.2.1
feedparser==6.0.10
1
1 comments