diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index 6d2a89ddc9..6aed50e508 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -106,8 +106,8 @@ int32_t mndStreamSetUpdateEpsetAction(SMnode *pMnode, SStreamObj *pStream, SVgr SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId); int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId); int32_t mndProcessStreamHb(SRpcMsg *pReq); -void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode); -int32_t initStreamNodeList(SMnode *pMnode); +void saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); +int32_t extractStreamNodeList(SMnode *pMnode); int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream, int8_t igUntreated); int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); @@ -121,6 +121,7 @@ bool streamTaskIterNextTask(SStreamTaskIter *pIter); SStreamTask *streamTaskIterGetCurrent(SStreamTaskIter *pIter); void mndInitExecInfo(); void removeExpiredNodeInfo(const SArray *pNodeSnapshot); +void removeTasksInBuf(SArray* pTaskIds, SStreamExecInfo* pExecInfo); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 9c8f3f26ff..5fc6e465ad 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -56,13 +56,13 @@ static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int int64_t streamId, int32_t taskId, int32_t transId, int8_t mndTrigger); static int32_t mndProcessNodeCheck(SRpcMsg *pReq); static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg); -static SArray *extractNodeListFromStream(SMnode *pMnode); +static SArray *extractNodeListFromStream(SMnode *pMnode, SArray* pNodeList); static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq); static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList); -static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); -static int32_t removeExpiredNodeEntryAndTask(SArray *pNodeSnapshot); +void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); +static int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot); static int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len); static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); @@ -792,7 +792,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { // to make sure when the hb from vnode arrived, the newly created tasks have been in the task map already. taosThreadMutexLock(&execInfo.lock); mDebug("stream stream:%s start to register tasks into task nodeList", createReq.name); - saveStreamTasksInfo(&streamObj, &execInfo); + saveTaskAndNodeInfoIntoBuf(&streamObj, &execInfo); taosThreadMutexUnlock(&execInfo.lock); // execute creation @@ -815,7 +815,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { auditRecord(pReq, pMnode->clusterId, "createStream", dbname.dbname, name.dbname, sql, sqlLen); } else { char detail[1000] = {0}; - sprintf(detail, "dbname:%s, stream name:%s", dbname.dbname, name.dbname); + snprintf(detail, tListLen(detail), "dbname:%s, stream name:%s", dbname.dbname, name.dbname); auditRecord(pReq, pMnode->clusterId, "createStream", dbname.dbname, name.dbname, detail, strlen(detail)); } @@ -827,6 +827,7 @@ _OVER: mndReleaseStream(pMnode, pStream); tFreeSCMCreateStreamReq(&createReq); tFreeStreamObj(&streamObj); + if (sql != NULL) { taosMemoryFreeClear(sql); } @@ -839,6 +840,7 @@ int64_t mndStreamGenChkptId(SMnode *pMnode, bool lock) { void *pIter = NULL; SSdb *pSdb = pMnode->pSdb; int64_t maxChkptId = 0; + while (1) { pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); if (pIter == NULL) break; @@ -1031,10 +1033,9 @@ _ERR: return code; } -int32_t initStreamNodeList(SMnode *pMnode) { +int32_t extractStreamNodeList(SMnode *pMnode) { if (taosArrayGetSize(execInfo.pNodeList) == 0) { - execInfo.pNodeList = taosArrayDestroy(execInfo.pNodeList); - execInfo.pNodeList = extractNodeListFromStream(pMnode); + extractNodeListFromStream(pMnode, execInfo.pNodeList); } return taosArrayGetSize(execInfo.pNodeList); @@ -1044,7 +1045,7 @@ static bool taskNodeIsUpdated(SMnode *pMnode) { // check if the node update happens or not taosThreadMutexLock(&execInfo.lock); - int32_t numOfNodes = initStreamNodeList(pMnode); + int32_t numOfNodes = extractStreamNodeList(pMnode); if (numOfNodes == 0) { mDebug("stream task node change checking done, no vgroups exist, do nothing"); execInfo.ts = taosGetTimestampSec(); @@ -1071,22 +1072,23 @@ static bool taskNodeIsUpdated(SMnode *pMnode) { } SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot); - bool nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0); + + bool nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0); + taosArrayDestroy(changeInfo.pUpdateNodeList); taosHashCleanup(changeInfo.pDBMap); taosArrayDestroy(pNodeSnapshot); if (nodeUpdated) { - mDebug("stream task not ready due to node update"); + mDebug("stream tasks not ready due to node update"); } taosThreadMutexUnlock(&execInfo.lock); return nodeUpdated; } -static int32_t mndCheckNodeStatus(SMnode *pMnode) { +static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) { bool ready = true; - int64_t ts = taosGetTimestampSec(); if (taskNodeIsUpdated(pMnode)) { return -1; } @@ -1094,7 +1096,7 @@ static int32_t mndCheckNodeStatus(SMnode *pMnode) { taosThreadMutexLock(&execInfo.lock); if (taosArrayGetSize(execInfo.pNodeList) == 0) { mDebug("stream task node change checking done, no vgroups exist, do nothing"); - execInfo.ts = ts; + ASSERT(taosArrayGetSize(execInfo.pTaskList) == 0); } for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) { @@ -1148,12 +1150,13 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) { int32_t code = 0; int32_t numOfCheckpointTrans = 0; - if ((code = mndCheckNodeStatus(pMnode)) != 0) { + if ((code = mndCheckTaskAndNodeStatus(pMnode)) != 0) { return code; } SArray* pList = taosArrayInit(4, sizeof(SCheckpointInterval)); int64_t now = taosGetTimestampMs(); + while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) { int64_t duration = now - pStream->checkpointFreq; if (duration < tsStreamCheckpointInterval * 1000) { @@ -1572,7 +1575,9 @@ static int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SS STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id)); if (pe == NULL) { - mError("task:0x%" PRIx64 " not exists in vnode, no valid status/stage info", id.taskId); + mError("task:0x%" PRIx64 " not exists in any vnodes, streamName:%s, streamId:0x%" PRIx64 " createTs:%" PRId64 + " no valid status/stage info", + id.taskId, pStream->name, pStream->uid, pStream->createTime); return -1; } @@ -2164,7 +2169,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange return 0; } -static SArray *extractNodeListFromStream(SMnode *pMnode) { +static SArray *extractNodeListFromStream(SMnode *pMnode, SArray* pNodeList) { SSdb *pSdb = pMnode->pSdb; SStreamObj *pStream = NULL; void *pIter = NULL; @@ -2193,13 +2198,13 @@ static SArray *extractNodeListFromStream(SMnode *pMnode) { sdbRelease(pSdb, pStream); } - SArray *plist = taosArrayInit(taosHashGetSize(pHash), sizeof(SNodeEntry)); + taosArrayClear(pNodeList); // convert to list pIter = NULL; while ((pIter = taosHashIterate(pHash, pIter)) != NULL) { SNodeEntry *pEntry = (SNodeEntry *)pIter; - taosArrayPush(plist, pEntry); + taosArrayPush(pNodeList, pEntry); char buf[256] = {0}; epsetToStr(&pEntry->epset, buf, tListLen(buf)); @@ -2207,27 +2212,6 @@ static SArray *extractNodeListFromStream(SMnode *pMnode) { } taosHashCleanup(pHash); - return plist; -} - -static int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId) { - void *p = taosHashGet(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId)); - if (p == NULL) { - return TSDB_CODE_SUCCESS; - } - taosHashRemove(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId)); - - for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) { - STaskId *pId = taosArrayGet(pExecNode->pTaskList, k); - if (pId->taskId == pRemovedId->taskId && pId->streamId == pRemovedId->streamId) { - taosArrayRemove(pExecNode->pTaskList, k); - - int32_t num = taosArrayGetSize(pExecNode->pTaskList); - mInfo("s-task:0x%x removed from buffer, remain:%d", (int32_t)pRemovedId->taskId, num); - break; - } - } - return TSDB_CODE_SUCCESS; } @@ -2244,7 +2228,7 @@ static bool taskNodeExists(SArray *pList, int32_t nodeId) { return false; } -int32_t removeExpiredNodeEntryAndTask(SArray *pNodeSnapshot) { +int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot) { SArray *pRemovedTasks = taosArrayInit(4, sizeof(STaskId)); int32_t numOfTask = taosArrayGetSize(execInfo.pTaskList); @@ -2262,10 +2246,7 @@ int32_t removeExpiredNodeEntryAndTask(SArray *pNodeSnapshot) { } } - for (int32_t i = 0; i < taosArrayGetSize(pRemovedTasks); ++i) { - STaskId *pId = taosArrayGet(pRemovedTasks, i); - doRemoveTasks(&execInfo, pId); - } + removeTasksInBuf(pRemovedTasks, &execInfo); mDebug("remove invalid stream tasks:%d, remain:%d", (int32_t)taosArrayGetSize(pRemovedTasks), (int32_t)taosArrayGetSize(execInfo.pTaskList)); @@ -2292,7 +2273,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; taosThreadMutexLock(&execInfo.lock); - int32_t numOfNodes = initStreamNodeList(pMnode); + int32_t numOfNodes = extractStreamNodeList(pMnode); taosThreadMutexUnlock(&execInfo.lock); if (numOfNodes == 0) { @@ -2312,7 +2293,8 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { } taosThreadMutexLock(&execInfo.lock); - removeExpiredNodeEntryAndTask(pNodeSnapshot); + + removeExpiredNodeEntryAndTaskInBuf(pNodeSnapshot); SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot); if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) { @@ -2322,8 +2304,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { // keep the new vnode snapshot if success if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) { - taosArrayDestroy(execInfo.pNodeList); - execInfo.pNodeList = extractNodeListFromStream(pMnode); + extractNodeListFromStream(pMnode, execInfo.pNodeList); execInfo.ts = ts; mDebug("create trans successfully, update cached node list, numOfNodes:%d", (int)taosArrayGetSize(execInfo.pNodeList)); @@ -2360,7 +2341,7 @@ static int32_t mndProcessNodeCheck(SRpcMsg *pReq) { return 0; } -void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode) { +void saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) { SStreamTaskIter *pIter = createStreamTaskIter(pStream); while (streamTaskIterNextTask(pIter)) { SStreamTask *pTask = streamTaskIterGetCurrent(pIter); @@ -2400,37 +2381,6 @@ void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode) { destroyStreamTaskIter(pIter); } -void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) { - taosThreadMutexLock(&pExecNode->lock); - - SStreamTaskIter *pIter = createStreamTaskIter(pStream); - while (streamTaskIterNextTask(pIter)) { - SStreamTask *pTask = streamTaskIterGetCurrent(pIter); - - STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; - void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id)); - if (p != NULL) { - taosHashRemove(pExecNode->pTaskMap, &id, sizeof(id)); - - for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) { - STaskId *pId = taosArrayGet(pExecNode->pTaskList, k); - if (pId->taskId == id.taskId && pId->streamId == id.streamId) { - taosArrayRemove(pExecNode->pTaskList, k); - - int32_t num = taosArrayGetSize(pExecNode->pTaskList); - mInfo("s-task:0x%x removed from buffer, remain:%d", (int32_t)id.taskId, num); - break; - } - } - } - } - - ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList)); - taosThreadMutexUnlock(&pExecNode->lock); - - destroyStreamTaskIter(pIter); -} - static void doAddTaskId(SArray *pList, int32_t taskId, int64_t uid, int32_t numOfTotal) { int32_t num = taosArrayGetSize(pList); for (int32_t i = 0; i < num; ++i) { diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 778fd295f7..42efb6589e 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -22,7 +22,7 @@ typedef struct SFailedCheckpointInfo { int32_t transId; } SFailedCheckpointInfo; -static void doExtractTasksFromStream(SMnode *pMnode) { +static void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo* pExecInfo) { SSdb *pSdb = pMnode->pSdb; SStreamObj *pStream = NULL; void *pIter = NULL; @@ -33,11 +33,44 @@ static void doExtractTasksFromStream(SMnode *pMnode) { break; } - saveStreamTasksInfo(pStream, &execInfo); + saveTaskAndNodeInfoIntoBuf(pStream, pExecInfo); sdbRelease(pSdb, pStream); } } +static void removeDroppedStreamTasksInBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo) { + if (pMnode == NULL) { + return; + } + + int32_t num = taosArrayGetSize(pExecInfo->pTaskList); + + SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); + SArray *pIdList = taosArrayInit(4, sizeof(STaskId)); + + for (int32_t i = 0; i < num; ++i) { + STaskId* pId = taosArrayGet(pExecInfo->pTaskList, i); + + void* p = taosHashGet(pHash, &pId->streamId, sizeof(int64_t)); + if (p != NULL) { + continue; + } + + void* pObj = mndGetStreamObj(pMnode, pId->streamId); + if (pObj != NULL) { + mndReleaseStream(pMnode, pObj); + taosHashPut(pHash, &pId->streamId, sizeof(int64_t), NULL, 0); + } else { + taosArrayPush(pIdList, pId); + } + } + + removeTasksInBuf(pIdList, &execInfo); + + taosArrayDestroy(pIdList); + taosHashCleanup(pHash); +} + static void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) { int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeList); for (int32_t j = 0; j < numOfNodes; ++j) { @@ -230,7 +263,7 @@ int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo* info){ int32_t mndProcessStreamHb(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SStreamHbMsg req = {0}; - SArray *pFailedTasks = NULL; + SArray *pFailedChkpt = NULL; SArray *pOrphanTasks = NULL; if ((terrno = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) { @@ -252,17 +285,21 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks); - pFailedTasks = taosArrayInit(4, sizeof(SFailedCheckpointInfo)); - pOrphanTasks = taosArrayInit(3, sizeof(SOrphanTask)); + pFailedChkpt = taosArrayInit(4, sizeof(SFailedCheckpointInfo)); + pOrphanTasks = taosArrayInit(4, sizeof(SOrphanTask)); taosThreadMutexLock(&execInfo.lock); // extract stream task list if (taosHashGetSize(execInfo.pTaskMap) == 0) { - doExtractTasksFromStream(pMnode); + addAllStreamTasksIntoBuf(pMnode, &execInfo); + } else { + // the already dropped tasks may be added by hb from vnode at the time when the pTaskMap happens to be empty. + // let's drop them here. + removeDroppedStreamTasksInBuf(pMnode, &execInfo); } - initStreamNodeList(pMnode); + extractStreamNodeList(pMnode); int32_t numOfUpdated = taosArrayGetSize(req.pUpdateNodes); if (numOfUpdated > 0) { @@ -290,16 +327,16 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { } } else { // task is idle for more than 50 sec. - if (fabs(pTaskEntry->inputQUsed - p->inputQUsed) <= DBL_EPSILON) { - if (!pTaskEntry->inputQChanging) { - pTaskEntry->inputQUnchangeCounter++; - } else { - pTaskEntry->inputQChanging = false; - } - } else { - pTaskEntry->inputQChanging = true; - pTaskEntry->inputQUnchangeCounter = 0; - } +// if (fabs(pTaskEntry->inputQUsed - p->inputQUsed) <= DBL_EPSILON) { +// if (!pTaskEntry->inputQChanging) { +// pTaskEntry->inputQUnchangeCounter++; +// } else { +// pTaskEntry->inputQChanging = false; +// } +// } else { +// pTaskEntry->inputQChanging = true; +// pTaskEntry->inputQUnchangeCounter = 0; +// } streamTaskStatusCopy(pTaskEntry, p); @@ -310,7 +347,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { SFailedCheckpointInfo info = { .transId = pChkInfo->activeTransId, .checkpointId = pChkInfo->activeId, .streamUid = p->id.streamId}; - addIntoCheckpointList(pFailedTasks, &info); + addIntoCheckpointList(pFailedChkpt, &info); } } @@ -328,7 +365,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { // current checkpoint is failed, rollback from the checkpoint trans // kill the checkpoint trans and then set all tasks status to be normal - if (taosArrayGetSize(pFailedTasks) > 0) { + if (taosArrayGetSize(pFailedChkpt) > 0) { bool allReady = true; if (pMnode != NULL) { SArray *p = mndTakeVgroupSnapshot(pMnode, &allReady); @@ -339,8 +376,8 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { if (allReady || snodeChanged) { // if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal - for(int32_t i = 0; i < taosArrayGetSize(pFailedTasks); ++i) { - SFailedCheckpointInfo *pInfo = taosArrayGet(pFailedTasks, i); + for(int32_t i = 0; i < taosArrayGetSize(pFailedChkpt); ++i) { + SFailedCheckpointInfo *pInfo = taosArrayGet(pFailedChkpt, i); mInfo("checkpointId:%" PRId64 " transId:%d failed, issue task-reset trans to reset all tasks status", pInfo->checkpointId, pInfo->transId); @@ -359,7 +396,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { taosThreadMutexUnlock(&execInfo.lock); tCleanupStreamHbMsg(&req); - taosArrayDestroy(pFailedTasks); + taosArrayDestroy(pFailedChkpt); taosArrayDestroy(pOrphanTasks); { diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 54279161ab..d138254afd 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -26,6 +26,8 @@ struct SStreamTaskIter { SStreamTask *pTask; }; +int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId); + SStreamTaskIter* createStreamTaskIter(SStreamObj* pStream) { SStreamTaskIter* pIter = taosMemoryCalloc(1, sizeof(SStreamTaskIter)); if (pIter == NULL) { @@ -597,4 +599,50 @@ void removeExpiredNodeInfo(const SArray *pNodeSnapshot) { execInfo.pNodeList = pValidList; mDebug("remain %d valid node entries after clean expired nodes info", (int32_t)taosArrayGetSize(pValidList)); +} + +int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId) { + void *p = taosHashGet(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId)); + if (p == NULL) { + return TSDB_CODE_SUCCESS; + } + + taosHashRemove(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId)); + + for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) { + STaskId *pId = taosArrayGet(pExecNode->pTaskList, k); + if (pId->taskId == pRemovedId->taskId && pId->streamId == pRemovedId->streamId) { + taosArrayRemove(pExecNode->pTaskList, k); + + int32_t num = taosArrayGetSize(pExecNode->pTaskList); + mInfo("s-task:0x%x removed from buffer, remain:%d in buffer list", (int32_t)pRemovedId->taskId, num); + break; + } + } + + return TSDB_CODE_SUCCESS; +} + +void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo* pExecInfo) { + for (int32_t i = 0; i < taosArrayGetSize(pTaskIds); ++i) { + STaskId *pId = taosArrayGet(pTaskIds, i); + doRemoveTasks(pExecInfo, pId); + } +} + +void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) { + taosThreadMutexLock(&pExecNode->lock); + + SStreamTaskIter *pIter = createStreamTaskIter(pStream); + while (streamTaskIterNextTask(pIter)) { + SStreamTask *pTask = streamTaskIterGetCurrent(pIter); + + STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; + doRemoveTasks(pExecNode, &id); + } + + ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList)); + taosThreadMutexUnlock(&pExecNode->lock); + + destroyStreamTaskIter(pIter); } \ No newline at end of file diff --git a/source/dnode/mnode/impl/test/stream/stream.cpp b/source/dnode/mnode/impl/test/stream/stream.cpp index 8480f204d6..c9365b4318 100644 --- a/source/dnode/mnode/impl/test/stream/stream.cpp +++ b/source/dnode/mnode/impl/test/stream/stream.cpp @@ -138,7 +138,6 @@ void initStreamExecInfo() { } void initNodeInfo() { - execInfo.pNodeList = taosArrayInit(4, sizeof(SNodeEntry)); SNodeEntry entry = {0}; entry.nodeId = 2; entry.stageUpdated = true; @@ -207,27 +206,32 @@ TEST_F(StreamTest, kill_checkpoint_trans) { killAllCheckpointTrans(pMnode, &info); - SStreamObj stream; - memset(&stream, 0, sizeof(SStreamObj)); + void* p = alloca(sizeof(SStreamObj) + sizeof(SSdbRow)); + SSdbRow* pRow = static_cast(p); + pRow->type = SDB_MAX; - stream.uid = defStreamId; - stream.lock = 0; - stream.tasks = taosArrayInit(1, POINTER_BYTES); - stream.pHTasksList = taosArrayInit(1, POINTER_BYTES); + SStreamObj* pStream = (SStreamObj*)((char*)p + sizeof(SSdbRow)); + + memset(pStream, 0, sizeof(SStreamObj)); + + pStream->uid = defStreamId; + pStream->lock = 0; + pStream->tasks = taosArrayInit(1, POINTER_BYTES); + pStream->pHTasksList = taosArrayInit(1, POINTER_BYTES); SArray* pLevel = taosArrayInit(1, POINTER_BYTES); SStreamTask* pTask = static_cast(taosMemoryCalloc(1, sizeof(SStreamTask))); pTask->id.streamId = defStreamId; pTask->id.taskId = 1; - pTask->exec.qmsg = (char*)taosMemoryMalloc(1); + pTask->exec.qmsg = (char*)taosMemoryCalloc(1,1); taosThreadMutexInit(&pTask->lock, NULL); taosArrayPush(pLevel, &pTask); - taosArrayPush(stream.tasks, &pLevel); - mndCreateStreamResetStatusTrans(pMnode, &stream); + taosArrayPush(pStream->tasks, &pLevel); + mndCreateStreamResetStatusTrans(pMnode, pStream); - tFreeStreamObj(&stream); + tFreeStreamObj(pStream); sdbCleanup(pMnode->pSdb); taosMemoryFree(pMnode); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 69be1c76c7..20bd1056f6 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -162,7 +162,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu } pInfo->blockType = STREAM_INPUT__DATA_BLOCK; - } else if (type == STREAM_INPUT__CHECKPOINT) { + } else if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) { SPackedData tmp = {.pDataBlock = input}; taosArrayPush(pInfo->pBlockLists, &tmp); pInfo->blockType = STREAM_INPUT__CHECKPOINT; diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index daac98bbfc..8ab388830f 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -989,13 +989,15 @@ void tsortAppendTupleToBlock(SSortHandle* pHandle, SSDataBlock* pBlock, STupleHa colDataSetNULL(pColInfo, pBlock->info.rows); } } - if (bFreeRow) { - taosMemoryFree(buf); - } + if (*(int32_t*)pStart != pStart - buf) { qError("table merge scan row buf deserialization. length error %d != %d ", *(int32_t*)pStart, (int32_t)(pStart - buf)); - }; + } + + if (bFreeRow) { + taosMemoryFree(buf); + } pBlock->info.dataLoad = 1; pBlock->info.scanFlag = ((SDataBlockInfo*)tsortGetBlockInfo(pTupleHandle))->scanFlag; diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index e9861c29b0..b271c83678 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -454,7 +454,12 @@ static int32_t scanPathOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub scanPathOptSetGroupOrderScan(info.pScan); } if (TSDB_CODE_SUCCESS == code && (NULL != info.pDsoFuncs || NULL != info.pSdrFuncs)) { - info.pScan->dataRequired = scanPathOptGetDataRequired(info.pSdrFuncs); + if (pCxt->pPlanCxt->streamQuery) { + info.pScan->dataRequired = FUNC_DATA_REQUIRED_DATA_LOAD; // always load all data for stream query + } else { + info.pScan->dataRequired = scanPathOptGetDataRequired(info.pSdrFuncs); + } + info.pScan->pDynamicScanFuncs = info.pDsoFuncs; } if (TSDB_CODE_SUCCESS == code && info.pScan) { diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 154f623b9d..f3ec01cf7a 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -224,6 +224,8 @@ int32_t initCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamNodeId, int32 typedef int32_t (*__stream_async_exec_fn_t)(void* param); int32_t streamMetaAsyncExec(SStreamMeta* pMeta, __stream_async_exec_fn_t fn, void* param, int32_t* code); +void flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointBlock); + #ifdef __cplusplus } #endif diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index d42a3b545a..597d6035d6 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -195,12 +195,6 @@ int32_t getCfIdx(const char* cfName) { bool isValidCheckpoint(const char* dir) { return true; - STaskDbWrapper* pDb = taskDbOpenImpl(NULL, NULL, (char*)dir); - if (pDb == NULL) { - return false; - } - taskDbDestroy(pDb, false); - return true; } int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) { diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 94d2198e31..ab3b5d6fa0 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -276,6 +276,12 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock int8_t type = pTask->outputInfo.type; pActiveInfo->allUpstreamTriggerRecv = 1; + // We need to transfer state here, before dispatching checkpoint-trigger to downstream tasks. + // The transfer of state may generate new data that need to dispatch to downstream tasks, + // Otherwise, those new generated data by executors that is kept in outputQ, may be lost if this program crashed + // before the next checkpoint. + flushStateDataInExecutor(pTask, (SStreamQueueItem*)pBlock); + if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH) { stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId); continueDispatchCheckpointTriggerBlock(pBlock, pTask); @@ -306,8 +312,11 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock streamTaskBuildCheckpoint(pTask); } else { // source & agg tasks need to forward the checkpoint msg downwards stDebug("s-task:%s process checkpoint-trigger block, all %d upstreams sent, forwards to downstream", id, num); - // Put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task - // already. And then, dispatch check point msg to all downstream tasks + + flushStateDataInExecutor(pTask, (SStreamQueueItem*)pBlock); + + // Put the checkpoint-trigger block into outputQ, to make sure all blocks with less version have been handled by + // this task already. And then, dispatch check point msg to all downstream tasks code = continueDispatchCheckpointTriggerBlock(pBlock, pTask); } } @@ -507,7 +516,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin if (pReq->dropRelHTask) { streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId); int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); - stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped, remain tasks:%d", id, vgId, pReq->taskId, numOfTasks); + stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped, remain tasks:%d", id, vgId, (int32_t) pReq->hTaskId, numOfTasks); } streamMetaWLock(pMeta); @@ -526,6 +535,8 @@ void streamTaskSetFailedCheckpointId(SStreamTask* pTask) { } static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* list) { + char buf[128] = {0}; + char* file = taosMemoryCalloc(1, strlen(path) + 32); sprintf(file, "%s%s%s", path, TD_DIRSEP, "META_TMP"); @@ -537,12 +548,17 @@ static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* l } TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ); - char buf[128] = {0}; + if (pFile == NULL) { + stError("%s failed to open meta file:%s for checkpoint", id, file); + code = -1; + return code; + } + if (taosReadFile(pFile, buf, sizeof(buf)) <= 0) { - stError("chkp failed to read meta file:%s", file); + stError("%s failed to read meta file:%s for checkpoint", id, file); code = -1; } else { - int32_t len = strlen(buf); + int32_t len = strnlen(buf, tListLen(buf)); for (int i = 0; i < len; i++) { if (buf[i] == '\n') { char* item = taosMemoryCalloc(1, i + 1); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 2e776313e0..e100dac808 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -246,6 +246,7 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD int32_t downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId; code = tInitStreamDispatchReq(pReq, pTask, pData->srcVgId, numOfBlocks, downstreamTaskId, pData->type); if (code != TSDB_CODE_SUCCESS) { + taosMemoryFree(pReq); return code; } @@ -730,8 +731,8 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) { } else { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug( - "s-task:%s vgId:%d recv of checkpoint-ready msg confirmed by all upstream task(s), quit from timer and clear " - "checkpoint-ready msg, ref:%d", + "s-task:%s vgId:%d recv of checkpoint-ready msg confirmed by all upstream task(s), clear checkpoint-ready msg " + "and quit from timer, ref:%d", id, vgId, ref); streamClearChkptReadyMsg(pTask); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 95634b2ff3..1828409f89 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -24,6 +24,7 @@ #define FILL_HISTORY_TASK_EXEC_INTERVAL 5000 // 5 sec static int32_t streamTransferStateDoPrepare(SStreamTask* pTask); +static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks); bool streamTaskShouldStop(const SStreamTask* pTask) { SStreamTaskState* pState = streamTaskGetStatus(pTask); @@ -87,8 +88,7 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* return code; } -static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, - int32_t* totalBlocks) { +int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks) { int32_t code = TSDB_CODE_SUCCESS; void* pExecutor = pTask->exec.pExecutor; @@ -541,12 +541,81 @@ int32_t streamProcessTransstateBlock(SStreamTask* pTask, SStreamDataBlock* pBloc //static void streamTaskSetIdleInfo(SStreamTask* pTask, int32_t idleTime) { pTask->status.schedIdleTime = idleTime; } static void setLastExecTs(SStreamTask* pTask, int64_t ts) { pTask->status.lastExecTs = ts; } +static void doStreamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pBlock) { + const char* id = pTask->id.idStr; + int32_t blockSize = 0; + int64_t st = taosGetTimestampMs(); + SCheckpointInfo* pInfo = &pTask->chkInfo; + int64_t ver = pInfo->processedVer; + + stDebug("s-task:%s start to process batch of blocks, num:%d, type:%s", id, 1, "checkpoint-trigger"); + + doSetStreamInputBlock(pTask, pBlock, &ver, id); + + int64_t totalSize = 0; + int32_t totalBlocks = 0; + streamTaskExecImpl(pTask, pBlock, &totalSize, &totalBlocks); + + double el = (taosGetTimestampMs() - st) / 1000.0; + stDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", id, el, + SIZE_IN_MiB(totalSize), totalBlocks); + + pTask->execInfo.outputDataBlocks += totalBlocks; + pTask->execInfo.outputDataSize += totalSize; + if (fabs(el - 0.0) <= DBL_EPSILON) { + pTask->execInfo.procsThroughput = 0; + pTask->execInfo.outputThroughput = 0; + } else { + pTask->execInfo.outputThroughput = (totalSize / el); + pTask->execInfo.procsThroughput = (blockSize / el); + } + + // update the currentVer if processing the submit blocks. + ASSERT(pInfo->checkpointVer <= pInfo->nextProcessVer && ver >= pInfo->checkpointVer); + + if (ver != pInfo->processedVer) { + stDebug("s-task:%s update processedVer(unsaved) from %" PRId64 " to %" PRId64 " nextProcessVer:%" PRId64 + " ckpt:%" PRId64, + id, pInfo->processedVer, ver, pInfo->nextProcessVer, pInfo->checkpointVer); + pInfo->processedVer = ver; + } +} + +void flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointBlock) { + const char* id = pTask->id.idStr; + + // 1. transfer the ownership of executor state + bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT); + if (dropRelHTask) { + ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask)); + + STaskId* pHTaskId = &pTask->hTaskInfo.id; + SStreamTask* pHTask = streamMetaAcquireTask(pTask->pMeta, pHTaskId->streamId, pHTaskId->taskId); + if (pHTask != NULL) { + streamTaskReleaseState(pHTask); + streamTaskReloadState(pTask); + stDebug("s-task:%s transfer state from fill-history task:%s, status:%s completed", id, pHTask->id.idStr, + streamTaskGetStatus(pHTask)->name); + + streamMetaReleaseTask(pTask->pMeta, pHTask); + } else { + stError("s-task:%s related fill-history task:0x%x failed to acquire, transfer state failed", id, + (int32_t)pHTaskId->taskId); + } + } else { + stDebug("s-task:%s no transfer-state needed", id); + } + + // 2. flush data in executor to K/V store, which should be completed before do checkpoint in the K/V. + doStreamTaskExecImpl(pTask, pCheckpointBlock); +} + /** * todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the * appropriate batch of blocks should be handled in 5 to 10 sec. */ static int32_t doStreamExecTask(SStreamTask* pTask) { - const char* id = pTask->id.idStr; + const char* id = pTask->id.idStr; // merge multiple input data if possible in the input queue. stDebug("s-task:%s start to extract data block from inputQ", id); @@ -628,63 +697,8 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { } } - if (type == STREAM_INPUT__CHECKPOINT) { - // transfer the state from fill-history to related stream task before generating the checkpoint. - bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT); - if (dropRelHTask) { - ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask)); - - STaskId* pHTaskId = &pTask->hTaskInfo.id; - SStreamTask* pHTask = streamMetaAcquireTask(pTask->pMeta, pHTaskId->streamId, pHTaskId->taskId); - if (pHTask != NULL) { - // 2. transfer the ownership of executor state - streamTaskReleaseState(pHTask); - streamTaskReloadState(pTask); - stDebug("s-task:%s transfer state from fill-history task:%s, status:%s completed", id, pHTask->id.idStr, - streamTaskGetStatus(pHTask)->name); - - streamMetaReleaseTask(pTask->pMeta, pHTask); - } else { - stError("s-task:%s related fill-history task:0x%x failed to acquire, transfer state failed", id, - (int32_t)pHTaskId->taskId); - } - } - } - - int64_t st = taosGetTimestampMs(); - stDebug("s-task:%s start to process batch of blocks, num:%d, type:%s", id, numOfBlocks, streamQueueItemGetTypeStr(type)); - - int64_t ver = pTask->chkInfo.processedVer; - doSetStreamInputBlock(pTask, pInput, &ver, id); - - int64_t totalSize = 0; - int32_t totalBlocks = 0; - streamTaskExecImpl(pTask, pInput, &totalSize, &totalBlocks); - - double el = (taosGetTimestampMs() - st) / 1000.0; - stDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", id, el, - SIZE_IN_MiB(totalSize), totalBlocks); - - pTask->execInfo.outputDataBlocks += totalBlocks; - pTask->execInfo.outputDataSize += totalSize; - if (fabs(el - 0.0) <= DBL_EPSILON) { - pTask->execInfo.procsThroughput = 0; - pTask->execInfo.outputThroughput = 0; - } else { - pTask->execInfo.outputThroughput = (totalSize / el); - pTask->execInfo.procsThroughput = (blockSize / el); - } - - SCheckpointInfo* pInfo = &pTask->chkInfo; - - // update the currentVer if processing the submit blocks. - ASSERT(pInfo->checkpointVer <= pInfo->nextProcessVer && ver >= pInfo->checkpointVer); - - if (ver != pInfo->processedVer) { - stDebug("s-task:%s update processedVer(unsaved) from %" PRId64 " to %" PRId64 " nextProcessVer:%" PRId64 - " ckpt:%" PRId64, - id, pInfo->processedVer, ver, pInfo->nextProcessVer, pInfo->checkpointVer); - pInfo->processedVer = ver; + if (type != STREAM_INPUT__CHECKPOINT) { + doStreamTaskExecImpl(pTask, pInput); } streamFreeQitem(pInput); @@ -692,7 +706,6 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { // todo other thread may change the status // do nothing after sync executor state to storage backend, untill the vnode-level checkpoint is completed. if (type == STREAM_INPUT__CHECKPOINT) { - // todo add lock SStreamTaskState* pState = streamTaskGetStatus(pTask); if (pState->state == TASK_STATUS__CK) { @@ -717,8 +730,6 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { return 0; } } - - return 0; } // the task may be set dropping/stopping, while it is still in the task queue, therefore, the sched-status can not diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c index 25015c4d33..adefe97f1f 100644 --- a/source/libs/stream/src/streamSnapshot.c +++ b/source/libs/stream/src/streamSnapshot.c @@ -298,6 +298,7 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, void* pMeta SArray* pSnapInfoSet = taosArrayInit(4, sizeof(SStreamTaskSnap)); int32_t code = streamCreateTaskDbSnapInfo(pMeta, path, pSnapInfoSet); if (code != 0) { + taosArrayDestroy(pSnapInfoSet); return -1; } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 834daf15d0..dc9d2166e6 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -624,18 +624,19 @@ void streamTaskOpenAllUpstreamInput(SStreamTask* pTask) { void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) { SStreamUpstreamEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, taskId); - if (pInfo != NULL) { + if ((pInfo != NULL) && pInfo->dataAllowed) { pInfo->dataAllowed = false; int32_t t = atomic_add_fetch_32(&pTask->upstreamInfo.numOfClosed, 1); + ASSERT(t <= streamTaskGetNumOfUpstream(pTask)); } } void streamTaskOpenUpstreamInput(SStreamTask* pTask, int32_t taskId) { SStreamUpstreamEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, taskId); - if (pInfo != NULL) { - pInfo->dataAllowed = true; + if ((pInfo != NULL) && (!pInfo->dataAllowed)) { int32_t t = atomic_sub_fetch_32(&pTask->upstreamInfo.numOfClosed, 1); ASSERT(t >= 0); + pInfo->dataAllowed = true; } }