vnode/cos: first round size delte and get
This commit is contained in:
parent
8253874398
commit
0a1ff3283e
|
@ -10,7 +10,8 @@ ExternalProject_Add(curl
|
|||
BUILD_IN_SOURCE TRUE
|
||||
BUILD_ALWAYS 1
|
||||
#UPDATE_COMMAND ""
|
||||
CONFIGURE_COMMAND ./configure --prefix=$ENV{HOME}/.cos-local.1 --without-ssl --enable-shared=no --disable-ldap --disable-ldaps --without-brotli --without-zstd
|
||||
#CONFIGURE_COMMAND ./configure --prefix=$ENV{HOME}/.cos-local.1 --without-ssl --enable-shared=no --disable-ldap --disable-ldaps --without-brotli --without-zstd
|
||||
CONFIGURE_COMMAND ./configure --prefix=$ENV{HOME}/.cos-local.1 --with-ssl=$ENV{HOME}/.cos-local.1 --enable-shared=no --disable-ldap --disable-ldaps --without-brotli --without-zstd
|
||||
#CONFIGURE_COMMAND ./configure --without-ssl
|
||||
BUILD_COMMAND make
|
||||
INSTALL_COMMAND make install
|
||||
|
|
|
@ -13,6 +13,7 @@ endfunction(update_cflags)
|
|||
ExternalProject_Add(libs3
|
||||
GIT_REPOSITORY https://github.com/bji/libs3
|
||||
#GIT_TAG v5.0.16
|
||||
DEPENDS curl
|
||||
SOURCE_DIR "${TD_CONTRIB_DIR}/libs3"
|
||||
#BINARY_DIR ""
|
||||
BUILD_IN_SOURCE TRUE
|
||||
|
|
|
@ -13,10 +13,8 @@ if(${TD_LINUX})
|
|||
set(CONTRIB_TMP_FILE3 "${CMAKE_BINARY_DIR}/deps_tmp_CMakeLists.txt.in3")
|
||||
configure_file("${TD_SUPPORT_DIR}/deps_CMakeLists.txt.in" ${CONTRIB_TMP_FILE3})
|
||||
|
||||
if(${BUILD_WITH_COS})
|
||||
file(MAKE_DIRECTORY $ENV{HOME}/.cos-local.1/)
|
||||
cat("${TD_SUPPORT_DIR}/ssl_CMakeLists.txt.in" ${CONTRIB_TMP_FILE3})
|
||||
endif(${BUILD_WITH_COS})
|
||||
|
||||
configure_file(${CONTRIB_TMP_FILE3} "${TD_CONTRIB_DIR}/deps-download/CMakeLists.txt")
|
||||
execute_process(COMMAND "${CMAKE_COMMAND}" -G "${CMAKE_GENERATOR}" .
|
||||
|
@ -27,9 +25,7 @@ execute_process(COMMAND "${CMAKE_COMMAND}" --build .
|
|||
set(CONTRIB_TMP_FILE2 "${CMAKE_BINARY_DIR}/deps_tmp_CMakeLists.txt.in2")
|
||||
configure_file("${TD_SUPPORT_DIR}/deps_CMakeLists.txt.in" ${CONTRIB_TMP_FILE2})
|
||||
|
||||
if(${BUILD_WITH_COS})
|
||||
cat("${TD_SUPPORT_DIR}/curl_CMakeLists.txt.in" ${CONTRIB_TMP_FILE2})
|
||||
endif(${BUILD_WITH_COS})
|
||||
|
||||
configure_file(${CONTRIB_TMP_FILE2} "${TD_CONTRIB_DIR}/deps-download/CMakeLists.txt")
|
||||
execute_process(COMMAND "${CMAKE_COMMAND}" -G "${CMAKE_GENERATOR}" .
|
||||
|
@ -129,6 +125,9 @@ if(${BUILD_TEST})
|
|||
cat("${TD_SUPPORT_DIR}/stub_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
|
||||
endif(${BUILD_TEST})
|
||||
|
||||
# xml2
|
||||
cat("${TD_SUPPORT_DIR}/xml2_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
|
||||
|
||||
# lz4
|
||||
cat("${TD_SUPPORT_DIR}/lz4_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
|
||||
|
||||
|
@ -195,6 +194,7 @@ if(${BUILD_WITH_S3})
|
|||
#cat("${TD_SUPPORT_DIR}/apr-util_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
|
||||
#cat("${TD_SUPPORT_DIR}/curl_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
|
||||
#INCLUDE_DIRECTORIES($ENV{HOME}/.cos-local.1/include)
|
||||
cat("${TD_SUPPORT_DIR}/curl_CMakeLists.txt.in" ${CONTRIB_TMP_FILE2})
|
||||
cat("${TD_SUPPORT_DIR}/libs3_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
|
||||
add_definitions(-DUSE_S3)
|
||||
|
||||
|
@ -295,6 +295,9 @@ target_include_directories(
|
|||
)
|
||||
unset(CMAKE_PROJECT_INCLUDE_BEFORE)
|
||||
|
||||
# xml2
|
||||
add_subdirectory(xml2 EXCLUDE_FROM_ALL)
|
||||
|
||||
# lz4
|
||||
add_subdirectory(lz4/build/cmake EXCLUDE_FROM_ALL)
|
||||
target_include_directories(
|
||||
|
@ -439,6 +442,7 @@ endif()
|
|||
|
||||
if(${BUILD_WITH_S3})
|
||||
INCLUDE_DIRECTORIES($ENV{HOME}/.cos-local.1/include)
|
||||
MESSAGE("build with s3: ${BUILD_WITH_S3}")
|
||||
|
||||
else()
|
||||
|
||||
|
@ -449,7 +453,7 @@ if(${BUILD_WITH_COS})
|
|||
#ADD_DEFINITIONS(-DMINIXML_LIBRARY=${CMAKE_BINARY_DIR}/build/lib/libxml.a)
|
||||
option(ENABLE_TEST "Enable the tests" OFF)
|
||||
INCLUDE_DIRECTORIES($ENV{HOME}/.cos-local.1/include)
|
||||
MESSAGE("$ENV{HOME}/.cos-local.1/include")
|
||||
#MESSAGE("$ENV{HOME}/.cos-local.1/include")
|
||||
|
||||
set(CMAKE_BUILD_TYPE debug)
|
||||
set(ORIG_CMAKE_PROJECT_NAME ${CMAKE_PROJECT_NAME})
|
||||
|
|
|
@ -15,12 +15,12 @@
|
|||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "tglobal.h"
|
||||
#include "defines.h"
|
||||
#include "os.h"
|
||||
#include "tconfig.h"
|
||||
#include "tgrant.h"
|
||||
#include "tlog.h"
|
||||
#include "tmisce.h"
|
||||
#include "defines.h"
|
||||
|
||||
#if defined(CUS_NAME) || defined(CUS_PROMPT) || defined(CUS_EMAIL)
|
||||
#include "cus_name.h"
|
||||
|
@ -266,6 +266,9 @@ char tsS3BucketName[TSDB_FQDN_LEN] = "<bucketname>";
|
|||
char tsS3AppId[TSDB_FQDN_LEN] = "<appid>";
|
||||
int8_t tsS3Enabled = false;
|
||||
|
||||
int8_t tsS3Https = true;
|
||||
char tsS3Hostname[TSDB_FQDN_LEN] = "<hostname>";
|
||||
|
||||
int32_t tsS3BlockSize = 4096; // number of tsdb pages
|
||||
int32_t tsS3BlockCacheSize = 16; // number of blocks
|
||||
|
||||
|
@ -307,6 +310,14 @@ int32_t taosSetS3Cfg(SConfig *pCfg) {
|
|||
tstrncpy(tsS3AccessKeySecret, colon + 1, TSDB_FQDN_LEN);
|
||||
tstrncpy(tsS3Endpoint, cfgGetItem(pCfg, "s3Endpoint")->str, TSDB_FQDN_LEN);
|
||||
tstrncpy(tsS3BucketName, cfgGetItem(pCfg, "s3BucketName")->str, TSDB_FQDN_LEN);
|
||||
char *proto = strstr(tsS3Endpoint, "https://");
|
||||
if (!proto) {
|
||||
tsS3Https = false;
|
||||
tstrncpy(tsS3Hostname, tsS3Endpoint + 7, TSDB_FQDN_LEN);
|
||||
} else {
|
||||
tstrncpy(tsS3Hostname, tsS3Endpoint + 8, TSDB_FQDN_LEN);
|
||||
}
|
||||
|
||||
char *cos = strstr(tsS3Endpoint, "cos.");
|
||||
if (cos) {
|
||||
char *appid = strrchr(tsS3BucketName, '-');
|
||||
|
@ -1086,7 +1097,6 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
|
|||
tsIfAdtFse = cfgGetItem(pCfg, "IfAdtFse")->bval;
|
||||
tstrncpy(tsCompressor, cfgGetItem(pCfg, "Compressor")->str, sizeof(tsCompressor));
|
||||
|
||||
|
||||
tsDisableStream = cfgGetItem(pCfg, "disableStream")->bval;
|
||||
tsStreamBufferSize = cfgGetItem(pCfg, "streamBufferSize")->i64;
|
||||
|
||||
|
|
|
@ -162,9 +162,34 @@ target_link_libraries(
|
|||
)
|
||||
|
||||
if(${TD_LINUX})
|
||||
if(${BUILD_WITH_S3})
|
||||
|
||||
endif(${BUILD_WITH_S3})
|
||||
if(${BUILD_WITH_S3})
|
||||
MESSAGE("build with s3: ${BUILD_WITH_S3}")
|
||||
target_include_directories(
|
||||
vnode
|
||||
|
||||
PUBLIC "$ENV{HOME}/.cos-local.1/include"
|
||||
)
|
||||
|
||||
set(CMAKE_FIND_LIBRARY_SUFFIXES ".a")
|
||||
set(CMAKE_PREFIX_PATH $ENV{HOME}/.cos-local.1)
|
||||
find_library(S3_LIBRARY s3)
|
||||
find_library(CURL_LIBRARY curl)
|
||||
find_library(SSL_LIBRARY ssl PATHS $ENV{HOME}/.cos-local.1/lib64)
|
||||
find_library(CRYPTO_LIBRARY crypto PATHS $ENV{HOME}/.cos-local.1/lib64)
|
||||
target_link_libraries(
|
||||
vnode
|
||||
|
||||
# s3
|
||||
PUBLIC ${S3_LIBRARY}
|
||||
PUBLIC ${CURL_LIBRARY}
|
||||
PUBLIC ${SSL_LIBRARY}
|
||||
PUBLIC ${CRYPTO_LIBRARY}
|
||||
PUBLIC xml2
|
||||
)
|
||||
|
||||
add_definitions(-DUSE_S3)
|
||||
endif(${BUILD_WITH_S3})
|
||||
|
||||
if(${BUILD_WITH_COS})
|
||||
|
||||
|
|
|
@ -7,8 +7,201 @@ extern char tsS3AccessKeyId[];
|
|||
extern char tsS3AccessKeySecret[];
|
||||
extern char tsS3BucketName[];
|
||||
extern char tsS3AppId[];
|
||||
extern char tsS3Hostname[];
|
||||
extern int8_t tsS3Https;
|
||||
|
||||
#if defined(USE_S3)
|
||||
|
||||
#include "libs3.h"
|
||||
|
||||
static int verifyPeerG = 0;
|
||||
static const char *awsRegionG = NULL;
|
||||
static int forceG = 0;
|
||||
static int showResponsePropertiesG = 0;
|
||||
static S3Protocol protocolG = S3ProtocolHTTPS;
|
||||
// static S3Protocol protocolG = S3ProtocolHTTP;
|
||||
static S3UriStyle uriStyleG = S3UriStylePath;
|
||||
static int retriesG = 5;
|
||||
static int timeoutMsG = 0;
|
||||
|
||||
static int32_t s3Begin() {
|
||||
S3Status status;
|
||||
const char *hostname = tsS3Hostname;
|
||||
const char *env_hn = getenv("S3_HOSTNAME");
|
||||
|
||||
if (env_hn) {
|
||||
hostname = env_hn;
|
||||
}
|
||||
|
||||
if ((status = S3_initialize("s3", verifyPeerG | S3_INIT_ALL, hostname)) != S3StatusOK) {
|
||||
vError("Failed to initialize libs3: %s\n", S3_get_status_name(status));
|
||||
return -1;
|
||||
}
|
||||
|
||||
protocolG = !tsS3Https;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void s3End() { S3_deinitialize(); }
|
||||
int32_t s3Init() { return s3Begin(); }
|
||||
|
||||
void s3CleanUp() { s3End(); }
|
||||
|
||||
static int should_retry() {
|
||||
/*
|
||||
if (retriesG--) {
|
||||
// Sleep before next retry; start out with a 1 second sleep
|
||||
static int retrySleepInterval = 1 * SLEEP_UNITS_PER_SECOND;
|
||||
sleep(retrySleepInterval);
|
||||
// Next sleep 1 second longer
|
||||
retrySleepInterval++;
|
||||
return 1;
|
||||
}
|
||||
*/
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
typedef struct {
|
||||
uint64_t content_length;
|
||||
int status;
|
||||
char *buf;
|
||||
char err_msg[4096];
|
||||
} TS3SizeCBD;
|
||||
|
||||
static S3Status responsePropertiesCallback(const S3ResponseProperties *properties, void *callbackData) {
|
||||
//(void)callbackData;
|
||||
TS3SizeCBD *cbd = callbackData;
|
||||
if (properties->contentLength > 0) {
|
||||
cbd->content_length = properties->contentLength;
|
||||
} else {
|
||||
cbd->content_length = 0;
|
||||
}
|
||||
|
||||
return S3StatusOK;
|
||||
}
|
||||
|
||||
static void responseCompleteCallback(S3Status status, const S3ErrorDetails *error, void *callbackData) {
|
||||
TS3SizeCBD *cbd = callbackData;
|
||||
cbd->status = status;
|
||||
|
||||
int len = 0;
|
||||
const int elen = sizeof(cbd->err_msg);
|
||||
if (error) {
|
||||
if (error->message) {
|
||||
len += snprintf(&(cbd->err_msg[len]), elen - len, " Message: %s\n", error->message);
|
||||
}
|
||||
if (error->resource) {
|
||||
len += snprintf(&(cbd->err_msg[len]), elen - len, " Resource: %s\n", error->resource);
|
||||
}
|
||||
if (error->furtherDetails) {
|
||||
len += snprintf(&(cbd->err_msg[len]), elen - len, " Further Details: %s\n", error->furtherDetails);
|
||||
}
|
||||
if (error->extraDetailsCount) {
|
||||
len += snprintf(&(cbd->err_msg[len]), elen - len, "%s", " Extra Details:\n");
|
||||
for (int i = 0; i < error->extraDetailsCount; i++) {
|
||||
len += snprintf(&(cbd->err_msg[len]), elen - len, " %s: %s\n", error->extraDetails[i].name,
|
||||
error->extraDetails[i].value);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int32_t s3PutObjectFromFile2(const char *file, const char *object) { return 0; }
|
||||
void s3DeleteObjectsByPrefix(const char *prefix) {}
|
||||
|
||||
void s3DeleteObjects(const char *object_name[], int nobject) {
|
||||
int status = 0;
|
||||
S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret,
|
||||
0, awsRegionG};
|
||||
S3ResponseHandler responseHandler = {0, &responseCompleteCallback};
|
||||
|
||||
for (int i = 0; i < nobject; ++i) {
|
||||
TS3SizeCBD cbd = {0};
|
||||
do {
|
||||
S3_delete_object(&bucketContext, object_name[i], 0, timeoutMsG, &responseHandler, &cbd);
|
||||
} while (S3_status_is_retryable(cbd.status) && should_retry());
|
||||
|
||||
if ((cbd.status != S3StatusOK) && (cbd.status != S3StatusErrorPreconditionFailed)) {
|
||||
vError("%s: %d(%s)", __func__, cbd.status, cbd.err_msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static S3Status getObjectDataCallback(int bufferSize, const char *buffer, void *callbackData) {
|
||||
TS3SizeCBD *cbd = callbackData;
|
||||
if (cbd->content_length != bufferSize) {
|
||||
cbd->status = S3StatusAbortedByCallback;
|
||||
return S3StatusAbortedByCallback;
|
||||
}
|
||||
|
||||
char *buf = taosMemoryCalloc(1, bufferSize);
|
||||
if (buf) {
|
||||
memcpy(buf, buffer, bufferSize);
|
||||
|
||||
cbd->status = S3StatusOK;
|
||||
return S3StatusOK;
|
||||
} else {
|
||||
cbd->status = S3StatusAbortedByCallback;
|
||||
return S3StatusAbortedByCallback;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, uint8_t **ppBlock) {
|
||||
int status = 0;
|
||||
int64_t ifModifiedSince = -1, ifNotModifiedSince = -1;
|
||||
const char *ifMatch = 0, *ifNotMatch = 0;
|
||||
|
||||
S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret,
|
||||
0, awsRegionG};
|
||||
S3GetConditions getConditions = {ifModifiedSince, ifNotModifiedSince, ifMatch, ifNotMatch};
|
||||
S3GetObjectHandler getObjectHandler = {{&responsePropertiesCallback, &responseCompleteCallback},
|
||||
&getObjectDataCallback};
|
||||
|
||||
TS3SizeCBD cbd = {0};
|
||||
cbd.content_length = size;
|
||||
do {
|
||||
S3_get_object(&bucketContext, object_name, &getConditions, offset, size, 0, 0, &getObjectHandler, &cbd);
|
||||
} while (S3_status_is_retryable(cbd.status) && should_retry());
|
||||
|
||||
if (cbd.status != S3StatusOK) {
|
||||
vError("%s: %d(%s)", __func__, cbd.status, cbd.err_msg);
|
||||
return TAOS_SYSTEM_ERROR(EIO);
|
||||
}
|
||||
|
||||
*ppBlock = cbd.buf;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
long s3Size(const char *object_name) {
|
||||
long size = 0;
|
||||
int status = 0;
|
||||
|
||||
S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret,
|
||||
0, awsRegionG};
|
||||
|
||||
S3ResponseHandler responseHandler = {&responsePropertiesCallback, &responseCompleteCallback};
|
||||
|
||||
TS3SizeCBD cbd = {0};
|
||||
do {
|
||||
S3_head_object(&bucketContext, object_name, 0, 0, &responseHandler, &cbd);
|
||||
} while (S3_status_is_retryable(cbd.status) && should_retry());
|
||||
|
||||
if ((cbd.status != S3StatusOK) && (cbd.status != S3StatusErrorPreconditionFailed)) {
|
||||
vError("%s: %d(%s)", __func__, cbd.status, cbd.err_msg);
|
||||
}
|
||||
|
||||
size = cbd.content_length;
|
||||
|
||||
return size;
|
||||
}
|
||||
|
||||
void s3EvictCache(const char *path, long object_size) {}
|
||||
|
||||
#elif defined(USE_COS)
|
||||
|
||||
#ifdef USE_COS
|
||||
#include "cos_api.h"
|
||||
#include "cos_http_io.h"
|
||||
#include "cos_log.h"
|
||||
|
|
Loading…
Reference in New Issue