From 34ce872eafa1f9d7d08bdd694cc69046efec668a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 5 Aug 2024 10:16:09 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/dnode/vnode/src/tqCommon/tqCommon.c | 11 +++++-- source/libs/stream/src/streamCheckpoint.c | 36 ++++++++++++++-------- source/libs/stream/src/streamData.c | 7 +++-- source/libs/stream/src/streamDispatch.c | 7 ----- 4 files changed, 37 insertions(+), 24 deletions(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index b56c474ed5..a4c490e9b5 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -553,8 +553,15 @@ int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) return code; } - tqDebug("vgId:%d s-task:%s received the checkpoint-ready msg from task:0x%x (vgId:%d), handle it", vgId, - pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId); + if (pTask->info.taskLevel == TASK_LEVEL__SINK) { + tqDebug("vgId:%d s-task:%s recv invalid the checkpoint-ready msg from task:0x%x (vgId:%d), discard", vgId, + pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId); + streamMetaReleaseTask(pMeta, pTask); + return TSDB_CODE_INVALID_MSG; + } else { + tqDebug("vgId:%d s-task:%s received the checkpoint-ready msg from task:0x%x (vgId:%d), handle it", vgId, + pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId); + } code = streamProcessCheckpointReadyMsg(pTask, req.checkpointId, req.downstreamTaskId, req.downstreamNodeId); streamMetaReleaseTask(pMeta, pTask); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index f7c61b48e3..640e2af94f 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -94,12 +94,17 @@ int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType, i } int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq) { - ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); + if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) { + return TSDB_CODE_INVALID_MSG; + } // todo this status may not be set here. // 1. set task status to be prepared for check point, no data are allowed to put into inputQ. int32_t code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT); - ASSERT(code == TSDB_CODE_SUCCESS); + if (code != TSDB_CODE_SUCCESS) { + stError("s-task:%s failed to handle gen-checkpoint event, failed to start checkpoint procedure", pTask->id.idStr); + return code; + } pTask->chkInfo.pActiveInfo->transId = pReq->transId; pTask->chkInfo.pActiveInfo->activeId = pReq->checkpointId; @@ -112,7 +117,10 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo } int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTriggerRsp* pRsp) { - ASSERT(pTask->info.taskLevel != TASK_LEVEL__SOURCE); + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + stError("s-task:%s invalid msg recv, checkpoint-trigger rsp not handled", pTask->id.idStr); + return TSDB_CODE_INVALID_MSG; + } if (pRsp->rspCode != TSDB_CODE_SUCCESS) { stDebug("s-task:%s retrieve checkpoint-trgger rsp from upstream:0x%x invalid, code:%s", pTask->id.idStr, @@ -258,7 +266,6 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock } if (p->upstreamTaskId == pBlock->srcTaskId) { - ASSERT(p->checkpointId == checkpointId); stWarn("s-task:%s repeatly recv checkpoint-source msg from task:0x%x vgId:%d, checkpointId:%" PRId64 ", prev recvTs:%" PRId64 " discard", pTask->id.idStr, p->upstreamTaskId, p->upstreamNodeId, p->checkpointId, p->recvTs); @@ -320,7 +327,6 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock streamFreeQitem((SStreamQueueItem*)pBlock); } } else if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) { - ASSERT(taosArrayGetSize(pTask->upstreamInfo.pList) > 0); if (pTask->chkInfo.startTs == 0) { pTask->chkInfo.startTs = taosGetTimestampMs(); pTask->execInfo.checkpoint += 1; @@ -410,8 +416,6 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId int32_t notReady = 0; int32_t transId = 0; - ASSERT(total > 0 && (pTask->info.taskLevel == TASK_LEVEL__SOURCE || pTask->info.taskLevel == TASK_LEVEL__AGG)); - // 1. not in checkpoint status now SStreamTaskState pStat = streamTaskGetStatus(pTask); if (pStat.state != TASK_STATUS__CK) { @@ -799,6 +803,13 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stError("s-task:%s source task should not start the checkpoint-trigger monitor fn, quit", id, ref); + streamMetaReleaseTask(pTask->pMeta, pTask); + return; + } + // check the status every 100ms if (streamTaskShouldStop(pTask)) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); @@ -843,7 +854,6 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { // send msg to retrieve checkpoint trigger msg SArray* pList = pTask->upstreamInfo.pList; - ASSERT(pTask->info.taskLevel > TASK_LEVEL__SOURCE); SArray* pNotSendList = taosArrayInit(4, sizeof(SStreamUpstreamEpInfo)); if (pNotSendList == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -1085,10 +1095,12 @@ void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId) { int32_t numOfConfirmed = streamTaskGetNumOfConfirmed(pTask); int32_t total = streamTaskGetNumOfDownstream(pTask); - stDebug("s-task:%s set downstream:0x%x(vgId:%d) checkpoint-trigger dispatch confirmed, total confirmed:%d/%d", - pTask->id.idStr, taskId, vgId, numOfConfirmed, total); - - ASSERT(taskId != 0); + if (taskId == 0) { + stError("s-task:%s recv invalid trigger-dispatch confirm, vgId:%d", pTask->id.idStr, vgId); + } else { + stDebug("s-task:%s set downstream:0x%x(vgId:%d) checkpoint-trigger dispatch confirmed, total confirmed:%d/%d", + pTask->id.idStr, taskId, vgId, numOfConfirmed, total); + } } static int32_t uploadCheckpointToS3(const char* id, const char* path) { diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index 57e5322e38..eb846b5a92 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -175,9 +175,10 @@ int32_t streamDataSubmitNew(SPackedData* pData, int32_t type, SStreamDataSubmit* } void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit) { - ASSERT(pDataSubmit->type == STREAM_INPUT__DATA_SUBMIT); - taosMemoryFree(pDataSubmit->submit.msgStr); - taosFreeQitem(pDataSubmit); + if (pDataSubmit != NULL && pDataSubmit->type == STREAM_INPUT__DATA_SUBMIT) { + taosMemoryFree(pDataSubmit->submit.msgStr); + taosFreeQitem(pDataSubmit); + } } int32_t streamMergedSubmitNew(SStreamMergedSubmit** pSubmit) { diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 255afb44f9..d245548ce5 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -96,8 +96,6 @@ int32_t streamTaskBroadcastRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* r int32_t code = 0; void* buf = NULL; int32_t sz = taosArrayGetSize(pTask->upstreamInfo.pList); - ASSERT(sz > 0); - for (int32_t i = 0; i < sz; i++) { req->reqId = tGenIdPI64(); SStreamUpstreamEpInfo* pEpInfo = taosArrayGetP(pTask->upstreamInfo.pList, i); @@ -107,7 +105,6 @@ int32_t streamTaskBroadcastRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* r tEncodeSize(tEncodeStreamRetrieveReq, req, len, code); if (code != 0) { - ASSERT(0); return code; } @@ -946,8 +943,6 @@ int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) { SArray* pList = pTask->chkInfo.pActiveInfo->pReadyMsgList; streamMutexLock(&pTask->chkInfo.pActiveInfo->lock); - ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); - if (taosArrayGetSize(pList) == 1) { STaskCheckpointReadyInfo* pInfo = taosArrayGet(pList, 0); tmsgSendRsp(&pInfo->msg); @@ -1122,8 +1117,6 @@ int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHa void initCheckpointReadyInfo(STaskCheckpointReadyInfo* pReadyInfo, int32_t upstreamNodeId, int32_t upstreamTaskId, int32_t childId, SEpSet* pEpset, int64_t checkpointId) { - ASSERT(upstreamTaskId != 0); - pReadyInfo->upstreamTaskId = upstreamTaskId; pReadyInfo->upstreamNodeEpset = *pEpset; pReadyInfo->upstreamNodeId = upstreamNodeId;