support reopen stream state

This commit is contained in:
yihaoDeng 2023-08-09 09:50:49 +00:00
parent 6fdcd82a33
commit 1db15da4e1
1 changed files with 22 additions and 17 deletions

View File

@ -32,10 +32,11 @@ typedef struct {
char* buf; char* buf;
int32_t len; int32_t len;
SArray* pAdd; SHashObj* pSstTbl[2];
SArray* pDel; SArray* pAdd;
SArray* pDel;
int8_t update; int8_t idx;
int8_t update;
} SBackendManager; } SBackendManager;
typedef struct SCompactFilteFactory { typedef struct SCompactFilteFactory {
@ -155,6 +156,10 @@ SBackendManager* backendManagerCreate(char* path) {
p->len = strlen(path) + 128; p->len = strlen(path) + 128;
p->buf = taosMemoryCalloc(1, p->len); p->buf = taosMemoryCalloc(1, p->len);
p->idx = 0;
p->pSstTbl[0] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
p->pSstTbl[1] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
p->pAdd = taosArrayInit(64, sizeof(void*)); p->pAdd = taosArrayInit(64, sizeof(void*));
p->pDel = taosArrayInit(64, sizeof(void*)); p->pDel = taosArrayInit(64, sizeof(void*));
p->update = 0; p->update = 0;
@ -172,6 +177,8 @@ void backendManagerDestroy(SBackendManager* bm) {
taosArrayDestroyP(bm->pAdd, taosMemoryFree); taosArrayDestroyP(bm->pAdd, taosMemoryFree);
taosArrayDestroyP(bm->pDel, taosMemoryFree); taosArrayDestroyP(bm->pDel, taosMemoryFree);
taosHashCleanup(bm->pSstTbl[0]);
taosHashCleanup(bm->pSstTbl[1]);
taosMemoryFree(bm); taosMemoryFree(bm);
} }
@ -210,10 +217,6 @@ int32_t backendManagerGetDelta(SBackendManager* bm, int64_t chkpId, SArray* list
memset(bm->buf, 0, bm->len); memset(bm->buf, 0, bm->len);
sprintf(bm->buf, "%s%scheckpoint%" PRId64 "", bm->path, TD_DIRSEP, chkpId); sprintf(bm->buf, "%s%scheckpoint%" PRId64 "", bm->path, TD_DIRSEP, chkpId);
SHashObj* pTable = bm->init == 0
? bm->pSSTable
: taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
taosArrayClearP(bm->pAdd, taosMemoryFree); taosArrayClearP(bm->pAdd, taosMemoryFree);
taosArrayClearP(bm->pDel, taosMemoryFree); taosArrayClearP(bm->pDel, taosMemoryFree);
@ -226,48 +229,50 @@ int32_t backendManagerGetDelta(SBackendManager* bm, int64_t chkpId, SArray* list
if (strlen(name) == currLen && strcmp(name, pCurrent) == 0) { if (strlen(name) == currLen && strcmp(name, pCurrent) == 0) {
taosMemoryFreeClear(bm->pCurrent); taosMemoryFreeClear(bm->pCurrent);
bm->pCurrent = taosStrdup(name); bm->pCurrent = taosStrdup(name);
taosHashPut(pTable, name, strlen(name), &dummy, sizeof(dummy)); taosHashPut(bm->pSstTbl[1 - bm->idx], name, strlen(name), &dummy, sizeof(dummy));
continue; continue;
} }
if (strlen(name) >= maniLen && strncmp(name, pManifest, maniLen) == 0) { if (strlen(name) >= maniLen && strncmp(name, pManifest, maniLen) == 0) {
taosMemoryFreeClear(bm->pManifest); taosMemoryFreeClear(bm->pManifest);
bm->pManifest = taosStrdup(name); bm->pManifest = taosStrdup(name);
taosHashPut(pTable, name, strlen(name), &dummy, sizeof(dummy)); taosHashPut(bm->pSstTbl[1 - bm->idx], name, strlen(name), &dummy, sizeof(dummy));
continue; continue;
} }
if (strlen(name) >= sstLen && strncmp(name + strlen(name) - 4, pSST, sstLen) == 0) { if (strlen(name) >= sstLen && strncmp(name + strlen(name) - 4, pSST, sstLen) == 0) {
char* p = taosStrdup(name); char* p = taosStrdup(name);
taosHashPut(bm->pSstTbl[1 - bm->idx], name, strlen(name), &dummy, sizeof(dummy));
taosHashPut(pTable, name, strlen(name), &dummy, sizeof(dummy));
continue; continue;
} }
} }
if (bm->init == 0) { if (bm->init == 0) {
bm->preCkptId = chkpId; bm->preCkptId = -1;
bm->curChkpId = chkpId; bm->curChkpId = chkpId;
bm->init = 1; bm->init = 1;
void* pIter = taosHashIterate(pTable, NULL); void* pIter = taosHashIterate(bm->pSstTbl[1 - bm->idx], NULL);
while (pIter) { while (pIter) {
size_t len; size_t len;
char* name = taosHashGetKey(pIter, &len); char* name = taosHashGetKey(pIter, &len);
if (name != NULL && len != 0) { if (name != NULL && len != 0) {
taosArrayPush(bm->pAdd, &name); taosArrayPush(bm->pAdd, &name);
} }
pIter = taosHashIterate(pTable, pIter); pIter = taosHashIterate(bm->pSstTbl[1 - bm->idx], pIter);
} }
bm->update = 1; bm->update = 1;
} else { } else {
int32_t code = compareHashTable(bm->pSSTable, pTable, bm->pAdd, bm->pDel); int32_t code = compareHashTable(bm->pSstTbl[bm->idx], bm->pSstTbl[1 - bm->idx], bm->pAdd, bm->pDel);
bm->preCkptId = bm->curChkpId;
bm->curChkpId = chkpId; bm->curChkpId = chkpId;
taosHashCleanup(pTable);
if (taosArrayGetSize(bm->pAdd) == 0 && taosArrayGetSize(bm->pDel) == 0) { if (taosArrayGetSize(bm->pAdd) == 0 && taosArrayGetSize(bm->pDel) == 0) {
bm->update = 0; bm->update = 0;
} }
} }
taosHashClear(bm->pSstTbl[bm->idx]);
bm->idx = 1 - bm->idx;
return 0; return 0;
} }