diff options
| -rw-r--r-- | Readme.md | 7 | ||||
| -rw-r--r-- | lastfm_similarity.py | 222 | 
2 files changed, 229 insertions, 0 deletions
diff --git a/Readme.md b/Readme.md new file mode 100644 index 0000000..40402f6 --- /dev/null +++ b/Readme.md @@ -0,0 +1,7 @@ +A Quod Libet plugin to queue a similar song to the currently playing one. + +Installation +============ + +1. If it doesn't already exist, create the folder ~/.quodlibet/plugins/events +2. Copy lastfm_similarity.py to ~/.quodlibet/plugins/events diff --git a/lastfm_similarity.py b/lastfm_similarity.py new file mode 100644 index 0000000..4e73d9d --- /dev/null +++ b/lastfm_similarity.py @@ -0,0 +1,222 @@ +# -*- coding: utf-8 -*- +import json +import urllib2 + +from gi.repository import GLib + +from quodlibet import _ +from quodlibet import app +from quodlibet.plugins.events import EventPlugin +from quodlibet.plugins import PluginConfig +from quodlibet.qltk import Icons +from quodlibet.query import Query +from quodlibet.util.dprint import print_d + + +pconfig = PluginConfig("notify") +pconfig.defaults.set("blacklist_track_count", 10) +pconfig.defaults.set("blacklist_artist_count", 10) + + +class LastFMSimilarity(EventPlugin): +    PLUGIN_ID = "Last.fm Similarity" +    PLUGIN_NAME = _("Last.fm Similarity") +    PLUGIN_DESC = _("Finds a similar song using Last.fm's track similarity API" +                    " and adds it to the queue.") +    PLUGIN_ICON = Icons.NETWORK_WORKGROUP + +    LAST_FM_API_URI = "http://ws.audioscrobbler.com/2.0/" +    API_KEY = "e94b09aa2c04ab264deb7a7ae02ecd05" + +    LAST_FM_API_METHODS = { +        "similar_artists": "artist.getSimilar", +        "similar_tracks": "track.getSimilar", +    } + +    def __init__(self): +        self._last_tracks = [] +        self._last_artists = [] + +    def _check_artist_played(self, artist): +        if artist in self._last_artists: +            return True +        else: +            return False + +    def _check_track_played(self, track): +        if track in self._last_tracks: +            return True +        else: +            return False + +    def _build_uri(self, request): +        return "".join((self.LAST_FM_API_URI, request, "&api_key=", +                        self.API_KEY, "&format=json")) + +    def _find_similar_tracks(self, trackname, artistname, mbid=None, limit=20): +        request = "".join(("?method=", +                           self.LAST_FM_API_METHODS["similar_tracks"])) + +        if mbid: +            request = "".join((request, "&mbid=", mbid)) +        else: +            request = "".join((request, "&track=", trackname, "&artist=", +                               artistname)) + +        request = "".join((request, "&limit={}".format(limit))) + +        uri = self._build_uri(request) + +        stream = None + +        try: +            stream = urllib2.urlopen(uri) +        except urllib2.URLError: +            return [] + +        if stream.getcode() == 200: +            similar_tracks = [] + +            try: +                response = json.load(stream) + +                for track in response["similartracks"]["track"]: +                    similar_tracks.append( +                        (track["artist"]["name"], track["name"])) + +                return similar_tracks + +            except KeyError: +                return [] + +        else: +            return [] + +    def _find_similar_artists(self, artistname, mbid=None, limit=20): +        request = "".join(("?method=", +                           self.LAST_FM_API_METHODS["similar_artists"])) + +        if mbid: +            request = "".join((request, "&mbid=", mbid)) +        else: +            request = "".join((request, "&artist=", artistname)) + +        request = "".join((request, "&limit={}".format(limit))) + +        uri = self._build_uri(request) + +        stream = None + +        try: +            stream = urllib2.urlopen(uri) +        except urllib2.URLError: +            return [] + +        if stream.getcode() == 200: +            similar_artists = [] + +            try: +                response = json.load(stream) + +                for artist in response["similarartists"]["artist"]: +                    similar_artists.append(artist["name"]) + +                return similar_artists + +            except KeyError: +                return [] + +        else: +            return [] + +    def on_change(self, song): +        artist = song.get("artist") +        track = song.get("title") + +        candidates = [] + +        try: +            mbid = song.get("musicbrainz_releasetrackid") + +            candidates = self._find_similar_tracks(track, artist, mbid) +        except KeyError: +            candidates = self._find_similar_tracks(track, artist) + +        if candidates: +            for candidate in candidates: +                if not self._check_artist_played(candidate[0]): + +                    print_d("[similarity] found track match: %s - %s" +                            % (candidate[0], candidate[1])) + +                    if (len(self._last_tracks) +                            == pconfig.getint("blacklist_track_count")): +                        del self._last_tracks[0] + +                    query = Query.StrictQueryMatcher( +                        "&(artist = \"%s\", title = \"%s\")" +                        % (candidate[0], candidate[1])) +                    try: +                        results = filter(query.search, app.library) + +                        if results: +                            song = results[0] + +                            if self._check_track_played(song.get("~filename")): +                                continue + +                            self._last_tracks.append(song.get("~filename")) + +                            if (len(self._last_artists) +                                    == pconfig.getint( +                                        "blacklist_artist_count")): +                                del self._last_artists[0] + +                            self._last_artists.append(song.get("artist")) + +                            app.window.playlist.enqueue([song]) + +                            return +                    except AttributeError: +                        pass + +        artist_candidates = self._find_similar_artists(artist) + +        for artist in artist_candidates: +            if not self._check_artist_played(artist): +                print_d("[similarity] found artist match: %s" % artist) + +                query = Query.StrictQueryMatcher( +                    "&(artist = \"%s\", title != \"[silence]\")" % artist) +                try: +                    results = filter(query.search, app.library) + +                    for song in results: +                        if self._check_track_played(song.get("~filename")): +                            continue + +                        if (len(self._last_artists) +                                == pconfig.getint("blacklist_artist_count")): +                            del self._last_artists[0] + +                        self._last_artists.append(song.get("artist")) +                        self._last_tracks.append(song.get("~filename")) + +                        app.window.playlist.enqueue([song]) +                        return +                except AttributeError: +                    pass + +    def plugin_on_song_started(self, song): +        self._last_tracks.append(song.get("~filename")) +        self._last_artists.append(song.get("artist")) + +        GLib.idle_add(self.on_change, song) + +    def plugin_on_song_ended(self, song, stopped): + +        if len(self._last_tracks) == pconfig.getint("blacklist_track_count"): +            del self._last_tracks[0] + +        if len(self._last_artists) == pconfig.getint("blacklist_artist_count"): +            del self._last_artists[0]  | 
