diff options
Diffstat (limited to 'lastfm_similarity.py')
-rw-r--r-- | lastfm_similarity.py | 222 |
1 files changed, 222 insertions, 0 deletions
diff --git a/lastfm_similarity.py b/lastfm_similarity.py new file mode 100644 index 0000000..4e73d9d --- /dev/null +++ b/lastfm_similarity.py @@ -0,0 +1,222 @@ +# -*- coding: utf-8 -*- +import json +import urllib2 + +from gi.repository import GLib + +from quodlibet import _ +from quodlibet import app +from quodlibet.plugins.events import EventPlugin +from quodlibet.plugins import PluginConfig +from quodlibet.qltk import Icons +from quodlibet.query import Query +from quodlibet.util.dprint import print_d + + +pconfig = PluginConfig("notify") +pconfig.defaults.set("blacklist_track_count", 10) +pconfig.defaults.set("blacklist_artist_count", 10) + + +class LastFMSimilarity(EventPlugin): + PLUGIN_ID = "Last.fm Similarity" + PLUGIN_NAME = _("Last.fm Similarity") + PLUGIN_DESC = _("Finds a similar song using Last.fm's track similarity API" + " and adds it to the queue.") + PLUGIN_ICON = Icons.NETWORK_WORKGROUP + + LAST_FM_API_URI = "http://ws.audioscrobbler.com/2.0/" + API_KEY = "e94b09aa2c04ab264deb7a7ae02ecd05" + + LAST_FM_API_METHODS = { + "similar_artists": "artist.getSimilar", + "similar_tracks": "track.getSimilar", + } + + def __init__(self): + self._last_tracks = [] + self._last_artists = [] + + def _check_artist_played(self, artist): + if artist in self._last_artists: + return True + else: + return False + + def _check_track_played(self, track): + if track in self._last_tracks: + return True + else: + return False + + def _build_uri(self, request): + return "".join((self.LAST_FM_API_URI, request, "&api_key=", + self.API_KEY, "&format=json")) + + def _find_similar_tracks(self, trackname, artistname, mbid=None, limit=20): + request = "".join(("?method=", + self.LAST_FM_API_METHODS["similar_tracks"])) + + if mbid: + request = "".join((request, "&mbid=", mbid)) + else: + request = "".join((request, "&track=", trackname, "&artist=", + artistname)) + + request = "".join((request, "&limit={}".format(limit))) + + uri = self._build_uri(request) + + stream = None + + try: + stream = urllib2.urlopen(uri) + except urllib2.URLError: + return [] + + if stream.getcode() == 200: + similar_tracks = [] + + try: + response = json.load(stream) + + for track in response["similartracks"]["track"]: + similar_tracks.append( + (track["artist"]["name"], track["name"])) + + return similar_tracks + + except KeyError: + return [] + + else: + return [] + + def _find_similar_artists(self, artistname, mbid=None, limit=20): + request = "".join(("?method=", + self.LAST_FM_API_METHODS["similar_artists"])) + + if mbid: + request = "".join((request, "&mbid=", mbid)) + else: + request = "".join((request, "&artist=", artistname)) + + request = "".join((request, "&limit={}".format(limit))) + + uri = self._build_uri(request) + + stream = None + + try: + stream = urllib2.urlopen(uri) + except urllib2.URLError: + return [] + + if stream.getcode() == 200: + similar_artists = [] + + try: + response = json.load(stream) + + for artist in response["similarartists"]["artist"]: + similar_artists.append(artist["name"]) + + return similar_artists + + except KeyError: + return [] + + else: + return [] + + def on_change(self, song): + artist = song.get("artist") + track = song.get("title") + + candidates = [] + + try: + mbid = song.get("musicbrainz_releasetrackid") + + candidates = self._find_similar_tracks(track, artist, mbid) + except KeyError: + candidates = self._find_similar_tracks(track, artist) + + if candidates: + for candidate in candidates: + if not self._check_artist_played(candidate[0]): + + print_d("[similarity] found track match: %s - %s" + % (candidate[0], candidate[1])) + + if (len(self._last_tracks) + == pconfig.getint("blacklist_track_count")): + del self._last_tracks[0] + + query = Query.StrictQueryMatcher( + "&(artist = \"%s\", title = \"%s\")" + % (candidate[0], candidate[1])) + try: + results = filter(query.search, app.library) + + if results: + song = results[0] + + if self._check_track_played(song.get("~filename")): + continue + + self._last_tracks.append(song.get("~filename")) + + if (len(self._last_artists) + == pconfig.getint( + "blacklist_artist_count")): + del self._last_artists[0] + + self._last_artists.append(song.get("artist")) + + app.window.playlist.enqueue([song]) + + return + except AttributeError: + pass + + artist_candidates = self._find_similar_artists(artist) + + for artist in artist_candidates: + if not self._check_artist_played(artist): + print_d("[similarity] found artist match: %s" % artist) + + query = Query.StrictQueryMatcher( + "&(artist = \"%s\", title != \"[silence]\")" % artist) + try: + results = filter(query.search, app.library) + + for song in results: + if self._check_track_played(song.get("~filename")): + continue + + if (len(self._last_artists) + == pconfig.getint("blacklist_artist_count")): + del self._last_artists[0] + + self._last_artists.append(song.get("artist")) + self._last_tracks.append(song.get("~filename")) + + app.window.playlist.enqueue([song]) + return + except AttributeError: + pass + + def plugin_on_song_started(self, song): + self._last_tracks.append(song.get("~filename")) + self._last_artists.append(song.get("artist")) + + GLib.idle_add(self.on_change, song) + + def plugin_on_song_ended(self, song, stopped): + + if len(self._last_tracks) == pconfig.getint("blacklist_track_count"): + del self._last_tracks[0] + + if len(self._last_artists) == pconfig.getint("blacklist_artist_count"): + del self._last_artists[0] |