#include "agent_pool.h"
#include "log.h"
#include "protocol.h"

/*
 * NOTE(review): the two system includes that followed the project headers
 * lost their <...> operands; the list below is what this file actually uses.
 * Confirm against the original and drop anything agent_pool.h already pulls in.
 */
#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>

#include <openssl/ssl.h>

/* Size reserved for a per-agent job table (not implemented yet). */
#define MAXJOBS 10

/* One slot of the pool: the state handed to a servlet thread. */
struct agent_args {
    int busy;       /* non-zero while a servlet owns the slot; guarded by slot_lock */
    SSL *ssl;       /* TLS session bound to sd */
    int sd;         /* accepted socket descriptor */
    char ip[16];    /* IPv4 */
};

/*
 * Guards every read and write of agent_args.busy, which is shared between
 * the acceptor thread (agent_pool) and the servlet threads.
 */
static pthread_mutex_t slot_lock = PTHREAD_MUTEX_INITIALIZER;

static void show_certs(SSL *ssl);
static void* servlet(void *args);
static void send_reject_msg(SSL *ssl);

/*
 * Log the verification result and the subject/issuer of the peer
 * certificate, or a note when the peer presented none.
 */
static void show_certs(SSL *ssl)
{
    X509 *cert;
    char *line;

    cert = SSL_get_peer_certificate(ssl); /* Get certificates (if available) */
    if (SSL_get_verify_result(ssl) == X509_V_OK)
        log(VERBOSE, "get_verify_result == ok");

    if (cert == NULL) {
        log(VERBOSE, "No certificates from peer");
        return;
    }

    log(VERBOSE, "Server certificates:");

    /* X509_NAME_oneline() allocates with OPENSSL_malloc(), so the result
     * has to be released with OPENSSL_free(), not free(). */
    line = X509_NAME_oneline(X509_get_subject_name(cert), 0, 0);
    if (line != NULL) {
        log(VERBOSE, "Subject: %s", line);
        OPENSSL_free(line);
    }

    line = X509_NAME_oneline(X509_get_issuer_name(cert), 0, 0);
    if (line != NULL) {
        log(VERBOSE, "Issuer: %s", line);
        OPENSSL_free(line);
    }

    X509_free(cert);
}

/*
 * Serve the connection -- threadable.
 *
 * args points to the struct agent_args slot prepared by agent_pool().
 * Performs the TLS handshake, then once a second sends a GET_MEMORY request
 * and reads the reply until the peer shuts down or an I/O error occurs.
 * On exit the SSL object and the socket are released and the slot is marked
 * free again. The thread detaches itself and never returns a value.
 */
static void* servlet(void *args)
{
    struct agent_args *agent = args;
    struct msg_t buf;
    int bytes;

    /* The whole struct goes over the wire: never send stack garbage. */
    memset(&buf, 0, sizeof(buf));

    SSL_load_error_strings();

    /* We check for unclean (ret < 0) and clean (ret == 0) failures */
    if (SSL_accept(agent->ssl) <= 0) {
        log(WARNING, "SSL_accept() failed. Reason below:");
        log_ssl();
    } else {
        show_certs(agent->ssl);

        for (;;) {
            buf.meta.type = GET_MEMORY;
            sleep(1);

            if (SSL_write(agent->ssl, &buf, sizeof(buf)) <= 0) {
                log(WARNING, "Failed to send request to agent [%s]", agent->ip);
                break;
            }

            bytes = SSL_read(agent->ssl, &buf, sizeof(buf));
            if (bytes <= 0) {
                /* SSL_get_shutdown() returns a bitmask, so test the flag. */
                if (SSL_get_shutdown(agent->ssl) & SSL_RECEIVED_SHUTDOWN) {
                    log(VERBOSE, "SSL_RECEIVED_SHUTDOWN from agent [%s]", agent->ip);
                } else {
                    log(VERBOSE, "Client didn't send data! SSL error below:");
                    /* Best-effort notice; the link may already be dead. */
                    snprintf((char*)buf.chunk.data, sizeof(buf.chunk.data),
                             "%s", "Where's the data, m8?");
                    (void)SSL_write(agent->ssl, &buf, sizeof(struct msg_t));
                }
                /* Both a clean close (0) and an error (< 0) end the session. */
                break;
            }

            if ((size_t)bytes != sizeof(struct msg_t)) {
                log(
                    WARNING,
                    "Agent [%s] sent non-standard data!",
                    agent->ip
                );
                continue;
            }

            /* Payload comes from the network: force termination before %s.
             * Assumes chunk.data is an in-struct array -- TODO confirm. */
            buf.chunk.data[sizeof(buf.chunk.data) - 1] = '\0';
            log(VERBOSE, "Client msg: \"%s\"", buf.chunk.data);
            /* TODO: Insert msg handler here */
        }

        log(INFO, "Agent [%s] disconnected.", agent->ip);
    }

    SSL_free(agent->ssl);   /* release SSL state */
    agent->ssl = NULL;
    close(agent->sd);       /* close connection */

    /* Hand the slot back only after its resources are gone. */
    pthread_mutex_lock(&slot_lock);
    agent->busy = 0;
    pthread_mutex_unlock(&slot_lock);

    pthread_detach(pthread_self());
    pthread_exit(NULL);
}

/* Tell a peer whose handshake succeeded that no pool slot is available. */
static void send_reject_msg(SSL *ssl)
{
    static const char reply[] = "FAILURE - The connection queue is full!\n";

    (void)SSL_write(ssl, reply, (int)strlen(reply));
}

/*
 * Acceptor loop -- threadable.
 *
 * args points to a struct pool_data; this function reads its srv (listening
 * socket), ctx (passed to SSL_new) and size (number of slots) members.
 * Every accepted connection is handed to a servlet thread running in a free
 * slot; when all slots are busy the peer gets a reject message and is closed.
 * The loop has no exit condition; the function only leaves early when the
 * slot tables cannot be allocated.
 */
void* agent_pool(void *args)
{
    struct pool_data *pool = args;
    pthread_attr_t attr;
    pthread_t *agent_thread = calloc(pool->size, sizeof(*agent_thread));
    struct agent_args *agent_struct = calloc(pool->size, sizeof(*agent_struct));
    int i;

    /* calloc(0, ...) may legitimately return NULL, hence the size test. */
    if (pool->size > 0 && (agent_thread == NULL || agent_struct == NULL)) {
        log(WARNING, "agent_pool: cannot allocate the agent slots");
        free(agent_thread);
        free(agent_struct);
        pthread_detach(pthread_self());
        pthread_exit(NULL);
    }

    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

    while (1) {
        struct sockaddr_in addr;
        char address[INET6_ADDRSTRLEN];
        socklen_t len = sizeof(addr);
        SSL *ssl;
        int agent = accept(pool->srv, (struct sockaddr*)&addr, &len);

        if (agent < 0) {
            if (errno != EINTR) {
                log(WARNING, "accept() failed: %s", strerror(errno));
                sleep(1);   /* do not spin on a persistent error */
            }
            continue;
        }

        if (inet_ntop(AF_INET, &addr.sin_addr, address, sizeof(address)) == NULL)
            snprintf(address, sizeof(address), "%s", "unknown");

        log(INFO, "Connection: %s:%d", address, ntohs(addr.sin_port));

        /* Claim the first free slot, if any. */
        pthread_mutex_lock(&slot_lock);
        for (i = 0; i < pool->size; i++) {
            if (!agent_struct[i].busy) {
                agent_struct[i].busy = 1;
                break;
            }
        }
        pthread_mutex_unlock(&slot_lock);

        if (i == pool->size) {
            log(
                WARNING,
                "Agent [%s] dropped. Poolsize limit reached.",
                address
            );
            /* NOTE(review): this handshake blocks the acceptor thread. */
            ssl = SSL_new(pool->ctx);
            /* SSL_accept() reports failure with 0 as well as negatives. */
            if (ssl != NULL && SSL_set_fd(ssl, agent) == 1 && SSL_accept(ssl) > 0)
                send_reject_msg(ssl);
            SSL_free(ssl);  /* no-op on NULL */
            close(agent);
            continue;
        }

        agent_struct[i].ssl = SSL_new(pool->ctx);
        agent_struct[i].sd = agent;
        snprintf(agent_struct[i].ip, sizeof(agent_struct[i].ip), "%s", address);

        if (agent_struct[i].ssl == NULL
            || SSL_set_fd(agent_struct[i].ssl, agent_struct[i].sd) != 1
            || pthread_create(
                   &agent_thread[i],
                   &attr,
                   servlet,
                   &agent_struct[i]
               ) != 0) {
            /* No servlet will ever release this slot, so undo the claim. */
            log(WARNING, "Agent [%s] dropped. Could not start a worker.", address);
            SSL_free(agent_struct[i].ssl);
            agent_struct[i].ssl = NULL;
            close(agent);
            pthread_mutex_lock(&slot_lock);
            agent_struct[i].busy = 0;
            pthread_mutex_unlock(&slot_lock);
        }
    }

    /* Not reached while the loop above has no exit condition. */
    pthread_attr_destroy(&attr);
    free(agent_thread);
    free(agent_struct);
    pthread_detach(pthread_self());
    pthread_exit(NULL);
}