from datetime import datetime import os import time from zoneinfo import ZoneInfo import pyperclip import requests from dotenv import load_dotenv load_dotenv() API_TOKEN = os.environ["API_TOKEN"] EMOJI_FOLDER_ID = "a8qkigygjv" MAX_CHECK_NEW_FILE = 10 TIMEZONE = ZoneInfo("Asia/Tokyo") def admin_emoji_list(since_id: str | None = None, until_id: str | None = None): request_body = { "query": None, "limit": 100, } if since_id is not None: request_body["since_id"] = since_id if until_id is not None: request_body["until_id"] = until_id return requests.post( "https://misskey.qwjyh.net/api/admin/emoji/list", headers={ "Content-Type": "application/json", "Authorization": f"Bearer {API_TOKEN}", }, json=request_body, ) def drive_files_upload_from_url(url: str): return requests.post( "https://misskey.qwjyh.net/api/drive/files/upload-from-url", headers={ "Content-Type": "application/json", "Authorization": f"Bearer {API_TOKEN}", }, json={ "url": url, "folderId": EMOJI_FOLDER_ID, "comment": "auto uploaded", }, ) def check_file_exists(emoji: dict) -> bool: r = requests.get(emoji["url"]) return r.status_code == requests.codes.ok def get_latest_file(): return requests.post( "https://misskey.qwjyh.net/api/drive/files", headers={ "Content-Type": "application/json", "Authorization": f"Bearer {API_TOKEN}", }, json={"limit": 1, "folderId": EMOJI_FOLDER_ID, "sort": "+createdAt"}, ) def admin_emoji_update(emoji_id: str, new_file_id: str): return requests.post( "https://misskey.qwjyh.net/api/admin/emoji/update", headers={ "Content-Type": "application/json", "Authorization": f"Bearer {API_TOKEN}", }, json={ "id": emoji_id, "fileId": new_file_id, }, ) def main(): print(f"{API_TOKEN=}") print("Hello from misskey-update-emoji-files!") r = admin_emoji_list() print(f"out body={r.json()}") r.raise_for_status() for emoji in r.json(): file_exists = check_file_exists(emoji) if not file_exists: print(f"{emoji=}") pyperclip.copy(emoji['name']) before_upload = datetime.now(tz=TIMEZONE) new_url = input("enter new file url:") r = drive_files_upload_from_url(new_url) r.raise_for_status() for i_get_file_iter in range(MAX_CHECK_NEW_FILE): time.sleep(1) r = get_latest_file() print(f"out body={r.json()}") r.raise_for_status() created_at_str: str = r.json()[0]["createdAt"] new_file_created_at = datetime.fromisoformat(created_at_str) if new_file_created_at > before_upload: break if i_get_file_iter == MAX_CHECK_NEW_FILE: raise Exception("failed to confirm new file") new_file_id = r.json()[0]["id"] admin_emoji_update(emoji["id"], new_file_id) if __name__ == "__main__": main()