...
...
The instance where I was using it changed its rules to prevent bots from posting there, and I didn't care enough to look for another instance.
https://lemm.ee/c/issue_tracker?dataType=Post&page=1&sort=Active
config_template.py
py
# Settings read by github_lemmy_issue_reposter.py through `from config import *`.
# Fill in the empty strings before running the script.

# Base URL of the Lemmy instance to log in to; also used to build post and
# comment links in log messages.
LEMMY_INSTANCE_URL = ""
# Name of the community the issues are posted to (resolved to an id at startup).
LEMMY_COMMUNITY_NAME = ""
# Credentials of the Lemmy account that creates the posts and comments.
LEMMY_USERNAME = ""
LEMMY_PASSWORD = ""

# Base URLs for GitHub API requests and for human-facing GitHub links.
GITHUB_API_BASE = "https://api.github.com"
GITHUB_URL_BASE = "https://github.com"
# Repositories ("owner/name") whose issues are mirrored.
REPOSITORIES = ["LemmyNet/lemmy", "LemmyNet/lemmy-ui"]

# Path of the SQLite database that maps GitHub issues/comments to Lemmy ids.
DB_FILE = "lemmy_github.db"

# Minimum number of seconds to wait between two GitHub API requests.
DELAY = 1
# Upper bound, in seconds, on the total time spent retrying a failing request.
MAX_BACKOFF_TIME = 300

# GitHub token sent as `Authorization: Bearer <token>` on every API request.
PERSONAL_ACCESS_TOKEN = ""
github_lemmy_issue_reposter.py
py
"""Mirror GitHub issues and their comments into a Lemmy community.

For every repository listed in ``REPOSITORIES`` the script fetches the issues
updated since the previous run, creates (or edits) one Lemmy post per issue
and one Lemmy comment per GitHub comment, and records the id mapping in a
SQLite database so nothing is posted twice.
"""
import contextlib
import datetime
import functools
import logging
import sqlite3
import time
from typing import Any, Callable, Dict, Generator, List, Optional, Tuple, TypeVar

import backoff
import requests
import schedule
from config import *  # noqa: F401,F403 - supplies the UPPER_CASE settings used below
from pythorhead import Lemmy

T = TypeVar("T")

# Verbose alternative:
# "[%(levelname)s]:%(asctime)s:%(name)s [%(filename)s:%(lineno)s - %(funcName)s()] %(message)s"
FORMAT = "%(message)s"
logging.basicConfig(
    level=logging.INFO,
    format=FORMAT,
    handlers=[logging.FileHandler("debug.log", mode="w"), logging.StreamHandler()],
)

# Timestamp layout GitHub uses in issue payloads and accepts for `since`.
GITHUB_TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
# Timestamp layout stored in `posts.updated_at`.
DB_TIME_FORMAT = "%Y-%m-%d %H:%M:%S"
# `since` value used when no previous run has been recorded yet.
DEFAULT_LAST_UPDATED_TIME = "1970-01-01T00:00:00Z"
# Seconds to wait for GitHub before treating a request as failed.
REQUEST_TIMEOUT = 30
# time.time() of the most recent GitHub request; 0.0 means "none made yet".
LAST_REQUEST_TIME: float = 0.0


def on_giveup(details: Dict[str, int]) -> None:
    """Log that `backoff` exhausted its retries (used as `on_giveup` hook)."""
    logging.error(f"Failed to fetch issues after {details['tries']} attempts", exc_info=True)


def handle_errors(message: Optional[str] = None) -> Callable[[Callable[..., T]], Callable[..., T]]:
    """Build a decorator that logs any exception from the wrapped call and re-raises it.

    Args:
        message: Optional prefix placed in front of the logged error text.
    """

    def decorator(function: Callable[..., T]) -> Callable[..., T]:
        @functools.wraps(function)  # keep the wrapped function's name/docstring
        def wrapper(*args: Any, **kwargs: Any) -> T:
            try:
                return function(*args, **kwargs)
            except Exception as e:
                if message:
                    logging.exception(f"{message} - Error in {function.__name__}:\n{e}")
                else:
                    logging.exception(f"Error in {function.__name__}:\n{e}")
                raise

        return wrapper

    return decorator


class GitHubIssue:
    """Fields of one GitHub issue, pre-formatted for posting to Lemmy."""

    def __init__(self, issue_dict: dict[str, Any], github_repo: str) -> None:
        """Extract the fields used for the Lemmy post from a GitHub issue payload.

        Args:
            issue_dict: One element of the GitHub issues API response.
            github_repo: Repository in "owner/name" form the issue belongs to.

        Raises:
            Exception: Whatever went wrong while reading the payload; it is
                logged together with the fields parsed so far and re-raised so
                callers never work with a half-initialised issue.
        """
        try:
            self.url = issue_dict["html_url"]
            logging.info(f"Creating issue {self.url}")
            self.state = issue_dict["state"]
            self.state_fmt = "[Closed]" if issue_dict["state"] == "closed" else ""
            self.repo_abbr = "[UI]" if "lemmy-ui" in github_repo else "[BE]"
            self.title = f"{self.state_fmt}{self.repo_abbr} {issue_dict['title']} #{issue_dict['number']}"
            # Titles are truncated to 200 characters before posting.
            self.title = self.title[:200]
            self.body = issue_dict["body"]
            if self.body is not None:
                # Bodies are truncated to 30000 characters before posting.
                self.body = self.body[:30000]
            self.user = issue_dict["user"]["login"]
            self.user_url = issue_dict["user"]["html_url"]
            self.updated_at = datetime.datetime.strptime(issue_dict["updated_at"], GITHUB_TIME_FORMAT)
        except Exception:
            # getattr: attributes after the failing line were never assigned.
            log_message: str = (
                f"Formatted issue:\n"
                f" - Repo: {github_repo}\n"
                f" - Issue State: {getattr(self, 'state', None)}\n"
                f" - Repo Abbreviation: {getattr(self, 'repo_abbr', None)}\n"
                f" - Title: {getattr(self, 'title', None)}\n"
                f" - URL: {getattr(self, 'url', None)}\n"
                f" - User: {getattr(self, 'user', None)}\n"
                f" - User URL: {getattr(self, 'user_url', None)}\n"
                f" - Updated At: {getattr(self, 'updated_at', None)}\n"
            )
            logging.exception(log_message)
            raise

    @property
    def formatted_body(self) -> Optional[str]:
        """Issue body as a Markdown quote with an attribution line, or None if there is no body."""
        formatted_body: Optional[str] = self.body
        try:
            if self.body is not None:
                formatted_body = self.body.replace("\n", "\n> ")
                formatted_body = f"> {formatted_body}\n> \n> *Originally posted by [{self.user}]({self.user_url}) in [#{self.number}]({self.url})*"
        except Exception as e:
            logging.exception(f"Error formatting body for {self.url}\n{e}")
        return formatted_body

    @property
    def number(self) -> int:
        """Issue number, taken from the last path segment of the issue URL."""
        return int(self.url.split("/")[-1])


class GitHubComment:
    """Fields of one GitHub issue comment, pre-formatted for posting to Lemmy."""

    def __init__(self, comment_dict: dict[str, Any], issue_number: int) -> None:
        """Extract the fields used for the Lemmy comment from a GitHub comment payload.

        Args:
            comment_dict: One element of the GitHub issue-comments API response.
            issue_number: Number of the issue the comment belongs to.
        """
        self.id = comment_dict["id"]
        self.body = comment_dict["body"]
        self.user = comment_dict["user"]["login"]
        self.user_url = comment_dict["user"]["html_url"]
        self.url = comment_dict["html_url"]
        self.issue_number = issue_number

    @property
    def formatted_comment(self) -> str:
        """Comment body as a Markdown quote followed by an attribution line."""
        # A missing body is quoted as an empty string instead of crashing.
        formatted_body: str = (self.body or "").replace("\n", "\n> ")
        formatted_body = f"> {formatted_body}\n> \n> *Originally posted by [{self.user}]({self.user_url}) in [#{self.issue_number}]({self.url})*"
        return formatted_body


@handle_errors("Error initializing database")
def initialize_database() -> sqlite3.Connection:
    """Create the `posts`, `comments` and `last_updated` tables if they are missing.

    Returns:
        The open connection used to create the tables; the caller closes it.
    """
    logging.info("Initializing database")
    conn: sqlite3.Connection = sqlite3.connect(DB_FILE)
    cursor: sqlite3.Cursor = conn.cursor()
    # NOTE(review): issue_number is the only key, so issues with the same
    # number from different repositories collide - needs a schema migration.
    cursor.execute(
        """
        CREATE TABLE IF NOT EXISTS posts (
            issue_number INTEGER PRIMARY KEY,
            lemmy_post_id INTEGER NOT NULL UNIQUE,
            issue_title TEXT,
            issue_body TEXT,
            updated_at TIMESTAMP DEFAULT NULL
        )
        """
    )
    # The comma after `comment_body TEXT` was missing, which made SQLite fold
    # "updated_at TIMESTAMP" into the column type instead of creating a column.
    # Databases created before this fix keep their old table definition.
    cursor.execute(
        """
        CREATE TABLE IF NOT EXISTS comments (
            github_comment_id INTEGER PRIMARY KEY,
            lemmy_comment_id INTEGER NOT NULL UNIQUE,
            comment_user TEXT,
            comment_body TEXT,
            updated_at TIMESTAMP DEFAULT NULL
        )
        """
    )
    cursor.execute(
        """
        CREATE TABLE IF NOT EXISTS last_updated (
            id INTEGER PRIMARY KEY,
            last_updated_time TIMESTAMP
        );
        """
    )
    conn.commit()
    return conn


def get_last_updated_time() -> str:
    """Return the stored time of the previous run, for GitHub's `since` filter.

    Returns:
        The stored timestamp, or `DEFAULT_LAST_UPDATED_TIME` when no run has
        been recorded yet (e.g. on a fresh database).
    """
    with contextlib.closing(sqlite3.connect(DB_FILE)) as conn:
        row = conn.execute("SELECT last_updated_time FROM last_updated WHERE id = 1").fetchone()
    if row is None or row[0] is None:
        return DEFAULT_LAST_UPDATED_TIME
    last_updated_time: str = row[0]
    return last_updated_time


def update_last_updated_time() -> None:
    """Store the current UTC time as the time of the latest run."""
    current_time = datetime.datetime.now(datetime.timezone.utc).strftime(GITHUB_TIME_FORMAT)
    with contextlib.closing(sqlite3.connect(DB_FILE)) as conn:
        cursor = conn.cursor()
        cursor.execute("UPDATE last_updated SET last_updated_time = ? WHERE id = 1", (current_time,))
        if cursor.rowcount == 0:
            # No row yet: first run against this database.
            cursor.execute("INSERT INTO last_updated (id, last_updated_time) VALUES (1, ?)", (current_time,))
        conn.commit()
    logging.info("Updated last updated time")


def update_post_time(post_id: int, updated_at: datetime.datetime) -> None:
    """Record `updated_at` as the last processed update time of a Lemmy post."""
    time_formatted = updated_at.strftime(DB_TIME_FORMAT)
    SQL = "UPDATE posts SET updated_at = ? WHERE lemmy_post_id = ?"
    with contextlib.closing(sqlite3.connect(DB_FILE)) as conn:
        conn.execute(SQL, (time_formatted, post_id))
        conn.commit()


def _update_post_content(post_id: int, issue: GitHubIssue) -> None:
    """Store the title/body that were just pushed to Lemmy for later change detection."""
    SQL = "UPDATE posts SET issue_title = ?, issue_body = ? WHERE lemmy_post_id = ?"
    with contextlib.closing(sqlite3.connect(DB_FILE)) as conn:
        conn.execute(SQL, (issue.title, issue.formatted_body, post_id))
        conn.commit()


def check_updated_at(issue_number: int) -> Optional[Tuple[int, str, str, Optional[str]]]:
    """Look up the stored Lemmy post for an issue.

    Returns:
        `(lemmy_post_id, issue_title, issue_body, updated_at)` or None when
        the issue has not been posted yet.
    """
    logging.info(f"Checking last post update for {issue_number}")
    SQL = "SELECT lemmy_post_id, issue_title, issue_body, updated_at FROM posts WHERE issue_number = ?"
    with contextlib.closing(sqlite3.connect(DB_FILE)) as conn:
        result: Optional[Tuple[int, str, str, Optional[str]]] = conn.execute(SQL, (issue_number,)).fetchone()
    if result is None:
        logging.info(f"No post found for {issue_number}")
        return None
    logging.info(f"Found post for {issue_number}")
    return result


@handle_errors("Error initializing Lemmy instance")
def initialize_lemmy_instance() -> Lemmy:
    """Create a Lemmy client for `LEMMY_INSTANCE_URL` and log in with the configured account."""
    logging.info("Initializing Lemmy instance")
    lemmy = Lemmy(LEMMY_INSTANCE_URL)
    logging.info(f"Initialized Lemmy instance in {LEMMY_INSTANCE_URL}")
    lemmy.log_in(LEMMY_USERNAME, LEMMY_PASSWORD)
    logging.info(f"Logged in to Lemmy instance with user {LEMMY_USERNAME}")
    return lemmy


@backoff.on_exception(
    backoff.expo,
    (requests.exceptions.RequestException, TypeError),
    max_time=MAX_BACKOFF_TIME,
    on_giveup=on_giveup,
)
def fetch_github_data(url: str) -> Any:
    """GET a GitHub API URL and return the decoded JSON body.

    Requests are spaced at least `DELAY` seconds apart and retried with
    exponential backoff on request errors.

    Args:
        url: Full GitHub API URL.

    Returns:
        The decoded JSON: a list for collection endpoints, a dict otherwise.

    Raises:
        requests.exceptions.RequestException: When the request keeps failing.
    """
    global LAST_REQUEST_TIME
    try:
        headers = {
            "Accept": "application/vnd.github+json",
            "Authorization": f"Bearer {PERSONAL_ACCESS_TOKEN}",
            "X-GitHub-Api-Version": "2022-11-28",
        }
        time_elapsed = time.time() - LAST_REQUEST_TIME
        required_delay = max(0, DELAY - time_elapsed)
        time.sleep(required_delay)
        # Without a timeout a stalled connection would hang the run forever.
        response = requests.get(url, headers=headers, timeout=REQUEST_TIMEOUT)
        LAST_REQUEST_TIME = time.time()
        logging.info(f"Fetched data from {url}")
        return response.json()
    except requests.exceptions.RequestException as e:
        logging.exception(f"Error fetching data from {url}\n{e}")
        raise


def check_existing_post(issue_number: str) -> Optional[int]:
    """Return the Lemmy post id stored for an issue number, or None."""
    SQL = "SELECT lemmy_post_id FROM posts WHERE issue_number=?"
    with contextlib.closing(sqlite3.connect(DB_FILE)) as conn:
        post_id: Optional[Tuple[int]] = conn.execute(SQL, (issue_number,)).fetchone()
    if post_id:
        return post_id[0]
    return None


def insert_post_to_db(issue: GitHubIssue, lemmy_post_id: Optional[int]) -> None:
    """Record the issue -> Lemmy post mapping.

    Raises:
        sqlite3.Error: When the insert fails (logged, then re-raised).
    """
    SQL = "INSERT INTO posts (issue_number, lemmy_post_id, issue_title, issue_body, updated_at) VALUES (?, ?, ?, ?, ?)"
    try:
        with contextlib.closing(sqlite3.connect(DB_FILE)) as conn:
            # Format explicitly so the stored text always matches DB_TIME_FORMAT.
            updated_at = issue.updated_at.strftime(DB_TIME_FORMAT)
            conn.execute(SQL, (issue.number, lemmy_post_id, issue.title, issue.formatted_body, updated_at))
            conn.commit()
        logging.info(f"Inserted new Lemmy post {lemmy_post_id} into the database")
    except sqlite3.Error as e:
        logging.exception(f"Error inserting post into the database for issue {issue.title} with url {issue.url}\n{e}")
        raise


def insert_comment_to_database(cursor: sqlite3.Cursor, github_comment_id: int, lemmy_comment_id: int, comment: GitHubComment) -> None:
    """Record the GitHub comment -> Lemmy comment mapping (best effort).

    Failures are logged and not re-raised. The caller commits the transaction.
    """
    try:
        SQL = "INSERT INTO comments (github_comment_id, lemmy_comment_id, comment_user, comment_body) VALUES (?, ?, ?, ?)"
        cursor.execute(SQL, (github_comment_id, lemmy_comment_id, comment.user, comment.formatted_comment))
        logging.info(f"Inserted comment {github_comment_id} into the database")
    except Exception as e:
        logging.exception(f"Error encountered while inserting comment {github_comment_id} to database\n{e}")


@backoff.on_exception(
    backoff.expo,
    (requests.exceptions.RequestException, TypeError),
    max_time=MAX_BACKOFF_TIME,
    on_giveup=on_giveup,
)
def create_lemmy_post(lemmy: Any, community_id: int, issue: GitHubIssue) -> Optional[int]:
    """Create a Lemmy post for the issue and return the new post id."""
    lemmy_post_id: Optional[int] = lemmy.post.create(community_id, issue.title, url=issue.url, body=issue.body)["post_view"]["post"]["id"]
    lemmy_url = f"{LEMMY_INSTANCE_URL}/post/{lemmy_post_id}"
    logging.info(f"Posted issue {lemmy_url}")
    return lemmy_post_id


@backoff.on_exception(
    backoff.expo,
    (requests.exceptions.RequestException, TypeError),
    max_time=MAX_BACKOFF_TIME,
    on_giveup=on_giveup,
)
def create_lemmy_comment(lemmy: Any, post_id: Optional[int], comment: GitHubComment) -> Optional[int]:
    """Create a Lemmy comment under `post_id`.

    Returns:
        The new Lemmy comment id, or None when `post_id` is empty.
    """
    logging.info(f"Creating new Lemmy comment in {LEMMY_INSTANCE_URL}/post/{post_id}")
    if not post_id:
        logging.warning("Post ID is empty. Skipping comment creation")
        return None
    response = lemmy.comment.create(post_id, comment.formatted_comment)
    lemmy_comment_id: int = response["comment_view"]["comment"]["id"]
    logging.info(f"Successfully created Lemmy comment {LEMMY_INSTANCE_URL}/comment/{lemmy_comment_id}")
    return lemmy_comment_id


def get_total_issues(github_repo: str) -> int:
    """Return the repository's `open_issues_count` as reported by GitHub."""
    url: str = f"{GITHUB_API_BASE}/repos/{github_repo}"
    data: Dict[str, Any] = fetch_github_data(url)
    total_issues: int = data["open_issues_count"]
    return total_issues


def fetch_issues(github_repo: str, last_updated_time: str) -> Generator[Dict[str, Any], None, None]:
    """Yield every issue of the repository updated since `last_updated_time`.

    Pages of 100 issues (open and closed) are requested until an empty page
    is returned.
    """
    page = 1
    per_page = 100
    issues_url = f"{GITHUB_API_BASE}/repos/{github_repo}/issues?state=all&since={last_updated_time}&per_page={per_page}"
    while True:
        page_url = f"{issues_url}&page={page}"
        issues: Any = fetch_github_data(page_url)
        if not issues:
            break
        if not isinstance(issues, list):
            # Not a list of issues (an error object, presumably); iterating it
            # would yield its keys instead of issues.
            logging.error(f"Unexpected response from {page_url}: {issues}")
            break
        yield from issues
        page += 1


@backoff.on_exception(
    backoff.expo,
    (requests.exceptions.RequestException, TypeError),
    max_time=MAX_BACKOFF_TIME,
    on_giveup=on_giveup,
)
def edit_lemmy_post(lemmy: Any, lemmy_post_id: int, issue: GitHubIssue) -> None:
    """Overwrite title, URL and body of an existing Lemmy post with the issue's current values."""
    lemmy.post.edit(lemmy_post_id, name=issue.title, url=issue.url, body=issue.body)


def process_issues(lemmy: Any, community_id: int, github_repo: str, last_updated_time: Optional[str] = None) -> None:
    """Mirror every recently updated issue of one repository.

    Args:
        lemmy: Logged-in Lemmy client.
        community_id: Id of the target Lemmy community.
        github_repo: Repository in "owner/name" form.
        last_updated_time: `since` value for the GitHub query. When omitted,
            the stored time is read and then advanced to now.
    """
    if last_updated_time is None:
        last_updated_time = get_last_updated_time()
        update_last_updated_time()
    for issue_dict in fetch_issues(github_repo, last_updated_time):
        process_issue(lemmy, community_id, github_repo, issue_dict)


def process_issue(lemmy: Any, community_id: int, github_repo: str, issue_dict: dict[str, Any]) -> None:
    """Create the Lemmy post for a new issue, or refresh an already mirrored one."""
    issue: GitHubIssue = GitHubIssue(issue_dict, github_repo)
    res: Optional[Tuple[int, str, str, Optional[str]]] = check_updated_at(issue.number)
    if res is None:
        create_new_lemmy_post(lemmy, community_id, github_repo, issue)
        return
    lemmy_post_id, existing_title, existing_body, updated_at = res
    if updated_at is None or has_enough_time_passed(updated_at, issue.updated_at):
        update_issue_if_needed(lemmy, lemmy_post_id, existing_title, existing_body, issue)
        process_comments(lemmy, lemmy_post_id, github_repo, issue)
        update_post_time(lemmy_post_id, issue.updated_at)


def has_enough_time_passed(old_updated_at_str: str, new_updated_at: datetime.datetime) -> bool:
    """Return True when the issue's new update time is at least 2 hours after the stored one."""
    old_updated_at = datetime.datetime.strptime(old_updated_at_str, DB_TIME_FORMAT)
    time_difference: datetime.timedelta = new_updated_at - old_updated_at
    return time_difference >= datetime.timedelta(hours=2)


def update_issue_if_needed(lemmy: Any, lemmy_post_id: int, existing_title: str, existing_body: str, issue: GitHubIssue) -> None:
    """Edit the Lemmy post when the issue's title or formatted body differs from the stored copy."""
    if existing_title != issue.title or existing_body != issue.formatted_body:
        edit_lemmy_post(lemmy, lemmy_post_id, issue)
        # Refresh the stored copy, otherwise the same edit would be repeated
        # on every later pass.
        _update_post_content(lemmy_post_id, issue)


def create_new_lemmy_post(lemmy: Any, community_id: int, github_repo: str, issue: GitHubIssue) -> None:
    """Post a new issue to Lemmy, record it, and mirror its comments."""
    lemmy_post_id: Optional[int] = post_issue_to_lemmy(lemmy, community_id, issue)
    if lemmy_post_id is None:
        # Posting failed (already logged). Storing a NULL id would violate the
        # NOT NULL constraint and abort the rest of the repository.
        logging.warning(f"Skipping database insert and comments for {issue.url}: no Lemmy post was created")
        return
    insert_post_to_db(issue, lemmy_post_id)
    process_comments(lemmy, lemmy_post_id, github_repo, issue)


def post_issue_to_lemmy(lemmy: Any, community_id: int, issue: GitHubIssue) -> Optional[int]:
    """Create the Lemmy post for an issue; return its id, or None on failure (logged)."""
    try:
        logging.info(f"Start posting issue {issue.title} to community {community_id}")
        lemmy_post_id: Optional[int] = create_lemmy_post(lemmy, community_id, issue)
        return lemmy_post_id
    except Exception as e:
        logging.exception(f"Error posting issue {issue.title} to community {community_id}\n{e}")
        return None


def process_comments(lemmy: Any, post_id: Optional[int], github_repo: str, issue: GitHubIssue) -> None:
    """Mirror the issue's GitHub comments under the Lemmy post (best effort; errors are logged)."""
    try:
        logging.info(f"Posting comments from issue #{issue.number} to Lemmy post {LEMMY_INSTANCE_URL}/post/{post_id}")
        comments_url: str = f"{GITHUB_API_BASE}/repos/{github_repo}/issues/{issue.number}/comments"
        comments: Any = fetch_github_data(comments_url)
        for comment_data in comments:
            if isinstance(comment_data, str):
                # Iterating a dict response yields its keys, not comments.
                logging.warning(f"Skipping comment {comment_data}")
                continue
            process_comment(lemmy, github_repo, comment_data, post_id, issue.number)
    except Exception as e:
        logging.exception(f"Error posting comments to lemmy post {post_id}\n{e}")


def process_comment(lemmy: Any, github_repo: str, comment_data: Dict[str, Any], post_id: Optional[int], issue_number: int) -> None:
    """Post one GitHub comment to Lemmy unless it has already been mirrored."""
    comment = GitHubComment(comment_data, issue_number)
    # closing() releases the connection on the early return and on errors too.
    with contextlib.closing(sqlite3.connect(DB_FILE)) as conn:
        cursor: sqlite3.Cursor = conn.cursor()
        existing_comment_id: Optional[int] = get_existing_comment_id(cursor, comment.id)
        if existing_comment_id is not None:
            logging.info(f"Skipping existing comment with GitHub comment ID: {comment.id}")
            return
        post_comment_to_lemmy(cursor, lemmy, github_repo, comment, post_id, issue_number)
        conn.commit()


def post_comment_to_lemmy(cursor: sqlite3.Cursor, lemmy: Any, github_repo: str, comment: GitHubComment, post_id: Optional[int], issue_number: int) -> None:
    """Create the Lemmy comment and record its id through `cursor` (caller commits)."""
    lemmy_post_url = f"{LEMMY_INSTANCE_URL}/post/{post_id}"
    comment_url = f"{GITHUB_URL_BASE}/{github_repo}/issues/{issue_number}#issuecomment-{comment.id}"
    logging.info(f"Posting comment {comment.url} to Lemmy post {lemmy_post_url}")
    lemmy_comment_id: Optional[int] = create_lemmy_comment(lemmy, post_id, comment)
    if not lemmy_comment_id:
        # logging.error, not logging.exception: no exception is active here.
        logging.error(f"Error creating Lemmy comment {lemmy_comment_id} to {lemmy_post_url} from Github comment {comment.url}")
        return
    logging.info(f"Posted comment {comment_url} to Lemmy post {lemmy_post_url}")
    insert_comment_to_database(cursor, comment.id, lemmy_comment_id, comment)


def get_existing_comment_id(cursor: sqlite3.Cursor, github_comment_id: int) -> Optional[int]:
    """Return the Lemmy comment id stored for a GitHub comment id, or None."""
    logging.info(f"Checking if comment with GitHub comment ID: {github_comment_id} exists")
    cursor.execute("SELECT lemmy_comment_id FROM comments WHERE github_comment_id=?", (github_comment_id,))
    existing_comment = cursor.fetchone()
    if existing_comment is None:
        logging.info(f"No existing comment found with GitHub comment ID: {github_comment_id}")
        return None
    logging.info(f"Found existing comment with GitHub comment ID: {github_comment_id}")
    existing_comment_id: int = existing_comment[0]
    return existing_comment_id


def fetch_issue_data(github_repo: str) -> List[Tuple[str, Optional[int]]]:
    """Return `(issue_url, lemmy_post_id)` rows for the repository's stored posts.

    NOTE(review): the `posts` table created by `initialize_database` has no
    `issue_url` column, so this query raises sqlite3.OperationalError against
    that schema. Nothing in this file calls the function; fixing it needs a
    schema change.
    """
    logging.info("Fetching the GitHub issue number and Lemmy post ID for all issues")
    SQL = "SELECT issue_url, lemmy_post_id FROM posts WHERE issue_url LIKE ?"
    issues_url = f"https://github.com/{github_repo}/issues/%"
    with contextlib.closing(sqlite3.connect(DB_FILE)) as conn:
        issue_data: List[Tuple[str, Optional[int]]] = conn.execute(SQL, (issues_url,)).fetchall()
    logging.info(f"Fetched {len(issue_data)} issues")
    return issue_data


def process_repo(lemmy: Any, community_id: int, github_repo: str, last_updated_time: Optional[str] = None) -> None:
    """Mirror one repository, logging (not raising) any error so other repositories still run.

    Args:
        lemmy: Logged-in Lemmy client.
        community_id: Id of the target Lemmy community.
        github_repo: Repository in "owner/name" form.
        last_updated_time: Optional `since` value, see `process_issues`.
    """
    try:
        logging.info(f"Processing repository {github_repo}")
        process_issues(lemmy, community_id, github_repo, last_updated_time)
    except Exception as e:
        logging.exception(f"Error occurred while processing repository {github_repo}\n{e}")


def main() -> None:
    """Run one full mirroring pass over all configured repositories."""
    logging.info("Running main function")
    # Only the table creation is needed here; release the connection right away.
    initialize_database().close()
    lemmy = initialize_lemmy_instance()
    community_id = lemmy.discover_community(LEMMY_COMMUNITY_NAME)
    # Read the checkpoint once and share it: advancing it per repository made
    # every repository after the first query `since=<now>` and miss updates.
    last_updated_time = get_last_updated_time()
    update_last_updated_time()
    for github_repo in REPOSITORIES:
        process_repo(lemmy, community_id, github_repo, last_updated_time)


def run_periodically() -> None:
    """Run `main` every hour, forever; scheduling errors are logged and the loop continues."""
    logging.info("Starting periodic run")
    schedule.every(1).hours.do(main)
    while True:
        try:
            schedule.run_pending()
        except Exception as e:
            logging.exception(f"Error occurred during scheduling\n{e}")
        time.sleep(60)


if __name__ == "__main__":
    try:
        logging.info("Starting script")
        main()
        run_periodically()
    except Exception as e:
        logging.exception(f"Error occurred during script execution\n{e}")
requirements.txt
pythorhead==0.12.3
schedule==1.2.0
backoff==2.2.1
feedparser==6.0.10