diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index e3c52c78b0..00f01e4266 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -513,6 +513,8 @@ typedef struct SStreamMeta { void* qHandle; int32_t pauseTaskNum; + + void* bkdChkptMgt; } SStreamMeta; int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 9d84e76a29..cd620ed5b7 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -250,9 +250,9 @@ int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId); SBkdMgt* bkdMgtCreate(char* path); int32_t bkdMgtAddChkp(SBkdMgt* bm, char* task, char* path); -int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list); +int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list, char* name); int32_t bkdMgtDumpTo(SBkdMgt* bm, char* taskId, char* dname); void bkdMgtDestroy(SBkdMgt* bm); -int32_t taskDbGenChkpUploadPath(void* arg, int64_t chkpId, int8_t type, char** pathkj); +int32_t taskDbGenChkpUploadPath(void* arg, void* bkdMgt, int64_t chkpId, int8_t type, char** pathkj); #endif \ No newline at end of file diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 55dd938835..253c18467e 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1717,14 +1717,30 @@ int32_t taskDbGenChkpUplaodPath__rsync(STaskDbWrapper* pDb, int64_t chkpId, char return code; } -int32_t taskDbGenChkpUploadPath(void* arg, int64_t chkpId, int8_t type, char** path) { + +int32_t taskDbGenChkpUplaodPath__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64_t chkpId, char** path) { + SBkdMgt* p = (SBkdMgt*)bkdChkpMgt; + + char* temp = taosMemoryCalloc(1, strlen(pDb->path)); + sprintf(temp, "%s%s%s", pDb->path, TD_DIRSEP, "tmp"); + + if (!taosDirExist(temp)) { + taosMkDir(temp); + } + bkdMgtGetDelta(p, pDb->idstr, chkpId, NULL, temp); + + *path = temp; + + return 0; +} +int32_t taskDbGenChkpUploadPath(void* arg, void* mgt, int64_t chkpId, int8_t type, char** path) { STaskDbWrapper* pDb = arg; UPLOAD_TYPE utype = type; if (utype == UPLOAD_RSYNC) { - return taskDbGenChkpUplaodPath__rsync(pDb, chkpId, path); + return taskDbGenChkpUplaodPath__rsync(pDb,chkpId, path); } else if (utype == UPLOAD_S3) { - return 0; + return taskDbGenChkpUplaodPath__s3(pDb,mgt, chkpId, path); } return -1; } @@ -3603,14 +3619,28 @@ void bkdMgtDestroy(SBkdMgt* bm) { taosMemoryFree(bm); } -int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list) { +int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list, char* dname) { int32_t code = 0; - taosThreadRwlockWrlock(&bm->rwLock); + taosThreadRwlockWrlock(&bm->rwLock); SDbChkp* pChkp = taosHashGet(bm->pDbChkpTbl, taskId, strlen(taskId)); - code = dbChkpGetDelta(pChkp, chkpId, list); + + if (pChkp == NULL) { + char* taskPath = taosMemoryCalloc(1, strlen(bm->path) + 64); + sprintf(taskPath, "%s%s%s", bm->path, TD_DIRSEP, taskId); + + SDbChkp* p = dbChkpCreate(taskPath, chkpId); + taosHashPut(bm->pDbChkpTbl, taskId, strlen(taskId), &p, sizeof(void*)); + + taosMemoryFree(taskPath); + } taosThreadRwlockUnlock(&bm->rwLock); + + code = dbChkpGetDelta(pChkp, chkpId, list); + + code = dbChkpDumpTo(pChkp, dname); + return code; } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 51f2a18504..03696e1122 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -340,8 +340,10 @@ int32_t doUploadChkp(void* param) { SAsyncUploadArg* arg = param; char* path = NULL; int32_t code = 0; - if ((code = taskDbGenChkpUploadPath(arg->pTask->pBackend, arg->chkpId, (int8_t)(arg->type), &path)) != 0) { - stError("s-task:%s faile to gen upload checkpoint:%" PRId64 "", arg->pTask->id.idStr, arg->chkpId); + + if ((code = taskDbGenChkpUploadPath(arg->pTask->pBackend, arg->pTask->pMeta->bkdChkptMgt, arg->chkpId, + (int8_t)(arg->type), &path)) != 0) { + stError("s-task:%s failed to gen upload checkpoint:%" PRId64 "", arg->pTask->id.idStr, arg->chkpId); } if (code == 0 && uploadCheckpoint(arg->taskId, path) != 0) { stError("s-task:%s faile to upload checkpoint:%" PRId64 "", arg->pTask->id.idStr, arg->chkpId); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index c5b98784a6..102023a728 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -356,6 +356,8 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->pHbInfo->stopFlag = 0; pMeta->qHandle = taosInitScheduler(32, 1, "stream-chkp", NULL); + pMeta->bkdChkptMgt = bkdMgtCreate(tpath); + return pMeta; _err: