From 6d37e596ec466f26c84b7ccd481adf24b6c35897 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 4 Jan 2024 18:47:49 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/dnode/snode/src/snode.c | 4 +- source/dnode/vnode/src/tq/tq.c | 29 +++------ source/dnode/vnode/src/tq/tqStreamTask.c | 16 +++-- source/dnode/vnode/src/tqCommon/tqCommon.c | 10 +-- source/libs/stream/inc/streamsm.h | 5 -- source/libs/stream/src/stream.c | 4 +- source/libs/stream/src/streamCheckpoint.c | 7 +-- source/libs/stream/src/streamDispatch.c | 11 ++-- source/libs/stream/src/streamExec.c | 51 +++++++-------- source/libs/stream/src/streamMeta.c | 10 +-- source/libs/stream/src/streamStart.c | 72 ++++++++++------------ source/libs/stream/src/streamTask.c | 4 +- source/libs/stream/src/streamTaskSm.c | 13 ++-- 13 files changed, 103 insertions(+), 133 deletions(-) diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 33eb9cd3ed..837c55c219 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -91,9 +91,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer); } - char *p = NULL; - streamTaskGetStatus(pTask, &p); - + char* p = streamTaskGetStatus(pTask)->name; if (pTask->info.fillHistory) { sndInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " nextProcessVer:%" PRId64 diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 851d3f967b..203c34f1ce 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -842,8 +842,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer); } - char* p = NULL; - streamTaskGetStatus(pTask, &p); + char* p = streamTaskGetStatus(pTask)->name; if (pTask->info.fillHistory) { tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 @@ -932,8 +931,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { // do recovery step1 const char* id = pTask->id.idStr; - char* pStatus = NULL; - streamTaskGetStatus(pTask, &pStatus); + char* pStatus = streamTaskGetStatus(pTask)->name; // avoid multi-thread exec while (1) { @@ -990,15 +988,15 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { if (retInfo.ret == TASK_SCANHISTORY_REXEC) { streamReExecScanHistoryFuture(pTask, retInfo.idleTime); } else { - char* p = NULL; - ETaskStatus s = streamTaskGetStatus(pTask, &p); + SStreamTaskState* p = streamTaskGetStatus(pTask); + ETaskStatus s = p->state; if (s == TASK_STATUS__PAUSE) { tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs total:%.2fs, sched-status:%d", pTask->id.idStr, el, pTask->execInfo.step1El, status); } else if (s == TASK_STATUS__STOP || s == TASK_STATUS__DROPPING) { - tqDebug("s-task:%s status:%p not continue scan-history data, total elapsed time:%.2fs quit", pTask->id.idStr, p, - pTask->execInfo.step1El); + tqDebug("s-task:%s status:%p not continue scan-history data, total elapsed time:%.2fs quit", pTask->id.idStr, + p->name, pTask->execInfo.step1El); } } @@ -1038,15 +1036,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { streamMetaReleaseTask(pMeta, pStreamTask); } else { ASSERT(0); -// STimeWindow* pWindow = &pTask->dataRange.window; -// ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask) || streamTaskShouldStop(pTask)); -// -// // Not update the fill-history time window until the state transfer is completed. -// tqDebug("s-task:%s scan-history in stream time window completed, start to handle data from WAL, startVer:%" PRId64 -// ", window:%" PRId64 " - %" PRId64, -// id, pTask->chkInfo.nextProcessVer, pWindow->skey, pWindow->ekey); -// -// code = streamTaskScanHistoryDataComplete(pTask); } atomic_store_32(&pTask->status.inScanHistorySentinel, 0); @@ -1138,7 +1127,7 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, } streamTaskResume(pTask); - ETaskStatus status = streamTaskGetStatus(pTask, NULL); + ETaskStatus status = streamTaskGetStatus(pTask)->state; int32_t level = pTask->info.taskLevel; if (level == TASK_LEVEL__SINK) { @@ -1275,7 +1264,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) // todo save the checkpoint failed info taosThreadMutexLock(&pTask->lock); - ETaskStatus status = streamTaskGetStatus(pTask, NULL); + ETaskStatus status = streamTaskGetStatus(pTask)->state; if (status == TASK_STATUS__HALT || status == TASK_STATUS__PAUSE) { tqError("s-task:%s not ready for checkpoint, since it is halt, ignore this checkpoint:%" PRId64 ", set it failure", @@ -1362,7 +1351,7 @@ int32_t tqProcessTaskDropHTask(STQ* pTq, SRpcMsg* pMsg) { } taosThreadMutexLock(&pTask->lock); - ETaskStatus status = streamTaskGetStatus(pTask, NULL); + ETaskStatus status = streamTaskGetStatus(pTask)->state; // if (status == TASK_STATUS__STREAM_SCAN_HISTORY) { // streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE); // } diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 38b240b47e..12f540b9cf 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -220,16 +220,15 @@ bool taskReadyForDataFromWal(SStreamTask* pTask) { } // not in ready state, do not handle the data from wal - char* p = NULL; - int32_t status = streamTaskGetStatus(pTask, &p); - if (streamTaskGetStatus(pTask, &p) != TASK_STATUS__READY) { - tqTrace("s-task:%s not ready for submit block in wal, status:%s", pTask->id.idStr, p); + SStreamTaskState* pState = streamTaskGetStatus(pTask); + if (pState->state != TASK_STATUS__READY) { + tqTrace("s-task:%s not ready for submit block in wal, status:%s", pTask->id.idStr, pState->name); return false; } // fill-history task has entered into the last phase, no need to anything if ((pTask->info.fillHistory == 1) && pTask->status.appendTranstateBlock) { - ASSERT(status == TASK_STATUS__READY); + ASSERT(pState->state == TASK_STATUS__READY); // the maximum version of data in the WAL has reached already, the step2 is done tqDebug("s-task:%s fill-history reach the maximum ver:%" PRId64 ", not scan wal anymore", pTask->id.idStr, pTask->dataRange.range.maxVer); @@ -342,10 +341,9 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { taosThreadMutexLock(&pTask->lock); - char* p = NULL; - ETaskStatus status = streamTaskGetStatus(pTask, &p); - if (status != TASK_STATUS__READY) { - tqDebug("s-task:%s not ready for submit block from wal, status:%s", pTask->id.idStr, p); + SStreamTaskState* pState = streamTaskGetStatus(pTask); + if (pState->state != TASK_STATUS__READY) { + tqDebug("s-task:%s not ready for submit block from wal, status:%s", pTask->id.idStr, pState->name); taosThreadMutexUnlock(&pTask->lock); streamMetaReleaseTask(pStreamMeta, pTask); continue; diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 95bc4f4131..6a47cfc3eb 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -400,11 +400,11 @@ int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { rsp.status = streamTaskCheckStatus(pTask, req.upstreamTaskId, req.upstreamNodeId, req.stage, &rsp.oldStage); streamMetaReleaseTask(pMeta, pTask); - char* p = NULL; - streamTaskGetStatus(pTask, &p); + SStreamTaskState* pState = streamTaskGetStatus(pTask); tqDebug("s-task:%s status:%s, stage:%" PRId64 " recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), check_status:%d", - pTask->id.idStr, p, rsp.oldStage, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); + pTask->id.idStr, pState->name, rsp.oldStage, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, + rsp.status); } else { rsp.status = TASK_DOWNSTREAM_NOT_READY; tqDebug("tq recv task check(taskId:0x%" PRIx64 "-0x%x not built yet) req(reqId:0x%" PRIx64 @@ -706,9 +706,11 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead streamMetaStopAllTasks(pMeta); return 0; } else if (type == STREAM_EXEC_T_RESUME_TASK) { + // task resume to run after idle for a while SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId); if (pTask != NULL) { ASSERT(streamTaskReadyToRun(pTask, NULL)); + tqDebug("s-task:%s task resume to run after idle for a while", pTask->id.idStr); streamResumeTask(pTask); } @@ -772,7 +774,7 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { tqDebug("s-task:%s receive task-reset msg from mnode, reset status and ready for data processing", pTask->id.idStr); // clear flag set during do checkpoint, and open inputQ for all upstream tasks - if (streamTaskGetStatus(pTask, NULL) == TASK_STATUS__CK) { + if (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK) { streamTaskClearCheckInfo(pTask, true); streamTaskSetStatusReady(pTask); } diff --git a/source/libs/stream/inc/streamsm.h b/source/libs/stream/inc/streamsm.h index ea0522bd5a..abdafc0240 100644 --- a/source/libs/stream/inc/streamsm.h +++ b/source/libs/stream/inc/streamsm.h @@ -23,11 +23,6 @@ extern "C" { #endif // moore finite state machine for stream task -typedef struct SStreamTaskState { - ETaskStatus state; - char* name; -} SStreamTaskState; - typedef int32_t (*__state_trans_fn)(SStreamTask*); typedef int32_t (*__state_trans_succ_fn)(SStreamTask*); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index f383f0f31d..10b1b3fbbe 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -49,7 +49,7 @@ static void streamSchedByTimer(void* param, void* tmrId) { return; } - if (streamTaskGetStatus(pTask, NULL) == TASK_STATUS__CK) { + if (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK) { stDebug("s-task:%s in checkpoint procedure, not retrieve result, next:%dms", id, nextTrigger); } else { if (status == TASK_TRIGGER_STATUS__ACTIVE) { @@ -247,7 +247,7 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S } // disable the data from upstream tasks - if (streamTaskGetStatus(pTask, NULL) == TASK_STATUS__HALT) { + if (streamTaskGetStatus(pTask)->state == TASK_STATUS__HALT) { status = TASK_INPUT_STATUS__BLOCKED; } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index b54adb0f96..c75185b0ea 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -187,7 +187,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc int32_t code = TSDB_CODE_SUCCESS; // set task status - if (streamTaskGetStatus(pTask, NULL) != TASK_STATUS__CK) { + if (streamTaskGetStatus(pTask)->state != TASK_STATUS__CK) { pTask->chkInfo.checkpointingId = checkpointId; code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT); if (code != TSDB_CODE_SUCCESS) { @@ -309,8 +309,7 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) { p->chkInfo.checkpointVer = p->chkInfo.processedVer; streamTaskClearCheckInfo(p, false); - char* str = NULL; - streamTaskGetStatus(p, &str); + SStreamTaskState* pState = streamTaskGetStatus(p); code = streamTaskHandleEvent(p->status.pSM, TASK_EVENT_CHECKPOINT_DONE); taosThreadMutexUnlock(&p->lock); @@ -322,7 +321,7 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) { stDebug("vgId:%d s-task:%s level:%d open upstream inputQ, save status after checkpoint, checkpointId:%" PRId64 ", Ver(saved):%" PRId64 " currentVer:%" PRId64 ", status: normal, prev:%s", - vgId, id, p->info.taskLevel, checkpointId, p->chkInfo.checkpointVer, p->chkInfo.nextProcessVer, str); + vgId, id, p->info.taskLevel, checkpointId, p->chkInfo.checkpointVer, p->chkInfo.nextProcessVer, pState->name); // save the task if not sink task if (p->info.taskLevel != TASK_LEVEL__SINK) { diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 00a8940b6a..a6d21e2625 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -696,10 +696,9 @@ int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) { int32_t numOfVgs = taosArrayGetSize(vgInfo); pTask->notReadyTasks = numOfVgs; - char* p = NULL; - streamTaskGetStatus(pTask, &p); + SStreamTaskState* pState = streamTaskGetStatus(pTask); stDebug("s-task:%s send scan-history data complete msg to downstream (shuffle-dispatch) %d tasks, status:%s", - pTask->id.idStr, numOfVgs, p); + pTask->id.idStr, numOfVgs, pState->name); for (int32_t i = 0; i < numOfVgs; i++) { SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); req.downstreamTaskId = pVgInfo->taskId; @@ -819,9 +818,9 @@ int32_t doDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHist initRpcMsg(&msg, TDMT_VND_STREAM_SCAN_HISTORY_FINISH, buf, tlen + sizeof(SMsgHead)); tmsgSendReq(pEpSet, &msg); - char* p = NULL; - streamTaskGetStatus(pTask, &p); - stDebug("s-task:%s status:%s dispatch scan-history finish msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, p, + + SStreamTaskState* pState = streamTaskGetStatus(pTask); + stDebug("s-task:%s status:%s dispatch scan-history finish msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, pState->name, pReq->downstreamTaskId, vgId); return 0; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index d68288222c..d83d99f10f 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -24,12 +24,12 @@ static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask); bool streamTaskShouldStop(const SStreamTask* pTask) { - ETaskStatus s = streamTaskGetStatus(pTask, NULL); - return (s == TASK_STATUS__STOP) || (s == TASK_STATUS__DROPPING); + SStreamTaskState* pState = streamTaskGetStatus(pTask); + return (pState->state == TASK_STATUS__STOP) || (pState->state == TASK_STATUS__DROPPING); } bool streamTaskShouldPause(const SStreamTask* pTask) { - return (streamTaskGetStatus(pTask, NULL) == TASK_STATUS__PAUSE); + return (streamTaskGetStatus(pTask)->state == TASK_STATUS__PAUSE); } static int32_t doOutputResultBlockImpl(SStreamTask* pTask, SStreamDataBlock* pBlock) { @@ -344,11 +344,14 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { streamMetaWUnLock(pMeta); return TSDB_CODE_STREAM_TASK_NOT_EXIST; } else { - stDebug("s-task:%s fill-history task end, update related stream task:%s info, transfer exec state", id, - pStreamTask->id.idStr); + double el = (taosGetTimestampMs() - pTask->execInfo.step2Start) / 1000.; + stDebug( + "s-task:%s fill-history task end, scal wal elapsed time:%.2fSec,update related stream task:%s info, transfer " + "exec state", + id, el, pStreamTask->id.idStr); } - ETaskStatus status = streamTaskGetStatus(pStreamTask, NULL); + ETaskStatus status = streamTaskGetStatus(pStreamTask)->state; STimeWindow* pTimeWindow = &pStreamTask->dataRange.window; // It must be halted for a source stream task, since when the related scan-history-data task start scan the history @@ -374,8 +377,10 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { // In case of sink tasks, no need to halt them. // In case of source tasks and agg tasks, we should HALT them, and wait for them to be idle. And then, it's safe to // start the task state transfer procedure. - char* p = NULL; - status = streamTaskGetStatus(pStreamTask, &p); +// char* p = NULL; + SStreamTaskState* pState = streamTaskGetStatus(pStreamTask); + status = pState->state; + char* p = pState->name; if (status == TASK_STATUS__STOP || status == TASK_STATUS__DROPPING) { stError("s-task:%s failed to transfer state from fill-history task:%s, status:%s", id, pStreamTask->id.idStr, p); streamMetaReleaseTask(pMeta, pStreamTask); @@ -409,7 +414,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id); // 5. save to disk - pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask, NULL); + pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask)->state; // 6. add empty delete block if ((pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) && taosQueueEmpty(pStreamTask->inputq.queue->pQueue)) { @@ -570,7 +575,7 @@ int32_t doStreamExecTask(SStreamTask* pTask) { int32_t blockSize = 0; int32_t numOfBlocks = 0; SStreamQueueItem* pInput = NULL; - if (streamTaskShouldStop(pTask) || (streamTaskGetStatus(pTask, NULL) == TASK_STATUS__UNINIT)) { + if (streamTaskShouldStop(pTask) || (streamTaskGetStatus(pTask)->state == TASK_STATUS__UNINIT)) { stDebug("s-task:%s stream task is stopped", id); break; } @@ -647,10 +652,9 @@ int32_t doStreamExecTask(SStreamTask* pTask) { if (type == STREAM_INPUT__CHECKPOINT) { // todo add lock - char* p = NULL; - ETaskStatus s = streamTaskGetStatus(pTask, &p); - if (s == TASK_STATUS__CK) { - stDebug("s-task:%s checkpoint block received, set status:%s", pTask->id.idStr, p); + SStreamTaskState* pState = streamTaskGetStatus(pTask); + if (pState->state == TASK_STATUS__CK) { + stDebug("s-task:%s checkpoint block received, set status:%s", pTask->id.idStr, pState->name); streamTaskBuildCheckpoint(pTask); } else { // todo refactor @@ -678,26 +682,27 @@ int32_t doStreamExecTask(SStreamTask* pTask) { // the task may be set dropping/stopping, while it is still in the task queue, therefore, the sched-status can not // be updated by tryExec function, therefore, the schedStatus will always be the TASK_SCHED_STATUS__WAITING. bool streamTaskIsIdle(const SStreamTask* pTask) { - ETaskStatus status = streamTaskGetStatus(pTask, NULL); + ETaskStatus status = streamTaskGetStatus(pTask)->state; return (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE || status == TASK_STATUS__STOP || status == TASK_STATUS__DROPPING); } bool streamTaskReadyToRun(const SStreamTask* pTask, char** pStatus) { - ETaskStatus st = streamTaskGetStatus(pTask, pStatus); + SStreamTaskState* pState = streamTaskGetStatus(pTask); + ETaskStatus st = pState->state; + *pStatus = pState->name; return (st == TASK_STATUS__READY || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__CK); } static void doStreamExecTaskHelper(void* param, void* tmrId) { SStreamTask* pTask = (SStreamTask*)param; - char* p = NULL; - ETaskStatus status = streamTaskGetStatus(pTask, &p); - if (status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP) { + SStreamTaskState* p = streamTaskGetStatus(pTask); + if (p->state == TASK_STATUS__DROPPING || p->state == TASK_STATUS__STOP) { streamTaskSetSchedStatusInactive(pTask); int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s status:%s not resume task, ref:%d", pTask->id.idStr, p, ref); + stDebug("s-task:%s status:%s not resume task, ref:%d", pTask->id.idStr, p->name, ref); streamMetaReleaseTask(pTask->pMeta, pTask); return; @@ -770,8 +775,7 @@ int32_t streamResumeTask(SStreamTask* pTask) { atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); taosThreadMutexUnlock(&pTask->lock); - char* p = NULL; - streamTaskGetStatus(pTask, &p); + char* p = streamTaskGetStatus(pTask)->name; stDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, p, pTask->status.schedStatus); return 0; } @@ -789,8 +793,7 @@ int32_t streamExecTask(SStreamTask* pTask) { if (schedStatus == TASK_SCHED_STATUS__WAITING) { streamResumeTask(pTask); } else { - char* p = NULL; - streamTaskGetStatus(pTask, &p); + char* p = streamTaskGetStatus(pTask)->name; stDebug("s-task:%s already started to exec by other thread, status:%s, sched-status:%d", id, p, pTask->status.schedStatus); } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 8017a88dea..7ea42e62c1 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1093,7 +1093,7 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) { STaskStatusEntry entry = { .id = *pId, - .status = streamTaskGetStatus(*pTask, NULL), + .status = streamTaskGetStatus(*pTask)->state, .nodeId = hbMsg.vgId, .stage = pMeta->stage, .inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputq.queue)), @@ -1362,13 +1362,13 @@ SArray* streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta) { } taosThreadMutexLock(&pTask->lock); - char* p = NULL; - ETaskStatus s = streamTaskGetStatus(pTask, &p); - if (s == TASK_STATUS__CK) { + + SStreamTaskState* pState = streamTaskGetStatus(pTask); + if (pState->state == TASK_STATUS__CK) { streamTaskSetCheckpointFailedId(pTask); stDebug("s-task:%s mark the checkpoint:%"PRId64" failed", pTask->id.idStr, pTask->chkInfo.checkpointingId); } else { - stDebug("s-task:%s status:%s not reset the checkpoint", pTask->id.idStr, p); + stDebug("s-task:%s status:%s not reset the checkpoint", pTask->id.idStr, pState->state); } taosThreadMutexUnlock(&pTask->lock); diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 945d7844af..45a571e93c 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -48,15 +48,13 @@ static void tryLaunchHistoryTask(void* param, void* tmrId); static void doProcessDownstreamReadyRsp(SStreamTask* pTask); int32_t streamTaskSetReady(SStreamTask* pTask) { - char* p = NULL; - int32_t numOfDowns = streamTaskGetNumOfDownstream(pTask); - ETaskStatus status = streamTaskGetStatus(pTask, &p); + int32_t numOfDowns = streamTaskGetNumOfDownstream(pTask); + SStreamTaskState* p = streamTaskGetStatus(pTask); - if ((status == TASK_STATUS__SCAN_HISTORY/* || status == TASK_STATUS__STREAM_SCAN_HISTORY*/) && - pTask->info.taskLevel != TASK_LEVEL__SOURCE) { + if ((p->state == TASK_STATUS__SCAN_HISTORY) && pTask->info.taskLevel != TASK_LEVEL__SOURCE) { pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->upstreamInfo.pList); stDebug("s-task:%s level:%d task wait for %d upstream tasks complete scan-history procedure, status:%s", - pTask->id.idStr, pTask->info.taskLevel, pTask->numOfWaitingUpstream, p); + pTask->id.idStr, pTask->info.taskLevel, pTask->numOfWaitingUpstream, p->name); } ASSERT(pTask->status.downstreamReady == 0); @@ -93,12 +91,11 @@ static void doReExecScanhistory(void* param, void* tmrId) { SStreamTask* pTask = param; pTask->schedHistoryInfo.numOfTicks -= 1; - char* p = NULL; - ETaskStatus status = streamTaskGetStatus(pTask, &p); - if (status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP) { + SStreamTaskState* p = streamTaskGetStatus(pTask); + if (p->state == TASK_STATUS__DROPPING || p->state == TASK_STATUS__STOP) { streamMetaReleaseTask(pTask->pMeta, pTask); int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s status:%s not start scan-history again, ref:%d", pTask->id.idStr, p, ref); + stDebug("s-task:%s status:%s not start scan-history again, ref:%d", pTask->id.idStr, p->name, ref); return; } @@ -155,7 +152,7 @@ static int32_t doStartScanHistoryTask(SStreamTask* pTask) { int32_t streamTaskStartScanHistory(SStreamTask* pTask) { int32_t level = pTask->info.taskLevel; - ETaskStatus status = streamTaskGetStatus(pTask, NULL); + ETaskStatus status = streamTaskGetStatus(pTask)->state; ASSERT(pTask->status.downstreamReady == 1 && ((status == TASK_STATUS__SCAN_HISTORY)/* || (status == TASK_STATUS__STREAM_SCAN_HISTORY)*/)); @@ -315,7 +312,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_ id, upstreamTaskId, vgId, stage, pInfo->stage); // record the checkpoint failure id and sent to mnode taosThreadMutexLock(&pTask->lock); - ETaskStatus status = streamTaskGetStatus(pTask, NULL); + ETaskStatus status = streamTaskGetStatus(pTask)->state; if (status == TASK_STATUS__CK) { streamTaskSetCheckpointFailedId(pTask); } @@ -325,7 +322,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_ if (pInfo->stage != stage) { taosThreadMutexLock(&pTask->lock); - ETaskStatus status = streamTaskGetStatus(pTask, NULL); + ETaskStatus status = streamTaskGetStatus(pTask)->state; if (status == TASK_STATUS__CK) { streamTaskSetCheckpointFailedId(pTask); } @@ -346,9 +343,8 @@ int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask) { streamTaskSetReady(pTask); streamTaskSetRangeStreamCalc(pTask); - char* p = NULL; - ETaskStatus status = streamTaskGetStatus(pTask, &p); - ASSERT(status == TASK_STATUS__READY); + SStreamTaskState* p = streamTaskGetStatus(pTask); + ASSERT(p->state == TASK_STATUS__READY); if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { int64_t startVer = walReaderGetCurrentVer(pTask->exec.pWalReader); @@ -372,15 +368,14 @@ int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask) { streamTaskSetReady(pTask); streamTaskSetRangeStreamCalc(pTask); - char* p = NULL; - ETaskStatus status = streamTaskGetStatus(pTask, &p); - ASSERT(status == TASK_STATUS__SCAN_HISTORY/* || status == TASK_STATUS__STREAM_SCAN_HISTORY*/); + SStreamTaskState* p = streamTaskGetStatus(pTask); + ASSERT(p->state == TASK_STATUS__SCAN_HISTORY); if (pTask->info.fillHistory == 1) { - stDebug("s-task:%s fill-history task enters into scan-history data stage, status:%s", id, p); + stDebug("s-task:%s fill-history task enters into scan-history data stage, status:%s", id, p->name); streamTaskStartScanHistory(pTask); } else { - stDebug("s-task:%s scan wal data, status:%s", id, p); + stDebug("s-task:%s scan wal data, status:%s", id, p->name); } // NOTE: there will be an deadlock if launch fill history here. @@ -624,12 +619,11 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory ASSERT(taskLevel == TASK_LEVEL__AGG || taskLevel == TASK_LEVEL__SINK); const char* id = pTask->id.idStr; - char* p = NULL; - ETaskStatus status = streamTaskGetStatus(pTask, &p); + SStreamTaskState* p = streamTaskGetStatus(pTask); - if (status != TASK_STATUS__SCAN_HISTORY /*&& status != TASK_STATUS__STREAM_SCAN_HISTORY*/) { - stError("s-task:%s not in scan-history status, status:%s return upstream:0x%x scan-history finish directly", id, p, - pReq->upstreamTaskId); + if (p->state != TASK_STATUS__SCAN_HISTORY) { + stError("s-task:%s not in scan-history status, status:%s return upstream:0x%x scan-history finish directly", id, + p->name, pReq->upstreamTaskId); void* pBuf = NULL; int32_t len = 0; @@ -682,7 +676,7 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory } int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) { - ETaskStatus status = streamTaskGetStatus(pTask, NULL); + ETaskStatus status = streamTaskGetStatus(pTask)->state; // task restart now, not handle the scan-history finish rsp if (status == TASK_STATUS__UNINIT) { @@ -741,8 +735,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { ASSERT((*ppTask)->status.timerActive >= 1); if (streamTaskShouldStop(*ppTask)) { - char* p = NULL; - streamTaskGetStatus((*ppTask), &p); + char* p = streamTaskGetStatus(*ppTask)->name; int32_t ref = atomic_sub_fetch_32(&(*ppTask)->status.timerActive, 1); stDebug("s-task:%s status:%s should stop, quit launch fill-history task timer, retry:%d, ref:%d", (*ppTask)->id.idStr, p, (*ppTask)->hTaskInfo.retryTimes, ref); @@ -781,10 +774,9 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { // abort the timer if intend to stop task SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pHTaskInfo->id.streamId, pHTaskInfo->id.taskId); if (pHTask == NULL && (!streamTaskShouldStop(pTask))) { - char* p = NULL; + char* p = streamTaskGetStatus(pTask)->name; int32_t hTaskId = pHTaskInfo->id.taskId; - streamTaskGetStatus(pTask, &p); stDebug("s-task:%s status:%s failed to launch fill-history task:0x%x, retry launch:%dms, retryCount:%d", pTask->id.idStr, p, hTaskId, pHTaskInfo->waitInterval, pHTaskInfo->retryTimes); @@ -878,7 +870,7 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { } int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) { - if (streamTaskGetStatus(pTask, NULL) == TASK_STATUS__DROPPING) { + if (streamTaskGetStatus(pTask)->state == TASK_STATUS__DROPPING) { return 0; } @@ -1058,23 +1050,21 @@ void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta) { } void streamTaskResume(SStreamTask* pTask) { - char* p = NULL; - ETaskStatus status = streamTaskGetStatus(pTask, &p); + SStreamTaskState* p = streamTaskGetStatus(pTask); SStreamMeta* pMeta = pTask->pMeta; - if (status == TASK_STATUS__PAUSE || status == TASK_STATUS__HALT) { + if (p->state == TASK_STATUS__PAUSE || p->state == TASK_STATUS__HALT) { streamTaskRestoreStatus(pTask); - char* pNew = NULL; - streamTaskGetStatus(pTask, &pNew); - if (status == TASK_STATUS__PAUSE) { + char* pNew = streamTaskGetStatus(pTask)->name; + if (p->state == TASK_STATUS__PAUSE) { int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1); - stInfo("s-task:%s status:%s resume from %s, paused task(s):%d", pTask->id.idStr, pNew, p, num); + stInfo("s-task:%s status:%s resume from %s, paused task(s):%d", pTask->id.idStr, pNew, p->name, num); } else { - stInfo("s-task:%s status:%s resume from %s", pTask->id.idStr, pNew, p); + stInfo("s-task:%s status:%s resume from %s", pTask->id.idStr, pNew, p->name); } } else { - stDebug("s-task:%s status:%s not in pause/halt status, no need to resume", pTask->id.idStr, p); + stDebug("s-task:%s status:%s not in pause/halt status, no need to resume", pTask->id.idStr, p->name); } } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 32fd5cbb39..c14d355dc8 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -309,7 +309,9 @@ void tFreeStreamTask(SStreamTask* pTask) { ETaskStatus status1 = TASK_STATUS__UNINIT; taosThreadMutexLock(&pTask->lock); if (pTask->status.pSM != NULL) { - status1 = streamTaskGetStatus(pTask, &p); + SStreamTaskState* pStatus = streamTaskGetStatus(pTask); + p = pStatus->name; + status1 = pStatus->state; } taosThreadMutexUnlock(&pTask->lock); diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index d785932109..819cdbf374 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -65,8 +65,7 @@ static STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus nex static int32_t dummyFn(SStreamTask* UNUSED_PARAM(p)) { return TSDB_CODE_SUCCESS; } static int32_t attachEvent(SStreamTask* pTask, SAttachedEventInfo* pEvtInfo) { - char* p = NULL; - streamTaskGetStatus(pTask, &p); + char* p = streamTaskGetStatus(pTask)->name; stDebug("s-task:%s status:%s attach event:%s required status:%s, since not allowed to handle it", pTask->id.idStr, p, GET_EVT_NAME(pEvtInfo->event), StreamTaskStatusList[pEvtInfo->status].name); @@ -275,7 +274,7 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt while (1) { // wait for the task to be here taosThreadMutexLock(&pTask->lock); - ETaskStatus s = streamTaskGetStatus(pTask, NULL); + ETaskStatus s = streamTaskGetStatus(pTask)->state; taosThreadMutexUnlock(&pTask->lock); if ((s == pTrans->next.state) && (pSM->prev.evt == pTrans->event)) {// this event has been handled already @@ -400,12 +399,8 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even return TSDB_CODE_SUCCESS; } -ETaskStatus streamTaskGetStatus(const SStreamTask* pTask, char** pStr) { - SStreamTaskState s = pTask->status.pSM->current; // copy one obj in case of multi-thread environment - if (pStr != NULL) { - *pStr = s.name; - } - return s.state; +SStreamTaskState* streamTaskGetStatus(const SStreamTask* pTask) { + return &pTask->status.pSM->current; // copy one obj in case of multi-thread environment } const char* streamTaskGetStatusStr(ETaskStatus status) {