diff --git a/main.py b/main.py index 5592425..5b67243 100644 --- a/main.py +++ b/main.py @@ -1,5 +1,14 @@ -import requests, websockets, asyncio, json, re, httpx -import traceback, subprocess, os, aiohttp, sys, random +import requests +import websockets +import asyncio +import json +import re +import httpx +import traceback +import subprocess +import os +import aiohttp +import random from bs4 import BeautifulSoup from datetime import datetime from pathlib import Path @@ -9,59 +18,68 @@ HERE = Path(__file__).parent username = os.environ.get("GLUEBOT_USERNAME") password = os.environ.get("GLUEBOT_PASSWORD") + def get_time(): - return datetime.now().timestamp() + return datetime.now().timestamp() + def remove_file(path): - try: - path.unlink() - except Exception as e: - print(f"(Remove) Error: {e}") - traceback.print_exc() + try: + path.unlink() + except Exception as e: + print(f"(Remove) Error: {e}") + traceback.print_exc() + def get_extension(path): - return Path(path).suffix.lower().lstrip(".") + return Path(path).suffix.lower().lstrip(".") + def clean_lines(s): - cleaned = s - cleaned = re.sub(r" *(\n+|\\n+) *", "\n", cleaned) - cleaned = re.sub(r" +", " ", cleaned) - return cleaned.strip() + cleaned = s + cleaned = re.sub(r" *(\n+|\\n+) *", "\n", cleaned) + cleaned = re.sub(r" +", " ", cleaned) + return cleaned.strip() + def random_int(min_val, max_val): - return random.randint(min_val, max_val) + return random.randint(min_val, max_val) + def get_path(name): - return str(Path(HERE, name)) + return str(Path(HERE, name)) + def extract_range(string): - pattern = re.compile(r"\s*(\d+)\s*(\d+)?\s*") - match = pattern.search(string) + pattern = r"(?:(?P-?\d+)(?:\s*(.+?)\s*(?P-?\d+))?)?" + match = re.search(pattern, string) + num1 = None + num2 = None - if not match: - return [0] + if match["number1"]: + num1 = int(match["number1"]) - start = match.group(1) - end = match.group(2) + if match["number2"]: + num2 = int(match["number2"]) + + return [num1, num2] - if end is None: - return [int(start)] - else: - return [int(start), int(end)] def clean_list(lst): - return list(filter(lambda x: x != "", lst)) + return list(filter(lambda x: x != "", lst)) + def string_to_number(input_string): - hash_value = hash(input_string) - absolute_hash = abs(hash_value) - scaled_number = absolute_hash % 1000 - return scaled_number + hash_value = hash(input_string) + absolute_hash = abs(hash_value) + scaled_number = absolute_hash % 1000 + return scaled_number + headers = { - "User-Agent": "gluebot", - "Origin": "https://deek.chat", - "DNT": "1", + "User-Agent": "gluebot", + "Origin": "https://deek.chat", + "DNT": "1", } url = "https://deek.chat" @@ -76,283 +94,301 @@ gm_common = "--width 350 --nogrow --output /tmp/gifmaker" cmd_date = get_time() + def update_time(): - global cmd_date - cmd_date = get_time() + global cmd_date + cmd_date = get_time() + def blocked(): - return (get_time() - cmd_date) < delay + return (get_time() - cmd_date) < delay + def auth(): - global token, session, headers + global token, session, headers - if not username or not password: - print("Missing environment variables") - exit(1) + if not username or not password: + print("Missing environment variables") + exit(1) + + data = {"name": username, "password": password, "submit": "log+in"} + res = requests.post(url + "/login/submit", headers=headers, + data=data, allow_redirects=False) + token = re.search("(?:api_token)=[^;]+", + res.headers.get("Set-Cookie")).group(0) + session = re.search( + "(?:session_id)=[^;]+", res.headers.get("Set-Cookie")).group(0) + headers["Cookie"] = token + "; " + session - data = {"name": username, "password": password, "submit": "log+in"} - res = requests.post(url + "/login/submit", headers=headers, data=data, allow_redirects=False) - token = re.search("(?:api_token)=[^;]+", res.headers.get("Set-Cookie")).group(0) - session = re.search("(?:session_id)=[^;]+", res.headers.get("Set-Cookie")).group(0) - headers["Cookie"] = token + "; " + session async def run(): - async with websockets.connect(ws_url, extra_headers=headers) as ws: - try: - while True: - message = await ws.recv() - await on_message(ws, message) - except KeyboardInterrupt: - exit(0) - except websockets.exceptions.ConnectionClosedOK: - print("WebSocket connection closed") - except Exception as e: - print("(WebSocket) Error:", e) - traceback.print_exc() + async with websockets.connect(ws_url, extra_headers=headers) as ws: + try: + while True: + message = await ws.recv() + await on_message(ws, message) + except KeyboardInterrupt: + exit(0) + except websockets.exceptions.ConnectionClosedOK: + print("WebSocket connection closed") + except Exception as e: + print("(WebSocket) Error:", e) + traceback.print_exc() + async def on_message(ws, message): - if blocked(): return + if blocked(): + return - try: - data = json.loads(message) - except: - return + try: + data = json.loads(message) + except: + return - if data["type"] == "message": - if data["data"]["name"] == username: - return + if data["type"] == "message": + if data["data"]["name"] == username: + return - text = data["data"]["text"].strip() + text = data["data"]["text"].strip() - if not text.startswith(prefix): - return + if not text.startswith(prefix): + return - room_id = data["roomId"] - words = text.lstrip(prefix).split(" ") - cmd = words[0] - args = words[1:] + room_id = data["roomId"] + words = text.lstrip(prefix).split(" ") + cmd = words[0] + args = words[1:] - if cmd == "ping": - update_time() - await send_message(ws, "Pong!", room_id) + if cmd == "ping": + update_time() + await send_message(ws, "Pong!", room_id) - elif cmd == "help": - update_time() - await send_message(ws, f"Commands: describe | wins | numbers | date | bird | shitpost", room_id) + elif cmd == "help": + update_time() + await send_message(ws, f"Commands: describe | wins | numbers | date | bird | shitpost", room_id) - elif cmd == "describe": - if len(args) >= 1: - update_time() - await gif_describe(args[0], room_id) + elif cmd == "describe": + if len(args) >= 1: + update_time() + await gif_describe(args[0], room_id) - elif cmd == "wins" or cmd == "win": - if len(args) >= 1: - update_time() - await gif_wins(args[0], room_id) + elif cmd == "wins" or cmd == "win": + if len(args) >= 1: + update_time() + await gif_wins(args[0], room_id) - elif cmd == "numbers" or cmd == "number" or cmd == "nums" or cmd == "num": - if len(args) > 0: - arg = " ".join(clean_list(args)) - else: - arg = None + elif cmd == "numbers" or cmd == "number" or cmd == "nums" or cmd == "num": + if len(args) > 0: + arg = " ".join(clean_list(args)) + else: + arg = None - update_time() - await gif_numbers(arg, room_id) + update_time() + await gif_numbers(arg, room_id) - elif cmd == "date" or cmd == "data" or cmd == "time" or cmd == "datetime": - update_time() - await gif_date(None, room_id) + elif cmd == "date" or cmd == "data" or cmd == "time" or cmd == "datetime": + update_time() + await gif_date(None, room_id) - elif cmd == "bird" or cmd == "birds" or cmd == "birb" or cmd == "birbs" or cmd == "brb": - update_time() - await random_bird(ws, room_id) + elif cmd == "bird" or cmd == "birds" or cmd == "birb" or cmd == "birbs" or cmd == "brb": + update_time() + await random_bird(ws, room_id) + + elif cmd == "post" or cmd == "shitpost" or cmd == "4chan" or cmd == "anon" or cmd == "shit": + update_time() + await random_post(ws, room_id) - elif cmd == "post" or cmd == "shitpost" or cmd == "4chan" or cmd == "anon" or cmd == "shit": - update_time() - await random_post(ws, room_id) async def random_bird(ws, room_id): - birdfile = get_path("data/aves.txt") + birdfile = get_path("data/aves.txt") + + async with aiofiles.open(birdfile, mode="r", encoding="utf-8") as file: + birds = await file.readlines() + bird = random.choice(birds).strip() + await send_message(ws, f".i \"{bird}\" bird", room_id) - async with aiofiles.open(birdfile, mode="r", encoding="utf-8") as file: - birds = await file.readlines() - bird = random.choice(birds).strip() - await send_message(ws, f".i \"{bird}\" bird", room_id) async def gif_describe(who, room_id): - input_path = get_path("describe.jpg") + input_path = get_path("describe.jpg") - command = [ - gifmaker, - gm_common, - f"--input '{input_path}'", - f"--words '{who} is\\n[Random] [x5]'", - "--filter anyhue2 --opacity 1 --fontsize 66 --delay 700", - "--padding 50 --fontcolor light2 --bgcolor black", - ] + command = [ + gifmaker, + gm_common, + f"--input '{input_path}'", + f"--words '{who} is\\n[Random] [x5]'", + "--filter anyhue2 --opacity 1 --fontsize 66 --delay 700", + "--padding 50 --fontcolor light2 --bgcolor black", + ] + + await run_gifmaker(command, room_id) - await run_gifmaker(command, room_id) async def gif_wins(who, room_id): - input_path = get_path("wins.gif") + input_path = get_path("wins.gif") - command = [ - gifmaker, - gm_common, - f"--input '{input_path}'", - f"--words '{who} wins a ; [repeat] ; [RANDOM] ; [repeat]' --bgcolor 0,0,0", - "--bottom 20 --filter anyhue2 --framelist 11,11,33,33 --fontsize=42", - ] + command = [ + gifmaker, + gm_common, + f"--input '{input_path}'", + f"--words '{who} wins a ; [repeat] ; [RANDOM] ; [repeat]' --bgcolor 0,0,0", + "--bottom 20 --filter anyhue2 --framelist 11,11,33,33 --fontsize=42", + ] + + await run_gifmaker(command, room_id) - await run_gifmaker(command, room_id) async def gif_numbers(arg, room_id): - num = -1 + num = -1 - if arg: - numbers = extract_range(arg) + if arg: + nums = extract_range(arg) - if len(numbers) == 1: - if numbers[0] > 0: - num = random_int(0, numbers[0]) - elif len(numbers) == 2: - if numbers[0] < numbers[1]: - num = random_int(numbers[0], numbers[1]) + if nums[0] is not None: + if nums[1] is not None: + num = random_int(nums[0], nums[1]) + else: + num = random_int(0, nums[0]) - if num == -1: - num = string_to_number(arg) + if num == -1: + num = string_to_number(arg) - if num == -1: - num = random_int(0, 999) + if num == -1: + num = random_int(0, 999) - input_path = get_path("numbers.png") + input_path = get_path("numbers.png") - command = [ - gifmaker, - gm_common, - f"--input '{input_path}'", - f"--top 20 --words '{num}' --fontcolor 0,0,0", - "--fontsize 66 --format jpg", - ] + command = [ + gifmaker, + gm_common, + f"--input '{input_path}'", + f"--top 20 --words '{num}' --fontcolor 0,0,0", + "--fontsize 66 --format jpg", + ] + + await run_gifmaker(command, room_id) - await run_gifmaker(command, room_id) async def gif_date(who, room_id): - input_path = get_path("time.jpg") + input_path = get_path("time.jpg") - command = [ - gifmaker, - gm_common, - f"--input '{input_path}'", - "--words 'Date: [date %A %d] ; [repeat] ; Time: [date %I:%M %p] ; [repeat]'", - "--filter anyhue2 --bottom 20 --bgcolor 0,0,0 --fontsize 80", - ] + command = [ + gifmaker, + gm_common, + f"--input '{input_path}'", + "--words 'Date: [date %A %d] ; [repeat] ; Time: [date %I:%M %p] ; [repeat]'", + "--filter anyhue2 --bottom 20 --bgcolor 0,0,0 --fontsize 80", + ] + + await run_gifmaker(command, room_id) - await run_gifmaker(command, room_id) async def random_post(ws, room_id): - boards = ["g", "an", "ck", "lit", "x", "tv", "v", "fit", "k", "o"] - board = random.choice(boards) + boards = ["g", "an", "ck", "lit", "x", "tv", "v", "fit", "k", "o"] + board = random.choice(boards) - try: - threads_url = f"https://a.4cdn.org/{board}/threads.json" - async_client = httpx.AsyncClient() - threads_response = await async_client.get(threads_url) - threads_response.raise_for_status() - threads_json = threads_response.json() - threads = threads_json[0]["threads"] + try: + threads_url = f"https://a.4cdn.org/{board}/threads.json" + async_client = httpx.AsyncClient() + threads_response = await async_client.get(threads_url) + threads_response.raise_for_status() + threads_json = threads_response.json() + threads = threads_json[0]["threads"] - # Select a random thread - id = threads[random_int(0, len(threads) - 1)]["no"] - thread_url = f"https://a.4cdn.org/{board}/thread/{id}.json" + # Select a random thread + id = threads[random_int(0, len(threads) - 1)]["no"] + thread_url = f"https://a.4cdn.org/{board}/thread/{id}.json" - # Fetch the selected thread - thread_response = await async_client.get(thread_url) - thread_response.raise_for_status() - thread_json = thread_response.json() - posts = thread_json["posts"] + # Fetch the selected thread + thread_response = await async_client.get(thread_url) + thread_response.raise_for_status() + thread_json = thread_response.json() + posts = thread_json["posts"] - # Select a random post - post = posts[random_int(0, len(posts) - 1)] - html = post.get("com", "") - if not html: return + # Select a random post + post = posts[random_int(0, len(posts) - 1)] + html = post.get("com", "") + if not html: + return - # Parse HTML using BeautifulSoup - soup = BeautifulSoup(html, "html.parser") + # Parse HTML using BeautifulSoup + soup = BeautifulSoup(html, "html.parser") - # Remove elements with class "quotelink" - for elem in soup.select(".quotelink"): - elem.decompose() + # Remove elements with class "quotelink" + for elem in soup.select(".quotelink"): + elem.decompose() - # Replace
with newline - for br in soup.find_all("br"): - br.replace_with("\n") + # Replace
with newline + for br in soup.find_all("br"): + br.replace_with("\n") - # Get text content and limit to 500 characters - text = soup.get_text(separator="\n")[:500].strip() - text = clean_lines(text) + # Get text content and limit to 500 characters + text = soup.get_text(separator="\n")[:500].strip() + text = clean_lines(text) - if text: - await send_message(ws, text, room_id) + if text: + await send_message(ws, text, room_id) + + except Exception as err: + print(f"Error: {err}") - except Exception as err: - print(f"Error: {err}") async def run_gifmaker(command, room_id): - process = await asyncio.create_subprocess_shell( - " ".join(command), - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - shell=True, - ) + process = await asyncio.create_subprocess_shell( + " ".join(command), + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + shell=True, + ) - stdout, stderr = await process.communicate() + stdout, stderr = await process.communicate() - if process.returncode != 0: - print(f"(Process) Error: {stderr.decode()}") - return + if process.returncode != 0: + print(f"(Process) Error: {stderr.decode()}") + return + + await upload(Path(stdout.decode().strip()), room_id) - await upload(Path(stdout.decode().strip()), room_id) async def upload(path, room_id): - if (not path.exists()) or (not path.is_file()): - return + if (not path.exists()) or (not path.is_file()): + return - cookies = { - "session_id": session.split("=")[1], - "api_token": token.split("=")[1], - } + cookies = { + "session_id": session.split("=")[1], + "api_token": token.split("=")[1], + } - ext = get_extension(path) + ext = get_extension(path) - if ext == "jpg": - ext = "jpeg" + if ext == "jpg": + ext = "jpeg" - url = "https://deek.chat/message/send/" + str(room_id) - data = aiohttp.FormData() - data.add_field(name="files[]", value=open(path, "rb"), \ - filename=path.name, content_type=f"image/{ext}") + url = "https://deek.chat/message/send/" + str(room_id) + data = aiohttp.FormData() + data.add_field(name="files[]", value=open(path, "rb"), + filename=path.name, content_type=f"image/{ext}") - try: - async with aiohttp.ClientSession(cookies=cookies) as sess: - async with sess.post(url, data=data, headers={}) as response: - await response.text() - except Exception as e: - print(f"(Upload) Error: {e}") - traceback.print_exc() + try: + async with aiohttp.ClientSession(cookies=cookies) as sess: + async with sess.post(url, data=data, headers={}) as response: + await response.text() + except Exception as e: + print(f"(Upload) Error: {e}") + traceback.print_exc() + + remove_file(path) - remove_file(path) async def send_message(ws, text, room_id): - await ws.send(json.dumps({"type": "message", "data": text, "roomId": room_id})) + await ws.send(json.dumps({"type": "message", "data": text, "roomId": room_id})) while True: - try: - auth() - print("Authenticated") - asyncio.run(run()) - except KeyboardInterrupt: - break - except Exception as e: - print("(Main) Error:", e) - traceback.print_exc() \ No newline at end of file + try: + auth() + print("Authenticated") + asyncio.run(run()) + except KeyboardInterrupt: + break + except Exception as e: + print("(Main) Error:", e) + traceback.print_exc()