From 0a1ff3283e50db9bde8cfe62a8a99166bd04504d Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Fri, 13 Oct 2023 16:59:05 +0800 Subject: [PATCH] vnode/cos: first round size delte and get --- cmake/curl_CMakeLists.txt.in | 3 +- cmake/libs3_CMakeLists.txt.in | 1 + contrib/CMakeLists.txt | 14 +- source/common/src/tglobal.c | 16 +- source/dnode/vnode/CMakeLists.txt | 29 +++- source/dnode/vnode/src/vnd/vnodeCos.c | 205 +++++++++++++++++++++++++- 6 files changed, 251 insertions(+), 17 deletions(-) diff --git a/cmake/curl_CMakeLists.txt.in b/cmake/curl_CMakeLists.txt.in index 1f2291c519..24a1210422 100644 --- a/cmake/curl_CMakeLists.txt.in +++ b/cmake/curl_CMakeLists.txt.in @@ -10,7 +10,8 @@ ExternalProject_Add(curl BUILD_IN_SOURCE TRUE BUILD_ALWAYS 1 #UPDATE_COMMAND "" - CONFIGURE_COMMAND ./configure --prefix=$ENV{HOME}/.cos-local.1 --without-ssl --enable-shared=no --disable-ldap --disable-ldaps --without-brotli --without-zstd + #CONFIGURE_COMMAND ./configure --prefix=$ENV{HOME}/.cos-local.1 --without-ssl --enable-shared=no --disable-ldap --disable-ldaps --without-brotli --without-zstd + CONFIGURE_COMMAND ./configure --prefix=$ENV{HOME}/.cos-local.1 --with-ssl=$ENV{HOME}/.cos-local.1 --enable-shared=no --disable-ldap --disable-ldaps --without-brotli --without-zstd #CONFIGURE_COMMAND ./configure --without-ssl BUILD_COMMAND make INSTALL_COMMAND make install diff --git a/cmake/libs3_CMakeLists.txt.in b/cmake/libs3_CMakeLists.txt.in index b77299baf3..b54a084f66 100644 --- a/cmake/libs3_CMakeLists.txt.in +++ b/cmake/libs3_CMakeLists.txt.in @@ -13,6 +13,7 @@ endfunction(update_cflags) ExternalProject_Add(libs3 GIT_REPOSITORY https://github.com/bji/libs3 #GIT_TAG v5.0.16 + DEPENDS curl SOURCE_DIR "${TD_CONTRIB_DIR}/libs3" #BINARY_DIR "" BUILD_IN_SOURCE TRUE diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index 39bf85df8f..1588eb4025 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -13,10 +13,8 @@ if(${TD_LINUX}) set(CONTRIB_TMP_FILE3 "${CMAKE_BINARY_DIR}/deps_tmp_CMakeLists.txt.in3") configure_file("${TD_SUPPORT_DIR}/deps_CMakeLists.txt.in" ${CONTRIB_TMP_FILE3}) -if(${BUILD_WITH_COS}) file(MAKE_DIRECTORY $ENV{HOME}/.cos-local.1/) cat("${TD_SUPPORT_DIR}/ssl_CMakeLists.txt.in" ${CONTRIB_TMP_FILE3}) -endif(${BUILD_WITH_COS}) configure_file(${CONTRIB_TMP_FILE3} "${TD_CONTRIB_DIR}/deps-download/CMakeLists.txt") execute_process(COMMAND "${CMAKE_COMMAND}" -G "${CMAKE_GENERATOR}" . @@ -27,9 +25,7 @@ execute_process(COMMAND "${CMAKE_COMMAND}" --build . set(CONTRIB_TMP_FILE2 "${CMAKE_BINARY_DIR}/deps_tmp_CMakeLists.txt.in2") configure_file("${TD_SUPPORT_DIR}/deps_CMakeLists.txt.in" ${CONTRIB_TMP_FILE2}) -if(${BUILD_WITH_COS}) cat("${TD_SUPPORT_DIR}/curl_CMakeLists.txt.in" ${CONTRIB_TMP_FILE2}) -endif(${BUILD_WITH_COS}) configure_file(${CONTRIB_TMP_FILE2} "${TD_CONTRIB_DIR}/deps-download/CMakeLists.txt") execute_process(COMMAND "${CMAKE_COMMAND}" -G "${CMAKE_GENERATOR}" . @@ -129,6 +125,9 @@ if(${BUILD_TEST}) cat("${TD_SUPPORT_DIR}/stub_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) endif(${BUILD_TEST}) +# xml2 +cat("${TD_SUPPORT_DIR}/xml2_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) + # lz4 cat("${TD_SUPPORT_DIR}/lz4_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) @@ -195,6 +194,7 @@ if(${BUILD_WITH_S3}) #cat("${TD_SUPPORT_DIR}/apr-util_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) #cat("${TD_SUPPORT_DIR}/curl_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) #INCLUDE_DIRECTORIES($ENV{HOME}/.cos-local.1/include) + cat("${TD_SUPPORT_DIR}/curl_CMakeLists.txt.in" ${CONTRIB_TMP_FILE2}) cat("${TD_SUPPORT_DIR}/libs3_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) add_definitions(-DUSE_S3) @@ -295,6 +295,9 @@ target_include_directories( ) unset(CMAKE_PROJECT_INCLUDE_BEFORE) +# xml2 +add_subdirectory(xml2 EXCLUDE_FROM_ALL) + # lz4 add_subdirectory(lz4/build/cmake EXCLUDE_FROM_ALL) target_include_directories( @@ -439,6 +442,7 @@ endif() if(${BUILD_WITH_S3}) INCLUDE_DIRECTORIES($ENV{HOME}/.cos-local.1/include) + MESSAGE("build with s3: ${BUILD_WITH_S3}") else() @@ -449,7 +453,7 @@ if(${BUILD_WITH_COS}) #ADD_DEFINITIONS(-DMINIXML_LIBRARY=${CMAKE_BINARY_DIR}/build/lib/libxml.a) option(ENABLE_TEST "Enable the tests" OFF) INCLUDE_DIRECTORIES($ENV{HOME}/.cos-local.1/include) - MESSAGE("$ENV{HOME}/.cos-local.1/include") + #MESSAGE("$ENV{HOME}/.cos-local.1/include") set(CMAKE_BUILD_TYPE debug) set(ORIG_CMAKE_PROJECT_NAME ${CMAKE_PROJECT_NAME}) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 92b5ff2828..8ea5528fbb 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -15,12 +15,12 @@ #define _DEFAULT_SOURCE #include "tglobal.h" +#include "defines.h" #include "os.h" #include "tconfig.h" #include "tgrant.h" #include "tlog.h" #include "tmisce.h" -#include "defines.h" #if defined(CUS_NAME) || defined(CUS_PROMPT) || defined(CUS_EMAIL) #include "cus_name.h" @@ -221,7 +221,7 @@ float tsFPrecision = 1E-8; // float column precision double tsDPrecision = 1E-16; // double column precision uint32_t tsMaxRange = 500; // max quantization intervals uint32_t tsCurRange = 100; // current quantization intervals -bool tsIfAdtFse = false; // ADT-FSE algorithom or original huffman algorithom +bool tsIfAdtFse = false; // ADT-FSE algorithom or original huffman algorithom char tsCompressor[32] = "ZSTD_COMPRESSOR"; // ZSTD_COMPRESSOR or GZIP_COMPRESSOR // udf @@ -266,6 +266,9 @@ char tsS3BucketName[TSDB_FQDN_LEN] = ""; char tsS3AppId[TSDB_FQDN_LEN] = ""; int8_t tsS3Enabled = false; +int8_t tsS3Https = true; +char tsS3Hostname[TSDB_FQDN_LEN] = ""; + int32_t tsS3BlockSize = 4096; // number of tsdb pages int32_t tsS3BlockCacheSize = 16; // number of blocks @@ -307,6 +310,14 @@ int32_t taosSetS3Cfg(SConfig *pCfg) { tstrncpy(tsS3AccessKeySecret, colon + 1, TSDB_FQDN_LEN); tstrncpy(tsS3Endpoint, cfgGetItem(pCfg, "s3Endpoint")->str, TSDB_FQDN_LEN); tstrncpy(tsS3BucketName, cfgGetItem(pCfg, "s3BucketName")->str, TSDB_FQDN_LEN); + char *proto = strstr(tsS3Endpoint, "https://"); + if (!proto) { + tsS3Https = false; + tstrncpy(tsS3Hostname, tsS3Endpoint + 7, TSDB_FQDN_LEN); + } else { + tstrncpy(tsS3Hostname, tsS3Endpoint + 8, TSDB_FQDN_LEN); + } + char *cos = strstr(tsS3Endpoint, "cos."); if (cos) { char *appid = strrchr(tsS3BucketName, '-'); @@ -1086,7 +1097,6 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsIfAdtFse = cfgGetItem(pCfg, "IfAdtFse")->bval; tstrncpy(tsCompressor, cfgGetItem(pCfg, "Compressor")->str, sizeof(tsCompressor)); - tsDisableStream = cfgGetItem(pCfg, "disableStream")->bval; tsStreamBufferSize = cfgGetItem(pCfg, "streamBufferSize")->i64; diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index 516c8a8b69..e554d94e4d 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -162,9 +162,34 @@ target_link_libraries( ) if(${TD_LINUX}) - if(${BUILD_WITH_S3}) - endif(${BUILD_WITH_S3}) +if(${BUILD_WITH_S3}) + MESSAGE("build with s3: ${BUILD_WITH_S3}") + target_include_directories( + vnode + + PUBLIC "$ENV{HOME}/.cos-local.1/include" + ) + + set(CMAKE_FIND_LIBRARY_SUFFIXES ".a") + set(CMAKE_PREFIX_PATH $ENV{HOME}/.cos-local.1) + find_library(S3_LIBRARY s3) + find_library(CURL_LIBRARY curl) + find_library(SSL_LIBRARY ssl PATHS $ENV{HOME}/.cos-local.1/lib64) + find_library(CRYPTO_LIBRARY crypto PATHS $ENV{HOME}/.cos-local.1/lib64) + target_link_libraries( + vnode + + # s3 + PUBLIC ${S3_LIBRARY} + PUBLIC ${CURL_LIBRARY} + PUBLIC ${SSL_LIBRARY} + PUBLIC ${CRYPTO_LIBRARY} + PUBLIC xml2 + ) + + add_definitions(-DUSE_S3) +endif(${BUILD_WITH_S3}) if(${BUILD_WITH_COS}) diff --git a/source/dnode/vnode/src/vnd/vnodeCos.c b/source/dnode/vnode/src/vnd/vnodeCos.c index 5f650be97d..8984d33ad8 100644 --- a/source/dnode/vnode/src/vnd/vnodeCos.c +++ b/source/dnode/vnode/src/vnd/vnodeCos.c @@ -2,13 +2,206 @@ #include "vndCos.h" -extern char tsS3Endpoint[]; -extern char tsS3AccessKeyId[]; -extern char tsS3AccessKeySecret[]; -extern char tsS3BucketName[]; -extern char tsS3AppId[]; +extern char tsS3Endpoint[]; +extern char tsS3AccessKeyId[]; +extern char tsS3AccessKeySecret[]; +extern char tsS3BucketName[]; +extern char tsS3AppId[]; +extern char tsS3Hostname[]; +extern int8_t tsS3Https; + +#if defined(USE_S3) + +#include "libs3.h" + +static int verifyPeerG = 0; +static const char *awsRegionG = NULL; +static int forceG = 0; +static int showResponsePropertiesG = 0; +static S3Protocol protocolG = S3ProtocolHTTPS; +// static S3Protocol protocolG = S3ProtocolHTTP; +static S3UriStyle uriStyleG = S3UriStylePath; +static int retriesG = 5; +static int timeoutMsG = 0; + +static int32_t s3Begin() { + S3Status status; + const char *hostname = tsS3Hostname; + const char *env_hn = getenv("S3_HOSTNAME"); + + if (env_hn) { + hostname = env_hn; + } + + if ((status = S3_initialize("s3", verifyPeerG | S3_INIT_ALL, hostname)) != S3StatusOK) { + vError("Failed to initialize libs3: %s\n", S3_get_status_name(status)); + return -1; + } + + protocolG = !tsS3Https; + + return 0; +} + +static void s3End() { S3_deinitialize(); } +int32_t s3Init() { return s3Begin(); } + +void s3CleanUp() { s3End(); } + +static int should_retry() { + /* + if (retriesG--) { + // Sleep before next retry; start out with a 1 second sleep + static int retrySleepInterval = 1 * SLEEP_UNITS_PER_SECOND; + sleep(retrySleepInterval); + // Next sleep 1 second longer + retrySleepInterval++; + return 1; + } + */ + + return 0; +} + +typedef struct { + uint64_t content_length; + int status; + char *buf; + char err_msg[4096]; +} TS3SizeCBD; + +static S3Status responsePropertiesCallback(const S3ResponseProperties *properties, void *callbackData) { + //(void)callbackData; + TS3SizeCBD *cbd = callbackData; + if (properties->contentLength > 0) { + cbd->content_length = properties->contentLength; + } else { + cbd->content_length = 0; + } + + return S3StatusOK; +} + +static void responseCompleteCallback(S3Status status, const S3ErrorDetails *error, void *callbackData) { + TS3SizeCBD *cbd = callbackData; + cbd->status = status; + + int len = 0; + const int elen = sizeof(cbd->err_msg); + if (error) { + if (error->message) { + len += snprintf(&(cbd->err_msg[len]), elen - len, " Message: %s\n", error->message); + } + if (error->resource) { + len += snprintf(&(cbd->err_msg[len]), elen - len, " Resource: %s\n", error->resource); + } + if (error->furtherDetails) { + len += snprintf(&(cbd->err_msg[len]), elen - len, " Further Details: %s\n", error->furtherDetails); + } + if (error->extraDetailsCount) { + len += snprintf(&(cbd->err_msg[len]), elen - len, "%s", " Extra Details:\n"); + for (int i = 0; i < error->extraDetailsCount; i++) { + len += snprintf(&(cbd->err_msg[len]), elen - len, " %s: %s\n", error->extraDetails[i].name, + error->extraDetails[i].value); + } + } + } +} + +int32_t s3PutObjectFromFile2(const char *file, const char *object) { return 0; } +void s3DeleteObjectsByPrefix(const char *prefix) {} + +void s3DeleteObjects(const char *object_name[], int nobject) { + int status = 0; + S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret, + 0, awsRegionG}; + S3ResponseHandler responseHandler = {0, &responseCompleteCallback}; + + for (int i = 0; i < nobject; ++i) { + TS3SizeCBD cbd = {0}; + do { + S3_delete_object(&bucketContext, object_name[i], 0, timeoutMsG, &responseHandler, &cbd); + } while (S3_status_is_retryable(cbd.status) && should_retry()); + + if ((cbd.status != S3StatusOK) && (cbd.status != S3StatusErrorPreconditionFailed)) { + vError("%s: %d(%s)", __func__, cbd.status, cbd.err_msg); + } + } +} + +static S3Status getObjectDataCallback(int bufferSize, const char *buffer, void *callbackData) { + TS3SizeCBD *cbd = callbackData; + if (cbd->content_length != bufferSize) { + cbd->status = S3StatusAbortedByCallback; + return S3StatusAbortedByCallback; + } + + char *buf = taosMemoryCalloc(1, bufferSize); + if (buf) { + memcpy(buf, buffer, bufferSize); + + cbd->status = S3StatusOK; + return S3StatusOK; + } else { + cbd->status = S3StatusAbortedByCallback; + return S3StatusAbortedByCallback; + } +} + +int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, uint8_t **ppBlock) { + int status = 0; + int64_t ifModifiedSince = -1, ifNotModifiedSince = -1; + const char *ifMatch = 0, *ifNotMatch = 0; + + S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret, + 0, awsRegionG}; + S3GetConditions getConditions = {ifModifiedSince, ifNotModifiedSince, ifMatch, ifNotMatch}; + S3GetObjectHandler getObjectHandler = {{&responsePropertiesCallback, &responseCompleteCallback}, + &getObjectDataCallback}; + + TS3SizeCBD cbd = {0}; + cbd.content_length = size; + do { + S3_get_object(&bucketContext, object_name, &getConditions, offset, size, 0, 0, &getObjectHandler, &cbd); + } while (S3_status_is_retryable(cbd.status) && should_retry()); + + if (cbd.status != S3StatusOK) { + vError("%s: %d(%s)", __func__, cbd.status, cbd.err_msg); + return TAOS_SYSTEM_ERROR(EIO); + } + + *ppBlock = cbd.buf; + + return 0; +} + +long s3Size(const char *object_name) { + long size = 0; + int status = 0; + + S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret, + 0, awsRegionG}; + + S3ResponseHandler responseHandler = {&responsePropertiesCallback, &responseCompleteCallback}; + + TS3SizeCBD cbd = {0}; + do { + S3_head_object(&bucketContext, object_name, 0, 0, &responseHandler, &cbd); + } while (S3_status_is_retryable(cbd.status) && should_retry()); + + if ((cbd.status != S3StatusOK) && (cbd.status != S3StatusErrorPreconditionFailed)) { + vError("%s: %d(%s)", __func__, cbd.status, cbd.err_msg); + } + + size = cbd.content_length; + + return size; +} + +void s3EvictCache(const char *path, long object_size) {} + +#elif defined(USE_COS) -#ifdef USE_COS #include "cos_api.h" #include "cos_http_io.h" #include "cos_log.h"