# -*- coding: utf-8 -*-
"""Quod Libet event plugin that queues a song similar to the one playing.

When a song starts, Last.fm is asked for similar tracks (and, failing that,
similar artists); the first match found in the local library that was not
played recently is appended to the play queue.

The module targets Python 2 (it relies on ``urllib2`` and ``unicode``).
"""

import json
import random
import urllib
import urllib2
from contextlib import closing

from gi.repository import GLib

from quodlibet import _
from quodlibet import app
from quodlibet.plugins.events import EventPlugin
from quodlibet.plugins import PluginConfig
from quodlibet.qltk import Icons
from quodlibet.query import Query
from quodlibet.util.dprint import print_d

pconfig = PluginConfig("lastfm_similarity")
pconfig.defaults.set("blacklist_track_count", 10)
pconfig.defaults.set("blacklist_artist_count", 10)


class LastFMSimilarity(EventPlugin):
    """Enqueue a library song similar to the one that just started."""

    PLUGIN_ID = "Last.fm Similarity"
    PLUGIN_NAME = _("Last.fm Similarity")
    PLUGIN_DESC = _("Finds a similar song using Last.fm's track similarity API"
                    " and adds it to the queue.")
    PLUGIN_ICON = Icons.NETWORK_WORKGROUP

    LAST_FM_API_URI = "http://ws.audioscrobbler.com/2.0/"
    API_KEY = "e94b09aa2c04ab264deb7a7ae02ecd05"

    LAST_FM_API_METHODS = {
        "similar_artists": "artist.getSimilar",
        "similar_tracks": "track.getSimilar",
    }

    # Seconds to wait for Last.fm before giving up. The lookup runs from a
    # GLib idle callback, so an unbounded wait would stall that callback.
    REQUEST_TIMEOUT = 10

    def __init__(self):
        # Filenames and artist names of recently started songs; both lists
        # are trimmed in plugin_on_song_ended.
        self._last_tracks = []
        self._last_artists = []

    # -- small helpers ------------------------------------------------------

    @staticmethod
    def _first_line(value):
        """Return the first line of *value*, or "" if it is empty or None."""
        lines = (value or "").splitlines()
        return lines[0] if lines else ""

    @staticmethod
    def _to_bytes(value):
        """Return *value* as a byte string (unicode is encoded as UTF-8).

        ``urllib.urlencode`` calls ``str()`` on its values, which fails for
        non-ASCII ``unicode`` objects, so they are encoded up front.
        """
        if isinstance(value, unicode):
            return value.encode("utf-8")
        return str(value)

    @staticmethod
    def _escape_query_value(value):
        """Escape backslashes and double quotes in *value*.

        NOTE(review): assumes the Quod Libet query parser honours backslash
        escapes inside double-quoted values -- confirm against the query
        syntax of the targeted Quod Libet version.
        """
        return value.replace("\\", "\\\\").replace('"', '\\"')

    def _check_artist_played(self, artist):
        """Return True if *artist* is in the recent-artist list.

        The comparison ignores case.
        """
        wanted = unicode(artist).upper()
        return any(wanted == played.upper() for played in self._last_artists)

    def _check_track_played(self, track):
        """Return True if *track* (a filename) is in the recent-track list."""
        return track in self._last_tracks

    def _add_played_artists(self, artists):
        """Append each artist of *artists* that is not yet remembered."""
        for artist in artists:
            if artist not in self._last_artists:
                self._last_artists.append(artist)

    # -- Last.fm access -----------------------------------------------------

    def _build_uri(self, request):
        """Return the full API URI for *request*.

        *request* is the query string starting with ``?method=...``; the API
        key and the JSON format selector are appended to it.
        """
        return "".join((self.LAST_FM_API_URI, request,
                        "&api_key=", self.API_KEY,
                        "&format=json"))

    def _encode_query(self, params):
        """Return *params* (ordered key/value pairs) as ``?k=v&...``.

        Values are percent-encoded so names containing spaces, ``&`` or
        non-ASCII characters cannot corrupt the request.
        """
        encoded = urllib.urlencode(
            [(key, self._to_bytes(value)) for key, value in params])
        return "?" + encoded

    def _call_api(self, params):
        """Send *params* to Last.fm and return the decoded JSON reply.

        Returns None when the request fails, the status is not 200 or the
        body is not valid JSON.
        """
        uri = self._build_uri(self._encode_query(params))
        try:
            # closing() guarantees the response is released on every path.
            with closing(urllib2.urlopen(
                    uri, timeout=self.REQUEST_TIMEOUT)) as stream:
                if stream.getcode() != 200:
                    return None
                return json.load(stream)
        except (IOError, ValueError) as err:
            # urllib2.URLError and socket timeouts are IOError subclasses;
            # ValueError is what json raises for an undecodable body.
            print_d("[similarity] Last.fm request failed: %s" % err)
            return None

    def _find_similar_tracks(self, trackname, artistname, mbid=None,
                             limit=20):
        """Return ``(artist, title)`` tuples for tracks similar to a track.

        The track is identified by *mbid* when given, otherwise by
        *trackname* and the first line of *artistname*. If a lookup by
        *mbid* yields an unusable reply, it is retried by name. An empty
        list is returned on any failure.
        """
        params = [("method", self.LAST_FM_API_METHODS["similar_tracks"])]
        if mbid:
            print_d("Trying with mbid %s" % mbid)
            params.append(("mbid", mbid))
        else:
            artist = self._first_line(artistname)
            if not (trackname and artist):
                # Nothing to identify the track by.
                return []
            # %-formatting (unlike str.format) copes with unicode names.
            print_d("Trying with %s - %s" % (artist, trackname))
            params.append(("track", trackname))
            params.append(("artist", artist))
        params.append(("limit", limit))

        response = self._call_api(params)
        if response is None:
            return []
        try:
            return [(track["artist"]["name"], track["name"])
                    for track in response["similartracks"]["track"]]
        except (KeyError, TypeError):
            # The reply lacks the expected structure (e.g. an error reply).
            if mbid:
                return self._find_similar_tracks(trackname, artistname,
                                                 limit=limit)
            return []

    def _find_similar_artists(self, artistname, mbid=None, limit=20):
        """Return the names of artists similar to an artist.

        The artist is identified by *mbid* when given, otherwise by the
        first line of *artistname*. If a lookup by *mbid* yields an unusable
        reply, it is retried by name. An empty list is returned on any
        failure.
        """
        params = [("method", self.LAST_FM_API_METHODS["similar_artists"])]
        if mbid:
            print_d("Trying with artist mbid %s" % mbid)
            params.append(("mbid", mbid))
        else:
            artist = self._first_line(artistname)
            if not artist:
                return []
            print_d("Trying with %s" % artist)
            params.append(("artist", artist))
        params.append(("limit", limit))

        response = self._call_api(params)
        if response is None:
            return []
        try:
            return [entry["name"]
                    for entry in response["similarartists"]["artist"]]
        except (KeyError, TypeError):
            if mbid:
                return self._find_similar_artists(artistname, limit=limit)
            return []

    # -- library / queue ----------------------------------------------------

    def _search_library(self, query_text):
        """Return the library songs matched by *query_text* as a list."""
        query = Query.StrictQueryMatcher(query_text)
        try:
            return [item for item in app.library if query.search(item)]
        except AttributeError as err:
            # Best effort, as before: presumably the matcher is unusable
            # when the query text cannot be parsed -- skip this candidate.
            print_d("[similarity] library search failed: %s" % err)
            return []

    def _enqueue(self, song):
        """Append *song* to the play queue; return True on success."""
        try:
            app.window.playlist.enqueue([song])
        except AttributeError as err:
            # Best effort, as before: the caller moves on to the next song.
            print_d("[similarity] could not enqueue song: %s" % err)
            return False
        return True

    def on_change(self, song):
        """Find a song similar to *song* in the library and enqueue it.

        Similar tracks are tried first; if none of them is available, a
        random unplayed song by a similar artist is used instead. Songs and
        artists in the recent lists are skipped. Always returns None, so
        when scheduled through ``GLib.idle_add`` it runs only once.
        """
        artist = self._first_line(song.get("artist"))
        track = song.get("title")
        # NOTE(review): Last.fm documents ``mbid`` as the MusicBrainz id of
        # the track; confirm the release-track id is the intended one.
        try:
            mbid = song.get("musicbrainz_releasetrackid")
        except KeyError:
            mbid = None

        similar_tracks = self._find_similar_tracks(track, artist, mbid)
        for cand_artist, cand_title in similar_tracks:
            if self._check_artist_played(cand_artist):
                continue
            print_d("[similarity] found track match: %s - %s"
                    % (cand_artist, cand_title))
            matches = self._search_library(
                "&(artist = \"%s\", title = \"%s\")"
                % (self._escape_query_value(cand_artist),
                   self._escape_query_value(cand_title)))
            if not matches:
                continue
            match = matches[0]
            if self._check_track_played(match.get("~filename")):
                continue
            if self._enqueue(match):
                return

        if not artist:
            # No artist name to base the fallback lookup on.
            return

        for similar_artist in self._find_similar_artists(artist):
            if self._check_artist_played(similar_artist):
                continue
            print_d("[similarity] found artist match: %s" % similar_artist)
            matches = self._search_library(
                "&(artist = \"%s\", title != \"[silence]\")"
                % self._escape_query_value(similar_artist))
            # Shuffle and walk once, so every song is considered exactly
            # once instead of drawing random indices with replacement.
            random.shuffle(matches)
            for match in matches:
                if self._check_track_played(match.get("~filename")):
                    continue
                if self._enqueue(match):
                    return

    # -- Quod Libet event hooks ---------------------------------------------

    def plugin_on_song_started(self, song):
        """Remember *song* as played and schedule the similarity lookup."""
        if song is None:
            # Defensive: there is nothing to look up without a song.
            return
        self._last_tracks.append(song.get("~filename"))
        self._add_played_artists((song.get("artist") or "").splitlines())
        GLib.idle_add(self.on_change, song)

    def plugin_on_song_ended(self, song, stopped):
        """Trim the recent lists to the configured blacklist sizes."""
        max_track_count = pconfig.getint("blacklist_track_count")
        max_artist_count = pconfig.getint("blacklist_artist_count")

        # Drop the oldest entries. Deleting an explicit prefix (rather than
        # slicing with a negative index) stays correct for a limit of 0.
        track_count = len(self._last_tracks)
        if track_count > max_track_count:
            del self._last_tracks[:track_count - max_track_count]

        artist_count = len(self._last_artists)
        if artist_count > max_artist_count:
            del self._last_artists[:artist_count - max_artist_count]