split db into separate files

added redis
train using csv files instead of sql
This commit is contained in:
2017-11-30 15:51:46 +01:00
parent 42fce4f17c
commit cf0536483b
12 changed files with 22775 additions and 73 deletions
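
The refactor splits the former db module into db_sql, db_mongo, and db_redis, and trains from gzipped CSV logs instead of querying MariaDB. A minimal sketch of the resulting call pattern (file name hypothetical; the row layout matches the insert helpers in these diffs):

import csv
import gzip

import db_redis  # pDNS stats from the sharded Redis instances

with gzip.open('pdns-log.csv.gz', 'rt', newline='') as f:  # hypothetical path
    for row in csv.reader(f):
        # each row: (timestamp, domain, type, record, ttl)
        stats = db_redis.get_stats_for_domain(row[1])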

View File

@@ -4,21 +4,40 @@ import glob
import time
import datetime
import os
from progress.bar import Bar
import logging
import progressbar
# import db_sql
import db
logger = logging.getLogger('csv')
logger.setLevel(logging.INFO)
analysis_start_date = datetime.date(2017, 5, 1)
analysis_start_date = datetime.date(2017, 9, 1)
analysis_days_amount = 7
# pdns_logs_path = 'data/'
pdns_logs_path = '/run/media/felix/ext/2017.05/'
pdns_logs_path = '/home/felix/pdns/'
# e.g. analysis_days = ['2017-04-07', '2017-04-08', '2017-04-09']
analysis_days = [(analysis_start_date + datetime.timedelta(days=x)).strftime('%Y-%m-%d') for x in
range(analysis_days_amount)]
def main():
def iterate_logs():
start = time.time()
for day in range(analysis_days_amount):
log_files_hour = get_log_files_for_hours_of_day(analysis_days[day])
progress_bar = progressbar.ProgressBar()
for hour in progress_bar(range(24)):
for hour_files in log_files_hour[hour]:
with gzip.open(hour_files, 'rt', newline='') as file:
reader = csv.reader(file)
for row in reader:
logger.info('loaded row: ' + str(row))
def serialize_logs_to_db():
# check_duplicates() TODO re-add
start = time.time()
@@ -48,18 +67,18 @@ def main():
# batch mode (batches of 1000 entries)
for log_entries in batch(all_rows, 1000):
db.mariadb_insert_logs(log_entries)
#db.mongodb_insert_logs(log_entries)
db_sql.mariadb_insert_logs(log_entries)
#db_mongo.mongodb_insert_logs(log_entries)
# single mode
# for log_entry in reader:
# db.mariadb_insert_log(log_entry)
# # db.mongodb_insert_log(log_entry)
# db_sql.mariadb_insert_log(log_entry)
# # db_mongo.mongodb_insert_log(log_entry)
progress_bar.finish()
print('total duration: ' + str(time.time() - start) + 's')
db.close()
db_sql.close()
def batch(iterable, n=1):
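    # body not shown in this hunk; a plausible implementation, assuming
    # iterable supports len() (the call site above passes lists of rows, n=1000):
    length = len(iterable)
    for start in range(0, length, n):
        yield iterable[start:start + n]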
@@ -110,4 +129,4 @@ def get_log_files_for_day(date):
if __name__ == "__main__":
main()
iterate_logs()

View File

src/DoresA/db_mongo.py (new file, 39 lines)

@@ -0,0 +1,39 @@
import logging
from pymongo import MongoClient
mongodb_host = 'localhost'
mongodb_db_name = 'doresa'
# mongodb_collection_name = 'pdns_logs'
mongodb_collection_name = 'may' # tmp TODO remove
logger = logging.getLogger('db')
logger.setLevel(logging.DEBUG)
mongo_client = MongoClient(mongodb_host, 27017)
mongo_db = mongo_client[mongodb_db_name]
pdns_logs_mongo = mongo_db[mongodb_collection_name]
def mongodb_insert_log(log_entry):
db_entry = {'timestamp': log_entry[0], 'domain': log_entry[1], 'type': log_entry[2], 'record': log_entry[3], 'ttl': log_entry[4]}
pdns_logs_mongo.insert_one(db_entry)
def mongodb_insert_logs(log_entries):
db_entries = []
for log_entry in log_entries:
db_entries.append(
{'timestamp': log_entry[0], 'domain': log_entry[1], 'type': log_entry[2], 'record': log_entry[3], 'ttl': log_entry[4]}
)
pdns_logs_mongo.insert_many(db_entries)
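# insert_many writes the whole batch in one round trip; the csv_tools caller
# feeds batches of 1000 entries via its batch() helper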
def close():
mongo_client.close()
if __name__ == "__main__":
exit()

src/DoresA/db_redis.py (new file, 113 lines)

@@ -0,0 +1,113 @@
from binascii import crc32
from struct import unpack
from datetime import datetime
from redis import Redis
from redis import RedisError
import logging
logger = logging.getLogger('redis')
logger.setLevel(logging.INFO)
logger.debug('connecting redis')
redis_host = 'localhost'
# TODO name ports properly
redis_start_port_first_seen = 2337
redis_start_port_last_seen = 2340
redis_port_reverse = 2343
redis_port_4 = 2344
redis_port_ttl = 2345
bucket_mod = 1048576 # = 2 ** 20
def _get_redis_shard(rrname):
bucket = crc32(rrname.encode('utf-8')) % bucket_mod # crc32 needs bytes, hence encode()
# split the 2**20 bucket space into three roughly equal shards (349525 == 2**20 // 3)
shard = 0 if bucket < 349525 else 1 if bucket < 349525 * 2 else 2
return bucket, shard
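# shard 0/1/2 selects among three Redis instances per key family:
# first-seen hashes on ports 2337-2339, last-seen hashes on ports 2340-2342
# (redis_start_port_* + shard below)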
def get_stats_for_domain(rrname, rrtype='A'):
bucket, shard = _get_redis_shard(rrname)
redis_f = Redis(redis_host, port=redis_start_port_first_seen + shard)
redis_l = Redis(redis_host, port=redis_start_port_last_seen + shard)
redis_r = Redis(redis_host, port=redis_port_reverse)
redis_t = Redis(redis_host, port=redis_port_ttl)
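# key families, inferred from the reads below: 'f<bucket>' and 'l<bucket>' are hashes keyed by
# '<rrname>:<rdata>' holding little-endian 32-bit unix timestamps; 'r:<rrname>:<rrtype>' is a set
# of raw rdata; 't:<rrname>:<rrtype>' is a list of little-endian 32-bit TTLs; 'v:<rdata>' (used in
# get_stats_for_ip below) is a set of rrnames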
# normalize name and type before building any keys
rrtype = rrtype.upper()
# remove trailing slash
rrname = rrname.rstrip('/')
ttls_b = redis_t.lrange('t:{}:{}'.format(rrname, rrtype), 0, -1)
ttls = map(lambda ttl: unpack('<L', ttl)[0], ttls_b)
try:
results = []
for result in redis_r.smembers('r:{}:{}'.format(rrname, rrtype)):
if rrtype == 'A': # unpack IPv4 addresses
ip = unpack('>L', result)[0]
result = '.'.join(str((ip >> 8 * i) & 0xff) for i in range(4))
logger.debug('rrname: ' + str(rrname))
logger.debug('res: ' + str(result))
logger.debug('id: ' + str('f' + str(bucket)))
t_f = float(unpack('<L', redis_f.hget('f' + str(bucket), rrname + ':' + result))[0])
t_l = float(unpack('<L', redis_l.hget('l' + str(bucket), rrname + ':' + result))[0])
t_f = datetime.utcfromtimestamp(t_f).strftime('%Y-%m-%dT%H:%M:%SZ')
t_l = datetime.utcfromtimestamp(t_l).strftime('%Y-%m-%dT%H:%M:%SZ')
results.append({
'rrname': rrname,
'rrtype': rrtype.replace('rrtype_', ''),
'rdata': result,
'ttls': list(map(int, ttls)), # TODO do we need to convert iterable of type map to list? (e.g. for length)
'time_first': t_f,
'time_last': t_l
})
return results
except RedisError as e:
logger.error(e)
def get_stats_for_ip(rdata):
redis_v = Redis(redis_host, port=redis_port_4)
try:
results = []
for result in redis_v.smembers('v:{}'.format(rdata)):
result = result.decode('utf-8') # convert to string (python 3)
bucket, shard = _get_redis_shard(result)
redis_f = Redis(redis_host, port=redis_start_port_first_seen + shard)
redis_l = Redis(redis_host, port=redis_start_port_last_seen + shard)
t_f = float(unpack('<L', redis_f.hget('f' + str(bucket), result + ':' + rdata))[0])
t_l = float(unpack('<L', redis_l.hget('l' + str(bucket), result + ':' + rdata))[0])
t_f = datetime.utcfromtimestamp(t_f).strftime('%Y-%m-%dT%H:%M:%SZ')
t_l = datetime.utcfromtimestamp(t_l).strftime('%Y-%m-%dT%H:%M:%SZ')
results.append({
'rrname': result,
'rrtype': '',
'rdata': rdata,
'time_first': t_f,
'time_last': t_l
})
return results
except RedisError as e:
logger.error(e)
def test():
stats = get_stats_for_domain('ZN015105.ppp.dion.ne.jp')
# stats = get_stats_for_ip('172.217.27.14')
logger.info(stats)
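# illustrative result shape (values hypothetical):
# [{'rrname': 'ZN015105.ppp.dion.ne.jp', 'rrtype': 'A', 'rdata': '198.51.100.23',
#   'ttls': [300, 300], 'time_first': '2017-05-01T00:00:00Z', 'time_last': '2017-05-03T12:00:00Z'}]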
if __name__ == "__main__":
test()

View File

@@ -2,14 +2,8 @@ import MySQLdb as mariadb
import time
import os
import logging
from pymongo import MongoClient
mongodb_host = 'localhost'
mongodb_db_name = 'doresa'
# mongodb_collection_name = 'pdns_logs'
mongodb_collection_name = 'may' # tmp TODO remove
sql_host = 'localhost'
sql_port = 3306
sql_db_name = 'doresa'
@@ -19,9 +13,8 @@ sql_pw = '3qfACEZzbXY4b'
sql_table_name = 'pdns_logs_test'
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('db')
logger.setLevel(logging.DEBUG)
if 'MYSQL_HOST' in os.environ:
sql_host = os.environ['MYSQL_HOST']
@@ -39,11 +32,6 @@ if 'MYSQL_PASSWORD' in os.environ:
sql_pw = os.environ['MYSQL_PASSWORD']
mongo_client = MongoClient(mongodb_host, 27017)
mongo_db = mongo_client[mongodb_db_name]
pdns_logs_mongo = mongo_db[mongodb_collection_name]
sql_connection = mariadb.connect(host=sql_host, user=sql_user_name, passwd=sql_pw, db=sql_db_name, port=sql_port)
sql_cursor = sql_connection.cursor()
@@ -122,7 +110,6 @@ def mariadb_get_logs_for_domain(domain, id_upto, from_time=None, to_time=None):
def mariadb_get_logs_for_ip(ip, id_upto, from_time=None, to_time=None):
# we need a second connection for this query, as it usually (always) runs in parallel with the first query
sql_connection_tmp = mariadb.connect(host=sql_host, user=sql_user_name, passwd=sql_pw, db=sql_db_name, port=sql_port)
sql_cursor_tmp = sql_connection_tmp.cursor()
# get_distinct_ttl = 'SELECT * FROM ' + sql_table_name + \
# ' WHERE timestamp BETWEEN \'{}\' and \'{}\' '.format(from_time, to_time) + \
# 'AND domain=\'' + str(ip) + '\';'
@@ -134,7 +121,6 @@ def mariadb_get_logs_for_ip(ip, id_upto, from_time=None, to_time=None):
result = sql_connection_tmp.use_result()
logs_for_ip = result.fetch_row(maxrows=0, how=1) # TODO this can consume a lot of memory, think of alternatives
# sql_cursor_tmp.close()
sql_connection_tmp.close()
return logs_for_ip
@@ -159,32 +145,13 @@ def mariadb_create_table():
sql_cursor.execute(create_table)
def mongodb_insert_log(log_entry):
db_entry = {'timestamp': log_entry[0], 'domain': log_entry[1], 'type': log_entry[2], 'record': log_entry[3], 'ttl': log_entry[4]}
pdns_logs_mongo.insert_one(db_entry)
def mongodb_insert_logs(log_entries):
db_entries = []
for log_entry in log_entries:
db_entries.append(
{'timestamp': log_entry[0], 'domain': log_entry[1], 'type': log_entry[2], 'record': log_entry[3], 'ttl': log_entry[4]}
)
pdns_logs_mongo.insert_many(db_entries)
def convert_timestamp_to_sql_datetime(timestamp):
return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(timestamp))
def close():
# mariadb
sql_cursor.close()
sql_connection.close()
# mongodb
mongo_client.close()
#mariadb_create_table()

View File

@@ -2,8 +2,8 @@ import re
import logging
from geoip2 import database, errors
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('ip')
logger.setLevel(logging.DEBUG)
def get_country_by_ip(ip):

View File

@@ -1,10 +1,10 @@
import db
import db_sql
import datetime
def test():
f = '%Y-%m-%d %H:%M:%S'
results = db.mariadb_get_logs(datetime.date(2017, 4, 7).strftime(f), datetime.date(2017, 4, 8).strftime(f))
results = db_sql.mariadb_get_logs(datetime.date(2017, 4, 7).strftime(f), datetime.date(2017, 4, 8).strftime(f))
row = results.fetch_row(how=1)
while row:

File diff suppressed because it is too large

View File

@@ -1 +1 @@
{"last_check":"2017-11-07T13:21:55Z","pypi_version":"9.0.1"}
{"last_check":"2017-11-29T13:45:46Z","pypi_version":"9.0.1"}

View File

@@ -1,3 +1,4 @@
beautifulsoup4==4.6.0
certifi==2017.11.5
chardet==3.0.4
cycler==0.10.0
@@ -12,14 +13,16 @@ mysqlclient==1.3.12
nltk==3.2.5
numpy==1.13.1
pandas==0.20.3
progress==1.3
progressbar2==3.34.3
pyenchant==1.6.11
pymongo==3.5.1
pyparsing==2.2.0
python-dateutil==2.6.1
python-geoip==1.2
python-utils==2.2.0
python-weka-wrapper3==0.1.3
pytz==2017.2
redis==2.10.6
requests==2.18.4
scikit-learn==0.19.0
scipy==0.19.1

View File

@@ -1,29 +1,69 @@
from sklearn.datasets import load_iris
from sklearn import tree
import logging
import datetime
# logfile = 'analysis_' + datetime.datetime.now().strftime('%Y-%m-%d_%H:%M') + '.log' # https://stackoverflow.com/questions/1943747/python-logging-before-you-run-logging-basicconfig
# logging.basicConfig(filename=logfile, filemode='w') # important to set basicConfig only once for all modules
logging.basicConfig()
import logging
import datetime
import gzip
import csv
import numpy as np
import graphviz
import datetime
import logging
import time
import db
import db_redis
import domain
import ip
import ttl
import csv_tools
import progressbar
# import db_sql
from sklearn.datasets import load_iris
from sklearn import tree
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('train')
logger.setLevel(logging.DEBUG)
db_format_time = '%Y-%m-%d %H:%M:%S'
train_start = datetime.date(2017, 5, 1)
train_end = datetime.date(2017, 5, 8)
train_end = datetime.date(2017, 5, 4)
id_upto = 379283817
# record types that should be analysed (e.g. only A)
record_types = ['A']
# id_upto = db.mariadb_get_nearest_id(train_end.strftime(db_format_time))
def train():
start = time.time()
for day in range(csv_tools.analysis_days_amount):
log_files_hour = csv_tools.get_log_files_for_hours_of_day(csv_tools.analysis_days[day])
progress_bar = progressbar.ProgressBar()
for hour in progress_bar(range(24)):
for hour_files in log_files_hour[hour]:
with gzip.open(hour_files, 'rt', newline='') as file:
reader = csv.reader(file)
for row in reader:
if row[2] in record_types:
entity = {'timestamp': row[0], 'domain': row[1], 'type': row[2],
'record': row[3], 'ttl': row[4]}
try:
prepare_features_redis(entity)
# pass
except Exception as e:
logger.error(e)
logger.error('Exception occurred processing entity: ' + str(entity))
def get_logs_from_db():
results = db.mariadb_get_logs(id_upto)
@@ -31,43 +71,133 @@ def get_logs_from_db():
logger.debug("# entity: " + row[0]['domain'])
features = prepare_features(row[0])
features = prepare_features_redis(row[0])
logger.info(str(features))
logger.debug(str(features))
# while row:
# logger.debug("# entity: " + row[0]['domain'])
#
# features = prepare_features(row[0])
#
# logger.info(str(features))
# logger.debug(str(features))
#
# row = results.fetch_row(how=1)
def prepare_features(entity):
# get all logs for the same domain
def prepare_features_redis(entity):
checkpoint = time.time()
domain_stats = db_redis.get_stats_for_domain(entity['domain'])
ip_stats = db_redis.get_stats_for_ip(entity['record'])
logger.debug('redis took ' + str(time.time() - checkpoint) + 's')
logger.debug(domain_stats)
if len(domain_stats) != 1:
logger.debug('no stats in redis for entity: ' + str(entity))
domain_stats = domain_stats[0]
# TODO populate ips (e.g. from the 'rdata' fields in domain_stats); until then features 5 and 6 are always 0
ips = []
# feature 5: Number of distinct IP addresses
distinct_ips = len(ips)
# feature 6: Number of distinct countries
distinct_countries = len(set(ip.get_country_by_ip(ip_str) for ip_str in ips))
# feature 7: Number of (distinct) domains share the IP with
distinct_domains_with_same_ip = len(ip_stats)
# feature 8: Reverse DNS query results
# 5 atomic feature
# atomic 1: ratio of IP addresses that cannot be matched with a domain name (NX domains)
ratio_ips_nx = 0
# atomic 2: ratio of ips that are used for DSL lines
ratio_ips_dsl = 0
# atomic 3: ratio of ips that belong to hosting services
ratio_ips_hoster = 0
# atomic 4: ratio of ips that belong to known ISPs
ratio_ips_isp = 0
# atomic 5: ips that can be matched with a valid domain name
ratio_ips_valid = 0
# TODO add atomics to 'all_features'
reverse_dns_result = 0
# feature 9: Average TTL
average_ttl = sum(domain_stats['ttls']) / len(domain_stats['ttls'])
# feature 10: Standard Deviation of TTL
standard_deviation = ttl.standard_deviation(domain_stats['ttls']) # TODO distinct ttls for std deviation?
# feature 11: Number of distinct TTL values
distinct_ttl = len(list(set(domain_stats['ttls'])))
# feature 12: Number of TTL change
ttl_changes = ttl.changes(domain_stats['ttls'])
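# illustrative: ttls == [300, 300, 60] gives average_ttl 220.0, distinct_ttl 2, ttl_changes 1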
# feature 13: Percentage usage of specific TTL ranges
# specific ranges: [0, 1], [1, 100], [100, 300], [300, 900], [900, inf]
# TODO check if 5 individual features make a difference
specific_ttl_ranges = ttl.specific_range(entity['ttl'])
# feature 14: % of numerical characters
numerical_characters_percent = domain.ratio_numerical_to_alpha(entity['domain'])
# feature 15: % of the length of the LMS
lms_percent = domain.ratio_lms_to_fqdn(entity['domain'])
all_features = np.array([
distinct_ips, distinct_countries,
distinct_domains_with_same_ip, reverse_dns_result, average_ttl, standard_deviation, distinct_ttl, ttl_changes,
specific_ttl_ranges, numerical_characters_percent, lms_percent
])
logger.debug(all_features)
exit() # debug stop: halts after the first entity, so the return below is never reached
return all_features
def prepare_features_mysql(entity):
checkpoint = time.time()
logger.debug('get logs for domain start')
# get all logs for the same domain
# BIG TODO check if we need the ip addresses of a specific response (not of all [different] responses) somewhere
logs_for_domain = db.mariadb_get_logs_for_domain(entity['domain'], id_upto)
logger.debug('get logs for domain done' + str(time.time() - checkpoint) + ' s')
logger.info('get logs for domain done' + str(time.time() - checkpoint) + ' s')
# TODO do this efficiently
ttls = [log['ttl'] for log in logs_for_domain]
logger.info('ttls ' + str(ttls))
logger.debug('ttls ' + str(ttls))
ips = [log['record'] for log in logs_for_domain] # TODO check if valid ip address
logger.info(ips)
logger.debug(ips)
response_timestamps = [log['timestamp'] for log in logs_for_domain]
logger.info(response_timestamps)
logger.debug(response_timestamps)
domains_with_same_ip = []
# get all logs for the same ip if valid ip
if ip.is_valid_ipv4(entity['record']) or ip.is_valid_ipv6(entity['record']):
checkpoint = time.time()
logger.debug('get logs for ip start')
logger.info('get logs for ip start')
logs_for_ip = db.mariadb_get_logs_for_ip(entity['record'], id_upto)
logger.debug('get logs for ip done' + str(time.time() - checkpoint) + ' s')
logger.info('get logs for ip done' + str(time.time() - checkpoint) + ' s')
domains_with_same_ip = [log['domain'] for log in logs_for_ip]
# feature 1: Short Life
@@ -174,12 +304,12 @@ def prepare_features(entity):
def test():
start = time.time()
logger.debug('starting training ' + str(start))
logger.info('starting training ' + str(start))
get_logs_from_db()
train()
logger.debug('total duration: ' + str(time.time() - start) + 's')
db.close()
logger.info('total duration: ' + str(time.time() - start) + 's')
cleanup()
# db.mariadb_get_distinct_ttl('d2s45lswxaswrw.cloudfront.net', train_start.strftime(db_format_time), train_end.strftime(db_format_time))
@@ -197,5 +327,9 @@ def flow():
graph.render('test', view=True)
def cleanup():
db.close()
if __name__ == "__main__":
test()

View File

@@ -1,4 +1,8 @@
import numpy as np
import logging
logger = logging.getLogger('ttl')
logger.setLevel(logging.DEBUG)
def standard_deviation(array):
@@ -20,6 +24,12 @@ def changes(array):
def specific_range(ttl):
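# bucket indices follow the ranges noted in train.py:
# [0, 1] -> 0, (1, 100] -> 1, (100, 300] -> 2, (300, 900] -> 3, (900, inf) -> 4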
specific_ttl_ranges = 4 # default is [900, inf]
try:
ttl = int(ttl)
except ValueError:
logger.error('ttl not a number')
return specific_ttl_ranges
if 0 < ttl <= 1:
specific_ttl_ranges = 0
elif 1 < ttl <= 100: