From ac08f659c36e2d9025561ed9b7ad584e55bbc323 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 26 Jan 2024 16:20:43 +0800 Subject: [PATCH] refactor: remove invalid procedure in fill-history. --- include/common/tmsgdef.h | 1 - include/dnode/vnode/tqCommon.h | 2 - include/libs/executor/executor.h | 1 - include/libs/stream/tstream.h | 29 ---- source/dnode/mgmt/mgmt_snode/src/smHandle.c | 2 - source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 2 - source/dnode/mnode/impl/inc/mndDef.h | 7 - source/dnode/mnode/impl/src/mndStream.c | 22 ++- source/dnode/snode/src/snode.c | 4 - source/dnode/vnode/src/inc/vnodeInt.h | 2 - source/dnode/vnode/src/tq/tq.c | 9 - source/dnode/vnode/src/tqCommon/tqCommon.c | 68 -------- source/dnode/vnode/src/vnd/vnodeSvr.c | 4 - source/libs/executor/src/executor.c | 51 ------ source/libs/stream/inc/streamInt.h | 7 +- source/libs/stream/src/streamDispatch.c | 183 -------------------- source/libs/stream/src/streamStart.c | 147 ---------------- 17 files changed, 11 insertions(+), 530 deletions(-) diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index f389bc1a61..3bf22ec339 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -343,7 +343,6 @@ TD_NEW_MSG_SEG(TDMT_VND_STREAM_MSG) //7 << 8 TD_DEF_MSG_TYPE(TDMT_VND_STREAM_SCAN_HISTORY, "vnode-stream-scan-history", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_STREAM_SCAN_HISTORY_FINISH, "vnode-stream-scan-history-finish", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_CHECK_POINT_SOURCE, "vnode-stream-checkpoint-source", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_UPDATE, "vnode-stream-update", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_RESET, "vnode-stream-reset", NULL, NULL) diff --git a/include/dnode/vnode/tqCommon.h b/include/dnode/vnode/tqCommon.h index 50458d684f..dc145819ca 100644 --- a/include/dnode/vnode/tqCommon.h +++ b/include/dnode/vnode/tqCommon.h @@ -23,8 +23,6 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM int32_t tqStreamTaskProcessDispatchReq(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessDispatchRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg); -int32_t tqStreamTaskProcessScanHistoryFinishReq(SStreamMeta* pMeta, SRpcMsg* pMsg); -int32_t tqStreamTaskProcessScanHistoryFinishRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader); int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg); diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index be11d04ff8..f78b7a3126 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -210,7 +210,6 @@ void* qExtractReaderFromStreamScanner(void* scanner); int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner); int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo); -int32_t qResetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo); int32_t qStreamSourceScanParamForHistoryScanStep1(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow); int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow); int32_t qStreamRecoverFinish(qTaskInfo_t tinfo); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 7ff47d2d59..9b3ce36bdd 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -628,17 +628,6 @@ typedef struct { int8_t igUntreated; } SStreamScanHistoryReq; -typedef struct { - int64_t streamId; - int32_t upstreamTaskId; - int32_t downstreamTaskId; - int32_t upstreamNodeId; - int32_t childId; -} SStreamScanHistoryFinishReq; - -int32_t tEncodeStreamScanHistoryFinishReq(SEncoder* pEncoder, const SStreamScanHistoryFinishReq* pReq); -int32_t tDecodeStreamScanHistoryFinishReq(SDecoder* pDecoder, SStreamScanHistoryFinishReq* pReq); - // mndTrigger: denote if this checkpoint is triggered by mnode or as requested from tasks when transfer-state finished typedef struct { int64_t streamId; @@ -713,17 +702,6 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pRsp); int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pRsp); void streamMetaClearHbMsg(SStreamHbMsg* pMsg); -typedef struct { - int64_t streamId; - int32_t upstreamTaskId; - int32_t upstreamNodeId; - int32_t downstreamId; - int32_t downstreamNode; -} SStreamCompleteHistoryMsg; - -int32_t tEncodeCompleteHistoryDataMsg(SEncoder* pEncoder, const SStreamCompleteHistoryMsg* pReq); -int32_t tDecodeCompleteHistoryDataMsg(SDecoder* pDecoder, SStreamCompleteHistoryMsg* pReq); - typedef struct SNodeUpdateInfo { int32_t nodeId; SEpSet prevEp; @@ -820,7 +798,6 @@ int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask); int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, bool metaLock); int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event); -int32_t streamTaskHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, void* pFn); int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event); void streamTaskRestoreStatus(SStreamTask* pTask); @@ -829,7 +806,6 @@ int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* SRpcHandleInfo* pRpcInfo, int32_t taskId); int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp); int32_t streamLaunchFillHistoryTask(SStreamTask* pTask); -int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask); int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated); int32_t streamReExecScanHistoryFuture(SStreamTask* pTask, int32_t idleDuration); bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer); @@ -859,11 +835,6 @@ void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc); int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow); int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow); SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st); -int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask); - -// agg level -int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, SRpcHandleInfo* pInfo); -int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask); // stream task meta void streamMetaInit(); diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index a1af11f2ec..1488df3cb1 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -86,8 +86,6 @@ SArray *smGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 6781947849..a2f0b7aced 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -828,8 +828,6 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 8dfd03622f..b056d561c7 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -707,13 +707,6 @@ int32_t tEncodeSStreamObj(SEncoder* pEncoder, const SStreamObj* pObj); int32_t tDecodeSStreamObj(SDecoder* pDecoder, SStreamObj* pObj, int32_t sver); void tFreeStreamObj(SStreamObj* pObj); -// typedef struct { -// char streamName[TSDB_STREAM_FNAME_LEN]; -// int64_t uid; -// int64_t streamUid; -// SArray* childInfo; // SArray -// } SStreamCheckpointObj; - #define VIEW_TYPE_UPDATABLE (1 << 0) #define VIEW_TYPE_MATERIALIZED (1 << 1) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 3547d61c3d..7b348172f2 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -29,6 +29,10 @@ #define MND_STREAM_MAX_NUM 60 +typedef struct SMStreamNodeCheckMsg { + int8_t placeHolder; // // to fix windows compile error, define place holder +} SMStreamNodeCheckMsg; + static int32_t mndNodeCheckSentinel = 0; SStreamExecInfo execInfo; @@ -55,13 +59,11 @@ static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq); static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList); -static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); -static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot); -static int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len); - -static void freeCheckpointCandEntry(void *); -static void freeTaskList(void *param); - +static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); +static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot); +static int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len); +static void freeCheckpointCandEntry(void *); +static void freeTaskList(void *param); static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); SSdbRaw *mndStreamSeqActionEncode(SStreamObj *pStream); @@ -1708,7 +1710,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESUME_NAME, pStream->uid); - // resume all tasks + // set the resume action if (mndStreamSetResumeAction(pTrans, pMnode, pStream, pauseReq.igUntreated) < 0) { mError("stream:%s, failed to drop task since %s", pauseReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); @@ -2081,10 +2083,6 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { return 0; } -typedef struct SMStreamNodeCheckMsg { - int8_t placeHolder; // // to fix windows compile error, define place holder -} SMStreamNodeCheckMsg; - static int32_t mndProcessNodeCheck(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SSdb *pSdb = pMnode->pSdb; diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 4284b0838e..f173c327c7 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -171,10 +171,6 @@ int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) { return tqStreamTaskProcessRetrieveReq(pSnode->pMeta, pMsg); case TDMT_STREAM_RETRIEVE_RSP: // 1036 break; - case TDMT_VND_STREAM_SCAN_HISTORY_FINISH: - return tqStreamTaskProcessScanHistoryFinishReq(pSnode->pMeta, pMsg); - case TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP: - return tqStreamTaskProcessScanHistoryFinishRsp(pSnode->pMeta, pMsg); case TDMT_VND_STREAM_TASK_CHECK: return tqStreamTaskProcessCheckReq(pSnode->pMeta, pMsg); case TDMT_VND_STREAM_TASK_CHECK_RSP: diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 38c3441d43..23f79158c3 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -267,8 +267,6 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg); -int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg); -int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg); // sma int32_t smaInit(); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 2e947e4a4c..a689932754 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1043,15 +1043,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { return code; } -// only the agg tasks and the sink tasks will receive this message from upstream tasks -int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg) { - return tqStreamTaskProcessScanHistoryFinishReq(pTq->pStreamMeta, pMsg); -} - -int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg) { - return tqStreamTaskProcessScanHistoryFinishRsp(pTq->pStreamMeta, pMsg); -} - int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { SStreamTaskRunReq* pReq = pMsg->pCont; diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index ac1818f877..21bc09eba0 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -328,74 +328,6 @@ int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { return 0; } -int32_t tqStreamTaskProcessScanHistoryFinishReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { - char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); - - // deserialize - SStreamScanHistoryFinishReq req = {0}; - - SDecoder decoder; - tDecoderInit(&decoder, (uint8_t*)msg, msgLen); - tDecodeStreamScanHistoryFinishReq(&decoder, &req); - tDecoderClear(&decoder); - - SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.downstreamTaskId); - if (pTask == NULL) { - tqError("vgId:%d process scan history finish msg, failed to find task:0x%x, it may be destroyed", pMeta->vgId, - req.downstreamTaskId); - return -1; - } - - tqDebug("s-task:%s receive scan-history finish msg from task:0x%x", pTask->id.idStr, req.upstreamTaskId); - - int32_t code = streamProcessScanHistoryFinishReq(pTask, &req, &pMsg->info); - streamMetaReleaseTask(pMeta, pTask); - return code; -} - -int32_t tqStreamTaskProcessScanHistoryFinishRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { - int32_t code = TSDB_CODE_SUCCESS; - char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); - - // deserialize - SStreamCompleteHistoryMsg req = {0}; - - SDecoder decoder; - tDecoderInit(&decoder, (uint8_t*)msg, msgLen); - tDecodeCompleteHistoryDataMsg(&decoder, &req); - tDecoderClear(&decoder); - - if (pMeta->role == NODE_ROLE_FOLLOWER) { - tqError("s-task:0x%x (vgId:%d) not handle the scan-history finish rsp, since it becomes follower", - req.upstreamTaskId, pMeta->vgId); - return TASK_DOWNSTREAM_NOT_LEADER; - } - - SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId); - if (pTask == NULL) { - tqError("vgId:%d process scan history finish rsp, failed to find task:0x%x, it may be destroyed", pMeta->vgId, - req.upstreamTaskId); - return -1; - } - - int32_t remain = atomic_sub_fetch_32(&pTask->notReadyTasks, 1); - if (remain > 0) { - tqDebug("s-task:%s scan-history finish rsp received from downstream task:0x%x, unfinished remain:%d", - pTask->id.idStr, req.downstreamId, remain); - } else { - tqDebug( - "s-task:%s scan-history finish rsp received from downstream task:0x%x, all downstream tasks rsp scan-history " - "completed msg", - pTask->id.idStr, req.downstreamId); - code = streamProcessScanHistoryFinishRsp(pTask); - } - - streamMetaReleaseTask(pMeta, pTask); - return code; -} - int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { char* msgStr = pMsg->pCont; char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 98988c5114..176af79a87 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -784,10 +784,6 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg); case TDMT_VND_STREAM_SCAN_HISTORY: return tqProcessTaskScanHistory(pVnode->pTq, pMsg); - case TDMT_VND_STREAM_SCAN_HISTORY_FINISH: - return tqProcessTaskScanHistoryFinishReq(pVnode->pTq, pMsg); - case TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP: - return tqProcessTaskScanHistoryFinishRsp(pVnode->pTq, pMsg); case TDMT_STREAM_TASK_CHECKPOINT_READY: return tqProcessTaskCheckpointReadyMsg(pVnode->pTq, pMsg); default: diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index eb84cb0639..e1a8e8ea01 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1027,57 +1027,6 @@ int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) { return 0; } -int32_t qResetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) { - SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - SOperatorInfo* pOperator = pTaskInfo->pRoot; - - while (1) { - int32_t type = pOperator->operatorType; - if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL || - type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) { - SStreamIntervalOperatorInfo* pInfo = pOperator->info; - STimeWindowAggSupp* pSup = &pInfo->twAggSup; - - pSup->calTriggerSaved = 0; - pSup->deleteMarkSaved = 0; - qInfo("reset stream param for interval: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark); - - } else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION || - type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION || - type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) { - SStreamSessionAggOperatorInfo* pInfo = pOperator->info; - STimeWindowAggSupp* pSup = &pInfo->twAggSup; - - pSup->calTriggerSaved = 0; - pSup->deleteMarkSaved = 0; - qInfo("reset stream param for session: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark); - - } else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) { - SStreamStateAggOperatorInfo* pInfo = pOperator->info; - STimeWindowAggSupp* pSup = &pInfo->twAggSup; - - pSup->calTriggerSaved = 0; - pSup->deleteMarkSaved = 0; - qInfo("reset stream param for state: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark); - - } else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT) { - SStreamEventAggOperatorInfo* pInfo = pOperator->info; - STimeWindowAggSupp* pSup = &pInfo->twAggSup; - - pSup->calTriggerSaved = 0; - pSup->deleteMarkSaved = 0; - qInfo("save stream param for state: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark); - } - - // iterate operator tree - if (pOperator->numOfDownstream != 1 || pOperator->pDownstream[0] == NULL) { - return 0; - } else { - pOperator = pOperator->pDownstream[0]; - } - } -} - int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; const char* id = GET_TASKID(pTaskInfo); diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 1e4b8996b6..0ab56b23e4 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -123,8 +123,6 @@ int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t STaskId streamTaskGetTaskId(const SStreamTask* pTask); void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo); void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo); -int32_t streamTaskBuildScanhistoryRspMsg(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, void** pBuffer, - int32_t* pLen); int32_t streamTaskFillHistoryFinished(SStreamTask* pTask); void streamClearChkptReadyMsg(SStreamTask* pTask); @@ -134,10 +132,7 @@ int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem); void streamQueueItemIncSize(const SStreamQueueItem* pItem, int32_t size); const char* streamQueueItemGetTypeStr(int32_t type); SStreamQueueItem* streamQueueMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem); - -int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq); -int32_t streamNotifyUpstreamContinue(SStreamTask* pTask); -int32_t streamTransferStateToStreamTask(SStreamTask* pTask); +int32_t streamTransferStateToStreamTask(SStreamTask* pTask); SStreamQueue* streamQueueOpen(int64_t cap); void streamQueueClose(SStreamQueue* pQueue, int32_t taskId); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index b51845d152..6b7c0fc69a 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -34,9 +34,6 @@ static int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* p static int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq); static int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int32_t vgSz, int64_t groupId); -static int32_t doDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHistoryFinishReq* pReq, int32_t vgId, - SEpSet* pEpSet); - static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId, int32_t numOfBlocks, int64_t dstTaskId, int32_t type); @@ -676,41 +673,6 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { return TSDB_CODE_SUCCESS; } -int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) { - SStreamScanHistoryFinishReq req = { - .streamId = pTask->id.streamId, - .childId = pTask->info.selfChildId, - .upstreamTaskId = pTask->id.taskId, - .upstreamNodeId = pTask->pMeta->vgId, - }; - - // serialize - if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { - req.downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId; - pTask->notReadyTasks = 1; - doDispatchScanHistoryFinishMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId, - &pTask->outputInfo.fixedDispatcher.epSet); - } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; - int32_t numOfVgs = taosArrayGetSize(vgInfo); - pTask->notReadyTasks = numOfVgs; - - SStreamTaskState* pState = streamTaskGetStatus(pTask); - stDebug("s-task:%s send scan-history data complete msg to downstream (shuffle-dispatch) %d tasks, status:%s", - pTask->id.idStr, numOfVgs, pState->name); - for (int32_t i = 0; i < numOfVgs; i++) { - SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); - req.downstreamTaskId = pVgInfo->taskId; - doDispatchScanHistoryFinishMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); - } - } else { - stDebug("s-task:%s no downstream tasks, invoke scan-history finish rsp directly", pTask->id.idStr); - streamProcessScanHistoryFinishRsp(pTask); - } - - return 0; -} - // this function is usually invoked by sink/agg task int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) { int32_t num = taosArrayGetSize(pTask->pReadyMsgList); @@ -782,48 +744,6 @@ int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatch return 0; } -int32_t doDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHistoryFinishReq* pReq, int32_t vgId, - SEpSet* pEpSet) { - void* buf = NULL; - int32_t code = -1; - SRpcMsg msg = {0}; - - int32_t tlen; - tEncodeSize(tEncodeStreamScanHistoryFinishReq, pReq, tlen, code); - if (code < 0) { - return -1; - } - - buf = rpcMallocCont(sizeof(SMsgHead) + tlen); - if (buf == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - ((SMsgHead*)buf)->vgId = htonl(vgId); - void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - - SEncoder encoder; - tEncoderInit(&encoder, abuf, tlen); - if ((code = tEncodeStreamScanHistoryFinishReq(&encoder, pReq)) < 0) { - if (buf) { - rpcFreeCont(buf); - } - return code; - } - - tEncoderClear(&encoder); - - initRpcMsg(&msg, TDMT_VND_STREAM_SCAN_HISTORY_FINISH, buf, tlen + sizeof(SMsgHead)); - - tmsgSendReq(pEpSet, &msg); - - SStreamTaskState* pState = streamTaskGetStatus(pTask); - stDebug("s-task:%s status:%s dispatch scan-history finish msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, pState->name, - pReq->downstreamTaskId, vgId); - return 0; -} - int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet) { void* buf = NULL; int32_t code = -1; @@ -989,109 +909,6 @@ void streamClearChkptReadyMsg(SStreamTask* pTask) { taosArrayClear(pTask->pReadyMsgList); } -int32_t tEncodeCompleteHistoryDataMsg(SEncoder* pEncoder, const SStreamCompleteHistoryMsg* pReq) { - if (tStartEncode(pEncoder) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->downstreamId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->downstreamNode) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1; - tEndEncode(pEncoder); - return pEncoder->pos; -} - -int32_t tDecodeCompleteHistoryDataMsg(SDecoder* pDecoder, SStreamCompleteHistoryMsg* pRsp) { - if (tStartDecode(pDecoder) < 0) return -1; - if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1; - if (tDecodeI32(pDecoder, &pRsp->downstreamId) < 0) return -1; - if (tDecodeI32(pDecoder, &pRsp->downstreamNode) < 0) return -1; - if (tDecodeI32(pDecoder, &pRsp->upstreamTaskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pRsp->upstreamNodeId) < 0) return -1; - tEndDecode(pDecoder); - return 0; -} - -int32_t streamTaskBuildScanhistoryRspMsg(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, void** pBuffer, - int32_t* pLen) { - int32_t len = 0; - int32_t code = 0; - SEncoder encoder; - - SStreamCompleteHistoryMsg msg = { - .streamId = pReq->streamId, - .upstreamTaskId = pReq->upstreamTaskId, - .upstreamNodeId = pReq->upstreamNodeId, - .downstreamId = pReq->downstreamTaskId, - .downstreamNode = pTask->pMeta->vgId, - }; - - tEncodeSize(tEncodeCompleteHistoryDataMsg, &msg, len, code); - if (code < 0) { - return code; - } - - void* pBuf = rpcMallocCont(sizeof(SMsgHead) + len); - if (pBuf == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - ((SMsgHead*)pBuf)->vgId = htonl(pReq->upstreamNodeId); - - void* abuf = POINTER_SHIFT(pBuf, sizeof(SMsgHead)); - - tEncoderInit(&encoder, (uint8_t*)abuf, len); - tEncodeCompleteHistoryDataMsg(&encoder, &msg); - tEncoderClear(&encoder); - - *pBuffer = pBuf; - *pLen = len; - return 0; -} - -int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq) { - void* pBuf = NULL; - int32_t len = 0; - - streamTaskBuildScanhistoryRspMsg(pTask, pReq, &pBuf, &len); - SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, pReq->upstreamTaskId); - - SStreamContinueExecInfo info = {.taskId = pReq->upstreamTaskId, .epset = pInfo->epSet}; - initRpcMsg(&info.msg, 0, pBuf, sizeof(SMsgHead) + len); - info.msg.info = *pRpcInfo; - - taosThreadMutexLock(&pTask->lock); - - if (pTask->pRspMsgList == NULL) { - pTask->pRspMsgList = taosArrayInit(4, sizeof(SStreamContinueExecInfo)); - } - taosArrayPush(pTask->pRspMsgList, &info); - taosThreadMutexUnlock(&pTask->lock); - - int32_t num = taosArrayGetSize(pTask->pRspMsgList); - stDebug("s-task:%s add scan-history finish rsp msg for task:0x%x, total:%d", pTask->id.idStr, pReq->upstreamTaskId, - num); - return TSDB_CODE_SUCCESS; -} - -int32_t streamNotifyUpstreamContinue(SStreamTask* pTask) { - ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG || pTask->info.taskLevel == TASK_LEVEL__SINK); - - const char* id = pTask->id.idStr; - int32_t level = pTask->info.taskLevel; - - int32_t num = taosArrayGetSize(pTask->pRspMsgList); - for (int32_t i = 0; i < num; ++i) { - SStreamContinueExecInfo* pInfo = taosArrayGet(pTask->pRspMsgList, i); - tmsgSendRsp(&pInfo->msg); - - stDebug("s-task:%s level:%d notify upstream:0x%x continuing handle data in WAL", id, level, pInfo->taskId); - } - - taosArrayClear(pTask->pRspMsgList); - stDebug("s-task:%s level:%d continue process msg sent to all %d upstreams", id, level, num); - return 0; -} - // this message has been sent successfully, let's try next one. static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId) { stDebug("s-task:%s destroy dispatch msg:%p", pTask->id.idStr, pTask->msgInfo.pData); diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 140a22ee73..20fdcff7d9 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -592,108 +592,6 @@ int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask) { return TSDB_CODE_SUCCESS; } -int32_t streamAggUpstreamScanHistoryFinish(SStreamTask* pTask) { - void* exec = pTask->exec.pExecutor; - if (pTask->info.fillHistory && qRestoreStreamOperatorOption(exec) < 0) { - return -1; - } - - if (qStreamRecoverFinish(exec) < 0) { - return -1; - } - return 0; -} - -int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, - SRpcHandleInfo* pRpcInfo) { - int32_t taskLevel = pTask->info.taskLevel; - ASSERT(taskLevel == TASK_LEVEL__AGG || taskLevel == TASK_LEVEL__SINK); - - const char* id = pTask->id.idStr; - SStreamTaskState* p = streamTaskGetStatus(pTask); - - if (p->state != TASK_STATUS__SCAN_HISTORY) { - stError("s-task:%s not in scan-history status, status:%s return upstream:0x%x scan-history finish directly", id, - p->name, pReq->upstreamTaskId); - - void* pBuf = NULL; - int32_t len = 0; - streamTaskBuildScanhistoryRspMsg(pTask, pReq, &pBuf, &len); - - SRpcMsg msg = {.info = *pRpcInfo}; - initRpcMsg(&msg, 0, pBuf, sizeof(SMsgHead) + len); - - tmsgSendRsp(&msg); - stDebug("s-task:%s level:%d notify upstream:0x%x(vgId:%d) to continue process data in WAL", id, taskLevel, - pReq->upstreamTaskId, pReq->upstreamNodeId); - return 0; - } - - // sink tasks do not send end of scan history msg to its upstream, which is agg task. - streamAddEndScanHistoryMsg(pTask, pRpcInfo, pReq); - - int32_t left = atomic_sub_fetch_32(&pTask->numOfWaitingUpstream, 1); - ASSERT(left >= 0); - - if (left == 0) { - int32_t numOfTasks = taosArrayGetSize(pTask->upstreamInfo.pList); - if (taskLevel == TASK_LEVEL__AGG) { - stDebug( - "s-task:%s all %d upstream tasks finish scan-history data, set param for agg task for stream data processing " - "and send rsp to all upstream tasks", - id, numOfTasks); - streamAggUpstreamScanHistoryFinish(pTask); - } else { - stDebug("s-task:%s all %d upstream task(s) finish scan-history data, and rsp to all upstream tasks", id, - numOfTasks); - } - - // all upstream tasks have completed the scan-history task in the stream time window, let's start to extract data - // from the WAL files, which contains the real time stream data. - streamNotifyUpstreamContinue(pTask); - - // mnode will not send the pause/resume message to the sink task, so no need to enable the pause for sink tasks. - if (taskLevel == TASK_LEVEL__AGG) { - /*int32_t code = */ streamTaskScanHistoryDataComplete(pTask); - } else { // for sink task, set normal - streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE); - } - } else { - stDebug("s-task:%s receive scan-history data finish msg from upstream:0x%x(index:%d), unfinished:%d", id, - pReq->upstreamTaskId, pReq->childId, left); - } - - return 0; -} - -int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) { - ETaskStatus status = streamTaskGetStatus(pTask)->state; - - // task restart now, not handle the scan-history finish rsp - if (status == TASK_STATUS__UNINIT) { - return TSDB_CODE_INVALID_MSG; - } - - ASSERT(status == TASK_STATUS__SCAN_HISTORY/* || status == TASK_STATUS__STREAM_SCAN_HISTORY*/); - SStreamMeta* pMeta = pTask->pMeta; - - // execute in the scan history complete call back msg, ready to process data from inputQ - int32_t code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE); - streamTaskSetSchedStatusInactive(pTask); - - streamMetaWLock(pMeta); - streamMetaSaveTask(pMeta, pTask); - streamMetaCommit(pMeta); - streamMetaWUnLock(pMeta); - - // for source tasks, let's continue execute. - if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - streamSchedExec(pTask); - } - - return TSDB_CODE_SUCCESS; -} - static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask) { SDataRange* pRange = &pHTask->dataRange; @@ -946,29 +844,6 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { } } -int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) { - if (streamTaskGetStatus(pTask)->state == TASK_STATUS__DROPPING) { - return 0; - } - - // restore param - int32_t code = 0; - if (pTask->info.fillHistory) { - code = streamRestoreParam(pTask); - if (code < 0) { - return -1; - } - } - - // dispatch scan-history finish req to all related downstream task - code = streamDispatchScanHistoryFinishMsg(pTask); - if (code < 0) { - return -1; - } - - return 0; -} - int32_t streamTaskFillHistoryFinished(SStreamTask* pTask) { void* exec = pTask->exec.pExecutor; return qStreamInfoResetTimewindowFilter(exec); @@ -1072,28 +947,6 @@ int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpoint return 0; } -int32_t tEncodeStreamScanHistoryFinishReq(SEncoder* pEncoder, const SStreamScanHistoryFinishReq* pReq) { - if (tStartEncode(pEncoder) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->childId) < 0) return -1; - tEndEncode(pEncoder); - return pEncoder->pos; -} - -int32_t tDecodeStreamScanHistoryFinishReq(SDecoder* pDecoder, SStreamScanHistoryFinishReq* pReq) { - if (tStartDecode(pDecoder) < 0) return -1; - if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->downstreamTaskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->childId) < 0) return -1; - tEndDecode(pDecoder); - return 0; -} - void streamTaskSetRangeStreamCalc(SStreamTask* pTask) { SDataRange* pRange = &pTask->dataRange;