diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index 6d2a89ddc9..e4dc85ff97 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -107,7 +107,7 @@ SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId); int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId); int32_t mndProcessStreamHb(SRpcMsg *pReq); void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode); -int32_t initStreamNodeList(SMnode *pMnode); +int32_t extractStreamNodeList(SMnode *pMnode); int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream, int8_t igUntreated); int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); @@ -122,6 +122,9 @@ SStreamTask *streamTaskIterGetCurrent(SStreamTaskIter *pIter); void mndInitExecInfo(); void removeExpiredNodeInfo(const SArray *pNodeSnapshot); +int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId); +void removeInvalidTasks(SArray* pTaskIds); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index b3495da122..f01cb928b7 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -62,7 +62,7 @@ static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq); static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList); static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); -static int32_t removeExpiredNodeEntryAndTask(SArray *pNodeSnapshot); +static int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot); static int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len); static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); @@ -1031,7 +1031,7 @@ _ERR: return code; } -int32_t initStreamNodeList(SMnode *pMnode) { +int32_t extractStreamNodeList(SMnode *pMnode) { if (taosArrayGetSize(execInfo.pNodeList) == 0) { execInfo.pNodeList = taosArrayDestroy(execInfo.pNodeList); execInfo.pNodeList = extractNodeListFromStream(pMnode); @@ -1044,7 +1044,7 @@ static bool taskNodeIsUpdated(SMnode *pMnode) { // check if the node update happens or not taosThreadMutexLock(&execInfo.lock); - int32_t numOfNodes = initStreamNodeList(pMnode); + int32_t numOfNodes = extractStreamNodeList(pMnode); if (numOfNodes == 0) { mDebug("stream task node change checking done, no vgroups exist, do nothing"); execInfo.ts = taosGetTimestampSec(); @@ -2212,27 +2212,6 @@ static SArray *extractNodeListFromStream(SMnode *pMnode) { return plist; } -static int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId) { - void *p = taosHashGet(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId)); - if (p == NULL) { - return TSDB_CODE_SUCCESS; - } - taosHashRemove(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId)); - - for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) { - STaskId *pId = taosArrayGet(pExecNode->pTaskList, k); - if (pId->taskId == pRemovedId->taskId && pId->streamId == pRemovedId->streamId) { - taosArrayRemove(pExecNode->pTaskList, k); - - int32_t num = taosArrayGetSize(pExecNode->pTaskList); - mInfo("s-task:0x%x removed from buffer, remain:%d", (int32_t)pRemovedId->taskId, num); - break; - } - } - - return TSDB_CODE_SUCCESS; -} - static bool taskNodeExists(SArray *pList, int32_t nodeId) { size_t num = taosArrayGetSize(pList); @@ -2246,7 +2225,7 @@ static bool taskNodeExists(SArray *pList, int32_t nodeId) { return false; } -int32_t removeExpiredNodeEntryAndTask(SArray *pNodeSnapshot) { +int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot) { SArray *pRemovedTasks = taosArrayInit(4, sizeof(STaskId)); int32_t numOfTask = taosArrayGetSize(execInfo.pTaskList); @@ -2264,10 +2243,7 @@ int32_t removeExpiredNodeEntryAndTask(SArray *pNodeSnapshot) { } } - for (int32_t i = 0; i < taosArrayGetSize(pRemovedTasks); ++i) { - STaskId *pId = taosArrayGet(pRemovedTasks, i); - doRemoveTasks(&execInfo, pId); - } + removeInvalidTasks(pRemovedTasks); mDebug("remove invalid stream tasks:%d, remain:%d", (int32_t)taosArrayGetSize(pRemovedTasks), (int32_t)taosArrayGetSize(execInfo.pTaskList)); @@ -2294,7 +2270,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; taosThreadMutexLock(&execInfo.lock); - int32_t numOfNodes = initStreamNodeList(pMnode); + int32_t numOfNodes = extractStreamNodeList(pMnode); taosThreadMutexUnlock(&execInfo.lock); if (numOfNodes == 0) { @@ -2314,7 +2290,8 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { } taosThreadMutexLock(&execInfo.lock); - removeExpiredNodeEntryAndTask(pNodeSnapshot); + + removeExpiredNodeEntryAndTaskInBuf(pNodeSnapshot); SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot); if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) { @@ -2410,21 +2387,7 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) { SStreamTask *pTask = streamTaskIterGetCurrent(pIter); STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; - void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id)); - if (p != NULL) { - taosHashRemove(pExecNode->pTaskMap, &id, sizeof(id)); - - for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) { - STaskId *pId = taosArrayGet(pExecNode->pTaskList, k); - if (pId->taskId == id.taskId && pId->streamId == id.streamId) { - taosArrayRemove(pExecNode->pTaskList, k); - - int32_t num = taosArrayGetSize(pExecNode->pTaskList); - mInfo("s-task:0x%x removed from buffer, remain:%d", (int32_t)id.taskId, num); - break; - } - } - } + doRemoveTasks(pExecNode, &id); } ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList)); diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 778fd295f7..0b5113cc7b 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -22,7 +22,7 @@ typedef struct SFailedCheckpointInfo { int32_t transId; } SFailedCheckpointInfo; -static void doExtractTasksFromStream(SMnode *pMnode) { +static void extractStreamTasks(SMnode *pMnode) { SSdb *pSdb = pMnode->pSdb; SStreamObj *pStream = NULL; void *pIter = NULL; @@ -38,6 +38,33 @@ static void doExtractTasksFromStream(SMnode *pMnode) { } } +static void removeDroppedStreams(SMnode *pMnode, SStreamExecInfo *pExecInfo) { + int32_t num = taosArrayGetSize(pExecInfo->pTaskList); + + SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); + + SArray* pInvalid = taosArrayInit(4, sizeof(STaskId)); + for(int32_t i = 0; i < num; ++i) { + STaskId* pId = taosArrayGet(pExecInfo->pTaskList, i); + + void* p = taosHashGet(pHash, &pId->streamId, sizeof(int64_t)); + if (p != NULL) { + continue; + } + + void* pObj = mndGetStreamObj(pMnode, pId->streamId); + if (pObj != NULL) { + mndReleaseStream(pMnode, pObj); + taosHashPut(pHash, &pId->streamId, sizeof(int64_t), NULL, 0); + } else { + taosArrayPush(pInvalid, pId); + } + } + + removeInvalidTasks(pInvalid); + taosHashCleanup(pHash); +} + static void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) { int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeList); for (int32_t j = 0; j < numOfNodes; ++j) { @@ -230,7 +257,7 @@ int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo* info){ int32_t mndProcessStreamHb(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SStreamHbMsg req = {0}; - SArray *pFailedTasks = NULL; + SArray *pFailedChkpt = NULL; SArray *pOrphanTasks = NULL; if ((terrno = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) { @@ -252,17 +279,21 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks); - pFailedTasks = taosArrayInit(4, sizeof(SFailedCheckpointInfo)); - pOrphanTasks = taosArrayInit(3, sizeof(SOrphanTask)); + pFailedChkpt = taosArrayInit(4, sizeof(SFailedCheckpointInfo)); + pOrphanTasks = taosArrayInit(4, sizeof(SOrphanTask)); taosThreadMutexLock(&execInfo.lock); // extract stream task list if (taosHashGetSize(execInfo.pTaskMap) == 0) { - doExtractTasksFromStream(pMnode); + extractStreamTasks(pMnode); + } else { + // the already dropped tasks may be added by hb from vnode at the time when the pTaskMap happens to be empty. + // let's drop them here. + removeDroppedStreams(pMnode, &execInfo); } - initStreamNodeList(pMnode); + extractStreamNodeList(pMnode); int32_t numOfUpdated = taosArrayGetSize(req.pUpdateNodes); if (numOfUpdated > 0) { @@ -310,7 +341,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { SFailedCheckpointInfo info = { .transId = pChkInfo->activeTransId, .checkpointId = pChkInfo->activeId, .streamUid = p->id.streamId}; - addIntoCheckpointList(pFailedTasks, &info); + addIntoCheckpointList(pFailedChkpt, &info); } } @@ -328,7 +359,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { // current checkpoint is failed, rollback from the checkpoint trans // kill the checkpoint trans and then set all tasks status to be normal - if (taosArrayGetSize(pFailedTasks) > 0) { + if (taosArrayGetSize(pFailedChkpt) > 0) { bool allReady = true; if (pMnode != NULL) { SArray *p = mndTakeVgroupSnapshot(pMnode, &allReady); @@ -339,8 +370,8 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { if (allReady || snodeChanged) { // if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal - for(int32_t i = 0; i < taosArrayGetSize(pFailedTasks); ++i) { - SFailedCheckpointInfo *pInfo = taosArrayGet(pFailedTasks, i); + for(int32_t i = 0; i < taosArrayGetSize(pFailedChkpt); ++i) { + SFailedCheckpointInfo *pInfo = taosArrayGet(pFailedChkpt, i); mInfo("checkpointId:%" PRId64 " transId:%d failed, issue task-reset trans to reset all tasks status", pInfo->checkpointId, pInfo->transId); @@ -359,7 +390,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { taosThreadMutexUnlock(&execInfo.lock); tCleanupStreamHbMsg(&req); - taosArrayDestroy(pFailedTasks); + taosArrayDestroy(pFailedChkpt); taosArrayDestroy(pOrphanTasks); { diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 54279161ab..f276c70c06 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -597,4 +597,33 @@ void removeExpiredNodeInfo(const SArray *pNodeSnapshot) { execInfo.pNodeList = pValidList; mDebug("remain %d valid node entries after clean expired nodes info", (int32_t)taosArrayGetSize(pValidList)); +} + +int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId) { + void *p = taosHashGet(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId)); + if (p == NULL) { + return TSDB_CODE_SUCCESS; + } + + taosHashRemove(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId)); + + for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) { + STaskId *pId = taosArrayGet(pExecNode->pTaskList, k); + if (pId->taskId == pRemovedId->taskId && pId->streamId == pRemovedId->streamId) { + taosArrayRemove(pExecNode->pTaskList, k); + + int32_t num = taosArrayGetSize(pExecNode->pTaskList); + mInfo("s-task:0x%x removed from buffer, remain:%d", (int32_t)pRemovedId->taskId, num); + break; + } + } + + return TSDB_CODE_SUCCESS; +} + +void removeInvalidTasks(SArray *pTaskIds) { + for (int32_t i = 0; i < taosArrayGetSize(pTaskIds); ++i) { + STaskId *pId = taosArrayGet(pTaskIds, i); + doRemoveTasks(&execInfo, pId); + } } \ No newline at end of file diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 7f912d20eb..ad14a9b9bf 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -279,10 +279,10 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH) { stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId); - // we need to transfer state here. The transfer of state may generate new data that need to dispatch to downstream, - // to transfer the new data to downstream before checkpoint-trigger reaching the downstream tasks. - // Otherwise, those new generated data may be lost, if crash before next checkpoint data generatd, which the - // the new generated data is kept in outputQ, and failed to dispatch to downstream tasks. + // We need to transfer state here, before dispatching checkpoint-trigger to downstream tasks. + // The transfer of state may generate new data that need to dispatch to downstream tasks, + // Otherwise, those new generated data by executors that is kept in outputQ, may be lost if this program crashed + // before the next checkpoint. { bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT); if (dropRelHTask) {