/* * marker.cpp * * Simulation of Chandy-Lamport algorithm * * This file is a part of Distributed Systems term thesis * Department KIV, ZCU Plzen * Author: Martin Sloup, msloup@students.zcu.cz */ #include #include #include #include #include #include "marker.h" #include "client.h" #include "packet.h" #include "server.h" using namespace std; pthread_t marker_thread; pthread_mutex_t marker_mutex; int marker_source; long global_amount; map marker_channel_marked; bool marker_marking; void marker_begin() { pthread_mutex_lock(&marker_mutex); } void marker_end() { pthread_mutex_unlock(&marker_mutex); } void *marker_thread_run(void *arg) { sleep((random() % 5) + 2); marker_begin(); marker_marking = true; global_amount = amount; marker_source = -1; marker_end(); //send marker to other branches vector::iterator it; for(it = clients.begin(); it != clients.end(); it++) { packet_send((*it), OPER_MARKER, 1); } return NULL; } void marker_reset() { // reset state of marking process map::iterator it; for (it = marker_channel_marked.begin(); it != marker_channel_marked.end(); it++) { (*it).second = false; } marker_marking = false; } void marker_mark_channel(int s) { if (marker_channel_marked.count(s) > 0) { marker_channel_marked[s] = true; } } bool marker_is_marked_channel(int s) { return marker_channel_marked[s]; } bool marker_is_marked_all_channels() { map::iterator it; for (it = marker_channel_marked.begin(); it != marker_channel_marked.end(); it++) { if (!(*it).second) { return false; } } return true; } bool marker_is_marked() { return marker_marking; } void marker_add_amount(long value) { global_amount+= value; } void marker_start_marking() { pthread_create(&marker_thread, NULL, marker_thread_run, NULL); } void marker_start() { // prepare map for storing state of channels vector::iterator it; for (it = clients.begin(); it != clients.end(); it++) { marker_channel_marked.insert(pair((*it), false)); } if (branch_id == 1) { marker_start_marking(); } } void marker_handle(int c, bool source) { //if is marker from source, store the socket for sending global state back if (source) { marker_source = c; } marker_mark_channel(c); if (!marker_marking) { // received marker in non marking state marker_marking = true; global_amount = amount; // send marker to all branches vector::iterator it; for(it = clients.begin(); it != clients.end(); it++) { packet_send((*it), OPER_MARKER, 0); } } // check if all markers are marked if (marker_is_marked_all_channels()) { marker_reset(); if (marker_source == -1) { printf("Global amount: %ld: %ld\n", branch_id, global_amount); // transfer marker rights packet_send(clients.at(random() % clients.size()), OPER_FORWARD_MARKER, 0); } else { // send global state to marker source packet_send(marker_source, OPER_GLOBAL_STATE, global_amount); } } }