This commit is contained in:
Auric Vente 2024-02-22 20:22:16 -06:00
parent 6b2010fa16
commit 96df5b5b7a
1 changed files with 271 additions and 235 deletions

506
main.py
View File

@ -1,5 +1,14 @@
import requests, websockets, asyncio, json, re, httpx import requests
import traceback, subprocess, os, aiohttp, sys, random import websockets
import asyncio
import json
import re
import httpx
import traceback
import subprocess
import os
import aiohttp
import random
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
@ -9,59 +18,68 @@ HERE = Path(__file__).parent
username = os.environ.get("GLUEBOT_USERNAME") username = os.environ.get("GLUEBOT_USERNAME")
password = os.environ.get("GLUEBOT_PASSWORD") password = os.environ.get("GLUEBOT_PASSWORD")
def get_time(): def get_time():
return datetime.now().timestamp() return datetime.now().timestamp()
def remove_file(path): def remove_file(path):
try: try:
path.unlink() path.unlink()
except Exception as e: except Exception as e:
print(f"(Remove) Error: {e}") print(f"(Remove) Error: {e}")
traceback.print_exc() traceback.print_exc()
def get_extension(path): def get_extension(path):
return Path(path).suffix.lower().lstrip(".") return Path(path).suffix.lower().lstrip(".")
def clean_lines(s): def clean_lines(s):
cleaned = s cleaned = s
cleaned = re.sub(r" *(\n+|\\n+) *", "\n", cleaned) cleaned = re.sub(r" *(\n+|\\n+) *", "\n", cleaned)
cleaned = re.sub(r" +", " ", cleaned) cleaned = re.sub(r" +", " ", cleaned)
return cleaned.strip() return cleaned.strip()
def random_int(min_val, max_val): def random_int(min_val, max_val):
return random.randint(min_val, max_val) return random.randint(min_val, max_val)
def get_path(name): def get_path(name):
return str(Path(HERE, name)) return str(Path(HERE, name))
def extract_range(string): def extract_range(string):
pattern = re.compile(r"\s*(\d+)\s*(\d+)?\s*") pattern = r"(?:(?P<number1>-?\d+)(?:\s*(.+?)\s*(?P<number2>-?\d+))?)?"
match = pattern.search(string) match = re.search(pattern, string)
num1 = None
num2 = None
if not match: if match["number1"]:
return [0] num1 = int(match["number1"])
start = match.group(1) if match["number2"]:
end = match.group(2) num2 = int(match["number2"])
return [num1, num2]
if end is None:
return [int(start)]
else:
return [int(start), int(end)]
def clean_list(lst): def clean_list(lst):
return list(filter(lambda x: x != "", lst)) return list(filter(lambda x: x != "", lst))
def string_to_number(input_string): def string_to_number(input_string):
hash_value = hash(input_string) hash_value = hash(input_string)
absolute_hash = abs(hash_value) absolute_hash = abs(hash_value)
scaled_number = absolute_hash % 1000 scaled_number = absolute_hash % 1000
return scaled_number return scaled_number
headers = { headers = {
"User-Agent": "gluebot", "User-Agent": "gluebot",
"Origin": "https://deek.chat", "Origin": "https://deek.chat",
"DNT": "1", "DNT": "1",
} }
url = "https://deek.chat" url = "https://deek.chat"
@ -76,283 +94,301 @@ gm_common = "--width 350 --nogrow --output /tmp/gifmaker"
cmd_date = get_time() cmd_date = get_time()
def update_time(): def update_time():
global cmd_date global cmd_date
cmd_date = get_time() cmd_date = get_time()
def blocked(): def blocked():
return (get_time() - cmd_date) < delay return (get_time() - cmd_date) < delay
def auth(): def auth():
global token, session, headers global token, session, headers
if not username or not password: if not username or not password:
print("Missing environment variables") print("Missing environment variables")
exit(1) exit(1)
data = {"name": username, "password": password, "submit": "log+in"}
res = requests.post(url + "/login/submit", headers=headers,
data=data, allow_redirects=False)
token = re.search("(?:api_token)=[^;]+",
res.headers.get("Set-Cookie")).group(0)
session = re.search(
"(?:session_id)=[^;]+", res.headers.get("Set-Cookie")).group(0)
headers["Cookie"] = token + "; " + session
data = {"name": username, "password": password, "submit": "log+in"}
res = requests.post(url + "/login/submit", headers=headers, data=data, allow_redirects=False)
token = re.search("(?:api_token)=[^;]+", res.headers.get("Set-Cookie")).group(0)
session = re.search("(?:session_id)=[^;]+", res.headers.get("Set-Cookie")).group(0)
headers["Cookie"] = token + "; " + session
async def run(): async def run():
async with websockets.connect(ws_url, extra_headers=headers) as ws: async with websockets.connect(ws_url, extra_headers=headers) as ws:
try: try:
while True: while True:
message = await ws.recv() message = await ws.recv()
await on_message(ws, message) await on_message(ws, message)
except KeyboardInterrupt: except KeyboardInterrupt:
exit(0) exit(0)
except websockets.exceptions.ConnectionClosedOK: except websockets.exceptions.ConnectionClosedOK:
print("WebSocket connection closed") print("WebSocket connection closed")
except Exception as e: except Exception as e:
print("(WebSocket) Error:", e) print("(WebSocket) Error:", e)
traceback.print_exc() traceback.print_exc()
async def on_message(ws, message): async def on_message(ws, message):
if blocked(): return if blocked():
return
try: try:
data = json.loads(message) data = json.loads(message)
except: except:
return return
if data["type"] == "message": if data["type"] == "message":
if data["data"]["name"] == username: if data["data"]["name"] == username:
return return
text = data["data"]["text"].strip() text = data["data"]["text"].strip()
if not text.startswith(prefix): if not text.startswith(prefix):
return return
room_id = data["roomId"] room_id = data["roomId"]
words = text.lstrip(prefix).split(" ") words = text.lstrip(prefix).split(" ")
cmd = words[0] cmd = words[0]
args = words[1:] args = words[1:]
if cmd == "ping": if cmd == "ping":
update_time() update_time()
await send_message(ws, "Pong!", room_id) await send_message(ws, "Pong!", room_id)
elif cmd == "help": elif cmd == "help":
update_time() update_time()
await send_message(ws, f"Commands: describe | wins | numbers | date | bird | shitpost", room_id) await send_message(ws, f"Commands: describe | wins | numbers | date | bird | shitpost", room_id)
elif cmd == "describe": elif cmd == "describe":
if len(args) >= 1: if len(args) >= 1:
update_time() update_time()
await gif_describe(args[0], room_id) await gif_describe(args[0], room_id)
elif cmd == "wins" or cmd == "win": elif cmd == "wins" or cmd == "win":
if len(args) >= 1: if len(args) >= 1:
update_time() update_time()
await gif_wins(args[0], room_id) await gif_wins(args[0], room_id)
elif cmd == "numbers" or cmd == "number" or cmd == "nums" or cmd == "num": elif cmd == "numbers" or cmd == "number" or cmd == "nums" or cmd == "num":
if len(args) > 0: if len(args) > 0:
arg = " ".join(clean_list(args)) arg = " ".join(clean_list(args))
else: else:
arg = None arg = None
update_time() update_time()
await gif_numbers(arg, room_id) await gif_numbers(arg, room_id)
elif cmd == "date" or cmd == "data" or cmd == "time" or cmd == "datetime": elif cmd == "date" or cmd == "data" or cmd == "time" or cmd == "datetime":
update_time() update_time()
await gif_date(None, room_id) await gif_date(None, room_id)
elif cmd == "bird" or cmd == "birds" or cmd == "birb" or cmd == "birbs" or cmd == "brb": elif cmd == "bird" or cmd == "birds" or cmd == "birb" or cmd == "birbs" or cmd == "brb":
update_time() update_time()
await random_bird(ws, room_id) await random_bird(ws, room_id)
elif cmd == "post" or cmd == "shitpost" or cmd == "4chan" or cmd == "anon" or cmd == "shit":
update_time()
await random_post(ws, room_id)
elif cmd == "post" or cmd == "shitpost" or cmd == "4chan" or cmd == "anon" or cmd == "shit":
update_time()
await random_post(ws, room_id)
async def random_bird(ws, room_id): async def random_bird(ws, room_id):
birdfile = get_path("data/aves.txt") birdfile = get_path("data/aves.txt")
async with aiofiles.open(birdfile, mode="r", encoding="utf-8") as file:
birds = await file.readlines()
bird = random.choice(birds).strip()
await send_message(ws, f".i \"{bird}\" bird", room_id)
async with aiofiles.open(birdfile, mode="r", encoding="utf-8") as file:
birds = await file.readlines()
bird = random.choice(birds).strip()
await send_message(ws, f".i \"{bird}\" bird", room_id)
async def gif_describe(who, room_id): async def gif_describe(who, room_id):
input_path = get_path("describe.jpg") input_path = get_path("describe.jpg")
command = [ command = [
gifmaker, gifmaker,
gm_common, gm_common,
f"--input '{input_path}'", f"--input '{input_path}'",
f"--words '{who} is\\n[Random] [x5]'", f"--words '{who} is\\n[Random] [x5]'",
"--filter anyhue2 --opacity 1 --fontsize 66 --delay 700", "--filter anyhue2 --opacity 1 --fontsize 66 --delay 700",
"--padding 50 --fontcolor light2 --bgcolor black", "--padding 50 --fontcolor light2 --bgcolor black",
] ]
await run_gifmaker(command, room_id)
await run_gifmaker(command, room_id)
async def gif_wins(who, room_id): async def gif_wins(who, room_id):
input_path = get_path("wins.gif") input_path = get_path("wins.gif")
command = [ command = [
gifmaker, gifmaker,
gm_common, gm_common,
f"--input '{input_path}'", f"--input '{input_path}'",
f"--words '{who} wins a ; [repeat] ; [RANDOM] ; [repeat]' --bgcolor 0,0,0", f"--words '{who} wins a ; [repeat] ; [RANDOM] ; [repeat]' --bgcolor 0,0,0",
"--bottom 20 --filter anyhue2 --framelist 11,11,33,33 --fontsize=42", "--bottom 20 --filter anyhue2 --framelist 11,11,33,33 --fontsize=42",
] ]
await run_gifmaker(command, room_id)
await run_gifmaker(command, room_id)
async def gif_numbers(arg, room_id): async def gif_numbers(arg, room_id):
num = -1 num = -1
if arg: if arg:
numbers = extract_range(arg) nums = extract_range(arg)
if len(numbers) == 1: if nums[0] is not None:
if numbers[0] > 0: if nums[1] is not None:
num = random_int(0, numbers[0]) num = random_int(nums[0], nums[1])
elif len(numbers) == 2: else:
if numbers[0] < numbers[1]: num = random_int(0, nums[0])
num = random_int(numbers[0], numbers[1])
if num == -1: if num == -1:
num = string_to_number(arg) num = string_to_number(arg)
if num == -1: if num == -1:
num = random_int(0, 999) num = random_int(0, 999)
input_path = get_path("numbers.png") input_path = get_path("numbers.png")
command = [ command = [
gifmaker, gifmaker,
gm_common, gm_common,
f"--input '{input_path}'", f"--input '{input_path}'",
f"--top 20 --words '{num}' --fontcolor 0,0,0", f"--top 20 --words '{num}' --fontcolor 0,0,0",
"--fontsize 66 --format jpg", "--fontsize 66 --format jpg",
] ]
await run_gifmaker(command, room_id)
await run_gifmaker(command, room_id)
async def gif_date(who, room_id): async def gif_date(who, room_id):
input_path = get_path("time.jpg") input_path = get_path("time.jpg")
command = [ command = [
gifmaker, gifmaker,
gm_common, gm_common,
f"--input '{input_path}'", f"--input '{input_path}'",
"--words 'Date: [date %A %d] ; [repeat] ; Time: [date %I:%M %p] ; [repeat]'", "--words 'Date: [date %A %d] ; [repeat] ; Time: [date %I:%M %p] ; [repeat]'",
"--filter anyhue2 --bottom 20 --bgcolor 0,0,0 --fontsize 80", "--filter anyhue2 --bottom 20 --bgcolor 0,0,0 --fontsize 80",
] ]
await run_gifmaker(command, room_id)
await run_gifmaker(command, room_id)
async def random_post(ws, room_id): async def random_post(ws, room_id):
boards = ["g", "an", "ck", "lit", "x", "tv", "v", "fit", "k", "o"] boards = ["g", "an", "ck", "lit", "x", "tv", "v", "fit", "k", "o"]
board = random.choice(boards) board = random.choice(boards)
try: try:
threads_url = f"https://a.4cdn.org/{board}/threads.json" threads_url = f"https://a.4cdn.org/{board}/threads.json"
async_client = httpx.AsyncClient() async_client = httpx.AsyncClient()
threads_response = await async_client.get(threads_url) threads_response = await async_client.get(threads_url)
threads_response.raise_for_status() threads_response.raise_for_status()
threads_json = threads_response.json() threads_json = threads_response.json()
threads = threads_json[0]["threads"] threads = threads_json[0]["threads"]
# Select a random thread # Select a random thread
id = threads[random_int(0, len(threads) - 1)]["no"] id = threads[random_int(0, len(threads) - 1)]["no"]
thread_url = f"https://a.4cdn.org/{board}/thread/{id}.json" thread_url = f"https://a.4cdn.org/{board}/thread/{id}.json"
# Fetch the selected thread # Fetch the selected thread
thread_response = await async_client.get(thread_url) thread_response = await async_client.get(thread_url)
thread_response.raise_for_status() thread_response.raise_for_status()
thread_json = thread_response.json() thread_json = thread_response.json()
posts = thread_json["posts"] posts = thread_json["posts"]
# Select a random post # Select a random post
post = posts[random_int(0, len(posts) - 1)] post = posts[random_int(0, len(posts) - 1)]
html = post.get("com", "") html = post.get("com", "")
if not html: return if not html:
return
# Parse HTML using BeautifulSoup # Parse HTML using BeautifulSoup
soup = BeautifulSoup(html, "html.parser") soup = BeautifulSoup(html, "html.parser")
# Remove elements with class "quotelink" # Remove elements with class "quotelink"
for elem in soup.select(".quotelink"): for elem in soup.select(".quotelink"):
elem.decompose() elem.decompose()
# Replace <br> with newline # Replace <br> with newline
for br in soup.find_all("br"): for br in soup.find_all("br"):
br.replace_with("\n") br.replace_with("\n")
# Get text content and limit to 500 characters # Get text content and limit to 500 characters
text = soup.get_text(separator="\n")[:500].strip() text = soup.get_text(separator="\n")[:500].strip()
text = clean_lines(text) text = clean_lines(text)
if text: if text:
await send_message(ws, text, room_id) await send_message(ws, text, room_id)
except Exception as err:
print(f"Error: {err}")
except Exception as err:
print(f"Error: {err}")
async def run_gifmaker(command, room_id): async def run_gifmaker(command, room_id):
process = await asyncio.create_subprocess_shell( process = await asyncio.create_subprocess_shell(
" ".join(command), " ".join(command),
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, stderr=subprocess.PIPE,
shell=True, shell=True,
) )
stdout, stderr = await process.communicate() stdout, stderr = await process.communicate()
if process.returncode != 0: if process.returncode != 0:
print(f"(Process) Error: {stderr.decode()}") print(f"(Process) Error: {stderr.decode()}")
return return
await upload(Path(stdout.decode().strip()), room_id)
await upload(Path(stdout.decode().strip()), room_id)
async def upload(path, room_id): async def upload(path, room_id):
if (not path.exists()) or (not path.is_file()): if (not path.exists()) or (not path.is_file()):
return return
cookies = { cookies = {
"session_id": session.split("=")[1], "session_id": session.split("=")[1],
"api_token": token.split("=")[1], "api_token": token.split("=")[1],
} }
ext = get_extension(path) ext = get_extension(path)
if ext == "jpg": if ext == "jpg":
ext = "jpeg" ext = "jpeg"
url = "https://deek.chat/message/send/" + str(room_id) url = "https://deek.chat/message/send/" + str(room_id)
data = aiohttp.FormData() data = aiohttp.FormData()
data.add_field(name="files[]", value=open(path, "rb"), \ data.add_field(name="files[]", value=open(path, "rb"),
filename=path.name, content_type=f"image/{ext}") filename=path.name, content_type=f"image/{ext}")
try: try:
async with aiohttp.ClientSession(cookies=cookies) as sess: async with aiohttp.ClientSession(cookies=cookies) as sess:
async with sess.post(url, data=data, headers={}) as response: async with sess.post(url, data=data, headers={}) as response:
await response.text() await response.text()
except Exception as e: except Exception as e:
print(f"(Upload) Error: {e}") print(f"(Upload) Error: {e}")
traceback.print_exc() traceback.print_exc()
remove_file(path)
remove_file(path)
async def send_message(ws, text, room_id): async def send_message(ws, text, room_id):
await ws.send(json.dumps({"type": "message", "data": text, "roomId": room_id})) await ws.send(json.dumps({"type": "message", "data": text, "roomId": room_id}))
while True: while True:
try: try:
auth() auth()
print("Authenticated") print("Authenticated")
asyncio.run(run()) asyncio.run(run())
except KeyboardInterrupt: except KeyboardInterrupt:
break break
except Exception as e: except Exception as e:
print("(Main) Error:", e) print("(Main) Error:", e)
traceback.print_exc() traceback.print_exc()