From 0a7022693c03fd5e016b28eef522f2cbf76acae4 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 19 Sep 2024 18:13:46 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/dnode/mnode/impl/inc/mndStream.h | 16 +- source/dnode/mnode/impl/src/mndStream.c | 349 +++------ source/dnode/mnode/impl/src/mndStreamTrans.c | 14 +- source/dnode/mnode/impl/src/mndStreamUtil.c | 731 +++++-------------- source/libs/stream/src/streamExtraInfo.c | 39 + 5 files changed, 318 insertions(+), 831 deletions(-) create mode 100644 source/libs/stream/src/streamExtraInfo.c diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index dceb86c963..b97eaf31d1 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -35,6 +35,7 @@ extern "C" { #define MND_STREAM_TASK_UPDATE_NAME "stream-task-update" #define MND_STREAM_CHKPT_UPDATE_NAME "stream-chkpt-update" #define MND_STREAM_CHKPT_CONSEN_NAME "stream-chkpt-consen" +#define MND_STREAM_RESTART_NAME "stream-restart" typedef struct SStreamTransInfo { int64_t startTime; @@ -120,6 +121,7 @@ int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId); int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams); int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream); int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray** pList); +void mndDestroyVgroupChangeInfo(SVgroupChangeInfo *pInfo); void mndKillTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName); int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset, int32_t retryCode, int32_t acceptCode); @@ -127,31 +129,39 @@ int32_t doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, ETrnC const char *pMsg, STrans **pTrans1); int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status); SSdbRaw *mndStreamActionEncode(SStreamObj *pStream); -void 
killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo); int32_t mndStreamSetUpdateEpsetAction(SMnode *pMnode, SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans); int32_t mndGetStreamObj(SMnode *pMnode, int64_t streamId, SStreamObj** pStream); +bool mndStreamNodeIsUpdated(SMnode *pMnode); + int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId); int32_t mndProcessStreamHb(SRpcMsg *pReq); -void saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); int32_t extractStreamNodeList(SMnode *pMnode); int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream, int8_t igUntreated); int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray *pList); int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); -int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream); int32_t mndStreamSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); +int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream); +int32_t mndStreamSetChkptIdAction(SMnode *pMnode, STrans *pTrans, SStreamTask* pTask, int64_t checkpointId, int64_t ts); +int32_t mndStreamSetRestartAction(SMnode* pMnode, STrans *pTrans, SStreamObj* pStream); +int32_t mndStreamSetCheckpointAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, int64_t checkpointId, + int8_t mndTrigger); int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SArray *pChkptInfoList); int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq); int32_t mndCreateSetConsensusChkptIdTrans(SMnode *pMnode, SStreamObj *pStream, int32_t taskId, int64_t checkpointId, int64_t ts); void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo 
*pExecInfo); +int32_t mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList, + SVgroupChangeInfo *pInfo); +void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo); int32_t createStreamTaskIter(SStreamObj *pStream, SStreamTaskIter **pIter); void destroyStreamTaskIter(SStreamTaskIter *pIter); bool streamTaskIterNextTask(SStreamTaskIter *pIter); int32_t streamTaskIterGetCurrent(SStreamTaskIter *pIter, SStreamTask **pTask); + int32_t mndInitExecInfo(); void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo); void mndStreamResetInitTaskListLoadFlag(); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 511cc8f984..3ec99f6e44 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -62,9 +62,8 @@ static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq); static int32_t mndProcessCheckpointReport(SRpcMsg *pReq); static int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg); static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, int32_t code); -static int32_t mndProcessDropOrphanTaskReq(SRpcMsg* pReq); -static int32_t mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList, SVgroupChangeInfo* pInfo); -static void mndDestroyVgroupChangeInfo(SVgroupChangeInfo *pInfo); +static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq); +static void saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); static void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo); static void removeExpiredNodeInfo(const SArray *pNodeSnapshot); @@ -920,6 +919,85 @@ _OVER: return code; } +static int32_t mndProcessRestartStreamReq(SRpcMsg *pReq) { + SMnode *pMnode = pReq->info.node; + SStreamObj *pStream = NULL; + int32_t code = 0; + SMPauseStreamReq pauseReq = {0}; + + if (tDeserializeSMPauseStreamReq(pReq->pCont, pReq->contLen, &pauseReq) < 
0) { + return TSDB_CODE_INVALID_MSG; + } + + code = mndAcquireStream(pMnode, pauseReq.name, &pStream); + if (pStream == NULL || code != 0) { + if (pauseReq.igNotExists) { + mInfo("stream:%s, not exist, not restart stream", pauseReq.name); + return 0; + } else { + mError("stream:%s not exist, failed to restart stream", pauseReq.name); + TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST); + } + } + + mInfo("stream:%s,%" PRId64 " start to restart stream", pauseReq.name, pStream->uid); + if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb)) != 0) { + sdbRelease(pMnode->pSdb, pStream); + return code; + } + + // check if it is conflict with other trans in both sourceDb and targetDb. + code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_RESTART_NAME, true); + if (code) { + sdbRelease(pMnode->pSdb, pStream); + return code; + } + + bool updated = mndStreamNodeIsUpdated(pMnode); + if (updated) { + mError("tasks are not ready for restart, node update detected"); + sdbRelease(pMnode->pSdb, pStream); + TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS); + } + + STrans *pTrans = NULL; + code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_RESTART_NAME, "restart the stream", &pTrans); + if (pTrans == NULL || code) { + mError("stream:%s failed to pause stream since %s", pauseReq.name, tstrerror(code)); + sdbRelease(pMnode->pSdb, pStream); + return code; + } + + code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESTART_NAME, pStream->uid); + if (code) { + sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); + return code; + } + + // if nodeUpdate happened, not send pause trans + code = mndStreamSetRestartAction(pMnode, pTrans, pStream); + if (code) { + mError("stream:%s, failed to restart task since %s", pauseReq.name, tstrerror(code)); + sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); + return code; + } + + code = mndTransPrepare(pMnode, pTrans); + if (code != TSDB_CODE_SUCCESS && 
code != TSDB_CODE_ACTION_IN_PROGRESS) { + mError("trans:%d, failed to prepare restart stream trans since %s", pTrans->id, tstrerror(code)); + sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); + return code; + } + + sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); + + return TSDB_CODE_ACTION_IN_PROGRESS; +} + int64_t mndStreamGenChkptId(SMnode *pMnode, bool lock) { SStreamObj *pStream = NULL; void *pIter = NULL; @@ -973,82 +1051,6 @@ int64_t mndStreamGenChkptId(SMnode *pMnode, bool lock) { return maxChkptId + 1; } -static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId, - int64_t streamId, int32_t taskId, int32_t transId, int8_t mndTrigger) { - SStreamCheckpointSourceReq req = {0}; - req.checkpointId = checkpointId; - req.nodeId = nodeId; - req.expireTime = -1; - req.streamId = streamId; // pTask->id.streamId; - req.taskId = taskId; // pTask->id.taskId; - req.transId = transId; - req.mndTrigger = mndTrigger; - - int32_t code; - int32_t blen; - - tEncodeSize(tEncodeStreamCheckpointSourceReq, &req, blen, code); - if (code < 0) { - TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); - } - - int32_t tlen = sizeof(SMsgHead) + blen; - - void *buf = taosMemoryMalloc(tlen); - if (buf == NULL) { - return terrno; - } - - void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - SEncoder encoder; - tEncoderInit(&encoder, abuf, tlen); - int32_t pos = tEncodeStreamCheckpointSourceReq(&encoder, &req); - if (pos == -1) { - tEncoderClear(&encoder); - return TSDB_CODE_INVALID_MSG; - } - - SMsgHead *pMsgHead = (SMsgHead *)buf; - pMsgHead->contLen = htonl(tlen); - pMsgHead->vgId = htonl(nodeId); - - tEncoderClear(&encoder); - - *pBuf = buf; - *pLen = tlen; - - return 0; -} - -static int32_t doSetCheckpointAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, int64_t checkpointId, - int8_t mndTrigger) { - void *buf; - int32_t tlen; - int32_t code = 0; - SEpSet epset = {0}; - bool hasEpset = false; - - if ((code = 
mndBuildStreamCheckpointSourceReq(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId, - pTask->id.taskId, pTrans->id, mndTrigger)) < 0) { - taosMemoryFree(buf); - return code; - } - - code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); - if (code != TSDB_CODE_SUCCESS || !hasEpset) { - taosMemoryFree(buf); - return code; - } - - code = setTransAction(pTrans, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset, TSDB_CODE_SYN_PROPOSE_NOT_READY, - TSDB_CODE_VND_INVALID_VGROUP_ID); - if (code != 0) { - taosMemoryFree(buf); - } - - return code; -} - static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, int64_t checkpointId, int8_t mndTrigger, bool lock) { int32_t code = TSDB_CODE_SUCCESS; @@ -1096,7 +1098,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre int32_t sz = taosArrayGetSize(pLevel); for (int32_t j = 0; j < sz; j++) { SStreamTask *pTask = taosArrayGetP(pLevel, j); - code = doSetCheckpointAction(pMnode, pTrans, pTask, checkpointId, mndTrigger); + code = mndStreamSetCheckpointAction(pMnode, pTrans, pTask, checkpointId, mndTrigger); if (code != TSDB_CODE_SUCCESS) { taosWUnLockLatch(&pStream->lock); @@ -1143,70 +1145,9 @@ int32_t extractStreamNodeList(SMnode *pMnode) { return taosArrayGetSize(execInfo.pNodeList); } -static int32_t doCheckForUpdated(SMnode *pMnode, SArray **ppNodeSnapshot) { - bool allReady = false; - bool nodeUpdated = false; - SVgroupChangeInfo changeInfo = {0}; - - int32_t numOfNodes = extractStreamNodeList(pMnode); - - if (numOfNodes == 0) { - mDebug("stream task node change checking done, no vgroups exist, do nothing"); - execInfo.ts = taosGetTimestampSec(); - return false; - } - - for (int32_t i = 0; i < numOfNodes; ++i) { - SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeList, i); - if (pNodeEntry == NULL) { - continue; - } - - if (pNodeEntry->stageUpdated) { - mDebug("stream task not ready due to node update detected, 
checkpoint not issued"); - return true; - } - } - - int32_t code = mndTakeVgroupSnapshot(pMnode, &allReady, ppNodeSnapshot); - if (code) { - mError("failed to get the vgroup snapshot, ignore it and continue"); - } - - if (!allReady) { - mWarn("not all vnodes ready, quit from vnodes status check"); - return true; - } - - code = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, *ppNodeSnapshot, &changeInfo); - if (code) { - nodeUpdated = false; - } else { - nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0); - if (nodeUpdated) { - mDebug("stream tasks not ready due to node update"); - } - } - - mndDestroyVgroupChangeInfo(&changeInfo); - return nodeUpdated; -} - -// check if the node update happens or not -static bool taskNodeIsUpdated(SMnode *pMnode) { - SArray *pNodeSnapshot = NULL; - - streamMutexLock(&execInfo.lock); - bool updated = doCheckForUpdated(pMnode, &pNodeSnapshot); - streamMutexUnlock(&execInfo.lock); - - taosArrayDestroy(pNodeSnapshot); - return updated; -} - static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) { bool ready = true; - if (taskNodeIsUpdated(pMnode)) { + if (mndStreamNodeIsUpdated(pMnode)) { TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS); } @@ -1605,32 +1546,6 @@ int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { return 0; } -int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams) { - SSdb *pSdb = pMnode->pSdb; - SDbObj *pDb = mndAcquireDb(pMnode, dbName); - if (pDb == NULL) { - TAOS_RETURN(TSDB_CODE_MND_DB_NOT_SELECTED); - } - - int32_t numOfStreams = 0; - void *pIter = NULL; - while (1) { - SStreamObj *pStream = NULL; - pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); - if (pIter == NULL) break; - - if (pStream->sourceDbUid == pDb->uid) { - numOfStreams++; - } - - sdbRelease(pSdb, pStream); - } - - *pNumOfStreams = numOfStreams; - mndReleaseDb(pMnode, pDb); - return 0; -} - static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, 
int32_t rows) { SMnode *pMnode = pReq->info.node; SSdb *pSdb = pMnode->pSdb; @@ -1770,7 +1685,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { TAOS_RETURN(code); } - bool updated = taskNodeIsUpdated(pMnode); + bool updated = mndStreamNodeIsUpdated(pMnode); if (updated) { mError("tasks are not ready for pause, node update detected"); sdbRelease(pMnode->pSdb, pStream); @@ -1965,102 +1880,6 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { return TSDB_CODE_ACTION_IN_PROGRESS; } -static bool isNodeEpsetChanged(const SEpSet *pPrevEpset, const SEpSet *pCurrent) { - const SEp *pEp = GET_ACTIVE_EP(pPrevEpset); - const SEp *p = GET_ACTIVE_EP(pCurrent); - - if (pEp->port == p->port && strncmp(pEp->fqdn, p->fqdn, TSDB_FQDN_LEN) == 0) { - return false; - } - return true; -} - -// 1. increase the replica does not affect the stream process. -// 2. decreasing the replica may affect the stream task execution in the way that there is one or more running stream -// tasks on the will be removed replica. -// 3. vgroup redistribution is an combination operation of first increase replica and then decrease replica. So we -// will handle it as mentioned in 1 & 2 items. 
-static int32_t mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList, - SVgroupChangeInfo *pInfo) { - int32_t code = 0; - int32_t lino = 0; - - if (pInfo == NULL) { - return TSDB_CODE_INVALID_PARA; - } - - pInfo->pUpdateNodeList = taosArrayInit(4, sizeof(SNodeUpdateInfo)), - pInfo->pDBMap = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK); - - if (pInfo->pUpdateNodeList == NULL || pInfo->pDBMap == NULL) { - mndDestroyVgroupChangeInfo(pInfo); - TSDB_CHECK_NULL(NULL, code, lino, _err, terrno); - } - - int32_t numOfNodes = taosArrayGetSize(pPrevNodeList); - for (int32_t i = 0; i < numOfNodes; ++i) { - SNodeEntry *pPrevEntry = taosArrayGet(pPrevNodeList, i); - if (pPrevEntry == NULL) { - continue; - } - - int32_t num = taosArrayGetSize(pNodeList); - for (int32_t j = 0; j < num; ++j) { - SNodeEntry *pCurrent = taosArrayGet(pNodeList, j); - if(pCurrent == NULL) { - continue; - } - - if (pCurrent->nodeId == pPrevEntry->nodeId) { - if (pPrevEntry->stageUpdated || isNodeEpsetChanged(&pPrevEntry->epset, &pCurrent->epset)) { - const SEp *pPrevEp = GET_ACTIVE_EP(&pPrevEntry->epset); - - char buf[256] = {0}; - code = epsetToStr(&pCurrent->epset, buf, tListLen(buf)); // ignore this error - if (code) { - mError("failed to convert epset string, code:%s", tstrerror(code)); - TSDB_CHECK_CODE(code, lino, _err); - } - - mDebug("nodeId:%d restart/epset changed detected, old:%s:%d -> new:%s, stageUpdate:%d", pCurrent->nodeId, - pPrevEp->fqdn, pPrevEp->port, buf, pPrevEntry->stageUpdated); - - SNodeUpdateInfo updateInfo = {.nodeId = pPrevEntry->nodeId}; - epsetAssign(&updateInfo.prevEp, &pPrevEntry->epset); - epsetAssign(&updateInfo.newEp, &pCurrent->epset); - - void* p = taosArrayPush(pInfo->pUpdateNodeList, &updateInfo); - TSDB_CHECK_NULL(p, code, lino, _err, terrno); - } - - // todo handle the snode info - if (pCurrent->nodeId != SNODE_HANDLE) { - SVgObj *pVgroup = mndAcquireVgroup(pMnode, 
pCurrent->nodeId); - code = taosHashPut(pInfo->pDBMap, pVgroup->dbName, strlen(pVgroup->dbName), NULL, 0); - mndReleaseVgroup(pMnode, pVgroup); - TSDB_CHECK_CODE(code, lino, _err); - } - - break; - } - } - } - - return code; - - _err: - mError("failed to find node change info, code:%s at %s line:%d", tstrerror(code), __func__, lino); - mndDestroyVgroupChangeInfo(pInfo); - return code; -} - -static void mndDestroyVgroupChangeInfo(SVgroupChangeInfo* pInfo) { - if (pInfo != NULL) { - taosArrayDestroy(pInfo->pUpdateNodeList); - taosHashCleanup(pInfo->pDBMap); - } -} - static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo, bool includeAllNodes) { SSdb *pSdb = pMnode->pSdb; SStreamObj *pStream = NULL; diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index 25a735e152..e5b4447a39 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -21,6 +21,10 @@ typedef struct SKeyInfo { int32_t keyLen; } SKeyInfo; +static bool identicalName(const char *pDb, const char *pParam, int32_t len) { + return (strlen(pDb) == len) && (strncmp(pDb, pParam, len) == 0); +} + int32_t mndStreamRegisterTrans(STrans *pTrans, const char *pTransName, int64_t streamId) { SStreamTransInfo info = { .transId = pTrans->id, .startTime = taosGetTimestampMs(), .name = pTransName, .streamId = streamId}; @@ -117,7 +121,8 @@ int32_t mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char } if (strcmp(tInfo.name, MND_STREAM_CHECKPOINT_NAME) == 0) { - if ((strcmp(pTransName, MND_STREAM_DROP_NAME) != 0) && (strcmp(pTransName, MND_STREAM_TASK_RESET_NAME) != 0)) { + if ((strcmp(pTransName, MND_STREAM_DROP_NAME) != 0) && (strcmp(pTransName, MND_STREAM_TASK_RESET_NAME) != 0) && + (strcmp(pTransName, MND_STREAM_RESTART_NAME) != 0)) { mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamId, tInfo.name); return 
TSDB_CODE_MND_TRANS_CONFLICT; @@ -126,7 +131,8 @@ int32_t mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char } } else if ((strcmp(tInfo.name, MND_STREAM_CREATE_NAME) == 0) || (strcmp(tInfo.name, MND_STREAM_DROP_NAME) == 0) || (strcmp(tInfo.name, MND_STREAM_TASK_RESET_NAME) == 0) || - strcmp(tInfo.name, MND_STREAM_TASK_UPDATE_NAME) == 0) { + (strcmp(tInfo.name, MND_STREAM_TASK_UPDATE_NAME) == 0) || + strcmp(tInfo.name, MND_STREAM_RESTART_NAME) == 0) { mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamId, tInfo.name); return TSDB_CODE_MND_TRANS_CONFLICT; @@ -282,10 +288,6 @@ int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msg return mndTransAppendRedoAction(pTrans, &action); } -static bool identicalName(const char *pDb, const char *pParam, int32_t len) { - return (strlen(pDb) == len) && (strncmp(pDb, pParam, len) == 0); -} - int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t len) { void *pIter = NULL; diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 5d8ba02781..6e48c58b30 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -304,41 +304,6 @@ int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t } } -static int32_t doSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamTask *pTask, int8_t igUntreated) { - terrno = 0; - - SVResumeStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResumeStreamTaskReq)); - if (pReq == NULL) { - mError("failed to malloc in resume stream, size:%" PRIzu ", code:%s", sizeof(SVResumeStreamTaskReq), - tstrerror(TSDB_CODE_OUT_OF_MEMORY)); - // terrno = TSDB_CODE_OUT_OF_MEMORY; - return terrno; - } - - pReq->head.vgId = htonl(pTask->info.nodeId); - pReq->taskId = pTask->id.taskId; - pReq->streamId = pTask->id.streamId; - pReq->igUntreated = igUntreated; - - SEpSet epset = {0}; - 
bool hasEpset = false; - int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); - if (code != TSDB_CODE_SUCCESS || (!hasEpset)) { - terrno = code; - taosMemoryFree(pReq); - return terrno; - } - - code = setTransAction(pTrans, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID); - if (code != 0) { - taosMemoryFree(pReq); - return terrno; - } - - mDebug("set the resume action for trans:%d", pTrans->id); - return 0; -} - int32_t mndGetStreamTask(STaskId *pId, SStreamObj *pStream, SStreamTask **pTask) { *pTask = NULL; @@ -377,396 +342,29 @@ int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream) { return num; } -int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream, int8_t igUntreated) { - SStreamTaskIter *pIter = NULL; - int32_t code = createStreamTaskIter(pStream, &pIter); - if (code) { - mError("failed to create stream task iter:%s", pStream->name); - return code; +int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams) { + SSdb *pSdb = pMnode->pSdb; + SDbObj *pDb = mndAcquireDb(pMnode, dbName); + if (pDb == NULL) { + TAOS_RETURN(TSDB_CODE_MND_DB_NOT_SELECTED); } - while (streamTaskIterNextTask(pIter)) { - SStreamTask *pTask = NULL; - code = streamTaskIterGetCurrent(pIter, &pTask); - if (code || pTask == NULL) { - destroyStreamTaskIter(pIter); - return code; + int32_t numOfStreams = 0; + void *pIter = NULL; + while (1) { + SStreamObj *pStream = NULL; + pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); + if (pIter == NULL) break; + + if (pStream->sourceDbUid == pDb->uid) { + numOfStreams++; } - code = doSetResumeAction(pTrans, pMnode, pTask, igUntreated); - if (code) { - destroyStreamTaskIter(pIter); - return code; - } - - if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__PAUSE) { - atomic_store_8(&pTask->status.taskStatus, pTask->status.statusBackup); - } - } - destroyStreamTaskIter(pIter); 
- return 0; -} - -static int32_t doSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) { - SVPauseStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVPauseStreamTaskReq)); - if (pReq == NULL) { - mError("failed to malloc in pause stream, size:%" PRIzu ", code:%s", sizeof(SVPauseStreamTaskReq), - tstrerror(TSDB_CODE_OUT_OF_MEMORY)); - // terrno = TSDB_CODE_OUT_OF_MEMORY; - return terrno; + sdbRelease(pSdb, pStream); } - pReq->head.vgId = htonl(pTask->info.nodeId); - pReq->taskId = pTask->id.taskId; - pReq->streamId = pTask->id.streamId; - - SEpSet epset = {0}; - bool hasEpset = false; - int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); - if (code != TSDB_CODE_SUCCESS || !hasEpset) { - terrno = code; - taosMemoryFree(pReq); - return code; - } - - char buf[256] = {0}; - code = epsetToStr(&epset, buf, tListLen(buf)); - if (code != 0) { // print error and continue - mError("failed to convert epset to str, code:%s", tstrerror(code)); - } - - mDebug("pause stream task in node:%d, epset:%s", pTask->info.nodeId, buf); - code = setTransAction(pTrans, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID); - if (code != 0) { - taosMemoryFree(pReq); - return code; - } - return 0; -} - -int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { - SStreamTaskIter *pIter = NULL; - - int32_t code = createStreamTaskIter(pStream, &pIter); - if (code) { - mError("failed to create stream task iter:%s", pStream->name); - return code; - } - - while (streamTaskIterNextTask(pIter)) { - SStreamTask *pTask = NULL; - code = streamTaskIterGetCurrent(pIter, &pTask); - if (code) { - destroyStreamTaskIter(pIter); - return code; - } - - code = doSetPauseAction(pMnode, pTrans, pTask); - if (code) { - destroyStreamTaskIter(pIter); - return code; - } - - if (atomic_load_8(&pTask->status.taskStatus) != TASK_STATUS__PAUSE) { - 
atomic_store_8(&pTask->status.statusBackup, pTask->status.taskStatus); - atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE); - } - } - - destroyStreamTaskIter(pIter); - return code; -} - -static int32_t doSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) { - SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq)); - if (pReq == NULL) { - // terrno = TSDB_CODE_OUT_OF_MEMORY; - return terrno; - } - - pReq->head.vgId = htonl(pTask->info.nodeId); - pReq->taskId = pTask->id.taskId; - pReq->streamId = pTask->id.streamId; - - SEpSet epset = {0}; - bool hasEpset = false; - int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); - if (code != TSDB_CODE_SUCCESS || !hasEpset) { // no valid epset, return directly without redoAction - return code; - } - - // The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode. - code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID); - if (code != 0) { - taosMemoryFree(pReq); - return code; - } - - return 0; -} - -int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { - SStreamTaskIter *pIter = NULL; - - int32_t code = createStreamTaskIter(pStream, &pIter); - if (code) { - mError("failed to create stream task iter:%s", pStream->name); - return code; - } - - while(streamTaskIterNextTask(pIter)) { - SStreamTask *pTask = NULL; - code = streamTaskIterGetCurrent(pIter, &pTask); - if (code) { - destroyStreamTaskIter(pIter); - return code; - } - - code = doSetDropAction(pMnode, pTrans, pTask); - if (code) { - destroyStreamTaskIter(pIter); - return code; - } - } - destroyStreamTaskIter(pIter); - return 0; -} - -static int32_t doSetDropActionFromId(SMnode *pMnode, STrans *pTrans, SOrphanTask* pTask) { - SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq)); - if (pReq == NULL) { - // terrno 
= TSDB_CODE_OUT_OF_MEMORY; - return terrno; - } - - pReq->head.vgId = htonl(pTask->nodeId); - pReq->taskId = pTask->taskId; - pReq->streamId = pTask->streamId; - - SEpSet epset = {0}; - bool hasEpset = false; - int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->taskId, pTask->nodeId); - if (code != TSDB_CODE_SUCCESS || (!hasEpset)) { // no valid epset, return directly without redoAction - taosMemoryFree(pReq); - return code; - } - - // The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode. - code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID); - if (code != 0) { - taosMemoryFree(pReq); - return code; - } - - return 0; -} - -int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray* pList) { - for(int32_t i = 0; i < taosArrayGetSize(pList); ++i) { - SOrphanTask* pTask = taosArrayGet(pList, i); - if (pTask == NULL) { - return terrno; - } - - int32_t code = doSetDropActionFromId(pMnode, pTrans, pTask); - if (code != 0) { - return code; - } else { - mDebug("add drop task:0x%x action to drop orphan task", pTask->taskId); - } - } - return 0; -} - -static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg *pMsg, const SVgroupChangeInfo *pInfo, SStreamTaskId *pId, - int32_t transId) { - int32_t code = 0; - - pMsg->streamId = pId->streamId; - pMsg->taskId = pId->taskId; - pMsg->transId = transId; - pMsg->pNodeList = taosArrayInit(taosArrayGetSize(pInfo->pUpdateNodeList), sizeof(SNodeUpdateInfo)); - if (pMsg->pNodeList == NULL) { - mError("failed to prepare node list, code:%s", tstrerror(terrno)); - code = terrno; - } - - if (code == 0) { - void *p = taosArrayAddAll(pMsg->pNodeList, pInfo->pUpdateNodeList); - if (p == NULL) { - mError("failed to add update node list into nodeList"); - } - } -} - -static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupChangeInfo *pInfo, int32_t nodeId, - SStreamTaskId 
*pId, int32_t transId) { - SStreamTaskNodeUpdateMsg req = {0}; - initNodeUpdateMsg(&req, pInfo, pId, transId); - - int32_t code = 0; - int32_t blen; - - tEncodeSize(tEncodeStreamTaskUpdateMsg, &req, blen, code); - if (code < 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - taosArrayDestroy(req.pNodeList); - return terrno; - } - - int32_t tlen = sizeof(SMsgHead) + blen; - - void *buf = taosMemoryMalloc(tlen); - if (buf == NULL) { - taosArrayDestroy(req.pNodeList); - return terrno; - } - - void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - SEncoder encoder; - tEncoderInit(&encoder, abuf, tlen); - code = tEncodeStreamTaskUpdateMsg(&encoder, &req); - if (code == -1) { - tEncoderClear(&encoder); - taosMemoryFree(buf); - taosArrayDestroy(req.pNodeList); - return code; - } - - SMsgHead *pMsgHead = (SMsgHead *)buf; - pMsgHead->contLen = htonl(tlen); - pMsgHead->vgId = htonl(nodeId); - - tEncoderClear(&encoder); - - *pBuf = buf; - *pLen = tlen; - - taosArrayDestroy(req.pNodeList); - return TSDB_CODE_SUCCESS; -} - -static int32_t doSetUpdateTaskAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, SVgroupChangeInfo *pInfo) { - void *pBuf = NULL; - int32_t len = 0; - SEpSet epset = {0}; - bool hasEpset = false; - - bool unusedRet = streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList); - int32_t code = doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id); - if (code) { - mError("failed to build stream task epset update msg, code:%s", tstrerror(code)); - return code; - } - - code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); - if (code != TSDB_CODE_SUCCESS || !hasEpset) { - mError("failed to extract epset during create update epset, code:%s", tstrerror(code)); - return code; - } - - code = setTransAction(pTrans, pBuf, len, TDMT_VND_STREAM_TASK_UPDATE, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID); - if (code != TSDB_CODE_SUCCESS) { - mError("failed to create update task epset trans, code:%s", 
tstrerror(code)); - taosMemoryFree(pBuf); - } - - return code; -} - -// build trans to update the epset -int32_t mndStreamSetUpdateEpsetAction(SMnode *pMnode, SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans) { - mDebug("stream:0x%" PRIx64 " set tasks epset update action", pStream->uid); - SStreamTaskIter *pIter = NULL; - - taosWLockLatch(&pStream->lock); - int32_t code = createStreamTaskIter(pStream, &pIter); - if (code) { - taosWUnLockLatch(&pStream->lock); - mError("failed to create stream task iter:%s", pStream->name); - return code; - } - - while (streamTaskIterNextTask(pIter)) { - SStreamTask *pTask = NULL; - code = streamTaskIterGetCurrent(pIter, &pTask); - if (code) { - destroyStreamTaskIter(pIter); - taosWUnLockLatch(&pStream->lock); - return code; - } - - code = doSetUpdateTaskAction(pMnode, pTrans, pTask, pInfo); - if (code != TSDB_CODE_SUCCESS) { - destroyStreamTaskIter(pIter); - taosWUnLockLatch(&pStream->lock); - return code; - } - } - - destroyStreamTaskIter(pIter); - taosWUnLockLatch(&pStream->lock); - return 0; -} - -static int32_t doSetResetAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) { - SVResetStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResetStreamTaskReq)); - if (pReq == NULL) { - mError("failed to malloc in reset stream, size:%" PRIzu ", code:%s", sizeof(SVResetStreamTaskReq), - tstrerror(terrno)); - return terrno; - } - - pReq->head.vgId = htonl(pTask->info.nodeId); - pReq->taskId = pTask->id.taskId; - pReq->streamId = pTask->id.streamId; - - SEpSet epset = {0}; - bool hasEpset = false; - int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); - if (code != TSDB_CODE_SUCCESS || !hasEpset) { - taosMemoryFree(pReq); - return code; - } - - code = setTransAction(pTrans, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID); - if (code != TSDB_CODE_SUCCESS) { - taosMemoryFree(pReq); - } - - return code; -} - -int32_t 
mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { - SStreamTaskIter *pIter = NULL; - - taosWLockLatch(&pStream->lock); - int32_t code = createStreamTaskIter(pStream, &pIter); - if (code) { - taosWUnLockLatch(&pStream->lock); - mError("failed to create stream task iter:%s", pStream->name); - return code; - } - - while (streamTaskIterNextTask(pIter)) { - SStreamTask *pTask = NULL; - code = streamTaskIterGetCurrent(pIter, &pTask); - if (code) { - destroyStreamTaskIter(pIter); - taosWUnLockLatch(&pStream->lock); - return code; - } - - code = doSetResetAction(pMnode, pTrans, pTask); - if (code != TSDB_CODE_SUCCESS) { - destroyStreamTaskIter(pIter); - taosWUnLockLatch(&pStream->lock); - return code; - } - } - - destroyStreamTaskIter(pIter); - taosWUnLockLatch(&pStream->lock); + *pNumOfStreams = numOfStreams; + mndReleaseDb(pMnode, pDb); return 0; } @@ -1000,90 +598,6 @@ int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot) { return 0; } -static int32_t doSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) { - SVUpdateCheckpointInfoReq *pReq = taosMemoryCalloc(1, sizeof(SVUpdateCheckpointInfoReq)); - if (pReq == NULL) { - mError("failed to malloc in reset stream, size:%" PRIzu ", code:%s", sizeof(SVUpdateCheckpointInfoReq), - tstrerror(terrno)); - return terrno; - } - - pReq->head.vgId = htonl(pTask->info.nodeId); - pReq->taskId = pTask->id.taskId; - pReq->streamId = pTask->id.streamId; - - SChkptReportInfo *pStreamItem = (SChkptReportInfo*)taosHashGet(execInfo.pChkptStreams, &pTask->id.streamId, sizeof(pTask->id.streamId)); - if (pStreamItem == NULL) { - return TSDB_CODE_INVALID_PARA; - } - - int32_t size = taosArrayGetSize(pStreamItem->pTaskList); - for(int32_t i = 0; i < size; ++i) { - STaskChkptInfo* pInfo = taosArrayGet(pStreamItem->pTaskList, i); - if (pInfo == NULL) { - continue; - } - - if (pInfo->taskId == pTask->id.taskId) { - pReq->checkpointId = pInfo->checkpointId; - pReq->checkpointVer = 
pInfo->version; - pReq->checkpointTs = pInfo->ts; - pReq->dropRelHTask = pInfo->dropHTask; - pReq->transId = pInfo->transId; - pReq->hStreamId = pTask->hTaskInfo.id.streamId; - pReq->hTaskId = pTask->hTaskInfo.id.taskId; - } - } - - SEpSet epset = {0}; - bool hasEpset = false; - int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); - if (code != TSDB_CODE_SUCCESS || !hasEpset) { - taosMemoryFree(pReq); - return code; - } - - code = setTransAction(pTrans, pReq, sizeof(SVUpdateCheckpointInfoReq), TDMT_STREAM_TASK_UPDATE_CHKPT, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID); - if (code != TSDB_CODE_SUCCESS) { - taosMemoryFree(pReq); - } - - return code; -} - -int32_t mndStreamSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { - SStreamTaskIter *pIter = NULL; - - taosWLockLatch(&pStream->lock); - int32_t code = createStreamTaskIter(pStream, &pIter); - if (code) { - taosWUnLockLatch(&pStream->lock); - mError("failed to create stream task iter:%s", pStream->name); - return code; - } - - while (streamTaskIterNextTask(pIter)) { - SStreamTask *pTask = NULL; - code = streamTaskIterGetCurrent(pIter, &pTask); - if (code) { - destroyStreamTaskIter(pIter); - taosWUnLockLatch(&pStream->lock); - return code; - } - - code = doSetUpdateChkptAction(pMnode, pTrans, pTask); - if (code != TSDB_CODE_SUCCESS) { - destroyStreamTaskIter(pIter); - taosWUnLockLatch(&pStream->lock); - return code; - } - } - - destroyStreamTaskIter(pIter); - taosWUnLockLatch(&pStream->lock); - return code; -} - int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; void *pIter = NULL; @@ -1172,60 +686,6 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) { return TSDB_CODE_SUCCESS; } -static int32_t mndStreamSetChkptIdAction(SMnode *pMnode, STrans *pTrans, SStreamTask* pTask, int64_t checkpointId, int64_t ts) { - SRestoreCheckpointInfo req = { - .taskId = pTask->id.taskId, - .streamId = pTask->id.streamId, - 
.checkpointId = checkpointId, - .startTs = ts, - .nodeId = pTask->info.nodeId, - .transId = pTrans->id, - }; - - int32_t code = 0; - int32_t blen; - tEncodeSize(tEncodeRestoreCheckpointInfo, &req, blen, code); - if (code < 0) { - return terrno; - } - - int32_t tlen = sizeof(SMsgHead) + blen; - - void *pBuf = taosMemoryMalloc(tlen); - if (pBuf == NULL) { - return terrno; - } - - void *abuf = POINTER_SHIFT(pBuf, sizeof(SMsgHead)); - SEncoder encoder; - tEncoderInit(&encoder, abuf, tlen); - code = tEncodeRestoreCheckpointInfo(&encoder, &req); - tEncoderClear(&encoder); - if (code == -1) { - taosMemoryFree(pBuf); - return code; - } - - SMsgHead *pMsgHead = (SMsgHead *)pBuf; - pMsgHead->contLen = htonl(tlen); - pMsgHead->vgId = htonl(pTask->info.nodeId); - - SEpSet epset = {0}; - bool hasEpset = false; - code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); - if (code != TSDB_CODE_SUCCESS || !hasEpset) { - taosMemoryFree(pBuf); - return code; - } - - code = setTransAction(pTrans, pBuf, tlen, TDMT_STREAM_CONSEN_CHKPT, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID); - if (code != TSDB_CODE_SUCCESS) { - taosMemoryFree(pBuf); - } - - return code; -} - int32_t mndCreateSetConsensusChkptIdTrans(SMnode *pMnode, SStreamObj *pStream, int32_t taskId, int64_t checkpointId, int64_t ts) { char msg[128] = {0};
@@ -1880,6 +1340,183 @@ int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDataBlo
   return code;
 }
 
+// Compare the GET_ACTIVE_EP() endpoints of two epsets; returns true when the port or the fqdn differs.
+static bool isNodeEpsetChanged(const SEpSet *pPrevEpset, const SEpSet *pCurrent) {
+  const SEp *pEp = GET_ACTIVE_EP(pPrevEpset);
+  const SEp *p = GET_ACTIVE_EP(pCurrent);
+
+  if (pEp->port == p->port && strncmp(pEp->fqdn, p->fqdn, TSDB_FQDN_LEN) == 0) {
+    return false;
+  }
+  return true;
+}
+
+// Release the containers owned by pInfo (the struct itself is not freed). The members are reset to NULL so
+// that a second call on the same struct is harmless: mndFindChangedNodeInfo() already releases them on its
+// error path, and callers such as doCheckForUpdated() destroy the same struct again afterwards.
+void mndDestroyVgroupChangeInfo(SVgroupChangeInfo* pInfo) {
+  if (pInfo != NULL) {
+    taosArrayDestroy(pInfo->pUpdateNodeList);
+    pInfo->pUpdateNodeList = NULL;
+    taosHashCleanup(pInfo->pDBMap);
+    pInfo->pDBMap = NULL;
+  }
+}
+
+// Compare the previous node list with the current node list and collect the differences into pInfo:
+//   - pUpdateNodeList: one SNodeUpdateInfo per node whose stage was updated or whose active ep changed
+//   - pDBMap: the dbName of every matched node that is not the SNODE_HANDLE
+// On failure the containers in pInfo are released before the error code is returned.
+//
+// 1. Increasing the replica does not affect the stream processing.
+// 2. Decreasing the replica may affect the stream task execution, since one or more running stream tasks may
+//    reside on the replica that will be removed.
+// 3. Vgroup redistribution is a combination of first increasing and then decreasing the replica, so it is
+//    handled as described in items 1 & 2.
+int32_t mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList,
+                               SVgroupChangeInfo *pInfo) {
+  int32_t code = 0;
+  int32_t lino = 0;
+
+  if (pInfo == NULL) {
+    return TSDB_CODE_INVALID_PARA;
+  }
+
+  pInfo->pUpdateNodeList = taosArrayInit(4, sizeof(SNodeUpdateInfo));
+  pInfo->pDBMap = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);
+
+  if (pInfo->pUpdateNodeList == NULL || pInfo->pDBMap == NULL) {
+    // the _err path releases whichever container was allocated, so it must not be destroyed here as well
+    TSDB_CHECK_NULL(NULL, code, lino, _err, terrno);
+  }
+
+  int32_t numOfNodes = taosArrayGetSize(pPrevNodeList);
+  for (int32_t i = 0; i < numOfNodes; ++i) {
+    SNodeEntry *pPrevEntry = taosArrayGet(pPrevNodeList, i);
+    if (pPrevEntry == NULL) {
+      continue;
+    }
+
+    int32_t num = taosArrayGetSize(pNodeList);
+    for (int32_t j = 0; j < num; ++j) {
+      SNodeEntry *pCurrent = taosArrayGet(pNodeList, j);
+      if (pCurrent == NULL) {
+        continue;
+      }
+
+      if (pCurrent->nodeId == pPrevEntry->nodeId) {
+        if (pPrevEntry->stageUpdated || isNodeEpsetChanged(&pPrevEntry->epset, &pCurrent->epset)) {
+          const SEp *pPrevEp = GET_ACTIVE_EP(&pPrevEntry->epset);
+
+          char buf[256] = {0};
+          code = epsetToStr(&pCurrent->epset, buf, tListLen(buf));
+          if (code) {  // a conversion failure is not ignored: it aborts the scan
+            mError("failed to convert epset string, code:%s", tstrerror(code));
+            TSDB_CHECK_CODE(code, lino, _err);
+          }
+
+          mDebug("nodeId:%d restart/epset changed detected, old:%s:%d -> new:%s, stageUpdate:%d", pCurrent->nodeId,
+                 pPrevEp->fqdn, pPrevEp->port, buf, pPrevEntry->stageUpdated);
+
+          SNodeUpdateInfo updateInfo = {.nodeId = pPrevEntry->nodeId};
+          epsetAssign(&updateInfo.prevEp, &pPrevEntry->epset);
+          epsetAssign(&updateInfo.newEp, &pCurrent->epset);
+
+          void* p = taosArrayPush(pInfo->pUpdateNodeList, &updateInfo);
+          TSDB_CHECK_NULL(p, code, lino, _err, terrno);
+        }
+
+        // todo handle the snode info
+        if (pCurrent->nodeId != SNODE_HANDLE) {
+          SVgObj *pVgroup = mndAcquireVgroup(pMnode, pCurrent->nodeId);
+          // NOTE(review): presumably NULL (with terrno set) when the vgroup cannot be acquired; never deref it
+          TSDB_CHECK_NULL(pVgroup, code, lino, _err, terrno);
+
+          code = taosHashPut(pInfo->pDBMap, pVgroup->dbName, strlen(pVgroup->dbName), NULL, 0);
+          mndReleaseVgroup(pMnode, pVgroup);
+          TSDB_CHECK_CODE(code, lino, _err);
+        }
+
+        break;
+      }
+    }
+  }
+
+  return code;
+
+_err:
+  mError("failed to find node change info, code:%s at %s line:%d", tstrerror(code), __func__, lino);
+  mndDestroyVgroupChangeInfo(pInfo);
+  return code;
+}
+
+// Decide whether the stream nodes must be treated as updated. Returns true when an entry of
+// execInfo.pNodeList has stageUpdated set, when not all vnodes are ready, or when the fresh vgroup snapshot
+// yields at least one changed node. Returns false when extractStreamNodeList() reports 0 nodes, when nothing
+// changed, or when the comparison itself fails. The snapshot is handed back through ppNodeSnapshot for the
+// caller to release; mndStreamNodeIsUpdated() calls this function with execInfo.lock held.
+static bool doCheckForUpdated(SMnode *pMnode, SArray **ppNodeSnapshot) {
+  bool allReady = false;
+  bool nodeUpdated = false;
+  SVgroupChangeInfo changeInfo = {0};
+
+  int32_t numOfNodes = extractStreamNodeList(pMnode);
+
+  if (numOfNodes == 0) {
+    mDebug("stream task node change checking done, no vgroups exist, do nothing");
+    execInfo.ts = taosGetTimestampSec();
+    return false;
+  }
+
+  for (int32_t i = 0; i < numOfNodes; ++i) {
+    SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeList, i);
+    if (pNodeEntry == NULL) {
+      continue;
+    }
+
+    if (pNodeEntry->stageUpdated) {
+      mDebug("stream task not ready due to node update detected, checkpoint not issued");
+      return true;
+    }
+  }
+
+  int32_t code = mndTakeVgroupSnapshot(pMnode, &allReady, ppNodeSnapshot);
+  if (code) {
+    mError("failed to get the vgroup snapshot, ignore it and continue");
+  }
+
+  if (!allReady) {
+    mWarn("not all vnodes ready, quit from vnodes status check");
+    return true;
+  }
+
+  code = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, *ppNodeSnapshot, &changeInfo);
+  if (code) {
+    nodeUpdated = false;
+  } else {
+    nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0);
+    if (nodeUpdated) {
+      mDebug("stream tasks not ready due to node update");
+    }
+  }
+
+  mndDestroyVgroupChangeInfo(&changeInfo);
+  return nodeUpdated;
+}
+
+// Check whether a node update has happened. Takes execInfo.lock around the check and afterwards releases
+// the vgroup snapshot produced by it.
+bool mndStreamNodeIsUpdated(SMnode *pMnode) {
+  SArray *pNodeSnapshot = NULL;
+
+  streamMutexLock(&execInfo.lock);
+  bool updated = doCheckForUpdated(pMnode, &pNodeSnapshot);
+  streamMutexUnlock(&execInfo.lock);
+
+  taosArrayDestroy(pNodeSnapshot);
+  return updated;
+}
+
 uint32_t seed = 0;
 static SRpcMsg createRpcMsg(STransAction* pAction, int64_t traceId, int64_t signature) {
   SRpcMsg rpcMsg = {.msgType = pAction->msgType, .contLen = pAction->contLen, .info.ahandle = (void *)signature};
diff --git a/source/libs/stream/src/streamExtraInfo.c b/source/libs/stream/src/streamExtraInfo.c new file mode 100644 index 0000000000..168571363f --- /dev/null +++ b/source/libs/stream/src/streamExtraInfo.c @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see .
+ */
+
+#include "os.h"
+
+#define UP_TASKS_NOT_SEND_CHKPT_TRIGGER 1
+#define DOWN_TASKS_NOT_READY            2
+#define DOWN_TASKS_BACKPRESSURE         3
+#define DOWN_TASKS_INPUTQ_CLOSED        4
+#define TASK_OUTPUTQ_FULL               5
+#define TASK_SINK_QUOTA_REACHED         6
+
+typedef struct SStreamTaskExtraInfo {  // pairs an info id with its message text
+  int32_t     infoId;  // one of the ids defined above; 0 in the unused slots
+  const char* pMsg;    // only ever points at a string literal, hence const; some texts carry a %d / 0x%x field
+} SStreamTaskExtraInfo;
+
+SStreamTaskExtraInfo extraInfoList[8] = {  // entry i describes infoId i; slots 0 and 7 stay zero-initialized
+    {0},
+    {.infoId = UP_TASKS_NOT_SEND_CHKPT_TRIGGER, .pMsg = "%d(us) not send checkpoint-trigger"},
+    {.infoId = DOWN_TASKS_NOT_READY, .pMsg = "%d(ds) tasks not ready"},
+    {.infoId = DOWN_TASKS_BACKPRESSURE, .pMsg = "0x%x(ds) backpressure"},
+    {.infoId = DOWN_TASKS_INPUTQ_CLOSED, .pMsg = "0x%x(ds) inputQ closed"},
+    {.infoId = TASK_OUTPUTQ_FULL, .pMsg = "outputQ is full"},
+    {.infoId = TASK_SINK_QUOTA_REACHED, .pMsg = "sink quota reached"},
+};
+