import asyncio
import logging
import math
import os
import random
import time
from time import sleep
from urllib.parse import urlparse

import disnake
from disnake import TextInputStyle
from disnake.ext import commands
from dotenv import load_dotenv
from selenium import webdriver
from selenium.common.exceptions import (
    ElementNotVisibleException,
    NoSuchElementException,
    TimeoutException,
    WebDriverException,
)
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.firefox.options import Options
from selenium.webdriver.support.wait import WebDriverWait

logger = logging.getLogger('disnake')
logger.setLevel(logging.DEBUG)
handler = logging.FileHandler(filename='disnake.log', encoding='utf-8', mode='w')
handler.setFormatter(logging.Formatter('%(asctime)s:%(levelname)s:%(name)s: %(message)s'))
logger.addHandler(handler)

# queue[i] was requested by user_queue[i]; the two lists are always mutated in
# lock-step. queue[0] is the video currently playing.
queue = []
user_queue = []
# IDs of users who already voted to skip the current video.
skip_list = []
shuffle = False

# Task running queuehandler(); None until on_ready has fired once.
queuetask = None
# Identity token of the play_video() call that currently owns the browser.
# A call whose token no longer matches has been skipped and must stop waiting.
_active_playback = None

# Hosts accepted by /play.
YOUTUBE_HOSTS = {'youtube.com', 'www.youtube.com', 'youtu.be'}
# Number of upcoming videos listed per page of /queue.
PAGE_SIZE = 10

load_dotenv()
options = Options()
options.profile = webdriver.FirefoxProfile(os.getenv("PROFILE_PATH"))
driver = webdriver.Firefox(options=options)
intents = disnake.Intents.all()
intents.message_content = False
bot = commands.Bot(intents=intents, command_prefix=".", test_guilds=[int(os.getenv("GUILD_ID"))])


def _show_waiting_screen():
    """Point the browser at the local placeholder image and go fullscreen."""
    driver.get(f"file://{os.getcwd()}/waitingforvideo.png")
    driver.fullscreen_window()


def _page_count():
    """Return how many /queue pages the upcoming videos need (at least 1)."""
    return max(1, math.ceil((len(queue) - 1) / PAGE_SIZE))


def _render_queue_page(page):
    """Build the text and navigation buttons for one page of the queue.

    ``page`` is 1-based and is clamped to the valid range, so a click on a
    stale message can never index outside the queue. The caller must ensure
    the queue is not empty. Returns ``(message, buttons)``; the last line of
    ``message`` is ``"<page> of <total>"``, which button_listener parses back.
    """
    total = _page_count()
    page = min(max(page, 1), total)
    # Index 0 is the playing video, so page 1 starts at index 1.
    start = (page - 1) * PAGE_SIZE + 1

    lines = [
        f"Now Playing: <{queue[0]}>",
        "Shuffle is currently " + ("on!" if shuffle else "off"),
    ]
    lines.extend(
        f"{position}. <{url}>"
        for position, url in enumerate(queue[start:start + PAGE_SIZE], start=start)
    )
    lines.append(f"{page} of {total}")

    buttons = []
    if page > 1:
        buttons.append(disnake.ui.Button(label="<<", style=disnake.ButtonStyle.primary, custom_id="Backward"))
    if page < total:
        buttons.append(disnake.ui.Button(label=">>", style=disnake.ButtonStyle.primary, custom_id="Forward"))
    return "\n".join(lines), buttons


async def _skip_current():
    """Drop the currently playing video and move on to the next one.

    Returns True when a video was skipped and False when the queue was empty.
    Shared by /skip and /voteskip.
    """
    global queuetask, _active_playback
    if not queue:
        return False

    if queuetask is not None and not queuetask.done():
        queuetask.cancel()
        try:
            await queuetask
        except asyncio.CancelledError:
            pass

    # Cancelling the task does not stop the executor thread; clearing the
    # token tells the orphaned play_video() call to stop waiting.
    _active_playback = None
    queue.pop(0)
    user_queue.pop(0)
    skip_list.clear()

    if queue:
        queuetask = asyncio.create_task(queuehandler())
    else:
        _show_waiting_screen()
    return True


@bot.event
async def on_ready():
    """Start the queue worker once the bot is connected."""
    global queuetask
    # on_ready can fire again after a reconnect; never start a second worker
    # while one is still driving the browser.
    if queuetask is None or queuetask.done():
        queuetask = asyncio.create_task(queuehandler())  # this will set the waiting for videos image, then exit


def play_video(videourl):
    """Play one video in the shared browser and block until it is over.

    Runs in an executor thread. Returns when the end screen is shown, when
    MAX_MIN minutes have elapsed, when the page has no fullscreen button
    (e.g. playlists), or shortly after the playback has been skipped.
    """
    global _active_playback
    token = object()
    _active_playback = token

    driver.get(videourl)
    try:
        fullscreen = WebDriverWait(driver, 8, 0.2).until(
            lambda d: d.find_element(By.CLASS_NAME, "ytp-fullscreen-button")
        )
        sleep(1)
        fullscreen.send_keys(Keys.RETURN)
    except (NoSuchElementException, TimeoutException) as e:
        # if this errors there is no fullscreen option, such as playlists, so skip the link
        print(e)
        return

    # Turn autoplay off if it is on, so the browser does not wander off to
    # another video by itself.
    try:
        autoplay = driver.find_element(
            By.XPATH,
            '//button[@data-tooltip-target-id="ytp-autonav-toggle-button"][@aria-label="Autoplay is on"]',
        )
        autoplay.send_keys(Keys.RETURN)
    except (NoSuchElementException, ElementNotVisibleException, TimeoutException):
        pass

    # Close the dismissible popup if one shows up within a few seconds.
    try:
        dismiss = WebDriverWait(
            driver, 5, 0.2, (NoSuchElementException, ElementNotVisibleException)
        ).until(lambda d: d.find_element(By.XPATH, '//button[@aria-label="Dismiss"]'))
        dismiss.send_keys(Keys.RETURN)
    except (NoSuchElementException, ElementNotVisibleException, TimeoutException) as e:
        print(e)

    def finished(d):
        # A skipped playback must stop polling the shared driver instead of
        # occupying an executor thread for up to MAX_MIN minutes.
        if _active_playback is not token:
            return True
        return d.find_element(By.CLASS_NAME, "html5-endscreen").is_displayed()

    try:
        WebDriverWait(driver, float(os.getenv("MAX_MIN")) * 60, 1).until(finished)
    except (NoSuchElementException, ElementNotVisibleException, TimeoutException) as e:
        print(e)
    sleep(1)


@bot.slash_command(
    name="play",
    description="adds a video to the queue",
)
async def play(inter: disnake.AppCmdInter, link: str):
    """Queue a YouTube link, enforcing the per-user MAX_QUEUE limit."""
    global queuetask
    await inter.response.defer(ephemeral=True)

    if user_queue.count(inter.user.id) >= int(os.getenv("MAX_QUEUE")):
        await inter.edit_original_response(f"You have reached the queue limit of {os.getenv('MAX_QUEUE')}, try again after one of your videos has played.")
        return

    if urlparse(link).netloc not in YOUTUBE_HOSTS:
        await inter.edit_original_response("This bot only accepts youtube links")
        return

    queue.append(link)
    user_queue.append(inter.user.id)
    await inter.edit_original_response("added to queue!")
    # The worker exits when the queue runs dry; restart it if necessary.
    if queuetask is None or queuetask.done():
        queuetask = asyncio.create_task(queuehandler())


@bot.slash_command(
    name="shuffle",
    description="toggles shuffle on or off, the queue cannot be unshuffled once it is shuffled",
)
async def shuffleplay(inter: disnake.AppCmdInter, toggle: str = commands.Param(choices=["on", "off"])):
    """Switch shuffle mode on or off."""
    global shuffle
    await inter.response.defer(ephemeral=True)
    shuffle = toggle == "on"
    await inter.edit_original_response("shuffle enabled" if shuffle else "shuffle disabled")


@bot.slash_command(
    name="queue",
    description="list the videos in queue",
)
async def getqueue(inter: disnake.AppCmdInter):
    """Show the first page of the queue, with a forward button if needed."""
    await inter.response.defer(ephemeral=True)
    if not queue:
        await inter.edit_original_response("There are no items in queue")
        return
    message, buttons = _render_queue_page(1)
    await inter.edit_original_response(message, components=buttons or None)


@bot.listen("on_button_click")
async def button_listener(inter: disnake.MessageInteraction):
    """Handle the << / >> buttons of a /queue message."""
    step = {"Forward": 1, "Backward": -1}.get(inter.component.custom_id)
    if step is None:
        return  # not one of the queue navigation buttons

    if not queue:
        await inter.response.edit_message("There are no items in queue", components=None)
        return

    # The page being displayed is recovered from the trailing "N of M" line.
    try:
        current = int(inter.message.content.split("\n")[-1].split(" of ")[0])
    except ValueError:
        current = 1

    # Respond exactly once; the total and the buttons are recomputed from the
    # live queue because it may have changed since the message was rendered.
    message, buttons = _render_queue_page(current + step)
    await inter.response.edit_message(message, components=buttons or None)


@bot.slash_command(
    name="toggleplayback",
    description="pauses or unpauses the video, does not bypass the video timelimit",
)
async def toggleplayback(inter: disnake.AppCmdInter):
    """Press the player's play/pause button."""
    await inter.response.defer(ephemeral=True)
    try:
        button = driver.find_element(By.XPATH, '//button[@aria-keyshortcuts="k"]')
        button.send_keys(Keys.RETURN)
    except WebDriverException as e:
        # Report the failure instead of claiming success.
        print(e)
        await inter.edit_original_response("could not toggle playback, is a video playing?")
        return
    await inter.edit_original_response("toggled!")


@bot.slash_command(
    name="skip",
    description="skips the current video",
    default_member_permissions=disnake.Permissions(8192),
)
async def skip(inter: disnake.AppCmdInter):
    """Skip the current video immediately."""
    await inter.response.defer(ephemeral=False)
    if await _skip_current():
        await inter.edit_original_response("skipped")
    else:
        await inter.edit_original_response("There is nothing to skip")


@bot.slash_command(
    name="voteskip",
    description="vote to skip the current video",
)
async def voteskip(inter: disnake.AppCmdInter):
    """Register a skip vote; skip once half of the voice channel has voted."""
    await inter.response.defer(ephemeral=False)
    if not queue:
        await inter.edit_original_response("There is nothing to skip")
        return
    if not inter.user.voice:
        await inter.edit_original_response("You are not in the voice channel")
        return
    if inter.user.id in skip_list:
        await inter.edit_original_response("You have already voted to skip this video")
        return

    # this could be better due to potential for abuse, but it's fine for now
    vc = inter.user.voice.channel.members
    if not any(m.voice.self_video or m.voice.self_stream for m in vc):
        await inter.edit_original_response("No one is playing video so how can this be the correct vc?")
        return

    skip_list.append(inter.user.id)
    needed = len(vc) // 2
    if len(skip_list) >= needed:
        await _skip_current()
        await inter.edit_original_response("skipped as sufficient votes were reached")
    else:
        await inter.edit_original_response(f"**{inter.user.display_name}** has voted to skip the video, {len(skip_list)}/{needed}")


@bot.slash_command(
    name="remove",
    description="removes a video from the queue",
)
async def remove(inter: disnake.AppCmdInter, toremove: int):
    """Remove the upcoming video at position ``toremove`` (as shown by /queue)."""
    await inter.response.defer(ephemeral=True)
    if toremove == 0:
        await inter.edit_original_response("that is the currently playing video!")
        return
    # Reject negative and out-of-range positions: a negative index would
    # silently remove from the end of the queue (or even the playing video).
    if not 0 < toremove < len(queue):
        await inter.edit_original_response(f"There is no video at position {toremove} in the queue")
        return
    queue.pop(toremove)
    user_queue.pop(toremove)
    await inter.edit_original_response("removed!")


async def queuehandler():
    """Play queued videos one after another, then show the waiting screen."""
    loop = asyncio.get_running_loop()
    while queue:
        if shuffle and len(queue) > 1:
            # Shuffle (url, user) pairs together so both lists stay aligned.
            paired = list(zip(queue, user_queue))
            random.shuffle(paired)
            queue[:] = [url for url, _ in paired]
            user_queue[:] = [user_id for _, user_id in paired]
        try:
            await loop.run_in_executor(None, play_video, queue[0])
        except WebDriverException as e:
            # One broken page must not kill the worker and stall the queue.
            print(e)
        queue.pop(0)
        user_queue.pop(0)
        skip_list.clear()
    _show_waiting_screen()


bot.run(os.getenv("TOKEN"))