Nothing to see here, move along...
This commit is contained in:
10
Makefile
10
Makefile
@@ -6,9 +6,11 @@ CCFLAGS += -Wall \
|
|||||||
-Wmissing-declarations \
|
-Wmissing-declarations \
|
||||||
-pedantic \
|
-pedantic \
|
||||||
-fstack-protector-strong
|
-fstack-protector-strong
|
||||||
|
CCFLAGS += `mysql_config --cflags`
|
||||||
|
|
||||||
LDFLAGS = -O1 -lcrypto -lssl -lpthread
|
LDFLAGS = -O1 -lcrypto -lssl -lpthread
|
||||||
LDFLAGS += -Wl,-z,relro,-z,now
|
LDFLAGS += -Wl,-z,relro,-z,now
|
||||||
|
LDFLAGS += `mysql_config --libs`
|
||||||
|
|
||||||
SOURCES = main.c \
|
SOURCES = main.c \
|
||||||
confparser.c \
|
confparser.c \
|
||||||
@@ -16,7 +18,9 @@ SOURCES = main.c \
|
|||||||
enum_functions.c \
|
enum_functions.c \
|
||||||
log.c \
|
log.c \
|
||||||
agent_pool.c \
|
agent_pool.c \
|
||||||
client_pool.c
|
client_pool.c \
|
||||||
|
sql.c \
|
||||||
|
job_queue.c
|
||||||
|
|
||||||
OBJECTS = $(SOURCES:.c=.o)
|
OBJECTS = $(SOURCES:.c=.o)
|
||||||
EXECUTABLE = rmpsd
|
EXECUTABLE = rmpsd
|
||||||
@@ -26,11 +30,11 @@ all: $(SOURCES) $(EXECUTABLE)
|
|||||||
|
|
||||||
$(EXECUTABLE): $(OBJECTS)
|
$(EXECUTABLE): $(OBJECTS)
|
||||||
@echo ' LD $@'
|
@echo ' LD $@'
|
||||||
@$(CC) $(LDFLAGS) $(OBJECTS) -o $@
|
@$(CC) $(LDFLAGS) $(OBJECTS) $(DBFLAGS) -o $@
|
||||||
|
|
||||||
.c.o:
|
.c.o:
|
||||||
@echo ' CC $@'
|
@echo ' CC $@'
|
||||||
@$(CC) $(CCFLAGS) -c $< -o $@
|
@$(CC) $(CCFLAGS) $(DBFLAGS) -c $< -o $@
|
||||||
|
|
||||||
clean:
|
clean:
|
||||||
rm -rf $(OBJECTS) $(EXECUTABLE)
|
rm -rf $(OBJECTS) $(EXECUTABLE)
|
||||||
|
|||||||
@@ -85,7 +85,8 @@ static void* servlet(void *args) /* Serve the connection -- threadable */
|
|||||||
SSL_free(agent->ssl); /* release SSL state */
|
SSL_free(agent->ssl); /* release SSL state */
|
||||||
close(agent->sd); /* close connection */
|
close(agent->sd); /* close connection */
|
||||||
agent->busy = 0;
|
agent->busy = 0;
|
||||||
return 0;
|
pthread_detach(pthread_self());
|
||||||
|
pthread_exit(NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void send_reject_msg(SSL *ssl)
|
static void send_reject_msg(SSL *ssl)
|
||||||
@@ -157,4 +158,6 @@ void* agent_pool(void *args)
|
|||||||
}
|
}
|
||||||
pthread_attr_destroy(&attr);
|
pthread_attr_destroy(&attr);
|
||||||
pthread_mutex_destroy(&mutex);
|
pthread_mutex_destroy(&mutex);
|
||||||
|
pthread_detach(pthread_self());
|
||||||
|
pthread_exit(NULL);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
#include "client_pool.h"
|
#include "client_pool.h"
|
||||||
#include "log.h"
|
#include "log.h"
|
||||||
|
#include "job_queue.h"
|
||||||
#include "protocol.h"
|
#include "protocol.h"
|
||||||
#include <pthread.h>
|
#include <pthread.h>
|
||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
@@ -30,6 +31,9 @@ static void* servlet(void *args) /* Serve the connection -- threadable */
|
|||||||
log(WARNING, "SSL_accept() failed. Reason below:");
|
log(WARNING, "SSL_accept() failed. Reason below:");
|
||||||
log_ssl();
|
log_ssl();
|
||||||
} else {
|
} else {
|
||||||
|
int queue_id = start_msg_queue();
|
||||||
|
if (queue_id == FAIL)
|
||||||
|
goto exit;
|
||||||
do {
|
do {
|
||||||
//buf.meta.type = GET_MEMORY;
|
//buf.meta.type = GET_MEMORY;
|
||||||
//sleep(1);
|
//sleep(1);
|
||||||
@@ -45,6 +49,7 @@ static void* servlet(void *args) /* Serve the connection -- threadable */
|
|||||||
|
|
||||||
log(VERBOSE, "Client msg: \"%s\"", buf.chunk.data);
|
log(VERBOSE, "Client msg: \"%s\"", buf.chunk.data);
|
||||||
/* TODO: Insert msg handler here */
|
/* TODO: Insert msg handler here */
|
||||||
|
add_msg_to_queue(queue_id, buf);
|
||||||
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
@@ -59,6 +64,7 @@ static void* servlet(void *args) /* Serve the connection -- threadable */
|
|||||||
log(INFO, "Client [%s] disconnected.", client->ip);
|
log(INFO, "Client [%s] disconnected.", client->ip);
|
||||||
} while (bytes);
|
} while (bytes);
|
||||||
}
|
}
|
||||||
|
exit:
|
||||||
SSL_free(client->ssl); /* release SSL state */
|
SSL_free(client->ssl); /* release SSL state */
|
||||||
close(client->sd); /* close connection */
|
close(client->sd); /* close connection */
|
||||||
client->busy = 0;
|
client->busy = 0;
|
||||||
@@ -134,4 +140,6 @@ void* client_pool(void *args)
|
|||||||
}
|
}
|
||||||
pthread_attr_destroy(&attr);
|
pthread_attr_destroy(&attr);
|
||||||
pthread_mutex_destroy(&mutex);
|
pthread_mutex_destroy(&mutex);
|
||||||
|
pthread_detach(pthread_self());
|
||||||
|
pthread_exit(NULL);
|
||||||
}
|
}
|
||||||
|
|||||||
19
confparser.c
19
confparser.c
@@ -16,7 +16,8 @@ struct conf_table conf = {
|
|||||||
{
|
{
|
||||||
"", /* db.type */
|
"", /* db.type */
|
||||||
"", /* db.hostname */
|
"", /* db.hostname */
|
||||||
"" /* db.port */
|
"", /* db.port */
|
||||||
|
"" /* db.pass */
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"127.0.0.1", /* rmps.agent_ip */
|
"127.0.0.1", /* rmps.agent_ip */
|
||||||
@@ -41,11 +42,22 @@ struct conf_table conf = {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
const char* conf_db_pass(void)
|
||||||
|
{
|
||||||
|
return conf.db.pass;
|
||||||
|
}
|
||||||
|
|
||||||
|
const char* conf_db_hostname(void)
|
||||||
|
{
|
||||||
|
return conf.db.hostname;
|
||||||
|
}
|
||||||
|
|
||||||
void confexport(void)
|
void confexport(void)
|
||||||
{
|
{
|
||||||
printf( "db.type=%s\n"
|
printf( "db.type=%s\n"
|
||||||
"db.hostname=%s\n"
|
"db.hostname=%s\n"
|
||||||
"db.port=%s\n"
|
"db.port=%s\n"
|
||||||
|
"db.pass=%s\n"
|
||||||
"rmps.agent_ip=%s\n"
|
"rmps.agent_ip=%s\n"
|
||||||
"rmps.agent_port=%s\n"
|
"rmps.agent_port=%s\n"
|
||||||
"rmps.client_ip=%s\n"
|
"rmps.client_ip=%s\n"
|
||||||
@@ -65,6 +77,7 @@ void confexport(void)
|
|||||||
conf.db.type,
|
conf.db.type,
|
||||||
conf.db.hostname,
|
conf.db.hostname,
|
||||||
conf.db.port,
|
conf.db.port,
|
||||||
|
conf.db.pass,
|
||||||
conf.rmps.agent_ip,
|
conf.rmps.agent_ip,
|
||||||
conf.rmps.agent_port,
|
conf.rmps.agent_port,
|
||||||
conf.rmps.client_ip,
|
conf.rmps.client_ip,
|
||||||
@@ -259,7 +272,9 @@ static int test_conf_syntax(void)
|
|||||||
}
|
}
|
||||||
ok = 0;
|
ok = 0;
|
||||||
failed = 1;
|
failed = 1;
|
||||||
} else if (!strcmp(buf, "rmps.agent_ip")) {
|
} else if (!strcmp(buf, "db.pass"))
|
||||||
|
strcpy(conf.db.pass, tmp + 1);
|
||||||
|
else if (!strcmp(buf, "rmps.agent_ip")) {
|
||||||
/* TODO */
|
/* TODO */
|
||||||
} else if (!strcmp(buf, "rmps.agent_port")) {
|
} else if (!strcmp(buf, "rmps.agent_port")) {
|
||||||
if ((i = strlen(tmp + 1)) < 6) {
|
if ((i = strlen(tmp + 1)) < 6) {
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ struct conf_db {
|
|||||||
char type[15];
|
char type[15];
|
||||||
char hostname[HOSTNAMESIZE];
|
char hostname[HOSTNAMESIZE];
|
||||||
char port[6];
|
char port[6];
|
||||||
|
char pass[60]; /* random decision */
|
||||||
};
|
};
|
||||||
|
|
||||||
struct conf_rmps {
|
struct conf_rmps {
|
||||||
@@ -46,6 +47,8 @@ struct conf_table {
|
|||||||
extern struct conf_table conf;
|
extern struct conf_table conf;
|
||||||
extern int confparse(void);
|
extern int confparse(void);
|
||||||
extern void confexport(void);
|
extern void confexport(void);
|
||||||
|
extern const char* conf_db_pass(void);
|
||||||
|
extern const char* conf_db_hostname(void);
|
||||||
|
|
||||||
#endif /* CONFPARSER_H */
|
#endif /* CONFPARSER_H */
|
||||||
|
|
||||||
|
|||||||
@@ -25,6 +25,7 @@ struct msg_meta_t {
|
|||||||
unsigned chunks;
|
unsigned chunks;
|
||||||
short is_recv;
|
short is_recv;
|
||||||
short locking;
|
short locking;
|
||||||
|
short isjob;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct msg_chunk_t {
|
struct msg_chunk_t {
|
||||||
|
|||||||
22
rmps.c
22
rmps.c
@@ -2,6 +2,7 @@
|
|||||||
#include "confparser.h"
|
#include "confparser.h"
|
||||||
#include "agent_pool.h"
|
#include "agent_pool.h"
|
||||||
#include "client_pool.h"
|
#include "client_pool.h"
|
||||||
|
#include "job_queue.h"
|
||||||
#include "rmps.h"
|
#include "rmps.h"
|
||||||
#include <pthread.h>
|
#include <pthread.h>
|
||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
@@ -172,25 +173,24 @@ static int open_listener(int port)
|
|||||||
sd = socket(addr.sin_family, SOCK_STREAM, 0);
|
sd = socket(addr.sin_family, SOCK_STREAM, 0);
|
||||||
if (sd < 0) {
|
if (sd < 0) {
|
||||||
log(ERROR, "Failed to create socket");
|
log(ERROR, "Failed to create socket");
|
||||||
log(INFO, "RMPS failed to start, shutting down...");
|
goto exit;
|
||||||
atexit(cleanup);
|
|
||||||
}
|
}
|
||||||
if (set_reuse_addr(sd) < 0) {
|
if (set_reuse_addr(sd) < 0) {
|
||||||
log(ERROR, "Failed to set reuse on address - Aborting...", port);
|
log(ERROR, "Failed to set reuse on address - Aborting...", port);
|
||||||
log(INFO, "RMPS failed to start, shutting down...");
|
goto exit;
|
||||||
atexit(cleanup);
|
|
||||||
}
|
}
|
||||||
if (bind(sd, (struct sockaddr*)&addr, sizeof(addr)) != 0) {
|
if (bind(sd, (struct sockaddr*)&addr, sizeof(addr)) != 0) {
|
||||||
log(ERROR, "Failed to bind on port: %d - Aborting...", port);
|
log(ERROR, "Failed to bind on port: %d - Aborting...", port);
|
||||||
log(INFO, "RMPS failed to start, shutting down...");
|
goto exit;
|
||||||
atexit(cleanup);
|
|
||||||
}
|
}
|
||||||
if (listen(sd, 10) != 0) {
|
if (listen(sd, 10) != 0) {
|
||||||
log(ERROR, "Failed to start listener on port %d - Aborting...", port);
|
log(ERROR, "Failed to start listener on port %d - Aborting...", port);
|
||||||
log(INFO, "RMPS failed to start, shutting down...");
|
goto exit;
|
||||||
atexit(cleanup);
|
|
||||||
}
|
}
|
||||||
return sd;
|
return sd;
|
||||||
|
exit:
|
||||||
|
log(INFO, "RMPS failed to start, shutting down...");
|
||||||
|
atexit(cleanup);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Init server and create context */
|
/* Init server and create context */
|
||||||
@@ -296,7 +296,11 @@ void launch_rmps(struct conf_table *conf, int fork_flag)
|
|||||||
pool_args[1].size = conf->rmps.client_poolsize;
|
pool_args[1].size = conf->rmps.client_poolsize;
|
||||||
log(VERBOSE, "Creating client thread pool (mutex).");
|
log(VERBOSE, "Creating client thread pool (mutex).");
|
||||||
pthread_create(&pool[1], NULL, client_pool, &pool_args[1]);
|
pthread_create(&pool[1], NULL, client_pool, &pool_args[1]);
|
||||||
|
if (start_job_queue(conf->rmps.agent_poolsize) == FAIL) {
|
||||||
|
log(ERROR, "On start_job_queue(), RMPS failed to start, shutting down...");
|
||||||
|
atexit(cleanup);
|
||||||
|
return;
|
||||||
|
}
|
||||||
pthread_join(pool[0], NULL);
|
pthread_join(pool[0], NULL);
|
||||||
pthread_join(pool[1], NULL);
|
pthread_join(pool[1], NULL);
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user