diff --git a/src/DoresA/serialize_logs_to_db.py b/src/DoresA/csv_tools.py similarity index 72% rename from src/DoresA/serialize_logs_to_db.py rename to src/DoresA/csv_tools.py index 6fdf449..11c7d43 100644 --- a/src/DoresA/serialize_logs_to_db.py +++ b/src/DoresA/csv_tools.py @@ -4,21 +4,40 @@ import glob import time import datetime import os -from progress.bar import Bar +import logging +import progressbar +# import db_sql -import db +logger = logging.getLogger('csv') +logger.setLevel(logging.INFO) -analysis_start_date = datetime.date(2017, 5, 1) +analysis_start_date = datetime.date(2017, 9, 1) analysis_days_amount = 7 -# pdns_logs_path = 'data/' -pdns_logs_path = '/run/media/felix/ext/2017.05/' +pdns_logs_path = '/home/felix/pdns/' # e.g. analysis_days = ['2017-04-07', '2017-04-08', '2017-04-09'] analysis_days = [(analysis_start_date + datetime.timedelta(days=x)).strftime('%Y-%m-%d') for x in range(analysis_days_amount)] -def main(): +def iterate_logs(): + start = time.time() + + for day in range(analysis_days_amount): + log_files_hour = get_log_files_for_hours_of_day(analysis_days[day]) + + progress_bar = progressbar.ProgressBar() + + for hour in progress_bar(range(24)): + for hour_files in log_files_hour[hour]: + with gzip.open(hour_files, 'rt', newline='') as file: + reader = csv.reader(file) + + for row in reader: + logger.info('loaded row: ' + str(row)) + + +def serialize_logs_to_db(): # check_duplicates() TODO readd start = time.time() @@ -48,18 +67,18 @@ def main(): # batch mode (batches of 1000 entries) for log_entries in batch(all_rows, 1000): - db.mariadb_insert_logs(log_entries) - #db.mongodb_insert_logs(log_entries) + db_sql.mariadb_insert_logs(log_entries) + #db_mongo.mongodb_insert_logs(log_entries) # single mode # for log_entry in reader: - # db.mariadb_insert_log(log_entry) - # # db.mongodb_insert_log(log_entry) + # db_sql.mariadb_insert_log(log_entry) + # # db_mongo.mongodb_insert_log(log_entry) progress_bar.finish() print('total duration: ' + str(time.time() - start) + 's') - db.close() + db_sql.close() def batch(iterable, n=1): @@ -110,4 +129,4 @@ def get_log_files_for_day(date): if __name__ == "__main__": - main() + iterate_logs() diff --git a/src/DoresA/db-redis.py b/src/DoresA/db-redis.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/DoresA/db_mongo.py b/src/DoresA/db_mongo.py new file mode 100644 index 0000000..18c6845 --- /dev/null +++ b/src/DoresA/db_mongo.py @@ -0,0 +1,39 @@ +import logging +from pymongo import MongoClient + +mongodb_host = 'localhost' +mongodb_db_name = 'doresa' +# mongodb_collection_name = 'pdns_logs' +mongodb_collection_name = 'may' # tmp TODO remove + +logger = logging.getLogger('db') +logger.setLevel(logging.DEBUG) + + +mongo_client = MongoClient(mongodb_host, 27017) +mongo_db = mongo_client[mongodb_db_name] +pdns_logs_mongo = mongo_db[mongodb_collection_name] + + +def mongodb_insert_log(log_entry): + db_entry = {'timestamp': log_entry[0], 'domain': log_entry[1], 'type': log_entry[2], 'record': log_entry[3], 'ttl': log_entry[4]} + pdns_logs_mongo.insert_one(db_entry) + + +def mongodb_insert_logs(log_entries): + db_entries = [] + + for log_entry in log_entries: + db_entries.append( + {'timestamp': log_entry[0], 'domain': log_entry[1], 'type': log_entry[2], 'record': log_entry[3], 'ttl': log_entry[4]} + ) + + pdns_logs_mongo.insert_many(db_entries) + + +def close(): + mongo_client.close() + + +if __name__ == "__main__": + exit() diff --git a/src/DoresA/db_redis.py b/src/DoresA/db_redis.py new file mode 100644 index 0000000..bd82708 --- /dev/null +++ b/src/DoresA/db_redis.py @@ -0,0 +1,113 @@ +from binascii import crc32 +from struct import unpack +from datetime import datetime +from redis import Redis +from redis import RedisError + +import logging + +logger = logging.getLogger('redis') +logger.setLevel(logging.INFO) +logger.debug('connecting redis') + +redis_host = 'localhost' +# TODO name ports properly +redis_start_port_first_seen = 2337 +redis_start_port_last_seen = 2340 +redis_port_reverse = 2343 +redis_port_4 = 2344 +redis_port_ttl = 2345 +bucket_mod = 1048576 # = 2 ** 20 + + +def _get_redis_shard(rrname): + bucket = crc32(rrname.encode('utf-8')) % bucket_mod # convert string to byte array + # split 32-bit crc32 integer into three regions + shard = 0 if bucket < 349525 else 1 if bucket < 349525 * 2 else 2 + return bucket, shard + + +def get_stats_for_domain(rrname, rrtype='A'): + bucket, shard = _get_redis_shard(rrname) + + redis_f = Redis(redis_host, port=redis_start_port_first_seen + shard) + redis_l = Redis(redis_host, port=redis_start_port_last_seen + shard) + redis_r = Redis(redis_host, port=redis_port_reverse) + redis_t = Redis(redis_host, port=redis_port_ttl) + + ttls_b = redis_t.lrange('t:{}:{}'.format(rrname, rrtype), 0, -1) + + ttls = map(lambda ttl: unpack('L', result)[0] + result = '.'.join([str(tuple) for tuple in ((ip & (0xff << 8 * i)) >> 8 * i for i in range(4))]) + + logger.debug('rrname: ' + str(rrname)) + logger.debug('res: ' + str(result)) + logger.debug('id: ' + str('f' + str(bucket))) + + t_f = float(unpack('