From dd2252598ed402b1bda417182e40c9c31e6f72ca Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 9 Nov 2023 17:39:34 +0800 Subject: [PATCH] upload by rsync --- source/libs/stream/inc/streamBackendRocksdb.h | 3 ++ source/libs/stream/src/streamBackendRocksdb.c | 38 ++++++++++++++++ source/libs/stream/src/streamCheckpoint.c | 43 +++++++++++++++++++ 3 files changed, 84 insertions(+) diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index c9cb5ab64c..9d84e76a29 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -17,6 +17,7 @@ #define _STREAM_BACKEDN_ROCKSDB_H_ #include "rocksdb/c.h" +//#include "streamInt.h" #include "streamState.h" #include "tcoding.h" #include "tcommon.h" @@ -252,4 +253,6 @@ int32_t bkdMgtAddChkp(SBkdMgt* bm, char* task, char* path); int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list); int32_t bkdMgtDumpTo(SBkdMgt* bm, char* taskId, char* dname); void bkdMgtDestroy(SBkdMgt* bm); + +int32_t taskDbGenChkpUploadPath(void* arg, int64_t chkpId, int8_t type, char** pathkj); #endif \ No newline at end of file diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index a06ec8c9c6..55dd938835 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1691,6 +1691,44 @@ void taskDbDestroy(void* pDb) { return; } +int32_t taskDbGenChkpUplaodPath__rsync(STaskDbWrapper* pDb, int64_t chkpId, char** path) { + int64_t st = taosGetTimestampMs(); + int32_t code = -1; + int64_t refId = pDb->refId; + + if (taosAcquireRef(taskDbWrapperId, refId) == NULL) { + return -1; + } + char* pChkpDir = NULL; + char* pChkpIdDir = NULL; + if (chkpPreBuildDir(pDb->path, chkpId, &pChkpDir, &pChkpIdDir) != 0) { + code = -1; + } + + if (taosIsDir(pChkpIdDir) && isValidCheckpoint(pChkpIdDir)) { + code = 0; + *path = pChkpIdDir; + pChkpIdDir = NULL; + } + + taosMemoryFree(pChkpDir); + taosMemoryFree(pChkpIdDir); + taosReleaseRef(taskDbWrapperId, refId); + + return code; +} +int32_t taskDbGenChkpUploadPath(void* arg, int64_t chkpId, int8_t type, char** path) { + STaskDbWrapper* pDb = arg; + UPLOAD_TYPE utype = type; + + if (utype == UPLOAD_RSYNC) { + return taskDbGenChkpUplaodPath__rsync(pDb, chkpId, path); + } else if (utype == UPLOAD_S3) { + return 0; + } + return -1; +} + int32_t taskDbOpenCfByKey(STaskDbWrapper* pDb, const char* key) { int32_t code = 0; char* err = NULL; diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 34943454ff..51f2a18504 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -15,8 +15,16 @@ #include "cos.h" #include "rsync.h" +#include "streamBackendRocksdb.h" #include "streamInt.h" +typedef struct { + UPLOAD_TYPE type; + char* taskId; + int64_t chkpId; + + SStreamTask* pTask; +} SAsyncUploadArg; int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) { if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; @@ -328,6 +336,36 @@ void streamTaskSetFailedId(SStreamTask* pTask) { pTask->chkInfo.checkpointId = pTask->chkInfo.checkpointingId; } +int32_t doUploadChkp(void* param) { + SAsyncUploadArg* arg = param; + char* path = NULL; + int32_t code = 0; + if ((code = taskDbGenChkpUploadPath(arg->pTask->pBackend, arg->chkpId, (int8_t)(arg->type), &path)) != 0) { + stError("s-task:%s faile to gen upload checkpoint:%" PRId64 "", arg->pTask->id.idStr, arg->chkpId); + } + if (code == 0 && uploadCheckpoint(arg->taskId, path) != 0) { + stError("s-task:%s faile to upload checkpoint:%" PRId64 "", arg->pTask->id.idStr, arg->chkpId); + } + + taosMemoryFree(path); + taosMemoryFree(arg->taskId); + taosMemoryFree(arg); + return 0; +} +int32_t streamTaskUploadChkp(SStreamTask* pTask, int64_t chkpId, char* taskId) { + // async upload + UPLOAD_TYPE type = getUploadType(); + if (type == UPLOAD_DISABLE) { + return 0; + } + SAsyncUploadArg* arg = taosMemoryCalloc(1, sizeof(SAsyncUploadArg)); + arg->type = type; + arg->taskId = taosStrdup(taskId); + arg->chkpId = chkpId; + arg->pTask = pTask; + + return streamMetaAsyncExec(pTask->pMeta, doUploadChkp, arg, NULL); +} int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { int32_t code = TSDB_CODE_SUCCESS; int64_t startTs = pTask->chkInfo.startTs; @@ -363,6 +401,11 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { if (code != TSDB_CODE_SUCCESS) { stError("s-task:%s commit taskInfo failed, checkpoint:%" PRId64 " failed, code:%s", pTask->id.idStr, ckId, tstrerror(terrno)); + } else { + code = streamTaskUploadChkp(pTask, ckId, (char*)pTask->id.idStr); + if (code != 0) { + stError("s-task:%s failed to upload checkpoint:%" PRId64 " failed", pTask->id.idStr, ckId); + } } }