discord-yt-remote/bot.py

235 lines
No EOL
8.9 KiB
Python

from selenium.webdriver.firefox.options import Options
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.by import By
from time import sleep
import time
import disnake
import random
from urllib.parse import urlparse
from disnake.ext import commands
from disnake import TextInputStyle
import asyncio
from dotenv import load_dotenv
import os
import logging
import math
logger = logging.getLogger('disnake')
logger.setLevel(logging.DEBUG)
handler = logging.FileHandler(filename='disnake.log', encoding='utf-8', mode='w')
handler.setFormatter(logging.Formatter('%(asctime)s:%(levelname)s:%(name)s: %(message)s'))
logger.addHandler(handler)
queue = []
shuffle = False
load_dotenv()
options = Options()
options.profile = webdriver.FirefoxProfile(os.getenv("PROFILE_PATH"))
driver = webdriver.Firefox(options=options)
intents = disnake.Intents.default()
intents.message_content = False
bot = commands.Bot(intents=intents, command_prefix=".", test_guilds=[int(os.getenv("GUILD_ID"))])
@bot.event
async def on_ready():
global queuetask
queuetask = asyncio.create_task(queuehandler()) #this will set the waiting for videos image, then exit
def play_video(videourl):
driver.get(videourl)
sleep(4)
try:
elem = driver.find_element(By.CLASS_NAME, "ytp-fullscreen-button")
elem.send_keys(Keys.RETURN)
except Exception:
return #if this errors there is no fullscreen options, such as playlists, so skip the link
#guess I don't need this
#sleep(1.5)
#elem = driver.find_element(By.XPATH, '//button[@aria-keyshortcuts="k"]')
#elem.send_keys(Keys.RETURN)
try:
elem = driver.find_element(By.XPATH, '//button[@data-tooltip-target-id="ytp-autonav-toggle-button"][@aria-label="Autoplay is on"]')
elem.send_keys(Keys.RETURN)
except Exception:
pass
sleep(10)
try:
elem = driver.find_element(By.XPATH, '//button[@aria-label="Dismiss"]')
elem.send_keys(Keys.RETURN)
except Exception:
pass
try:
endscreen = driver.find_element(By.CLASS_NAME, "html5-endscreen")
timeout = (float(os.getenv("MAX_MIN")) * 60) + time.time() - 10 #subtract 10 since 10 seconds of the video has played by this point
except Exception:
return #same as above
while str(endscreen.get_attribute('style')) == "display: none;" and time.time() < timeout:
pass
sleep(2)
return
@bot.slash_command(
name="play",
description="adds a video to the queue",
)
async def play(inter: disnake.AppCmdInter, link: str):
await inter.response.defer(ephemeral=True)
if urlparse(link).netloc == 'youtube.com' or urlparse(link).netloc == 'www.youtube.com' or urlparse(link).netloc == 'youtu.be':
queue.append(link)
await inter.edit_original_response(f"added to queue!")
global queuetask
if queuetask.done():
queuetask = asyncio.create_task(queuehandler())
return
else:
await inter.edit_original_response(f"This bot only accepts youtube links")
return
@bot.slash_command(
name="shuffle",
description="toggles shuffle on or off, the queue cannot be unshuffled once it is shuffled",
)
async def shuffleplay(inter: disnake.AppCmdInter, toggle: str = commands.Param(choices=["on", "off"])):
await inter.response.defer(ephemeral=True)
global shuffle
if toggle == "on":
shuffle = True
await inter.edit_original_response(f"shuffle enabled")
return
else:
shuffle = False
await inter.edit_original_response(f"shuffle disabled")
return
@bot.slash_command(
name="queue",
description="list the videos in queue",
)
async def getqueue(inter: disnake.AppCmdInter):
await inter.response.defer(ephemeral=True)
if not queue:
await inter.edit_original_response("There are no items in queue")
return
message = f"Now Playing: <{queue[0]}>\n"
message = message + f"Shuffle is currently " + ("off\n" if not shuffle else "on!\n")
for i in range(11):
if i == 0:
continue
try:
message = message + f"{i}. <{queue[i]}>\n"
except IndexError:
break
message = message + f"1 of {math.ceil((len(queue)-1)/10) if (len(queue)-1)/10 > 1 else 1}"
if math.ceil((len(queue)-1)/10) > 1:
await inter.edit_original_response(message, components=[disnake.ui.Button(label=">>", style=disnake.ButtonStyle.primary, custom_id="Forward"),])
else:
await inter.edit_original_response(message)
@bot.listen("on_button_click")
async def button_listener(inter: disnake.MessageInteraction):
if not queue:
await inter.response.edit_message("There are no items in queue")
return
ogmsg = inter.message.content
page = ogmsg.split("\n")
page = page[-1].split(" of ")
if inter.component.custom_id == "Forward":
message = f"Now Playing: <{queue[0]}>\n"
message = message + f"Shuffle is currently " + ("off\n" if not shuffle else "on!\n")
offset = int(int(page[0]) * 10)
for i in range(11):
if i == 0:
continue
try:
message = message + f"{int(i+offset)}. <{queue[int(i+offset)]}>\n"
except IndexError:
break
message = message + f"{int(page[0])+1} of {math.ceil((len(queue)-1)/10) if (len(queue)-1)/10 > 1 else 1}"
if (int(page[0])+1) >= int(page[1]):
await inter.response.edit_message(message, components=[disnake.ui.Button(label="<<", style=disnake.ButtonStyle.primary, custom_id="Backward"),])
else:
await inter.response.edit_message(message, components=[disnake.ui.Button(label="<<", style=disnake.ButtonStyle.primary, custom_id="Backward"),
disnake.ui.Button(label=">>", style=disnake.ButtonStyle.primary, custom_id="Forward"),])
return
if inter.component.custom_id == "Backward":
message = f"Now Playing: <{queue[0]}>\n"
message = message + f"Shuffle is currently " + ("off\n" if not shuffle else "on!\n")
offset = int((int(page[0]) - 2) * 10)
for i in range(11):
if i == 0:
continue
try:
message = message + f"{int(i+offset)}. <{queue[int(i+offset)]}>\n"
except IndexError:
break
message = message + f"{int(page[0])-1} of {math.ceil((len(queue)-1)/10) if (len(queue)-1)/10 > 1 else 1}"
if (int(page[0])-1) <= 1 and int(int(page[1]) == 1):
await inter.response.edit_message(message)
elif (int(page[0])-1) <= 1 and int(int(page[1]) > 1):
await inter.response.edit_message(message, components=[disnake.ui.Button(label=">>", style=disnake.ButtonStyle.primary, custom_id="Forward"),])
else:
await inter.response.edit_message(message, components=[disnake.ui.Button(label="<<", style=disnake.ButtonStyle.primary, custom_id="Backward"),
disnake.ui.Button(label=">>", style=disnake.ButtonStyle.primary, custom_id="Forward"),])
await inter.response.edit_message(message, components=[])
return
@bot.slash_command(
name="toggleplayback",
description="pauses or unpauses the video, does not bypass the video timelimit",
)
async def toggleplayback(inter: disnake.AppCmdInter):
await inter.response.defer(ephemeral=True)
try:
elem = driver.find_element(By.XPATH, '//button[@aria-keyshortcuts="k"]')
elem.send_keys(Keys.RETURN)
except Exception:
pass
await inter.edit_original_response("toggled!")
@bot.slash_command(
name="skip",
description="skips the current video",
)
async def skip(inter: disnake.AppCmdInter):
await inter.response.defer(ephemeral=False)
global queuetask
queuetask.cancel()
try:
await queuetask
except asyncio.CancelledError:
queue.pop(0)
if len(queue) < 1:
driver.get(f"file://{os.getcwd()}/waitingforvideo.png")
driver.fullscreen_window()
else:
queuetask = asyncio.create_task(queuehandler())
await inter.edit_original_response("skipped")
@bot.slash_command(
name="remove",
description="removes a video from the queue",
)
async def remove(inter: disnake.AppCmdInter, toremove: int):
await inter.response.defer(ephemeral=True)
if toremove == 0:
await inter.edit_original_response("that is the currently playing video!", ephemeral=True)
return
else:
queue.pop(toremove)
await inter.edit_original_response("removed!")
async def queuehandler():
loop = asyncio.get_running_loop()
global queue
while queue:
if shuffle:
random.shuffle(queue)
print(queue)
driver.maximize_window()
await loop.run_in_executor(None, play_video, queue[0])
queue.pop(0)
driver.get(f"file://{os.getcwd()}/waitingforvideo.png")
driver.fullscreen_window()
bot.run(os.getenv("TOKEN"))