/* * client.cpp * * Created on: 30.11.2010 * Author: msloup */ #include #include #include #include #include #include #include #include #include "util.h" #include "protocol.h" #include "store.h" #include "log.h" #define LINE_LEN 1024 #define TIMEOUT 500 using namespace std; vector database; bool force; int client_id = 0; // convert string (input) to long (output), return false on error bool strtol(char *input, long *output) { char *end; *output = strtol(input, &end, 10); return (end != input); } int connect(char *hostname, int port) { // create a socket int c = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); if (c < 0) { fatal("Creating socket error."); } // prepare hostname_sockaddr using the result from gethostbyname method sockaddr_in hostname_sockaddr; struct hostent *hostnameInfo; hostnameInfo = gethostbyname(hostname); hostname_sockaddr.sin_family = PF_INET; memcpy(&hostname_sockaddr.sin_addr, hostnameInfo->h_addr, hostnameInfo->h_length); hostname_sockaddr.sin_port = htons(port); printf("Connecting to %s:%d...\n", hostname, port); // connect to a server if (connect(c, (sockaddr*) &hostname_sockaddr, sizeof(sockaddr_in)) < 0) { return -1; } printf("Connected.\n"); return c; } void disconnect(int client) { struct protocol_header header; database.clear(); // send a close command and close socket too header = create_header(OP_CLOSE, 0, client_id); if (write(client, &header, PROTOCOL_HDR_LEN) != PROTOCOL_HDR_LEN) { fatal("Sending packet failed"); } close(client); } // waits till socket change or timeout bool wait_for_response(int client, int timeout_in_sec) { fd_set socks; FD_ZERO(&socks); FD_SET(client, &socks); struct timeval timeout; timeout.tv_sec = timeout_in_sec; timeout.tv_usec = 0; if (select(client + 1, &socks, NULL, NULL, &timeout) > 0) { return FD_ISSET(client, &socks); } return false; } void load(int client) { struct protocol_header header; database.clear(); // create a get header and send it header = create_header(OP_GET, 0, client_id); if (write(client, &header, PROTOCOL_HDR_LEN) != PROTOCOL_HDR_LEN) { fatal("Sending packet failed"); } // waits for a response if (!wait_for_response(client, TIMEOUT)) { fatal("Connection timeout"); } // read the response header if (read(client, &header, PROTOCOL_HDR_LEN) != PROTOCOL_HDR_LEN) { fatal("Reading header packet failed"); } // add response to log log_receive(header, true); client_id = ntohl(header.client_id); int count = ntohs(header.count); struct protocol_item *items = new protocol_item[count]; // read response items if (read(client, items, count * PROTOCOL_ITM_LEN) != count * PROTOCOL_ITM_LEN) { fatal("Reading item packet failed"); } // store to database cout << "Received: "; for(int i = 0; i < count; i++) { database.push_back(protocol_item_to_item(items[i])); cout << database.back().value << "(" << database.back().ts << ") "; } cout << endl; delete [] items; } bool save(int client) { vector::iterator it; int count = 0; // count of used items for(it = database.begin(); it != database.end(); it++) { if ((*it).used) count++; } char *buffer = new char[PROTOCOL_HDR_LEN + count * PROTOCOL_ITM_LEN]; struct protocol_header *header = (struct protocol_header *) buffer; // create a set header *header = create_header(OP_SET, count, client_id); struct protocol_item *items = (struct protocol_item*) (buffer + PROTOCOL_HDR_LEN); // fill request with database items unsigned int i = 0; cout << "Sending: "; for(it = database.begin(); it != database.end(); it++) { if ((*it).used) { *items = item_to_protocol_item(i, (*it)); cout << i << "=" << (*it).value << "(" << (*it).ts << "); "; items++; } i++; } cout << endl; // send a request if (send(client, buffer, PROTOCOL_HDR_LEN + count * PROTOCOL_ITM_LEN, 0) != PROTOCOL_HDR_LEN + count * PROTOCOL_ITM_LEN) { warning("Sending packet failed"); return false; } delete buffer; struct protocol_header response_header; // wait for a response if (!wait_for_response(client, TIMEOUT)) { fatal("Connection timeout"); } // read the response if (read(client, &response_header, PROTOCOL_HDR_LEN) != PROTOCOL_HDR_LEN) { fatal("Reading header packet failed"); } // add response to log log_receive(response_header, true); client_id = ntohl(response_header.client_id); bool result = (response_header.operation == OP_OK); if (result) { cout << "Ok." << endl; } else { cout << "Fail." << endl; } database.clear(); return result; } void execute_line(int client, const char *cline, int line_nr) { char line[255]; strcpy((char *)&line, cline); char *token_start = (char *)&line; //trim line if (strlen(line) > 0 && line[strlen(line) - 1] == '\n') line[strlen(line) - 1] = 0; if (strlen(line) > 0 && line[strlen(line) - 1] == '\r') line[strlen(line) - 1] = 0; // skip comments or empty lines if (strlen(line) < 1 || line[0]=='#' || line[0]==';') { return; } cout << "Executing: " << line << endl; // parse command and validate if is allowed or not in context char *command = strsep(&token_start, " "); // SLEEP if (strcasecmp(command, "SLEEP") == 0) { char *parameter = strsep(&token_start, " "); usleep(atol(parameter) * 1000); } else { // if is database empty, COMMIT and ADD command is prohibited if (database.empty()) { if (strcasecmp(command, "COMMIT") == 0 || strcasecmp(command, "ADD") == 0) { fatal("Input error on line %d: Command '%s' not allowed here", line_nr, command); } else if (strcasecmp(command, "BEGIN") == 0) { // load database from server load(client); } else { // unknown command? fatal("Input error on line %d: Unknown command '%s'", line_nr, command); } } else { // BEGIN is prohibited on non empty database if (strcasecmp(command, "BEGIN") == 0) { fatal("Input error on line %d: Command '%s'not allowed here", line_nr, command); } else if (strcasecmp(command, "COMMIT") == 0) { // store local database to server if (!save(client)) { if (force) { warning("Commit failed on line %d. Variables has been already modified", line_nr); } else { fatal("Commit failed on line %d. Variables has been already modified", line_nr); } } } else if (strcasecmp(command, "ADD") == 0) { // parse parameters char *parameter1 = strsep(&token_start, " "); char *parameter2 = strsep(&token_start, " "); char *parameter3 = strsep(&token_start, " "); // validate if (parameter1 == NULL || parameter2 == NULL || parameter3 == NULL) { fatal("Not enough parameters for command '%s' on line %d", command, line_nr); } long parameter1_int; long parameter2_int; long parameter3_int; // convert it to long, validate again :) if (!strtol(parameter1, ¶meter1_int) || !strtol(parameter2, ¶meter2_int) || !strtol(parameter3, ¶meter3_int)) { fatal("Wrong parameter for '%s' on line %d", command, line_nr); } // and validate :), now if is positive if (parameter1_int < 0 || parameter2_int < 0 || parameter3_int < 0) { fatal("Parameters for '%s' must be greater or equal zero on line %d", command, line_nr); } int count = database.size(); // and validate, there its bounds if (parameter1_int >= count || parameter2_int >= count || parameter3_int >= count) { fatal("Index is out of bounds on line %d", line_nr); } // execute ADD :) database.at(parameter3_int).value = database.at(parameter1_int).value + database.at(parameter2_int).value; database.at(parameter1_int).used = true; database.at(parameter2_int).used = true; database.at(parameter3_int).used = true; } else { // unknown command? fatal("Input error on line %d: Unknown command '%s'", line_nr, command); } } } } void execute(char *filename, char *hostname, int port) { string line; // at least open connection to server int client = connect(hostname, port); if (client < 0) { fatal("Connection failed"); } // open file fstream input; int line_counter = 1; try { input.open(filename, fstream::in); while (getline(input, line)) { execute_line(client, line.c_str(), line_counter); line_counter++; } input.close(); } catch (ifstream::failure e) { fatal("Error while reading transaction batch script: %s", e.what()); } disconnect(client); } void show_help(char *exe_file) { cout << "Usage:" << endl; cout << " " << exe_file << " [-f] " << endl; cout << endl; cout << " filename - transaction batch script filename" << endl; cout << " hostname - transaction server hostname to connect" << endl; cout << " port - transaction server port to connect" << endl; cout << endl; cout << " -f - cotinue executing batch file on error while commit" << endl; cout << endl; } int main(int argc, char **argv) { force = false; if (argc < 4 || argc > 5) { show_help(argv[0]); exit(0); } int i = 1; if (strcasecmp(argv[i], "-f") == 0) { force = true; i++; } execute(argv[i], argv[i + 1], atoi(argv[i + 2])); }