From e972ab16fe566d6446e4d11c69dd6ecaaa468b09 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Tue, 7 Nov 2023 14:11:08 +0800 Subject: [PATCH] recover flush mark --- source/libs/stream/src/tstreamFileState.c | 49 ++++++++++++++--------- 1 file changed, 30 insertions(+), 19 deletions(-) diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index a597858e63..8a3e7ce892 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -27,6 +27,8 @@ #define DEFAULT_MAX_STREAM_BUFFER_SIZE (128 * 1024 * 1024) #define MIN_NUM_OF_ROW_BUFF 10240 +#define TASK_KEY "streamFileState" + struct SStreamFileState { SList* usedBuffs; SList* freeBuffs; @@ -113,6 +115,15 @@ void* sessionCreateStateKey(SRowBuffPos* pPos, int64_t num) { return pStateKey; } +static void streamFileStateDecode(TSKEY* pKey, void* pBuff, int32_t len) { pBuff = taosDecodeFixedI64(pBuff, pKey); } + +static void streamFileStateEncode(TSKEY* pKey, void** pVal, int32_t* pLen) { + *pLen = sizeof(TSKEY); + (*pVal) = taosMemoryCalloc(1, *pLen); + void* buff = *pVal; + taosEncodeFixedI64(&buff, *pKey); +} + SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize, GetTsFun fp, void* pFile, TSKEY delMark, const char* taskId, int64_t checkpointId, int8_t type) { @@ -181,6 +192,17 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_ recoverSesssion(pFileState, checkpointId); } + char keyBuf[128] = {0}; + void* valBuf = NULL; + int32_t len = 0; + sprintf(keyBuf, "%s:%" PRId64 "", TASK_KEY, ((SStreamState*)pFileState->pFileStore)->checkPointId); + int32_t code = streamDefaultGet_rocksdb(pFileState->pFileStore, keyBuf, &valBuf, &len); + if (code == TSDB_CODE_SUCCESS) { + ASSERT(len == sizeof(TSKEY)); + streamFileStateDecode(&pFileState->flushMark, valBuf, len); + qDebug("===stream===flushMark read:%" PRId64 ",checkpointid:%" PRId64, pFileState->flushMark, ((SStreamState*)pFileState->pFileStore)->checkPointId); + } + return pFileState; _error: @@ -506,15 +528,6 @@ SStreamSnapshot* getSnapshot(SStreamFileState* pFileState) { return pFileState->usedBuffs; } -void streamFileStateDecode(TSKEY* pKey, void* pBuff, int32_t len) { pBuff = taosDecodeFixedI64(pBuff, pKey); } - -void streamFileStateEncode(TSKEY* pKey, void** pVal, int32_t* pLen) { - *pLen = sizeof(TSKEY); - (*pVal) = taosMemoryCalloc(1, *pLen); - void* buff = *pVal; - taosEncodeFixedI64(&buff, *pKey); -} - static void getDebugRowBuff(char* val, int32_t vlen, char* output) { for (int32_t i = 0; i < vlen; ++i) { if (*(val + i) == '\0') { @@ -550,6 +563,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, continue; } pPos->beFlushed = true; + pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey)); qDebug("===stream===flushed start:%" PRId64, pFileState->getTs(pPos->pKey)); if (streamStateGetBatchSize(batch) >= BATCH_LIMIT) { @@ -586,13 +600,13 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, pFileState->id, numOfElems, BATCH_LIMIT, elapsed); if (flushState) { - const char* taskKey = "streamFileState"; { char keyBuf[128] = {0}; void* valBuf = NULL; int32_t len = 0; - sprintf(keyBuf, "%s:%" PRId64 "", taskKey, ((SStreamState*)pFileState->pFileStore)->checkPointId); + sprintf(keyBuf, "%s:%" PRId64 "", TASK_KEY, ((SStreamState*)pFileState->pFileStore)->checkPointId); streamFileStateEncode(&pFileState->flushMark, &valBuf, &len); + qDebug("===stream===flushMark write:%" PRId64 ",checkpoint id:%" PRId64, pFileState->flushMark, ((SStreamState*)pFileState->pFileStore)->checkPointId); streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len, 0); taosMemoryFree(valBuf); } @@ -600,7 +614,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, char keyBuf[128] = {0}; char valBuf[64] = {0}; int32_t len = 0; - memcpy(keyBuf, taskKey, strlen(taskKey)); + memcpy(keyBuf, TASK_KEY, strlen(TASK_KEY)); len = sprintf(valBuf, "%" PRId64 "", ((SStreamState*)pFileState->pFileStore)->checkPointId); code = streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len, 0); } @@ -612,26 +626,23 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, } int32_t forceRemoveCheckpoint(SStreamFileState* pFileState, int64_t checkpointId) { - const char* taskKey = "streamFileState"; char keyBuf[128] = {0}; - sprintf(keyBuf, "%s:%" PRId64 "", taskKey, checkpointId); + sprintf(keyBuf, "%s:%" PRId64 "", TASK_KEY, checkpointId); return streamDefaultDel_rocksdb(pFileState->pFileStore, keyBuf); } int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list) { - const char* taskKey = "streamFileState"; - return streamDefaultIterGet_rocksdb(pFileState->pFileStore, taskKey, NULL, list); + return streamDefaultIterGet_rocksdb(pFileState->pFileStore, TASK_KEY, NULL, list); } int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) { int32_t code = TSDB_CODE_SUCCESS; - const char* taskKey = "streamFileState"; int64_t maxCheckPointId = 0; { char buf[128] = {0}; void* val = NULL; int32_t len = 0; - memcpy(buf, taskKey, strlen(taskKey)); + memcpy(buf, TASK_KEY, strlen(TASK_KEY)); code = streamDefaultGet_rocksdb(pFileState->pFileStore, buf, &val, &len); if (code != 0 || len == 0 || val == NULL) { return TSDB_CODE_FAILED; @@ -645,7 +656,7 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) { char buf[128] = {0}; void* val = 0; int32_t len = 0; - sprintf(buf, "%s:%" PRId64 "", taskKey, i); + sprintf(buf, "%s:%" PRId64 "", TASK_KEY, i); code = streamDefaultGet_rocksdb(pFileState->pFileStore, buf, &val, &len); if (code != 0) { return TSDB_CODE_FAILED;