diff --git a/src/agent_pool.c b/src/agent_pool.c index e17ad4e..22806fe 100644 --- a/src/agent_pool.c +++ b/src/agent_pool.c @@ -38,6 +38,23 @@ struct agent_args { static void show_certs(SSL *ssl); static void *servlet(void *args); static void send_reject_msg(SSL *ssl); +void shutdown_pool(void *args); + +void shutdown_pool(void *args) +{ + struct agent_args *agents = args; + int i = 0; + log(VERBOSE, "Shutting down agent pool..."); + while (i < conf.rmps.agent_poolsize) { + if (!agents[i].busy && ++i) + continue; + log(VERBOSE, "Shutting down agent thread: %d", i); + SSL_shutdown(agents[i].ssl); + i++; + } + if (agents) + free(agents); +} static void show_certs(SSL *ssl) { @@ -120,8 +137,10 @@ static void *servlet(void *args) /* Serve the connection -- threadable */ static void send_reject_msg(SSL *ssl) { char *reply = "FAILURE - The connection queue is full!\n"; + struct msg_t buf = {0}; + sprintf((char *)buf.chunk.data, "%s", reply); - SSL_write(ssl, reply, strlen(reply)); + SSL_write(ssl, &buf, sizeof(struct msg_t)); } void *agent_pool(void *args) @@ -129,11 +148,12 @@ void *agent_pool(void *args) struct pool_data *pool = args; pthread_mutex_t mutex; pthread_attr_t attr; - pthread_t *agent_thread = malloc(pool->size * sizeof(*agent_thread)); + pthread_t *agent_thread = calloc(pool->size, sizeof(*agent_thread)); struct agent_args *agent_struct = malloc(pool->size * sizeof(*agent_struct)); int i; + pthread_cleanup_push(shutdown_pool, agent_struct); memset(agent_thread, 0, sizeof(pthread_t) * pool->size); memset(agent_struct, 0, sizeof(struct agent_args) * pool->size); @@ -192,5 +212,6 @@ void *agent_pool(void *args) pthread_attr_destroy(&attr); pthread_mutex_destroy(&mutex); pthread_detach(pthread_self()); + pthread_cleanup_pop(1); pthread_exit(NULL); } diff --git a/src/confparser.c b/src/confparser.c index e864470..48def44 100644 --- a/src/confparser.c +++ b/src/confparser.c @@ -304,8 +304,7 @@ static bool test_conf_syntax(char *config) } else if (!strcmp(line, "rmps.agent_port")) { i = strlen(val_ptr); if (i < 6) { /* max 5 digits for network port */ - if ((signed int)strspn(val_ptr, - "1234567890") == i) { + if ((signed int)strspn(val_ptr, "1234567890") == i) { i = atoi(val_ptr); if (i > 0 && i < 65536) { asprintf(&conf.rmps.agent_port, "%s", val_ptr); @@ -316,6 +315,18 @@ static bool test_conf_syntax(char *config) log(ERROR, "Invalid rmps.agent_port value: %s", val_ptr); val_ok = false; failed = true; + } else if (!strcmp(line, "rmps.agent_poolsize")) { + i = strlen(val_ptr); + if ((signed int)strspn(val_ptr, "1234567890") == i) { + i = atoi(val_ptr); + if (i >= 0) { + conf.rmps.agent_poolsize = i; + continue; + } + } + log(ERROR, "Invalid rmps.agent_poolsize value: %s", val_ptr); + val_ok = false; + failed = true; } else if (!strcmp(line, "rmps.client_ip")) { asprintf(&conf.rmps.client_ip, "%s", val_ptr); } else if (!strcmp(line, "rmps.client_port")) { @@ -332,6 +343,18 @@ static bool test_conf_syntax(char *config) log(ERROR, "Invalid rmps.client_port value: %s", val_ptr); val_ok = false; failed = true; + } else if (!strcmp(line, "rmps.client_poolsize")) { + i = strlen(val_ptr); + if ((signed int)strspn(val_ptr, "1234567890") == i) { + i = atoi(val_ptr); + if (i >= 0) { + conf.rmps.client_poolsize = i; + continue; + } + } + log(ERROR, "Invalid rmps.client_poolsize value: %s", val_ptr); + val_ok = false; + failed = true; } else if (!strcmp(line, "rmps.logfile")) { asprintf(&conf.rmps.logfile, "%s", val_ptr); /*if (fopen_and_mkdir(conf.rmps.logfile) != 0) diff --git a/src/rmps.c b/src/rmps.c index 4a2ea76..11d25ff 100644 --- a/src/rmps.c +++ b/src/rmps.c @@ -49,6 +49,9 @@ static void load_certificates(SSL_CTX *ctx, const char *certfile, const char *keyfile, const char *cafile); static SSL_CTX *init_server_ctx(const char *cipherlist, int mode); +pthread_t agent_pool_thread; +pthread_t client_pool_thread; + static int pid_file_handle; static void cleanup(void) @@ -86,6 +89,10 @@ static void signal_handler(int sig) static void rmps_shutdown(void) { + pthread_cancel(agent_pool_thread); + pthread_join(agent_pool_thread, NULL); + pthread_cancel(client_pool_thread); + pthread_join(client_pool_thread, NULL); close(pid_file_handle); cleanup(); } @@ -160,6 +167,11 @@ static void daemonize(const char *rundir) static void spawn_pidfile(const char *pidfile) { char str[10]; + + if (access(pidfile, F_OK) != -1) { + log(ERROR, "PID file %s exists. RMPS is already running!", pidfile); + exit(EXIT_FAILURE); + } /* Ensure only one copy */ pid_file_handle = open(pidfile, O_RDWR|O_CREAT, 0600); if (pid_file_handle == -1) { @@ -340,7 +352,6 @@ int rmps_die(void) void rmps_launch(int fork_flag) { - pthread_t pool[2]; struct pool_data pool_args[2]; log(INFO, "Starting up RMPS..."); @@ -368,7 +379,7 @@ void rmps_launch(int fork_flag) pool_args[0].srv = open_listener(conf.rmps.agent_ip, atoi(conf.rmps.agent_port)); pool_args[0].size = conf.rmps.agent_poolsize; log(VERBOSE, "Creating agent thread pool (mutex)."); - pthread_create(&pool[0], NULL, agent_pool, &pool_args[0]); + pthread_create(&agent_pool_thread, NULL, agent_pool, &pool_args[0]); pool_args[1].ctx = init_server_ctx(conf.rmps.cipherlist, SSL_VERIFY_NONE); @@ -380,12 +391,12 @@ void rmps_launch(int fork_flag) pool_args[1].srv = open_listener(conf.rmps.client_ip, atoi(conf.rmps.client_port)); pool_args[1].size = conf.rmps.client_poolsize; log(VERBOSE, "Creating client thread pool (mutex)."); - pthread_create(&pool[1], NULL, client_pool, &pool_args[1]); + pthread_create(&client_pool_thread, NULL, client_pool, &pool_args[1]); if (start_job_queue(conf.rmps.agent_poolsize) == FAIL) { log(ERROR, "On start_job_queue(), RMPS failed to start, shutting down..."); exit(EXIT_FAILURE); } - pthread_join(pool[0], NULL); - pthread_join(pool[1], NULL); + pthread_join(agent_pool_thread, NULL); + pthread_join(client_pool_thread , NULL); }