diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 1a29a82c11..519019892f 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1691,7 +1691,7 @@ void taskDbDestroy(void* pDb) { return; } -int32_t taskDbGenChkpUplaodPath__rsync(STaskDbWrapper* pDb, int64_t chkpId, char** path) { +int32_t taskDbGenChkpUploadData__rsync(STaskDbWrapper* pDb, int64_t chkpId, char** path) { int64_t st = taosGetTimestampMs(); int32_t code = -1; int64_t refId = pDb->refId; @@ -1718,7 +1718,7 @@ int32_t taskDbGenChkpUplaodPath__rsync(STaskDbWrapper* pDb, int64_t chkpId, char return code; } -int32_t taskDbGenChkpUplaodPath__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64_t chkpId, char** path) { +int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64_t chkpId, char** path) { SBkdMgt* p = (SBkdMgt*)bkdChkpMgt; char* temp = taosMemoryCalloc(1, strlen(pDb->path)); @@ -1727,6 +1727,8 @@ int32_t taskDbGenChkpUplaodPath__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64 if (taosDirExist(temp)) { taosRemoveDir(temp); taosMkDir(temp); + } else { + taosMkDir(temp); } bkdMgtGetDelta(p, pDb->idstr, chkpId, NULL, temp); @@ -1739,9 +1741,9 @@ int32_t taskDbGenChkpUploadPath(void* arg, void* mgt, int64_t chkpId, int8_t typ UPLOAD_TYPE utype = type; if (utype == UPLOAD_RSYNC) { - return taskDbGenChkpUplaodPath__rsync(pDb, chkpId, path); + return taskDbGenChkpUploadData__rsync(pDb, chkpId, path); } else if (utype == UPLOAD_S3) { - return taskDbGenChkpUplaodPath__s3(pDb, mgt, chkpId, path); + return taskDbGenChkpUploadData__s3(pDb, mgt, chkpId, path); } return -1; } @@ -3399,7 +3401,7 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) { int32_t sstLen = strlen(pSST); memset(p->buf, 0, p->len); - sprintf(p->buf, "%s%scheckpoint%" PRId64 "", p->path, TD_DIRSEP, chkpId); + sprintf(p->buf, "%s%s%s%scheckpoint%" PRId64 "", p->path, TD_DIRSEP, "checkpoints", TD_DIRSEP, chkpId); taosArrayClearP(p->pAdd, taosMemoryFree); taosArrayClearP(p->pDel, taosMemoryFree); @@ -3527,8 +3529,8 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname) { char* srcDir = taosMemoryCalloc(1, len); char* dstDir = taosMemoryCalloc(1, len); - sprintf(srcDir, "%s%s%s%" PRId64 "", p->path, TD_DIRSEP, "checkpoint", p->curChkpId); - sprintf(dstDir, "%s%s%s", p->path, TD_DIRSEP, dname); + sprintf(srcDir, "%s%s%s%s%s%" PRId64 "", p->path, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", p->curChkpId); + sprintf(dstDir, "%s", dname); if (!taosDirExist(srcDir)) { stError("failed to dump srcDir %s, reason: not exist such dir", srcDir); @@ -3602,6 +3604,7 @@ _ERROR: SBkdMgt* bkdMgtCreate(char* path) { SBkdMgt* p = taosMemoryCalloc(1, sizeof(SBkdMgt)); p->pDbChkpTbl = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + p->path = taosStrdup(path); taosThreadRwlockInit(&p->rwLock, NULL); return p; } @@ -3624,7 +3627,8 @@ int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list, int32_t code = 0; taosThreadRwlockWrlock(&bm->rwLock); - SDbChkp* pChkp = taosHashGet(bm->pDbChkpTbl, taskId, strlen(taskId)); + SDbChkp** ppChkp = taosHashGet(bm->pDbChkpTbl, taskId, strlen(taskId)); + SDbChkp* pChkp = NULL; if (pChkp == NULL) { char* taskPath = taosMemoryCalloc(1, strlen(bm->path) + 64); @@ -3639,9 +3643,12 @@ int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list, code = dbChkpDumpTo(pChkp, dname); taosThreadRwlockUnlock(&bm->rwLock); return code; + } else { + pChkp = *ppChkp; } code = dbChkpGetDelta(pChkp, chkpId, list); + code = dbChkpDumpTo(pChkp, dname); taosThreadRwlockUnlock(&bm->rwLock); return code; }