diff --git a/bot.py b/bot.py deleted file mode 100644 index b7e40d9..0000000 --- a/bot.py +++ /dev/null @@ -1,293 +0,0 @@ -from selenium.webdriver.firefox.options import Options -from selenium import webdriver -from selenium.webdriver.common.keys import Keys -from selenium.webdriver.common.by import By -from selenium.webdriver.support.wait import WebDriverWait -from selenium.common.exceptions import NoSuchElementException, ElementNotVisibleException, TimeoutException -from time import sleep -import time -import disnake -import random -from urllib.parse import urlparse -from disnake.ext import commands -from disnake import TextInputStyle -import asyncio -from dotenv import load_dotenv -import os -import logging -import math - -logger = logging.getLogger('disnake') -logger.setLevel(logging.DEBUG) -handler = logging.FileHandler(filename='disnake.log', encoding='utf-8', mode='w') -handler.setFormatter(logging.Formatter('%(asctime)s:%(levelname)s:%(name)s: %(message)s')) -logger.addHandler(handler) - -queue = [] -user_queue = [] -skip_list = [] -shuffle = False -load_dotenv() -options = Options() -options.profile = webdriver.FirefoxProfile(os.getenv("PROFILE_PATH")) -driver = webdriver.Firefox(options=options) -intents = disnake.Intents.all() -intents.message_content = False -bot = commands.Bot(intents=intents, command_prefix=".", test_guilds=[int(os.getenv("GUILD_ID"))]) - -@bot.event -async def on_ready(): - global queuetask - queuetask = asyncio.create_task(queuehandler()) #this will set the waiting for videos image, then exit - -def play_video(videourl): - driver.get(videourl) - try: - elem = WebDriverWait(driver, 8, 0.2, None).until(lambda x: x.find_element(By.CLASS_NAME, "ytp-fullscreen-button")) - sleep(1) - elem.send_keys(Keys.RETURN) - except (NoSuchElementException,TimeoutException) as e: - print(e) - return #if this errors there is no fullscreen options, such as playlists, so skip the link - try: - elem = driver.find_element(By.XPATH, '//button[@data-tooltip-target-id="ytp-autonav-toggle-button"][@aria-label="Autoplay is on"]') - elem.send_keys(Keys.RETURN) - except (NoSuchElementException,ElementNotVisibleException,TimeoutException): - pass - try: - elem = WebDriverWait(driver, 5, 0.2, (NoSuchElementException,ElementNotVisibleException)).until(lambda x: x.find_element(By.XPATH, '//button[@aria-label="Dismiss"]')) - elem.send_keys(Keys.RETURN) - except (NoSuchElementException,ElementNotVisibleException,TimeoutException) as e: - print(e) - try: - elem = WebDriverWait(driver, (float(os.getenv("MAX_MIN")) * 60), 1, None).until(lambda x: x.find_element(By.CLASS_NAME, "html5-endscreen").is_displayed()) - except (NoSuchElementException,ElementNotVisibleException,TimeoutException) as e: - print(e) - sleep(1) - return #same as above - sleep(1) - return - -@bot.slash_command( - name="play", - description="adds a video to the queue", -) -async def play(inter: disnake.AppCmdInter, link: str): - await inter.response.defer(ephemeral=True) - if user_queue.count(inter.user.id) >= (int(os.getenv("MAX_QUEUE"))): - await inter.edit_original_response(f"You have reached the queue limit of {os.getenv('MAX_QUEUE')}, try again after one of your videos has played.") - return - if urlparse(link).netloc == 'youtube.com' or urlparse(link).netloc == 'www.youtube.com' or urlparse(link).netloc == 'youtu.be': - queue.append(link) - user_queue.append(inter.user.id) - await inter.edit_original_response(f"added to queue!") - global queuetask - if queuetask.done(): - queuetask = asyncio.create_task(queuehandler()) - return - else: - await inter.edit_original_response(f"This bot only accepts youtube links") - return - -@bot.slash_command( - name="shuffle", - description="toggles shuffle on or off, the queue cannot be unshuffled once it is shuffled", -) -async def shuffleplay(inter: disnake.AppCmdInter, toggle: str = commands.Param(choices=["on", "off"])): - await inter.response.defer(ephemeral=True) - global shuffle - if toggle == "on": - shuffle = True - await inter.edit_original_response(f"shuffle enabled") - return - else: - shuffle = False - await inter.edit_original_response(f"shuffle disabled") - return - -@bot.slash_command( - name="queue", - description="list the videos in queue", -) -async def getqueue(inter: disnake.AppCmdInter): - await inter.response.defer(ephemeral=True) - if not queue: - await inter.edit_original_response("There are no items in queue") - return - message = f"Now Playing: <{queue[0]}>\n" - message = message + f"Shuffle is currently " + ("off\n" if not shuffle else "on!\n") - for i in range(11): - if i == 0: - continue - try: - message = message + f"{i}. <{queue[i]}>\n" - except IndexError: - break - message = message + f"1 of {math.ceil((len(queue)-1)/10) if (len(queue)-1)/10 > 1 else 1}" - if math.ceil((len(queue)-1)/10) > 1: - await inter.edit_original_response(message, components=[disnake.ui.Button(label=">>", style=disnake.ButtonStyle.primary, custom_id="Forward"),]) - else: - await inter.edit_original_response(message) - -@bot.listen("on_button_click") -async def button_listener(inter: disnake.MessageInteraction): - if not queue: - await inter.response.edit_message("There are no items in queue") - return - ogmsg = inter.message.content - page = ogmsg.split("\n") - page = page[-1].split(" of ") - if inter.component.custom_id == "Forward": - message = f"Now Playing: <{queue[0]}>\n" - message = message + f"Shuffle is currently " + ("off\n" if not shuffle else "on!\n") - offset = int(int(page[0]) * 10) - for i in range(11): - if i == 0: - continue - try: - message = message + f"{int(i+offset)}. <{queue[int(i+offset)]}>\n" - except IndexError: - break - message = message + f"{int(page[0])+1} of {math.ceil((len(queue)-1)/10) if (len(queue)-1)/10 > 1 else 1}" - if (int(page[0])+1) >= int(page[1]): - await inter.response.edit_message(message, components=[disnake.ui.Button(label="<<", style=disnake.ButtonStyle.primary, custom_id="Backward"),]) - else: - await inter.response.edit_message(message, components=[disnake.ui.Button(label="<<", style=disnake.ButtonStyle.primary, custom_id="Backward"), - disnake.ui.Button(label=">>", style=disnake.ButtonStyle.primary, custom_id="Forward"),]) - return - if inter.component.custom_id == "Backward": - message = f"Now Playing: <{queue[0]}>\n" - message = message + f"Shuffle is currently " + ("off\n" if not shuffle else "on!\n") - offset = int((int(page[0]) - 2) * 10) - for i in range(11): - if i == 0: - continue - try: - message = message + f"{int(i+offset)}. <{queue[int(i+offset)]}>\n" - except IndexError: - break - message = message + f"{int(page[0])-1} of {math.ceil((len(queue)-1)/10) if (len(queue)-1)/10 > 1 else 1}" - if (int(page[0])-1) <= 1 and int(int(page[1]) == 1): - await inter.response.edit_message(message) - elif (int(page[0])-1) <= 1 and int(int(page[1]) > 1): - await inter.response.edit_message(message, components=[disnake.ui.Button(label=">>", style=disnake.ButtonStyle.primary, custom_id="Forward"),]) - else: - await inter.response.edit_message(message, components=[disnake.ui.Button(label="<<", style=disnake.ButtonStyle.primary, custom_id="Backward"), - disnake.ui.Button(label=">>", style=disnake.ButtonStyle.primary, custom_id="Forward"),]) - await inter.response.edit_message(message, components=[]) - return - -@bot.slash_command( - name="toggleplayback", - description="pauses or unpauses the video, does not bypass the video timelimit", -) -async def toggleplayback(inter: disnake.AppCmdInter): - await inter.response.defer(ephemeral=True) - try: - elem = driver.find_element(By.XPATH, '//button[@aria-keyshortcuts="k"]') - elem.send_keys(Keys.RETURN) - except Exception: - pass - await inter.edit_original_response("toggled!") - -@bot.slash_command( - name="skip", - description="skips the current video", - default_member_permissions=disnake.Permissions(8192), -) -async def skip(inter: disnake.AppCmdInter): - await inter.response.defer(ephemeral=False) - global queuetask - queuetask.cancel() - try: - await queuetask - except asyncio.CancelledError: - queue.pop(0) - user_queue.pop(0) - skip_list.clear() - if len(queue) < 1: - driver.get(f"file://{os.getcwd()}/waitingforvideo.png") - driver.fullscreen_window() - else: - queuetask = asyncio.create_task(queuehandler()) - await inter.edit_original_response("skipped") - -@bot.slash_command( - name="voteskip", - description="vote to skip the current video", -) -async def voteskip(inter: disnake.AppCmdInter): - await inter.response.defer(ephemeral=False) - if not inter.user.voice: - await inter.edit_original_response("You are not in the voice channel") - return - if inter.user.id in skip_list: - await inter.edit_original_response("You have already voted to skip this video") - return - vc = inter.user.voice.channel.members #this could be better due to potential for abuse, but it's fine for now - print(inter.user.voice.channel.name) - broadcaster = False - for m in vc: - if m.voice.self_video or m.voice.self_stream: - broadcaster = True - break - if not broadcaster: - await inter.edit_original_response("No one is playing video so how can this be the correct vc?") - return - skip_list.append(inter.user.id) - print(len(skip_list)) - print(math.floor(len(vc)/2)) - print(vc) - if len(skip_list) >= (math.floor(len(vc)/2)): - global queuetask - queuetask.cancel() - try: - await queuetask - except asyncio.CancelledError: - queue.pop(0) - user_queue.pop(0) - skip_list.clear() - if len(queue) < 1: - driver.get(f"file://{os.getcwd()}/waitingforvideo.png") - driver.fullscreen_window() - else: - queuetask = asyncio.create_task(queuehandler()) - await inter.edit_original_response("skipped as sufficient votes were reached") - else: - await inter.edit_original_response(f"**{inter.user.display_name}** has voted to skip the video, {len(skip_list)}/{math.floor(len(vc)/2)}") - - -@bot.slash_command( - name="remove", - description="removes a video from the queue", -) -async def remove(inter: disnake.AppCmdInter, toremove: int): - await inter.response.defer(ephemeral=True) - if toremove == 0: - await inter.edit_original_response("that is the currently playing video!", ephemeral=True) - return - else: - queue.pop(toremove) - user_queue.pop(toremove) - await inter.edit_original_response("removed!") - -async def queuehandler(): - loop = asyncio.get_running_loop() - global queue - while queue: - if shuffle: - seed = time.time() - random.seed(seed) - random.shuffle(queue) - random.seed(seed) - random.shuffle(user_queue) - print(queue) - print(user_queue) - await loop.run_in_executor(None, play_video, queue[0]) - queue.pop(0) - user_queue.pop(0) - skip_list.clear() - driver.get(f"file://{os.getcwd()}/waitingforvideo.png") - driver.fullscreen_window() - -bot.run(os.getenv("TOKEN")) \ No newline at end of file