From 649d26ce72e7550009fee7a8683409643966b29a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 26 Jan 2024 14:14:04 +0800 Subject: [PATCH] fix(stream): seek to right place to start wal read. --- source/dnode/mnode/impl/src/mndStream.c | 6 ++++-- source/dnode/vnode/src/tq/tqStreamTask.c | 18 ++++++------------ 2 files changed, 10 insertions(+), 14 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 9c92d036b7..5528be3f0f 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1583,9 +1583,10 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { if (pStream == NULL) { if (pauseReq.igNotExists) { - mInfo("stream:%s, not exist, if exist is set", pauseReq.name); + mInfo("stream:%s, not exist, not pause stream", pauseReq.name); return 0; } else { + mError("stream:%s not exist, failed to pause stream", pauseReq.name); terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; return -1; } @@ -1671,10 +1672,11 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { if (pStream == NULL) { if (pauseReq.igNotExists) { - mInfo("stream:%s, not exist, if exist is set", pauseReq.name); + mInfo("stream:%s not exist, not resume stream", pauseReq.name); sdbRelease(pMnode->pSdb, pStream); return 0; } else { + mError("stream:%s not exist, failed to resume stream", pauseReq.name); terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; return -1; } diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index d24dc45624..280c110711 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -223,14 +223,6 @@ int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId) { // append the data for the stream tqDebug("vgId:%d s-task:%s wal reader initial seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer); - } else if (currentVer != pTask->chkInfo.nextProcessVer) { - int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.nextProcessVer); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - tqDebug("vgId:%d s-task:%s wal reader seek back to ver:%" PRId64, vgId, pTask->id.idStr, - pTask->chkInfo.nextProcessVer); } } @@ -312,17 +304,17 @@ bool doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems const char* id = pTask->id.idStr; int32_t numOfNewItems = 0; - while(1) { + while (1) { if ((pTask->info.fillHistory == 1) && pTask->status.appendTranstateBlock) { *numOfItems += numOfNewItems; return numOfNewItems > 0; } SStreamQueueItem* pItem = NULL; - int32_t code = extractMsgFromWal(pTask->exec.pWalReader, (void**)&pItem, maxVer, id); + int32_t code = extractMsgFromWal(pTask->exec.pWalReader, (void**)&pItem, maxVer, id); if (code != TSDB_CODE_SUCCESS || pItem == NULL) { // failed, continue int64_t currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader); - bool itemInFillhistory = handleFillhistoryScanComplete(pTask, currentVer); + bool itemInFillhistory = handleFillhistoryScanComplete(pTask, currentVer); if (itemInFillhistory) { numOfNewItems += 1; } @@ -342,7 +334,9 @@ bool doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems break; } } else { - tqError("s-task:%s append input queue failed, code: too many items, ver:%" PRId64, id, pTask->chkInfo.nextProcessVer); + walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.nextProcessVer); + tqError("s-task:%s append input queue failed, code:too many items, ver:%" PRId64, id, + pTask->chkInfo.nextProcessVer); break; } }