"""Discord bot that queues YouTube links and plays them through OBS.

Links are persisted in a SQLite table (``queue.db``), downloaded with yt-dlp
into a temporary directory, and played by an OBS media source that is driven
over obs-websocket.
"""
import asyncio
import datetime
import logging
import math
import os
import random
import sqlite3
import sys
import time
from pathlib import Path
from tempfile import TemporaryDirectory
from time import sleep
from urllib.parse import urlparse

import disnake
import obsws_python
import yt_dlp
from disnake import TextInputStyle
from disnake.ext import commands, tasks
from dotenv import load_dotenv

# `--clear-queue` removes the persisted queue before the database is opened.
if len(sys.argv) > 1 and sys.argv[1] == "--clear-queue":
    try:
        os.remove("queue.db")
    except FileNotFoundError:
        # No database yet, so there is nothing to clear.
        pass
    print("Queue cleared!")

load_dotenv()

# In-memory playback window; the checks in propagate_queue() keep it at no
# more than two links (index 0 = now playing, index 1 = up next).
queue = []
# Discord user ids that have voted to skip the current video.
skip_list = []
shuffle = True
retries = 0
failures = 0
# Numeric file name ("<vidcounter>.mp4") of the video currently playing.
vidcounter = 0
temp_dir = TemporaryDirectory()
vid_dir = Path(temp_dir.name)
download_dir = TemporaryDirectory()
full_dl_dir = Path(download_dir.name)
# Title/channel of the video most recently inspected by video_check().
vid_details = {"title": "", "channel": ""}
con = sqlite3.connect("queue.db", check_same_thread=False)
cur = con.cursor()


def countuser(usrid):
    """Return how many queue rows belong to the Discord user ``usrid``.

    Only unplayed rows are counted unless PERMANENT_MAX_QUEUE is "TRUE", in
    which case every row of that user counts.
    """
    permanent = os.getenv("PERMANENT_MAX_QUEUE", "FALSE") == "TRUE"
    unplayed_only = "" if permanent else "hasplayed = false AND "
    res = cur.execute(
        "SELECT COUNT(*) FROM queue WHERE " + unplayed_only + "user = ?",
        (usrid,),
    )
    count = res.fetchone()[0]
    print(count)
    return count


def sqllen():
    """Return the number of rows in the database that have not played yet."""
    res = cur.execute("SELECT COUNT(*) FROM queue WHERE hasplayed = false")
    return res.fetchone()[0]


def propagate_queue(times):
    """Move up to ``times`` unplayed links from the database into ``queue``.

    Rows are picked randomly when ``shuffle`` is on. Each moved row is marked
    ``hasplayed = true``. Nothing is moved once ``queue`` holds two links.
    """
    ordering = "ORDER BY RANDOM() " if shuffle else ""
    res = cur.execute(
        "SELECT ROWID,link FROM queue WHERE hasplayed = false "
        + ordering
        + "LIMIT ?",
        (times,),
    )
    for rowid, link in res.fetchall():
        if len(queue) == 2:
            print("the queue is already propagated refusing to evaluate!")
            break
        if len(queue) > 2:
            print(f"The queue is larger than two videos this WILL cause issues: {queue}")
            break
        queue.append(link)
        cur.execute("UPDATE queue SET hasplayed = true WHERE ROWID = ?", (rowid,))
        print(f"added {link} to queue")
    # Commit even when the loop stopped early so that rows already marked as
    # played are not left in an open transaction.
    con.commit()


def video_check(info, *, incomplete):
    """yt-dlp ``match_filter`` hook: record metadata and reject long videos.

    Stores the title and channel in ``vid_details``. When the duration is at
    least MAX_MIN minutes plus 480 seconds the front of ``queue`` is dropped,
    the queue is refilled, the new front is downloaded, and a message is
    returned (which makes yt-dlp skip the video). Returns None otherwise.
    """
    duration = info.get('duration')
    vid_details["title"] = info.get("title")
    vid_details["channel"] = info.get("channel")
    print(info.get("channel"))
    print(info.get("title"))
    # NOTE(review): float(os.getenv("MAX_MIN")) raises if MAX_MIN is unset.
    if duration and duration >= ((float(os.getenv("MAX_MIN")) * 60) + 480):
        # NOTE(review): this always drops index 0, even when the video being
        # inspected was requested as index 1 -- confirm that is intended.
        if queue:
            queue.pop(0)
        propagate_queue(1)
        skip_list.clear()
        if queue:
            ydl.download(queue[0])
        return "video too long... :("


ydl_opts = {
    'match_filter': video_check,
    'hls_prefer_native': True,
    'extract_flat': 'discard_in_playlist',
    'format': 'bestvideo[height<=1080][vcodec!*=av01][ext=mp4]+bestaudio[abr<=256][ext=m4a]/best[ext=mp4]/best',
    'fragment_retries': 10,
    'noplaylist': True,
    'restrictfilenames': True,
    'source_address': '0.0.0.0',
    'concurrent_fragment_downloads': 3,
    'paths': {'home': f'{vid_dir}', 'temp': f'{full_dl_dir}'},
    # Every download lands under this fixed name and is renamed afterwards.
    'outtmpl': {'default': '999zznext'},
    'postprocessors': [
        {'api': 'https://sponsor.ajay.app',
         'categories': {'sponsor', 'selfpromo'},
         'key': 'SponsorBlock',
         'when': 'after_filter'},
        {'force_keyframes': False,
         'key': 'ModifyChapters',
         'remove_chapters_patterns': [],
         'remove_ranges': [],
         'remove_sponsor_segments': {'sponsor', 'selfpromo'},
         'sponsorblock_chapter_title': '[SponsorBlock]: '
                                       '%(category_names)l'},
        {'key': 'FFmpegConcat',
         'only_multi_video': True,
         'when': 'playlist'},
        {'format': 'vtt',
         'key': 'FFmpegSubtitlesConvertor',
         'when': 'before_dl'},
        {'already_have_subtitle': False,
         'key': 'FFmpegEmbedSubtitle'},
    ],
    # Pass None rather than the literal string "None" when PROXY is unset.
    'proxy': os.getenv("PROXY") or None,
    'subtitlesformat': 'vtt',
    'subtitleslangs': ['en.*'],
    'writesubtitles': True,
    'retries': 10,
}

# Uncomment for verbose library logging:
# logging.basicConfig(level=logging.DEBUG)
ydl = yt_dlp.YoutubeDL(ydl_opts)
obs = obsws_python.ReqClient()
obse = obsws_python.EventClient()
intents = disnake.Intents.all()
intents.message_content = False
bot = commands.Bot(intents=intents, command_prefix=".", test_guilds=[int(os.getenv("GUILD_ID"))])
ver = obs.get_version()
print(f"OBS Version: {ver.obs_version}")

# Set once on_ready() has done its one-time startup work.
_startup_done = False


@bot.event
async def on_ready():
    """Create the queue table, start the background tasks, resume a stored queue.

    The gateway may deliver this event more than once, so the one-time work
    (OBS callback registration and task start-up) is guarded by a flag.
    """
    global _startup_done
    cur.execute("CREATE TABLE IF NOT EXISTS queue(link, user, hasplayed)")
    con.commit()
    if _startup_done:
        return
    _startup_done = True

    obs.set_current_program_scene("waiting")
    obse.callback.register(on_media_input_playback_ended)
    videotimer.start()
    ensurewaiting.start()
    titlehandler.start()
    if (not os.path.isfile(f"{vid_dir}/{vidcounter}.mp4")) and sqllen() > 1:
        print("predefined queue!")
        propagate_queue(2)
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, cold_run)


def download_video(index):
    """Download ``queue[index]``, retrying until it succeeds.

    Blocks (it sleeps), so async callers run it in an executor. On every
    second consecutive failure the link is dropped from ``queue``, the queue
    is refilled from the database and ``failures`` is incremented.

    Returns:
        True once a download call completed without raising, False when
        ``queue`` no longer has an entry at ``index``.
    """
    global retries, failures
    while index < len(queue):
        # Wait until the yt-dlp temp directory is empty before starting;
        # sleep between checks instead of spinning.
        while os.listdir(full_dl_dir):
            sleep(0.1)
        try:
            ydl.download(queue[index])
        except Exception:
            print("handling youtube exception")
            if retries % 2 == 1:
                queue.pop(index)
                propagate_queue(1)
                failures = failures + 1
            retries = retries + 1
            continue
        retries = 0
        sleep(2)  # allow ytdlp to fully cleanup
        return True
    return False


def wait_for_next_video():
    """Block until ``<vidcounter+1>.mp4`` exists, showing the OBS "notice" item.

    Returns immediately when ``queue`` holds at most one link. After 20 polls
    (0.5 s apart) with an empty download directory, the download of
    ``queue[0]`` is started again as a failsafe.
    """
    if len(queue) <= 1:
        return
    notice = obs.get_scene_item_id("youtube", "notice")
    print(notice.scene_item_id)
    print("Attempting to enable")
    obs.set_scene_item_enabled("youtube", notice.scene_item_id, True)
    polls = 0
    while not os.path.isfile(f"{vid_dir}/{vidcounter+1}.mp4"):
        polls = polls + 1
        sleep(0.5)
        if polls == 20 and len(os.listdir(full_dl_dir)) == 0:
            print("failsafe activated")
            download_video(0)
            os.rename(f"{vid_dir}/999zznext.mp4", f"{vid_dir}/{vidcounter+1}.mp4")
    obs.set_scene_item_enabled("youtube", notice.scene_item_id, False)


def cold_run():
    """Start playback from idle: download ``queue[0]``, play it, prefetch the next.

    Blocking; async callers run it in an executor.
    """
    download_video(0)
    os.rename(f"{vid_dir}/999zznext.mp4", f"{vid_dir}/{vidcounter}.mp4")
    obs.set_current_program_scene("youtube")
    nowplayingid = obs.get_scene_item_id("youtube", "nowplaying")
    notice = obs.get_scene_item_id("youtube", "notice")
    obs.set_scene_item_enabled("youtube", notice.scene_item_id, False)
    obs.set_input_settings("player", {'playlist': [{'hidden': False, 'selected': False, 'value': f"{str(vid_dir)}"}]}, True)
    obs.set_input_settings("nowplaying", {'text': f'{vid_details["title"]}\nBy {vid_details["channel"]}'}, True)
    obs.set_scene_item_enabled("youtube", nowplayingid.scene_item_id, True)
    obs.trigger_media_input_action("player", "OBS_WEBSOCKET_MEDIA_INPUT_ACTION_RESTART")
    if len(queue) > 1:
        download_video(1)
        os.rename(f"{vid_dir}/999zznext.mp4", f"{vid_dir}/{vidcounter+1}.mp4")


def on_media_input_playback_ended(data):
    """OBS event callback: advance to the next video when playback ends.

    Drops the finished link, refills ``queue``, deletes the finished file,
    points the OBS player at the video directory again and prefetches the
    following video. Switches OBS to the "waiting" scene when nothing is left.
    """
    global vidcounter
    try:
        queue.pop(0)
        propagate_queue(1)
    except IndexError as e:
        print(f"Video ended but somehow it wasn't in the queue? Resetting {e}")
        obs.set_current_program_scene("waiting")
    skip_list.clear()

    if not queue:
        if sqllen() >= 1:
            propagate_queue(2)
            download_video(0)  # will be noticeably slow but this should not happen
            os.rename(f"{vid_dir}/999zznext.mp4", f"{vid_dir}/{vidcounter+1}.mp4")
        else:
            obs.set_current_program_scene("waiting")
            os.remove(f"{vid_dir}/{vidcounter}.mp4")
            vidcounter = vidcounter + 1
            return

    wait_for_next_video()
    print(queue)
    os.remove(f"{vid_dir}/{vidcounter}.mp4")
    vidcounter = vidcounter + 1

    print("changing obs settigs")
    scene = obs.get_current_program_scene()
    if scene.scene_name != "youtube":
        obs.set_current_program_scene("youtube")
    nowplayingid = obs.get_scene_item_id("youtube", "nowplaying")
    obs.set_input_settings("player", {'playlist': [{'hidden': False, 'selected': False, 'value': f"{str(vid_dir)}"}]}, True)
    # The title is applied twice because it sometimes does not take effect
    # the first time. TODO: investigate further
    obs.set_input_settings("nowplaying", {'text': f'{vid_details["title"]}\nBy {vid_details["channel"]}'}, True)
    obs.set_input_settings("nowplaying", {'text': f'{vid_details["title"]}\nBy {vid_details["channel"]}'}, True)
    print("starting playback")
    obs.trigger_media_input_action("player", "OBS_WEBSOCKET_MEDIA_INPUT_ACTION_RESTART")
    # Enabled twice for the same reason as the title above.
    obs.set_scene_item_enabled("youtube", nowplayingid.scene_item_id, True)
    obs.set_scene_item_enabled("youtube", nowplayingid.scene_item_id, True)

    if len(queue) > 1:
        download_video(1)
        os.rename(f"{vid_dir}/999zznext.mp4", f"{vid_dir}/{vidcounter+1}.mp4")
    elif sqllen() >= 1 and len(queue) == 1:
        propagate_queue(1)
        download_video(1)
        os.rename(f"{vid_dir}/999zznext.mp4", f"{vid_dir}/{vidcounter+1}.mp4")
# Number of pending links listed per page by /queue.
_QUEUE_PAGE_SIZE = 10
# Hosts accepted by /play (the scheme must also be https).
_ALLOWED_HOSTS = ("youtube.com", "www.youtube.com", "youtu.be")


def _render_queue_page(page):
    """Build the /queue message for ``page``.

    ``page`` is clamped to the valid range. Requires a non-empty ``queue``.
    Returns ``(message, page, page_count)``.
    """
    res = cur.execute("SELECT link FROM queue WHERE hasplayed = false")
    links = [row[0] for row in res.fetchall()]
    page_count = max(1, math.ceil(len(links) / _QUEUE_PAGE_SIZE))
    page = min(max(page, 1), page_count)

    lines = [f"Now Playing: <{queue[0]}>"]
    if len(queue) > 1:
        lines.append(f"Up Next: <{queue[1]}>")
    lines.append("Shuffle is currently " + ("off" if not shuffle else "on!"))
    start = (page - 1) * _QUEUE_PAGE_SIZE
    for number, link in enumerate(links[start:start + _QUEUE_PAGE_SIZE], start=start + 1):
        lines.append(f"{number}. <{link}>")
    # button_listener() parses this "<page> of <count>" footer.
    lines.append(f"{page} of {page_count}")
    return "\n".join(lines), page, page_count


def _queue_page_buttons(page, page_count):
    """Return the navigation buttons that make sense on ``page``."""
    buttons = []
    if page > 1:
        buttons.append(disnake.ui.Button(label="<<", style=disnake.ButtonStyle.primary, custom_id="Backward"))
    if page < page_count:
        buttons.append(disnake.ui.Button(label=">>", style=disnake.ButtonStyle.primary, custom_id="Forward"))
    return buttons


async def _advance_playback(*, pause_first=False):
    """Drop the current video and start the next one.

    Shared by /skip, /voteskip and the videotimer task. Requires a non-empty
    ``queue``.

    Args:
        pause_first: pause the OBS player before the current file is deleted.

    Returns:
        True when a next video was started, False when the queue ran dry and
        OBS was switched to the "waiting" scene.
    """
    global vidcounter
    loop = asyncio.get_running_loop()
    queue.pop(0)
    propagate_queue(1)
    skip_list.clear()
    if not queue:
        obs.set_current_program_scene("waiting")
        os.remove(f"{vid_dir}/{vidcounter}.mp4")
        vidcounter = vidcounter + 1
        return False

    # wait_for_next_video() sleeps, so keep it off the event loop.
    await loop.run_in_executor(None, wait_for_next_video)
    print("stopping video")
    if pause_first:
        obs.trigger_media_input_action("player", "OBS_WEBSOCKET_MEDIA_INPUT_ACTION_PAUSE")
    os.remove(f"{vid_dir}/{vidcounter}.mp4")
    vidcounter = vidcounter + 1
    nowplayingid = obs.get_scene_item_id("youtube", "nowplaying")
    obs.set_input_settings("player", {'playlist': [{'hidden': False, 'selected': False, 'value': f"{str(vid_dir)}"}]}, True)
    obs.set_input_settings("nowplaying", {'text': f'{vid_details["title"]}\nBy {vid_details["channel"]}'}, True)
    obs.set_scene_item_enabled("youtube", nowplayingid.scene_item_id, True)
    obs.trigger_media_input_action("player", "OBS_WEBSOCKET_MEDIA_INPUT_ACTION_RESTART")
    return True


async def _prefetch_next():
    """Download ``queue[1]`` (if any) and store it as ``<vidcounter+1>.mp4``."""
    if len(queue) > 1:
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, download_video, 1)
        os.rename(f"{vid_dir}/999zznext.mp4", f"{vid_dir}/{vidcounter+1}.mp4")


@bot.slash_command(
    name="stats",
    description="get information about whats happening",
)
async def stats(inter: disnake.AppCmdInter):
    """Show OBS version, failure count, current title/link and playback position.

    Members with the moderate-members permission additionally see who queued
    the current link, unless PERMANENT_MAX_QUEUE is "TRUE".
    """
    scene = obs.get_current_program_scene()
    if scene.scene_name != "youtube" or not queue:
        await inter.response.send_message("Stats can only be shown while a video is playing", ephemeral=True)
        return
    await inter.response.defer(ephemeral=True)

    ver = obs.get_version()
    message = f"OBS Version: {ver.obs_version}\n"
    message = message + f"Failures: {failures}\n"
    nowplaying = obs.get_input_settings("nowplaying")
    message = message + f"Title: {nowplaying.input_settings['text']}\n"
    message = message + f"Link: <{queue[0]}>\n"
    playing = obs.get_media_input_status("player")
    message = message + f"Video Duration: {str(datetime.timedelta(seconds=(round(playing.media_cursor/1000))))}/{str(datetime.timedelta(seconds=(round(playing.media_duration/1000))))}\n"

    if inter.permissions.moderate_members and not (os.getenv("PERMANENT_MAX_QUEUE", "FALSE") == "TRUE"):
        res = cur.execute(
            "SELECT user FROM queue WHERE hasplayed = true AND link = ?",
            (queue[0],),
        )
        # The same link may have been queued by several users, so this only
        # lists the likely candidates.
        message = message + "Users who have queued this video: "
        message = message + "".join(f"<@{usrid}>, " for (usrid,) in set(res.fetchall()))
    await inter.edit_original_response(message)


@bot.slash_command(
    name="play",
    description="adds a video to the queue",
)
async def play(inter: disnake.AppCmdInter, link: str):
    """Add an https YouTube link to the persisted queue.

    Rejects the request when the user is at MAX_QUEUE or the link is not on
    an accepted host. Starts playback when nothing is playing and more than
    one link is pending.
    """
    await inter.response.defer(ephemeral=True)
    if countuser(inter.user.id) >= (int(os.getenv("MAX_QUEUE"))):
        await inter.edit_original_response(
            f"You have reached the queue limit of {os.getenv('MAX_QUEUE')}, "
            + ("try again after one of your videos has played."
               if not (os.getenv("PERMANENT_MAX_QUEUE", "FALSE") == "TRUE")
               else "you may not queue videos for the rest of the session."))
        return

    try:
        parsed = urlparse(link)
    except ValueError:
        # urlparse rejects some malformed inputs; treat them as invalid links.
        parsed = None
    if parsed is None or parsed.scheme != 'https' or parsed.netloc not in _ALLOWED_HOSTS:
        await inter.edit_original_response("This bot only accepts youtube links")
        return

    # Bound parameters: the link is user input and must never be spliced into SQL.
    cur.execute("INSERT INTO queue VALUES (?, ?, false)", (link, inter.user.id))
    con.commit()
    await inter.edit_original_response("added to queue!")
    if (not os.path.isfile(f"{vid_dir}/{vidcounter}.mp4")) and sqllen() > 1 and len(queue) == 0:
        loop = asyncio.get_running_loop()
        queue.clear()  # safety
        propagate_queue(2)
        print(queue)
        await loop.run_in_executor(None, cold_run)


@bot.slash_command(
    name="shuffle",
    description="toggles shuffle on or off, the queue cannot be unshuffled once it is shuffled",
)
async def shuffleplay(inter: disnake.AppCmdInter, toggle: str = commands.Param(choices=["on", "off"])):
    """Turn shuffle on or off unless LOCK_SHUFFLE is "TRUE"."""
    global shuffle
    if os.getenv("LOCK_SHUFFLE", "FALSE") == "TRUE":
        await inter.response.send_message("the bot owner has locked shuffling", ephemeral=True)
        return
    await inter.response.defer(ephemeral=True)
    shuffle = toggle == "on"
    await inter.edit_original_response("shuffle enabled" if shuffle else "shuffle disabled")


@bot.slash_command(
    name="queue",
    description="list the videos in queue",
)
async def getqueue(inter: disnake.AppCmdInter):
    """Show the first page of the queue, with a ">>" button if there are more."""
    await inter.response.defer(ephemeral=True)
    if not queue:
        await inter.edit_original_response("There are no items in queue")
        return
    message, page, page_count = _render_queue_page(1)
    buttons = _queue_page_buttons(page, page_count)
    if buttons:
        await inter.edit_original_response(message, components=buttons)
    else:
        await inter.edit_original_response(message)


@bot.listen("on_button_click")
async def button_listener(inter: disnake.MessageInteraction):
    """Handle the "<<" / ">>" buttons of the /queue message.

    The current page is read from the "<page> of <count>" footer of the
    message that carries the button. Buttons with any other custom id are
    ignored.
    """
    step = {"Forward": 1, "Backward": -1}.get(inter.component.custom_id)
    if step is None:
        return
    if not queue:
        await inter.response.edit_message("There are no items in queue")
        return

    footer = inter.message.content.split("\n")[-1]
    try:
        target = int(footer.split(" of ")[0]) + step
    except ValueError:
        # Footer missing or malformed: fall back to the first page.
        target = 1
    message, page, page_count = _render_queue_page(target)
    # An interaction can be responded to only once, so this is the single
    # response; None removes all buttons when none apply.
    await inter.response.edit_message(message, components=_queue_page_buttons(page, page_count) or None)


@bot.slash_command(
    name="toggleplayback",
    description="play or pause the video",
)
async def toggleplayback(inter: disnake.AppCmdInter):
    """Pause the OBS player if it is playing, resume it if it is paused."""
    stat = obs.get_media_input_status("player")
    print(stat.media_state)
    if stat.media_state == "OBS_MEDIA_STATE_PLAYING":
        obs.trigger_media_input_action("player", "OBS_WEBSOCKET_MEDIA_INPUT_ACTION_PAUSE")
    elif stat.media_state == "OBS_MEDIA_STATE_PAUSED":
        obs.trigger_media_input_action("player", "OBS_WEBSOCKET_MEDIA_INPUT_ACTION_PLAY")
    await inter.response.send_message("done", ephemeral=True)


@bot.slash_command(
    name="skip",
    description="skips the current video, do not run this command multiple times",
    default_member_permissions=disnake.Permissions(8192),
)
async def skip(inter: disnake.AppCmdInter):
    """Skip the current video immediately (disabled when ALLOW_SKIP is "FALSE")."""
    if os.getenv("ALLOW_SKIP", "TRUE") == "FALSE":
        await inter.response.send_message("the bot owner has disabled skipping", ephemeral=True)
        return
    await inter.response.defer(ephemeral=False)
    if not queue:
        await inter.edit_original_response("There are no items in queue")
        return

    started_next = await _advance_playback()
    await inter.edit_original_response("skipped as sufficient votes were reached")
    if started_next:
        # Answer first, then fetch the following video in the background.
        await _prefetch_next()


@bot.slash_command(
    name="voteskip",
    description="vote to skip the current video",
)
async def voteskip(inter: disnake.AppCmdInter):
    """Register a skip vote; skip once votes reach half the voice channel (rounded down).

    The voter must be in a voice channel in which at least one member is
    sharing video or streaming.
    """
    if os.getenv("ALLOW_SKIP", "TRUE") == "FALSE":
        await inter.response.send_message("the bot owner has disabled skipping", ephemeral=True)
        return
    await inter.response.defer(ephemeral=False)
    if not queue:
        await inter.edit_original_response("There are no items in queue")
        return
    if not inter.user.voice:
        await inter.edit_original_response("You are not in the voice channel")
        return
    if inter.user.id in skip_list:
        await inter.edit_original_response("You have already voted to skip this video")
        return

    # This could be better due to potential for abuse, but it's fine for now.
    vc = inter.user.voice.channel.members
    print(inter.user.voice.channel.name)
    if not any(m.voice.self_video or m.voice.self_stream for m in vc):
        await inter.edit_original_response("No one is playing video so how can this be the correct vc?")
        return

    skip_list.append(inter.user.id)
    needed = math.floor(len(vc)/2)
    print(len(skip_list))
    print(needed)
    print(vc)
    if len(skip_list) < needed:
        await inter.edit_original_response(f"**{inter.user.display_name}** has voted to skip the video, {len(skip_list)}/{needed}")
        return

    started_next = await _advance_playback(pause_first=True)
    await inter.edit_original_response("skipped as sufficient votes were reached")
    if started_next:
        await _prefetch_next()


@tasks.loop(seconds=5.0)
async def videotimer():
    """Every 5 s: skip the current video once it has played for MAX_MIN minutes."""
    try:
        scene = obs.get_current_program_scene()
        if scene.scene_name != "youtube":
            return
        playing = obs.get_media_input_status("player")
        if playing.media_cursor is None:
            return
        if playing.media_cursor < (float(os.getenv("MAX_MIN")) * 60000):
            return
        if playing.media_state in ("OBS_MEDIA_STATE_STOPPED", "OBS_MEDIA_STATE_ENDED"):
            # The playback-ended handler is already dealing with this video.
            print("skip return")
            return
        if not queue:
            return
        if await _advance_playback():
            await _prefetch_next()
    except Exception as exc:
        # Best effort: report the problem and let the loop keep running.
        print(f"videotimer iteration failed: {exc!r}")


@tasks.loop(seconds=20)
async def ensurewaiting():
    """Every 20 s: switch to the "error" scene when the player has no media cursor.

    Leaves the "error" scene for "waiting" once the download directory is
    empty. Does nothing while the "notice" item is shown.
    """
    scene = obs.get_current_program_scene()
    if scene.scene_name == "waiting":
        return
    if scene.scene_name == "error":
        if len(os.listdir(full_dl_dir)) == 0:
            obs.set_current_program_scene("waiting")
        return

    notice = obs.get_scene_item_id("youtube", "notice")
    enabled = obs.get_scene_item_enabled("youtube", notice.scene_item_id)
    if enabled.scene_item_enabled:
        return
    playing = obs.get_media_input_status("player")
    if playing.media_cursor is None:
        obs.set_current_program_scene("error")
        if len(os.listdir(full_dl_dir)) == 0 and len(queue) >= 1:
            # Just in case: nothing is downloading, so fetch the current video again.
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(None, download_video, 0)


@tasks.loop(seconds=5)
async def titlehandler():
    """Every 5 s: hide the "nowplaying" item 4 s after it is seen enabled."""
    scene = obs.get_current_program_scene()
    if scene.scene_name != "youtube":
        return
    nowplayingid = obs.get_scene_item_id("youtube", "nowplaying")
    enabled = obs.get_scene_item_enabled("youtube", nowplayingid.scene_item_id)
    if enabled.scene_item_enabled:
        await asyncio.sleep(4)
        obs.set_scene_item_enabled("youtube", nowplayingid.scene_item_id, False)
# Run the bot until it stops, then always release the temporary directories
# and switch OBS to the "nosignal" scene, even if bot.run() raised.
try:
    bot.run(os.getenv("TOKEN"))
finally:
    print("cleaning up tempdir")
    temp_dir.cleanup()
    download_dir.cleanup()
    obs.set_current_program_scene("nosignal")