gunzip files on redis import

This commit is contained in:
2017-11-27 16:53:52 +01:00
parent 3b33df87fe
commit 55da18115a
2 changed files with 39 additions and 12 deletions

0
src/DoresA/db-redis.py Normal file
View File

View File

@@ -3,7 +3,8 @@
#include <stdlib.h> #include <stdlib.h>
#include <assert.h> #include <assert.h>
#include <hiredis/hiredis.h> #include <hiredis/hiredis.h>
// #include <zlib.h> // TODO gz
#include <zlib.h>
#define SOCK_F "/home/felix/redis/redis/redis_local_f.sock" #define SOCK_F "/home/felix/redis/redis/redis_local_f.sock"
#define SOCK_F2 "/home/felix/redis/redis/redis_local_f2.sock" #define SOCK_F2 "/home/felix/redis/redis/redis_local_f2.sock"
@@ -13,6 +14,7 @@
#define SOCK_L3 "/home/felix/redis/redis/redis_local_l3.sock" #define SOCK_L3 "/home/felix/redis/redis/redis_local_l3.sock"
#define SOCK_R "/home/felix/redis/redis/redis_local_r.sock" #define SOCK_R "/home/felix/redis/redis/redis_local_r.sock"
#define SOCK_V "/home/felix/redis/redis/redis_local_v.sock" #define SOCK_V "/home/felix/redis/redis/redis_local_v.sock"
#define SOCK_T "/home/felix/redis/redis/redis_local_t.sock"
//#define MOD 524288 //#define MOD 524288
#define MOD 1048576 #define MOD 1048576
@@ -43,6 +45,7 @@ int main(int argc, char **argv) {
redisContext *c_l3; redisContext *c_l3;
redisContext *c_r; redisContext *c_r;
redisContext *c_v; redisContext *c_v;
redisContext *c_t;
redisReply **reply = 0; redisReply **reply = 0;
if (argc < 2) { if (argc < 2) {
@@ -55,11 +58,12 @@ int main(int argc, char **argv) {
char rdata[4096]; char rdata[4096];
char rrname[4096]; char rrname[4096];
char rrtype[4096]; char rrtype[4096];
int ts; time_t ts;
uint32_t ttl;
// TODO gz // TODO gz
// gzFile f = gzopen(argv[1], "r") // FILE *f = fopen(argv[1], "r");
FILE *f = fopen(argv[1], "r"); gzFile f = gzopen(argv[1], "r");
if (!f) { if (!f) {
printf("file %s could not be opened\n", argv[1]); printf("file %s could not be opened\n", argv[1]);
@@ -74,15 +78,16 @@ int main(int argc, char **argv) {
c_l3 = redisConnectUnix(SOCK_L3); c_l3 = redisConnectUnix(SOCK_L3);
c_r = redisConnectUnix(SOCK_R); c_r = redisConnectUnix(SOCK_R);
c_v = redisConnectUnix(SOCK_V); c_v = redisConnectUnix(SOCK_V);
c_t = redisConnectUnix(SOCK_T);
if (!c_f || !c_f2 || !c_f3 || !c_l || !c_l2 || !c_l3 || !c_r || !c_v) { if (!c_f || !c_f2 || !c_f3 || !c_l || !c_l2 || !c_l3 || !c_r || !c_v || !c_t) {
perror("uh oh:"); perror("uh oh:");
return 1; return 1;
} }
// TODO gz // TODO gz
// while(gzgets(f, line, 4096)) { // while(fgets(line, 4096, f)) {
while(fgets(line, 4096, f)) { while(gzgets(f, line, 4096)) {
if (!strstr(line, "\n")) { if (!strstr(line, "\n")) {
CONT("no newline in line buffer found. read incomplete"); CONT("no newline in line buffer found. read incomplete");
@@ -133,6 +138,12 @@ int main(int argc, char **argv) {
} }
strcpy(rdata, tmp); strcpy(rdata, tmp);
free(tmp); free(tmp);
if (!(tmp = getfielddel(line, ',', '"', 4))) {
CONT("TTL could not be parsed");
}
ttl = atoi(tmp);
free(tmp);
} }
} }
@@ -141,7 +152,7 @@ int main(int argc, char **argv) {
printf("WARNING timestamp malformed: %s", line); printf("WARNING timestamp malformed: %s", line);
if (argc > 2 && !strcmp(argv[2], "-v")) if (argc > 2 && !strcmp(argv[2], "-v"))
printf("ts %u, rrname %s, rrtype %s, rdata %s\n", ts, rrname, rrtype, rdata); printf("ts %lld, rrname %s, rrtype %s, rdata %s, ttl %u\n", (long long)ts, rrname, rrtype, rdata, ttl);
//continue; // TODO remove XXX //continue; // TODO remove XXX
@@ -151,8 +162,8 @@ int main(int argc, char **argv) {
unsigned int bucket; unsigned int bucket;
char bucket_c[8]; char bucket_c[8];
char *pdns_r, *pdns_v, *pdns_fl; char *pdns_r, *pdns_v, *pdns_t, *pdns_fl;
size_t pdns_r_l, pdns_v_l, pdns_fl_l; size_t pdns_r_l, pdns_v_l, pdns_fl_l, pdns_t_l;
pdns_r_l = strlen("r:") + strlen(rrname) + strlen(":") + strlen(rrtype); pdns_r_l = strlen("r:") + strlen(rrname) + strlen(":") + strlen(rrtype);
pdns_r = malloc(pdns_r_l + 1); pdns_r = malloc(pdns_r_l + 1);
@@ -166,6 +177,10 @@ int main(int argc, char **argv) {
pdns_fl = malloc(pdns_fl_l + 1); pdns_fl = malloc(pdns_fl_l + 1);
sprintf(pdns_fl, "%s:%s", rrname, rdata); sprintf(pdns_fl, "%s:%s", rrname, rdata);
pdns_t_l = pdns_r_l;
pdns_t = malloc(pdns_t_l + 1);
sprintf(pdns_t, "t:%s:%s", rrname, rrtype);
if (!strcmp(rrtype, "A")) { if (!strcmp(rrtype, "A")) {
sscanf(rdata, "%u.%u.%u.%u", &o[0], &o[1], &o[2], &o[3]); sscanf(rdata, "%u.%u.%u.%u", &o[0], &o[1], &o[2], &o[3]);
for (i=0; i<4; i++) for (i=0; i<4; i++)
@@ -177,6 +192,8 @@ int main(int argc, char **argv) {
redisAppendCommand(c_v, "SADD %b %b", pdns_v, pdns_v_l, rrname, strlen(rrname)); redisAppendCommand(c_v, "SADD %b %b", pdns_v, pdns_v_l, rrname, strlen(rrname));
redisAppendCommand(c_t, "RPUSH %b %b", pdns_t, pdns_t_l, &ttl, (size_t) sizeof(uint32_t));
count++; count++;
@@ -209,6 +226,10 @@ int main(int argc, char **argv) {
if (redisGetReply(c_v, (void *)&reply) == REDIS_ERR_PROTOCOL) if (redisGetReply(c_v, (void *)&reply) == REDIS_ERR_PROTOCOL)
printf("protocol error\n"); printf("protocol error\n");
freeReplyObject(reply); freeReplyObject(reply);
if (redisGetReply(c_t, (void *)&reply) == REDIS_ERR_PROTOCOL)
printf("protocol error\n");
freeReplyObject(reply);
} }
for (i=0; i<count_fl; i++) { for (i=0; i<count_fl; i++) {
@@ -248,6 +269,7 @@ int main(int argc, char **argv) {
free(pdns_r); free(pdns_r);
free(pdns_v); free(pdns_v);
free(pdns_t);
free(pdns_fl); free(pdns_fl);
} }
@@ -261,6 +283,10 @@ int main(int argc, char **argv) {
if (redisGetReply(c_v, (void *)&reply) == REDIS_ERR_PROTOCOL) if (redisGetReply(c_v, (void *)&reply) == REDIS_ERR_PROTOCOL)
printf("protocol error\n"); printf("protocol error\n");
freeReplyObject(reply); freeReplyObject(reply);
if (redisGetReply(c_t, (void *)&reply) == REDIS_ERR_PROTOCOL)
printf("protocol error\n");
freeReplyObject(reply);
} }
for (i=0; i<count_fl; i++) { for (i=0; i<count_fl; i++) {
@@ -298,12 +324,13 @@ int main(int argc, char **argv) {
printf("%s\n", argv[1]); printf("%s\n", argv[1]);
// TODO gz // TODO gz
// gzclose(f); gzclose(f);
fclose(f); // fclose(f);
redisFree(c_f); redisFree(c_f);
redisFree(c_l); redisFree(c_l);
redisFree(c_r); redisFree(c_r);
redisFree(c_v); redisFree(c_v);
redisFree(c_t);
return 0; return 0;
} }