Changes from all commits (84 commits)
6252748
Add distributed plan, distributed source operator, grpc communication.
mdianjun Dec 6, 2021
0704d1d
Optimize building stages and plan fragment.
mdianjun Dec 6, 2021
ea7d64b
Support distributed ALTER UPDATE/DELETE statement.
Dec 9, 2021
20bb20e
Add insert distributed for replicate merge tree
godliness Dec 8, 2021
89468c4
1.Optimize the execution of plan fragment.
mdianjun Dec 7, 2021
147cc1e
add cancel request support on new distributed plan
Dec 17, 2021
9bb71d7
add kill on all support
Dec 16, 2021
5e8773e
Add distributed sort and limit.
mdianjun Dec 14, 2021
576de4a
Change stage to use multiple parent stages.
mdianjun Dec 15, 2021
a3ef01e
Fix querying system tables on single worker.
mdianjun Dec 16, 2021
c34c5ce
Fix GRPCClient log and exception message
mdianjun Dec 17, 2021
dfe2a48
Fix limit with offset
mdianjun Dec 17, 2021
f261717
Refactor code
mdianjun Dec 17, 2021
9b3087f
Rollback distributed DDL
Dec 21, 2021
7bdc8e7
Support automatic execution on all servers of CREATE/DROP replicate…
Dec 24, 2021
9d4a896
Add distributed aggregate
mdianjun Dec 17, 2021
705c8d5
Add distributed broadcast join
mdianjun Dec 22, 2021
20551ac
Add distributed union
mdianjun Dec 23, 2021
e35e317
Add exception processing when default database is not Replicated engine
mdianjun Dec 24, 2021
decd825
Fix bug that doesn't set current database
mdianjun Dec 24, 2021
c2e77fc
Add distributed insert for table engine mergetree
godliness Dec 24, 2021
e2d10bb
Add distributed materialized view
mdianjun Dec 27, 2021
05c69f2
Read data of external tables on initial node
mdianjun Dec 30, 2021
989d1e9
Skip distributed data which insert into system database
godliness Dec 29, 2021
de9962b
Fix to initialize ClustersWatcher before all servers start
mdianjun Jan 4, 2022
73290cb
Support input function in distributed mode
mdianjun Dec 31, 2021
ab7b706
Only optimize_trivial_count on store workers
mdianjun Jan 4, 2022
fdb9c89
Select one result from system.merge_tree_settings
mdianjun Jan 5, 2022
13b449a
Improve CREATE/DROP database executed distributedly
Jan 5, 2022
ef74935
Set database_replicated_always_detach_permanently default to true
Dec 29, 2021
6a3987c
Add settings for distributed query plan
Jan 5, 2022
16d1740
Reduce thread name
Jan 5, 2022
9f202c5
throw exception
Jan 6, 2022
ec9ee6a
Sync databases metadata with meta-service, and remove local files for…
Jan 6, 2022
c59241d
Add persistent recursive watch
Jan 7, 2022
bb69892
Fix dangling pointer of query_info in GRPCServer
Jan 10, 2022
e20f07f
Fix log message in GRPCClient
mdianjun Jan 11, 2022
0c3cb98
Support IN with subquery
mdianjun Jan 5, 2022
a728ded
Add client support for recursive watch
Jan 12, 2022
026702e
Eliminate the last stage when it has only one source worker which is …
mdianjun Jan 11, 2022
d3d83bf
Fix bug about materialized view
mdianjun Jan 12, 2022
613d931
Fix the checking of join on keys after join sql is rewritten
mdianjun Jan 13, 2022
b8654a7
Set prefer_global_in_and_join default to true
mdianjun Jan 13, 2022
3d0a02f
Using grpc server exception code as client code
Jan 11, 2022
6ff37dc
Ignore znode of database lock when loading metadata from meta-service
Jan 7, 2022
5797de7
Fix watch bug
Jan 14, 2022
ace5f56
Fix SELECT WITH TOTALS/ROLLUP/CUBE
mdianjun Jan 13, 2022
69e3d7c
Fix DistributedSource constructor, not set SourceWithProgress::auto_p…
Jan 17, 2022
431873c
Cleanup QueryInfoWrapper until all consumers are done
mdianjun Jan 17, 2022
d49beaa
Use prewhere optimization on compute node even if no data
mdianjun Jan 17, 2022
a07ff61
Add share sessions
godliness Jan 12, 2022
de874bf
Transfer interpreter params in building plan fragment, and add interp…
mdianjun Jan 17, 2022
f23a8ec
Change empty_result_for_aggregation_by_empty_set to true
godliness Jan 18, 2022
62ddd97
Fix intermediate stage read data multi times
godliness Jan 18, 2022
f2009c8
Fix the two use cases of StorageValues, one of them doesn't have view…
mdianjun Jan 18, 2022
2afec23
Execute "optimize table" on all nodes
Jan 19, 2022
81452fd
Revert empty_result_for_aggregation_by_empty_set to false
godliness Jan 22, 2022
709d153
Improve subquery, including many bugs fixed.
mdianjun Jan 18, 2022
9aad861
Fix friend class
mdianjun Jan 22, 2022
5c01196
Change private member functions of QueryPlan to public
mdianjun Jan 23, 2022
d897c07
Allow ATTACH/DROP PARTITION; disallow FETCH PARTITION; disallow any A…
Jan 23, 2022
76a464a
Fix hang when alter table where with materialized column
godliness Jan 20, 2022
03e0c20
Fix create as select multi results and no table error
godliness Jan 22, 2022
60f09f7
Add distributed SYSTEM DDL
Jan 24, 2022
7c7f46a
Schedule intermediate stages on local compute node
mdianjun Jan 24, 2022
6589478
Fix group by const error
godliness Jan 24, 2022
556228e
Build same original query plan when trivial count is optimized, and s…
mdianjun Jan 24, 2022
75f22e6
Clean grpc exception periodically thrown by producer; let consumer ca…
mdianjun Jan 24, 2022
e2e5697
Support DISTINCT
mdianjun Jan 25, 2022
9fe470f
Add distributed read/write lock to control the access to database and…
Jan 25, 2022
c1f3350
Remove table system.processes from distributed query plan
godliness Jan 24, 2022
945ac87
Fix grpc client read buffer size limit from 4MB to unlimited
Jan 25, 2022
e3dbde8
Clear garbage node
Jan 25, 2022
3127542
Fix reading some storage(e.g. numbers, memory) from local instead of …
mdianjun Jan 26, 2022
052d008
Eliminate unnecessary stages for sort and limit when shuffle has been…
mdianjun Jan 26, 2022
1b88b4d
Add default database name for SYSTEM
Jan 25, 2022
8b0fc4c
Fix ddl timeout when other ck should fail fast
Jan 26, 2022
fdbae38
Change parent stages order to sequential in order that iterating stag…
mdianjun Jan 27, 2022
5ad6f43
Set current query id if empty
mdianjun Jan 27, 2022
e0ee52c
Fix RENAME query when using distributed lock
Jan 28, 2022
0315aef
Fix create or replace table to insert occur no table error
Jan 28, 2022
12c000d
Fix distributed subquery max_query_size limitation inconsistency
Jan 28, 2022
3d0871d
Fix JOIN and WITH TOTALS
mdianjun Jan 27, 2022
9af4f34
Change stage building in right join and full join
mdianjun Jan 28, 2022
2 changes: 1 addition & 1 deletion programs/client/Client.cpp
@@ -329,7 +329,7 @@ bool Client::executeMultiQuery(const String & all_queries_text)
std::vector<String> Client::loadWarningMessages()
{
std::vector<String> messages;
connection->sendQuery(connection_parameters.timeouts, "SELECT message FROM system.warnings", "" /* query_id */,
connection->sendQuery(connection_parameters.timeouts, "SELECT message FROM system.warnings", "zzzzzzzz-zzzz-zzzz-zzzz-zzzzzzzzzzzz" /* query_id */,
QueryProcessingStage::Complete, nullptr, nullptr, false);
while (true)
{
19 changes: 19 additions & 0 deletions programs/server/Server.cpp
@@ -509,6 +509,22 @@ if (ThreadFuzzer::instance().isEffective())
global_context->addWarningMessage("Server was built with sanitizer. It will work slowly.");
#endif

if (config().has("running_mode"))
{
String running_mode = config().getString("running_mode", "compute");
if (running_mode == "compute")
{
global_context->setRunningMode(Context::RunningMode::COMPUTE);
LOG_INFO(log, "Running mode is COMPUTE");
}
else if (running_mode == "store")
{
global_context->setRunningMode(Context::RunningMode::STORE);
LOG_INFO(log, "Running mode is STORE");
}
else
{
throw Exception("Invalid running_mode", ErrorCodes::INVALID_CONFIG_PARAMETER);
}
}
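
The switch above reads a new top-level running_mode server-config key accepting the values "compute" and "store"; anything else is rejected with INVALID_CONFIG_PARAMETER. Later commits in this series ("Only optimize_trivial_count on store workers", "Schedule intermediate stages on local compute node") branch on the resulting mode. The following is an illustrative sketch of that consumption only, assuming a Context::getRunningMode() getter that mirrors the setRunningMode() call shown here; neither the getter name nor the policy is taken from the diff.

#include <Interpreters/Context.h>

/// Illustrative only (not from the diff): getRunningMode() is assumed as the
/// counterpart of setRunningMode() above, and the policy is a simplified
/// reading of later commits in this series.
static bool shouldRunLeafScanStages(const DB::ContextPtr & context)
{
    /// Store workers hold the MergeTree data and run leaf (scan) stages;
    /// compute nodes run intermediate stages (joins, aggregation, sort, limit).
    return context->getRunningMode() == DB::Context::RunningMode::STORE;
}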

// Initialize global thread pool. Do it before we fetch configs from zookeeper
// nodes (`from_zk`), because ZooKeeper interface uses the pool. We will
@@ -1546,6 +1562,9 @@ if (ThreadFuzzer::instance().isEffective())
&CurrentMetrics::MaxDDLEntryID, &CurrentMetrics::MaxPushedDDLEntryID));
}

/// Register with clickhouse-keeper and watch on clusters change.
global_context->setClustersWatcher(std::make_unique<ClustersWatcher>(DEFAULT_ZOOKEEPER_CLUSTERS_PATH, global_context));

for (auto & server : *servers)
server.start();
LOG_INFO(log, "Ready for connections.");
144 changes: 144 additions & 0 deletions src/Client/GRPCClient.cpp
@@ -0,0 +1,144 @@
#include <sstream>
#include <Client/GRPCClient.h>
#include <Formats/FormatFactory.h>
#include <Formats/NativeReader.h>
#include <IO/ReadBufferFromString.h>
#include <grpcpp/grpcpp.h>
#include "Common/ErrorCodes.h"
#include "base/logger_useful.h"
#include "clickhouse_grpc.grpc.pb.h"
#include "clickhouse_grpc.pb.h"

namespace Poco
{
class Logger;
}

using GRPCQueryInfo = clickhouse::grpc::QueryInfo;
using GRPCResult = clickhouse::grpc::Result;
using GRPCException = clickhouse::grpc::Exception;
using GRPCTicket = clickhouse::grpc::Ticket;

namespace DB
{
namespace ErrorCodes
{
extern const int INVALID_GRPC_QUERY_INFO;
extern const int GRPC_READ_ERROR;
extern const int GRPC_CANCEL_ERROR;
}

GRPCClient::GRPCClient(const String & addr_, const String & description_)
{
addr = addr_;
log = &Poco::Logger::get("GRPCClient(" + description_ + ")");
}

GRPCResult GRPCClient::executePlanFragment(const GRPCQueryInfo & query_info)
{
auto ch = grpc::CreateChannel(addr, grpc::InsecureChannelCredentials());
auto stub = clickhouse::grpc::ClickHouse::NewStub(ch);
grpc::ClientContext ctx;
GRPCResult result;
grpc::Status status = stub->ExecutePlanFragment(&ctx, query_info, &result);

if (status.ok())
return result;
else
{
LOG_ERROR(
log,
"Send query info to {} failed, code: {}, plan fragment id: {}.",
addr,
status.error_code(),
query_info.initial_query_id() + "/" + toString(query_info.stage_id()) + "/" + query_info.node_id());
throw Exception(status.error_message() + ", " + result.exception().display_text(), ErrorCodes::INVALID_GRPC_QUERY_INFO, true);
}
}

void GRPCClient::prepareRead(const GRPCTicket & ticket_)
{
ticket = ticket_;
grpc::ChannelArguments arg;
arg.SetMaxReceiveMessageSize(-1);
auto ch = grpc::CreateCustomChannel(addr, grpc::InsecureChannelCredentials(), arg);
std::shared_ptr<grpc::ClientContext> ctx = std::make_shared<grpc::ClientContext>();
auto stub = clickhouse::grpc::ClickHouse::NewStub(ch);
auto reader = stub->FetchPlanFragmentResult(ctx.get(), ticket);
inner_context = std::make_unique<InnerContext>(ch, ctx, stub, reader);
}

GRPCClient::MessageType GRPCClient::read(Block & block)
{
assert(inner_context);

GRPCResult result;
if (inner_context->reader->Read(&result))
{
if (result.exception().code() != 0)
{
LOG_ERROR(
log,
"Read failed, exception.code: {}, exception.text: {}.",
result.exception().code(),
result.exception().display_text());
throw Exception(result.exception().display_text(), result.exception().code(), true);
}

/// Note: totals and extremes are not used and not tested yet.
if (result.totals().size() > 0)
{
ReadBufferFromString b(result.totals());
NativeReader reader(b, 0);
block = reader.read();
return MessageType::Totals;
}

if (result.extremes().size() > 0)
{
ReadBufferFromString b(result.extremes());
NativeReader reader(b, 0);
block = reader.read();
return MessageType::Extremes;
}

if (!result.output().empty())
{
ReadBufferFromString b(result.output());
NativeReader reader(b, 0);
block = reader.read();
}
return MessageType::Data;
}

throw Exception(
"Read from grpc server " + addr + " failed, " + toString(result.exception().code()) + ", " + result.exception().display_text(),
ErrorCodes::GRPC_READ_ERROR,
true);
}

void GRPCClient::cancel()
{
grpc::ClientContext ctx;
GRPCResult result;

auto status = inner_context->stub->CancelPlanFragment(&ctx, ticket, &result);

auto plan_fragment_id = ticket.initial_query_id() + "/" + toString(ticket.stage_id()) + "/" + ticket.node_id();
if (status.ok())
{
if (result.cancelled())
LOG_DEBUG(log, "Cancel success, plan fragment id: {}", plan_fragment_id);
else
{
throw Exception("Cancel failed from " + addr + ", plan fragment id: " + plan_fragment_id + ", code: " + toString(result.exception().code()) + ", " + result.exception().display_text(), ErrorCodes::GRPC_CANCEL_ERROR, true);
}
}
else
{
LOG_ERROR(
log, "Cancel failed, code: {}, plan fragment id: {}.", status.error_code(), plan_fragment_id);
throw Exception(status.error_message() + ", " + result.exception().display_text(), ErrorCodes::GRPC_CANCEL_ERROR, true);
}
}
}
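
For reference, read() above expects each Result message to carry a block serialized in ClickHouse's Native format: data in output, with optional totals and extremes, each deserialized with NativeReader(b, 0). A minimal sketch of the matching producer side follows; it is illustrative only, not this PR's server-side code, and it assumes the Result field names from clickhouse_grpc.proto used above and a writer revision of 0 to mirror the reader.

#include <Core/Block.h>
#include <Formats/NativeWriter.h>
#include <IO/WriteBufferFromString.h>
#include "clickhouse_grpc.pb.h"

/// Illustrative producer-side sketch (not this PR's server code): serialize one
/// block into the `output` field of a Result, symmetric to GRPCClient::read().
static clickhouse::grpc::Result serializeBlock(const DB::Block & block)
{
    clickhouse::grpc::Result result;
    DB::WriteBufferFromOwnString buf;
    DB::NativeWriter writer(buf, /*client_revision=*/ 0, block.cloneEmpty());
    writer.write(block);
    result.set_output(buf.str());
    return result;
}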
78 changes: 78 additions & 0 deletions src/Client/GRPCClient.h
@@ -0,0 +1,78 @@
#pragma once
//#if USE_GRPC
#include <map>
#include <memory>
#include <shared_mutex>

#include <grpcpp/grpcpp.h>
#include <grpcpp/impl/codegen/proto_utils.h>
#include <grpcpp/impl/codegen/sync_stream.h>

#include <Core/Block.h>
#include <base/types.h>
#include <Poco/Net/SocketAddress.h>
#include "clickhouse_grpc.grpc.pb.h"
#include "clickhouse_grpc.pb.h"

using GRPCQueryInfo = clickhouse::grpc::QueryInfo;
using GRPCResult = clickhouse::grpc::Result;
using GRPCTicket = clickhouse::grpc::Ticket;
using GRPCStub = clickhouse::grpc::ClickHouse::Stub;

namespace DB
{
using ReadDataCallback = std::function<void(const Block & block)>;

class GRPCClient
{
public:
enum MessageType
{
Data = 1,
Totals = 2,
Extremes = 3,
MAX = Extremes,
};
public:
GRPCClient(const String & addr_, const String & description_);
~GRPCClient() = default;

/// Send params of plan fragment to remote, and execute it.
GRPCResult executePlanFragment(const GRPCQueryInfo & query_info);

/// Initialize reader and inner context.
void prepareRead(const GRPCTicket & ticket_);

/// Try to read a block from remote.
/// If got EOF, an empty block will be returned, you can use if (!block) to check it.
MessageType read(Block & block);

/// Cancel plan fragment (ticket associated with the prepareRead)
void cancel();

private:
struct InnerContext
{
InnerContext(
std::shared_ptr<grpc::Channel> & ch_,
std::shared_ptr<grpc::ClientContext> & ctx_,
std::unique_ptr<GRPCStub> & stub_,
std::unique_ptr<grpc::ClientReader<GRPCResult>> & reader_)
: ch(ch_), ctx(ctx_), stub(std::move(stub_)), reader(std::move(reader_))
{
}
~InnerContext() { }

std::shared_ptr<grpc::Channel> ch;
std::shared_ptr<grpc::ClientContext> ctx;
std::unique_ptr<GRPCStub> stub;
std::unique_ptr<grpc::ClientReader<GRPCResult>> reader;
};

Poco::Logger * log;
String addr;
std::unique_ptr<InnerContext> inner_context;
GRPCTicket ticket;
};
}
//#endif
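
Putting the two files together, a caller drives the client as executePlanFragment → prepareRead → repeated read() until an empty block → cancel() if the query is aborted. A minimal hedged usage sketch of that flow follows; it is not code from this PR, the address is a placeholder, and query_info / ticket are assumed to be filled in by the plan-fragment scheduler elsewhere in the series.

#include <Client/GRPCClient.h>

/// Hedged usage sketch (not code from this PR).
static void runFragment(const GRPCQueryInfo & query_info, const GRPCTicket & ticket)
{
    DB::GRPCClient client("127.0.0.1:9100" /* placeholder address */, "usage example");

    /// Send the plan fragment parameters and start remote execution.
    client.executePlanFragment(query_info);

    /// Open the result stream for the ticket identifying this fragment.
    client.prepareRead(ticket);

    /// Pull blocks until an empty block signals EOF (see read() above).
    DB::Block block;
    while (true)
    {
        auto type = client.read(block);
        if (!block)
            break;
        if (type == DB::GRPCClient::Data)
        {
            /// ... consume the data block ...
        }
        /// Totals / Extremes would be handled analogously.
        block.clear();
    }
}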
2 changes: 1 addition & 1 deletion src/Client/Suggest.cpp
@@ -141,7 +141,7 @@ void Suggest::load(ContextPtr context, const ConnectionParameters & connection_p

void Suggest::fetch(IServerConnection & connection, const ConnectionTimeouts & timeouts, const std::string & query)
{
connection.sendQuery(timeouts, query, "" /* query_id */, QueryProcessingStage::Complete, nullptr, nullptr, false);
connection.sendQuery(timeouts, query, "zzzzzzzz-zzzz-zzzz-zzzz-zzzzzzzzzzzz" /* query_id */, QueryProcessingStage::Complete, nullptr, nullptr, false);

while (true)
{
3 changes: 3 additions & 0 deletions src/Common/ErrorCodes.cpp
@@ -593,6 +593,9 @@
M(623, CAPN_PROTO_BAD_CAST) \
M(624, BAD_FILE_TYPE) \
M(625, IO_SETUP_ERROR) \
M(702, GRPC_READ_ERROR) \
M(703, GRPC_CANCEL_ERROR) \
M(704, CANNOT_DROP_DATABASE) \
\
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \
2 changes: 2 additions & 0 deletions src/Common/ProfileEvents.cpp
@@ -89,6 +89,8 @@
M(ZooKeeperCheck, "") \
M(ZooKeeperClose, "") \
M(ZooKeeperWatchResponse, "") \
M(ZooKeeperAddWatch, "") \
M(ZooKeeperRemoveWatches, "") \
M(ZooKeeperUserExceptions, "") \
M(ZooKeeperHardwareExceptions, "") \
M(ZooKeeperOtherExceptions, "") \