discord-yt-remote/bot.py

293 lines
No EOL
12 KiB
Python

from selenium.webdriver.firefox.options import Options
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
from selenium.common.exceptions import NoSuchElementException, ElementNotVisibleException, TimeoutException
from time import sleep
import time
import disnake
import random
from urllib.parse import urlparse
from disnake.ext import commands
from disnake import TextInputStyle
import asyncio
from dotenv import load_dotenv
import os
import logging
import math
# Send the 'disnake' logger's output (DEBUG and above) to disnake.log.
# mode='w' means the file is truncated every time the bot starts.
logger = logging.getLogger('disnake')
logger.setLevel(logging.DEBUG)
handler = logging.FileHandler(filename='disnake.log', encoding='utf-8', mode='w')
handler.setFormatter(logging.Formatter('%(asctime)s:%(levelname)s:%(name)s: %(message)s'))
logger.addHandler(handler)
# Shared playback state, mutated by the slash commands and by queuehandler().
# `queue` holds video links and `user_queue` holds the id of the user who
# submitted each link; the two lists are appended to and popped from at the
# same index everywhere in this file, so they must stay the same length.
queue = []
user_queue = []
# Ids of users who have voted to skip the video currently at queue[0].
skip_list = []
# Toggled by the /shuffle command; read by queuehandler() before each video.
shuffle = False
# Must run before any os.getenv() call below so that values from a .env file
# (PROFILE_PATH, GUILD_ID, and later MAX_MIN, MAX_QUEUE, TOKEN) are visible.
load_dotenv()
# One Firefox instance, using the profile at PROFILE_PATH, is created at
# import time and shared by every function in this module.
options = Options()
options.profile = webdriver.FirefoxProfile(os.getenv("PROFILE_PATH"))
driver = webdriver.Firefox(options=options)
# Request every intent, then switch message content back off.
intents = disnake.Intents.all()
intents.message_content = False
# Commands are registered only in the guild named by GUILD_ID.
bot = commands.Bot(intents=intents, command_prefix=".", test_guilds=[int(os.getenv("GUILD_ID"))])
@bot.event
async def on_ready():
    """Start the queue worker when the bot becomes ready.

    The library may dispatch ``on_ready`` more than once (for example after
    a reconnect).  A new worker is therefore only created when none exists
    or the previous one has finished; otherwise a second worker would start
    playing ``queue[0]`` again in the same browser and the reference to the
    running task would be lost.
    """
    global queuetask
    existing = globals().get("queuetask")
    if existing is not None and not existing.done():
        return
    # With an empty queue the worker just shows the waiting image and exits.
    queuetask = asyncio.create_task(queuehandler())
def play_video(videourl):
    """Open *videourl* in the shared browser and block until it finishes.

    Steps, in order:
      1. wait up to 8 s for the player's fullscreen button and press it;
         if it never appears the link is abandoned,
      2. press the autoplay toggle if it reports "Autoplay is on",
      3. wait up to 5 s for a "Dismiss" button and press it,
      4. wait up to MAX_MIN minutes (from the environment) for the element
         with class ``html5-endscreen`` to be displayed.

    This function blocks (``sleep`` and Selenium waits), which is why the
    queue worker runs it through ``run_in_executor``.
    """
    driver.get(videourl)

    # 1. Fullscreen.
    try:
        fullscreen_button = WebDriverWait(driver, 8, 0.2, None).until(
            lambda d: d.find_element(By.CLASS_NAME, "ytp-fullscreen-button")
        )
        sleep(1)
        fullscreen_button.send_keys(Keys.RETURN)
    except (NoSuchElementException, TimeoutException) as err:
        print(err)
        # No fullscreen control (e.g. a playlist page): skip this link.
        return

    # 2. Autoplay: the button only matches while autoplay is reported as on.
    try:
        autoplay_toggle = driver.find_element(
            By.XPATH,
            '//button[@data-tooltip-target-id="ytp-autonav-toggle-button"][@aria-label="Autoplay is on"]',
        )
        autoplay_toggle.send_keys(Keys.RETURN)
    except (NoSuchElementException, ElementNotVisibleException, TimeoutException):
        pass

    # 3. Optional "Dismiss" button.
    try:
        dismiss_button = WebDriverWait(
            driver, 5, 0.2, (NoSuchElementException, ElementNotVisibleException)
        ).until(lambda d: d.find_element(By.XPATH, '//button[@aria-label="Dismiss"]'))
        dismiss_button.send_keys(Keys.RETURN)
    except (NoSuchElementException, ElementNotVisibleException, TimeoutException) as err:
        print(err)

    # 4. Block until the end screen shows or the time limit is reached.
    time_limit_seconds = float(os.getenv("MAX_MIN")) * 60
    try:
        WebDriverWait(driver, time_limit_seconds, 1, None).until(
            lambda d: d.find_element(By.CLASS_NAME, "html5-endscreen").is_displayed()
        )
    except (NoSuchElementException, ElementNotVisibleException, TimeoutException) as err:
        print(err)

    # Both outcomes pause for a second before handing back to the caller.
    sleep(1)
@bot.slash_command(
    name="play",
    description="adds a video to the queue",
)
async def play(inter: disnake.AppCmdInter, link: str):
    """Validate *link* and append it to the queue for the calling user.

    A user may have at most MAX_QUEUE (environment variable) entries queued.
    Only ``http``/``https`` links whose host is ``youtube.com``,
    ``www.youtube.com`` or ``youtu.be`` are accepted.  The scheme check
    matters because the link is handed to the browser unchanged: without it
    a ``file://youtube.com/...`` or scheme-less ``//youtube.com/...`` value
    would pass the host comparison.
    """
    global queuetask
    await inter.response.defer(ephemeral=True)

    if user_queue.count(inter.user.id) >= int(os.getenv("MAX_QUEUE")):
        await inter.edit_original_response(f"You have reached the queue limit of {os.getenv('MAX_QUEUE')}, try again after one of your videos has played.")
        return

    allowed_hosts = ("youtube.com", "www.youtube.com", "youtu.be")
    try:
        parsed = urlparse(link)
    except ValueError:
        # urlparse rejects some malformed inputs (e.g. broken IPv6 brackets).
        parsed = None

    is_youtube = (
        parsed is not None
        and parsed.scheme in ("http", "https")
        and parsed.netloc.lower() in allowed_hosts
    )
    if not is_youtube:
        await inter.edit_original_response("This bot only accepts youtube links")
        return

    queue.append(link)
    user_queue.append(inter.user.id)
    await inter.edit_original_response("added to queue!")
    # The worker exits when the queue runs dry; restart it if needed.
    if queuetask.done():
        queuetask = asyncio.create_task(queuehandler())
@bot.slash_command(
    name="shuffle",
    description="toggles shuffle on or off, the queue cannot be unshuffled once it is shuffled",
)
async def shuffleplay(inter: disnake.AppCmdInter, toggle: str = commands.Param(choices=["on", "off"])):
    """Set the module-level ``shuffle`` flag from *toggle* and confirm it.

    ``"on"`` enables shuffling; any other value disables it.  The reply is
    ephemeral.
    """
    global shuffle
    await inter.response.defer(ephemeral=True)
    shuffle = toggle == "on"
    confirmation = "shuffle enabled" if shuffle else "shuffle disabled"
    await inter.edit_original_response(confirmation)
@bot.slash_command(
    name="queue",
    description="list the videos in queue",
)
async def getqueue(inter: disnake.AppCmdInter):
    """Reply (ephemerally) with the first page of the queue.

    The message shows the current video (``queue[0]``), the shuffle state,
    up to ten upcoming entries numbered from 1, and a ``"1 of N"`` footer.
    A ">>" button is attached only when there is more than one page.
    """
    await inter.response.defer(ephemeral=True)
    if not queue:
        await inter.edit_original_response("There are no items in queue")
        return

    shuffle_state = "on!\n" if shuffle else "off\n"
    parts = [
        f"Now Playing: <{queue[0]}>\n",
        "Shuffle is currently " + shuffle_state,
    ]
    # Entries 1..10; index 0 is the video that is playing right now.
    for position, url in enumerate(queue[1:11], start=1):
        parts.append(f"{position}. <{url}>\n")

    page_count = math.ceil((len(queue) - 1) / 10)
    parts.append(f"1 of {page_count if page_count > 1 else 1}")
    text = "".join(parts)

    if page_count > 1:
        forward = disnake.ui.Button(label=">>", style=disnake.ButtonStyle.primary, custom_id="Forward")
        await inter.edit_original_response(text, components=[forward])
    else:
        await inter.edit_original_response(text)
def _build_queue_page(requested_page, page_size=10):
    """Render one page of the queue listing.

    *requested_page* is clamped to ``1..total_pages`` so a stale page number
    can never produce an empty or out-of-range page.  The queue must not be
    empty.  Returns ``(text, page, total_pages)``.
    """
    total_pages = max(1, math.ceil((len(queue) - 1) / page_size))
    page = min(max(requested_page, 1), total_pages)
    first = (page - 1) * page_size + 1  # queue[0] is the playing video

    parts = [
        f"Now Playing: <{queue[0]}>\n",
        "Shuffle is currently " + ("on!\n" if shuffle else "off\n"),
    ]
    parts.extend(
        f"{position}. <{url}>\n"
        for position, url in enumerate(queue[first:first + page_size], start=first)
    )
    parts.append(f"{page} of {total_pages}")
    return "".join(parts), page, total_pages


@bot.listen("on_button_click")
async def button_listener(inter: disnake.MessageInteraction):
    """Handle the "<<" / ">>" paging buttons of the queue listing.

    The page currently shown is read from the ``"<page> of <total>"`` footer
    of the message; the total is recomputed from the live queue rather than
    trusted from the old text.  The interaction is answered exactly once
    (answering twice raises ``InteractionResponded``), and buttons with any
    other ``custom_id`` are ignored.
    """
    direction = inter.component.custom_id
    if direction not in ("Forward", "Backward"):
        return

    if not queue:
        # Drop the paging buttons too: there is nothing left to page through.
        await inter.response.edit_message("There are no items in queue", components=None)
        return

    footer = inter.message.content.split("\n")[-1]
    try:
        current_page = int(footer.split(" of ")[0])
    except ValueError:
        current_page = 1

    step = 1 if direction == "Forward" else -1
    text, page, total_pages = _build_queue_page(current_page + step)

    buttons = []
    if page > 1:
        buttons.append(disnake.ui.Button(label="<<", style=disnake.ButtonStyle.primary, custom_id="Backward"))
    if page < total_pages:
        buttons.append(disnake.ui.Button(label=">>", style=disnake.ButtonStyle.primary, custom_id="Forward"))

    # components=None removes the buttons when only one page is left.
    await inter.response.edit_message(text, components=buttons or None)
@bot.slash_command(
    name="toggleplayback",
    description="pauses or unpauses the video, does not bypass the video timelimit",
)
async def toggleplayback(inter: disnake.AppCmdInter):
    """Press the player's play/pause button (the one with shortcut "k").

    Failures (no such button on the current page, browser errors) are logged
    and reported to the caller instead of being swallowed and answered with
    "toggled!".
    """
    await inter.response.defer(ephemeral=True)
    try:
        play_pause = driver.find_element(By.XPATH, '//button[@aria-keyshortcuts="k"]')
        play_pause.send_keys(Keys.RETURN)
    except Exception:
        # Command boundary: keep the bot alive, but do not hide the failure.
        logger.exception("toggleplayback: could not press the play/pause button")
        await inter.edit_original_response("could not toggle playback")
        return
    await inter.edit_original_response("toggled!")
@bot.slash_command(
    name="skip",
    description="skips the current video",
    default_member_permissions=disnake.Permissions(8192),
)
async def skip(inter: disnake.AppCmdInter):
    """Stop the current video and move on to the next queue entry.

    The queue worker is cancelled and awaited.  When it was interrupted
    (cancelled) or had already died with an exception, the head of the queue
    is the entry to drop; previously a crashed worker made ``await
    queuetask`` re-raise here, so the failing entry could never be skipped.
    A worker that had finished normally leaves the queue untouched.  A new
    worker is started if entries remain, otherwise the waiting image is
    shown.
    """
    global queuetask
    await inter.response.defer(ephemeral=False)

    queuetask.cancel()
    try:
        await queuetask
    except asyncio.CancelledError:
        interrupted = True
    except Exception:
        logger.exception("skip: queue worker had failed; dropping the current entry")
        interrupted = True
    else:
        interrupted = False

    if interrupted:
        # Guarded pops: the lists may already be empty.
        if queue:
            queue.pop(0)
        if user_queue:
            user_queue.pop(0)
        skip_list.clear()

    if queue:
        queuetask = asyncio.create_task(queuehandler())
    else:
        driver.get(f"file://{os.getcwd()}/waitingforvideo.png")
        driver.fullscreen_window()
    await inter.edit_original_response("skipped")
@bot.slash_command(
    name="voteskip",
    description="vote to skip the current video",
)
async def voteskip(inter: disnake.AppCmdInter):
    """Record the caller's vote to skip and skip once enough votes exist.

    The caller must be in a voice channel in which at least one member is
    sending video or streaming.  The threshold is ``floor(members / 2)`` of
    that channel.  Votes are refused while the queue is empty, because a
    vote stored then would silently count against the next video.  The skip
    itself tolerates a queue worker that has already failed.
    """
    global queuetask
    await inter.response.defer(ephemeral=False)

    if not inter.user.voice:
        await inter.edit_original_response("You are not in the voice channel")
        return
    if not queue:
        await inter.edit_original_response("There is no video playing to skip")
        return
    if inter.user.id in skip_list:
        await inter.edit_original_response("You have already voted to skip this video")
        return

    # this could be better due to potential for abuse, but it's fine for now
    members = inter.user.voice.channel.members
    broadcasting = any(
        m.voice and (m.voice.self_video or m.voice.self_stream) for m in members
    )
    if not broadcasting:
        await inter.edit_original_response("No one is playing video so how can this be the correct vc?")
        return

    skip_list.append(inter.user.id)
    needed = math.floor(len(members) / 2)
    logger.debug(
        "voteskip in %s: %d/%d votes",
        inter.user.voice.channel.name, len(skip_list), needed,
    )

    if len(skip_list) < needed:
        await inter.edit_original_response(f"**{inter.user.display_name}** has voted to skip the video, {len(skip_list)}/{needed}")
        return

    queuetask.cancel()
    try:
        await queuetask
    except asyncio.CancelledError:
        interrupted = True
    except Exception:
        logger.exception("voteskip: queue worker had failed; dropping the current entry")
        interrupted = True
    else:
        interrupted = False

    if interrupted:
        if queue:
            queue.pop(0)
        if user_queue:
            user_queue.pop(0)
    skip_list.clear()

    if queue:
        queuetask = asyncio.create_task(queuehandler())
    else:
        driver.get(f"file://{os.getcwd()}/waitingforvideo.png")
        driver.fullscreen_window()
    await inter.edit_original_response("skipped as sufficient votes were reached")
@bot.slash_command(
    name="remove",
    description="removes a video from the queue",
)
async def remove(inter: disnake.AppCmdInter, toremove: int):
    """Remove the entry at position *toremove* from the queue.

    Positions are the numbers shown by the queue listing: 0 is the video
    that is playing and cannot be removed here.  Negative or too-large
    positions are rejected; unchecked, a negative index could remove the
    playing video from under the worker and an out-of-range one raised
    ``IndexError``.
    """
    await inter.response.defer(ephemeral=True)

    if toremove == 0:
        # The deferred response is already ephemeral; edit_original_response
        # has no ``ephemeral`` keyword.
        await inter.edit_original_response("that is the currently playing video!")
        return
    if toremove < 0 or toremove >= len(queue):
        await inter.edit_original_response("there is no video at that position in the queue")
        return

    queue.pop(toremove)
    user_queue.pop(toremove)
    await inter.edit_original_response("removed!")
async def queuehandler():
    """Play queued videos one after another until the queue is empty.

    For each iteration: optionally shuffle, run the blocking ``play_video``
    for ``queue[0]`` in the default executor, then drop that entry from
    both parallel lists and clear the skip votes.  When the queue is empty
    the waiting image is shown fullscreen and the coroutine returns.

    Cancellation (used by the skip commands) is propagated unchanged; in
    that case the entry is NOT popped here, the canceller does it.  Any
    other error from ``play_video`` is logged and the entry is dropped, so
    one bad link cannot kill the worker and wedge the queue.
    """
    loop = asyncio.get_running_loop()
    while queue:
        if shuffle:
            # Shuffle links and submitter ids together so they stay aligned,
            # without reseeding the global random generator.
            paired = list(zip(queue, user_queue))
            random.shuffle(paired)
            queue[:] = [link for link, _ in paired]
            user_queue[:] = [user_id for _, user_id in paired]
        logger.debug("queue: %s", queue)
        logger.debug("user_queue: %s", user_queue)

        current = queue[0]
        try:
            await loop.run_in_executor(None, play_video, current)
        except asyncio.CancelledError:
            raise
        except Exception:
            logger.exception("playback of %s failed; dropping it from the queue", current)

        if queue:
            queue.pop(0)
        if user_queue:
            user_queue.pop(0)
        skip_list.clear()

    driver.get(f"file://{os.getcwd()}/waitingforvideo.png")
    driver.fullscreen_window()
# Entry point: log in with the TOKEN environment variable and run the bot.
# NOTE(review): bot.run() is expected to block until the client shuts down,
# so nothing placed after this line would run while the bot is up.
bot.run(os.getenv("TOKEN"))