Skip to content

Add Apache Iceberg Streaming Writes and Batch Reads (MVP) #928

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ cmake .. \
-DENABLE_DEBUG_FUNCS=ON \
-DENABLE_URL_FUNCS=ON \
-DENABLE_AVRO=ON \
-DENABLE_AWS_S3=ON \
-DENABLE_AWS_MSK_IAM=ON \
-DENABLE_CURL=${is_not_darwin} \
-DENABLE_PULSAR=${is_not_darwin}
Expand Down
2 changes: 1 addition & 1 deletion contrib/aws-cmake/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ set(ENABLE_AWS_S3_DEFAULT OFF)
set(ENABLE_AWS_MSK_IAM_DEFAULT OFF) # proton: added

if(ENABLE_LIBRARIES AND (OS_LINUX OR OS_DARWIN) AND TARGET OpenSSL::Crypto)
set(ENABLE_AWS_S3_DEFAULT OFF) # proton: added https://github.com/timeplus-io/proton/issues/918
set(ENABLE_AWS_S3_DEFAULT ON)
set(ENABLE_AWS_MSK_IAM_DEFAULT ON) # proton: added
endif()

Expand Down
2 changes: 1 addition & 1 deletion docker/packager/packager
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ def parse_env_variables(build_type, compiler, sanitizer, package_type, image_typ
cmake_flags.append('-DENABLE_CYRUS_SASL=ON')
cmake_flags.append('-DENABLE_KRB5=ON')
cmake_flags.append('-DENABLE_BROTLI=ON')
cmake_flags.append('-DENABLE_S3=ON')
cmake_flags.append('-DENABLE_AWS_S3=ON')
cmake_flags.append('-DENABLE_AVRO=ON')
cmake_flags.append('-DENABLE_AWS_MSK_IAM=ON')

Expand Down
6 changes: 6 additions & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ list (REMOVE_ITEM clickhouse_common_io_sources Common/malloc.cpp Common/new_dele

add_headers_and_sources(dbms Disks/IO)
add_headers_and_sources(dbms Disks/ObjectStorages)

add_headers_and_sources(dbms Storages/NamedCollections)

if (USE_LIBPQXX)
add_headers_and_sources(dbms Core/PostgreSQL)
endif()
Expand All @@ -97,6 +100,8 @@ if (TARGET ch_contrib::azure_sdk)
add_headers_and_sources(dbms Disks/ObjectStorages/AzureBlobStorage)
endif()

add_headers_and_sources(dbms Databases/ApacheIceberg)

add_headers_and_sources(dbms Disks/ObjectStorages/Cached)
add_headers_and_sources(dbms Disks/ObjectStorages/Web)

Expand Down Expand Up @@ -208,6 +213,7 @@ add_object_library(clickhouse_server_http Server/HTTP)
add_object_library(clickhouse_formats Formats)
add_object_library(clickhouse_processors Processors)
# proton : starts
add_object_library(clickhouse_storages_iceberg Storages/Iceberg)
add_object_library(clickhouse_processors_streaming Processors/Streaming)

add_object_library(clickhouse_formats_avro Formats/Avro)
Expand Down
1 change: 1 addition & 0 deletions src/Common/ErrorCodes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,7 @@
M(676, CANNOT_PARSE_IPV6) \
M(677, THREAD_WAS_CANCELED) \
M(678, IO_URING_INIT_FAILED) \
M(736, ICEBERG_CATALOG_ERROR) \
M(679, IO_URING_SUBMIT_ERROR) \
M(997, UNSUPPORTED) \
M(998, INVALID_INTEGER_STRING) \
Expand Down
11 changes: 11 additions & 0 deletions src/Common/Priority.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#pragma once

#include <base/types.h>

/// Common type for priority values.
/// Separate type (rather than `Int64` is used just to avoid implicit conversion errors and to default-initialize
struct Priority
{
Int64 value = 0; /// Note that lower value means higher priority.
constexpr operator Int64() const { return value; } /// NOLINT
};
26 changes: 26 additions & 0 deletions src/Common/SettingsChanges.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,30 @@ Field * SettingsChanges::tryGet(std::string_view name)
return &change->value;
}

bool SettingsChanges::insertSetting(std::string_view name, const Field & value)
{
auto it = std::find_if(begin(), end(), [&name](const SettingChange & change) { return change.name == name; });
if (it != end())
return false;
emplace_back(name, value);
return true;
}

void SettingsChanges::setSetting(std::string_view name, const Field & value)
{
if (auto * setting_value = tryGet(name))
*setting_value = value;
else
insertSetting(name, value);
}

bool SettingsChanges::removeSetting(std::string_view name)
{
auto it = std::find_if(begin(), end(), [&name](const SettingChange & change) { return change.name == name; });
if (it == end())
return false;
erase(it);
return true;
}

}
7 changes: 7 additions & 0 deletions src/Common/SettingsChanges.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ class SettingsChanges : public std::vector<SettingChange>
bool tryGet(std::string_view name, Field & out_value) const;
const Field * tryGet(std::string_view name) const;
Field * tryGet(std::string_view name);

/// Inserts element if doesn't exists and returns true, otherwise just returns false
bool insertSetting(std::string_view name, const Field & value);
/// Sets element to value, inserts if doesn't exist
void setSetting(std::string_view name, const Field & value);
/// If element exists - removes it and returns true, otherwise returns false
bool removeSetting(std::string_view name);
};

}
7 changes: 7 additions & 0 deletions src/Core/BaseSettingsFwdMacros.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#pragma once

#define DECLARE_SETTING_TRAIT(CLASS_NAME, TYPE) using CLASS_NAME##TYPE = SettingField##TYPE CLASS_NAME##Impl::*;

#define DECLARE_SETTING_SUBSCRIPT_OPERATOR(CLASS_NAME, TYPE) \
const SettingField##TYPE & operator[](CLASS_NAME##TYPE t) const; \
SettingField##TYPE & operator[](CLASS_NAME##TYPE t);
11 changes: 11 additions & 0 deletions src/Core/BaseSettingsFwdMacrosImpl.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#pragma once

#define IMPLEMENT_SETTING_SUBSCRIPT_OPERATOR(CLASS_NAME, TYPE) \
const SettingField##TYPE & CLASS_NAME::operator[](CLASS_NAME##TYPE t) const \
{ \
return impl.get()->*t; \
} \
SettingField##TYPE & CLASS_NAME::operator[](CLASS_NAME##TYPE t) \
{ \
return impl.get()->*t; \
}
3 changes: 3 additions & 0 deletions src/Core/SettingsEnums.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,4 +139,7 @@ IMPLEMENT_SETTING_ENUM(EscapingRule, ErrorCodes::BAD_ARGUMENTS,
{"JSON", FormatSettings::EscapingRule::JSON},
{"XML", FormatSettings::EscapingRule::XML},
{"Raw", FormatSettings::EscapingRule::Raw}})

IMPLEMENT_SETTING_ENUM(IcebergCatalogType, ErrorCodes::BAD_ARGUMENTS,
{{"rest", IcebergCatalogType::REST}})
}
7 changes: 7 additions & 0 deletions src/Core/SettingsEnums.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,4 +168,11 @@ DECLARE_SETTING_ENUM_WITH_RENAME(EnumComparingMode, FormatSettings::EnumComparin

DECLARE_SETTING_ENUM_WITH_RENAME(EscapingRule, FormatSettings::EscapingRule)

enum class IcebergCatalogType : uint8_t
{
REST,
};

DECLARE_SETTING_ENUM(IcebergCatalogType)

}
23 changes: 23 additions & 0 deletions src/Core/UUID.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,29 @@ namespace UUIDHelpers
/// Generate random UUID.
UUID generateV4();

constexpr size_t HighBytes = (std::endian::native == std::endian::little) ? 0 : 1;
constexpr size_t LowBytes = (std::endian::native == std::endian::little) ? 1 : 0;

inline uint64_t getHighBytes(const UUID & uuid)
{
return uuid.toUnderType().items[HighBytes];
}

inline uint64_t & getHighBytes(UUID & uuid)
{
return uuid.toUnderType().items[HighBytes];
}

inline uint64_t getLowBytes(const UUID & uuid)
{
return uuid.toUnderType().items[LowBytes];
}

inline uint64_t & getLowBytes(UUID & uuid)
{
return uuid.toUnderType().items[LowBytes];
}

const UUID Nil{};
}

Expand Down
16 changes: 16 additions & 0 deletions src/Databases/ApacheIceberg/ApacheIcebergStorageType.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#pragma once

#include <Core/Types.h>

namespace DB
{

enum class ApacheIcebergStorageType : uint8_t
{
S3,
Azure,
Local,
HDFS,
};

}
Loading
Loading