Productionalize analytics Flask app with Docker, AWS, and Google API

Hopefully this is useful to anyone looking to host a Flask app on AWS with Docker. More specifically, it deals with many of the hurdles involved in putting such an app into production: analyzing Google Analytics API data, visualizing it with Plotly Dash, and version controlling the image with Amazon Elastic Container Registry (ECR).

Here’s the general form of the visualization that will be built here for any given day in some date range. This will connect directly to your Google Analytics account and a slider bar can be dragged to change the visualization from day to day.

Start with some imports for the file.

My directory structure looks like this:

— application.py
— credentials.json
— docker-compose.yml
— Dockerfile
— nginx
— requirements.txt
— my-aws-instance.pem

I’ll start with the application.py file.

import httplib2 as lib2
from oauth2client.client import OAuth2WebServerFlow
from oauth2client import file, client, tools
import google.oauth2.credentials
import google_auth_oauthlib.flow
import googleapiclient.discovery
from googleapiclient.discovery import build as google_build
import dash
import dash_core_components as dcc
import dash_html_components as html
from dash.dependencies import Input, Output
from textwrap import dedent
import plotly.graph_objs as go
import flask
import pandas as pd
import numpy as np
import json
from datetime import datetime, timedelta
import os
import requests
server = flask.Flask(__name__)
app = dash.Dash(sharing=True, server=server, csrf_protect=False)
app.config.suppress_callback_exceptions = True

After we set up the application, we need to point it at the client_secret.json file, which holds our client ID and client secret. This has the OAuth 2.0 information for this application.

Whatever Google API you plan on using (here we are using the Google Analytics Reporting API v4), you will have to enable it for a project in your Google account. If you already have a project with the API enabled and credentials, you can skip this part. After you enable the API, open the navigation panel, click on APIs & Services, and then go to Credentials. Click on Create Credentials and then OAuth Client ID. Once you have created a client ID, click on Download JSON at the far right of the page to download the file you will need.

CLIENT_SECRETS_FILE = "client_secret.json"
SCOPES = ['https://www.googleapis.com/auth/analytics.readonly']
# Matches the service name and version used in make_query below.
API_SERVICE_NAME = 'analyticsreporting'
API_VERSION = 'v4'

Then we want to create a secret key for our Flask app server. You can create one by evaluating this in a Python shell:

os.urandom(24)

and then pasting the result in as the secret key (the value below is only a placeholder):

app.server.secret_key = b'paste-the-output-of-os.urandom(24)-here'
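One way to avoid pasting the key into the source is to read it from an environment variable and only fall back to random bytes. This is a minimal sketch, not part of the original app: the variable name FLASK_SECRET_KEY and the helper load_secret_key are assumptions made up for illustration.

```python
import os


def load_secret_key(env_var="FLASK_SECRET_KEY", num_bytes=24):
    """Return a secret key as bytes.

    `env_var` is a hypothetical variable name; use whatever your deployment defines.
    """
    value = os.environ.get(env_var)
    if value:
        return value.encode("utf-8")
    # Fallback: fresh random bytes. Sessions will not survive a restart in this case.
    return os.urandom(num_bytes)


secret_key = load_secret_key()
print(len(secret_key))
```

The result could then be assigned to app.server.secret_key in place of the hard-coded value.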

Every time you query the Google API, you need a valid Access Token, and a Refresh Token is what lets you obtain a new one. Access tokens expire after 3600 seconds, so you can generate one from the OAuth 2.0 Playground, but ultimately this will not work in production because the token will expire. So we want a way to authorize our web server application for Google Analytics so that any user of the application must be authenticated, the tokens are generated automatically, and the application stores the tokens in the Flask session (more to come on this). We will then dump these tokens into a file where we can pull them back in later as needed.
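To make the 3600-second lifetime concrete, here is a small sketch of the timing check only. The helper access_token_expired and its leeway parameter are hypothetical; in practice the Google client libraries normally handle refreshing for you once they hold a refresh token.

```python
from datetime import datetime, timedelta


def access_token_expired(issued_at, now, lifetime_seconds=3600, leeway_seconds=60):
    """Return True if a token issued at `issued_at` should be refreshed by `now`."""
    expires_at = issued_at + timedelta(seconds=lifetime_seconds)
    # Refresh slightly early so a request never goes out with a token about to lapse.
    return now >= expires_at - timedelta(seconds=leeway_seconds)


issued = datetime(2019, 2, 1, 12, 0, 0)
print(access_token_expired(issued, datetime(2019, 2, 1, 12, 30, 0)))  # False
print(access_token_expired(issued, datetime(2019, 2, 1, 13, 0, 0)))   # True
```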

So every time a new user hits the Flask app, they will click an Authorize API button.

This will redirect them to authorize the application with the tokens.

Then the user will be redirected back to the app, because in oauth2callback below the return is flask.redirect(flask.url_for('test_api_request')). Now the flask.session holds the tokens needed to query the API.

with open('credentials.json') as f:
    client_ids = json.loads(f.read())

@app.server.route('/')
def index():
    print('IM IN ROUTE /')
    return print_index_table()

@app.server.route('/test')
def test_api_request():
    print('IM IN ROUTE /TEST')
    if 'credentials' not in flask.session:
        return flask.redirect('authorize')
    # Load credentials from the session.
    credentials = google.oauth2.credentials.Credentials(
        **flask.session['credentials'])
    print(credentials_to_dict(credentials))
    # Save credentials back to session in case access token was refreshed.
    flask.session['credentials'] = credentials_to_dict(credentials)
    return flask.redirect(flask.url_for('/'))

@app.server.route('/authorize')
def authorize():
    print('IM IN ROUTE /AUTHORIZE')
    # Create flow instance to manage the OAuth 2.0 Authorization Grant Flow steps.
    flow = google_auth_oauthlib.flow.Flow.from_client_secrets_file(
        CLIENT_SECRETS_FILE, scopes=SCOPES)
    flow.redirect_uri = flask.url_for('oauth2callback', _external=True)
    authorization_url, state = flow.authorization_url(
        # Enable offline access so that you can refresh an access token without
        # re-prompting the user for permission. Recommended for web server apps.
        access_type='offline',
        approval_prompt='force',
        # Enable incremental authorization. Recommended as a best practice.
        include_granted_scopes='true')
    # Store the state so the callback can verify the auth server response.
    flask.session['state'] = state

    return flask.redirect(authorization_url)

@app.server.route('/oauth2callback')
def oauth2callback():
    print('IM IN ROUTE /OAUTH')
    # Specify the state when creating the flow in the callback so that it can be
    # verified in the authorization server response.
    state = flask.session['state']
    flow = google_auth_oauthlib.flow.Flow.from_client_secrets_file(
        CLIENT_SECRETS_FILE, scopes=SCOPES, state=state)
    flow.redirect_uri = flask.url_for('oauth2callback', _external=True)
    # Use the authorization server's response to fetch the OAuth 2.0 tokens.
    authorization_response = flask.request.url
    flow.fetch_token(authorization_response=authorization_response)

    # Store credentials in the session.
    credentials = flow.credentials
    flask.session['credentials'] = credentials_to_dict(credentials)
    with open('credentials.json', 'w') as f:
        json.dump(credentials_to_dict(credentials), f)
    return flask.redirect(flask.url_for('test_api_request'))

@app.server.route('/revoke')
def revoke():
    if 'credentials' not in flask.session:
        return ('You need to <a href="/authorize">authorize</a> before ' +
                'testing the code to revoke credentials.')
    credentials = google.oauth2.credentials.Credentials(
        **flask.session['credentials'])

    revoke = requests.post('https://accounts.google.com/o/oauth2/revoke',
                           params={'token': credentials.token},
                           headers={'content-type': 'application/x-www-form-urlencoded'})
    status_code = getattr(revoke, 'status_code')
    if status_code == 200:
        return ('Credentials successfully revoked.' + print_index_table())
    else:
        return ('An error occurred.' + print_index_table())

@app.server.route('/clear')
def clear_credentials():
    if 'credentials' in flask.session:
        del flask.session['credentials']
    return ('Credentials have been cleared.<br><br>' + print_index_table())

def credentials_to_dict(credentials):
    return {'token': credentials.token,
            'refresh_token': credentials.refresh_token,
            'token_uri': credentials.token_uri,
            'client_id': credentials.client_id,
            'client_secret': credentials.client_secret,
            'scopes': credentials.scopes}

def print_index_table():
    return ('<table>' +
            '<tr><td><a href="/test">Test an API request</a></td>' +
            '<td>Submit an API request and see a formatted JSON response. ' +
            ' Go through the authorization flow if there are no stored ' +
            ' credentials for the user.</td></tr>' +
            '<tr><td><a href="/authorize">Test the auth flow directly</a></td>' +
            '<td>Go directly to the authorization flow. If there are stored ' +
            ' credentials, you still might not be prompted to reauthorize ' +
            ' the application.</td></tr>' +
            '<tr><td><a href="/revoke">Revoke current credentials</a></td>' +
            '<td>Revoke the access token associated with the current user ' +
            ' session. After revoking credentials, if you go to the test ' +
            ' page, you should see an <code>invalid_grant</code> error.' +
            '</td></tr>' +
            '<tr><td><a href="/clear">Clear Flask session credentials</a></td>' +
            '<td>Clear the access token currently stored in the user session. ' +
            ' After clearing the token, if you <a href="/test">test the ' +
            ' API request</a> again, you should go back to the auth flow.' +
            '</td></tr></table>')
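To see why the credentials are flattened to a dict before going into the session and the file, here is a sketch of the JSON round trip. It uses a SimpleNamespace stand-in rather than a real google.oauth2.credentials.Credentials object, and every token value is a made-up placeholder.

```python
import json
from types import SimpleNamespace


def credentials_to_dict(credentials):
    """Same shape as the helper in the article: only plain, JSON-friendly fields."""
    return {'token': credentials.token,
            'refresh_token': credentials.refresh_token,
            'token_uri': credentials.token_uri,
            'client_id': credentials.client_id,
            'client_secret': credentials.client_secret,
            'scopes': credentials.scopes}


# Stand-in object with placeholder values (an assumption for illustration only).
fake_credentials = SimpleNamespace(
    token='access-token-placeholder',
    refresh_token='refresh-token-placeholder',
    token_uri='https://accounts.google.com/o/oauth2/token',
    client_id='client-id-placeholder',
    client_secret='client-secret-placeholder',
    scopes=['https://www.googleapis.com/auth/analytics.readonly'])

as_json = json.dumps(credentials_to_dict(fake_credentials))
restored = json.loads(as_json)
print(sorted(restored))
```

Because the dict keys match the keyword arguments the article passes to Credentials(**flask.session['credentials']), the restored dict can be unpacked the same way.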

So now we need a function that will actually make a query to the API. The tokens and ids will be pulled directly from the flask.session since the user authenticated through their Google account already.

def make_query(client_ids, local=False):
    print('IM IN MAKE QUERY')
    if local:
        scopes = 'https://www.googleapis.com/auth/analytics.readonly'
        # Setup the API
        store = file.Storage('credentials/credentials.json')
        credentials = store.get()
        if not credentials or credentials.invalid:
            flow = OAuth2WebServerFlow(
                client_id=client_ids['installed']['client_id'],
                client_secret=client_ids['installed']['client_secret'],
                scope=scopes,
                redirect_uri='https://accounts.google.com/o/oauth2/token')
            args = tools.argparser.parse_args()
            args.noauth_local_webserver = True
            credentials = tools.run_flow(flow, store, args)
    else:
        # Assumed to be the alternative to the local flow: use the tokens
        # stored in the Flask session.
        access_token = flask.session['credentials']['token']
        refresh_token = flask.session['credentials']['refresh_token']
        client_id = flask.session['credentials']['client_id']
        client_secret = flask.session['credentials']['client_secret']
        token_uri = 'https://accounts.google.com/o/oauth2/token'
        token_expiry = datetime.now() + timedelta(days = 10)
        user_agent = 'my-user-agent/1.0'

        credentials = client.GoogleCredentials(
            access_token=access_token, refresh_token=refresh_token,
            client_id=client_id, client_secret=client_secret,
            token_uri=token_uri, token_expiry=token_expiry,
            user_agent=user_agent)
    #Initialize Http Protocol
    http = lib2.Http()
    #Authorize client
    authorized = credentials.authorize(http)
    api_name = 'analyticsreporting'
    api_version = 'v4'
    api_client = google_build(serviceName=api_name, version=api_version, http=authorized)
    request_test = {
        'viewId': 'your_view_id',
        "pageToken": "0",
        "pageSize": "100000",
        # dateRanges and reportRequests are lists in the v4 request schema.
        'dateRanges': [{
            'startDate': datetime.strftime(pd.to_datetime('2019-02-01'),'%Y-%m-%d'),
            'endDate': datetime.strftime(pd.to_datetime('2019-02-28'),'%Y-%m-%d')}],
        'dimensions': [
            {'name':'ga:date'},
            {'name':'ga:Region'},
            {'name':'ga:landingPagePath'},
            {'name':'ga:exitPagePath'}
        ],
        'metrics': [{'expression': 'ga:sessions'}]
    }
    response = api_client.reports().batchGet(
        body={
            'reportRequests': [request_test]
        }).execute()
    return response

‘your_view_id’ is the view id you can pull from your Analytics 360 account. Click on Settings > View Settings > View ID and then just copy and paste that number into here.
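If you want to vary the view id or date range without editing the dict by hand, the request body can be assembled by a small helper. This is a sketch under assumptions: build_report_request is a hypothetical function, and '123456789' is a placeholder view id, not a real one.

```python
from datetime import date


def build_report_request(view_id, start, end, dimensions, metrics, page_size=100000):
    """Assemble one reportRequests entry for the Reporting API v4 (hypothetical helper)."""
    return {
        'viewId': view_id,
        'pageSize': page_size,
        'dateRanges': [{'startDate': start.isoformat(), 'endDate': end.isoformat()}],
        'dimensions': [{'name': name} for name in dimensions],
        'metrics': [{'expression': expression} for expression in metrics],
    }


body = {
    'reportRequests': [
        build_report_request(
            view_id='123456789',  # placeholder view id
            start=date(2019, 2, 1),
            end=date(2019, 2, 28),
            dimensions=['ga:date', 'ga:Region', 'ga:landingPagePath', 'ga:exitPagePath'],
            metrics=['ga:sessions'],
        )
    ]
}
print(body['reportRequests'][0]['dateRanges'])
```

The resulting body is what would be passed to api_client.reports().batchGet(body=body).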

The response will come back as JSON. You will need a function to parse this JSON and transform it into a dataframe; here is parse_response.

#Parse the response of API
def parse_response(report):
    """Parses and prints the Analytics Reporting API V4 response"""
    print('IM PARSING THE RESPONSE')
    #Initialize results, in list format because two dataframes might return
    result_list = []

    #Initialize empty data container for the two dateranges (if there are two that is)
    data_csv = []
    data_csv2 = []
    #Initialize header rows
    header_row = []
    #Get column headers, metric headers, and dimension headers.
    columnHeader = report.get('columnHeader', {})
    metricHeaders = columnHeader.get('metricHeader', {}).get('metricHeaderEntries', [])
    dimensionHeaders = columnHeader.get('dimensions', [])
    #Combine all of those headers into the header_row, which is in a list format
    for dheader in dimensionHeaders:
        header_row.append(dheader)
    for mheader in metricHeaders:
        header_row.append(mheader['name'])
    #Get data from each of the rows, and append them into a list
    rows = report.get('data', {}).get('rows', [])
    for row in rows:
        row_temp = []
        dimensions = row.get('dimensions', [])
        metrics = row.get('metrics', [])
        for d in dimensions:
            row_temp.append(d)
        for m in metrics[0]['values']:
            row_temp.append(m)
        data_csv.append(row_temp)
        #In case of a second date range, do the same thing for the second request
        if len(metrics) == 2:
            row_temp2 = []
            for d in dimensions:
                row_temp2.append(d)
            for m in metrics[1]['values']:
                row_temp2.append(m)
            data_csv2.append(row_temp2)
    #Putting those list formats into pandas dataframe, and append them into the final result
    result_df = pd.DataFrame(data_csv, columns=header_row)
    result_list.append(result_df)
    if data_csv2 != []:
        result_list.append(pd.DataFrame(data_csv2, columns=header_row))
    return result_list[0]
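It can help to see the shape of a single report before it becomes a dataframe. The sketch below flattens a tiny sample into a list of dicts using only the standard library. The sample values are invented for illustration, and report_to_records is a hypothetical helper that mirrors the single-date-range path of parse_response.

```python
# Invented sample in the same nested shape the parser above reads.
sample_report = {
    'columnHeader': {
        'dimensions': ['ga:date', 'ga:Region', 'ga:landingPagePath', 'ga:exitPagePath'],
        'metricHeader': {'metricHeaderEntries': [{'name': 'ga:sessions', 'type': 'INTEGER'}]},
    },
    'data': {
        'rows': [
            {'dimensions': ['20190201', 'California', '/blog/post-1', '/pricing'],
             'metrics': [{'values': ['3']}]},
            {'dimensions': ['20190201', 'Texas', '/pricing', '/pricing'],
             'metrics': [{'values': ['1']}]},
        ]
    },
}


def report_to_records(report):
    """Flatten one report into a list of {column name: value} dicts."""
    header = report.get('columnHeader', {})
    names = list(header.get('dimensions', []))
    names += [entry['name']
              for entry in header.get('metricHeader', {}).get('metricHeaderEntries', [])]
    records = []
    for row in report.get('data', {}).get('rows', []):
        # Only the first date range is used here, like metrics[0] in parse_response.
        values = list(row.get('dimensions', [])) + list(row['metrics'][0]['values'])
        records.append(dict(zip(names, values)))
    return records


records = report_to_records(sample_report)
print(records[0])
```

Note that the metric values arrive as strings, which is why the article later casts sessions with astype(int).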

Now we are pretty much ready to start building the visualization. We have a dataframe that represents the sessions from each Landing Page to each Exit Page for every day in February, broken out by Region. So one thing I can imagine building is a visualization of the pathways from landing page to exit page, and maybe we are interested in comparing everyone from California versus everyone else in the United States. A Sankey Diagram seems like a perfect fit for this, so ultimately we want our data arranged as sources, targets, and values that Plotly can understand.
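As a rough sketch of that target structure: a Plotly Sankey trace takes a list of node labels plus parallel lists of link sources, targets, and values, where sources and targets are integer positions in the label list. The page names and counts below are made up for illustration.

```python
# Hypothetical nodes and (landing page, exit page, sessions) links.
nodes = ['home', 'blog', 'pricing']
links = [('home', 'blog', 12), ('home', 'pricing', 5), ('blog', 'pricing', 7)]

# Sankey links refer to nodes by their integer position in the label list.
position = {label: i for i, label in enumerate(nodes)}

trace = dict(
    type='sankey',
    node=dict(label=nodes),
    link=dict(
        source=[position[src] for src, _, _ in links],
        target=[position[dst] for _, dst, _ in links],
        value=[sessions for _, _, sessions in links],
    ),
)
print(trace['link'])
```

This is why the article converts page paths to category codes: the codes play the role of the integer node positions.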

So let’s first build a kind of ‘starter’ dataframe that if we ever want to make another visualization we can always start with this dataframe and finetune from there. This function takes the response from the API and transforms it into a ‘starter’ dataframe.

def get_starter_df(response):
    print('IM STARTING THE DF')
    response_data = response.get('reports', [])[0]
    df = parse_response(response_data)
    df = df.rename(columns = {'ga:Region':'Region',
                              'ga:date':'date',
                              'ga:landingPagePath':'landingPagePath',
                              'ga:exitPagePath':'exitPagePath',
                              'ga:sessions':'sessions'
                              })
    regs, reg_codes = (df.Region.astype('category'),
                       df.Region.astype('category').cat.codes)
    df2 = df.replace('(not set)',np.nan).bfill()
    # Keep only the first path segment of each landing and exit page.
    df2['landingPagePath'] = df2.landingPagePath.apply(lambda x: x.split('/')[1] if (len(x.split('/')) > 1) else x[0])
    df2['exitPagePath'] = df2.exitPagePath.apply(lambda x: x.split('/')[1] if (len(x.split('/')) > 1) else x[0])
    df3 = pd.DataFrame({'date':df2.date,
                        'landingPagePath':df2.landingPagePath.astype('category').cat.codes,
                        'Region':df2.Region.astype('category').cat.codes,
                        'exitPagePath':df2.exitPagePath.astype('category').cat.codes,
                        'sessions':df2.sessions})
    df3['sessions'] = df3.sessions.astype(int)
    df3 = df3.groupby(['landingPagePath','exitPagePath','Region']).agg({'sessions':'sum', 'date':'min'}).reset_index()
    reg_group = ['California']
    regions_dict = {cat:code for code, cat in enumerate(df2['Region'].astype('category').cat.categories)}
    reg_group_cats = [regions_dict[reg] for reg in reg_group]
    # 1 for regions in reg_group (California), 0 for everyone else.
    df3['Region'] = np.where(df3['Region'].isin(reg_group_cats), 1, 0)
    df3 = df3.groupby(['landingPagePath','exitPagePath','Region','date']).agg({'sessions':'sum'}).reset_index()
    return df3, df2
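The path-trimming lambda and the category codes are the two least obvious steps above, so here is a standard-library sketch of both. first_path_segment is a hypothetical name for the same expression used in the lambdas, and the sorted-unique mapping imitates how pandas assigns category codes to plain strings.

```python
def first_path_segment(path):
    """Same expression as the lambdas above: keep the first segment after the leading slash."""
    parts = path.split('/')
    return parts[1] if len(parts) > 1 else path[0]


paths = ['/blog/post-1', '/pricing', '/blog/post-2', '/']
segments = [first_path_segment(p) for p in paths]
print(segments)  # ['blog', 'pricing', 'blog', '']

# Codes assigned by sorted unique value, imitating pandas category codes for strings.
codes = {segment: code for code, segment in enumerate(sorted(set(segments)))}
print([codes[s] for s in segments])  # [1, 2, 1, 0]
```

One thing the sketch makes visible: a path with no slash falls back to its first character, and an empty string would raise an IndexError, so unusual paths may deserve their own handling.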

Let’s make an app layout, which the callbacks will update with the data and visualization. Notice the API Authorization button has href='/test', which will redirect the user to this url and then fire off the test_api_request function listed above.

app.layout = html.Div([
    html.Div([
        html.H2('Analytics', id='sub-title'),
        html.Button('Collect Data', id='button'),
        html.Div(id='dfs_start', style={'display': 'none'}),
    ], className='row'),
    html.A(html.Button('API Authorization'), href='/test'),
    html.Div(id='output-df', style={'display': 'none'}),
    html.Div([
        dcc.Slider(
            id='day-slider',
            min=20190201,
            max=20190228,
            step=1,
            # Start inside the min/max range (the original value was 20181017).
            value=20190201,
        ),
        html.Div(id='output-container-slider')
    ]),
    html.Div([
        dcc.Graph(
            id = 'sankey',
            figure=dict(
                data = [],
                layout = go.Layout(
                    font = dict(
                        size = 10
                    ),
                )
            ),
        ),
    ]),
])

We want to use the starter df from the function we just wrote, but we have to build the dataframe on the fly. That is, we cannot just create the dataframe the first time the app runs and then call it whenever we want. We need to store it somewhere in the page and pull it down when we need it. So we will store the JSON-serialized dataframe in a hidden Div tag in the page and refresh it whenever the user clicks the ‘Collect Data’ button specified in the app.layout.

@app.callback(
    Output('dfs_start','children'),
    [Input('button','n_clicks')])
def get_starter_dfs(n_clicks):
    print('IM IN STARTER DFS CALLBACK')
    if n_clicks is None:
        return pd.DataFrame().to_json(date_format='iso', orient='split')
    response = make_query(client_ids)
    df_start, df_check = get_starter_df(response)
    return json.dumps({'df_start':df_start.to_json(date_format='iso', orient='split'),
                       'df_check':df_check.to_json(date_format='iso', orient='split')})

Plotly Dash buttons have an n_clicks property that is essentially a counter that goes up by 1 every time the user clicks the button. So we use that property to fire off this function whenever the counter changes.

So now every time the user clicks the Collect Data button, the function gets called and the dataframe is stored as JSON in the children of the html.Div tag, which recall is a hidden Div tag: html.Div(id='dfs_start', style={'display': 'none'}).
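Note that the callback above stores JSON inside JSON: each dataframe is serialized to a string first, and then the dict of strings is serialized again. Here is a standard-library sketch of that round trip, using plain lists of rows in place of dataframes. pack_frames and unpack_frame are hypothetical helper names.

```python
import json


def pack_frames(**frames):
    """Serialize each table on its own, then wrap the strings in one JSON object."""
    return json.dumps({name: json.dumps(rows) for name, rows in frames.items()})


def unpack_frame(payload, name):
    """Undo both layers: the outer object first, then the chosen table."""
    return json.loads(json.loads(payload)[name])


payload = pack_frames(
    df_start=[{'landingPagePath': 0, 'exitPagePath': 1, 'sessions': 3}],
    df_check=[{'landingPagePath': 'blog', 'exitPagePath': 'pricing', 'sessions': '3'}],
)
print(unpack_frame(payload, 'df_start'))
```

This is the same two-step decoding that the next callback performs with json.loads followed by pd.read_json.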

Now we can create a callback function for the visualization.

@app.callback(
    Output('output-df','children'),
    [Input('day-slider','value'),
     Input('dfs_start','children'),
     Input('button','n_clicks')])
def get_df_viz(day, df_dict, n_clicks):
    print('IM GETTING DF VIZ')
    if n_clicks is None:
        return pd.DataFrame().to_json(date_format='iso', orient='split')
    df_dict = json.loads(df_dict)
    df = pd.read_json(df_dict['df_start'], orient='split')
    df2 = pd.read_json(df_dict['df_check'], orient='split')
    df4 = df[df.date.astype(str) == str(day)].rename(columns =
        {'landingPagePath':'Source', 'exitPagePath':'Target', 'sessions':'Value'}).drop(columns = 'date')
    df4.loc[df4.Region == 0,'Region'] = 'rgba(253, 227, 212, 0.5)'
    df4.loc[df4.Region == 1,'Region'] = 'rgba(73, 148, 206, 1)'
    colors = ['#F27420','#4994CE','#FABC13','#7FC241','#D3D3D3']
    land_index = pd.Series(df2.landingPagePath.astype('category').cat.codes).drop_duplicates().index
    exit_index = pd.Series(df2.exitPagePath.astype('category').cat.codes).drop_duplicates().index
    codes = pd.DataFrame({
        'paths':list(df2.loc[land_index,'landingPagePath'].values) + list(df2.loc[exit_index,'exitPagePath'].values),
        'codes':list(pd.Series(df2.landingPagePath.astype('category').cat.codes).drop_duplicates()) + list(pd.Series(df2.exitPagePath.astype('category').cat.codes).drop_duplicates())})
    codes = codes.drop_duplicates(subset = 'codes').reset_index(drop=True).sort_values('codes')
    df4.loc[df4.Source == 0,'Source'] = np.max(codes.codes) + 1
    df4.loc[df4.Source == 1,'Source'] = np.max(codes.codes) + 2
    # Integer division: np.repeat needs a whole number of repeats under Python 3.
    colors_full = list(np.repeat(colors, np.max(df4.Target) // len(colors) + 1)[:np.max(df4.Target)]) + [''] * len(df4)
    node_label = list(codes.paths) + [''] * len(df4)
    df_viz = pd.DataFrame({
        'Date':df[df.date.astype(str) == str(day)].date,
        'Source':df4.Source,
        'Target':df4.Target,
        'Value':df4.Value,
        'Node, Label':node_label[:len(df4)],
        'Color':colors_full[:len(df4)],
        'Link Color':df4.Region
    })
    df_viz['Target'] = [np.max(df_viz.Target) + 1 if row.Target == row.Source else row.Target for i, row in df_viz.iterrows()]
    return df_viz.to_json(date_format='iso', orient='split')

Now the dataframe needed to produce the visualization is stored in the hidden Div tag so now we can create a callback that pulls this dataframe down and creates the actual visualization.

@app.callback(
    Output('sankey','figure'),
    [Input('day-slider','value'),
     Input('output-df','children')])
def landing_sankey(day, jsonified_cleaned_data):
    print('IM MAKING THE GRAPH')
    df_viz = pd.read_json(jsonified_cleaned_data, orient='split')
    if df_viz.empty:
        return dict(data=[], layout=dict())
    data_trace = dict(
        type='sankey',
        domain = dict(
            x = [0,1],
            y = [0,1]
        ),
        orientation = "h",
        valueformat = ".0f",
        node = dict(
            pad = 10,
            thickness = 30,
            line = dict(
                color = "black",
                width = 0
            ),
            label = df_viz['Node, Label'].dropna(axis=0, how='any'),
            color = df_viz['Color']
        ),
        link = dict(
            source = df_viz['Source'].dropna(axis=0, how='any'),
            target = df_viz['Target'].dropna(axis=0, how='any'),
            value = df_viz['Value'].dropna(axis=0, how='any'),
            color = df_viz['Link Color'].dropna(axis=0, how='any'),
        )
    )
    layout = dict(
        font = dict(
            size = 10
        ),
    )
    fig = dict(data=[data_trace], layout=layout)
    return fig

Then we can just add one more callback to indicate to the user which day is selected beneath the slider bar.

@app.callback(
    Output('output-container-slider', 'children'),
    [Input('day-slider', 'value')])
def update_output(value):
    return 'You have selected "{}"'.format(str(pd.to_datetime(str(value), format='%Y%m%d')))
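The slider works on plain integers such as 20190214, which only happen to be valid dates because the range stays inside one month. Here is a standard-library sketch of the conversion and a validity check. Both helper names are hypothetical.

```python
from datetime import date, datetime


def slider_value_to_date(value):
    """Interpret an integer like 20190214 as a calendar date."""
    return datetime.strptime(str(value), '%Y%m%d').date()


def is_valid_slider_value(value):
    """Return False for integers that do not correspond to a real date."""
    try:
        slider_value_to_date(value)
    except ValueError:
        return False
    return True


print(slider_value_to_date(20190214))   # 2019-02-14
print(is_valid_slider_value(20190230))  # False
```

If the date range ever spans more than one month, an integer slider with step=1 would also pass through values like 20190230, so a day-offset slider may be a safer design in that case.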

Finally we can just add the hostname and port (defaults to 8050). Note: this will change soon.

if __name__ == '__main__':
    app.run_server(host='0.0.0.0', debug=True)

Now that we have the app running locally we want to push it up to an ec2 instance to run for others to use.

First you need to spin up an ec2 instance with your AWS account. Then download its .pem key file and place it in your working directory.

We can Dockerize our application and then push the image up to an AWS ECR and then we will pull that image down to our ec2 instance. The benefit of using the ECR is that we can version control our application, and if we are serving other applications we can simply maintain all of our apps for production. We will also utilize docker-compose which simply allows us to pull down and push up our applications with much fewer commands.

Here is an example docker-compose.yml file:

version: "3"
services:
  my-web-app:
    image: aws_account_id.dkr.ecr.region.amazonaws.com/my-web-app
    ports:
      - "8050:8050"
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8050"]
      interval: 15s
      timeout: 15s
      retries: 3
We can place this in our AWS ec2 instance and will use this with the docker-compose commands each time we pull down/push up the application. I just keep it in the ec2’s home directory ~/.
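The healthcheck in the compose file shells out to curl -f, which fails when the request cannot connect or returns an HTTP error. For debugging from a Python shell, roughly the same check can be sketched with the standard library. is_healthy is a hypothetical helper, not something Docker provides.

```python
import urllib.error
import urllib.request


def is_healthy(url, timeout=5.0):
    """Return True if the URL answers with a non-error HTTP status."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return 200 <= response.status < 400
    except (urllib.error.URLError, OSError):
        # Covers refused connections, timeouts, and HTTP error statuses.
        return False


print(is_healthy('http://127.0.0.1:8050', timeout=2.0))
```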

Now we need a Dockerfile to containerize the application. This is where things get a bit tricky. Everything worked fine when I ran the application locally, but when I started hosting it on the ec2 instance I experienced a lot of latency issues and ultimately the Docker container's status was unhealthy (you can check this in the console, for example with docker ps). I realized that the issue was that the application points to localhost, but when it is running inside of the Docker container, Docker points localhost to both ipv6 AND ipv4.

$ cat /etc/hosts
172.17.1.112 27392a3e0fa5
127.0.0.1 localhost
::1 localhost ip6-localhost ip6-loopback
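To confirm the same thing from inside Python, you can ask the resolver which addresses a hostname maps to. This is a small sketch using the standard library; resolved_addresses is a hypothetical helper name, and the output depends on the machine's hosts file.

```python
import socket


def resolved_addresses(host, port=8050):
    """Return the sorted, de-duplicated addresses the resolver gives for `host`."""
    infos = socket.getaddrinfo(host, port, proto=socket.IPPROTO_TCP)
    # Each entry is (family, type, proto, canonname, sockaddr); sockaddr[0] is the address.
    return sorted({info[4][0] for info in infos})


try:
    print(resolved_addresses('localhost'))
except socket.gaierror as error:
    print('could not resolve localhost:', error)
```

With a hosts file like the one above, the list would be expected to contain both 127.0.0.1 and ::1.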

I then started googling and found this tutorial which explained how to use nginx to block localhost pointing to ::1.

By adding an nginx config file

location / { try_files $uri @project; }
location @project {
    include uwsgi_params;
    uwsgi_pass unix:/tmp/uwsgi.sock;
}

with a couple of extra requirements

flup6==1.1.1
uwsgi==2.0.18

and altering the Dockerfile slightly to add the command

CMD uwsgi -s /tmp/uwsgi.sock -w project:app --chown-socket=www-data:www-data --enable-threads & \
nginx -g 'daemon off;'

We can now run in the Docker container from AWS with no latency issues. Here is an example Dockerfile to use.

FROM ubuntu:xenial
RUN apt-get update && \
    apt-get install -y software-properties-common build-essential \
    nginx python3-pip nano && \
    apt-get clean
COPY . .
RUN pip3 install --upgrade pip
RUN pip3 install -r requirements.txt
EXPOSE 8050
COPY nginx /etc/nginx
# Note: Docker only runs the last CMD in a Dockerfile; keep the one you want at the end.
CMD uwsgi -s /tmp/uwsgi.sock -w . --chown-socket=www-data:www-data --enable-threads & \
    nginx -g 'daemon off;'
CMD gunicorn -w 10 -b 0.0.0.0:8050 -t 100000 --max-requests 20 app:server
CMD ["python3", "application.py"]

Locally, you should have a hidden ~/.aws directory which contains the config and credentials files.

[default]
aws_access_key_id = YOUR_KEY_HERE
aws_secret_access_key = YOUR_SECRET_KEY_HERE

Then when you login to your AWS ECR account you should have access to push up your new Docker image.

$ $(aws ecr get-login --no-include-email --region us-west-2)

Now we just need to build the Docker container inside of your application directory. You don’t have to specify a version but you can by adding name_of_container:1.0. Without specifying a version it will be referred to as latest.

$ docker build -t name_of_container .

Now we need to tag our container to our ECR. Again we can version control by adding the version at the end

$ docker tag name_of_container:latest aws_account_id.dkr.ecr.region.amazonaws.com/name_of_container:1.0

Now we can just push up the image to the ECR

$ docker push aws_account_id.dkr.ecr.region.amazonaws.com/name_of_container:1.0
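The image name used in the tag and push commands follows a fixed pattern, so it can be generated rather than typed. This is a sketch with a hypothetical helper ecr_image_uri. The account id below is a placeholder, not a real account.

```python
def ecr_image_uri(account_id, region, repository, tag='latest'):
    """Build the registry path used by `docker tag` and `docker push` for ECR."""
    return '{}.dkr.ecr.{}.amazonaws.com/{}:{}'.format(account_id, region, repository, tag)


uri = ecr_image_uri('123456789012', 'us-west-2', 'name_of_container', tag='1.0')
print(uri)  # 123456789012.dkr.ecr.us-west-2.amazonaws.com/name_of_container:1.0
```

Bumping the tag argument for each release is what gives you the version history in the registry.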

SSH into your ec2 instance

$ ssh -i "aws.pem" ubuntu@34.00.000.000

You should have your docker-compose.yml file here. Then you can just run a few commands to pull down the image.

$ docker-compose pull

Then we can test out the app by running

$ docker-compose up

And if everything runs ok with no errors, we can run…

BUT WAIT!

Remember I said we would have to change the hostname before? Google APIs do not allow connections from IP addresses or anything with a port specified. So here is the final workaround:

Instead of specifying the host as 0.0.0.0, we can specify it as test.0.0.0.0.xip.io; it will still resolve to 0.0.0.0 and the Google API will never know the difference.

So at the end of our application.py we can change it to

if __name__ == '__main__':
    os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = '1'
    app.run_server(host='test.0.0.0.0.xip.io', debug=True, threaded=True)
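The hostname is just the IP address wrapped in a subdomain and the wildcard DNS suffix. Here is a sketch of building it for your instance's public IP. wildcard_dns_host is a hypothetical helper, and 203.0.113.10 is a documentation-only example address.

```python
import ipaddress


def wildcard_dns_host(public_ip, subdomain='test', suffix='xip.io'):
    """Wrap an IPv4 address in a hostname of the form <subdomain>.<ip>.<suffix>."""
    # Raises ValueError if the string is not a valid IPv4 address.
    ipaddress.IPv4Address(public_ip)
    return '{}.{}.{}'.format(subdomain, public_ip, suffix)


print(wildcard_dns_host('203.0.113.10'))  # test.203.0.113.10.xip.io
```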

Ultimately you will want to host your application with a domain name and https. OAUTHLIB_INSECURE_TRANSPORT is set because the Google API does not allow connections from http://, only https://.

Now we can run the final command to host the application on your instance

$ docker-compose up -d

Navigate to http://test.aws_public_ip.xip.io in your browser and you should be ready to go!

RAND Researcher. Based out of Los Angeles. @ChaykowskyMike linkedin.com/in/chaykowsky
