diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index f389bc1a61..3bf22ec339 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -343,7 +343,6 @@ TD_NEW_MSG_SEG(TDMT_VND_STREAM_MSG) //7 << 8 TD_DEF_MSG_TYPE(TDMT_VND_STREAM_SCAN_HISTORY, "vnode-stream-scan-history", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_STREAM_SCAN_HISTORY_FINISH, "vnode-stream-scan-history-finish", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_CHECK_POINT_SOURCE, "vnode-stream-checkpoint-source", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_UPDATE, "vnode-stream-update", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_RESET, "vnode-stream-reset", NULL, NULL) diff --git a/include/dnode/vnode/tqCommon.h b/include/dnode/vnode/tqCommon.h index 50458d684f..dc145819ca 100644 --- a/include/dnode/vnode/tqCommon.h +++ b/include/dnode/vnode/tqCommon.h @@ -23,8 +23,6 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM int32_t tqStreamTaskProcessDispatchReq(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessDispatchRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg); -int32_t tqStreamTaskProcessScanHistoryFinishReq(SStreamMeta* pMeta, SRpcMsg* pMsg); -int32_t tqStreamTaskProcessScanHistoryFinishRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader); int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg); diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index be11d04ff8..f78b7a3126 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -210,7 +210,6 @@ void* qExtractReaderFromStreamScanner(void* scanner); int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner); int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo); -int32_t qResetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo); int32_t qStreamSourceScanParamForHistoryScanStep1(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow); int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow); int32_t qStreamRecoverFinish(qTaskInfo_t tinfo); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 7ff47d2d59..9b3ce36bdd 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -628,17 +628,6 @@ typedef struct { int8_t igUntreated; } SStreamScanHistoryReq; -typedef struct { - int64_t streamId; - int32_t upstreamTaskId; - int32_t downstreamTaskId; - int32_t upstreamNodeId; - int32_t childId; -} SStreamScanHistoryFinishReq; - -int32_t tEncodeStreamScanHistoryFinishReq(SEncoder* pEncoder, const SStreamScanHistoryFinishReq* pReq); -int32_t tDecodeStreamScanHistoryFinishReq(SDecoder* pDecoder, SStreamScanHistoryFinishReq* pReq); - // mndTrigger: denote if this checkpoint is triggered by mnode or as requested from tasks when transfer-state finished typedef struct { int64_t streamId; @@ -713,17 +702,6 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pRsp); int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pRsp); void streamMetaClearHbMsg(SStreamHbMsg* pMsg); -typedef struct { - int64_t streamId; - int32_t upstreamTaskId; - int32_t upstreamNodeId; - int32_t downstreamId; - int32_t downstreamNode; -} SStreamCompleteHistoryMsg; - -int32_t tEncodeCompleteHistoryDataMsg(SEncoder* pEncoder, const SStreamCompleteHistoryMsg* pReq); -int32_t tDecodeCompleteHistoryDataMsg(SDecoder* pDecoder, SStreamCompleteHistoryMsg* pReq); - typedef struct SNodeUpdateInfo { int32_t nodeId; SEpSet prevEp; @@ -820,7 +798,6 @@ int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask); int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, bool metaLock); int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event); -int32_t streamTaskHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, void* pFn); int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event); void streamTaskRestoreStatus(SStreamTask* pTask); @@ -829,7 +806,6 @@ int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* SRpcHandleInfo* pRpcInfo, int32_t taskId); int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp); int32_t streamLaunchFillHistoryTask(SStreamTask* pTask); -int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask); int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated); int32_t streamReExecScanHistoryFuture(SStreamTask* pTask, int32_t idleDuration); bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer); @@ -859,11 +835,6 @@ void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc); int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow); int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow); SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st); -int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask); - -// agg level -int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, SRpcHandleInfo* pInfo); -int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask); // stream task meta void streamMetaInit(); diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index a1af11f2ec..1488df3cb1 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -86,8 +86,6 @@ SArray *smGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 6781947849..a2f0b7aced 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -828,8 +828,6 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 8dfd03622f..b056d561c7 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -707,13 +707,6 @@ int32_t tEncodeSStreamObj(SEncoder* pEncoder, const SStreamObj* pObj); int32_t tDecodeSStreamObj(SDecoder* pDecoder, SStreamObj* pObj, int32_t sver); void tFreeStreamObj(SStreamObj* pObj); -// typedef struct { -// char streamName[TSDB_STREAM_FNAME_LEN]; -// int64_t uid; -// int64_t streamUid; -// SArray* childInfo; // SArray -// } SStreamCheckpointObj; - #define VIEW_TYPE_UPDATABLE (1 << 0) #define VIEW_TYPE_MATERIALIZED (1 << 1) diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index 871e12c5e6..92035101f6 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -33,6 +33,11 @@ typedef struct SStreamTransInfo { int32_t transId; } SStreamTransInfo; +typedef struct SVgroupChangeInfo { + SHashObj *pDBMap; + SArray *pUpdateNodeList; // SArray +} SVgroupChangeInfo; + // time to generated the checkpoint, if now() - checkpointTs >= tsCheckpointInterval, this checkpoint will be discard // to avoid too many checkpoints for a taskk in the waiting list typedef struct SCheckpointCandEntry { @@ -94,25 +99,27 @@ int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamUid); // for sma // TODO refactor -int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); -int32_t mndPersistDropStreamLog(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); - +int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams); int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream); SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady); void mndKillTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName); -void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset, - int32_t retryCode); +int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset, + int32_t retryCode); STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const char *name, const char *pMsg); int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status); SSdbRaw *mndStreamActionEncode(SStreamObj *pStream); +void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo); +int32_t mndStreamSetUpdateEpsetAction(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans); + SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId); int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId); int32_t mndProcessStreamHb(SRpcMsg *pReq); void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode); int32_t initStreamNodeList(SMnode *pMnode); -int32_t mndResumeStreamTasks(STrans *pTrans, SMnode *pMnode, SStreamObj* pStream, int8_t igUntreated); -int32_t mndPauseStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); +int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj* pStream, int8_t igUntreated); +int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); + #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 5e03ec6447..7b348172f2 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -29,10 +29,9 @@ #define MND_STREAM_MAX_NUM 60 -typedef struct SVgroupChangeInfo { - SHashObj *pDBMap; - SArray *pUpdateNodeList; // SArray -} SVgroupChangeInfo; +typedef struct SMStreamNodeCheckMsg { + int8_t placeHolder; // // to fix windows compile error, define place holder +} SMStreamNodeCheckMsg; static int32_t mndNodeCheckSentinel = 0; SStreamExecInfo execInfo; @@ -60,14 +59,11 @@ static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq); static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList); -static int32_t createStreamUpdateTrans(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans); -static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); -static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot); -static int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len); - -static void freeCheckpointCandEntry(void *); -static void freeTaskList(void *param); - +static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); +static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot); +static int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len); +static void freeCheckpointCandEntry(void *); +static void freeTaskList(void *param); static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); SSdbRaw *mndStreamSeqActionEncode(SStreamObj *pStream); @@ -470,10 +466,8 @@ int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) { tEncodeStreamTask(&encoder, pTask); tEncoderClear(&encoder); - STransAction action = {0}; - action.mTraceId = pTrans->mTraceId; - initTransAction(&action, buf, tlen, TDMT_STREAM_TASK_DEPLOY, &pTask->info.epSet, 0); - if (mndTransAppendRedoAction(pTrans, &action) != 0) { + int32_t code = setTransAction(pTrans, buf, tlen, TDMT_STREAM_TASK_DEPLOY, &pTask->info.epSet, 0); + if (code != 0) { taosMemoryFree(buf); return -1; } @@ -614,8 +608,6 @@ _OVER: return -1; } - - static int32_t mndPersistTaskDropReq(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) { SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq)); if (pReq == NULL) { @@ -627,24 +619,17 @@ static int32_t mndPersistTaskDropReq(SMnode *pMnode, STrans *pTrans, SStreamTask pReq->taskId = pTask->id.taskId; pReq->streamId = pTask->id.streamId; - STransAction action = {0}; - SEpSet epset = {0}; - bool hasEpset = false; - + SEpSet epset = {0}; + bool hasEpset = false; int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); - if (code != TSDB_CODE_SUCCESS) { + if (code != TSDB_CODE_SUCCESS || !hasEpset) { // no valid epset, return directly without redoAction terrno = code; return -1; } - // no valid epset, return directly without redoAction - if (!hasEpset) { - return TSDB_CODE_SUCCESS; - } - // The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode. - initTransAction(&action, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0); - if (mndTransAppendRedoAction(pTrans, &action) != 0) { + code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0); + if (code != 0) { taosMemoryFree(pReq); return -1; } @@ -752,17 +737,8 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { goto _OVER; } - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, MND_STREAM_CREATE_NAME); + STrans *pTrans = doCreateTrans(pMnode, &streamObj, pReq, MND_STREAM_CREATE_NAME, "create stream tasks on dnodes"); if (pTrans == NULL) { - mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr()); - goto _OVER; - } - - mInfo("trans:%d, used to create stream:%s", pTrans->id, createStreamReq.name); - - mndTransSetDbName(pTrans, createStreamReq.sourceDB, streamObj.targetSTbName); - if (mndTransCheckConflict(pMnode, pTrans) != 0) { - mndTransDrop(pTrans); goto _OVER; } @@ -808,6 +784,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { mndTransDrop(pTrans); taosThreadMutexLock(&execInfo.lock); + mDebug("stream tasks register into node list"); saveStreamTasksInfo(&streamObj, &execInfo); taosThreadMutexUnlock(&execInfo.lock); @@ -940,20 +917,14 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre return -1; } - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, NULL, MND_STREAM_CHECKPOINT_NAME); + STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, MND_STREAM_CHECKPOINT_NAME, "gen checkpoint for stream"); if (pTrans == NULL) { - return -1; - } - - mndStreamRegisterTrans(pTrans, MND_STREAM_CHECKPOINT_NAME, pStream->uid); - - mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetSTbName); - if (mndTrancCheckConflict(pMnode, pTrans) != 0) { mError("failed to checkpoint of stream name%s, checkpointId: %" PRId64 ", reason:%s", pStream->name, checkpointId, tstrerror(TSDB_CODE_MND_TRANS_CONFLICT)); goto _ERR; } + mndStreamRegisterTrans(pTrans, MND_STREAM_CHECKPOINT_NAME, pStream->uid); mDebug("start to trigger checkpoint for stream:%s, checkpoint: %" PRId64 "", pStream->name, checkpointId); taosWLockLatch(&pStream->lock); @@ -970,27 +941,26 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre for (int32_t j = 0; j < sz; j++) { SStreamTask *pTask = taosArrayGetP(pLevel, j); - SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId); - if (pVgObj == NULL) { - taosWUnLockLatch(&pStream->lock); - goto _ERR; - } - void *buf; int32_t tlen; if (mndBuildStreamCheckpointSourceReq(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId, - pTask->id.taskId, pTrans->id, mndTrigger) < 0) { - mndReleaseVgroup(pMnode, pVgObj); + pTask->id.taskId, pTrans->id, mndTrigger) < 0) { taosWUnLockLatch(&pStream->lock); goto _ERR; } - STransAction act = {0}; - SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj); - mndReleaseVgroup(pMnode, pVgObj); + SEpSet epset = {0}; + bool hasEpset = false; + code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); + if (code != TSDB_CODE_SUCCESS || !hasEpset) { + taosMemoryFree(buf); + taosWUnLockLatch(&pStream->lock); + goto _ERR; + } - initTransAction(&act, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset, TSDB_CODE_SYN_PROPOSE_NOT_READY); - if (mndTransAppendRedoAction(pTrans, &act) != 0) { + code = setTransAction(pTrans, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset, + TSDB_CODE_SYN_PROPOSE_NOT_READY); + if (code != 0) { taosMemoryFree(buf); taosWUnLockLatch(&pStream->lock); goto _ERR; @@ -1219,7 +1189,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { return -1; } - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, MND_STREAM_DROP_NAME); + STrans* pTrans = doCreateTrans(pMnode, pStream, pReq, MND_STREAM_DROP_NAME, "drop stream"); if (pTrans == NULL) { mError("stream:%s, failed to drop since %s", dropReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); @@ -1227,16 +1197,6 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { return -1; } - mInfo("trans:%d used to drop stream:%s", pTrans->id, dropReq.name); - - mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetSTbName); - if (mndTransCheckConflict(pMnode, pTrans) != 0) { - sdbRelease(pMnode->pSdb, pStream); - mndTransDrop(pTrans); - tFreeMDropStreamReq(&dropReq); - return -1; - } - int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pStream->uid); // drop all tasks @@ -1563,18 +1523,6 @@ static int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SS return TSDB_CODE_SUCCESS; } -static int32_t getNumOfTasks(SArray *pTaskList) { - int32_t numOfLevels = taosArrayGetSize(pTaskList); - - int32_t count = 0; - for (int32_t i = 0; i < numOfLevels; i++) { - SArray *pLevel = taosArrayGetP(pTaskList, i); - count += taosArrayGetSize(pLevel); - } - - return count; -} - static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) { SMnode *pMnode = pReq->info.node; SSdb *pSdb = pMnode->pSdb; @@ -1590,7 +1538,7 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock // lock taosRLockLatch(&pStream->lock); - int32_t count = getNumOfTasks(pStream->tasks); + int32_t count = mndGetNumOfStreamTasks(pStream); if (numOfRows + count > rowsCapacity) { blockDataEnsureCapacity(pBlock, numOfRows + count); } @@ -1623,21 +1571,6 @@ static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter) { sdbCancelFetch(pSdb, pIter); } -static int32_t mndPersistStreamLog(STrans *pTrans, SStreamObj *pStream, int8_t status) { - taosWLockLatch(&pStream->lock); - pStream->status = status; - SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream); - - taosWUnLockLatch(&pStream->lock); - if (pCommitRaw == NULL) return -1; - if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { - mError("stream trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); - return -1; - } - (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); - return 0; -} - static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SStreamObj *pStream = NULL; @@ -1652,9 +1585,10 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { if (pStream == NULL) { if (pauseReq.igNotExists) { - mInfo("stream:%s, not exist, if exist is set", pauseReq.name); + mInfo("stream:%s, not exist, not pause stream", pauseReq.name); return 0; } else { + mError("stream:%s not exist, failed to pause stream", pauseReq.name); terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; return -1; } @@ -1683,26 +1617,17 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { return -1; } - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, MND_STREAM_PAUSE_NAME); + STrans* pTrans = doCreateTrans(pMnode, pStream, pReq, MND_STREAM_PAUSE_NAME, "pause the stream"); if (pTrans == NULL) { mError("stream:%s failed to pause stream since %s", pauseReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); return -1; } - mInfo("trans:%d, used to pause stream:%s", pTrans->id, pauseReq.name); - - mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetSTbName); - if (mndTransCheckConflict(pMnode, pTrans) != 0) { - sdbRelease(pMnode->pSdb, pStream); - mndTransDrop(pTrans); - return -1; - } - int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_PAUSE_NAME, pStream->uid); // if nodeUpdate happened, not send pause trans - if (mndPauseStreamTasks(pMnode, pTrans, pStream) < 0) { + if (mndStreamSetPauseAction(pMnode, pTrans, pStream) < 0) { mError("stream:%s, failed to pause task since %s", pauseReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); @@ -1710,12 +1635,18 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { } // pause stream - if (mndPersistStreamLog(pTrans, pStream, STREAM_STATUS__PAUSE) < 0) { + taosWLockLatch(&pStream->lock); + pStream->status = STREAM_STATUS__PAUSE; + if (mndPersistTransLog(pStream, pTrans,SDB_STATUS_READY) < 0) { + taosWUnLockLatch(&pStream->lock); + sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); return -1; } + taosWUnLockLatch(&pStream->lock); + if (mndTransPrepare(pMnode, pTrans) != 0) { mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, terrstr()); sdbRelease(pMnode->pSdb, pStream); @@ -1743,10 +1674,11 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { if (pStream == NULL) { if (pauseReq.igNotExists) { - mInfo("stream:%s, not exist, if exist is set", pauseReq.name); + mInfo("stream:%s not exist, not resume stream", pauseReq.name); sdbRelease(pMnode->pSdb, pStream); return 0; } else { + mError("stream:%s not exist, failed to resume stream", pauseReq.name); terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; return -1; } @@ -1769,26 +1701,17 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { return -1; } - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, MND_STREAM_RESUME_NAME); + STrans* pTrans = doCreateTrans(pMnode, pStream, pReq, MND_STREAM_RESUME_NAME, "resume the stream"); if (pTrans == NULL) { mError("stream:%s, failed to resume stream since %s", pauseReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); return -1; } - mInfo("trans:%d used to resume stream:%s", pTrans->id, pauseReq.name); - - mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetSTbName); - if (mndTransCheckConflict(pMnode, pTrans) != 0) { - sdbRelease(pMnode->pSdb, pStream); - mndTransDrop(pTrans); - return -1; - } - int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESUME_NAME, pStream->uid); - // resume all tasks - if (mndResumeStreamTasks(pTrans, pMnode, pStream, pauseReq.igUntreated) < 0) { + // set the resume action + if (mndStreamSetResumeAction(pTrans, pMnode, pStream, pauseReq.igUntreated) < 0) { mError("stream:%s, failed to drop task since %s", pauseReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); @@ -1796,12 +1719,17 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { } // resume stream - if (mndPersistStreamLog(pTrans, pStream, STREAM_STATUS__NORMAL) < 0) { + taosWLockLatch(&pStream->lock); + pStream->status = STREAM_STATUS__NORMAL; + if (mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY) < 0) { + taosWUnLockLatch(&pStream->lock); + sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); return -1; } + taosWUnLockLatch(&pStream->lock); if (mndTransPrepare(pMnode, pTrans) != 0) { mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, terrstr()); sdbRelease(pMnode->pSdb, pStream); @@ -1815,91 +1743,6 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { return TSDB_CODE_ACTION_IN_PROGRESS; } -static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg *pMsg, const SVgroupChangeInfo *pInfo, SStreamTaskId *pId, - int32_t transId) { - pMsg->streamId = pId->streamId; - pMsg->taskId = pId->taskId; - pMsg->transId = transId; - pMsg->pNodeList = taosArrayInit(taosArrayGetSize(pInfo->pUpdateNodeList), sizeof(SNodeUpdateInfo)); - taosArrayAddAll(pMsg->pNodeList, pInfo->pUpdateNodeList); -} - -static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupChangeInfo *pInfo, int32_t nodeId, - SStreamTaskId *pId, int32_t transId) { - SStreamTaskNodeUpdateMsg req = {0}; - initNodeUpdateMsg(&req, pInfo, pId, transId); - - int32_t code = 0; - int32_t blen; - - tEncodeSize(tEncodeStreamTaskUpdateMsg, &req, blen, code); - if (code < 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - taosArrayDestroy(req.pNodeList); - return -1; - } - - int32_t tlen = sizeof(SMsgHead) + blen; - - void *buf = taosMemoryMalloc(tlen); - if (buf == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - taosArrayDestroy(req.pNodeList); - return -1; - } - - void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - SEncoder encoder; - tEncoderInit(&encoder, abuf, tlen); - tEncodeStreamTaskUpdateMsg(&encoder, &req); - - SMsgHead *pMsgHead = (SMsgHead *)buf; - pMsgHead->contLen = htonl(tlen); - pMsgHead->vgId = htonl(nodeId); - - tEncoderClear(&encoder); - - *pBuf = buf; - *pLen = tlen; - - taosArrayDestroy(req.pNodeList); - return TSDB_CODE_SUCCESS; -} - -// todo extract method: traverse stream tasks -// build trans to update the epset -static int32_t createStreamUpdateTrans(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans) { - mDebug("start to build stream:0x%" PRIx64 " tasks epset update", pStream->uid); - - taosWLockLatch(&pStream->lock); - int32_t numOfLevels = taosArrayGetSize(pStream->tasks); - - for (int32_t j = 0; j < numOfLevels; ++j) { - SArray *pLevel = taosArrayGetP(pStream->tasks, j); - - int32_t numOfTasks = taosArrayGetSize(pLevel); - for (int32_t k = 0; k < numOfTasks; ++k) { - SStreamTask *pTask = taosArrayGetP(pLevel, k); - - void *pBuf = NULL; - int32_t len = 0; - streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList); - doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id); - - STransAction action = {0}; - initTransAction(&action, pBuf, len, TDMT_VND_STREAM_TASK_UPDATE, &pTask->info.epSet, 0); - if (mndTransAppendRedoAction(pTrans, &action) != 0) { - taosMemoryFree(pBuf); - taosWUnLockLatch(&pStream->lock); - return -1; - } - } - } - - taosWUnLockLatch(&pStream->lock); - return 0; -} - static bool isNodeEpsetChanged(const SEpSet *pPrevEpset, const SEpSet *pCurrent) { const SEp *pEp = GET_ACTIVE_EP(pPrevEpset); const SEp *p = GET_ACTIVE_EP(pCurrent); @@ -1997,7 +1840,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange return terrno; } - mndStreamRegisterTrans(pTrans, MND_STREAM_TASK_RESET_NAME, pStream->uid); + mndStreamRegisterTrans(pTrans, MND_STREAM_TASK_UPDATE_NAME, pStream->uid); } void *p = taosHashGet(pChangeInfo->pDBMap, pStream->targetDb, strlen(pStream->targetDb)); @@ -2011,7 +1854,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange mDebug("stream:0x%" PRIx64 " %s involved node changed, create update trans, transId:%d", pStream->uid, pStream->name, pTrans->id); - int32_t code = createStreamUpdateTrans(pStream, pChangeInfo, pTrans); + int32_t code = mndStreamSetUpdateEpsetAction(pStream, pChangeInfo, pTrans); // todo: not continue, drop all and retry again if (code != TSDB_CODE_SUCCESS) { @@ -2171,26 +2014,6 @@ int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) { return 0; } -// kill all trans in the dst DB -static void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) { - mDebug("start to clear checkpoints in all Dbs"); - - void *pIter = NULL; - while ((pIter = taosHashIterate(pChangeInfo->pDBMap, pIter)) != NULL) { - char *pDb = (char *)pIter; - - size_t len = 0; - void *pKey = taosHashGetKey(pDb, &len); - char *p = strndup(pKey, len); - - mDebug("clear checkpoint trans in Db:%s", p); - doKillCheckpointTrans(pMnode, pKey, len); - taosMemoryFree(p); - } - - mDebug("complete clear checkpoints in Dbs"); -} - // this function runs by only one thread, so it is not multi-thread safe static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { int32_t code = 0; @@ -2260,10 +2083,6 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { return 0; } -typedef struct SMStreamNodeCheckMsg { - int8_t placeHolder; // // to fix windows compile error, define place holder -} SMStreamNodeCheckMsg; - static int32_t mndProcessNodeCheck(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SSdb *pSdb = pMnode->pSdb; @@ -2335,26 +2154,6 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) { ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList)); } -int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t len) { - // data in the hash table will be removed automatically, no need to remove it here. - SStreamTransInfo *pTransInfo = taosHashGet(execInfo.transMgmt.pDBTrans, pDBName, len); - if (pTransInfo == NULL) { - return TSDB_CODE_SUCCESS; - } - - // not checkpoint trans, ignore - if (strcmp(pTransInfo->name, MND_STREAM_CHECKPOINT_NAME) != 0) { - mDebug("not checkpoint trans, not kill it, name:%s, transId:%d", pTransInfo->name, pTransInfo->transId); - return TSDB_CODE_SUCCESS; - } - - char *pDupDBName = strndup(pDBName, len); - mndKillTransImpl(pMnode, pTransInfo->transId, pDupDBName); - taosMemoryFree(pDupDBName); - - return TSDB_CODE_SUCCESS; -} - void freeCheckpointCandEntry(void *param) { SCheckpointCandEntry *pEntry = param; taosMemoryFreeClear(pEntry->pName); @@ -2402,8 +2201,15 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { taosThreadMutexLock(&execInfo.lock); SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId); - int32_t numOfTasks = mndGetNumOfStreamTasks(pStream); + if (pStream == NULL) { + mError("failed to find the stream:0x%"PRIx64" not handle the checkpoint req", req.streamId); + terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; + taosThreadMutexUnlock(&execInfo.lock); + return -1; + } + + int32_t numOfTasks = mndGetNumOfStreamTasks(pStream); SArray **pReqTaskList = (SArray**)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId)); if (pReqTaskList == NULL) { SArray *pList = taosArrayInit(4, sizeof(int32_t)); diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 3fe736926b..e4599edbd4 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -65,6 +65,8 @@ static int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) return terrno; } + /*int32_t code = */mndStreamRegisterTrans(pTrans, MND_STREAM_TASK_RESET_NAME, pStream->uid); + taosWLockLatch(&pStream->lock); int32_t numOfLevels = taosArrayGetSize(pStream->tasks); @@ -92,19 +94,13 @@ static int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) SEpSet epset = {0}; bool hasEpset = false; int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); - if (code != TSDB_CODE_SUCCESS) { + if (code != TSDB_CODE_SUCCESS || !hasEpset) { taosMemoryFree(pReq); continue; } - if (!hasEpset) { - taosMemoryFree(pReq); - continue; - } - - STransAction action = {0}; - initTransAction(&action, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &epset, 0); - if (mndTransAppendRedoAction(pTrans, &action) != 0) { + code = setTransAction(pTrans, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &epset, 0); + if (code != 0) { taosMemoryFree(pReq); taosWUnLockLatch(&pStream->lock); mndTransDrop(pTrans); diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index 959f69944c..0a7397827e 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -169,7 +169,7 @@ STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const return NULL; } - mDebug("s-task:0x%" PRIx64 " start to build trans %s", pStream->uid, pMsg); + mInfo("s-task:0x%" PRIx64 " start to build trans %s, transId:%d", pStream->uid, pMsg, pTrans->id); mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetSTbName); if (mndTransCheckConflict(pMnode, pTrans) != 0) { @@ -255,11 +255,132 @@ int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status) return 0; } -void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset, - int32_t retryCode) { - pAction->epSet = *pEpset; - pAction->contLen = contLen; - pAction->pCont = pCont; - pAction->msgType = msgType; - pAction->retryCode = retryCode; +int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset, + int32_t retryCode) { + STransAction action = {.epSet = *pEpset, .contLen = contLen, .pCont = pCont, .msgType = msgType, .retryCode = retryCode}; + return mndTransAppendRedoAction(pTrans, &action); +} + +int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t len) { + // data in the hash table will be removed automatically, no need to remove it here. + SStreamTransInfo *pTransInfo = taosHashGet(execInfo.transMgmt.pDBTrans, pDBName, len); + if (pTransInfo == NULL) { + return TSDB_CODE_SUCCESS; + } + + // not checkpoint trans, ignore + if (strcmp(pTransInfo->name, MND_STREAM_CHECKPOINT_NAME) != 0) { + mDebug("not checkpoint trans, not kill it, name:%s, transId:%d", pTransInfo->name, pTransInfo->transId); + return TSDB_CODE_SUCCESS; + } + + char *pDupDBName = strndup(pDBName, len); + mndKillTransImpl(pMnode, pTransInfo->transId, pDupDBName); + taosMemoryFree(pDupDBName); + + return TSDB_CODE_SUCCESS; +} + +// kill all trans in the dst DB +void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) { + mDebug("start to clear checkpoints in all Dbs"); + + void *pIter = NULL; + while ((pIter = taosHashIterate(pChangeInfo->pDBMap, pIter)) != NULL) { + char *pDb = (char *)pIter; + + size_t len = 0; + void *pKey = taosHashGetKey(pDb, &len); + char *p = strndup(pKey, len); + + mDebug("clear checkpoint trans in Db:%s", p); + doKillCheckpointTrans(pMnode, pKey, len); + taosMemoryFree(p); + } + + mDebug("complete clear checkpoints in Dbs"); +} + +static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg *pMsg, const SVgroupChangeInfo *pInfo, SStreamTaskId *pId, + int32_t transId) { + pMsg->streamId = pId->streamId; + pMsg->taskId = pId->taskId; + pMsg->transId = transId; + pMsg->pNodeList = taosArrayInit(taosArrayGetSize(pInfo->pUpdateNodeList), sizeof(SNodeUpdateInfo)); + taosArrayAddAll(pMsg->pNodeList, pInfo->pUpdateNodeList); +} + +static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupChangeInfo *pInfo, int32_t nodeId, + SStreamTaskId *pId, int32_t transId) { + SStreamTaskNodeUpdateMsg req = {0}; + initNodeUpdateMsg(&req, pInfo, pId, transId); + + int32_t code = 0; + int32_t blen; + + tEncodeSize(tEncodeStreamTaskUpdateMsg, &req, blen, code); + if (code < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + taosArrayDestroy(req.pNodeList); + return -1; + } + + int32_t tlen = sizeof(SMsgHead) + blen; + + void *buf = taosMemoryMalloc(tlen); + if (buf == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + taosArrayDestroy(req.pNodeList); + return -1; + } + + void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + SEncoder encoder; + tEncoderInit(&encoder, abuf, tlen); + tEncodeStreamTaskUpdateMsg(&encoder, &req); + + SMsgHead *pMsgHead = (SMsgHead *)buf; + pMsgHead->contLen = htonl(tlen); + pMsgHead->vgId = htonl(nodeId); + + tEncoderClear(&encoder); + + *pBuf = buf; + *pLen = tlen; + + taosArrayDestroy(req.pNodeList); + return TSDB_CODE_SUCCESS; +} + +// todo extract method: traverse stream tasks +// build trans to update the epset +int32_t mndStreamSetUpdateEpsetAction(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans) { + mDebug("stream:0x%" PRIx64 " set tasks epset update action", pStream->uid); + + taosWLockLatch(&pStream->lock); + int32_t numOfLevels = taosArrayGetSize(pStream->tasks); + + for (int32_t j = 0; j < numOfLevels; ++j) { + SArray *pLevel = taosArrayGetP(pStream->tasks, j); + + int32_t numOfTasks = taosArrayGetSize(pLevel); + for (int32_t k = 0; k < numOfTasks; ++k) { + SStreamTask *pTask = taosArrayGetP(pLevel, k); + + void *pBuf = NULL; + int32_t len = 0; + streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList); + doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id); + + int32_t code = setTransAction(pTrans, pBuf, len, TDMT_VND_STREAM_TASK_UPDATE, &pTask->info.epSet, 0); + if (code != TSDB_CODE_SUCCESS) { + taosMemoryFree(pBuf); + taosWUnLockLatch(&pStream->lock); + return -1; + } + } + } + + taosWUnLockLatch(&pStream->lock); + return 0; } \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index b8bd323fa3..2ee73528e0 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -143,7 +143,7 @@ int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t } } -static int32_t doResumeStreamTask(STrans *pTrans, SMnode *pMnode, SStreamTask *pTask, int8_t igUntreated) { +static int32_t doSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamTask *pTask, int8_t igUntreated) { SVResumeStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResumeStreamTaskReq)); if (pReq == NULL) { mError("failed to malloc in resume stream, size:%" PRIzu ", code:%s", sizeof(SVResumeStreamTaskReq), @@ -160,15 +160,14 @@ static int32_t doResumeStreamTask(STrans *pTrans, SMnode *pMnode, SStreamTask *p SEpSet epset = {0}; bool hasEpset = false; int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); - if (code != TSDB_CODE_SUCCESS) { + if (code != TSDB_CODE_SUCCESS || (!hasEpset)) { terrno = code; taosMemoryFree(pReq); return -1; } - STransAction action = {0}; - initTransAction(&action, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &epset, 0); - if (mndTransAppendRedoAction(pTrans, &action) != 0) { + code = setTransAction(pTrans, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &epset, 0); + if (code != 0) { taosMemoryFree(pReq); return -1; } @@ -201,14 +200,14 @@ int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream) { return num; } -int32_t mndResumeStreamTasks(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream, int8_t igUntreated) { +int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream, int8_t igUntreated) { int32_t size = taosArrayGetSize(pStream->tasks); for (int32_t i = 0; i < size; i++) { SArray *pTasks = taosArrayGetP(pStream->tasks, i); int32_t sz = taosArrayGetSize(pTasks); for (int32_t j = 0; j < sz; j++) { SStreamTask *pTask = taosArrayGetP(pTasks, j); - if (doResumeStreamTask(pTrans, pMnode, pTask, igUntreated) < 0) { + if (doSetResumeAction(pTrans, pMnode, pTask, igUntreated) < 0) { return -1; } @@ -220,7 +219,7 @@ int32_t mndResumeStreamTasks(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream return 0; } -static int32_t doPauseStreamTask(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) { +static int32_t doSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) { SVPauseStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVPauseStreamTaskReq)); if (pReq == NULL) { mError("failed to malloc in pause stream, size:%" PRIzu ", code:%s", sizeof(SVPauseStreamTaskReq), @@ -233,32 +232,25 @@ static int32_t doPauseStreamTask(SMnode *pMnode, STrans *pTrans, SStreamTask *pT pReq->taskId = pTask->id.taskId; pReq->streamId = pTask->id.streamId; - SEpSet epset = {0}; - mDebug("pause node:%d, epset:%d", pTask->info.nodeId, epset.numOfEps); + SEpSet epset = {0}; bool hasEpset = false; int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); - if (code != TSDB_CODE_SUCCESS) { + if (code != TSDB_CODE_SUCCESS || !hasEpset) { terrno = code; taosMemoryFree(pReq); - return -1; + return code; } - // no valid epset, return directly without redoAction - if (!hasEpset) { - taosMemoryFree(pReq); - return TSDB_CODE_SUCCESS; - } - - STransAction action = {0}; - initTransAction(&action, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0); - if (mndTransAppendRedoAction(pTrans, &action) != 0) { + mDebug("pause node:%d, epset:%d", pTask->info.nodeId, epset.numOfEps); + code = setTransAction(pTrans, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0); + if (code != 0) { taosMemoryFree(pReq); return -1; } return 0; } -int32_t mndPauseStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { +int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { SArray *tasks = pStream->tasks; int32_t size = taosArrayGetSize(tasks); @@ -267,7 +259,8 @@ int32_t mndPauseStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) int32_t sz = taosArrayGetSize(pTasks); for (int32_t j = 0; j < sz; j++) { SStreamTask *pTask = taosArrayGetP(pTasks, j); - if (doPauseStreamTask(pMnode, pTrans, pTask) < 0) { + + if (doSetPauseAction(pMnode, pTrans, pTask) < 0) { return -1; } diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 4284b0838e..f173c327c7 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -171,10 +171,6 @@ int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) { return tqStreamTaskProcessRetrieveReq(pSnode->pMeta, pMsg); case TDMT_STREAM_RETRIEVE_RSP: // 1036 break; - case TDMT_VND_STREAM_SCAN_HISTORY_FINISH: - return tqStreamTaskProcessScanHistoryFinishReq(pSnode->pMeta, pMsg); - case TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP: - return tqStreamTaskProcessScanHistoryFinishRsp(pSnode->pMeta, pMsg); case TDMT_VND_STREAM_TASK_CHECK: return tqStreamTaskProcessCheckReq(pSnode->pMeta, pMsg); case TDMT_VND_STREAM_TASK_CHECK_RSP: diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 38c3441d43..23f79158c3 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -267,8 +267,6 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg); -int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg); -int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg); // sma int32_t smaInit(); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 2e947e4a4c..a689932754 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1043,15 +1043,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { return code; } -// only the agg tasks and the sink tasks will receive this message from upstream tasks -int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg) { - return tqStreamTaskProcessScanHistoryFinishReq(pTq->pStreamMeta, pMsg); -} - -int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg) { - return tqStreamTaskProcessScanHistoryFinishRsp(pTq->pStreamMeta, pMsg); -} - int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { SStreamTaskRunReq* pReq = pMsg->pCont; diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index cdb5cc26f8..280c110711 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -23,7 +23,7 @@ static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle); static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId); static bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver); static bool taskReadyForDataFromWal(SStreamTask* pTask); -static bool doPutDataIntoInputQFromWal(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems); +static bool doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems); static int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration); // extract data blocks(submit/delete) from WAL, and add them into the input queue for all the sources tasks. @@ -300,21 +300,21 @@ bool taskReadyForDataFromWal(SStreamTask* pTask) { return true; } -bool doPutDataIntoInputQFromWal(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems) { +bool doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems) { const char* id = pTask->id.idStr; int32_t numOfNewItems = 0; - while(1) { + while (1) { if ((pTask->info.fillHistory == 1) && pTask->status.appendTranstateBlock) { *numOfItems += numOfNewItems; return numOfNewItems > 0; } SStreamQueueItem* pItem = NULL; - int32_t code = extractMsgFromWal(pTask->exec.pWalReader, (void**)&pItem, maxVer, id); + int32_t code = extractMsgFromWal(pTask->exec.pWalReader, (void**)&pItem, maxVer, id); if (code != TSDB_CODE_SUCCESS || pItem == NULL) { // failed, continue int64_t currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader); - bool itemInFillhistory = handleFillhistoryScanComplete(pTask, currentVer); + bool itemInFillhistory = handleFillhistoryScanComplete(pTask, currentVer); if (itemInFillhistory) { numOfNewItems += 1; } @@ -334,7 +334,9 @@ bool doPutDataIntoInputQFromWal(SStreamTask* pTask, int64_t maxVer, int32_t* num break; } } else { - tqError("s-task:%s append input queue failed, code: too many items, ver:%" PRId64, id, pTask->chkInfo.nextProcessVer); + walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.nextProcessVer); + tqError("s-task:%s append input queue failed, code:too many items, ver:%" PRId64, id, + pTask->chkInfo.nextProcessVer); break; } } @@ -399,7 +401,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { continue; } - bool hasNewData = doPutDataIntoInputQFromWal(pTask, maxVer, &numOfItems); + bool hasNewData = doPutDataIntoInputQ(pTask, maxVer, &numOfItems); taosThreadMutexUnlock(&pTask->lock); if ((numOfItems > 0) || hasNewData) { diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index ac1818f877..21bc09eba0 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -328,74 +328,6 @@ int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { return 0; } -int32_t tqStreamTaskProcessScanHistoryFinishReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { - char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); - - // deserialize - SStreamScanHistoryFinishReq req = {0}; - - SDecoder decoder; - tDecoderInit(&decoder, (uint8_t*)msg, msgLen); - tDecodeStreamScanHistoryFinishReq(&decoder, &req); - tDecoderClear(&decoder); - - SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.downstreamTaskId); - if (pTask == NULL) { - tqError("vgId:%d process scan history finish msg, failed to find task:0x%x, it may be destroyed", pMeta->vgId, - req.downstreamTaskId); - return -1; - } - - tqDebug("s-task:%s receive scan-history finish msg from task:0x%x", pTask->id.idStr, req.upstreamTaskId); - - int32_t code = streamProcessScanHistoryFinishReq(pTask, &req, &pMsg->info); - streamMetaReleaseTask(pMeta, pTask); - return code; -} - -int32_t tqStreamTaskProcessScanHistoryFinishRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { - int32_t code = TSDB_CODE_SUCCESS; - char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); - - // deserialize - SStreamCompleteHistoryMsg req = {0}; - - SDecoder decoder; - tDecoderInit(&decoder, (uint8_t*)msg, msgLen); - tDecodeCompleteHistoryDataMsg(&decoder, &req); - tDecoderClear(&decoder); - - if (pMeta->role == NODE_ROLE_FOLLOWER) { - tqError("s-task:0x%x (vgId:%d) not handle the scan-history finish rsp, since it becomes follower", - req.upstreamTaskId, pMeta->vgId); - return TASK_DOWNSTREAM_NOT_LEADER; - } - - SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId); - if (pTask == NULL) { - tqError("vgId:%d process scan history finish rsp, failed to find task:0x%x, it may be destroyed", pMeta->vgId, - req.upstreamTaskId); - return -1; - } - - int32_t remain = atomic_sub_fetch_32(&pTask->notReadyTasks, 1); - if (remain > 0) { - tqDebug("s-task:%s scan-history finish rsp received from downstream task:0x%x, unfinished remain:%d", - pTask->id.idStr, req.downstreamId, remain); - } else { - tqDebug( - "s-task:%s scan-history finish rsp received from downstream task:0x%x, all downstream tasks rsp scan-history " - "completed msg", - pTask->id.idStr, req.downstreamId); - code = streamProcessScanHistoryFinishRsp(pTask); - } - - streamMetaReleaseTask(pMeta, pTask); - return code; -} - int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { char* msgStr = pMsg->pCont; char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 7e470bafbe..3ec6adee41 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -789,10 +789,6 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg); case TDMT_VND_STREAM_SCAN_HISTORY: return tqProcessTaskScanHistory(pVnode->pTq, pMsg); - case TDMT_VND_STREAM_SCAN_HISTORY_FINISH: - return tqProcessTaskScanHistoryFinishReq(pVnode->pTq, pMsg); - case TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP: - return tqProcessTaskScanHistoryFinishRsp(pVnode->pTq, pMsg); case TDMT_STREAM_TASK_CHECKPOINT_READY: return tqProcessTaskCheckpointReadyMsg(pVnode->pTq, pMsg); default: diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index e872c87237..3472f4e14d 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1027,57 +1027,6 @@ int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) { return 0; } -int32_t qResetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) { - SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - SOperatorInfo* pOperator = pTaskInfo->pRoot; - - while (1) { - int32_t type = pOperator->operatorType; - if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL || - type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) { - SStreamIntervalOperatorInfo* pInfo = pOperator->info; - STimeWindowAggSupp* pSup = &pInfo->twAggSup; - - pSup->calTriggerSaved = 0; - pSup->deleteMarkSaved = 0; - qInfo("reset stream param for interval: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark); - - } else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION || - type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION || - type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) { - SStreamSessionAggOperatorInfo* pInfo = pOperator->info; - STimeWindowAggSupp* pSup = &pInfo->twAggSup; - - pSup->calTriggerSaved = 0; - pSup->deleteMarkSaved = 0; - qInfo("reset stream param for session: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark); - - } else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) { - SStreamStateAggOperatorInfo* pInfo = pOperator->info; - STimeWindowAggSupp* pSup = &pInfo->twAggSup; - - pSup->calTriggerSaved = 0; - pSup->deleteMarkSaved = 0; - qInfo("reset stream param for state: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark); - - } else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT) { - SStreamEventAggOperatorInfo* pInfo = pOperator->info; - STimeWindowAggSupp* pSup = &pInfo->twAggSup; - - pSup->calTriggerSaved = 0; - pSup->deleteMarkSaved = 0; - qInfo("save stream param for state: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark); - } - - // iterate operator tree - if (pOperator->numOfDownstream != 1 || pOperator->pDownstream[0] == NULL) { - return 0; - } else { - pOperator = pOperator->pDownstream[0]; - } - } -} - int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; const char* id = GET_TASKID(pTaskInfo); diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index bd5bddcece..0534f2c30c 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -123,8 +123,6 @@ int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t STaskId streamTaskGetTaskId(const SStreamTask* pTask); void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo); void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo); -int32_t streamTaskBuildScanhistoryRspMsg(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, void** pBuffer, - int32_t* pLen); int32_t streamTaskFillHistoryFinished(SStreamTask* pTask); void streamClearChkptReadyMsg(SStreamTask* pTask); @@ -134,10 +132,7 @@ int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem); void streamQueueItemIncSize(const SStreamQueueItem* pItem, int32_t size); const char* streamQueueItemGetTypeStr(int32_t type); SStreamQueueItem* streamQueueMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem); - -int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq); -int32_t streamNotifyUpstreamContinue(SStreamTask* pTask); -int32_t streamTransferStateToStreamTask(SStreamTask* pTask); +int32_t streamTransferStateToStreamTask(SStreamTask* pTask); SStreamQueue* streamQueueOpen(int64_t cap); void streamQueueClose(SStreamQueue* pQueue, int32_t taskId); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index b51845d152..6b7c0fc69a 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -34,9 +34,6 @@ static int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* p static int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq); static int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int32_t vgSz, int64_t groupId); -static int32_t doDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHistoryFinishReq* pReq, int32_t vgId, - SEpSet* pEpSet); - static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId, int32_t numOfBlocks, int64_t dstTaskId, int32_t type); @@ -676,41 +673,6 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { return TSDB_CODE_SUCCESS; } -int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) { - SStreamScanHistoryFinishReq req = { - .streamId = pTask->id.streamId, - .childId = pTask->info.selfChildId, - .upstreamTaskId = pTask->id.taskId, - .upstreamNodeId = pTask->pMeta->vgId, - }; - - // serialize - if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { - req.downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId; - pTask->notReadyTasks = 1; - doDispatchScanHistoryFinishMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId, - &pTask->outputInfo.fixedDispatcher.epSet); - } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; - int32_t numOfVgs = taosArrayGetSize(vgInfo); - pTask->notReadyTasks = numOfVgs; - - SStreamTaskState* pState = streamTaskGetStatus(pTask); - stDebug("s-task:%s send scan-history data complete msg to downstream (shuffle-dispatch) %d tasks, status:%s", - pTask->id.idStr, numOfVgs, pState->name); - for (int32_t i = 0; i < numOfVgs; i++) { - SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); - req.downstreamTaskId = pVgInfo->taskId; - doDispatchScanHistoryFinishMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); - } - } else { - stDebug("s-task:%s no downstream tasks, invoke scan-history finish rsp directly", pTask->id.idStr); - streamProcessScanHistoryFinishRsp(pTask); - } - - return 0; -} - // this function is usually invoked by sink/agg task int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) { int32_t num = taosArrayGetSize(pTask->pReadyMsgList); @@ -782,48 +744,6 @@ int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatch return 0; } -int32_t doDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHistoryFinishReq* pReq, int32_t vgId, - SEpSet* pEpSet) { - void* buf = NULL; - int32_t code = -1; - SRpcMsg msg = {0}; - - int32_t tlen; - tEncodeSize(tEncodeStreamScanHistoryFinishReq, pReq, tlen, code); - if (code < 0) { - return -1; - } - - buf = rpcMallocCont(sizeof(SMsgHead) + tlen); - if (buf == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - ((SMsgHead*)buf)->vgId = htonl(vgId); - void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - - SEncoder encoder; - tEncoderInit(&encoder, abuf, tlen); - if ((code = tEncodeStreamScanHistoryFinishReq(&encoder, pReq)) < 0) { - if (buf) { - rpcFreeCont(buf); - } - return code; - } - - tEncoderClear(&encoder); - - initRpcMsg(&msg, TDMT_VND_STREAM_SCAN_HISTORY_FINISH, buf, tlen + sizeof(SMsgHead)); - - tmsgSendReq(pEpSet, &msg); - - SStreamTaskState* pState = streamTaskGetStatus(pTask); - stDebug("s-task:%s status:%s dispatch scan-history finish msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, pState->name, - pReq->downstreamTaskId, vgId); - return 0; -} - int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet) { void* buf = NULL; int32_t code = -1; @@ -989,109 +909,6 @@ void streamClearChkptReadyMsg(SStreamTask* pTask) { taosArrayClear(pTask->pReadyMsgList); } -int32_t tEncodeCompleteHistoryDataMsg(SEncoder* pEncoder, const SStreamCompleteHistoryMsg* pReq) { - if (tStartEncode(pEncoder) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->downstreamId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->downstreamNode) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1; - tEndEncode(pEncoder); - return pEncoder->pos; -} - -int32_t tDecodeCompleteHistoryDataMsg(SDecoder* pDecoder, SStreamCompleteHistoryMsg* pRsp) { - if (tStartDecode(pDecoder) < 0) return -1; - if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1; - if (tDecodeI32(pDecoder, &pRsp->downstreamId) < 0) return -1; - if (tDecodeI32(pDecoder, &pRsp->downstreamNode) < 0) return -1; - if (tDecodeI32(pDecoder, &pRsp->upstreamTaskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pRsp->upstreamNodeId) < 0) return -1; - tEndDecode(pDecoder); - return 0; -} - -int32_t streamTaskBuildScanhistoryRspMsg(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, void** pBuffer, - int32_t* pLen) { - int32_t len = 0; - int32_t code = 0; - SEncoder encoder; - - SStreamCompleteHistoryMsg msg = { - .streamId = pReq->streamId, - .upstreamTaskId = pReq->upstreamTaskId, - .upstreamNodeId = pReq->upstreamNodeId, - .downstreamId = pReq->downstreamTaskId, - .downstreamNode = pTask->pMeta->vgId, - }; - - tEncodeSize(tEncodeCompleteHistoryDataMsg, &msg, len, code); - if (code < 0) { - return code; - } - - void* pBuf = rpcMallocCont(sizeof(SMsgHead) + len); - if (pBuf == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - ((SMsgHead*)pBuf)->vgId = htonl(pReq->upstreamNodeId); - - void* abuf = POINTER_SHIFT(pBuf, sizeof(SMsgHead)); - - tEncoderInit(&encoder, (uint8_t*)abuf, len); - tEncodeCompleteHistoryDataMsg(&encoder, &msg); - tEncoderClear(&encoder); - - *pBuffer = pBuf; - *pLen = len; - return 0; -} - -int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq) { - void* pBuf = NULL; - int32_t len = 0; - - streamTaskBuildScanhistoryRspMsg(pTask, pReq, &pBuf, &len); - SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, pReq->upstreamTaskId); - - SStreamContinueExecInfo info = {.taskId = pReq->upstreamTaskId, .epset = pInfo->epSet}; - initRpcMsg(&info.msg, 0, pBuf, sizeof(SMsgHead) + len); - info.msg.info = *pRpcInfo; - - taosThreadMutexLock(&pTask->lock); - - if (pTask->pRspMsgList == NULL) { - pTask->pRspMsgList = taosArrayInit(4, sizeof(SStreamContinueExecInfo)); - } - taosArrayPush(pTask->pRspMsgList, &info); - taosThreadMutexUnlock(&pTask->lock); - - int32_t num = taosArrayGetSize(pTask->pRspMsgList); - stDebug("s-task:%s add scan-history finish rsp msg for task:0x%x, total:%d", pTask->id.idStr, pReq->upstreamTaskId, - num); - return TSDB_CODE_SUCCESS; -} - -int32_t streamNotifyUpstreamContinue(SStreamTask* pTask) { - ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG || pTask->info.taskLevel == TASK_LEVEL__SINK); - - const char* id = pTask->id.idStr; - int32_t level = pTask->info.taskLevel; - - int32_t num = taosArrayGetSize(pTask->pRspMsgList); - for (int32_t i = 0; i < num; ++i) { - SStreamContinueExecInfo* pInfo = taosArrayGet(pTask->pRspMsgList, i); - tmsgSendRsp(&pInfo->msg); - - stDebug("s-task:%s level:%d notify upstream:0x%x continuing handle data in WAL", id, level, pInfo->taskId); - } - - taosArrayClear(pTask->pRspMsgList); - stDebug("s-task:%s level:%d continue process msg sent to all %d upstreams", id, level, num); - return 0; -} - // this message has been sent successfully, let's try next one. static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId) { stDebug("s-task:%s destroy dispatch msg:%p", pTask->id.idStr, pTask->msgInfo.pData); diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 140a22ee73..20fdcff7d9 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -592,108 +592,6 @@ int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask) { return TSDB_CODE_SUCCESS; } -int32_t streamAggUpstreamScanHistoryFinish(SStreamTask* pTask) { - void* exec = pTask->exec.pExecutor; - if (pTask->info.fillHistory && qRestoreStreamOperatorOption(exec) < 0) { - return -1; - } - - if (qStreamRecoverFinish(exec) < 0) { - return -1; - } - return 0; -} - -int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, - SRpcHandleInfo* pRpcInfo) { - int32_t taskLevel = pTask->info.taskLevel; - ASSERT(taskLevel == TASK_LEVEL__AGG || taskLevel == TASK_LEVEL__SINK); - - const char* id = pTask->id.idStr; - SStreamTaskState* p = streamTaskGetStatus(pTask); - - if (p->state != TASK_STATUS__SCAN_HISTORY) { - stError("s-task:%s not in scan-history status, status:%s return upstream:0x%x scan-history finish directly", id, - p->name, pReq->upstreamTaskId); - - void* pBuf = NULL; - int32_t len = 0; - streamTaskBuildScanhistoryRspMsg(pTask, pReq, &pBuf, &len); - - SRpcMsg msg = {.info = *pRpcInfo}; - initRpcMsg(&msg, 0, pBuf, sizeof(SMsgHead) + len); - - tmsgSendRsp(&msg); - stDebug("s-task:%s level:%d notify upstream:0x%x(vgId:%d) to continue process data in WAL", id, taskLevel, - pReq->upstreamTaskId, pReq->upstreamNodeId); - return 0; - } - - // sink tasks do not send end of scan history msg to its upstream, which is agg task. - streamAddEndScanHistoryMsg(pTask, pRpcInfo, pReq); - - int32_t left = atomic_sub_fetch_32(&pTask->numOfWaitingUpstream, 1); - ASSERT(left >= 0); - - if (left == 0) { - int32_t numOfTasks = taosArrayGetSize(pTask->upstreamInfo.pList); - if (taskLevel == TASK_LEVEL__AGG) { - stDebug( - "s-task:%s all %d upstream tasks finish scan-history data, set param for agg task for stream data processing " - "and send rsp to all upstream tasks", - id, numOfTasks); - streamAggUpstreamScanHistoryFinish(pTask); - } else { - stDebug("s-task:%s all %d upstream task(s) finish scan-history data, and rsp to all upstream tasks", id, - numOfTasks); - } - - // all upstream tasks have completed the scan-history task in the stream time window, let's start to extract data - // from the WAL files, which contains the real time stream data. - streamNotifyUpstreamContinue(pTask); - - // mnode will not send the pause/resume message to the sink task, so no need to enable the pause for sink tasks. - if (taskLevel == TASK_LEVEL__AGG) { - /*int32_t code = */ streamTaskScanHistoryDataComplete(pTask); - } else { // for sink task, set normal - streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE); - } - } else { - stDebug("s-task:%s receive scan-history data finish msg from upstream:0x%x(index:%d), unfinished:%d", id, - pReq->upstreamTaskId, pReq->childId, left); - } - - return 0; -} - -int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) { - ETaskStatus status = streamTaskGetStatus(pTask)->state; - - // task restart now, not handle the scan-history finish rsp - if (status == TASK_STATUS__UNINIT) { - return TSDB_CODE_INVALID_MSG; - } - - ASSERT(status == TASK_STATUS__SCAN_HISTORY/* || status == TASK_STATUS__STREAM_SCAN_HISTORY*/); - SStreamMeta* pMeta = pTask->pMeta; - - // execute in the scan history complete call back msg, ready to process data from inputQ - int32_t code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE); - streamTaskSetSchedStatusInactive(pTask); - - streamMetaWLock(pMeta); - streamMetaSaveTask(pMeta, pTask); - streamMetaCommit(pMeta); - streamMetaWUnLock(pMeta); - - // for source tasks, let's continue execute. - if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - streamSchedExec(pTask); - } - - return TSDB_CODE_SUCCESS; -} - static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask) { SDataRange* pRange = &pHTask->dataRange; @@ -946,29 +844,6 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { } } -int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) { - if (streamTaskGetStatus(pTask)->state == TASK_STATUS__DROPPING) { - return 0; - } - - // restore param - int32_t code = 0; - if (pTask->info.fillHistory) { - code = streamRestoreParam(pTask); - if (code < 0) { - return -1; - } - } - - // dispatch scan-history finish req to all related downstream task - code = streamDispatchScanHistoryFinishMsg(pTask); - if (code < 0) { - return -1; - } - - return 0; -} - int32_t streamTaskFillHistoryFinished(SStreamTask* pTask) { void* exec = pTask->exec.pExecutor; return qStreamInfoResetTimewindowFilter(exec); @@ -1072,28 +947,6 @@ int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpoint return 0; } -int32_t tEncodeStreamScanHistoryFinishReq(SEncoder* pEncoder, const SStreamScanHistoryFinishReq* pReq) { - if (tStartEncode(pEncoder) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->childId) < 0) return -1; - tEndEncode(pEncoder); - return pEncoder->pos; -} - -int32_t tDecodeStreamScanHistoryFinishReq(SDecoder* pDecoder, SStreamScanHistoryFinishReq* pReq) { - if (tStartDecode(pDecoder) < 0) return -1; - if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->downstreamTaskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->childId) < 0) return -1; - tEndDecode(pDecoder); - return 0; -} - void streamTaskSetRangeStreamCalc(SStreamTask* pTask) { SDataRange* pRange = &pTask->dataRange; diff --git a/tests/pytest/util/common.py b/tests/pytest/util/common.py index c4885747d1..61cb770a10 100644 --- a/tests/pytest/util/common.py +++ b/tests/pytest/util/common.py @@ -139,7 +139,7 @@ class TDCom: self.stream_suffix = "_stream" self.range_count = 5 self.default_interval = 5 - self.stream_timeout = 12 + self.stream_timeout = 60 self.create_stream_sleep = 0.5 self.record_history_ts = str() self.precision = "ms" @@ -1688,8 +1688,8 @@ class TDCom: res1 = self.round_handle(res1) res2 = self.round_handle(res2) if latency < self.stream_timeout: - latency += 0.2 - time.sleep(0.2) + latency += 0.5 + time.sleep(0.5) else: if latency == 0: return False