From 9f8a63099cedfa97dcbb65c554b33c69cab61197 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 20 Nov 2023 15:26:50 +0800 Subject: [PATCH 01/11] fix(stream): fix error in generating token in bucket. --- source/libs/stream/inc/streamInt.h | 3 ++- source/libs/stream/src/streamQueue.c | 20 ++++++++++++-------- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index b76a967d0d..6dd1e5c1c3 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -75,7 +75,8 @@ struct STokenBucket { double quotaCapacity; // available capacity for maximum input size, KiloBytes per Second double quotaRemain; // not consumed bytes per second double quotaRate; // number of token per second - int64_t fillTimestamp; // fill timestamp + int64_t tokenFillTimestamp; // fill timestamp + int64_t quotaFillTimestamp; // fill timestamp }; struct SStreamQueue { diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 556de169b4..d19dfc13bf 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -388,32 +388,36 @@ int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t pBucket->quotaCapacity = quotaRate * MAX_SMOOTH_BURST_RATIO; pBucket->quotaRemain = pBucket->quotaCapacity; - pBucket->fillTimestamp = taosGetTimestampMs(); + pBucket->tokenFillTimestamp = taosGetTimestampMs(); + pBucket->quotaFillTimestamp = taosGetTimestampMs(); stDebug("s-task:%s sink quotaRate:%.2fMiB, numRate:%d", id, quotaRate, numRate); return TSDB_CODE_SUCCESS; } static void fillTokenBucket(STokenBucket* pBucket, const char* id) { int64_t now = taosGetTimestampMs(); - int64_t delta = now - pBucket->fillTimestamp; + + int64_t deltaToken = now - pBucket->tokenFillTimestamp; ASSERT(pBucket->numOfToken >= 0); - int32_t incNum = (delta / 1000.0) * pBucket->numRate; + int32_t incNum = (deltaToken / 1000.0) * pBucket->numRate; if (incNum > 0) { pBucket->numOfToken = TMIN(pBucket->numOfToken + incNum, pBucket->numCapacity); - pBucket->fillTimestamp = now; + pBucket->tokenFillTimestamp = now; } // increase the new available quota as time goes on - double incSize = (delta / 1000.0) * pBucket->quotaRate; + int64_t deltaQuota = now - pBucket->quotaFillTimestamp; + double incSize = (deltaQuota / 1000.0) * pBucket->quotaRate; if (incSize > 0) { pBucket->quotaRemain = TMIN(pBucket->quotaRemain + incSize, pBucket->quotaCapacity); - pBucket->fillTimestamp = now; + pBucket->quotaFillTimestamp = now; } if (incNum > 0 || incSize > 0) { - stTrace("token/quota available, token:%d inc:%d, quota:%.2fMiB inc:%.3fMiB, ts:%" PRId64 " idle:%" PRId64 "ms, %s", - pBucket->numOfToken, incNum, pBucket->quotaRemain, incSize, now, delta, id); + stTrace("token/quota available, token:%d inc:%d, token_TsDelta:%" PRId64 + ", quota:%.2fMiB inc:%.3fMiB quotaTs:%" PRId64 " now:%" PRId64 "ms, %s", + pBucket->numOfToken, incNum, deltaToken, pBucket->quotaRemain, incSize, deltaQuota, now, id); } } From 2b099b11dd8ed141043c21318fb3e006527ddebc Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 21 Nov 2023 10:09:59 +0800 Subject: [PATCH 02/11] refactor: do some internal refactor. --- source/dnode/vnode/src/tq/tqStreamTask.c | 8 ++++++-- source/libs/stream/src/streamState.c | 1 - 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 4c0491da86..aa531a0ba8 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -212,8 +212,10 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) { int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; + bool alreadyRestored = pTq->pVnode->restored; + // do not launch the stream tasks, if it is a follower or not restored vnode. - if (!(vnodeIsRoleLeader(pTq->pVnode) && pTq->pVnode->restored)) { + if (!(vnodeIsRoleLeader(pTq->pVnode) && alreadyRestored)) { return TSDB_CODE_SUCCESS; } @@ -255,7 +257,9 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) { return -1; } - tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d", vgId, numOfTasks); + tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d, restored:%d", vgId, numOfTasks, + alreadyRestored); + pRunReq->head.vgId = vgId; pRunReq->streamId = 0; pRunReq->taskId = STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID; diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 2e51200fe4..0f32fd6879 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -1086,7 +1086,6 @@ _end: } int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) { - stDebug("try to write to cf parname"); #ifdef USE_ROCKSDB if (tSimpleHashGetSize(pState->parNameMap) > MAX_TABLE_NAME_NUM) { if (tSimpleHashGet(pState->parNameMap, &groupId, sizeof(int64_t)) == NULL) { From 43e6722fd81d17e0765f966b9c49d34c91103fad Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 13 Nov 2023 23:26:16 +0800 Subject: [PATCH 03/11] refactor: refactor the stream trans conflict check --- source/dnode/mnode/impl/inc/mndDef.h | 3 +- source/dnode/mnode/impl/inc/mndStream.h | 34 ++- source/dnode/mnode/impl/src/mndScheduler.c | 2 +- source/dnode/mnode/impl/src/mndStream.c | 264 +++++++++++++-------- 4 files changed, 189 insertions(+), 114 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index efa99db74b..aa82dff7ec 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -649,8 +649,7 @@ typedef struct SStreamConf { } SStreamConf; typedef struct { - char name[TSDB_STREAM_FNAME_LEN]; - // ctl + char name[TSDB_STREAM_FNAME_LEN]; SRWLatch lock; // create info diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index 19fd2a3fd4..244a6d08dd 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -22,17 +22,37 @@ extern "C" { #endif -int32_t mndInitStream(SMnode *pMnode); -void mndCleanupStream(SMnode *pMnode); +typedef struct SStreamTransInfo { + int64_t startTime; + int32_t transId; + const char *name; +} SStreamTransInfo; +typedef struct SStreamTransMgmt { + SHashObj *pDBTrans; +} SStreamTransMgmt; + +typedef struct SStreamExecInfo { + SArray *pNodeList; + int64_t ts; // snapshot ts + SStreamTransMgmt transMgmt; + int64_t activeCheckpoint; // active check point id + SHashObj * pTaskMap; + SArray * pTaskList; + TdThreadMutex lock; +} SStreamExecInfo; + +extern SStreamExecInfo execInfo; + +int32_t mndInitStream(SMnode *pMnode); +void mndCleanupStream(SMnode *pMnode); SStreamObj *mndAcquireStream(SMnode *pMnode, char *streamName); void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream); +int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); +int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); -SSdbRaw *mndStreamActionEncode(SStreamObj *pStream); -SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); - -int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); -int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); +int32_t mndStreamRegisterTrans(STrans* pTrans, const char* pName, const char* pSrcDb, const char* pDstDb); +bool streamTransConflictOtherTrans(SMnode *pMnode, const char *pSrcDb, const char *pDstDb); // for sma // TODO refactor diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 88a06cb513..3ef4c9a4d2 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -28,7 +28,7 @@ extern bool tsDeployOnSnode; static int32_t doAddSinkTask(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup, - SEpSet* pEpset, bool isFillhistory); + SEpSet* pEpset, bool isFillhistory); int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType, int64_t watermark, int64_t deleteMark) { diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index ee990956ff..30341f45c0 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -34,7 +34,13 @@ #define MND_STREAM_VER_NUMBER 4 #define MND_STREAM_RESERVE_SIZE 64 #define MND_STREAM_MAX_NUM 60 -#define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint" + +#define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint" +#define MND_STREAM_PAUSE_NAME "stream-pause" +#define MND_STREAM_RESUME_NAME "stream-resume" +#define MND_STREAM_DROP_NAME "stream-drop" +#define MND_STREAM_TASK_RESET_NAME "stream-task-reset" +#define MND_STREAM_TASK_UPDATE_NAME "stream-task-update" typedef struct SNodeEntry { int32_t nodeId; @@ -43,22 +49,13 @@ typedef struct SNodeEntry { int64_t hbTimestamp; // second } SNodeEntry; -typedef struct SStreamExecInfo { - SArray *pNodeList; - int64_t ts; // snapshot ts - int64_t activeCheckpoint; // active check point id - SHashObj * pTaskMap; - SArray * pTaskList; - TdThreadMutex lock; -} SStreamExecInfo; - typedef struct SVgroupChangeInfo { SHashObj *pDBMap; SArray * pUpdateNodeList; // SArray } SVgroupChangeInfo; -static int32_t mndNodeCheckSentinel = 0; -static SStreamExecInfo execInfo; +static int32_t mndNodeCheckSentinel = 0; +SStreamExecInfo execInfo; static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream); static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream); @@ -83,17 +80,20 @@ static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady); static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList); -static STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, const char *name); +static STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const char *name, const char* pMsg); static int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans); static void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset, int32_t retryCode); static int32_t createStreamUpdateTrans(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans); static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); -static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); +static void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode); static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot); -static int32_t doKillActiveCheckpointTrans(SMnode *pMnode); +static int32_t killActiveCheckpointTrans(SMnode *pMnode, const char* pDbName); static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList); +static SSdbRaw *mndStreamActionEncode(SStreamObj *pStream); +static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); + int32_t mndInitStream(SMnode *pMnode) { SSdbTable table = { .sdbType = SDB_STREAM, @@ -335,7 +335,7 @@ static int32_t mndStreamGetPlanString(const char *ast, int8_t triggerType, int64 .pAstRoot = pAst, .topicQuery = false, .streamQuery = true, - .triggerType = triggerType == STREAM_TRIGGER_MAX_DELAY ? STREAM_TRIGGER_WINDOW_CLOSE : triggerType, + .triggerType = (triggerType == STREAM_TRIGGER_MAX_DELAY) ? STREAM_TRIGGER_WINDOW_CLOSE : triggerType, .watermark = watermark, }; code = qCreateQueryPlan(&cxt, &pPlan, NULL); @@ -720,6 +720,52 @@ int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) return 0; } +static bool checkDbPrecision(SMnode* pMnode, SStreamObj* pStreamObj) { + if (pStreamObj->sourceDbUid != pStreamObj->targetDbUid) { + SDbObj *pSrcDb = mndAcquireDb(pMnode, pStreamObj->sourceDb); + SDbObj *pDstDb = mndAcquireDb(pMnode, pStreamObj->targetDb); + + bool isIdentical = (pSrcDb->cfg.precision != pDstDb->cfg.precision); + mndReleaseDb(pMnode, pSrcDb); + mndReleaseDb(pMnode, pDstDb); + + if (!isIdentical) { + mError("stream:%s failed to create since target/source db precision not identical", pStreamObj->name); + return false; + } + } + + return true; +} + +static int32_t checkForNumOfStreams(SMnode *pMnode, SStreamObj *pStreamObj) { // check for number of existed tasks + int32_t numOfStream = 0; + SStreamObj *pStream = NULL; + void *pIter = NULL; + + while ((pIter = sdbFetch(pMnode->pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) { + if (pStream->sourceDbUid == pStreamObj->sourceDbUid) { + ++numOfStream; + } + + sdbRelease(pMnode->pSdb, pStream); + + if (numOfStream > MND_STREAM_MAX_NUM) { + mError("too many streams, no more than %d for each database", MND_STREAM_MAX_NUM); + sdbCancelFetch(pMnode->pSdb, pIter); + return TSDB_CODE_MND_TOO_MANY_STREAMS; + } + + if (pStream->targetStbUid == pStreamObj->targetStbUid) { + mError("Cannot write the same stable as other stream:%s", pStream->name); + sdbCancelFetch(pMnode->pSdb, pIter); + return TSDB_CODE_MND_INVALID_TARGET_TABLE; + } + } + + return TSDB_CODE_SUCCESS; +} + static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { SMnode * pMnode = pReq->info.node; int32_t code = -1; @@ -732,6 +778,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { terrno = TSDB_CODE_INVALID_MSG; goto _OVER; } + #ifdef WINDOWS terrno = TSDB_CODE_MND_INVALID_PLATFORM; goto _OVER; @@ -772,42 +819,15 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { goto _OVER; } - { - int32_t numOfStream = 0; + // check if the time precision for source&target DB is identical. + bool isIdentical = checkDbPrecision(pMnode, &streamObj); + if (!isIdentical) { + goto _OVER; + } - SStreamObj *pStream = NULL; - void * pIter = NULL; - - while (1) { - pIter = sdbFetch(pMnode->pSdb, SDB_STREAM, pIter, (void **)&pStream); - if (pIter == NULL) { - if (numOfStream > MND_STREAM_MAX_NUM) { - mError("too many streams, no more than %d for each database", MND_STREAM_MAX_NUM); - terrno = TSDB_CODE_MND_TOO_MANY_STREAMS; - goto _OVER; - } - break; - } - - if (pStream->sourceDbUid == streamObj.sourceDbUid) { - ++numOfStream; - } - - sdbRelease(pMnode->pSdb, pStream); - if (numOfStream > MND_STREAM_MAX_NUM) { - mError("too many streams, no more than %d for each database", MND_STREAM_MAX_NUM); - terrno = TSDB_CODE_MND_TOO_MANY_STREAMS; - sdbCancelFetch(pMnode->pSdb, pIter); - goto _OVER; - } - - if (pStream->targetStbUid == streamObj.targetStbUid) { - mError("Cannot write the same stable as other stream:%s", pStream->name); - terrno = TSDB_CODE_MND_INVALID_TARGET_TABLE; - sdbCancelFetch(pMnode->pSdb, pIter); - goto _OVER; - } - } + code = checkForNumOfStreams(pMnode, &streamObj); + if (code != TSDB_CODE_SUCCESS) { + goto _OVER; } STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq, "create-stream"); @@ -866,7 +886,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { taosThreadMutexLock(&execInfo.lock); mDebug("stream tasks register into node list"); - keepStreamTasksInBuf(&streamObj, &execInfo); + saveStreamTasksInfo(&streamObj, &execInfo); taosThreadMutexUnlock(&execInfo.lock); code = TSDB_CODE_ACTION_IN_PROGRESS; @@ -1268,7 +1288,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { SMStreamDoCheckpointMsg *pMsg = (SMStreamDoCheckpointMsg *)pReq->pCont; int64_t checkpointId = pMsg->checkpointId; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, NULL, MND_STREAM_CHECKPOINT_NAME); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, MND_STREAM_CHECKPOINT_NAME); if (pTrans == NULL) { mError("failed to trigger checkpoint, reason: %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); return -1; @@ -1277,7 +1297,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { mDebug("start to trigger checkpoint, checkpointId: %" PRId64, checkpointId); const char *pDb = mndGetStreamDB(pMnode); - mndTransSetDbName(pTrans, pDb, "checkpoint"); + mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb); taosMemoryFree((void *)pDb); if (mndTransCheckConflict(pMnode, pTrans) != 0) { @@ -1345,7 +1365,14 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { return -1; } - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "drop-stream"); + // check if it is conflict with other trans in both sourceDb and targetDb. + bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb); + if (!conflict) { + sdbRelease(pMnode->pSdb, pStream); + return -1; + } + + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, MND_STREAM_DROP_NAME); if (pTrans == NULL) { mError("stream:%s, failed to drop since %s", dropReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); @@ -1353,7 +1380,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { return -1; } - mInfo("trans:%d, used to drop stream:%s", pTrans->id, dropReq.name); + mInfo("trans:%d used to drop stream:%s", pTrans->id, dropReq.name); mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb); if (mndTransCheckConflict(pMnode, pTrans) != 0) { @@ -1363,6 +1390,8 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { return -1; } + int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pStream->sourceDb, pStream->targetDb); + // drop all tasks if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) { mError("stream:%s, failed to drop task since %s", dropReq.name, terrstr()); @@ -1392,7 +1421,6 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { SName name = {0}; tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); - // reuse this function for stream auditRecord(pReq, pMnode->clusterId, "dropStream", name.dbname, name.tname, dropReq.sql, dropReq.sqlLen); @@ -1814,6 +1842,13 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { return -1; } + // check if it is conflict with other trans in both sourceDb and targetDb. + bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb); + if (!conflict) { + sdbRelease(pMnode->pSdb, pStream); + return -1; + } + bool updated = taskNodeIsUpdated(pMnode); if (updated) { mError("tasks are not ready for pause, node update detected"); @@ -1822,7 +1857,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "pause-stream"); if (pTrans == NULL) { - mError("stream:%s, failed to pause stream since %s", pauseReq.name, terrstr()); + mError("stream:%s failed to pause stream since %s", pauseReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); return -1; } @@ -1836,6 +1871,8 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { return -1; } + int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_PAUSE_NAME, pStream->sourceDb, pStream->targetDb); + // pause all tasks if (mndPauseAllStreamTasks(pMnode, pTrans, pStream) < 0) { mError("stream:%s, failed to pause task since %s", pauseReq.name, terrstr()); @@ -1940,13 +1977,21 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { return -1; } - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "pause-stream"); - if (pTrans == NULL) { - mError("stream:%s, failed to pause stream since %s", pauseReq.name, terrstr()); + // check if it is conflict with other trans in both sourceDb and targetDb. + bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb); + if (!conflict) { sdbRelease(pMnode->pSdb, pStream); return -1; } - mInfo("trans:%d, used to pause stream:%s", pTrans->id, pauseReq.name); + + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, MND_STREAM_RESUME_NAME); + if (pTrans == NULL) { + mError("stream:%s, failed to resume stream since %s", pauseReq.name, terrstr()); + sdbRelease(pMnode->pSdb, pStream); + return -1; + } + + mInfo("trans:%d used to resume stream:%s", pTrans->id, pauseReq.name); mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb); if (mndTransCheckConflict(pMnode, pTrans) != 0) { @@ -1955,6 +2000,8 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { return -1; } + int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESUME_NAME, pStream->sourceDb, pStream->targetDb); + // resume all tasks if (mndResumeAllStreamTasks(pTrans, pMnode, pStream, pauseReq.igUntreated) < 0) { mError("stream:%s, failed to drop task since %s", pauseReq.name, terrstr()); @@ -2219,7 +2266,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange // here create only one trans if (pTrans == NULL) { - pTrans = doCreateTrans(pMnode, pStream, "stream-task-update"); + pTrans = doCreateTrans(pMnode, pStream, NULL, MND_STREAM_TASK_UPDATE_NAME, "update task epsets"); if (pTrans == NULL) { sdbRelease(pSdb, pStream); sdbCancelFetch(pSdb, pIter); @@ -2329,7 +2376,7 @@ static void doExtractTasksFromStream(SMnode *pMnode) { break; } - keepStreamTasksInBuf(pStream, &execInfo); + saveStreamTasksInfo(pStream, &execInfo); sdbRelease(pSdb, pStream); } } @@ -2413,6 +2460,14 @@ int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) { return 0; } +static void killAllCheckpointTrans(SMnode* pMnode, SVgroupChangeInfo* pChangeInfo) { + void* pIter = NULL; + while((pIter = taosHashIterate(pChangeInfo->pDBMap, pIter)) != NULL) { + char* pDb = (char*) pIter; + killActiveCheckpointTrans(pMnode, pDb); + } +} + // this function runs by only one thread, so it is not multi-thread safe static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { int32_t code = 0; @@ -2454,7 +2509,8 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot); if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) { // kill current active checkpoint transaction, since the transaction is vnode wide. - doKillActiveCheckpointTrans(pMnode); + killAllCheckpointTrans(pMnode, &changeInfo); + code = mndProcessVgroupChange(pMnode, &changeInfo); // keep the new vnode snapshot @@ -2500,7 +2556,7 @@ static int32_t mndProcessNodeCheck(SRpcMsg *pReq) { return 0; } -void keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) { +void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode) { int32_t level = taosArrayGetSize(pStream->tasks); for (int32_t i = 0; i < level; i++) { @@ -2543,8 +2599,9 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) { STaskId *pId = taosArrayGet(pExecNode->pTaskList, k); if (pId->taskId == id.taskId && pId->streamId == id.streamId) { taosArrayRemove(pExecNode->pTaskList, k); - mInfo("s-task:0x%x removed from buffer, remain:%d", (int32_t)id.taskId, - (int32_t)taosArrayGetSize(pExecNode->pTaskList)); + + int32_t num = taosArrayGetSize(pExecNode->pTaskList); + mInfo("s-task:0x%x removed from buffer, remain:%d", (int32_t)id.taskId, num); break; } } @@ -2555,15 +2612,15 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) { ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList)); } -STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, const char *name) { - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, name); +STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const char *name, const char* pMsg) { + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, name); if (pTrans == NULL) { mError("failed to build trans:%s, reason: %s", name, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - mDebug("start to build stream:0x%" PRIx64 " task DAG update", pStream->uid); + mDebug("s-task:0x%"PRIx64" start to build trans %s", pStream->uid, pMsg); mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb); if (mndTransCheckConflict(pMnode, pTrans) != 0) { @@ -2578,7 +2635,7 @@ STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, const char *name) { } int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) { - STrans *pTrans = doCreateTrans(pMnode, pStream, "stream-task-reset"); + STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, MND_STREAM_TASK_RESET_NAME, " reset from failed checkpoint"); if (pTrans == NULL) { return terrno; } @@ -2642,43 +2699,36 @@ int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) { return TSDB_CODE_ACTION_IN_PROGRESS; } -int32_t doKillActiveCheckpointTrans(SMnode *pMnode) { - int32_t transId = 0; - SSdb * pSdb = pMnode->pSdb; - STrans *pTrans = NULL; - void * pIter = NULL; - - while (1) { - pIter = sdbFetch(pSdb, SDB_TRANS, pIter, (void **)&pTrans); - if (pIter == NULL) { - break; - } - - if (strncmp(pTrans->opername, MND_STREAM_CHECKPOINT_NAME, tListLen(pTrans->opername) - 1) == 0) { - transId = pTrans->id; - sdbRelease(pSdb, pTrans); - sdbCancelFetch(pSdb, pIter); - break; - } - - sdbRelease(pSdb, pTrans); - } - - if (transId == 0) { - mDebug("failed to find the checkpoint trans, reset not executed"); +int32_t killActiveCheckpointTrans(SMnode *pMnode, const char* pDBName) { + // data in the hash table will be removed automatically, no need to remove it here. + SStreamTransInfo* pTransInfo = taosHashGet(execInfo.transMgmt.pDBTrans, pDBName, strlen(pDBName)); + if (pTransInfo == NULL) { return TSDB_CODE_SUCCESS; } - pTrans = mndAcquireTrans(pMnode, transId); - mInfo("kill checkpoint trans:%d", transId); + // not checkpoint trans, ignore + if (strcmp(pTransInfo->name, MND_STREAM_CHECKPOINT_NAME) != 0) { + mDebug("not checkpoint trans, not kill it, name:%s, transId:%d", pTransInfo->name, pTransInfo->transId); + return TSDB_CODE_SUCCESS; + } + + STrans* pTrans = mndAcquireTrans(pMnode, pTransInfo->transId); + if (pTrans != NULL) { + mInfo("kill checkpoint transId:%d in Db:%s", pTransInfo->transId, pDBName); + mndKillTrans(pMnode, pTrans); + mndReleaseTrans(pMnode, pTrans); + } - mndKillTrans(pMnode, pTrans); - mndReleaseTrans(pMnode, pTrans); return TSDB_CODE_SUCCESS; } -int32_t mndResetFromCheckpoint(SMnode *pMnode) { - doKillActiveCheckpointTrans(pMnode); +int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int32_t transId) { + STrans* pTrans = mndAcquireTrans(pMnode, transId); + if (pTrans != NULL) { + mInfo("kill checkpoint transId:%d to reset task status", transId); + mndKillTrans(pMnode, pTrans); + mndReleaseTrans(pMnode, pTrans); + } // set all tasks status to be normal, refactor later to be stream level, instead of vnode level. SSdb * pSdb = pMnode->pSdb; @@ -2690,7 +2740,13 @@ int32_t mndResetFromCheckpoint(SMnode *pMnode) { break; } - // todo this transaction should exist be only one + bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb); + if (conflict) { + mError("stream:%s other trans exists in DB:%s & %s failed to start reset-status trans", + pStream->name, pStream->sourceDb, pStream->targetDb); + continue; + } + mDebug("stream:%s (0x%" PRIx64 ") reset checkpoint procedure, create reset trans", pStream->name, pStream->uid); int32_t code = createStreamResetStatusTrans(pMnode, pStream); if (code != TSDB_CODE_SUCCESS) { @@ -2817,7 +2873,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { // if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal mInfo("checkpointId:%" PRId64 " failed, issue task-reset trans to reset all tasks status", execInfo.activeCheckpoint); - mndResetFromCheckpoint(pMnode); + mndResetStatusFromCheckpoint(pMnode, activeCheckpointId); } else { mInfo("not all vgroups are ready, wait for next HB from stream tasks"); } From 0a2fb5fe9a9c5cac8a39dab49d48e9d3958f97c1 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 13 Nov 2023 23:59:05 +0800 Subject: [PATCH 04/11] refactor: do some internal refactor. --- source/dnode/mnode/impl/src/mndStreamTrans.c | 103 +++++++++++++++++++ 1 file changed, 103 insertions(+) create mode 100644 source/dnode/mnode/impl/src/mndStreamTrans.c diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c new file mode 100644 index 0000000000..aec72339e0 --- /dev/null +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -0,0 +1,103 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "mndTrans.h" +#include "mndStream.h" + +typedef struct SKeyInfo { + void* pKey; + int32_t keyLen; +} SKeyInfo; + +static int32_t clearFinishedTrans(SMnode* pMnode); + +int32_t mndStreamRegisterTrans(STrans* pTrans, const char* pName, const char* pSrcDb, const char* pDstDb) { + SStreamTransInfo info = {.transId = pTrans->id, .startTime = taosGetTimestampMs(), .name = pName}; + taosHashPut(execInfo.transMgmt.pDBTrans, pSrcDb, strlen(pSrcDb), &info, sizeof(SStreamTransInfo)); + + if (strcmp(pSrcDb, pDstDb) != 0) { + taosHashPut(execInfo.transMgmt.pDBTrans, pDstDb, strlen(pDstDb), &info, sizeof(SStreamTransInfo)); + } + + return 0; +} + +int32_t clearFinishedTrans(SMnode* pMnode) { + SArray* pList = taosArrayInit(4, sizeof(SKeyInfo)); + size_t keyLen = 0; + + taosThreadMutexLock(&execInfo.lock); + + void* pIter = NULL; + while ((pIter = taosHashIterate(execInfo.transMgmt.pDBTrans, pIter)) != NULL) { + SStreamTransInfo *pEntry = (SStreamTransInfo *)pIter; + STrans* pTrans = mndAcquireTrans(pMnode, pEntry->transId); + + // let's clear the finished trans + if (pTrans == NULL) { + void* pKey = taosHashGetKey(pEntry, &keyLen); + // key is the name of src/dst db name + SKeyInfo info = {.pKey = pKey, .keyLen = keyLen}; + + mDebug("transId:%d %s startTs:%" PRId64 "cleared due to finished", pEntry->transId, pEntry->name, + pEntry->startTime); + taosArrayPush(pList, &info); + } else { + mndReleaseTrans(pMnode, pTrans); + } + } + + size_t num = taosArrayGetSize(pList); + for(int32_t i = 0; i < num; ++i) { + SKeyInfo* pKey = taosArrayGet(pList, i); + taosHashRemove(execInfo.transMgmt.pDBTrans, pKey->pKey, pKey->keyLen); + } + + mDebug("clear %d finished stream-trans, remained:%d", (int32_t) num, taosHashGetSize(execInfo.transMgmt.pDBTrans)); + taosThreadMutexUnlock(&execInfo.lock); + + terrno = TSDB_CODE_SUCCESS; + taosArrayDestroy(pList); + return 0; +} + +bool streamTransConflictOtherTrans(SMnode* pMnode, const char* pSrcDb, const char* pDstDb) { + clearFinishedTrans(pMnode); + + taosThreadMutexLock(&execInfo.lock); + int32_t num = taosHashGetSize(execInfo.transMgmt.pDBTrans); + if (num <= 0) { + taosThreadMutexUnlock(&execInfo.lock); + return false; + } + + SStreamTransInfo *pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, pSrcDb, strlen(pSrcDb)); + if (pEntry != NULL) { + taosThreadMutexUnlock(&execInfo.lock); + return true; + } + + pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, pDstDb, strlen(pDstDb)); + if (pEntry != NULL) { + taosThreadMutexUnlock(&execInfo.lock); + return true; + } + + taosThreadMutexUnlock(&execInfo.lock); + return false; +} + + + From 75420bfefbe7a3c03089b89eebdfd0b60b57a88e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 14 Nov 2023 23:35:09 +0800 Subject: [PATCH 05/11] fix(stream): fix the checkpoint msg failure. --- source/libs/stream/src/streamDispatch.c | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 42280b0d0f..82affe71d9 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1143,8 +1143,21 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i stWarn("s-task:%s inputQ of downstream task:0x%x(vgId:%d) is full, wait for %dms and retry dispatch", id, pRsp->downstreamTaskId, pRsp->downstreamNodeId, DISPATCH_RETRY_INTERVAL_MS); } else if (pRsp->inputStatus == TASK_INPUT_STATUS__REFUSED) { - stError("s-task:%s downstream task:0x%x(vgId:%d) refused the dispatch msg, treat it as success", id, - pRsp->downstreamTaskId, pRsp->downstreamNodeId); + // todo handle the agg task failure, add test case + if (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER && + pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + stError("s-task:%s failed to dispatch checkpoint-trigger msg, checkpointId:%" PRId64 + ", set the current checkpoint failed, and send rsp to mnode", + id, pTask->checkpointingId); + { // send checkpoint failure msg to mnode directly + pTask->chkInfo.failedId = pTask->checkpointingId; // record the latest failed checkpoint id + pTask->checkpointingId = pTask->checkpointingId; + streamTaskSendCheckpointReadyMsg(pTask); + } + } else { + stError("s-task:%s downstream task:0x%x(vgId:%d) refused the dispatch msg, treat it as success", id, + pRsp->downstreamTaskId, pRsp->downstreamNodeId); + } } } From 7a0dfdb0cc64d4b6e15938705edab4c3cb5a0453 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 15 Nov 2023 11:13:06 +0800 Subject: [PATCH 06/11] fix(tsdb): fix invalid read. --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 6169014d9f..751df706ab 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -572,7 +572,12 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN if (isEmptyQueryTimeWindow(&w)) { k += 1; - continue; + + if (k >= numOfTables) { + break; + } else { + continue; + } } // 1. time range check From d355b78884be7e23ba197509aba97dac6268cb53 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 15 Nov 2023 11:21:31 +0800 Subject: [PATCH 07/11] fix(stream): fix invalid read. --- source/dnode/mnode/impl/src/mndStream.c | 13 ++++++++----- source/libs/stream/src/streamDispatch.c | 7 +++++++ 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 30341f45c0..8b26e0a097 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -88,7 +88,7 @@ static int32_t createStreamUpdateTrans(SStreamObj *pStream, SVgroupChangeInfo *p static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); static void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode); static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot); -static int32_t killActiveCheckpointTrans(SMnode *pMnode, const char* pDbName); +static int32_t killActiveCheckpointTrans(SMnode *pMnode, const char* pDbName, size_t len); static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList); static SSdbRaw *mndStreamActionEncode(SStreamObj *pStream); @@ -1297,7 +1297,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { mDebug("start to trigger checkpoint, checkpointId: %" PRId64, checkpointId); const char *pDb = mndGetStreamDB(pMnode); - mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb); + mndTransSetDbName(pTrans, pDb, pDb); taosMemoryFree((void *)pDb); if (mndTransCheckConflict(pMnode, pTrans) != 0) { @@ -2464,7 +2464,10 @@ static void killAllCheckpointTrans(SMnode* pMnode, SVgroupChangeInfo* pChangeInf void* pIter = NULL; while((pIter = taosHashIterate(pChangeInfo->pDBMap, pIter)) != NULL) { char* pDb = (char*) pIter; - killActiveCheckpointTrans(pMnode, pDb); + + size_t len = 0; + void* pKey = taosHashGetKey(pDb, &len); + killActiveCheckpointTrans(pMnode, pKey, len); } } @@ -2699,9 +2702,9 @@ int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) { return TSDB_CODE_ACTION_IN_PROGRESS; } -int32_t killActiveCheckpointTrans(SMnode *pMnode, const char* pDBName) { +int32_t killActiveCheckpointTrans(SMnode *pMnode, const char* pDBName, size_t len) { // data in the hash table will be removed automatically, no need to remove it here. - SStreamTransInfo* pTransInfo = taosHashGet(execInfo.transMgmt.pDBTrans, pDBName, strlen(pDBName)); + SStreamTransInfo* pTransInfo = taosHashGet(execInfo.transMgmt.pDBTrans, pDBName, len); if (pTransInfo == NULL) { return TSDB_CODE_SUCCESS; } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 82affe71d9..b2f47c6d2a 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1102,6 +1102,13 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i int32_t vgId = pTask->pMeta->vgId; int32_t msgId = pTask->execInfo.dispatch; +#if 0 + // for test purpose, build the failure case + if (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER) { + pRsp->inputStatus = TASK_INPUT_STATUS__REFUSED; + } +#endif + // follower not handle the dispatch rsp if ((pTask->pMeta->role == NODE_ROLE_FOLLOWER) || (pTask->status.downstreamReady != 1)) { stError("s-task:%s vgId:%d is follower or task just re-launched, not handle the dispatch rsp, discard it", id, vgId); From 3907268f40c82f3851690fdc62d20d45bae85def Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 15 Nov 2023 14:29:01 +0800 Subject: [PATCH 08/11] fix(stream): register checkpoint trans. --- source/dnode/mnode/impl/src/mndStream.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 8b26e0a097..66b3e3cf95 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1300,6 +1300,8 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { mndTransSetDbName(pTrans, pDb, pDb); taosMemoryFree((void *)pDb); + mndStreamRegisterTrans(pTrans, MND_STREAM_CHECKPOINT_NAME, pDb, pDb); + if (mndTransCheckConflict(pMnode, pTrans) != 0) { mError("failed to trigger checkpoint, checkpointId: %" PRId64 ", reason:%s", checkpointId, tstrerror(TSDB_CODE_MND_TRANS_CONFLICT)); @@ -1873,7 +1875,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_PAUSE_NAME, pStream->sourceDb, pStream->targetDb); - // pause all tasks + // if nodeUpdate happened, not send pause trans if (mndPauseAllStreamTasks(pMnode, pTrans, pStream) < 0) { mError("stream:%s, failed to pause task since %s", pauseReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); From b712f5e7dce21167ad3604ba3b42a986fe629728 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 15 Nov 2023 16:03:23 +0800 Subject: [PATCH 09/11] refactor: do some internal refactor. --- include/libs/executor/storageapi.h | 56 +++------------------------ source/dnode/vnode/src/tq/tq.c | 6 --- source/libs/stream/src/streamTaskSm.c | 49 ++++++++--------------- 3 files changed, 23 insertions(+), 88 deletions(-) diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index d5f1da957d..045f2bad70 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -150,19 +150,6 @@ typedef struct { int32_t colNum; } SMetaStbStats; -// void tqReaderSetColIdList(STqReader *pReader, SArray *pColIdList); -// int32_t tqReaderSetTbUidList(STqReader *pReader, const SArray *tbUidList); -// int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *pTableUidList); -// int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList); -// bool tqReaderIsQueriedTable(STqReader* pReader, uint64_t uid); -// bool tqCurrentBlockConsumed(const STqReader* pReader); -// int32_t tqReaderSeek(STqReader *pReader, int64_t ver, const char *id); -// bool tqNextBlockInWal(STqReader* pReader, const char* idstr); -// bool tqNextBlockImpl(STqReader *pReader, const char* idstr); -// int32_t getTableInfoFromSnapshot(SSnapContext *ctx, void **pBuf, int32_t *contLen, int16_t *type, int64_t -// *uid); SMetaTableInfo getMetaTableInfoFromSnapshot(SSnapContext *ctx); int32_t setForSnapShot(SSnapContext -// *ctx, int64_t uid); int32_t destroySnapContext(SSnapContext *ctx); - // clang-format off /*-------------------------------------------------new api format---------------------------------------------------*/ typedef struct TsdReader { @@ -197,27 +184,6 @@ typedef struct SStoreCacheReader { // clang-format on /*------------------------------------------------------------------------------------------------------------------*/ -/* -void tqReaderSetColIdList(STqReader *pReader, SArray *pColIdList); -int32_t tqReaderSetTbUidList(STqReader *pReader, const SArray *tbUidList); -int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *pTableUidList); -int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList); -bool tqReaderIsQueriedTable(STqReader* pReader, uint64_t uid); -bool tqCurrentBlockConsumed(const STqReader* pReader); - -int32_t tqReaderSeek(STqReader *pReader, int64_t ver, const char *id); -bool tqNextBlockInWal(STqReader* pReader, const char* idstr); -bool tqNextBlockImpl(STqReader *pReader, const char* idstr); - - int32_t tqRetrieveDataBlock(STqReader *pReader, SSDataBlock **pRes, const char* idstr); -STqReader *tqReaderOpen(void *pVnode); -void tqReaderClose(STqReader *); - -int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver); -bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids); -SWalReader* tqGetWalReader(STqReader* pReader); -int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet); -*/ // todo rename typedef struct SStoreTqReader { struct STqReader* (*tqReaderOpen)(); @@ -281,28 +247,18 @@ typedef struct SStoreMeta { void* (*storeGetIndexInfo)(); void* (*getInvertIndex)(void* pVnode); - int32_t (*getChildTableList)( - void* pVnode, int64_t suid, - SArray* list); // support filter and non-filter cases. [vnodeGetCtbIdList & vnodeGetCtbIdListByFilter] - int32_t (*storeGetTableList)(void* pVnode, int8_t type, SArray* pList); // vnodeGetStbIdList & vnodeGetAllTableList + // support filter and non-filter cases. [vnodeGetCtbIdList & vnodeGetCtbIdListByFilter] + int32_t (*getChildTableList)( void* pVnode, int64_t suid, SArray* list); + int32_t (*storeGetTableList)(void* pVnode, int8_t type, SArray* pList); void* storeGetVersionRange; void* storeGetLastTimestamp; int32_t (*getTableSchema)(void* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid); // tsdbGetTableSchema + int32_t (*getNumOfChildTables)( void* pVnode, int64_t uid, int64_t* numOfTables, int32_t* numOfCols); + void (*getBasicInfo)(void* pVnode, const char** dbname, int32_t* vgId, int64_t* numOfTables, int64_t* numOfNormalTables); - // db name, vgId, numOfTables, numOfSTables - int32_t (*getNumOfChildTables)( - void* pVnode, int64_t uid, int64_t* numOfTables, - int32_t* numOfCols); // int32_t metaGetStbStats(SMeta *pMeta, int64_t uid, SMetaStbStats *pInfo); - void (*getBasicInfo)(void* pVnode, const char** dbname, int32_t* vgId, int64_t* numOfTables, - int64_t* numOfNormalTables); // vnodeGetInfo(void *pVnode, const char **dbname, int32_t *vgId) & - // metaGetTbNum(SMeta *pMeta) & metaGetNtbNum(SMeta *pMeta); int64_t (*getNumOfRowsInMem)(void* pVnode); - /** -int32_t vnodeGetCtbIdList(void *pVnode, int64_t suid, SArray *list); -int32_t vnodeGetCtbIdListByFilter(void *pVnode, int64_t suid, SArray *list, bool (*filter)(void *arg), void *arg); -int32_t vnodeGetStbIdList(void *pVnode, int64_t suid, SArray *list); - */ + SMCtbCursor* (*openCtbCursor)(void *pVnode, tb_uid_t uid, int lock); int32_t (*resumeCtbCursor)(SMCtbCursor* pCtbCur, int8_t first); void (*pauseCtbCursor)(SMCtbCursor* pCtbCur); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 6dbeaef6cb..2de6205804 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1136,16 +1136,10 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { // let's decide which step should be executed now if (pTask->execInfo.step1Start == 0) { - ASSERT(pTask->status.pauseAllowed == false); int64_t ts = taosGetTimestampMs(); pTask->execInfo.step1Start = ts; tqDebug("s-task:%s start scan-history stage(step 1), status:%s, step1 startTs:%" PRId64, id, pStatus, ts); - - // NOTE: in case of stream task, scan-history data in wal is not allowed to pause - if (pTask->info.fillHistory == 1) { - streamTaskEnablePause(pTask); - } } else { if (pTask->execInfo.step2Start == 0) { tqDebug("s-task:%s continue exec scan-history(step1), original step1 startTs:%" PRId64 ", already elapsed:%.2fs", diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 04b449aaaf..1c951e1452 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -269,6 +269,7 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) { pTask->id.idStr, pSM->current.name, GET_EVT_NAME(evt)); taosMsleep(100); } else { + // no active event trans exists, handle this event directly pTrans = streamTaskFindTransform(pSM->current.state, event); if (pTrans == NULL) { stDebug("s-task:%s failed to handle event:%s", pTask->id.idStr, GET_EVT_NAME(event)); @@ -451,60 +452,43 @@ int32_t initStateTransferTable() { return TSDB_CODE_SUCCESS; } +//clang-format off void doInitStateTransferTable(void) { streamTaskSMTrans = taosArrayInit(8, sizeof(STaskStateTrans)); // initialization event handle - STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__READY, TASK_EVENT_INIT, - streamTaskInitStatus, onNormalTaskReady, false, false); + STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__READY, TASK_EVENT_INIT, streamTaskInitStatus, onNormalTaskReady, false, false); taosArrayPush(streamTaskSMTrans, &trans); - - trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCANHIST, - streamTaskInitStatus, onScanhistoryTaskReady, false, false); + trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCANHIST, streamTaskInitStatus, onScanhistoryTaskReady, false, false); taosArrayPush(streamTaskSMTrans, &trans); - - trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STREAM_SCAN_HISTORY, TASK_EVENT_INIT_STREAM_SCANHIST, - streamTaskInitStatus, onScanhistoryTaskReady, false, false); + trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STREAM_SCAN_HISTORY, TASK_EVENT_INIT_STREAM_SCANHIST, streamTaskInitStatus, onScanhistoryTaskReady, false, false); taosArrayPush(streamTaskSMTrans, &trans); // scan-history related event - trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, NULL, NULL, - NULL, true); + trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, NULL, NULL, NULL, true); taosArrayPush(streamTaskSMTrans, &trans); - - trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, NULL, - NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, NULL, NULL, NULL, true); taosArrayPush(streamTaskSMTrans, &trans); // halt stream task, from other task status - trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, - streamTaskKeepCurrentVerInWal, NULL, true); + trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL, true); taosArrayPush(streamTaskSMTrans, &trans); - - trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, - streamTaskKeepCurrentVerInWal, NULL, true); + trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL, true); taosArrayPush(streamTaskSMTrans, &trans); SAttachedEventInfo info = {.status = TASK_STATUS__READY, .event = TASK_EVENT_HALT}; - trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, - streamTaskKeepCurrentVerInWal, &info, true); - taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, - &info, true); + trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, &info, true); taosArrayPush(streamTaskSMTrans, &trans); - - trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, - streamTaskKeepCurrentVerInWal, NULL, true); + trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, &info, true); + taosArrayPush(streamTaskSMTrans, &trans); + trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL, true); taosArrayPush(streamTaskSMTrans, &trans); // checkpoint related event - trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL, - streamTaskDoCheckpoint, NULL, true); + trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL, streamTaskDoCheckpoint, NULL, true); taosArrayPush(streamTaskSMTrans, &trans); - - trans = - createStateTransform(TASK_STATUS__CK, TASK_STATUS__READY, TASK_EVENT_CHECKPOINT_DONE, NULL, NULL, NULL, true); + trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__READY, TASK_EVENT_CHECKPOINT_DONE, NULL, NULL, NULL, true); taosArrayPush(streamTaskSMTrans, &trans); // pause & resume related event handle @@ -571,4 +555,5 @@ void doInitStateTransferTable(void) { taosArrayPush(streamTaskSMTrans, &trans); trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true); taosArrayPush(streamTaskSMTrans, &trans); -} \ No newline at end of file +} +//clang-format on \ No newline at end of file From c70c54a099f9201158411daa42127f5b312ea4b6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 15 Nov 2023 16:10:26 +0800 Subject: [PATCH 10/11] fix(stream): add missing pushed data. --- source/libs/stream/src/streamStart.c | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 97eb7b79a2..9b27281915 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -168,7 +168,6 @@ int32_t streamTaskStartScanHistory(SStreamTask* pTask) { } else if (level == TASK_LEVEL__AGG) { if (pTask->info.fillHistory) { streamSetParamForScanHistory(pTask); - streamTaskEnablePause(pTask); } } else if (level == TASK_LEVEL__SINK) { stDebug("s-task:%s sink task do nothing to handle scan-history", pTask->id.idStr); @@ -345,7 +344,6 @@ int32_t onNormalTaskReady(SStreamTask* pTask) { stDebug("s-task:%s level:%d status:%s sched-status:%d", id, pTask->info.taskLevel, p, pTask->status.schedStatus); } - streamTaskEnablePause(pTask); return TSDB_CODE_SUCCESS; } @@ -659,9 +657,6 @@ int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) { streamMetaCommit(pMeta); streamMetaWUnLock(pMeta); - // history data scan in the stream time window finished, now let's enable the pause - streamTaskEnablePause(pTask); - // for source tasks, let's continue execute. if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { streamSchedExec(pTask); @@ -1040,11 +1035,6 @@ void streamTaskResume(SStreamTask* pTask) { } } -void streamTaskEnablePause(SStreamTask* pTask) { - stDebug("s-task:%s enable task pause", pTask->id.idStr); - pTask->status.pauseAllowed = 1; -} - static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ) { int32_t vgId = pMeta->vgId; void* pIter = NULL; From 81d3e5737ec0f8ab37a1f966d672d892565bac2b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 15 Nov 2023 16:31:07 +0800 Subject: [PATCH 11/11] fix(stream): fix send msg error. --- source/libs/stream/src/streamDispatch.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index b2f47c6d2a..1f9b21becc 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1159,7 +1159,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i { // send checkpoint failure msg to mnode directly pTask->chkInfo.failedId = pTask->checkpointingId; // record the latest failed checkpoint id pTask->checkpointingId = pTask->checkpointingId; - streamTaskSendCheckpointReadyMsg(pTask); + streamTaskSendCheckpointSourceRsp(pTask); } } else { stError("s-task:%s downstream task:0x%x(vgId:%d) refused the dispatch msg, treat it as success", id,