78 commits
d8eb448
remove ps_core dependency
LiYuRio Jan 11, 2023
d92cf96
fix both with_rpc and with_distributed on
LiYuRio Jan 17, 2023
5bacf03
add cond interceptor
LiYuRio Jan 20, 2023
04627d6
remove max_slot_num
LiYuRio Jan 28, 2023
563055f
fix test case
LiYuRio Jan 28, 2023
006bf14
fix cmake
LiYuRio Jan 30, 2023
28668c2
Modified compute and amplifier interceptor
LiYuRio Jan 13, 2023
88f6413
add multi fetch
LiYuRio Jan 29, 2023
f616a58
fix gc and infinite buffer size
LiYuRio Jan 31, 2023
a19f9dc
fix start message
LiYuRio Feb 1, 2023
8767c9f
eval pipeline
Feb 2, 2023
440ea56
iter 2 success!
Feb 4, 2023
1b5937f
Merge branch 'incubate/frl_train_eval' of https://github.com/LiYuRio/…
Feb 4, 2023
fd9d981
Start task
Feb 4, 2023
fd39162
fix fetch_list
Feb 4, 2023
a44f1de
remove import
Feb 4, 2023
2e2345d
fix start interceptor bug
LiYuRio Feb 4, 2023
cac7a84
Merge branch 'dev_eval' of https://github.com/LiYuRio/Paddle into inc…
Feb 4, 2023
b33018f
fix bug in start interceptor
LiYuRio Feb 5, 2023
580afb6
Merge branch 'dev_eval' of https://github.com/LiYuRio/Paddle into inc…
Feb 5, 2023
4a85c2f
modify interceptor
Feb 5, 2023
577c212
wip: rpc op
LiYuRio Feb 2, 2023
8984c37
fix ssl symbol not found
LiYuRio Oct 10, 2022
8907d76
for mp
Feb 6, 2023
d7e3384
feat: rpc op
HermitSun Feb 6, 2023
0642f0c
refactor: rm call_id
HermitSun Feb 6, 2023
97a2667
refactor: clean request code
HermitSun Feb 7, 2023
bb19ccf
feat: new schema
HermitSun Feb 7, 2023
6812bc1
refactor: robust
HermitSun Feb 7, 2023
416e148
chore: update test script
HermitSun Feb 7, 2023
f2faf37
fix: tensor copy across devices
HermitSun Feb 7, 2023
71ca2aa
fix: fix path not correctly recorded
HermitSun Feb 7, 2023
b5ca501
fix: typo
HermitSun Feb 7, 2023
9192f5e
fix pipelinepass for mp scene
Feb 8, 2023
f5a6031
feat: add tokenizer
HermitSun Feb 9, 2023
c6f87bc
feat: update schema
HermitSun Feb 9, 2023
b611316
chore: return str script
HermitSun Feb 9, 2023
8ce916a
fix: incorrect attrs
HermitSun Feb 9, 2023
4ed46c6
fix: some incorrect encoding
HermitSun Feb 10, 2023
c323134
fix: utf-8 decode
HermitSun Feb 10, 2023
984afea
refactor: use wstring to support utf tokenize
HermitSun Feb 11, 2023
34b510f
wip: tokenizer
HermitSun Feb 12, 2023
079a43c
update ps core
LiYuRio Jan 13, 2023
9240bca
feat: tokenizers
HermitSun Feb 12, 2023
dee1103
feat: rpc utils
HermitSun Feb 12, 2023
419ee19
wip: libpaddle.so cmake
HermitSun Feb 12, 2023
893c0c9
fix: cannot link icuuc
HermitSun Feb 13, 2023
7a87822
refactor: use link_target instead of set
HermitSun Feb 13, 2023
6e75a69
update rpc apis
sljlp Feb 13, 2023
7512255
refactor: new ops
HermitSun Feb 13, 2023
de52cae
refactor: throw when request fail
HermitSun Feb 13, 2023
c2ea592
refactor: rpc utils
HermitSun Feb 13, 2023
6ec7be6
fix: potential src_id overflow
HermitSun Feb 13, 2023
4d3aba4
Merge branch 'rpc-op' of https://github.com/HermitSun/Paddle into rpc-op
sljlp Feb 13, 2023
7550a40
update
sljlp Feb 13, 2023
21a36d6
fix: int64_t support
HermitSun Feb 14, 2023
57d9ce8
refactor: add type hint
HermitSun Feb 14, 2023
2b6ce43
support batched src ids
sljlp Feb 14, 2023
f0506ad
update
sljlp Feb 14, 2023
e8d8ccd
update
sljlp Feb 14, 2023
462a47d
fix
sljlp Feb 14, 2023
d813d73
update
sljlp Feb 14, 2023
052ed62
update
sljlp Feb 14, 2023
cb8ccf2
add ut
sljlp Feb 14, 2023
bcd935f
update
sljlp Feb 14, 2023
54cca17
fix
sljlp Feb 14, 2023
f7611cf
new parser
sljlp Feb 14, 2023
bdbff2b
fix bug throwing type error in fp16 mode in elementwise_div_grad
sljlp Feb 14, 2023
5018ac8
update
sljlp Feb 15, 2023
8fac6fd
update
sljlp Feb 15, 2023
f24940c
rm tokenizer
sljlp Feb 15, 2023
a2022d0
fix
sljlp Feb 15, 2023
aedd638
cancel ut
sljlp Feb 16, 2023
07303b0
rm
sljlp Feb 16, 2023
a7bd575
add ut
sljlp Feb 16, 2023
05ec8d3
update
sljlp Feb 16, 2023
6ea256c
delete note
sljlp Feb 17, 2023
05faa20
update
sljlp Feb 17, 2023
10 changes: 8 additions & 2 deletions .pre-commit-config.yaml
@@ -18,6 +18,10 @@ repos:
rev: v4.1.0
hooks:
- id: check-added-large-files
exclude: |
(?x)^(
paddle/fluid/operators/collective/thirdparty/json.h
)$
- id: check-merge-conflict
- id: check-symlinks
- id: detect-private-key
@@ -35,7 +39,8 @@ repos:
files: \.(c|cc|cxx|cpp|cu|h|hpp|hxx|xpu|kps)$
exclude: |
(?x)^(
paddle/fluid/distributed/ps/thirdparty/round_robin.h
paddle/fluid/distributed/ps/thirdparty/round_robin.h|
paddle/fluid/operators/collective/thirdparty/json.h
)$
- repo: local
hooks:
@@ -62,7 +67,8 @@ repos:
files: \.(c|cc|cxx|cpp|cu|h|hpp|hxx|proto|xpu|kps|py|sh)$
exclude: |
(?x)^(
paddle/utils/.*
paddle/utils/.*|
paddle/fluid/operators/collective/thirdparty/json.h
)$
- repo: local
hooks:
2 changes: 1 addition & 1 deletion cmake/generic.cmake
@@ -96,7 +96,7 @@ if(NOT APPLE AND NOT WIN32)
link_libraries(${CMAKE_THREAD_LIBS_INIT})
if(WITH_PSLIB OR WITH_DISTRIBUTE)
set(CMAKE_CXX_LINK_EXECUTABLE
"${CMAKE_CXX_LINK_EXECUTABLE} -pthread -ldl -lrt -lz -lssl")
"${CMAKE_CXX_LINK_EXECUTABLE} -pthread -ldl -lrt -lz -lssl -lcrypto")
else()
set(CMAKE_CXX_LINK_EXECUTABLE
"${CMAKE_CXX_LINK_EXECUTABLE} -pthread -ldl -lrt")
13 changes: 13 additions & 0 deletions cmake/third_party.cmake
@@ -424,6 +424,19 @@ if(WITH_PSCORE)
list(APPEND third_party_deps extern_rocksdb)
endif()

if(WITH_DISTRIBUTE
AND NOT WITH_PSLIB
AND NOT WITH_PSCORE
AND NOT WITH_RPC)
include(external/snappy)
list(APPEND third_party_deps extern_snappy)

include(external/leveldb)
list(APPEND third_party_deps extern_leveldb)
include(external/brpc)
list(APPEND third_party_deps extern_brpc)
endif()

if(WITH_XBYAK)
include(external/xbyak) # download, build, install xbyak
list(APPEND third_party_deps extern_xbyak)
3 changes: 1 addition & 2 deletions paddle/fluid/distributed/CMakeLists.txt
@@ -1,6 +1,7 @@
add_subdirectory(auto_parallel)
add_subdirectory(collective)
add_subdirectory(store)
add_subdirectory(fleet_executor)
if(WITH_PYTHON)
py_proto_compile(ps_py_proto SRCS the_one_ps.proto)
add_custom_target(
@@ -29,7 +30,6 @@ if(WITH_PYTHON)
endif()

if(NOT WITH_PSCORE)
add_subdirectory(fleet_executor)
return()
endif()

@@ -47,4 +47,3 @@ add_subdirectory(common)
add_subdirectory(ps)
add_subdirectory(test)
add_subdirectory(index_dataset)
add_subdirectory(fleet_executor)
8 changes: 7 additions & 1 deletion paddle/fluid/distributed/fleet_executor/CMakeLists.txt
@@ -6,7 +6,7 @@ proto_library(interceptor_message_proto SRCS interceptor_message.proto)

if(WITH_ARM_BRPC)
set(BRPC_DEPS arm_brpc snappy gflags glog)
elseif(WITH_DISTRIBUTE AND WITH_PSCORE)
elseif(WITH_DISTRIBUTE)
set(BRPC_DEPS
brpc
ssl
@@ -36,6 +36,8 @@ cc_library(
interceptor.cc
compute_interceptor.cc
amplifier_interceptor.cc
cond_interceptor.cc
start_interceptor.cc
source_interceptor.cc
sink_interceptor.cc
message_service.cc
@@ -66,6 +68,10 @@ if(WITH_DISTRIBUTE)
set_source_files_properties(
amplifier_interceptor.cc PROPERTIES COMPILE_FLAGS
${DISTRIBUTE_COMPILE_FLAGS})
set_source_files_properties(
cond_interceptor.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
set_source_files_properties(
start_interceptor.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
set_source_files_properties(
source_interceptor.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
set_source_files_properties(
paddle/fluid/distributed/fleet_executor/amplifier_interceptor.cc
@@ -33,23 +33,23 @@ void AmplifierInterceptor::RunOps() {
// run_per_steps_, run_at_offset_
// 4, 0 --> run at step 0, 4, 8, 12
// 4, 3 --> run at step 3, 7, 11, 15
if ((step_ % run_per_steps_) == run_at_offset_) {
if ((cur_scope_id_ % run_per_steps_) == run_at_offset_) {
ComputeInterceptor::RunOps();
}
}

void AmplifierInterceptor::SendDataReadyToDownStream() {
// run multi times, send ready one times to downstream, that is
// input multi times, output one times
if (step_ % send_down_per_steps_ == 0) {
if (cur_scope_id_ % send_down_per_steps_ == 0) {
ComputeInterceptor::SendDataReadyToDownStream();
}
}

void AmplifierInterceptor::ReplyCompletedToUpStream() {
// run multi times, reply one times to upstream, that is
// input one times, output multi times
if (step_ % reply_up_per_steps_ == 0) {
if (cur_scope_id_ % reply_up_per_steps_ == 0) {
ComputeInterceptor::ReplyCompletedToUpStream();
}
}
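The period/offset gating in the diff above ("4, 0 --> run at step 0, 4, 8, 12") can be illustrated with a minimal standalone sketch. The function and parameter names below are hypothetical stand-ins for the interceptor members, not the actual Paddle API.

```cpp
#include <cstdint>
#include <iostream>

// Stand-in for the gating used by AmplifierInterceptor in the diff above:
// an action fires only when the current scope id lands on the configured
// period/offset pair. Names are illustrative, not real Paddle members.
bool ShouldRun(int64_t cur_scope_id, int64_t run_per_steps,
               int64_t run_at_offset) {
  return (cur_scope_id % run_per_steps) == run_at_offset;
}

int main() {
  // run_per_steps = 4, run_at_offset = 0 --> fires at scope ids 0, 4, 8, 12
  // run_per_steps = 4, run_at_offset = 3 --> fires at scope ids 3, 7, 11, 15
  for (int64_t scope_id = 0; scope_id < 16; ++scope_id) {
    if (ShouldRun(scope_id, 4, 0)) {
      std::cout << "offset 0 fires at scope " << scope_id << "\n";
    }
    if (ShouldRun(scope_id, 4, 3)) {
      std::cout << "offset 3 fires at scope " << scope_id << "\n";
    }
  }
  return 0;
}
```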
paddle/fluid/distributed/fleet_executor/amplifier_interceptor.h
@@ -21,7 +21,7 @@
namespace paddle {
namespace distributed {

class AmplifierInterceptor : public ComputeInterceptor {
class AmplifierInterceptor final : public ComputeInterceptor {
public:
AmplifierInterceptor(int64_t interceptor_id, TaskNode* node);

129 changes: 91 additions & 38 deletions paddle/fluid/distributed/fleet_executor/carrier.cc
@@ -15,6 +15,7 @@
#include "paddle/fluid/distributed/fleet_executor/carrier.h"

#include <algorithm>
#include <vector>

#include "paddle/fluid/distributed/fleet_executor/global.h"
#include "paddle/fluid/distributed/fleet_executor/interceptor.h"
@@ -24,6 +25,7 @@
#include "paddle/fluid/framework/garbage_collector.h"
#include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/framework/variable.h"
#include "paddle/fluid/framework/variable_helper.h"

namespace paddle {
@@ -33,6 +35,8 @@ USE_INTERCEPTOR(Source);
USE_INTERCEPTOR(Compute);
USE_INTERCEPTOR(Amplifier);
USE_INTERCEPTOR(Sink);
USE_INTERCEPTOR(Cond);
USE_INTERCEPTOR(Start);

void Carrier::Init(
int64_t rank,
@@ -54,24 +58,40 @@ void Carrier::Init(
framework::Scope* scope,
int64_t num_micro_batches,
const platform::Place& place,
const std::vector<std::string>& inference_root_scope_vars) {
const std::vector<std::string>& inference_root_scope_vars,
const std::vector<framework::Scope*>& micro_scope_list) {
rank_ = rank;
interceptor_id_to_rank_ = interceptor_id_to_rank;
interceptor_id_to_node_ = interceptor_id_to_node;
place_ = place;
root_scope_ = scope;
dev_ctx_ = platform::DeviceContextPool::Instance().Get(place_);
bool need_create_scope = micro_scope_list.empty();

PADDLE_ENFORCE_NOT_NULL(
root_scope_,
platform::errors::InvalidArgument("root_scope can not be nullptr"));
minibatch_scope_ = &root_scope_->NewScope();
microbatch_scopes_.resize(num_micro_batches);
for (int i = 0; i < num_micro_batches; ++i) {
microbatch_scopes_[i] = &minibatch_scope_->NewScope();
CopyParameters(i, program, inference_root_scope_vars);

VLOG(3) << "===========Num micro batches================="
<< num_micro_batches;
if (need_create_scope) {
minibatch_scope_ = &root_scope_->NewScope();
microbatch_scopes_.resize(num_micro_batches);
for (int i = 0; i < num_micro_batches; ++i) {
microbatch_scopes_[i] = &minibatch_scope_->NewScope();
CopyParameters(i, program, inference_root_scope_vars);
}
} else {
microbatch_scopes_ = micro_scope_list;
for (int i = 0; i < num_micro_batches; ++i) {
CopyParameters(i, program, inference_root_scope_vars);
}
}

// Add source and sink interceptor id to rank
interceptor_id_to_rank_.emplace(SOURCE_ID, rank);
interceptor_id_to_rank_.emplace(SINK_ID, rank);

// TODO(fleet_exe dev): thread pool
thread_num_ = 1;
thread_pool_.SetThreadNum(thread_num_);
@@ -93,29 +113,30 @@ void Carrier::CopyParameters(
int microbatch_id,
const framework::ProgramDesc& program,
const std::vector<std::string>& inference_root_scope_vars) {
auto& global_block = program.Block(0);

std::map<std::string, int> inference_root_scope_var_map;
for (auto var_name : inference_root_scope_vars) {
inference_root_scope_var_map.insert({var_name, 1});
}
for (auto& var : global_block.AllVars()) {
std::string var_name = var->Name();
bool force_root = inference_root_scope_var_map.find(var_name) !=
inference_root_scope_var_map.end();
if (force_root) {
VLOG(4) << var_name << " will be forced to be created in the root scope.";
}
if ((var->Persistable() || force_root) && microbatch_id == 0) {
auto* ptr = root_scope_->Var(var->Name());
InitializeVariable(ptr, var->GetType());
VLOG(5) << "Create persistable var: " << var->Name()
<< ", which pointer is " << ptr;
} else if (!var->Persistable()) {
auto* ptr = microbatch_scopes_[microbatch_id]->Var(var->Name());
VLOG(5) << "Create variable " << var->Name() << " for microbatch "
<< microbatch_id << ", which pointer is " << ptr << ".";
InitializeVariable(ptr, var->GetType());
for (size_t i = 0; i < program.Size(); ++i) {
for (auto& var : program.Block(i).AllVars()) {
std::string var_name = var->Name();
bool force_root = inference_root_scope_var_map.find(var_name) !=
inference_root_scope_var_map.end();
if (force_root) {
VLOG(4) << var_name
<< " will be forced to be created in the root scope.";
}
if ((var->Persistable() || force_root) && microbatch_id == 0) {
auto* ptr = root_scope_->Var(var->Name());
InitializeVariable(ptr, var->GetType());
VLOG(5) << "Create persistable var: " << var->Name()
<< ", which pointer is " << ptr;
} else if (!var->Persistable()) {
auto* ptr = microbatch_scopes_[microbatch_id]->Var(var->Name());
VLOG(5) << "Create variable " << var->Name() << " for microbatch "
<< microbatch_id << ", which pointer is " << ptr << ".";
InitializeVariable(ptr, var->GetType());
}
}
}
}
@@ -159,16 +180,11 @@ void Carrier::Start() {
true,
platform::errors::PreconditionNotMet(
"Using carrier before initialized."));
for (int64_t id : source_interceptor_ids_) {
VLOG(3) << "Carrier Start is sending start to source interceptor " << id
<< ".";
InterceptorMessage start_msg;
// source node data_is_ready is send by carrier, so set src_id=-1
start_msg.set_src_id(-1);
start_msg.set_dst_id(id);
start_msg.set_message_type(DATA_IS_READY);
Send(start_msg);
}
InterceptorMessage start_msg;
start_msg.set_src_id(SOURCE_ID);
start_msg.set_dst_id(SOURCE_ID);
start_msg.set_message_type(START);
Send(start_msg);
// TODO(wangxi): async step
Wait();
dev_ctx_->Wait();
@@ -270,6 +286,37 @@ void Carrier::CreateInterceptors() {

auto gc = GetGC(place_);

// create source and sink task node
auto max_run_times = microbatch_scopes_.size();
TaskNode* source = new TaskNode(
rank_, SOURCE_ID, max_run_times); // rank, task_id, max_run_times
TaskNode* sink = new TaskNode(rank_, SINK_ID, max_run_times);
// find nodes without upstreams or without downstreams
std::vector<TaskNode*> origin_sources, origin_sinks;
for (const auto& item : interceptor_id_to_node_) {
TaskNode* task_node = item.second;
if (task_node->upstream().empty()) {
origin_sources.emplace_back(task_node);
}
if (task_node->downstream().empty()) {
origin_sinks.emplace_back(task_node);
}
}
// link source node with origin source
for (const auto& node : origin_sources) {
source->AddDownstreamTask(node->task_id(), max_run_times);
node->AddUpstreamTask(SOURCE_ID, max_run_times);
}
// link sink node with origin sink
for (const auto& node : origin_sinks) {
sink->AddUpstreamTask(node->task_id(), max_run_times);
node->AddDownstreamTask(SINK_ID, max_run_times);
}
// create source and sink interceptor
SetInterceptor(SOURCE_ID,
InterceptorFactory::Create("Source", SOURCE_ID, source));
SetInterceptor(SINK_ID, InterceptorFactory::Create("Sink", SINK_ID, sink));

// create each Interceptor
// no auto init since there is no config
for (const auto& item : interceptor_id_to_node_) {
@@ -303,9 +350,15 @@ void Carrier::CreateInterceptors() {
VLOG(3) << "Create Interceptor with interceptor id: " << interceptor_id
<< " with type: " << task_node->type() << ".";

if (task_node->upstream().empty()) {
source_interceptor_ids_.emplace_back(interceptor_id);
}
PADDLE_ENFORCE_EQ(
task_node->upstream().empty(),
false,
platform::errors::PreconditionNotMet(
"There should not have normal nodes as source nodes"));
PADDLE_ENFORCE_EQ(task_node->downstream().empty(),
false,
platform::errors::PreconditionNotMet(
"There should not have normal nodes as sink nodes"));
}
}

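The scope-placement rule in the CopyParameters hunk above (persistable or force-root variables created once in the root scope, everything else in each microbatch scope) can be sketched standalone. The types and names below are simplified placeholders, not the framework::Scope or VarDesc API.

```cpp
#include <iostream>
#include <set>
#include <string>
#include <vector>

// Simplified placeholder for a variable description; not framework::VarDesc.
struct Var {
  std::string name;
  bool persistable;
};

// Mirror of the branch in the CopyParameters diff above: persistable (or
// force-root) vars are created in the root scope only for microbatch 0,
// non-persistable vars in every microbatch scope.
std::string PlacementFor(const Var& var, int microbatch_id,
                         const std::set<std::string>& force_root_vars) {
  bool force_root = force_root_vars.count(var.name) > 0;
  if ((var.persistable || force_root) && microbatch_id == 0) {
    return "root scope";
  }
  if (!var.persistable) {
    return "microbatch scope " + std::to_string(microbatch_id);
  }
  return "skipped (already created for microbatch 0)";
}

int main() {
  // Hypothetical variables: a parameter, an activation, and a fetched output
  // that is forced into the root scope.
  std::vector<Var> vars = {{"w", true}, {"x", false}, {"fetch_out", false}};
  std::set<std::string> force_root = {"fetch_out"};
  for (int mb = 0; mb < 2; ++mb) {
    for (const auto& v : vars) {
      std::cout << v.name << " (microbatch " << mb << ") -> "
                << PlacementFor(v, mb, force_root) << "\n";
    }
  }
  return 0;
}
```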
6 changes: 3 additions & 3 deletions paddle/fluid/distributed/fleet_executor/carrier.h
@@ -25,6 +25,7 @@
#include "paddle/fluid/distributed/fleet_executor/interceptor.h"
#include "paddle/fluid/distributed/fleet_executor/interceptor_message.pb.h"
#include "paddle/fluid/distributed/fleet_executor/task_loop_thread_pool.h"
#include "paddle/fluid/framework/variable.h"
#include "paddle/fluid/platform/device_context.h"
#include "paddle/fluid/platform/enforce.h"
#include "paddle/fluid/platform/errors.h"
@@ -60,7 +61,8 @@ class Carrier final {
framework::Scope* scope,
int64_t num_micro_batches,
const platform::Place& place,
const std::vector<std::string>& inference_root_scope_vars = {});
const std::vector<std::string>& inference_root_scope_vars = {},
const std::vector<framework::Scope*>& micro_scope_list = {});

void CopyParameters(
int microbatch_id,
@@ -100,8 +102,6 @@ class Carrier final {
std::unordered_map<int64_t, std::unique_ptr<Interceptor>>
interceptor_idx_to_interceptor_;

std::vector<int64_t> source_interceptor_ids_;

bool is_init_{false};

std::mutex running_mutex_;