diff --git a/src/basic_fun_jmg.cpp b/src/basic_fun_jmg.cpp index 2e6a02118..65b5e4772 100644 --- a/src/basic_fun_jmg.cpp +++ b/src/basic_fun_jmg.cpp @@ -706,15 +706,19 @@ namespace lib { { // normal file GDLStream& actUnit = fileUnits[ lun-1]; - if( !actUnit.IsOpen()) + if( !actUnit.IsOpen() && actUnit.SockNum()==-1) return fileStatus; // OPEN tag is init to zero (SpDByte::GetInstance()) struct stat buffer; - // int status = //status will be bad on the units used by SPAWN, UNIT=XXX; Do not check status - stat(actUnit.Name().c_str(), &buffer); + int status=0; + if (status =stat(actUnit.Name().c_str(), &buffer) !=0) status=fstat(lun, &buffer); //if (status) perror(strerror(errno)); fileStatus->InitTag("NAME", DStringGDL( actUnit.Name())); - fileStatus->InitTag("OPEN", DByteGDL( 1)); + fileStatus->InitTag("OPEN", DByteGDL( 1)); + + //early return for sockets! these are not files! + if (actUnit.SockNum()!=-1) return fileStatus; + if (big) fileStatus->InitTag("SIZE", DLong64GDL( buffer.st_size));//size)); else fileStatus->InitTag("SIZE", DLongGDL( buffer.st_size));//size)); DByte is_a_tty=isatty(lun); diff --git a/src/basic_pro.cpp b/src/basic_pro.cpp index 9cef2fabe..955b94e27 100644 --- a/src/basic_pro.cpp +++ b/src/basic_pro.cpp @@ -652,7 +652,9 @@ namespace lib { void openu(EnvT* e) { open_lun(e, fstream::in | fstream::out); } - +#ifndef _WIN32 +#include +#endif void socket(EnvT* e) { int nParam = e->NParam(3); @@ -675,7 +677,17 @@ namespace lib { DUInt port; BaseGDL* p2 = e->GetParDefined(2); if (p2->Type() == GDL_STRING) { + DString s; + e->AssureScalarPar(2,s); +#ifndef _WIN32 // look up /etc/services + struct servent *servent=getservbyname(s.c_str(),NULL); + if (servent==NULL) e->Throw("Unable to connect to host. Unit: "+i2s(lun)+", File: "+host+"."+s); + else port=servent->s_port; + endservent(); +#else + e->Throw("Unable to connect to host. Unit: "+i2s(lun)+", File: "+host+"."+s); +#endif } else if (p2->Type() == GDL_UINT) { e->AssureScalarPar(2, port); } else if (p2->Type() == GDL_INT) { @@ -726,7 +738,7 @@ namespace lib { try { fileUnits[lun - 1].Socket(host, port, swapEndian, - c_timeout, r_timeout, c_timeout); + c_timeout, r_timeout, c_timeout, width); } catch (GDLException& ex) { DString errorMsg = ex.toString() + " Unit: " + i2s(lun) + ", File: " + fileUnits[lun - 1].Name(); @@ -986,29 +998,38 @@ namespace lib { " Unit: " + i2s(lun)); is = &cin; } else if (sockNum != -1) { - // Socket Read + // GD: Socket Read: we use an intermediate buffer seen as a istringstream. (yes all this is too complicated!) + // So we NEED to get the EXACT amount of bytes to be "read". In order to get the rest to be read next time. + // the code was wrong in this respect. + // Get total amount of bytes to transfer. Should be factorized between the various cases. + SizeT nBytes=0; + for (SizeT i = 1; i < nParam; i++) { + BaseGDL* p = e->GetPar(i); + if (p == NULL) nBytes+=sizeof(DFloat); // will be a DFloatGDL + else nBytes = p->NBytes(); + if (p->Type() == GDL_STRUCT) nBytes = static_cast (p)->NBytesToTransfer(); //p->NBytes does not give sum of length of struct elements, due to alignment.We decompose. + } swapEndian = fileUnits[lun - 1].SwapEndian(); - compress = fileUnits[lun - 1].Compress(); - string *recvBuf = &fileUnits[lun - 1].RecvBuf(); - - // Setup recv buffer & string - const int MAXRECV = 2048 * 8; - char buf[MAXRECV + 1]; - - // Read socket until finished & store in recv string - while (1) { - memset(buf, 0, MAXRECV + 1); - int status = recv(sockNum, buf, MAXRECV, 0); - // cout << "Bytes received: " << status << endl; - if (status == 0) break; - for (SizeT i = 0; i < status; i++) - recvBuf->push_back(buf[i]); - } + recvBuf->clear(); + recvBuf->reserve(nBytes+1); //make recvBuf great again // Get istringstream, write recv string, & assign to istream istringstream *iss = &fileUnits[lun - 1].ISocketStream(); + // Read socket until finished & store in recv string + char c; + int nread; + for (auto i=0; i< nBytes;) { + nread = read(sockNum, &c, 1);//, 0); + if (nread < 0) { + e->Throw("read associated Socket error."); + } + if (nread) { + recvBuf->push_back(c); + i++; + } + } iss->str(*recvBuf); is = iss; } else { @@ -1083,11 +1104,9 @@ namespace lib { DLong nRec2; memcpy(&nRec2, hdr, 4); -// 2018 April 14 -// G.Jung I don't think this works right for stuctures. -// I have a method (RealBytes) that computes the actual byte count, -// it needs entries across several different files. + SizeT nBytes = p->NBytes(); + if (p->Type() == GDL_STRUCT) nBytes = static_cast (p)->NBytesToTransfer(); //p->NBytes does not give sum of length of struct elements, due to alignment.We decompose. // In variable length VMS files, each record is prefixed // with a count byte that contains the number of bytes diff --git a/src/basic_pro.hpp b/src/basic_pro.hpp index 34564c621..39733d4d5 100644 --- a/src/basic_pro.hpp +++ b/src/basic_pro.hpp @@ -60,7 +60,7 @@ namespace lib { void read_pro(EnvT* e); void readf_pro(EnvT* e); void reads(EnvT* e); - void read_is(std::istream* is, EnvT* e, int parOffset); + std::streampos read_is(std::istream* is, EnvT* e, int parOffset); void on_error(EnvT* e); void catch_pro(EnvT* e); diff --git a/src/io.cpp b/src/io.cpp index edb492691..bcbaab85d 100644 --- a/src/io.cpp +++ b/src/io.cpp @@ -18,7 +18,6 @@ #include "includefirst.hpp" #include // std::remove(...) - #include "objects.hpp" #include "io.hpp" #ifdef __MINGW32__ @@ -660,10 +659,14 @@ void GDLStream::Open(const string& name_, width = width_; } +#ifdef __MINGW32__ + +// Use obsolete code, as I've no time to adapt new code to Windows. + void GDLStream::Socket(const string& host, - DUInt port, bool swapEndian_, - DDouble c_timeout_, DDouble r_timeout_, - DDouble w_timeout_) { + DUInt port, bool swapEndian_, + DDouble c_timeout_, DDouble r_timeout_, + DDouble w_timeout_, SizeT width) { if (iSocketStream == NULL) iSocketStream = new istringstream; @@ -680,7 +683,7 @@ void GDLStream::Socket(const string& host, int on = 1; if (setsockopt(sockNum, SOL_SOCKET, SO_REUSEADDR, - (const char*) &on, sizeof (on)) == -1) { + (const char*) &on, sizeof (on)) == -1) { throw GDLIOException("Error opening file."); } @@ -697,15 +700,73 @@ void GDLStream::Socket(const string& host, // cout << inet_ntoa(*((struct in_addr *)h->h_addr)) << endl; int status = inet_pton(AF_INET, inet_ntoa(*((struct in_addr *) h->h_addr)), - &m_addr.sin_addr); + &m_addr.sin_addr); status = connect(sockNum, (sockaddr *) & m_addr, sizeof (m_addr)); swapEndian = swapEndian_; - // BIG limit on socket send width to avoid leading \n in CheckNL - width = 32768; + // GD: ?????? // BIG limit on socket send width to avoid leading \n in CheckNL + //width = 32768; } +#else +//New code suppressing call to obsolete functions +#include +void GDLStream::Socket(const string& host, + DUInt port, bool swapEndian_, + DDouble c_timeout_, DDouble r_timeout_, + DDouble w_timeout_, SizeT width) { + if (iSocketStream == NULL) + iSocketStream = new istringstream; + + if (recvBuf == NULL) + recvBuf = new string; + + name = host + "." + i2s(port); + + sockNum = socket(AF_INET, SOCK_STREAM, IPPROTO_IP); + + c_timeout = c_timeout_; + r_timeout = r_timeout_; + w_timeout = w_timeout_; + + int on = 1; + if (setsockopt(sockNum, SOL_SOCKET, SO_REUSEADDR, + (const char*) &on, sizeof (on)) == -1) { + throw GDLIOException("Error opening file."); + } + + sockaddr_in m_addr; + m_addr.sin_family = AF_INET; + m_addr.sin_port = htons(port); + + // Convert host to IPv4 format + struct addrinfo hints, *result; + int err; + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_INET; /* Allow IPv4 or IPv6 */ + hints.ai_socktype = SOCK_STREAM; /* Sequenced, reliable, connection-based + byte streams. */ + + if ((err = getaddrinfo(host.c_str(),NULL,&hints, &result)) != 0) { // get the host info + std::cerr<ai_addr))->sin_addr.s_addr; + +// printf("ip address : %s\n", inet_ntoa(addr)); + + int status ; if (status = inet_pton(AF_INET, inet_ntoa(addr), &m_addr.sin_addr) != 1) perror(__func__); + + if (status = connect(sockNum, (sockaddr *) & m_addr, sizeof (m_addr)) != 0) perror(__func__); + fcntl(sockNum,F_SETFD, FD_CLOEXEC); + swapEndian = swapEndian_; + +// GD ????? // BIG limit on socket send width to avoid leading \n in CheckNL +// width = 32768; +} +#endif void AnyStream::Flush() { if (fStream != NULL) { @@ -731,7 +792,6 @@ void GDLStream::Close() { if (deleteOnClose) std::remove(name.c_str()); } - name = ""; f77 = false; swapEndian = false; compress = false; @@ -741,7 +801,8 @@ void GDLStream::Close() { xdrs = NULL; width = defaultStreamWidth; - + //Do not forget sockets! + if (sockNum != -1) close(sockNum); sockNum = -1; c_timeout = 0.0; r_timeout = 0.0; diff --git a/src/io.hpp b/src/io.hpp index 58e2e3fb8..35c2d7fac 100644 --- a/src/io.hpp +++ b/src/io.hpp @@ -150,7 +150,6 @@ class GDLStream DDouble w_timeout; SizeT width; - std::streampos lastSeekPos; // for F77 @@ -205,7 +204,7 @@ class GDLStream void Socket( const std::string& host, DUInt port, bool swapEndian_, - DDouble c_timeout, DDouble r_timeout, DDouble w_timeout); + DDouble c_timeout, DDouble r_timeout, DDouble w_timeout, SizeT width); void Flush(); diff --git a/src/libinit.cpp b/src/libinit.cpp index 03fe70181..55a087cec 100644 --- a/src/libinit.cpp +++ b/src/libinit.cpp @@ -461,13 +461,15 @@ void LibInit() new DLibPro(lib::openu,string("OPENU"),3,openKey); new DLibPro(lib::get_lun,string("GET_LUN"),1); - const string socketKey[]={"ERROR","GET_LUN","STDIO", + const string socketWarnKey[]={"ACCEPT","LISTEN","RAWIO","PORT",KLISTEND}; + const string socketKey[]={"ERROR","GET_LUN", + "STDIO", //not used but not signaled also. "SWAP_ENDIAN","SWAP_IF_BIG_ENDIAN", "SWAP_IF_LITTLE_ENDIAN","WIDTH", "CONNECT_TIMEOUT","READ_TIMEOUT", "WRITE_TIMEOUT", KLISTEND}; - new DLibPro(lib::socket,string("SOCKET"),3,socketKey); + new DLibPro(lib::socket,string("SOCKET"),3,socketKey,socketWarnKey); new DLibPro(lib::flush_lun,string("FLUSH"),-1); diff --git a/src/read.cpp b/src/read.cpp index 913ed7a9b..652feaa9e 100644 --- a/src/read.cpp +++ b/src/read.cpp @@ -167,93 +167,71 @@ namespace lib { using namespace std; - void readf_pro( EnvT* e) - { - SizeT nParam=e->NParam(); - if( nParam < 1) - e->Throw( "Incorrect number of arguments."); + void readf_pro(EnvT* e) { + SizeT nParam = e->NParam(); + if (nParam < 1) + e->Throw("Incorrect number of arguments."); DLong lun; - e->AssureLongScalarPar( 0, lun); + e->AssureLongScalarPar(0, lun); istream* is; - bool stdLun = check_lun( e, lun); - if( stdLun) - { - if( lun != 0) - e->Throw( "Cannot read from stdout and stderr." - " Unit: "+i2s( lun)); - is = &cin; - } - else - { - if( fileUnits[ lun-1].F77()) - e->Throw( "Formatted IO not allowed with F77_UNFORMATTED " - "files. Unit: "+i2s( lun)); - - int sockNum = fileUnits[ lun-1].SockNum(); - //cout << "sockNum: " << sockNum << endl; - - if (sockNum == -1) { - // *** File Read *** // - if( fileUnits[ lun-1].Compress()) - is = &fileUnits[ lun-1].IgzStream(); - else - is = &fileUnits[ lun-1].IStream(); - - } else { - // *** Socket Read *** // - string *recvBuf = &fileUnits[ lun-1].RecvBuf(); - - // Setup recv buffer & string - const int MAXRECV = 2048*8; - char buf[MAXRECV+1]; - - // Read socket until finished & store in recv string - int totalread = 0; - while (1) { - memset(buf, 0, MAXRECV+1); - int status = recv(sockNum, buf, MAXRECV, 0); - // cout << "Bytes received: " << status << endl; - if (status == 0) break; - - recvBuf->append(buf, status); - - // for( SizeT i=0; ipush_back(buf[i]); - - totalread += status; - //cout << "recvBuf size: " << recvBuf->size() << endl; - //cout << "Total bytes read: " << totalread << endl << endl; - } - // if (totalread > 0) cout << "Total bytes read: " << totalread << endl; - - // Get istringstream, write recv string, & assign to istream - istringstream *iss = &fileUnits[ lun-1].ISocketStream(); - iss->str(*recvBuf); - is = iss; - } + bool stdLun = check_lun(e, lun); + if (stdLun) { + if (lun != 0) + e->Throw("Cannot read from stdout and stderr." + " Unit: " + i2s(lun)); + is = &cin; + } else { + if (fileUnits[ lun - 1].F77()) + e->Throw("Formatted IO not allowed with F77_UNFORMATTED " + "files. Unit: " + i2s(lun)); + + int sockNum = fileUnits[ lun - 1].SockNum(); + // cout << "sockNum: " << sockNum << endl; + + if (sockNum == -1) { + // *** File Read *** // + if (fileUnits[ lun - 1].Compress()) + is = &fileUnits[ lun - 1].IgzStream(); + else + is = &fileUnits[ lun - 1].IStream(); + + } else { + string *recvBuf = &fileUnits[ lun - 1].RecvBuf(); + char c; + while (1) { + int nread = recv(sockNum, &c, 1, MSG_DONTWAIT); //cannot reproduce behaviour of IDL since we use buffered processing in read_is(). + // we need to use lower level C reads, not read_is() in this special case OR find a way to get a socket behave like a file. +// int nread = read(sockNum, &c, 1); //, 0); //IDL reads byte by byte to test for \n and stop reading + if (nread == 0) break; //closed + if (nread < 1) { + if (errno == EAGAIN || errno == EWOULDBLOCK) break; //nothing more yet. + else e->Throw("Error accessing underlying socket, reason: "+std::string(strerror(errno))); + } + recvBuf->push_back(c); + } + //ALL READ. + istringstream *iss = &fileUnits[ lun - 1].ISocketStream(); + iss->str(*recvBuf); + is = iss; + streampos one = 1; + streampos readpos=read_is(is, e, 1); + recvBuf->erase(0,readpos-one); //consume already read. Note this is a readf, so at every call, one line is consumed. + return; } + } - read_is( is, e, 1); - - // If socket strip off leading line - if (lun > 0 && fileUnits[ lun-1].SockNum() != -1) { - string *recvBuf = &fileUnits[ lun-1].RecvBuf(); - int pos = is->tellg(); - recvBuf->erase(0, pos); - - // int pos = recvBuf->find("\n", 0); - //recvBuf->erase(0, pos+1); + read_is(is, e, 1); } - } void read_pro( EnvT* e) { read_is( &cin, e, 0); } -void read_is(istream* is, EnvT* e, int parOffset) { + +streampos read_is(istream* is, EnvT* e, int parOffset) { // PROMPT keyword BaseGDL* prompt = e->GetKW(4); if (prompt != NULL && !prompt->Scalar()) @@ -281,7 +259,7 @@ void read_is(istream* is, EnvT* e, int parOffset) { bool noPrompt = true; int nParam = e->NParam(); - if (nParam == parOffset) return; + if (nParam == parOffset) { return is->tellg() ;} ostringstream oss; @@ -382,7 +360,7 @@ void read_is(istream* is, EnvT* e, int parOffset) { } if (sigControlC) - return; + return is->tellg(); } else #endif @@ -412,6 +390,7 @@ void read_is(istream* is, EnvT* e, int parOffset) { DStringGDL gdlString(""); gdlString.FromStream(*is); } + return is->tellg(); } void reads(EnvT * e) {