From 7e9bf8cf673ade0442867f9312f90457a79ba401 Mon Sep 17 00:00:00 2001 From: Giloo Date: Fri, 27 Mar 2026 10:18:57 +0100 Subject: [PATCH 1/3] Patched SOCKET and made socket READF, READU and CLOSE working, at last! following #2178, changed obsoleted gethostbyname() to getaddrinfo(). Testing this, I realized that neither READF or READU on sockets were working correctly (i.e. as IDL does) or even working at all. This since the beginnings of GDL. Because the buffering used, necessary because GDL has opted to a generalized C++ interface for I/O using std::streams, was not completely consistent in every case, especially when disguising a socket as a bona fide std::stringstream. For example, sockets were NOT CLOSED by the CLOSE command. Socket reads were buffering the data without knowing the amount of data to be read, allowing for timeouts or missing some data. Hopefully this is now beyond us. --- src/basic_pro.cpp | 49 +++++++++++-------- src/io.cpp | 30 +++++++----- src/read.cpp | 119 ++++++++++++++++++---------------------------- 3 files changed, 93 insertions(+), 105 deletions(-) diff --git a/src/basic_pro.cpp b/src/basic_pro.cpp index 9cef2fabe..921061d1c 100644 --- a/src/basic_pro.cpp +++ b/src/basic_pro.cpp @@ -986,29 +986,38 @@ namespace lib { " Unit: " + i2s(lun)); is = &cin; } else if (sockNum != -1) { - // Socket Read + // GD: Socket Read: we use an intermediate buffer seen as a istringstream. (yes all this is too complicated!) + // So we NEED to get the EXACT amount of bytes to be "read". In order to get the rest to be read next time. + // the code was wrong in this respect. + // Get total amount of bytes to transfer. Should be factorized between the various cases. + SizeT nBytes=0; + for (SizeT i = 1; i < nParam; i++) { + BaseGDL* p = e->GetPar(i); + if (p == NULL) nBytes+=sizeof(DFloat); // will be a DFloatGDL + else nBytes = p->NBytes(); + if (p->Type() == GDL_STRUCT) nBytes = static_cast (p)->NBytesToTransfer(); //p->NBytes does not give sum of length of struct elements, due to alignment.We decompose. + } swapEndian = fileUnits[lun - 1].SwapEndian(); - compress = fileUnits[lun - 1].Compress(); - string *recvBuf = &fileUnits[lun - 1].RecvBuf(); - - // Setup recv buffer & string - const int MAXRECV = 2048 * 8; - char buf[MAXRECV + 1]; - - // Read socket until finished & store in recv string - while (1) { - memset(buf, 0, MAXRECV + 1); - int status = recv(sockNum, buf, MAXRECV, 0); - // cout << "Bytes received: " << status << endl; - if (status == 0) break; - for (SizeT i = 0; i < status; i++) - recvBuf->push_back(buf[i]); - } + recvBuf->clear(); + recvBuf->reserve(nBytes+1); //make recvBuf great again // Get istringstream, write recv string, & assign to istream istringstream *iss = &fileUnits[lun - 1].ISocketStream(); + // Read socket until finished & store in recv string + char c; + int nread; + for (auto i=0; i< nBytes;) { + nread = read(sockNum, &c, 1);//, 0); + if (nread < 0) { + e->Throw("read associated Socket error."); + } + if (nread) { + recvBuf->push_back(c); + i++; + } + } iss->str(*recvBuf); is = iss; } else { @@ -1083,11 +1092,9 @@ namespace lib { DLong nRec2; memcpy(&nRec2, hdr, 4); -// 2018 April 14 -// G.Jung I don't think this works right for stuctures. -// I have a method (RealBytes) that computes the actual byte count, -// it needs entries across several different files. + SizeT nBytes = p->NBytes(); + if (p->Type() == GDL_STRUCT) nBytes = static_cast (p)->NBytesToTransfer(); //p->NBytes does not give sum of length of struct elements, due to alignment.We decompose. // In variable length VMS files, each record is prefixed // with a count byte that contains the number of bytes diff --git a/src/io.cpp b/src/io.cpp index edb492691..1d2019ee6 100644 --- a/src/io.cpp +++ b/src/io.cpp @@ -18,7 +18,7 @@ #include "includefirst.hpp" #include // std::remove(...) - +#include #include "objects.hpp" #include "io.hpp" #ifdef __MINGW32__ @@ -672,7 +672,7 @@ void GDLStream::Socket(const string& host, name = host; - sockNum = socket(AF_INET, SOCK_STREAM, 0); + sockNum = socket(AF_INET, SOCK_STREAM, IPPROTO_IP); c_timeout = c_timeout_; r_timeout = r_timeout_; @@ -689,18 +689,26 @@ void GDLStream::Socket(const string& host, m_addr.sin_port = htons(port); // Convert host to IPv4 format - struct hostent *h; - if ((h = gethostbyname(host.c_str())) == NULL) { // get the host info + struct addrinfo hints, *result; + int err; + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_INET; /* Allow IPv4 or IPv6 */ + hints.ai_socktype = SOCK_STREAM; /* Sequenced, reliable, connection-based + byte streams. */ + + if ((err = getaddrinfo(host.c_str(),NULL,&hints, &result)) != 0) { // get the host info + std::cerr<ai_addr))->sin_addr.s_addr; - // cout << inet_ntoa(*((struct in_addr *)h->h_addr)) << endl; - - int status = inet_pton(AF_INET, inet_ntoa(*((struct in_addr *) h->h_addr)), - &m_addr.sin_addr); +// printf("ip address : %s\n", inet_ntoa(addr)); - status = connect(sockNum, (sockaddr *) & m_addr, sizeof (m_addr)); + int status ; if (status = inet_pton(AF_INET, inet_ntoa(addr), &m_addr.sin_addr) != 1) perror(__func__); + if (status = connect(sockNum, (sockaddr *) & m_addr, sizeof (m_addr)) != 0) perror(__func__); + fcntl(sockNum,F_SETFD, FD_CLOEXEC); swapEndian = swapEndian_; // BIG limit on socket send width to avoid leading \n in CheckNL @@ -731,7 +739,6 @@ void GDLStream::Close() { if (deleteOnClose) std::remove(name.c_str()); } - name = ""; f77 = false; swapEndian = false; compress = false; @@ -741,7 +748,8 @@ void GDLStream::Close() { xdrs = NULL; width = defaultStreamWidth; - + //Do not forget sockets! + if (sockNum != -1) close(sockNum); sockNum = -1; c_timeout = 0.0; r_timeout = 0.0; diff --git a/src/read.cpp b/src/read.cpp index 913ed7a9b..9a85bcc38 100644 --- a/src/read.cpp +++ b/src/read.cpp @@ -167,92 +167,65 @@ namespace lib { using namespace std; - void readf_pro( EnvT* e) - { - SizeT nParam=e->NParam(); - if( nParam < 1) - e->Throw( "Incorrect number of arguments."); + void readf_pro(EnvT* e) { + SizeT nParam = e->NParam(); + if (nParam < 1) + e->Throw("Incorrect number of arguments."); DLong lun; - e->AssureLongScalarPar( 0, lun); + e->AssureLongScalarPar(0, lun); istream* is; - bool stdLun = check_lun( e, lun); - if( stdLun) - { - if( lun != 0) - e->Throw( "Cannot read from stdout and stderr." - " Unit: "+i2s( lun)); - is = &cin; - } - else - { - if( fileUnits[ lun-1].F77()) - e->Throw( "Formatted IO not allowed with F77_UNFORMATTED " - "files. Unit: "+i2s( lun)); - - int sockNum = fileUnits[ lun-1].SockNum(); - //cout << "sockNum: " << sockNum << endl; - - if (sockNum == -1) { - // *** File Read *** // - if( fileUnits[ lun-1].Compress()) - is = &fileUnits[ lun-1].IgzStream(); - else - is = &fileUnits[ lun-1].IStream(); - - } else { - // *** Socket Read *** // - string *recvBuf = &fileUnits[ lun-1].RecvBuf(); - - // Setup recv buffer & string - const int MAXRECV = 2048*8; - char buf[MAXRECV+1]; - - // Read socket until finished & store in recv string - int totalread = 0; - while (1) { - memset(buf, 0, MAXRECV+1); - int status = recv(sockNum, buf, MAXRECV, 0); - // cout << "Bytes received: " << status << endl; - if (status == 0) break; - - recvBuf->append(buf, status); - - // for( SizeT i=0; ipush_back(buf[i]); - - totalread += status; - //cout << "recvBuf size: " << recvBuf->size() << endl; - //cout << "Total bytes read: " << totalread << endl << endl; - } - // if (totalread > 0) cout << "Total bytes read: " << totalread << endl; - - // Get istringstream, write recv string, & assign to istream - istringstream *iss = &fileUnits[ lun-1].ISocketStream(); - iss->str(*recvBuf); - is = iss; - } + bool stdLun = check_lun(e, lun); + if (stdLun) { + if (lun != 0) + e->Throw("Cannot read from stdout and stderr." + " Unit: " + i2s(lun)); + is = &cin; + } else { + if (fileUnits[ lun - 1].F77()) + e->Throw("Formatted IO not allowed with F77_UNFORMATTED " + "files. Unit: " + i2s(lun)); + + int sockNum = fileUnits[ lun - 1].SockNum(); + // cout << "sockNum: " << sockNum << endl; + + if (sockNum == -1) { + // *** File Read *** // + if (fileUnits[ lun - 1].Compress()) + is = &fileUnits[ lun - 1].IgzStream(); + else + is = &fileUnits[ lun - 1].IStream(); + + } else { + // *** Socket Read *** // + string *recvBuf = &fileUnits[ lun - 1].RecvBuf(); + recvBuf->clear(); + recvBuf->reserve(1000); + char c; + recvBuf->clear(); + while (1) { + int nread = read(sockNum, &c, 1); //, 0); //IDL reads byte by byte to test for \n and stop reading + if (nread < 0) break; //error + if (nread) recvBuf->push_back(c); + if (c == '\n') break; + } + // Get istringstream, write recv string, & assign to istream + istringstream *iss = &fileUnits[ lun - 1].ISocketStream(); + iss->str(*recvBuf); + is = iss; } + } - read_is( is, e, 1); - - // If socket strip off leading line - if (lun > 0 && fileUnits[ lun-1].SockNum() != -1) { - string *recvBuf = &fileUnits[ lun-1].RecvBuf(); - int pos = is->tellg(); - recvBuf->erase(0, pos); - - // int pos = recvBuf->find("\n", 0); - //recvBuf->erase(0, pos+1); + read_is(is, e, 1); } - } void read_pro( EnvT* e) { read_is( &cin, e, 0); } + void read_is(istream* is, EnvT* e, int parOffset) { // PROMPT keyword BaseGDL* prompt = e->GetKW(4); From b8fa9813b7ce426080533180cdb0be4349b7cd8f Mon Sep 17 00:00:00 2001 From: Giloo Date: Fri, 27 Mar 2026 12:09:46 +0100 Subject: [PATCH 2/3] for windows, revert to old code for Socket() --that passes compiler -- as I've no inclination to solve this --- src/io.cpp | 56 ++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 54 insertions(+), 2 deletions(-) diff --git a/src/io.cpp b/src/io.cpp index 1d2019ee6..4fca1d4cc 100644 --- a/src/io.cpp +++ b/src/io.cpp @@ -18,7 +18,6 @@ #include "includefirst.hpp" #include // std::remove(...) -#include #include "objects.hpp" #include "io.hpp" #ifdef __MINGW32__ @@ -660,6 +659,59 @@ void GDLStream::Open(const string& name_, width = width_; } +#ifdef __MINGW32__ + +// Use obsolete code, as I've no time to adapt new code to Windows. + +void GDLStream::Socket(const string& host, + DUInt port, bool swapEndian_, + DDouble c_timeout_, DDouble r_timeout_, + DDouble w_timeout_) { + if (iSocketStream == NULL) + iSocketStream = new istringstream; + + if (recvBuf == NULL) + recvBuf = new string; + + name = host; + + sockNum = socket(AF_INET, SOCK_STREAM, 0); + + c_timeout = c_timeout_; + r_timeout = r_timeout_; + w_timeout = w_timeout_; + + int on = 1; + if (setsockopt(sockNum, SOL_SOCKET, SO_REUSEADDR, + (const char*) &on, sizeof (on)) == -1) { + throw GDLIOException("Error opening file."); + } + + sockaddr_in m_addr; + m_addr.sin_family = AF_INET; + m_addr.sin_port = htons(port); + + // Convert host to IPv4 format + struct hostent *h; + if ((h = gethostbyname(host.c_str())) == NULL) { // get the host info + throw GDLIOException("Unable to lookup host."); + } + + // cout << inet_ntoa(*((struct in_addr *)h->h_addr)) << endl; + + int status = inet_pton(AF_INET, inet_ntoa(*((struct in_addr *) h->h_addr)), + &m_addr.sin_addr); + + status = connect(sockNum, (sockaddr *) & m_addr, sizeof (m_addr)); + + swapEndian = swapEndian_; + + // BIG limit on socket send width to avoid leading \n in CheckNL + width = 32768; +} +#else +//New code suppressing call to obsolete functions +#include void GDLStream::Socket(const string& host, DUInt port, bool swapEndian_, DDouble c_timeout_, DDouble r_timeout_, @@ -714,7 +766,7 @@ addr.s_addr = ((struct sockaddr_in *)(result->ai_addr))->sin_addr.s_addr; // BIG limit on socket send width to avoid leading \n in CheckNL width = 32768; } - +#endif void AnyStream::Flush() { if (fStream != NULL) { fStream->flush(); From e0638875a02c6ff01540431b038b65b6b373b0ca Mon Sep 17 00:00:00 2001 From: Giloo Date: Wed, 1 Apr 2026 17:39:21 +0200 Subject: [PATCH 3/3] behaviour closer to IDL's. But not 100% reproducible due to the limitation of streams that are not tailored for handling socket data. --- src/basic_fun_jmg.cpp | 12 ++++++++---- src/basic_pro.cpp | 16 ++++++++++++++-- src/basic_pro.hpp | 2 +- src/io.cpp | 17 +++++++++-------- src/io.hpp | 3 +-- src/libinit.cpp | 6 ++++-- src/read.cpp | 32 +++++++++++++++++++------------- 7 files changed, 56 insertions(+), 32 deletions(-) diff --git a/src/basic_fun_jmg.cpp b/src/basic_fun_jmg.cpp index 2e6a02118..65b5e4772 100644 --- a/src/basic_fun_jmg.cpp +++ b/src/basic_fun_jmg.cpp @@ -706,15 +706,19 @@ namespace lib { { // normal file GDLStream& actUnit = fileUnits[ lun-1]; - if( !actUnit.IsOpen()) + if( !actUnit.IsOpen() && actUnit.SockNum()==-1) return fileStatus; // OPEN tag is init to zero (SpDByte::GetInstance()) struct stat buffer; - // int status = //status will be bad on the units used by SPAWN, UNIT=XXX; Do not check status - stat(actUnit.Name().c_str(), &buffer); + int status=0; + if (status =stat(actUnit.Name().c_str(), &buffer) !=0) status=fstat(lun, &buffer); //if (status) perror(strerror(errno)); fileStatus->InitTag("NAME", DStringGDL( actUnit.Name())); - fileStatus->InitTag("OPEN", DByteGDL( 1)); + fileStatus->InitTag("OPEN", DByteGDL( 1)); + + //early return for sockets! these are not files! + if (actUnit.SockNum()!=-1) return fileStatus; + if (big) fileStatus->InitTag("SIZE", DLong64GDL( buffer.st_size));//size)); else fileStatus->InitTag("SIZE", DLongGDL( buffer.st_size));//size)); DByte is_a_tty=isatty(lun); diff --git a/src/basic_pro.cpp b/src/basic_pro.cpp index 921061d1c..955b94e27 100644 --- a/src/basic_pro.cpp +++ b/src/basic_pro.cpp @@ -652,7 +652,9 @@ namespace lib { void openu(EnvT* e) { open_lun(e, fstream::in | fstream::out); } - +#ifndef _WIN32 +#include +#endif void socket(EnvT* e) { int nParam = e->NParam(3); @@ -675,7 +677,17 @@ namespace lib { DUInt port; BaseGDL* p2 = e->GetParDefined(2); if (p2->Type() == GDL_STRING) { + DString s; + e->AssureScalarPar(2,s); +#ifndef _WIN32 // look up /etc/services + struct servent *servent=getservbyname(s.c_str(),NULL); + if (servent==NULL) e->Throw("Unable to connect to host. Unit: "+i2s(lun)+", File: "+host+"."+s); + else port=servent->s_port; + endservent(); +#else + e->Throw("Unable to connect to host. Unit: "+i2s(lun)+", File: "+host+"."+s); +#endif } else if (p2->Type() == GDL_UINT) { e->AssureScalarPar(2, port); } else if (p2->Type() == GDL_INT) { @@ -726,7 +738,7 @@ namespace lib { try { fileUnits[lun - 1].Socket(host, port, swapEndian, - c_timeout, r_timeout, c_timeout); + c_timeout, r_timeout, c_timeout, width); } catch (GDLException& ex) { DString errorMsg = ex.toString() + " Unit: " + i2s(lun) + ", File: " + fileUnits[lun - 1].Name(); diff --git a/src/basic_pro.hpp b/src/basic_pro.hpp index 34564c621..39733d4d5 100644 --- a/src/basic_pro.hpp +++ b/src/basic_pro.hpp @@ -60,7 +60,7 @@ namespace lib { void read_pro(EnvT* e); void readf_pro(EnvT* e); void reads(EnvT* e); - void read_is(std::istream* is, EnvT* e, int parOffset); + std::streampos read_is(std::istream* is, EnvT* e, int parOffset); void on_error(EnvT* e); void catch_pro(EnvT* e); diff --git a/src/io.cpp b/src/io.cpp index 4fca1d4cc..bcbaab85d 100644 --- a/src/io.cpp +++ b/src/io.cpp @@ -666,7 +666,7 @@ void GDLStream::Open(const string& name_, void GDLStream::Socket(const string& host, DUInt port, bool swapEndian_, DDouble c_timeout_, DDouble r_timeout_, - DDouble w_timeout_) { + DDouble w_timeout_, SizeT width) { if (iSocketStream == NULL) iSocketStream = new istringstream; @@ -706,8 +706,8 @@ void GDLStream::Socket(const string& host, swapEndian = swapEndian_; - // BIG limit on socket send width to avoid leading \n in CheckNL - width = 32768; + // GD: ?????? // BIG limit on socket send width to avoid leading \n in CheckNL + //width = 32768; } #else //New code suppressing call to obsolete functions @@ -715,14 +715,14 @@ void GDLStream::Socket(const string& host, void GDLStream::Socket(const string& host, DUInt port, bool swapEndian_, DDouble c_timeout_, DDouble r_timeout_, - DDouble w_timeout_) { + DDouble w_timeout_, SizeT width) { if (iSocketStream == NULL) iSocketStream = new istringstream; if (recvBuf == NULL) recvBuf = new string; - name = host; + name = host + "." + i2s(port); sockNum = socket(AF_INET, SOCK_STREAM, IPPROTO_IP); @@ -747,7 +747,7 @@ void GDLStream::Socket(const string& host, hints.ai_family = AF_INET; /* Allow IPv4 or IPv6 */ hints.ai_socktype = SOCK_STREAM; /* Sequenced, reliable, connection-based byte streams. */ - + if ((err = getaddrinfo(host.c_str(),NULL,&hints, &result)) != 0) { // get the host info std::cerr<ai_addr))->sin_addr.s_addr; fcntl(sockNum,F_SETFD, FD_CLOEXEC); swapEndian = swapEndian_; - // BIG limit on socket send width to avoid leading \n in CheckNL - width = 32768; +// GD ????? // BIG limit on socket send width to avoid leading \n in CheckNL +// width = 32768; } #endif + void AnyStream::Flush() { if (fStream != NULL) { fStream->flush(); diff --git a/src/io.hpp b/src/io.hpp index 58e2e3fb8..35c2d7fac 100644 --- a/src/io.hpp +++ b/src/io.hpp @@ -150,7 +150,6 @@ class GDLStream DDouble w_timeout; SizeT width; - std::streampos lastSeekPos; // for F77 @@ -205,7 +204,7 @@ class GDLStream void Socket( const std::string& host, DUInt port, bool swapEndian_, - DDouble c_timeout, DDouble r_timeout, DDouble w_timeout); + DDouble c_timeout, DDouble r_timeout, DDouble w_timeout, SizeT width); void Flush(); diff --git a/src/libinit.cpp b/src/libinit.cpp index 03fe70181..55a087cec 100644 --- a/src/libinit.cpp +++ b/src/libinit.cpp @@ -461,13 +461,15 @@ void LibInit() new DLibPro(lib::openu,string("OPENU"),3,openKey); new DLibPro(lib::get_lun,string("GET_LUN"),1); - const string socketKey[]={"ERROR","GET_LUN","STDIO", + const string socketWarnKey[]={"ACCEPT","LISTEN","RAWIO","PORT",KLISTEND}; + const string socketKey[]={"ERROR","GET_LUN", + "STDIO", //not used but not signaled also. "SWAP_ENDIAN","SWAP_IF_BIG_ENDIAN", "SWAP_IF_LITTLE_ENDIAN","WIDTH", "CONNECT_TIMEOUT","READ_TIMEOUT", "WRITE_TIMEOUT", KLISTEND}; - new DLibPro(lib::socket,string("SOCKET"),3,socketKey); + new DLibPro(lib::socket,string("SOCKET"),3,socketKey,socketWarnKey); new DLibPro(lib::flush_lun,string("FLUSH"),-1); diff --git a/src/read.cpp b/src/read.cpp index 9a85bcc38..652feaa9e 100644 --- a/src/read.cpp +++ b/src/read.cpp @@ -199,22 +199,27 @@ namespace lib { is = &fileUnits[ lun - 1].IStream(); } else { - // *** Socket Read *** // string *recvBuf = &fileUnits[ lun - 1].RecvBuf(); - recvBuf->clear(); - recvBuf->reserve(1000); - char c; - recvBuf->clear(); + char c; while (1) { - int nread = read(sockNum, &c, 1); //, 0); //IDL reads byte by byte to test for \n and stop reading - if (nread < 0) break; //error - if (nread) recvBuf->push_back(c); - if (c == '\n') break; + int nread = recv(sockNum, &c, 1, MSG_DONTWAIT); //cannot reproduce behaviour of IDL since we use buffered processing in read_is(). + // we need to use lower level C reads, not read_is() in this special case OR find a way to get a socket behave like a file. +// int nread = read(sockNum, &c, 1); //, 0); //IDL reads byte by byte to test for \n and stop reading + if (nread == 0) break; //closed + if (nread < 1) { + if (errno == EAGAIN || errno == EWOULDBLOCK) break; //nothing more yet. + else e->Throw("Error accessing underlying socket, reason: "+std::string(strerror(errno))); + } + recvBuf->push_back(c); } - // Get istringstream, write recv string, & assign to istream + //ALL READ. istringstream *iss = &fileUnits[ lun - 1].ISocketStream(); iss->str(*recvBuf); is = iss; + streampos one = 1; + streampos readpos=read_is(is, e, 1); + recvBuf->erase(0,readpos-one); //consume already read. Note this is a readf, so at every call, one line is consumed. + return; } } @@ -226,7 +231,7 @@ namespace lib { read_is( &cin, e, 0); } -void read_is(istream* is, EnvT* e, int parOffset) { +streampos read_is(istream* is, EnvT* e, int parOffset) { // PROMPT keyword BaseGDL* prompt = e->GetKW(4); if (prompt != NULL && !prompt->Scalar()) @@ -254,7 +259,7 @@ void read_is(istream* is, EnvT* e, int parOffset) { bool noPrompt = true; int nParam = e->NParam(); - if (nParam == parOffset) return; + if (nParam == parOffset) { return is->tellg() ;} ostringstream oss; @@ -355,7 +360,7 @@ void read_is(istream* is, EnvT* e, int parOffset) { } if (sigControlC) - return; + return is->tellg(); } else #endif @@ -385,6 +390,7 @@ void read_is(istream* is, EnvT* e, int parOffset) { DStringGDL gdlString(""); gdlString.FromStream(*is); } + return is->tellg(); } void reads(EnvT * e) {