From 9a852b723d590bb4eff979401d163879fd7f870d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 13 Nov 2023 23:26:16 +0800 Subject: [PATCH 01/25] refactor: refactor the stream trans conflict check --- source/dnode/mnode/impl/inc/mndDef.h | 3 +- source/dnode/mnode/impl/inc/mndStream.h | 34 ++- source/dnode/mnode/impl/src/mndScheduler.c | 2 +- source/dnode/mnode/impl/src/mndStream.c | 264 +++++++++++++-------- 4 files changed, 189 insertions(+), 114 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index f4236964ca..ddb6841238 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -669,8 +669,7 @@ typedef struct SStreamConf { } SStreamConf; typedef struct { - char name[TSDB_STREAM_FNAME_LEN]; - // ctl + char name[TSDB_STREAM_FNAME_LEN]; SRWLatch lock; // create info diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index 19fd2a3fd4..244a6d08dd 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -22,17 +22,37 @@ extern "C" { #endif -int32_t mndInitStream(SMnode *pMnode); -void mndCleanupStream(SMnode *pMnode); +typedef struct SStreamTransInfo { + int64_t startTime; + int32_t transId; + const char *name; +} SStreamTransInfo; +typedef struct SStreamTransMgmt { + SHashObj *pDBTrans; +} SStreamTransMgmt; + +typedef struct SStreamExecInfo { + SArray *pNodeList; + int64_t ts; // snapshot ts + SStreamTransMgmt transMgmt; + int64_t activeCheckpoint; // active check point id + SHashObj * pTaskMap; + SArray * pTaskList; + TdThreadMutex lock; +} SStreamExecInfo; + +extern SStreamExecInfo execInfo; + +int32_t mndInitStream(SMnode *pMnode); +void mndCleanupStream(SMnode *pMnode); SStreamObj *mndAcquireStream(SMnode *pMnode, char *streamName); void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream); +int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); +int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); -SSdbRaw *mndStreamActionEncode(SStreamObj *pStream); -SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); - -int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); -int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); +int32_t mndStreamRegisterTrans(STrans* pTrans, const char* pName, const char* pSrcDb, const char* pDstDb); +bool streamTransConflictOtherTrans(SMnode *pMnode, const char *pSrcDb, const char *pDstDb); // for sma // TODO refactor diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 88a06cb513..3ef4c9a4d2 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -28,7 +28,7 @@ extern bool tsDeployOnSnode; static int32_t doAddSinkTask(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup, - SEpSet* pEpset, bool isFillhistory); + SEpSet* pEpset, bool isFillhistory); int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType, int64_t watermark, int64_t deleteMark) { diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 7c3c54537d..96199d0af5 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -33,7 +33,13 @@ #define MND_STREAM_VER_NUMBER 4 #define MND_STREAM_RESERVE_SIZE 64 #define MND_STREAM_MAX_NUM 60 -#define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint" + +#define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint" +#define MND_STREAM_PAUSE_NAME "stream-pause" +#define MND_STREAM_RESUME_NAME "stream-resume" +#define MND_STREAM_DROP_NAME "stream-drop" +#define MND_STREAM_TASK_RESET_NAME "stream-task-reset" +#define MND_STREAM_TASK_UPDATE_NAME "stream-task-update" typedef struct SNodeEntry { int32_t nodeId; @@ -42,22 +48,13 @@ typedef struct SNodeEntry { int64_t hbTimestamp; // second } SNodeEntry; -typedef struct SStreamExecInfo { - SArray *pNodeList; - int64_t ts; // snapshot ts - int64_t activeCheckpoint; // active check point id - SHashObj * pTaskMap; - SArray * pTaskList; - TdThreadMutex lock; -} SStreamExecInfo; - typedef struct SVgroupChangeInfo { SHashObj *pDBMap; SArray * pUpdateNodeList; // SArray } SVgroupChangeInfo; -static int32_t mndNodeCheckSentinel = 0; -static SStreamExecInfo execInfo; +static int32_t mndNodeCheckSentinel = 0; +SStreamExecInfo execInfo; static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream); static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream); @@ -82,17 +79,20 @@ static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady); static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList); -static STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, const char *name); +static STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const char *name, const char* pMsg); static int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans); static void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset, int32_t retryCode); static int32_t createStreamUpdateTrans(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans); static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); -static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); +static void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode); static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot); -static int32_t doKillActiveCheckpointTrans(SMnode *pMnode); +static int32_t killActiveCheckpointTrans(SMnode *pMnode, const char* pDbName); static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList); +static SSdbRaw *mndStreamActionEncode(SStreamObj *pStream); +static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); + int32_t mndInitStream(SMnode *pMnode) { SSdbTable table = { .sdbType = SDB_STREAM, @@ -334,7 +334,7 @@ static int32_t mndStreamGetPlanString(const char *ast, int8_t triggerType, int64 .pAstRoot = pAst, .topicQuery = false, .streamQuery = true, - .triggerType = triggerType == STREAM_TRIGGER_MAX_DELAY ? STREAM_TRIGGER_WINDOW_CLOSE : triggerType, + .triggerType = (triggerType == STREAM_TRIGGER_MAX_DELAY) ? STREAM_TRIGGER_WINDOW_CLOSE : triggerType, .watermark = watermark, }; code = qCreateQueryPlan(&cxt, &pPlan, NULL); @@ -714,6 +714,52 @@ int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) return 0; } +static bool checkDbPrecision(SMnode* pMnode, SStreamObj* pStreamObj) { + if (pStreamObj->sourceDbUid != pStreamObj->targetDbUid) { + SDbObj *pSrcDb = mndAcquireDb(pMnode, pStreamObj->sourceDb); + SDbObj *pDstDb = mndAcquireDb(pMnode, pStreamObj->targetDb); + + bool isIdentical = (pSrcDb->cfg.precision != pDstDb->cfg.precision); + mndReleaseDb(pMnode, pSrcDb); + mndReleaseDb(pMnode, pDstDb); + + if (!isIdentical) { + mError("stream:%s failed to create since target/source db precision not identical", pStreamObj->name); + return false; + } + } + + return true; +} + +static int32_t checkForNumOfStreams(SMnode *pMnode, SStreamObj *pStreamObj) { // check for number of existed tasks + int32_t numOfStream = 0; + SStreamObj *pStream = NULL; + void *pIter = NULL; + + while ((pIter = sdbFetch(pMnode->pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) { + if (pStream->sourceDbUid == pStreamObj->sourceDbUid) { + ++numOfStream; + } + + sdbRelease(pMnode->pSdb, pStream); + + if (numOfStream > MND_STREAM_MAX_NUM) { + mError("too many streams, no more than %d for each database", MND_STREAM_MAX_NUM); + sdbCancelFetch(pMnode->pSdb, pIter); + return TSDB_CODE_MND_TOO_MANY_STREAMS; + } + + if (pStream->targetStbUid == pStreamObj->targetStbUid) { + mError("Cannot write the same stable as other stream:%s", pStream->name); + sdbCancelFetch(pMnode->pSdb, pIter); + return TSDB_CODE_MND_INVALID_TARGET_TABLE; + } + } + + return TSDB_CODE_SUCCESS; +} + static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { SMnode * pMnode = pReq->info.node; int32_t code = -1; @@ -726,6 +772,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { terrno = TSDB_CODE_INVALID_MSG; goto _OVER; } + #ifdef WINDOWS terrno = TSDB_CODE_MND_INVALID_PLATFORM; goto _OVER; @@ -757,42 +804,15 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { goto _OVER; } - { - int32_t numOfStream = 0; + // check if the time precision for source&target DB is identical. + bool isIdentical = checkDbPrecision(pMnode, &streamObj); + if (!isIdentical) { + goto _OVER; + } - SStreamObj *pStream = NULL; - void * pIter = NULL; - - while (1) { - pIter = sdbFetch(pMnode->pSdb, SDB_STREAM, pIter, (void **)&pStream); - if (pIter == NULL) { - if (numOfStream > MND_STREAM_MAX_NUM) { - mError("too many streams, no more than %d for each database", MND_STREAM_MAX_NUM); - terrno = TSDB_CODE_MND_TOO_MANY_STREAMS; - goto _OVER; - } - break; - } - - if (pStream->sourceDbUid == streamObj.sourceDbUid) { - ++numOfStream; - } - - sdbRelease(pMnode->pSdb, pStream); - if (numOfStream > MND_STREAM_MAX_NUM) { - mError("too many streams, no more than %d for each database", MND_STREAM_MAX_NUM); - terrno = TSDB_CODE_MND_TOO_MANY_STREAMS; - sdbCancelFetch(pMnode->pSdb, pIter); - goto _OVER; - } - - if (pStream->targetStbUid == streamObj.targetStbUid) { - mError("Cannot write the same stable as other stream:%s", pStream->name); - terrno = TSDB_CODE_MND_INVALID_TARGET_TABLE; - sdbCancelFetch(pMnode->pSdb, pIter); - goto _OVER; - } - } + code = checkForNumOfStreams(pMnode, &streamObj); + if (code != TSDB_CODE_SUCCESS) { + goto _OVER; } STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq, "create-stream"); @@ -851,7 +871,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { taosThreadMutexLock(&execInfo.lock); mDebug("stream tasks register into node list"); - keepStreamTasksInBuf(&streamObj, &execInfo); + saveStreamTasksInfo(&streamObj, &execInfo); taosThreadMutexUnlock(&execInfo.lock); code = TSDB_CODE_ACTION_IN_PROGRESS; @@ -1224,7 +1244,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { SMStreamDoCheckpointMsg *pMsg = (SMStreamDoCheckpointMsg *)pReq->pCont; int64_t checkpointId = pMsg->checkpointId; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, NULL, MND_STREAM_CHECKPOINT_NAME); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, MND_STREAM_CHECKPOINT_NAME); if (pTrans == NULL) { mError("failed to trigger checkpoint, reason: %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); return -1; @@ -1233,7 +1253,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { mDebug("start to trigger checkpoint, checkpointId: %" PRId64, checkpointId); const char *pDb = mndGetStreamDB(pMnode); - mndTransSetDbName(pTrans, pDb, "checkpoint"); + mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb); taosMemoryFree((void *)pDb); if (mndTransCheckConflict(pMnode, pTrans) != 0) { @@ -1300,7 +1320,14 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { return -1; } - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "drop-stream"); + // check if it is conflict with other trans in both sourceDb and targetDb. + bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb); + if (!conflict) { + sdbRelease(pMnode->pSdb, pStream); + return -1; + } + + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, MND_STREAM_DROP_NAME); if (pTrans == NULL) { mError("stream:%s, failed to drop since %s", dropReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); @@ -1308,7 +1335,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { return -1; } - mInfo("trans:%d, used to drop stream:%s", pTrans->id, dropReq.name); + mInfo("trans:%d used to drop stream:%s", pTrans->id, dropReq.name); mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb); if (mndTransCheckConflict(pMnode, pTrans) != 0) { @@ -1318,6 +1345,8 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { return -1; } + int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pStream->sourceDb, pStream->targetDb); + // drop all tasks if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) { mError("stream:%s, failed to drop task since %s", dropReq.name, terrstr()); @@ -1347,7 +1376,6 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { SName name = {0}; tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); - // reuse this function for stream auditRecord(pReq, pMnode->clusterId, "dropStream", name.dbname, name.tname, dropReq.sql, dropReq.sqlLen); @@ -1751,9 +1779,16 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { return -1; } + // check if it is conflict with other trans in both sourceDb and targetDb. + bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb); + if (!conflict) { + sdbRelease(pMnode->pSdb, pStream); + return -1; + } + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "pause-stream"); if (pTrans == NULL) { - mError("stream:%s, failed to pause stream since %s", pauseReq.name, terrstr()); + mError("stream:%s failed to pause stream since %s", pauseReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); return -1; } @@ -1767,6 +1802,8 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { return -1; } + int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_PAUSE_NAME, pStream->sourceDb, pStream->targetDb); + // pause all tasks if (mndPauseAllStreamTasks(pTrans, pStream) < 0) { mError("stream:%s, failed to pause task since %s", pauseReq.name, terrstr()); @@ -1868,13 +1905,21 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { return -1; } - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "pause-stream"); - if (pTrans == NULL) { - mError("stream:%s, failed to pause stream since %s", pauseReq.name, terrstr()); + // check if it is conflict with other trans in both sourceDb and targetDb. + bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb); + if (!conflict) { sdbRelease(pMnode->pSdb, pStream); return -1; } - mInfo("trans:%d, used to pause stream:%s", pTrans->id, pauseReq.name); + + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, MND_STREAM_RESUME_NAME); + if (pTrans == NULL) { + mError("stream:%s, failed to resume stream since %s", pauseReq.name, terrstr()); + sdbRelease(pMnode->pSdb, pStream); + return -1; + } + + mInfo("trans:%d used to resume stream:%s", pTrans->id, pauseReq.name); mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb); if (mndTransCheckConflict(pMnode, pTrans) != 0) { @@ -1883,6 +1928,8 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { return -1; } + int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESUME_NAME, pStream->sourceDb, pStream->targetDb); + // resume all tasks if (mndResumeAllStreamTasks(pTrans, pStream, pauseReq.igUntreated) < 0) { mError("stream:%s, failed to drop task since %s", pauseReq.name, terrstr()); @@ -2147,7 +2194,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange // here create only one trans if (pTrans == NULL) { - pTrans = doCreateTrans(pMnode, pStream, "stream-task-update"); + pTrans = doCreateTrans(pMnode, pStream, NULL, MND_STREAM_TASK_UPDATE_NAME, "update task epsets"); if (pTrans == NULL) { sdbRelease(pSdb, pStream); sdbCancelFetch(pSdb, pIter); @@ -2257,7 +2304,7 @@ static void doExtractTasksFromStream(SMnode *pMnode) { break; } - keepStreamTasksInBuf(pStream, &execInfo); + saveStreamTasksInfo(pStream, &execInfo); sdbRelease(pSdb, pStream); } } @@ -2341,6 +2388,14 @@ int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) { return 0; } +static void killAllCheckpointTrans(SMnode* pMnode, SVgroupChangeInfo* pChangeInfo) { + void* pIter = NULL; + while((pIter = taosHashIterate(pChangeInfo->pDBMap, pIter)) != NULL) { + char* pDb = (char*) pIter; + killActiveCheckpointTrans(pMnode, pDb); + } +} + // this function runs by only one thread, so it is not multi-thread safe static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { int32_t code = 0; @@ -2382,7 +2437,8 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot); if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) { // kill current active checkpoint transaction, since the transaction is vnode wide. - doKillActiveCheckpointTrans(pMnode); + killAllCheckpointTrans(pMnode, &changeInfo); + code = mndProcessVgroupChange(pMnode, &changeInfo); // keep the new vnode snapshot @@ -2428,7 +2484,7 @@ static int32_t mndProcessNodeCheck(SRpcMsg *pReq) { return 0; } -void keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) { +void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode) { int32_t level = taosArrayGetSize(pStream->tasks); for (int32_t i = 0; i < level; i++) { @@ -2471,8 +2527,9 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) { STaskId *pId = taosArrayGet(pExecNode->pTaskList, k); if (pId->taskId == id.taskId && pId->streamId == id.streamId) { taosArrayRemove(pExecNode->pTaskList, k); - mInfo("s-task:0x%x removed from buffer, remain:%d", (int32_t)id.taskId, - (int32_t)taosArrayGetSize(pExecNode->pTaskList)); + + int32_t num = taosArrayGetSize(pExecNode->pTaskList); + mInfo("s-task:0x%x removed from buffer, remain:%d", (int32_t)id.taskId, num); break; } } @@ -2483,15 +2540,15 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) { ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList)); } -STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, const char *name) { - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, name); +STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const char *name, const char* pMsg) { + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, name); if (pTrans == NULL) { mError("failed to build trans:%s, reason: %s", name, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - mDebug("start to build stream:0x%" PRIx64 " task DAG update", pStream->uid); + mDebug("s-task:0x%"PRIx64" start to build trans %s", pStream->uid, pMsg); mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb); if (mndTransCheckConflict(pMnode, pTrans) != 0) { @@ -2506,7 +2563,7 @@ STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, const char *name) { } int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) { - STrans *pTrans = doCreateTrans(pMnode, pStream, "stream-task-reset"); + STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, MND_STREAM_TASK_RESET_NAME, " reset from failed checkpoint"); if (pTrans == NULL) { return terrno; } @@ -2566,43 +2623,36 @@ int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) { return TSDB_CODE_ACTION_IN_PROGRESS; } -int32_t doKillActiveCheckpointTrans(SMnode *pMnode) { - int32_t transId = 0; - SSdb * pSdb = pMnode->pSdb; - STrans *pTrans = NULL; - void * pIter = NULL; - - while (1) { - pIter = sdbFetch(pSdb, SDB_TRANS, pIter, (void **)&pTrans); - if (pIter == NULL) { - break; - } - - if (strncmp(pTrans->opername, MND_STREAM_CHECKPOINT_NAME, tListLen(pTrans->opername) - 1) == 0) { - transId = pTrans->id; - sdbRelease(pSdb, pTrans); - sdbCancelFetch(pSdb, pIter); - break; - } - - sdbRelease(pSdb, pTrans); - } - - if (transId == 0) { - mDebug("failed to find the checkpoint trans, reset not executed"); +int32_t killActiveCheckpointTrans(SMnode *pMnode, const char* pDBName) { + // data in the hash table will be removed automatically, no need to remove it here. + SStreamTransInfo* pTransInfo = taosHashGet(execInfo.transMgmt.pDBTrans, pDBName, strlen(pDBName)); + if (pTransInfo == NULL) { return TSDB_CODE_SUCCESS; } - pTrans = mndAcquireTrans(pMnode, transId); - mInfo("kill checkpoint trans:%d", transId); + // not checkpoint trans, ignore + if (strcmp(pTransInfo->name, MND_STREAM_CHECKPOINT_NAME) != 0) { + mDebug("not checkpoint trans, not kill it, name:%s, transId:%d", pTransInfo->name, pTransInfo->transId); + return TSDB_CODE_SUCCESS; + } + + STrans* pTrans = mndAcquireTrans(pMnode, pTransInfo->transId); + if (pTrans != NULL) { + mInfo("kill checkpoint transId:%d in Db:%s", pTransInfo->transId, pDBName); + mndKillTrans(pMnode, pTrans); + mndReleaseTrans(pMnode, pTrans); + } - mndKillTrans(pMnode, pTrans); - mndReleaseTrans(pMnode, pTrans); return TSDB_CODE_SUCCESS; } -int32_t mndResetFromCheckpoint(SMnode *pMnode) { - doKillActiveCheckpointTrans(pMnode); +int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int32_t transId) { + STrans* pTrans = mndAcquireTrans(pMnode, transId); + if (pTrans != NULL) { + mInfo("kill checkpoint transId:%d to reset task status", transId); + mndKillTrans(pMnode, pTrans); + mndReleaseTrans(pMnode, pTrans); + } // set all tasks status to be normal, refactor later to be stream level, instead of vnode level. SSdb * pSdb = pMnode->pSdb; @@ -2614,7 +2664,13 @@ int32_t mndResetFromCheckpoint(SMnode *pMnode) { break; } - // todo this transaction should exist be only one + bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb); + if (conflict) { + mError("stream:%s other trans exists in DB:%s & %s failed to start reset-status trans", + pStream->name, pStream->sourceDb, pStream->targetDb); + continue; + } + mDebug("stream:%s (0x%" PRIx64 ") reset checkpoint procedure, create reset trans", pStream->name, pStream->uid); int32_t code = createStreamResetStatusTrans(pMnode, pStream); if (code != TSDB_CODE_SUCCESS) { @@ -2741,7 +2797,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { // if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal mInfo("checkpointId:%" PRId64 " failed, issue task-reset trans to reset all tasks status", execInfo.activeCheckpoint); - mndResetFromCheckpoint(pMnode); + mndResetStatusFromCheckpoint(pMnode, activeCheckpointId); } else { mInfo("not all vgroups are ready, wait for next HB from stream tasks"); } From 9f6b0d25c5debc3bf68ae68c5cf34ee839571fd4 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 13 Nov 2023 23:59:05 +0800 Subject: [PATCH 02/25] refactor: do some internal refactor. --- source/dnode/mnode/impl/src/mndStreamTrans.c | 103 +++++++++++++++++++ 1 file changed, 103 insertions(+) create mode 100644 source/dnode/mnode/impl/src/mndStreamTrans.c diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c new file mode 100644 index 0000000000..aec72339e0 --- /dev/null +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -0,0 +1,103 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "mndTrans.h" +#include "mndStream.h" + +typedef struct SKeyInfo { + void* pKey; + int32_t keyLen; +} SKeyInfo; + +static int32_t clearFinishedTrans(SMnode* pMnode); + +int32_t mndStreamRegisterTrans(STrans* pTrans, const char* pName, const char* pSrcDb, const char* pDstDb) { + SStreamTransInfo info = {.transId = pTrans->id, .startTime = taosGetTimestampMs(), .name = pName}; + taosHashPut(execInfo.transMgmt.pDBTrans, pSrcDb, strlen(pSrcDb), &info, sizeof(SStreamTransInfo)); + + if (strcmp(pSrcDb, pDstDb) != 0) { + taosHashPut(execInfo.transMgmt.pDBTrans, pDstDb, strlen(pDstDb), &info, sizeof(SStreamTransInfo)); + } + + return 0; +} + +int32_t clearFinishedTrans(SMnode* pMnode) { + SArray* pList = taosArrayInit(4, sizeof(SKeyInfo)); + size_t keyLen = 0; + + taosThreadMutexLock(&execInfo.lock); + + void* pIter = NULL; + while ((pIter = taosHashIterate(execInfo.transMgmt.pDBTrans, pIter)) != NULL) { + SStreamTransInfo *pEntry = (SStreamTransInfo *)pIter; + STrans* pTrans = mndAcquireTrans(pMnode, pEntry->transId); + + // let's clear the finished trans + if (pTrans == NULL) { + void* pKey = taosHashGetKey(pEntry, &keyLen); + // key is the name of src/dst db name + SKeyInfo info = {.pKey = pKey, .keyLen = keyLen}; + + mDebug("transId:%d %s startTs:%" PRId64 "cleared due to finished", pEntry->transId, pEntry->name, + pEntry->startTime); + taosArrayPush(pList, &info); + } else { + mndReleaseTrans(pMnode, pTrans); + } + } + + size_t num = taosArrayGetSize(pList); + for(int32_t i = 0; i < num; ++i) { + SKeyInfo* pKey = taosArrayGet(pList, i); + taosHashRemove(execInfo.transMgmt.pDBTrans, pKey->pKey, pKey->keyLen); + } + + mDebug("clear %d finished stream-trans, remained:%d", (int32_t) num, taosHashGetSize(execInfo.transMgmt.pDBTrans)); + taosThreadMutexUnlock(&execInfo.lock); + + terrno = TSDB_CODE_SUCCESS; + taosArrayDestroy(pList); + return 0; +} + +bool streamTransConflictOtherTrans(SMnode* pMnode, const char* pSrcDb, const char* pDstDb) { + clearFinishedTrans(pMnode); + + taosThreadMutexLock(&execInfo.lock); + int32_t num = taosHashGetSize(execInfo.transMgmt.pDBTrans); + if (num <= 0) { + taosThreadMutexUnlock(&execInfo.lock); + return false; + } + + SStreamTransInfo *pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, pSrcDb, strlen(pSrcDb)); + if (pEntry != NULL) { + taosThreadMutexUnlock(&execInfo.lock); + return true; + } + + pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, pDstDb, strlen(pDstDb)); + if (pEntry != NULL) { + taosThreadMutexUnlock(&execInfo.lock); + return true; + } + + taosThreadMutexUnlock(&execInfo.lock); + return false; +} + + + From 18333d7347ac02b81fc3fd5382f9d50a19965e05 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 14 Nov 2023 15:48:00 +0800 Subject: [PATCH 03/25] fix(stream): send msg by using epset extract from mnode, instead of cached epset in streamObj. --- source/dnode/mnode/impl/src/mndStream.c | 52 +++++++++++++++++-------- 1 file changed, 35 insertions(+), 17 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 96199d0af5..20777e12e0 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -678,7 +678,7 @@ _OVER: return -1; } -static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) { +static int32_t mndPersistTaskDropReq(SMnode* pMnode, STrans *pTrans, SStreamTask *pTask) { SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq)); if (pReq == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -690,7 +690,12 @@ static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) { pReq->streamId = pTask->id.streamId; STransAction action = {0}; - initTransAction(&action, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &pTask->info.epSet, 0); + SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId); + SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj); + mndReleaseVgroup(pMnode, pVgObj); + + // The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode. + initTransAction(&action, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0); if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(pReq); return -1; @@ -706,7 +711,7 @@ int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) int32_t sz = taosArrayGetSize(pTasks); for (int32_t j = 0; j < sz; j++) { SStreamTask *pTask = taosArrayGetP(pTasks, j); - if (mndPersistTaskDropReq(pTrans, pTask) < 0) { + if (mndPersistTaskDropReq(pMnode, pTrans, pTask) < 0) { return -1; } } @@ -1105,9 +1110,10 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream STransAction action = {0}; SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj); + mndReleaseVgroup(pMnode, pVgObj); + initTransAction(&action, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset, TSDB_CODE_SYN_PROPOSE_NOT_READY); - mndReleaseVgroup(pMnode, pVgObj); if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(buf); @@ -1303,12 +1309,13 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { if (pStream == NULL) { if (dropReq.igNotExists) { - mInfo("stream:%s, not exist, ignore not exist is set", dropReq.name); + mInfo("stream:%s not exist, ignore not exist is set", dropReq.name); sdbRelease(pMnode->pSdb, pStream); tFreeSMDropStreamReq(&dropReq); return 0; } else { terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; + mError("stream:%s not exist failed to drop", dropReq.name); tFreeSMDropStreamReq(&dropReq); return -1; } @@ -1688,7 +1695,7 @@ static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter) { sdbCancelFetch(pSdb, pIter); } -static int32_t mndPauseStreamTask(STrans *pTrans, SStreamTask *pTask) { +static int32_t mndPauseStreamTask(SMnode* pMnode, STrans *pTrans, SStreamTask *pTask) { SVPauseStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVPauseStreamTaskReq)); if (pReq == NULL) { mError("failed to malloc in pause stream, size:%" PRIzu ", code:%s", sizeof(SVPauseStreamTaskReq), @@ -1701,8 +1708,12 @@ static int32_t mndPauseStreamTask(STrans *pTrans, SStreamTask *pTask) { pReq->taskId = pTask->id.taskId; pReq->streamId = pTask->id.streamId; + SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId); + SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj); + mndReleaseVgroup(pMnode, pVgObj); + STransAction action = {0}; - initTransAction(&action, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &pTask->info.epSet, 0); + initTransAction(&action, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0); if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(pReq); return -1; @@ -1710,7 +1721,7 @@ static int32_t mndPauseStreamTask(STrans *pTrans, SStreamTask *pTask) { return 0; } -int32_t mndPauseAllStreamTasks(STrans *pTrans, SStreamObj *pStream) { +int32_t mndPauseAllStreamTasks(SMnode* pMnode, STrans *pTrans, SStreamObj *pStream) { SArray *tasks = pStream->tasks; int32_t size = taosArrayGetSize(tasks); @@ -1719,7 +1730,7 @@ int32_t mndPauseAllStreamTasks(STrans *pTrans, SStreamObj *pStream) { int32_t sz = taosArrayGetSize(pTasks); for (int32_t j = 0; j < sz; j++) { SStreamTask *pTask = taosArrayGetP(pTasks, j); - if (mndPauseStreamTask(pTrans, pTask) < 0) { + if (mndPauseStreamTask(pMnode, pTrans, pTask) < 0) { return -1; } @@ -1805,7 +1816,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_PAUSE_NAME, pStream->sourceDb, pStream->targetDb); // pause all tasks - if (mndPauseAllStreamTasks(pTrans, pStream) < 0) { + if (mndPauseAllStreamTasks(pMnode, pTrans, pStream) < 0) { mError("stream:%s, failed to pause task since %s", pauseReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); @@ -1832,7 +1843,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { return TSDB_CODE_ACTION_IN_PROGRESS; } -static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask, int8_t igUntreated) { +static int32_t mndResumeStreamTask(STrans *pTrans, SMnode* pMnode, SStreamTask *pTask, int8_t igUntreated) { SVResumeStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResumeStreamTaskReq)); if (pReq == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -1843,8 +1854,12 @@ static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask, int8_t ig pReq->streamId = pTask->id.streamId; pReq->igUntreated = igUntreated; + SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId); + SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj); + mndReleaseVgroup(pMnode, pVgObj); + STransAction action = {0}; - initTransAction(&action, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &pTask->info.epSet, 0); + initTransAction(&action, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &epset, 0); if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(pReq); return -1; @@ -1852,14 +1867,14 @@ static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask, int8_t ig return 0; } -int32_t mndResumeAllStreamTasks(STrans *pTrans, SStreamObj *pStream, int8_t igUntreated) { +int32_t mndResumeAllStreamTasks(STrans *pTrans, SMnode* pMnode, SStreamObj *pStream, int8_t igUntreated) { int32_t size = taosArrayGetSize(pStream->tasks); for (int32_t i = 0; i < size; i++) { SArray *pTasks = taosArrayGetP(pStream->tasks, i); int32_t sz = taosArrayGetSize(pTasks); for (int32_t j = 0; j < sz; j++) { SStreamTask *pTask = taosArrayGetP(pTasks, j); - if (mndResumeStreamTask(pTrans, pTask, igUntreated) < 0) { + if (mndResumeStreamTask(pTrans, pMnode, pTask, igUntreated) < 0) { return -1; } @@ -1868,7 +1883,6 @@ int32_t mndResumeAllStreamTasks(STrans *pTrans, SStreamObj *pStream, int8_t igUn } } } - // pStream->pHTasksList is null return 0; } @@ -1931,7 +1945,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESUME_NAME, pStream->sourceDb, pStream->targetDb); // resume all tasks - if (mndResumeAllStreamTasks(pTrans, pStream, pauseReq.igUntreated) < 0) { + if (mndResumeAllStreamTasks(pTrans, pMnode, pStream, pauseReq.igUntreated) < 0) { mError("stream:%s, failed to drop task since %s", pauseReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); @@ -2591,8 +2605,12 @@ int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) { pReq->taskId = pTask->id.taskId; pReq->streamId = pTask->id.streamId; + SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId); + SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj); + mndReleaseVgroup(pMnode, pVgObj); + STransAction action = {0}; - initTransAction(&action, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &pTask->info.epSet, 0); + initTransAction(&action, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &epset, 0); if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(pReq); taosWUnLockLatch(&pStream->lock); From 4af844ee616eab5c0aef146f87480f3ada4ff245 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 14 Nov 2023 23:35:09 +0800 Subject: [PATCH 04/25] fix(stream): fix the checkpoint msg failure. --- source/libs/stream/src/streamDispatch.c | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 6bb15dfd23..9e929bb6c0 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1109,8 +1109,21 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i stWarn("s-task:%s inputQ of downstream task:0x%x(vgId:%d) is full, wait for %dms and retry dispatch", id, pRsp->downstreamTaskId, pRsp->downstreamNodeId, DISPATCH_RETRY_INTERVAL_MS); } else if (pRsp->inputStatus == TASK_INPUT_STATUS__REFUSED) { - stError("s-task:%s downstream task:0x%x(vgId:%d) refused the dispatch msg, treat it as success", id, - pRsp->downstreamTaskId, pRsp->downstreamNodeId); + // todo handle the agg task failure, add test case + if (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER && + pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + stError("s-task:%s failed to dispatch checkpoint-trigger msg, checkpointId:%" PRId64 + ", set the current checkpoint failed, and send rsp to mnode", + id, pTask->checkpointingId); + { // send checkpoint failure msg to mnode directly + pTask->chkInfo.failedId = pTask->checkpointingId; // record the latest failed checkpoint id + pTask->checkpointingId = pTask->checkpointingId; + streamTaskSendCheckpointReadyMsg(pTask); + } + } else { + stError("s-task:%s downstream task:0x%x(vgId:%d) refused the dispatch msg, treat it as success", id, + pRsp->downstreamTaskId, pRsp->downstreamNodeId); + } } } From 62c78771760863403e046870df9ae87242b6b4c5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 15 Nov 2023 11:13:06 +0800 Subject: [PATCH 05/25] fix(tsdb): fix invalid read. --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index b4f4891ca1..7d7432553f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -572,7 +572,12 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN if (isEmptyQueryTimeWindow(&w)) { k += 1; - continue; + + if (k >= numOfTables) { + break; + } else { + continue; + } } // 1. time range check From 51897a679fdeccc880f62cdb1be07b21453f22fc Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 15 Nov 2023 11:21:31 +0800 Subject: [PATCH 06/25] fix(stream): fix invalid read. --- source/dnode/mnode/impl/src/mndStream.c | 13 ++++++++----- source/libs/stream/src/streamDispatch.c | 7 +++++++ 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 20777e12e0..b1b6dccf35 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -87,7 +87,7 @@ static int32_t createStreamUpdateTrans(SStreamObj *pStream, SVgroupChangeInfo *p static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); static void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode); static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot); -static int32_t killActiveCheckpointTrans(SMnode *pMnode, const char* pDbName); +static int32_t killActiveCheckpointTrans(SMnode *pMnode, const char* pDbName, size_t len); static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList); static SSdbRaw *mndStreamActionEncode(SStreamObj *pStream); @@ -1259,7 +1259,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { mDebug("start to trigger checkpoint, checkpointId: %" PRId64, checkpointId); const char *pDb = mndGetStreamDB(pMnode); - mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb); + mndTransSetDbName(pTrans, pDb, pDb); taosMemoryFree((void *)pDb); if (mndTransCheckConflict(pMnode, pTrans) != 0) { @@ -2406,7 +2406,10 @@ static void killAllCheckpointTrans(SMnode* pMnode, SVgroupChangeInfo* pChangeInf void* pIter = NULL; while((pIter = taosHashIterate(pChangeInfo->pDBMap, pIter)) != NULL) { char* pDb = (char*) pIter; - killActiveCheckpointTrans(pMnode, pDb); + + size_t len = 0; + void* pKey = taosHashGetKey(pDb, &len); + killActiveCheckpointTrans(pMnode, pKey, len); } } @@ -2641,9 +2644,9 @@ int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) { return TSDB_CODE_ACTION_IN_PROGRESS; } -int32_t killActiveCheckpointTrans(SMnode *pMnode, const char* pDBName) { +int32_t killActiveCheckpointTrans(SMnode *pMnode, const char* pDBName, size_t len) { // data in the hash table will be removed automatically, no need to remove it here. - SStreamTransInfo* pTransInfo = taosHashGet(execInfo.transMgmt.pDBTrans, pDBName, strlen(pDBName)); + SStreamTransInfo* pTransInfo = taosHashGet(execInfo.transMgmt.pDBTrans, pDBName, len); if (pTransInfo == NULL) { return TSDB_CODE_SUCCESS; } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 9e929bb6c0..545d6310f7 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1068,6 +1068,13 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i int32_t vgId = pTask->pMeta->vgId; int32_t msgId = pTask->execInfo.dispatch; +#if 0 + // for test purpose, build the failure case + if (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER) { + pRsp->inputStatus = TASK_INPUT_STATUS__REFUSED; + } +#endif + // follower not handle the dispatch rsp if ((pTask->pMeta->role == NODE_ROLE_FOLLOWER) || (pTask->status.downstreamReady != 1)) { stError("s-task:%s vgId:%d is follower or task just re-launched, not handle the dispatch rsp, discard it", id, vgId); From f299bdb387d8c0cd573126bf0a100ec75046b50d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 15 Nov 2023 14:29:01 +0800 Subject: [PATCH 07/25] fix(stream): register checkpoint trans. --- source/dnode/mnode/impl/src/mndStream.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index b1b6dccf35..cadf369c5b 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1262,6 +1262,8 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { mndTransSetDbName(pTrans, pDb, pDb); taosMemoryFree((void *)pDb); + mndStreamRegisterTrans(pTrans, MND_STREAM_CHECKPOINT_NAME, pDb, pDb); + if (mndTransCheckConflict(pMnode, pTrans) != 0) { mError("failed to trigger checkpoint, checkpointId: %" PRId64 ", reason:%s", checkpointId, tstrerror(TSDB_CODE_MND_TRANS_CONFLICT)); @@ -1815,7 +1817,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_PAUSE_NAME, pStream->sourceDb, pStream->targetDb); - // pause all tasks + // if nodeUpdate happened, not send pause trans if (mndPauseAllStreamTasks(pMnode, pTrans, pStream) < 0) { mError("stream:%s, failed to pause task since %s", pauseReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); From 1b1ad3068dbf746f1bc0256306738366de3afd52 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 15 Nov 2023 16:02:56 +0800 Subject: [PATCH 08/25] fix(stream): check node status before start pause trans. --- include/libs/stream/tstream.h | 4 - source/dnode/mnode/impl/src/mndStream.c | 101 +++++++++++++++--------- 2 files changed, 62 insertions(+), 43 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 3579d62eda..b535a44e61 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -140,8 +140,6 @@ typedef enum EStreamTaskEvent { TASK_EVENT_RESUME = 0x9, TASK_EVENT_HALT = 0xA, TASK_EVENT_DROPPING = 0xB, - TASK_EVENT_SCAN_TSDB = 0xC, - TASK_EVENT_SCAN_WAL = 0xD, } EStreamTaskEvent; typedef struct { @@ -316,7 +314,6 @@ typedef struct SStreamStatus { int8_t schedStatus; int8_t keepTaskStatus; bool appendTranstateBlock; // has append the transfer state data block already, todo: remove it - int8_t pauseAllowed; // allowed task status to be set to be paused int32_t timerActive; // timer is active int32_t inScanHistorySentinel; } SStreamStatus; @@ -789,7 +786,6 @@ int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue); int32_t streamRestoreParam(SStreamTask* pTask); void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta); void streamTaskResume(SStreamTask* pTask); -void streamTaskEnablePause(SStreamTask* pTask); int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask); void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet); void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index cadf369c5b..d3f96d46d2 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1176,6 +1176,50 @@ static int32_t initStreamNodeList(SMnode* pMnode) { return taosArrayGetSize(execInfo.pNodeList); } +static bool taskNodeIsUpdated(SMnode* pMnode) { + // check if the node update happens or not + taosThreadMutexLock(&execInfo.lock); + int32_t numOfNodes = initStreamNodeList(pMnode); + + if (numOfNodes == 0) { + mDebug("stream task node change checking done, no vgroups exist, do nothing"); + execInfo.ts = taosGetTimestampSec(); + taosThreadMutexUnlock(&execInfo.lock); + return false; + } + + for (int32_t i = 0; i < numOfNodes; ++i) { + SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeList, i); + if (pNodeEntry->stageUpdated) { + mDebug("stream task not ready due to node update detected, checkpoint not issued"); + taosThreadMutexUnlock(&execInfo.lock); + return true; + } + } + + bool allReady = true; + SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode, &allReady); + if (!allReady) { + mWarn("not all vnodes ready"); + taosArrayDestroy(pNodeSnapshot); + taosThreadMutexUnlock(&execInfo.lock); + return 0; + } + + SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot); + bool nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0); + taosArrayDestroy(changeInfo.pUpdateNodeList); + taosHashCleanup(changeInfo.pDBMap); + taosArrayDestroy(pNodeSnapshot); + + if (nodeUpdated) { + mDebug("stream task not ready due to node update"); + } + + taosThreadMutexUnlock(&execInfo.lock); + return nodeUpdated; +} + static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { SMnode * pMnode = pReq->info.node; SSdb * pSdb = pMnode->pSdb; @@ -1183,50 +1227,23 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { SStreamObj *pStream = NULL; int32_t code = 0; - { // check if the node update happens or not - int64_t ts = taosGetTimestampSec(); - - taosThreadMutexLock(&execInfo.lock); - int32_t numOfNodes = initStreamNodeList(pMnode); - taosThreadMutexUnlock(&execInfo.lock); - - if (numOfNodes == 0) { - mDebug("stream task node change checking done, no vgroups exist, do nothing"); - execInfo.ts = ts; - return 0; - } - - for(int32_t i = 0; i < numOfNodes; ++i) { - SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeList, i); - if (pNodeEntry->stageUpdated) { - mDebug("stream task not ready due to node update detected, checkpoint not issued"); - return 0; - } - } - - bool allReady = true; - SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode, &allReady); - if (!allReady) { - mWarn("not all vnodes are ready, ignore the checkpoint") taosArrayDestroy(pNodeSnapshot); - return 0; - } - - SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot); - bool nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0); - taosArrayDestroy(changeInfo.pUpdateNodeList); - taosHashCleanup(changeInfo.pDBMap); - taosArrayDestroy(pNodeSnapshot); - - if (nodeUpdated) { - mDebug("stream task not ready due to node update, checkpoint not issued"); - return 0; - } + // check if the node update happens or not + bool updated = taskNodeIsUpdated(pMnode); + if (updated) { + mWarn("checkpoint ignore, stream task nodes update detected"); } { // check if all tasks are in TASK_STATUS__READY status bool ready = true; - taosThreadMutexLock(&execInfo.lock); + + // no streams exists, abort + int32_t numOfTasks = taosArrayGetSize(execInfo.pTaskList); + if (numOfTasks <= 0) { + taosThreadMutexUnlock(&execInfo.lock); + return 0; + } + for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) { STaskId * p = taosArrayGet(execInfo.pTaskList, i); STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p)); @@ -1799,6 +1816,12 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { return -1; } + bool updated = taskNodeIsUpdated(pMnode); + if (updated) { + mError("tasks are not ready for pause, node update detected"); + return -1; + } + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "pause-stream"); if (pTrans == NULL) { mError("stream:%s failed to pause stream since %s", pauseReq.name, terrstr()); From 9d2256ec9cb5ca632979661c4a4d381f4d765f36 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 15 Nov 2023 16:03:23 +0800 Subject: [PATCH 09/25] refactor: do some internal refactor. --- include/libs/executor/storageapi.h | 56 +++------------------------ source/dnode/vnode/src/tq/tq.c | 6 --- source/libs/stream/src/streamTaskSm.c | 49 ++++++++--------------- 3 files changed, 23 insertions(+), 88 deletions(-) diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index dbcd682dab..d0b9722552 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -149,19 +149,6 @@ typedef struct { int32_t colNum; } SMetaStbStats; -// void tqReaderSetColIdList(STqReader *pReader, SArray *pColIdList); -// int32_t tqReaderSetTbUidList(STqReader *pReader, const SArray *tbUidList); -// int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *pTableUidList); -// int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList); -// bool tqReaderIsQueriedTable(STqReader* pReader, uint64_t uid); -// bool tqCurrentBlockConsumed(const STqReader* pReader); -// int32_t tqReaderSeek(STqReader *pReader, int64_t ver, const char *id); -// bool tqNextBlockInWal(STqReader* pReader, const char* idstr); -// bool tqNextBlockImpl(STqReader *pReader, const char* idstr); -// int32_t getTableInfoFromSnapshot(SSnapContext *ctx, void **pBuf, int32_t *contLen, int16_t *type, int64_t -// *uid); SMetaTableInfo getMetaTableInfoFromSnapshot(SSnapContext *ctx); int32_t setForSnapShot(SSnapContext -// *ctx, int64_t uid); int32_t destroySnapContext(SSnapContext *ctx); - // clang-format off /*-------------------------------------------------new api format---------------------------------------------------*/ typedef struct TsdReader { @@ -196,27 +183,6 @@ typedef struct SStoreCacheReader { // clang-format on /*------------------------------------------------------------------------------------------------------------------*/ -/* -void tqReaderSetColIdList(STqReader *pReader, SArray *pColIdList); -int32_t tqReaderSetTbUidList(STqReader *pReader, const SArray *tbUidList); -int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *pTableUidList); -int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList); -bool tqReaderIsQueriedTable(STqReader* pReader, uint64_t uid); -bool tqCurrentBlockConsumed(const STqReader* pReader); - -int32_t tqReaderSeek(STqReader *pReader, int64_t ver, const char *id); -bool tqNextBlockInWal(STqReader* pReader, const char* idstr); -bool tqNextBlockImpl(STqReader *pReader, const char* idstr); - - int32_t tqRetrieveDataBlock(STqReader *pReader, SSDataBlock **pRes, const char* idstr); -STqReader *tqReaderOpen(void *pVnode); -void tqReaderClose(STqReader *); - -int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver); -bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids); -SWalReader* tqGetWalReader(STqReader* pReader); -int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet); -*/ // todo rename typedef struct SStoreTqReader { struct STqReader* (*tqReaderOpen)(); @@ -279,28 +245,18 @@ typedef struct SStoreMeta { void* (*storeGetIndexInfo)(); void* (*getInvertIndex)(void* pVnode); - int32_t (*getChildTableList)( - void* pVnode, int64_t suid, - SArray* list); // support filter and non-filter cases. [vnodeGetCtbIdList & vnodeGetCtbIdListByFilter] - int32_t (*storeGetTableList)(void* pVnode, int8_t type, SArray* pList); // vnodeGetStbIdList & vnodeGetAllTableList + // support filter and non-filter cases. [vnodeGetCtbIdList & vnodeGetCtbIdListByFilter] + int32_t (*getChildTableList)( void* pVnode, int64_t suid, SArray* list); + int32_t (*storeGetTableList)(void* pVnode, int8_t type, SArray* pList); void* storeGetVersionRange; void* storeGetLastTimestamp; int32_t (*getTableSchema)(void* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid); // tsdbGetTableSchema + int32_t (*getNumOfChildTables)( void* pVnode, int64_t uid, int64_t* numOfTables, int32_t* numOfCols); + void (*getBasicInfo)(void* pVnode, const char** dbname, int32_t* vgId, int64_t* numOfTables, int64_t* numOfNormalTables); - // db name, vgId, numOfTables, numOfSTables - int32_t (*getNumOfChildTables)( - void* pVnode, int64_t uid, int64_t* numOfTables, - int32_t* numOfCols); // int32_t metaGetStbStats(SMeta *pMeta, int64_t uid, SMetaStbStats *pInfo); - void (*getBasicInfo)(void* pVnode, const char** dbname, int32_t* vgId, int64_t* numOfTables, - int64_t* numOfNormalTables); // vnodeGetInfo(void *pVnode, const char **dbname, int32_t *vgId) & - // metaGetTbNum(SMeta *pMeta) & metaGetNtbNum(SMeta *pMeta); int64_t (*getNumOfRowsInMem)(void* pVnode); - /** -int32_t vnodeGetCtbIdList(void *pVnode, int64_t suid, SArray *list); -int32_t vnodeGetCtbIdListByFilter(void *pVnode, int64_t suid, SArray *list, bool (*filter)(void *arg), void *arg); -int32_t vnodeGetStbIdList(void *pVnode, int64_t suid, SArray *list); - */ + SMCtbCursor* (*openCtbCursor)(void *pVnode, tb_uid_t uid, int lock); int32_t (*resumeCtbCursor)(SMCtbCursor* pCtbCur, int8_t first); void (*pauseCtbCursor)(SMCtbCursor* pCtbCur); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 904dd49227..6c7d07d34d 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1133,16 +1133,10 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { // let's decide which step should be executed now if (pTask->execInfo.step1Start == 0) { - ASSERT(pTask->status.pauseAllowed == false); int64_t ts = taosGetTimestampMs(); pTask->execInfo.step1Start = ts; tqDebug("s-task:%s start scan-history stage(step 1), status:%s, step1 startTs:%" PRId64, id, pStatus, ts); - - // NOTE: in case of stream task, scan-history data in wal is not allowed to pause - if (pTask->info.fillHistory == 1) { - streamTaskEnablePause(pTask); - } } else { if (pTask->execInfo.step2Start == 0) { tqDebug("s-task:%s continue exec scan-history(step1), original step1 startTs:%" PRId64 ", already elapsed:%.2fs", diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 04b449aaaf..1c951e1452 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -269,6 +269,7 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) { pTask->id.idStr, pSM->current.name, GET_EVT_NAME(evt)); taosMsleep(100); } else { + // no active event trans exists, handle this event directly pTrans = streamTaskFindTransform(pSM->current.state, event); if (pTrans == NULL) { stDebug("s-task:%s failed to handle event:%s", pTask->id.idStr, GET_EVT_NAME(event)); @@ -451,60 +452,43 @@ int32_t initStateTransferTable() { return TSDB_CODE_SUCCESS; } +//clang-format off void doInitStateTransferTable(void) { streamTaskSMTrans = taosArrayInit(8, sizeof(STaskStateTrans)); // initialization event handle - STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__READY, TASK_EVENT_INIT, - streamTaskInitStatus, onNormalTaskReady, false, false); + STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__READY, TASK_EVENT_INIT, streamTaskInitStatus, onNormalTaskReady, false, false); taosArrayPush(streamTaskSMTrans, &trans); - - trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCANHIST, - streamTaskInitStatus, onScanhistoryTaskReady, false, false); + trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCANHIST, streamTaskInitStatus, onScanhistoryTaskReady, false, false); taosArrayPush(streamTaskSMTrans, &trans); - - trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STREAM_SCAN_HISTORY, TASK_EVENT_INIT_STREAM_SCANHIST, - streamTaskInitStatus, onScanhistoryTaskReady, false, false); + trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STREAM_SCAN_HISTORY, TASK_EVENT_INIT_STREAM_SCANHIST, streamTaskInitStatus, onScanhistoryTaskReady, false, false); taosArrayPush(streamTaskSMTrans, &trans); // scan-history related event - trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, NULL, NULL, - NULL, true); + trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, NULL, NULL, NULL, true); taosArrayPush(streamTaskSMTrans, &trans); - - trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, NULL, - NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, NULL, NULL, NULL, true); taosArrayPush(streamTaskSMTrans, &trans); // halt stream task, from other task status - trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, - streamTaskKeepCurrentVerInWal, NULL, true); + trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL, true); taosArrayPush(streamTaskSMTrans, &trans); - - trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, - streamTaskKeepCurrentVerInWal, NULL, true); + trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL, true); taosArrayPush(streamTaskSMTrans, &trans); SAttachedEventInfo info = {.status = TASK_STATUS__READY, .event = TASK_EVENT_HALT}; - trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, - streamTaskKeepCurrentVerInWal, &info, true); - taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, - &info, true); + trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, &info, true); taosArrayPush(streamTaskSMTrans, &trans); - - trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, - streamTaskKeepCurrentVerInWal, NULL, true); + trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, &info, true); + taosArrayPush(streamTaskSMTrans, &trans); + trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL, true); taosArrayPush(streamTaskSMTrans, &trans); // checkpoint related event - trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL, - streamTaskDoCheckpoint, NULL, true); + trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL, streamTaskDoCheckpoint, NULL, true); taosArrayPush(streamTaskSMTrans, &trans); - - trans = - createStateTransform(TASK_STATUS__CK, TASK_STATUS__READY, TASK_EVENT_CHECKPOINT_DONE, NULL, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__READY, TASK_EVENT_CHECKPOINT_DONE, NULL, NULL, NULL, true); taosArrayPush(streamTaskSMTrans, &trans); // pause & resume related event handle @@ -571,4 +555,5 @@ void doInitStateTransferTable(void) { taosArrayPush(streamTaskSMTrans, &trans); trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true); taosArrayPush(streamTaskSMTrans, &trans); -} \ No newline at end of file +} +//clang-format on \ No newline at end of file From d7fb9c5b6e2201b232b91c71aad6b5371b35d54a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 15 Nov 2023 16:10:26 +0800 Subject: [PATCH 10/25] fix(stream): add missing pushed data. --- source/libs/stream/src/streamStart.c | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 97eb7b79a2..9b27281915 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -168,7 +168,6 @@ int32_t streamTaskStartScanHistory(SStreamTask* pTask) { } else if (level == TASK_LEVEL__AGG) { if (pTask->info.fillHistory) { streamSetParamForScanHistory(pTask); - streamTaskEnablePause(pTask); } } else if (level == TASK_LEVEL__SINK) { stDebug("s-task:%s sink task do nothing to handle scan-history", pTask->id.idStr); @@ -345,7 +344,6 @@ int32_t onNormalTaskReady(SStreamTask* pTask) { stDebug("s-task:%s level:%d status:%s sched-status:%d", id, pTask->info.taskLevel, p, pTask->status.schedStatus); } - streamTaskEnablePause(pTask); return TSDB_CODE_SUCCESS; } @@ -659,9 +657,6 @@ int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) { streamMetaCommit(pMeta); streamMetaWUnLock(pMeta); - // history data scan in the stream time window finished, now let's enable the pause - streamTaskEnablePause(pTask); - // for source tasks, let's continue execute. if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { streamSchedExec(pTask); @@ -1040,11 +1035,6 @@ void streamTaskResume(SStreamTask* pTask) { } } -void streamTaskEnablePause(SStreamTask* pTask) { - stDebug("s-task:%s enable task pause", pTask->id.idStr); - pTask->status.pauseAllowed = 1; -} - static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ) { int32_t vgId = pMeta->vgId; void* pIter = NULL; From f7ee65d059daa12a4a11a83c0ad47c8aeb438989 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 15 Nov 2023 16:31:07 +0800 Subject: [PATCH 11/25] fix(stream): fix send msg error. --- source/libs/stream/src/streamCheckpoint.c | 2 +- source/libs/stream/src/streamDispatch.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 48b6486e05..ed59f36af8 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -297,7 +297,7 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) { continue; } - ASSERT(p->chkInfo.checkpointId < p->checkpointingId && p->checkpointingId == checkpointId && + ASSERT(p->chkInfo.checkpointId <= p->checkpointingId && p->checkpointingId == checkpointId && p->chkInfo.checkpointVer <= p->chkInfo.processedVer); p->chkInfo.checkpointId = p->checkpointingId; diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 545d6310f7..505fc6d099 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1125,7 +1125,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i { // send checkpoint failure msg to mnode directly pTask->chkInfo.failedId = pTask->checkpointingId; // record the latest failed checkpoint id pTask->checkpointingId = pTask->checkpointingId; - streamTaskSendCheckpointReadyMsg(pTask); + streamTaskSendCheckpointSourceRsp(pTask); } } else { stError("s-task:%s downstream task:0x%x(vgId:%d) refused the dispatch msg, treat it as success", id, From 41aa175099d37e795f99f2acee5b00a40c23479e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 15 Nov 2023 16:53:51 +0800 Subject: [PATCH 12/25] fix(stream): add return. --- source/dnode/mnode/impl/src/mndStream.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index d3f96d46d2..7ada3c33fd 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1231,6 +1231,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { bool updated = taskNodeIsUpdated(pMnode); if (updated) { mWarn("checkpoint ignore, stream task nodes update detected"); + return -1; } { // check if all tasks are in TASK_STATUS__READY status From 298755cc4efca4554e253a471c58e77386344849 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 16 Nov 2023 16:34:47 +0800 Subject: [PATCH 13/25] fix(stream): wait for task to be normal and then send data block. --- source/libs/stream/src/streamDispatch.c | 33 ++++++++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 505fc6d099..763d626774 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1039,6 +1039,25 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask) { return 0; } +static void dispatchDataInFuture(void* param, void* tmrId) { + SStreamTask* pTask = param; + if (streamTaskShouldStop(pTask)) { + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref); + return; + } + + ETaskStatus status = streamTaskGetStatus(pTask, NULL); + if (status == TASK_STATUS__CK) { + stDebug("s-task:%s in checkpoint status, wait for 500ms to dispatch data downstream", pTask->id.idStr); + taosTmrReset(doRetryDispatchData, 500, pTask, streamEnv.timer, &pTask->msgInfo.pTimer); + } else { + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:%s start to dispatch data, jump out of timer, ref:%d", pTask->id.idStr, ref); + streamDispatchStreamBlock(pTask); + } +} + // this message has been sent successfully, let's try next one. static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId) { destroyDispatchMsg(pTask->msgInfo.pData, getNumOfDispatchBranch(pTask)); @@ -1059,7 +1078,19 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId atomic_store_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL); // otherwise, continue dispatch the first block to down stream task in pipeline - streamDispatchStreamBlock(pTask); + ETaskStatus status = streamTaskGetStatus(pTask, NULL); + if (status == TASK_STATUS__CK) { + int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:%s in checkpoint status, add in timer, try dispatch data in 500ms, ref:%d", pTask->id.idStr, ref); + if (pTask->msgInfo.pTimer != NULL) { + taosTmrReset(dispatchDataInFuture, 500, pTask, streamEnv.timer, &pTask->msgInfo.pTimer); + } else { + pTask->msgInfo.pTimer = taosTmrStart(dispatchDataInFuture, 500, pTask, streamEnv.timer); + } + } else { + streamDispatchStreamBlock(pTask); + } + return 0; } From 957ce07dc4d4a1c364cd939cae42f78a44f881df Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 16 Nov 2023 17:00:50 +0800 Subject: [PATCH 14/25] fix(stream): fix syntax error. --- source/libs/stream/src/streamDispatch.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 763d626774..81c3ba9c23 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1050,7 +1050,7 @@ static void dispatchDataInFuture(void* param, void* tmrId) { ETaskStatus status = streamTaskGetStatus(pTask, NULL); if (status == TASK_STATUS__CK) { stDebug("s-task:%s in checkpoint status, wait for 500ms to dispatch data downstream", pTask->id.idStr); - taosTmrReset(doRetryDispatchData, 500, pTask, streamEnv.timer, &pTask->msgInfo.pTimer); + taosTmrReset(dispatchDataInFuture, 500, pTask, streamEnv.timer, &pTask->msgInfo.pTimer); } else { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s start to dispatch data, jump out of timer, ref:%d", pTask->id.idStr, ref); From 466c80e3700626f707136b951e6f43902d33eef7 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 16 Nov 2023 17:34:47 +0800 Subject: [PATCH 15/25] fix(stream): fix bug in delay send. --- source/libs/stream/src/streamDispatch.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 81c3ba9c23..1f9b21becc 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1061,7 +1061,11 @@ static void dispatchDataInFuture(void* param, void* tmrId) { // this message has been sent successfully, let's try next one. static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId) { destroyDispatchMsg(pTask->msgInfo.pData, getNumOfDispatchBranch(pTask)); + + bool delayDispatch = (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER); + pTask->msgInfo.pData = NULL; + pTask->msgInfo.dispatchMsgType = 0; int64_t el = taosGetTimestampMs() - pTask->msgInfo.startTs; @@ -1078,8 +1082,7 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId atomic_store_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL); // otherwise, continue dispatch the first block to down stream task in pipeline - ETaskStatus status = streamTaskGetStatus(pTask, NULL); - if (status == TASK_STATUS__CK) { + if (delayDispatch) { int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s in checkpoint status, add in timer, try dispatch data in 500ms, ref:%d", pTask->id.idStr, ref); if (pTask->msgInfo.pTimer != NULL) { From 3edb25a49ea76b157276d115cb8914c8b2c31ff9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 16 Nov 2023 23:18:07 +0800 Subject: [PATCH 16/25] fix(stream): add check for assert. --- source/dnode/vnode/src/tq/tq.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 6c7d07d34d..f0a69e0c40 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1223,7 +1223,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { streamMetaReleaseTask(pMeta, pStreamTask); } else { STimeWindow* pWindow = &pTask->dataRange.window; - ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask)); + ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask) || streamTaskShouldStop(pTask)); // Not update the fill-history time window until the state transfer is completed. tqDebug("s-task:%s scan-history in stream time window completed, start to handle data from WAL, startVer:%" PRId64 From b7347a1d64cc6bf72ebb6bec39bc9e1fad4957d7 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 17 Nov 2023 15:57:46 +0800 Subject: [PATCH 17/25] fix(stream): remove invalid assert --- source/libs/stream/src/streamMeta.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 40f9ab54fe..2b3a05e041 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -488,7 +488,6 @@ void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask) if (ref > 0) { stTrace("s-task:%s release task, ref:%d", pTask->id.idStr, ref); } else if (ref == 0) { - ASSERT(streamTaskShouldStop(pTask)); stTrace("s-task:%s all refs are gone, free it", pTask->id.idStr); tFreeStreamTask(pTask); } else if (ref < 0) { @@ -1072,7 +1071,7 @@ bool streamMetaTaskInTimer(SStreamMeta* pMeta) { void streamMetaNotifyClose(SStreamMeta* pMeta) { int32_t vgId = pMeta->vgId; - stDebug("vgId:%d notify all stream tasks that the vnode is closing. isLeader:%d startHb%" PRId64 ", totalHb:%d", vgId, + stDebug("vgId:%d notify all stream tasks that the vnode is closing. isLeader:%d startHb:%" PRId64 ", totalHb:%d", vgId, (pMeta->role == NODE_ROLE_LEADER), pMeta->pHbInfo->hbStart, pMeta->pHbInfo->hbCount); streamMetaWLock(pMeta); From 8204db13aaae0024bcb59fbdad7903cc431d4f29 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 20 Nov 2023 11:46:11 +0800 Subject: [PATCH 18/25] enh(stream): add sink_quota column for stream tasks. --- source/common/src/systable.c | 1 + source/dnode/mnode/impl/src/mndStream.c | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/source/common/src/systable.c b/source/common/src/systable.c index dc584437bf..5c57f13091 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -155,6 +155,7 @@ static const SSysDbTableSchema streamSchema[] = { {.name = "target_table", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "watermark", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, {.name = "trigger", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "sink_quota", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, }; static const SSysDbTableSchema streamTaskSchema[] = { diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 7ada3c33fd..6c0b4577ed 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1491,7 +1491,6 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB if (pShow->pIter == NULL) break; SColumnInfoData *pColInfo; - SName n; int32_t cols = 0; char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; From 6fca8c170c20de8aedbe62398b28409ce9e3c975 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 20 Nov 2023 11:59:49 +0800 Subject: [PATCH 19/25] enh(stream): add sink_quota/scan-history-idle-duration column for stream tasks. --- source/common/src/systable.c | 1 + source/dnode/mnode/impl/src/mndStream.c | 15 +++++++++++++++ 2 files changed, 16 insertions(+) diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 5c57f13091..5d6486ec1a 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -156,6 +156,7 @@ static const SSysDbTableSchema streamSchema[] = { {.name = "watermark", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, {.name = "trigger", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "sink_quota", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "history_scan_idle", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, }; static const SSysDbTableSchema streamTaskSchema[] = { diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 6c0b4577ed..6755204622 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1543,6 +1543,21 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)&trigger, false); + char sinkQuota[20 + VARSTR_HEADER_SIZE] = {0}; + sinkQuota[0] = '0'; + char dstStr[20] = {0}; + STR_TO_VARSTR(dstStr, sinkQuota) + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char*) dstStr, false); + + char scanHistoryIdle[20 + VARSTR_HEADER_SIZE] = {0}; + strcpy(scanHistoryIdle, "100a"); + + memset(dstStr, 0, tListLen(dstStr)); + STR_TO_VARSTR(dstStr, scanHistoryIdle) + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char*) dstStr, false); + numOfRows++; sdbRelease(pSdb, pStream); } From 0b1fd192ad594ac8fc93855723185a2506cbe0e0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 20 Nov 2023 13:06:12 +0800 Subject: [PATCH 20/25] fix(test): adjust test case accordingly. --- tests/system-test/0-others/information_schema.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system-test/0-others/information_schema.py b/tests/system-test/0-others/information_schema.py index 51347f5f64..8dd1c3e168 100644 --- a/tests/system-test/0-others/information_schema.py +++ b/tests/system-test/0-others/information_schema.py @@ -217,7 +217,7 @@ class TDTestCase: tdSql.checkEqual(20470,len(tdSql.queryResult)) tdSql.query("select * from information_schema.ins_columns where db_name ='information_schema'") - tdSql.checkEqual(198, len(tdSql.queryResult)) + tdSql.checkEqual(210, len(tdSql.queryResult)) tdSql.query("select * from information_schema.ins_columns where db_name ='performance_schema'") tdSql.checkEqual(54, len(tdSql.queryResult)) From 00f81032629b5f4ea4d8a3c3f0492fdab120f18a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 20 Nov 2023 15:26:50 +0800 Subject: [PATCH 21/25] fix(stream): fix error in generating token in bucket. --- source/libs/stream/inc/streamInt.h | 3 ++- source/libs/stream/src/streamQueue.c | 20 ++++++++++++-------- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 4a7d3a2a05..2850e8e733 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -75,7 +75,8 @@ struct STokenBucket { double quotaCapacity; // available capacity for maximum input size, KiloBytes per Second double quotaRemain; // not consumed bytes per second double quotaRate; // number of token per second - int64_t fillTimestamp; // fill timestamp + int64_t tokenFillTimestamp; // fill timestamp + int64_t quotaFillTimestamp; // fill timestamp }; struct SStreamQueue { diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 556de169b4..d19dfc13bf 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -388,32 +388,36 @@ int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t pBucket->quotaCapacity = quotaRate * MAX_SMOOTH_BURST_RATIO; pBucket->quotaRemain = pBucket->quotaCapacity; - pBucket->fillTimestamp = taosGetTimestampMs(); + pBucket->tokenFillTimestamp = taosGetTimestampMs(); + pBucket->quotaFillTimestamp = taosGetTimestampMs(); stDebug("s-task:%s sink quotaRate:%.2fMiB, numRate:%d", id, quotaRate, numRate); return TSDB_CODE_SUCCESS; } static void fillTokenBucket(STokenBucket* pBucket, const char* id) { int64_t now = taosGetTimestampMs(); - int64_t delta = now - pBucket->fillTimestamp; + + int64_t deltaToken = now - pBucket->tokenFillTimestamp; ASSERT(pBucket->numOfToken >= 0); - int32_t incNum = (delta / 1000.0) * pBucket->numRate; + int32_t incNum = (deltaToken / 1000.0) * pBucket->numRate; if (incNum > 0) { pBucket->numOfToken = TMIN(pBucket->numOfToken + incNum, pBucket->numCapacity); - pBucket->fillTimestamp = now; + pBucket->tokenFillTimestamp = now; } // increase the new available quota as time goes on - double incSize = (delta / 1000.0) * pBucket->quotaRate; + int64_t deltaQuota = now - pBucket->quotaFillTimestamp; + double incSize = (deltaQuota / 1000.0) * pBucket->quotaRate; if (incSize > 0) { pBucket->quotaRemain = TMIN(pBucket->quotaRemain + incSize, pBucket->quotaCapacity); - pBucket->fillTimestamp = now; + pBucket->quotaFillTimestamp = now; } if (incNum > 0 || incSize > 0) { - stTrace("token/quota available, token:%d inc:%d, quota:%.2fMiB inc:%.3fMiB, ts:%" PRId64 " idle:%" PRId64 "ms, %s", - pBucket->numOfToken, incNum, pBucket->quotaRemain, incSize, now, delta, id); + stTrace("token/quota available, token:%d inc:%d, token_TsDelta:%" PRId64 + ", quota:%.2fMiB inc:%.3fMiB quotaTs:%" PRId64 " now:%" PRId64 "ms, %s", + pBucket->numOfToken, incNum, deltaToken, pBucket->quotaRemain, incSize, deltaQuota, now, id); } } From f3355000a3a16d852bfdccb3eb63c56e5249cf58 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 21 Nov 2023 10:09:59 +0800 Subject: [PATCH 22/25] refactor: do some internal refactor. --- source/dnode/vnode/src/tq/tqStreamTask.c | 8 ++++++-- source/libs/stream/src/streamState.c | 1 - 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 4c0491da86..aa531a0ba8 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -212,8 +212,10 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) { int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; + bool alreadyRestored = pTq->pVnode->restored; + // do not launch the stream tasks, if it is a follower or not restored vnode. - if (!(vnodeIsRoleLeader(pTq->pVnode) && pTq->pVnode->restored)) { + if (!(vnodeIsRoleLeader(pTq->pVnode) && alreadyRestored)) { return TSDB_CODE_SUCCESS; } @@ -255,7 +257,9 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) { return -1; } - tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d", vgId, numOfTasks); + tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d, restored:%d", vgId, numOfTasks, + alreadyRestored); + pRunReq->head.vgId = vgId; pRunReq->streamId = 0; pRunReq->taskId = STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID; diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index fb0090ec6d..fc1fb1f8c3 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -1081,7 +1081,6 @@ _end: } int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) { - stDebug("try to write to cf parname"); #ifdef USE_ROCKSDB if (tSimpleHashGetSize(pState->parNameMap) > MAX_TABLE_NAME_NUM) { if (tSimpleHashGet(pState->parNameMap, &groupId, sizeof(int64_t)) == NULL) { From ff9c0c0df4515e45615c1942807d20f7d3e12f1f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 22 Nov 2023 16:20:59 +0800 Subject: [PATCH 23/25] fix(stream): fix error in check trans conflict. --- include/common/tmsg.h | 2 +- source/dnode/mnode/impl/src/mndStream.c | 58 ++++++++----------------- 2 files changed, 19 insertions(+), 41 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index d08b424e9c..328a1e383c 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3182,7 +3182,7 @@ typedef struct { int32_t tSerializeSMDropStreamReq(void* buf, int32_t bufLen, const SMDropStreamReq* pReq); int32_t tDeserializeSMDropStreamReq(void* buf, int32_t bufLen, SMDropStreamReq* pReq); -void tFreeSMDropStreamReq(SMDropStreamReq* pReq); +void tFreeMDropStreamReq(SMDropStreamReq* pReq); typedef struct { char name[TSDB_STREAM_FNAME_LEN]; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 6755204622..21bf8b8dc5 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -132,8 +132,11 @@ int32_t mndInitStream(SMnode *pMnode) { mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndCancelGetNextStreamTask); taosThreadMutexInit(&execInfo.lock, NULL); - execInfo.pTaskMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK); + _hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR); + execInfo.pTaskList = taosArrayInit(4, sizeof(STaskId)); + execInfo.pTaskMap = taosHashInit(64, fn, true, HASH_NO_LOCK); + execInfo.transMgmt.pDBTrans = taosHashInit(32, fn, true, HASH_NO_LOCK); return sdbSetTable(pMnode->pSdb, table); } @@ -719,24 +722,6 @@ int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) return 0; } -static bool checkDbPrecision(SMnode* pMnode, SStreamObj* pStreamObj) { - if (pStreamObj->sourceDbUid != pStreamObj->targetDbUid) { - SDbObj *pSrcDb = mndAcquireDb(pMnode, pStreamObj->sourceDb); - SDbObj *pDstDb = mndAcquireDb(pMnode, pStreamObj->targetDb); - - bool isIdentical = (pSrcDb->cfg.precision != pDstDb->cfg.precision); - mndReleaseDb(pMnode, pSrcDb); - mndReleaseDb(pMnode, pDstDb); - - if (!isIdentical) { - mError("stream:%s failed to create since target/source db precision not identical", pStreamObj->name); - return false; - } - } - - return true; -} - static int32_t checkForNumOfStreams(SMnode *pMnode, SStreamObj *pStreamObj) { // check for number of existed tasks int32_t numOfStream = 0; SStreamObj *pStream = NULL; @@ -809,12 +794,6 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { goto _OVER; } - // check if the time precision for source&target DB is identical. - bool isIdentical = checkDbPrecision(pMnode, &streamObj); - if (!isIdentical) { - goto _OVER; - } - code = checkForNumOfStreams(pMnode, &streamObj); if (code != TSDB_CODE_SUCCESS) { goto _OVER; @@ -896,7 +875,6 @@ _OVER: } mndReleaseStream(pMnode, pStream); - tFreeSCMCreateStreamReq(&createStreamReq); tFreeStreamObj(&streamObj); return code; @@ -1278,9 +1256,8 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { const char *pDb = mndGetStreamDB(pMnode); mndTransSetDbName(pTrans, pDb, pDb); - taosMemoryFree((void *)pDb); - mndStreamRegisterTrans(pTrans, MND_STREAM_CHECKPOINT_NAME, pDb, pDb); + taosMemoryFree((void *)pDb); if (mndTransCheckConflict(pMnode, pTrans) != 0) { mError("failed to trigger checkpoint, checkpointId: %" PRId64 ", reason:%s", checkpointId, @@ -1331,26 +1308,27 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { if (dropReq.igNotExists) { mInfo("stream:%s not exist, ignore not exist is set", dropReq.name); sdbRelease(pMnode->pSdb, pStream); - tFreeSMDropStreamReq(&dropReq); + tFreeMDropStreamReq(&dropReq); return 0; } else { terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; mError("stream:%s not exist failed to drop", dropReq.name); - tFreeSMDropStreamReq(&dropReq); + tFreeMDropStreamReq(&dropReq); return -1; } } if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb) != 0) { sdbRelease(pMnode->pSdb, pStream); - tFreeSMDropStreamReq(&dropReq); + tFreeMDropStreamReq(&dropReq); return -1; } // check if it is conflict with other trans in both sourceDb and targetDb. bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb); - if (!conflict) { + if (conflict) { sdbRelease(pMnode->pSdb, pStream); + tFreeMDropStreamReq(&dropReq); return -1; } @@ -1358,7 +1336,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { if (pTrans == NULL) { mError("stream:%s, failed to drop since %s", dropReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); - tFreeSMDropStreamReq(&dropReq); + tFreeMDropStreamReq(&dropReq); return -1; } @@ -1368,7 +1346,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { if (mndTransCheckConflict(pMnode, pTrans) != 0) { sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); - tFreeSMDropStreamReq(&dropReq); + tFreeMDropStreamReq(&dropReq); return -1; } @@ -1379,7 +1357,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { mError("stream:%s, failed to drop task since %s", dropReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); - tFreeSMDropStreamReq(&dropReq); + tFreeMDropStreamReq(&dropReq); return -1; } @@ -1387,7 +1365,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) { sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); - tFreeSMDropStreamReq(&dropReq); + tFreeMDropStreamReq(&dropReq); return -1; } @@ -1395,7 +1373,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, terrstr()); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); - tFreeSMDropStreamReq(&dropReq); + tFreeMDropStreamReq(&dropReq); return -1; } @@ -1408,7 +1386,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); - tFreeSMDropStreamReq(&dropReq); + tFreeMDropStreamReq(&dropReq); return TSDB_CODE_ACTION_IN_PROGRESS; } @@ -1826,7 +1804,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { // check if it is conflict with other trans in both sourceDb and targetDb. bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb); - if (!conflict) { + if (conflict) { sdbRelease(pMnode->pSdb, pStream); return -1; } @@ -1961,7 +1939,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { // check if it is conflict with other trans in both sourceDb and targetDb. bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb); - if (!conflict) { + if (conflict) { sdbRelease(pMnode->pSdb, pStream); return -1; } From 630b259d7f0304763c614ef409411bbf90f7322d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 15 Nov 2023 14:29:01 +0800 Subject: [PATCH 24/25] fix(stream): fix error in stream. --- source/common/src/tmsg.c | 2 +- source/dnode/mnode/impl/src/mndStreamTrans.c | 2 ++ source/libs/parser/src/parTranslater.c | 2 +- tests/script/tsim/stream/pauseAndResume.sim | 2 +- 4 files changed, 5 insertions(+), 3 deletions(-) diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index c67b9e5e68..e5e94d3d35 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -7003,7 +7003,7 @@ int32_t tDeserializeSMDropStreamReq(void *buf, int32_t bufLen, SMDropStreamReq * return 0; } -void tFreeSMDropStreamReq(SMDropStreamReq *pReq) { +void tFreeMDropStreamReq(SMDropStreamReq *pReq) { FREESQL(); } diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index aec72339e0..2345de290a 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -86,12 +86,14 @@ bool streamTransConflictOtherTrans(SMnode* pMnode, const char* pSrcDb, const cha SStreamTransInfo *pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, pSrcDb, strlen(pSrcDb)); if (pEntry != NULL) { taosThreadMutexUnlock(&execInfo.lock); + mWarn("conflict with other transId:%d in Db:%s, trans:%s", pEntry->transId, pSrcDb, pEntry->name); return true; } pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, pDstDb, strlen(pDstDb)); if (pEntry != NULL) { taosThreadMutexUnlock(&execInfo.lock); + mWarn("conflict with other transId:%d in Db:%s, trans:%s", pEntry->transId, pSrcDb, pEntry->name); return true; } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 1c31993a92..d735e1fd0e 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -7382,7 +7382,7 @@ static int32_t translateDropStream(STranslateContext* pCxt, SDropStreamStmt* pSt tNameGetFullDbName(&name, dropReq.name); dropReq.igNotExists = pStmt->ignoreNotExists; int32_t code = buildCmdMsg(pCxt, TDMT_MND_DROP_STREAM, (FSerializeFunc)tSerializeSMDropStreamReq, &dropReq); - tFreeSMDropStreamReq(&dropReq); + tFreeMDropStreamReq(&dropReq); return code; } diff --git a/tests/script/tsim/stream/pauseAndResume.sim b/tests/script/tsim/stream/pauseAndResume.sim index aa1cb4dbb9..5eb9eef010 100644 --- a/tests/script/tsim/stream/pauseAndResume.sim +++ b/tests/script/tsim/stream/pauseAndResume.sim @@ -16,8 +16,8 @@ sql create table ts2 using st tags(2,2,2); sql create table ts3 using st tags(3,2,2); sql create table ts4 using st tags(4,2,2); sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt1 as select _wstart, count(*) c1, sum(a) c3 from st interval(10s); +sleep 2000 -sleep 1000 sql pause stream streams1; sql insert into ts1 values(1648791213001,1,12,3,1.0); From 0654b9299aea8056df667c3a4c70443713fe822a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 23 Nov 2023 11:44:25 +0800 Subject: [PATCH 25/25] fix(stream): fix memory leak. --- source/dnode/mnode/impl/src/mndStream.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index eca84ab2be..0eec204719 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -145,6 +145,7 @@ int32_t mndInitStream(SMnode *pMnode) { void mndCleanupStream(SMnode *pMnode) { taosArrayDestroy(execInfo.pTaskList); taosHashCleanup(execInfo.pTaskMap); + taosHashCleanup(execInfo.transMgmt.pDBTrans); taosThreadMutexDestroy(&execInfo.lock); mDebug("mnd stream exec info cleanup"); }