Format with Black

This commit is contained in:
Stein Magnus Jodal 2020-03-08 12:30:34 +01:00
parent b067352a00
commit 1b04266d92
7 changed files with 466 additions and 207 deletions

View file

@ -2,28 +2,29 @@ import os
from mopidy import ext, config from mopidy import ext, config
__version__ = '0.2.1' __version__ = "0.2.1"
class SubidyExtension(ext.Extension): class SubidyExtension(ext.Extension):
dist_name = 'Mopidy-Subidy' dist_name = "Mopidy-Subidy"
ext_name = 'subidy' ext_name = "subidy"
version = __version__ version = __version__
def get_default_config(self): def get_default_config(self):
conf_file = os.path.join(os.path.dirname(__file__), 'ext.conf') conf_file = os.path.join(os.path.dirname(__file__), "ext.conf")
return config.read(conf_file) return config.read(conf_file)
def get_config_schema(self): def get_config_schema(self):
schema = super().get_config_schema() schema = super().get_config_schema()
schema['url'] = config.String() schema["url"] = config.String()
schema['username'] = config.String() schema["username"] = config.String()
schema['password'] = config.Secret() schema["password"] = config.Secret()
schema['legacy_auth'] = config.Boolean(optional=True) schema["legacy_auth"] = config.Boolean(optional=True)
schema['api_version'] = config.String(optional=True) schema["api_version"] = config.String(optional=True)
return schema return schema
def setup(self, registry): def setup(self, registry):
from .backend import SubidyBackend from .backend import SubidyBackend
registry.add('backend', SubidyBackend)
registry.add("backend", SubidyBackend)

View file

@ -3,18 +3,22 @@ from mopidy_subidy import library, playback, playlists, subsonic_api
from mopidy import backend from mopidy import backend
import pykka import pykka
class SubidyBackend(pykka.ThreadingActor, backend.Backend): class SubidyBackend(pykka.ThreadingActor, backend.Backend):
def __init__(self, config, audio): def __init__(self, config, audio):
super().__init__() super().__init__()
subidy_config = config['subidy'] subidy_config = config["subidy"]
self.subsonic_api = subsonic_api.SubsonicApi( self.subsonic_api = subsonic_api.SubsonicApi(
url=subidy_config['url'], url=subidy_config["url"],
username=subidy_config['username'], username=subidy_config["username"],
password=subidy_config['password'], password=subidy_config["password"],
app_name=mopidy_subidy.SubidyExtension.dist_name, app_name=mopidy_subidy.SubidyExtension.dist_name,
legacy_auth=subidy_config['legacy_auth'], legacy_auth=subidy_config["legacy_auth"],
api_version=subidy_config['api_version']) api_version=subidy_config["api_version"],
)
self.library = library.SubidyLibraryProvider(backend=self) self.library = library.SubidyLibraryProvider(backend=self)
self.playback = playback.SubidyPlaybackProvider(audio=audio, backend=self) self.playback = playback.SubidyPlaybackProvider(
audio=audio, backend=self
)
self.playlists = playlists.SubidyPlaylistsProvider(backend=self) self.playlists = playlists.SubidyPlaylistsProvider(backend=self)
self.uri_schemes = ['subidy'] self.uri_schemes = ["subidy"]

View file

@ -3,8 +3,10 @@ from mopidy.models import Ref, SearchResult
from mopidy_subidy import uri from mopidy_subidy import uri
import logging import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class SubidyLibraryProvider(backend.LibraryProvider): class SubidyLibraryProvider(backend.LibraryProvider):
def __create_vdirs(): def __create_vdirs():
vdir_templates = [ vdir_templates = [
@ -20,7 +22,7 @@ class SubidyLibraryProvider(backend.LibraryProvider):
for template in vdir_templates: for template in vdir_templates:
vdir = template.copy() vdir = template.copy()
vdir.update(uri=uri.get_vdir_uri(vdir["id"])) vdir.update(uri=uri.get_vdir_uri(vdir["id"]))
vdirs[template['id']] = vdir vdirs[template["id"]] = vdir
return vdirs return vdirs
_vdirs = __create_vdirs() _vdirs = __create_vdirs()
@ -28,11 +30,9 @@ class SubidyLibraryProvider(backend.LibraryProvider):
def __raw_vdir_to_ref(vdir): def __raw_vdir_to_ref(vdir):
if vdir is None: if vdir is None:
return None return None
return Ref.directory( return Ref.directory(name=vdir["name"], uri=vdir["uri"])
name=vdir['name'],
uri=vdir['uri'])
root_directory = __raw_vdir_to_ref(_vdirs['root']) root_directory = __raw_vdir_to_ref(_vdirs["root"])
_raw_vdir_to_ref = staticmethod(__raw_vdir_to_ref) _raw_vdir_to_ref = staticmethod(__raw_vdir_to_ref)
@ -66,19 +66,29 @@ class SubidyLibraryProvider(backend.LibraryProvider):
return self.subsonic_api.get_songs_as_tracks(album_id) return self.subsonic_api.get_songs_as_tracks(album_id)
def lookup_artist(self, artist_id): def lookup_artist(self, artist_id):
return list(self.subsonic_api.get_artist_as_songs_as_tracks_iter(artist_id)) return list(
self.subsonic_api.get_artist_as_songs_as_tracks_iter(artist_id)
)
def lookup_directory(self, directory_id): def lookup_directory(self, directory_id):
return list(self.subsonic_api.get_recursive_dir_as_songs_as_tracks_iter(directory_id)) return list(
self.subsonic_api.get_recursive_dir_as_songs_as_tracks_iter(
directory_id
)
)
def lookup_playlist(self, playlist_id): def lookup_playlist(self, playlist_id):
return self.subsonic_api.get_playlist_as_playlist(playlist_id).tracks return self.subsonic_api.get_playlist_as_playlist(playlist_id).tracks
def browse(self, browse_uri): def browse(self, browse_uri):
if browse_uri == uri.get_vdir_uri('root'): if browse_uri == uri.get_vdir_uri("root"):
root_vdir_names = ["rootdirs", "artists", "albums"] root_vdir_names = ["rootdirs", "artists", "albums"]
root_vdirs = [self._vdirs[vdir_name] for vdir_name in root_vdir_names] root_vdirs = [
sorted_root_vdirs = sorted(root_vdirs, key=lambda vdir: vdir["name"]) self._vdirs[vdir_name] for vdir_name in root_vdir_names
]
sorted_root_vdirs = sorted(
root_vdirs, key=lambda vdir: vdir["name"]
)
return [self._raw_vdir_to_ref(vdir) for vdir in sorted_root_vdirs] return [self._raw_vdir_to_ref(vdir) for vdir in sorted_root_vdirs]
elif browse_uri == uri.get_vdir_uri("rootdirs"): elif browse_uri == uri.get_vdir_uri("rootdirs"):
return self.browse_rootdirs() return self.browse_rootdirs()
@ -136,39 +146,52 @@ class SubidyLibraryProvider(backend.LibraryProvider):
return SearchResult(tracks=[song]) return SearchResult(tracks=[song])
return None return None
def search_by_artist_album_and_track(self, artist_name, album_name, track_name): def search_by_artist_album_and_track(
self, artist_name, album_name, track_name
):
tracks = self.search_by_artist_and_album(artist_name, album_name) tracks = self.search_by_artist_and_album(artist_name, album_name)
track = next(item for item in tracks.tracks if track_name in item.name) track = next(item for item in tracks.tracks if track_name in item.name)
return SearchResult(tracks=[track]) return SearchResult(tracks=[track])
def search_by_artist_and_album(self, artist_name, album_name): def search_by_artist_and_album(self, artist_name, album_name):
artists = self.subsonic_api.get_raw_artists() artists = self.subsonic_api.get_raw_artists()
artist = next(item for item in artists if artist_name in item.get('name')) artist = next(
albums = self.subsonic_api.get_raw_albums(artist.get('id')) item for item in artists if artist_name in item.get("name")
album = next(item for item in albums if album_name in item.get('title')) )
return SearchResult(tracks=self.subsonic_api.get_songs_as_tracks(album.get('id'))) albums = self.subsonic_api.get_raw_albums(artist.get("id"))
album = next(item for item in albums if album_name in item.get("title"))
return SearchResult(
tracks=self.subsonic_api.get_songs_as_tracks(album.get("id"))
)
def get_distinct(self, field, query): def get_distinct(self, field, query):
search_result = self.search(query) search_result = self.search(query)
if not search_result: if not search_result:
return [] return []
if field == 'track' or field == 'title': if field == "track" or field == "title":
return [track.name for track in (search_result.tracks or [])] return [track.name for track in (search_result.tracks or [])]
if field == 'album': if field == "album":
return [album.name for album in (search_result.albums or [])] return [album.name for album in (search_result.albums or [])]
if field == 'artist': if field == "artist":
if not search_result.artists: if not search_result.artists:
return [artist.name for artist in self.browse_artists()] return [artist.name for artist in self.browse_artists()]
return [artist.name for artist in search_result.artists] return [artist.name for artist in search_result.artists]
def search(self, query=None, uris=None, exact=False): def search(self, query=None, uris=None, exact=False):
if 'artist' in query and 'album' in query and 'track_name' in query: if "artist" in query and "album" in query and "track_name" in query:
return self.search_by_artist_album_and_track(query.get('artist')[0], query.get('album')[0], query.get('track_name')[0]) return self.search_by_artist_album_and_track(
if 'artist' in query and 'album' in query: query.get("artist")[0],
return self.search_by_artist_and_album(query.get('artist')[0], query.get('album')[0]) query.get("album")[0],
if 'artist' in query: query.get("track_name")[0],
return self.subsonic_api.find_as_search_result(query.get('artist')[0]) )
if 'any' in query: if "artist" in query and "album" in query:
return self.subsonic_api.find_as_search_result(query.get('any')[0]) return self.search_by_artist_and_album(
query.get("artist")[0], query.get("album")[0]
)
if "artist" in query:
return self.subsonic_api.find_as_search_result(
query.get("artist")[0]
)
if "any" in query:
return self.subsonic_api.find_as_search_result(query.get("any")[0])
return SearchResult(artists=self.subsonic_api.get_artists_as_artists()) return SearchResult(artists=self.subsonic_api.get_artists_as_artists())

View file

@ -4,6 +4,7 @@ import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class SubidyPlaybackProvider(backend.PlaybackProvider): class SubidyPlaybackProvider(backend.PlaybackProvider):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)

View file

@ -3,8 +3,10 @@ from mopidy_subidy import uri
from mopidy.models import Playlist from mopidy.models import Playlist
import logging import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class SubidyPlaylistsProvider(backend.PlaylistsProvider): class SubidyPlaylistsProvider(backend.PlaylistsProvider):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
@ -19,7 +21,7 @@ class SubidyPlaylistsProvider(backend.PlaylistsProvider):
result = self.subsonic_api.create_playlist_raw(name) result = self.subsonic_api.create_playlist_raw(name)
if result is None: if result is None:
return None return None
playlist = result.get('playlist') playlist = result.get("playlist")
if playlist is None: if playlist is None:
for pl in self.subsonic_api.get_playlists_as_playlists(): for pl in self.subsonic_api.get_playlists_as_playlists():
if pl.name == name: if pl.name == name:
@ -33,12 +35,16 @@ class SubidyPlaylistsProvider(backend.PlaylistsProvider):
self.subsonic_api.delete_playlist_raw(playlist_id) self.subsonic_api.delete_playlist_raw(playlist_id)
def get_items(self, items_uri): def get_items(self, items_uri):
#logger.info('ITEMS %s: %s' % (lookup_uri, self.subsonic_api.get_playlist_songs_as_refs(uri.get_playlist_id(items_uri)))) # logger.info('ITEMS %s: %s' % (lookup_uri, self.subsonic_api.get_playlist_songs_as_refs(uri.get_playlist_id(items_uri))))
return self.subsonic_api.get_playlist_as_songs_as_refs(uri.get_playlist_id(items_uri)) return self.subsonic_api.get_playlist_as_songs_as_refs(
uri.get_playlist_id(items_uri)
)
def lookup(self, lookup_uri): def lookup(self, lookup_uri):
#logger.info('LOOKUP PLAYLIST %s: %s' % (lookup_uri, self.subsonic_api.get_playlist_as_playlist(uri.get_playlist_id(lookup_uri)))) # logger.info('LOOKUP PLAYLIST %s: %s' % (lookup_uri, self.subsonic_api.get_playlist_as_playlist(uri.get_playlist_id(lookup_uri))))
return self.subsonic_api.get_playlist_as_playlist(uri.get_playlist_id(lookup_uri)) return self.subsonic_api.get_playlist_as_playlist(
uri.get_playlist_id(lookup_uri)
)
def refresh(self): def refresh(self):
pass pass

View file

@ -9,18 +9,19 @@ from mopidy_subidy import uri
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
RESPONSE_OK = 'ok' RESPONSE_OK = "ok"
UNKNOWN_SONG = 'Unknown Song' UNKNOWN_SONG = "Unknown Song"
UNKNOWN_ALBUM = 'Unknown Album' UNKNOWN_ALBUM = "Unknown Album"
UNKNOWN_ARTIST = 'Unknown Artist' UNKNOWN_ARTIST = "Unknown Artist"
MAX_SEARCH_RESULTS = 100 MAX_SEARCH_RESULTS = 100
MAX_LIST_RESULTS = 500 MAX_LIST_RESULTS = 500
ref_sort_key = lambda ref: ref.name ref_sort_key = lambda ref: ref.name
def string_nums_nocase_sort_key(s): def string_nums_nocase_sort_key(s):
segments = [] segments = []
for substr in re.split(r'(\d+)', s): for substr in re.split(r"(\d+)", s):
if substr.isdigit(): if substr.isdigit():
seg = int(substr) seg = int(substr)
else: else:
@ -28,37 +29,49 @@ def string_nums_nocase_sort_key(s):
segments.append(seg) segments.append(seg)
return segments return segments
def diritem_sort_key(item): def diritem_sort_key(item):
isdir = item['isDir'] isdir = item["isDir"]
if isdir: if isdir:
key = string_nums_nocase_sort_key(item['title']) key = string_nums_nocase_sort_key(item["title"])
else: else:
key = int(item.get('track', 1)) key = int(item.get("track", 1))
return (isdir, key) return (isdir, key)
class SubsonicApi():
def __init__(self, url, username, password, app_name, legacy_auth, api_version): class SubsonicApi:
def __init__(
self, url, username, password, app_name, legacy_auth, api_version
):
parsed = urlparse(url) parsed = urlparse(url)
self.port = parsed.port if parsed.port else \ self.port = (
443 if parsed.scheme == 'https' else 80 parsed.port
base_url = parsed.scheme + '://' + parsed.hostname if parsed.port
else 443
if parsed.scheme == "https"
else 80
)
base_url = parsed.scheme + "://" + parsed.hostname
self.connection = libsonic.Connection( self.connection = libsonic.Connection(
base_url, base_url,
username, username,
password, password,
self.port, self.port,
parsed.path + '/rest', parsed.path + "/rest",
appName=app_name, appName=app_name,
legacyAuth=legacy_auth, legacyAuth=legacy_auth,
apiVersion=api_version) apiVersion=api_version,
self.url = url + '/rest' )
self.url = url + "/rest"
self.username = username self.username = username
self.password = password self.password = password
logger.info(f'Connecting to subsonic server on url {url} as user {username}, API version {api_version}') logger.info(
f"Connecting to subsonic server on url {url} as user {username}, API version {api_version}"
)
try: try:
self.connection.ping() self.connection.ping()
except Exception as e: except Exception as e:
logger.error('Unable to reach subsonic server: %s' % e) logger.error("Unable to reach subsonic server: %s" % e)
exit() exit()
def get_subsonic_uri(self, view_name, params, censor=False): def get_subsonic_uri(self, view_name, params, censor=False):
@ -67,50 +80,84 @@ class SubsonicApi():
di_params.update(c=self.connection.appName) di_params.update(c=self.connection.appName)
di_params.update(v=self.connection.apiVersion) di_params.update(v=self.connection.apiVersion)
if censor: if censor:
di_params.update(u='*****', p='*****') di_params.update(u="*****", p="*****")
else: else:
di_params.update(u=self.username, p=self.password) di_params.update(u=self.username, p=self.password)
return '{}/{}.view?{}'.format(self.url, view_name, urlencode(di_params)) return "{}/{}.view?{}".format(self.url, view_name, urlencode(di_params))
def get_song_stream_uri(self, song_id): def get_song_stream_uri(self, song_id):
return self.get_subsonic_uri('stream', dict(id=song_id)) return self.get_subsonic_uri("stream", dict(id=song_id))
def get_censored_song_stream_uri(self, song_id): def get_censored_song_stream_uri(self, song_id):
return self.get_subsonic_uri('stream', dict(id=song_id), True) return self.get_subsonic_uri("stream", dict(id=song_id), True)
def find_raw(self, query, exclude_artists=False, exclude_albums=False, exclude_songs=False): def find_raw(
self,
query,
exclude_artists=False,
exclude_albums=False,
exclude_songs=False,
):
try: try:
response = self.connection.search2( response = self.connection.search2(
query.encode('utf-8'), query.encode("utf-8"),
MAX_SEARCH_RESULTS if not exclude_artists else 0, 0, MAX_SEARCH_RESULTS if not exclude_artists else 0,
MAX_SEARCH_RESULTS if not exclude_albums else 0, 0, 0,
MAX_SEARCH_RESULTS if not exclude_songs else 0, 0) MAX_SEARCH_RESULTS if not exclude_albums else 0,
0,
MAX_SEARCH_RESULTS if not exclude_songs else 0,
0,
)
except Exception as e: except Exception as e:
logger.warning('Connecting to subsonic failed when searching.') logger.warning("Connecting to subsonic failed when searching.")
return None return None
if response.get('status') != RESPONSE_OK: if response.get("status") != RESPONSE_OK:
logger.warning('Got non-okay status code from subsonic: %s' % response.get('status')) logger.warning(
"Got non-okay status code from subsonic: %s"
% response.get("status")
)
return None return None
return response.get('searchResult2') return response.get("searchResult2")
def find_as_search_result(self, query, exclude_artists=False, exclude_albums=False, exclude_songs=False): def find_as_search_result(
self,
query,
exclude_artists=False,
exclude_albums=False,
exclude_songs=False,
):
result = self.find_raw(query) result = self.find_raw(query)
if result is None: if result is None:
return None return None
return SearchResult( return SearchResult(
uri=uri.get_search_uri(query), uri=uri.get_search_uri(query),
artists=[self.raw_artist_to_artist(artist) for artist in result.get('artist') or []], artists=[
albums=[self.raw_album_to_album(album) for album in result.get('album') or []], self.raw_artist_to_artist(artist)
tracks=[self.raw_song_to_track(song) for song in result.get('song') or []]) for artist in result.get("artist") or []
],
albums=[
self.raw_album_to_album(album)
for album in result.get("album") or []
],
tracks=[
self.raw_song_to_track(song)
for song in result.get("song") or []
],
)
def create_playlist_raw(self, name): def create_playlist_raw(self, name):
try: try:
response = self.connection.createPlaylist(name=name) response = self.connection.createPlaylist(name=name)
except Exception as e: except Exception as e:
logger.warning('Connecting to subsonic failed when creating playlist.') logger.warning(
"Connecting to subsonic failed when creating playlist."
)
return None return None
if response.get('status') != RESPONSE_OK: if response.get("status") != RESPONSE_OK:
logger.warning('Got non-okay status code from subsonic: %s' % response.get('status')) logger.warning(
"Got non-okay status code from subsonic: %s"
% response.get("status")
)
return None return None
return response return response
@ -118,21 +165,33 @@ class SubsonicApi():
try: try:
response = self.connection.deletePlaylist(playlist_id) response = self.connection.deletePlaylist(playlist_id)
except Exception as e: except Exception as e:
logger.warning('Connecting to subsonic failed when deleting playlist.') logger.warning(
"Connecting to subsonic failed when deleting playlist."
)
return None return None
if response.get('status') != RESPONSE_OK: if response.get("status") != RESPONSE_OK:
logger.warning('Got non-okay status code from subsonic: %s' % response.get('status')) logger.warning(
"Got non-okay status code from subsonic: %s"
% response.get("status")
)
return None return None
return response return response
def save_playlist_raw(self, playlist_id, song_ids): def save_playlist_raw(self, playlist_id, song_ids):
try: try:
response = self.connection.createPlaylist(playlist_id, songIds=song_ids) response = self.connection.createPlaylist(
playlist_id, songIds=song_ids
)
except Exception as e: except Exception as e:
logger.warning('Connecting to subsonic failed when creating playlist.') logger.warning(
"Connecting to subsonic failed when creating playlist."
)
return None return None
if response.get('status') != RESPONSE_OK: if response.get("status") != RESPONSE_OK:
logger.warning('Got non-okay status code from subsonic: %s' % response.get('status')) logger.warning(
"Got non-okay status code from subsonic: %s"
% response.get("status")
)
return None return None
return response return response
@ -140,79 +199,135 @@ class SubsonicApi():
try: try:
response = self.connection.getArtists() response = self.connection.getArtists()
except Exception as e: except Exception as e:
logger.warning('Connecting to subsonic failed when loading list of artists.') logger.warning(
"Connecting to subsonic failed when loading list of artists."
)
return [] return []
if response.get('status') != RESPONSE_OK: if response.get("status") != RESPONSE_OK:
logger.warning('Got non-okay status code from subsonic: %s' % response.get('status')) logger.warning(
"Got non-okay status code from subsonic: %s"
% response.get("status")
)
return [] return []
letters = response.get('artists').get('index') letters = response.get("artists").get("index")
if letters is not None: if letters is not None:
artists = [artist for letter in letters for artist in letter.get('artist') or []] artists = [
artist
for letter in letters
for artist in letter.get("artist") or []
]
return artists return artists
logger.warning('Subsonic does not seem to have any artists in it\'s library.') logger.warning(
"Subsonic does not seem to have any artists in it's library."
)
return [] return []
def get_raw_rootdirs(self): def get_raw_rootdirs(self):
try: try:
response = self.connection.getIndexes() response = self.connection.getIndexes()
except Exception as e: except Exception as e:
logger.warning('Connecting to subsonic failed when loading list of rootdirs.') logger.warning(
"Connecting to subsonic failed when loading list of rootdirs."
)
return [] return []
if response.get('status') != RESPONSE_OK: if response.get("status") != RESPONSE_OK:
logger.warning('Got non-okay status code from subsonic: %s' % response.get('status')) logger.warning(
"Got non-okay status code from subsonic: %s"
% response.get("status")
)
return [] return []
letters = response.get('indexes').get('index') letters = response.get("indexes").get("index")
if letters is not None: if letters is not None:
artists = [artist for letter in letters for artist in letter.get('artist') or []] artists = [
artist
for letter in letters
for artist in letter.get("artist") or []
]
return artists return artists
logger.warning('Subsonic does not seem to have any rootdirs in its library.') logger.warning(
"Subsonic does not seem to have any rootdirs in its library."
)
return [] return []
def get_song_by_id(self, song_id): def get_song_by_id(self, song_id):
try: try:
response = self.connection.getSong(song_id) response = self.connection.getSong(song_id)
except Exception as e: except Exception as e:
logger.warning('Connecting to subsonic failed when loading song by id.') logger.warning(
"Connecting to subsonic failed when loading song by id."
)
return None return None
if response.get('status') != RESPONSE_OK: if response.get("status") != RESPONSE_OK:
logger.warning('Got non-okay status code from subsonic: %s' % response.get('status')) logger.warning(
"Got non-okay status code from subsonic: %s"
% response.get("status")
)
return None return None
return self.raw_song_to_track(response.get('song')) if response.get('song') is not None else None return (
self.raw_song_to_track(response.get("song"))
if response.get("song") is not None
else None
)
def get_album_by_id(self, album_id): def get_album_by_id(self, album_id):
try: try:
response = self.connection.getAlbum(album_id) response = self.connection.getAlbum(album_id)
except Exception as e: except Exception as e:
logger.warning('Connecting to subsonic failed when loading album by id.') logger.warning(
"Connecting to subsonic failed when loading album by id."
)
return None return None
if response.get('status') != RESPONSE_OK: if response.get("status") != RESPONSE_OK:
logger.warning('Got non-okay status code from subsonic: %s' % response.get('status')) logger.warning(
"Got non-okay status code from subsonic: %s"
% response.get("status")
)
return None return None
return self.raw_album_to_album(response.get('album')) if response.get('album') is not None else None return (
self.raw_album_to_album(response.get("album"))
if response.get("album") is not None
else None
)
def get_artist_by_id(self, artist_id): def get_artist_by_id(self, artist_id):
try: try:
response = self.connection.getArtist(artist_id) response = self.connection.getArtist(artist_id)
except Exception as e: except Exception as e:
logger.warning('Connecting to subsonic failed when loading artist by id.') logger.warning(
"Connecting to subsonic failed when loading artist by id."
)
return None return None
if response.get('status') != RESPONSE_OK: if response.get("status") != RESPONSE_OK:
logger.warning('Got non-okay status code from subsonic: %s' % response.get('status')) logger.warning(
"Got non-okay status code from subsonic: %s"
% response.get("status")
)
return None return None
return self.raw_artist_to_artist(response.get('artist')) if response.get('artist') is not None else None return (
self.raw_artist_to_artist(response.get("artist"))
if response.get("artist") is not None
else None
)
def get_raw_playlists(self): def get_raw_playlists(self):
try: try:
response = self.connection.getPlaylists() response = self.connection.getPlaylists()
except Exception as e: except Exception as e:
logger.warning('Connecting to subsonic failed when loading list of playlists.') logger.warning(
"Connecting to subsonic failed when loading list of playlists."
)
return [] return []
if response.get('status') != RESPONSE_OK: if response.get("status") != RESPONSE_OK:
logger.warning('Got non-okay status code from subsonic: %s' % response.get('status')) logger.warning(
"Got non-okay status code from subsonic: %s"
% response.get("status")
)
return [] return []
playlists = response.get('playlists').get('playlist') playlists = response.get("playlists").get("playlist")
if playlists is None: if playlists is None:
logger.warning('Subsonic does not seem to have any playlists in it\'s library.') logger.warning(
"Subsonic does not seem to have any playlists in it's library."
)
return [] return []
return playlists return playlists
@ -220,25 +335,35 @@ class SubsonicApi():
try: try:
response = self.connection.getPlaylist(playlist_id) response = self.connection.getPlaylist(playlist_id)
except Exception as e: except Exception as e:
logger.warning('Connecting to subsonic failed when loading playlist.') logger.warning(
"Connecting to subsonic failed when loading playlist."
)
return None return None
if response.get('status') != RESPONSE_OK: if response.get("status") != RESPONSE_OK:
logger.warning('Got non-okay status code from subsonic: %s' % response.get('status')) logger.warning(
"Got non-okay status code from subsonic: %s"
% response.get("status")
)
return None return None
return response.get('playlist') return response.get("playlist")
def get_raw_dir(self, parent_id): def get_raw_dir(self, parent_id):
try: try:
response = self.connection.getMusicDirectory(parent_id) response = self.connection.getMusicDirectory(parent_id)
except Exception as e: except Exception as e:
logger.warning('Connecting to subsonic failed when listing content of music directory.') logger.warning(
"Connecting to subsonic failed when listing content of music directory."
)
return None return None
if response.get('status') != RESPONSE_OK: if response.get("status") != RESPONSE_OK:
logger.warning('Got non-okay status code from subsonic: %s' % response.get('status')) logger.warning(
"Got non-okay status code from subsonic: %s"
% response.get("status")
)
return None return None
directory = response.get('directory') directory = response.get("directory")
if directory is not None: if directory is not None:
diritems = directory.get('child') diritems = directory.get("child")
return sorted(diritems, key=diritem_sort_key) return sorted(diritems, key=diritem_sort_key)
return None return None
@ -246,26 +371,39 @@ class SubsonicApi():
try: try:
response = self.connection.getArtist(artist_id) response = self.connection.getArtist(artist_id)
except Exception as e: except Exception as e:
logger.warning('Connecting to subsonic failed when loading list of albums.') logger.warning(
"Connecting to subsonic failed when loading list of albums."
)
return [] return []
if response.get('status') != RESPONSE_OK: if response.get("status") != RESPONSE_OK:
logger.warning('Got non-okay status code from subsonic: %s' % response.get('status')) logger.warning(
"Got non-okay status code from subsonic: %s"
% response.get("status")
)
return [] return []
albums = response.get('artist').get('album') albums = response.get("artist").get("album")
if albums is not None: if albums is not None:
return sorted(albums, key=lambda album: string_nums_nocase_sort_key(album['name'])) return sorted(
albums,
key=lambda album: string_nums_nocase_sort_key(album["name"]),
)
return [] return []
def get_raw_songs(self, album_id): def get_raw_songs(self, album_id):
try: try:
response = self.connection.getAlbum(album_id) response = self.connection.getAlbum(album_id)
except Exception as e: except Exception as e:
logger.warning('Connecting to subsonic failed when loading list of songs in album.') logger.warning(
"Connecting to subsonic failed when loading list of songs in album."
)
return [] return []
if response.get('status') != RESPONSE_OK: if response.get("status") != RESPONSE_OK:
logger.warning('Got non-okay status code from subsonic: %s' % response.get('status')) logger.warning(
"Got non-okay status code from subsonic: %s"
% response.get("status")
)
return [] return []
songs = response.get('album').get('song') songs = response.get("album").get("song")
if songs is not None: if songs is not None:
return songs return songs
return [] return []
@ -274,46 +412,84 @@ class SubsonicApi():
try: try:
response = self.connection.getAlbumList2(ltype=ltype, size=size) response = self.connection.getAlbumList2(ltype=ltype, size=size)
except Exception as e: except Exception as e:
logger.warning('Connecting to subsonic failed when loading album list.') logger.warning(
"Connecting to subsonic failed when loading album list."
)
return [] return []
if response.get('status') != RESPONSE_OK: if response.get("status") != RESPONSE_OK:
logger.warning('Got non-okay status code from subsonic: %s' % response.get('status')) logger.warning(
"Got non-okay status code from subsonic: %s"
% response.get("status")
)
return [] return []
albums = response.get('albumList2').get('album') albums = response.get("albumList2").get("album")
if albums is not None: if albums is not None:
return albums return albums
return [] return []
def get_albums_as_refs(self, artist_id=None): def get_albums_as_refs(self, artist_id=None):
albums = (self.get_raw_album_list('alphabeticalByName') if artist_id is None else self.get_raw_albums(artist_id)) albums = (
self.get_raw_album_list("alphabeticalByName")
if artist_id is None
else self.get_raw_albums(artist_id)
)
return [self.raw_album_to_ref(album) for album in albums] return [self.raw_album_to_ref(album) for album in albums]
def get_albums_as_albums(self, artist_id): def get_albums_as_albums(self, artist_id):
return [self.raw_album_to_album(album) for album in self.get_raw_albums(artist_id)] return [
self.raw_album_to_album(album)
for album in self.get_raw_albums(artist_id)
]
def get_songs_as_refs(self, album_id): def get_songs_as_refs(self, album_id):
return [self.raw_song_to_ref(song) for song in self.get_raw_songs(album_id)] return [
self.raw_song_to_ref(song) for song in self.get_raw_songs(album_id)
]
def get_songs_as_tracks(self, album_id): def get_songs_as_tracks(self, album_id):
return [self.raw_song_to_track(song) for song in self.get_raw_songs(album_id)] return [
self.raw_song_to_track(song)
for song in self.get_raw_songs(album_id)
]
def get_artists_as_refs(self): def get_artists_as_refs(self):
return [self.raw_artist_to_ref(artist) for artist in self.get_raw_artists()] return [
self.raw_artist_to_ref(artist) for artist in self.get_raw_artists()
]
def get_rootdirs_as_refs(self): def get_rootdirs_as_refs(self):
return [self.raw_directory_to_ref(rootdir) for rootdir in self.get_raw_rootdirs()] return [
self.raw_directory_to_ref(rootdir)
for rootdir in self.get_raw_rootdirs()
]
def get_diritems_as_refs(self, directory_id): def get_diritems_as_refs(self, directory_id):
return [(self.raw_directory_to_ref(diritem) if diritem.get('isDir') else self.raw_song_to_ref(diritem)) for diritem in self.get_raw_dir(directory_id)] return [
(
self.raw_directory_to_ref(diritem)
if diritem.get("isDir")
else self.raw_song_to_ref(diritem)
)
for diritem in self.get_raw_dir(directory_id)
]
def get_artists_as_artists(self): def get_artists_as_artists(self):
return [self.raw_artist_to_artist(artist) for artist in self.get_raw_artists()] return [
self.raw_artist_to_artist(artist)
for artist in self.get_raw_artists()
]
def get_playlists_as_refs(self): def get_playlists_as_refs(self):
return [self.raw_playlist_to_ref(playlist) for playlist in self.get_raw_playlists()] return [
self.raw_playlist_to_ref(playlist)
for playlist in self.get_raw_playlists()
]
def get_playlists_as_playlists(self): def get_playlists_as_playlists(self):
return [self.raw_playlist_to_playlist(playlist) for playlist in self.get_raw_playlists()] return [
self.raw_playlist_to_playlist(playlist)
for playlist in self.get_raw_playlists()
]
def get_playlist_as_playlist(self, playlist_id): def get_playlist_as_playlist(self, playlist_id):
return self.raw_playlist_to_playlist(self.get_raw_playlist(playlist_id)) return self.raw_playlist_to_playlist(self.get_raw_playlist(playlist_id))
@ -322,14 +498,14 @@ class SubsonicApi():
playlist = self.get_raw_playlist(playlist_id) playlist = self.get_raw_playlist(playlist_id)
if playlist is None: if playlist is None:
return None return None
return [self.raw_song_to_ref(song) for song in playlist.get('entry')] return [self.raw_song_to_ref(song) for song in playlist.get("entry")]
def get_artist_as_songs_as_tracks_iter(self, artist_id): def get_artist_as_songs_as_tracks_iter(self, artist_id):
albums = self.get_raw_albums(artist_id) albums = self.get_raw_albums(artist_id)
if albums is None: if albums is None:
return return
for album in albums: for album in albums:
for song in self.get_raw_songs(album.get('id')): for song in self.get_raw_songs(album.get("id")):
yield self.raw_song_to_track(song) yield self.raw_song_to_track(song)
def get_recursive_dir_as_songs_as_tracks_iter(self, directory_id): def get_recursive_dir_as_songs_as_tracks_iter(self, directory_id):
@ -337,8 +513,10 @@ class SubsonicApi():
if diritems is None: if diritems is None:
return return
for item in diritems: for item in diritems:
if item.get('isDir'): if item.get("isDir"):
yield from self.get_recursive_dir_as_songs_as_tracks_iter(item.get('id')) yield from self.get_recursive_dir_as_songs_as_tracks_iter(
item.get("id")
)
else: else:
yield self.raw_song_to_track(item) yield self.raw_song_to_track(item)
@ -346,80 +524,104 @@ class SubsonicApi():
if song is None: if song is None:
return None return None
return Ref.track( return Ref.track(
name=song.get('title') or UNKNOWN_SONG, name=song.get("title") or UNKNOWN_SONG,
uri=uri.get_song_uri(song.get('id'))) uri=uri.get_song_uri(song.get("id")),
)
def raw_song_to_track(self, song): def raw_song_to_track(self, song):
if song is None: if song is None:
return None return None
return Track( return Track(
name=song.get('title') or UNKNOWN_SONG, name=song.get("title") or UNKNOWN_SONG,
uri=uri.get_song_uri(song.get('id')), uri=uri.get_song_uri(song.get("id")),
bitrate=song.get('bitRate'), bitrate=song.get("bitRate"),
track_no=int(song.get('track')) if song.get('track') else None, track_no=int(song.get("track")) if song.get("track") else None,
date=str(song.get('year')) or 'none', date=str(song.get("year")) or "none",
genre=song.get('genre'), genre=song.get("genre"),
length=int(song.get('duration')) * 1000 if song.get('duration') else None, length=int(song.get("duration")) * 1000
disc_no=int(song.get('discNumber')) if song.get('discNumber') else None, if song.get("duration")
artists=[Artist( else None,
name=song.get('artist'), disc_no=int(song.get("discNumber"))
uri=uri.get_artist_uri(song.get('artistId')))], if song.get("discNumber")
else None,
artists=[
Artist(
name=song.get("artist"),
uri=uri.get_artist_uri(song.get("artistId")),
)
],
album=Album( album=Album(
name=song.get('album'), name=song.get("album"),
uri=uri.get_album_uri(song.get('albumId')))) uri=uri.get_album_uri(song.get("albumId")),
),
)
def raw_album_to_ref(self, album): def raw_album_to_ref(self, album):
if album is None: if album is None:
return None return None
return Ref.album( return Ref.album(
name=album.get('title') or album.get('name') or UNKNOWN_ALBUM, name=album.get("title") or album.get("name") or UNKNOWN_ALBUM,
uri=uri.get_album_uri(album.get('id'))) uri=uri.get_album_uri(album.get("id")),
)
def raw_album_to_album(self, album): def raw_album_to_album(self, album):
if album is None: if album is None:
return None return None
return Album( return Album(
name=album.get('title') or album.get('name') or UNKNOWN_ALBUM, name=album.get("title") or album.get("name") or UNKNOWN_ALBUM,
num_tracks=album.get('songCount'), num_tracks=album.get("songCount"),
uri=uri.get_album_uri(album.get('id')), uri=uri.get_album_uri(album.get("id")),
artists=[Artist( artists=[
name=album.get('artist'), Artist(
uri=uri.get_artist_uri(album.get('artistId')))]) name=album.get("artist"),
uri=uri.get_artist_uri(album.get("artistId")),
)
],
)
def raw_directory_to_ref(self, directory): def raw_directory_to_ref(self, directory):
if directory is None: if directory is None:
return None return None
return Ref.directory( return Ref.directory(
name=directory.get('title') or directory.get('name'), name=directory.get("title") or directory.get("name"),
uri=uri.get_directory_uri(directory.get('id'))) uri=uri.get_directory_uri(directory.get("id")),
)
def raw_artist_to_ref(self, artist): def raw_artist_to_ref(self, artist):
if artist is None: if artist is None:
return None return None
return Ref.artist( return Ref.artist(
name=artist.get('name') or UNKNOWN_ARTIST, name=artist.get("name") or UNKNOWN_ARTIST,
uri=uri.get_artist_uri(artist.get('id'))) uri=uri.get_artist_uri(artist.get("id")),
)
def raw_artist_to_artist(self, artist): def raw_artist_to_artist(self, artist):
if artist is None: if artist is None:
return None return None
return Artist( return Artist(
name=artist.get('name') or UNKNOWN_ARTIST, name=artist.get("name") or UNKNOWN_ARTIST,
uri=uri.get_artist_uri(artist.get('id'))) uri=uri.get_artist_uri(artist.get("id")),
)
def raw_playlist_to_playlist(self, playlist): def raw_playlist_to_playlist(self, playlist):
if playlist is None: if playlist is None:
return None return None
entries = playlist.get('entry') entries = playlist.get("entry")
tracks = [self.raw_song_to_track(song) for song in entries] if entries is not None else None tracks = (
[self.raw_song_to_track(song) for song in entries]
if entries is not None
else None
)
return Playlist( return Playlist(
uri=uri.get_playlist_uri(playlist.get('id')), uri=uri.get_playlist_uri(playlist.get("id")),
name=playlist.get('name'), name=playlist.get("name"),
tracks=tracks) tracks=tracks,
)
def raw_playlist_to_ref(self, playlist): def raw_playlist_to_ref(self, playlist):
if playlist is None: if playlist is None:
return None return None
return Ref.playlist( return Ref.playlist(
uri=uri.get_playlist_uri(playlist.get('id')), uri=uri.get_playlist_uri(playlist.get("id")),
name=playlist.get('name')) name=playlist.get("name"),
)

View file

@ -1,87 +1,109 @@
import re import re
SONG = 'song' SONG = "song"
ARTIST = 'artist' ARTIST = "artist"
PLAYLIST = 'playlist' PLAYLIST = "playlist"
ALBUM = 'album' ALBUM = "album"
DIRECTORY = 'directory' DIRECTORY = "directory"
VDIR = 'vdir' VDIR = "vdir"
PREFIX = 'subidy' PREFIX = "subidy"
SEARCH = 'search' SEARCH = "search"
regex = re.compile(r"(\w+?):(\w+?)(?::|$)(.+?)?$")
regex = re.compile(r'(\w+?):(\w+?)(?::|$)(.+?)?$')
def is_type_result_valid(result): def is_type_result_valid(result):
return result is not None and result.group(1) == PREFIX return result is not None and result.group(1) == PREFIX
def is_id_result_valid(result, type): def is_id_result_valid(result, type):
return is_type_result_valid(result) and result.group(1) == PREFIX and result.group(2) == type return (
is_type_result_valid(result)
and result.group(1) == PREFIX
and result.group(2) == type
)
def is_uri(uri): def is_uri(uri):
return regex.match(uri) is not None return regex.match(uri) is not None
def get_song_id(uri): def get_song_id(uri):
result = regex.match(uri) result = regex.match(uri)
if not is_id_result_valid(result, SONG): if not is_id_result_valid(result, SONG):
return None return None
return result.group(3) return result.group(3)
def get_artist_id(uri): def get_artist_id(uri):
result = regex.match(uri) result = regex.match(uri)
if not is_id_result_valid(result, ARTIST): if not is_id_result_valid(result, ARTIST):
return None return None
return result.group(3) return result.group(3)
def get_playlist_id(uri): def get_playlist_id(uri):
result = regex.match(uri) result = regex.match(uri)
if not is_id_result_valid(result, PLAYLIST): if not is_id_result_valid(result, PLAYLIST):
return None return None
return result.group(3) return result.group(3)
def get_album_id(uri): def get_album_id(uri):
result = regex.match(uri) result = regex.match(uri)
if not is_id_result_valid(result, ALBUM): if not is_id_result_valid(result, ALBUM):
return None return None
return result.group(3) return result.group(3)
def get_directory_id(uri): def get_directory_id(uri):
result = regex.match(uri) result = regex.match(uri)
if not is_id_result_valid(result, DIRECTORY): if not is_id_result_valid(result, DIRECTORY):
return None return None
return result.group(3) return result.group(3)
def get_vdir_id(uri): def get_vdir_id(uri):
result = regex.match(uri) result = regex.match(uri)
if not is_id_result_valid(result, VDIR): if not is_id_result_valid(result, VDIR):
return None return None
return result.group(3) return result.group(3)
def get_type(uri): def get_type(uri):
result = regex.match(uri) result = regex.match(uri)
if not is_type_result_valid(result): if not is_type_result_valid(result):
return None return None
return result.group(2) return result.group(2)
def get_type_uri(type, id): def get_type_uri(type, id):
return f'{PREFIX}:{type}:{id}' return f"{PREFIX}:{type}:{id}"
def get_artist_uri(id): def get_artist_uri(id):
return get_type_uri(ARTIST, id) return get_type_uri(ARTIST, id)
def get_album_uri(id): def get_album_uri(id):
return get_type_uri(ALBUM, id) return get_type_uri(ALBUM, id)
def get_song_uri(id): def get_song_uri(id):
return get_type_uri(SONG, id) return get_type_uri(SONG, id)
def get_directory_uri(id): def get_directory_uri(id):
return get_type_uri(DIRECTORY, id) return get_type_uri(DIRECTORY, id)
def get_vdir_uri(id): def get_vdir_uri(id):
return get_type_uri(VDIR, id) return get_type_uri(VDIR, id)
def get_playlist_uri(id): def get_playlist_uri(id):
return get_type_uri(PLAYLIST, id) return get_type_uri(PLAYLIST, id)
def get_search_uri(query): def get_search_uri(query):
return get_type_uri(SEARCH, query) return get_type_uri(SEARCH, query)