From 1b285f8a1d00ad987febc806ba53d5a79b5d75a2 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Wed, 27 Nov 2024 18:52:04 +0800 Subject: [PATCH] fix issue --- source/libs/stream/src/tstreamFileState.c | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 8c8aee8c7d..dcabadb8bd 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -991,6 +991,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) { } winCode = streamStateGetKVByCur_rocksdb(getStateFileStore(pFileState), pCur, pNewPos->pKey, (const void**)&pVal, &vlen); + qDebug("===stream=== get state by cur winres:%d. %s", winCode, __func__); if (winCode != TSDB_CODE_SUCCESS || pFileState->getTs(pNewPos->pKey) < pFileState->flushMark) { destroyRowBuffPos(pNewPos); SListNode* pNode = tdListPopTail(pFileState->usedBuffs); @@ -1007,6 +1008,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) { memcpy(pNewPos->pRowBuff, pVal, vlen); taosMemoryFreeClear(pVal); pNewPos->beFlushed = true; + qDebug("===stream=== read checkpoint state from disc. %s", __func__); code = tSimpleHashPut(pFileState->rowStateBuff, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES); if (code != TSDB_CODE_SUCCESS) { destroyRowBuffPos(pNewPos); @@ -1077,6 +1079,7 @@ int32_t recoverFillSnapshot(SStreamFileState* pFileState, int64_t ckId) { int32_t vlen = 0; SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); winRes = streamStateFillGetKVByCur_rocksdb(pCur, pNewPos->pKey, (const void**)&pVal, &vlen); + qDebug("===stream=== get state by cur winres:%d. %s", winRes, __func__); if (winRes != TSDB_CODE_SUCCESS || isFlushedState(pFileState, pFileState->getTs(pNewPos->pKey), 0)) { destroyRowBuffPos(pNewPos); SListNode* pNode = tdListPopTail(pFileState->usedBuffs); @@ -1085,9 +1088,17 @@ int32_t recoverFillSnapshot(SStreamFileState* pFileState, int64_t ckId) { break; } + if (vlen != pFileState->rowSize) { + qError("row size mismatch, expect:%d, actual:%d", pFileState->rowSize, vlen); + code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + taosMemoryFreeClear(pVal); + QUERY_CHECK_CODE(code, lino, _end); + } + memcpy(pNewPos->pRowBuff, pVal, vlen); taosMemoryFreeClear(pVal); pNewPos->beFlushed = true; + qDebug("===stream=== read checkpoint state from disc. %s", __func__); winRes = tSimpleHashPut(pFileState->rowStateBuff, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES); if (winRes != TSDB_CODE_SUCCESS) { destroyRowBuffPos(pNewPos);