from selenium.webdriver.firefox.options import Options from selenium import webdriver from selenium.webdriver.common.keys import Keys from selenium.webdriver.common.by import By from time import sleep import time import disnake import random from urllib.parse import urlparse from disnake.ext import commands from disnake import TextInputStyle import asyncio from dotenv import load_dotenv import os import logging import math logger = logging.getLogger('disnake') logger.setLevel(logging.DEBUG) handler = logging.FileHandler(filename='disnake.log', encoding='utf-8', mode='w') handler.setFormatter(logging.Formatter('%(asctime)s:%(levelname)s:%(name)s: %(message)s')) logger.addHandler(handler) queue = [] shuffle = False load_dotenv() options = Options() options.profile = webdriver.FirefoxProfile(os.getenv("PROFILE_PATH")) driver = webdriver.Firefox(options=options) intents = disnake.Intents.default() intents.message_content = False bot = commands.Bot(intents=intents, command_prefix=".", test_guilds=[int(os.getenv("GUILD_ID"))]) @bot.event async def on_ready(): global queuetask queuetask = asyncio.create_task(queuehandler()) #this will set the waiting for videos image, then exit def play_video(videourl): driver.get(videourl) sleep(4) try: elem = driver.find_element(By.CLASS_NAME, "ytp-fullscreen-button") elem.send_keys(Keys.RETURN) except Exception: return #if this errors there is no fullscreen options, such as playlists, so skip the link #guess I don't need this #sleep(1.5) #elem = driver.find_element(By.XPATH, '//button[@aria-keyshortcuts="k"]') #elem.send_keys(Keys.RETURN) try: elem = driver.find_element(By.XPATH, '//button[@data-tooltip-target-id="ytp-autonav-toggle-button"][@aria-label="Autoplay is on"]') elem.send_keys(Keys.RETURN) except Exception: pass sleep(10) try: elem = driver.find_element(By.XPATH, '//button[@aria-label="Dismiss"]') elem.send_keys(Keys.RETURN) except Exception: pass try: endscreen = driver.find_element(By.CLASS_NAME, "html5-endscreen") timeout = (float(os.getenv("MAX_MIN")) * 60) + time.time() - 10 #subtract 10 since 10 seconds of the video has played by this point except Exception: return #same as above while str(endscreen.get_attribute('style')) == "display: none;" and time.time() < timeout: pass sleep(2) return @bot.slash_command( name="play", description="adds a video to the queue", ) async def play(inter: disnake.AppCmdInter, link: str): await inter.response.defer(ephemeral=True) if urlparse(link).netloc == 'youtube.com' or urlparse(link).netloc == 'www.youtube.com' or urlparse(link).netloc == 'youtu.be': queue.append(link) await inter.edit_original_response(f"added to queue!") global queuetask if queuetask.done(): queuetask = asyncio.create_task(queuehandler()) return else: await inter.edit_original_response(f"This bot only accepts youtube links") return @bot.slash_command( name="shuffle", description="toggles shuffle on or off, the queue cannot be unshuffled once it is shuffled", ) async def shuffleplay(inter: disnake.AppCmdInter, toggle: str = commands.Param(choices=["on", "off"])): await inter.response.defer(ephemeral=True) global shuffle if toggle == "on": shuffle = True await inter.edit_original_response(f"shuffle enabled") return else: shuffle = False await inter.edit_original_response(f"shuffle disabled") return @bot.slash_command( name="queue", description="list the videos in queue", ) async def getqueue(inter: disnake.AppCmdInter): await inter.response.defer(ephemeral=True) if not queue: await inter.edit_original_response("There are no items in queue") message = f"Now Playing: <{queue[0]}>\n" for i in range(11): if i == 0: continue try: message = message + f"{i}. <{queue[i]}>\n" except IndexError: break message = message + f"1 of {math.ceil((len(queue)-1)/10) if (len(queue)-1)/10 > 1 else 1}" if math.ceil((len(queue)-1)/10) > 1: await inter.edit_original_response(message, components=[disnake.ui.Button(label=">>", style=disnake.ButtonStyle.primary, custom_id="Forward"),]) else: await inter.edit_original_response(message) @bot.listen("on_button_click") async def button_listener(inter: disnake.MessageInteraction): ogmsg = inter.message.content page = ogmsg.split("\n") page = page[-1].split(" of ") if inter.component.custom_id == "Forward": message = f"Now Playing: <{queue[0]}>\n" offset = int(int(page[0]) * 10) for i in range(11): if i == 0: continue try: message = message + f"{int(i+offset)}. <{queue[int(i+offset)]}>\n" except IndexError: break message = message + f"{int(page[0])+1} of {math.ceil((len(queue)-1)/10) if (len(queue)-1)/10 > 1 else 1}" if (int(page[0])+1) >= int(page[1]): await inter.response.edit_message(message, components=[disnake.ui.Button(label="<<", style=disnake.ButtonStyle.primary, custom_id="Backward"),]) else: await inter.response.edit_message(message, components=[disnake.ui.Button(label="<<", style=disnake.ButtonStyle.primary, custom_id="Backward"), disnake.ui.Button(label=">>", style=disnake.ButtonStyle.primary, custom_id="Forward"),]) return if inter.component.custom_id == "Backward": message = f"Now Playing: <{queue[0]}>\n" offset = int((int(page[0]) - 2) * 10) for i in range(11): if i == 0: continue try: message = message + f"{int(i+offset)}. <{queue[int(i+offset)]}>\n" except IndexError: break message = message + f"{int(page[0])-1} of {math.ceil((len(queue)-1)/10) if (len(queue)-1)/10 > 1 else 1}" if (int(page[0])-1) <= 1 and int(int(page[1]) == 1): await inter.response.edit_message(message) elif (int(page[0])-1) <= 1 and int(int(page[1]) > 1): await inter.response.edit_message(message, components=[disnake.ui.Button(label=">>", style=disnake.ButtonStyle.primary, custom_id="Forward"),]) else: await inter.response.edit_message(message, components=[disnake.ui.Button(label="<<", style=disnake.ButtonStyle.primary, custom_id="Backward"), disnake.ui.Button(label=">>", style=disnake.ButtonStyle.primary, custom_id="Forward"),]) await inter.response.edit_message(message, components=[]) return @bot.slash_command( name="toggleplayback", description="pauses or unpauses the video, does not bypass the video timelimit", ) async def toggleplayback(inter: disnake.AppCmdInter): await inter.response.defer(ephemeral=True) try: elem = driver.find_element(By.XPATH, '//button[@aria-keyshortcuts="k"]') elem.send_keys(Keys.RETURN) except Exception: pass await inter.edit_original_response("toggled!") @bot.slash_command( name="skip", description="skips the current video", ) async def skip(inter: disnake.AppCmdInter): await inter.response.defer(ephemeral=False) global queuetask queuetask.cancel() try: await queuetask except asyncio.CancelledError: queue.pop(0) if len(queue) < 1: driver.get(f"file://{os.getcwd()}/waitingforvideo.png") driver.fullscreen_window() else: queuetask = asyncio.create_task(queuehandler()) await inter.edit_original_response("skipped") @bot.slash_command( name="remove", description="removes a video from the queue", ) async def remove(inter: disnake.AppCmdInter, toremove: int): await inter.response.defer(ephemeral=True) if toremove == 0: await inter.edit_original_response("that is the currently playing video!", ephemeral=True) return else: queue.pop(toremove) await inter.edit_original_response("removed!") async def queuehandler(): loop = asyncio.get_running_loop() global queue while queue: if shuffle: random.shuffle(queue) print(queue) driver.maximize_window() await loop.run_in_executor(None, play_video, queue[0]) queue.pop(0) driver.get(f"file://{os.getcwd()}/waitingforvideo.png") driver.fullscreen_window() bot.run(os.getenv("TOKEN"))