Skip to content

Commit 3347149

Browse files
release300.2
1 parent c8b2ed1 commit 3347149

14 files changed

+208
-97
lines changed

include/ConstantImp.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -936,7 +936,7 @@ class AbstractFastVector: public Vector{
936936
if(buf == (Y*)data_+start)
937937
return true;
938938
else if(getType() == sourceType)
939-
memcpy(data_+start,buf,sizeof(Y)*len);
939+
memcpy(reinterpret_cast<void *>(data_ + start), reinterpret_cast<const void *>(buf), sizeof(Y) * len);
940940
else
941941
for(int i=0;i<len;++i)
942942
data_[start+i]=(buf[i] == sourceNullVal)? nullVal_: static_cast<T>(buf[i]);
@@ -947,7 +947,7 @@ class AbstractFastVector: public Vector{
947947
inline bool appendData(Y* buf, int len, DATA_TYPE sourceType, Y sourceNullVal){
948948
checkCapacity(len);
949949
if(getType() == sourceType)
950-
memcpy(data_+size_, buf, sizeof(Y) * len);
950+
memcpy(reinterpret_cast<void *>(data_ + size_), reinterpret_cast<const void *>(buf), sizeof(Y) * len);
951951
else
952952
for(int i=0;i<len;++i)
953953
data_[size_+i]=(buf[i] == sourceNullVal)? nullVal_ : static_cast<T>(buf[i]);

include/DBConnectionPoolImpl.h

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,16 @@ namespace dolphindb {
77
class DBConnectionPoolImpl{
88
public:
99
struct Task{
10-
Task(const std::string& sc = "", int id = 0, int pr = 4, int pa = 2, bool clearM = false)
11-
: script(sc), identity(id), priority(pr), parallelism(pa), clearMemory(clearM){}
12-
Task(const std::string& function, const std::vector<ConstantSP>& args, int id = 0, int pr = 4, int pa = 2, bool clearM = false)
13-
: script(function), arguments(args), identity(id), priority(pr), parallelism(pa), clearMemory(clearM){ isFunc = true; }
10+
Task(const std::string& sc = "", int id = 0, int pr = 4, int pa = 2, int fetchSize = 0, bool clearM = false)
11+
: script(sc), identity(id), priority(pr), parallelism(pa), fetchSize(fetchSize), clearMemory(clearM) {}
12+
Task(const std::string& function, const std::vector<ConstantSP>& args, int id = 0, int pr = 4, int pa = 2, int fetchSize = 0, bool clearM = false)
13+
: script(function), arguments(args), identity(id), priority(pr), parallelism(pa), fetchSize(fetchSize), clearMemory(clearM){ isFunc = true; }
1414
std::string script;
1515
std::vector<ConstantSP> arguments;
1616
int identity;
1717
int priority;
1818
int parallelism;
19+
int fetchSize;
1920
bool clearMemory;
2021
bool isFunc = false;
2122
};
@@ -33,12 +34,12 @@ class DBConnectionPoolImpl{
3334
}
3435
}
3536
void run(const std::string& script, int identity, int priority=4, int parallelism=64, int fetchSize=0, bool clearMemory = false){
36-
queue_->push(Task(script, identity, priority, parallelism, clearMemory));
37+
queue_->push(Task(script, identity, priority, parallelism, fetchSize, clearMemory));
3738
taskStatus_.setResult(identity, TaskStatusMgmt::Result());
3839
}
3940

4041
void run(const std::string& functionName, const std::vector<ConstantSP>& args, int identity, int priority=4, int parallelism=64, int fetchSize=0, bool clearMemory = false){
41-
queue_->push(Task(functionName, args, identity, priority, parallelism, clearMemory));
42+
queue_->push(Task(functionName, args, identity, priority, parallelism, fetchSize, clearMemory));
4243
taskStatus_.setResult(identity, TaskStatusMgmt::Result());
4344
}
4445

include/SharedMem.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
#ifndef __SharedMem_H
2-
#define __IpcMem_H
2+
#define __SharedMem_H
33

44
#ifdef LINUX
55

include/Streaming.h

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,15 @@ namespace dolphindb {
1010
class DBConnection;
1111
class StreamingClientImpl;
1212

13+
struct SubscribeQueue {
14+
MessageQueueSP queue_;
15+
std::shared_ptr<std::atomic<bool>> stopped_{nullptr};
16+
SubscribeQueue()
17+
:queue_(nullptr), stopped_(nullptr) {}
18+
SubscribeQueue(MessageQueueSP queue, std::shared_ptr<std::atomic<bool>> stopped)
19+
:queue_(queue), stopped_(stopped) {}
20+
};
21+
1322
class EXPORT_DECL StreamingClient {
1423
public:
1524
//listeningPort > 0 : listen mode, wait for server connection
@@ -20,7 +29,7 @@ class EXPORT_DECL StreamingClient {
2029
void exit();
2130

2231
protected:
23-
MessageQueueSP subscribeInternal(std::string host, int port, std::string tableName, std::string actionName = DEFAULT_ACTION_NAME,
32+
SubscribeQueue subscribeInternal(std::string host, int port, std::string tableName, std::string actionName = DEFAULT_ACTION_NAME,
2433
int64_t offset = -1, bool resubscribe = true, const VectorSP &filter = nullptr,
2534
bool msgAsTable = false, bool allowExists = false, int batchSize = 1,
2635
std::string userName="", std::string password="",

src/AsynWorker.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,10 @@ namespace dolphindb {
2222
while(true) {
2323
try {
2424
if(task.isFunc){
25-
result = conn_->run(task.script, task.arguments, task.priority, task.parallelism, 0, task.clearMemory);
25+
result = conn_->run(task.script, task.arguments, task.priority, task.parallelism, task.fetchSize, task.clearMemory);
2626
}
2727
else{
28-
result = conn_->run(task.script, task.priority, task.parallelism, 0, task.clearMemory);
28+
result = conn_->run(task.script, task.priority, task.parallelism, task.fetchSize, task.clearMemory);
2929
}
3030
break;
3131
}

src/CMakeLists.txt

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
if (UNIX)
2+
target_compile_definitions(${CMAKE_PROJECT_NAME} PUBLIC LINUX)
3+
endif()
4+
5+
target_include_directories(${CMAKE_PROJECT_NAME}
6+
PUBLIC ${PROJECT_SOURCE_DIR}/include
7+
PUBLIC ${OPENSSL_INCLUDE_DIR}
8+
)
9+
10+
target_sources(${CMAKE_PROJECT_NAME} PUBLIC
11+
AsynWorker.cpp
12+
BatchTableWriter.cpp
13+
Compress.cpp
14+
Concurrent.cpp
15+
ConstantImp.cpp
16+
ConstantMarshall.cpp
17+
DBConnectionPoolImpl.cpp
18+
DFSChunkMeta.cpp
19+
DictionaryImp.cpp
20+
DolphinDB.cpp
21+
DolphinDBImp.cpp
22+
DomainImp.cpp
23+
EventHandler.cpp
24+
Exceptions.cpp
25+
Format.cpp
26+
Guid.cpp
27+
int128.cpp
28+
LZ4.cpp
29+
Matrix.cpp
30+
MultithreadedTableWriter.cpp
31+
ScalarImp.cpp
32+
SetImp.cpp
33+
SharedMem.cpp
34+
Streaming.cpp
35+
StreamingUtil.cpp
36+
SymbolBase.cpp
37+
SysIO.cpp
38+
TableImp.cpp
39+
TaskStatusMgmt.cpp
40+
Util.cpp
41+
Vector.cpp
42+
)

src/Concurrent.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -460,7 +460,7 @@ void Thread::setAffinity(int id) {
460460
CPU_ZERO(&mask);
461461
CPU_SET(id, &mask);
462462
if (sched_setaffinity(0, sizeof(mask), &mask) == -1) {
463-
throw RuntimeException("BindCore failed, error code " + errno);
463+
throw RuntimeException("BindCore failed, error code " + std::to_string(errno));
464464
}
465465
#endif
466466
}

src/MultithreadedTableWriter.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ MultithreadedTableWriter::MultithreadedTableWriter(const std::string& hostName,
4646
if (pCompressMethods != nullptr && pCompressMethods->size() > 0) {
4747
for (auto one : *pCompressMethods) {
4848
if (one != COMPRESS_DELTA && one != COMPRESS_LZ4) {
49-
throw RuntimeException("Unsupported compression method " + one);
49+
throw RuntimeException("Unsupported compression method " + std::to_string(one));
5050
}
5151
}
5252
saveCompressMethods_ = *pCompressMethods;

0 commit comments

Comments
 (0)