From d4bab8c09be35ed39be454071ca151f2435e8d2a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 25 Jan 2024 16:53:30 +0800 Subject: [PATCH 1/3] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 3 +- source/dnode/mnode/impl/inc/mndStream.h | 36 +- source/dnode/mnode/impl/src/mndDef.c | 2 +- source/dnode/mnode/impl/src/mndSma.c | 6 +- source/dnode/mnode/impl/src/mndStream.c | 724 +------------------ source/dnode/mnode/impl/src/mndStreamTrans.c | 103 +++ source/dnode/vnode/src/sma/smaRollup.c | 2 +- source/dnode/vnode/src/tqCommon/tqCommon.c | 4 +- source/libs/stream/src/streamMeta.c | 19 +- source/libs/stream/src/streamTask.c | 7 +- 10 files changed, 169 insertions(+), 737 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 46f4b0959f..7ff47d2d59 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -462,7 +462,6 @@ struct SStreamTask { struct SStreamMeta* pMeta; SSHashObj* pNameMap; void* pBackend; - int64_t backendRefId; char reserve[256]; }; @@ -535,7 +534,7 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, bool fillHistory SArray* pTaskList, bool hasFillhistory); int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask); int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask); -void tFreeStreamTask(SStreamTask* pTask, bool metaLock); +void tFreeStreamTask(SStreamTask* pTask); int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver); int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo); diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index e72b2ed536..871e12c5e6 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -17,11 +17,15 @@ #define _TD_MND_STREAM_H_ #include "mndInt.h" +#include "mndTrans.h" #ifdef __cplusplus extern "C" { #endif +#define MND_STREAM_RESERVE_SIZE 64 +#define MND_STREAM_VER_NUMBER 4 + typedef struct SStreamTransInfo { int64_t startTime; int64_t streamUid; @@ -53,6 +57,19 @@ typedef struct SStreamExecInfo { SHashObj *pTransferStateStreams; } SStreamExecInfo; +typedef struct SNodeEntry { + int32_t nodeId; + bool stageUpdated; // the stage has been updated due to the leader/follower change or node reboot. + SEpSet epset; // compare the epset to identify the vgroup tranferring between different dnodes. + int64_t hbTimestamp; // second +} SNodeEntry; + +typedef struct SFailedCheckpointInfo { + int64_t streamUid; + int64_t checkpointId; + int32_t transId; +} SFailedCheckpointInfo; + #define MND_STREAM_CREATE_NAME "stream-create" #define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint" #define MND_STREAM_PAUSE_NAME "stream-pause" @@ -68,7 +85,7 @@ void mndCleanupStream(SMnode *pMnode); SStreamObj *mndAcquireStream(SMnode *pMnode, char *streamName); void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream); int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); -int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); +int32_t mndPersistStream(STrans *pTrans, SStreamObj *pStream); int32_t mndStreamRegisterTrans(STrans* pTrans, const char* pTransName, int64_t streamUid); int32_t mndAddtoCheckpointWaitingList(SStreamObj *pStream, int64_t checkpointId); @@ -80,7 +97,22 @@ int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamUid); int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); int32_t mndPersistDropStreamLog(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); -int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams); +int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams); +int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream); +SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady); +void mndKillTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName); +void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset, + int32_t retryCode); +STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const char *name, const char *pMsg); +int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status); +SSdbRaw *mndStreamActionEncode(SStreamObj *pStream); +SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId); +int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId); +int32_t mndProcessStreamHb(SRpcMsg *pReq); +void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode); +int32_t initStreamNodeList(SMnode *pMnode); +int32_t mndResumeStreamTasks(STrans *pTrans, SMnode *pMnode, SStreamObj* pStream, int8_t igUntreated); +int32_t mndPauseStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index ae72172bbb..172c3952ad 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -182,7 +182,7 @@ void *freeStreamTasks(SArray *pTaskLevel) { int32_t taskSz = taosArrayGetSize(pLevel); for (int32_t j = 0; j < taskSz; j++) { SStreamTask *pTask = taosArrayGetP(pLevel, j); - tFreeStreamTask(pTask, true); + tFreeStreamTask(pTask); } taosArrayDestroy(pLevel); diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 44842084c5..a89136e7d3 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -639,7 +639,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea if (mndSetUpdateSmaStbCommitLogs(pMnode, pTrans, pStb) != 0) goto _OVER; if (mndSetCreateSmaVgroupRedoActions(pMnode, pTrans, pDb, &streamObj.fixedSinkVg, &smaObj) != 0) goto _OVER; if (mndScheduleStream(pMnode, &streamObj, 1685959190000) != 0) goto _OVER; - if (mndPersistStream(pMnode, pTrans, &streamObj) != 0) goto _OVER; + if (mndPersistStream(pTrans, &streamObj) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; mInfo("sma:%s, uid:%" PRIi64 " create on stb:%" PRIi64 ", dstSuid:%" PRIi64 " dstTb:%s dstVg:%d", pCreate->name, @@ -872,7 +872,7 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p } // drop stream - if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) { + if (mndPersistTransLog(pStream, pTrans, SDB_STATUS_DROPPED) < 0) { mError("stream:%s, failed to drop log since %s", pStream->name, terrstr()); sdbRelease(pMnode->pSdb, pStream); goto _OVER; @@ -923,7 +923,7 @@ int32_t mndDropSmasByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *p goto _OVER; } - if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) { + if (mndPersistTransLog(pStream, pTrans, SDB_STATUS_DROPPED) < 0) { mndReleaseStream(pMnode, pStream); goto _OVER; } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index b8e0126650..5e03ec6447 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -27,17 +27,8 @@ #include "tmisce.h" #include "tname.h" -#define MND_STREAM_VER_NUMBER 4 -#define MND_STREAM_RESERVE_SIZE 64 #define MND_STREAM_MAX_NUM 60 -typedef struct SNodeEntry { - int32_t nodeId; - bool stageUpdated; // the stage has been updated due to the leader/follower change or node reboot. - SEpSet epset; // compare the epset to identify the vgroup tranferring between different dnodes. - int64_t hbTimestamp; // second -} SNodeEntry; - typedef struct SVgroupChangeInfo { SHashObj *pDBMap; SArray *pUpdateNodeList; // SArray @@ -54,7 +45,6 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq); static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq); static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq); static int32_t mndProcessStreamCheckpointInCandid(SRpcMsg *pReq); -static int32_t mndProcessStreamHb(SRpcMsg *pReq); static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextStream(SMnode *pMnode, void *pIter); static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); @@ -66,28 +56,18 @@ static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int static int32_t mndProcessNodeCheck(SRpcMsg *pReq); static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg); static SArray *extractNodeListFromStream(SMnode *pMnode); -static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady); static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq); -static SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId); static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList); -static STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const char *name, const char *pMsg); -static int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans); -static void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset, - int32_t retryCode); static int32_t createStreamUpdateTrans(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans); static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); -static void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode); static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot); static int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len); -static void killTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName); -static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList); static void freeCheckpointCandEntry(void *); static void freeTaskList(void *param); -static SSdbRaw *mndStreamActionEncode(SStreamObj *pStream); static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); SSdbRaw *mndStreamSeqActionEncode(SStreamObj *pStream); @@ -176,53 +156,6 @@ void mndCleanupStream(SMnode *pMnode) { mDebug("mnd stream exec info cleanup"); } -SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - void *buf = NULL; - - SEncoder encoder; - tEncoderInit(&encoder, NULL, 0); - if (tEncodeSStreamObj(&encoder, pStream) < 0) { - tEncoderClear(&encoder); - goto STREAM_ENCODE_OVER; - } - int32_t tlen = encoder.pos; - tEncoderClear(&encoder); - - int32_t size = sizeof(int32_t) + tlen + MND_STREAM_RESERVE_SIZE; - SSdbRaw *pRaw = sdbAllocRaw(SDB_STREAM, MND_STREAM_VER_NUMBER, size); - if (pRaw == NULL) goto STREAM_ENCODE_OVER; - - buf = taosMemoryMalloc(tlen); - if (buf == NULL) goto STREAM_ENCODE_OVER; - - tEncoderInit(&encoder, buf, tlen); - if (tEncodeSStreamObj(&encoder, pStream) < 0) { - tEncoderClear(&encoder); - goto STREAM_ENCODE_OVER; - } - tEncoderClear(&encoder); - - int32_t dataPos = 0; - SDB_SET_INT32(pRaw, dataPos, tlen, STREAM_ENCODE_OVER); - SDB_SET_BINARY(pRaw, dataPos, buf, tlen, STREAM_ENCODE_OVER); - SDB_SET_DATALEN(pRaw, dataPos, STREAM_ENCODE_OVER); - - terrno = TSDB_CODE_SUCCESS; - -STREAM_ENCODE_OVER: - taosMemoryFreeClear(buf); - if (terrno != TSDB_CODE_SUCCESS) { - mError("stream:%s, failed to encode to raw:%p since %s", pStream->name, pRaw, terrstr()); - sdbFreeRaw(pRaw); - return NULL; - } - - mTrace("stream:%s, encode to raw:%p, row:%p, checkpoint:%" PRId64 "", pStream->name, pRaw, pStream, - pStream->checkpointId); - return pRaw; -} - SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -548,7 +481,7 @@ int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) { return 0; } -int32_t mndPersistStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { +int32_t mndPersistStreamTasks(STrans *pTrans, SStreamObj *pStream) { int32_t level = taosArrayGetSize(pStream->tasks); for (int32_t i = 0; i < level; i++) { SArray *pLevel = taosArrayGetP(pStream->tasks, i); @@ -582,30 +515,12 @@ int32_t mndPersistStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStrea return 0; } -int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { - if (mndPersistStreamTasks(pMnode, pTrans, pStream) < 0) { +int32_t mndPersistStream(STrans *pTrans, SStreamObj *pStream) { + if (mndPersistStreamTasks(pTrans, pStream) < 0) { return -1; } - SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream); - if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { - mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); - return -1; - } - - (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); - return 0; -} - -int32_t mndPersistDropStreamLog(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { - SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream); - if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { - mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); - return -1; - } - - (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED); - return 0; + return mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY); } static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStreamObj *pStream, const char *user) { @@ -699,40 +614,7 @@ _OVER: return -1; } -static int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId) { - *hasEpset = false; - pEpSet->numOfEps = 0; - if (nodeId == SNODE_HANDLE) { - SSnodeObj *pObj = NULL; - void *pIter = NULL; - - pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void **)&pObj); - if (pIter != NULL) { - addEpIntoEpSet(pEpSet, pObj->pDnode->fqdn, pObj->pDnode->port); - sdbRelease(pMnode->pSdb, pObj); - sdbCancelFetch(pMnode->pSdb, pIter); - *hasEpset = true; - return TSDB_CODE_SUCCESS; - } else { - mError("failed to acquire snode epset"); - return TSDB_CODE_INVALID_PARA; - } - } else { - SVgObj *pVgObj = mndAcquireVgroup(pMnode, nodeId); - if (pVgObj != NULL) { - SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj); - mndReleaseVgroup(pMnode, pVgObj); - - epsetAssign(pEpSet, &epset); - *hasEpset = true; - return TSDB_CODE_SUCCESS; - } else { - mDebug("orphaned task:0x%x need to be dropped, nodeId:%d, no redo action", taskId, nodeId); - return TSDB_CODE_SUCCESS; - } - } -} static int32_t mndPersistTaskDropReq(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) { SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq)); @@ -900,7 +782,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { } // add stream to trans - if (mndPersistStream(pMnode, pTrans, &streamObj) < 0) { + if (mndPersistStream(pTrans, &streamObj) < 0) { mError("stream:%s, failed to schedule since %s", createStreamReq.name, terrstr()); mndTransDrop(pTrans); goto _OVER; @@ -1126,7 +1008,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre pStream->version = pStream->version + 1; taosWUnLockLatch(&pStream->lock); - if ((code = mndPersistTransLog(pStream, pTrans)) != TSDB_CODE_SUCCESS) { + if ((code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY)) != TSDB_CODE_SUCCESS) { return code; } @@ -1141,7 +1023,7 @@ _ERR: return code; } -static int32_t initStreamNodeList(SMnode *pMnode) { +int32_t initStreamNodeList(SMnode *pMnode) { if (execInfo.pNodeList == NULL || (taosArrayGetSize(execInfo.pNodeList) == 0)) { execInfo.pNodeList = taosArrayDestroy(execInfo.pNodeList); execInfo.pNodeList = extractNodeListFromStream(pMnode); @@ -1367,7 +1249,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { } // drop stream - if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) { + if (mndPersistTransLog(pStream, pTrans, SDB_STATUS_DROPPED) < 0) { sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); tFreeMDropStreamReq(&dropReq); @@ -1386,7 +1268,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { int32_t transId = mndStreamGetRelTrans(pMnode, pStream->uid); if (transId != 0) { mDebug("drop active related transId:%d due to stream:%s dropped", transId, pStream->name); - killTransImpl(pMnode, transId, pStream->sourceDb); + mndKillTransImpl(pMnode, transId, pStream->sourceDb); } removeStreamTasksInBuf(pStream, &execInfo); @@ -1434,13 +1316,13 @@ int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { int32_t transId = mndStreamGetRelTrans(pMnode, pStream->uid); if (transId != 0) { mDebug("drop active related transId:%d due to stream:%s dropped", transId, pStream->name); - killTransImpl(pMnode, transId, pStream->sourceDb); + mndKillTransImpl(pMnode, transId, pStream->sourceDb); } // drop the stream obj in execInfo removeStreamTasksInBuf(pStream, &execInfo); - if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) { + if (mndPersistTransLog(pStream, pTrans, SDB_STATUS_DROPPED) < 0) { sdbRelease(pSdb, pStream); sdbCancelFetch(pSdb, pIter); return -1; @@ -1741,69 +1623,7 @@ static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter) { sdbCancelFetch(pSdb, pIter); } -static int32_t mndPauseStreamTask(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) { - SVPauseStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVPauseStreamTaskReq)); - if (pReq == NULL) { - mError("failed to malloc in pause stream, size:%" PRIzu ", code:%s", sizeof(SVPauseStreamTaskReq), - tstrerror(TSDB_CODE_OUT_OF_MEMORY)); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - pReq->head.vgId = htonl(pTask->info.nodeId); - pReq->taskId = pTask->id.taskId; - pReq->streamId = pTask->id.streamId; - - SEpSet epset = {0}; - mDebug("pause node:%d, epset:%d", pTask->info.nodeId, epset.numOfEps); - bool hasEpset = false; - int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); - if (code != TSDB_CODE_SUCCESS) { - terrno = code; - taosMemoryFree(pReq); - return -1; - } - - // no valid epset, return directly without redoAction - if (!hasEpset) { - taosMemoryFree(pReq); - return TSDB_CODE_SUCCESS; - } - - STransAction action = {0}; - initTransAction(&action, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0); - if (mndTransAppendRedoAction(pTrans, &action) != 0) { - taosMemoryFree(pReq); - return -1; - } - return 0; -} - -int32_t mndPauseAllStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { - SArray *tasks = pStream->tasks; - - int32_t size = taosArrayGetSize(tasks); - for (int32_t i = 0; i < size; i++) { - SArray *pTasks = taosArrayGetP(tasks, i); - int32_t sz = taosArrayGetSize(pTasks); - for (int32_t j = 0; j < sz; j++) { - SStreamTask *pTask = taosArrayGetP(pTasks, j); - if (mndPauseStreamTask(pMnode, pTrans, pTask) < 0) { - return -1; - } - - if (atomic_load_8(&pTask->status.taskStatus) != TASK_STATUS__PAUSE) { - atomic_store_8(&pTask->status.statusBackup, pTask->status.taskStatus); - atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE); - } - } - } - return 0; -} - static int32_t mndPersistStreamLog(STrans *pTrans, SStreamObj *pStream, int8_t status) { - // SStreamObj streamObj = {0}; - // memcpy(streamObj.name, pStream->name, TSDB_STREAM_FNAME_LEN); taosWLockLatch(&pStream->lock); pStream->status = status; SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream); @@ -1882,7 +1702,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_PAUSE_NAME, pStream->uid); // if nodeUpdate happened, not send pause trans - if (mndPauseAllStreamTasks(pMnode, pTrans, pStream) < 0) { + if (mndPauseStreamTasks(pMnode, pTrans, pStream) < 0) { mError("stream:%s, failed to pause task since %s", pauseReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); @@ -1909,57 +1729,6 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { return TSDB_CODE_ACTION_IN_PROGRESS; } -static int32_t mndResumeStreamTask(STrans *pTrans, SMnode *pMnode, SStreamTask *pTask, int8_t igUntreated) { - SVResumeStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResumeStreamTaskReq)); - if (pReq == NULL) { - mError("failed to malloc in resume stream, size:%" PRIzu ", code:%s", sizeof(SVResumeStreamTaskReq), - tstrerror(TSDB_CODE_OUT_OF_MEMORY)); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - pReq->head.vgId = htonl(pTask->info.nodeId); - pReq->taskId = pTask->id.taskId; - pReq->streamId = pTask->id.streamId; - pReq->igUntreated = igUntreated; - - SEpSet epset = {0}; - bool hasEpset = false; - int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); - if (code != TSDB_CODE_SUCCESS) { - terrno = code; - taosMemoryFree(pReq); - return -1; - } - - STransAction action = {0}; - initTransAction(&action, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &epset, 0); - if (mndTransAppendRedoAction(pTrans, &action) != 0) { - taosMemoryFree(pReq); - return -1; - } - return 0; -} - -int32_t mndResumeAllStreamTasks(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream, int8_t igUntreated) { - int32_t size = taosArrayGetSize(pStream->tasks); - for (int32_t i = 0; i < size; i++) { - SArray *pTasks = taosArrayGetP(pStream->tasks, i); - int32_t sz = taosArrayGetSize(pTasks); - for (int32_t j = 0; j < sz; j++) { - SStreamTask *pTask = taosArrayGetP(pTasks, j); - if (mndResumeStreamTask(pTrans, pMnode, pTask, igUntreated) < 0) { - return -1; - } - - if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__PAUSE) { - atomic_store_8(&pTask->status.taskStatus, pTask->status.statusBackup); - } - } - } - return 0; -} - static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SStreamObj *pStream = NULL; @@ -2019,7 +1788,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESUME_NAME, pStream->uid); // resume all tasks - if (mndResumeAllStreamTasks(pTrans, pMnode, pStream, pauseReq.igUntreated) < 0) { + if (mndResumeStreamTasks(pTrans, pMnode, pStream, pauseReq.igUntreated) < 0) { mError("stream:%s, failed to drop task since %s", pauseReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); @@ -2097,40 +1866,6 @@ static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupCha return TSDB_CODE_SUCCESS; } -int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans) { - SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream); - if (pCommitRaw == NULL) { - mError("failed to encode stream since %s", terrstr()); - mndTransDrop(pTrans); - return -1; - } - - if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { - mError("stream trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); - sdbFreeRaw(pCommitRaw); - mndTransDrop(pTrans); - return -1; - } - - if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) { - mError("stream trans:%d failed to set raw status since %s", pTrans->id, terrstr()); - sdbFreeRaw(pCommitRaw); - mndTransDrop(pTrans); - return -1; - } - - return 0; -} - -void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset, - int32_t retryCode) { - pAction->epSet = *pEpset; - pAction->contLen = contLen; - pAction->pCont = pCont; - pAction->msgType = msgType; - pAction->retryCode = retryCode; -} - // todo extract method: traverse stream tasks // build trans to update the epset static int32_t createStreamUpdateTrans(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans) { @@ -2224,69 +1959,6 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP return info; } -static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) { - SSdb *pSdb = pMnode->pSdb; - void *pIter = NULL; - SVgObj *pVgroup = NULL; - - *allReady = true; - SArray *pVgroupListSnapshot = taosArrayInit(4, sizeof(SNodeEntry)); - - while (1) { - pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); - if (pIter == NULL) { - break; - } - - SNodeEntry entry = {.nodeId = pVgroup->vgId, .hbTimestamp = pVgroup->updateTime}; - entry.epset = mndGetVgroupEpset(pMnode, pVgroup); - - // if not all ready till now, no need to check the remaining vgroups. - if (*allReady) { - for (int32_t i = 0; i < pVgroup->replica; ++i) { - if (!pVgroup->vnodeGid[i].syncRestore) { - mInfo("vgId:%d not restored, not ready for checkpoint or other operations", pVgroup->vgId); - *allReady = false; - break; - } - - ESyncState state = pVgroup->vnodeGid[i].syncState; - if (state == TAOS_SYNC_STATE_OFFLINE || state == TAOS_SYNC_STATE_ERROR) { - mInfo("vgId:%d offline/err, not ready for checkpoint or other operations", pVgroup->vgId); - *allReady = false; - break; - } - } - } - - char buf[256] = {0}; - EPSET_TO_STR(&entry.epset, buf); - mDebug("take node snapshot, nodeId:%d %s", entry.nodeId, buf); - taosArrayPush(pVgroupListSnapshot, &entry); - sdbRelease(pSdb, pVgroup); - } - - SSnodeObj *pObj = NULL; - while (1) { - pIter = sdbFetch(pSdb, SDB_SNODE, pIter, (void **)&pObj); - if (pIter == NULL) { - break; - } - - SNodeEntry entry = {0}; - addEpIntoEpSet(&entry.epset, pObj->pDnode->fqdn, pObj->pDnode->port); - entry.nodeId = SNODE_HANDLE; - - char buf[256] = {0}; - EPSET_TO_STR(&entry.epset, buf); - mDebug("take snode snapshot, nodeId:%d %s", entry.nodeId, buf); - taosArrayPush(pVgroupListSnapshot, &entry); - sdbRelease(pSdb, pObj); - } - - return pVgroupListSnapshot; -} - static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) { SSdb *pSdb = pMnode->pSdb; SStreamObj *pStream = NULL; @@ -2349,7 +2021,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange continue; } - code = mndPersistTransLog(pStream, pTrans); + code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY); sdbRelease(pSdb, pStream); if (code != TSDB_CODE_SUCCESS) { @@ -2419,22 +2091,6 @@ static SArray *extractNodeListFromStream(SMnode *pMnode) { return plist; } -static void doExtractTasksFromStream(SMnode *pMnode) { - SSdb *pSdb = pMnode->pSdb; - SStreamObj *pStream = NULL; - void *pIter = NULL; - - while (1) { - pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); - if (pIter == NULL) { - break; - } - - saveStreamTasksInfo(pStream, &execInfo); - sdbRelease(pSdb, pStream); - } -} - static int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId) { void *p = taosHashGet(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId)); if (p == NULL) { @@ -2679,114 +2335,6 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) { ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList)); } -STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const char *name, const char *pMsg) { - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, name); - if (pTrans == NULL) { - mError("failed to build trans:%s, reason: %s", name, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } - - mDebug("s-task:0x%" PRIx64 " start to build trans %s", pStream->uid, pMsg); - - mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetSTbName); - if (mndTransCheckConflict(pMnode, pTrans) != 0) { - terrno = TSDB_CODE_MND_TRANS_CONFLICT; - mError("failed to build trans:%s for stream:0x%" PRIx64 " code:%s", name, pStream->uid, tstrerror(terrno)); - mndTransDrop(pTrans); - return NULL; - } - - terrno = 0; - return pTrans; -} - -int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) { - STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, MND_STREAM_TASK_RESET_NAME, " reset from failed checkpoint"); - if (pTrans == NULL) { - return terrno; - } - - taosWLockLatch(&pStream->lock); - int32_t numOfLevels = taosArrayGetSize(pStream->tasks); - - for (int32_t j = 0; j < numOfLevels; ++j) { - SArray *pLevel = taosArrayGetP(pStream->tasks, j); - - int32_t numOfTasks = taosArrayGetSize(pLevel); - for (int32_t k = 0; k < numOfTasks; ++k) { - SStreamTask *pTask = taosArrayGetP(pLevel, k); - - // todo extract method, with pause stream task - SVResetStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResetStreamTaskReq)); - if (pReq == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - mError("failed to malloc in reset stream, size:%" PRIzu ", code:%s", sizeof(SVResetStreamTaskReq), - tstrerror(TSDB_CODE_OUT_OF_MEMORY)); - taosWUnLockLatch(&pStream->lock); - return terrno; - } - - pReq->head.vgId = htonl(pTask->info.nodeId); - pReq->taskId = pTask->id.taskId; - pReq->streamId = pTask->id.streamId; - - SEpSet epset = {0}; - bool hasEpset = false; - int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); - if (code != TSDB_CODE_SUCCESS) { - taosMemoryFree(pReq); - continue; - } - - if (!hasEpset) { - taosMemoryFree(pReq); - continue; - } - - STransAction action = {0}; - initTransAction(&action, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &epset, 0); - if (mndTransAppendRedoAction(pTrans, &action) != 0) { - taosMemoryFree(pReq); - taosWUnLockLatch(&pStream->lock); - mndTransDrop(pTrans); - return terrno; - } - } - } - - taosWUnLockLatch(&pStream->lock); - - int32_t code = mndPersistTransLog(pStream, pTrans); - if (code != TSDB_CODE_SUCCESS) { - sdbRelease(pMnode->pSdb, pStream); - return -1; - } - - if (mndTransPrepare(pMnode, pTrans) != 0) { - mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, terrstr()); - sdbRelease(pMnode->pSdb, pStream); - mndTransDrop(pTrans); - return -1; - } - - sdbRelease(pMnode->pSdb, pStream); - mndTransDrop(pTrans); - - return TSDB_CODE_ACTION_IN_PROGRESS; -} - -void killTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName) { - STrans *pTrans = mndAcquireTrans(pMnode, transId); - if (pTrans != NULL) { - mInfo("kill active transId:%d in Db:%s", transId, pDbName); - mndKillTrans(pMnode, pTrans); - mndReleaseTrans(pMnode, pTrans); - } else { - mError("failed to acquire trans in Db:%s, transId:%d", pDbName, transId); - } -} - int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t len) { // data in the hash table will be removed automatically, no need to remove it here. SStreamTransInfo *pTransInfo = taosHashGet(execInfo.transMgmt.pDBTrans, pDBName, len); @@ -2801,238 +2349,12 @@ int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t len) { } char *pDupDBName = strndup(pDBName, len); - killTransImpl(pMnode, pTransInfo->transId, pDupDBName); + mndKillTransImpl(pMnode, pTransInfo->transId, pDupDBName); taosMemoryFree(pDupDBName); return TSDB_CODE_SUCCESS; } -static int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int64_t streamId, int32_t transId) { - int32_t code = TSDB_CODE_SUCCESS; - killTransImpl(pMnode, transId, ""); - - SStreamObj *pStream = mndGetStreamObj(pMnode, streamId); - if (pStream == NULL) { - code = TSDB_CODE_STREAM_TASK_NOT_EXIST; - mError("failed to acquire the streamObj:0x%" PRIx64 " to reset checkpoint, may have been dropped", pStream->uid); - } else { - bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_TASK_RESET_NAME, false); - if (conflict) { - mError("stream:%s other trans exists in DB:%s, dstTable:%s failed to start reset-status trans", pStream->name, - pStream->sourceDb, pStream->targetSTbName); - } else { - mDebug("stream:%s (0x%" PRIx64 ") reset checkpoint procedure, transId:%d, create reset trans", pStream->name, - pStream->uid, transId); - code = createStreamResetStatusTrans(pMnode, pStream); - } - } - - mndReleaseStream(pMnode, pStream); - return code; -} - -static SStreamTask *mndGetStreamTask(STaskId *pId, SStreamObj *pStream) { - for (int32_t i = 0; i < taosArrayGetSize(pStream->tasks); i++) { - SArray *pLevel = taosArrayGetP(pStream->tasks, i); - - int32_t numOfLevels = taosArrayGetSize(pLevel); - for (int32_t j = 0; j < numOfLevels; j++) { - SStreamTask *pTask = taosArrayGetP(pLevel, j); - if (pTask->id.taskId == pId->taskId) { - return pTask; - } - } - } - - return NULL; -} - -static int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream) { - int32_t num = 0; - for(int32_t i = 0; i < taosArrayGetSize(pStream->tasks); ++i) { - SArray* pLevel = taosArrayGetP(pStream->tasks, i); - num += taosArrayGetSize(pLevel); - } - - return num; -} - -int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList) { - int32_t num = taosArrayGetSize(pNodeList); - mInfo("set node expired for %d nodes", num); - - for (int k = 0; k < num; ++k) { - int32_t *pVgId = taosArrayGet(pNodeList, k); - mInfo("set node expired for nodeId:%d, total:%d", *pVgId, num); - - int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeList); - for (int i = 0; i < numOfNodes; ++i) { - SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeList, i); - - if (pNodeEntry->nodeId == *pVgId) { - mInfo("vgId:%d expired for some stream tasks, needs update nodeEp", *pVgId); - pNodeEntry->stageUpdated = true; - break; - } - } - } - - return TSDB_CODE_SUCCESS; -} - -static void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) { - int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeList); - for (int32_t j = 0; j < numOfNodes; ++j) { - SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeList, j); - if (pNodeEntry->nodeId == pTaskEntry->nodeId) { - mInfo("vgId:%d stage updated from %" PRId64 " to %" PRId64 ", nodeUpdate trigger by s-task:0x%" PRIx64, - pTaskEntry->nodeId, pTaskEntry->stage, stage, pTaskEntry->id.taskId); - - pNodeEntry->stageUpdated = true; - pTaskEntry->stage = stage; - break; - } - } -} - -typedef struct SFailedCheckpointInfo { - int64_t streamUid; - int64_t checkpointId; - int32_t transId; -} SFailedCheckpointInfo; - -static void addIntoCheckpointList(SArray* pList, const SFailedCheckpointInfo* pInfo) { - int32_t num = taosArrayGetSize(pList); - for(int32_t i = 0; i < num; ++i) { - SFailedCheckpointInfo* p = taosArrayGet(pList, i); - if (p->transId == pInfo->transId) { - return; - } - } - - taosArrayPush(pList, pInfo); -} - -int32_t mndProcessStreamHb(SRpcMsg *pReq) { - SMnode *pMnode = pReq->info.node; - SStreamHbMsg req = {0}; - -// bool checkpointFailed = false; -// int64_t checkpointId = 0; -// int64_t streamId = 0; -// int32_t transId = 0; - SArray* pList = taosArrayInit(4, sizeof(SFailedCheckpointInfo)); - - SDecoder decoder = {0}; - tDecoderInit(&decoder, pReq->pCont, pReq->contLen); - - if (tDecodeStreamHbMsg(&decoder, &req) < 0) { - streamMetaClearHbMsg(&req); - tDecoderClear(&decoder); - terrno = TSDB_CODE_INVALID_MSG; - return -1; - } - tDecoderClear(&decoder); - - mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks); - - taosThreadMutexLock(&execInfo.lock); - - // extract stream task list - int32_t numOfExisted = taosHashGetSize(execInfo.pTaskMap); - if (numOfExisted == 0) { - doExtractTasksFromStream(pMnode); - } - - initStreamNodeList(pMnode); - - int32_t numOfUpdated = taosArrayGetSize(req.pUpdateNodes); - if (numOfUpdated > 0) { - mDebug("%d stream node(s) need updated from report of hbMsg(vgId:%d)", numOfUpdated, req.vgId); - setNodeEpsetExpiredFlag(req.pUpdateNodes); - } - - bool snodeChanged = false; - for (int32_t i = 0; i < req.numOfTasks; ++i) { - STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i); - - STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &p->id, sizeof(p->id)); - if (pTaskEntry == NULL) { - mError("s-task:0x%" PRIx64 " not found in mnode task list", p->id.taskId); - continue; - } - - if (pTaskEntry->stage != p->stage && pTaskEntry->stage != -1) { - updateStageInfo(pTaskEntry, p->stage); - if (pTaskEntry->nodeId == SNODE_HANDLE) { - snodeChanged = true; - } - } else { - // task is idle for more than 50 sec. - if (fabs(pTaskEntry->inputQUsed - p->inputQUsed) <= DBL_EPSILON) { - if (!pTaskEntry->inputQChanging) { - pTaskEntry->inputQUnchangeCounter++; - } else { - pTaskEntry->inputQChanging = false; - } - } else { - pTaskEntry->inputQChanging = true; - pTaskEntry->inputQUnchangeCounter = 0; - } - - streamTaskStatusCopy(pTaskEntry, p); - if (p->checkpointId != 0) { - if (p->checkpointFailed) { - mError("stream task:0x%" PRIx64 " checkpointId:%" PRIx64 " transId:%d failed, kill it", p->id.taskId, - p->checkpointId, p->chkpointTransId); - - SFailedCheckpointInfo info = { - .transId = p->chkpointTransId, .checkpointId = p->checkpointId, .streamUid = p->id.streamId}; - addIntoCheckpointList(pList, &info); - } - } - } - - if (p->status == pTaskEntry->status) { - pTaskEntry->statusLastDuration++; - } else { - pTaskEntry->status = p->status; - pTaskEntry->statusLastDuration = 0; - } - - if (p->status != TASK_STATUS__READY) { - mDebug("received s-task:0x%" PRIx64 " not in ready status:%s", p->id.taskId, streamTaskGetStatusStr(p->status)); - } - } - - // current checkpoint is failed, rollback from the checkpoint trans - // kill the checkpoint trans and then set all tasks status to be normal - if (taosArrayGetSize(pList) > 0) { - bool allReady = true; - SArray *p = mndTakeVgroupSnapshot(pMnode, &allReady); - taosArrayDestroy(p); - - if (allReady || snodeChanged) { - // if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal - for(int32_t i = 0; i < taosArrayGetSize(pList); ++i) { - SFailedCheckpointInfo *pInfo = taosArrayGet(pList, i); - mInfo("checkpointId:%" PRId64 " transId:%d failed, issue task-reset trans to reset all tasks status", - pInfo->checkpointId, pInfo->transId); - - mndResetStatusFromCheckpoint(pMnode, pInfo->streamUid, pInfo->transId); - } - } else { - mInfo("not all vgroups are ready, wait for next HB from stream tasks to reset the task status"); - } - } - - taosThreadMutexUnlock(&execInfo.lock); - streamMetaClearHbMsg(&req); - - taosArrayDestroy(pList); - return TSDB_CODE_SUCCESS; -} - void freeCheckpointCandEntry(void *param) { SCheckpointCandEntry *pEntry = param; taosMemoryFreeClear(pEntry->pName); @@ -3043,22 +2365,6 @@ void freeTaskList(void* param) { taosArrayDestroy(*pList); } -SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId) { - void *pIter = NULL; - SSdb *pSdb = pMnode->pSdb; - SStreamObj *pStream = NULL; - - while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) { - if (pStream->uid == streamId) { - sdbCancelFetch(pSdb, pIter); - return pStream; - } - sdbRelease(pSdb, pStream); - } - - return NULL; -} - static void doAddTaskId(SArray* pList, int32_t taskId, int64_t uid, int32_t numOfTotal) { int32_t num = taosArrayGetSize(pList); for(int32_t i = 0; i < num; ++i) { diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index a6dd1c4856..959f69944c 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -160,3 +160,106 @@ int32_t mndAddtoCheckpointWaitingList(SStreamObj* pStream, int64_t checkpointId) return TSDB_CODE_SUCCESS; } + +STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const char *name, const char *pMsg) { + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, name); + if (pTrans == NULL) { + mError("failed to build trans:%s, reason: %s", name, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + mDebug("s-task:0x%" PRIx64 " start to build trans %s", pStream->uid, pMsg); + + mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetSTbName); + if (mndTransCheckConflict(pMnode, pTrans) != 0) { + terrno = TSDB_CODE_MND_TRANS_CONFLICT; + mError("failed to build trans:%s for stream:0x%" PRIx64 " code:%s", name, pStream->uid, tstrerror(terrno)); + mndTransDrop(pTrans); + return NULL; + } + + terrno = 0; + return pTrans; +} + +SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + void *buf = NULL; + + SEncoder encoder; + tEncoderInit(&encoder, NULL, 0); + if (tEncodeSStreamObj(&encoder, pStream) < 0) { + tEncoderClear(&encoder); + goto STREAM_ENCODE_OVER; + } + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + + int32_t size = sizeof(int32_t) + tlen + MND_STREAM_RESERVE_SIZE; + SSdbRaw *pRaw = sdbAllocRaw(SDB_STREAM, MND_STREAM_VER_NUMBER, size); + if (pRaw == NULL) goto STREAM_ENCODE_OVER; + + buf = taosMemoryMalloc(tlen); + if (buf == NULL) goto STREAM_ENCODE_OVER; + + tEncoderInit(&encoder, buf, tlen); + if (tEncodeSStreamObj(&encoder, pStream) < 0) { + tEncoderClear(&encoder); + goto STREAM_ENCODE_OVER; + } + tEncoderClear(&encoder); + + int32_t dataPos = 0; + SDB_SET_INT32(pRaw, dataPos, tlen, STREAM_ENCODE_OVER); + SDB_SET_BINARY(pRaw, dataPos, buf, tlen, STREAM_ENCODE_OVER); + SDB_SET_DATALEN(pRaw, dataPos, STREAM_ENCODE_OVER); + + terrno = TSDB_CODE_SUCCESS; + + STREAM_ENCODE_OVER: + taosMemoryFreeClear(buf); + if (terrno != TSDB_CODE_SUCCESS) { + mError("stream:%s, failed to encode to raw:%p since %s", pStream->name, pRaw, terrstr()); + sdbFreeRaw(pRaw); + return NULL; + } + + mTrace("stream:%s, encode to raw:%p, row:%p, checkpoint:%" PRId64 "", pStream->name, pRaw, pStream, + pStream->checkpointId); + return pRaw; +} + +int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status) { + SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream); + if (pCommitRaw == NULL) { + mError("failed to encode stream since %s", terrstr()); + mndTransDrop(pTrans); + return -1; + } + + if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { + mError("stream trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); + sdbFreeRaw(pCommitRaw); + mndTransDrop(pTrans); + return -1; + } + + if (sdbSetRawStatus(pCommitRaw, status) != 0) { + mError("stream trans:%d failed to set raw status:%d since %s", pTrans->id, status, terrstr()); + sdbFreeRaw(pCommitRaw); + mndTransDrop(pTrans); + return -1; + } + + return 0; +} + +void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset, + int32_t retryCode) { + pAction->epSet = *pEpset; + pAction->contLen = contLen; + pAction->pCont = pCont; + pAction->msgType = msgType; + pAction->retryCode = retryCode; +} \ No newline at end of file diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index dd20f38093..138bcbb133 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -97,7 +97,7 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) { } if (pItem->pStreamTask) { - tFreeStreamTask(pItem->pStreamTask, true); + tFreeStreamTask(pItem->pStreamTask); } taosArrayDestroy(pItem->pResList); tdRSmaQTaskInfoFree(&pInfo->taskInfo[i], SMA_VID(pSma), i + 1); diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index b457b1da87..ac1818f877 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -617,7 +617,7 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve if (code < 0) { tqError("failed to add s-task:0x%x into vgId:%d meta, total:%d, code:%s", vgId, taskId, numOfTasks, tstrerror(code)); - tFreeStreamTask(pTask, true); + tFreeStreamTask(pTask); return code; } @@ -645,7 +645,7 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve } } else { tqWarn("vgId:%d failed to add s-task:0x%x, since already exists in meta store", vgId, taskId); - tFreeStreamTask(pTask, true); + tFreeStreamTask(pTask); } return code; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 331cf60077..db71b56815 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -257,8 +257,6 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, void* arg, char* key) { STaskDbWrapper* pBackend = *ppBackend; pBackend->pMeta = pMeta; - - pTask->backendRefId = pBackend->refId; pTask->pBackend = pBackend; taosThreadMutexUnlock(&pMeta->backendMutex); @@ -283,7 +281,6 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, void* arg, char* key) { } int64_t tref = taosAddRef(taskDbWrapperId, pBackend); - pTask->backendRefId = tref; pTask->pBackend = pBackend; pBackend->refId = tref; pBackend->pTask = pTask; @@ -599,19 +596,19 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa } if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) { - tFreeStreamTask(pTask, false); + tFreeStreamTask(pTask); return -1; } taosArrayPush(pMeta->pTaskList, &pTask->id); if (streamMetaSaveTask(pMeta, pTask) < 0) { - tFreeStreamTask(pTask, false); + tFreeStreamTask(pTask); return -1; } if (streamMetaCommit(pMeta) < 0) { - tFreeStreamTask(pTask, false); + tFreeStreamTask(pTask); return -1; } @@ -661,7 +658,7 @@ void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask) stTrace("s-task:%s release task, ref:%d", pTask->id.idStr, ref); } else if (ref == 0) { stTrace("s-task:%s all refs are gone, free it", pTask->id.idStr); - tFreeStreamTask(pTask, true); + tFreeStreamTask(pTask); } else if (ref < 0) { stError("task ref is invalid, ref:%d, %s", ref, pTask->id.idStr); } @@ -871,7 +868,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { if (tDecodeStreamTask(&decoder, pTask) < 0) { tDecoderClear(&decoder); doClear(pKey, pVal, pCur, pRecycleList); - tFreeStreamTask(pTask, false); + tFreeStreamTask(pTask); stError( "vgId:%d stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild " "stream manually", @@ -882,7 +879,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { if (pTask->status.taskStatus == TASK_STATUS__DROPPING) { int32_t taskId = pTask->id.taskId; - tFreeStreamTask(pTask, false); + tFreeStreamTask(pTask); STaskId id = streamTaskGetTaskId(pTask); taosArrayPush(pRecycleList, &id); @@ -898,7 +895,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { if (p == NULL) { if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer + 1) < 0) { doClear(pKey, pVal, pCur, pRecycleList); - tFreeStreamTask(pTask, false); + tFreeStreamTask(pTask); return -1; } @@ -912,7 +909,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { if (taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES) < 0) { doClear(pKey, pVal, pCur, pRecycleList); - tFreeStreamTask(pTask, false); + tFreeStreamTask(pTask); return -1; } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 66d34d8712..3018894132 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -340,16 +340,11 @@ int32_t tDecodeStreamTaskId(SDecoder* pDecoder, STaskId* pTaskId) { return 0; } -void tFreeStreamTask(SStreamTask* pTask, bool metaLock) { +void tFreeStreamTask(SStreamTask* pTask) { char* p = NULL; int32_t taskId = pTask->id.taskId; STaskExecStatisInfo* pStatis = &pTask->execInfo; - // check for mnode -// if (pTask->pMeta != NULL) { -// streamTaskClearHTaskAttr(pTask, metaLock); -// } - ETaskStatus status1 = TASK_STATUS__UNINIT; taosThreadMutexLock(&pTask->lock); if (pTask->status.pSM != NULL) { From 43c035678f4b4b6916a5b27dea5378eba24e04de Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 25 Jan 2024 16:55:05 +0800 Subject: [PATCH 2/3] refactor: do some internal refactor. --- source/dnode/mnode/impl/src/mndStreamHb.c | 297 ++++++++++++++++++++ source/dnode/mnode/impl/src/mndStreamUtil.c | 281 ++++++++++++++++++ 2 files changed, 578 insertions(+) create mode 100644 source/dnode/mnode/impl/src/mndStreamHb.c create mode 100644 source/dnode/mnode/impl/src/mndStreamUtil.c diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c new file mode 100644 index 0000000000..3fe736926b --- /dev/null +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -0,0 +1,297 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "mndStream.h" +#include "mndTrans.h" + +static void doExtractTasksFromStream(SMnode *pMnode) { + SSdb *pSdb = pMnode->pSdb; + SStreamObj *pStream = NULL; + void *pIter = NULL; + + while (1) { + pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); + if (pIter == NULL) { + break; + } + + saveStreamTasksInfo(pStream, &execInfo); + sdbRelease(pSdb, pStream); + } +} + +static void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) { + int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeList); + for (int32_t j = 0; j < numOfNodes; ++j) { + SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeList, j); + if (pNodeEntry->nodeId == pTaskEntry->nodeId) { + mInfo("vgId:%d stage updated from %" PRId64 " to %" PRId64 ", nodeUpdate trigger by s-task:0x%" PRIx64, + pTaskEntry->nodeId, pTaskEntry->stage, stage, pTaskEntry->id.taskId); + + pNodeEntry->stageUpdated = true; + pTaskEntry->stage = stage; + break; + } + } +} + +static void addIntoCheckpointList(SArray* pList, const SFailedCheckpointInfo* pInfo) { + int32_t num = taosArrayGetSize(pList); + for(int32_t i = 0; i < num; ++i) { + SFailedCheckpointInfo* p = taosArrayGet(pList, i); + if (p->transId == pInfo->transId) { + return; + } + } + + taosArrayPush(pList, pInfo); +} + +static int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) { + STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, MND_STREAM_TASK_RESET_NAME, " reset from failed checkpoint"); + if (pTrans == NULL) { + return terrno; + } + + taosWLockLatch(&pStream->lock); + int32_t numOfLevels = taosArrayGetSize(pStream->tasks); + + for (int32_t j = 0; j < numOfLevels; ++j) { + SArray *pLevel = taosArrayGetP(pStream->tasks, j); + + int32_t numOfTasks = taosArrayGetSize(pLevel); + for (int32_t k = 0; k < numOfTasks; ++k) { + SStreamTask *pTask = taosArrayGetP(pLevel, k); + + // todo extract method, with pause stream task + SVResetStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResetStreamTaskReq)); + if (pReq == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + mError("failed to malloc in reset stream, size:%" PRIzu ", code:%s", sizeof(SVResetStreamTaskReq), + tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + taosWUnLockLatch(&pStream->lock); + return terrno; + } + + pReq->head.vgId = htonl(pTask->info.nodeId); + pReq->taskId = pTask->id.taskId; + pReq->streamId = pTask->id.streamId; + + SEpSet epset = {0}; + bool hasEpset = false; + int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); + if (code != TSDB_CODE_SUCCESS) { + taosMemoryFree(pReq); + continue; + } + + if (!hasEpset) { + taosMemoryFree(pReq); + continue; + } + + STransAction action = {0}; + initTransAction(&action, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &epset, 0); + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + taosMemoryFree(pReq); + taosWUnLockLatch(&pStream->lock); + mndTransDrop(pTrans); + return terrno; + } + } + } + + taosWUnLockLatch(&pStream->lock); + + int32_t code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY); + if (code != TSDB_CODE_SUCCESS) { + sdbRelease(pMnode->pSdb, pStream); + return -1; + } + + if (mndTransPrepare(pMnode, pTrans) != 0) { + mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, terrstr()); + sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); + return -1; + } + + sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); + + return TSDB_CODE_ACTION_IN_PROGRESS; +} + +static int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int64_t streamId, int32_t transId) { + int32_t code = TSDB_CODE_SUCCESS; + mndKillTransImpl(pMnode, transId, ""); + + SStreamObj *pStream = mndGetStreamObj(pMnode, streamId); + if (pStream == NULL) { + code = TSDB_CODE_STREAM_TASK_NOT_EXIST; + mError("failed to acquire the streamObj:0x%" PRIx64 " to reset checkpoint, may have been dropped", pStream->uid); + } else { + bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_TASK_RESET_NAME, false); + if (conflict) { + mError("stream:%s other trans exists in DB:%s, dstTable:%s failed to start reset-status trans", pStream->name, + pStream->sourceDb, pStream->targetSTbName); + } else { + mDebug("stream:%s (0x%" PRIx64 ") reset checkpoint procedure, transId:%d, create reset trans", pStream->name, + pStream->uid, transId); + code = createStreamResetStatusTrans(pMnode, pStream); + } + } + + mndReleaseStream(pMnode, pStream); + return code; +} + +static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList) { + int32_t num = taosArrayGetSize(pNodeList); + mInfo("set node expired for %d nodes", num); + + for (int k = 0; k < num; ++k) { + int32_t *pVgId = taosArrayGet(pNodeList, k); + mInfo("set node expired for nodeId:%d, total:%d", *pVgId, num); + + int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeList); + for (int i = 0; i < numOfNodes; ++i) { + SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeList, i); + + if (pNodeEntry->nodeId == *pVgId) { + mInfo("vgId:%d expired for some stream tasks, needs update nodeEp", *pVgId); + pNodeEntry->stageUpdated = true; + break; + } + } + } + + return TSDB_CODE_SUCCESS; +} + +int32_t mndProcessStreamHb(SRpcMsg *pReq) { + SMnode *pMnode = pReq->info.node; + SStreamHbMsg req = {0}; + SArray *pList = taosArrayInit(4, sizeof(SFailedCheckpointInfo)); + + SDecoder decoder = {0}; + tDecoderInit(&decoder, pReq->pCont, pReq->contLen); + + if (tDecodeStreamHbMsg(&decoder, &req) < 0) { + streamMetaClearHbMsg(&req); + tDecoderClear(&decoder); + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } + tDecoderClear(&decoder); + + mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks); + + taosThreadMutexLock(&execInfo.lock); + + // extract stream task list + int32_t numOfExisted = taosHashGetSize(execInfo.pTaskMap); + if (numOfExisted == 0) { + doExtractTasksFromStream(pMnode); + } + + initStreamNodeList(pMnode); + + int32_t numOfUpdated = taosArrayGetSize(req.pUpdateNodes); + if (numOfUpdated > 0) { + mDebug("%d stream node(s) need updated from report of hbMsg(vgId:%d)", numOfUpdated, req.vgId); + setNodeEpsetExpiredFlag(req.pUpdateNodes); + } + + bool snodeChanged = false; + for (int32_t i = 0; i < req.numOfTasks; ++i) { + STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i); + + STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &p->id, sizeof(p->id)); + if (pTaskEntry == NULL) { + mError("s-task:0x%" PRIx64 " not found in mnode task list", p->id.taskId); + continue; + } + + if (pTaskEntry->stage != p->stage && pTaskEntry->stage != -1) { + updateStageInfo(pTaskEntry, p->stage); + if (pTaskEntry->nodeId == SNODE_HANDLE) { + snodeChanged = true; + } + } else { + // task is idle for more than 50 sec. + if (fabs(pTaskEntry->inputQUsed - p->inputQUsed) <= DBL_EPSILON) { + if (!pTaskEntry->inputQChanging) { + pTaskEntry->inputQUnchangeCounter++; + } else { + pTaskEntry->inputQChanging = false; + } + } else { + pTaskEntry->inputQChanging = true; + pTaskEntry->inputQUnchangeCounter = 0; + } + + streamTaskStatusCopy(pTaskEntry, p); + if (p->checkpointId != 0) { + if (p->checkpointFailed) { + mError("stream task:0x%" PRIx64 " checkpointId:%" PRIx64 " transId:%d failed, kill it", p->id.taskId, + p->checkpointId, p->chkpointTransId); + + SFailedCheckpointInfo info = { + .transId = p->chkpointTransId, .checkpointId = p->checkpointId, .streamUid = p->id.streamId}; + addIntoCheckpointList(pList, &info); + } + } + } + + if (p->status == pTaskEntry->status) { + pTaskEntry->statusLastDuration++; + } else { + pTaskEntry->status = p->status; + pTaskEntry->statusLastDuration = 0; + } + + if (p->status != TASK_STATUS__READY) { + mDebug("received s-task:0x%" PRIx64 " not in ready status:%s", p->id.taskId, streamTaskGetStatusStr(p->status)); + } + } + + // current checkpoint is failed, rollback from the checkpoint trans + // kill the checkpoint trans and then set all tasks status to be normal + if (taosArrayGetSize(pList) > 0) { + bool allReady = true; + SArray *p = mndTakeVgroupSnapshot(pMnode, &allReady); + taosArrayDestroy(p); + + if (allReady || snodeChanged) { + // if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal + for(int32_t i = 0; i < taosArrayGetSize(pList); ++i) { + SFailedCheckpointInfo *pInfo = taosArrayGet(pList, i); + mInfo("checkpointId:%" PRId64 " transId:%d failed, issue task-reset trans to reset all tasks status", + pInfo->checkpointId, pInfo->transId); + + mndResetStatusFromCheckpoint(pMnode, pInfo->streamUid, pInfo->transId); + } + } else { + mInfo("not all vgroups are ready, wait for next HB from stream tasks to reset the task status"); + } + } + + taosThreadMutexUnlock(&execInfo.lock); + streamMetaClearHbMsg(&req); + + taosArrayDestroy(pList); + return TSDB_CODE_SUCCESS; +} diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c new file mode 100644 index 0000000000..b8bd323fa3 --- /dev/null +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -0,0 +1,281 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "mndStream.h" +#include "mndTrans.h" +#include "tmisce.h" +#include "mndVgroup.h" + +SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) { + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; + SVgObj *pVgroup = NULL; + + *allReady = true; + SArray *pVgroupListSnapshot = taosArrayInit(4, sizeof(SNodeEntry)); + + while (1) { + pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); + if (pIter == NULL) { + break; + } + + SNodeEntry entry = {.nodeId = pVgroup->vgId, .hbTimestamp = pVgroup->updateTime}; + entry.epset = mndGetVgroupEpset(pMnode, pVgroup); + + // if not all ready till now, no need to check the remaining vgroups. + if (*allReady) { + for (int32_t i = 0; i < pVgroup->replica; ++i) { + if (!pVgroup->vnodeGid[i].syncRestore) { + mInfo("vgId:%d not restored, not ready for checkpoint or other operations", pVgroup->vgId); + *allReady = false; + break; + } + + ESyncState state = pVgroup->vnodeGid[i].syncState; + if (state == TAOS_SYNC_STATE_OFFLINE || state == TAOS_SYNC_STATE_ERROR) { + mInfo("vgId:%d offline/err, not ready for checkpoint or other operations", pVgroup->vgId); + *allReady = false; + break; + } + } + } + + char buf[256] = {0}; + EPSET_TO_STR(&entry.epset, buf); + mDebug("take node snapshot, nodeId:%d %s", entry.nodeId, buf); + taosArrayPush(pVgroupListSnapshot, &entry); + sdbRelease(pSdb, pVgroup); + } + + SSnodeObj *pObj = NULL; + while (1) { + pIter = sdbFetch(pSdb, SDB_SNODE, pIter, (void **)&pObj); + if (pIter == NULL) { + break; + } + + SNodeEntry entry = {0}; + addEpIntoEpSet(&entry.epset, pObj->pDnode->fqdn, pObj->pDnode->port); + entry.nodeId = SNODE_HANDLE; + + char buf[256] = {0}; + EPSET_TO_STR(&entry.epset, buf); + mDebug("take snode snapshot, nodeId:%d %s", entry.nodeId, buf); + taosArrayPush(pVgroupListSnapshot, &entry); + sdbRelease(pSdb, pObj); + } + + return pVgroupListSnapshot; +} + +SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId) { + void *pIter = NULL; + SSdb *pSdb = pMnode->pSdb; + SStreamObj *pStream = NULL; + + while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) { + if (pStream->uid == streamId) { + sdbCancelFetch(pSdb, pIter); + return pStream; + } + sdbRelease(pSdb, pStream); + } + + return NULL; +} + +void mndKillTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName) { + STrans *pTrans = mndAcquireTrans(pMnode, transId); + if (pTrans != NULL) { + mInfo("kill active transId:%d in Db:%s", transId, pDbName); + mndKillTrans(pMnode, pTrans); + mndReleaseTrans(pMnode, pTrans); + } else { + mError("failed to acquire trans in Db:%s, transId:%d", pDbName, transId); + } +} + +int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId) { + *hasEpset = false; + + pEpSet->numOfEps = 0; + if (nodeId == SNODE_HANDLE) { + SSnodeObj *pObj = NULL; + void *pIter = NULL; + + pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void **)&pObj); + if (pIter != NULL) { + addEpIntoEpSet(pEpSet, pObj->pDnode->fqdn, pObj->pDnode->port); + sdbRelease(pMnode->pSdb, pObj); + sdbCancelFetch(pMnode->pSdb, pIter); + *hasEpset = true; + return TSDB_CODE_SUCCESS; + } else { + mError("failed to acquire snode epset"); + return TSDB_CODE_INVALID_PARA; + } + } else { + SVgObj *pVgObj = mndAcquireVgroup(pMnode, nodeId); + if (pVgObj != NULL) { + SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj); + mndReleaseVgroup(pMnode, pVgObj); + + epsetAssign(pEpSet, &epset); + *hasEpset = true; + return TSDB_CODE_SUCCESS; + } else { + mDebug("orphaned task:0x%x need to be dropped, nodeId:%d, no redo action", taskId, nodeId); + return TSDB_CODE_SUCCESS; + } + } +} + +static int32_t doResumeStreamTask(STrans *pTrans, SMnode *pMnode, SStreamTask *pTask, int8_t igUntreated) { + SVResumeStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResumeStreamTaskReq)); + if (pReq == NULL) { + mError("failed to malloc in resume stream, size:%" PRIzu ", code:%s", sizeof(SVResumeStreamTaskReq), + tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + pReq->head.vgId = htonl(pTask->info.nodeId); + pReq->taskId = pTask->id.taskId; + pReq->streamId = pTask->id.streamId; + pReq->igUntreated = igUntreated; + + SEpSet epset = {0}; + bool hasEpset = false; + int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + taosMemoryFree(pReq); + return -1; + } + + STransAction action = {0}; + initTransAction(&action, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &epset, 0); + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + taosMemoryFree(pReq); + return -1; + } + return 0; +} + +SStreamTask *mndGetStreamTask(STaskId *pId, SStreamObj *pStream) { + for (int32_t i = 0; i < taosArrayGetSize(pStream->tasks); i++) { + SArray *pLevel = taosArrayGetP(pStream->tasks, i); + + int32_t numOfLevels = taosArrayGetSize(pLevel); + for (int32_t j = 0; j < numOfLevels; j++) { + SStreamTask *pTask = taosArrayGetP(pLevel, j); + if (pTask->id.taskId == pId->taskId) { + return pTask; + } + } + } + + return NULL; +} + +int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream) { + int32_t num = 0; + for(int32_t i = 0; i < taosArrayGetSize(pStream->tasks); ++i) { + SArray* pLevel = taosArrayGetP(pStream->tasks, i); + num += taosArrayGetSize(pLevel); + } + + return num; +} + +int32_t mndResumeStreamTasks(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream, int8_t igUntreated) { + int32_t size = taosArrayGetSize(pStream->tasks); + for (int32_t i = 0; i < size; i++) { + SArray *pTasks = taosArrayGetP(pStream->tasks, i); + int32_t sz = taosArrayGetSize(pTasks); + for (int32_t j = 0; j < sz; j++) { + SStreamTask *pTask = taosArrayGetP(pTasks, j); + if (doResumeStreamTask(pTrans, pMnode, pTask, igUntreated) < 0) { + return -1; + } + + if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__PAUSE) { + atomic_store_8(&pTask->status.taskStatus, pTask->status.statusBackup); + } + } + } + return 0; +} + +static int32_t doPauseStreamTask(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) { + SVPauseStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVPauseStreamTaskReq)); + if (pReq == NULL) { + mError("failed to malloc in pause stream, size:%" PRIzu ", code:%s", sizeof(SVPauseStreamTaskReq), + tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + pReq->head.vgId = htonl(pTask->info.nodeId); + pReq->taskId = pTask->id.taskId; + pReq->streamId = pTask->id.streamId; + + SEpSet epset = {0}; + mDebug("pause node:%d, epset:%d", pTask->info.nodeId, epset.numOfEps); + bool hasEpset = false; + int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + taosMemoryFree(pReq); + return -1; + } + + // no valid epset, return directly without redoAction + if (!hasEpset) { + taosMemoryFree(pReq); + return TSDB_CODE_SUCCESS; + } + + STransAction action = {0}; + initTransAction(&action, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0); + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + taosMemoryFree(pReq); + return -1; + } + return 0; +} + +int32_t mndPauseStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { + SArray *tasks = pStream->tasks; + + int32_t size = taosArrayGetSize(tasks); + for (int32_t i = 0; i < size; i++) { + SArray *pTasks = taosArrayGetP(tasks, i); + int32_t sz = taosArrayGetSize(pTasks); + for (int32_t j = 0; j < sz; j++) { + SStreamTask *pTask = taosArrayGetP(pTasks, j); + if (doPauseStreamTask(pMnode, pTrans, pTask) < 0) { + return -1; + } + + if (atomic_load_8(&pTask->status.taskStatus) != TASK_STATUS__PAUSE) { + atomic_store_8(&pTask->status.statusBackup, pTask->status.taskStatus); + atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE); + } + } + } + return 0; +} \ No newline at end of file From 3751e11394475df2a9915039b0c6ce0c0715fdda Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 25 Jan 2024 17:11:16 +0800 Subject: [PATCH 3/3] fix(stream): fix dead lock. --- source/libs/stream/src/streamCheckpoint.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 98963967fb..50a010d779 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -317,8 +317,9 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) { pCKInfo->checkpointVer = pCKInfo->processedVer; streamTaskClearCheckInfo(p, false); - code = streamTaskHandleEvent(p->status.pSM, TASK_EVENT_CHECKPOINT_DONE); taosThreadMutexUnlock(&p->lock); + + code = streamTaskHandleEvent(p->status.pSM, TASK_EVENT_CHECKPOINT_DONE); } else { stDebug("s-task:%s vgId:%d status:%s not keep the checkpoint metaInfo, checkpoint:%" PRId64 " failed", id, vgId, pStatus->name, pCKInfo->checkpointingId);