/* * server.cpp * * Simulation of Chandy-Lamport algorithm * * This file is a part of Distributed Systems term thesis * Department KIV, ZCU Plzen * Author: Martin Sloup, msloup@students.zcu.cz */ #include #include #include #include #include #include #include #include #include "server.h" #include "client.h" #include "util.h" #include "packet.h" #include "marker.h" using namespace std; long branch_id; int server_port; pthread_t server_thread; long amount = DEFAULT_AMOUNT; ofstream fileConn; ofstream fileMark; void write_to_conn_log(packet *p); void write_to_mark_log(); void server_handle(int s) { packet p; if (recv(s, &p, sizeof(packet), NULL) != sizeof(packet)) { return; } write_to_conn_log(&p); long value = ntohl(p.value); long source_branch_id = ntohl(p.branch_id); printf("From %ld: %c %ld\n", source_branch_id, p.operation, value); if (p.operation == OPER_GLOBAL_STATE) { printf("Global amount: %ld: %ld\n", source_branch_id, value); } marker_begin(); if (marker_is_marked()) { // is in marking state if (p.operation == OPER_AMOUNT_TRANSFER) { // increase amount amount+= value; //if is not received marker from channel yet, increase global amount if (!marker_is_marked_channel(s)) { marker_add_amount(value); } } else if (p.operation == OPER_MARKER) { // if it is marker, handle it marker_handle(s, (value==1)); } } else { if (p.operation == OPER_AMOUNT_TRANSFER) { //increase amount amount+= value; } else if (p.operation == OPER_MARKER) { // handle marker marker_handle(s, (value==1)); } } write_to_mark_log(); marker_end(); if (p.operation == OPER_FORWARD_MARKER) { // if is received forwarded marker then start new marking marker_start_marking(); } } void *server_thread_run(void *args) { int srv = socket(AF_INET, SOCK_STREAM, 0); if (srv < 0) { fatal("Creating socket failed"); } sockaddr_in serverInfo; serverInfo.sin_family = PF_INET; serverInfo.sin_port = htons(server_port); serverInfo.sin_addr.s_addr = INADDR_ANY; // bind to a server socket if (bind(srv, (struct sockaddr *) &serverInfo, sizeof(sockaddr_in)) < 0) { fatal("Error occurs while binding to server socket"); } // listen on server socket if (listen(srv, 10) < 0) { fatal("Error occurs while listening on server socket"); } int max_socket; fd_set socks; vector::iterator it; while(true) { FD_ZERO(&socks); FD_SET(srv, &socks); max_socket = srv; // add all clients to fd_set for (it = clients.begin(); it != clients.end(); it++) { FD_SET((*it), &socks); if (max_socket < (*it)) { max_socket = (*it); } } struct timeval timeout; timeout.tv_sec = 3; timeout.tv_usec = 0; // wait for new message int n = select(max_socket + 1, &socks, NULL, NULL, &timeout); if (n < 0) { fatal("Select error"); } if (n > 0) { if (FD_ISSET(srv, &socks)) { sockaddr_in clientInfo; socklen_t clientInfoSize = sizeof(sockaddr_in); int c = accept(srv, (sockaddr *) &clientInfo, &clientInfoSize); printf("Accepted new connection...\n"); // add client to client vector clients.push_back(c); } else { // handle a new message for (it = clients.begin(); it!=clients.end(); it++) { if (FD_ISSET((*it), &socks)) { server_handle((*it)); } } } } sleep(0); } return NULL; } void server_start(int port) { server_port = port; // start new server thread pthread_create(&server_thread, NULL, server_thread_run, NULL); write_to_mark_log(); } void server_loop() { pthread_join(server_thread, NULL); } void write_to_conn_log(packet *p) { if (!fileConn.is_open()) { stringstream fileName; fileName << "P" << branch_id << ".log"; fileConn.open(fileName.str().c_str(), std::ios::out | std::ios::trunc); } if (!fileConn.is_open()) return; struct timeval send_at; struct timeval current_time; gettimeofday(¤t_time, NULL); packet_send_at(&send_at, p); string color; stringstream data; switch(p->operation) { case OPER_MARKER: color = "#006600"; data << "M"; break; case OPER_GLOBAL_STATE: color = "#000066"; data << "G" << ntohl(p->value); break; case OPER_FORWARD_MARKER: color = "#660000"; data << "F"; break; default: color = "#000000"; data << ntohl(p->value); break; } fileConn.precision(20); fileConn << "- color: '" << color << "'" << endl; fileConn << " data: " << data.str() << endl; fileConn << " dst: {p: P" << branch_id << ", ts: " << timeval2double(¤t_time) << "}" << endl; fileConn << " src: {p: P" << ntohl(p->branch_id) << ", ts: " << timeval2double(&send_at) << "}" << endl; } void write_to_mark_log() { if (!fileMark.is_open()) { stringstream fileName; fileName << "M" << branch_id << ".log"; fileMark.open(fileName.str().c_str(), std::ios::out | std::ios::trunc); } struct timeval current_time; gettimeofday(¤t_time, NULL); fileMark.precision(20); fileMark << "- {color: '#787878', p: P" << branch_id << ", text: '" << amount; if (marker_is_marked()) { fileMark << ";" << global_amount; } fileMark << "', ts: " << timeval2double(¤t_time) << "}" << endl; }