From 57fcd553f8a7e7b99b34030d39f45708fa721d9c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 28 Jun 2023 13:13:13 +0800 Subject: [PATCH] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 5 +++-- source/dnode/snode/src/snode.c | 2 +- source/dnode/vnode/src/tq/tq.c | 9 ++++++--- source/libs/stream/src/stream.c | 25 ++++++++++++++++++------- source/libs/stream/src/streamDispatch.c | 2 +- source/libs/stream/src/streamMeta.c | 11 ++++++++--- source/libs/stream/src/streamRecover.c | 18 +++++++++--------- tests/script/sh/deploy.sh | 2 +- 8 files changed, 47 insertions(+), 27 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index ef533dc969..dbcc31a35e 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -333,7 +333,8 @@ struct SStreamTask { // trigger int8_t triggerStatus; int64_t triggerParam; - void* timer; + void* schedTimer; + void* launchTaskTimer; SMsgCb* pMsgCb; // msg handle SStreamState* pState; // state backend @@ -550,7 +551,7 @@ int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTas int64_t dstTaskId); void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq); -int32_t streamSetupTrigger(SStreamTask* pTask); +int32_t streamSetupScheduleTrigger(SStreamTask* pTask); int32_t streamProcessRunReq(SStreamTask* pTask); int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg, bool exec); diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index d2fcf166b4..3d9adf8156 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -90,7 +90,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, 0); ASSERT(pTask->exec.pExecutor); - streamSetupTrigger(pTask); + streamSetupScheduleTrigger(pTask); qDebug("snode:%d expand stream task on snode, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", SNODE_HANDLE, pTask->id.idStr, pTask->chkInfo.version, pTask->info.selfChildId, pTask->info.taskLevel); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index ed37605d08..47a2eb6d3f 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -889,11 +889,12 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond); } - streamSetupTrigger(pTask); + streamSetupScheduleTrigger(pTask); - tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d, scan-history:%d", + tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64 + " child id:%d, level:%d, scan-history:%d, trigger:%" PRId64 " ms", vgId, pTask->id.idStr, pTask->chkInfo.version, pTask->info.selfChildId, pTask->info.taskLevel, - pTask->info.fillHistory); + pTask->info.fillHistory, pTask->triggerParam); // next valid version will add one pTask->chkInfo.version += 1; @@ -1189,9 +1190,11 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pStreamTask); + taosWLockLatch(&pMeta->lock); if (streamMetaCommit(pTask->pMeta) < 0) { // persist to disk } + taosWUnLockLatch(&pMeta->lock); } else { // todo update the chkInfo version for current task. // this task has an associated history stream task, so we need to scan wal from the end version of diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index d0c65cc256..f528080afe 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -64,14 +64,21 @@ char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) { void streamSchedByTimer(void* param, void* tmrId) { SStreamTask* pTask = (void*)param; + int8_t status = atomic_load_8(&pTask->triggerStatus); + qDebug("s-task:%s in scheduler timer, trigger status:%d", pTask->id.idStr, status); + if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) { streamMetaReleaseTask(NULL, pTask); + qDebug("s-task:%s jump out of schedTimer", pTask->id.idStr); return; } - if (atomic_load_8(&pTask->triggerStatus) == TASK_TRIGGER_STATUS__ACTIVE) { + if (status == TASK_TRIGGER_STATUS__ACTIVE) { SStreamTrigger* trigger = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM, 0); - if (trigger == NULL) return; + if (trigger == NULL) { + return; + } + trigger->type = STREAM_INPUT__GET_RES; trigger->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); if (trigger->pBlock == NULL) { @@ -84,23 +91,27 @@ void streamSchedByTimer(void* param, void* tmrId) { if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)trigger) < 0) { taosFreeQitem(trigger); - taosTmrReset(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->timer); + taosTmrReset(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->schedTimer); return; } streamSchedExec(pTask); } - taosTmrReset(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->timer); + taosTmrReset(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->schedTimer); } -int32_t streamSetupTrigger(SStreamTask* pTask) { +int32_t streamSetupScheduleTrigger(SStreamTask* pTask) { if (pTask->triggerParam != 0) { int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1); - ASSERT(ref == 2); - pTask->timer = taosTmrStart(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer); + ASSERT(ref == 2 && pTask->schedTimer == NULL); + + qDebug("s-task:%s setup scheduler trigger, delay:%d ms", pTask->id.idStr, pTask->triggerParam); + + pTask->schedTimer = taosTmrStart(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer); pTask->triggerStatus = TASK_TRIGGER_STATUS__INACTIVE; } + return 0; } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 57acc6223f..c7e156bcf6 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -557,7 +557,7 @@ static void doRetryDispatchData(void* param, void* tmrId) { void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration) { qError("s-task:%s dispatch data in %"PRId64"ms", pTask->id.idStr, waitDuration); - taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamEnv.timer, &pTask->timer); + taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamEnv.timer, &pTask->launchTaskTimer); } int32_t streamDispatchStreamBlock(SStreamTask* pTask) { diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index a245b2d4f6..8242f84312 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -139,9 +139,14 @@ void streamMetaClose(SStreamMeta* pMeta) { } SStreamTask* pTask = *(SStreamTask**)pIter; - if (pTask->timer) { - taosTmrStop(pTask->timer); - pTask->timer = NULL; + if (pTask->schedTimer) { + taosTmrStop(pTask->schedTimer); + pTask->schedTimer = NULL; + } + + if (pTask->launchTaskTimer) { + taosTmrStop(pTask->launchTaskTimer); + pTask->launchTaskTimer = NULL; } tFreeStreamTask(pTask); diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 6ff7a365a9..8e4d50bcf2 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -441,7 +441,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { SStreamTaskRetryInfo* pInfo = param; SStreamMeta* pMeta = pInfo->pMeta; - qDebug("s-task:0x%x in timer to launch history task", pInfo->taskId); + qDebug("s-task:0x%x in timer to launch related history task", pInfo->taskId); taosWLockLatch(&pMeta->lock); SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &pInfo->taskId, sizeof(int32_t)); @@ -472,7 +472,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { "destroyed, or should stop exec", pTask->id.idStr, pMeta->vgId, pStatus, pTask->historyTaskId.taskId); - taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->timer); + taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->launchTaskTimer); streamMetaReleaseTask(pMeta, pTask); return; } @@ -486,7 +486,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { pTask->status.timerActive = 0; streamMetaReleaseTask(pMeta, pTask); } else { - qError("s-task:0x%x failed to load task, it may have been destoryed", pInfo->taskId); + qError("s-task:0x%x failed to load task, it may have been destroyed", pInfo->taskId); } taosMemoryFree(pInfo); @@ -508,18 +508,18 @@ int32_t streamCheckHistoryTaskDownstrem(SStreamTask* pTask) { pInfo->taskId = pTask->id.taskId; pInfo->pMeta = pTask->pMeta; - if (pTask->timer == NULL) { - pTask->timer = taosTmrStart(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer); - if (pTask->timer == NULL) { + if (pTask->launchTaskTimer == NULL) { + pTask->launchTaskTimer = taosTmrStart(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer); + if (pTask->launchTaskTimer == NULL) { // todo failed to create timer } else { pTask->status.timerActive = 1; // timer is active - qDebug("s-task:%s set time active flag", pTask->id.idStr); + qDebug("s-task:%s set timer active flag", pTask->id.idStr); } } else { // timer exists pTask->status.timerActive = 1; - qDebug("s-task:%s set time active flag", pTask->id.idStr); - taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->timer); + qDebug("s-task:%s set timer active flag, task timer not null", pTask->id.idStr); + taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->launchTaskTimer); } // try again in 500ms diff --git a/tests/script/sh/deploy.sh b/tests/script/sh/deploy.sh index 7da8da09bf..5b1773e664 100755 --- a/tests/script/sh/deploy.sh +++ b/tests/script/sh/deploy.sh @@ -118,7 +118,7 @@ echo "statusInterval 1" >> $TAOS_CFG echo "dataDir $DATA_DIR" >> $TAOS_CFG echo "logDir $LOG_DIR" >> $TAOS_CFG echo "debugFlag 0" >> $TAOS_CFG -echo "tmrDebugFlag 131" >> $TAOS_CFG +echo "tmrDebugFlag 143" >> $TAOS_CFG echo "uDebugFlag 143" >> $TAOS_CFG echo "rpcDebugFlag 143" >> $TAOS_CFG echo "jniDebugFlag 143" >> $TAOS_CFG