import logging
import math

import discord
from discord import guild
from discord.ext.commands.errors import CommandError
import youtube_dl
from discord.ext import commands

from utils.models import VoiceState, YTDLSource, YTDLError, Song, VoiceError

# Silence useless bug reports messages
youtube_dl.utils.bug_reports_message = lambda: ''

logger = logging.getLogger(__name__)


# Cog exposing the music-player commands (join/leave, playback control and
# queue management). Every command operates on the ``VoiceState`` that
# ``cog_before_invoke`` attaches to the context as ``ctx.voice_state``.
#
# NOTE: the docstrings of the command callbacks below are kept short on
# purpose -- discord.py uses them as the user-facing help text.
class Music(commands.Cog):
    def __init__(self, bot: commands.Bot):
        self.bot = bot
        # Maps guild id -> the VoiceState most recently built for that guild.
        self.voice_states = {}

    def get_voice_state(self, ctx: commands.Context):
        """Build the ``VoiceState`` for the guild of ``ctx`` and register it.

        If the bot already has a voice client in this guild, it is attached
        to the new state so playback commands can reach it.
        """
        # NOTE(review): a fresh VoiceState is created on every invocation and
        # replaces the previously stored one. Whether the queue survives
        # between commands depends on how utils.models.VoiceState keeps its
        # data -- confirm this is intended.
        state = VoiceState(self.bot, ctx)
        voice_client = discord.utils.get(self.bot.voice_clients, guild=ctx.guild)
        if voice_client is not None:
            state.voice = voice_client
        self.voice_states[ctx.guild.id] = state
        return state

    def cog_unload(self):
        """Schedule ``stop()`` for every registered voice state."""
        for state in self.voice_states.values():
            self.bot.loop.create_task(state.stop())

    def cog_check(self, ctx: commands.Context):
        """Reject commands that are not invoked from within a guild.

        Raises:
            commands.NoPrivateMessage: if ``ctx`` has no guild.
        """
        if not ctx.guild:
            raise commands.NoPrivateMessage('This command can\'t be used in DM channels.')
        return True

    async def cog_before_invoke(self, ctx: commands.Context):
        """Attach the guild's voice state to ``ctx`` before any command runs."""
        ctx.voice_state = self.get_voice_state(ctx)

    async def cog_command_error(self, ctx: commands.Context, error: commands.CommandError):
        """Record errors raised by this cog's commands.

        Nothing is sent to the channel (the original handler had that call
        disabled), but the error is logged so that failures are not lost
        without a trace.
        """
        logger.warning('Command %s failed: %s', ctx.command, error, exc_info=error)

    @commands.command(name='join', invoke_without_subcommand=True)
    async def _join(self, ctx: commands.Context):
        """Joins a voice channel."""
        # ctx.author.voice is guaranteed by ensure_voice_state (registered as
        # a before-invoke hook for both `join` and `play`).
        destination = ctx.author.voice.channel
        if ctx.voice_state.voice:
            await ctx.voice_state.voice.move_to(destination)
            return

        voice_client = discord.utils.get(self.bot.voice_clients, guild=ctx.guild)
        if voice_client is None:
            ctx.voice_state.voice = await destination.connect()
        else:
            ctx.voice_state.voice = voice_client

    @commands.command(name='summon')
    async def _summon(self, ctx: commands.Context, *, channel: discord.VoiceChannel = None):
        """Summons the bot to a voice channel.

        If no channel was specified, it joins your channel.
        """
        if not channel and not ctx.author.voice:
            raise VoiceError('You are neither connected to a voice channel nor specified a channel to join.')

        destination = channel or ctx.author.voice.channel
        if ctx.voice_state.voice:
            await ctx.voice_state.voice.move_to(destination)
            return

        ctx.voice_state.voice = await destination.connect()

    @commands.command(name='leave', aliases=['disconnect', 'q', 'dc'])
    async def _leave(self, ctx: commands.Context):
        """Clears the queue and leaves the voice channel."""
        if not ctx.voice_state.voice:
            return await ctx.send('Not connected to any voice channel.')

        await ctx.voice_state.stop()
        # pop() rather than del: a missing entry must not raise after the
        # state has already been stopped.
        self.voice_states.pop(ctx.guild.id, None)

    @commands.command(name='volume')
    async def _volume(self, ctx: commands.Context, *, volume: int):
        """Sets the volume of the player."""
        if not ctx.voice_state.is_playing:
            return await ctx.send('Nothing being played at the moment.')

        # The previous check (`0 > volume > 100`) could never be true, so
        # out-of-range values were silently accepted.
        if not 0 <= volume <= 100:
            return await ctx.send('Volume must be between 0 and 100')

        ctx.voice_state.volume = volume / 100
        await ctx.send('Volume of the player set to {}%'.format(volume))

    @commands.command(name='now', aliases=['current', 'playing'])
    async def _now(self, ctx: commands.Context):
        """Displays the currently playing song."""
        # Same guard the other playback commands use; avoids dereferencing
        # `current` when nothing is playing.
        if not ctx.voice_state.is_playing:
            return await ctx.send('Nothing being played at the moment.')

        await ctx.send(embed=ctx.voice_state.current.create_embed())

    @commands.command(name='pause')
    async def _pause(self, ctx: commands.Context):
        """Pauses the currently playing song."""
        if ctx.voice_state.is_playing and ctx.voice_state.voice.is_playing():
            ctx.voice_state.voice.pause()
            await ctx.message.add_reaction('⏯')

    @commands.command(name='resume', aliases=['r'])
    async def _resume(self, ctx: commands.Context):
        """Resumes a currently paused song."""
        if ctx.voice_state.is_playing and ctx.voice_state.voice.is_paused():
            ctx.voice_state.voice.resume()
            await ctx.message.add_reaction('⏯')

    @commands.command(name='stop')
    async def _stop(self, ctx: commands.Context):
        """Stops playing song and clears the queue."""
        ctx.voice_state.songs.clear()

        if ctx.voice_state.is_playing:
            ctx.voice_state.voice.stop()
            await ctx.message.add_reaction('⏹')

    @commands.command(name='skip', aliases=['s', 'next', 'n'])
    async def _skip(self, ctx: commands.Context):
        """Skips the currently playing song."""
        if not ctx.voice_state.is_playing:
            return await ctx.send('Not playing any music right now...')

        await ctx.message.add_reaction('⏭')
        ctx.voice_state.skip()

    @commands.command(name='queue')
    async def _queue(self, ctx: commands.Context, *, page: int = 1):
        """Shows the player's queue.

        You can optionally specify the page to show. Each page contains 10 elements.
        """
        songs = ctx.voice_state.songs
        total = len(songs)
        if total == 0:
            return await ctx.send('Empty queue.')

        items_per_page = 10
        pages = math.ceil(total / items_per_page)

        # Reject pages outside 1..pages: a page <= 0 would yield a negative
        # slice start and a page past the end an empty listing.
        if not 1 <= page <= pages:
            return await ctx.send('Page must be between 1 and {}.'.format(pages))

        start = (page - 1) * items_per_page
        end = start + items_per_page

        queue = ''.join(
            '`{0}.` [**{1.source.title}**]({1.source.url})\n'.format(i + 1, song)
            for i, song in enumerate(songs[start:end], start=start)
        )

        embed = (discord.Embed(description='**{} tracks:**\n\n{}'.format(total, queue))
                 .set_footer(text='Viewing page {}/{}'.format(page, pages)))
        await ctx.send(embed=embed)

    @commands.command(name='shuffle', aliases=['sf'])
    async def _shuffle(self, ctx: commands.Context):
        """Shuffles the queue."""
        if len(ctx.voice_state.songs) == 0:
            return await ctx.send('Empty queue.')

        ctx.voice_state.songs.shuffle()
        await ctx.message.add_reaction('✅')

    @commands.command(name='remove')
    async def _remove(self, ctx: commands.Context, index: int):
        """Removes a song from the queue at a given index."""
        total = len(ctx.voice_state.songs)
        if total == 0:
            return await ctx.send('Empty queue.')

        # `index` is 1-based for the user. Without this check an index of 0
        # becomes -1 and out-of-range values reach the queue unchecked.
        if not 1 <= index <= total:
            return await ctx.send('Index must be between 1 and {}.'.format(total))

        ctx.voice_state.songs.remove(index - 1)
        await ctx.message.add_reaction('✅')

    @commands.command(name='loop', aliases=['l'])
    async def _loop(self, ctx: commands.Context):
        """Loops the currently playing song.

        Invoke this command again to unloop the song.
        """
        if not ctx.voice_state.is_playing:
            return await ctx.send('Nothing being played at the moment.')

        # Inverse boolean value to loop and unloop.
        ctx.voice_state.loop = not ctx.voice_state.loop
        await ctx.message.add_reaction('✅')

    @commands.command(name='play', aliases=['p'])
    async def _play(self, ctx: commands.Context, *, search: str):
        """Plays a song.

        If there are songs in the queue, this will be queued until the
        other songs finished playing.

        This command automatically searches from various sites if no URL is provided.
        A list of these sites can be found here: https://rg3.github.io/youtube-dl/supportedsites.html
        """
        # Strip recognised flags (each preceded by a space) from the search
        # text. `-q` suppresses the "Enqueued" confirmation and deletes the
        # invoking message instead.
        quiet_flag = False
        flags = ['-q']
        for flag in flags:
            if ' ' + flag in search:
                search = search.replace(' ' + flag, '')
                if flag == '-q':
                    quiet_flag = True

        if not ctx.voice_state.voice:
            await ctx.invoke(self._join)

        async with ctx.typing():
            try:
                source = await YTDLSource.create_source(ctx, search, loop=self.bot.loop, quiet=quiet_flag)
            except YTDLError as e:
                await ctx.send('An error occurred while processing this request: {}'.format(str(e)))
            else:
                song = Song(source)

                await ctx.voice_state.songs.put(song)
                if not quiet_flag:
                    await ctx.send('Enqueued {}'.format(str(source)))
                else:
                    await ctx.message.delete()

    @_join.before_invoke
    @_play.before_invoke
    async def ensure_voice_state(self, ctx: commands.Context):
        """Verify the caller can use voice commands before `join`/`play` run.

        Raises:
            commands.CommandError: if the author is not in a voice channel,
                or the bot is already connected to a different channel.
        """
        if not ctx.author.voice or not ctx.author.voice.channel:
            raise commands.CommandError('You are not connected to any voice channel.')

        if ctx.voice_client and ctx.voice_client.channel != ctx.author.voice.channel:
            raise commands.CommandError('Bot is already in a voice channel.')