summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.gitignore3
-rw-r--r--bower.json6
-rw-r--r--common/utils.py2
-rw-r--r--db/db_manager.py143
-rw-r--r--mach2.py160
-rw-r--r--models/album.py113
-rw-r--r--models/artist.py124
-rw-r--r--models/base.py10
-rw-r--r--models/track.py456
-rw-r--r--requirements.txt13
-rw-r--r--tests/common/utils_test.py14
-rw-r--r--tests/conftest.py44
-rw-r--r--tests/db/db_manager_test.py53
-rw-r--r--tests/mach2_test.py52
-rw-r--r--tests/models/album_test.py63
-rw-r--r--tests/models/artist_test.py67
-rw-r--r--tests/models/track_test.py106
-rw-r--r--tests/test.dbbin0 -> 19456 bytes
-rw-r--r--tests/test.oggbin0 -> 3929 bytes
-rw-r--r--tests/testapp.dbbin0 -> 4096 bytes
20 files changed, 985 insertions, 444 deletions
diff --git a/.gitignore b/.gitignore
index def8954..d14836d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -17,4 +17,5 @@ node_modules
library.db
cscope.*
static/scripts/libs/
-.jshintrc \ No newline at end of file
+.jshintrc
+.cache
diff --git a/bower.json b/bower.json
index 024e6f3..17dbe11 100644
--- a/bower.json
+++ b/bower.json
@@ -1,6 +1,6 @@
{
"name": "mach2",
- "version": "0.0.1",
+ "version": "0.1.0",
"homepage": "https://github.com/michael-ball/mach2",
"authors": [
"Michaël Ball"
@@ -17,7 +17,7 @@
"tests"
],
"dependencies": {
- "angular": "1.4.8",
+ "angular": "~1.5.0",
"bootstrap": "3.3.5",
"moment": "~2.10.6",
"angular-moment": "~0.10.3",
@@ -27,7 +27,7 @@
"moment-timezone": "~0.4.1"
},
"resolutions": {
- "angular": "1.3.10"
+ "angular": "~1.5.0"
},
"exportsOverride": {
"bootstrap": {
diff --git a/common/utils.py b/common/utils.py
index a7489f9..efac527 100644
--- a/common/utils.py
+++ b/common/utils.py
@@ -53,7 +53,7 @@ def update_clause_from_dict(data):
try:
for key in data.keys():
- update_items.append("%s = :%s", (key, key))
+ update_items.append("%s = :%s" % (key, key))
if len(update_items) > 1:
update_clause = ", ".join(update_items)
diff --git a/db/db_manager.py b/db/db_manager.py
index f9b10c3..ad2fd14 100644
--- a/db/db_manager.py
+++ b/db/db_manager.py
@@ -4,6 +4,40 @@ import sqlite3
class DbManager:
+
+ create_album_table = "CREATE TABLE IF NOT EXISTS album (id INTEGER "\
+ "PRIMARY KEY, name TEXT, date TEXT, musicbrainz_albumid TEXT)"
+ create_album_artist_table = "CREATE TABLE IF NOT EXISTS album_artist ("\
+ "album_id INTEGER, artist_id INTEGER, CONSTRAINT ALBUM_ARTIST_PK "\
+ "PRIMARY KEY (album_id, artist_id), CONSTRAINT ALBUM_ARTIST_FK_ALBUM "\
+ "FOREIGN KEY (album_id) REFERENCES album(id), CONSTRAINT "\
+ "ALBUM_ARTIST_FK_ARTIST FOREIGN KEY(artist_id) REFERENCES artist(id))"
+ create_album_track_table = "CREATE TABLE IF NOT EXISTS album_track ("\
+ "album_id INTEGER, track_id INTEGER, CONSTRAINT ALBUM_TRACK_PK "\
+ "PRIMARY KEY (album_id, track_id), FOREIGN KEY(album_id) REFERENCES "\
+ "album(id), FOREIGN KEY(track_id) REFERENCES track(id))"
+ create_artist_table = "CREATE TABLE IF NOT EXISTS artist (id INTEGER "\
+ "PRIMARY KEY, name TEXT(2000000000), sortname TEXT(2000000000), "\
+ "musicbrainz_artistid TEXT(2000000000))"
+ create_artist_track_table = "CREATE TABLE IF NOT EXISTS artist_track ("\
+ "artist_id INTEGER, track_id INTEGER, CONSTRAINT ARTIST_TRACK_PK "\
+ "PRIMARY KEY (artist_id, track_id), FOREIGN KEY(artist_id) "\
+ "REFERENCES artist(id), FOREIGN KEY(track_id) REFERENCES track(id))"
+ create_track_table = "CREATE TABLE IF NOT EXISTS track (id INTEGER, "\
+ "tracknumber INTEGER, name TEXT(2000000000), grouping "\
+ "TEXT(2000000000), filename TEXT(2000000000), CONSTRAINT TRACK_PK "\
+ "PRIMARY KEY (id))"
+ create_musicbrainz_artist_index = "CREATE UNIQUE INDEX IF NOT EXISTS "\
+ "artist_musicbrainz_artistid ON artist(musicbrainz_artistid ASC)"
+ create_track_filename_index = "CREATE INDEX IF NOT EXISTS "\
+ "track_filename_IDX ON track(filename)"
+ create_track_grouping_index = "CREATE INDEX IF NOT EXISTS "\
+ "track_grouping_IDX ON track(grouping)"
+ create_track_name_index = "CREATE INDEX IF NOT EXISTS track_name_IDX ON "\
+ "track(name)"
+ create_track_number_index = "CREATE INDEX IF NOT EXISTS "\
+ "track_tracknumber_IDX ON track(tracknumber)"
+
class __DbManager:
config = configparser.ConfigParser()
config.read("mach2.ini")
@@ -57,25 +91,30 @@ class DbManager:
yield("COMMIT;")
- def __init__(self):
+ def __init__(self, db=None):
new_db = False
cache_size_kb = 9766
- if not os.path.isfile(self.config["DEFAULT"]["database"]):
- new_db = True
+ if db:
+ self.conn = sqlite3.connect(db)
- if new_db:
- self.conn = sqlite3.connect(":memory:")
- self.create_tables()
else:
- self.conn = sqlite3.connect(self.config["DEFAULT"]["database"])
- library_info = os.stat(self.config["DEFAULT"]["database"])
- cache_size_kb = round((library_info.st_size * 1.2) / 1024)
+ if not os.path.isfile(self.config["DEFAULT"]["database"]):
+ new_db = True
- cursor = self.conn.cursor()
- # Setting pragma with ? placeholder produces an error
- cursor.execute("pragma cache_size=-%s" % cache_size_kb)
- cursor.close()
+ if new_db:
+ self.conn = sqlite3.connect(":memory:")
+ self.create_tables()
+ else:
+ self.conn = sqlite3.connect(
+ self.config["DEFAULT"]["database"])
+ library_info = os.stat(self.config["DEFAULT"]["database"])
+ cache_size_kb = round((library_info.st_size * 1.2) / 1024)
+
+ cursor = self.conn.cursor()
+ # Setting pragma with ? placeholder produces an error
+ cursor.execute("pragma cache_size=-%s" % cache_size_kb)
+ cursor.close()
self.conn.row_factory = sqlite3.Row
@@ -96,6 +135,12 @@ class DbManager:
def __str__(self):
return repr(self)
+ def execute(self, script, parameters=None):
+ if parameters:
+ return self.conn.execute(script, parameters)
+
+ return self.conn.execute(script)
+
def commit(self):
return self.conn.commit()
@@ -109,65 +154,25 @@ class DbManager:
return self.conn.interrupt()
def create_tables(self):
- cursor = self.conn.cursor()
- cursor.execute("""CREATE TABLE IF NOT EXISTS album (id
- INTEGER PRIMARY KEY, name TEXT, date TEXT,
- musicbrainz_albumid TEXT)""")
- cursor.execute("""CREATE TABLE IF NOT EXISTS album_artist (
- album_id INTEGER,
- artist_id INTEGER,
- CONSTRAINT ALBUM_ARTIST_PK PRIMARY KEY (album_id,
- artist_id),
- CONSTRAINT ALBUM_ARTIST_FK_ALBUM FOREIGN KEY (album_id)
- REFERENCES album(id),
- CONSTRAINT ALBUM_ARTIST_FK_ARTIST FOREIGN KEY
- (artist_id) REFERENCES artist(id)
- )""")
- cursor.execute("""CREATE TABLE IF NOT EXISTS album_track (
- album_id INTEGER,
- track_id INTEGER,
- CONSTRAINT ALBUM_TRACK_PK PRIMARY KEY (album_id,
- track_id),
- FOREIGN KEY(album_id) REFERENCES album(id),
- FOREIGN KEY(track_id) REFERENCES track(id)
- )""")
- cursor.execute("""CREATE TABLE IF NOT EXISTS artist (id
- INTEGER PRIMARY KEY, name
- TEXT(2000000000), sortname TEXT(2000000000),
- musicbrainz_artistid TEXT(2000000000))""")
- cursor.execute("""CREATE TABLE IF NOT EXISTS artist_track (
- artist_id INTEGER,
- track_id INTEGER,
- CONSTRAINT ARTIST_TRACK_PK PRIMARY KEY (artist_id,
- track_id),
- FOREIGN KEY(artist_id) REFERENCES artist(id),
- FOREIGN KEY(track_id) REFERENCES track(id)
- )""")
- cursor.execute("""CREATE TABLE IF NOT EXISTS track (
- id INTEGER,
- tracknumber INTEGER,
- name TEXT(2000000000),
- grouping TEXT(2000000000),
- filename TEXT(2000000000),
- CONSTRAINT TRACK_PK PRIMARY KEY (id)
- )""")
- cursor.execute("""CREATE UNIQUE INDEX IF NOT EXISTS
- artist_musicbrainz_artistid ON
- artist(musicbrainz_artistid ASC)""")
- cursor.execute("""CREATE INDEX IF NOT EXISTS track_filename_IDX
- ON track(filename)""")
- cursor.execute("""CREATE INDEX IF NOT EXISTS track_grouping_IDX
- ON track(grouping)""")
- cursor.execute("""CREATE INDEX IF NOT EXISTS track_name_IDX ON
- track(name)""")
- cursor.execute("""CREATE INDEX IF NOT EXISTS
- track_tracknumber_IDX ON track(tracknumber)""")
- cursor.close()
+ with self.conn:
+ self.conn.execute(DbManager.create_album_table)
+ self.conn.execute(DbManager.create_album_artist_table)
+ self.conn.execute(DbManager.create_album_track_table)
+ self.conn.execute(DbManager.create_artist_table)
+ self.conn.execute(DbManager.create_artist_track_table)
+ self.conn.execute(DbManager.create_track_table)
+ self.conn.execute(DbManager.create_musicbrainz_artist_index)
+ self.conn.execute(DbManager.create_track_filename_index)
+ self.conn.execute(DbManager.create_track_grouping_index)
+ self.conn.execute(DbManager.create_track_name_index)
+ self.conn.execute(DbManager.create_track_number_index)
instance = None
- def __new__(self):
- if not DbManager.instance:
+ def __new__(self, db=None):
+ if db:
+ return DbManager.__DbManager(db)
+ elif not DbManager.instance:
DbManager.instance = DbManager.__DbManager()
return DbManager.instance
diff --git a/mach2.py b/mach2.py
index a7654b1..412ac9d 100644
--- a/mach2.py
+++ b/mach2.py
@@ -6,8 +6,8 @@ import os
import sqlite3
import tempfile
-from flask import Flask, Response, g, redirect, render_template
-from flask import request, url_for
+from flask import Blueprint, Flask, Response, current_app, g, redirect, \
+ render_template, request, url_for
from flask.ext.compress import Compress
from flask.ext.login import LoginManager, current_user, login_required
from flask.ext.login import login_user, logout_user
@@ -18,37 +18,32 @@ from models.artist import Artist
from models.track import Track
from models.user import User
+import builtins
-DATABASE = "app.db"
-app = Flask(__name__)
-app.config.from_object(__name__)
+builtins.library_db = None
+
config = configparser.ConfigParser()
config.read("mach2.ini")
-app.config["DEBUG"] = config["DEFAULT"]["debug"]
-app.config["SECRET_KEY"] = config["DEFAULT"]["secret_key"]
+mach2 = Blueprint("mach2", __name__)
login_manager = LoginManager()
-login_manager.login_view = "login"
-login_manager.init_app(app)
-
-compress = Compress()
-compress.init_app(app)
+login_manager.login_view = "mach2.login"
def get_db():
db = getattr(g, "_database", None)
if db is None:
- db = sqlite3.connect(DATABASE)
+ db = sqlite3.connect(current_app.config["DATABASE"])
db.row_factory = sqlite3.Row
setattr(g, "_database", db)
return db
-@app.teardown_appcontext
+@mach2.teardown_app_request
def close_connection(exception):
db = getattr(g, "_database", None)
if db is not None:
@@ -98,13 +93,13 @@ def load_user_from_request(request):
return None
-@app.route("/")
+@mach2.route("/")
@login_required
def index():
return render_template("index.html", user=current_user)
-@app.route("/albums")
+@mach2.route("/albums")
@login_required
def albums():
returned_albums = []
@@ -143,60 +138,61 @@ def albums():
all_params.update(search_params)
if search_params:
- returned_albums = Album.search(**all_params)
+ returned_albums = Album.search(db=builtins.library_db, **all_params)
else:
- returned_albums = Album.all(**params)
+ returned_albums = Album.all(db=builtins.library_db, **params)
for album in returned_albums:
- albums.append(album.__dict__)
+ albums.append(album.as_dict())
return json.dumps(albums)
-@app.route("/albums/<int:album_id>/tracks")
+@mach2.route("/albums/<int:album_id>/tracks")
@login_required
def album_tracks(album_id):
tracks = []
- album = Album(id=album_id)
+ album = Album(db=builtins.library_db, id=album_id)
for track in album.tracks:
- tracks.append(track.__dict__)
+ tracks.append(track.as_dict())
return json.dumps(tracks)
-@app.route("/albums/<int:album_id>/artists")
+@mach2.route("/albums/<int:album_id>/artists")
@login_required
def album_artists(album_id):
artists = []
- album = Album(id=album_id)
+ album = Album(db=builtins.library_db, id=album_id)
for artist in album.artists:
- artists.append(artist.__dict__)
+ artists.append(artist.as_dict())
return json.dumps(artists)
-@app.route("/albums/<int:album_id>")
+@mach2.route("/albums/<int:album_id>")
@login_required
def album(album_id):
- album = Album(id=album_id)
+ album = Album(db=builtins.library_db, id=album_id)
- return json.dumps(album.__dict__)
+ return json.dumps(album.as_dict())
-@app.route("/albums/<album_name>")
+@mach2.route("/albums/<album_name>")
@login_required
def album_search(album_name):
albums = []
- for album in Album.search(name={"data": album_name, "operator": "LIKE"}):
- albums.append(album.__dict__)
+ for album in Album.search(db=builtins.library_db,
+ name={"data": album_name, "operator": "LIKE"}):
+ albums.append(album.as_dict())
return json.dumps(albums)
-@app.route("/artists")
+@mach2.route("/artists")
@login_required
def artists():
order_by = None
@@ -219,64 +215,66 @@ def artists():
off = request.args.get("offset")
if order_by:
- returned_artists = Artist.all(order=order_by,
+ returned_artists = Artist.all(db=builtins.library_db, order=order_by,
direction=order_direction,
limit=lim, offset=off)
else:
- returned_artists = Artist.all(limit=lim, offset=off)
+ returned_artists = Artist.all(db=builtins.library_db, limit=lim,
+ offset=off)
for artist in returned_artists:
- artists.append(artist.__dict__)
+ artists.append(artist.as_dict())
return json.dumps(artists)
-@app.route("/artists/<int:artist_id>/tracks")
+@mach2.route("/artists/<int:artist_id>/tracks")
@login_required
def artist_tracks(artist_id):
tracks = []
- artist = Artist(id=artist_id)
+ artist = Artist(db=builtins.library_db, id=artist_id)
for track in artist.tracks:
- tracks.append(track.__dict__)
+ tracks.append(track.as_dict())
return json.dumps(tracks)
-@app.route("/artists/<int:artist_id>/albums")
+@mach2.route("/artists/<int:artist_id>/albums")
@login_required
def artist_albums(artist_id):
albums = []
- artist = Artist(id=artist_id)
+ artist = Artist(db=builtins.library_db, id=artist_id)
for album in artist.albums:
- albums.append(album.__dict__)
+ albums.append(album.as_dict())
return json.dumps(albums)
-@app.route("/artists/<int:artist_id>")
+@mach2.route("/artists/<int:artist_id>")
@login_required
def artist_info(artist_id):
- artist = Artist(id=artist_id)
+ artist = Artist(id=artist_id, db=builtins.library_db)
- return json.dumps(artist.__dict__)
+ return json.dumps(artist.as_dict())
-@app.route("/artists/<artist_name>")
+@mach2.route("/artists/<artist_name>")
@login_required
def artist_search(artist_name):
artists = []
- for artist in Artist.search(name={
- "data": artist_name,
- "operator": "LIKE"
+ for artist in Artist.search(db=builtins.libary_db,
+ name={
+ "data": artist_name,
+ "operator": "LIKE"
}):
- artists.append(artist.__dict__)
+ artists.append(artist.as_dict())
return json.dumps(artists)
-@app.route("/tracks")
+@mach2.route("/tracks")
@login_required
def tracks():
order_by = None
@@ -299,30 +297,32 @@ def tracks():
off = request.args.get("offset")
if order_by:
- returned_tracks = Track.all(order=order_by, direction=order_direction,
- limit=lim, offset=off)
+ returned_tracks = Track.all(db=builtins.library_db, order=order_by,
+ direction=order_direction, limit=lim,
+ offset=off)
else:
- returned_tracks = Track.all(limit=lim, offset=off)
+ returned_tracks = Track.all(db=builtins.library_db,
+ limit=lim, offset=off)
for track in returned_tracks:
- tracks.append(track.__dict__)
+ tracks.append(track.as_dict())
return json.dumps(tracks)
-@app.route("/tracks/<int:track_id>/artists")
+@mach2.route("/tracks/<int:track_id>/artists")
@login_required
def track_artists(track_id):
artists = []
- track = Track(id=track_id)
+ track = Track(db=builtins.library_db, id=track_id)
for artist in track.artists:
- artists.append(artist.__dict__)
+ artists.append(artist.as_dict())
return json.dumps(artists)
-@app.route("/tracks/<int:track_id>")
+@mach2.route("/tracks/<int:track_id>")
@login_required
def track(track_id):
def stream_file(filename, chunksize=8192):
@@ -335,7 +335,7 @@ def track(track_id):
os.remove(filename)
break
- local_track = Track(track_id)
+ local_track = Track(db=builtins.library_db, id=track_id)
fd, temp_filename = tempfile.mkstemp()
@@ -356,12 +356,13 @@ def track(track_id):
return resp
-@app.route("/tracks/<track_name>")
+@mach2.route("/tracks/<track_name>")
@login_required
def track_search(track_name):
tracks = []
- for track in Track.search(name={"data": track_name, "operator": "LIKE"}):
- tracks.append(track.__dict__)
+ for track in Track.search(db=builtins.library_db,
+ name={"data": track_name, "operator": "LIKE"}):
+ tracks.append(track.as_dict())
return json.dumps(tracks)
@@ -378,7 +379,7 @@ def load_user(userid):
return user
-@app.route("/login", methods=["GET", "POST"])
+@mach2.route("/login", methods=["GET", "POST"])
def login():
if request.method == "POST":
user = None
@@ -396,19 +397,48 @@ def login():
if user and user.verify(password):
login_user(user)
- return redirect(request.args.get("next") or url_for("index"))
+ return redirect(request.args.get("next") or url_for("mach2.index"))
else:
user = None
return render_template("login.html")
-@app.route("/logout")
+@mach2.route("/logout")
@login_required
def logout():
logout_user()
return redirect("/")
+@mach2.before_app_first_request
+def setup_globals():
+ setattr(g, "_db_path", current_app.config["DATABASE"])
+
+
+def create_app(database=None, library=None):
+ app = Flask(__name__)
+ if database:
+ app.config["DATABASE"] = database
+ else:
+ app.config["DATABASE"] = config["DEFAULT"]["app_db"]
+
+ if library:
+ builtins.library_db = library
+
+ app.config["DEBUG"] = config["DEFAULT"]["debug"]
+ app.config["SECRET_KEY"] = config["DEFAULT"]["secret_key"]
+
+ app.register_blueprint(mach2)
+
+ login_manager.init_app(app)
+
+ compress = Compress()
+ compress.init_app(app)
+
+ return app
+
+
if __name__ == "__main__":
+ app = create_app()
app.run()
diff --git a/models/album.py b/models/album.py
index 0d7cd54..96bea81 100644
--- a/models/album.py
+++ b/models/album.py
@@ -1,15 +1,17 @@
from common import utils
from db.db_manager import DbManager
+from models.base import BaseModel
-class Album():
- def __init__(self, id=None, **kwargs):
- if id is not None:
- db = DbManager()
- cursor = db.cursor()
+class Album(BaseModel):
- for row in cursor.execute("SELECT * FROM album WHERE id = ?",
- (id,)):
+ def __init__(self, id=None, db=None, **kwargs):
+ if db:
+ self.db = db
+
+ if id is not None:
+ for row in self.db.execute("SELECT * FROM album WHERE id = ?",
+ (id,)):
setattr(self, "id", id)
setattr(self, "name", row[1])
setattr(self, "date", row[2])
@@ -18,26 +20,32 @@ class Album():
setattr(self, key, value)
def delete(self):
- db = DbManager()
- cursor = db.cursor()
-
for track in self.tracks:
track.delete()
- cursor.execute("BEGIN TRANSACTION")
+ with self.db.conn:
+ delete_album = "DELETE FROM album WHERE id = ?"
+ self.db.execute(delete_album, (self.id,))
- delete_sql = "DELETE FROM album WHERE id = ?"
- cursor.execute(delete_sql, (self.id,))
+ delete_track_rel = "DELETE FROM album_track WHERE album_id = ?"
+ self.db.execute(delete_track_rel, (self.id,))
- delete_track_rel_sql = "DELETE FROM album_track WHERE album_id = ?"
- cursor.execute(delete_track_rel_sql, (self.id,))
+ delete_artist_rel = "DELETE FROM album_artist WHERE album_id = ?"
+ self.db.execute(delete_artist_rel, (self.id,))
- delete_artist_rel_sql = "DELETE FROM album_artist WHERE album_id = ?"
- cursor.execute(delete_artist_rel_sql, (self.id,))
+ return True
- cursor.execute("COMMIT TRANSACTION")
+ @property
+ def db(self):
+ try:
+ return self._db
+ except AttributeError:
+ self._db = DbManager()
+ return self._db
- return True
+ @db.setter
+ def db(self, db):
+ self._db = db
@property
def artists(self):
@@ -46,15 +54,13 @@ class Album():
if not hasattr(self, "_artists"):
setattr(self, "_artists", [])
- db = DbManager()
- cursor = db.cursor()
-
- for row in cursor.execute("""SELECT artist.* FROM artist INNER JOIN
- album_artist ON artist.id =
- album_artist.artist_id WHERE album_id = ?
- ORDER BY name ASC""", (self.id,)):
- artist = Artist(id=row[0], name=row[1], sortname=row[2],
- musicbrainz_artistid=row[3])
+ for row in self.db.execute("SELECT artist.* FROM artist INNER "
+ "JOIN album_artist ON artist.id = "
+ "album_artist.artist_id WHERE "
+ "album_id = ? ORDER BY name ASC",
+ (self.id,)):
+ artist = Artist(id=row[0], db=self.db, name=row[1],
+ sortname=row[2], musicbrainz_artistid=row[3])
self._artists.append(artist)
return self._artists
@@ -66,16 +72,16 @@ class Album():
if not hasattr(self, "_tracks"):
setattr(self, "_tracks", [])
- db = DbManager()
- cursor = db.cursor()
-
- for row in cursor.execute("""SELECT track.* FROM track
- INNER JOIN album_track ON track.id =
- album_track.track_id WHERE album_id = ?
- ORDER BY tracknumber ASC""", (self.id,)):
+ for row in self.db.execute("SELECT track.* FROM track INNER "
+ "JOIN album_track ON track.id = "
+ "album_track.track_id WHERE "
+ "album_id = ? ORDER BY tracknumber "
+ "ASC", (self.id,)):
- track = Track(id=row[0], tracknumber=row[1], name=row[2],
- grouping=row[3], filename=row[4])
+ track = Track(id=row["id"], db=self.db,
+ tracknumber=row["tracknumber"],
+ name=row["name"], grouping=row["grouping"],
+ filename=row["filename"])
self._tracks.append(track)
return self._tracks
@@ -89,18 +95,16 @@ class Album():
dirty_attributes[attr] = value
if len(dirty_attributes) > 0:
- db = DbManager()
- cursor = db.cursor()
-
set_clause = utils.update_clause_from_dict(dirty_attributes)
dirty_attributes[id] = self.id
sql = " ".join(("UPDATE album"), set_clause, "WHERE id = :id")
- cursor.execute(sql, dirty_attributes)
- def search(order="album.id", direction="ASC", limit=None,
- offset=None, **search_params):
+ with self.db.conn:
+ self.db.execute(sql, dirty_attributes)
+
+ def search(db=None, **search_params):
"""Find an album with the given params
Args:
@@ -110,8 +114,8 @@ class Album():
"""
albums = []
- db = DbManager()
- cursor = db.cursor()
+ if not db:
+ db = DbManager()
# unpack search params
where_params = {}
@@ -132,20 +136,21 @@ class Album():
result = None
if where_clause:
statement = " ".join(("SELECT * FROM album", where_clause))
- result = cursor.execute(statement, value_params)
+ result = db.execute(statement, value_params)
else:
- result = cursor.execute("SELECT * FROM album")
+ result = db.execute("SELECT * FROM album")
for row in result:
albums.append(
- Album(id=row[0], name=row[1], date=row[2])
+ Album(id=row["id"], db=db, name=row["name"], date=row["date"])
)
return albums
- def all(order="album.id", direction="ASC", limit=None, offset=None):
- db = DbManager()
- cursor = db.cursor()
+ def all(db=None, order="album.id", direction="ASC", limit=None,
+ offset=None):
+ if not db:
+ db = DbManager()
albums = []
@@ -156,13 +161,13 @@ class Album():
if limit is not None and offset is not None:
select_string = " ".join((select_string,
- "LIMIT %s OFFSET %s" % (limit, offset)))
+ "LIMIT %s OFFSET %s" % (limit, offset)))
- result = cursor.execute(select_string)
+ result = db.execute(select_string)
for row in result:
albums.append(
- Album(id=row[0], name=row[1], date=row[2])
+ Album(id=row["id"], db=db, name=row["name"], date=row["date"])
)
return albums
diff --git a/models/artist.py b/models/artist.py
index fee081c..a76b2ee 100644
--- a/models/artist.py
+++ b/models/artist.py
@@ -1,44 +1,50 @@
from common import utils
from db.db_manager import DbManager
+from models.base import BaseModel
-class Artist:
- def __init__(self, id=None, **kwargs):
+class Artist(BaseModel):
+
+ def __init__(self, id=None, db=None, **kwargs):
+ if db:
+ self.db = db
+
if id is not None:
- db = DbManager()
- cursor = db.cursor()
-
- for row in cursor.execute("SELECT * FROM artist WHERE id = ?",
- (id,)):
- setattr(self, "id", id)
- setattr(self, "name", row[1])
- setattr(self, "sortname", row[2])
- setattr(self, "musicbrainz_artistid", row[3])
+ for row in self.db.execute("SELECT * FROM artist WHERE id = ?",
+ (id,)):
+ for key in ["id", "name", "sortname", "musicbrainz_artistid"]:
+ setattr(self, key, row[key])
else:
for (key, value) in kwargs.items():
setattr(self, key, value)
def delete(self):
- db = DbManager()
- cursor = db.cursor()
-
for album in self.albums:
album.delete()
- cursor.execute("BEGIN TRANSACTION")
+ with self.db.conn:
+ delete_artist = "DELETE FROM artist WHERE id = ?"
+ self.db.execute(delete_artist, (self.id,))
- delete_sql = "DELETE FROM artist WHERE id = ?"
- cursor.execute(delete_sql, (self.id,))
+ delete_track_rel = "DELETE FROM artist_track WHERE artist_id = ?"
+ self.db.execute(delete_track_rel, (self.id,))
- delete_track_rel_sql = "DELETE FROM artist_track WHERE artist_id = ?"
- cursor.execute(delete_track_rel_sql, (self.id,))
+ delete_album_rel = "DELETE FROM album_artist WHERE artist_id = ?"
+ self.db.execute(delete_album_rel, (self.id,))
- delete_album_rel_sql = "DELETE FROM album_artist WHERE artist_id = ?"
- cursor.execute(delete_album_rel_sql, (self.id,))
+ return True
- cursor.execute("COMMIT TRANSACTION")
+ @property
+ def db(self):
+ try:
+ return self._db
+ except AttributeError:
+ self._db = DbManager()
+ return self._db
- return True
+ @db.setter
+ def db(self, db):
+ self._db = db
@property
def tracks(self):
@@ -47,16 +53,16 @@ class Artist:
if not hasattr(self, "_tracks"):
setattr(self, "_tracks", [])
- db = DbManager()
- cursor = db.cursor()
-
- for row in cursor.execute("""SELECT track.* FROM track
- INNER JOIN artist_track ON track.id =
- artist_track.track_id WHERE artist_id = ?
- ORDER BY name ASC""", (self.id,)):
+ for row in self.db.execute("SELECT track.* FROM track INNER "
+ "JOIN artist_track ON track.id = "
+ "artist_track.track_id WHERE "
+ "artist_id = ? ORDER BY name ASC",
+ (self.id,)):
- track = Track(id=row[0], tracknumber=row[1], name=row[2],
- grouping=row[3], filename=row[4])
+ track = Track(id=row["id"], db=self.db,
+ tracknumber=row["tracknumber"], name=row["name"],
+ grouping=row["grouping"],
+ filename=row["filename"])
self._tracks.append(track)
return self._tracks
@@ -68,14 +74,13 @@ class Artist:
if not hasattr(self, "_albums"):
setattr(self, "_albums", [])
- db = DbManager()
- cursor = db.cursor()
-
- for row in cursor.execute("""SELECT album.* FROM album
- INNER JOIN album_artist ON album.id =
- album_artist.album_id WHERE artist_id = ?
- ORDER BY date ASC""", (self.id,)):
- album = Album(id=row[0], name=row[1], date=row[2])
+ for row in self.db.execute("SELECT album.* FROM album INNER "
+ "JOIN album_artist ON album.id = "
+ "album_artist.album_id WHERE "
+ "artist_id = ? ORDER BY date ASC",
+ (self.id,)):
+ album = Album(id=row["id"], db=self.db, name=row["name"],
+ date=row["date"])
self._albums.append(album)
return self._albums
@@ -89,17 +94,16 @@ class Artist:
dirty_attributes[attr] = value
if len(dirty_attributes) > 0:
- db = DbManager()
- cursor = db.cursor()
-
set_clause = utils.update_clause_from_dict(dirty_attributes)
dirty_attributes[id] = self.id
sql = " ".join(("UPDATE artist"), set_clause, "WHERE id = :id")
- cursor.execute(sql, dirty_attributes)
- def search(**search_params):
+ with self.db.conn:
+ self.db.execute(sql, dirty_attributes)
+
+ def search(db=None, **search_params):
"""Find an artist with the given params
Args:
@@ -109,8 +113,8 @@ class Artist:
"""
artists = []
- db = DbManager()
- cursor = db.cursor()
+ if not db:
+ db = DbManager()
# unpack search params
where_params = {}
@@ -124,21 +128,24 @@ class Artist:
result = []
if where_clause:
statement = " ".join(("SELECT * FROM artist", where_clause))
- result = cursor.execute(statement, value_params)
+ result = db.execute(statement, value_params)
else:
- result = cursor.execute("SELECT * FROM artist")
+ result = db.execute("SELECT * FROM artist")
for row in result:
artists.append(
- Artist(id=row[0], name=row[1], sortname=row[2],
- musicbrainz_artistid=row[3])
+ Artist(id=row["id"], db=db, name=row["name"],
+ sortname=row["sortname"],
+ musicbrainz_artistid=row["musicbrainz_artistid"])
)
return artists
- def all(order="sortname", direction="ASC", limit=None, offset=None):
- db = DbManager()
- cursor = db.cursor()
+ def all(db=None, order="sortname", direction="ASC", limit=None,
+ offset=None):
+ if not db:
+ db = DbManager()
+
artists = []
select_string = "SELECT * FROM artist ORDER BY %s %s" % (order,
@@ -146,14 +153,15 @@ class Artist:
if limit is not None and offset is not None:
select_string = " ".join((select_string,
- "LIMIT %s OFFSET %s" % (limit, offset)))
+ "LIMIT %s OFFSET %s" % (limit, offset)))
- result = cursor.execute(select_string)
+ result = db.execute(select_string)
for row in result:
artists.append(
- Artist(id=row[0], name=row[1], sortname=row[2],
- musicbrainz_artistid=row[3])
+ Artist(id=row["id"], db=db, name=row["name"],
+ sortname=row["sortname"],
+ musicbrainz_artistid=row["musicbrainz_artistid"])
)
return artists
diff --git a/models/base.py b/models/base.py
new file mode 100644
index 0000000..fd40001
--- /dev/null
+++ b/models/base.py
@@ -0,0 +1,10 @@
+class BaseModel():
+
+ def as_dict(self):
+ this_dict = {}
+
+ for k in self.__dict__.keys():
+ if k != "_db":
+ this_dict[k] = getattr(self, k)
+
+ return this_dict
diff --git a/models/track.py b/models/track.py
index 688f6ff..e0905e6 100644
--- a/models/track.py
+++ b/models/track.py
@@ -5,78 +5,120 @@ from common import utils
from db.db_manager import DbManager
from models.artist import Artist
from models.album import Album
+from models.base import BaseModel
logging.basicConfig(format="%(asctime)s %(message)s", level=logging.DEBUG)
-class Track:
+class Track(BaseModel):
- def __init__(self, id=None, **kwargs):
+ def __init__(self, id=None, db=None, **kwargs):
+ if db:
+ self.db = db
- setattr(self, "__data", {})
+ self.__data = {}
if id is not None:
- db = DbManager()
- cursor = db.cursor()
-
- for row in cursor.execute("""SELECT * FROM track WHERE id = ?""",
- (id,)):
- setattr(self, "id", row[0])
- setattr(self, "tracknumber", row[1])
- setattr(self, "name", row[2])
- setattr(self, "grouping", row[3])
- setattr(self, "filename", row[4])
+ for row in self.db.execute("SELECT * FROM track WHERE id = ?",
+ (id,)):
+ for key in ["id", "tracknumber", "name", "grouping",
+ "filename"]:
+ setattr(self, key, row[key])
+ self.__data[key] = row[key]
else:
for (key, value) in kwargs.items():
setattr(self, key, value)
self.__data[key] = value
def delete(self):
- db = DbManager()
- cursor = db.cursor()
delete_sql = "DELETE FROM track WHERE id = ?"
- cursor.execute(delete_sql, (self.id,))
+
+ with self.db.conn:
+ self.db.execute(delete_sql, (self.id,))
+
+ # If there is an old album, remove it if it no longer has any
+ # tracks
+ try:
+ del self._album
+ except Exception:
+ pass
+
+ old_album = self.album
+
+ if old_album:
+ self.db.execute("DELETE FROM album_track WHERE track_id = ?",
+ (self.id,))
+
+ if not old_album.tracks:
+ old_album.delete()
+
+ # If there are old artists, remove them if they no longer have
+ # any tracks
+ try:
+ del self._artists
+ except Exception:
+ pass
+ old_artists = self.artists
+
+ for old_artist in old_artists:
+ self.db.execute("DELETE FROM artist_track WHERE track_id = "
+ "?", (self.id,))
+
+ if not old_artist.tracks:
+ old_artist.delete()
return True
@property
+ def db(self):
+ try:
+ return self._db
+ except AttributeError:
+ self._db = DbManager()
+ return self._db
+
+ @db.setter
+ def db(self, db):
+ self._db = db
+
+ @property
def album(self):
if not hasattr(self, "_album"):
setattr(self, "_album", None)
- db = DbManager()
- cursor = db.cursor()
-
- for row in cursor.execute("""SELECT album.* FROM album INNER JOIN
- album_track ON album.id =
- album_track.album_id WHERE track_id = ?
- LIMIT 1""", (self.id,)):
- setattr(self, "_album", Album(row[0]))
+ for row in self.db.execute("SELECT album.* FROM album INNER "
+ "JOIN album_track ON album.id = "
+ "album_track.album_id WHERE "
+ "track_id = ? LIMIT 1", (self.id,)):
+ setattr(self, "_album", Album(id=row["id"], db=self.db,
+ name=row["name"],
+ date=row["date"]))
return self._album
@property
def artists(self):
if not hasattr(self, "_artists"):
- db = DbManager()
- cursor = db.cursor()
+ cursor = self.db.cursor()
setattr(self, "_artists", [])
- for row in cursor.execute("""SELECT artist.* FROM artist INNER JOIN
- artist_track ON artist.id =
- artist_track.artist_id WHERE
- artist.id = ?""", (self.id,)):
- self._artists.append(Artist(row[0]))
+ for row in cursor.execute("SELECT artist.* FROM artist INNER JOIN "
+ "artist_track ON artist.id = "
+ "artist_track.artist_id WHERE "
+ "artist_track.track_id = ?",
+ (self.id,)):
+ self._artists.append(Artist(id=row["id"], db=self.db,
+ name=row["name"],
+ sortname=row["sortname"],
+ musicbrainz_artistid=row[
+ "musicbrainz_artistid"]))
return self._artists
def update(self, metadata):
- db = DbManager()
- c = db.cursor()
-
- c.execute("BEGIN TRANSACTION")
+ c = self.db.cursor()
artist_names = metadata["artist"]
musicbrainz_artist_ids = []
@@ -109,38 +151,38 @@ class Track:
rows = None
if musicbrainz_artistid:
- rows = c.execute("""SELECT * FROM artist WHERE
- musicbrainz_artistid = ?""",
+ rows = c.execute("SELECT * FROM artist WHERE "
+ "musicbrainz_artistid = ?",
(musicbrainz_artistid,))
else:
- rows = c.execute("""SELECT * FROM artist WHERE
- name = ?""", (artist_name,))
+ rows = c.execute("SELECT * FROM artist WHERE name = ?",
+ (artist_name,))
row = rows.fetchone()
if row:
- artist = Artist(id=row[0], name=row[1],
- sortname=row[2],
- musicbrainz_artistid=row[3])
+ artist = Artist(id=row["id"], db=self.db, name=row["name"],
+ sortname=row["sortname"],
+ musicbrainz_artistid=row[
+ "musicbrainz_artistid"])
if artist.name != artist_name:
- c.execute("""UPDATE artist SET name = ? WHERE id = ?""",
+ c.execute("UPDATE artist SET name = ? WHERE id = ?",
(artist_name, artist.id))
artist.name = artist_name
if artist.sortname != artistsort:
- c.execute("""UPDATE artist SET sortname = ? WHERE id =
- ? """, (artistsort, id))
+ c.execute("UPDATE artist SET sortname = ? WHERE id = ?",
+ (artistsort, id))
artist.sortname = artistsort
else:
- c.execute("""INSERT INTO artist
- (name, sortname, musicbrainz_artistid) VALUES(
- ?,?,?)""", (artist_name, artistsort,
- musicbrainz_artistid))
+ c.execute("INSERT INTO artist (name, sortname, "
+ "musicbrainz_artistid) VALUES(?, ?, ?)",
+ (artist_name, artistsort, musicbrainz_artistid))
artist = Artist(
- id=c.lastrowid, name=artist_name,
+ id=c.lastrowid, db=self.db, name=artist_name,
sortname=artistsort,
musicbrainz_artistid=musicbrainz_artistid
)
@@ -170,100 +212,135 @@ class Track:
if mb_albumid:
rows = c.execute(
- """SELECT * FROM album WHERE musicbrainz_albumid = ?""",
+ "SELECT * FROM album WHERE musicbrainz_albumid = ?",
(mb_albumid,)
)
row = rows.fetchone()
if row:
- album = Album(id=row[0], name=row[1], date=row[2],
- mb_albumid=row[3])
+ album = Album(id=row["id"], db=self.db, name=row["name"],
+ date=row["date"],
+ musicbrainz_albumid=row["musicbrainz_albumid"])
else:
- c.execute("""INSERT INTO album (name, `date`,
- musicbrainz_albumid) VALUES (?,?,?)""",
+ c.execute("INSERT INTO album (name, `date`, "
+ "musicbrainz_albumid) VALUES (?, ?, ?)",
(album_name, album_date, mb_albumid))
- album = Album(id=c.lastrowid, name=album_name,
+ album = Album(id=c.lastrowid, db=self.db, name=album_name,
date=album_date, musicbrainz_albumid=mb_albumid)
elif album_name:
rows = c.execute(
- """SELECT album.* FROM album INNER JOIN album_artist ON
- album_artist.album_id = album.id WHERE album.name = ?
- AND artist_id = ?""", (album_name, artist.id)
+ "SELECT album.* FROM album INNER JOIN album_artist ON "
+ "album_artist.album_id = album.id WHERE album.name = ? "
+ "AND artist_id = ?", (album_name, artist.id)
)
row = rows.fetchone()
if row:
- album = Album(id=row[0], name=row[1], date=row[2])
+ album = Album(id=row["id"], db=self.db, name=row["name"],
+ date=row["date"],
+ musicbrainz_albumid=row["musicbrainz_albumid"])
else:
- c.execute("""INSERT INTO album (name, `date`) VALUES
- (?,?)""", (album_name, album_date))
+ c.execute("INSERT INTO album (name, `date`) VALUES (?, ?)",
+ (album_name, album_date))
- album = Album(id=c.lastrowid, name=album_name,
+ album = Album(id=c.lastrowid, db=self.db, name=album_name,
date=album_date)
if album:
if album.name != album_name:
- c.execute("""UPDATE album SET name = ? WHERE id = ?""",
+ c.execute("UPDATE album SET name = ? WHERE id = ?",
(album_name, album.id))
album.name = album_name
if album.date != album_date:
- c.execute("""UPDATE album SET date = ? WHERE id = ?""",
+ c.execute("UPDATE album SET date = ? WHERE id = ?",
(album_date, album.id))
album.date = album_date
- for artist in artists:
- if album:
- try:
- c.execute(
- """INSERT INTO album_artist (artist_id,
- album_id) VALUES(?,?)""",
- (artist.id, album.id)
- )
- except sqlite3.IntegrityError:
- pass
-
track_number = None
track_name = None
track_grouping = None
try:
track_number = metadata["tracknumber"][0]
+ setattr(self, "tracknumber", track_number)
except KeyError:
pass
try:
track_name = metadata["title"][0]
+ setattr(self, "name", track_name)
except KeyError:
pass
try:
track_grouping = metadata["grouping"][0]
+ setattr(self, "grouping", track_grouping)
except KeyError:
pass
- c.execute("""UPDATE track SET tracknumber = ?, name = ?,
- grouping = ? WHERE id = ?""",
- (track_number, track_name, track_grouping,
- self.id))
+ c.execute("UPDATE track SET tracknumber = ?, name = ?, grouping = ? "
+ "WHERE id = ?", (track_number, track_name, track_grouping,
+ self.id))
+
+ # If there is an old album, remove it if it no longer has any tracks
+ try:
+ del self._album
+ except Exception:
+ pass
+
+ old_album = self.album
+
+ if old_album:
+ c.execute("DELETE FROM album_track WHERE track_id = ?", (self.id,))
+
+ if not old_album.tracks:
+ old_album.delete()
+
+ # If there are old artists, remove them if they no longer have
+ # any tracks
+ try:
+ del self._artists
+ except Exception:
+ pass
+ old_artists = self.artists
+
+ for old_artist in old_artists:
+ c.execute("DELETE FROM artist_track WHERE track_id = ?",
+ (self.id,))
+
+ if not old_artist.tracks:
+ old_artist.delete()
if album:
try:
- c.execute("""INSERT INTO album_track (album_id,
- track_id) VALUES(?,?)""",
- (album.id, self.id))
+ c.execute("INSERT INTO album_track (album_id, track_id) "
+ "VALUES(?, ?)", (album.id, self.id))
except sqlite3.IntegrityError:
pass
+ setattr(self, "_album", album)
+
for artist in artists:
try:
- c.execute("""INSERT INTO artist_track
- (artist_id, track_id) VALUES(?,?)""",
- (artist.id, self.id))
+ c.execute("INSERT INTO artist_track (artist_id, track_id) "
+ "VALUES(?, ?)", (artist.id, self.id))
except sqlite3.IntegrityError:
pass
- db.commit()
+ if album:
+ try:
+ c.execute(
+ "INSERT INTO album_artist (artist_id, album_id) "
+ "VALUES(?, ?)", (artist.id, album.id))
+ except sqlite3.IntegrityError:
+ pass
+
+ if artists:
+ setattr(self, "_artists", artists)
+
+ self.db.commit()
+ c.close()
return True
@@ -272,21 +349,25 @@ class Track:
# check if the internal dict has been modified
for (attr, value) in self.__dict__.items():
- if self.__data[attr] != getattr(self, attr):
- dirty_attributes[attr] = value
+ try:
+ if self.__data[attr] != getattr(self, attr):
+ dirty_attributes[attr] = value
+ except AttributeError:
+ pass
+ except KeyError:
+ pass
if len(dirty_attributes) > 0:
- db = DbManager()
- cursor = db.cursor()
-
set_clause = utils.update_clause_from_dict(dirty_attributes)
- dirty_attributes[id] = self.id
+ dirty_attributes["id"] = self.id
+
+ sql = " ".join(("UPDATE track", set_clause, "WHERE id = :id"))
- sql = " ".join(("UPDATE track"), set_clause, "WHERE id = :id")
- cursor.execute(sql, dirty_attributes)
+ with self.db.conn:
+ self.db.execute(sql, dirty_attributes)
- def search(**search_params):
+ def search(db=None, **search_params):
"""Find a track with the given params
Args:
@@ -296,8 +377,9 @@ class Track:
filename: dict, with 'data' and 'operator' keys
"""
- db = DbManager()
- cursor = db.cursor()
+ if not db:
+ db = DbManager()
+
tracks = []
# unpack search params
@@ -312,37 +394,40 @@ class Track:
result = None
if where_clause:
statement = " ".join(("SELECT * FROM track", where_clause))
- result = cursor.execute(statement, value_params)
+ result = db.execute(statement, value_params)
else:
- result = cursor.execute("SELECT * FROM track")
+ result = db.execute("SELECT * FROM track")
for row in result:
tracks.append(
- Track(id=row[0], tracknumber=row[1], name=row[3],
- grouping=row[3], filename=row[4])
+ Track(id=row["id"], db=db, tracknumber=row["tracknumber"],
+ name=row["name"], grouping=row["grouping"],
+ filename=row["filename"])
)
return tracks
- def find_by_path(path):
- db = DbManager()
- cursor = db.cursor()
+ def find_by_path(path, db=None):
+ if not db:
+ db = DbManager()
track = None
- for row in cursor.execute("""SELECT * FROM track WHERE filename = ?
- LIMIT 1""", (path,)):
- track = Track(row[0])
+ for row in db.execute("SELECT * FROM track WHERE filename = ? "
+ "LIMIT 1", (path,)):
+ track = Track(id=row["id"], db=db, tracknumber=row["tracknumber"],
+ name=row["name"], grouping=row["grouping"],
+ filename=row["filename"])
return track
- def store(filename, metadata):
- if Track.find_by_path(filename):
+ def store(filename, metadata, db=None):
+ if Track.find_by_path(filename, db=db):
return True
- db = DbManager()
- c = db.cursor()
+ if not db:
+ db = DbManager()
- c.execute("BEGIN TRANSACTION")
+ c = db.cursor()
artist_names = metadata["artist"]
musicbrainz_artist_ids = []
@@ -361,7 +446,7 @@ class Track:
for artist_name in artist_names:
musicbrainz_artistid = None
- artistsort = artist_name
+ artistsort = None
try:
musicbrainz_artistid = musicbrainz_artist_ids[i]
except IndexError:
@@ -374,48 +459,47 @@ class Track:
rows = None
row = None
if musicbrainz_artistid:
- rows = c.execute("""SELECT * FROM artist WHERE
- musicbrainz_artistid = ?""",
+ rows = c.execute("SELECT * FROM artist WHERE "
+ "musicbrainz_artistid = ?",
(musicbrainz_artistid,))
row = rows.fetchone()
if not row:
- rows = c.execute("""SELECT * FROM artist WHERE
- name = ? AND musicbrainz_artistid IS NULL""",
+ rows = c.execute("SELECT * FROM artist WHERE name = ? "
+ "AND musicbrainz_artistid IS NULL",
(artist_name,))
row = rows.fetchone()
if not row:
- rows = c.execute("""SELECT * FROM artist WHERE
- name = ?""", (artist_name,))
+ rows = c.execute("SELECT * FROM artist WHERE name = ?",
+ (artist_name,))
row = rows.fetchone()
if row:
- artist = Artist(id=row[0], name=row[1],
- sortname=row[2],
- musicbrainz_artistid=row[3])
+ artist = Artist(id=row["id"], db=db, name=row["name"],
+ sortname=row["sortname"],
+ musicbrainz_artistid=row[
+ "musicbrainz_artistid"])
if (musicbrainz_artistid and
- (not hasattr(artist, "musicbrainz_artistid")
- or not artist.musicbrainz_artistid)):
- c.execute("""UPDATE artist SET
- musicbrainz_artistid = ? WHERE id = ?""",
+ (not hasattr(artist, "musicbrainz_artistid") or
+ not artist.musicbrainz_artistid)):
+ c.execute("UPDATE artist SET musicbrainz_artistid = ? "
+ "WHERE id = ?",
(musicbrainz_artistid, artist.id))
if (artistsort and
- (not hasattr(artist, "sortname")
- or not artist.sortname)):
- c.execute("""UPDATE artist SET
- sortname = ? WHERE id = ?""",
+ (not hasattr(artist, "sortname") or
+ not artist.sortname)):
+ c.execute("UPDATE artist SET sortname = ? WHERE id = ?",
(artistsort, artist.id))
else:
- c.execute("""INSERT INTO artist
- (name, sortname, musicbrainz_artistid) VALUES(
- ?,?,?)""", (artist_name, artistsort,
- musicbrainz_artistid))
+ c.execute("INSERT INTO artist (name, sortname, "
+ "musicbrainz_artistid) VALUES(?, ?, ?)",
+ (artist_name, artistsort, musicbrainz_artistid))
artist = Artist(
- id=c.lastrowid, name=artist_name,
+ id=c.lastrowid, db=db, name=artist_name,
sortname=artistsort,
musicbrainz_artistid=musicbrainz_artistid
)
@@ -444,50 +528,46 @@ class Track:
pass
if mb_albumid:
- rows = c.execute(
- """SELECT * FROM album WHERE musicbrainz_albumid = ?""",
- (mb_albumid,)
- )
+ rows = c.execute("SELECT * FROM album WHERE "
+ "musicbrainz_albumid = ?", (mb_albumid,))
row = rows.fetchone()
if row:
- album = Album(id=row[0], name=row[1], date=row[2],
- mb_albumid=row[3])
+ album = Album(id=row["id"], db=db, name=row["name"],
+ date=row["date"], musicbrainz_albumid=row[
+ "musicbrainz_albumid"])
else:
- c.execute("""INSERT INTO album (name, `date`,
- musicbrainz_albumid) VALUES (?,?,?)""",
+ c.execute("INSERT INTO album (name, `date`, "
+ "musicbrainz_albumid) VALUES (?, ?, ?)",
(album_name, album_date, mb_albumid))
- album = Album(id=c.lastrowid, name=album_name,
+ album = Album(id=c.lastrowid, db=db, name=album_name,
date=album_date, musicbrainz_albumid=mb_albumid)
elif album_name:
for artist in artists:
- rows = c.execute(
- """SELECT album.* FROM album INNER JOIN album_artist ON
- album_artist.album_id = album.id WHERE album.name = ?
- AND artist_id = ?""", (album_name, artist.id)
- )
+ rows = c.execute("SELECT album.* FROM album INNER JOIN "
+ "album_artist ON album_artist.album_id = "
+ "album.id WHERE album.name = ? AND "
+ "artist_id = ?", (album_name, artist.id))
row = rows.fetchone()
if row:
- album = Album(id=row[0], name=row[1], date=row[2])
+ album = Album(id=row["id"], db=db, name=row["name"],
+ date=row["date"])
else:
- c.execute("""INSERT INTO album (name, `date`) VALUES
- (?,?)""", (album_name, album_date))
+ c.execute("INSERT INTO album (name, `date`) VALUES(?, ?)",
+ (album_name, album_date))
- album = Album(id=c.lastrowid, name=album_name,
+ album = Album(id=c.lastrowid, db=db, name=album_name,
date=album_date)
for artist in artists:
if album:
try:
- c.execute(
- """INSERT INTO album_artist (artist_id,
- album_id) VALUES(?,?)""",
- (artist.id, album.id)
- )
+ c.execute("INSERT INTO album_artist (artist_id, album_id) "
+ "VALUES(?, ?)", (artist.id, album.id))
except sqlite3.IntegrityError:
pass
@@ -508,63 +588,65 @@ class Track:
pass
track = None
- rows = c.execute("""SELECT * FROM track WHERE filename = ?""",
- (filename,))
+ rows = c.execute("SELECT * FROM track WHERE filename = ?", (filename,))
row = rows.fetchone()
if row:
- track = Track(id=row[0], tracknumber=row[1], name=row[2],
- grouping=row[3], filename=row[4])
+ track = Track(id=row["id"], db=db,
+ tracknumber=row["tracknumber"], name=row["name"],
+ grouping=row["grouping"], filename=row["filename"])
else:
- c.execute("""INSERT INTO track (tracknumber, name,
- grouping, filename) VALUES(?,?,?,?)""",
+ c.execute("INSERT INTO track (tracknumber, name, grouping, "
+ "filename) VALUES(?, ?, ?, ?)",
(track_number, track_name, track_grouping,
filename))
- track = Track(id=c.lastrowid, tracknumber=track_number,
+ track = Track(id=c.lastrowid, db=db, tracknumber=track_number,
name=track_name, grouping=track_grouping,
filename=filename)
if album:
try:
- c.execute("""INSERT INTO album_track (album_id,
- track_id) VALUES(?,?)""",
- (album.id, track.id))
+ c.execute("INSERT INTO album_track (album_id, track_id) "
+ "VALUES(?,?)", (album.id, track.id))
except sqlite3.IntegrityError:
pass
for artist in artists:
try:
- c.execute("""INSERT INTO artist_track
- (artist_id, track_id) VALUES(?,?)""",
- (artist.id, track.id))
+ c.execute("INSERT INTO artist_track (artist_id, track_id) "
+ "VALUES(?, ?)", (artist.id, track.id))
except sqlite3.IntegrityError:
pass
db.commit()
+ c.close()
- return True
+ return track
+
+ def all(db=None, order="track.id", direction="ASC", limit=None,
+ offset=None):
+ if not db:
+ db = DbManager()
- def all(order="track.id", direction="ASC", limit=None, offset=None):
- db = DbManager()
tracks = []
- select_string = """SELECT * FROM track LEFT JOIN artist_track ON
- artist_track.track_id = track.id LEFT JOIN artist ON
- artist_track.artist_id = artist.id LEFT JOIN album_track ON
- album_track.track_id = track.id LEFT JOIN album ON
- album_track.album_id = album.id ORDER BY %s %s""" % (order,
- direction)
+ select_string = "SELECT * FROM track LEFT JOIN artist_track ON " \
+ "artist_track.track_id = track.id LEFT JOIN artist ON " \
+ "artist_track.artist_id = artist.id LEFT JOIN album_track ON " \
+ "album_track.track_id = track.id LEFT JOIN album ON " \
+ "album_track.album_id = album.id ORDER BY %s %s" % (order,
+ direction)
- if limit is not None and offset is not None:
+ if limit and offset:
select_string = " ".join((select_string,
- "LIMIT %s OFFSET %s" % (limit, offset)))
+ "LIMIT %s OFFSET %s" % (limit, offset)))
result = db.execute(select_string)
for row in result:
- tracks.append(
- Track(id=row[0], tracknumber=row[1], name=row[3],
- grouping=row[3], filename=row[4])
- )
+ tracks.append(Track(id=row["id"], db=db,
+ tracknumber=row["tracknumber"],
+ name=row["name"], grouping=row["name"],
+ filename=row["filename"]))
return tracks
diff --git a/requirements.txt b/requirements.txt
index 1433b04..f3895ff 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,6 +1,7 @@
-Flask==0.10.1
-Flask-Compress==1.3.0
-Flask-Login==0.3.2
-gevent==1.1rc3
-mutagen==1.31
-passlib==1.6.5 \ No newline at end of file
+Flask>=0.10.1
+Flask-Compress>=1.3.0
+Flask-Login>=0.3.2
+gevent>=1.1rc3
+mutagen>=1.31
+passlib>=1.6.5
+pytest>=2.8.7
diff --git a/tests/common/utils_test.py b/tests/common/utils_test.py
new file mode 100644
index 0000000..9cf1c55
--- /dev/null
+++ b/tests/common/utils_test.py
@@ -0,0 +1,14 @@
+from common import utils
+
+
+def test_make_where_clause():
+ params_with_between = {"column": "BETWEEN"}
+
+ assert utils.make_where_clause(params_with_between) == "WHERE column "\
+ "BETWEEN :column1 AND :column2"
+
+
+def test_update_clause_from_dict():
+ test_data = {"name": "Flaf"}
+
+ assert utils.update_clause_from_dict(test_data) == "SET name = :name"
diff --git a/tests/conftest.py b/tests/conftest.py
new file mode 100644
index 0000000..0028475
--- /dev/null
+++ b/tests/conftest.py
@@ -0,0 +1,44 @@
+import os
+
+import pytest
+
+from db.db_manager import DbManager
+
+
+@pytest.fixture(scope="module")
+def database(request):
+ database = DbManager(
+ db=os.path.join(os.path.dirname(os.path.realpath(__file__)),
+ "test.db"))
+
+ def fin():
+ database.close()
+
+ request.addfinalizer(fin)
+
+ return database
+
+
+@pytest.fixture(scope="module")
+def test_file():
+ test_file = os.path.join(os.path.dirname(os.path.realpath(__file__)),
+ "test.ogg")
+
+ return test_file
+
+
+@pytest.fixture(scope="class")
+def app(request):
+ db = os.path.join(os.path.dirname(os.path.realpath(__file__)),
+ "testapp.db")
+ library_db = DbManager(
+ db=os.path.join(os.path.dirname(os.path.realpath(__file__)),
+ "test.db"))
+
+ def fin():
+ library_db.close()
+
+ request.addfinalizer(fin)
+
+ request.cls.db = db
+ request.cls.library_db = library_db
diff --git a/tests/db/db_manager_test.py b/tests/db/db_manager_test.py
new file mode 100644
index 0000000..911c62d
--- /dev/null
+++ b/tests/db/db_manager_test.py
@@ -0,0 +1,53 @@
+from db.db_manager import DbManager
+
+
+class TestDbManager:
+
+ def test_create_tables(self):
+ assert DbManager.create_album_table == "CREATE TABLE IF NOT EXISTS "\
+ "album (id INTEGER PRIMARY KEY, name TEXT, date TEXT, "\
+ "musicbrainz_albumid TEXT)"
+
+ assert DbManager.create_album_artist_table == "CREATE TABLE IF NOT "\
+ "EXISTS album_artist (album_id INTEGER, artist_id INTEGER, "\
+ "CONSTRAINT ALBUM_ARTIST_PK PRIMARY KEY (album_id, artist_id), "\
+ "CONSTRAINT ALBUM_ARTIST_FK_ALBUM FOREIGN KEY (album_id) "\
+ "REFERENCES album(id), CONSTRAINT ALBUM_ARTIST_FK_ARTIST "\
+ "FOREIGN KEY(artist_id) REFERENCES artist(id))"
+
+ assert DbManager.create_album_track_table == "CREATE TABLE IF NOT "\
+ "EXISTS album_track (album_id INTEGER, track_id INTEGER, "\
+ "CONSTRAINT ALBUM_TRACK_PK PRIMARY KEY (album_id, track_id), "\
+ "FOREIGN KEY(album_id) REFERENCES album(id), FOREIGN "\
+ "KEY(track_id) REFERENCES track(id))"
+
+ assert DbManager.create_artist_table == "CREATE TABLE IF NOT EXISTS "\
+ "artist (id INTEGER PRIMARY KEY, name TEXT(2000000000), sortname "\
+ "TEXT(2000000000), musicbrainz_artistid TEXT(2000000000))"
+
+ assert DbManager.create_artist_track_table == "CREATE TABLE IF NOT "\
+ "EXISTS artist_track (artist_id INTEGER, track_id INTEGER, "\
+ "CONSTRAINT ARTIST_TRACK_PK PRIMARY KEY (artist_id, track_id), "\
+ "FOREIGN KEY(artist_id) REFERENCES artist(id), FOREIGN "\
+ "KEY(track_id) REFERENCES track(id))"
+
+ assert DbManager.create_track_table == "CREATE TABLE IF NOT EXISTS "\
+ "track (id INTEGER, tracknumber INTEGER, name TEXT(2000000000), "\
+ "grouping TEXT(2000000000), filename TEXT(2000000000), "\
+ "CONSTRAINT TRACK_PK PRIMARY KEY (id))"
+
+ assert DbManager.create_musicbrainz_artist_index == "CREATE UNIQUE "\
+ "INDEX IF NOT EXISTS artist_musicbrainz_artistid ON "\
+ "artist(musicbrainz_artistid ASC)"
+
+ assert DbManager.create_track_filename_index == "CREATE INDEX IF NOT "\
+ "EXISTS track_filename_IDX ON track(filename)"
+
+ assert DbManager.create_track_grouping_index == "CREATE INDEX IF NOT "\
+ "EXISTS track_grouping_IDX ON track(grouping)"
+
+ assert DbManager.create_track_name_index == "CREATE INDEX IF NOT "\
+ "EXISTS track_name_IDX ON track(name)"
+
+ assert DbManager.create_track_number_index == "CREATE INDEX IF NOT "\
+ "EXISTS track_tracknumber_IDX ON track(tracknumber)"
diff --git a/tests/mach2_test.py b/tests/mach2_test.py
new file mode 100644
index 0000000..c45ff2a
--- /dev/null
+++ b/tests/mach2_test.py
@@ -0,0 +1,52 @@
+import json
+import unittest
+
+import pytest
+
+from mach2 import create_app
+
+
+@pytest.mark.usefixtures("app")
+class Mach2TestCase(unittest.TestCase):
+
+ def setUp(self):
+ app = create_app(database=self.db, library=self.library_db)
+ app.config['TESTING'] = True
+ self.app = app.test_client()
+
+ def login(self, username, password):
+ return self.app.post('/login', data=dict(
+ username=username,
+ password=password
+ ), follow_redirects=True)
+
+ def logout(self):
+ return self.app.get('/logout', follow_redirects=True)
+
+ def test_login(self):
+ rv = self.login("admin", "testpass")
+ assert bytes("Log out", "utf-8") in rv.data
+ self.logout()
+ rv = self.login("wrong", "definitelywrong")
+ assert bytes("Log out", "utf-8") not in rv.data
+ self.logout()
+
+ def test_album(self):
+ self.login("admin", "testpass")
+
+ rv = self.app.get("/albums/1")
+ assert bytes("Album 1", "utf-8") in rv.data
+
+ self.logout()
+
+ def test_artists(self):
+ self.login("admin", "testpass")
+ rv = self.app.get("/artists")
+
+ assert bytes("Artist 1", "utf-8") in rv.data
+ assert bytes("Artist 2", "utf-8") in rv.data
+
+ artists = json.loads(rv.data.decode("utf-8"))
+ assert artists
+
+ self.logout()
diff --git a/tests/models/album_test.py b/tests/models/album_test.py
new file mode 100644
index 0000000..680a5cb
--- /dev/null
+++ b/tests/models/album_test.py
@@ -0,0 +1,63 @@
+from models.album import Album
+
+
+def test_instance(database):
+ album = Album(id=1, db=database)
+ assert album.id == 1
+ assert album.name == "Album 1"
+ assert album.date == "1999-02-04"
+
+
+def test_artists(database):
+ album = Album(id=1, db=database)
+ assert len(album.artists) == 1
+ assert album.artists[0].name == "Artist 2"
+
+
+def test_tracks(database):
+ album = Album(id=1, db=database)
+ assert len(album.tracks) == 2
+ assert album.tracks[0].name == "Album track 1"
+ assert album.tracks[0].tracknumber == 1
+ assert album.tracks[0].filename == "album/1.mp3"
+ assert album.tracks[1].name == "Album track 2"
+ assert album.tracks[1].tracknumber == 2
+ assert album.tracks[1].grouping == "swing"
+ assert album.tracks[1].filename == "album/2.mp3"
+
+
+def test_delete(database):
+ with database.conn:
+ cursor = database.cursor()
+
+ cursor.execute("INSERT INTO album (name, date) VALUES(?,?)",
+ ("Test album", "2016-02-05"))
+
+ album_id = cursor.lastrowid
+ cursor.close()
+
+ album = Album(album_id, db=database)
+
+ assert album.delete()
+
+ test_album = Album(album_id, db=database)
+ assert not hasattr(test_album, "name")
+
+
+def test_search(database):
+ search_payload = {"name": {"data": "Album 1", "operator": "="}}
+ album_results = Album.search(db=database, **search_payload)
+
+ assert len(album_results) > 0
+
+ invalid_search_payload = {"name": {"data": "This album does not exist",
+ "operator": "="}}
+ no_album_results = Album.search(db=database, **invalid_search_payload)
+
+ assert len(no_album_results) == 0
+
+
+def test_all(database):
+ album_results = Album.all(db=database)
+
+ assert len(album_results) > 0
diff --git a/tests/models/artist_test.py b/tests/models/artist_test.py
new file mode 100644
index 0000000..bb66813
--- /dev/null
+++ b/tests/models/artist_test.py
@@ -0,0 +1,67 @@
+from models.artist import Artist
+
+
+def test_instance(database):
+ album = Artist(id=1, db=database)
+ assert album.id == 1
+ assert album.name == "Artist 1"
+
+
+def test_albums(database):
+ artist1 = Artist(id=1, db=database)
+ assert len(artist1.albums) == 0
+ artist2 = Artist(id=2, db=database)
+ assert len(artist2.albums) == 1
+ assert artist2.albums[0].name == "Album 1"
+ assert artist2.albums[0].date == "1999-02-04"
+
+
+def test_tracks(database):
+ artist1 = Artist(id=1, db=database)
+ assert len(artist1.tracks) == 1
+ assert artist1.tracks[0].name == "Non album track"
+ assert artist1.tracks[0].tracknumber is None
+ assert artist1.tracks[0].filename == "1.mp3"
+ artist2 = Artist(id=2, db=database)
+ assert artist2.tracks[0].name == "Album track 1"
+ assert artist2.tracks[0].tracknumber == 1
+ assert artist2.tracks[0].filename == "album/1.mp3"
+ assert artist2.tracks[1].name == "Album track 2"
+ assert artist2.tracks[1].tracknumber == 2
+ assert artist2.tracks[1].grouping == "swing"
+ assert artist2.tracks[1].filename == "album/2.mp3"
+
+
+def test_delete(database):
+ with database.conn:
+ cursor = database.cursor()
+
+ cursor.execute("INSERT INTO artist (name) VALUES(?)", ("Test artist",))
+
+ artist_id = cursor.lastrowid
+
+ artist = Artist(artist_id, db=database)
+
+ assert artist.delete()
+
+ test_artist = Artist(artist_id, db=database)
+ assert not hasattr(test_artist, "name")
+
+
+def test_search(database):
+ search_payload = {"name": {"data": "Artist 1", "operator": "="}}
+ artist_results = Artist.search(db=database, **search_payload)
+
+ assert len(artist_results) > 0
+
+ invalid_search_payload = {"name": {"data": "This artist does not exist",
+ "operator": "="}}
+ no_artist_results = Artist.search(db=database, **invalid_search_payload)
+
+ assert len(no_artist_results) == 0
+
+
+def test_all(database):
+ artist_results = Artist.all(db=database)
+
+ assert len(artist_results) > 0
diff --git a/tests/models/track_test.py b/tests/models/track_test.py
new file mode 100644
index 0000000..56685fd
--- /dev/null
+++ b/tests/models/track_test.py
@@ -0,0 +1,106 @@
+import mutagen
+
+from models.track import Track
+
+
+def test_instance(database):
+ track = Track(id=1, db=database)
+ assert track.id == 1
+ assert track.name == "Non album track"
+ assert track.filename == "1.mp3"
+
+
+def test_as_dict(database):
+ track = Track(id=1, db=database)
+
+ track_dict = track.as_dict()
+
+ assert "_db" not in track_dict.keys()
+ assert track_dict["id"] == 1
+ assert track_dict["name"] == "Non album track"
+ assert track_dict["filename"] == "1.mp3"
+
+
+def test_album(database):
+ track1 = Track(id=1, db=database)
+ assert track1.album is None
+ track2 = Track(id=2, db=database)
+ assert track2.album.name == "Album 1"
+ assert track2.album.date == "1999-02-04"
+
+
+def test_artists(database):
+ track = Track(id=1, db=database)
+ assert track.artists is not None
+ assert len(track.artists) > 0
+ assert track.artists[0].name == "Artist 1"
+
+
+def test_find_by_path(database):
+ track1 = Track.find_by_path("album/2.mp3", db=database)
+
+ assert track1.filename == "album/2.mp3"
+ assert track1.name == "Album track 2"
+ assert track1.grouping == "swing"
+
+ nonexistent_track = Track.find_by_path("path/does/not/exist.mp3",
+ db=database)
+ assert nonexistent_track is None
+
+
+def test_search(database):
+ tracks = Track.search(db=database, name={"data": "Album track %",
+ "operator": "LIKE"})
+
+ assert tracks is not None
+ assert len(tracks) == 2
+
+
+def test_store(database, test_file):
+ metadata = mutagen.File(test_file, easy=True)
+
+ test_track = Track.store(test_file, metadata, db=database)
+
+ assert test_track.filename == test_file
+ assert test_track.name == "Silence"
+ assert test_track.grouping == "Jazz"
+ assert test_track.tracknumber == 3
+
+ assert test_track.album.name == "Dummy album"
+ assert test_track.album.date == "2003"
+
+ assert test_track.artists
+ assert test_track.artists[0].name == "Test Artist Flaf"
+
+
+def test_update(database, test_file):
+ metadata = {"artist": ["New artist"], "title": ["New title"]}
+
+ test_track = Track.find_by_path(test_file, db=database)
+ test_track.update(metadata)
+
+ assert test_track.artists
+ assert len(test_track.artists) == 1
+ assert test_track.artists[0].name == "New artist"
+ assert test_track.name == "New title"
+
+
+def test_save(database, test_file):
+ test_track = Track.find_by_path(test_file, db=database)
+
+ test_track.name = "Totally new name"
+ test_track.save()
+
+ new_track_to_test = Track.find_by_path(test_file, db=database)
+
+ assert new_track_to_test.name == "Totally new name"
+
+
+def test_delete(database, test_file):
+ test_track = Track.find_by_path(test_file, db=database)
+
+ test_track.delete()
+
+ should_not_exist = Track.find_by_path(test_file, db=database)
+
+ assert should_not_exist is None
diff --git a/tests/test.db b/tests/test.db
new file mode 100644
index 0000000..ccb7825
--- /dev/null
+++ b/tests/test.db
Binary files differ
diff --git a/tests/test.ogg b/tests/test.ogg
new file mode 100644
index 0000000..4ece5bc
--- /dev/null
+++ b/tests/test.ogg
Binary files differ
diff --git a/tests/testapp.db b/tests/testapp.db
new file mode 100644
index 0000000..2fc7b8c
--- /dev/null
+++ b/tests/testapp.db
Binary files differ