"""Chat bot for deek.chat.

Logs in over HTTP, listens on the site's WebSocket and answers commands
that start with ``prefix``. Some commands render an image with the external
``gifmaker`` program and upload the result to the room.

Credentials are read from the ``GLUEBOT_USERNAME`` and ``GLUEBOT_PASSWORD``
environment variables.
"""

import asyncio
import json
import os
import random
import re
import shlex
import subprocess
import sys
import time
import traceback
from datetime import datetime
from pathlib import Path

import aiofiles
import aiohttp
import httpx
import requests
import websockets
from bs4 import BeautifulSoup

HERE = Path(__file__).parent
username = os.environ.get("GLUEBOT_USERNAME")
password = os.environ.get("GLUEBOT_PASSWORD")


def get_time():
    """Return the current local time as a POSIX timestamp (float seconds)."""
    return datetime.now().timestamp()


def remove_file(path):
    """Delete ``path`` (a ``Path``); log instead of raising on failure."""
    try:
        path.unlink()
    except Exception as e:
        print(f"(Remove) Error: {e}")
        traceback.print_exc()


def get_extension(path):
    """Return the lower-cased file extension of ``path`` without the dot."""
    return Path(path).suffix.lower().lstrip(".")


def clean_lines(s):
    """Collapse runs of newlines (real or literal ``\\n``) and of spaces."""
    cleaned = s
    cleaned = re.sub(r" *(\n+|\\n+) *", "\n", cleaned)
    cleaned = re.sub(r" +", " ", cleaned)
    return cleaned.strip()


def random_int(min_val, max_val):
    """Return a random integer N with ``min_val <= N <= max_val``."""
    return random.randint(min_val, max_val)


def get_path(name):
    """Return ``name`` resolved against this script's directory, as a str."""
    return str(Path(HERE, name))


def extract_range(string):
    """Extract one or two integers from ``string``.

    Returns ``[start]`` or ``[start, end]`` for the first run of digits
    (optionally followed by a second, whitespace-separated run), or ``[0]``
    when the string contains no digits.
    """
    pattern = re.compile(r"\s*(\d+)\s*(\d+)?\s*")
    match = pattern.search(string)

    if not match:
        return [0]

    start = match.group(1)
    end = match.group(2)

    if end is None:
        return [int(start)]
    return [int(start), int(end)]


def clean_list(lst):
    """Return ``lst`` without its empty-string items."""
    return [item for item in lst if item != ""]


def string_to_number(input_string):
    """Map ``input_string`` to an integer in ``0..999`` via ``hash()``.

    NOTE: ``str`` hashes are salted per interpreter process unless
    ``PYTHONHASHSEED`` is set, so the mapping is only stable within one run.
    """
    return abs(hash(input_string)) % 1000


headers = {
    "User-Agent": "gluebot",
    "Origin": "https://deek.chat",
    "DNT": "1",
}

url = "https://deek.chat"
ws_url = "wss://deek.chat/ws"
prefix = ","
token = None
session = None
delay = 3  # minimum seconds between two handled commands

gifmaker = "/usr/bin/gifmaker"
gm_common = "--width 350 --nogrow --output /tmp/gifmaker"

login_timeout = 30  # seconds to wait for the login request
reconnect_delay = 5  # seconds to wait before logging in / connecting again

cmd_date = get_time()


def update_time():
    """Record now as the time of the last handled command."""
    global cmd_date
    cmd_date = get_time()


def blocked():
    """Return True while fewer than ``delay`` seconds passed since the last command."""
    return (get_time() - cmd_date) < delay


def auth():
    """Log in and store the auth cookies in ``token``, ``session`` and ``headers``.

    Exits the process with status 1 when the credentials are not configured.

    Raises:
        RuntimeError: if the login response does not carry both the
            ``api_token`` and ``session_id`` cookies.
    """
    global token, session

    if not username or not password:
        print("Missing environment variables")
        sys.exit(1)

    # ``headers`` may still carry the Cookie of a previous login; do not send
    # that stale cookie along with the new login request.
    login_headers = {k: v for k, v in headers.items() if k != "Cookie"}
    data = {"name": username, "password": password, "submit": "log+in"}

    res = requests.post(
        url + "/login/submit",
        headers=login_headers,
        data=data,
        allow_redirects=False,
        timeout=login_timeout,
    )

    set_cookie = res.headers.get("Set-Cookie") or ""
    token_match = re.search(r"(?:api_token)=[^;]+", set_cookie)
    session_match = re.search(r"(?:session_id)=[^;]+", set_cookie)

    if not token_match or not session_match:
        raise RuntimeError(
            f"Login failed (HTTP {res.status_code}): auth cookies missing"
        )

    token = token_match.group(0)
    session = session_match.group(0)
    headers["Cookie"] = token + "; " + session


async def run():
    """Connect to the WebSocket and feed every message to ``on_message``.

    Returns when the connection closes or a handler raises; the error is
    printed rather than propagated.
    """
    # NOTE(review): newer ``websockets`` releases name this keyword
    # ``additional_headers`` — confirm against the installed version.
    async with websockets.connect(ws_url, extra_headers=headers) as ws:
        try:
            while True:
                message = await ws.recv()
                await on_message(ws, message)
        except KeyboardInterrupt:
            sys.exit(0)
        except websockets.exceptions.ConnectionClosedOK:
            print("WebSocket connection closed")
        except Exception as e:
            print("(WebSocket) Error:", e)
            traceback.print_exc()


async def on_message(ws, message):
    """Parse one raw WebSocket message and run the command it contains.

    Messages are ignored while the bot is rate-limited (``blocked()``), when
    they are not valid JSON of the expected shape, when they were sent by the
    bot itself, or when their text does not start with ``prefix``.
    """
    if blocked():
        return

    try:
        data = json.loads(message)
    except (ValueError, TypeError):
        return

    # Ignore anything that does not look like a chat message instead of
    # raising and tearing down the connection.
    if not isinstance(data, dict) or data.get("type") != "message":
        return

    payload = data.get("data")
    if not isinstance(payload, dict):
        return

    if payload.get("name") == username:
        return

    text = payload.get("text")
    if not isinstance(text, str):
        return

    text = text.strip()
    if not text.startswith(prefix):
        return

    room_id = data.get("roomId")
    if room_id is None:
        return

    # Slice the prefix off; str.lstrip() would strip a *set of characters*.
    words = text[len(prefix):].split(" ")
    cmd = words[0]
    args = words[1:]

    if cmd == "ping":
        update_time()
        await send_message(ws, "Pong!", room_id)

    elif cmd == "help":
        update_time()
        await send_message(
            ws,
            "Commands: describe | wins | numbers | date | bird | shitpost",
            room_id,
        )

    elif cmd == "describe":
        if len(args) >= 1:
            update_time()
            await gif_describe(args[0], room_id)

    elif cmd in ("wins", "win"):
        if len(args) >= 1:
            update_time()
            await gif_wins(args[0], room_id)

    elif cmd in ("numbers", "number", "nums", "num"):
        if len(args) > 0:
            arg = " ".join(clean_list(args))
        else:
            arg = None

        update_time()
        await gif_numbers(arg, room_id)

    elif cmd in ("date", "data", "time", "datetime"):
        update_time()
        await gif_date(None, room_id)

    elif cmd in ("bird", "birds", "birb", "birbs", "brb"):
        update_time()
        await random_bird(ws, room_id)

    elif cmd in ("post", "shitpost", "4chan", "anon", "shit"):
        update_time()
        await random_post(ws, room_id)


async def random_bird(ws, room_id):
    """Send an image-search command for a random line of ``data/aves.txt``."""
    birdfile = get_path("data/aves.txt")

    async with aiofiles.open(birdfile, mode="r", encoding="utf-8") as file:
        lines = await file.readlines()

    # Skip blank lines so an empty name is never sent.
    birds = [line.strip() for line in lines if line.strip()]
    if not birds:
        return

    bird = random.choice(birds)
    await send_message(ws, f".i \"{bird}\" bird", room_id)


async def gif_describe(who, room_id):
    """Render and upload the "describe" image for ``who``."""
    input_path = get_path("describe.jpg")
    words = f"{who} is\\n[Random] [x5]"

    command = [
        gifmaker,
        gm_common,
        # ``who`` comes straight from chat: quote it for the shell.
        f"--input {shlex.quote(input_path)}",
        f"--words {shlex.quote(words)}",
        "--filter anyhue2 --opacity 1 --fontsize 66 --delay 700",
        "--padding 50 --fontcolor light2 --bgcolor black",
    ]

    await run_gifmaker(command, room_id)


async def gif_wins(who, room_id):
    """Render and upload the "wins" image for ``who``."""
    input_path = get_path("wins.gif")
    words = f"{who} wins a ; [repeat] ; [RANDOM] ; [repeat]"

    command = [
        gifmaker,
        gm_common,
        # ``who`` comes straight from chat: quote it for the shell.
        f"--input {shlex.quote(input_path)}",
        f"--words {shlex.quote(words)} --bgcolor 0,0,0",
        "--bottom 20 --filter anyhue2 --framelist 11,11,33,33 --fontsize=42",
    ]

    await run_gifmaker(command, room_id)


async def gif_numbers(arg, room_id):
    """Render and upload an image showing a number derived from ``arg``.

    ``arg`` may be ``None`` (random 0..999), a single positive integer N
    (random 0..N), two integers ``A B`` with A < B (random A..B), or any
    other text (number derived from the text via ``string_to_number``).
    """
    num = -1  # sentinel: no number chosen yet

    if arg:
        numbers = extract_range(arg)

        if len(numbers) == 1:
            if numbers[0] > 0:
                num = random_int(0, numbers[0])
        elif len(numbers) == 2:
            if numbers[0] < numbers[1]:
                num = random_int(numbers[0], numbers[1])

        if num == -1:
            num = string_to_number(arg)

    if num == -1:
        num = random_int(0, 999)

    input_path = get_path("numbers.png")

    command = [
        gifmaker,
        gm_common,
        f"--input {shlex.quote(input_path)}",
        f"--top 20 --words '{num}' --fontcolor 0,0,0",
        "--fontsize 66 --format jpg",
    ]

    await run_gifmaker(command, room_id)


async def gif_date(who, room_id):
    """Render and upload the date/time image. ``who`` is unused."""
    input_path = get_path("time.jpg")

    command = [
        gifmaker,
        gm_common,
        f"--input {shlex.quote(input_path)}",
        "--words 'Date: [date %A %d] ; [repeat] ; Time: [date %I:%M %p] ; [repeat]'",
        "--filter anyhue2 --bottom 20 --bgcolor 0,0,0 --fontsize 80",
    ]

    await run_gifmaker(command, room_id)


async def random_post(ws, room_id):
    """Send the text of a random post from a random 4chan board.

    Best effort: any failure is printed and nothing is sent.
    """
    boards = ["g", "an", "ck", "lit", "x", "tv", "v", "fit", "k", "o"]
    board = random.choice(boards)

    try:
        # The context manager closes the HTTP client when done.
        async with httpx.AsyncClient() as async_client:
            threads_url = f"https://a.4cdn.org/{board}/threads.json"
            threads_response = await async_client.get(threads_url)
            threads_response.raise_for_status()
            threads_json = threads_response.json()
            threads = threads_json[0]["threads"]

            # Select a random thread
            thread_id = random.choice(threads)["no"]
            thread_url = f"https://a.4cdn.org/{board}/thread/{thread_id}.json"

            # Fetch the selected thread
            thread_response = await async_client.get(thread_url)
            thread_response.raise_for_status()
            thread_json = thread_response.json()
            posts = thread_json["posts"]

        # Select a random post
        post = random.choice(posts)
        html = post.get("com", "")

        if not html:
            return

        # Parse HTML using BeautifulSoup
        soup = BeautifulSoup(html, "html.parser")

        # Remove elements with class "quotelink"
        for elem in soup.select(".quotelink"):
            elem.decompose()

        # Replace <br> with newline
        for br in soup.find_all("br"):
            br.replace_with("\n")

        # Get text content and limit to 500 characters
        text = soup.get_text(separator="\n")[:500].strip()
        text = clean_lines(text)

        if text:
            await send_message(ws, text, room_id)

    except Exception as err:
        print(f"Error: {err}")


async def run_gifmaker(command, room_id):
    """Run ``command`` through the shell and upload the file it reports.

    ``command`` is a list of already shell-quoted fragments. The program's
    stdout is taken as the path of the generated file.
    """
    process = await asyncio.create_subprocess_shell(
        " ".join(command),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )

    stdout, stderr = await process.communicate()

    if process.returncode != 0:
        print(f"(Process) Error: {stderr.decode()}")
        return

    await upload(Path(stdout.decode().strip()), room_id)


async def upload(path, room_id):
    """Post the file at ``path`` to the room, then delete the file.

    Does nothing when ``path`` is not an existing regular file.
    """
    if (not path.exists()) or (not path.is_file()):
        return

    # Split on the first "=" only: the cookie value itself may contain "=".
    cookies = {
        "session_id": session.partition("=")[2],
        "api_token": token.partition("=")[2],
    }

    ext = get_extension(path)

    if ext == "jpg":
        ext = "jpeg"

    upload_url = "https://deek.chat/message/send/" + str(room_id)

    try:
        # Keep the file open for the duration of the request and make sure
        # it is closed afterwards, even when the request fails.
        with open(path, "rb") as file:
            data = aiohttp.FormData()
            data.add_field(
                name="files[]",
                value=file,
                filename=path.name,
                content_type=f"image/{ext}",
            )

            async with aiohttp.ClientSession(cookies=cookies) as sess:
                async with sess.post(upload_url, data=data, headers={}) as response:
                    await response.text()
    except Exception as e:
        print(f"(Upload) Error: {e}")
        traceback.print_exc()

    remove_file(path)


async def send_message(ws, text, room_id):
    """Send ``text`` as a chat message to ``room_id`` over the WebSocket."""
    await ws.send(json.dumps({"type": "message", "data": text, "roomId": room_id}))


# Log in and serve until interrupted; after a dropped connection or an error,
# wait a little and start over.
while True:
    try:
        auth()
        print("Authenticated")
        asyncio.run(run())
    except KeyboardInterrupt:
        break
    except Exception as e:
        print("(Main) Error:", e)
        traceback.print_exc()

    # Pause between attempts so a persistent failure (bad credentials,
    # server down) does not hammer the login endpoint in a tight loop.
    try:
        time.sleep(reconnect_delay)
    except KeyboardInterrupt:
        break