diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index e4dc85ff97..6aed50e508 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -106,7 +106,7 @@ int32_t mndStreamSetUpdateEpsetAction(SMnode *pMnode, SStreamObj *pStream, SVgr SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId); int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId); int32_t mndProcessStreamHb(SRpcMsg *pReq); -void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode); +void saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); int32_t extractStreamNodeList(SMnode *pMnode); int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream, int8_t igUntreated); int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); @@ -121,9 +121,7 @@ bool streamTaskIterNextTask(SStreamTaskIter *pIter); SStreamTask *streamTaskIterGetCurrent(SStreamTaskIter *pIter); void mndInitExecInfo(); void removeExpiredNodeInfo(const SArray *pNodeSnapshot); - -int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId); -void removeInvalidTasks(SArray* pTaskIds); +void removeTasksInBuf(SArray* pTaskIds, SStreamExecInfo* pExecInfo); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index f01cb928b7..5fc6e465ad 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -56,12 +56,12 @@ static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int int64_t streamId, int32_t taskId, int32_t transId, int8_t mndTrigger); static int32_t mndProcessNodeCheck(SRpcMsg *pReq); static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg); -static SArray *extractNodeListFromStream(SMnode *pMnode); +static SArray *extractNodeListFromStream(SMnode *pMnode, SArray* pNodeList); static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq); static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList); -static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); +void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); static int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot); static int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len); static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); @@ -792,7 +792,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { // to make sure when the hb from vnode arrived, the newly created tasks have been in the task map already. taosThreadMutexLock(&execInfo.lock); mDebug("stream stream:%s start to register tasks into task nodeList", createReq.name); - saveStreamTasksInfo(&streamObj, &execInfo); + saveTaskAndNodeInfoIntoBuf(&streamObj, &execInfo); taosThreadMutexUnlock(&execInfo.lock); // execute creation @@ -815,7 +815,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { auditRecord(pReq, pMnode->clusterId, "createStream", dbname.dbname, name.dbname, sql, sqlLen); } else { char detail[1000] = {0}; - sprintf(detail, "dbname:%s, stream name:%s", dbname.dbname, name.dbname); + snprintf(detail, tListLen(detail), "dbname:%s, stream name:%s", dbname.dbname, name.dbname); auditRecord(pReq, pMnode->clusterId, "createStream", dbname.dbname, name.dbname, detail, strlen(detail)); } @@ -827,6 +827,7 @@ _OVER: mndReleaseStream(pMnode, pStream); tFreeSCMCreateStreamReq(&createReq); tFreeStreamObj(&streamObj); + if (sql != NULL) { taosMemoryFreeClear(sql); } @@ -839,6 +840,7 @@ int64_t mndStreamGenChkptId(SMnode *pMnode, bool lock) { void *pIter = NULL; SSdb *pSdb = pMnode->pSdb; int64_t maxChkptId = 0; + while (1) { pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); if (pIter == NULL) break; @@ -1033,8 +1035,7 @@ _ERR: int32_t extractStreamNodeList(SMnode *pMnode) { if (taosArrayGetSize(execInfo.pNodeList) == 0) { - execInfo.pNodeList = taosArrayDestroy(execInfo.pNodeList); - execInfo.pNodeList = extractNodeListFromStream(pMnode); + extractNodeListFromStream(pMnode, execInfo.pNodeList); } return taosArrayGetSize(execInfo.pNodeList); @@ -1071,22 +1072,23 @@ static bool taskNodeIsUpdated(SMnode *pMnode) { } SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot); - bool nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0); + + bool nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0); + taosArrayDestroy(changeInfo.pUpdateNodeList); taosHashCleanup(changeInfo.pDBMap); taosArrayDestroy(pNodeSnapshot); if (nodeUpdated) { - mDebug("stream task not ready due to node update"); + mDebug("stream tasks not ready due to node update"); } taosThreadMutexUnlock(&execInfo.lock); return nodeUpdated; } -static int32_t mndCheckNodeStatus(SMnode *pMnode) { +static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) { bool ready = true; - int64_t ts = taosGetTimestampSec(); if (taskNodeIsUpdated(pMnode)) { return -1; } @@ -1094,7 +1096,7 @@ static int32_t mndCheckNodeStatus(SMnode *pMnode) { taosThreadMutexLock(&execInfo.lock); if (taosArrayGetSize(execInfo.pNodeList) == 0) { mDebug("stream task node change checking done, no vgroups exist, do nothing"); - execInfo.ts = ts; + ASSERT(taosArrayGetSize(execInfo.pTaskList) == 0); } for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) { @@ -1148,12 +1150,13 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) { int32_t code = 0; int32_t numOfCheckpointTrans = 0; - if ((code = mndCheckNodeStatus(pMnode)) != 0) { + if ((code = mndCheckTaskAndNodeStatus(pMnode)) != 0) { return code; } SArray* pList = taosArrayInit(4, sizeof(SCheckpointInterval)); int64_t now = taosGetTimestampMs(); + while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) { int64_t duration = now - pStream->checkpointFreq; if (duration < tsStreamCheckpointInterval * 1000) { @@ -2166,7 +2169,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange return 0; } -static SArray *extractNodeListFromStream(SMnode *pMnode) { +static SArray *extractNodeListFromStream(SMnode *pMnode, SArray* pNodeList) { SSdb *pSdb = pMnode->pSdb; SStreamObj *pStream = NULL; void *pIter = NULL; @@ -2195,13 +2198,13 @@ static SArray *extractNodeListFromStream(SMnode *pMnode) { sdbRelease(pSdb, pStream); } - SArray *plist = taosArrayInit(taosHashGetSize(pHash), sizeof(SNodeEntry)); + taosArrayClear(pNodeList); // convert to list pIter = NULL; while ((pIter = taosHashIterate(pHash, pIter)) != NULL) { SNodeEntry *pEntry = (SNodeEntry *)pIter; - taosArrayPush(plist, pEntry); + taosArrayPush(pNodeList, pEntry); char buf[256] = {0}; epsetToStr(&pEntry->epset, buf, tListLen(buf)); @@ -2209,7 +2212,7 @@ static SArray *extractNodeListFromStream(SMnode *pMnode) { } taosHashCleanup(pHash); - return plist; + return TSDB_CODE_SUCCESS; } static bool taskNodeExists(SArray *pList, int32_t nodeId) { @@ -2243,7 +2246,7 @@ int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot) { } } - removeInvalidTasks(pRemovedTasks); + removeTasksInBuf(pRemovedTasks, &execInfo); mDebug("remove invalid stream tasks:%d, remain:%d", (int32_t)taosArrayGetSize(pRemovedTasks), (int32_t)taosArrayGetSize(execInfo.pTaskList)); @@ -2301,8 +2304,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { // keep the new vnode snapshot if success if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) { - taosArrayDestroy(execInfo.pNodeList); - execInfo.pNodeList = extractNodeListFromStream(pMnode); + extractNodeListFromStream(pMnode, execInfo.pNodeList); execInfo.ts = ts; mDebug("create trans successfully, update cached node list, numOfNodes:%d", (int)taosArrayGetSize(execInfo.pNodeList)); @@ -2339,7 +2341,7 @@ static int32_t mndProcessNodeCheck(SRpcMsg *pReq) { return 0; } -void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode) { +void saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) { SStreamTaskIter *pIter = createStreamTaskIter(pStream); while (streamTaskIterNextTask(pIter)) { SStreamTask *pTask = streamTaskIterGetCurrent(pIter); @@ -2379,23 +2381,6 @@ void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode) { destroyStreamTaskIter(pIter); } -void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) { - taosThreadMutexLock(&pExecNode->lock); - - SStreamTaskIter *pIter = createStreamTaskIter(pStream); - while (streamTaskIterNextTask(pIter)) { - SStreamTask *pTask = streamTaskIterGetCurrent(pIter); - - STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; - doRemoveTasks(pExecNode, &id); - } - - ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList)); - taosThreadMutexUnlock(&pExecNode->lock); - - destroyStreamTaskIter(pIter); -} - static void doAddTaskId(SArray *pList, int32_t taskId, int64_t uid, int32_t numOfTotal) { int32_t num = taosArrayGetSize(pList); for (int32_t i = 0; i < num; ++i) { diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 0b5113cc7b..3b7461c7e0 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -22,7 +22,7 @@ typedef struct SFailedCheckpointInfo { int32_t transId; } SFailedCheckpointInfo; -static void extractStreamTasks(SMnode *pMnode) { +static void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo* pExecInfo) { SSdb *pSdb = pMnode->pSdb; SStreamObj *pStream = NULL; void *pIter = NULL; @@ -33,18 +33,18 @@ static void extractStreamTasks(SMnode *pMnode) { break; } - saveStreamTasksInfo(pStream, &execInfo); + saveTaskAndNodeInfoIntoBuf(pStream, pExecInfo); sdbRelease(pSdb, pStream); } } -static void removeDroppedStreams(SMnode *pMnode, SStreamExecInfo *pExecInfo) { +static void removeDroppedStreamTasksInBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo) { int32_t num = taosArrayGetSize(pExecInfo->pTaskList); SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); + SArray *pIdList = taosArrayInit(4, sizeof(STaskId)); - SArray* pInvalid = taosArrayInit(4, sizeof(STaskId)); - for(int32_t i = 0; i < num; ++i) { + for (int32_t i = 0; i < num; ++i) { STaskId* pId = taosArrayGet(pExecInfo->pTaskList, i); void* p = taosHashGet(pHash, &pId->streamId, sizeof(int64_t)); @@ -57,11 +57,13 @@ static void removeDroppedStreams(SMnode *pMnode, SStreamExecInfo *pExecInfo) { mndReleaseStream(pMnode, pObj); taosHashPut(pHash, &pId->streamId, sizeof(int64_t), NULL, 0); } else { - taosArrayPush(pInvalid, pId); + taosArrayPush(pIdList, pId); } } - removeInvalidTasks(pInvalid); + removeTasksInBuf(pIdList, &execInfo); + + taosArrayDestroy(pIdList); taosHashCleanup(pHash); } @@ -286,11 +288,11 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { // extract stream task list if (taosHashGetSize(execInfo.pTaskMap) == 0) { - extractStreamTasks(pMnode); + addAllStreamTasksIntoBuf(pMnode, &execInfo); } else { // the already dropped tasks may be added by hb from vnode at the time when the pTaskMap happens to be empty. // let's drop them here. - removeDroppedStreams(pMnode, &execInfo); + removeDroppedStreamTasksInBuf(pMnode, &execInfo); } extractStreamNodeList(pMnode); diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index f276c70c06..d138254afd 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -26,6 +26,8 @@ struct SStreamTaskIter { SStreamTask *pTask; }; +int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId); + SStreamTaskIter* createStreamTaskIter(SStreamObj* pStream) { SStreamTaskIter* pIter = taosMemoryCalloc(1, sizeof(SStreamTaskIter)); if (pIter == NULL) { @@ -613,7 +615,7 @@ int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId) { taosArrayRemove(pExecNode->pTaskList, k); int32_t num = taosArrayGetSize(pExecNode->pTaskList); - mInfo("s-task:0x%x removed from buffer, remain:%d", (int32_t)pRemovedId->taskId, num); + mInfo("s-task:0x%x removed from buffer, remain:%d in buffer list", (int32_t)pRemovedId->taskId, num); break; } } @@ -621,9 +623,26 @@ int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId) { return TSDB_CODE_SUCCESS; } -void removeInvalidTasks(SArray *pTaskIds) { +void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo* pExecInfo) { for (int32_t i = 0; i < taosArrayGetSize(pTaskIds); ++i) { STaskId *pId = taosArrayGet(pTaskIds, i); - doRemoveTasks(&execInfo, pId); + doRemoveTasks(pExecInfo, pId); } +} + +void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) { + taosThreadMutexLock(&pExecNode->lock); + + SStreamTaskIter *pIter = createStreamTaskIter(pStream); + while (streamTaskIterNextTask(pIter)) { + SStreamTask *pTask = streamTaskIterGetCurrent(pIter); + + STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; + doRemoveTasks(pExecNode, &id); + } + + ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList)); + taosThreadMutexUnlock(&pExecNode->lock); + + destroyStreamTaskIter(pIter); } \ No newline at end of file