From e61aa8359483d62cbe42372e9a8ffe7fcf250216 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 31 Jul 2023 14:07:18 +0800 Subject: [PATCH] enh(stream): handle the stream hb. --- include/libs/stream/tstream.h | 6 +- source/dnode/mnode/impl/src/mndScheduler.c | 53 ++------------- source/dnode/mnode/impl/src/mndStream.c | 75 ++++++++------------- source/libs/stream/src/streamTask.c | 76 ++++++++++++++++++++++ 4 files changed, 114 insertions(+), 96 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 551acc763e..06000e32b9 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -630,6 +630,10 @@ void streamTaskHalt(SStreamTask* pTask); void streamTaskResumeFromHalt(SStreamTask* pTask); void streamTaskDisablePause(SStreamTask* pTask); void streamTaskEnablePause(SStreamTask* pTask); +int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask); +void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet); +void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet); +void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDownstreamTask); // source level int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow); @@ -651,7 +655,7 @@ void streamMetaCleanup(); SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId); void streamMetaClose(SStreamMeta* streamMeta); -// save to b-tree meta store +// save to stream meta store int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded); diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 293a1692f0..17652d601d 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -25,12 +25,8 @@ #define SINK_NODE_LEVEL (0) extern bool tsDeployOnSnode; -static int32_t setTaskUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask); -static int32_t updateTaskUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet); static int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup, int32_t fillHistory); -static void setFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDownstreamTask); -static void updateFixDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet); int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType, int64_t watermark, int64_t deleteMark) { @@ -143,7 +139,7 @@ int32_t mndAddDispatcherForInternalTask(SMnode* pMnode, SStreamObj* pStream, SAr } } else { SStreamTask* pOneSinkTask = taosArrayGetP(pSinkNodeList, 0); - setFixedDownstreamInfo(pTask, pOneSinkTask); + streamTaskSetFixedDownstreamInfo(pTask, pOneSinkTask); } return 0; @@ -274,51 +270,12 @@ static int32_t addSourceStreamTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTas for(int32_t i = 0; i < taosArrayGetSize(pSinkTaskList); ++i) { SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, i); - setTaskUpstreamInfo(pSinkTask, pTask); + streamTaskSetUpstreamInfo(pSinkTask, pTask); } return TSDB_CODE_SUCCESS; } -static SStreamChildEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) { - SStreamChildEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamChildEpInfo)); - if (pEpInfo == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } - - pEpInfo->childId = pTask->info.selfChildId; - pEpInfo->epSet = pTask->info.epSet; - pEpInfo->nodeId = pTask->info.nodeId; - pEpInfo->taskId = pTask->id.taskId; - - return pEpInfo; -} - -void setFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDownstreamTask) { - STaskDispatcherFixedEp* pDispatcher = &pTask->fixedEpDispatcher; - pDispatcher->taskId = pDownstreamTask->id.taskId; - pDispatcher->nodeId = pDownstreamTask->info.nodeId; - pDispatcher->epSet = pDownstreamTask->info.epSet; - - pTask->outputInfo.type = TASK_OUTPUT__FIXED_DISPATCH; - pTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH; -} - -int32_t setTaskUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask) { - SStreamChildEpInfo* pEpInfo = createStreamTaskEpInfo(pUpstreamTask); - if (pEpInfo == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - if (pTask->pUpstreamInfoList == NULL) { - pTask->pUpstreamInfoList = taosArrayInit(4, POINTER_BYTES); - } - - taosArrayPush(pTask->pUpstreamInfoList, &pEpInfo); - return TSDB_CODE_SUCCESS; -} - static SArray* addNewTaskList(SArray* pTasksList) { SArray* pTaskList = taosArrayInit(0, POINTER_BYTES); taosArrayPush(pTasksList, &pTaskList); @@ -423,12 +380,12 @@ static int32_t doAddSourceTask(SArray* pTaskList, int8_t fillHistory, int64_t ui pWindow->skey, pWindow->ekey); // all the source tasks dispatch result to a single agg node. - setFixedDownstreamInfo(pTask, pDownstreamTask); + streamTaskSetFixedDownstreamInfo(pTask, pDownstreamTask); if (mndAssignStreamTaskToVgroup(pMnode, pTask, pPlan, pVgroup) < 0) { return -1; } - return setTaskUpstreamInfo(pDownstreamTask, pTask); + return streamTaskSetUpstreamInfo(pDownstreamTask, pTask); } static int32_t doAddAggTask(uint64_t uid, SArray* pTaskList, SArray* pSinkNodeList, SMnode* pMnode, SStreamObj* pStream, @@ -600,7 +557,7 @@ static void setSinkTaskUpstreamInfo(SArray* pTasksList, const SStreamTask* pUpst SArray* pSinkTaskList = taosArrayGetP(pTasksList, SINK_NODE_LEVEL); for(int32_t i = 0; i < taosArrayGetSize(pSinkTaskList); ++i) { SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, i); - setTaskUpstreamInfo(pSinkTask, pUpstreamTask); + streamTaskSetUpstreamInfo(pSinkTask, pUpstreamTask); } } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 3345e2ca01..46675e4e26 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -34,14 +34,14 @@ #define MND_STREAM_HB_INTERVAL 100 // 100 sec typedef struct SNodeEntry { - int32_t vgId; + int32_t nodeId; SEpSet epset; // compare the epset to identify the vgroup tranferring between different dnodes. int64_t hbTimestamp; // second } SNodeEntry; typedef struct SStreamVnodeRevertIndex { SHashObj* pVnodeMap; - SArray* pVnodeEntryList; + SArray* pNodeEntryList; } SStreamVnodeRevertIndex; static SStreamVnodeRevertIndex execNodeList; @@ -1017,10 +1017,11 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream for (int32_t i = 0; i < totLevel; i++) { SArray *pLevel = taosArrayGetP(pStream->tasks, i); SStreamTask *pTask = taosArrayGetP(pLevel, 0); + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { int32_t sz = taosArrayGetSize(pLevel); for (int32_t j = 0; j < sz; j++) { - SStreamTask *pTask = taosArrayGetP(pLevel, j); + pTask = taosArrayGetP(pLevel, j); if (pTask->info.fillHistory == 1) { continue; } @@ -1083,6 +1084,7 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream } return 0; } + static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SSdb *pSdb = pMnode->pSdb; @@ -1756,13 +1758,13 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { SArray* pList = taosArrayInit(4, sizeof(int32_t)); // record the timeout node - for(int32_t i = 0; i < taosArrayGetSize(execNodeList.pVnodeEntryList); ++i) { - SNodeEntry* pEntry = taosArrayGet(execNodeList.pVnodeEntryList, i); + for(int32_t i = 0; i < taosArrayGetSize(execNodeList.pNodeEntryList); ++i) { + SNodeEntry* pEntry = taosArrayGet(execNodeList.pNodeEntryList, i); if (now - pEntry->hbTimestamp > MND_STREAM_HB_INTERVAL) { // execNode timeout, try next // taosArrayPush(pList, &pEntry); } - if (pEntry->vgId != req.vgId) { + if (pEntry->nodeId != req.vgId) { continue; } @@ -1774,9 +1776,9 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { int32_t nodeId = 0; SEpSet newEpSet = {0}; - {//check all streams that involved this vnode + { // check all streams that involved this vnode SStreamObj *pStream = NULL; - void* pIter = NULL; + void *pIter = NULL; while (1) { pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); if (pIter == NULL) { @@ -1784,59 +1786,38 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { } // update the related upstream and downstream tasks - taosRLockLatch(&pStream->lock); + taosWLockLatch(&pStream->lock); int32_t numOfLevels = taosArrayGetSize(pStream->tasks); - for(int32_t j = 0; j < numOfLevels; ++j) { - SArray* pLevel = taosArrayGetP(pStream->tasks, j); + for (int32_t j = 0; j < numOfLevels; ++j) { + SArray *pLevel = taosArrayGetP(pStream->tasks, j); int32_t numOfTasks = taosArrayGetSize(pLevel); - for(int32_t k = 0; k < numOfTasks; ++k) { - SStreamTask* pTask = taosArrayGetP(pLevel, k); + for (int32_t k = 0; k < numOfTasks; ++k) { + SStreamTask *pTask = taosArrayGetP(pLevel, k); if (pTask->info.nodeId == nodeId) { - // pTask->info.epSet = 0; set the new epset + //pTask->info.epSet = 0; set the new epset continue; } // check for the dispath info and the upstream task info - - int32_t level = pTask->info.taskLevel; - if (level == TASK_LEVEL__SOURCE) { - // only update the upstream info of the direct downstream tasks - if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - // todo extract method - SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; - - int32_t numOfVgroups = taosArrayGetSize(pVgs); - for (int32_t i = 0; i < numOfVgroups; i++) { - SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i); - - if (pVgInfo->vgId == nodeId) { - pVgInfo->epSet = newEpSet; - } - } - - } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { - STaskDispatcherFixedEp* pDispatcher = &pTask->fixedEpDispatcher; - if (pDispatcher->nodeId == nodeId) { - pDispatcher->epSet = newEpSet; - } - } else { - // do nothing - } - } else if (level == TASK_LEVEL__AGG) { - // update the upstream info - SArray* pupstream = pTask->pUpstreamInfoList; -// for(int32_t i = 0; i < ) - } else { - // update the upstream tasks - } + int32_t level = pTask->info.taskLevel; + if (level == TASK_LEVEL__SOURCE) { + streamTaskUpdateDownstreamInfo(pTask, nodeId, &newEpSet); + } else if (level == TASK_LEVEL__AGG) { + streamTaskUpdateUpstreamInfo(pTask, nodeId, &newEpSet); + streamTaskUpdateDownstreamInfo(pTask, nodeId, &newEpSet); + } else { // TASK_LEVEL__SINK + streamTaskUpdateUpstreamInfo(pTask, nodeId, &newEpSet); } } } - taosRLockLatch(&pStream->lock); + + taosWUnLockLatch(&pStream->lock); } + } + mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 8ed0dcd2c4..43aeeba40f 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -321,3 +321,79 @@ int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask) { } } } + +static SStreamChildEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) { + SStreamChildEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamChildEpInfo)); + if (pEpInfo == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + pEpInfo->childId = pTask->info.selfChildId; + pEpInfo->epSet = pTask->info.epSet; + pEpInfo->nodeId = pTask->info.nodeId; + pEpInfo->taskId = pTask->id.taskId; + + return pEpInfo; +} + +int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask) { + SStreamChildEpInfo* pEpInfo = createStreamTaskEpInfo(pUpstreamTask); + if (pEpInfo == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + if (pTask->pUpstreamInfoList == NULL) { + pTask->pUpstreamInfoList = taosArrayInit(4, POINTER_BYTES); + } + + taosArrayPush(pTask->pUpstreamInfoList, &pEpInfo); + return TSDB_CODE_SUCCESS; +} + +void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet) { + int32_t numOfUpstream = taosArrayGetSize(pTask->pUpstreamInfoList); + for(int32_t i = 0; i < numOfUpstream; ++i) { + SStreamChildEpInfo* pInfo = taosArrayGet(pTask->pUpstreamInfoList, i); + if (pInfo->nodeId == nodeId) { + pInfo->epSet = *pEpSet; + break; + } + } +} + +void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDownstreamTask) { + STaskDispatcherFixedEp* pDispatcher = &pTask->fixedEpDispatcher; + pDispatcher->taskId = pDownstreamTask->id.taskId; + pDispatcher->nodeId = pDownstreamTask->info.nodeId; + pDispatcher->epSet = pDownstreamTask->info.epSet; + + pTask->outputInfo.type = TASK_OUTPUT__FIXED_DISPATCH; + pTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH; +} + +void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet) { + int8_t type = pTask->outputInfo.type; + if (type == TASK_OUTPUT__SHUFFLE_DISPATCH) { + SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; + + int32_t numOfVgroups = taosArrayGetSize(pVgs); + for (int32_t i = 0; i < numOfVgroups; i++) { + SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i); + + if (pVgInfo->vgId == nodeId) { + pVgInfo->epSet = *pEpSet; + qDebug("s-task:0x%x update the dispatch info, nodeId:%d", pTask->id.taskId, nodeId); + break; + } + } + } else if (type == TASK_OUTPUT__FIXED_DISPATCH) { + STaskDispatcherFixedEp* pDispatcher = &pTask->fixedEpDispatcher; + if (pDispatcher->nodeId == nodeId) { + pDispatcher->epSet = *pEpSet; + qDebug("s-task:0x%x update the dispatch info, nodeId:%d", pTask->id.taskId, nodeId); + } + } else { + // do nothing + } +}