diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 4f66648f69..293a1692f0 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -25,10 +25,12 @@ #define SINK_NODE_LEVEL (0) extern bool tsDeployOnSnode; -static int32_t setTaskUpstreamEpInfo(const SStreamTask* pTask, SStreamTask* pDownstream); +static int32_t setTaskUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask); +static int32_t updateTaskUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet); static int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup, int32_t fillHistory); -static void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask); +static void setFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDownstreamTask); +static void updateFixDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet); int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType, int64_t watermark, int64_t deleteMark) { @@ -141,7 +143,7 @@ int32_t mndAddDispatcherForInternalTask(SMnode* pMnode, SStreamObj* pStream, SAr } } else { SStreamTask* pOneSinkTask = taosArrayGetP(pSinkNodeList, 0); - setFixedDownstreamEpInfo(pTask, pOneSinkTask); + setFixedDownstreamInfo(pTask, pOneSinkTask); } return 0; @@ -272,7 +274,7 @@ static int32_t addSourceStreamTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTas for(int32_t i = 0; i < taosArrayGetSize(pSinkTaskList); ++i) { SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, i); - setTaskUpstreamEpInfo(pTask, pSinkTask); + setTaskUpstreamInfo(pSinkTask, pTask); } return TSDB_CODE_SUCCESS; @@ -293,27 +295,27 @@ static SStreamChildEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) { return pEpInfo; } -void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask) { - 
STaskDispatcherFixedEp* pDispatcher = &pDstTask->fixedEpDispatcher; - pDispatcher->taskId = pTask->id.taskId; - pDispatcher->nodeId = pTask->info.nodeId; - pDispatcher->epSet = pTask->info.epSet; +void setFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDownstreamTask) { + STaskDispatcherFixedEp* pDispatcher = &pTask->fixedEpDispatcher; + pDispatcher->taskId = pDownstreamTask->id.taskId; + pDispatcher->nodeId = pDownstreamTask->info.nodeId; + pDispatcher->epSet = pDownstreamTask->info.epSet; - pDstTask->outputInfo.type = TASK_OUTPUT__FIXED_DISPATCH; - pDstTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH; + pTask->outputInfo.type = TASK_OUTPUT__FIXED_DISPATCH; + pTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH; } -int32_t setTaskUpstreamEpInfo(const SStreamTask* pTask, SStreamTask* pDownstream) { - SStreamChildEpInfo* pEpInfo = createStreamTaskEpInfo(pTask); +int32_t setTaskUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask) { + SStreamChildEpInfo* pEpInfo = createStreamTaskEpInfo(pUpstreamTask); if (pEpInfo == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } - if (pDownstream->pUpstreamInfoList == NULL) { - pDownstream->pUpstreamInfoList = taosArrayInit(4, POINTER_BYTES); + if (pTask->pUpstreamInfoList == NULL) { + pTask->pUpstreamInfoList = taosArrayInit(4, POINTER_BYTES); } - taosArrayPush(pDownstream->pUpstreamInfoList, &pEpInfo); + taosArrayPush(pTask->pUpstreamInfoList, &pEpInfo); return TSDB_CODE_SUCCESS; } @@ -421,12 +423,12 @@ static int32_t doAddSourceTask(SArray* pTaskList, int8_t fillHistory, int64_t ui pWindow->skey, pWindow->ekey); // all the source tasks dispatch result to a single agg node. 
- setFixedDownstreamEpInfo(pTask, pDownstreamTask); + setFixedDownstreamInfo(pTask, pDownstreamTask); if (mndAssignStreamTaskToVgroup(pMnode, pTask, pPlan, pVgroup) < 0) { return -1; } - return setTaskUpstreamEpInfo(pTask, pDownstreamTask); + return setTaskUpstreamInfo(pDownstreamTask, pTask); } static int32_t doAddAggTask(uint64_t uid, SArray* pTaskList, SArray* pSinkNodeList, SMnode* pMnode, SStreamObj* pStream, @@ -598,7 +600,7 @@ static void setSinkTaskUpstreamInfo(SArray* pTasksList, const SStreamTask* pUpst SArray* pSinkTaskList = taosArrayGetP(pTasksList, SINK_NODE_LEVEL); for(int32_t i = 0; i < taosArrayGetSize(pSinkTaskList); ++i) { SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, i); - setTaskUpstreamEpInfo(pUpstreamTask, pSinkTask); + setTaskUpstreamInfo(pSinkTask, pUpstreamTask); } } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index cbb958a9fa..3345e2ca01 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -30,8 +30,21 @@ #define MND_STREAM_VER_NUMBER 3 #define MND_STREAM_RESERVE_SIZE 64 +#define MND_STREAM_MAX_NUM 60 +#define MND_STREAM_HB_INTERVAL 100 // 100 sec -#define MND_STREAM_MAX_NUM 60 +typedef struct SNodeEntry { + int32_t vgId; + SEpSet epset; // compare the epset to identify the vgroup transferring between different dnodes. 
+ int64_t hbTimestamp; // second +} SNodeEntry; + +typedef struct SStreamVnodeRevertIndex { + SHashObj* pVnodeMap; + SArray* pVnodeEntryList; +} SStreamVnodeRevertIndex; + +static SStreamVnodeRevertIndex execNodeList; static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream); static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream); @@ -49,6 +62,8 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter); static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq); static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq); +static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId, + int64_t streamId, int32_t taskId); int32_t mndInitStream(SMnode *pMnode) { SSdbTable table = { @@ -85,11 +100,6 @@ int32_t mndInitStream(SMnode *pMnode) { return sdbSetTable(pMnode->pSdb, table); } -static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, const SStreamTask *pTask, - SMStreamDoCheckpointMsg *pMsg); - -static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId, - int64_t streamId, int32_t taskId); void mndCleanupStream(SMnode *pMnode) {} @@ -838,48 +848,6 @@ _OVER: return code; } -// static int32_t mndCreateCheckpoint(SMnode *pMnode, int32_t vgId, SList *pStreamList) { -// void *buf = NULL; -// int32_t tlen = 0; -// int32_t checkpointId = tGenIdPI64(); - -// SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); -// SArray *stream = taosArrayInit(64, sizeof(void *)); - -// SListIter iter = {0}; -// tdListInitIter(pStreamList, &iter, TD_LIST_FORWARD); -// SListNode *pNode = NULL; -// while ((pNode = tdListNext(&iter)) != NULL) { -// char streamName[TSDB_STREAM_FNAME_LEN] = {0}; -// tdListNodeGetData(pStreamList, pNode, streamName); -// SStreamObj *pStream = mndAcquireStream(pMnode, streamName); -// taosArrayPush(stream, &pStream); -// } 
- -// if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, vgId, checkpointId, 0, 0) < 0) { -// mndReleaseVgroup(pMnode, pVgObj); -// for (int i = 0; i < taosArrayGetSize(stream); i++) { -// SStreamObj *p = taosArrayGetP(stream, i); -// mndReleaseStream(pMnode, p); -// } -// taosArrayDestroy(stream); -// return -1; - -// STransAction action = {0}; -// action.epSet = mndGetVgroupEpset(pMnode, pVgObj); -// action.pCont = buf; -// action.contLen = tlen; -// action.msgType = TDMT_VND_STREAM_CHECK_POINT_SOURCE; -// } -// mndReleaseVgroup(pMnode, pVgObj); - -// for (int i = 0; i < taosArrayGetSize(stream); i++) { -// SStreamObj *p = taosArrayGetP(stream, i); -// mndReleaseStream(pMnode, p); -// } -// taosArrayDestroy(stream); -// return 0; -// } static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SSdb *pSdb = pMnode->pSdb; @@ -897,48 +865,6 @@ static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) { return 0; } -static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, const SStreamTask *pTask, - SMStreamDoCheckpointMsg *pMsg) { - SStreamCheckpointSourceReq req = {0}; - req.checkpointId = pMsg->checkpointId; - req.nodeId = pTask->info.nodeId; - req.expireTime = -1; - req.streamId = pTask->id.streamId; - req.taskId = pTask->id.taskId; - - int32_t code; - int32_t blen; - - tEncodeSize(tEncodeStreamCheckpointSourceReq, &req, blen, code); - if (code < 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - int32_t tlen = sizeof(SMsgHead) + blen; - - void *buf = taosMemoryMalloc(tlen); - if (buf == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - SEncoder encoder; - tEncoderInit(&encoder, abuf, tlen); - tEncodeStreamCheckpointSourceReq(&encoder, &req); - - SMsgHead *pMsgHead = (SMsgHead *)buf; - pMsgHead->contLen = htonl(tlen); - pMsgHead->vgId = htonl(pTask->info.nodeId); - - tEncoderClear(&encoder); - - *pBuf = buf; - *pLen = tlen; - 
- return 0; -} static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId, int64_t streamId, int32_t taskId) { SStreamCheckpointSourceReq req = {0}; @@ -1807,3 +1733,110 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { return TSDB_CODE_ACTION_IN_PROGRESS; } + +// todo: handle the database drop/stream drop case +int32_t mndProcessStreamHb(SRpcMsg *pReq) { + SMnode *pMnode = pReq->info.node; + SSdb *pSdb = pMnode->pSdb; + SStreamHbMsg req = {0}; + + SDecoder decoder = {0}; + tDecoderInit(&decoder, (uint8_t *)pReq->pCont, pReq->contLen); + + if (tStartDecode(&decoder) < 0) return -1; + + if (tDecodeStreamHbMsg(&decoder, &req) < 0) { + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } + + int64_t now = taosGetTimestampSec(); + + // only handle the vnode transfer case. + SArray* pList = taosArrayInit(4, sizeof(int32_t)); + + // record the timeout node + for(int32_t i = 0; i < taosArrayGetSize(execNodeList.pVnodeEntryList); ++i) { + SNodeEntry* pEntry = taosArrayGet(execNodeList.pVnodeEntryList, i); + if (now - pEntry->hbTimestamp > MND_STREAM_HB_INTERVAL) { // execNode timeout, try next +// taosArrayPush(pList, &pEntry); + } + + if (pEntry->vgId != req.vgId) { + continue; + } + + // check epset to identify whether the node has been transferred to other dnodes. + // 1. 
if the epset is changed + taosArrayPush(pList, &pEntry); + } + + int32_t nodeId = 0; + SEpSet newEpSet = {0}; + + {//check all streams that involve this vnode + SStreamObj *pStream = NULL; + void* pIter = NULL; + while (1) { + pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); + if (pIter == NULL) { + break; + } + + // update the related upstream and downstream tasks + taosRLockLatch(&pStream->lock); + int32_t numOfLevels = taosArrayGetSize(pStream->tasks); + + for(int32_t j = 0; j < numOfLevels; ++j) { + SArray* pLevel = taosArrayGetP(pStream->tasks, j); + + int32_t numOfTasks = taosArrayGetSize(pLevel); + for(int32_t k = 0; k < numOfTasks; ++k) { + SStreamTask* pTask = taosArrayGetP(pLevel, k); + if (pTask->info.nodeId == nodeId) { + // pTask->info.epSet = 0; set the new epset + continue; + } + + // check for the dispatch info and the upstream task info + + int32_t level = pTask->info.taskLevel; + if (level == TASK_LEVEL__SOURCE) { + // only update the upstream info of the direct downstream tasks + if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { + // todo extract method + SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; + + int32_t numOfVgroups = taosArrayGetSize(pVgs); + for (int32_t i = 0; i < numOfVgroups; i++) { + SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i); + + if (pVgInfo->vgId == nodeId) { + pVgInfo->epSet = newEpSet; + } + } + + } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { + STaskDispatcherFixedEp* pDispatcher = &pTask->fixedEpDispatcher; + if (pDispatcher->nodeId == nodeId) { + pDispatcher->epSet = newEpSet; + } + } else { + // do nothing + } + } else if (level == TASK_LEVEL__AGG) { + // update the upstream info + SArray* pupstream = pTask->pUpstreamInfoList; +// for(int32_t i = 0; i < ) + } else { + // update the upstream tasks + } + } + } + } + taosRLockLatch(&pStream->lock); + } + + mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks); + return 
TSDB_CODE_SUCCESS; +} diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 30ec564c45..d45eac58e9 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -557,10 +557,10 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) { return pEncoder->pos; } -int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pRsp) { +int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) { if (tStartDecode(pDecoder) < 0) return -1; - if (tDecodeI32(pDecoder, &pRsp->vgId) < 0) return -1; - if (tDecodeI32(pDecoder, &pRsp->numOfTasks) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->vgId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->numOfTasks) < 0) return -1; tEndDecode(pDecoder); return 0; }