From 467c27c7585d183f03447dbca23e80346921cb55 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Tue, 7 Nov 2023 15:45:38 +0800 Subject: [PATCH] recover flush mark --- include/libs/function/function.h | 1 - source/libs/stream/src/streamState.c | 3 --- source/libs/stream/src/tstreamFileState.c | 31 +++++++---------------- 3 files changed, 9 insertions(+), 26 deletions(-) diff --git a/include/libs/function/function.h b/include/libs/function/function.h index 2e3cd670d7..49435a6317 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -168,7 +168,6 @@ typedef struct { struct SStreamFileState *pFileState; int32_t number; SSHashObj *parNameMap; - int64_t checkPointId; int32_t taskId; int64_t streamId; int64_t streamBackendRid; diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index fb0090ec6d..6ca7bc5e7b 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -221,7 +221,6 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz } pState->pTdbState->pOwner = pTask; - pState->checkPointId = 0; return pState; @@ -274,7 +273,6 @@ int32_t streamStateCommit(SStreamState* pState) { SStreamSnapshot* pShot = getSnapshot(pState->pFileState); flushSnapshot(pState->pFileState, pShot, true); } - pState->checkPointId++; return 0; #else if (tdbCommit(pState->pTdbState->db, pState->pTdbState->txn) < 0) { @@ -288,7 +286,6 @@ int32_t streamStateCommit(SStreamState* pState) { TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { return -1; } - pState->checkPointId++; return 0; #endif } diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 8a3e7ce892..0a3970adaa 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -28,6 +28,7 @@ #define MIN_NUM_OF_ROW_BUFF 10240 #define TASK_KEY "streamFileState" +#define STREAM_STATE_INFO_NAME "StreamStateCheckPoint" struct SStreamFileState { SList* usedBuffs; @@ -192,15 +193,13 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_ recoverSesssion(pFileState, checkpointId); } - char keyBuf[128] = {0}; void* valBuf = NULL; int32_t len = 0; - sprintf(keyBuf, "%s:%" PRId64 "", TASK_KEY, ((SStreamState*)pFileState->pFileStore)->checkPointId); - int32_t code = streamDefaultGet_rocksdb(pFileState->pFileStore, keyBuf, &valBuf, &len); + int32_t code = streamDefaultGet_rocksdb(pFileState->pFileStore, STREAM_STATE_INFO_NAME, &valBuf, &len); if (code == TSDB_CODE_SUCCESS) { ASSERT(len == sizeof(TSKEY)); streamFileStateDecode(&pFileState->flushMark, valBuf, len); - qDebug("===stream===flushMark read:%" PRId64 ",checkpointid:%" PRId64, pFileState->flushMark, ((SStreamState*)pFileState->pFileStore)->checkPointId); + qDebug("===stream===flushMark read:%" PRId64, pFileState->flushMark); } return pFileState; @@ -600,24 +599,12 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, pFileState->id, numOfElems, BATCH_LIMIT, elapsed); if (flushState) { - { - char keyBuf[128] = {0}; - void* valBuf = NULL; - int32_t len = 0; - sprintf(keyBuf, "%s:%" PRId64 "", TASK_KEY, ((SStreamState*)pFileState->pFileStore)->checkPointId); - streamFileStateEncode(&pFileState->flushMark, &valBuf, &len); - qDebug("===stream===flushMark write:%" PRId64 ",checkpoint id:%" PRId64, pFileState->flushMark, ((SStreamState*)pFileState->pFileStore)->checkPointId); - streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len, 0); - taosMemoryFree(valBuf); - } - { - char keyBuf[128] = {0}; - char valBuf[64] = {0}; - int32_t len = 0; - memcpy(keyBuf, TASK_KEY, strlen(TASK_KEY)); - len = sprintf(valBuf, "%" PRId64 "", ((SStreamState*)pFileState->pFileStore)->checkPointId); - code = streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len, 0); - } + void* valBuf = NULL; + int32_t len = 0; + streamFileStateEncode(&pFileState->flushMark, &valBuf, &len); + qDebug("===stream===flushMark write:%" PRId64, pFileState->flushMark); + streamStatePutBatch(pFileState->pFileStore, "default", batch, STREAM_STATE_INFO_NAME, valBuf, len, 0); + taosMemoryFree(valBuf); streamStatePutBatch_rocksdb(pFileState->pFileStore, batch); }