Source code for lavalink.lavalink

import asyncio
from typing import Optional, Tuple, Union

import discord
from discord.ext.commands import Bot

from . import enums, log, node, player, errors

__all__ = [
    "initialize",
    "connect",
    "get_player",
    "close",
    "register_event_listener",
    "unregister_event_listener",
    "register_update_listener",
    "unregister_update_listener",
    "register_stats_listener",
    "unregister_stats_listener",
    "all_players",
    "all_connected_players",
    "active_players",
    "wait_until_ready",
]

_event_listeners = []
_update_listeners = []
_stats_listeners = []


[docs]async def initialize( bot: Bot, *, host, password, port: Optional[int] = None, timeout: float = 30, resume_key: Optional[str] = None, resume_timeout: float = 60, secured: bool = False, ): """ Initializes the websocket connection to the lavalink player. .. important:: This function must only be called AFTER the bot has received its "on_ready" event! Parameters ---------- bot : Bot An instance of a discord.py `Bot` object. host : str The hostname or IP address of the Lavalink node. password : str The password of the Lavalink node. port : Optional[int] The websocket port on the Lavalink Node. If not provided, it will use 80 for unsecured connections and 443 for secured. timeout : float Amount of time to allow retries to occur, ``None`` is considered forever. resume_key : Optional[str] A resume key used for resuming a session upon re-establishing a WebSocket connection to Lavalink. resume_timeout : float How long the node should wait for a connection while disconnected before clearing all players. secured: bool Whether to use the `wss://` and `https://` protocol. """ register_event_listener(_handle_event) register_update_listener(_handle_update) lavalink_node = node.Node( event_handler=dispatch, host=host, password=password, port=port, user_id=bot.user.id, num_shards=bot.shard_count or 1, resume_key=resume_key, resume_timeout=resume_timeout, bot=bot, secured=secured, ) await lavalink_node.connect(timeout=timeout) lavalink_node._retries = 0 bot.add_listener(_on_guild_remove, name="on_guild_remove") return lavalink_node
[docs]async def connect(channel: discord.VoiceChannel, *, self_deaf: bool = False): """ Connects to a discord voice channel. This is the publicly exposed way to connect to a discord voice channel. The :py:func:`initialize` function must be called first! Parameters ---------- channel : discord.VoiceChannel The channel to connect to. self_deaf: bool Whether to deafen the bot user upon join. Returns ------- Player The created Player object. Raises ------ IndexError If there are no available lavalink nodes ready to connect to discord. """ node_ = node.get_node(channel.guild.id) p = await node_.create_player(channel, self_deaf=self_deaf) return p
[docs]def get_player(guild_id: int) -> player.Player: node_ = node.get_node(guild_id) return node_.get_player(guild_id)
async def _on_guild_remove(guild): try: p = get_player(guild.id) except (errors.NodeNotFound, errors.PlayerNotFound): pass else: await p.disconnect()
[docs]def register_event_listener(coro): """ Registers a coroutine to receive lavalink event information. This coroutine will accept three arguments: :py:class:`Player`, :py:class:`LavalinkEvents`, and possibly an extra. The value of the extra depends on the value of the second argument. If the second argument is :py:attr:`LavalinkEvents.TRACK_END`, the extra will be a :py:class:`TrackEndReason`. If the second argument is :py:attr:`LavalinkEvents.TRACK_EXCEPTION`, the extra will be a dictionary with ``message``, ``cause``, and ``severity`` keys. If the second argument is :py:attr:`LavalinkEvents.TRACK_STUCK`, the extra will be the threshold milliseconds that the track has been stuck for. If the second argument is :py:attr:`LavalinkEvents.TRACK_START`, the extra will be a track identifier string. If the second argument is any other value, the third argument will not exist. Parameters ---------- coro A coroutine function that accepts the arguments listed above. Raises ------ TypeError If ``coro`` is not a coroutine. """ if not asyncio.iscoroutinefunction(coro): raise TypeError("Function is not a coroutine.") if coro not in _event_listeners: _event_listeners.append(coro)
async def _handle_event(player, data: enums.LavalinkEvents, extra): await player.handle_event(data, extra) def _get_event_args(data: enums.LavalinkEvents, raw_data: dict): guild_id = int(raw_data.get("guildId")) try: node_ = node.get_node(guild_id, ignore_ready_status=True) player = node_.get_player(guild_id) except (errors.NodeNotFound, errors.PlayerNotFound): if data != enums.LavalinkEvents.TRACK_END: log.debug( "Got an event for a guild that we have no player for." " This may be because of a forced voice channel" " disconnect." ) return extra = None if data == enums.LavalinkEvents.TRACK_END: extra = enums.TrackEndReason(raw_data.get("reason")) elif data == enums.LavalinkEvents.TRACK_EXCEPTION: exception_data = raw_data["exception"] extra = { "message": exception_data["message"], "cause": exception_data["cause"], "severity": enums.ExceptionSeverity(exception_data["severity"]), } elif data == enums.LavalinkEvents.TRACK_STUCK: extra = raw_data.get("thresholdMs") elif data == enums.LavalinkEvents.TRACK_START: extra = raw_data.get("track") elif data == enums.LavalinkEvents.WEBSOCKET_CLOSED: extra = { "code": raw_data.get("code"), "reason": raw_data.get("reason"), "byRemote": raw_data.get("byRemote"), "channelID": player.channel.id if player.channel else None, } return player, data, extra
[docs]def unregister_event_listener(coro): """ Unregisters coroutines from being event listeners. Parameters ---------- coro """ try: _event_listeners.remove(coro) except ValueError: pass
[docs]def register_update_listener(coro): """ Registers a coroutine to receive lavalink player update information. This coroutine will accept a two arguments: an instance of :py:class:`Player` and an instance of :py:class:`PlayerState`. Parameters ---------- coro Raises ------ TypeError If ``coro`` is not a coroutine. """ if not asyncio.iscoroutinefunction(coro): raise TypeError("Function is not a coroutine.") if coro not in _update_listeners: _update_listeners.append(coro)
async def _handle_update(player, data: node.PositionTime, raw_data: dict): await player.handle_player_update(data) def _get_update_args( data: Union[node.PositionTime, enums.LavalinkEvents, node.Stats], raw_data: dict ): guild_id = int(raw_data.get("guildId")) try: player = get_player(guild_id) except (errors.NodeNotFound, errors.PlayerNotFound): log.debug( "Got a player update for a guild that we have no player for." " This may be because of a forced voice channel disconnect." ) return return player, data, raw_data
[docs]def unregister_update_listener(coro): """ Unregisters coroutines from being player update listeners. Parameters ---------- coro """ try: _update_listeners.remove(coro) except ValueError: pass
[docs]def register_stats_listener(coro): """ Registers a coroutine to receive lavalink server stats information. This coroutine will accept a single argument which will be an instance of :py:class:`Stats`. Parameters ---------- coro Raises ------ TypeError If ``coro`` is not a coroutine. """ if not asyncio.iscoroutinefunction(coro): raise TypeError("Function is not a coroutine.") if coro not in _stats_listeners: _stats_listeners.append(coro)
[docs]def unregister_stats_listener(coro): """ Unregisters coroutines from being server stats listeners. Parameters ---------- coro """ try: _stats_listeners.remove(coro) except ValueError: pass
def dispatch( op: enums.LavalinkIncomingOp, data: Union[node.PositionTime, enums.LavalinkEvents, node.Stats], raw_data: dict, ): listeners = [] args = [] if op == enums.LavalinkIncomingOp.EVENT: listeners = _event_listeners args = _get_event_args(data, raw_data) elif op == enums.LavalinkIncomingOp.PLAYER_UPDATE: listeners = _update_listeners args = _get_update_args(data, raw_data) elif op == enums.LavalinkIncomingOp.STATS: listeners = _stats_listeners args = [data] if args is None: # For example, no player because channel got removed. return for coro in listeners: asyncio.create_task(coro(*args))
[docs]async def close(bot): """ Closes the lavalink connection completely. """ log.debug("Closing Lavalink connections") unregister_event_listener(_handle_event) unregister_update_listener(_handle_update) bot.remove_listener(_on_guild_remove, name="on_guild_remove") await node.disconnect() log.debug("All Lavalink nodes have been disconnected")
# Helper methods def all_players() -> Tuple[player.Player]: nodes = node._nodes ret = tuple(p for n in nodes for p in n.players) return ret def all_connected_players() -> Tuple[player.Player]: nodes = node._nodes ret = tuple(p for n in nodes for p in n.players if p.connected) return ret def active_players() -> Tuple[player.Player]: ps = all_connected_players() return tuple(p for p in ps if p.is_playing) async def wait_until_ready( *, timeout: Optional[float] = None, wait_if_no_node: Optional[int] = None ): if wait_if_no_node: for iteration in range(0, abs(wait_if_no_node), 1): if not node._nodes: await asyncio.sleep(1) else: break if not node._nodes: raise asyncio.TimeoutError for result in await asyncio.gather( *( node_.wait_until_ready(timeout) for node_ in node._nodes if (not node_.ready) and not node_.state == node.NodeState.DISCONNECTING ), return_exceptions=True, ): if result is not None: raise result