diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index f6439d7499..f34b7dc3cb 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -21,16 +21,15 @@ #include "tref.h" typedef struct { - int8_t init; - char* pCurrent; - char* pManifest; - SArray* pSST; - int64_t preCkptId; - int64_t curChkpId; - SHashObj* pSSTable; - char* path; - char* buf; - int32_t len; + int8_t init; + char* pCurrent; + char* pManifest; + SArray* pSST; + int64_t preCkptId; + int64_t curChkpId; + char* path; + char* buf; + int32_t len; SHashObj* pSstTbl[2]; SArray* pAdd; @@ -146,13 +145,12 @@ void destroyFunc(void* arg); int32_t encodeValueFunc(void* value, int32_t vlen, int64_t ttl, char** dest); int32_t decodeValueFunc(void* value, int32_t vlen, int64_t* ttl, char** dest); -SBackendManager* backendManagerCreate(char* path) { +SBackendManager* bkdMgtCreate(char* path) { SBackendManager* p = taosMemoryCalloc(1, sizeof(SBackendManager)); p->curChkpId = 0; p->preCkptId = 0; p->pSST = taosArrayInit(64, sizeof(void*)); p->path = taosStrdup(path); - p->pSSTable = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); p->len = strlen(path) + 128; p->buf = taosMemoryCalloc(1, p->len); @@ -165,14 +163,12 @@ SBackendManager* backendManagerCreate(char* path) { p->update = 0; return p; } -void backendManagerDestroy(SBackendManager* bm) { +void bkdMgtDestroy(SBackendManager* bm) { if (bm == NULL) return; taosMemoryFree(bm->buf); taosMemoryFree(bm->path); - taosHashCleanup(bm->pSSTable); - taosArrayDestroyP(bm->pSST, taosMemoryFree); taosArrayDestroyP(bm->pAdd, taosMemoryFree); taosArrayDestroyP(bm->pDel, taosMemoryFree); @@ -204,7 +200,7 @@ int32_t compareHashTable(SHashObj* p1, SHashObj* p2, SArray* add, SArray* del) { return code; } -int32_t backendManagerGetDelta(SBackendManager* bm, int64_t chkpId, SArray* list) { +int32_t bkdMgtGetDelta(SBackendManager* bm, int64_t chkpId, SArray* list) { const char* pCurrent = "CURRENT"; int32_t currLen = strlen(pCurrent); @@ -259,8 +255,7 @@ int32_t backendManagerGetDelta(SBackendManager* bm, int64_t chkpId, SArray* list } pIter = taosHashIterate(bm->pSstTbl[1 - bm->idx], pIter); } - bm->update = 1; - + if (taosArrayGetSize(bm->pAdd) > 0) bm->update = 1; } else { int32_t code = compareHashTable(bm->pSstTbl[bm->idx], bm->pSstTbl[1 - bm->idx], bm->pAdd, bm->pDel); @@ -276,7 +271,7 @@ int32_t backendManagerGetDelta(SBackendManager* bm, int64_t chkpId, SArray* list return 0; } -int32_t backendManagerDumpTo(SBackendManager* bm, char* dname) { +int32_t bkdMgtDumpTo(SBackendManager* bm, char* dname) { int32_t code = 0; int32_t len = bm->len + 128; @@ -340,7 +335,7 @@ int32_t backendManagerDumpTo(SBackendManager* bm, char* dname) { sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, bm->pManifest); taosCopyFile(srcBuf, dstBuf); - // clear delta data + // clear delta data buf taosArrayClearP(bm->pAdd, taosMemoryFree); taosArrayClearP(bm->pDel, taosMemoryFree);