Celery Redis Integration
balu-ce committed Apr 11, 2020
1 parent 4511ac1 commit 4435bcd
Showing 33 changed files with 7,539 additions and 65 deletions.
4 changes: 4 additions & 0 deletions .idea/dictionaries/balu.xml

Some generated files are not rendered by default.

33 changes: 8 additions & 25 deletions Preprocessing_Scripts/RFM_calculation.py
@@ -1,11 +1,12 @@
import json

import pandas as pd
from datetime import datetime
from sklearn.cluster import KMeans

class RFM_Analysis:
def __init__(self, data):
self.df = pd.DataFrame.from_dict(data)
print(self.df)

@staticmethod
def order_cluster(cluster_field_name, target_field_name, data_frame, ascending):
@@ -17,34 +18,21 @@ def order_cluster(cluster_field_name, target_field_name, data_frame, ascending):
df_final = df_final.rename(columns={"index": cluster_field_name})
return df_final

async def rfm_calc(self):
def rfm_calc(self):
df = self.df
df['Order_Date'] = pd.to_datetime(df['Order_Date'])
tx_1y = df[(df.Order_Date <= datetime(2017, 12, 31)) & (df.Order_Date >= datetime(2017, 1, 1))].reset_index(
drop=True)
tx_1y['Sales'] = df['Sales'].astype(float)
tx_user = pd.DataFrame(tx_1y['Customer_ID'].unique())
tx_user.columns = ['Customer_ID']
tx_next_purchase = df.drop_duplicates(['Customer_ID', 'Order_Date'], keep='last')
tx_max = tx_next_purchase.groupby('Customer_ID').Order_Date.nlargest(2).reset_index()
tx_next_first_purchase = tx_max.groupby('Customer_ID').Order_Date.min().reset_index()
tx_next_first_purchase.columns = ['Customer_ID', 'MinPurchaseDate']
tx_last_purchase = tx_max.groupby('Customer_ID').Order_Date.max().reset_index()
tx_last_purchase.columns = ['Customer_ID', 'MaxPurchaseDate']
tx_purchase_dates = pd.merge(tx_last_purchase, tx_next_first_purchase, on='Customer_ID', how='left')
# calculate the time difference in days:
tx_purchase_dates['NextPurchaseDay'] = (
tx_purchase_dates['MaxPurchaseDate'] - tx_purchase_dates['MinPurchaseDate']).dt.days
# merge with tx_user
tx_user = pd.merge(tx_user, tx_purchase_dates[['Customer_ID', 'NextPurchaseDay']], on='Customer_ID', how='left')
tx_user.sort_values("NextPurchaseDay", axis=0, ascending=True, inplace=True, na_position='last')

tx_user.columns = ['Customer_ID']
# FEATURE ENGINEERING
tx_max_purchase = tx_1y.groupby('Customer_ID').Order_Date.max().reset_index()
tx_max_purchase.columns = ['Customer_ID', 'MaxPurchaseDate']

tx_max_purchase['Recency'] = (
tx_max_purchase['MaxPurchaseDate'].max() - tx_max_purchase['MaxPurchaseDate']).dt.days
tx_max_purchase['MaxPurchaseDate'].max() - tx_max_purchase['MaxPurchaseDate']).dt.days
tx_user = pd.merge(tx_user, tx_max_purchase[['Customer_ID', 'Recency']], on='Customer_ID')

kmeans = KMeans(n_clusters=4)
@@ -106,11 +94,6 @@ async def rfm_calc(self):
tx_day_order_last = tx_day_order_last.dropna()
tx_day_order_last = pd.merge(tx_day_order_last, tx_day_diff, on='Customer_ID')

tx_user = pd.merge(tx_user, tx_day_order_last, on='Customer_ID', how='right')

print(tx_user.head(5))
final_out = tx_user.to_json()[1:-1].replace('},{', '} {')
print("Final Out")
print(final_out)
return final_out

tx_user = pd.merge(tx_user, tx_day_order_last[
['Customer_ID', 'DayDiff', 'DayDiff2', 'DayDiff3', 'DayDiffMean', 'DayDiffStd']], on='Customer_ID')
return json.loads(tx_user.reset_index().to_json(orient='records'))
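For reference, the recency scoring inside rfm_calc follows the common pattern of clustering a single feature with KMeans and then relabelling the clusters in rank order through the order_cluster helper defined at the top of this file. A minimal standalone sketch of that step (n_clusters=4 matches the diff; the toy data and random_state are assumptions added for reproducibility):

import pandas as pd
from sklearn.cluster import KMeans
from Preprocessing_Scripts.RFM_calculation import RFM_Analysis

# Toy frame: one row per customer with a Recency value in days.
tx_user = pd.DataFrame({'Customer_ID': [1, 2, 3, 4, 5, 6],
                        'Recency': [3, 5, 40, 42, 180, 200]})

# Bucket the single Recency feature into 4 clusters, as rfm_calc does.
kmeans = KMeans(n_clusters=4, random_state=0)
tx_user['RecencyCluster'] = kmeans.fit_predict(tx_user[['Recency']])

# Relabel the clusters so the bucket index is ordered by mean Recency,
# using the order_cluster helper shown above.
tx_user = RFM_Analysis.order_cluster('RecencyCluster', 'Recency', tx_user, ascending=False)
print(tx_user.head())
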
2 changes: 1 addition & 1 deletion QueryBuilder/sales_forecast_builder.py
@@ -7,7 +7,7 @@ def get_avg_query():
"query": {
"range": {
"Order_Date": {
"lte": "31/12/2016"
"lte": "2015-12-31"
}
}
},
3 changes: 0 additions & 3 deletions __init__.py
@@ -1,3 +0,0 @@
from . import db_operations
from . import model_predict

5 changes: 5 additions & 0 deletions app/__init__.py
@@ -0,0 +1,5 @@
from celery import Celery
def make_celery(app_name=__name__):
redis_uri = 'redis://127.0.0.1:6379/0'
return Celery(app_name, backend=redis_uri, broker=redis_uri)
celery = make_celery()
10 changes: 10 additions & 0 deletions app/celery_utils.py
@@ -0,0 +1,10 @@
def init_celery(celery, app):
celery.conf.update(app.config)
TaskBase = celery.Task

class ContextTask(TaskBase):
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)

celery.Task = ContextTask
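The ContextTask shim makes every Celery task body run inside app.app_context(), so Flask's current_app and any initialised extensions are usable from the worker. A hypothetical task illustrating the effect (not part of this commit):

from flask import current_app

from app import celery


@celery.task
def report_debug_flag():
    # Because init_celery replaces celery.Task with ContextTask, this body
    # executes inside app.app_context(), so current_app resolves to the
    # Flask app built in app/factory.py.
    return current_app.config.get('DEBUG')
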
13 changes: 13 additions & 0 deletions app/factory.py
@@ -0,0 +1,13 @@
from flask import Flask

from app.celery_utils import init_celery


def create_app(app_name, **kwargs):
app = Flask(app_name)
if kwargs.get("celery"):
init_celery(kwargs.get("celery"), app)
from app.routes import bp
app.register_blueprint(bp)
print("Reached")
return app
29 changes: 10 additions & 19 deletions app.py → app/routes.py
@@ -1,28 +1,26 @@
import json

from flask import Flask, request

from flask import Blueprint, request, jsonify, url_for
import numpy as np
from IngestionLayer.elastic_forecast_ingest import Elasticsearch_ingest
from Preprocessing_Scripts.RFM_calculation import RFM_Analysis
from Preprocessing_Scripts.denormalization import DeNormalization
from Preprocessing_Scripts.normalization import Normalization
from db_operations.elasticsearch_oper import Elastic_Sales_Connect
import numpy as np
import requests

from app.tasks import rfm_analysis
from db_operations.elasticsearch_oper import Elastic_Sales_Connect
from loggers.es_logger import ES_Logger

app = Flask(__name__)
bp = Blueprint("sales_analysis", __name__)

Tensorflow_Base_URL = "http://localhost:8501/v1/models/"


@app.route('/')
@bp.route('/')
def hello():
return "Hello World!"


@app.route('/forecast_ingest_pipeline')
@bp.route('/forecast_ingest_pipeline')
def ingest_pipeline():
interval = request.args.get('interval', default='monthly', type=str)
es = Elastic_Sales_Connect() # setup a elasticsearch connection
@@ -43,14 +41,7 @@ def ingest_pipeline():
return "Data Ingested Successfully"


@app.route('/rfm_ingest_pipeline')
@bp.route('/rfm_ingest_pipeline')
def ingest_rfm_value():
es = Elastic_Sales_Connect() # setup a elasticsearch connection
sales_data = [hit["_source"] for hit in es.elastic_get_sales_data()]
rfm_data = RFM_Analysis(sales_data)
rfm_data.rfm_calc()
return 'Test'


if __name__ == '__main__':
app.run(debug=True)
task = rfm_analysis.apply_async()
return task.id
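The endpoint now only kicks off the Celery task and returns its id; the PROGRESS metadata that rfm_analysis writes (see app/tasks.py below) could be exposed with a companion status route along these lines — a hypothetical sketch, not part of this commit:

@bp.route('/rfm_status/<task_id>')
def rfm_status(task_id):
    # AsyncResult looks the task up in the Redis result backend.
    task = rfm_analysis.AsyncResult(task_id)
    payload = {'state': task.state}
    # task.info is the meta dict passed to update_state, or the final return value.
    if isinstance(task.info, dict):
        payload.update(task.info)
    return jsonify(payload)
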
20 changes: 20 additions & 0 deletions app/tasks.py
@@ -0,0 +1,20 @@
import time

from Preprocessing_Scripts.RFM_calculation import RFM_Analysis
from app import celery
from db_operations.elasticsearch_oper import Elastic_Sales_Connect


@celery.task(bind=True)
def rfm_analysis(self):
self.update_state(state='PROGRESS', meta={'status': 'Collecting Sales Data'})
es = Elastic_Sales_Connect() # setup a elasticsearch connection
sales_data = [hit["_source"] for hit in es.elastic_get_sales_data()]
self.update_state(state='PROGRESS', meta={'status': 'Processing Customer Segmentation'})
rfm_data = RFM_Analysis(sales_data)
time.sleep(40)
data = rfm_data.rfm_calc()
self.update_state(state='PROGRESS', meta={'status': 'Refreshing Customer Index'})
es.ingest_es_rmf_data(data)
return {'status': 'Customer List Ingestion completed!'}

7 changes: 7 additions & 0 deletions celery_worker.py
@@ -0,0 +1,7 @@
from app import celery
from app.factory import create_app
from app.celery_utils import init_celery
import app

app = create_app('sales_analysis', celery=app.celery)
init_celery(celery, app)
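celery_worker.py gives the Celery CLI an importable module whose celery attribute is already bound to the Flask app via init_celery. Assuming the Redis instance from app/__init__.py is reachable on 127.0.0.1:6379, the worker would typically be launched with something like the following (assumed invocation, not part of the commit):

# From the project root, Celery 4.x CLI:
#
#   celery -A celery_worker.celery worker --loglevel=info
#
# The -A target is the shared `celery` object from app/__init__.py, with
# ContextTask installed by the init_celery call in this file.
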
21 changes: 21 additions & 0 deletions db_operations/elastic_config.py
@@ -0,0 +1,21 @@
import configparser
config = configparser.ConfigParser()
configFilePath = 'elastic_config.ini'
config.read(configFilePath)

class Elastic_Index_Config:
@staticmethod
def get_sales_index():
return config['ELASTICSEARCH']['sales_data_index']

@staticmethod
def get_sales_forecast_index():
return config['ELASTICSEARCH']['forecast_index']

@staticmethod
def get_sales_rfm_index():
return config['ELASTICSEARCH']['rfm_index']

@staticmethod
def get_business_unit_key():
return config['ELASTICSEARCH']['business_unit']
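Elastic_Index_Config centralises the index names that previously lived on the connection class (see the removals in elasticsearch_conn.py below). A small usage sketch, assuming elastic_config.ini is resolvable from the working directory:

from db_operations.elastic_config import Elastic_Index_Config

# Values come straight from the [ELASTICSEARCH] section of elastic_config.ini.
sales_index = Elastic_Index_Config.get_sales_index()          # 'sales_data'
rfm_index = Elastic_Index_Config.get_sales_rfm_index()        # 'rfm_data' after this commit
business_unit = Elastic_Index_Config.get_business_unit_key()  # 'City'
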
8 changes: 0 additions & 8 deletions db_operations/elasticsearch_conn.py
@@ -33,11 +33,3 @@ def elastic_sales_conn():
print(ex)
ES_Logger.error_logs("Error in connecting to Elasticsearch")
return es

@staticmethod
def get_sales_index():
return config['ELASTICSEARCH']['sales_data_index']

@staticmethod
def get_sales_forecast_index():
return config['ELASTICSEARCH']['forecast_index']
30 changes: 25 additions & 5 deletions db_operations/elasticsearch_oper.py
@@ -1,7 +1,8 @@
from QueryBuilder.sales_forecast_builder import Sales_Query_Builder
from db_operations.elastic_config import Elastic_Index_Config
from db_operations.elasticsearch_conn import Elastic_Conn_Sales_data
from loggers.es_logger import ES_Logger
from elasticsearch import Elasticsearch, helpers
from elasticsearch import helpers

class Elastic_Sales_Connect:
def __init__(self):
@@ -10,23 +11,23 @@ def __init__(self):
def elastic_get_forecast_data(self):
try:
res = self.es.search(
index=Elastic_Conn_Sales_data.get_sales_index(),
index=Elastic_Index_Config.get_sales_index(),
body=Sales_Query_Builder.get_forecast_query())
return res
except Exception as ex:
ES_Logger.error_logs(str(ex))

def elastic_get_sales_data(self):
try:
res = list(helpers.scan(self.es, size=1000, scroll='1m', query=Sales_Query_Builder.get_all_sales_query()))
res = list(helpers.scan(self.es, size=10000, scroll='1m', query=Sales_Query_Builder.get_all_sales_query()))
return res
except Exception as ex:
ES_Logger.error_logs(str(ex))

def elastic_get_sales_stats(self):
try:
res = self.es.search(
index=Elastic_Conn_Sales_data.get_sales_index(),
index=Elastic_Index_Config.get_sales_index(),
body=Sales_Query_Builder.get_avg_query())
return res
except Exception as ex:
@@ -35,6 +36,25 @@ def elastic_get_sales_stats(self):
def ingest_es_forecast_data(self, data):
try:
for x in data:
self.es.index(index=Elastic_Conn_Sales_data.get_sales_forecast_index(), body=x, doc_type="_doc")
self.es.index(index=Elastic_Index_Config.get_sales_forecast_index(), body=x, doc_type="_doc")
except Exception as ex:
ES_Logger.error_logs(str(ex))

def ingest_es_rmf_data(self, data):
try:
bulk_data = [
{
"_index": Elastic_Index_Config.get_sales_rfm_index(),
"_type": "_doc",
"_id": str(source['Customer_ID']),
"_source": source
}
for source in data
]
helpers.bulk(self.es, bulk_data)
return "Ingested Successfully"

# for x in data:
# self.es.index(index=Elastic_Conn_Sales_data.get_sales_rfm_index(), body=x, id=x['Customer_ID'], doc_type="_doc")
except Exception as ex:
ES_Logger.error_logs(str(ex))
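helpers.bulk aborts on the first failed action by default; if partial failures should be tolerated and logged instead, the ingest could be wrapped along these lines (a sketch with a hypothetical helper name, not what the commit does):

from elasticsearch import helpers


def bulk_ingest_tolerant(es, actions, log_error=None):
    # Collect per-document failures rather than letting helpers.bulk raise
    # on the first error; returns (successful count, list of error dicts).
    ok, errors = helpers.bulk(es, actions, raise_on_error=False, stats_only=False)
    if errors and log_error is not None:
        log_error("%d RFM documents failed to index" % len(errors))
    return ok, errors
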
3 changes: 2 additions & 1 deletion elastic_config.ini
@@ -6,4 +6,5 @@ password = ''
ssl = no
sales_data_index = sales_data
forecast_index = sales_forecast
rfm_index = rfm_analysis
rfm_index = rfm_data
business_unit = City
Binary file modified elasticsearch.log
Binary file not shown.
7 changes: 7 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -85,3 +85,10 @@ wrapt==1.12.1
xkit==0.0.0
zipp==3.1.0
zope.interface==4.3.2

celery~=4.4.2
scipy~=1.4.1
joblib~=0.14.1
pandas~=1.0.3
sklearn~=0.0
scikit-learn~=0.22.2.post1
5 changes: 5 additions & 0 deletions run.py
@@ -0,0 +1,5 @@
from app import factory
import app
if __name__ == "__main__":
app = factory.create_app("sales_analysis", celery=app.celery)
app.run(debug=True)
2 changes: 1 addition & 1 deletion venv/lib/python3.6/site-packages/flask/app.py
@@ -131,7 +131,7 @@ class Flask(_PackageBoundObject):
using a package, it's usually recommended to hardcode the name of
your package there.
For example if your application is defined in :file:`yourapplication/app.py`
For example if your application is defined in :file:`yourapplication/app_copy.py`
you should create it with one of the two versions below::
app = Flask('yourapplication')
4 changes: 2 additions & 2 deletions venv/lib/python3.6/site-packages/flask/cli.py
@@ -387,7 +387,7 @@ def load_app(self):
import_name = prepare_import(path)
app = locate_app(self, import_name, name)
else:
for path in ("wsgi.py", "app.py"):
for path in ("wsgi.py", "app_copy.py"):
import_name = prepare_import(path)
app = locate_app(self, import_name, None, raise_if_not_found=False)

@@ -398,7 +398,7 @@ def load_app(self):
raise NoAppException(
"Could not locate a Flask application. You did not provide "
'the "FLASK_APP" environment variable, and a "wsgi.py" or '
'"app.py" module was not found in the current directory.'
'"app_copy.py" module was not found in the current directory.'
)

if self.set_debug_flag:
@@ -0,0 +1 @@
pip
22 changes: 22 additions & 0 deletions venv/lib/python3.6/site-packages/redis-3.4.1.dist-info/LICENSE
@@ -0,0 +1,22 @@
Copyright (c) 2012 Andy McCurdy

Permission is hereby granted, free of charge, to any person
obtaining a copy of this software and associated documentation
files (the "Software"), to deal in the Software without
restriction, including without limitation the rights to use,
copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the
Software is furnished to do so, subject to the following
conditions:

The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
OTHER DEALINGS IN THE SOFTWARE.