diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index e093625ef1..a98ad5a4c2 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -40,6 +40,10 @@ extern "C" { #define TASK_DOWNSTREAM_NOT_LEADER 0x2 #define TASK_SELF_NEW_STAGE 0x3 +#define NODE_ROLE_UNINIT 0x1 +#define NODE_ROLE_LEADER 0x2 +#define NODE_ROLE_FOLLOWER 0x3 + typedef struct SStreamTask SStreamTask; #define SSTREAM_TASK_VER 2 @@ -411,7 +415,8 @@ typedef struct SStreamMeta { FTaskExpand* expandFunc; int32_t vgId; int64_t stage; - bool leader; +// bool leader; + int32_t role; STaskStartInfo startInfo; SRWLatch lock; int32_t walScanCounter; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index a29d595ef7..3a62f52bdd 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -225,7 +225,7 @@ int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg); int tqUnregisterPushHandle(STQ* pTq, void* pHandle); int tqScanWalAsync(STQ* pTq, bool ckPause); int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp); -int32_t tqProcessStreamTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg); +int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqCheckAndRunStreamTaskAsync(STQ* pTq); @@ -249,8 +249,8 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msg int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); -int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg); -int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg); +int32_t tqProcessTaskCheckReq(STQ* pTq, SRpcMsg* pMsg); +int32_t tqProcessTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec); int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 7f0842736e..3e060b4a38 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -873,7 +873,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { return 0; } -int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { +int32_t tqProcessTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { char* msgStr = pMsg->pCont; char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); @@ -899,7 +899,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { }; // only the leader node handle the check request - if (!pMeta->leader) { + if (pMeta->role == NODE_ROLE_FOLLOWER) { tqError("s-task:0x%x invalid check msg from upstream:0x%x(vgId:%d), vgId:%d is follower, not handle check status msg", taskId, req.upstreamTaskId, req.upstreamNodeId, pMeta->vgId); rsp.status = TASK_DOWNSTREAM_NOT_LEADER; @@ -923,7 +923,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { return streamSendCheckRsp(pMeta, &req, &rsp, &pMsg->info, taskId); } -int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg) { +int32_t tqProcessTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg) { char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t len = pMsg->contLen - sizeof(SMsgHead); int32_t vgId = pTq->pStreamMeta->vgId; @@ -1727,7 +1727,7 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRs } // downstream task has complete the stream task checkpoint procedure, let's start the handle the rsp by execute task -int32_t tqProcessStreamTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg) { +int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg) { int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); @@ -1840,8 +1840,10 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { // possibly only handle the stream task. int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); int32_t updateTasks = taosHashGetSize(pMeta->pUpdateTaskSet); + + pMeta->startInfo.startedAfterNodeUpdate = 1; + if (updateTasks < numOfTasks) { - pMeta->startInfo.startedAfterNodeUpdate = 1; tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId, updateTasks, (numOfTasks - updateTasks)); taosWUnLockLatch(&pMeta->lock); diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 9158702284..a5958197bd 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -24,23 +24,22 @@ typedef struct STableSinkInfo { tstr name; } STableSinkInfo; +static int32_t tsAscendingSortFn(const void* p1, const void* p2); static int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDataBlock, char* stbFullName, SSubmitTbData* pTableData); static int32_t setDstTableDataPayload(SStreamTask* pTask, int32_t blockIndex, SSDataBlock* pDataBlock, SSubmitTbData* pTableData); static int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask, int64_t suid); -static int32_t tqBuildSubmitReq(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int32_t* msgLen); -static int32_t tsAscendingSortFn(const void* p1, const void* p2); +static int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2* pReq, int32_t numOfBlocks); +static int32_t buildSubmitMsgImpl(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int32_t* msgLen); static int32_t doConvertRows(SSubmitTbData* pTableData, STSchema* pTSchema, SSDataBlock* pDataBlock, const char* id); static int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkInfo* pTableSinkInfo, const char* dstTableName, int64_t* uid); static int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId, const char* id); -static SVCreateTbReq* buildAutoCreateTableReq(char* stbFullName, int64_t suid, int32_t numOfCols, - SSDataBlock* pDataBlock); -static bool isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbName, int64_t suid); -static int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, const char* id); -static int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2* pReq, int32_t numOfBlocks); +static int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, const char* id); +static bool isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbName, int64_t suid); +static SVCreateTbReq* buildAutoCreateTableReq(char* stbFullName, int64_t suid, int32_t numOfCols, SSDataBlock* pDataBlock); int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq, const char* pIdStr) { @@ -255,7 +254,7 @@ int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2* void* pBuf = NULL; int32_t numOfFinalBlocks = taosArrayGetSize(pReq->aSubmitTbData); - int32_t code = tqBuildSubmitReq(pReq, vgId, &pBuf, &len); + int32_t code = buildSubmitMsgImpl(pReq, vgId, &pBuf, &len); if (code != TSDB_CODE_SUCCESS) { tqError("s-task:%s build submit msg failed, vgId:%d, code:%s", id, vgId, tstrerror(code)); return code; @@ -274,7 +273,7 @@ int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2* pRec->numOfSubmit += 1; if ((pRec->numOfSubmit % 5000) == 0) { - double el = (taosGetTimestampMs() - pTask->execInfo.start) / 1000.0; + double el = (taosGetTimestampMs() - pTask->execInfo.start) / 1000.0; tqInfo("s-task:%s vgId:%d write %" PRId64 " blocks (%" PRId64 " rows) in %" PRId64 " submit into dst table, %.2fMiB duration:%.2f Sec.", pTask->id.idStr, vgId, pRec->numOfBlocks, pRec->numOfRows, pRec->numOfSubmit, SIZE_IN_MiB(pRec->bytes), el); @@ -462,7 +461,7 @@ int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, return code; } -int32_t tqBuildSubmitReq(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int32_t* msgLen) { +int32_t buildSubmitMsgImpl(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int32_t* msgLen) { int32_t code = 0; void* pBuf = NULL; *msgLen = 0; diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index b87783cfd0..46228a46a2 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -42,7 +42,7 @@ void tqUpdateNodeStage(STQ* pTq, bool isLeader) { int64_t stage = pMeta->stage; pMeta->stage = state.term; - pMeta->leader = isLeader; + pMeta->role = (isLeader)? NODE_ROLE_LEADER:NODE_ROLE_FOLLOWER; if (isLeader) { tqInfo("vgId:%d update meta stage:%" PRId64 ", prev:%" PRId64 " leader:%d, start to send Hb", pMeta->vgId, state.term, stage, isLeader); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 1e61a1f9dd..97f484849c 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -754,9 +754,9 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) case TDMT_STREAM_TASK_DISPATCH_RSP: return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg); case TDMT_VND_STREAM_TASK_CHECK: - return tqProcessStreamTaskCheckReq(pVnode->pTq, pMsg); + return tqProcessTaskCheckReq(pVnode->pTq, pMsg); case TDMT_VND_STREAM_TASK_CHECK_RSP: - return tqProcessStreamTaskCheckRsp(pVnode->pTq, pMsg); + return tqProcessTaskCheckRsp(pVnode->pTq, pMsg); case TDMT_STREAM_RETRIEVE: return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg); case TDMT_STREAM_RETRIEVE_RSP: @@ -768,7 +768,7 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) case TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP: return tqProcessTaskScanHistoryFinishRsp(pVnode->pTq, pMsg); case TDMT_STREAM_TASK_CHECKPOINT_READY: - return tqProcessStreamTaskCheckpointReadyMsg(pVnode->pTq, pMsg); + return tqProcessTaskCheckpointReadyMsg(pVnode->pTq, pMsg); default: vError("unknown msg type:%d in stream queue", pMsg->msgType); return TSDB_CODE_APP_ERROR; diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index dbcbfc0a94..2abd3bac05 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -224,7 +224,7 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, pReq->upstreamTaskId); ASSERT(pInfo != NULL); - if (!pTask->pMeta->leader) { + if (pTask->pMeta->role == NODE_ROLE_FOLLOWER) { stError("s-task:%s task on follower received dispatch msgs, dispatch msg rejected", id); status = TASK_INPUT_STATUS__REFUSED; } else { diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index aa32fb6493..bd5753cac3 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1021,18 +1021,19 @@ int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, int32_t streamNotifyUpstreamContinue(SStreamTask* pTask) { ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG || pTask->info.taskLevel == TASK_LEVEL__SINK); + const char* id = pTask->id.idStr; + int32_t level = pTask->info.taskLevel; + int32_t num = taosArrayGetSize(pTask->pRspMsgList); for (int32_t i = 0; i < num; ++i) { SStreamContinueExecInfo* pInfo = taosArrayGet(pTask->pRspMsgList, i); tmsgSendRsp(&pInfo->msg); - stDebug("s-task:%s level:%d notify upstream:0x%x to continue process data in WAL", pTask->id.idStr, pTask->info.taskLevel, - pInfo->taskId); + stDebug("s-task:%s level:%d notify upstream:0x%x continuing scan data in WAL", id, level, pInfo->taskId); } taosArrayClear(pTask->pRspMsgList); - stDebug("s-task:%s level:%d continue process msg sent to all %d upstreams", pTask->id.idStr, pTask->info.taskLevel, - num); + stDebug("s-task:%s level:%d continue process msg sent to all %d upstreams", id, level, num); return 0; } @@ -1063,7 +1064,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i int32_t vgId = pTask->pMeta->vgId; int32_t msgId = pTask->execInfo.dispatch; - if ((!pTask->pMeta->leader) || (pTask->status.downstreamReady != 1)) { + if ((pTask->pMeta->role == NODE_ROLE_FOLLOWER) || (pTask->status.downstreamReady != 1)) { stError("s-task:%s vgId:%d is follower or task just re-launched, not handle the dispatch rsp, discard it", id, vgId); return TSDB_CODE_STREAM_TASK_NOT_EXIST; } @@ -1160,10 +1161,9 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i // now ready for next data output atomic_store_8(&pTask->outputInfo.status, TASK_OUTPUT_STATUS__NORMAL); - return TSDB_CODE_SUCCESS; + } else { + handleDispatchSuccessRsp(pTask, pRsp->downstreamTaskId); } - - handleDispatchSuccessRsp(pTask, pRsp->downstreamTaskId); } } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 5e25d911b0..092566fd84 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -196,7 +196,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->chkpId = streamGetLatestCheckpointId(pMeta); pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId); while (pMeta->streamBackend == NULL) { - taosMsleep(2 * 1000); + taosMsleep(500); pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId); if (pMeta->streamBackend == NULL) { stError("vgId:%d failed to init stream backend", pMeta->vgId); @@ -205,6 +205,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF } pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend); + pMeta->role = NODE_ROLE_UNINIT; code = streamBackendLoadCheckpointInfo(pMeta); taosInitRWLatch(&pMeta->lock); @@ -237,6 +238,7 @@ int32_t streamMetaReopen(SStreamMeta* pMeta) { pMeta->streamBackendRid = -1; pMeta->streamBackend = NULL; + pMeta->role = NODE_ROLE_UNINIT; char* defaultPath = taosMemoryCalloc(1, strlen(pMeta->path) + 128); sprintf(defaultPath, "%s%s%s", pMeta->path, TD_DIRSEP, "state"); @@ -262,14 +264,14 @@ int32_t streamMetaReopen(SStreamMeta* pMeta) { pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId); while (pMeta->streamBackend == NULL) { - taosMsleep(2 * 1000); + taosMsleep(500); pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId); if (pMeta->streamBackend == NULL) { stError("vgId:%d failed to init stream backend", pMeta->vgId); stInfo("vgId:%d retry to init stream backend", pMeta->vgId); - // return -1; } } + pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend); streamBackendLoadCheckpointInfo(pMeta); @@ -346,6 +348,7 @@ void streamMetaCloseImpl(void* arg) { taosMemoryFree(pMeta->path); taosThreadMutexDestroy(&pMeta->backendMutex); + pMeta->role = NODE_ROLE_UNINIT; taosMemoryFree(pMeta); stDebug("end to close stream meta"); } @@ -829,7 +832,7 @@ void metaHbToMnode(void* param, void* tmrId) { } // not leader not send msg - if (!pMeta->leader) { + if (pMeta->role == NODE_ROLE_FOLLOWER) { stInfo("vgId:%d follower not send hb to mnode", pMeta->vgId); taosReleaseRef(streamMetaId, rid); pMeta->pHbInfo->hbStart = 0; @@ -847,7 +850,7 @@ void metaHbToMnode(void* param, void* tmrId) { return; } - stDebug("vgId:%d build stream task hb, leader:%d", pMeta->vgId, pMeta->leader); + stDebug("vgId:%d build stream task hb, leader:%d", pMeta->vgId, (pMeta->role == NODE_ROLE_LEADER)); SStreamHbMsg hbMsg = {0}; taosRLockLatch(&pMeta->lock); @@ -954,7 +957,7 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) { int32_t vgId = pMeta->vgId; stDebug("vgId:%d notify all stream tasks that the vnode is closing. isLeader:%d startHb%" PRId64 ", totalHb:%d", vgId, - pMeta->leader, pMeta->pHbInfo->hbStart, pMeta->pHbInfo->hbCount); + (pMeta->role == NODE_ROLE_LEADER), pMeta->pHbInfo->hbStart, pMeta->pHbInfo->hbCount); taosWLockLatch(&pMeta->lock); @@ -973,7 +976,7 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) { taosWUnLockLatch(&pMeta->lock); // wait for the stream meta hb function stopping - if (pMeta->leader) { + if (pMeta->role == NODE_ROLE_LEADER) { pMeta->pHbInfo->stopFlag = STREAM_META_WILL_STOP; while (pMeta->pHbInfo->stopFlag != STREAM_META_OK_TO_STOP) { taosMsleep(100); @@ -1002,5 +1005,5 @@ void streamMetaStartHb(SStreamMeta* pMeta) { void streamMetaInitForSnode(SStreamMeta* pMeta) { pMeta->stage = 0; - pMeta->leader = true; + pMeta->role = NODE_ROLE_LEADER; } \ No newline at end of file diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index d910c7d785..3ca81ea90b 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -1000,36 +1000,6 @@ void streamTaskEnablePause(SStreamTask* pTask) { pTask->status.pauseAllowed = 1; } -// fix: this function should be removed, it may cause deadlock. -//void streamTaskHalt(SStreamTask* pTask) { -// int8_t status = pTask->status.taskStatus; -// if (status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP) { -// return; -// } -// -// if (status == TASK_STATUS__HALT) { -// return; -// } -// -// // wait for checkpoint completed -// while(pTask->status.taskStatus == TASK_STATUS__CK) { -// qDebug("s-task:%s status:%s during generating checkpoint, wait for 1sec and retry set status:halt", pTask->id.idStr, -// streamGetTaskStatusStr(TASK_STATUS__CK)); -// taosMsleep(1000); -// } -// -// // upgrade to halt status -// if (status == TASK_STATUS__PAUSE) { -// stDebug("s-task:%s upgrade status to %s from %s", pTask->id.idStr, streamGetTaskStatusStr(TASK_STATUS__HALT), -// streamGetTaskStatusStr(TASK_STATUS__PAUSE)); -// } else { -// stDebug("s-task:%s halt task", pTask->id.idStr); -// } -// -// pTask->status.keepTaskStatus = status; -// pTask->status.taskStatus = TASK_STATUS__HALT; -//} - void streamTaskResumeFromHalt(SStreamTask* pTask) { const char* id = pTask->id.idStr; int8_t status = pTask->status.taskStatus; diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 91e91fe3d8..e5088e9c69 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -554,26 +554,26 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE } int32_t streamTaskStop(SStreamTask* pTask) { - SStreamMeta* pMeta = pTask->pMeta; + int32_t vgId = pTask->pMeta->vgId; int64_t st = taosGetTimestampMs(); const char* id = pTask->id.idStr; taosThreadMutexLock(&pTask->lock); if (pTask->status.taskStatus == TASK_STATUS__CK) { - stDebug("s-task:%s in checkpoint will be discarded since task is stopped", pTask->id.idStr); + stDebug("s-task:%s in checkpoint will be discarded since task is stopped", id); } pTask->status.taskStatus = TASK_STATUS__STOP; taosThreadMutexUnlock(&pTask->lock); qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS); - while (/*pTask->status.schedStatus != TASK_SCHED_STATUS__INACTIVE */ !streamTaskIsIdle(pTask)) { - stDebug("s-task:%s level:%d wait for task to be idle, check again in 100ms", id, pTask->info.taskLevel); + stDebug("s-task:%s level:%d wait for task to be idle and then close, check again in 100ms", id, + pTask->info.taskLevel); taosMsleep(100); } int64_t el = taosGetTimestampMs() - st; - stDebug("vgId:%d s-task:%s is closed in %" PRId64 " ms", pMeta->vgId, pTask->id.idStr, el); + stDebug("vgId:%d s-task:%s is closed in %" PRId64 " ms", vgId, id, el); return 0; }