diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 604f5f7c2a..5c834a6e95 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1722,7 +1722,7 @@ int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64 SBkdMgt* p = (SBkdMgt*)bkdChkpMgt; char* temp = taosMemoryCalloc(1, strlen(pDb->path)); - sprintf(temp, "%s%s%s", pDb->path, TD_DIRSEP, "tmp"); + sprintf(temp, "%s%s%s%" PRId64 "", pDb->path, TD_DIRSEP, "tmp", chkpId); if (taosDirExist(temp)) { taosRemoveDir(temp); @@ -3363,16 +3363,16 @@ _err: return code >= 0 ? 0 : -1; } -int32_t isBkdDataMeta(char* name) { +int32_t isBkdDataMeta(char* name, int32_t len) { const char* pCurrent = "CURRENT"; int32_t currLen = strlen(pCurrent); const char* pManifest = "MANIFEST-"; int32_t maniLen = strlen(pManifest); - if (strlen(name) >= maniLen && strncmp(name, pManifest, maniLen) == 0) { + if (len >= maniLen && strncmp(name, pManifest, maniLen) == 0) { return 1; - } else if (strlen(name) == currLen && strcmp(name, pCurrent) == 0) { + } else if (len == currLen && strcmp(name, pCurrent) == 0) { return 1; } return 0; @@ -3383,9 +3383,10 @@ int32_t compareHashTableImpl(SHashObj* p1, SHashObj* p2, SArray* diff) { void* pIter = taosHashIterate(p2, NULL); while (pIter) { char* name = taosHashGetKey(pIter, &len); - if (!isBkdDataMeta(name) && !taosHashGet(p1, name, len)) { - char* p = taosStrdup(name); - taosArrayPush(diff, &p); + if (!isBkdDataMeta(name, len) && !taosHashGet(p1, name, len)) { + char* fname = taosMemoryCalloc(1, len + 1); + strncpy(fname, name, len); + taosArrayPush(diff, &fname); } pIter = taosHashIterate(p2, pIter); } @@ -3429,18 +3430,17 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) { if (strlen(name) == currLen && strcmp(name, pCurrent) == 0) { taosMemoryFreeClear(p->pCurrent); p->pCurrent = taosStrdup(name); - taosHashPut(p->pSstTbl[1 - p->idx], name, strlen(name), &dummy, sizeof(dummy)); + // taosHashPut(p->pSstTbl[1 - p->idx], name, strlen(name), &dummy, sizeof(dummy)); continue; } if (strlen(name) >= maniLen && strncmp(name, pManifest, maniLen) == 0) { taosMemoryFreeClear(p->pManifest); p->pManifest = taosStrdup(name); - taosHashPut(p->pSstTbl[1 - p->idx], name, strlen(name), &dummy, sizeof(dummy)); + // taosHashPut(p->pSstTbl[1 - p->idx], name, strlen(name), &dummy, sizeof(dummy)); continue; } if (strlen(name) >= sstLen && strncmp(name + strlen(name) - 4, pSST, sstLen) == 0) { - // char* p = taosStrdup(name); taosHashPut(p->pSstTbl[1 - p->idx], name, strlen(name), &dummy, sizeof(dummy)); continue; } @@ -3448,18 +3448,23 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) { void* pIter = taosHashIterate(p->pSstTbl[1 - p->idx], NULL); while (pIter) { - char *name = taosHashGetKey(pIter, NULL); - stError("curr file list: %s", name); + size_t len = 0; + char* name = taosHashGetKey(pIter, &len); + + char* buf = taosMemoryCalloc(1, len + 1); + strncpy(buf, name, len); + stError("curr file list: %s", buf); pIter = taosHashIterate(p->pSstTbl[1 - p->idx], pIter); } if (p->init == 0) { void* pIter = taosHashIterate(p->pSstTbl[1 - p->idx], NULL); while (pIter) { - size_t len; + size_t len = 0; char* name = taosHashGetKey(pIter, &len); - if (name != NULL && !isBkdDataMeta(name)) { - char* fname = taosStrdup(name); + if (name != NULL && !isBkdDataMeta(name, len)) { + char* fname = taosMemoryCalloc(1, len + 1); + strncpy(fname, name, len); taosArrayPush(p->pAdd, &fname); } pIter = taosHashIterate(p->pSstTbl[1 - p->idx], pIter); @@ -3579,8 +3584,8 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname) { // add file to $name dir for (int i = 0; i < taosArrayGetSize(p->pAdd); i++) { - memset(dstBuf, 0, len); memset(srcBuf, 0, len); + memset(dstBuf, 0, len); char* filename = taosArrayGetP(p->pAdd, i); sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, filename); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 854cdc12b1..f99713a431 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -348,7 +348,7 @@ int32_t doUploadChkp(void* param) { if (code == 0 && uploadCheckpoint(arg->taskId, path) != 0) { stError("s-task:%s failed to upload checkpoint:%" PRId64, arg->pTask->id.idStr, arg->chkpId); } - + taosRemoveDir(path); taosMemoryFree(path); taosMemoryFree(arg->taskId); taosMemoryFree(arg); @@ -435,6 +435,7 @@ static int uploadCheckpointToS3(char* id, char* path) { if (pDir == NULL) return -1; TdDirEntryPtr de = NULL; + s3Init(); while ((de = taosReadDir(pDir)) != NULL) { char* name = taosGetDirEntryName(de); if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0 || taosDirEntryIsDir(de)) continue; @@ -454,6 +455,7 @@ static int uploadCheckpointToS3(char* id, char* path) { return -1; } stDebug("[s3] upload checkpoint:%s", filename); + break; } taosCloseDir(&pDir);