Commit 280538e7 authored by PRADEEP JASAL's avatar PRADEEP JASAL
Browse files

Upload New File

parent eced6528
#!/usr/bin/env python3
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan
import datetime
import json
import csv
import argparse
## import dask libraries
import dask.dataframe as dd
import pandas as pd
class OptionParser:
    """Command-line option parser for the elasticsearch query script."""

    def __init__(self):
        """User based option parser: every flag is mandatory."""
        self.parser = argparse.ArgumentParser(prog="PROG")
        # One (dest, default, help) spec per flag; all are required=True,
        # action="store", and named "--<dest>" — so build them in a loop.
        flag_specs = [
            ("host", "", "elastic search host"),
            ("port", "", "listening port of elasticsearch host for quering"),
            ("username", "", "username for elasticsearch query"),
            ("password", "", "password for elasticsearch query"),
            ("days", "1", "no. of days of data you want to analyse from elasticsearch "),
            ("index", False, "input the index name of elasticsearch"),
        ]
        for dest, default, help_text in flag_specs:
            self.parser.add_argument(
                "--" + dest,
                action="store",
                dest=dest,
                default=default,
                help=help_text,
                required=True,
            )
class Elastic:
    """Thin wrapper around an Elasticsearch client that queries recent
    records from one index and dumps them to ``data.csv``."""

    def __init__(self, host, port, username, password, index, days, attributes):
        """Create the ES client and build a RecordTime range query.

        host/port/username/password: connection details for the cluster.
        index: name of the index to query.
        days: how many days back the query window starts.
        attributes: document fields to fetch; also fixes the CSV column order.
        """
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.days = days
        hosts = self.host + ":" + str(self.port)  # changed client api since version 8.4.3
        self.client = Elasticsearch(
            [hosts],
            basic_auth=(self.username, self.password),
        )
        self.index = index
        # NOTE(review): "%s" (epoch seconds) is a platform-specific strftime
        # directive — works on Linux/glibc, not on Windows. Confirm the
        # deployment platform before relying on it.
        start_ts = (datetime.date.today() - datetime.timedelta(self.days)).strftime("%s")
        end_ts = datetime.datetime.now().strftime("%s")
        self.headers = attributes
        self.query = {
            "_source": self.headers,
            "query": {
                "range": {
                    "RecordTime": {
                        "gte": start_ts,
                        "lt": end_ts,
                    }
                }
            }
        }

    def load(self):
        """Return a generator over matching documents via the scan helper.

        The index-existence check is best-effort and informational only.
        BUGFIX: the original wrapped scan() in a bare ``except:`` and then
        returned ``gen_result``, which is unbound when scan() raises — a
        guaranteed NameError that masked the real error. scan() errors now
        propagate to the caller.
        """
        try:
            if self.client.indices.exists(index=self.index):
                print(f"you are quering index {self.index}")
        except Exception as exc:
            # Narrowed from a bare except; the query below is still attempted.
            print(f"index existence check failed: {exc}")
        return scan(self.client, query=self.query, index=self.index)

    def writer(self, elastic_query_generator):
        """Write each hit's ``_source`` dict as one row of data.csv.

        elastic_query_generator: iterable of ES hits shaped {'_source': {...}}.
        Returns None.
        """
        # newline='' is the csv-module documented way to open CSV files
        # (prevents blank rows on Windows); fieldnames pins the column order.
        with open('data.csv', 'w', newline='') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=self.headers)
            writer.writeheader()
            # A plain for-loop never leaks StopIteration, so the original
            # ``except StopIteration: pass`` was dead code and is removed.
            for doc in elastic_query_generator:
                writer.writerow(doc['_source'])
        return None
def JobDataAnalyser(filename, attribute_list) -> pd.DataFrame:
    """Compute per-site job success percentages from the CSV dump.

    filename: CSV file produced by Elastic.writer.
    attribute_list: expected CSV header, order-sensitive.
    Returns a pandas DataFrame with job_efficiency_day/week/month columns
    per match_exp_job_site (ExitCode == 0 rows only).
    """
    # Sanity-check the CSV header against the expected attribute list.
    # BUGFIX: the original opened the hard-coded 'data.csv' here instead of
    # the filename parameter, did nothing on a mismatch, and used a bare
    # except that swallowed every error.
    try:
        with open(filename, 'r', newline='') as csvfile:
            fileheaders = list(csv.DictReader(csvfile).fieldnames or [])
        if fileheaders != attribute_list:
            print("attribute is missing in the filename")
    except OSError:
        print("attribute is missing in the filename")

    df = (dd.read_csv(
        filename,
        assume_missing=True,
        blocksize='64MB',
        low_memory=False,
        dtype={
            'ExitCode': 'Float64',
            'RecordTime': 'int64',
        }
    )
    ).rename(columns=lambda x: x.replace('@', ''))

    # Flag records whose RecordTime (epoch seconds) falls in (today - delta, now).
    # NOTE(review): "%s" is a Linux/glibc-only strftime directive — confirm
    # the deployment platform.
    now_ts = int(datetime.datetime.now().strftime("%s"))

    def _in_window(delta):
        # 1 when RecordTime is inside the window, else 0 (dask elementwise).
        lo = int((datetime.date.today() - delta).strftime("%s"))
        return df['RecordTime'].apply(
            lambda x, lo=lo: 1 if (x > lo) & (x < now_ts) else 0,
            meta=pd.Series(dtype="bool"),
        )

    day_data = _in_window(datetime.timedelta(days=1))
    # BUGFIX: the weekly window used timedelta(weeks=7) — a 49-day span,
    # larger than the 4-week "month" window. A weekly window is weeks=1.
    week_data = _in_window(datetime.timedelta(weeks=1))
    month_data = _in_window(datetime.timedelta(weeks=4))

    # filter for ligosearchtag, and aggregate based on time intervals
    ddf = (df
           .drop_duplicates()
           .loc[(df.ligosearchtag == "cit.scheduled.test") | (df.ligosearchtag == "cit.oneshot.test")]
           .assign(day=day_data)
           .assign(week=week_data)
           .assign(month=month_data)
           .groupby(['match_exp_job_site', 'ExitCode'])
           .agg({"day": "sum", "week": "sum", "month": "sum"})
           .reset_index()
           )
    # Per-site percentage of jobs in each window; keep only successful
    # (ExitCode == 0) rows, sorted by the daily efficiency.
    result = (ddf
              .assign(job_efficiency_day=100 * ddf['day'] / ddf.groupby('match_exp_job_site')['day'].transform('sum', meta=pd.Series(dtype="float64")))
              .assign(job_efficiency_week=100 * ddf['week'] / ddf.groupby('match_exp_job_site')['week'].transform('sum', meta=pd.Series(dtype="float64")))
              .assign(job_efficiency_month=100 * ddf['month'] / ddf.groupby('match_exp_job_site')['month'].transform('sum', meta=pd.Series(dtype="float64")))
              .round(1)
              .loc[(ddf.ExitCode == 0)]
              .sort_values('job_efficiency_day')
              .reset_index(drop=True)
              .drop(columns=['day', 'week', 'month'])
              )
    # Materialize the lazy dask graph into a concrete pandas DataFrame.
    return result.compute()
def color_negative_red(value):
    """Return a CSS color string for an efficiency percentage.

    red when value < 80, green when value > 80, black otherwise
    (exactly 80, or NaN — both NaN comparisons are False).

    BUGFIX(docs): the original docstring claimed "green if positive and
    red if negative", but the actual threshold has always been 80.
    """
    if value < 80:
        color = 'red'
    elif value > 80:
        color = 'green'
    else:
        color = 'black'
    return 'color: %s' % color
def main():
    """Parse CLI options, pull recent job records from elasticsearch, and
    render a per-site efficiency table to index.html."""
    optmgr = OptionParser()
    opts = optmgr.parser.parse_args()
    attribute_list = ["RecordTime", "ligosearchtag", "ExitCode", "LastJobStatus", "Status", "Owner", "ligosearchuser", "job_site", "match_exp_job_site", "@timestamp"]
    # creates elasticsearch class object to create client which talks to elasticsearch host and gets data
    es = Elastic(
        host=opts.host,
        port=opts.port,
        username=opts.username,
        password=opts.password,
        days=int(opts.days),
        index=opts.index,
        attributes=attribute_list,
    )
    # generator function return json object for the queried data
    result = es.load()
    es.writer(result)
    result_df = JobDataAnalyser("data.csv", attribute_list)
    # NOTE(review): Styler.applymap is deprecated in recent pandas in favour
    # of Styler.map — confirm the pinned pandas version before switching.
    html = (result_df
            .style
            .applymap(color_negative_red, subset=['job_efficiency_day', 'job_efficiency_week', 'job_efficiency_month'])
            .format(precision=0)
            .to_html()
            )
    try:
        with open("index.html", "w") as file:
            file.write(html)
    except OSError:  # narrowed from a bare except: — only I/O errors expected
        print("Problem writing html file to storage")


if __name__ == '__main__':
    main()
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment