Create a RESTful API using Python and Flask (Part 3)

Kasun Dissanayake
8 min read · Mar 16, 2019


Summary From Part 2

In Part 2, we implemented a small working API using a list of Python dictionaries.

In this part, we’re finally going to retrieve data from a database, implement error handling, and filter Music Tracks by Track Id, Name, Album Id and the Composer. The database that is used is SQLite, a lightweight database engine that is supported in Python by default. SQLite files typically end with the .db file extension.

Let’s Go

Before we modify our code, first download the example database from this location, copy the file to your project’s root folder using your file manager, and unzip it there so that chinook.db sits next to your code. The final version of our API will query this database when returning results to users.
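Before wiring the database into Flask, it can help to confirm that the file really contains the tables we expect. The sketch below lists table names through SQLite's built-in sqlite_master catalog. The helper name list_tables is made up for illustration, and the demo runs against a throwaway in-memory database with invented tables; pointing it at 'chinook.db' assumes the file sits next to app.py.

```python
import sqlite3


def list_tables(conn):
    """Return the names of all tables in the connected SQLite database."""
    rows = conn.execute(
        "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name;"
    ).fetchall()
    return [name for (name,) in rows]


# Demo on a throwaway in-memory database (a hypothetical stand-in for chinook.db).
demo = sqlite3.connect(':memory:')
demo.execute('CREATE TABLE genres (GenreId INTEGER PRIMARY KEY, Name TEXT)')
demo.execute('CREATE TABLE tracks (TrackId INTEGER PRIMARY KEY, Name TEXT)')
demo_tables = list_tables(demo)
print(demo_tables)
demo.close()

# For the real file, assuming it was unzipped next to app.py:
# print(list_tables(sqlite3.connect('chinook.db')))
```

Note that sqlite3.connect creates a new, empty database file when the path does not exist, so an empty list here usually means the path is wrong rather than that the database is broken.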

Copy the below code into your text editor. As before, we’ll examine the code in more depth once you have it running.

import flask
from flask import request, jsonify
import sqlite3

app = flask.Flask(__name__)
app.config["DEBUG"] = True


def dict_factory(cursor, row):
    d = {}
    for idx, col in enumerate(cursor.description):
        d[col[0]] = row[idx]
    return d


@app.route('/', methods=['GET'])
def home():
    return '''<h1>Distant Reading Archive</h1>
<p>A prototype API for distant reading of Music Albums and Playlists.</p>'''


# Get all media types
@app.route('/api/v1/resources/media/all', methods=['GET'])
def api_media_all():
    conn = sqlite3.connect('chinook.db')
    conn.row_factory = dict_factory
    cur = conn.cursor()
    all_media = cur.execute('SELECT * FROM media_types;').fetchall()
    return jsonify(all_media)


# Get all genres
@app.route('/api/v1/resources/genres/all', methods=['GET'])
def api_genres_all():
    conn = sqlite3.connect('chinook.db')
    conn.row_factory = dict_factory
    cur = conn.cursor()
    genres = cur.execute('SELECT * FROM genres;').fetchall()
    return jsonify(genres)


# Get all playlists
@app.route('/api/v1/resources/playlists/all', methods=['GET'])
def api_playlists_all():
    conn = sqlite3.connect('chinook.db')
    conn.row_factory = dict_factory
    cur = conn.cursor()
    playlists = cur.execute('SELECT * FROM playlists;').fetchall()
    return jsonify(playlists)


# Get all playlist_track entries
@app.route('/api/v1/resources/playlist_track/all', methods=['GET'])
def api_playlist_track_all():
    conn = sqlite3.connect('chinook.db')
    conn.row_factory = dict_factory
    cur = conn.cursor()
    playlist_track = cur.execute('SELECT * FROM playlist_track;').fetchall()
    return jsonify(playlist_track)


# Get all tracks
@app.route('/api/v1/resources/tracks/all', methods=['GET'])
def api_tracks_all():
    conn = sqlite3.connect('chinook.db')
    conn.row_factory = dict_factory
    cur = conn.cursor()
    tracks = cur.execute('SELECT * FROM tracks;').fetchall()
    return jsonify(tracks)


# Get all artists
@app.route('/api/v1/resources/artists/all', methods=['GET'])
def api_artists_all():
    conn = sqlite3.connect('chinook.db')
    conn.row_factory = dict_factory
    cur = conn.cursor()
    artists = cur.execute('SELECT * FROM artists;').fetchall()
    return jsonify(artists)


# Get all albums
@app.route('/api/v1/resources/albums/all', methods=['GET'])
def api_albums_all():
    conn = sqlite3.connect('chinook.db')
    conn.row_factory = dict_factory
    cur = conn.cursor()
    albums = cur.execute('SELECT * FROM albums;').fetchall()
    return jsonify(albums)


@app.errorhandler(404)
def page_not_found(e):
    return "<h1>404</h1><p>The resource could not be found.</p>", 404


# Get tracks by filtering
@app.route('/api/v1/resources/tracks', methods=['GET'])
def api_filter():
    query_parameters = request.args

    trackid = query_parameters.get('TrackId')
    name = query_parameters.get('Name')
    albumid = query_parameters.get('AlbumId')
    composer = query_parameters.get('Composer')

    query = "SELECT * FROM tracks WHERE"
    to_filter = []

    if trackid:
        query += ' trackid=? AND'
        to_filter.append(trackid)
    if name:
        query += ' name=? AND'
        to_filter.append(name)
    if albumid:
        query += ' albumid=? AND'
        to_filter.append(albumid)
    if composer:
        query += ' composer=? AND'
        to_filter.append(composer)
    if not (trackid or name or albumid or composer):
        return page_not_found(404)

    query = query[:-4] + ';'

    conn = sqlite3.connect('chinook.db')
    conn.row_factory = dict_factory
    cur = conn.cursor()

    results = cur.execute(query, to_filter).fetchall()

    return jsonify(results)


app.run()

Save the code as app.py in your root folder, then navigate to your project folder in the terminal and run it with the command:

python app.py

Once this example is running, try out the API with these HTTP requests. Navigating to each URL in your browser displays the matching data as JSON:

http://127.0.0.1:5000/api/v1/resources/media/all
http://127.0.0.1:5000/api/v1/resources/genres/all
http://127.0.0.1:5000/api/v1/resources/playlists/all
http://127.0.0.1:5000/api/v1/resources/playlist_track/all
http://127.0.0.1:5000/api/v1/resources/artists/all
http://127.0.0.1:5000/api/v1/resources/albums/all
http://127.0.0.1:5000/api/v1/resources/tracks?Composer=Angus+Young,+Malcolm+Young,+Brian+Johnson

As these requests show, you can use this application to request data on music tracks, albums, artists, playlists, genres, and more. The /all requests return every row with all of its attributes (fields). To get filtered data, use a request like the last one.

The first six requests each return all entries in one table of the database, similar to the /all request we implemented for the last version of our API. The last request returns all tracks by the composer Angus Young, Malcolm Young, Brian Johnson, using this query string:

?Composer=Angus+Young,+Malcolm+Young,+Brian+Johnson

Note that, within a query parameter, spaces between words are denoted with a + sign:

Angus+Young,+Malcolm+Young,+Brian+Johnson
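To see this encoding in isolation, here is a small sketch using Python's standard urllib.parse module. It is only an illustration: Flask does the equivalent decoding for us before request.args is filled in, so app.py never has to call these functions itself.

```python
from urllib.parse import parse_qs, urlencode

# Decode the query string from the example request: + becomes a space.
raw_query = 'Composer=Angus+Young,+Malcolm+Young,+Brian+Johnson'
decoded = parse_qs(raw_query)
print(decoded)

# Going the other way, urlencode writes spaces as + signs.
encoded = urlencode({'Composer': 'Angus Young', 'TrackId': 1})
print(encoded)
```

parse_qs returns a list for each key because a parameter may legally appear more than once in a URL.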

You can filter by one or more fields. Our API allows users to filter tracks by four fields: TrackId, Name, AlbumId, and Composer. For example:

http://127.0.0.1:5000/api/v1/resources/tracks?Composer=Angus+Young,+Malcolm+Young,+Brian+Johnson&TrackId=1

This request returns all tracks whose composer is Angus Young, Malcolm Young, Brian Johnson and whose TrackId is 1.

As we can see, this version of our API serves a larger number of results, drawn from data stored in an SQLite database (chinook.db). When our user requests an entry or set of entries, our API pulls that information from the database by building and executing an SQL query. This iteration of our API also allows for filtering by more than one field. We’ll discuss potential uses of this functionality after examining our code more closely.

Examining Our API

The database we’re working with has many tables (for example media_types, genres, playlists, tracks, artists, and albums). Each table consists of columns (fields) and rows (records).

Rather than use test data defined in the application, our api_media_all function pulls in data from our chinook database:

# Get all media types
@app.route('/api/v1/resources/media/all', methods=['GET'])
def api_media_all():
    conn = sqlite3.connect('chinook.db')
    conn.row_factory = dict_factory
    cur = conn.cursor()
    all_media = cur.execute('SELECT * FROM media_types;').fetchall()
    return jsonify(all_media)

First, we connect to the database using our sqlite3 library. An object representing the connection to the database is bound to the conn variable. The conn.row_factory = dict_factory line lets the connection object know to use the dict_factory function we’ve defined, which returns each row from the database as a dictionary rather than a tuple. Dictionaries work better when we output them to JSON, because the column names become the keys.
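The effect of the row factory is easiest to see side by side. The following sketch runs the same query with and without it on an in-memory database; the table and its single row are invented for the demonstration and only loosely mimic media_types.

```python
import sqlite3


def dict_factory(cursor, row):
    # cursor.description holds one tuple per column; item 0 is the column name.
    return {col[0]: row[idx] for idx, col in enumerate(cursor.description)}


conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE media_types (MediaTypeId INTEGER, Name TEXT)')
conn.execute("INSERT INTO media_types VALUES (1, 'MPEG audio file')")

# Default behaviour: each row comes back as a plain tuple.
plain_rows = conn.execute('SELECT * FROM media_types').fetchall()

# With the factory installed, each row is a dict keyed by column name.
conn.row_factory = dict_factory
dict_rows = conn.execute('SELECT * FROM media_types').fetchall()
conn.close()

print(plain_rows)
print(dict_rows)
```

The dictionary form is what lets jsonify produce objects with named fields instead of bare arrays of values.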

Then we create a cursor object (cur = conn.cursor()), which is the object that actually moves through the database to pull our data.

After that, we execute an SQL query with the cur.execute method to pull out all available data (*) from the media_types table of our database.

At the end of our function, this data is returned as JSON: jsonify(all_media). Note that our other function that returns data, api_filter, will use a similar approach to pull data from the database.
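Since the seven /all functions differ only in the table they read, the shared steps could be folded into one helper. The sketch below is an optional refactoring idea, not part of the tutorial code: fetch_all_rows and ALLOWED_TABLES are hypothetical names, and the demo uses an in-memory stand-in for chinook.db. It also closes the cursor and connection, which the tutorial code leaves open.

```python
import sqlite3
from contextlib import closing

# Table names cannot be bound with ? placeholders, so only accept known ones.
ALLOWED_TABLES = {'media_types', 'genres', 'playlists', 'playlist_track',
                  'tracks', 'artists', 'albums'}


def dict_factory(cursor, row):
    return {col[0]: row[idx] for idx, col in enumerate(cursor.description)}


def fetch_all_rows(conn, table):
    """Return every row of an allowed table as a list of dicts."""
    if table not in ALLOWED_TABLES:
        raise ValueError(f'unknown table: {table}')
    conn.row_factory = dict_factory
    # closing() releases the cursor even if the query raises.
    with closing(conn.cursor()) as cur:
        return cur.execute(f'SELECT * FROM {table};').fetchall()


# Demo with an in-memory stand-in for chinook.db (invented data).
with closing(sqlite3.connect(':memory:')) as conn:
    conn.execute('CREATE TABLE genres (GenreId INTEGER, Name TEXT)')
    conn.execute("INSERT INTO genres VALUES (1, 'Rock')")
    genres = fetch_all_rows(conn, 'genres')

print(genres)
```

Inside a route, the body would then shrink to opening the connection and returning jsonify(fetch_all_rows(conn, 'genres')).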

The purpose of our page_not_found function is to create an error page seen by the user if the user encounters an error or inputs a route that hasn’t been defined:

@app.errorhandler(404)
def page_not_found(e):
    return "<h1>404</h1><p>The resource could not be found.</p>", 404

In HTTP responses, the status code 200 means “OK” (the expected data was transferred), while the code 404 means “Not Found” (there was no resource available at the URL given). This function allows us to return 404 pages when something goes wrong in the application.
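If the numbers are hard to remember, Python's standard library names them. This short sketch only looks up the two codes discussed here; it is an aside and not something app.py needs.

```python
from http import HTTPStatus

# The standard library provides named constants for HTTP status codes.
ok = HTTPStatus.OK
not_found = HTTPStatus.NOT_FOUND

print(ok.value, ok.phrase)
print(not_found.value, not_found.phrase)
```

In page_not_found, the second item of the returned pair (the 404 after the HTML string) is what sets the status code on the response.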

Our api_filter function allows for filtering by four different fields: TrackId, Name, AlbumId, and Composer. The function first grabs all the query parameters provided in the URL (remember, query parameters are the part of the URL that follows the ?, like ?TrackId=1).

query_parameters = request.args

It then pulls the supported parameters TrackId, Name, AlbumId, and Composer and binds them to appropriate variables:

trackid = query_parameters.get('TrackId')
name = query_parameters.get('Name')
albumid = query_parameters.get('AlbumId')
composer = query_parameters.get('Composer')

The next segment begins to build an SQL query that will be used to find the requested information in the database. SQL queries used to find data in a database take this form:

SELECT <columns> FROM <table> WHERE <column=match> AND <column=match>;

To get the correct data, we need to build both an SQL query that looks like the above and a list with the filters that will be matched. Combined, the query and the filters provided by the user will allow us to pull the correct tracks from our database.
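Keeping the values in a separate list, rather than pasting them into the SQL text, is what the ? placeholders are for. Here is a minimal sketch of how sqlite3 pairs the two, run against an in-memory table with invented rows. It also shows that a value crafted to look like SQL is compared as plain text.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE tracks (TrackId INTEGER, Name TEXT, Composer TEXT)')
conn.executemany(
    'INSERT INTO tracks VALUES (?, ?, ?)',
    [(1, 'Track A', 'Composer X'), (2, 'Track B', 'Composer Y')],  # made-up data
)

query = 'SELECT Name FROM tracks WHERE composer=? AND trackid=?;'

# Each ? is filled, in order, from the sequence passed as the second argument.
matches = conn.execute(query, ['Composer X', 1]).fetchall()

# A hostile-looking value is treated as data, not executed as SQL.
hostile = conn.execute(query, ["x' OR '1'='1", 1]).fetchall()
conn.close()

print(matches)
print(hostile)
```

Because the order of the list must match the order of the placeholders, the query text and the filter list are always extended together.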

We begin to define both the query and the filter list:

query = "SELECT * FROM tracks WHERE"
to_filter = []

Then, if TrackId, Name, AlbumId or Composer were provided as query parameters, we add them to both the query and the filter list:

if trackid:
    query += ' trackid=? AND'
    to_filter.append(trackid)
if name:
    query += ' name=? AND'
    to_filter.append(name)
if albumid:
    query += ' albumid=? AND'
    to_filter.append(albumid)
if composer:
    query += ' composer=? AND'
    to_filter.append(composer)

If the user has provided none of these query parameters, we have nothing to show, so we send them to the “404 Not Found” page:

if not (trackid or name or albumid or composer):
    return page_not_found(404)

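Next, we remove the trailing ' AND' (four characters, including the leading space) left by the last condition and end the statement with the ‘;’ that closes an SQL statement: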

query = query[:-4] + ';'
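To make the trimming step concrete, the sketch below first shows what the slice does to a sample string, then offers an alternative that avoids trimming by joining the conditions. The function build_track_query is a hypothetical helper, not the tutorial's own code; it assumes the same four column names used above.

```python
# What the slice does: drop the last four characters (' AND'), then add ';'.
trimmed = 'SELECT * FROM tracks WHERE trackid=? AND'[:-4] + ';'
print(trimmed)


def build_track_query(filters):
    """Build a parameterised SELECT from a dict of column -> requested value.

    Only the columns listed in `allowed` are used; empty values are skipped.
    """
    allowed = ('trackid', 'name', 'albumid', 'composer')
    conditions = []
    params = []
    for column in allowed:
        value = filters.get(column)
        if value:
            conditions.append(f'{column}=?')
            params.append(value)
    if not conditions:
        return None, []
    # Joining puts AND only between conditions, so nothing needs trimming.
    query = 'SELECT * FROM tracks WHERE ' + ' AND '.join(conditions) + ';'
    return query, params


query, params = build_track_query({'composer': 'Angus Young', 'trackid': '1'})
print(query)
print(params)
```

Both approaches produce the same kind of statement. The join version simply moves the "no filters given" case into a None return that the caller can turn into a 404.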

Finally, we connect to our database as in our api_media_all function, then execute the query we’ve built using our filter list:

conn = sqlite3.connect('chinook.db')
conn.row_factory = dict_factory
cur = conn.cursor()
results = cur.execute(query, to_filter).fetchall()

Finally, we return the results of our executed SQL query as JSON to the user:

return jsonify(results)

Whew! When all is said and done, this section of the code reads query parameters provided by the user, builds an SQL query based on those parameters, executes that query to find matching tracks in the database, and returns those matches as JSON to the user. Our API’s filtering capability is now considerably more sophisticated: users can find music tracks by TrackId, Name, AlbumId, and Composer.

Note: In this filter section, I have used only the tracks table to filter data. You can implement the same kind of filtering for the other tables in your own API.

Congratulations! You’ve created a working RESTful API!

You can find the work we did in this tutorial in the Git repository linked below, named Create a RESTfull API using Python and Flask.

Thank you and Goodbye!!

https://github.com/KasunDissanayake94/Create-a-RESTfull-API-using-Python-and-Flask
