import os import re import time from datetime import datetime from pathlib import Path from zoneinfo import ZoneInfo import requests from dotenv import load_dotenv load_dotenv() API_TOKEN = os.environ["API_TOKEN"] EMOJI_FOLDER_ID = "a8qkigygjv" MAX_CHECK_NEW_FILE = 10 TIMEZONE = ZoneInfo("Asia/Tokyo") FILES_SRC = Path(os.environ["FILES_SRC"]) def admin_emoji_list(since_id: str | None = None, until_id: str | None = None): request_body = { "query": None, "limit": 100, } if since_id is not None: request_body["sinceId"] = since_id if until_id is not None: request_body["untilId"] = until_id return requests.post( "https://misskey.qwjyh.net/api/admin/emoji/list", headers={ "Content-Type": "application/json", "Authorization": f"Bearer {API_TOKEN}", }, json=request_body, ) def drive_files_upload_from_url(url: str): return requests.post( "https://misskey.qwjyh.net/api/drive/files/upload-from-url", headers={ "Content-Type": "application/json", "Authorization": f"Bearer {API_TOKEN}", }, json={ "url": url, "folderId": EMOJI_FOLDER_ID, "comment": "auto uploaded", }, ) def drive_files_create(file_path: str | os.PathLike, name: str): return requests.post( "https://misskey.qwjyh.net/api/drive/files/create", headers={ # "Content-Type": "multipart/form-data", "Authorization": f"Bearer {API_TOKEN}", }, files={ "file": open(file_path, "rb"), }, data={ "folderId": EMOJI_FOLDER_ID, "name": name, }, ) def check_file_exists(emoji: dict) -> bool: r = requests.get(emoji["url"]) return r.status_code == requests.codes.ok def get_latest_file(): return requests.post( "https://misskey.qwjyh.net/api/drive/files", headers={ "Content-Type": "application/json", "Authorization": f"Bearer {API_TOKEN}", }, json={"limit": 1, "folderId": EMOJI_FOLDER_ID, "sort": "+createdAt"}, ) def admin_emoji_update(emoji_id: str, new_file_id: str): return requests.post( "https://misskey.qwjyh.net/api/admin/emoji/update", headers={ "Content-Type": "application/json", "Authorization": f"Bearer {API_TOKEN}", }, json={ "id": emoji_id, "fileId": new_file_id, }, ) def main(): print(f"{API_TOKEN=}") print("Hello from misskey-update-emoji-files!") r = admin_emoji_list(until_id="9h9zv01kcr") print(f"out body={r.json()}") r.raise_for_status() for emoji in r.json(): file_exists = check_file_exists(emoji) if not file_exists: print(f"{emoji=}") m = re.search(r"[\da-f\-]+$", emoji["url"]) if m is None: raise Exception(f"Failed to get file name from URL: {emoji['url']}") source_file = FILES_SRC / m.group() if not source_file.exists(): raise Exception(f"file doesn't exist: {source_file=}") print(f"Uploading file from {source_file}") before_upload = datetime.now(tz=TIMEZONE) r = drive_files_create(source_file, emoji["name"]) print(f"{r=}") r.raise_for_status() for i_get_file_iter in range(MAX_CHECK_NEW_FILE): time.sleep(1) r = get_latest_file() print(f"out body={r.json()}") r.raise_for_status() created_at_str: str = r.json()[0]["createdAt"] new_file_created_at = datetime.fromisoformat(created_at_str) if new_file_created_at > before_upload: break if i_get_file_iter == MAX_CHECK_NEW_FILE: raise Exception("failed to confirm new file") new_file_id = r.json()[0]["id"] admin_emoji_update(emoji["id"], new_file_id) if __name__ == "__main__": main()