diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index fa734096c9..c066aaab18 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -852,11 +852,21 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer); } - tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " nextProcessVer:%" PRId64 - " child id:%d, level:%d, status:%s fill-history:%d, trigger:%" PRId64 " ms", - vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, - pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), - pTask->info.fillHistory, pTask->info.triggerParam); + if (pTask->info.fillHistory) { + tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 + " nextProcessVer:%" PRId64 + " child id:%d, level:%d, status:%s fill-history:%d, related stream task:0x%x trigger:%" PRId64 " ms", + vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, + pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), + pTask->info.fillHistory, pTask->streamTaskId.taskId, pTask->info.triggerParam); + } else { + tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 + " nextProcessVer:%" PRId64 + " child id:%d, level:%d, status:%s fill-history:%d, related fill-task:0x%x trigger:%" PRId64 " ms", + vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, + pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), + pTask->info.fillHistory, pTask->historyTaskId.taskId, pTask->info.triggerParam); + } return 0; } @@ -1181,44 +1191,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { return 0; } -// notify the downstream tasks to transfer executor state after handle all history blocks. -int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg) { - char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - int32_t len = pMsg->contLen - sizeof(SMsgHead); - - SStreamTransferReq req = {0}; - - SDecoder decoder; - tDecoderInit(&decoder, (uint8_t*)pReq, len); - int32_t code = tDecodeStreamScanHistoryFinishReq(&decoder, &req); - tDecoderClear(&decoder); - - tqDebug("vgId:%d start to process transfer state msg, from s-task:0x%x", pTq->pStreamMeta->vgId, - req.downstreamTaskId); - - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.downstreamTaskId); - if (pTask == NULL) { - tqError("failed to find task:0x%x, it may have been dropped already. process transfer state failed", - req.downstreamTaskId); - return -1; - } - - int32_t remain = streamAlignTransferState(pTask); - if (remain > 0) { - tqDebug("s-task:%s receive upstream transfer state msg, remain:%d", pTask->id.idStr, remain); - streamMetaReleaseTask(pTq->pStreamMeta, pTask); - return 0; - } - - // transfer the ownership of executor state - tqDebug("s-task:%s all upstream tasks send transfer msg, open transfer state flag", pTask->id.idStr); - ASSERT(pTask->streamTaskId.taskId != 0 && pTask->info.fillHistory == 1); - - streamSchedExec(pTask); - streamMetaReleaseTask(pTq->pStreamMeta, pTask); - return 0; -} - // only the agg tasks and the sink tasks will receive this message from upstream tasks int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg) { char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); @@ -1592,6 +1564,10 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) { int32_t code = 0; SStreamCheckpointSourceReq req = {0}; + if (!vnodeIsRoleLeader(pTq->pVnode)) { + tqDebug("vgId:%d not leader node, ignore checkpoint-source msg", vgId); + return TSDB_CODE_SUCCESS; + } SDecoder decoder; tDecoderInit(&decoder, (uint8_t*)msg, len); @@ -1801,6 +1777,8 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { vInfo("vgId:%d, restart all stream tasks", vgId); tqStartStreamTasks(pTq); tqCheckAndRunStreamTaskAsync(pTq); + } else { + vInfo("vgId:%d, follower node not start stream tasks", vgId); } pMeta->taskWillbeLaunched = 0; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 94dfc09314..9a45555d4a 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -358,18 +358,18 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { streamTaskReleaseState(pTask); streamTaskReloadState(pStreamTask); - // 3. clear the link between fill-history task and stream task info - pStreamTask->historyTaskId.taskId = 0; - - // 4. resume the state of stream task, after this function, the stream task will run immidately. But it can not be + // 3. resume the state of stream task, after this function, the stream task will run immidately. But it can not be // pause, since the pause allowed attribute is not set yet. streamTaskResumeFromHalt(pStreamTask); qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr); - // 5. free it and remove fill-history task from disk meta-store + // 4. free it and remove fill-history task from disk meta-store streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId); + // 5. clear the link between fill-history task and stream task info + pStreamTask->historyTaskId.taskId = 0; + // 6. save to disk taosWLockLatch(&pMeta->lock); streamMetaSaveTask(pMeta, pStreamTask); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 9fa9a664b9..5084caad0c 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -207,9 +207,6 @@ _err: if (pMeta->pCheckpointDb) tdbTbClose(pMeta->pCheckpointDb); if (pMeta->db) tdbClose(pMeta->db); - // taosThreadMutexDestroy(&pMeta->backendMutex); - // taosThreadRwlockDestroy(&pMeta->lock); - taosMemoryFree(pMeta); qError("failed to open stream meta"); @@ -695,7 +692,6 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { continue; } - streamTaskResetUpstreamStageInfo(pTask); if (taosHashPut(pMeta->pTasks, keys, sizeof(keys), &pTask, sizeof(void*)) < 0) { doClear(pKey, pVal, pCur, pRecycleList); tFreeStreamTask(pTask); @@ -708,6 +704,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { ASSERT(pTask->status.downstreamReady == 0); } + qInfo("vgId:%d pause task num:%d", pMeta->vgId, pMeta->pauseTaskNum); tdbFree(pKey); @@ -939,4 +936,9 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) { qDebug("vgId:%d all stream tasks are not in timer, continue close, elapsed time:%" PRId64 " ms", pMeta->vgId, el); } -void streamMetaStartHb(SStreamMeta* pMeta) { metaHbToMnode(pMeta, NULL); } +void streamMetaStartHb(SStreamMeta* pMeta) { + int64_t* pRid = taosMemoryMalloc(sizeof(int64_t)); + metaRefMgtAdd(pMeta->vgId, pRid); + *pRid = pMeta->rid; + metaHbToMnode(pRid, NULL); +}