support reopen stream state

This commit is contained in:
yihaoDeng 2023-08-09 12:47:49 +00:00
parent 1db15da4e1
commit 19ac9054d0
1 changed files with 15 additions and 20 deletions

View File

@ -21,16 +21,15 @@
#include "tref.h" #include "tref.h"
typedef struct { typedef struct {
int8_t init; int8_t init;
char* pCurrent; char* pCurrent;
char* pManifest; char* pManifest;
SArray* pSST; SArray* pSST;
int64_t preCkptId; int64_t preCkptId;
int64_t curChkpId; int64_t curChkpId;
SHashObj* pSSTable; char* path;
char* path; char* buf;
char* buf; int32_t len;
int32_t len;
SHashObj* pSstTbl[2]; SHashObj* pSstTbl[2];
SArray* pAdd; SArray* pAdd;
@ -146,13 +145,12 @@ void destroyFunc(void* arg);
int32_t encodeValueFunc(void* value, int32_t vlen, int64_t ttl, char** dest); int32_t encodeValueFunc(void* value, int32_t vlen, int64_t ttl, char** dest);
int32_t decodeValueFunc(void* value, int32_t vlen, int64_t* ttl, char** dest); int32_t decodeValueFunc(void* value, int32_t vlen, int64_t* ttl, char** dest);
SBackendManager* backendManagerCreate(char* path) { SBackendManager* bkdMgtCreate(char* path) {
SBackendManager* p = taosMemoryCalloc(1, sizeof(SBackendManager)); SBackendManager* p = taosMemoryCalloc(1, sizeof(SBackendManager));
p->curChkpId = 0; p->curChkpId = 0;
p->preCkptId = 0; p->preCkptId = 0;
p->pSST = taosArrayInit(64, sizeof(void*)); p->pSST = taosArrayInit(64, sizeof(void*));
p->path = taosStrdup(path); p->path = taosStrdup(path);
p->pSSTable = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
p->len = strlen(path) + 128; p->len = strlen(path) + 128;
p->buf = taosMemoryCalloc(1, p->len); p->buf = taosMemoryCalloc(1, p->len);
@ -165,14 +163,12 @@ SBackendManager* backendManagerCreate(char* path) {
p->update = 0; p->update = 0;
return p; return p;
} }
void backendManagerDestroy(SBackendManager* bm) { void bkdMgtDestroy(SBackendManager* bm) {
if (bm == NULL) return; if (bm == NULL) return;
taosMemoryFree(bm->buf); taosMemoryFree(bm->buf);
taosMemoryFree(bm->path); taosMemoryFree(bm->path);
taosHashCleanup(bm->pSSTable);
taosArrayDestroyP(bm->pSST, taosMemoryFree); taosArrayDestroyP(bm->pSST, taosMemoryFree);
taosArrayDestroyP(bm->pAdd, taosMemoryFree); taosArrayDestroyP(bm->pAdd, taosMemoryFree);
taosArrayDestroyP(bm->pDel, taosMemoryFree); taosArrayDestroyP(bm->pDel, taosMemoryFree);
@ -204,7 +200,7 @@ int32_t compareHashTable(SHashObj* p1, SHashObj* p2, SArray* add, SArray* del) {
return code; return code;
} }
int32_t backendManagerGetDelta(SBackendManager* bm, int64_t chkpId, SArray* list) { int32_t bkdMgtGetDelta(SBackendManager* bm, int64_t chkpId, SArray* list) {
const char* pCurrent = "CURRENT"; const char* pCurrent = "CURRENT";
int32_t currLen = strlen(pCurrent); int32_t currLen = strlen(pCurrent);
@ -259,8 +255,7 @@ int32_t backendManagerGetDelta(SBackendManager* bm, int64_t chkpId, SArray* list
} }
pIter = taosHashIterate(bm->pSstTbl[1 - bm->idx], pIter); pIter = taosHashIterate(bm->pSstTbl[1 - bm->idx], pIter);
} }
bm->update = 1; if (taosArrayGetSize(bm->pAdd) > 0) bm->update = 1;
} else { } else {
int32_t code = compareHashTable(bm->pSstTbl[bm->idx], bm->pSstTbl[1 - bm->idx], bm->pAdd, bm->pDel); int32_t code = compareHashTable(bm->pSstTbl[bm->idx], bm->pSstTbl[1 - bm->idx], bm->pAdd, bm->pDel);
@ -276,7 +271,7 @@ int32_t backendManagerGetDelta(SBackendManager* bm, int64_t chkpId, SArray* list
return 0; return 0;
} }
int32_t backendManagerDumpTo(SBackendManager* bm, char* dname) { int32_t bkdMgtDumpTo(SBackendManager* bm, char* dname) {
int32_t code = 0; int32_t code = 0;
int32_t len = bm->len + 128; int32_t len = bm->len + 128;
@ -340,7 +335,7 @@ int32_t backendManagerDumpTo(SBackendManager* bm, char* dname) {
sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, bm->pManifest); sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, bm->pManifest);
taosCopyFile(srcBuf, dstBuf); taosCopyFile(srcBuf, dstBuf);
// clear delta data // clear delta data buf
taosArrayClearP(bm->pAdd, taosMemoryFree); taosArrayClearP(bm->pAdd, taosMemoryFree);
taosArrayClearP(bm->pDel, taosMemoryFree); taosArrayClearP(bm->pDel, taosMemoryFree);