From 0f28cd1d481aa2ac97fd9453a06ddb5ed8556e07 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 30 Sep 2024 13:53:53 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/libs/stream/src/streamCheckpoint.c | 135 +++++++++++++--------- source/libs/stream/src/streamExec.c | 2 +- source/libs/stream/src/streamHb.c | 4 +- 3 files changed, 82 insertions(+), 59 deletions(-) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 699774ed52..fd5591c488 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -120,38 +120,39 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo } int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTriggerRsp* pRsp) { + SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; + bool unQualified = false; + const char* id = pTask->id.idStr; + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - stError("s-task:%s invalid msg recv, checkpoint-trigger rsp not handled", pTask->id.idStr); + stError("s-task:%s invalid msg recv, checkpoint-trigger rsp not handled", id); return TSDB_CODE_INVALID_MSG; } if (pRsp->rspCode != TSDB_CODE_SUCCESS) { - stDebug("s-task:%s retrieve checkpoint-trgger rsp from upstream:0x%x invalid, code:%s", pTask->id.idStr, - pRsp->upstreamTaskId, tstrerror(pRsp->rspCode)); + stDebug("s-task:%s retrieve checkpoint-trgger rsp from upstream:0x%x invalid, code:%s", id, pRsp->upstreamTaskId, + tstrerror(pRsp->rspCode)); return TSDB_CODE_SUCCESS; } streamMutexLock(&pTask->lock); SStreamTaskState status = streamTaskGetStatus(pTask); - if (status.state != TASK_STATUS__CK) { - stError("s-task:%s status:%s not in checkpoint status, discard the checkpoint-trigger msg", pTask->id.idStr, status.name); - streamMutexUnlock(&pTask->lock); - return TSDB_CODE_STREAM_TASK_IVLD_STATUS; - } - streamMutexUnlock(&pTask->lock); - SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; - streamMutexLock(&pInfo->lock); - if (pInfo->activeId != pRsp->checkpointId || pInfo->transId != pRsp->transId) { - stError("s-task:%s status:%s not in checkpoint status, discard the checkpoint-trigger msg", pTask->id.idStr, status.name); - - streamMutexUnlock(&pInfo->lock); + if (status.state != TASK_STATUS__CK) { + stError("s-task:%s status:%s not in checkpoint status, discard the checkpoint-trigger msg", id, status.name); return TSDB_CODE_STREAM_TASK_IVLD_STATUS; } + streamMutexLock(&pInfo->lock); + unQualified = (pInfo->activeId != pRsp->checkpointId || pInfo->transId != pRsp->transId); streamMutexUnlock(&pInfo->lock); + if (unQualified) { + stError("s-task:%s status:%s not in checkpoint status, discard the checkpoint-trigger msg", id, status.name); + return TSDB_CODE_STREAM_TASK_IVLD_STATUS; + } + // NOTE: here we do not do the duplicated checkpoint-trigger msg check, since it will be done by following functions. int32_t code = appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER, pRsp->checkpointId, pRsp->transId, pRsp->upstreamTaskId); @@ -963,11 +964,44 @@ static int32_t doFindNotSendUpstream(SStreamTask* pTask, SArray* pList, SArray** return 0; } +static int32_t chkptTriggerRecvMonitorHelper(SStreamTask* pTask, SArray* pNotSendList) { + const char* id = pTask->id.idStr; + SArray* pList = pTask->upstreamInfo.pList; // send msg to retrieve checkpoint trigger msg + SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; + SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptTriggerMsgTmr; + int32_t vgId = pTask->pMeta->vgId; + + int32_t code = doChkptStatusCheck(pTask); + if (code) { + return code; + } + + code = doFindNotSendUpstream(pTask, pList, &pNotSendList); + if (code) { + int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); + stDebug("s-task:%s failed to find not send upstream, code:%s, out of tmr, ref:%d", id, tstrerror(code), ref); + return code; + } + + // do send retrieve checkpoint trigger msg to upstream + code = doSendRetrieveTriggerMsg(pTask, pNotSendList); + if (code) { + stError("s-task:%s vgId:%d failed to retrieve trigger msg, code:%s", pTask->id.idStr, vgId, tstrerror(code)); + code = 0; + } + + return code; +} + void checkpointTriggerMonitorFn(void* param, void* tmrId) { SStreamTask* pTask = param; int32_t vgId = pTask->pMeta->vgId; int64_t now = taosGetTimestampMs(); const char* id = pTask->id.idStr; + SArray* pNotSendList = NULL; + SArray* pList = pTask->upstreamInfo.pList; // send msg to retrieve checkpoint trigger msg + int32_t code = 0; + int32_t numOfNotSend = 0; SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptTriggerMsgTmr; @@ -1008,40 +1042,18 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { } streamMutexLock(&pActiveInfo->lock); + code = chkptTriggerRecvMonitorHelper(pTask, pNotSendList); + streamMutexUnlock(&pActiveInfo->lock); - int32_t code = doChkptStatusCheck(pTask); - if (code) { - streamMutexUnlock(&pActiveInfo->lock); + if (code != TSDB_CODE_SUCCESS) { streamMetaReleaseTask(pTask->pMeta, pTask); - return; - } - - // send msg to retrieve checkpoint trigger msg - SArray* pList = pTask->upstreamInfo.pList; - SArray* pNotSendList = NULL; - - code = doFindNotSendUpstream(pTask, pList, &pNotSendList); - if (code) { - int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); - stDebug("s-task:%s failed to find not send upstream, code:%s, out of tmr, ref:%d", id, tstrerror(code), ref); - streamMutexUnlock(&pActiveInfo->lock); - streamMetaReleaseTask(pTask->pMeta, pTask); - taosArrayDestroy(pNotSendList); return; } - // do send retrieve checkpoint trigger msg to upstream - int32_t size = taosArrayGetSize(pNotSendList); - code = doSendRetrieveTriggerMsg(pTask, pNotSendList); - if (code) { - stError("s-task:%s vgId:%d failed to retrieve trigger msg, code:%s", pTask->id.idStr, vgId, tstrerror(code)); - } - - streamMutexUnlock(&pActiveInfo->lock); - // check every 100ms - if (size > 0) { + numOfNotSend = taosArrayGetSize(pNotSendList); + if (numOfNotSend > 0) { stDebug("s-task:%s start to monitor checkpoint-trigger in 10s", id); streamTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor"); } else { @@ -1106,19 +1118,13 @@ int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList) { return code; } -bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId) { +static int32_t isAlreadySendTriggerNoLock(SStreamTask* pTask, int32_t downstreamNodeId) { int64_t now = taosGetTimestampMs(); const char* id = pTask->id.idStr; SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; SStreamTaskState pStatus = streamTaskGetStatus(pTask); - if (pStatus.state != TASK_STATUS__CK) { - return false; - } - - streamMutexLock(&pInfo->lock); if (!pInfo->dispatchTrigger) { - streamMutexUnlock(&pInfo->lock); return false; } @@ -1146,14 +1152,29 @@ bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId) id, pSendInfo->sendTs, before, pInfo->activeId, pInfo->transId); } - streamMutexUnlock(&pInfo->lock); return true; } - streamMutexUnlock(&pInfo->lock); return false; } +bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId) { + int64_t now = taosGetTimestampMs(); + const char* id = pTask->id.idStr; + SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; + SStreamTaskState pStatus = streamTaskGetStatus(pTask); + + if (pStatus.state != TASK_STATUS__CK) { + return false; + } + + streamMutexLock(&pInfo->lock); + bool send = isAlreadySendTriggerNoLock(pTask, downstreamNodeId); + streamMutexUnlock(&pInfo->lock); + + return send; +} + void streamTaskGetTriggerRecvStatus(SStreamTask* pTask, int32_t* pRecved, int32_t* pTotal) { *pRecved = taosArrayGetSize(pTask->chkInfo.pActiveInfo->pReadyMsgList); @@ -1169,8 +1190,10 @@ void streamTaskGetTriggerRecvStatus(SStreamTask* pTask, int32_t* pRecved, int32_ int32_t streamTaskInitTriggerDispatchInfo(SStreamTask* pTask) { SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; int64_t now = taosGetTimestampMs(); + int32_t code = 0; streamMutexLock(&pInfo->lock); + pInfo->dispatchTrigger = true; if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { STaskDispatcherFixed* pDispatch = &pTask->outputInfo.fixedDispatcher; @@ -1178,8 +1201,7 @@ int32_t streamTaskInitTriggerDispatchInfo(SStreamTask* pTask) { STaskTriggerSendInfo p = {.sendTs = now, .recved = false, .nodeId = pDispatch->nodeId, .taskId = pDispatch->taskId}; void* px = taosArrayPush(pInfo->pDispatchTriggerList, &p); if (px == NULL) { // pause the stream task, if memory not enough - streamMutexUnlock(&pInfo->lock); - return terrno; + code = terrno; } } else { for (int32_t i = 0; i < streamTaskGetNumOfDownstream(pTask); ++i) { @@ -1191,14 +1213,15 @@ int32_t streamTaskInitTriggerDispatchInfo(SStreamTask* pTask) { STaskTriggerSendInfo p = {.sendTs = now, .recved = false, .nodeId = pVgInfo->vgId, .taskId = pVgInfo->taskId}; void* px = taosArrayPush(pInfo->pDispatchTriggerList, &p); if (px == NULL) { // pause the stream task, if memory not enough - streamMutexUnlock(&pInfo->lock); - return terrno; + code = terrno; + break; } } } streamMutexUnlock(&pInfo->lock); - return 0; + + return code; } int32_t streamTaskGetNumOfConfirmed(SActiveCheckpointInfo* pInfo) { diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 88e40b247b..5cfe835ec2 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -780,7 +780,7 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { // dispatch checkpoint msg to all downstream tasks int32_t type = pInput->type; if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) { - int32_t code = streamProcessCheckpointTriggerBlock(pTask, (SStreamDataBlock*)pInput); + code = streamProcessCheckpointTriggerBlock(pTask, (SStreamDataBlock*)pInput); if (code != 0) { stError("s-task:%s failed to process checkpoint-trigger block, code:%s", pTask->id.idStr, tstrerror(code)); } diff --git a/source/libs/stream/src/streamHb.c b/source/libs/stream/src/streamHb.c index dde3595eb3..19391bf7a0 100644 --- a/source/libs/stream/src/streamHb.c +++ b/source/libs/stream/src/streamHb.c @@ -42,7 +42,7 @@ static bool waitForEnoughDuration(SMetaHbInfo* pInfo) { static bool existInHbMsg(SStreamHbMsg* pMsg, SDownstreamTaskEpset* pTaskEpset) { int32_t numOfExisted = taosArrayGetSize(pMsg->pUpdateNodes); - for (int k = 0; k < numOfExisted; ++k) { + for (int32_t k = 0; k < numOfExisted; ++k) { if (pTaskEpset->nodeId == *(int32_t*)taosArrayGet(pMsg->pUpdateNodes, k)) { return true; } @@ -56,7 +56,7 @@ static void addUpdateNodeIntoHbMsg(SStreamTask* pTask, SStreamHbMsg* pMsg) { streamMutexLock(&pTask->lock); int32_t num = taosArrayGetSize(pTask->outputInfo.pNodeEpsetUpdateList); - for (int j = 0; j < num; ++j) { + for (int32_t j = 0; j < num; ++j) { SDownstreamTaskEpset* pTaskEpset = taosArrayGet(pTask->outputInfo.pNodeEpsetUpdateList, j); bool exist = existInHbMsg(pMsg, pTaskEpset);