# -*- coding: utf-8 -*- """Last.fm similarity plugin for Quod Libet.""" import json from urllib.parse import quote from urllib.request import urlopen from urllib.error import URLError import random from gi.repository import GLib, Gtk from quodlibet import _, app, config from quodlibet.plugins.events import EventPlugin from quodlibet.qltk import Icons from quodlibet.query import Query from quodlibet.util.dprint import print_d class LastFMSimilarity(EventPlugin): """Last.fm similarity event plugin.""" PLUGIN_ID = "Last.fm Similarity" PLUGIN_NAME = _("Last.fm Similarity") PLUGIN_DESC = _("Finds a similar song using Last.fm's track similarity API" " and adds it to the queue.") PLUGIN_ICON = Icons.NETWORK_WORKGROUP LAST_FM_API_URI = "http://ws.audioscrobbler.com/2.0/" API_KEY = "e94b09aa2c04ab264deb7a7ae02ecd05" LAST_FM_API_METHODS = { "similar_artists": "artist.getSimilar", "similar_tracks": "track.getSimilar", } def __init__(self): """Initialize the plugin.""" self._blacklist_track_count = config.getint( "plugins", "lastfm_similarity_blacklist_tracks", 10) self._blacklist_artist_count = config.getint( "plugins", "lastfm_similarity_blacklist_artists", 10) self._similarity_strictness = config.getfloat( "plugins", "lastfm_similarity_strictness", 0.5) self._last_tracks = [] self._last_artists = [] def PluginPreferences(self, parent): """Plugin Preferences.""" def blacklist_track_changed(entry): self._blacklist_track_count = int(entry.get_value()) config.set("plugins", "lastfm_similarity_blacklist_tracks", self._blacklist_track_count) def blacklist_artist_changed(entry): self._blacklist_artist_count = int(entry.get_value()) config.set("plugins", "lastfm_similarity_blacklist_artist", self._blacklist_artist_count) def strictness_changed(entry): self._similarity_strictness = entry.get_value() / 10 config.set("plugins", "lastfm_similarity_strictness", self._similarity_strictness) table = Gtk.Table(rows=3, columns=2) table.set_row_spacings(6) table.set_col_spacings(6) table.attach( Gtk.Label( label=_("Number of recently played tracks to blacklist:")), 0, 1, 0, 1) track_entry = Gtk.SpinButton(adjustment=Gtk.Adjustment.new( self._blacklist_track_count, 0, 1000, 1, 10, 0)) track_entry.connect("value-changed", blacklist_track_changed) table.attach(track_entry, 1, 2, 0, 1) table.attach( Gtk.Label( label=_("Number of recently played artists to blacklist:")), 0, 1, 1, 2) artist_entry = Gtk.SpinButton(adjustment=Gtk.Adjustment.new( self._blacklist_artist_count, 0, 1000, 1, 10, 0)) artist_entry.connect("value-changed", blacklist_artist_changed) table.attach(artist_entry, 1, 2, 1, 2) strictness_label = Gtk.Label(label=_("Similarity strictness:")) strictness_label.set_alignment(1.0, 0.0) table.attach(strictness_label, 0, 1, 2, 3, xoptions=Gtk.AttachOptions.FILL) strictness_adj = Gtk.Adjustment(lower=0.00, upper=10.00, step_increment=0.1) strictness_hscale = Gtk.HScale(adjustment=strictness_adj) strictness_hscale.set_value(self._similarity_strictness * 10) strictness_hscale.set_draw_value(False) strictness_hscale.set_show_fill_level(False) strictness_hscale.connect("value_changed", strictness_changed) table.attach(strictness_hscale, 1, 2, 2, 3) return table def _check_artist_played(self, artist): for played_artist in self._last_artists: if artist.upper() == played_artist.upper(): return True return False def _check_track_played(self, track): if track in self._last_tracks: return True else: return False def _add_played_artists(self, artists): for artist in artists: if artist not in self._last_artists: self._last_artists.append(artist) def _build_uri(self, request): return "".join((self.LAST_FM_API_URI, request, "&api_key=", self.API_KEY, "&format=json")) def _find_similar_tracks(self, trackname, artistname, mbid=None, limit=50): request = "".join(("?method=", self.LAST_FM_API_METHODS["similar_tracks"])) if mbid: print_d("Trying with mbid {}".format(mbid)) request = "".join((request, "&mbid=", mbid)) else: print_d("Trying with {} - {}".format(artistname.splitlines()[0], trackname)) request = "".join((request, "&track=", quote(trackname), "&artist=", quote(artistname.splitlines()[0]))) request = "".join((request, "&limit={}".format(limit))) uri = self._build_uri(request) stream = None try: stream = urlopen(uri) except URLError: return [] if stream.getcode() == 200: similar_tracks = [] track_weights = [] try: response = json.loads(str(stream.read(), "utf-8")) for track in response["similartracks"]["track"]: similarity_score = float(track["match"]) if similarity_score >= self._similarity_strictness: similar_tracks.append( (track["artist"]["name"], track["name"])) track_weights.append(similarity_score) return similar_tracks, track_weights except KeyError: if mbid: return self._find_similar_tracks(trackname, artistname) return [] except (ValueError, OverflowError): return [] else: return [] def _find_similar_artists(self, artistname, mbid=None, limit=40): request = "".join(("?method=", self.LAST_FM_API_METHODS["similar_artists"])) if mbid: print_d("Trying with artist mbid {}".format(mbid)) request = "".join((request, "&mbid=", mbid)) else: print_d("Trying with {}".format(artistname.splitlines()[0])) request = "".join((request, "&artist=", quote(artistname.splitlines()[0]))) request = "".join((request, "&limit={}".format(limit))) uri = self._build_uri(request) stream = None try: stream = urlopen(uri) except URLError: return [] if stream.getcode() == 200: similar_artists = [] artist_weights = [] try: response = json.loads(str(stream.read(), "utf-8")) for artist in response["similarartists"]["artist"]: similarity_score = float(artist["match"]) if similarity_score >= self._similarity_strictness: similar_artists.append(artist["name"]) artist_weights.append(similarity_score) return similar_artists, artist_weights except KeyError: if mbid: return self._find_similar_artists(artistname) return [] except (ValueError, OverflowError): return [] else: return [] def on_change(self, song): """Find similar track on song change.""" artist = song.get("artist").splitlines()[0] track = song.get("title") candidates = [] weights = [] try: mbid = song.get("musicbrainz_releasetrackid") candidates, weights = self._find_similar_tracks( track, artist, mbid) except KeyError: candidates, weights = self._find_similar_tracks(track, artist) found_tracks = [] found_track_weights = [] if candidates: for idx, candidate in enumerate(candidates): if not self._check_artist_played(candidate[0]): query = Query.StrictQueryMatcher( "&(artist = \"%s\", title = \"%s\")" % (candidate[0], candidate[1])) try: results = list(filter(query.search, app.library)) if results: song = results[0] if self._check_track_played(song.get("~filename")): continue print_d("[similarity] found track match: %s - %s" % (candidate[0], candidate[1])) found_tracks.append(song) found_track_weights.append(weights[idx]) except AttributeError: pass if found_tracks: selected_track = random.choices( found_tracks, found_track_weights, k=1)[0] app.window.playlist.enqueue([selected_track]) return artist_candidates, artist_weights = self._find_similar_artists(artist) found_artists = [] found_artist_weights = [] for idx, artist in enumerate(artist_candidates): if not self._check_artist_played(artist): print_d("[similarity] found artist match: %s" % artist) found_artists.append(artist) found_artist_weights.append(artist_weights[idx]) while True: if found_artists: artist = random.choices( found_artists, found_artist_weights, k=1)[0] for idx, a in enumerate(found_artists): if a == artist: found_artist_weights.pop(idx) found_artists.remove(artist) query = Query.StrictQueryMatcher( "&(artist = \"%s\", title != \"[silence]\")" % artist) try: results = list(filter(query.search, app.library)) candidate_song_length = len(results) for dummy in range(candidate_song_length): idx = random.randint(0, (candidate_song_length - 1)) song = results[idx] if not self._check_track_played(song.get("~filename")): app.window.playlist.enqueue([song]) return except AttributeError: pass else: return def plugin_on_song_started(self, song): """Append current track to last played tracks and artists.""" self._last_tracks.append(song.get("~filename")) self._add_played_artists(song.get("artist").splitlines()) GLib.idle_add(self.on_change, song) def plugin_on_song_ended(self, song, stopped): """Append current track to last played tracks and artists.""" track_count = len(self._last_tracks) artist_count = len(self._last_artists) if track_count > self._blacklist_track_count: self._last_tracks = self._last_tracks[ (track_count - self._blacklist_track_count):track_count] if artist_count > self._blacklist_artist_count: self._last_artists = self._last_artists[ (artist_count - self._blacklist_artist_count):artist_count]