feat:checkpoint bakeup using rsync

This commit is contained in:
wangmm0220 2023-11-02 20:13:29 +08:00
parent c4e1faae36
commit 159873692c
20 changed files with 445 additions and 13 deletions

View File

@ -82,6 +82,10 @@ extern int32_t tsHeartbeatTimeout;
// vnode // vnode
extern int64_t tsVndCommitMaxIntervalMs; extern int64_t tsVndCommitMaxIntervalMs;
// snode
extern char tsSnodeIp[];
extern char tsCheckpointBackupDir[];
// mnode // mnode
extern int64_t tsMndSdbWriteDelta; extern int64_t tsMndSdbWriteDelta;
extern int64_t tsMndLogRetention; extern int64_t tsMndLogRetention;

View File

@ -16,7 +16,7 @@
#ifndef _TD_VND_COS_H_ #ifndef _TD_VND_COS_H_
#define _TD_VND_COS_H_ #define _TD_VND_COS_H_
#include "vnd.h" #include "os.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
@ -24,6 +24,7 @@ extern "C" {
#define S3_BLOCK_CACHE #define S3_BLOCK_CACHE
extern int8_t tsS3StreamEnabled;
extern int8_t tsS3Enabled; extern int8_t tsS3Enabled;
extern int32_t tsS3BlockSize; extern int32_t tsS3BlockSize;
extern int32_t tsS3BlockCacheSize; extern int32_t tsS3BlockCacheSize;

24
include/util/rsync.h Normal file
View File

@ -0,0 +1,24 @@
//
// Created by mingming wanng on 2023/11/2.
//
#ifndef TDENGINE_RSYNC_H
#define TDENGINE_RSYNC_H
#ifdef __cplusplus
extern "C" {
#endif
#include "tarray.h"
void stopRsync();
void startRsync();
int uploadRsync(char* id, SArray* fileList);
int downloadRsync(char* id, char* path);
int deleteRsync(char* id);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_RSYNC_H

View File

@ -133,6 +133,10 @@ char tsSmlAutoChildTableNameDelimiter[TSDB_TABLE_NAME_LEN] = "";
// bool tsSmlDataFormat = false; // bool tsSmlDataFormat = false;
// int32_t tsSmlBatchSize = 10000; // int32_t tsSmlBatchSize = 10000;
// checkpoint backup
char tsSnodeIp[TSDB_FQDN_LEN] = {0};
char tsCheckpointBackupDir[PATH_MAX] = "/var/lib/taos/backup/checkpoint/";
// tmq // tmq
int32_t tmqMaxTopicNum = 20; int32_t tmqMaxTopicNum = 20;
// query // query
@ -275,6 +279,7 @@ char tsS3AccessKeySecret[TSDB_FQDN_LEN] = "<accesskeysecrect>";
char tsS3BucketName[TSDB_FQDN_LEN] = "<bucketname>"; char tsS3BucketName[TSDB_FQDN_LEN] = "<bucketname>";
char tsS3AppId[TSDB_FQDN_LEN] = "<appid>"; char tsS3AppId[TSDB_FQDN_LEN] = "<appid>";
int8_t tsS3Enabled = false; int8_t tsS3Enabled = false;
int8_t tsS3StreamEnabled = false;
int8_t tsS3Https = true; int8_t tsS3Https = true;
char tsS3Hostname[TSDB_FQDN_LEN] = "<hostname>"; char tsS3Hostname[TSDB_FQDN_LEN] = "<hostname>";
@ -338,9 +343,10 @@ int32_t taosSetS3Cfg(SConfig *pCfg) {
tstrncpy(tsS3AppId, appid + 1, TSDB_FQDN_LEN); tstrncpy(tsS3AppId, appid + 1, TSDB_FQDN_LEN);
} }
} }
if (tsS3BucketName[0] != '<' && tsDiskCfgNum > 1) { if (tsS3BucketName[0] != '<') {
#if defined(USE_COS) || defined(USE_S3) #if defined(USE_COS) || defined(USE_S3)
tsS3Enabled = true; if(tsDiskCfgNum > 1) tsS3Enabled = true;
tsS3StreamEnabled = true;
#endif #endif
} }

View File

@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "rsync.h"
#include "executor.h" #include "executor.h"
#include "sndInt.h" #include "sndInt.h"
#include "tstream.h" #include "tstream.h"
@ -120,6 +121,9 @@ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) {
goto FAIL; goto FAIL;
} }
stopRsync();
startRsync();
// todo fix it: send msg to mnode to rollback to an existed checkpoint // todo fix it: send msg to mnode to rollback to an existed checkpoint
streamMetaInitForSnode(pSnode->pMeta); streamMetaInitForSnode(pSnode->pMeta);
return pSnode; return pSnode;

View File

@ -8,7 +8,6 @@ set(
"src/vnd/vnodeCommit.c" "src/vnd/vnodeCommit.c"
"src/vnd/vnodeQuery.c" "src/vnd/vnodeQuery.c"
"src/vnd/vnodeModule.c" "src/vnd/vnodeModule.c"
"src/vnd/vnodeCos.c"
"src/vnd/vnodeSvr.c" "src/vnd/vnodeSvr.c"
"src/vnd/vnodeSync.c" "src/vnd/vnodeSync.c"
"src/vnd/vnodeSnapshot.c" "src/vnd/vnodeSnapshot.c"

View File

@ -12,11 +12,11 @@
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "cos.h"
#include "tsdb.h" #include "tsdb.h"
#include "tsdbDataFileRW.h" #include "tsdbDataFileRW.h"
#include "tsdbReadUtil.h" #include "tsdbReadUtil.h"
#include "vnd.h" #include "vnd.h"
#include "vndCos.h"
#define ROCKS_BATCH_SIZE (4096) #define ROCKS_BATCH_SIZE (4096)

View File

@ -14,9 +14,9 @@
*/ */
#include "tsdbFS2.h" #include "tsdbFS2.h"
#include "cos.h"
#include "tsdbUpgrade.h" #include "tsdbUpgrade.h"
#include "vnd.h" #include "vnd.h"
#include "vndCos.h"
#define BLOCK_COMMIT_FACTOR 3 #define BLOCK_COMMIT_FACTOR 3

View File

@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "cos.h"
#include "tsdb.h" #include "tsdb.h"
#include "vndCos.h"
static int32_t tsdbOpenFileImpl(STsdbFD *pFD) { static int32_t tsdbOpenFileImpl(STsdbFD *pFD) {
int32_t code = 0; int32_t code = 0;

View File

@ -15,7 +15,8 @@
#include "tsdb.h" #include "tsdb.h"
#include "tsdbFS2.h" #include "tsdbFS2.h"
#include "vndCos.h" #include "cos.h"
#include "vnd.h"
typedef struct { typedef struct {
STsdb *tsdb; STsdb *tsdb;

View File

@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "cos.h"
#include "tsdb.h" #include "tsdb.h"
#include "vndCos.h"
/** /**
* @brief max key by precision * @brief max key by precision

View File

@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "cos.h"
#include "vnd.h" #include "vnd.h"
#include "vndCos.h"
typedef struct SVnodeTask SVnodeTask; typedef struct SVnodeTask SVnodeTask;
struct SVnodeTask { struct SVnodeTask {

View File

@ -13,10 +13,10 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "cos.h"
#include "sync.h" #include "sync.h"
#include "tsdb.h" #include "tsdb.h"
#include "vnd.h" #include "vnd.h"
#include "vndCos.h"
int32_t vnodeGetPrimaryDir(const char *relPath, int32_t diskPrimary, STfs *pTfs, char *buf, size_t bufLen) { int32_t vnodeGetPrimaryDir(const char *relPath, int32_t diskPrimary, STfs *pTfs, char *buf, size_t bufLen) {
if (pTfs) { if (pTfs) {

View File

@ -16,7 +16,7 @@
#include "tencode.h" #include "tencode.h"
#include "tmsg.h" #include "tmsg.h"
#include "vnd.h" #include "vnd.h"
#include "vndCos.h" #include "cos.h"
#include "vnode.h" #include "vnode.h"
#include "vnodeInt.h" #include "vnodeInt.h"
#include "audit.h" #include "audit.h"

View File

@ -137,6 +137,17 @@ void* streamQueueNextItem(SStreamQueue* pQueue);
void streamFreeQitem(SStreamQueueItem* data); void streamFreeQitem(SStreamQueueItem* data);
int32_t streamQueueGetItemSize(const SStreamQueue* pQueue); int32_t streamQueueGetItemSize(const SStreamQueue* pQueue);
//#define CHECKPOINT_PATH_LEN 128
//typedef struct SChekpointDataHeader{
// int64_t size;
// char name[CHECKPOINT_PATH_LEN];
// char id[CHECKPOINT_PATH_LEN];
//} SChekpointDataHeader;
int uploadCheckpoint(char* id, SArray* fileList);
int downloadCheckpoint(char* id, char* path);
int deleteCheckpoint(char* id);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -14,6 +14,7 @@
*/ */
#include "streamInt.h" #include "streamInt.h"
#include "rsync.h"
int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) { int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1; if (tStartEncode(pEncoder) < 0) return -1;
@ -357,3 +358,134 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
return code; return code;
} }
//static int64_t kBlockSize = 64 * 1024;
//static int sendCheckpointToS3(char* id, SArray* fileList){
// code = s3PutObjectFromFile2(from->fname, object_name);
// return 0;
//}
//static int sendCheckpointToSnode(char* id, SArray* fileList){
// if(strlen(id) >= CHECKPOINT_PATH_LEN){
// tqError("uploadCheckpoint id name too long, name:%s", id);
// return -1;
// }
// uint8_t* buf = taosMemoryCalloc(1, sizeof(SChekpointDataHeader) + kBlockSize);
// if(buf == NULL){
// tqError("uploadCheckpoint malloc failed");
// return -1;
// }
//
// SChekpointDataHeader* pHdr = (SChekpointDataHeader*)buf;
// strcpy(pHdr->id, id);
//
// TdFilePtr fd = NULL;
// for(int i = 0; i < taosArrayGetSize(fileList); i++){
// char* name = (char*)taosArrayGetP(fileList, i);
// if(strlen(name) >= CHECKPOINT_PATH_LEN){
// tqError("uploadCheckpoint file name too long, name:%s", name);
// return -1;
// }
// int64_t offset = 0;
//
// fd = taosOpenFile(name, TD_FILE_READ);
// tqDebug("uploadCheckpoint open file %s, file index: %d", name, i);
//
// while(1){
// int64_t nread = taosPReadFile(fd, buf + sizeof(SChekpointDataHeader), kBlockSize, offset);
// if (nread == -1) {
// taosCloseFile(&fd);
// taosMemoryFree(buf);
// tqError("uploadCheckpoint failed to read file name:%s,reason:%d", name, errno);
// return -1;
// } else if (nread == 0){
// tqDebug("uploadCheckpoint no data read, close file:%s, move to next file, open and read", name);
// taosCloseFile(&fd);
// break;
// } else if (nread == kBlockSize){
// offset += nread;
// } else {
// taosCloseFile(&fd);
// offset = 0;
// }
// tqDebug("uploadCheckpoint read file %s, size:%" PRId64 ", current offset:%" PRId64, name, nread, offset);
//
//
// pHdr->size = nread;
// strcpy(pHdr->name, name);
//
// SRpcMsg rpcMsg = {0};
// int32_t bytes = sizeof(SChekpointDataHeader) + nread;
// rpcMsg.pCont = rpcMallocCont(bytes);
// rpcMsg.msgType = TDMT_SYNC_SNAPSHOT_SEND;
// rpcMsg.contLen = bytes;
// if (rpcMsg.pCont == NULL) {
// tqError("uploadCheckpoint malloc failed");
// taosCloseFile(&fd);
// taosMemoryFree(buf);
// return -1;
// }
// memcpy(rpcMsg.pCont, buf, bytes);
// int try = 3;
// int32_t code = 0;
// while(try-- > 0){
// code = tmsgSendReq(pEpSet, &rpcMsg);
// if(code == 0)
// break;
// taosMsleep(10);
// }
// if(code != 0){
// tqError("uploadCheckpoint send request failed code:%d", code);
// taosCloseFile(&fd);
// taosMemoryFree(buf);
// return -1;
// }
//
// if(offset == 0){
// break;
// }
// }
// }
//
// taosMemoryFree(buf);
//}
int uploadCheckpoint(char* id, SArray* fileList){
if(id == NULL || fileList == NULL || strlen(id) == 0 || taosArrayGetSize(fileList) == 0){
stError("uploadCheckpoint parameters invalid");
return -1;
}
if(strlen(tsSnodeIp) != 0){
uploadRsync(id, fileList);
// }else if(tsS3StreamEnabled){
}
return 0;
}
int downloadCheckpoint(char* id, char* path){
if(id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0){
stError("downloadCheckpoint parameters invalid");
return -1;
}
if(strlen(tsSnodeIp) != 0){
downloadRsync(id, path);
// }else if(tsS3StreamEnabled){
}
return 0;
}
int deleteCheckpoint(char* id){
if(id == NULL || strlen(id) == 0){
stError("deleteCheckpoint parameters invalid");
return -1;
}
if(strlen(tsSnodeIp) != 0){
deleteRsync(id);
// }else if(tsS3StreamEnabled){
}
return 0;
}

View File

@ -18,6 +18,17 @@ TARGET_INCLUDE_DIRECTORIES(
PRIVATE "${TD_SOURCE_DIR}/source/libs/stream/inc" PRIVATE "${TD_SOURCE_DIR}/source/libs/stream/inc"
) )
ADD_EXECUTABLE(checkpointTest checkpointTest.cpp)
TARGET_LINK_LIBRARIES(
checkpointTest
PUBLIC os common gtest stream executor qcom index transport util
)
TARGET_INCLUDE_DIRECTORIES(
checkpointTest
PRIVATE "${TD_SOURCE_DIR}/source/libs/stream/inc"
)
add_test( add_test(
NAME streamUpdateTest NAME streamUpdateTest
COMMAND streamUpdateTest COMMAND streamUpdateTest

View File

@ -0,0 +1,61 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <gtest/gtest.h>
#include <taoserror.h>
#include <tglobal.h>
#include <iostream>
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
#include "rsync.h"
#include "streamInt.h"
int main(int argc, char **argv) {
testing::InitGoogleTest(&argc, argv);
strcpy(tsSnodeIp, "127.0.0.1");
return RUN_ALL_TESTS();
}
TEST(testCase, checkpointUpload_Test) {
stopRsync();
startRsync();
taosSsleep(5);
SArray* fileList = taosArrayInit(0, POINTER_BYTES);
char* id = "2013892036";
char* file1 = "/Users/mingmingwanng/wal1";
char* file2 = "/Users/mingmingwanng/java_error_in_clion.hprof";
taosArrayPush(fileList, &file1);
taosArrayPush(fileList, &file2);
uploadCheckpoint(id, fileList);
}
TEST(testCase, checkpointDownload_Test) {
char* id = "2013892036";
downloadRsync(id, "/Users/mingmingwanng/rsync/tmp");
}
TEST(testCase, checkpointDelete_Test) {
char* id = "2013892036";
deleteRsync(id);
}

View File

@ -1,6 +1,6 @@
#define ALLOW_FORBID_FUNC #define ALLOW_FORBID_FUNC
#include "vndCos.h" #include "cos.h"
extern char tsS3Endpoint[]; extern char tsS3Endpoint[];
extern char tsS3AccessKeyId[]; extern char tsS3AccessKeyId[];

178
source/util/src/rsync.c Normal file
View File

@ -0,0 +1,178 @@
//
// Created by mingming wanng on 2023/11/2.
//
#include "rsync.h"
#include <stdlib.h>
#include "tglobal.h"
#define ERRNO_ERR_FORMAT "errno:%d,msg:%s"
#define ERRNO_ERR_DATA errno,strerror(errno)
// deleteRsync function produce empty directories, traverse base directory to remove them
static void removeEmptyDir(){
TdDirPtr pDir = taosOpenDir(tsCheckpointBackupDir);
if (pDir == NULL) return;
TdDirEntryPtr de = NULL;
while ((de = taosReadDir(pDir)) != NULL) {
if (!taosDirEntryIsDir(de)) {
continue;
}
if (strcmp(taosGetDirEntryName(de), ".") == 0 || strcmp(taosGetDirEntryName(de), "..") == 0) continue;
char filename[PATH_MAX] = {0};
snprintf(filename, sizeof(filename), "%s%s", tsCheckpointBackupDir, taosGetDirEntryName(de));
TdDirPtr pDirTmp = taosOpenDir(filename);
TdDirEntryPtr deTmp = NULL;
bool empty = true;
while ((deTmp = taosReadDir(pDirTmp)) != NULL){
if (strcmp(taosGetDirEntryName(deTmp), ".") == 0 || strcmp(taosGetDirEntryName(deTmp), "..") == 0) continue;
empty = false;
}
if(empty) taosRemoveDir(filename);
taosCloseDir(&pDirTmp);
}
taosCloseDir(&pDir);
}
static int generateConfigFile(char* confDir){
TdFilePtr pFile = taosOpenFile(confDir, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) {
uError("[rsync] open conf file error, dir:%s,"ERRNO_ERR_FORMAT, confDir, ERRNO_ERR_DATA);
return -1;
}
char confContent[PATH_MAX*4] = {0};
snprintf(confContent, PATH_MAX*4,
"uid = root\n"
"gid = root\n"
"use chroot = false\n"
"max connections = 200\n"
"timeout = 100\n"
"lock file = %srsync.lock\n"
"log file = %srsync.log\n"
"ignore errors = true\n"
"read only = false\n"
"list = false\n"
"[checkpoint]\n"
"path = %s", tsCheckpointBackupDir, tsCheckpointBackupDir, tsCheckpointBackupDir);
uDebug("[rsync] conf:%s", confContent);
if (taosWriteFile(pFile, confContent, strlen(confContent)) <= 0){
uError("[rsync] write conf file error,"ERRNO_ERR_FORMAT, ERRNO_ERR_DATA);
taosCloseFile(&pFile);
return -1;
}
taosCloseFile(&pFile);
return 0;
}
static int execCommand(char* command){
int try = 3;
int32_t code = 0;
while(try-- > 0) {
code = system(command);
if (code == 0) {
break;
}
taosMsleep(10);
}
return code;
}
void stopRsync(){
int code = system("pkill rsync");
if(code != 0){
uError("[rsync] stop rsync server failed,"ERRNO_ERR_FORMAT, ERRNO_ERR_DATA);
return;
}
uDebug("[rsync] stop rsync server successful");
}
void startRsync(){
if(taosMulMkDir(tsCheckpointBackupDir) != 0){
uError("[rsync] build checkpoint backup dir failed, dir:%s,"ERRNO_ERR_FORMAT, tsCheckpointBackupDir, ERRNO_ERR_DATA);
return;
}
removeEmptyDir();
char confDir[PATH_MAX] = {0};
snprintf(confDir, PATH_MAX, "%srsync.conf", tsCheckpointBackupDir);
int code = generateConfigFile(confDir);
if(code != 0){
return;
}
char cmd[PATH_MAX] = {0};
snprintf(cmd, PATH_MAX, "rsync --daemon --config=%s", confDir);
// start rsync service to backup checkpoint
code = system(cmd);
if(code != 0){
uError("[rsync] start server failed, code:%d,"ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA);
return;
}
uDebug("[rsync] start server successful");
}
int uploadRsync(char* id, SArray* fileList){
for(int i = 0; i < taosArrayGetSize(fileList); i++) {
char* fullName = (char*)taosArrayGetP(fileList, i);
char command[PATH_MAX] = {0};
// char* name = strrchr(fullName, '/');
// if(name == NULL){
// uError("[rsync] file name invalid, name:%s", name);
// return -1;
// }
// name = name + 1;
snprintf(command, PATH_MAX, "rsync -av --timeout=10 --bwlimit=100000 %s rsync://%s/checkpoint/%s/",
fullName, tsSnodeIp, id);
int code = execCommand(command);
if(code != 0){
uError("[rsync] send failed code:%d," ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA);
return -1;
}
}
uDebug("[rsync] upload data:%s successful", id);
return 0;
}
int downloadRsync(char* id, char* path){
char command[PATH_MAX] = {0};
snprintf(command, PATH_MAX, "rsync -av --timeout=10 --bwlimit=100000 rsync://%s/checkpoint/%s/ %s",
tsSnodeIp, id, path);
int code = execCommand(command);
if(code != 0){
uError("[rsync] get failed code:%d," ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA);
return -1;
}
uDebug("[rsync] down data:%s successful", id);
return 0;
}
int deleteRsync(char* id){
char* tmp = "./tmp_empty/";
int code = taosMkDir(tmp);
if(code != 0){
uError("[rsync] make tmp dir failed. code:%d," ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA);
return -1;
}
char command[PATH_MAX] = {0};
snprintf(command, PATH_MAX, "rsync -av --delete --timeout=10 %s rsync://%s/checkpoint/%s/",
tmp, tsSnodeIp, id);
code = execCommand(command);
taosRemoveDir(tmp);
if(code != 0){
uError("[rsync] get failed code:%d," ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA);
return -1;
}
uDebug("[rsync] delete data:%s successful", id);
return 0;
}