implement thread pool shutdown

This commit is contained in:
2019-01-21 23:25:17 +02:00
parent 0c5cfbe2cb
commit 598fd7aa74
3 changed files with 64 additions and 9 deletions

View File

@@ -38,6 +38,23 @@ struct agent_args {
static void show_certs(SSL *ssl);
static void *servlet(void *args);
static void send_reject_msg(SSL *ssl);
void shutdown_pool(void *args);
void shutdown_pool(void *args)
{
struct agent_args *agents = args;
int i = 0;
log(VERBOSE, "Shutting down agent pool...");
while (i < conf.rmps.agent_poolsize) {
if (!agents[i].busy && ++i)
continue;
log(VERBOSE, "Shutting down agent thread: %d", i);
SSL_shutdown(agents[i].ssl);
i++;
}
if (agents)
free(agents);
}
static void show_certs(SSL *ssl)
{
@@ -120,8 +137,10 @@ static void *servlet(void *args) /* Serve the connection -- threadable */
static void send_reject_msg(SSL *ssl)
{
char *reply = "FAILURE - The connection queue is full!\n";
struct msg_t buf = {0};
sprintf((char *)buf.chunk.data, "%s", reply);
SSL_write(ssl, reply, strlen(reply));
SSL_write(ssl, &buf, sizeof(struct msg_t));
}
void *agent_pool(void *args)
@@ -129,11 +148,12 @@ void *agent_pool(void *args)
struct pool_data *pool = args;
pthread_mutex_t mutex;
pthread_attr_t attr;
pthread_t *agent_thread = malloc(pool->size * sizeof(*agent_thread));
pthread_t *agent_thread = calloc(pool->size, sizeof(*agent_thread));
struct agent_args *agent_struct =
malloc(pool->size * sizeof(*agent_struct));
int i;
pthread_cleanup_push(shutdown_pool, agent_struct);
memset(agent_thread, 0, sizeof(pthread_t) * pool->size);
memset(agent_struct, 0, sizeof(struct agent_args) * pool->size);
@@ -192,5 +212,6 @@ void *agent_pool(void *args)
pthread_attr_destroy(&attr);
pthread_mutex_destroy(&mutex);
pthread_detach(pthread_self());
pthread_cleanup_pop(1);
pthread_exit(NULL);
}

View File

@@ -304,8 +304,7 @@ static bool test_conf_syntax(char *config)
} else if (!strcmp(line, "rmps.agent_port")) {
i = strlen(val_ptr);
if (i < 6) { /* max 5 digits for network port */
if ((signed int)strspn(val_ptr,
"1234567890") == i) {
if ((signed int)strspn(val_ptr, "1234567890") == i) {
i = atoi(val_ptr);
if (i > 0 && i < 65536) {
asprintf(&conf.rmps.agent_port, "%s", val_ptr);
@@ -316,6 +315,18 @@ static bool test_conf_syntax(char *config)
log(ERROR, "Invalid rmps.agent_port value: %s", val_ptr);
val_ok = false;
failed = true;
} else if (!strcmp(line, "rmps.agent_poolsize")) {
i = strlen(val_ptr);
if ((signed int)strspn(val_ptr, "1234567890") == i) {
i = atoi(val_ptr);
if (i >= 0) {
conf.rmps.agent_poolsize = i;
continue;
}
}
log(ERROR, "Invalid rmps.agent_poolsize value: %s", val_ptr);
val_ok = false;
failed = true;
} else if (!strcmp(line, "rmps.client_ip")) {
asprintf(&conf.rmps.client_ip, "%s", val_ptr);
} else if (!strcmp(line, "rmps.client_port")) {
@@ -332,6 +343,18 @@ static bool test_conf_syntax(char *config)
log(ERROR, "Invalid rmps.client_port value: %s", val_ptr);
val_ok = false;
failed = true;
} else if (!strcmp(line, "rmps.client_poolsize")) {
i = strlen(val_ptr);
if ((signed int)strspn(val_ptr, "1234567890") == i) {
i = atoi(val_ptr);
if (i >= 0) {
conf.rmps.client_poolsize = i;
continue;
}
}
log(ERROR, "Invalid rmps.client_poolsize value: %s", val_ptr);
val_ok = false;
failed = true;
} else if (!strcmp(line, "rmps.logfile")) {
asprintf(&conf.rmps.logfile, "%s", val_ptr);
/*if (fopen_and_mkdir(conf.rmps.logfile) != 0)

View File

@@ -49,6 +49,9 @@ static void load_certificates(SSL_CTX *ctx, const char *certfile,
const char *keyfile, const char *cafile);
static SSL_CTX *init_server_ctx(const char *cipherlist, int mode);
pthread_t agent_pool_thread;
pthread_t client_pool_thread;
static int pid_file_handle;
static void cleanup(void)
@@ -86,6 +89,10 @@ static void signal_handler(int sig)
static void rmps_shutdown(void)
{
pthread_cancel(agent_pool_thread);
pthread_join(agent_pool_thread, NULL);
pthread_cancel(client_pool_thread);
pthread_join(client_pool_thread, NULL);
close(pid_file_handle);
cleanup();
}
@@ -160,6 +167,11 @@ static void daemonize(const char *rundir)
static void spawn_pidfile(const char *pidfile)
{
char str[10];
if (access(pidfile, F_OK) != -1) {
log(ERROR, "PID file %s exists. RMPS is already running!", pidfile);
exit(EXIT_FAILURE);
}
/* Ensure only one copy */
pid_file_handle = open(pidfile, O_RDWR|O_CREAT, 0600);
if (pid_file_handle == -1) {
@@ -340,7 +352,6 @@ int rmps_die(void)
void rmps_launch(int fork_flag)
{
pthread_t pool[2];
struct pool_data pool_args[2];
log(INFO, "Starting up RMPS...");
@@ -368,7 +379,7 @@ void rmps_launch(int fork_flag)
pool_args[0].srv = open_listener(conf.rmps.agent_ip, atoi(conf.rmps.agent_port));
pool_args[0].size = conf.rmps.agent_poolsize;
log(VERBOSE, "Creating agent thread pool (mutex).");
pthread_create(&pool[0], NULL, agent_pool, &pool_args[0]);
pthread_create(&agent_pool_thread, NULL, agent_pool, &pool_args[0]);
pool_args[1].ctx = init_server_ctx(conf.rmps.cipherlist,
SSL_VERIFY_NONE);
@@ -380,12 +391,12 @@ void rmps_launch(int fork_flag)
pool_args[1].srv = open_listener(conf.rmps.client_ip, atoi(conf.rmps.client_port));
pool_args[1].size = conf.rmps.client_poolsize;
log(VERBOSE, "Creating client thread pool (mutex).");
pthread_create(&pool[1], NULL, client_pool, &pool_args[1]);
pthread_create(&client_pool_thread, NULL, client_pool, &pool_args[1]);
if (start_job_queue(conf.rmps.agent_poolsize) == FAIL) {
log(ERROR,
"On start_job_queue(), RMPS failed to start, shutting down...");
exit(EXIT_FAILURE);
}
pthread_join(pool[0], NULL);
pthread_join(pool[1], NULL);
pthread_join(agent_pool_thread, NULL);
pthread_join(client_pool_thread , NULL);
}