diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 3eca61c1dd..53afcc0e3f 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -77,12 +77,12 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in int64_t streamId, int32_t taskId); static int32_t mndProcessNodeCheck(SRpcMsg *pReq); static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg); -static void keepStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* pExecNode); +static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *pExecNode); -static SArray *doExtractNodeListFromStream(SMnode *pMnode); -static SArray *mndTakeVgroupSnapshot(SMnode *pMnode); +static SArray *doExtractNodeListFromStream(SMnode *pMnode); +static SArray *mndTakeVgroupSnapshot(SMnode *pMnode); static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList); -static int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans); +static int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans); static void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset); int32_t mndInitStream(SMnode *pMnode) { @@ -129,7 +129,11 @@ int32_t mndInitStream(SMnode *pMnode) { return sdbSetTable(pMnode->pSdb, table); } -void mndCleanupStream(SMnode *pMnode) {} +void mndCleanupStream(SMnode *pMnode) { + taosArrayDestroy(execNodeList.pTaskList); + taosHashCleanup(execNodeList.pTaskMap); + taosThreadMutexDestroy(&execNodeList.lock); +} SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -1156,7 +1160,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode); SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execNodeList.pNodeEntryList, pNodeSnapshot); - bool nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0); + bool nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0); taosArrayDestroy(changeInfo.pUpdateNodeList); taosHashCleanup(changeInfo.pDBMap); taosArrayDestroy(pNodeSnapshot); @@ -1167,12 +1171,12 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { } } - {// check if all tasks are in TASK_STATUS__NORMAL status + { // check if all tasks are in TASK_STATUS__NORMAL status bool ready = true; taosThreadMutexLock(&execNodeList.lock); - for(int32_t i = 0; i < taosArrayGetSize(execNodeList.pTaskList); ++i) { - STaskStatusEntry* p = taosArrayGet(execNodeList.pTaskList, i); + for (int32_t i = 0; i < taosArrayGetSize(execNodeList.pTaskList); ++i) { + STaskStatusEntry *p = taosArrayGet(execNodeList.pTaskList, i); if (p->status != TASK_STATUS__NORMAL) { mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s not ready, create checkpoint msg not issued", p->streamId, p->taskId, 0, streamGetTaskStatusStr(p->status)); @@ -1197,9 +1201,9 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { } mDebug("start to trigger checkpoint, checkpointId: %" PRId64 "", checkpointId); - const char* pDb = mndGetStreamDB(pMnode); + const char *pDb = mndGetStreamDB(pMnode); mndTransSetDbName(pTrans, pDb, "checkpoint"); - taosMemoryFree((void*)pDb); + taosMemoryFree((void *)pDb); if (mndTransCheckConflict(pMnode, pTrans) != 0) { mError("failed to trigger checkpoint, checkpointId: %" PRId64 ", reason:%s", checkpointId, @@ -2226,7 +2230,7 @@ static int32_t mndProcessNodeCheck(SRpcMsg *pReq) { return 0; } -static void keepStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* pExecNode) { +static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *pExecNode) { int32_t level = taosArrayGetSize(pStream->tasks); for (int32_t i = 0; i < level; i++) { SArray *pLevel = taosArrayGetP(pStream->tasks, i); @@ -2234,9 +2238,9 @@ static void keepStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* p int32_t numOfTasks = taosArrayGetSize(pLevel); for (int32_t j = 0; j < numOfTasks; j++) { SStreamTask *pTask = taosArrayGetP(pLevel, j); - int64_t keys[2] = {pTask->id.streamId, pTask->id.taskId}; + int64_t keys[2] = {pTask->id.streamId, pTask->id.taskId}; - void* p = taosHashGet(pExecNode->pTaskMap, keys, sizeof(keys)); + void *p = taosHashGet(pExecNode->pTaskMap, keys, sizeof(keys)); if (p == NULL) { STaskStatusEntry entry = { .streamId = pTask->id.streamId, .taskId = pTask->id.taskId, .status = TASK_STATUS__STOP}; @@ -2250,21 +2254,23 @@ static void keepStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* p } // todo: this process should be executed by the write queue worker of the mnode - int32_t mndProcessStreamHb(SRpcMsg *pReq) { +int32_t mndProcessStreamHb(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SStreamHbMsg req = {0}; int32_t code = TSDB_CODE_SUCCESS; - SDecoder decoder = {0}; + SDecoder decoder = {0}; tDecoderInit(&decoder, pReq->pCont, pReq->contLen); if (tDecodeStreamHbMsg(&decoder, &req) < 0) { + tDecoderClear(&decoder); terrno = TSDB_CODE_INVALID_MSG; return -1; } + tDecoderClear(&decoder); -// int64_t now = taosGetTimestampSec(); + // int64_t now = taosGetTimestampSec(); mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks); taosThreadMutexLock(&execNodeList.lock); @@ -2273,12 +2279,12 @@ static void keepStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* p doExtractTasksFromStream(pMnode); } - for(int32_t i = 0; i < req.numOfTasks; ++i) { - STaskStatusEntry* p = taosArrayGet(req.pTaskStatus, i); - int64_t k[2] = {p->streamId, p->taskId}; - int32_t index = *(int32_t*) taosHashGet(execNodeList.pTaskMap, &k, sizeof(k)); + for (int32_t i = 0; i < req.numOfTasks; ++i) { + STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i); + int64_t k[2] = {p->streamId, p->taskId}; + int32_t index = *(int32_t *)taosHashGet(execNodeList.pTaskMap, &k, sizeof(k)); - STaskStatusEntry* pStatusEntry = taosArrayGet(execNodeList.pTaskList, index); + STaskStatusEntry *pStatusEntry = taosArrayGet(execNodeList.pTaskList, index); pStatusEntry->status = p->status; if (p->status != TASK_STATUS__NORMAL) { mDebug("received s-task:0x%x no in ready stat:%s", p->taskId, streamGetTaskStatusStr(p->status)); @@ -2286,64 +2292,66 @@ static void keepStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* p } taosThreadMutexUnlock(&execNodeList.lock); + taosArrayDestroy(req.pTaskStatus); + // bool nodeChanged = false; -// SArray* pList = taosArrayInit(4, sizeof(int32_t)); -/* - // record the timeout node - for(int32_t i = 0; i < taosArrayGetSize(execNodeList.pNodeEntryList); ++i) { - SNodeEntry* pEntry = taosArrayGet(execNodeList.pNodeEntryList, i); - int64_t duration = now - pEntry->hbTimestamp; - if (duration > MND_STREAM_HB_INTERVAL) { // execNode timeout, try next - taosArrayPush(pList, &pEntry); - mWarn("nodeId:%d stream node timeout, since last hb:%"PRId64"s", pEntry->nodeId, duration); - continue; - } - - if (pEntry->nodeId != req.vgId) { - continue; - } - - pEntry->hbTimestamp = now; - - // check epset to identify whether the node has been transferred to other dnodes. - // node the epset is changed, which means the node transfer has occurred for this node. -// if (!isEpsetEqual(&pEntry->epset, &req.epset)) { -// nodeChanged = true; -// break; -// } - } - - // todo handle the node timeout case. Once the vnode is off-line, we should check the dnode status from mnode, - // to identify whether the dnode is truely offline or not. - - // handle the node changed case - if (!nodeChanged) { - return TSDB_CODE_SUCCESS; - } - - int32_t nodeId = req.vgId; - - {// check all streams that involved this vnode should update the epset info - SStreamObj *pStream = NULL; - void *pIter = NULL; - while (1) { - pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); - if (pIter == NULL) { - break; + // SArray* pList = taosArrayInit(4, sizeof(int32_t)); + /* + // record the timeout node + for(int32_t i = 0; i < taosArrayGetSize(execNodeList.pNodeEntryList); ++i) { + SNodeEntry* pEntry = taosArrayGet(execNodeList.pNodeEntryList, i); + int64_t duration = now - pEntry->hbTimestamp; + if (duration > MND_STREAM_HB_INTERVAL) { // execNode timeout, try next + taosArrayPush(pList, &pEntry); + mWarn("nodeId:%d stream node timeout, since last hb:%"PRId64"s", pEntry->nodeId, duration); + continue; } - // update the related upstream and downstream tasks, todo remove this, no need this function - taosWLockLatch(&pStream->lock); -// streamTaskUpdateEpInfo(pStream->tasks, req.vgId, &req.epset); -// streamTaskUpdateEpInfo(pStream->pHTasksList, req.vgId, &req.epset); - taosWUnLockLatch(&pStream->lock); + if (pEntry->nodeId != req.vgId) { + continue; + } -// code = createStreamUpdateTrans(pMnode, pStream, nodeId, ); -// if (code != TSDB_CODE_SUCCESS) { -// todo -//// } -// } - } -*/ + pEntry->hbTimestamp = now; + + // check epset to identify whether the node has been transferred to other dnodes. + // node the epset is changed, which means the node transfer has occurred for this node. + // if (!isEpsetEqual(&pEntry->epset, &req.epset)) { + // nodeChanged = true; + // break; + // } + } + + // todo handle the node timeout case. Once the vnode is off-line, we should check the dnode status from mnode, + // to identify whether the dnode is truely offline or not. + + // handle the node changed case + if (!nodeChanged) { + return TSDB_CODE_SUCCESS; + } + + int32_t nodeId = req.vgId; + + {// check all streams that involved this vnode should update the epset info + SStreamObj *pStream = NULL; + void *pIter = NULL; + while (1) { + pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); + if (pIter == NULL) { + break; + } + + // update the related upstream and downstream tasks, todo remove this, no need this function + taosWLockLatch(&pStream->lock); + // streamTaskUpdateEpInfo(pStream->tasks, req.vgId, &req.epset); + // streamTaskUpdateEpInfo(pStream->pHTasksList, req.vgId, &req.epset); + taosWUnLockLatch(&pStream->lock); + + // code = createStreamUpdateTrans(pMnode, pStream, nodeId, ); + // if (code != TSDB_CODE_SUCCESS) { + // todo + //// } + // } + } + */ return TSDB_CODE_SUCCESS; }