From 95a6f1fe97fcb8e74bf9c510c92d9e3503c59871 Mon Sep 17 00:00:00 2001 From: Bogomil Vasilev Date: Sat, 17 Sep 2016 15:02:58 +0300 Subject: [PATCH] Nothing to see here, move along... --- Makefile | 10 +++++++--- agent_pool.c | 5 ++++- client_pool.c | 8 ++++++++ confparser.c | 19 +++++++++++++++++-- confparser.h | 3 +++ protocol.h | 1 + rmps.c | 22 +++++++++++++--------- rmps.conf | 1 + 8 files changed, 54 insertions(+), 15 deletions(-) diff --git a/Makefile b/Makefile index 3fd5882..35ce074 100644 --- a/Makefile +++ b/Makefile @@ -6,9 +6,11 @@ CCFLAGS += -Wall \ -Wmissing-declarations \ -pedantic \ -fstack-protector-strong +CCFLAGS += `mysql_config --cflags` LDFLAGS = -O1 -lcrypto -lssl -lpthread LDFLAGS += -Wl,-z,relro,-z,now +LDFLAGS += `mysql_config --libs` SOURCES = main.c \ confparser.c \ @@ -16,7 +18,9 @@ SOURCES = main.c \ enum_functions.c \ log.c \ agent_pool.c \ - client_pool.c + client_pool.c \ + sql.c \ + job_queue.c OBJECTS = $(SOURCES:.c=.o) EXECUTABLE = rmpsd @@ -26,11 +30,11 @@ all: $(SOURCES) $(EXECUTABLE) $(EXECUTABLE): $(OBJECTS) @echo ' LD $@' - @$(CC) $(LDFLAGS) $(OBJECTS) -o $@ + @$(CC) $(LDFLAGS) $(OBJECTS) $(DBFLAGS) -o $@ .c.o: @echo ' CC $@' - @$(CC) $(CCFLAGS) -c $< -o $@ + @$(CC) $(CCFLAGS) $(DBFLAGS) -c $< -o $@ clean: rm -rf $(OBJECTS) $(EXECUTABLE) diff --git a/agent_pool.c b/agent_pool.c index af5b31c..87b3191 100644 --- a/agent_pool.c +++ b/agent_pool.c @@ -85,7 +85,8 @@ static void* servlet(void *args) /* Serve the connection -- threadable */ SSL_free(agent->ssl); /* release SSL state */ close(agent->sd); /* close connection */ agent->busy = 0; - return 0; + pthread_detach(pthread_self()); + pthread_exit(NULL); } static void send_reject_msg(SSL *ssl) @@ -157,4 +158,6 @@ void* agent_pool(void *args) } pthread_attr_destroy(&attr); pthread_mutex_destroy(&mutex); + pthread_detach(pthread_self()); + pthread_exit(NULL); } diff --git a/client_pool.c b/client_pool.c index 732b139..f084e27 100644 --- a/client_pool.c +++ b/client_pool.c @@ -1,5 +1,6 @@ #include "client_pool.h" #include "log.h" +#include "job_queue.h" #include "protocol.h" #include #include @@ -30,6 +31,9 @@ static void* servlet(void *args) /* Serve the connection -- threadable */ log(WARNING, "SSL_accept() failed. Reason below:"); log_ssl(); } else { + int queue_id = start_msg_queue(); + if (queue_id == FAIL) + goto exit; do { //buf.meta.type = GET_MEMORY; //sleep(1); @@ -45,6 +49,7 @@ static void* servlet(void *args) /* Serve the connection -- threadable */ log(VERBOSE, "Client msg: \"%s\"", buf.chunk.data); /* TODO: Insert msg handler here */ + add_msg_to_queue(queue_id, buf); continue; } @@ -59,6 +64,7 @@ static void* servlet(void *args) /* Serve the connection -- threadable */ log(INFO, "Client [%s] disconnected.", client->ip); } while (bytes); } +exit: SSL_free(client->ssl); /* release SSL state */ close(client->sd); /* close connection */ client->busy = 0; @@ -134,4 +140,6 @@ void* client_pool(void *args) } pthread_attr_destroy(&attr); pthread_mutex_destroy(&mutex); + pthread_detach(pthread_self()); + pthread_exit(NULL); } diff --git a/confparser.c b/confparser.c index d7ee504..f88644b 100644 --- a/confparser.c +++ b/confparser.c @@ -16,7 +16,8 @@ struct conf_table conf = { { "", /* db.type */ "", /* db.hostname */ - "" /* db.port */ + "", /* db.port */ + "" /* db.pass */ }, { "127.0.0.1", /* rmps.agent_ip */ @@ -41,11 +42,22 @@ struct conf_table conf = { } }; +const char* conf_db_pass(void) +{ + return conf.db.pass; +} + +const char* conf_db_hostname(void) +{ + return conf.db.hostname; +} + void confexport(void) { printf( "db.type=%s\n" "db.hostname=%s\n" "db.port=%s\n" + "db.pass=%s\n" "rmps.agent_ip=%s\n" "rmps.agent_port=%s\n" "rmps.client_ip=%s\n" @@ -65,6 +77,7 @@ void confexport(void) conf.db.type, conf.db.hostname, conf.db.port, + conf.db.pass, conf.rmps.agent_ip, conf.rmps.agent_port, conf.rmps.client_ip, @@ -259,7 +272,9 @@ static int test_conf_syntax(void) } ok = 0; failed = 1; - } else if (!strcmp(buf, "rmps.agent_ip")) { + } else if (!strcmp(buf, "db.pass")) + strcpy(conf.db.pass, tmp + 1); + else if (!strcmp(buf, "rmps.agent_ip")) { /* TODO */ } else if (!strcmp(buf, "rmps.agent_port")) { if ((i = strlen(tmp + 1)) < 6) { diff --git a/confparser.h b/confparser.h index 5ce0486..d670af0 100644 --- a/confparser.h +++ b/confparser.h @@ -11,6 +11,7 @@ struct conf_db { char type[15]; char hostname[HOSTNAMESIZE]; char port[6]; + char pass[60]; /* random decision */ }; struct conf_rmps { @@ -46,6 +47,8 @@ struct conf_table { extern struct conf_table conf; extern int confparse(void); extern void confexport(void); +extern const char* conf_db_pass(void); +extern const char* conf_db_hostname(void); #endif /* CONFPARSER_H */ diff --git a/protocol.h b/protocol.h index af3dac6..91370b3 100644 --- a/protocol.h +++ b/protocol.h @@ -25,6 +25,7 @@ struct msg_meta_t { unsigned chunks; short is_recv; short locking; + short isjob; }; struct msg_chunk_t { diff --git a/rmps.c b/rmps.c index 31f4e6d..97d8cec 100644 --- a/rmps.c +++ b/rmps.c @@ -2,6 +2,7 @@ #include "confparser.h" #include "agent_pool.h" #include "client_pool.h" +#include "job_queue.h" #include "rmps.h" #include #include @@ -172,25 +173,24 @@ static int open_listener(int port) sd = socket(addr.sin_family, SOCK_STREAM, 0); if (sd < 0) { log(ERROR, "Failed to create socket"); - log(INFO, "RMPS failed to start, shutting down..."); - atexit(cleanup); + goto exit; } if (set_reuse_addr(sd) < 0) { log(ERROR, "Failed to set reuse on address - Aborting...", port); - log(INFO, "RMPS failed to start, shutting down..."); - atexit(cleanup); + goto exit; } if (bind(sd, (struct sockaddr*)&addr, sizeof(addr)) != 0) { log(ERROR, "Failed to bind on port: %d - Aborting...", port); - log(INFO, "RMPS failed to start, shutting down..."); - atexit(cleanup); + goto exit; } if (listen(sd, 10) != 0) { log(ERROR, "Failed to start listener on port %d - Aborting...", port); - log(INFO, "RMPS failed to start, shutting down..."); - atexit(cleanup); + goto exit; } return sd; +exit: + log(INFO, "RMPS failed to start, shutting down..."); + atexit(cleanup); } /* Init server and create context */ @@ -296,7 +296,11 @@ void launch_rmps(struct conf_table *conf, int fork_flag) pool_args[1].size = conf->rmps.client_poolsize; log(VERBOSE, "Creating client thread pool (mutex)."); pthread_create(&pool[1], NULL, client_pool, &pool_args[1]); - + if (start_job_queue(conf->rmps.agent_poolsize) == FAIL) { + log(ERROR, "On start_job_queue(), RMPS failed to start, shutting down..."); + atexit(cleanup); + return; + } pthread_join(pool[0], NULL); pthread_join(pool[1], NULL); } diff --git a/rmps.conf b/rmps.conf index d413a9c..ba91bbb 100644 --- a/rmps.conf +++ b/rmps.conf @@ -10,6 +10,7 @@ db.type=mysql db.hostname=127.0.0.1 db.port=3306 +db.pass=rmps # RMPS Core settings rmps.agent_ip=127.0.0.1