init sources
src/DoresA/serialize_logs_to_db.py (new file, 123 additions)
@@ -0,0 +1,123 @@
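"""Serialize gzipped pDNS capture logs (data/*.csv.gz) into MariaDB and MongoDB.

For each analysis day the hourly .csv.gz log files are read with the csv module
and their rows are inserted in batches of 1000 via the local db module.
"""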
import csv
import gzip
import glob
import time
import datetime
import pandas  # only used by the commented-out pandas reader in main()
from progress.bar import Bar

import db  # local helper module providing mariadb_insert_logs(), mongodb_insert_logs() and close()

analysis_start_date = datetime.date(2017, 4, 7)
analysis_days_amount = 3

# e.g. analysis_days = ['2017-04-07', '2017-04-08', '2017-04-09']
analysis_days = [(analysis_start_date + datetime.timedelta(days=x)).strftime('%Y-%m-%d') for x in range(analysis_days_amount)]

# mongodb

# mariadb

def main():
    check_duplicates()
    start = time.time()

    distinct_ttl_count = {}
    # everything = {}

    # for log_file in ['data/pdns_capture.pcap-sgsgpdc0n9x-2017-04-07_00-00-02.csv.gz']:

    for day in range(analysis_days_amount):
        log_files_hour = get_log_files_for_hours_of_day(analysis_days[day])
        # everything[day] = {}

        progress_bar = Bar(analysis_days[day], max=24)

        for hour in range(24):
            progress_bar.next()
            # everything[day][hour] = {}
            for hour_files in log_files_hour[hour]:
                with gzip.open(hour_files, 'rt', newline='') as file:
                    reader = csv.reader(file)
                    all_rows = list(reader)

                    # batch mode (batches of 1000 entries)
                    for log_entries in batch(all_rows, 1000):
                        db.mariadb_insert_logs(log_entries)
                        db.mongodb_insert_logs(log_entries)

                    # single mode
                    # for log_entry in reader:
                    #     db.mariadb_insert_log(log_entry)
                    #     # db.mongodb_insert_log(log_entry)

        progress_bar.finish()

        # log_entry[4] == TTL
        # if log_entry[4] in distinct_ttl_count:
        #     distinct_ttl_count[log_entry[4]] += 1
        # else:
        #     distinct_ttl_count[log_entry[4]] = 1
        #
        # everything[day][hour]['ttl'] = distinct_ttl_count

        # a bit faster
        # df = pandas.read_csv(log_file, compression='gzip', header=None)
        # print(df.iloc[0])

    # print('distinct TTLs: ' + str(len(everything[0][0]['ttl'].keys())))

    print('total duration: ' + str(time.time() - start) + 's')
    db.close()

def batch(iterable, n=1):
    length = len(iterable)
    for ndx in range(0, length, n):
        yield iterable[ndx:min(ndx + n, length)]
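# e.g. list(batch([1, 2, 3, 4, 5], 2)) -> [[1, 2], [3, 4], [5]]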


# sanity check: the per-day file counts must add up to the total number of *.csv.gz files in data/
def check_duplicates():
    days_cumulated = 0

    for day in analysis_days:
        days_cumulated += len(get_log_files_for_day(day))

    all_logs = len(get_log_files_for_day(''))

    if days_cumulated != all_logs:
        raise Exception('Log files inconsistency')

# TODO: only builds a glob pattern for each slot's start time (HH-MM); the files
# are not globbed yet and times later in the slot are not covered
def get_log_files_for_range_of_day(date, minutes_range):
    slot_files = {}
    slots_amount = int(1440 / minutes_range)

    for slot in range(slots_amount):
        total_mins = slot * minutes_range
        hours, minutes = divmod(total_mins, 60)

        time_range = '%02d-%02d' % (hours, minutes)
        slot_files[slot] = 'data/*' + date + '_' + time_range + '*.csv.gz'

    return slot_files
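# e.g. get_log_files_for_range_of_day('2017-04-07', 30) builds the pattern
# 'data/*2017-04-07_01-30*.csv.gz' for slot 3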


def get_log_files_for_hours_of_day(date):
    slot_files = {}
    slots_amount = 24

    for slot in range(slots_amount):
        slot_files[slot] = glob.glob('data/*' + date + '_' + ('%02d' % slot) + '*.csv.gz')

    return slot_files
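# e.g. for hour 0, get_log_files_for_hours_of_day('2017-04-07') globs 'data/*2017-04-07_00*.csv.gz',
# which matches files like 'data/pdns_capture.pcap-sgsgpdc0n9x-2017-04-07_00-00-02.csv.gz'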


def get_log_files_for_day(date):
    log_files = 'data/*' + date + '*.csv.gz'

    return glob.glob(log_files)

if __name__ == "__main__":
    main()