diff --git a/Makefile b/Makefile index 7c3dc4d..bf3a10e 100755 --- a/Makefile +++ b/Makefile @@ -16,7 +16,8 @@ ToolFrameworkInclude= -I $(Dependencies)/ToolFrameworkCore/include ToolDAQFrameworkLib= -L $(Dependencies)/ToolDAQFramework/lib -lDAQStore -lDAQDataModelBase -lServiceDiscovery ToolDAQFrameworkInclude= -I $(Dependencies)/ToolDAQFramework/include -CXXFLAGS= -g -fdiagnostics-color=always -Wno-attributes -O3 +#CXXFLAGS= -g -O0 -fno-omit-frame-pointer -fdiagnostics-color=always -Wno-attributes +CXXFLAGS= -fdiagnostics-color=always -Wno-attributes -O3 all: middleman diff --git a/ReceiveSQL.cpp b/ReceiveSQL.cpp index c593866..dac7e47 100755 --- a/ReceiveSQL.cpp +++ b/ReceiveSQL.cpp @@ -81,14 +81,27 @@ bool ReceiveSQL::Execute(){ if(in_polls.at(i).revents & ZMQ_POLLIN){ switch(i){ case 0: { - //pollsmsg+="M"; too many... + //log + //pollsmsg+="L"; too many... break; } case 1: { - pollsmsg+="R"; + //mon + //pollsmsg+="M"; too many... break; } case 2: { + //read + pollsmsg+="R"; + break; + } + case 3: { + // middleman + pollsmsg+="MM"; + break; + } + case 4: { + // write pollsmsg+="W"; break; } @@ -196,7 +209,8 @@ bool ReceiveSQL::Finalise(){ if(utilities) utilities->RemoveService("middleman"); Log("Closing multicast socket",3); - close(multicast_socket); + close(log_socket); + close(mon_socket); Log("Deleting Utilities",3); if(utilities){ delete utilities; utilities=nullptr; } @@ -293,16 +307,18 @@ bool ReceiveSQL::InitMulticast(Store& m_variables){ /* Multicast Setup */ /* ----------------------------------------- */ - int multicast_port = 55554; - std::string multicast_address = "239.192.1.1"; // FIXME suitable default? + int log_port = 55554; + int mon_port = 55553; + std::string multicast_address = "239.192.1.1"; - // FIXME add to config file - m_variables.Get("multicast_port",multicast_port); + m_variables.Get("log_port",log_port); + m_variables.Get("mon_port",mon_port); m_variables.Get("multicast_address",multicast_address); // set up multicast socket for sending logging & monitoring data - multicast_socket = socket(AF_INET, SOCK_DGRAM, 0); - if(multicast_socket<=0){ + log_socket = socket(AF_INET, SOCK_DGRAM, 0); + mon_socket = socket(AF_INET, SOCK_DGRAM, 0); + if(log_socket<=0 || mon_socket<=0){ Log(std::string{"Failed to open multicast socket with error "}+strerror(errno),v_error); return false; } @@ -311,45 +327,63 @@ bool ReceiveSQL::InitMulticast(Store& m_variables){ struct linger l; l.l_onoff = 0; // whether to linger l.l_linger = 0; // seconds to linger for - get_ok = setsockopt(multicast_socket, SOL_SOCKET, SO_LINGER, (char*) &l, sizeof(l)); - int a =1; - setsockopt(multicast_socket, SOL_SOCKET, SO_REUSEADDR, &a, sizeof(a)); + get_ok = setsockopt(log_socket, SOL_SOCKET, SO_LINGER, (char*) &l, sizeof(l)); + get_ok = get_ok || setsockopt(mon_socket, SOL_SOCKET, SO_LINGER, (char*) &l, sizeof(l)); if(get_ok!=0){ Log(std::string{"Failed to set multicast socket linger with error "}+strerror(errno),v_error); return false; } + // disable blocking connections to this ip+port fomr TIME_WAIT after closure. + // this is intended to prevent delivery of delayed packets to the wrong application, + // but means a new middleman instance won't be able to bind for 30-120 seconds after another closes. + int a =1; + get_ok = setsockopt(log_socket, SOL_SOCKET, SO_REUSEADDR, &a, sizeof(a)); + get_ok = get_ok || setsockopt(mon_socket, SOL_SOCKET, SO_REUSEADDR, &a, sizeof(a)); + if(get_ok!=0){ + Log(std::string{"Failed to set multicast socket reuseaddr with error "}+strerror(errno),v_error); + return false; + } // set the socket to non-blocking mode - seems like a good idea...? XXX - get_ok = fcntl(multicast_socket, F_SETFL, O_NONBLOCK); + get_ok = fcntl(log_socket, F_SETFL, O_NONBLOCK); + get_ok = get_ok || fcntl(mon_socket, F_SETFL, O_NONBLOCK); if(get_ok!=0){ Log(std::string{"Failed to set multicast socket to non-blocking with error "}+strerror(errno),v_error); - //return false; + return false; } // format destination address from IP string - bzero((char *)&multicast_addr, sizeof(multicast_addr)); // init to 0 - multicast_addr.sin_family = AF_INET; - multicast_addr.sin_port = htons(multicast_port); + bzero((char *)&log_addr, sizeof(log_addr)); // init to 0 + log_addr.sin_family = AF_INET; + log_addr.sin_port = htons(log_port); + // sockaddr_in is just a struct - copy logging and just change port member + mon_addr = log_addr; + mon_addr.sin_port = htons(mon_port); + // sending: which multicast group to send to - get_ok = inet_aton(multicast_address.c_str(), &multicast_addr.sin_addr); - if(get_ok==0){ + get_ok = inet_aton(multicast_address.c_str(), &log_addr.sin_addr); + get_ok = get_ok && inet_aton(multicast_address.c_str(), &mon_addr.sin_addr); + if(get_ok==0){ // returns 0 if invalid, unlike other functions Log("Bad multicast address '"+multicast_address+"'",v_error); return false; } - multicast_addrlen = sizeof(multicast_addr); + // used in send & receive; will be the same for both log & mon + multicast_addrlen = sizeof(log_addr); + /* // for two-way comms, we should bind to INADDR_ANY, not a specific multicast address.... maybe? struct sockaddr_in multicast_addr2; bzero((char *)&multicast_addr2, sizeof(multicast_addr2)); // init to 0 multicast_addr2.sin_family = AF_INET; - multicast_addr2.sin_port = htons(multicast_port); + multicast_addr2.sin_port = htons(log_port); multicast_addr2.sin_addr.s_addr = htonl(INADDR_ANY); << like this */ // to listen we need to bind to the socket - get_ok = bind(multicast_socket, (struct sockaddr*)&multicast_addr, sizeof(multicast_addr)); - if(get_ok<0) { + get_ok = (bind(log_socket, (struct sockaddr*)&log_addr, multicast_addrlen) == 0); + get_ok = get_ok && (bind(mon_socket, (struct sockaddr*)&mon_addr, multicast_addrlen) == 0); + if(!get_ok) { Log("Failed to bind to multicast listen socket",v_error); return false; } @@ -362,15 +396,16 @@ bool ReceiveSQL::InitMulticast(Store& m_variables){ Log("Bad multicast group '"+multicast_address+"'",v_error); return false; } - get_ok = setsockopt(multicast_socket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)); - if(get_ok<0){ + get_ok = setsockopt(log_socket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)); + get_ok = get_ok || setsockopt(mon_socket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)); + if(get_ok!=0){ Log("Failed to join multicast group",v_error); return false; } - // we can poll with zmq ... - in_polls.emplace_back(zmq::pollitem_t{NULL, multicast_socket, ZMQ_POLLIN, 0}); - out_polls.emplace_back(zmq::pollitem_t{NULL, multicast_socket, ZMQ_POLLOUT, 0}); + // we can poll with zmq... + in_polls.emplace_back(zmq::pollitem_t{NULL, log_socket, ZMQ_POLLIN, 0}); + in_polls.emplace_back(zmq::pollitem_t{NULL, mon_socket, ZMQ_POLLIN, 0}); return true; } @@ -640,6 +675,9 @@ bool ReceiveSQL::InitControls(Store& m_variables){ SC_vars.Add("Quit",SlowControlElementType(BUTTON)); SC_vars["Quit"]->SetValue(false); + SC_vars.Add("ResetStats",SlowControlElementType(BUTTON)); + SC_vars["ResetStats"]->SetValue(false); + SC_vars.Add("Status",SlowControlElementType(INFO)); SC_vars["Status"]->SetValue("Initialising"); @@ -820,13 +858,15 @@ bool ReceiveSQL::FindNewClients_v2(){ clientsmap.at(aservice.first)+="W"; } } - std::string clientlist; for(std::pair& aclient : clientsmap){ clientlist += aclient.first+": "+aclient.second+","; } - clientlist.pop_back(); // remove trailing ',' - SC_vars["Clients"]->SetValue(clientlist); + if(clientlist.size()>0){ + // if middleman is only connection, client list will be empty + clientlist.pop_back(); // remove trailing ',' + SC_vars["Clients"]->SetValue(clientlist); + } } else { Log("No new clients found",21); @@ -840,7 +880,7 @@ bool ReceiveSQL::FindNewClients_v2(){ bool ReceiveSQL::GetClientWriteQueries(){ // see if we had any write requests from clients - if(in_polls.at(3).revents & ZMQ_POLLIN){ + if(in_polls.at(4).revents & ZMQ_POLLIN){ Log(">>> got a write query from client",3); ++write_queries_recvd; @@ -1231,22 +1271,22 @@ bool ReceiveSQL::WriteRootPlotToQuery(const std::string& message, BStore& plot, bool ReceiveSQL::WritePlotlyPlotToQuery(const std::string& message, BStore& plot, std::string& db_out, std::string& sql_out) { db_out = "daq"; // FIXME db Postgres& a_database = m_databases.at(db_out); - + std::string name; std::string traces; std::string layout; uint64_t timestamp = 0; uint64_t version = -1; get_ok = plot.Get("name", name); - get_ok &= plot.JsonEncode("traces", traces); - get_ok &= plot.JsonEncode("layout", layout); plot.Get("version", version); plot.Get("timestamp", timestamp); + get_ok &= plot.JsonEncode("traces", traces); + get_ok &= plot.JsonEncode("layout", layout); if (!get_ok) { Log("WritePlotlyPlotToQuery: missing fields in message '" + message + "'", v_error); return false; } - + // SQL sanitization get_ok = a_database.pqxx_quote(name, name); get_ok &= a_database.pqxx_quote(traces, traces); @@ -1261,7 +1301,11 @@ bool ReceiveSQL::WritePlotlyPlotToQuery(const std::string& message, BStore& plot // (the trailing "+0100" is number of [hours][mins] in local timezone relative to UTC) std::string timestring; get_ok = TimeStringFromUnixMs(timestamp, timestring); - if (!get_ok) return false; + if (!get_ok){ + // already reported by function + //Log("WritePlotlyPlotToQuery: error forming timestamp string",v_error); + return false; + } sql_out = "INSERT INTO plotlyplots ( name, time, version, traces, layout ) VALUES ( " + name + ",'" @@ -1270,7 +1314,6 @@ bool ReceiveSQL::WritePlotlyPlotToQuery(const std::string& message, BStore& plot + traces + "," + layout + ") returning version;"; Log(Concat("Resulting SQL: '", sql_out, "', database: '", db_out, "'"), 4); - return true; } @@ -1316,7 +1359,7 @@ bool ReceiveSQL::WriteMessageToQuery(const std::string& topic, const std::string bool ReceiveSQL::GetClientReadQueries(){ // check if we had any read transactions dealt to us - if(in_polls.at(1).revents & ZMQ_POLLIN){ + if(in_polls.at(2).revents & ZMQ_POLLIN){ Log(">>> got a read query from client",3); ++read_queries_recvd; @@ -1628,7 +1671,7 @@ bool ReceiveSQL::ReadPlotlyPlotToQuery(const std::string& message, BStore& reque } else { sql_out += " AND version = " + std::to_string(version); } - + Log(Concat("Resulting SQL: '", sql_out, "', database: '", db_out, "'"), 4); return true; @@ -1675,47 +1718,54 @@ bool ReceiveSQL::GetMulticastMessages(){ // check for incoming message // see if we had any multicast messages - if(in_polls.at(0).revents & ZMQ_POLLIN){ - Log(">>> got a multicast message from client",10); - ++multicasts_recvd; - - // read the messge - char message[655355]; // theoretical maximum UDP buffer size - int cnt = recvfrom(multicast_socket, message, sizeof(message), 0, (struct sockaddr*)&multicast_addr, &multicast_addrlen); - if(cnt <= 0){ - Log(std::string{"Failed to receive on multicast socket with error '"}+strerror(errno)+"'",v_error); - ++multicast_recv_fails; - return false; - } - - Log("Received multicast message from "+std::string{inet_ntoa(multicast_addr.sin_addr)} - +": '"+std::string{message}+"'",12); - - std::string database; - std::string query; - std::string topic; - get_ok = MulticastMessageToQuery(message, topic, database, query); - - if(!get_ok){ - ++multicast_recv_fails; - return false; - } - - // FIXME for now all messages go to daq database, - // probably need to make this a pair at least with first element a DB connection or name - if(topic=="logging" || topic=="monitoring" || topic=="rootplot"){ - in_multicast_queue.emplace_back(query); - Log("Put "+topic+" msg in queue: '"+query+"'",12); + // logging and monitoring come on different sockets (different ports) + // but code to handle them is the same + std::vector sockets{log_socket,mon_socket}; + for(size_t i=0; i<2; ++i){ + if(in_polls.at(i).revents & ZMQ_POLLIN){ + Log(">>> got a multicast message from client",10); + (i==0) ? ++logs_recvd : ++mons_recvd; + int multicast_socket = sockets[i]; + struct sockaddr_in* multicast_addr = (i==0) ? &log_addr : &mon_addr; - } else { - // could not determine multicast type - Log(std::string{"Unrecognised topic '"}+topic+"' in multicast message '"+message+"'",v_error); - ++multicast_recv_fails; - return false; + // read the messge + //char buf[655355]; // theoretical maximum UDP buffer size. move to member + int cnt = recvfrom(multicast_socket, buf, sizeof(buf), 0, (struct sockaddr*)multicast_addr, &multicast_addrlen); + if(cnt <= 0){ + Log(std::string{"Failed to receive on multicast socket with error '"}+strerror(errno)+"'",v_error); + (i==0) ? ++log_recv_fails : ++mon_recv_fails; + return false; + } - } - - } /*else { std::cout<<"no multicast messages"<sin_addr)} + +": '"+std::string{buf}+"'",12); + + std::string database; + std::string query; + std::string topic= (i==0) ? "logging" : "monitoring"; + get_ok = MulticastMessageToQuery(buf, topic, database, query); + + if(!get_ok){ + (i==0) ? ++log_recv_fails : ++mon_recv_fails; + return false; + } + + // FIXME for now all messages go to daq database, + // probably need to make this a pair at least with first element a DB connection or name + if(topic=="logging" || topic=="monitoring" || topic=="rootplot"){ + in_multicast_queue.emplace_back(query); + Log("Put "+topic+" msg in queue: '"+query+"'",12); + + } else { + // could not determine multicast type + Log(std::string{"Unrecognised topic '"}+topic+"' in multicast message '"+buf+"'",v_error); + (i==0) ? ++log_recv_fails : ++mon_recv_fails; + return false; + + } + + } /*else { std::cout<<"no multicast messages"< outputs; @@ -1999,7 +2055,7 @@ bool ReceiveSQL::GetMiddlemanCheckin(){ // but perhaps this message is stale: // re-poll the socket and see if there is another message in the buffer try { - get_ok = zmq::poll(&in_polls.at(2), 1, 0); + get_ok = zmq::poll(&in_polls.at(3), 1, 0); } catch (zmq::error_t& err){ std::cerr<<"ReceiveSQL::GetMiddlemanCheckin poller caught "<second; @@ -2248,28 +2304,23 @@ bool ReceiveSQL::SendNextMulticast(){ // send next logging message to the master, if we have one in the queue if(out_multicast_queue.size()){ - // check we had a listener ready - if(out_polls.at(0).revents & ZMQ_POLLOUT){ - - // OK to send! Get the message - std::string& message = out_multicast_queue.front(); + // Get the message + std::string& message = out_multicast_queue.front(); + + int cnt = sendto(log_socket, message.c_str(), message.length()+1, 0, (struct sockaddr*)&log_addr, multicast_addrlen); + if(cnt < 0){ + std::string errmsg = "Error sending multicast message: "+std::string{strerror(errno)}; + Log(errmsg,v_error); + out_multicast_queue.pop_front(); + ++multicast_send_fails; + return false; - int cnt = sendto(multicast_socket, message.c_str(), message.length()+1, 0, (struct sockaddr*)&multicast_addr, multicast_addrlen); - if(cnt < 0){ - std::string errmsg = "Error sending multicast message: "+std::string{strerror(errno)}; - Log(errmsg,v_error); - out_multicast_queue.pop_front(); - ++multicast_send_fails; - return false; - - } else { - // sent successfully, remove from the to-send queue - out_multicast_queue.pop_front(); - ++multicasts_sent; - - } // end send ok check + } else { + // sent successfully, remove from the to-send queue + out_multicast_queue.pop_front(); + ++multicasts_sent; - } // else no available listeners + } // end send ok check } // else no log messages to send @@ -2285,7 +2336,7 @@ bool ReceiveSQL::BroadcastPresence(){ if(elapsed_time.is_negative()){ - if(out_polls.at(2).revents & ZMQ_POLLOUT){ + if(out_polls.at(1).revents & ZMQ_POLLOUT){ ++mm_broadcasts_sent; uint32_t msg = am_master; @@ -2459,6 +2510,10 @@ bool ReceiveSQL::UpdateControls(){ SC_vars["Quit"]->GetValue(quit); if(quit) DoQuit(quit); + bool reset=false; + SC_vars["ResetStats"]->GetValue(reset); + if(reset) ResetStats(reset); + return true; } @@ -2501,8 +2556,10 @@ bool ReceiveSQL::TrackStats(){ MonitoringStore.Set("write_query_recv_fails", write_query_recv_fails); MonitoringStore.Set("read_queries_recvd", read_queries_recvd); MonitoringStore.Set("read_query_recv_fails", read_query_recv_fails); - MonitoringStore.Set("multicasts_recvd", multicasts_recvd); - MonitoringStore.Set("multicast_recv_fails", multicast_recv_fails); + MonitoringStore.Set("logs_recvd", logs_recvd); + MonitoringStore.Set("mons_recvd", mons_recvd); + MonitoringStore.Set("log_recv_fails", log_recv_fails); + MonitoringStore.Set("mon_recv_fails", mon_recv_fails); MonitoringStore.Set("mm_broadcasts_recvd", mm_broadcasts_recvd); MonitoringStore.Set("mm_broadcast_recv_fails", mm_broadcast_recv_fails); MonitoringStore.Set("write_queries_failed", write_queries_failed); @@ -2542,7 +2599,8 @@ bool ReceiveSQL::TrackStats(){ std::stringstream status; status << " read qrys (rcvd/rcv errs/qry errs):["< messages; - int ret = PollAndReceive(mm_rcv_socket, in_polls.at(2), inpoll_timeout, messages); + int ret = PollAndReceive(mm_rcv_socket, in_polls.at(3), inpoll_timeout, messages); // chech for errors if(ret==-3) Log("Error polling in socket in NegotiateMaster() call!",0); @@ -2766,7 +2825,7 @@ bool ReceiveSQL::NegotiationRequest(){ // they must've opened negotiations at the same time we did. They may be expecting a reply. // send the reply - int ret = PollAndSend(mm_snd_socket, out_polls.at(2), outpoll_timeout, my_id, our_header, our_timestamp); + int ret = PollAndSend(mm_snd_socket, out_polls.at(1), outpoll_timeout, my_id, our_header, our_timestamp); // handle errors if(ret==-3) Log("Error polling out socket in NegotiateMaster() call!",0); @@ -2825,7 +2884,7 @@ bool ReceiveSQL::NegotiationReply(const std::string& their_header, const std::st } // inform the other middleman - int ret = PollAndSend(mm_snd_socket, out_polls.at(2), 500, my_id, our_header, our_timestamp); + int ret = PollAndSend(mm_snd_socket, out_polls.at(1), 500, my_id, our_header, our_timestamp); // handle errors if(ret==-3) Log("Error polling out socket in NegotiateMaster() call!",0); @@ -2884,8 +2943,7 @@ bool ReceiveSQL::UpdateRole(){ // disconnect from the write ports again delete clt_sub_socket; clt_sub_socket=nullptr; - // remove the polls - in_polls.pop_back(); + // remove the poll in_polls.pop_back(); return false; @@ -2910,7 +2968,6 @@ bool ReceiveSQL::UpdateRole(){ delete clt_sub_socket; clt_sub_socket=nullptr; // remove the associated polls in_polls.pop_back(); - in_polls.pop_back(); // drop any outstanding write messages wrt_txn_queue.clear(); @@ -3300,26 +3357,22 @@ bool ReceiveSQL::Log(const std::string& message, uint32_t message_severity){ // ««-------------- ≪ °◇◆◇° ≫ --------------»» bool ReceiveSQL::DoStop(bool stop){ - if(stop){ - // make stop flag file which will trigger finalise and termination - std::string cmd = "touch "+stopfile; - std::system(cmd.c_str()); - SC_vars["Restart"]->SetValue(false); - SC_vars["Status"]->SetValue("Stopping"); - } + if(!stop) return true; + // make stop flag file which will trigger finalise and termination + std::string cmd = "touch "+stopfile; + std::system(cmd.c_str()); + SC_vars["Restart"]->SetValue(false); + SC_vars["Status"]->SetValue("Stopping"); return true; } bool ReceiveSQL::DoQuit(bool quit){ - if(quit){ - // make stop flag file to stop this executable - DoStop(true); - // make quit flag file to prevent run_middleman.sh re-starting us - std::string cmd = "touch "+quitfile; - std::system(cmd.c_str()); - SC_vars["Quit"]->SetValue(false); - SC_vars["Status"]->SetValue("Quitting"); - } + if(!quit) return true; + // make quit flag file to prevent run_middleman.sh re-starting us + std::string cmd = "touch "+quitfile; + std::system(cmd.c_str()); + SC_vars["Quit"]->SetValue(false); + SC_vars["Status"]->SetValue("Quitting"); return true; } @@ -3330,3 +3383,56 @@ bool ReceiveSQL::DoQuit(bool quit){ //https://stackoverflow.com/questions/56961111/questions-about-postgres-track-commit-timestamp-pg-xact-commit-timestamp //https://newbedev.com/how-to-find-out-when-data-was-inserted-to-postgres + +bool ReceiveSQL::ResetStats(bool reset){ + if(!reset) return true; + + min_loop_ms=0; + max_loop_ms=0; + loops=0; + write_queries_recvd=0; + write_query_recv_fails=0; + read_queries_recvd=0; + read_query_recv_fails=0; + logs_recvd=0; + mons_recvd=0; + log_recv_fails=0; + mon_recv_fails=0; + mm_broadcasts_recvd=0; + mm_broadcast_recv_fails=0; + write_queries_failed=0; + multicast_queries_failed=0; + read_queries_failed=0; + reps_sent=0; + rep_send_fails=0; + multicasts_sent=0; + multicast_send_fails=0; + mm_broadcasts_sent=0; + mm_broadcasts_failed=0; + master_clashes=0; + master_clashes_failed=0; + standby_clashes=0; + standby_clashes_failed=0; + self_promotions=0; + self_promotions_failed=0; + promotions=0; + promotions_failed=0; + demotions=0; + demotions_failed=0; + dropped_writes=0; + dropped_reads=0; + dropped_resps=0; + dropped_multicast_in=0; + dropped_logs_out=0; + dropped_monitoring_out=0; + + MonitoringStore.Set("write_queries_recvd", 0); + MonitoringStore.Set("read_queries_recvd", 0); + + last_stats_calc = boost::posix_time::microsec_clock::universal_time(); + std::string timestring; + TimeStringFromUnixSec(0, timestring); + SC_vars["ResetStats"]->SetValue(false); + + return true; +} diff --git a/ReceiveSQL.h b/ReceiveSQL.h index da7843c..621c401 100755 --- a/ReceiveSQL.h +++ b/ReceiveSQL.h @@ -72,6 +72,7 @@ class ReceiveSQL{ bool DoStop(bool stop); bool DoQuit(bool quit); bool TrackStats(); + bool ResetStats(bool reset); bool Finalise(); @@ -130,12 +131,15 @@ class ReceiveSQL{ std::map clt_sub_connections; // multicast socket file descriptor - int multicast_socket=-1; + int log_socket=-1; + int mon_socket=-1; // multicast destination address structure - struct sockaddr_in multicast_addr; + struct sockaddr_in log_addr; + struct sockaddr_in mon_addr; socklen_t multicast_addrlen; // apparently works with zmq poller? zmq::pollitem_t multicast_poller; + char buf[655355]; // buffer for multicast messages // poll timeouts int inpoll_timeout; @@ -227,8 +231,10 @@ class ReceiveSQL{ unsigned long write_query_recv_fails = 0; unsigned long read_queries_recvd = 0; unsigned long read_query_recv_fails = 0; - unsigned long multicasts_recvd = 0; - unsigned long multicast_recv_fails = 0; + unsigned long logs_recvd = 0; + unsigned long mons_recvd = 0; + unsigned long log_recv_fails = 0; + unsigned long mon_recv_fails = 0; unsigned long mm_broadcasts_recvd = 0; unsigned long mm_broadcast_recv_fails = 0; diff --git a/ReceiveSQLConfig b/ReceiveSQLConfig index 213caab..0ec2b18 100644 --- a/ReceiveSQLConfig +++ b/ReceiveSQLConfig @@ -14,10 +14,10 @@ dbname daq clt_sub_port 55556 clt_rtr_port 55555 mm_snd_port 55597 -log_pub_port 55554 -remote_control_port 24011 -multicast_port 55554 +log_port 55554 +mon_port 55553 multicast_address 239.192.1.1 +remote_control_port 24011 # all timeouts in ms clt_sub_socket_timeout 500 @@ -48,6 +48,6 @@ cache_period_ms 10000 # if writes are sent to the read-only port, but we're master, should we accept them? handle_unexpected_writes 0 stats_period_ms 60000 -stdio_verbosity 6 +stdio_verbosity 1 db_verbosity 1 service_discovery_config ServiceDiscoveryConfig