Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions src/basic_fun_jmg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -706,15 +706,19 @@ namespace lib {
{ // normal file
GDLStream& actUnit = fileUnits[ lun-1];

if( !actUnit.IsOpen())
if( !actUnit.IsOpen() && actUnit.SockNum()==-1)
return fileStatus; // OPEN tag is init to zero (SpDByte::GetInstance())

struct stat buffer;
// int status = //status will be bad on the units used by SPAWN, UNIT=XXX; Do not check status
stat(actUnit.Name().c_str(), &buffer);
int status=0;
if (status =stat(actUnit.Name().c_str(), &buffer) !=0) status=fstat(lun, &buffer); //if (status) perror(strerror(errno));

fileStatus->InitTag("NAME", DStringGDL( actUnit.Name()));
fileStatus->InitTag("OPEN", DByteGDL( 1));
fileStatus->InitTag("OPEN", DByteGDL( 1));

//early return for sockets! these are not files!
if (actUnit.SockNum()!=-1) return fileStatus;

if (big) fileStatus->InitTag("SIZE", DLong64GDL( buffer.st_size));//size));
else fileStatus->InitTag("SIZE", DLongGDL( buffer.st_size));//size));
DByte is_a_tty=isatty(lun);
Expand Down
65 changes: 42 additions & 23 deletions src/basic_pro.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,9 @@ namespace lib {
void openu(EnvT* e) {
open_lun(e, fstream::in | fstream::out);
}

#ifndef _WIN32
#include <netdb.h>
#endif
void socket(EnvT* e) {
int nParam = e->NParam(3);

Expand All @@ -675,7 +677,17 @@ namespace lib {
DUInt port;
BaseGDL* p2 = e->GetParDefined(2);
if (p2->Type() == GDL_STRING) {
DString s;
e->AssureScalarPar<DStringGDL>(2,s);
#ifndef _WIN32
// look up /etc/services
struct servent *servent=getservbyname(s.c_str(),NULL);
if (servent==NULL) e->Throw("Unable to connect to host. Unit: "+i2s(lun)+", File: "+host+"."+s);
else port=servent->s_port;
endservent();
#else
e->Throw("Unable to connect to host. Unit: "+i2s(lun)+", File: "+host+"."+s);
#endif
} else if (p2->Type() == GDL_UINT) {
e->AssureScalarPar<DUIntGDL>(2, port);
} else if (p2->Type() == GDL_INT) {
Expand Down Expand Up @@ -726,7 +738,7 @@ namespace lib {

try {
fileUnits[lun - 1].Socket(host, port, swapEndian,
c_timeout, r_timeout, c_timeout);
c_timeout, r_timeout, c_timeout, width);
} catch (GDLException& ex) {
DString errorMsg = ex.toString() + " Unit: " + i2s(lun) +
", File: " + fileUnits[lun - 1].Name();
Expand Down Expand Up @@ -986,29 +998,38 @@ namespace lib {
" Unit: " + i2s(lun));
is = &cin;
} else if (sockNum != -1) {
// Socket Read
// GD: Socket Read: we use an intermediate buffer seen as a istringstream. (yes all this is too complicated!)
// So we NEED to get the EXACT amount of bytes to be "read". In order to get the rest to be read next time.
// the code was wrong in this respect.
// Get total amount of bytes to transfer. Should be factorized between the various cases.
SizeT nBytes=0;
for (SizeT i = 1; i < nParam; i++) {
BaseGDL* p = e->GetPar(i);
if (p == NULL) nBytes+=sizeof(DFloat); // will be a DFloatGDL
else nBytes = p->NBytes();
if (p->Type() == GDL_STRUCT) nBytes = static_cast<DStructGDL*> (p)->NBytesToTransfer(); //p->NBytes does not give sum of length of struct elements, due to alignment.We decompose.
}
swapEndian = fileUnits[lun - 1].SwapEndian();

compress = fileUnits[lun - 1].Compress();

string *recvBuf = &fileUnits[lun - 1].RecvBuf();

// Setup recv buffer & string
const int MAXRECV = 2048 * 8;
char buf[MAXRECV + 1];

// Read socket until finished & store in recv string
while (1) {
memset(buf, 0, MAXRECV + 1);
int status = recv(sockNum, buf, MAXRECV, 0);
// cout << "Bytes received: " << status << endl;
if (status == 0) break;
for (SizeT i = 0; i < status; i++)
recvBuf->push_back(buf[i]);
}
recvBuf->clear();
recvBuf->reserve(nBytes+1); //make recvBuf great again

// Get istringstream, write recv string, & assign to istream
istringstream *iss = &fileUnits[lun - 1].ISocketStream();
// Read socket until finished & store in recv string
char c;
int nread;
for (auto i=0; i< nBytes;) {
nread = read(sockNum, &c, 1);//, 0);
if (nread < 0) {
e->Throw("read associated Socket error.");
}
if (nread) {
recvBuf->push_back(c);
i++;
}
}
iss->str(*recvBuf);
is = iss;
} else {
Expand Down Expand Up @@ -1083,11 +1104,9 @@ namespace lib {

DLong nRec2;
memcpy(&nRec2, hdr, 4);
// 2018 April 14
// G.Jung I don't think this works right for stuctures.
// I have a method (RealBytes) that computes the actual byte count,
// it needs entries across several different files.

SizeT nBytes = p->NBytes();
if (p->Type() == GDL_STRUCT) nBytes = static_cast<DStructGDL*> (p)->NBytesToTransfer(); //p->NBytes does not give sum of length of struct elements, due to alignment.We decompose.

// In variable length VMS files, each record is prefixed
// with a count byte that contains the number of bytes
Expand Down
2 changes: 1 addition & 1 deletion src/basic_pro.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ namespace lib {
void read_pro(EnvT* e);
void readf_pro(EnvT* e);
void reads(EnvT* e);
void read_is(std::istream* is, EnvT* e, int parOffset);
std::streampos read_is(std::istream* is, EnvT* e, int parOffset);

void on_error(EnvT* e);
void catch_pro(EnvT* e);
Expand Down
81 changes: 71 additions & 10 deletions src/io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
#include "includefirst.hpp"

#include <cstdio> // std::remove(...)

#include "objects.hpp"
#include "io.hpp"
#ifdef __MINGW32__
Expand Down Expand Up @@ -660,10 +659,14 @@ void GDLStream::Open(const string& name_,
width = width_;
}

#ifdef __MINGW32__

// Use obsolete code, as I've no time to adapt new code to Windows.

void GDLStream::Socket(const string& host,
DUInt port, bool swapEndian_,
DDouble c_timeout_, DDouble r_timeout_,
DDouble w_timeout_) {
DUInt port, bool swapEndian_,
DDouble c_timeout_, DDouble r_timeout_,
DDouble w_timeout_, SizeT width) {
if (iSocketStream == NULL)
iSocketStream = new istringstream;

Expand All @@ -680,7 +683,7 @@ void GDLStream::Socket(const string& host,

int on = 1;
if (setsockopt(sockNum, SOL_SOCKET, SO_REUSEADDR,
(const char*) &on, sizeof (on)) == -1) {
(const char*) &on, sizeof (on)) == -1) {
throw GDLIOException("Error opening file.");
}

Expand All @@ -697,15 +700,73 @@ void GDLStream::Socket(const string& host,
// cout << inet_ntoa(*((struct in_addr *)h->h_addr)) << endl;

int status = inet_pton(AF_INET, inet_ntoa(*((struct in_addr *) h->h_addr)),
&m_addr.sin_addr);
&m_addr.sin_addr);

status = connect(sockNum, (sockaddr *) & m_addr, sizeof (m_addr));

swapEndian = swapEndian_;

// BIG limit on socket send width to avoid leading \n in CheckNL
width = 32768;
// GD: ?????? // BIG limit on socket send width to avoid leading \n in CheckNL
//width = 32768;
}
#else
//New code suppressing call to obsolete functions
#include <fcntl.h>
void GDLStream::Socket(const string& host,
DUInt port, bool swapEndian_,
DDouble c_timeout_, DDouble r_timeout_,
DDouble w_timeout_, SizeT width) {
if (iSocketStream == NULL)
iSocketStream = new istringstream;

if (recvBuf == NULL)
recvBuf = new string;

name = host + "." + i2s(port);

sockNum = socket(AF_INET, SOCK_STREAM, IPPROTO_IP);

c_timeout = c_timeout_;
r_timeout = r_timeout_;
w_timeout = w_timeout_;

int on = 1;
if (setsockopt(sockNum, SOL_SOCKET, SO_REUSEADDR,
(const char*) &on, sizeof (on)) == -1) {
throw GDLIOException("Error opening file.");
}

sockaddr_in m_addr;
m_addr.sin_family = AF_INET;
m_addr.sin_port = htons(port);

// Convert host to IPv4 format
struct addrinfo hints, *result;
int err;
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_INET; /* Allow IPv4 or IPv6 */
hints.ai_socktype = SOCK_STREAM; /* Sequenced, reliable, connection-based
byte streams. */

if ((err = getaddrinfo(host.c_str(),NULL,&hints, &result)) != 0) { // get the host info
std::cerr<<gai_strerror(err)<<std::endl;
throw GDLIOException("Unable to lookup host.");
}
struct in_addr addr;
addr.s_addr = ((struct sockaddr_in *)(result->ai_addr))->sin_addr.s_addr;

// printf("ip address : %s\n", inet_ntoa(addr));

int status ; if (status = inet_pton(AF_INET, inet_ntoa(addr), &m_addr.sin_addr) != 1) perror(__func__);

if (status = connect(sockNum, (sockaddr *) & m_addr, sizeof (m_addr)) != 0) perror(__func__);
fcntl(sockNum,F_SETFD, FD_CLOEXEC);
swapEndian = swapEndian_;

// GD ????? // BIG limit on socket send width to avoid leading \n in CheckNL
// width = 32768;
}
#endif

void AnyStream::Flush() {
if (fStream != NULL) {
Expand All @@ -731,7 +792,6 @@ void GDLStream::Close() {
if (deleteOnClose)
std::remove(name.c_str());
}
name = "";
f77 = false;
swapEndian = false;
compress = false;
Expand All @@ -741,7 +801,8 @@ void GDLStream::Close() {
xdrs = NULL;

width = defaultStreamWidth;

//Do not forget sockets!
if (sockNum != -1) close(sockNum);
sockNum = -1;
c_timeout = 0.0;
r_timeout = 0.0;
Expand Down
3 changes: 1 addition & 2 deletions src/io.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ class GDLStream
DDouble w_timeout;

SizeT width;

std::streampos lastSeekPos;

// for F77
Expand Down Expand Up @@ -205,7 +204,7 @@ class GDLStream

void Socket( const std::string& host,
DUInt port, bool swapEndian_,
DDouble c_timeout, DDouble r_timeout, DDouble w_timeout);
DDouble c_timeout, DDouble r_timeout, DDouble w_timeout, SizeT width);

void Flush();

Expand Down
6 changes: 4 additions & 2 deletions src/libinit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -461,13 +461,15 @@ void LibInit()
new DLibPro(lib::openu,string("OPENU"),3,openKey);
new DLibPro(lib::get_lun,string("GET_LUN"),1);

const string socketKey[]={"ERROR","GET_LUN","STDIO",
const string socketWarnKey[]={"ACCEPT","LISTEN","RAWIO","PORT",KLISTEND};
const string socketKey[]={"ERROR","GET_LUN",
"STDIO", //not used but not signaled also.
"SWAP_ENDIAN","SWAP_IF_BIG_ENDIAN",
"SWAP_IF_LITTLE_ENDIAN","WIDTH",
"CONNECT_TIMEOUT","READ_TIMEOUT",
"WRITE_TIMEOUT",
KLISTEND};
new DLibPro(lib::socket,string("SOCKET"),3,socketKey);
new DLibPro(lib::socket,string("SOCKET"),3,socketKey,socketWarnKey);

new DLibPro(lib::flush_lun,string("FLUSH"),-1);

Expand Down
Loading
Loading