Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/TD-21898

This commit is contained in:
liuyao 2023-11-09 13:59:39 +08:00
commit e56a3e8a1e
23 changed files with 666 additions and 109 deletions

View File

@ -16,7 +16,7 @@
#ifndef _TD_VND_COS_H_
#define _TD_VND_COS_H_
#include "vnd.h"
#include "os.h"
#ifdef __cplusplus
extern "C" {
@ -24,6 +24,7 @@ extern "C" {
#define S3_BLOCK_CACHE
extern int8_t tsS3StreamEnabled;
extern int8_t tsS3Enabled;
extern int32_t tsS3BlockSize;
extern int32_t tsS3BlockCacheSize;
@ -38,6 +39,7 @@ void s3DeleteObjectsByPrefix(const char *prefix);
void s3DeleteObjects(const char *object_name[], int nobject);
bool s3Exists(const char *object_name);
bool s3Get(const char *object_name, const char *path);
int32_t s3GetObjectsByPrefix(const char *prefix, const char* path);
int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, uint8_t **ppBlock);
void s3EvictCache(const char *path, long object_size);
long s3Size(const char *object_name);

24
include/common/rsync.h Normal file
View File

@ -0,0 +1,24 @@
//
// Created by mingming wanng on 2023/11/2.
//
#ifndef TDENGINE_RSYNC_H
#define TDENGINE_RSYNC_H
#ifdef __cplusplus
extern "C" {
#endif
#include "tarray.h"
void stopRsync();
void startRsync();
int uploadRsync(char* id, char* path);
int downloadRsync(char* id, char* path);
int deleteRsync(char* id);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_RSYNC_H

View File

@ -75,6 +75,13 @@ extern int32_t tsElectInterval;
extern int32_t tsHeartbeatInterval;
extern int32_t tsHeartbeatTimeout;
// snode
extern int32_t tsRsyncPort;
extern char tsCheckpointBackupDir[];
// vnode checkpoint
extern char tsSnodeAddress[]; //127.0.0.1:873
// mnode
extern int64_t tsMndSdbWriteDelta;
extern int64_t tsMndLogRetention;

View File

@ -237,6 +237,12 @@ void syslog(int unused, const char *format, ...);
#define TD_DIRSEP "/"
#endif
#if defined(_WIN32)
#define TD_DIRSEP_CHAR '\\'
#else
#define TD_DIRSEP_CHAR '/'
#endif
#define TD_LOCALE_LEN 64
#define TD_CHARSET_LEN 64
#define TD_TIMEZONE_LEN 96

View File

@ -46,6 +46,75 @@ target_link_libraries(
INTERFACE api
)
if(${BUILD_S3})
if(${BUILD_WITH_S3})
target_include_directories(
common
PUBLIC "$ENV{HOME}/.cos-local.2/include"
)
set(CMAKE_FIND_LIBRARY_SUFFIXES ".a")
set(CMAKE_PREFIX_PATH $ENV{HOME}/.cos-local.2)
find_library(S3_LIBRARY s3)
find_library(CURL_LIBRARY curl $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH)
find_library(XML2_LIBRARY xml2)
find_library(SSL_LIBRARY ssl $ENV{HOME}/.cos-local.2/lib64 NO_DEFAULT_PATH)
find_library(CRYPTO_LIBRARY crypto $ENV{HOME}/.cos-local.2/lib64 NO_DEFAULT_PATH)
target_link_libraries(
common
# s3
PUBLIC ${S3_LIBRARY}
PUBLIC ${CURL_LIBRARY}
PUBLIC ${SSL_LIBRARY}
PUBLIC ${CRYPTO_LIBRARY}
PUBLIC ${XML2_LIBRARY}
)
add_definitions(-DUSE_S3)
endif()
if(${BUILD_WITH_COS})
set(CMAKE_FIND_LIBRARY_SUFFIXES ".a")
find_library(APR_LIBRARY apr-1 PATHS /usr/local/apr/lib/)
find_library(APR_UTIL_LIBRARY aprutil-1 PATHS /usr/local/apr/lib/)
find_library(MINIXML_LIBRARY mxml)
find_library(CURL_LIBRARY curl)
target_link_libraries(
common
# s3
PUBLIC cos_c_sdk_static
PUBLIC ${APR_UTIL_LIBRARY}
PUBLIC ${APR_LIBRARY}
PUBLIC ${MINIXML_LIBRARY}
PUBLIC ${CURL_LIBRARY}
)
# s3
FIND_PROGRAM(APR_CONFIG_BIN NAMES apr-config apr-1-config PATHS /usr/bin /usr/local/bin /usr/local/apr/bin/)
IF (APR_CONFIG_BIN)
EXECUTE_PROCESS(
COMMAND ${APR_CONFIG_BIN} --includedir
OUTPUT_VARIABLE APR_INCLUDE_DIR
OUTPUT_STRIP_TRAILING_WHITESPACE
)
ENDIF()
include_directories (${APR_INCLUDE_DIR})
target_include_directories(
common
PUBLIC "${TD_SOURCE_DIR}/contrib/cos-c-sdk-v5/cos_c_sdk"
PUBLIC "$ENV{HOME}/.cos-local.1/include"
)
add_definitions(-DUSE_COS)
endif(${BUILD_WITH_COS})
endif()
if(${BUILD_TEST})
ADD_SUBDIRECTORY(test)
endif(${BUILD_TEST})

View File

@ -1,6 +1,6 @@
#define ALLOW_FORBID_FUNC
#include "vndCos.h"
#include "cos.h"
extern char tsS3Endpoint[];
extern char tsS3AccessKeyId[];
@ -13,6 +13,7 @@ extern int8_t tsS3Https;
#if defined(USE_S3)
#include "libs3.h"
#include "tarray.h"
static int verifyPeerG = 0;
static const char *awsRegionG = NULL;
@ -34,7 +35,7 @@ static int32_t s3Begin() {
}
if ((status = S3_initialize("s3", verifyPeerG | S3_INIT_ALL, hostname)) != S3StatusOK) {
vError("Failed to initialize libs3: %s\n", S3_get_status_name(status));
uError("Failed to initialize libs3: %s\n", S3_get_status_name(status));
return -1;
}
@ -65,12 +66,18 @@ static int should_retry() {
static void s3PrintError(const char *func, S3Status status, char error_details[]) {
if (status < S3StatusErrorAccessDenied) {
vError("%s: %s", __func__, S3_get_status_name(status));
uError("%s: %s", __func__, S3_get_status_name(status));
} else {
vError("%s: %s, %s", __func__, S3_get_status_name(status), error_details);
uError("%s: %s, %s", __func__, S3_get_status_name(status), error_details);
}
}
typedef struct {
char err_msg[128];
S3Status status;
TdFilePtr file;
} TS3GetData;
typedef struct {
char err_msg[128];
S3Status status;
@ -78,6 +85,11 @@ typedef struct {
char *buf;
} TS3SizeCBD;
static S3Status responsePropertiesCallbackNull(const S3ResponseProperties *properties, void *callbackData) {
// (void)callbackData;
return S3StatusOK;
}
static S3Status responsePropertiesCallback(const S3ResponseProperties *properties, void *callbackData) {
//(void)callbackData;
TS3SizeCBD *cbd = callbackData;
@ -291,7 +303,7 @@ S3Status initial_multipart_callback(const char *upload_id, void *callbackData) {
}
S3Status MultipartResponseProperiesCallback(const S3ResponseProperties *properties, void *callbackData) {
responsePropertiesCallback(properties, callbackData);
responsePropertiesCallbackNull(properties, callbackData);
MultipartPartData *data = (MultipartPartData *)callbackData;
int seq = data->seq;
@ -390,7 +402,7 @@ static int try_get_parts_info(const char *bucketName, const char *key, UploadMan
S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret,
0, awsRegionG};
S3ListPartsHandler listPartsHandler = {{&responsePropertiesCallback, &responseCompleteCallback}, &listPartsCallback};
S3ListPartsHandler listPartsHandler = {{&responsePropertiesCallbackNull, &responseCompleteCallback}, &listPartsCallback};
list_parts_callback_data data;
@ -445,13 +457,13 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) {
data.noStatus = noStatus;
if (taosStatFile(file, &contentLength, NULL, NULL) < 0) {
vError("ERROR: %s Failed to stat file %s: ", __func__, file);
uError("ERROR: %s Failed to stat file %s: ", __func__, file);
code = TAOS_SYSTEM_ERROR(errno);
return code;
}
if (!(data.infileFD = taosOpenFile(file, TD_FILE_READ))) {
vError("ERROR: %s Failed to open file %s: ", __func__, file);
uError("ERROR: %s Failed to open file %s: ", __func__, file);
code = TAOS_SYSTEM_ERROR(errno);
return code;
}
@ -469,7 +481,7 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) {
metaProperties, useServerSideEncryption};
if (contentLength <= MULTIPART_CHUNK_SIZE) {
S3PutObjectHandler putObjectHandler = {{&responsePropertiesCallback, &responseCompleteCallback},
S3PutObjectHandler putObjectHandler = {{&responsePropertiesCallbackNull, &responseCompleteCallback},
&putObjectDataCallback};
do {
@ -486,7 +498,7 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) {
s3PrintError(__func__, data.status, data.err_msg);
code = TAOS_SYSTEM_ERROR(EIO);
} else if (data.contentLength) {
vError("ERROR: %s Failed to read remaining %llu bytes from input", __func__,
uError("ERROR: %s Failed to read remaining %llu bytes from input", __func__,
(unsigned long long)data.contentLength);
code = TAOS_SYSTEM_ERROR(EIO);
}
@ -506,14 +518,14 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) {
memset(&partData, 0, sizeof(MultipartPartData));
int partContentLength = 0;
S3MultipartInitialHandler handler = {{&responsePropertiesCallback, &responseCompleteCallback},
S3MultipartInitialHandler handler = {{&responsePropertiesCallbackNull, &responseCompleteCallback},
&initial_multipart_callback};
S3PutObjectHandler putObjectHandler = {{&MultipartResponseProperiesCallback, &responseCompleteCallback},
&putObjectDataCallback};
S3MultipartCommitHandler commit_handler = {
{&responsePropertiesCallback, &responseCompleteCallback}, &multipartPutXmlCallback, 0};
{&responsePropertiesCallbackNull, &responseCompleteCallback}, &multipartPutXmlCallback, 0};
manager.etags = (char **)taosMemoryMalloc(sizeof(char *) * totalSeq);
manager.next_etags_pos = 0;
@ -658,19 +670,19 @@ static void s3FreeObjectKey(void *pItem) {
taosMemoryFree(key);
}
void s3DeleteObjectsByPrefix(const char *prefix) {
static SArray* getListByPrefix(const char *prefix){
S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret,
0, awsRegionG};
S3ListBucketHandler listBucketHandler = {{&responsePropertiesCallback, &responseCompleteCallback},
S3ListBucketHandler listBucketHandler = {{&responsePropertiesCallbackNull, &responseCompleteCallback},
&listBucketCallback};
const char *marker = 0, *delimiter = 0;
int maxkeys = 0, allDetails = 0;
list_bucket_callback_data data;
data.objectArray = taosArrayInit(32, POINTER_BYTES);
data.objectArray = taosArrayInit(32, sizeof(void*));
if (!data.objectArray) {
vError("%s: %s", __func__, "out of memoty");
return;
uError("%s: %s", __func__, "out of memoty");
return NULL;
}
if (marker) {
snprintf(data.nextMarker, sizeof(data.nextMarker), "%s", marker);
@ -693,18 +705,15 @@ void s3DeleteObjectsByPrefix(const char *prefix) {
if (data.status == S3StatusOK) {
if (data.keyCount > 0) {
// printListBucketHeader(allDetails);
s3DeleteObjects(TARRAY_DATA(data.objectArray), TARRAY_SIZE(data.objectArray));
return data.objectArray;
}
} else {
s3PrintError(__func__, data.status, data.err_msg);
}
taosArrayDestroyEx(data.objectArray, s3FreeObjectKey);
return NULL;
}
void s3DeleteObjects(const char *object_name[], int nobject) {
int status = 0;
S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret,
0, awsRegionG};
S3ResponseHandler responseHandler = {0, &responseCompleteCallback};
@ -721,6 +730,13 @@ void s3DeleteObjects(const char *object_name[], int nobject) {
}
}
void s3DeleteObjectsByPrefix(const char *prefix) {
SArray* objectArray = getListByPrefix(prefix);
if(objectArray == NULL)return;
s3DeleteObjects(TARRAY_DATA(objectArray), TARRAY_SIZE(objectArray));
taosArrayDestroyEx(objectArray, s3FreeObjectKey);
}
static S3Status getObjectDataCallback(int bufferSize, const char *buffer, void *callbackData) {
TS3SizeCBD *cbd = callbackData;
if (cbd->content_length != bufferSize) {
@ -758,7 +774,7 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size,
} while (S3_status_is_retryable(cbd.status) && should_retry());
if (cbd.status != S3StatusOK) {
vError("%s: %d(%s)", __func__, cbd.status, cbd.err_msg);
uError("%s: %d(%s)", __func__, cbd.status, cbd.err_msg);
return TAOS_SYSTEM_ERROR(EIO);
}
@ -767,6 +783,67 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size,
return 0;
}
static S3Status getObjectCallback(int bufferSize, const char *buffer, void *callbackData) {
TS3GetData *cbd = (TS3GetData *) callbackData;
size_t wrote = taosWriteFile(cbd->file, buffer, bufferSize);
return ((wrote < (size_t) bufferSize) ?
S3StatusAbortedByCallback : S3StatusOK);
}
int32_t s3GetObjectToFile(const char *object_name, char* fileName) {
int64_t ifModifiedSince = -1, ifNotModifiedSince = -1;
const char *ifMatch = 0, *ifNotMatch = 0;
S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret,
0, awsRegionG};
S3GetConditions getConditions = {ifModifiedSince, ifNotModifiedSince, ifMatch, ifNotMatch};
S3GetObjectHandler getObjectHandler = {{&responsePropertiesCallbackNull, &responseCompleteCallback},
&getObjectCallback};
TdFilePtr pFile = taosOpenFile(fileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) {
uError("[s3] open file error, errno:%d, fileName:%s", errno, fileName);
return -1;
}
TS3GetData cbd = {0};
cbd.file = pFile;
do {
S3_get_object(&bucketContext, object_name, &getConditions, 0, 0, 0, 0, &getObjectHandler, &cbd);
} while (S3_status_is_retryable(cbd.status) && should_retry());
if (cbd.status != S3StatusOK) {
uError("%s: %d(%s)", __func__, cbd.status, cbd.err_msg);
taosCloseFile(&pFile);
return TAOS_SYSTEM_ERROR(EIO);
}
taosCloseFile(&pFile);
return 0;
}
int32_t s3GetObjectsByPrefix(const char *prefix, const char* path){
SArray* objectArray = getListByPrefix(prefix);
if(objectArray == NULL) return -1;
for(size_t i = 0; i < taosArrayGetSize(objectArray); i++){
char* object = taosArrayGetP(objectArray, i);
const char* tmp = strchr(object, '/');
tmp = (tmp == NULL) ? object : tmp + 1;
char fileName[PATH_MAX] = {0};
if(path[strlen(path) - 1] != TD_DIRSEP_CHAR){
snprintf(fileName, PATH_MAX, "%s%s%s", path, TD_DIRSEP, tmp);
}else{
snprintf(fileName, PATH_MAX, "%s%s", path, tmp);
}
if(s3GetObjectToFile(object, fileName) != 0){
taosArrayDestroyEx(objectArray, s3FreeObjectKey);
return -1;
}
}
taosArrayDestroyEx(objectArray, s3FreeObjectKey);
return 0;
}
long s3Size(const char *object_name) {
long size = 0;
int status = 0;
@ -782,7 +859,7 @@ long s3Size(const char *object_name) {
} while (S3_status_is_retryable(cbd.status) && should_retry());
if ((cbd.status != S3StatusOK) && (cbd.status != S3StatusErrorPreconditionFailed)) {
vError("%s: %d(%s)", __func__, cbd.status, cbd.err_msg);
uError("%s: %d(%s)", __func__, cbd.status, cbd.err_msg);
}
size = cbd.content_length;
@ -1237,6 +1314,7 @@ void s3DeleteObjectsByPrefix(const char *prefix) {}
void s3DeleteObjects(const char *object_name[], int nobject) {}
bool s3Exists(const char *object_name) { return false; }
bool s3Get(const char *object_name, const char *path) { return false; }
int32_t s3GetObjectsByPrefix(const char *prefix, const char* path) { return 0; }
int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, uint8_t **ppBlock) { return 0; }
void s3EvictCache(const char *path, long object_size) {}
long s3Size(const char *object_name) { return 0; }

235
source/common/src/rsync.c Normal file
View File

@ -0,0 +1,235 @@
//
// Created by mingming wanng on 2023/11/2.
//
#include "rsync.h"
#include <stdlib.h>
#include "tglobal.h"
#define ERRNO_ERR_FORMAT "errno:%d,msg:%s"
#define ERRNO_ERR_DATA errno,strerror(errno)
// deleteRsync function produce empty directories, traverse base directory to remove them
static void removeEmptyDir(){
TdDirPtr pDir = taosOpenDir(tsCheckpointBackupDir);
if (pDir == NULL) return;
TdDirEntryPtr de = NULL;
while ((de = taosReadDir(pDir)) != NULL) {
if (!taosDirEntryIsDir(de)) {
continue;
}
if (strcmp(taosGetDirEntryName(de), ".") == 0 || strcmp(taosGetDirEntryName(de), "..") == 0) continue;
char filename[PATH_MAX] = {0};
snprintf(filename, sizeof(filename), "%s%s", tsCheckpointBackupDir, taosGetDirEntryName(de));
TdDirPtr pDirTmp = taosOpenDir(filename);
TdDirEntryPtr deTmp = NULL;
bool empty = true;
while ((deTmp = taosReadDir(pDirTmp)) != NULL){
if (strcmp(taosGetDirEntryName(deTmp), ".") == 0 || strcmp(taosGetDirEntryName(deTmp), "..") == 0) continue;
empty = false;
}
if(empty) taosRemoveDir(filename);
taosCloseDir(&pDirTmp);
}
taosCloseDir(&pDir);
}
#ifdef WINDOWS
// C:\TDengine\data\backup\checkpoint\ -> /c/TDengine/data/backup/checkpoint/
static void changeDirFromWindowsToLinux(char* from, char* to){
to[0] = '/';
to[1] = from[0];
for(int i = 2; i < strlen(from); i++) {
if (from[i] == '\\') {
to[i] = '/';
} else {
to[i] = from[i];
}
}
}
#endif
static int generateConfigFile(char* confDir){
TdFilePtr pFile = taosOpenFile(confDir, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) {
uError("[rsync] open conf file error, dir:%s,"ERRNO_ERR_FORMAT, confDir, ERRNO_ERR_DATA);
return -1;
}
#ifdef WINDOWS
char path[PATH_MAX] = {0};
changeDirFromWindowsToLinux(tsCheckpointBackupDir, path);
#endif
char confContent[PATH_MAX*4] = {0};
snprintf(confContent, PATH_MAX*4,
#ifndef WINDOWS
"uid = root\n"
"gid = root\n"
#endif
"use chroot = false\n"
"max connections = 200\n"
"timeout = 100\n"
"lock file = %srsync.lock\n"
"log file = %srsync.log\n"
"ignore errors = true\n"
"read only = false\n"
"list = false\n"
"[checkpoint]\n"
"path = %s", tsCheckpointBackupDir, tsCheckpointBackupDir,
#ifdef WINDOWS
path
#else
tsCheckpointBackupDir
#endif
);
uDebug("[rsync] conf:%s", confContent);
if (taosWriteFile(pFile, confContent, strlen(confContent)) <= 0){
uError("[rsync] write conf file error,"ERRNO_ERR_FORMAT, ERRNO_ERR_DATA);
taosCloseFile(&pFile);
return -1;
}
taosCloseFile(&pFile);
return 0;
}
static int execCommand(char* command){
int try = 3;
int32_t code = 0;
while(try-- > 0) {
code = system(command);
if (code == 0) {
break;
}
taosMsleep(10);
}
return code;
}
void stopRsync(){
int code =
#ifdef WINDOWS
system("taskkill /f /im rsync.exe");
#else
system("pkill rsync");
#endif
if(code != 0){
uError("[rsync] stop rsync server failed,"ERRNO_ERR_FORMAT, ERRNO_ERR_DATA);
return;
}
uDebug("[rsync] stop rsync server successful");
}
void startRsync(){
if(taosMulMkDir(tsCheckpointBackupDir) != 0){
uError("[rsync] build checkpoint backup dir failed, dir:%s,"ERRNO_ERR_FORMAT, tsCheckpointBackupDir, ERRNO_ERR_DATA);
return;
}
removeEmptyDir();
char confDir[PATH_MAX] = {0};
snprintf(confDir, PATH_MAX, "%srsync.conf", tsCheckpointBackupDir);
int code = generateConfigFile(confDir);
if(code != 0){
return;
}
char cmd[PATH_MAX] = {0};
snprintf(cmd, PATH_MAX, "rsync --daemon --port=%d --config=%s", tsRsyncPort, confDir);
// start rsync service to backup checkpoint
code = system(cmd);
if(code != 0){
uError("[rsync] start server failed, code:%d,"ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA);
return;
}
uDebug("[rsync] start server successful");
}
int uploadRsync(char* id, char* path){
#ifdef WINDOWS
char pathTransform[PATH_MAX] = {0};
changeDirFromWindowsToLinux(path, pathTransform);
#endif
char command[PATH_MAX] = {0};
#ifdef WINDOWS
if(pathTransform[strlen(pathTransform) - 1] != '/'){
#else
if(path[strlen(path) - 1] != '/'){
#endif
snprintf(command, PATH_MAX, "rsync -av --delete --timeout=10 --bwlimit=100000 %s/ rsync://%s/checkpoint/%s/",
#ifdef WINDOWS
pathTransform
#else
path
#endif
, tsSnodeAddress, id);
}else{
snprintf(command, PATH_MAX, "rsync -av --delete --timeout=10 --bwlimit=100000 %s rsync://%s/checkpoint/%s/",
#ifdef WINDOWS
pathTransform
#else
path
#endif
, tsSnodeAddress, id);
}
int code = execCommand(command);
if(code != 0){
uError("[rsync] send failed code:%d," ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA);
return -1;
}
uDebug("[rsync] upload data:%s successful", id);
return 0;
}
int downloadRsync(char* id, char* path){
#ifdef WINDOWS
char pathTransform[PATH_MAX] = {0};
changeDirFromWindowsToLinux(path, pathTransform);
#endif
char command[PATH_MAX] = {0};
snprintf(command, PATH_MAX, "rsync -av --timeout=10 --bwlimit=100000 rsync://%s/checkpoint/%s/ %s",
tsSnodeAddress, id,
#ifdef WINDOWS
pathTransform
#else
path
#endif
);
int code = execCommand(command);
if(code != 0){
uError("[rsync] get failed code:%d," ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA);
return -1;
}
uDebug("[rsync] down data:%s successful", id);
return 0;
}
int deleteRsync(char* id){
char* tmp = "./tmp_empty/";
int code = taosMkDir(tmp);
if(code != 0){
uError("[rsync] make tmp dir failed. code:%d," ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA);
return -1;
}
char command[PATH_MAX] = {0};
snprintf(command, PATH_MAX, "rsync -av --delete --timeout=10 %s rsync://%s/checkpoint/%s/",
tmp, tsSnodeAddress, id);
code = execCommand(command);
taosRemoveDir(tmp);
if(code != 0){
uError("[rsync] get failed code:%d," ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA);
return -1;
}
uDebug("[rsync] delete data:%s successful", id);
return 0;
}

View File

@ -127,6 +127,15 @@ char tsSmlAutoChildTableNameDelimiter[TSDB_TABLE_NAME_LEN] = "";
// bool tsSmlDataFormat = false;
// int32_t tsSmlBatchSize = 10000;
// checkpoint backup
char tsSnodeAddress[TSDB_FQDN_LEN] = {0};
int32_t tsRsyncPort = 873;
#ifdef WINDOWS
char tsCheckpointBackupDir[PATH_MAX] = "C:\\TDengine\\data\\backup\\checkpoint\\";
#else
char tsCheckpointBackupDir[PATH_MAX] = "/var/lib/taos/backup/checkpoint/";
#endif
// tmq
int32_t tmqMaxTopicNum = 20;
// query
@ -260,6 +269,7 @@ char tsS3AccessKeySecret[TSDB_FQDN_LEN] = "<accesskeysecrect>";
char tsS3BucketName[TSDB_FQDN_LEN] = "<bucketname>";
char tsS3AppId[TSDB_FQDN_LEN] = "<appid>";
int8_t tsS3Enabled = false;
int8_t tsS3StreamEnabled = false;
int8_t tsS3Https = true;
char tsS3Hostname[TSDB_FQDN_LEN] = "<hostname>";
@ -323,9 +333,10 @@ int32_t taosSetS3Cfg(SConfig *pCfg) {
tstrncpy(tsS3AppId, appid + 1, TSDB_FQDN_LEN);
}
}
if (tsS3BucketName[0] != '<' && tsDiskCfgNum > 1) {
if (tsS3BucketName[0] != '<') {
#if defined(USE_COS) || defined(USE_S3)
tsS3Enabled = true;
if(tsDiskCfgNum > 1) tsS3Enabled = true;
tsS3StreamEnabled = true;
#endif
}
@ -661,6 +672,10 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddString(pCfg, "telemetryServer", tsTelemServer, CFG_SCOPE_BOTH, CFG_DYN_BOTH) != 0) return -1;
if (cfgAddInt32(pCfg, "telemetryPort", tsTelemPort, 1, 65056, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0) return -1;
if (cfgAddInt32(pCfg, "rsyncPort", tsRsyncPort, 1, 65535, CFG_SCOPE_BOTH, CFG_DYN_SERVER) != 0) return -1;
if (cfgAddString(pCfg, "snodeAddress", tsSnodeAddress, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1;
if (cfgAddString(pCfg, "checkpointBackupDir", tsCheckpointBackupDir, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "tmqMaxTopicNum", tmqMaxTopicNum, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0)
return -1;
@ -1099,7 +1114,10 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsTtlChangeOnWrite = cfgGetItem(pCfg, "ttlChangeOnWrite")->bval;
tsTtlFlushThreshold = cfgGetItem(pCfg, "ttlFlushThreshold")->i32;
tsTelemInterval = cfgGetItem(pCfg, "telemetryInterval")->i32;
tsRsyncPort = cfgGetItem(pCfg, "rsyncPort")->i32;
tstrncpy(tsTelemServer, cfgGetItem(pCfg, "telemetryServer")->str, TSDB_FQDN_LEN);
tstrncpy(tsSnodeAddress, cfgGetItem(pCfg, "snodeAddress")->str, TSDB_FQDN_LEN);
tstrncpy(tsCheckpointBackupDir, cfgGetItem(pCfg, "checkpointBackupDir")->str, PATH_MAX);
tsTelemPort = (uint16_t)cfgGetItem(pCfg, "telemetryPort")->i32;
tmqMaxTopicNum = cfgGetItem(pCfg, "tmqMaxTopicNum")->i32;

View File

@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "rsync.h"
#include "executor.h"
#include "sndInt.h"
#include "tstream.h"
@ -122,6 +123,9 @@ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) {
goto FAIL;
}
stopRsync();
startRsync();
// todo fix it: send msg to mnode to rollback to an existed checkpoint
streamMetaInitForSnode(pSnode->pMeta);
return pSnode;

View File

@ -8,7 +8,6 @@ set(
"src/vnd/vnodeCommit.c"
"src/vnd/vnodeQuery.c"
"src/vnd/vnodeModule.c"
"src/vnd/vnodeCos.c"
"src/vnd/vnodeSvr.c"
"src/vnd/vnodeSync.c"
"src/vnd/vnodeSnapshot.c"
@ -161,75 +160,6 @@ target_link_libraries(
PUBLIC index
)
if(${BUILD_S3})
if(${BUILD_WITH_S3})
target_include_directories(
vnode
PUBLIC "$ENV{HOME}/.cos-local.2/include"
)
set(CMAKE_FIND_LIBRARY_SUFFIXES ".a")
set(CMAKE_PREFIX_PATH $ENV{HOME}/.cos-local.2)
find_library(S3_LIBRARY s3)
find_library(CURL_LIBRARY curl $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH)
find_library(XML2_LIBRARY xml2)
find_library(SSL_LIBRARY ssl $ENV{HOME}/.cos-local.2/lib $ENV{HOME}/.cos-local.2/lib64 NO_DEFAULT_PATH)
find_library(CRYPTO_LIBRARY crypto $ENV{HOME}/.cos-local.2/lib $ENV{HOME}/.cos-local.2/lib64 NO_DEFAULT_PATH)
target_link_libraries(
vnode
# s3
PUBLIC ${S3_LIBRARY}
PUBLIC ${CURL_LIBRARY}
PUBLIC ${SSL_LIBRARY}
PUBLIC ${CRYPTO_LIBRARY}
PUBLIC ${XML2_LIBRARY}
)
add_definitions(-DUSE_S3)
endif()
if(${BUILD_WITH_COS})
set(CMAKE_FIND_LIBRARY_SUFFIXES ".a")
find_library(APR_LIBRARY apr-1 PATHS /usr/local/apr/lib/)
find_library(APR_UTIL_LIBRARY aprutil-1 PATHS /usr/local/apr/lib/)
find_library(MINIXML_LIBRARY mxml)
find_library(CURL_LIBRARY curl)
target_link_libraries(
vnode
# s3
PUBLIC cos_c_sdk_static
PUBLIC ${APR_UTIL_LIBRARY}
PUBLIC ${APR_LIBRARY}
PUBLIC ${MINIXML_LIBRARY}
PUBLIC ${CURL_LIBRARY}
)
# s3
FIND_PROGRAM(APR_CONFIG_BIN NAMES apr-config apr-1-config PATHS /usr/bin /usr/local/bin /usr/local/apr/bin/)
IF (APR_CONFIG_BIN)
EXECUTE_PROCESS(
COMMAND ${APR_CONFIG_BIN} --includedir
OUTPUT_VARIABLE APR_INCLUDE_DIR
OUTPUT_STRIP_TRAILING_WHITESPACE
)
ENDIF()
include_directories (${APR_INCLUDE_DIR})
target_include_directories(
vnode
PUBLIC "${TD_SOURCE_DIR}/contrib/cos-c-sdk-v5/cos_c_sdk"
PUBLIC "$ENV{HOME}/.cos-local.1/include"
)
add_definitions(-DUSE_COS)
endif(${BUILD_WITH_COS})
endif()
IF (TD_GRANT)
TARGET_LINK_LIBRARIES(vnode PUBLIC grant)
ENDIF ()

View File

@ -12,11 +12,11 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "cos.h"
#include "tsdb.h"
#include "tsdbDataFileRW.h"
#include "tsdbReadUtil.h"
#include "vnd.h"
#include "vndCos.h"
#define ROCKS_BATCH_SIZE (4096)

View File

@ -14,9 +14,9 @@
*/
#include "tsdbFS2.h"
#include "cos.h"
#include "tsdbUpgrade.h"
#include "vnd.h"
#include "vndCos.h"
#define BLOCK_COMMIT_FACTOR 3

View File

@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "cos.h"
#include "tsdb.h"
#include "vndCos.h"
static int32_t tsdbOpenFileImpl(STsdbFD *pFD) {
int32_t code = 0;

View File

@ -15,7 +15,8 @@
#include "tsdb.h"
#include "tsdbFS2.h"
#include "vndCos.h"
#include "cos.h"
#include "vnd.h"
typedef struct {
STsdb *tsdb;

View File

@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "cos.h"
#include "tsdb.h"
#include "vndCos.h"
/**
* @brief max key by precision

View File

@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "cos.h"
#include "vnd.h"
#include "vndCos.h"
typedef struct SVnodeTask SVnodeTask;
struct SVnodeTask {

View File

@ -13,10 +13,10 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "cos.h"
#include "sync.h"
#include "tsdb.h"
#include "vnd.h"
#include "vndCos.h"
int32_t vnodeGetPrimaryDir(const char *relPath, int32_t diskPrimary, STfs *pTfs, char *buf, size_t bufLen) {
if (pTfs) {

View File

@ -18,7 +18,7 @@
#include "tmsg.h"
#include "tstrbuild.h"
#include "vnd.h"
#include "vndCos.h"
#include "cos.h"
#include "vnode.h"
#include "vnodeInt.h"

View File

@ -140,6 +140,18 @@ void* streamQueueNextItem(SStreamQueue* pQueue);
void streamFreeQitem(SStreamQueueItem* data);
int32_t streamQueueGetItemSize(const SStreamQueue* pQueue);
typedef enum UPLOAD_TYPE{
UPLOAD_DISABLE = -1,
UPLOAD_S3 = 0,
UPLOAD_RSYNC = 1,
} UPLOAD_TYPE;
UPLOAD_TYPE getUploadType();
int uploadCheckpoint(char* id, char* path);
int downloadCheckpoint(char* id, char* path);
int deleteCheckpoint(char* id);
int deleteCheckpointFile(char* id, char* name);
int32_t onNormalTaskReady(SStreamTask* pTask);
int32_t onScanhistoryTaskReady(SStreamTask* pTask);

View File

@ -14,6 +14,8 @@
*/
#include "streamInt.h"
#include "rsync.h"
#include "cos.h"
int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
@ -372,3 +374,91 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
return code;
}
static int uploadCheckpointToS3(char* id, char* path){
TdDirPtr pDir = taosOpenDir(path);
if (pDir == NULL) return -1;
TdDirEntryPtr de = NULL;
while ((de = taosReadDir(pDir)) != NULL) {
char* name = taosGetDirEntryName(de);
if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0 ||
taosDirEntryIsDir(de)) continue;
char filename[PATH_MAX] = {0};
if(path[strlen(path) - 1] == TD_DIRSEP_CHAR){
snprintf(filename, sizeof(filename), "%s%s", path, name);
}else{
snprintf(filename, sizeof(filename), "%s%s%s", path, TD_DIRSEP, name);
}
char object[PATH_MAX] = {0};
snprintf(object, sizeof(object), "%s%s%s", id, TD_DIRSEP, name);
if(s3PutObjectFromFile2(filename, object) != 0){
taosCloseDir(&pDir);
return -1;
}
stDebug("[s3] upload checkpoint:%s", filename);
}
taosCloseDir(&pDir);
return 0;
}
UPLOAD_TYPE getUploadType(){
if(strlen(tsSnodeAddress) != 0){
return UPLOAD_RSYNC;
}else if(tsS3StreamEnabled){
return UPLOAD_S3;
}else{
return UPLOAD_DISABLE;
}
}
int uploadCheckpoint(char* id, char* path){
if(id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX){
stError("uploadCheckpoint parameters invalid");
return -1;
}
if(strlen(tsSnodeAddress) != 0){
return uploadRsync(id, path);
}else if(tsS3StreamEnabled){
return uploadCheckpointToS3(id, path);
}
return 0;
}
int downloadCheckpoint(char* id, char* path){
if(id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX){
stError("downloadCheckpoint parameters invalid");
return -1;
}
if(strlen(tsSnodeAddress) != 0){
return downloadRsync(id, path);
}else if(tsS3StreamEnabled){
return s3GetObjectsByPrefix(id, path);
}
return 0;
}
int deleteCheckpoint(char* id){
if(id == NULL || strlen(id) == 0){
stError("deleteCheckpoint parameters invalid");
return -1;
}
if(strlen(tsSnodeAddress) != 0){
return deleteRsync(id);
}else if(tsS3StreamEnabled){
s3DeleteObjectsByPrefix(id);
}
return 0;
}
int deleteCheckpointFile(char* id, char* name){
char object[128] = {0};
snprintf(object, sizeof(object), "%s/%s", id, name);
char *tmp = object;
s3DeleteObjects((const char**)&tmp, 1);
return 0;
}

View File

@ -19,7 +19,6 @@
#include "streamBackendRocksdb.h"
#include "streamInt.h"
#include "tcommon.h"
#include "streamInt.h"
enum SBackendFileType {
ROCKSDB_OPTIONS_TYPE = 1,
@ -52,6 +51,7 @@ struct SStreamSnapHandle {
int8_t filetype;
SArray* pFileList;
int32_t currFileIdx;
int8_t delFlag; // 0 : not del, 1: del
};
struct SStreamSnapBlockHdr {
int8_t type;
@ -148,6 +148,7 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chk
taosMemoryFree(tdir);
return code;
}
pHandle->delFlag = 1;
chkpId = 0;
}
@ -274,7 +275,7 @@ void streamSnapHandleDestroy(SStreamSnapHandle* handle) {
if (handle->checkpointId == 0) {
// del tmp dir
if (pFile && taosIsDir(pFile->path)) {
taosRemoveDir(pFile->path);
if (handle->delFlag) taosRemoveDir(pFile->path);
}
} else {
streamBackendDelInUseChkp(handle->handle, handle->checkpointId);
@ -344,10 +345,10 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si
stDebug("%s start to read file %s, current offset:%" PRId64 ", size:%" PRId64 ", file no.%d", STREAM_STATE_TRANSFER,
item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize);
if(buf == NULL){
if (buf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int64_t nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset);
int64_t nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset);
if (nread == -1) {
taosMemoryFree(buf);
code = TAOS_SYSTEM_ERROR(terrno);
@ -423,6 +424,7 @@ int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, char* path
pHandle->pFileList = list;
pHandle->currFileIdx = 0;
pHandle->offset = 0;
pHandle->delFlag = 0;
*ppWriter = pWriter;
return 0;

View File

@ -18,6 +18,17 @@ TARGET_INCLUDE_DIRECTORIES(
PRIVATE "${TD_SOURCE_DIR}/source/libs/stream/inc"
)
ADD_EXECUTABLE(checkpointTest checkpointTest.cpp)
TARGET_LINK_LIBRARIES(
checkpointTest
PUBLIC os common gtest stream executor qcom index transport util
)
TARGET_INCLUDE_DIRECTORIES(
checkpointTest
PRIVATE "${TD_SOURCE_DIR}/source/libs/stream/inc"
)
add_test(
NAME streamUpdateTest
COMMAND streamUpdateTest

View File

@ -0,0 +1,68 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <gtest/gtest.h>
#include <taoserror.h>
#include <tglobal.h>
#include <iostream>
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
#include "rsync.h"
#include "streamInt.h"
#include "cos.h"
int main(int argc, char **argv) {
testing::InitGoogleTest(&argc, argv);
if (taosInitCfg("/etc/taos/", NULL, NULL, NULL, NULL, 0) != 0) {
printf("error");
}
if (s3Init() < 0) {
return -1;
}
strcpy(tsSnodeAddress, "127.0.0.1");
return RUN_ALL_TESTS();
}
TEST(testCase, checkpointUpload_Test) {
stopRsync();
startRsync();
taosSsleep(5);
char* id = "2013892036";
uploadCheckpoint(id, "/root/offset/");
}
TEST(testCase, checkpointDownload_Test) {
char* id = "2013892036";
downloadCheckpoint(id, "/root/offset/download/");
}
TEST(testCase, checkpointDelete_Test) {
char* id = "2013892036";
deleteCheckpoint(id);
}
TEST(testCase, checkpointDeleteFile_Test) {
char* id = "2013892036";
deleteCheckpointFile(id, "offset-ver0");
}