diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index d336c4235a..f6439d7499 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -32,10 +32,11 @@ typedef struct { char* buf; int32_t len; - SArray* pAdd; - SArray* pDel; - - int8_t update; + SHashObj* pSstTbl[2]; + SArray* pAdd; + SArray* pDel; + int8_t idx; + int8_t update; } SBackendManager; typedef struct SCompactFilteFactory { @@ -155,6 +156,10 @@ SBackendManager* backendManagerCreate(char* path) { p->len = strlen(path) + 128; p->buf = taosMemoryCalloc(1, p->len); + p->idx = 0; + p->pSstTbl[0] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + p->pSstTbl[1] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + p->pAdd = taosArrayInit(64, sizeof(void*)); p->pDel = taosArrayInit(64, sizeof(void*)); p->update = 0; @@ -172,6 +177,8 @@ void backendManagerDestroy(SBackendManager* bm) { taosArrayDestroyP(bm->pAdd, taosMemoryFree); taosArrayDestroyP(bm->pDel, taosMemoryFree); + taosHashCleanup(bm->pSstTbl[0]); + taosHashCleanup(bm->pSstTbl[1]); taosMemoryFree(bm); } @@ -210,10 +217,6 @@ int32_t backendManagerGetDelta(SBackendManager* bm, int64_t chkpId, SArray* list memset(bm->buf, 0, bm->len); sprintf(bm->buf, "%s%scheckpoint%" PRId64 "", bm->path, TD_DIRSEP, chkpId); - SHashObj* pTable = bm->init == 0 - ? bm->pSSTable - : taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); - taosArrayClearP(bm->pAdd, taosMemoryFree); taosArrayClearP(bm->pDel, taosMemoryFree); @@ -226,48 +229,50 @@ int32_t backendManagerGetDelta(SBackendManager* bm, int64_t chkpId, SArray* list if (strlen(name) == currLen && strcmp(name, pCurrent) == 0) { taosMemoryFreeClear(bm->pCurrent); bm->pCurrent = taosStrdup(name); - taosHashPut(pTable, name, strlen(name), &dummy, sizeof(dummy)); + taosHashPut(bm->pSstTbl[1 - bm->idx], name, strlen(name), &dummy, sizeof(dummy)); continue; } if (strlen(name) >= maniLen && strncmp(name, pManifest, maniLen) == 0) { taosMemoryFreeClear(bm->pManifest); bm->pManifest = taosStrdup(name); - taosHashPut(pTable, name, strlen(name), &dummy, sizeof(dummy)); + taosHashPut(bm->pSstTbl[1 - bm->idx], name, strlen(name), &dummy, sizeof(dummy)); continue; } if (strlen(name) >= sstLen && strncmp(name + strlen(name) - 4, pSST, sstLen) == 0) { char* p = taosStrdup(name); - - taosHashPut(pTable, name, strlen(name), &dummy, sizeof(dummy)); + taosHashPut(bm->pSstTbl[1 - bm->idx], name, strlen(name), &dummy, sizeof(dummy)); continue; } } if (bm->init == 0) { - bm->preCkptId = chkpId; + bm->preCkptId = -1; bm->curChkpId = chkpId; bm->init = 1; - void* pIter = taosHashIterate(pTable, NULL); + void* pIter = taosHashIterate(bm->pSstTbl[1 - bm->idx], NULL); while (pIter) { size_t len; char* name = taosHashGetKey(pIter, &len); if (name != NULL && len != 0) { taosArrayPush(bm->pAdd, &name); } - pIter = taosHashIterate(pTable, pIter); + pIter = taosHashIterate(bm->pSstTbl[1 - bm->idx], pIter); } bm->update = 1; } else { - int32_t code = compareHashTable(bm->pSSTable, pTable, bm->pAdd, bm->pDel); + int32_t code = compareHashTable(bm->pSstTbl[bm->idx], bm->pSstTbl[1 - bm->idx], bm->pAdd, bm->pDel); + bm->preCkptId = bm->curChkpId; bm->curChkpId = chkpId; - taosHashCleanup(pTable); if (taosArrayGetSize(bm->pAdd) == 0 && taosArrayGetSize(bm->pDel) == 0) { bm->update = 0; } } + taosHashClear(bm->pSstTbl[bm->idx]); + bm->idx = 1 - bm->idx; + return 0; }