Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/library/yql/providers/pq/common/yql_names.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ constexpr TStringBuf WatermarksGranularityUsSetting = "WatermarksGranularityUs";
constexpr TStringBuf WatermarksLateArrivalDelayUsSetting = "WatermarksLateArrivalDelayUs";
constexpr TStringBuf WatermarksIdleTimeoutUsSetting = "WatermarksIdleTimeoutUs";
constexpr TStringBuf WatermarksIdlePartitionsSetting = "WatermarksIdlePartitions";
constexpr TStringBuf WatermarksLateEventsPolicySetting = "WatermarksLateEventsPolicy";
constexpr TStringBuf ReconnectPeriod = "ReconnectPeriod";
constexpr TStringBuf ReadGroup = "ReadGroup";
constexpr TStringBuf SkipJsonErrors = "SkipJsonErrors";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@
{"Index": 3, "Name": "Columns", "Type": "TExprBase"},
{"Index": 4, "Name": "Format", "Type": "TCoAtom"},
{"Index": 5, "Name": "Compression", "Type": "TCoAtom"},
{"Index": 6, "Name": "LimitHint", "Type": "TExprBase", "Optional": true},
{"Index": 7, "Name": "Settings", "Type": "TExprList", "Optional": true},
{"Index": 6, "Name": "LimitHint", "Type": "TExprBase"},
{"Index": 7, "Name": "Settings", "Type": "TCoNameValueTupleList"},
{"Index": 8, "Name": "Watermark", "Type": "TCoLambda", "Optional": true}
]
},
Expand Down
27 changes: 15 additions & 12 deletions ydb/library/yql/providers/pq/provider/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,11 @@ SRCS(
PEERDIR(
library/cpp/random_provider
library/cpp/time_provider
yql/essentials/ast
yql/essentials/core
yql/essentials/core/type_ann

ydb/library/yql/dq/expr_nodes
yql/essentials/core/dq_integration
ydb/library/yql/dq/opt
yql/essentials/minikql/comp_nodes
yql/essentials/providers/common/config
ydb/library/yql/providers/common/db_id_async_resolver
yql/essentials/providers/common/dq
yql/essentials/providers/common/proto
yql/essentials/providers/common/provider
ydb/library/yql/providers/common/pushdown
yql/essentials/providers/common/structured_token
yql/essentials/providers/common/transform
ydb/library/yql/providers/dq/common
ydb/library/yql/providers/dq/expr_nodes
ydb/library/yql/providers/dq/provider/exec
Expand All @@ -46,9 +36,22 @@ PEERDIR(
ydb/library/yql/providers/pq/common
ydb/library/yql/providers/pq/expr_nodes
ydb/library/yql/providers/pq/proto
ydb/public/sdk/cpp/src/client/driver

yql/essentials/ast
yql/essentials/core
yql/essentials/core/type_ann
yql/essentials/core/dq_integration
yql/essentials/minikql
yql/essentials/minikql/comp_nodes
yql/essentials/providers/common/config
yql/essentials/providers/common/dq
yql/essentials/providers/common/proto
yql/essentials/providers/common/provider
yql/essentials/providers/common/structured_token
yql/essentials/providers/common/transform
yql/essentials/providers/result/expr_nodes
yql/essentials/public/udf
ydb/public/sdk/cpp/src/client/driver
)

YQL_LAST_ABI_VERSION()
Expand Down
103 changes: 61 additions & 42 deletions ydb/library/yql/providers/pq/provider/yql_pq_datasource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,91 +122,110 @@ class TPqDataSourceProvider : public TDataProviderBase {
.Metadata().Add(sourceMetadata).Build()
.Done();

TExprNode::TPtr columns;
if (auto columnOrder = topicKeyParser.GetColumnOrder()) {
columns = std::move(columnOrder);
} else {
columns = Build<TCoVoid>(ctx, read.Pos()).Done().Ptr();
}

auto format = topicKeyParser.GetFormat();
if (format.empty()) {
format = "raw";
}

auto settings = Build<TExprList>(ctx, read.Pos());
auto settings = Build<TCoNameValueTupleList>(ctx, read.Pos());

bool hasDateTimeFormat = false;
bool hasDateTimeFormatName = false;
bool hasTimestampFormat = false;
bool hasTimestampFormatName = false;
if (topicKeyParser.GetDateTimeFormatName()) {
settings.Add(topicKeyParser.GetDateTimeFormatName());
hasDateTimeFormatName = true;
if (!NCommon::ValidateDateTimeFormatName(topicKeyParser.GetDateTimeFormatName()->Child(1)->Content(), ctx)) {
if (auto dateTimeFormatName = topicKeyParser.GetDateTimeFormatName()) {
if (!NCommon::ValidateDateTimeFormatName(dateTimeFormatName->Child(1)->Content(), ctx)) {
return nullptr;
}
settings.Add(std::move(dateTimeFormatName));
hasDateTimeFormatName = true;
}

if (topicKeyParser.GetDateTimeFormat()) {
settings.Add(topicKeyParser.GetDateTimeFormat());
if (auto dateTimeFormat = topicKeyParser.GetDateTimeFormat()) {
settings.Add(std::move(dateTimeFormat));
hasDateTimeFormat = true;
}

if (topicKeyParser.GetTimestampFormatName()) {
settings.Add(topicKeyParser.GetTimestampFormatName());
hasTimestampFormatName = true;
if (!NCommon::ValidateTimestampFormatName(topicKeyParser.GetTimestampFormatName()->Child(1)->Content(), ctx)) {
if (hasDateTimeFormat && hasDateTimeFormatName) {
ctx.AddError(TIssue(ctx.GetPosition(read.Pos()), "Don't use data.datetime.format_name and data.datetime.format together"));
return nullptr;
}

if (!hasDateTimeFormat && !hasDateTimeFormatName) {
settings.Add<TExprList>()
.Add<TCoAtom>().Build("data.datetime.formatname")
.Add<TCoAtom>().Build("POSIX")
.Build();
}

bool hasTimestampFormat = false;
bool hasTimestampFormatName = false;
if (auto timestampFormatName = topicKeyParser.GetTimestampFormatName()) {
if (!NCommon::ValidateTimestampFormatName(timestampFormatName->Child(1)->Content(), ctx)) {
return nullptr;
}
settings.Add(std::move(timestampFormatName));
hasTimestampFormatName = true;
}

if (topicKeyParser.GetTimestampFormat()) {
settings.Add(topicKeyParser.GetTimestampFormat());
if (auto timestampFormat = topicKeyParser.GetTimestampFormat()) {
settings.Add(std::move(timestampFormat));
hasTimestampFormat = true;
}

if (hasDateTimeFormat && hasDateTimeFormatName) {
ctx.AddError(TIssue(ctx.GetPosition(read.Pos()), "Don't use data.datetime.format_name and data.datetime.format together"));
return nullptr;
}

if (hasTimestampFormat && hasTimestampFormatName) {
ctx.AddError(TIssue(ctx.GetPosition(read.Pos()), "Don't use data.timestamp.format_name and data.timestamp.format together"));
return nullptr;
}

if (!hasDateTimeFormat && !hasDateTimeFormatName) {
TExprNode::TListType pair;
pair.push_back(ctx.NewAtom(read.Pos(), "data.datetime.formatname"));
pair.push_back(ctx.NewAtom(read.Pos(), "POSIX"));
settings.Add(ctx.NewList(read.Pos(), std::move(pair)));
if (!hasTimestampFormat && !hasTimestampFormatName) {
settings.Add<TExprList>()
.Add<TCoAtom>().Build("data.timestamp.formatname")
.Add<TCoAtom>().Build("POSIX")
.Build();
}

if (!hasTimestampFormat && !hasTimestampFormatName) {
TExprNode::TListType pair;
pair.push_back(ctx.NewAtom(read.Pos(), "data.timestamp.formatname"));
pair.push_back(ctx.NewAtom(read.Pos(), "POSIX"));
settings.Add(ctx.NewList(read.Pos(), std::move(pair)));
if (auto dateFormat = topicKeyParser.GetDateFormat()) {
settings.Add(std::move(dateFormat));
}

if (topicKeyParser.GetDateFormat()) {
settings.Add(topicKeyParser.GetDateFormat());
if (auto watermarkAdjustLateEvents = topicKeyParser.GetWatermarkAdjustLateEvents()) {
settings.Add(std::move(watermarkAdjustLateEvents));
}

if (topicKeyParser.GetSkipJsonErrors()) {
settings.Add(topicKeyParser.GetSkipJsonErrors());
if (auto watermarkDropLateEvents = topicKeyParser.GetWatermarkDropLateEvents()) {
settings.Add(std::move(watermarkDropLateEvents));
}

if (auto watermarkGranularity = topicKeyParser.GetWatermarkGranularity()) {
settings.Add(std::move(watermarkGranularity));
}

if (auto watermarkIdleTimeout = topicKeyParser.GetWatermarkIdleTimeout()) {
settings.Add(std::move(watermarkIdleTimeout));
}

if (auto skipJsonErrors = topicKeyParser.GetSkipJsonErrors()) {
settings.Add(std::move(skipJsonErrors));
}

auto builder = Build<TPqReadTopic>(ctx, read.Pos())
.World(read.World())
.DataSource(read.DataSource())
.Topic(std::move(topicNode))
.Columns(std::move(columns))
.Format().Value(format).Build()
.Compression().Value(topicKeyParser.GetCompression()).Build()
.LimitHint<TCoVoid>().Build()
.Settings(settings.Done());

if (topicKeyParser.GetColumnOrder()) {
builder.Columns(topicKeyParser.GetColumnOrder());
} else {
builder.Columns<TCoVoid>().Build();
}

if (topicKeyParser.GetWatermark()) {
builder.Watermark(topicKeyParser.GetWatermark());
if (auto watermark = topicKeyParser.GetWatermark()) {
builder.Watermark(std::move(watermark));
}

return Build<TCoRight>(ctx, read.Pos())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ class TPqDataSourceTypeAnnotationTransformer : public TVisitorTransformerBase {
const auto columns = input->Child(TPqReadTopic::idx_Columns);
const auto format = input->Child(TPqReadTopic::idx_Format);
const auto compression = input->Child(TPqReadTopic::idx_Compression);
const auto settings = input->Child(TPqReadTopic::idx_Settings);

if (!EnsureWorldType(*world, ctx)) {
return TStatus::Error;
Expand Down Expand Up @@ -214,6 +215,20 @@ class TPqDataSourceTypeAnnotationTransformer : public TVisitorTransformerBase {
return TStatus::Error;
}

if (!EnsureTuple(*settings, ctx)) {
return TStatus::Error;
}

for (const auto& setting : settings->Children()) {
if (!EnsureTupleMinSize(*setting, 1, ctx)) {
return TStatus::Error;
}

if (!EnsureAtom(setting->Head(), ctx)) {
return TStatus::Error;
}
}

if (TPqReadTopic::idx_Watermark < input->ChildrenSize()) {
auto& watermark = input->ChildRef(TPqReadTopic::idx_Watermark);
const auto status = ConvertToLambda(watermark, ctx, 1, 1);
Expand Down
Loading
Loading