Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions src/ServiceDiscovery/Services.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ bool Services::SendAlarm(const std::string& message, unsigned int level, const s
+ ",\"severity\":0"
+ ",\"message\":\"" + message + "\"}";

ok = ok && m_backend_client.SendMulticast(cmd_string, &err);
ok = ok && m_backend_client.SendMulticast(MulticastType::Log,cmd_string, &err);

if(!ok){
std::clog<<"SendAlarm (log) error: "<<err<<std::endl;
Expand Down Expand Up @@ -546,7 +546,7 @@ bool Services::SendLog(const std::string& message, unsigned int severity, const

std::string err="";

if(!m_backend_client.SendMulticast(cmd_string, &err)){
if(!m_backend_client.SendMulticast(MulticastType::Log,cmd_string, &err)){
std::clog<<"SendLog error: "<<err<<std::endl;
return false;
}
Expand All @@ -556,13 +556,14 @@ bool Services::SendLog(const std::string& message, unsigned int severity, const
}


bool Services::SendMonitoringData(const std::string& json_data, const std::string& device, unsigned int timestamp){
bool Services::SendMonitoringData(const std::string& json_data, const std::string& subject, const std::string& device, unsigned int timestamp){

const std::string& name = (device=="") ? m_name : device;

std::string cmd_string = std::string{"{ \"topic\":\"monitoring\""}
+ ", \"time\":"+std::to_string(timestamp)
+ ", \"device\":\""+ name +"\""
+ ", \"subject\":\""+ subject +"\""
+ ", \"data\":\""+ json_data +"\" }";

if(cmd_string.length()>655355){
Expand All @@ -572,7 +573,7 @@ bool Services::SendMonitoringData(const std::string& json_data, const std::strin

std::string err="";

if(!m_backend_client.SendMulticast(cmd_string, &err)){
if(!m_backend_client.SendMulticast(MulticastType::Monitoring,cmd_string, &err)){
std::clog<<"SendMonitoringData error: "<<err<<std::endl;
return false;
}
Expand Down Expand Up @@ -640,7 +641,7 @@ bool Services::SendTemporaryROOTplot(const std::string& plot_name, const std::st

std::string err="";

if(!m_backend_client.SendMulticast(cmd_string, &err)){
if(!m_backend_client.SendMulticast(MulticastType::Log,cmd_string, &err)){
std::clog<<"SendROOTplot error: "<<err<<std::endl;
return false;
}
Expand Down
2 changes: 1 addition & 1 deletion src/ServiceDiscovery/Services.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ namespace ToolFramework {

bool SendLog(const std::string& message, unsigned int severity=2, const std::string& device="", const unsigned int timestamp=0);
bool SendAlarm(const std::string& message, unsigned int level=0, const std::string& device="", const unsigned int timestamp=0, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT);
bool SendMonitoringData(const std::string& json_data, const std::string& device="", unsigned int timestamp=0);
bool SendMonitoringData(const std::string& json_data, const std::string& subject, const std::string& device="", unsigned int timestamp=0);
bool SendCalibrationData(const std::string& json_data, const std::string& description, const std::string& device="", unsigned int timestamp=0, int* version=nullptr, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT);
bool GetCalibrationData(std::string& json_data, int version=-1, const std::string& device="", const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT);
bool SendDeviceConfig(const std::string& json_data, const std::string& author, const std::string& description, const std::string& device="", unsigned int timestamp=0, int* version=nullptr, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT);
Expand Down
55 changes: 34 additions & 21 deletions src/ServiceDiscovery/ServicesBackend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -270,16 +270,18 @@ bool ServicesBackend::InitMulticast(){
/* Multicast Setup */
/* ----------------------------------------- */

int multicast_port = 55554;
std::string multicast_address = "239.192.1.1"; // FIXME suitable default?
int log_port = 55554;
int mon_port = 55553;
std::string multicast_address = "239.192.1.1";

// FIXME add to config file
m_variables.Get("multicast_port",multicast_port);
m_variables.Get("log_port",log_port);
m_variables.Get("mon_port",mon_port);
m_variables.Get("multicast_address",multicast_address);

// set up multicast socket for sending logging & monitoring data
multicast_socket = socket(AF_INET, SOCK_DGRAM, 0);
if(multicast_socket<=0){
log_socket = socket(AF_INET, SOCK_DGRAM, 0);
mon_socket = socket(AF_INET, SOCK_DGRAM, 0);
if(log_socket<=0 || mon_socket<=0){
Log(std::string{"Failed to open multicast socket with error "}+strerror(errno),v_error,verbosity);
return false;
}
Expand All @@ -288,34 +290,39 @@ bool ServicesBackend::InitMulticast(){
struct linger l;
l.l_onoff = 0; // whether to linger
l.l_linger = 0; // seconds to linger for
get_ok = setsockopt(multicast_socket, SOL_SOCKET, SO_LINGER,(char*) &l, sizeof(l));
get_ok = setsockopt(log_socket, SOL_SOCKET, SO_LINGER,(char*) &l, sizeof(l));
get_ok = get_ok || setsockopt(mon_socket, SOL_SOCKET, SO_LINGER,(char*) &l, sizeof(l));
if(get_ok!=0){
Log(std::string{"Failed to set multicast socket linger with error "}+strerror(errno),v_error,verbosity);
return false;
}

// set the socket to non-blocking mode - seems like a good idea...? XXX
get_ok = fcntl(multicast_socket, F_SETFL, O_NONBLOCK);
get_ok = fcntl(log_socket, F_SETFL, O_NONBLOCK);
get_ok = get_ok || fcntl(mon_socket, F_SETFL, O_NONBLOCK);
if(get_ok!=0){
Log(std::string{"Failed to set multicast socket to non-blocking with error "}
+strerror(errno),v_error,verbosity);
//return false;
}

// format destination address from IP string
bzero((char *)&multicast_addr, sizeof(multicast_addr)); // init to 0
multicast_addr.sin_family = AF_INET;
multicast_addr.sin_port = htons(multicast_port);
bzero((char *)&log_addr, sizeof(log_addr)); // init to 0
log_addr.sin_family = AF_INET;
log_addr.sin_port = htons(log_port);
mon_addr = log_addr;
mon_addr.sin_port = htons(mon_port);
// convert destination address string to binary
get_ok = inet_aton(multicast_address.c_str(), &multicast_addr.sin_addr);
if(get_ok==0){
get_ok = inet_aton(multicast_address.c_str(), &log_addr.sin_addr);
get_ok = get_ok && inet_aton(multicast_address.c_str(), &mon_addr.sin_addr);
if(get_ok==0){ // returns 0 on failure, not success
Log("Bad multicast address '"+multicast_address+"'",v_error,verbosity);
return false;
}
multicast_addrlen = sizeof(multicast_addr);
multicast_addrlen = sizeof(log_addr);

// apparently we can poll with zmq?
multicast_poller = zmq::pollitem_t{ NULL, multicast_socket, ZMQ_POLLOUT, 0 };
//multicast_poller = zmq::pollitem_t{ NULL, multicast_socket, ZMQ_POLLOUT, 0 };

return true;
}
Expand Down Expand Up @@ -364,25 +371,30 @@ bool ServicesBackend::BackgroundThread(std::future<void> signaller){
return true;
}

bool ServicesBackend::SendMulticast(std::string command, std::string* err){
bool ServicesBackend::SendMulticast(MulticastType type, std::string command, std::string* err){
// multicast send. These do not wait for a response, so no timeout.
// only immediately evident errors are reported. receipt is not confirmed.
if(verbosity>10) std::cout<<"ServicesBackend::SendMulticast invoked with command '"<<command<<"'"<<std::endl;
// type: 0=logging, 1=monitoring
int multicast_socket = (type==MulticastType::Log) ? log_socket : mon_socket;
struct sockaddr_in* multicast_addr = (type==MulticastType::Log) ? &log_addr : &mon_addr;

// check for listeners...?
zmq::poll(&multicast_poller,1, 0); // timeout 0 = return immediately... XXX
/*
// check for listeners...? - seems redundant, multicast can always send
zmq::poll(&multicast_poller,1, 0); // timeout 0 = return immediately...
if(multicast_poller.revents & ZMQ_POLLOUT){
*/

// got a listener - ship it
int cnt = sendto(multicast_socket, command.c_str(), command.length()+1, 0, (struct sockaddr*)&multicast_addr, multicast_addrlen);
int cnt = sendto(multicast_socket, command.c_str(), command.length()+1, 0, (struct sockaddr*)multicast_addr, multicast_addrlen);
if(cnt < 0){
std::string errmsg = "Error sending multicast message: "+std::string{strerror(errno)};
Log(errmsg,v_error,verbosity);
if(err) *err= errmsg; //zmq_strerror(errno);
return false;
}

}
//}

return true;
}
Expand Down Expand Up @@ -770,7 +782,8 @@ bool ServicesBackend::Finalise(){
if(utilities) utilities->RemoveService("slowcontrol_read");

Log("ServicesBackend Closing multicast socket",v_debug,verbosity);
close(multicast_socket);
close(log_socket);
close(mon_socket);

Log("ServicesBackend Deleting Utilities class",v_debug,verbosity);
if(utilities){
Expand Down
12 changes: 8 additions & 4 deletions src/ServiceDiscovery/ServicesBackend.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ struct Command {

enum class SlowControlMsg { Query, Log, Alarm, Monitoring, Calibration, Config };

enum class MulticastType { Log, Monitoring };

class ServicesBackend {
public:
ServicesBackend(){};
Expand All @@ -60,7 +62,7 @@ class ServicesBackend {
bool SendCommand(const std::string& topic, const std::string& command, std::string* results=nullptr, const unsigned int* timeout_ms=nullptr, std::string* err=nullptr);

// multicasts
bool SendMulticast(std::string command, std::string* err=nullptr);
bool SendMulticast(MulticastType type, std::string command, std::string* err=nullptr);


private:
Expand All @@ -85,10 +87,12 @@ class ServicesBackend {
std::vector<zmq::pollitem_t> in_polls;
std::vector<zmq::pollitem_t> out_polls;

// multicast socket file descriptor
int multicast_socket=-1;
// multicast socket file descriptors
int log_socket=-1;
int mon_socket=-1;
// multicast destination address structure
struct sockaddr_in multicast_addr;
struct sockaddr_in log_addr;
struct sockaddr_in mon_addr;
socklen_t multicast_addrlen;
// apparently works with zmq poller?
zmq::pollitem_t multicast_poller;
Expand Down