From b6f4cac619091e07b8a67d7fb50106ddf35ca110 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 21 Apr 2023 13:35:24 +0800 Subject: [PATCH] fix(stream): set the correct offset version. --- source/dnode/vnode/src/tq/tqRestore.c | 5 ++--- source/libs/stream/src/streamExec.c | 4 ++-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 6081057a4b..c164d037e0 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -128,7 +128,7 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { } // append the data for the stream - tqDebug("vgId:%d wal reader seek to ver:%" PRId64 " %s", vgId, pTask->chkInfo.currentVer, pTask->id.idStr); + tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer); SPackedData packData = {0}; code = extractSubmitMsgFromWal(pTask->exec.pWalReader, &packData); @@ -147,14 +147,13 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { noNewDataInWal = false; - tqDebug("s-task:%s submit data extracted from WAL", pTask->id.idStr); code = tqAddInputBlockNLaunchTask(pTask, (SStreamQueueItem*)p, packData.ver); if (code == TSDB_CODE_SUCCESS) { pTask->chkInfo.currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader); tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr, pTask->chkInfo.currentVer); } else { - // do nothing + tqError("s-task:%s append input queue failed, ver:%"PRId64, pTask->id.idStr, pTask->chkInfo.currentVer); } streamDataSubmitDestroy(p); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index f82e9b9621..325d315262 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -111,7 +111,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* continue; } - qDebug("task %d(child %d) executed and get block", pTask->id.taskId, pTask->selfChildId); + qDebug("s-task:%s (child %d) executed and get block", pTask->id.idStr, pTask->selfChildId); SSDataBlock block = {0}; assignOneDataBlock(&block, output); @@ -306,7 +306,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { ", checkPoint id:%" PRId64 " -> %" PRId64, pTask->id.idStr, pTask->chkInfo.version, dataVer, pTask->chkInfo.id, ckId); - pTask->chkInfo = (SCheckpointInfo) {.version = dataVer, .id = ckId}; + pTask->chkInfo = (SCheckpointInfo) {.version = dataVer, .id = ckId, .currentVer = pTask->chkInfo.currentVer}; taosWLockLatch(&pTask->pMeta->lock); streamMetaSaveTask(pTask->pMeta, pTask);