support reopen stream state

This commit is contained in:
yihaoDeng 2023-08-09 06:10:40 +00:00
parent c43a6b272c
commit bbcfa9ab03
3 changed files with 85 additions and 12 deletions

View File

@ -200,13 +200,16 @@ void taosArrayClear(SArray* pArray);
* @param pArray * @param pArray
* @param fp * @param fp
*/ */
void taosArrayClearEx(SArray* pArray, void (*fp)(void*)); void taosArrayClearEx(SArray* pArray, void (*fp)(void*));
void taosArrayClearP(SArray* pArray, void (*fp)(void*));
void* taosArrayDestroy(SArray* pArray); void* taosArrayDestroy(SArray* pArray);
void taosArrayDestroyP(SArray* pArray, FDelete fp); void taosArrayDestroyP(SArray* pArray, FDelete fp);
void taosArrayDestroyEx(SArray* pArray, FDelete fp); void taosArrayDestroyEx(SArray* pArray, FDelete fp);
void taosArraySwap(SArray* a, SArray* b); void taosArraySwap(SArray* a, SArray* b);

View File

@ -31,6 +31,9 @@ typedef struct {
char* path; char* path;
char* buf; char* buf;
int32_t len; int32_t len;
SArray* pAdd;
SArray* pDel;
} SBackendManager; } SBackendManager;
typedef struct SCompactFilteFactory { typedef struct SCompactFilteFactory {
@ -149,8 +152,25 @@ SBackendManager* backendManagerCreate(char* path) {
p->pSSTable = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); p->pSSTable = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
p->len = strlen(path) + 128; p->len = strlen(path) + 128;
p->buf = taosMemoryCalloc(1, p->len); p->buf = taosMemoryCalloc(1, p->len);
p->pAdd = taosArrayInit(64, sizeof(void*));
p->pDel = taosArrayInit(64, sizeof(void*));
return p; return p;
} }
void backendManagerDestroy(SBackendManager* bm) {
if (bm == NULL) return;
taosMemoryFree(bm->buf);
taosMemoryFree(bm->path);
taosHashCleanup(bm->pSSTable);
taosArrayDestroyP(bm->pSST, taosMemoryFree);
taosArrayDestroyP(bm->pAdd, taosMemoryFree);
taosArrayDestroyP(bm->pDel, taosMemoryFree);
taosMemoryFree(bm);
}
int32_t compareHashTableImpl(SHashObj* p1, SHashObj* p2, SArray* diff) { int32_t compareHashTableImpl(SHashObj* p1, SHashObj* p2, SArray* diff) {
int32_t code = 0; int32_t code = 0;
@ -191,6 +211,9 @@ int32_t backendManagerGetDelta(SBackendManager* bm, int64_t chkpId, SArray* list
? bm->pSSTable ? bm->pSSTable
: taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); : taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
taosArrayClearP(bm->pAdd, taosMemoryFree);
taosArrayClearP(bm->pDel, taosMemoryFree);
TdDirPtr pDir = taosOpenDir(bm->buf); TdDirPtr pDir = taosOpenDir(bm->buf);
TdDirEntryPtr de = NULL; TdDirEntryPtr de = NULL;
int8_t dummy = 0; int8_t dummy = 0;
@ -222,23 +245,23 @@ int32_t backendManagerGetDelta(SBackendManager* bm, int64_t chkpId, SArray* list
bm->curChkpId = chkpId; bm->curChkpId = chkpId;
bm->init = 1; bm->init = 1;
SArray* add = taosArrayInit(64, sizeof(void*)); // SArray* add = taosArrayInit(64, sizeof(void*));
void* pIter = taosHashIterate(pTable, NULL); void* pIter = taosHashIterate(pTable, NULL);
while (pIter) { while (pIter) {
size_t len; size_t len;
char* name = taosHashGetKey(pIter, &len); char* name = taosHashGetKey(pIter, &len);
if (name != NULL && len != 0) { if (name != NULL && len != 0) {
taosArrayPush(add, &name); taosArrayPush(bm->pAdd, &name);
} }
pIter = taosHashIterate(pTable, pIter); pIter = taosHashIterate(pTable, pIter);
} }
} else { } else {
SArray* add = taosArrayInit(64, sizeof(void*)); // SArray* add = taosArrayInit(64, sizeof(void*));
SArray* del = taosArrayInit(64, sizeof(void*)); // SArray* del = taosArrayInit(64, sizeof(void*));
int32_t code = compareHashTable(bm->pSSTable, pTable, add, del); int32_t code = compareHashTable(bm->pSSTable, pTable, bm->pAdd, bm->pDel);
bm->curChkpId = chkpId; bm->curChkpId = chkpId;
taosHashCleanup(pTable); taosHashCleanup(pTable);
@ -248,7 +271,8 @@ int32_t backendManagerGetDelta(SBackendManager* bm, int64_t chkpId, SArray* list
int32_t backendManagerDumpTo(SBackendManager* bm, char* name) { int32_t backendManagerDumpTo(SBackendManager* bm, char* name) {
int32_t code = 0; int32_t code = 0;
char* buf = taosMemoryCalloc(1, strlen(bm->path) + 64); int32_t len = bm->len + 64;
char* buf = taosMemoryCalloc(1, len);
sprintf(buf, "%s%s%s", bm->path, TD_DIRSEP, name); sprintf(buf, "%s%s%s", bm->path, TD_DIRSEP, name);
code = taosMkDir(buf); code = taosMkDir(buf);
@ -256,8 +280,37 @@ int32_t backendManagerDumpTo(SBackendManager* bm, char* name) {
return code; return code;
} }
// clear current file
memset(buf, 0, len);
sprintf(buf, "%s%s%s%s%s", bm->path, TD_DIRSEP, name, TD_DIRSEP, bm->pCurrent);
taosRemoveFile(buf);
memset(buf, 0, len);
sprintf(buf, "%s%s%s%s%s", bm->path, TD_DIRSEP, name, TD_DIRSEP, bm->pManifest);
taosRemoveFile(buf);
for (int i = 0; i < taosArrayGetSize(bm->pAdd); i++) {
memset(buf, 0, len);
char* filename = taosArrayGetP(bm->pAdd, i);
sprintf(buf, "%s%s%s%s%s", bm->path, TD_DIRSEP, name, TD_DIRSEP, filename);
char* src = taosMemoryCalloc(1, len);
sprintf(src, "%s%s%s%" PRId64 "%s%s", bm->path, TD_DIRSEP, "checkpoint", bm->curChkpId, TD_DIRSEP, filename);
taosCopyFile(src, buf);
}
for (int i = 0; i < taosArrayGetSize(bm->pDel); i++) {
memset(buf, 0, len);
char* filename = taosArrayGetP(bm->pDel, i);
sprintf(buf, "%s%s%s%s%s", bm->path, TD_DIRSEP, name, TD_DIRSEP, filename);
taosRemoveFile(buf);
}
// clear delta data
taosArrayClearP(bm->pAdd, taosMemoryFree);
taosArrayClearP(bm->pDel, taosMemoryFree);
return code;
} }
SCfInit ginitDict[] = { SCfInit ginitDict[] = {

View File

@ -191,7 +191,7 @@ void* taosArrayGet(const SArray* pArray, size_t index) {
} }
if (index >= pArray->size) { if (index >= pArray->size) {
uError("index is out of range, current:%"PRIzu" max:%d", index, pArray->capacity); uError("index is out of range, current:%" PRIzu " max:%d", index, pArray->capacity);
return NULL; return NULL;
} }
@ -319,7 +319,7 @@ SArray* taosArrayDup(const SArray* pSrc, __array_item_dup_fn_t fn) {
if (NULL == pSrc) { if (NULL == pSrc) {
return NULL; return NULL;
} }
if (pSrc->size == 0) { // empty array list if (pSrc->size == 0) { // empty array list
return taosArrayInit(8, pSrc->elemSize); return taosArrayInit(8, pSrc->elemSize);
} }
@ -360,6 +360,23 @@ void taosArrayClearEx(SArray* pArray, void (*fp)(void*)) {
pArray->size = 0; pArray->size = 0;
} }
void taosArrayClearP(SArray* pArray, void (*fp)(void*)) {
// if (pArray == NULL) return;
// if (fp == NULL) {
// pArray->size = 0;
// return;
// }
// for (int32_t i = 0; i < pArray->size; ++i) {
// fp(TARRAY_GET_ELEM(pArray, i));
// }
if (pArray) {
for (int32_t i = 0; i < pArray->size; i++) {
fp(*(void**)TARRAY_GET_ELEM(pArray, i));
}
}
taosArrayClear(pArray);
}
void* taosArrayDestroy(SArray* pArray) { void* taosArrayDestroy(SArray* pArray) {
if (pArray) { if (pArray) {
@ -492,7 +509,7 @@ void* taosDecodeArray(const void* buf, SArray** pArray, FDecode decode, int32_t
// order array<type *> // order array<type *>
void taosArraySortPWithExt(SArray* pArray, __ext_compar_fn_t fn, const void* param) { void taosArraySortPWithExt(SArray* pArray, __ext_compar_fn_t fn, const void* param) {
taosqsort(pArray->pData, pArray->size, pArray->elemSize, param, fn); taosqsort(pArray->pData, pArray->size, pArray->elemSize, param, fn);
// taosArrayGetSize(pArray) > 8 ? taosArrayQuickSort(pArray, fn, param) : taosArrayInsertSort(pArray, fn, param); // taosArrayGetSize(pArray) > 8 ? taosArrayQuickSort(pArray, fn, param) : taosArrayInsertSort(pArray, fn, param);
} }
void taosArraySwap(SArray* a, SArray* b) { void taosArraySwap(SArray* a, SArray* b) {