diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 9b3ce36bdd..747ba34c97 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -324,12 +324,13 @@ typedef struct SStreamStatus { int8_t taskStatus; int8_t downstreamReady; // downstream tasks are all ready now, if this flag is set int8_t schedStatus; - int32_t schedIdleTime; // idle time before invoke again - int64_t lastExecTs; // last exec time stamp int8_t statusBackup; - bool appendTranstateBlock; // has append the transfer state data block already - int32_t timerActive; // timer is active + int32_t schedIdleTime; // idle time before invoke again + int32_t timerActive; // timer is active + int64_t lastExecTs; // last exec time stamp int32_t inScanHistorySentinel; + bool appendTranstateBlock; // has append the transfer state data block already + bool supplementaryWalscan; // complete the supplementary wal scan or not } SStreamStatus; typedef struct SDataRange { diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index f7d865750a..4426ab0672 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -182,6 +182,7 @@ static int32_t mndDropOrphanTasks(SMnode* pMnode, SArray* pList) { mndTransDrop(pTrans); return -1; } + mndTransDrop(pTrans); return 0; } @@ -210,7 +211,7 @@ int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo* info){ }; tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); - mInfo("receivee pause stream:%s, %s, %p, because grant expired", pStream->name, reqPause.name, reqPause.name); + mInfo("receive pause stream:%s, %s, %p, because grant expired", pStream->name, reqPause.name, reqPause.name); } sdbRelease(pSdb, pStream); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index a689932754..1ade1c8c41 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -835,8 +835,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { SCheckpointInfo* pChkInfo = &pTask->chkInfo; // checkpoint ver is the kept version, handled data should be the next version. - if (pTask->chkInfo.checkpointId != 0) { - pTask->chkInfo.nextProcessVer = pTask->chkInfo.checkpointVer + 1; + if (pChkInfo->checkpointId != 0) { + pChkInfo->nextProcessVer = pChkInfo->checkpointVer + 1; tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer); } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 3472f4e14d..fc0589031a 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1080,7 +1080,7 @@ bool qStreamScanhistoryFinished(qTaskInfo_t tinfo) { int32_t qStreamInfoResetTimewindowFilter(qTaskInfo_t tinfo) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - STimeWindow* pWindow = &pTaskInfo->streamInfo.fillHistoryWindow; + STimeWindow* pWindow = &pTaskInfo->streamInfo.fillHistoryWindow; qDebug("%s remove timeWindow filter:%" PRId64 "-%" PRId64 ", set new window:%" PRId64 "-%" PRId64, GET_TASKID(pTaskInfo), pWindow->skey, pWindow->ekey, INT64_MIN, INT64_MAX); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 27748c84a0..b0170d5083 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -390,6 +390,8 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { pTimeWindow->skey = INT64_MIN; qStreamInfoResetTimewindowFilter(pStreamTask->exec.pExecutor); + stDebug("s-task:%s after exceed the threshold:%" PRId64 " and then update the window filter", + pStreamTask->id.idStr, pStreamTask->dataRange.range.maxVer); } else { stDebug("s-task:%s no need to update/reset filter time window for non-source tasks", pStreamTask->id.idStr); } @@ -400,7 +402,6 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { // 3. send msg to mnode to launch a checkpoint to keep the state for current stream streamTaskSendCheckpointReq(pStreamTask); -// streamTaskResume(pStreamTask); // 4. assign the status to the value that will be kept in disk pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask)->state; @@ -777,6 +778,8 @@ int32_t streamResumeTask(SStreamTask* pTask) { while (1) { /*int32_t code = */ doStreamExecTask(pTask); + + // check if continue taosThreadMutexLock(&pTask->lock); int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputq.queue); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 8ad20f357c..5e53a921b9 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1322,7 +1322,7 @@ void streamMetaRUnLock(SStreamMeta* pMeta) { if (code != TSDB_CODE_SUCCESS) { stError("vgId:%d meta-runlock failed, code:%d", pMeta->vgId, code); } else { - stDebug("vgId:%d meta-runlock completed", pMeta->vgId); +// stTrace("vgId:%d meta-runlock completed", pMeta->vgId); } } diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 20fdcff7d9..2f5bca8ed9 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -385,7 +385,6 @@ int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask) { void doProcessDownstreamReadyRsp(SStreamTask* pTask) { EStreamTaskEvent event = (pTask->info.fillHistory == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_SCANHIST; - streamTaskOnHandleEventSuccess(pTask->status.pSM, event); int64_t initTs = pTask->execInfo.init; @@ -989,4 +988,3 @@ void streamTaskSetRangeStreamCalc(SStreamTask* pTask) { streamSetParamForStreamScannerStep2(pTask, &verRange, &win); } } - diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index ea20f0e2b1..e370312338 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -670,7 +670,6 @@ void streamStateFreeCur(SStreamStateCur* pCur) { if (!pCur) { return; } - qDebug("streamStateFreeCur"); streamStateResetCur(pCur); taosMemoryFree(pCur); }