Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in
Toggle navigation
Menu
Open sidebar
James Alexander Clark PhD
grid-exerciser
Commits
6a7ee206
Commit
6a7ee206
authored
Oct 21, 2022
by
PRADEEP JASAL
Browse files
Upload New File
parent
eced6528
Changes
1
Hide whitespace changes
Inline
Side-by-side
es_loganalysis/app.py
0 → 100644
View file @
6a7ee206
#!/usr/bin/env python3
from
elasticsearch
import
Elasticsearch
from
elasticsearch.helpers
import
scan
import
datetime
import
json
import
csv
import
argparse
## import dask libraries
import
dask.dataframe
as
dd
import
pandas
as
pd
class OptionParser:
    """Command-line option parser for the elasticsearch log-analysis tool.

    Every option is mandatory (required=True), so the defaults below are
    never actually used; they are kept for interface compatibility.
    """

    def __init__(self):
        """User based option parser"""
        self.parser = argparse.ArgumentParser(prog="PROG")
        self.parser.add_argument(
            "--host",
            action="store",
            dest="host",
            default="",
            help="elastic search host",
            required=True,
        )
        self.parser.add_argument(
            "--port",
            action="store",
            dest="port",
            default="",
            help="listening port of elasticsearch host for querying",
            required=True,
        )
        self.parser.add_argument(
            "--username",
            action="store",
            dest="username",
            default="",
            help="username for elasticsearch query",
            required=True,
        )
        self.parser.add_argument(
            "--password",
            action="store",
            dest="password",
            default="",
            help="password for elasticsearch query",
            required=True,
        )
        self.parser.add_argument(
            "--days",
            action="store",
            dest="days",
            default="1",
            help="no. of days of data you want to analyse from elasticsearch",
            required=True,
        )
        self.parser.add_argument(
            "--index",
            action="store",
            dest="index",
            # bug fix: was default=False (a bool) -- use "" for consistency
            # with the other string options; unused anyway since required.
            default="",
            help="input the index name of elasticsearch",
            required=True,
        )
class Elastic:
    """Wrapper around an Elasticsearch client that range-queries job
    records by RecordTime and dumps their _source fields to data.csv.

    Parameters mirror the CLI options: connection details, index name,
    how many days back to query, and the list of _source attributes to
    fetch (also used as the CSV column headers).
    """

    def __init__(self, host, port, username, password, index, days, attributes):
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.days = days
        hosts = self.host + ":" + str(self.port)
        # changed client api since version 8.4.3
        self.client = Elasticsearch([hosts], basic_auth=(self.username, self.password))
        self.index = index
        # Epoch-second bounds for the RecordTime range filter.
        # NOTE(review): strftime("%s") is a glibc extension, not portable
        # (fails on Windows) -- confirm the target platforms.
        start_ts = (datetime.date.today() - datetime.timedelta(self.days)).strftime("%s")
        end_ts = datetime.datetime.now().strftime("%s")
        self.headers = attributes
        self.query = {
            "_source": self.headers,
            "query": {
                "range": {
                    "RecordTime": {"gte": start_ts, "lt": end_ts}
                }
            },
        }

    def load(self):
        """Return a generator over matching documents (elasticsearch scan
        helper), or None if the scan call failed."""
        try:
            if self.client.indices.exists(index=self.index):
                print(f"you are querying index {self.index}")
        except Exception as e:
            # bug fix: the old message claimed "index doesn't exists", but
            # exists() returning False does not raise -- landing here means
            # the check itself failed (connection/auth). Informational only.
            print(f"index existence check failed: {e}")
        # bug fix: gen_result was unbound when scan() raised, so the
        # return statement itself crashed with NameError.
        gen_result = None
        try:
            print(self.client)
            gen_result = scan(self.client, query=self.query, index=self.index)
        except Exception as e:  # bug fix: was a bare except
            print(f"scan api not working: {e}")
        return gen_result

    def writer(self, elastic_query_generator):
        # takes elastic generator function and writes json object to csv file
        # bug fix: newline='' added per the csv module docs (avoids blank
        # rows on Windows). The old `except StopIteration` was dead code:
        # a for-loop consumes StopIteration itself and never propagates it.
        with open('data.csv', 'w', newline='') as csvfile:
            # Just use 'w' mode in 3.x
            writer = csv.DictWriter(csvfile, fieldnames=self.headers)
            writer.writeheader()
            for doc in elastic_query_generator:
                writer.writerow(doc['_source'])
        return None
def _window_flags(df, delta):
    """Return a 0/1 dask Series flagging rows whose RecordTime (epoch
    seconds) lies strictly between (today - delta) and now.

    The bounds are computed once here rather than per row as before, so
    every row is judged against the same cutoffs.
    NOTE(review): strftime("%s") is a glibc extension -- not portable.
    """
    start = int((datetime.date.today() - delta).strftime("%s"))
    end = int(datetime.datetime.now().strftime("%s"))
    return df['RecordTime'].apply(
        lambda x: 1 if (x > start) & (x < end) else 0,
        meta=pd.Series(dtype="bool"),
    )


def _site_efficiency(ddf, col):
    """Percentage of `col` per row relative to the per-site total count."""
    site_total = ddf.groupby('match_exp_job_site')[col].transform(
        'sum', meta=pd.Series(dtype="float64")
    )
    return 100 * ddf[col] / site_total


def JobDataAnalyser(filename, attribute_list) -> pd.DataFrame:
    """Aggregate the CSV dump into per-site job success percentages.

    Reads `filename` with dask, keeps only the two ligosearchtag test
    job classes, counts jobs per (site, ExitCode) over day/week/month
    windows, and returns a pandas DataFrame of efficiency percentages
    for successful jobs (ExitCode == 0), sorted by daily efficiency.
    """
    try:
        # bug fix: the sanity check opened the hard-coded 'data.csv'
        # instead of the `filename` argument, and `if ... : pass` never
        # reported anything. Now it warns on a header mismatch.
        with open(filename, 'r', newline='') as csvfile:
            reader = csv.DictReader(csvfile)
            fileheaders = list(reader.fieldnames)
            if fileheaders != attribute_list:
                print("attribute is missing in the filename")
    except (OSError, csv.Error, TypeError) as e:  # bug fix: was a bare except
        print(f"could not validate headers of {filename}: {e}")
    df = (
        dd.read_csv(
            filename,
            assume_missing=True,
            blocksize='64MB',
            low_memory=False,
            dtype={
                'ExitCode': 'Float64',
                'RecordTime': 'int64',
            },
        )
    ).rename(columns=lambda x: x.replace('@', ''))
    # success percentage calculation for jobs at each site
    day_data = _window_flags(df, datetime.timedelta(days=1))
    # NOTE(review): "week" uses weeks=7 (49 days); looks like it was meant
    # to be weeks=1 -- kept as-is to preserve behavior. TODO confirm.
    week_data = _window_flags(df, datetime.timedelta(weeks=7))
    month_data = _window_flags(df, datetime.timedelta(weeks=4))
    # filter for ligosearchtag , and aggregate based on time intervals
    ddf = (
        df.drop_duplicates()
        .loc[
            (df.ligosearchtag == "cit.scheduled.test")
            | (df.ligosearchtag == "cit.oneshot.test")
        ]
        .assign(day=day_data)
        .assign(week=week_data)
        .assign(month=month_data)
        .groupby(['match_exp_job_site', 'ExitCode'])
        .agg({"day": "sum", "week": "sum", "month": "sum"})
        .reset_index()
    )
    result = (
        ddf.assign(job_efficiency_day=_site_efficiency(ddf, 'day'))
        .assign(job_efficiency_week=_site_efficiency(ddf, 'week'))
        .assign(job_efficiency_month=_site_efficiency(ddf, 'month'))
        .round(1)
        .loc[(ddf.ExitCode == 0)]
        .sort_values('job_efficiency_day')
        .reset_index(drop=True)
        .drop(columns=['day', 'week', 'month'])
    )
    result_df = result.compute()
    return result_df
def color_negative_red(value):
    """Return a CSS color directive for an efficiency value.

    red for values below 80, green for values above 80, black at
    exactly 80. (The old docstring claimed positive/negative coloring,
    which did not match the code.)
    """
    if value < 80:
        color = 'red'
    elif value > 80:
        color = 'green'
    else:
        color = 'black'
    return 'color: %s' % color
def main():
    """Parse CLI options, pull job records from elasticsearch, dump them
    to data.csv, aggregate per-site efficiencies and write index.html."""
    optmgr = OptionParser()
    opts = optmgr.parser.parse_args()
    host = opts.host
    port = opts.port
    index = opts.index
    days = opts.days
    username = opts.username
    password = opts.password
    attribute_list = [
        "RecordTime",
        "ligosearchtag",
        "ExitCode",
        "LastJobStatus",
        "Status",
        "Owner",
        "ligosearchuser",
        "job_site",
        "match_exp_job_site",
        "@timestamp",
    ]
    # creates elasticsearch class object to create client which talks to
    # elasticsearch host and gets data
    es = Elastic(
        host=host,
        port=port,
        username=username,
        password=password,
        days=int(days),
        index=index,
        attributes=attribute_list,
    )
    # generator function return json object for the queried data
    result = es.load()
    es.writer(result)
    result_df = JobDataAnalyser("data.csv", attribute_list)
    # NOTE(review): Styler.applymap is deprecated in newer pandas in favor
    # of Styler.map -- kept for compatibility with the version in use;
    # confirm before upgrading pandas.
    html = (
        result_df.style.applymap(
            color_negative_red,
            subset=[
                'job_efficiency_day',
                'job_efficiency_week',
                'job_efficiency_month',
            ],
        )
        .format(precision=0)
        .to_html()
    )
    try:
        with open("index.html", "w") as outfile:
            outfile.write(html)
    except OSError as e:  # bug fix: bare except also swallowed KeyboardInterrupt
        print(f"Problem writing html file to storage: {e}")
# Script entry point: run main() only when executed directly,
# not when imported as a module.
if __name__ == '__main__':
    main()
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment