/* * server.cpp * * Created on: 28.11.2010 * Author: msloup */ #include #include #include #include #include #include #include #include #include #include "protocol.h" #include "store.h" #include "util.h" #include "log.h" #define DEFAULT_DATABASE "server.db" #define OP_EXEC_TIME_US 20000 using namespace std; int last_client_id = 0; pthread_mutex_t mutex; pthread_t thread[2000]; void handle_set_request(int client, int count, int client_id) { struct protocol_item *protocol_items = new protocol_item[count]; if (count > 0) { if (read(client, protocol_items, count * PROTOCOL_ITM_LEN) != count * PROTOCOL_ITM_LEN) { warning("Not enough items received in packet"); return; } } int size = store_size(); struct protocol_header header; // test validity for (int i = 0; i < count; i++) { struct item item = protocol_item_to_item(protocol_items[i]); if (i >= size || item.ts != store_get(protocol_item_index(protocol_items[i])).ts) { header = create_header(OP_ERROR, 0, client_id); warning("Received items are more than items in database or received timestamp not equal with database one."); if (write(client, &header, PROTOCOL_HDR_LEN) != PROTOCOL_HDR_LEN) { warning("Error occurs while sending packet"); } return; } } // all is ok, set item values to database and increase ts if value is changed for (int i = 0; i < count; i++) { int index = protocol_item_index(protocol_items[i]); struct item item = protocol_item_to_item(protocol_items[i]); if (store_get(index).value != item.value) { item.ts++; store_set(index, item); } } cout << "Database state: "; vector::iterator it; for(it = store_begin(); it != store_end(); it++) { cout << (*it).value << "(" << (*it).ts << ") "; } cout << endl; header = create_header(OP_OK, 0, client_id); if (write(client, &header, PROTOCOL_HDR_LEN) != PROTOCOL_HDR_LEN) { warning("Error occurs while sending packet"); } } void handle_get_request(int client, int client_id) { int count = store_size(); int size = PROTOCOL_HDR_LEN + count * PROTOCOL_ITM_LEN; char *buffer = new char[size]; struct protocol_header *header = (struct protocol_header *) buffer; *header = create_header(OP_OK, count, client_id); struct protocol_item *items = (struct protocol_item*) (buffer + PROTOCOL_HDR_LEN); int i = 0; vector::iterator it; for(it = store_begin(); it != store_end(); it++) { items[i] = item_to_protocol_item(i, (*it)); i++; } if (send(client, buffer, size, 0) < 0) { warning("Sending packet failed"); return; } delete buffer; } void *handle_request(void *arg) { struct protocol_header header; bool terminated = false; int client = (int) arg; int client_id; while(!terminated) { if (read(client, &header, PROTOCOL_HDR_LEN) != PROTOCOL_HDR_LEN) { warning("Error occurs while receiving packet"); terminated = true; continue; } printf("Received operation: %c; items count: %d\n", header.operation, ntohs(header.count)); // atomic operation pthread_mutex_lock(&mutex); client_id = ntohl(header.client_id); if (client_id == 0) { client_id = ++last_client_id; header.client_id = htonl(client_id); } usleep(OP_EXEC_TIME_US * ((random() % 12) + 1)); log_receive(header, false); switch(header.operation) { case OP_GET: handle_get_request(client, client_id); break; case OP_SET: handle_set_request(client, ntohs(header.count), client_id); break; case OP_CLOSE: terminated = true; break; default: warning("Received unknown operation"); break; } pthread_mutex_unlock(&mutex); } close(client); return NULL; } void server_loop(int port) { int srv = socket(AF_INET, SOCK_STREAM, 0); if (srv < 0) { fatal("Creating socket failed"); } struct sockaddr_in serverInfo; serverInfo.sin_family = PF_INET; serverInfo.sin_port = htons(port); serverInfo.sin_addr.s_addr = INADDR_ANY; // bind to a server socket if (bind(srv, (struct sockaddr *) &serverInfo, sizeof(sockaddr_in)) < 0) { fatal("Error occurs while binding to server socket"); } // listen on server socket if (listen(srv, 10) < 0) { fatal("Error occurs while listening on server socket"); } int client; struct sockaddr_in clientInfo; socklen_t clientInfoSize = sizeof(struct sockaddr_in); int t = 0; while((client = accept(srv, (sockaddr *) &clientInfo, &clientInfoSize)) > 0) { pthread_create(&thread[t++], NULL, handle_request, (void *) client); } close(srv); } void show_help(char *exe_file) { cout << "Usage:" << endl; cout << " " << exe_file << " []" << endl; cout << endl; cout << " port - port to bind server" << endl; cout << " database - database filename with variable values (default: " << DEFAULT_DATABASE << ")" << endl; } int main(int argc, char **argv) { // hammer on child process zombies signal(SIGCLD, SIG_IGN); if (argc < 2 || argc > 3) { show_help(argv[0]); exit(0); } if (argc == 3) { store_load(argv[2]); } else { store_load(DEFAULT_DATABASE); } server_loop(atoi(argv[1])); }