Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/Core/FormatFactorySettings.h
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,9 @@ The number of columns in inserted MsgPack data. Used for automatic schema infere
)", 0) \
DECLARE(MsgPackUUIDRepresentation, output_format_msgpack_uuid_representation, FormatSettings::MsgPackUUIDRepresentation::EXT, R"(
The way how to output UUID in MsgPack format.
)", 0) \
DECLARE(HashEnumFunction, output_format_hash_function, FormatSettings::HashEnumFunction::SIP_HASH_128, R"(
Hash function for Hash output format. Supported: sipHash128 (default), sipHash64, cityHash128, cityHash64, murmurHash3_128, murmurHash3_64, xxHash64
)", 0) \
DECLARE(UInt64, input_format_max_rows_to_read_for_schema_inference, 25000, R"(
The maximum rows of data to read for automatic schema inference.
Expand Down
1 change: 1 addition & 0 deletions src/Core/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ class WriteBuffer;
M(CLASS_NAME, MaxThreads) \
M(CLASS_NAME, Milliseconds) \
M(CLASS_NAME, MsgPackUUIDRepresentation) \
M(CLASS_NAME, HashEnumFunction) \
M(CLASS_NAME, MySQLDataTypesSupport) \
M(CLASS_NAME, NonZeroUInt64) \
M(CLASS_NAME, ORCCompression) \
Expand Down
9 changes: 9 additions & 0 deletions src/Core/SettingsEnums.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,15 @@ IMPLEMENT_SETTING_ENUM(MsgPackUUIDRepresentation, ErrorCodes::BAD_ARGUMENTS,
{"str", FormatSettings::MsgPackUUIDRepresentation::STR},
{"ext", FormatSettings::MsgPackUUIDRepresentation::EXT}})

IMPLEMENT_SETTING_ENUM(HashEnumFunction, ErrorCodes::BAD_ARGUMENTS,
{{"sipHash128", FormatSettings::HashEnumFunction::SIP_HASH_128},
{"sipHash64", FormatSettings::HashEnumFunction::SIP_HASH_64},
{"cityHash128", FormatSettings::HashEnumFunction::CITY_HASH_128},
{"cityHash64", FormatSettings::HashEnumFunction::CITY_HASH_64},
{"murmurHash3_128", FormatSettings::HashEnumFunction::MURMUR_HASH3_128},
{"murmurHash3_64", FormatSettings::HashEnumFunction::MURMUR_HASH3_64},
{"xxHash64", FormatSettings::HashEnumFunction::XX_HASH_64}})

IMPLEMENT_SETTING_ENUM(Dialect, ErrorCodes::BAD_ARGUMENTS,
{{"clickhouse", Dialect::clickhouse},
{"kusto", Dialect::kusto},
Expand Down
2 changes: 2 additions & 0 deletions src/Core/SettingsEnums.h
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,8 @@ DECLARE_SETTING_ENUM_WITH_RENAME(EscapingRule, FormatSettings::EscapingRule)

DECLARE_SETTING_ENUM_WITH_RENAME(MsgPackUUIDRepresentation, FormatSettings::MsgPackUUIDRepresentation)

DECLARE_SETTING_ENUM_WITH_RENAME(HashEnumFunction, FormatSettings::HashEnumFunction)

DECLARE_SETTING_ENUM_WITH_RENAME(ParquetCompression, FormatSettings::ParquetCompression)

DECLARE_SETTING_ENUM_WITH_RENAME(ArrowCompression, FormatSettings::ArrowCompression)
Expand Down
1 change: 1 addition & 0 deletions src/Formats/FormatFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ FormatSettings getFormatSettings(const ContextPtr & context, const Settings & se
format_settings.native.decode_types_in_binary_format = settings[Setting::input_format_native_decode_types_in_binary_format];
format_settings.native.write_json_as_string = settings[Setting::output_format_native_write_json_as_string];
format_settings.native.use_flattened_dynamic_and_json_serialization = settings[Setting::output_format_native_use_flattened_dynamic_and_json_serialization];
format_settings.hash.function = settings[Setting::output_format_hash_function];
format_settings.max_parser_depth = settings[Setting::max_parser_depth];
format_settings.date_time_overflow_behavior = settings[Setting::date_time_overflow_behavior];
format_settings.try_infer_variant = settings[Setting::input_format_try_infer_variants];
Expand Down
16 changes: 16 additions & 0 deletions src/Formats/FormatSettings.h
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,22 @@ struct FormatSettings
{
bool escape_special_characters = false;
} markdown{};

enum class HashEnumFunction : uint8_t
{
SIP_HASH_128,
SIP_HASH_64,
CITY_HASH_128,
CITY_HASH_64,
MURMUR_HASH3_128,
MURMUR_HASH3_64,
XX_HASH_64,
};

struct
{
HashEnumFunction function = HashEnumFunction::SIP_HASH_128;
} hash{};
};

}
2 changes: 2 additions & 0 deletions src/Formats/registerFormats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ void registerOutputFormatMarkdown(FormatFactory & factory);
void registerOutputFormatPostgreSQLWire(FormatFactory & factory);
void registerOutputFormatPrometheus(FormatFactory & factory);
void registerOutputFormatSQLInsert(FormatFactory & factory);
void registerOutputFormatHash(FormatFactory & factory);

/// Input only formats.

Expand Down Expand Up @@ -240,6 +241,7 @@ void registerFormats()
registerOutputFormatCapnProto(factory);
registerOutputFormatPrometheus(factory);
registerOutputFormatSQLInsert(factory);
registerOutputFormatHash(factory);

registerInputFormatRegexp(factory);
registerInputFormatJSONAsString(factory);
Expand Down
118 changes: 118 additions & 0 deletions src/Processors/Formats/Impl/HashOutputFormat.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
#include <Processors/Formats/Impl/HashOutputFormat.h>
#include <Columns/IColumn.h>
#include <Formats/FormatFactory.h>
#include <Processors/Port.h>

#include <city.h>
#include <MurmurHash3.h>
#include <xxhash.h>

namespace DB
{

HashOutputFormat::HashOutputFormat(WriteBuffer & out_, SharedHeader header_, const FormatSettings & format_settings_)
: IOutputFormat(header_, out_), format_settings(format_settings_)
{
}

HashOutputFormat::~HashOutputFormat()
{
buf.cancel();
}

String HashOutputFormat::getName() const
{
return "HashOutputFormat";
}

void HashOutputFormat::consume(Chunk chunk)
{
// TODO - hide arena declarion inside of non-SIP scope
Arena arena;
const char * begin = nullptr;

for (const auto & column : chunk.getColumns())
{
for (size_t i = 0; i < column->size(); ++i)

switch (format_settings.hash.function) {

case FormatSettings::HashEnumFunction::SIP_HASH_64:
case FormatSettings::HashEnumFunction::SIP_HASH_128:
column->updateHashWithValue(i, sip);
break;

default:
{
StringRef ref = column->serializeValueIntoArena(i, arena, begin);
buf.write(ref.data, ref.size);
}
}
}
}

void HashOutputFormat::finalizeImpl()
{
std::string hashString;
switch (format_settings.hash.function) {

case FormatSettings::HashEnumFunction::SIP_HASH_128:
hashString = getSipHash128AsHexString(sip);
break;

case FormatSettings::HashEnumFunction::SIP_HASH_64:
hashString = getHexUIntLowercase(sip.get64());
break;

case FormatSettings::HashEnumFunction::CITY_HASH_128:
{
CityHash_v1_0_2::uint128 hash = CityHash_v1_0_2::CityHash128(buf.buffer().begin(), buf.buffer().size());
hashString += getHexUIntLowercase(hash.high64);
hashString += getHexUIntLowercase(hash.low64);
break;
}

case FormatSettings::HashEnumFunction::CITY_HASH_64:
hashString = getHexUIntLowercase(CityHash_v1_0_2::CityHash64(buf.buffer().begin(), buf.buffer().size()));
break;

case FormatSettings::HashEnumFunction::MURMUR_HASH3_128:
{
HashState bytes;
MurmurHash3_x64_128(buf.buffer().begin(), buf.buffer().size(), 0, bytes);
hashString += getHexUIntLowercase(bytes.h1);
hashString += getHexUIntLowercase(bytes.h2);
break;
}

case FormatSettings::HashEnumFunction::MURMUR_HASH3_64:
{
HashState bytes;
MurmurHash3_x64_128(buf.buffer().begin(), buf.buffer().size(), 0, bytes);
hashString = getHexUIntLowercase(bytes.h1 ^ bytes.h2);
break;
}

case FormatSettings::HashEnumFunction::XX_HASH_64:
{
UInt64 hash = XXH64(buf.buffer().begin(), buf.buffer().size(), 0);
hashString = getHexUIntLowercase(hash);
break;
}
}

out.write(hashString.data(), hashString.size());
out.write("\n", 1);
out.next();
}

void registerOutputFormatHash(FormatFactory & factory)
{
factory.registerOutputFormat("Hash",
[](WriteBuffer & buf, const Block & header, const FormatSettings & format_settings)
{
return std::make_shared<HashOutputFormat>(buf, std::make_shared<const Block>(header), format_settings);
});
}

}
32 changes: 32 additions & 0 deletions src/Processors/Formats/Impl/HashOutputFormat.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#pragma once

#include <Common/SipHash.h>
#include <Formats/FormatSettings.h>
#include <IO/BufferWithOwnMemory.h>
#include <IO/WriteBuffer.h>
#include <Processors/Formats/IOutputFormat.h>

namespace DB
{

class HashOutputFormat final : public IOutputFormat
{
public:
HashOutputFormat(WriteBuffer & out_, SharedHeader header_, const FormatSettings & format_settings_);
~HashOutputFormat() override;

String getName() const override;

private:
void consume(Chunk chunk) override;
void finalizeImpl() override;

// TODO - check whether global format settings object can mutate during lifecyle, is it safe to keep a reference instead of copy?
const FormatSettings & format_settings;

// TODO - make as union, as we don't need them all at once
SipHash sip;
BufferWithOwnMemory<WriteBuffer> buf;
};

}
1 change: 1 addition & 0 deletions src/Storages/FileLog/FileLogSettings.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ struct FileLogSettingsImpl;
M(CLASS_NAME, MaxThreads) \
M(CLASS_NAME, Milliseconds) \
M(CLASS_NAME, MsgPackUUIDRepresentation) \
M(CLASS_NAME, HashEnumFunction) \
M(CLASS_NAME, ORCCompression) \
M(CLASS_NAME, ParquetCompression) \
M(CLASS_NAME, ParquetVersion) \
Expand Down
1 change: 1 addition & 0 deletions src/Storages/Hive/HiveSettings.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ struct HiveSettingsImpl;
M(CLASS_NAME, Int64) \
M(CLASS_NAME, IntervalOutputFormat) \
M(CLASS_NAME, MsgPackUUIDRepresentation) \
M(CLASS_NAME, HashEnumFunction) \
M(CLASS_NAME, ORCCompression) \
M(CLASS_NAME, ParquetCompression) \
M(CLASS_NAME, ParquetVersion) \
Expand Down
3 changes: 2 additions & 1 deletion src/Storages/Kafka/KafkaSettings.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ const auto KAFKA_MAX_THREAD_WORK_DURATION_MS = 60000;
// 10min
const auto KAFKA_CONSUMERS_POOL_TTL_MS_MAX = 600'000;

/// List of available types supported in RabbitMQSettings object
/// List of available types supported in KafkaSettings object
#define KAFKA_SETTINGS_SUPPORTED_TYPES(CLASS_NAME, M) \
M(CLASS_NAME, ArrowCompression) \
M(CLASS_NAME, Bool) \
Expand All @@ -35,6 +35,7 @@ const auto KAFKA_CONSUMERS_POOL_TTL_MS_MAX = 600'000;
M(CLASS_NAME, IntervalOutputFormat) \
M(CLASS_NAME, Milliseconds) \
M(CLASS_NAME, MsgPackUUIDRepresentation) \
M(CLASS_NAME, HashEnumFunction) \
M(CLASS_NAME, ORCCompression) \
M(CLASS_NAME, ParquetCompression) \
M(CLASS_NAME, ParquetVersion) \
Expand Down
1 change: 1 addition & 0 deletions src/Storages/NATS/NATSSettings.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ struct NATSSettingsImpl;
M(CLASS_NAME, Int64) \
M(CLASS_NAME, IntervalOutputFormat) \
M(CLASS_NAME, MsgPackUUIDRepresentation) \
M(CLASS_NAME, HashEnumFunction) \
M(CLASS_NAME, Milliseconds) \
M(CLASS_NAME, ORCCompression) \
M(CLASS_NAME, ParquetCompression) \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class SettingsChanges;
M(CLASS_NAME, Int64) \
M(CLASS_NAME, IntervalOutputFormat) \
M(CLASS_NAME, MsgPackUUIDRepresentation) \
M(CLASS_NAME, HashEnumFunction) \
M(CLASS_NAME, ORCCompression) \
M(CLASS_NAME, ParquetCompression) \
M(CLASS_NAME, ParquetVersion) \
Expand Down
1 change: 1 addition & 0 deletions src/Storages/ObjectStorage/StorageObjectStorageSettings.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class SettingsChanges;
M(CLASS_NAME, Int64) \
M(CLASS_NAME, IntervalOutputFormat) \
M(CLASS_NAME, MsgPackUUIDRepresentation) \
M(CLASS_NAME, HashEnumFunction) \
M(CLASS_NAME, ObjectStorageQueueAction) \
M(CLASS_NAME, ObjectStorageQueueMode) \
M(CLASS_NAME, ORCCompression) \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ struct StorageID;
M(CLASS_NAME, Int64) \
M(CLASS_NAME, IntervalOutputFormat) \
M(CLASS_NAME, MsgPackUUIDRepresentation) \
M(CLASS_NAME, HashEnumFunction) \
M(CLASS_NAME, ObjectStorageQueueAction) \
M(CLASS_NAME, ObjectStorageQueueMode) \
M(CLASS_NAME, ORCCompression) \
Expand Down
1 change: 1 addition & 0 deletions src/Storages/RabbitMQ/RabbitMQSettings.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ struct RabbitMQSettingsImpl;
M(CLASS_NAME, Int64) \
M(CLASS_NAME, IntervalOutputFormat) \
M(CLASS_NAME, MsgPackUUIDRepresentation) \
M(CLASS_NAME, HashEnumFunction) \
M(CLASS_NAME, ORCCompression) \
M(CLASS_NAME, ParquetCompression) \
M(CLASS_NAME, ParquetVersion) \
Expand Down
1 change: 1 addition & 0 deletions src/Storages/SetSettings.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ struct SetSettingsImpl;
M(CLASS_NAME, Int64) \
M(CLASS_NAME, IntervalOutputFormat) \
M(CLASS_NAME, MsgPackUUIDRepresentation) \
M(CLASS_NAME, HashEnumFunction) \
M(CLASS_NAME, ORCCompression) \
M(CLASS_NAME, ParquetCompression) \
M(CLASS_NAME, ParquetVersion) \
Expand Down
10 changes: 10 additions & 0 deletions tests/queries/0_stateless/03577_hash_format.reference
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
75b419a3aa739a211291e7cc119bd3c9
d3b90098d049660862d6dc53ac7505e5
92d47ccfb2950a8e10ac9ddf4314f1bf
92d47ccfb2950a8e10ac9ddf4314f1bf
31fb81f110e17882
2a9f9daff6924303859b32e3c35adbeb
5901aa2f330b940a
27f838a79558f7139d53e0496ef2b454
baabd8eefbaa4347
c71d18d5553075ef
24 changes: 24 additions & 0 deletions tests/queries/0_stateless/03577_hash_format.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
SELECT number FROM system.numbers LIMIT 1 FORMAT Hash;
SELECT number FROM system.numbers LIMIT 20 FORMAT Hash;
SELECT number AS hello, toString(number) AS world, (hello, world) AS tuple, nullIf(hello % 3, 0) AS sometimes_nulls FROM system.numbers LIMIT 20 FORMAT Hash;

SET output_format_hash_function = 'sipHash128';
SELECT number AS hello, toString(number) AS world, (hello, world) AS tuple, nullIf(hello % 3, 0) AS sometimes_nulls FROM system.numbers LIMIT 20 FORMAT Hash;

SET output_format_hash_function = 'sipHash64';
SELECT number AS hello, toString(number) AS world, (hello, world) AS tuple, nullIf(hello % 3, 0) AS sometimes_nulls FROM system.numbers LIMIT 20 FORMAT Hash;

SET output_format_hash_function = 'cityHash128';
SELECT number AS hello, toString(number) AS world, (hello, world) AS tuple, nullIf(hello % 3, 0) AS sometimes_nulls FROM system.numbers LIMIT 20 FORMAT Hash;

SET output_format_hash_function = 'cityHash64';
SELECT number AS hello, toString(number) AS world, (hello, world) AS tuple, nullIf(hello % 3, 0) AS sometimes_nulls FROM system.numbers LIMIT 20 FORMAT Hash;

SET output_format_hash_function = 'murmurHash3_128';
SELECT number AS hello, toString(number) AS world, (hello, world) AS tuple, nullIf(hello % 3, 0) AS sometimes_nulls FROM system.numbers LIMIT 20 FORMAT Hash;

SET output_format_hash_function = 'murmurHash3_64';
SELECT number AS hello, toString(number) AS world, (hello, world) AS tuple, nullIf(hello % 3, 0) AS sometimes_nulls FROM system.numbers LIMIT 20 FORMAT Hash;

SET output_format_hash_function = 'xxHash64';
SELECT number AS hello, toString(number) AS world, (hello, world) AS tuple, nullIf(hello % 3, 0) AS sometimes_nulls FROM system.numbers LIMIT 20 FORMAT Hash;