support reopen stream state

This commit is contained in:
yihaoDeng 2023-08-10 06:35:25 +00:00
parent ef247cdb1d
commit 100d2240c3
1 changed files with 21 additions and 4 deletions

View File

@ -28,14 +28,17 @@ typedef struct {
int64_t preCkptId;
int64_t curChkpId;
char* path;
char* buf;
int32_t len;
// ping-pong buf
SHashObj* pSstTbl[2];
SArray* pAdd;
SArray* pDel;
int8_t idx;
int8_t update;
SArray* pAdd;
SArray* pDel;
int8_t update;
} SBackendManager;
typedef struct SCompactFilteFactory {
@ -258,6 +261,15 @@ int32_t bkdMgtGetDelta(SBackendManager* bm, int64_t chkpId, SArray* list) {
if (taosArrayGetSize(bm->pAdd) > 0) bm->update = 1;
} else {
int32_t code = compareHashTable(bm->pSstTbl[bm->idx], bm->pSstTbl[1 - bm->idx], bm->pAdd, bm->pDel);
if (code != 0) {
// dead code
taosArrayClearP(bm->pAdd, taosMemoryFree);
taosArrayClearP(bm->pDel, taosMemoryFree);
taosHashClear(bm->pSstTbl[1 - bm->idx]);
bm->update = 0;
return code;
}
bm->preCkptId = bm->curChkpId;
bm->curChkpId = chkpId;
@ -714,11 +726,16 @@ int32_t streamBackendLoadCheckpointInfo(void* arg) {
if (!taosDirExist(chkpPath)) {
// no checkpoint, nothing to load
taosMemoryFree(chkpPath);
return 0;
}
TdDirPtr pDir = taosOpenDir(chkpPath);
if (pDir == NULL) return 0;
if (pDir == NULL) {
taosMemoryFree(chkpPath);
return 0;
}
TdDirEntryPtr de = NULL;
SArray* suffix = taosArrayInit(4, sizeof(int64_t));