From c4a49a7bd9098dc0a0f1127830c3f807dbacc58a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 15 Sep 2024 00:45:45 +0800 Subject: [PATCH 1/4] refactor: do some internal refactor. --- source/libs/stream/src/streamCheckpoint.c | 80 +++++++++---------- source/libs/stream/src/streamMeta.c | 94 ----------------------- source/libs/stream/src/streamStartTask.c | 58 +++++++++++++- source/libs/stream/src/streamTask.c | 2 +- source/libs/stream/src/streamUtil.c | 51 ++++++++++++ 5 files changed, 148 insertions(+), 137 deletions(-) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 35d5ba4e08..1a8b5c1028 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -209,29 +209,18 @@ int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStream return code; } -int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) { - SSDataBlock* pDataBlock = taosArrayGet(pBlock->blocks, 0); - if (pDataBlock == NULL) { - return TSDB_CODE_INVALID_PARA; - } - - int64_t checkpointId = pDataBlock->info.version; - int32_t transId = pDataBlock->info.window.skey; - const char* id = pTask->id.idStr; - int32_t code = TSDB_CODE_SUCCESS; - int32_t vgId = pTask->pMeta->vgId; - int32_t taskLevel = pTask->info.taskLevel; +static int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t checkpointId, SStreamDataBlock* pBlock, + int32_t transId) { + int32_t code = 0; + int32_t vgId = pTask->pMeta->vgId; + int32_t taskLevel = pTask->info.taskLevel; + const char* id = pTask->id.idStr; SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; - - streamMutexLock(&pTask->lock); if (pTask->chkInfo.checkpointId > checkpointId) { stError("s-task:%s vgId:%d current checkpointId:%" PRId64 " recv expired checkpoint-trigger block, checkpointId:%" PRId64 " transId:%d, discard", id, vgId, pTask->chkInfo.checkpointId, checkpointId, transId); - streamMutexUnlock(&pTask->lock); - - streamFreeQitem((SStreamQueueItem*)pBlock); return code; } @@ -239,37 +228,33 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock stError("s-task:%s vgId:%d checkpointId:%" PRId64 " transId:%d, has been marked failed, failedId:%" PRId64 " discard the checkpoint-trigger block", id, vgId, checkpointId, transId, pActiveInfo->failedId); - streamMutexUnlock(&pTask->lock); - - streamFreeQitem((SStreamQueueItem*)pBlock); return code; } if (pTask->chkInfo.checkpointId == checkpointId) { { // send checkpoint-ready msg to upstream - SRpcMsg msg = {0}; + SRpcMsg msg = {0}; SStreamUpstreamEpInfo* pInfo = NULL; streamTaskGetUpstreamTaskEpInfo(pTask, pBlock->srcTaskId, &pInfo); if (pInfo == NULL) { - streamMutexUnlock(&pTask->lock); return TSDB_CODE_STREAM_TASK_NOT_EXIST; } code = initCheckpointReadyMsg(pTask, pInfo->nodeId, pBlock->srcTaskId, pInfo->childId, checkpointId, &msg); if (code == TSDB_CODE_SUCCESS) { code = tmsgSendReq(&pInfo->epSet, &msg); + if (code) { + stError("s-task:%s vgId:%d failed send chkpt-ready msg to upstream, code:%s", id, vgId, tstrerror(code)); + } } } stWarn( - "s-task:%s vgId:%d recv already finished checkpoint msg, send checkpoint-ready to upstream:0x%x to resume the " - "interrupted checkpoint", + "s-task:%s vgId:%d recv already finished checkpoint-trigger, send checkpoint-ready to upstream:0x%x to resume " + "the interrupted checkpoint", id, vgId, pBlock->srcTaskId); streamTaskOpenUpstreamInput(pTask, pBlock->srcTaskId); - streamMutexUnlock(&pTask->lock); - - streamFreeQitem((SStreamQueueItem*)pBlock); return code; } @@ -278,9 +263,6 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock stError("s-task:%s vgId:%d active checkpointId:%" PRId64 ", recv invalid checkpoint-trigger checkpointId:%" PRId64 " discard", id, vgId, pActiveInfo->activeId, checkpointId); - streamMutexUnlock(&pTask->lock); - - streamFreeQitem((SStreamQueueItem*)pBlock); return code; } else { // checkpointId == pActiveInfo->activeId if (pActiveInfo->allUpstreamTriggerRecv == 1) { @@ -288,8 +270,6 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock "s-task:%s vgId:%d all upstream checkpoint-trigger recv, discard this checkpoint-trigger, " "checkpointId:%" PRId64 " transId:%d", id, vgId, checkpointId, transId); - streamMutexUnlock(&pTask->lock); - streamFreeQitem((SStreamQueueItem*)pBlock); return code; } @@ -298,7 +278,6 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock for (int32_t i = 0; i < taosArrayGetSize(pActiveInfo->pReadyMsgList); ++i) { STaskCheckpointReadyInfo* p = taosArrayGet(pActiveInfo->pReadyMsgList, i); if (p == NULL) { - streamMutexUnlock(&pTask->lock); return TSDB_CODE_INVALID_PARA; } @@ -306,9 +285,6 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock stWarn("s-task:%s repeatly recv checkpoint-source msg from task:0x%x vgId:%d, checkpointId:%" PRId64 ", prev recvTs:%" PRId64 " discard", pTask->id.idStr, p->upstreamTaskId, p->upstreamNodeId, p->checkpointId, p->recvTs); - - streamMutexUnlock(&pTask->lock); - streamFreeQitem((SStreamQueueItem*)pBlock); return code; } } @@ -316,7 +292,33 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock } } + return 0; +} + +int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) { + int64_t checkpointId = 0; + int32_t transId = 0; + const char* id = pTask->id.idStr; + int32_t code = TSDB_CODE_SUCCESS; + int32_t vgId = pTask->pMeta->vgId; + int32_t taskLevel = pTask->info.taskLevel; + SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; + + SSDataBlock* pDataBlock = taosArrayGet(pBlock->blocks, 0); + if (pDataBlock == NULL) { + return TSDB_CODE_INVALID_PARA; + } + + checkpointId = pDataBlock->info.version; + transId = pDataBlock->info.window.skey; + + streamMutexLock(&pTask->lock); + code = doCheckBeforeHandleChkptTrigger(pTask, checkpointId, pBlock, transId); streamMutexUnlock(&pTask->lock); + if (code) { + streamFreeQitem((SStreamQueueItem*)pBlock); + return code; + } stDebug("s-task:%s vgId:%d start to handle the checkpoint-trigger block, checkpointId:%" PRId64 " ver:%" PRId64 ", transId:%d current active checkpointId:%" PRId64, @@ -367,6 +369,7 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock // before the next checkpoint. code = flushStateDataInExecutor(pTask, (SStreamQueueItem*)pBlock); if (code) { + streamFreeQitem((SStreamQueueItem*)pBlock); return code; } @@ -675,10 +678,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV } streamMetaWLock(pMeta); - - if (streamMetaCommit(pMeta) < 0) { - // persist to disk - } + code = streamMetaCommit(pMeta); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 514e25c689..d8249666c3 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -458,9 +458,6 @@ int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn, code = createMetaHbInfo(pRid, &pMeta->pHbInfo); TSDB_CHECK_CODE(code, lino, _err); - pMeta->qHandle = taosInitScheduler(32, 1, "stream-chkp", NULL); - TSDB_CHECK_NULL(pMeta->qHandle, code, lino, _err, terrno); - code = bkdMgtCreate(tpath, (SBkdMgt**)&pMeta->bkdChkptMgt); TSDB_CHECK_CODE(code, lino, _err); @@ -629,9 +626,6 @@ void streamMetaCloseImpl(void* arg) { taosMemoryFree(pMeta->path); streamMutexDestroy(&pMeta->backendMutex); - taosCleanUpScheduler(pMeta->qHandle); - taosMemoryFree(pMeta->qHandle); - bkdMgtDestroy(pMeta->bkdChkptMgt); pMeta->role = NODE_ROLE_UNINIT; @@ -1261,40 +1255,6 @@ void streamMetaStartHb(SStreamMeta* pMeta) { streamMetaHbToMnode(pRid, NULL); } -void streamMetaRLock(SStreamMeta* pMeta) { - // stTrace("vgId:%d meta-rlock", pMeta->vgId); - int32_t code = taosThreadRwlockRdlock(&pMeta->lock); - if (code) { - stError("vgId:%d meta-rlock failed, code:%s", pMeta->vgId, tstrerror(code)); - } -} - -void streamMetaRUnLock(SStreamMeta* pMeta) { - // stTrace("vgId:%d meta-runlock", pMeta->vgId); - int32_t code = taosThreadRwlockUnlock(&pMeta->lock); - if (code != TSDB_CODE_SUCCESS) { - stError("vgId:%d meta-runlock failed, code:%s", pMeta->vgId, tstrerror(code)); - } else { - // stTrace("vgId:%d meta-runlock completed", pMeta->vgId); - } -} - -void streamMetaWLock(SStreamMeta* pMeta) { - // stTrace("vgId:%d meta-wlock", pMeta->vgId); - int32_t code = taosThreadRwlockWrlock(&pMeta->lock); - if (code) { - stError("vgId:%d failed to apply wlock, code:%s", pMeta->vgId, tstrerror(code)); - } -} - -void streamMetaWUnLock(SStreamMeta* pMeta) { - // stTrace("vgId:%d meta-wunlock", pMeta->vgId); - int32_t code = taosThreadRwlockUnlock(&pMeta->lock); - if (code) { - stError("vgId:%d failed to apply wunlock, code:%s", pMeta->vgId, tstrerror(code)); - } -} - int32_t streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta, SArray** pList) { QRY_PARAM_CHECK(pList); @@ -1398,60 +1358,6 @@ int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta) { return 0; } -int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { - int32_t code = TSDB_CODE_SUCCESS; - int64_t now = taosGetTimestampMs(); - int64_t startTs = 0; - bool hasFillhistoryTask = false; - STaskId hId = {0}; - - stDebug("vgId:%d add start failed task:0x%x", pMeta->vgId, taskId); - - streamMetaRLock(pMeta); - - STaskId id = {.streamId = streamId, .taskId = taskId}; - SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); - - if (ppTask != NULL) { - startTs = (*ppTask)->taskCheckInfo.startTs; - hasFillhistoryTask = HAS_RELATED_FILLHISTORY_TASK(*ppTask); - hId = (*ppTask)->hTaskInfo.id; - - streamMetaRUnLock(pMeta); - - // add the failed task info, along with the related fill-history task info into tasks list. - code = streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, startTs, now, false); - if (hasFillhistoryTask) { - code = streamMetaAddTaskLaunchResult(pMeta, hId.streamId, hId.taskId, startTs, now, false); - } - } else { - streamMetaRUnLock(pMeta); - - stError("failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped", - streamId, taskId, pMeta->vgId); - code = TSDB_CODE_STREAM_TASK_NOT_EXIST; - } - - return code; -} - -void streamMetaAddFailedTaskSelf(SStreamTask* pTask, int64_t failedTs) { - int32_t startTs = pTask->execInfo.checkTs; - int32_t code = streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, failedTs, false); - if (code) { - stError("s-task:%s failed to add self task failed to start, code:%s", pTask->id.idStr, tstrerror(code)); - } - - // automatically set the related fill-history task to be failed. - if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { - STaskId* pId = &pTask->hTaskInfo.id; - code = streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, failedTs, false); - if (code) { - stError("s-task:0x%" PRIx64 " failed to add self task failed to start, code:%s", pId->taskId, tstrerror(code)); - } - } -} - void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTask* pHTask, int32_t transId, int64_t startTs) { const char* id = pTask->id.idStr; diff --git a/source/libs/stream/src/streamStartTask.c b/source/libs/stream/src/streamStartTask.c index 98c6534b46..0858f57414 100644 --- a/source/libs/stream/src/streamStartTask.c +++ b/source/libs/stream/src/streamStartTask.c @@ -30,8 +30,8 @@ typedef struct STaskInitTs { } STaskInitTs; static int32_t prepareBeforeStartTasks(SStreamMeta* pMeta, SArray** pList, int64_t now); -static bool allCheckDownstreamRsp(SStreamMeta* pMeta, STaskStartInfo* pStartInfo, int32_t numOfTotal); -static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ); +static bool allCheckDownstreamRsp(SStreamMeta* pMeta, STaskStartInfo* pStartInfo, int32_t numOfTotal); +static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ); // restore the checkpoint id by negotiating the latest consensus checkpoint id int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { @@ -505,3 +505,57 @@ void streamTaskSetReqConsenChkptId(SStreamTask* pTask, int64_t ts) { stDebug("s-task:%s set req consen-checkpointId flag, prev transId:%d, ts:%" PRId64, pTask->id.idStr, prevTrans, ts); } + +int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { + int32_t code = TSDB_CODE_SUCCESS; + int64_t now = taosGetTimestampMs(); + int64_t startTs = 0; + bool hasFillhistoryTask = false; + STaskId hId = {0}; + + stDebug("vgId:%d add start failed task:0x%x", pMeta->vgId, taskId); + + streamMetaRLock(pMeta); + + STaskId id = {.streamId = streamId, .taskId = taskId}; + SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); + + if (ppTask != NULL) { + startTs = (*ppTask)->taskCheckInfo.startTs; + hasFillhistoryTask = HAS_RELATED_FILLHISTORY_TASK(*ppTask); + hId = (*ppTask)->hTaskInfo.id; + + streamMetaRUnLock(pMeta); + + // add the failed task info, along with the related fill-history task info into tasks list. + code = streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, startTs, now, false); + if (hasFillhistoryTask) { + code = streamMetaAddTaskLaunchResult(pMeta, hId.streamId, hId.taskId, startTs, now, false); + } + } else { + streamMetaRUnLock(pMeta); + + stError("failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped", + streamId, taskId, pMeta->vgId); + code = TSDB_CODE_STREAM_TASK_NOT_EXIST; + } + + return code; +} + +void streamMetaAddFailedTaskSelf(SStreamTask* pTask, int64_t failedTs) { + int32_t startTs = pTask->execInfo.checkTs; + int32_t code = streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, failedTs, false); + if (code) { + stError("s-task:%s failed to add self task failed to start, code:%s", pTask->id.idStr, tstrerror(code)); + } + + // automatically set the related fill-history task to be failed. + if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { + STaskId* pId = &pTask->hTaskInfo.id; + code = streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, failedTs, false); + if (code) { + stError("s-task:0x%" PRIx64 " failed to add self task failed to start, code:%s", pId->taskId, tstrerror(code)); + } + } +} diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 9a324084ff..365710f8a7 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -801,8 +801,8 @@ bool streamTaskSetSchedStatusWait(SStreamTask* pTask) { pTask->status.schedStatus = TASK_SCHED_STATUS__WAITING; ret = true; } - streamMutexUnlock(&pTask->lock); + streamMutexUnlock(&pTask->lock); return ret; } diff --git a/source/libs/stream/src/streamUtil.c b/source/libs/stream/src/streamUtil.c index b79ca32ff3..cef2ba35e7 100644 --- a/source/libs/stream/src/streamUtil.c +++ b/source/libs/stream/src/streamUtil.c @@ -35,3 +35,54 @@ void streamMutexDestroy(TdThreadMutex *pMutex) { stError("%p mutex destroy, code:%s", pMutex, tstrerror(code)); } } + +void streamMetaRLock(SStreamMeta* pMeta) { + // stTrace("vgId:%d meta-rlock", pMeta->vgId); + int32_t code = taosThreadRwlockRdlock(&pMeta->lock); + if (code) { + stError("vgId:%d meta-rlock failed, code:%s", pMeta->vgId, tstrerror(code)); + } +} + +void streamMetaRUnLock(SStreamMeta* pMeta) { + // stTrace("vgId:%d meta-runlock", pMeta->vgId); + int32_t code = taosThreadRwlockUnlock(&pMeta->lock); + if (code != TSDB_CODE_SUCCESS) { + stError("vgId:%d meta-runlock failed, code:%s", pMeta->vgId, tstrerror(code)); + } else { + // stTrace("vgId:%d meta-runlock completed", pMeta->vgId); + } +} + +void streamMetaWLock(SStreamMeta* pMeta) { + // stTrace("vgId:%d meta-wlock", pMeta->vgId); + int32_t code = taosThreadRwlockWrlock(&pMeta->lock); + if (code) { + stError("vgId:%d failed to apply wlock, code:%s", pMeta->vgId, tstrerror(code)); + } +} + +void streamMetaWUnLock(SStreamMeta* pMeta) { + // stTrace("vgId:%d meta-wunlock", pMeta->vgId); + int32_t code = taosThreadRwlockUnlock(&pMeta->lock); + if (code) { + stError("vgId:%d failed to apply wunlock, code:%s", pMeta->vgId, tstrerror(code)); + } +} + +void streamSetFatalError(SStreamMeta* pMeta, int32_t code, const char* funcName, int32_t lino) { + int32_t oldCode = atomic_val_compare_exchange_32(&pMeta->fatalInfo.code, 0, code); + if (oldCode == 0) { + pMeta->fatalInfo.ts = taosGetTimestampMs(); + pMeta->fatalInfo.threadId = taosGetSelfPthreadId(); + tstrncpy(pMeta->fatalInfo.func, funcName, tListLen(pMeta->fatalInfo.func)); + pMeta->fatalInfo.line = lino; + stInfo("vgId:%d set global fatal error, code:%s %s line:%d", pMeta->vgId, tstrerror(code), funcName, lino); + } else { + stFatal("vgId:%d existed global fatal eror:%s, failed to set new fatal error code:%s", pMeta->vgId, code); + } +} + +int32_t streamGetFatalError(const SStreamMeta* pMeta) { + return atomic_load_32((volatile int32_t*) &pMeta->fatalInfo.code); +} From 3332a0b822c41fd593e16444fa95e93d4da3fde5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 15 Sep 2024 01:19:30 +0800 Subject: [PATCH 2/4] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 26 +++++++---- source/dnode/mnode/impl/src/mndStream.c | 52 +++++++++++----------- source/dnode/vnode/src/tqCommon/tqCommon.c | 40 ++++++++++------- source/libs/stream/src/streamMeta.c | 22 +++++---- source/libs/stream/src/streamUtil.c | 5 ++- 5 files changed, 86 insertions(+), 59 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index b77c8535f1..29759bc561 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -494,6 +494,14 @@ typedef struct SScanWalInfo { tmr_h scanTimer; } SScanWalInfo; +typedef struct SFatalErrInfo { + int32_t code; + int64_t ts; + int32_t threadId; + int32_t line; + char func[128]; +} SFatalErrInfo; + // meta typedef struct SStreamMeta { char* path; @@ -523,14 +531,13 @@ typedef struct SStreamMeta { int32_t numOfStreamTasks; // this value should be increased when a new task is added into the meta int32_t numOfPausedTasks; int64_t rid; - - int64_t chkpId; - int32_t chkpCap; - SArray* chkpSaved; - SArray* chkpInUse; - SRWLatch chkpDirLock; - void* qHandle; // todo remove it - void* bkdChkptMgt; + SFatalErrInfo fatalInfo; // fatal error occurs, stream stop to execute + int64_t chkpId; + int32_t chkpCap; + SArray* chkpSaved; + SArray* chkpInUse; + SRWLatch chkpDirLock; + void* bkdChkptMgt; } SStreamMeta; typedef struct STaskUpdateEntry { @@ -776,6 +783,9 @@ void streamMetaRLock(SStreamMeta* pMeta); void streamMetaRUnLock(SStreamMeta* pMeta); void streamMetaWLock(SStreamMeta* pMeta); void streamMetaWUnLock(SStreamMeta* pMeta); +void streamSetFatalError(SStreamMeta* pMeta, int32_t code, const char* funcName, int32_t lino); +int32_t streamGetFatalError(const SStreamMeta* pMeta); + void streamMetaResetStartInfo(STaskStartInfo* pMeta, int32_t vgId); int32_t streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta, SArray** pTaskList); void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index e0b8caa938..1fb398d070 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1143,18 +1143,16 @@ int32_t extractStreamNodeList(SMnode *pMnode) { return taosArrayGetSize(execInfo.pNodeList); } -static bool taskNodeIsUpdated(SMnode *pMnode) { - bool allReady = true; - SArray *pNodeSnapshot = NULL; - - // check if the node update happens or not - streamMutexLock(&execInfo.lock); +static int32_t doCheckForUpdated(SMnode *pMnode, SArray **ppNodeSnapshot) { + bool allReady = false; + bool nodeUpdated = false; + SVgroupChangeInfo changeInfo = {0}; int32_t numOfNodes = extractStreamNodeList(pMnode); + if (numOfNodes == 0) { mDebug("stream task node change checking done, no vgroups exist, do nothing"); execInfo.ts = taosGetTimestampSec(); - streamMutexUnlock(&execInfo.lock); return false; } @@ -1166,43 +1164,46 @@ static bool taskNodeIsUpdated(SMnode *pMnode) { if (pNodeEntry->stageUpdated) { mDebug("stream task not ready due to node update detected, checkpoint not issued"); - streamMutexUnlock(&execInfo.lock); return true; } } - int32_t code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot); + int32_t code = mndTakeVgroupSnapshot(pMnode, &allReady, ppNodeSnapshot); if (code) { mError("failed to get the vgroup snapshot, ignore it and continue"); } if (!allReady) { mWarn("not all vnodes ready, quit from vnodes status check"); - taosArrayDestroy(pNodeSnapshot); - streamMutexUnlock(&execInfo.lock); return true; } - SVgroupChangeInfo changeInfo = {0}; - code = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot, &changeInfo); + code = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, *ppNodeSnapshot, &changeInfo); if (code) { - streamMutexUnlock(&execInfo.lock); - return false; + nodeUpdated = false; + } else { + nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0); + if (nodeUpdated) { + mDebug("stream tasks not ready due to node update"); + } } - bool nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0); - mndDestroyVgroupChangeInfo(&changeInfo); - taosArrayDestroy(pNodeSnapshot); - - if (nodeUpdated) { - mDebug("stream tasks not ready due to node update"); - } - - streamMutexUnlock(&execInfo.lock); return nodeUpdated; } +// check if the node update happens or not +static bool taskNodeIsUpdated(SMnode *pMnode) { + SArray *pNodeSnapshot = NULL; + + streamMutexLock(&execInfo.lock); + bool updated = doCheckForUpdated(pMnode, &pNodeSnapshot); + streamMutexUnlock(&execInfo.lock); + + taosArrayDestroy(pNodeSnapshot); + return updated; +} + static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) { bool ready = true; if (taskNodeIsUpdated(pMnode)) { @@ -1993,7 +1994,7 @@ static int32_t mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeLis if (pInfo->pUpdateNodeList == NULL || pInfo->pDBMap == NULL) { mndDestroyVgroupChangeInfo(pInfo); - return terrno; + TSDB_CHECK_NULL(NULL, code, lino, _err, terrno); } int32_t numOfNodes = taosArrayGetSize(pPrevNodeList); @@ -2048,6 +2049,7 @@ static int32_t mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeLis return code; _err: + mError("failed to find node change info, code:%s at %s line:%d", tstrerror(code), __func__, lino); mndDestroyVgroupChangeInfo(pInfo); return code; } diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 3f4329f22b..116acd4636 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -160,6 +160,13 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM tDecoderClear(&decoder); + int32_t gError = streamGetFatalError(pMeta); + if (gError != 0) { + tqError("vgId:%d global fatal occurs, code:%s, ts:%" PRId64 " func:%s", pMeta->vgId, tstrerror(gError), + pMeta->fatalInfo.ts, pMeta->fatalInfo.func); + return 0; + } + // update the nodeEpset when it exists streamMetaWLock(pMeta); @@ -290,8 +297,11 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId, updateTasks, (numOfTasks - updateTasks)); } else { - if (streamMetaCommit(pMeta) < 0) { - // persist to disk + if ((code = streamMetaCommit(pMeta)) < 0) { + // always return true + streamMetaWUnLock(pMeta); + taosArrayDestroy(req.pNodeList); + return TSDB_CODE_SUCCESS; } streamMetaClearSetUpdateTaskListComplete(pMeta); @@ -754,8 +764,9 @@ int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, bool restored } streamMetaWUnLock(pMeta); + // always return success when handling the requirement issued by mnode during transaction. - return code; + return TSDB_CODE_SUCCESS; } static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) { @@ -1197,10 +1208,6 @@ int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { ret int32_t tqStreamProcessChkptReportRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { return doProcessDummyRspMsg(pMeta, pMsg); } -int32_t tqStreamProcessConsensusChkptRsp2(SStreamMeta* pMeta, SRpcMsg* pMsg) { - return doProcessDummyRspMsg(pMeta, pMsg); -} - int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { SMStreamCheckpointReadyRspMsg* pRsp = pMsg->pCont; @@ -1221,14 +1228,13 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { int32_t vgId = pMeta->vgId; int32_t code = 0; SStreamTask* pTask = NULL; - SRestoreCheckpointInfo req = {0}; char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t len = pMsg->contLen - sizeof(SMsgHead); int64_t now = taosGetTimestampMs(); + SDecoder decoder; + SRestoreCheckpointInfo req = {0}; - SDecoder decoder; tDecoderInit(&decoder, (uint8_t*)msg, len); - if (tDecodeRestoreCheckpointInfo(&decoder, &req) < 0) { tqError("vgId:%d failed to decode set consensus checkpointId req, code:%s", vgId, tstrerror(code)); tDecoderClear(&decoder); @@ -1239,16 +1245,15 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask); if (pTask == NULL || (code != 0)) { - tqError( - "vgId:%d process set consensus checkpointId req, failed to acquire task:0x%x, it may have been dropped already", - pMeta->vgId, req.taskId); + tqError("vgId:%d process consensus checkpointId req, failed to acquire task:0x%x, it may have been dropped already", + pMeta->vgId, req.taskId); // ignore this code to avoid error code over write int32_t ret = streamMetaAddFailedTask(pMeta, req.streamId, req.taskId); if (ret) { tqError("s-task:0x%x failed add check downstream failed, core:%s", req.taskId, tstrerror(ret)); } - return code; + return 0; } // discard the rsp, since it is expired. @@ -1272,7 +1277,7 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { streamMutexUnlock(&pTask->lock); streamMetaReleaseTask(pMeta, pTask); - return TSDB_CODE_STREAM_INTERNAL_ERROR; + return 0; } SConsenChkptInfo* pConsenInfo = &pTask->status.consenChkptInfo; @@ -1299,10 +1304,13 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { if (pMeta->role == NODE_ROLE_LEADER) { code = tqStreamStartOneTaskAsync(pMeta, pTask->pMsgCb, req.streamId, req.taskId); + if (code) { + tqError("s-task:0x%x vgId:%d failed start task async, code:%s", req.taskId, vgId, tstrerror(code)); + } } else { tqDebug("vgId:%d follower not start task:%s", vgId, pTask->id.idStr); } streamMetaReleaseTask(pMeta, pTask); - return code; + return 0; } \ No newline at end of file diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index d8249666c3..f4202667ff 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -923,32 +923,38 @@ int32_t streamMetaBegin(SStreamMeta* pMeta) { streamMetaWLock(pMeta); int32_t code = tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); + if (code) { + streamSetFatalError(pMeta, code, __func__, __LINE__); + } streamMetaWUnLock(pMeta); return code; } int32_t streamMetaCommit(SStreamMeta* pMeta) { - int32_t code = 0; - code = tdbCommit(pMeta->db, pMeta->txn); + int32_t code = tdbCommit(pMeta->db, pMeta->txn); if (code != 0) { - stError("vgId:%d failed to commit stream meta", pMeta->vgId); - return code; + streamSetFatalError(pMeta, code, __func__, __LINE__); + stFatal("vgId:%d failed to commit stream meta, code:%s, line:%d", pMeta->vgId, tstrerror(code), + pMeta->fatalInfo.line); } code = tdbPostCommit(pMeta->db, pMeta->txn); if (code != 0) { - stError("vgId:%d failed to do post-commit stream meta", pMeta->vgId); + streamSetFatalError(pMeta, code, __func__, __LINE__); + stFatal("vgId:%d failed to do post-commit stream meta, code:%s, line:%d", pMeta->vgId, tstrerror(code), + pMeta->fatalInfo.line); return code; } code = tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); if (code != 0) { - stError("vgId:%d failed to begin trans", pMeta->vgId); - return code; + streamSetFatalError(pMeta, code, __func__, __LINE__); + stFatal("vgId:%d failed to begin trans, code:%s, line:%d", pMeta->vgId, tstrerror(code), pMeta->fatalInfo.line); + } else { + stDebug("vgId:%d stream meta file commit completed", pMeta->vgId); } - stDebug("vgId:%d stream meta file commit completed", pMeta->vgId); return code; } diff --git a/source/libs/stream/src/streamUtil.c b/source/libs/stream/src/streamUtil.c index cef2ba35e7..5bf9370cb7 100644 --- a/source/libs/stream/src/streamUtil.c +++ b/source/libs/stream/src/streamUtil.c @@ -77,9 +77,10 @@ void streamSetFatalError(SStreamMeta* pMeta, int32_t code, const char* funcName, pMeta->fatalInfo.threadId = taosGetSelfPthreadId(); tstrncpy(pMeta->fatalInfo.func, funcName, tListLen(pMeta->fatalInfo.func)); pMeta->fatalInfo.line = lino; - stInfo("vgId:%d set global fatal error, code:%s %s line:%d", pMeta->vgId, tstrerror(code), funcName, lino); + stInfo("vgId:%d set fatal error, code:%s %s line:%d", pMeta->vgId, tstrerror(code), funcName, lino); } else { - stFatal("vgId:%d existed global fatal eror:%s, failed to set new fatal error code:%s", pMeta->vgId, code); + stFatal("vgId:%d existed fatal error:%s, ts:%" PRId64 " failed to set new fatal error code:%s", pMeta->vgId, + pMeta->fatalInfo.ts, tstrerror(code)); } } From 1d2c00a4fff9fe1d788a09f14bc73bba0f5389f2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 15 Sep 2024 01:25:16 +0800 Subject: [PATCH 3/4] refactor: do some internal refactor. --- source/libs/stream/src/streamUtil.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamUtil.c b/source/libs/stream/src/streamUtil.c index 5bf9370cb7..4c481e6041 100644 --- a/source/libs/stream/src/streamUtil.c +++ b/source/libs/stream/src/streamUtil.c @@ -80,7 +80,7 @@ void streamSetFatalError(SStreamMeta* pMeta, int32_t code, const char* funcName, stInfo("vgId:%d set fatal error, code:%s %s line:%d", pMeta->vgId, tstrerror(code), funcName, lino); } else { stFatal("vgId:%d existed fatal error:%s, ts:%" PRId64 " failed to set new fatal error code:%s", pMeta->vgId, - pMeta->fatalInfo.ts, tstrerror(code)); + tstrerror(pMeta->fatalInfo.code), pMeta->fatalInfo.ts, tstrerror(code)); } } From b4277e0e6532023e9b9eb2bc5b1d07f01cbc67f2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 15 Sep 2024 16:15:11 +0800 Subject: [PATCH 4/4] refactor:do some internal refactor. --- include/libs/stream/tstream.h | 2 +- source/libs/stream/src/streamCheckStatus.c | 11 +- source/libs/stream/src/streamCheckpoint.c | 179 +++++++++------- source/libs/stream/src/streamDispatch.c | 214 +++++++++++--------- source/libs/stream/src/streamHb.c | 6 +- source/libs/stream/src/streamSched.c | 34 ++-- source/libs/stream/src/streamStartHistory.c | 21 +- source/libs/stream/src/streamTaskSm.c | 4 +- source/libs/stream/src/streamTimer.c | 18 +- 9 files changed, 274 insertions(+), 215 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 29759bc561..cb10aeb6a0 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -801,7 +801,7 @@ void streamTaskSetReqConsenChkptId(SStreamTask* pTask, int64_t ts); // timer int32_t streamTimerGetInstance(tmr_h* pTmr); -void streamTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle, tmr_h* pTmrId, int32_t vgId, +void streamTmrStart(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* pParam, void* pHandle, tmr_h* pTmrId, int32_t vgId, const char* pMsg); void streamTmrStop(tmr_h tmrId); diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index 76e74db33f..2688617823 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -303,13 +303,8 @@ void streamTaskStartMonitorCheckRsp(SStreamTask* pTask) { int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s start check-rsp monitor, ref:%d ", pTask->id.idStr, ref); - - if (pInfo->checkRspTmr == NULL) { - pInfo->checkRspTmr = taosTmrStart(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer); - } else { - streamTmrReset(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr, vgId, - "check-status-monitor"); - } + streamTmrStart(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr, vgId, + "check-status-monitor"); streamMutexUnlock(&pInfo->checkInfoLock); } @@ -860,7 +855,7 @@ void rspMonitorFn(void* param, void* tmrId) { handleTimeoutDownstreamTasks(pTask, pTimeoutList); } - streamTmrReset(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr, vgId, + streamTmrStart(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr, vgId, "check-status-monitor"); streamMutexUnlock(&pInfo->checkInfoLock); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 1a8b5c1028..769116264d 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -347,12 +347,8 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s start checkpoint-trigger monitor in 10s, ref:%d ", pTask->id.idStr, ref); streamMetaAcquireOneTask(pTask); - - if (pTmrInfo->tmrHandle == NULL) { - pTmrInfo->tmrHandle = taosTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer); - } else { - streamTmrReset(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor"); - } + streamTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, + "trigger-recv-monitor"); pTmrInfo->launchChkptId = pActiveInfo->activeId; } else { // already launched, do nothing stError("s-task:%s previous checkpoint-trigger monitor tmr is set, not start new one", pTask->id.idStr); @@ -893,48 +889,11 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { return code; } -void checkpointTriggerMonitorFn(void* param, void* tmrId) { - SStreamTask* pTask = param; - int32_t vgId = pTask->pMeta->vgId; - int64_t now = taosGetTimestampMs(); - const char* id = pTask->id.idStr; - +static int32_t doChkptStatusCheck(SStreamTask* pTask) { + const char* id = pTask->id.idStr; + int32_t vgId = pTask->pMeta->vgId; SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; - SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptTriggerMsgTmr; - - if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stError("s-task:%s source task should not start the checkpoint-trigger monitor fn, ref:%d quit", id, ref); - streamMetaReleaseTask(pTask->pMeta, pTask); - return; - } - - // check the status every 100ms - if (streamTaskShouldStop(pTask)) { - int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); - stDebug("s-task:%s vgId:%d quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref); - streamMetaReleaseTask(pTask->pMeta, pTask); - return; - } - - if (++pTmrInfo->activeCounter < 50) { - streamTmrReset(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor"); - return; - } - - pTmrInfo->activeCounter = 0; - stDebug("s-task:%s vgId:%d checkpoint-trigger monitor in tmr, ts:%" PRId64, id, vgId, now); - - streamMutexLock(&pTask->lock); - SStreamTaskState pState = streamTaskGetStatus(pTask); - if (pState.state != TASK_STATUS__CK) { - int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); - stDebug("s-task:%s vgId:%d not in checkpoint status, quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref); - - streamMutexUnlock(&pTask->lock); - streamMetaReleaseTask(pTask->pMeta, pTask); - return; - } + SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptTriggerMsgTmr; // checkpoint-trigger recv flag is set, quit if (pActiveInfo->allUpstreamTriggerRecv) { @@ -942,48 +901,44 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { stDebug("s-task:%s vgId:%d all checkpoint-trigger recv, quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref); - streamMutexUnlock(&pTask->lock); - streamMetaReleaseTask(pTask->pMeta, pTask); - return; - } - - streamMutexUnlock(&pTask->lock); - - streamMutexLock(&pActiveInfo->lock); - - // send msg to retrieve checkpoint trigger msg - SArray* pList = pTask->upstreamInfo.pList; - SArray* pNotSendList = taosArrayInit(4, sizeof(SStreamUpstreamEpInfo)); - if (pNotSendList == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - stDebug("s-task:%s start to triggerMonitor, reason:%s", id, tstrerror(terrno)); - streamMutexUnlock(&pActiveInfo->lock); - - stDebug("s-task:%s start to monitor checkpoint-trigger in 10s", id); - streamTmrReset(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor"); - return; +// streamMutexUnlock(&pTask->lock); +// streamMetaReleaseTask(pTask->pMeta, pTask); + return -1; } if ((pTmrInfo->launchChkptId != pActiveInfo->activeId) || (pActiveInfo->activeId == 0)) { - streamMutexUnlock(&pActiveInfo->lock); int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); stWarn("s-task:%s vgId:%d checkpoint-trigger retrieve by previous checkpoint procedure, checkpointId:%" PRId64 ", quit, ref:%d", id, vgId, pTmrInfo->launchChkptId, ref); - streamMetaReleaseTask(pTask->pMeta, pTask); - return; +// streamMutexUnlock(&pActiveInfo->lock); +// streamMetaReleaseTask(pTask->pMeta, pTask); + return -1; } // active checkpoint info is cleared for now if ((pActiveInfo->activeId == 0) || (pActiveInfo->transId == 0) || (pTask->chkInfo.startTs == 0)) { - streamMutexUnlock(&pActiveInfo->lock); int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); stWarn("s-task:%s vgId:%d active checkpoint may be cleared, quit from retrieve checkpoint-trigger send tmr, ref:%d", id, vgId, ref); - streamMetaReleaseTask(pTask->pMeta, pTask); - return; +// streamMutexUnlock(&pActiveInfo->lock); +// streamMetaReleaseTask(pTask->pMeta, pTask); + return -1; + } + + return 0; +} + +static int32_t doFindNotSendUpstream(SStreamTask* pTask, SArray* pList, SArray** ppNotSendList) { + const char* id = pTask->id.idStr; + SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; + + SArray* pNotSendList = taosArrayInit(4, sizeof(SStreamUpstreamEpInfo)); + if (pNotSendList == NULL) { + stDebug("s-task:%s start to triggerMonitor, reason:%s", id, tstrerror(terrno)); + return terrno; } for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) { @@ -1007,13 +962,87 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { void* px = taosArrayPush(pNotSendList, pInfo); if (px == NULL) { stError("s-task:%s failed to record not send info, code: out of memory", id); + taosArrayDestroy(pNotSendList); + return terrno; } } } + *ppNotSendList = pNotSendList; + return 0; +} + +void checkpointTriggerMonitorFn(void* param, void* tmrId) { + SStreamTask* pTask = param; + int32_t vgId = pTask->pMeta->vgId; + int64_t now = taosGetTimestampMs(); + const char* id = pTask->id.idStr; + + SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; + SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptTriggerMsgTmr; + + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); + stError("s-task:%s source task should not start the checkpoint-trigger monitor fn, ref:%d quit", id, ref); + streamMetaReleaseTask(pTask->pMeta, pTask); + return; + } + + // check the status every 100ms + if (streamTaskShouldStop(pTask)) { + int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); + stDebug("s-task:%s vgId:%d quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref); + streamMetaReleaseTask(pTask->pMeta, pTask); + return; + } + + if (++pTmrInfo->activeCounter < 50) { + streamTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor"); + return; + } + + pTmrInfo->activeCounter = 0; + stDebug("s-task:%s vgId:%d checkpoint-trigger monitor in tmr, ts:%" PRId64, id, vgId, now); + + streamMutexLock(&pTask->lock); + SStreamTaskState state = streamTaskGetStatus(pTask); + streamMutexUnlock(&pTask->lock); + + if (state.state != TASK_STATUS__CK) { + int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); + stDebug("s-task:%s vgId:%d status:%s not in checkpoint status, quit from monitor checkpoint-trigger, ref:%d", id, + vgId, state.name, ref); + streamMetaReleaseTask(pTask->pMeta, pTask); + return; + } + + streamMutexLock(&pActiveInfo->lock); + + int32_t code = doChkptStatusCheck(pTask); + if (code) { + streamMutexUnlock(&pTask->lock); + streamMetaReleaseTask(pTask->pMeta, pTask); + return; + } + + // send msg to retrieve checkpoint trigger msg + SArray* pList = pTask->upstreamInfo.pList; + SArray* pNotSendList = NULL; + + code = doFindNotSendUpstream(pTask, pList, &pNotSendList); + if (code) { + int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); + stDebug("s-task:%s failed to find not send upstream, code:%s, out of tmr, ref:%d", id, tstrerror(code), ref); + streamMutexUnlock(&pActiveInfo->lock); + streamMetaReleaseTask(pTask->pMeta, pTask); + + taosArrayDestroy(pNotSendList); + return; + } + // do send retrieve checkpoint trigger msg to upstream int32_t size = taosArrayGetSize(pNotSendList); - int32_t code = doSendRetrieveTriggerMsg(pTask, pNotSendList); + code = doSendRetrieveTriggerMsg(pTask, pNotSendList); if (code) { stError("s-task:%s vgId:%d failed to retrieve trigger msg, code:%s", pTask->id.idStr, vgId, tstrerror(code)); } @@ -1023,7 +1052,7 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { // check every 100ms if (size > 0) { stDebug("s-task:%s start to monitor checkpoint-trigger in 10s", id); - streamTmrReset(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor"); + streamTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor"); } else { int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); stDebug("s-task:%s all checkpoint-trigger recved, quit from monitor checkpoint-trigger tmr, ref:%d", id, ref); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 5f7b8dd39e..0f0015c7d9 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -634,12 +634,8 @@ static void doMonitorDispatchData(void* param, void* tmrId) { void streamStartMonitorDispatchData(SStreamTask* pTask, int64_t waitDuration) { int32_t vgId = pTask->pMeta->vgId; - if (pTask->msgInfo.pRetryTmr != NULL) { - streamTmrReset(doMonitorDispatchData, waitDuration, pTask, streamTimer, &pTask->msgInfo.pRetryTmr, vgId, - "dispatch-monitor-tmr"); - } else { - pTask->msgInfo.pRetryTmr = taosTmrStart(doMonitorDispatchData, waitDuration, pTask, streamTimer); - } + streamTmrStart(doMonitorDispatchData, waitDuration, pTask, streamTimer, &pTask->msgInfo.pRetryTmr, vgId, + "dispatch-monitor"); } int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int64_t groupId, @@ -888,77 +884,42 @@ int32_t initCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamNodeId, int32 return TSDB_CODE_SUCCESS; } -static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) { - SStreamTask* pTask = param; - int32_t vgId = pTask->pMeta->vgId; - const char* id = pTask->id.idStr; +static int32_t doTaskChkptStatusCheck(SStreamTask* pTask, int32_t num) { SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptReadyMsgTmr; + const char* id = pTask->id.idStr; + int32_t vgId = pTask->pMeta->vgId; - // check the status every 100ms - if (streamTaskShouldStop(pTask)) { - int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); - stDebug("s-task:%s vgId:%d status:stop, quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref); - streamMetaReleaseTask(pTask->pMeta, pTask); - return; - } - - if (++pTmrInfo->activeCounter < 50) { - streamTmrReset(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, - "chkpt-ready-monitor"); - return; - } - - pTmrInfo->activeCounter = 0; - stDebug("s-task:%s in sending checkpoint-ready msg monitor tmr", id); - - streamMutexLock(&pTask->lock); - SStreamTaskState pState = streamTaskGetStatus(pTask); - if (pState.state != TASK_STATUS__CK) { - int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); - stDebug("s-task:%s vgId:%d status:%s not in checkpoint, quit from monitor checkpoint-ready send, ref:%d", id, vgId, - pState.name, ref); - streamMutexUnlock(&pTask->lock); - streamMetaReleaseTask(pTask->pMeta, pTask); - return; - } - streamMutexUnlock(&pTask->lock); - - streamMutexLock(&pActiveInfo->lock); - - SArray* pList = pActiveInfo->pReadyMsgList; - int32_t num = taosArrayGetSize(pList); if (pTmrInfo->launchChkptId != pActiveInfo->activeId) { - streamMutexUnlock(&pActiveInfo->lock); int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); stWarn("s-task:%s vgId:%d ready-msg send tmr launched by previous checkpoint procedure, checkpointId:%" PRId64 ", quit, ref:%d", id, vgId, pTmrInfo->launchChkptId, ref); - - streamMetaReleaseTask(pTask->pMeta, pTask); - return; + return -1; } // active checkpoint info is cleared for now if ((pActiveInfo->activeId == 0) || (pActiveInfo->transId == 0) || (num == 0) || (pTask->chkInfo.startTs == 0)) { - streamMutexUnlock(&pActiveInfo->lock); int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); stWarn("s-task:%s vgId:%d active checkpoint may be cleared, quit from readyMsg send tmr, ref:%d", id, vgId, ref); - - streamMetaReleaseTask(pTask->pMeta, pTask); - return; + return -1; } - SArray* pNotRspList = taosArrayInit(4, sizeof(int32_t)); - if (taosArrayGetSize(pTask->upstreamInfo.pList) != num) { - streamMutexUnlock(&pActiveInfo->lock); int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); stWarn("s-task:%s vgId:%d upstream number:%d not equals sent readyMsg:%d, quit from readyMsg send tmr, ref:%d", id, vgId, (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList), num, ref); + return -1; + } - streamMetaReleaseTask(pTask->pMeta, pTask); - return; + return 0; +} + +static int32_t doFindNotConfirmUpstream(SArray** ppNotRspList, SArray* pList, int32_t num, int32_t vgId, int32_t level, + const char* id) { + SArray* pTmp = taosArrayInit(4, sizeof(int32_t)); + if (pTmp == NULL) { + return terrno; } for (int32_t i = 0; i < num; ++i) { @@ -971,63 +932,138 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) { continue; } - void* p = taosArrayPush(pNotRspList, &pInfo->upstreamTaskId); + void* p = taosArrayPush(pTmp, &pInfo->upstreamTaskId); if (p == NULL) { stError("s-task:%s vgId:%d failed to record not rsp task, code: out of memory", id, vgId); + return terrno; } else { stDebug("s-task:%s vgId:%d level:%d checkpoint-ready rsp from upstream:0x%x not confirmed yet", id, vgId, - pTask->info.taskLevel, pInfo->upstreamTaskId); + level, pInfo->upstreamTaskId); } } - int32_t checkpointId = pActiveInfo->activeId; + *ppNotRspList = pTmp; + return 0; +} - int32_t notRsp = taosArrayGetSize(pNotRspList); - if (notRsp > 0) { // send checkpoint-ready msg again - for (int32_t i = 0; i < taosArrayGetSize(pNotRspList); ++i) { - int32_t* pTaskId = taosArrayGet(pNotRspList, i); - if (pTaskId == NULL) { +static void doSendChkptReadyMsg(SStreamTask* pTask, SArray* pNotRspList, int64_t checkpointId, SArray* pReadyList) { + int32_t code = 0; + int32_t num = taosArrayGetSize(pReadyList); + const char* id = pTask->id.idStr; + + for (int32_t i = 0; i < taosArrayGetSize(pNotRspList); ++i) { + int32_t* pTaskId = taosArrayGet(pNotRspList, i); + if (pTaskId == NULL) { + continue; + } + + for (int32_t j = 0; j < num; ++j) { + STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pReadyList, j); + if (pReadyInfo == NULL) { continue; } - for (int32_t j = 0; j < num; ++j) { - STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pList, j); - if (pReadyInfo == NULL) { - continue; - } + if (*pTaskId == pReadyInfo->upstreamTaskId) { // send msg again - if (*pTaskId == pReadyInfo->upstreamTaskId) { // send msg again - - SRpcMsg msg = {0}; - int32_t code = initCheckpointReadyMsg(pTask, pReadyInfo->upstreamNodeId, pReadyInfo->upstreamTaskId, - pReadyInfo->childId, checkpointId, &msg); + SRpcMsg msg = {0}; + code = initCheckpointReadyMsg(pTask, pReadyInfo->upstreamNodeId, pReadyInfo->upstreamTaskId, + pReadyInfo->childId, checkpointId, &msg); + if (code == TSDB_CODE_SUCCESS) { + code = tmsgSendReq(&pReadyInfo->upstreamNodeEpset, &msg); if (code == TSDB_CODE_SUCCESS) { - code = tmsgSendReq(&pReadyInfo->upstreamNodeEpset, &msg); - if (code == TSDB_CODE_SUCCESS) { - stDebug("s-task:%s level:%d checkpoint-ready msg sent to upstream:0x%x again", id, pTask->info.taskLevel, - pReadyInfo->upstreamTaskId); - } else { - stError("s-task:%s failed to send checkpoint-ready msg, try nex time in 10s", id); - } + stDebug("s-task:%s level:%d checkpoint-ready msg sent to upstream:0x%x again", id, pTask->info.taskLevel, + pReadyInfo->upstreamTaskId); } else { - stError("s-task:%s failed to prepare the checkpoint-ready msg, try nex time in 10s", id); + stError("s-task:%s failed to send checkpoint-ready msg, try nex time in 10s", id); } + } else { + stError("s-task:%s failed to prepare the checkpoint-ready msg, try nex time in 10s", id); } } } + } +} - streamTmrReset(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, +static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) { + SStreamTask* pTask = param; + int32_t vgId = pTask->pMeta->vgId; + const char* id = pTask->id.idStr; + SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; + SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptReadyMsgTmr; + SArray* pNotRspList = NULL; + + // check the status every 100ms + if (streamTaskShouldStop(pTask)) { + int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); + stDebug("s-task:%s vgId:%d status:stop, quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref); + streamMetaReleaseTask(pTask->pMeta, pTask); + return; + } + + if (++pTmrInfo->activeCounter < 50) { + streamTmrStart(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, + "chkpt-ready-monitor"); + return; + } + + // reset tmr + pTmrInfo->activeCounter = 0; + stDebug("s-task:%s in sending checkpoint-ready msg monitor tmr", id); + + streamMutexLock(&pTask->lock); + SStreamTaskState state = streamTaskGetStatus(pTask); + streamMutexUnlock(&pTask->lock); + + // 1. check status in the first place + if (state.state != TASK_STATUS__CK) { + int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); + stDebug("s-task:%s vgId:%d status:%s not in checkpoint, quit from monitor checkpoint-ready, ref:%d", id, vgId, + state.name, ref); + streamMetaReleaseTask(pTask->pMeta, pTask); + return; + } + + streamMutexLock(&pActiveInfo->lock); + + SArray* pList = pActiveInfo->pReadyMsgList; + int32_t num = taosArrayGetSize(pList); + int32_t code = doTaskChkptStatusCheck(pTask, num); + if (code) { + streamMutexUnlock(&pActiveInfo->lock); + streamMetaReleaseTask(pTask->pMeta, pTask); + return; + } + + code = doFindNotConfirmUpstream(&pNotRspList, pList, num, vgId, pTask->info.taskLevel, id); + if (code) { + int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); + stError("s-task:%s failed to find not rsp checkpoint-ready downstream, code:%s, out of tmr, ref:%d", id, + tstrerror(code), ref); + streamMutexUnlock(&pActiveInfo->lock); + streamMetaReleaseTask(pTask->pMeta, pTask); + + taosArrayDestroy(pNotRspList); + return; + } + + int32_t checkpointId = pActiveInfo->activeId; + int32_t notRsp = taosArrayGetSize(pNotRspList); + doSendChkptReadyMsg(pTask, pNotRspList, checkpointId, pList); + + if (notRsp > 0) { // send checkpoint-ready msg again + streamTmrStart(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "chkpt-ready-monitor"); streamMutexUnlock(&pActiveInfo->lock); } else { int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); stDebug( - "s-task:%s vgId:%d recv of checkpoint-ready msg confirmed by all upstream task(s), clear checkpoint-ready msg " - "and quit from timer, ref:%d", + "s-task:%s vgId:%d checkpoint-ready msg confirmed by all upstream task(s), clear checkpoint-ready msg and quit " + "from timer, ref:%d", id, vgId, ref); streamClearChkptReadyMsg(pActiveInfo); streamMutexUnlock(&pActiveInfo->lock); + // release should be the last execution, since pTask may be destroy after it immidiately. streamMetaReleaseTask(pTask->pMeta, pTask); } @@ -1085,12 +1121,8 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) { stDebug("s-task:%s start checkpoint-ready monitor in 10s, ref:%d ", pTask->id.idStr, ref); streamMetaAcquireOneTask(pTask); - if (pTmrInfo->tmrHandle == NULL) { - pTmrInfo->tmrHandle = taosTmrStart(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer); - } else { - streamTmrReset(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, - "chkpt-ready-monitor"); - } + streamTmrStart(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, + "chkpt-ready-monitor"); // mark the timer monitor checkpointId pTmrInfo->launchChkptId = pActiveInfo->activeId; diff --git a/source/libs/stream/src/streamHb.c b/source/libs/stream/src/streamHb.c index 703e6a3256..72d2e89936 100644 --- a/source/libs/stream/src/streamHb.c +++ b/source/libs/stream/src/streamHb.c @@ -279,7 +279,7 @@ void streamMetaHbToMnode(void* param, void* tmrId) { } if (!waitForEnoughDuration(pMeta->pHbInfo)) { - streamTmrReset(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, vgId, + streamTmrStart(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, vgId, "meta-hb-tmr"); code = taosReleaseRef(streamMetaId, rid); @@ -301,7 +301,7 @@ void streamMetaHbToMnode(void* param, void* tmrId) { } streamMetaRUnLock(pMeta); - streamTmrReset(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, pMeta->vgId, + streamTmrStart(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, pMeta->vgId, "meta-hb-tmr"); code = taosReleaseRef(streamMetaId, rid); @@ -317,7 +317,7 @@ int32_t createMetaHbInfo(int64_t* pRid, SMetaHbInfo** pRes) { return terrno; } - pInfo->hbTmr = taosTmrStart(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamTimer); + streamTmrStart(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamTimer, &pInfo->hbTmr, 0, "stream-hb"); pInfo->tickCounter = 0; pInfo->msgSendTs = -1; pInfo->hbCount = 0; diff --git a/source/libs/stream/src/streamSched.c b/source/libs/stream/src/streamSched.c index 2d54547aa2..63e24b0975 100644 --- a/source/libs/stream/src/streamSched.c +++ b/source/libs/stream/src/streamSched.c @@ -20,13 +20,14 @@ static void streamTaskResumeHelper(void* param, void* tmrId); static void streamTaskSchedHelper(void* param, void* tmrId); void streamSetupScheduleTrigger(SStreamTask* pTask) { - if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) { + int64_t delaySchema = pTask->info.delaySchedParam; + if (delaySchema != 0 && pTask->info.fillHistory == 0) { int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1); stDebug("s-task:%s setup scheduler trigger, ref:%d delay:%" PRId64 " ms", pTask->id.idStr, ref, pTask->info.delaySchedParam); - pTask->schedInfo.pDelayTimer = - taosTmrStart(streamTaskSchedHelper, (int32_t)pTask->info.delaySchedParam, pTask, streamTimer); + streamTmrStart(streamTaskSchedHelper, (int32_t)delaySchema, pTask, streamTimer, &pTask->schedInfo.pDelayTimer, + pTask->pMeta->vgId, "sched-tmr"); pTask->schedInfo.status = TASK_TRIGGER_STATUS__INACTIVE; } } @@ -76,13 +77,8 @@ void streamTaskResumeInFuture(SStreamTask* pTask) { // add one ref count for task streamMetaAcquireOneTask(pTask); - - if (pTask->schedInfo.pIdleTimer == NULL) { - pTask->schedInfo.pIdleTimer = taosTmrStart(streamTaskResumeHelper, pTask->status.schedIdleTime, pTask, streamTimer); - } else { - streamTmrReset(streamTaskResumeHelper, pTask->status.schedIdleTime, pTask, streamTimer, - &pTask->schedInfo.pIdleTimer, pTask->pMeta->vgId, "resume-task-tmr"); - } + streamTmrStart(streamTaskResumeHelper, pTask->status.schedIdleTime, pTask, streamTimer, &pTask->schedInfo.pIdleTimer, + pTask->pMeta->vgId, "resume-task-tmr"); } ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -125,7 +121,7 @@ void streamTaskSchedHelper(void* param, void* tmrId) { stTrace("s-task:%s in scheduler, trigger status:%d, next:%dms", id, status, nextTrigger); if (streamTaskShouldStop(pTask) || streamTaskShouldPause(pTask)) { - stDebug("s-task:%s jump out of schedTimer", id); + stDebug("s-task:%s should stop, jump out of schedTimer", id); return; } @@ -139,9 +135,8 @@ void streamTaskSchedHelper(void* param, void* tmrId) { if (code) { stError("s-task:%s failed to prepare retrieve data trigger, code:%s, try again in %dms", id, "out of memory", nextTrigger); - streamTmrReset(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer, vgId, "sched-run-tmr"); terrno = code; - return; + goto _end; } pTrigger->type = STREAM_INPUT__GET_RES; @@ -149,10 +144,9 @@ void streamTaskSchedHelper(void* param, void* tmrId) { if (pTrigger->pBlock == NULL) { taosFreeQitem(pTrigger); - stError("s-task:%s failed to prepare retrieve data trigger, code:%s, try again in %dms", id, "out of memory", + stError("s-task:%s failed to build retrieve data trigger, code:out of memory, try again in %dms", id, nextTrigger); - streamTmrReset(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer, vgId, "sched-run-tmr"); - return; + goto _end; } atomic_store_8(&pTask->schedInfo.status, TASK_TRIGGER_STATUS__INACTIVE); @@ -160,8 +154,8 @@ void streamTaskSchedHelper(void* param, void* tmrId) { code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTrigger); if (code != TSDB_CODE_SUCCESS) { - streamTmrReset(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer, vgId, "sched-run-tmr"); - return; + stError("s-task:%s failed to put retrieve block into trigger, code:%s", pTask->id.idStr, tstrerror(code)); + goto _end; } code = streamTrySchedExec(pTask); @@ -171,5 +165,7 @@ void streamTaskSchedHelper(void* param, void* tmrId) { } } - streamTmrReset(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer, vgId, "sched-run-tmr"); +_end: + streamTmrStart(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer, vgId, + "sched-run-tmr"); } diff --git a/source/libs/stream/src/streamStartHistory.c b/source/libs/stream/src/streamStartHistory.c index 21f0168434..1290aef6a3 100644 --- a/source/libs/stream/src/streamStartHistory.c +++ b/source/libs/stream/src/streamStartHistory.c @@ -80,6 +80,7 @@ int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) { } void streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration) { + int32_t vgId = pTask->pMeta->vgId; int32_t numOfTicks = idleDuration / SCANHISTORY_IDLE_TIME_SLICE; if (numOfTicks <= 0) { numOfTicks = 1; @@ -100,14 +101,8 @@ void streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration) { int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s scan-history resumed in %.2fs, ref:%d", pTask->id.idStr, numOfTicks * 0.1, ref); - - if (pTask->schedHistoryInfo.pTimer == NULL) { - pTask->schedHistoryInfo.pTimer = - taosTmrStart(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer); - } else { - streamTmrReset(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer, - &pTask->schedHistoryInfo.pTimer, pTask->pMeta->vgId, " start-history-task-tmr"); - } + streamTmrStart(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer, + &pTask->schedHistoryInfo.pTimer, vgId, "history-task"); } int32_t streamTaskStartScanHistory(SStreamTask* pTask) { @@ -337,7 +332,7 @@ void doRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, i stDebug("s-task:%s status:%s failed to launch fill-history task:0x%x, retry launch:%dms, retryCount:%d", pTask->id.idStr, p, hTaskId, pHTaskInfo->waitInterval, pHTaskInfo->retryTimes); - streamTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamTimer, &pHTaskInfo->pTimer, + streamTmrStart(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamTimer, &pHTaskInfo->pTimer, pTask->pMeta->vgId, " start-history-task-tmr"); } } @@ -391,7 +386,7 @@ void tryLaunchHistoryTask(void* param, void* tmrId) { pHTaskInfo->tickCount -= 1; if (pHTaskInfo->tickCount > 0) { - streamTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamTimer, &pHTaskInfo->pTimer, + streamTmrStart(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamTimer, &pHTaskInfo->pTimer, pTask->pMeta->vgId, " start-history-task-tmr"); streamMetaReleaseTask(pMeta, pTask); return; @@ -519,7 +514,7 @@ int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask) { } stDebug("s-task:%s set timer active flag, task timer not null", idStr); - streamTmrReset(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamTimer, &pTask->hTaskInfo.pTimer, + streamTmrStart(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamTimer, &pTask->hTaskInfo.pTimer, pTask->pMeta->vgId, " start-history-task-tmr"); } @@ -621,8 +616,8 @@ void doExecScanhistoryInFuture(void* param, void* tmrId) { // release the task. streamMetaReleaseTask(pTask->pMeta, pTask); } else { - streamTmrReset(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer, - &pTask->schedHistoryInfo.pTimer, pTask->pMeta->vgId, " start-history-task-tmr"); + streamTmrStart(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer, + &pTask->schedHistoryInfo.pTimer, pTask->pMeta->vgId, " start-history-task-tmr"); } } diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 06286479a3..3709c4dfbd 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -491,8 +491,8 @@ static void keepPrevInfo(SStreamTaskSM* pSM) { int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event, __state_trans_user_fn callbackFn, void* param) { SStreamTask* pTask = pSM->pTask; - const char* id = pTask->id.idStr; - int32_t code = 0; + const char* id = pTask->id.idStr; + int32_t code = 0; // do update the task status streamMutexLock(&pTask->lock); diff --git a/source/libs/stream/src/streamTimer.c b/source/libs/stream/src/streamTimer.c index 5e12f51c9d..8b77fe7cb1 100644 --- a/source/libs/stream/src/streamTimer.c +++ b/source/libs/stream/src/streamTimer.c @@ -40,11 +40,23 @@ int32_t streamTimerGetInstance(tmr_h* pTmr) { return TSDB_CODE_SUCCESS; } -void streamTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle, tmr_h* pTmrId, int32_t vgId, +void streamTmrStart(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* pParam, void* pHandle, tmr_h* pTmrId, int32_t vgId, const char* pMsg) { - bool ret = taosTmrReset(fp, mseconds, param, handle, pTmrId); - if (ret) { + if (*pTmrId == NULL) { + *pTmrId = taosTmrStart(fp, mseconds, pParam, pHandle); + if (*pTmrId == NULL) { + stError("vgId:%d start %s tmr failed, code:%s", vgId, pMsg, tstrerror(terrno)); + return; + } + } else { + bool ret = taosTmrReset(fp, mseconds, pParam, pHandle, pTmrId); + if (ret) { + stError("vgId:%d start %s tmr failed, code:%s", vgId, pMsg, tstrerror(terrno)); + return; + } } + + stDebug("vgId:%d start %s tmr succ", vgId, pMsg); } void streamTmrStop(tmr_h tmrId) {