From 5313543a9eae58e14e98ac9254cbcf21b3e7db8c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 21 Oct 2023 15:21:02 +0800 Subject: [PATCH] refactor: rename SStreamExecNodeInfo to SStreamExecInfo and record the active checkpoint id. --- source/dnode/mnode/impl/src/mndStream.c | 156 ++++++++++++------------ 1 file changed, 80 insertions(+), 76 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index e70aed79f7..211af8223a 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -42,22 +42,22 @@ typedef struct SNodeEntry { int64_t hbTimestamp; // second } SNodeEntry; -typedef struct SStreamExecNodeInfo { +typedef struct SStreamExecInfo { SArray *pNodeEntryList; int64_t ts; // snapshot ts int64_t activeCheckpoint; // active check point id SHashObj *pTaskMap; SArray *pTaskList; TdThreadMutex lock; -} SStreamExecNodeInfo; +} SStreamExecInfo; typedef struct SVgroupChangeInfo { SHashObj *pDBMap; SArray *pUpdateNodeList; // SArray } SVgroupChangeInfo; -static int32_t mndNodeCheckSentinel = 0; -static SStreamExecNodeInfo execNodeList; +static int32_t mndNodeCheckSentinel = 0; +static SStreamExecInfo execInfo; static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream); static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream); @@ -77,18 +77,17 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in int64_t streamId, int32_t taskId); static int32_t mndProcessNodeCheck(SRpcMsg *pReq); static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg); +static SArray *extractNodeListFromStream(SMnode *pMnode); +static SArray *mndTakeVgroupSnapshot(SMnode *pMnode); -static SArray *extractNodeListFromStream(SMnode *pMnode); -static SArray *mndTakeVgroupSnapshot(SMnode *pMnode); static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList); static STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, const char *name); static int32_t 
mndPersistTransLog(SStreamObj *pStream, STrans *pTrans); static void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset); static int32_t createStreamUpdateTrans(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans); - -static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecNodeInfo *pExecNode); -static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecNodeInfo *pExecNode); +static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); +static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot); static int32_t doKillActiveCheckpointTrans(SMnode *pMnode); @@ -130,18 +129,18 @@ int32_t mndInitStream(SMnode *pMnode) { mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndRetrieveStreamTask); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndCancelGetNextStreamTask); - taosThreadMutexInit(&execNodeList.lock, NULL); - execNodeList.pTaskMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK); - execNodeList.pTaskList = taosArrayInit(4, sizeof(STaskId)); + taosThreadMutexInit(&execInfo.lock, NULL); + execInfo.pTaskMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK); + execInfo.pTaskList = taosArrayInit(4, sizeof(STaskId)); return sdbSetTable(pMnode->pSdb, table); } void mndCleanupStream(SMnode *pMnode) { - taosArrayDestroy(execNodeList.pTaskList); - taosHashCleanup(execNodeList.pTaskMap); - taosThreadMutexDestroy(&execNodeList.lock); - mDebug("mnd stream cleanup"); + taosArrayDestroy(execInfo.pTaskList); + taosHashCleanup(execInfo.pTaskMap); + taosThreadMutexDestroy(&execInfo.lock); + mDebug("mnd stream exec info cleanup"); } SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) { @@ -848,10 +847,10 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { 
mndTransDrop(pTrans); - taosThreadMutexLock(&execNodeList.lock); + taosThreadMutexLock(&execInfo.lock); mDebug("register to stream task node list"); - keepStreamTasksInBuf(&streamObj, &execNodeList); - taosThreadMutexUnlock(&execNodeList.lock); + keepStreamTasksInBuf(&streamObj, &execInfo); + taosThreadMutexUnlock(&execInfo.lock); code = TSDB_CODE_ACTION_IN_PROGRESS; @@ -883,9 +882,8 @@ static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) { return 0; } - int64_t checkpointId = taosGetTimestampMs(); SMStreamDoCheckpointMsg *pMsg = rpcMallocCont(sizeof(SMStreamDoCheckpointMsg)); - pMsg->checkpointId = checkpointId; + pMsg->checkpointId = taosGetTimestampMs(); int32_t size = sizeof(SMStreamDoCheckpointMsg); SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_BEGIN_CHECKPOINT, .pCont = pMsg, .contLen = size}; @@ -1085,6 +1083,7 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream pStream->checkpointId = chkptId; pStream->checkpointFreq = taosGetTimestampMs(); pStream->currentTick = 0; + // 3. 
commit log: stream checkpoint info pStream->version = pStream->version + 1; @@ -1134,22 +1133,22 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { { // check if the node update happens or not int64_t ts = taosGetTimestampSec(); - if (execNodeList.pNodeEntryList == NULL || (taosArrayGetSize(execNodeList.pNodeEntryList) == 0)) { - if (execNodeList.pNodeEntryList != NULL) { - execNodeList.pNodeEntryList = taosArrayDestroy(execNodeList.pNodeEntryList); + if (execInfo.pNodeEntryList == NULL || (taosArrayGetSize(execInfo.pNodeEntryList) == 0)) { + if (execInfo.pNodeEntryList != NULL) { + execInfo.pNodeEntryList = taosArrayDestroy(execInfo.pNodeEntryList); } - execNodeList.pNodeEntryList = extractNodeListFromStream(pMnode); + execInfo.pNodeEntryList = extractNodeListFromStream(pMnode); } - if (taosArrayGetSize(execNodeList.pNodeEntryList) == 0) { + if (taosArrayGetSize(execInfo.pNodeEntryList) == 0) { mDebug("stream task node change checking done, no vgroups exist, do nothing"); - execNodeList.ts = ts; + execInfo.ts = ts; return 0; } - for(int32_t i = 0; i < taosArrayGetSize(execNodeList.pNodeEntryList); ++i) { - SNodeEntry* pNodeEntry = taosArrayGet(execNodeList.pNodeEntryList, i); + for(int32_t i = 0; i < taosArrayGetSize(execInfo.pNodeEntryList); ++i) { + SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, i); if (pNodeEntry->stageUpdated) { mDebug("stream task not ready due to node update detected, checkpoint not issued"); return 0; @@ -1158,7 +1157,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode); - SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execNodeList.pNodeEntryList, pNodeSnapshot); + SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeEntryList, pNodeSnapshot); bool nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0); taosArrayDestroy(changeInfo.pUpdateNodeList); taosHashCleanup(changeInfo.pDBMap); @@ -1173,10 
+1172,10 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { { // check if all tasks are in TASK_STATUS__READY status bool ready = true; - taosThreadMutexLock(&execNodeList.lock); - for (int32_t i = 0; i < taosArrayGetSize(execNodeList.pTaskList); ++i) { - STaskId *p = taosArrayGet(execNodeList.pTaskList, i); - STaskStatusEntry* pEntry = taosHashGet(execNodeList.pTaskMap, p, sizeof(*p)); + taosThreadMutexLock(&execInfo.lock); + for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) { + STaskId *p = taosArrayGet(execInfo.pTaskList, i); + STaskStatusEntry* pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p)); if (pEntry == NULL) { continue; } @@ -1188,7 +1187,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { break; } } - taosThreadMutexUnlock(&execNodeList.lock); + taosThreadMutexUnlock(&execInfo.lock); if (!ready) { return 0; } @@ -1229,11 +1228,16 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { if (code == 0) { if (mndTransPrepare(pMnode, pTrans) != 0) { - mError("failed to prepre trans rebalance since %s", terrstr()); + mError("failed to prepare trans rebalance since %s", terrstr()); } } mndTransDrop(pTrans); + + // only one trans here + taosThreadMutexLock(&execInfo.lock); + execInfo.activeCheckpoint = checkpointId; + taosThreadMutexUnlock(&execInfo.lock); return code; } @@ -1311,7 +1315,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { return -1; } - removeStreamTasksInBuf(pStream, &execNodeList); + removeStreamTasksInBuf(pStream, &execInfo); SName name = {0}; tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB); @@ -1562,7 +1566,7 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock char status[20 + VARSTR_HEADER_SIZE] = {0}; STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; - STaskStatusEntry* pe = taosHashGet(execNodeList.pTaskMap, &id, sizeof(id)); + STaskStatusEntry* pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id)); if 
(pe == NULL) { continue; } @@ -2196,12 +2200,12 @@ static void doExtractTasksFromStream(SMnode *pMnode) { break; } - keepStreamTasksInBuf(pStream, &execNodeList); + keepStreamTasksInBuf(pStream, &execInfo); sdbRelease(pSdb, pStream); } } -static int32_t doRemoveFromTask(SStreamExecNodeInfo* pExecNode, STaskId* pRemovedId) { +static int32_t doRemoveFromTask(SStreamExecInfo* pExecNode, STaskId* pRemovedId) { void *p = taosHashGet(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId)); if (p != NULL) { @@ -2236,10 +2240,10 @@ static bool taskNodeExists(SArray* pList, int32_t nodeId) { int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) { SArray* pRemoveTaskList = taosArrayInit(4, sizeof(STaskId)); - int32_t numOfTask = taosArrayGetSize(execNodeList.pTaskList); + int32_t numOfTask = taosArrayGetSize(execInfo.pTaskList); for(int32_t i = 0; i < numOfTask; ++i) { - STaskId* pId = taosArrayGet(execNodeList.pTaskList, i); - STaskStatusEntry* pEntry = taosHashGet(execNodeList.pTaskMap, pId, sizeof(*pId)); + STaskId* pId = taosArrayGet(execInfo.pTaskList, i); + STaskStatusEntry* pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId)); bool existed = taskNodeExists(pNodeSnapshot, pEntry->nodeId); if (!existed) { @@ -2249,16 +2253,16 @@ int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) { for(int32_t i = 0; i < taosArrayGetSize(pRemoveTaskList); ++i) { STaskId* pId = taosArrayGet(pRemoveTaskList, i); - doRemoveFromTask(&execNodeList, pId); + doRemoveFromTask(&execInfo, pId); } mDebug("remove invalid stream tasks:%d, remain:%d", (int32_t)taosArrayGetSize(pRemoveTaskList), - (int32_t) taosArrayGetSize(execNodeList.pTaskList)); + (int32_t) taosArrayGetSize(execInfo.pTaskList)); int32_t size = taosArrayGetSize(pNodeSnapshot); SArray* pValidNodeEntryList = taosArrayInit(4, sizeof(SNodeEntry)); - for(int32_t i = 0; i < taosArrayGetSize(execNodeList.pNodeEntryList); ++i) { - SNodeEntry* p = taosArrayGet(execNodeList.pNodeEntryList, i); + for(int32_t i = 0; i < 
taosArrayGetSize(execInfo.pNodeEntryList); ++i) { + SNodeEntry* p = taosArrayGet(execInfo.pNodeEntryList, i); for(int32_t j = 0; j < size; ++j) { SNodeEntry* pEntry = taosArrayGet(pNodeSnapshot, j); @@ -2269,8 +2273,8 @@ int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) { } } - execNodeList.pNodeEntryList = taosArrayDestroy(execNodeList.pNodeEntryList); - execNodeList.pNodeEntryList = pValidNodeEntryList; + execInfo.pNodeEntryList = taosArrayDestroy(execInfo.pNodeEntryList); + execInfo.pNodeEntryList = pValidNodeEntryList; taosArrayDestroy(pRemoveTaskList); return 0; @@ -2289,26 +2293,26 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { int64_t ts = taosGetTimestampSec(); SMnode *pMnode = pMsg->info.node; - if (execNodeList.pNodeEntryList == NULL || (taosArrayGetSize(execNodeList.pNodeEntryList) == 0)) { - if (execNodeList.pNodeEntryList != NULL) { - execNodeList.pNodeEntryList = taosArrayDestroy(execNodeList.pNodeEntryList); + if (execInfo.pNodeEntryList == NULL || (taosArrayGetSize(execInfo.pNodeEntryList) == 0)) { + if (execInfo.pNodeEntryList != NULL) { + execInfo.pNodeEntryList = taosArrayDestroy(execInfo.pNodeEntryList); } - execNodeList.pNodeEntryList = extractNodeListFromStream(pMnode); + execInfo.pNodeEntryList = extractNodeListFromStream(pMnode); } - if (taosArrayGetSize(execNodeList.pNodeEntryList) == 0) { + if (taosArrayGetSize(execInfo.pNodeEntryList) == 0) { mDebug("end to do stream task node change checking, no vgroup exists, do nothing"); - execNodeList.ts = ts; + execInfo.ts = ts; atomic_store_32(&mndNodeCheckSentinel, 0); return 0; } SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode); - taosThreadMutexLock(&execNodeList.lock); + taosThreadMutexLock(&execInfo.lock); removeExpirednodeEntryAndTask(pNodeSnapshot); - SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execNodeList.pNodeEntryList, pNodeSnapshot); + SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeEntryList, pNodeSnapshot); if 
(taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) { // kill current active checkpoint transaction, since the transaction is vnode wide. @@ -2318,16 +2322,16 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { // keep the new vnode snapshot if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) { mDebug("create trans successfully, update cached node list"); - taosArrayDestroy(execNodeList.pNodeEntryList); - execNodeList.pNodeEntryList = pNodeSnapshot; - execNodeList.ts = ts; + taosArrayDestroy(execInfo.pNodeEntryList); + execInfo.pNodeEntryList = pNodeSnapshot; + execInfo.ts = ts; } } else { mDebug("no update found in nodeList"); taosArrayDestroy(pNodeSnapshot); } - taosThreadMutexUnlock(&execNodeList.lock); + taosThreadMutexUnlock(&execInfo.lock); taosArrayDestroy(changeInfo.pUpdateNodeList); taosHashCleanup(changeInfo.pDBMap); @@ -2359,7 +2363,7 @@ static int32_t mndProcessNodeCheck(SRpcMsg *pReq) { return 0; } -void keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecNodeInfo *pExecNode) { +void keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) { int32_t level = taosArrayGetSize(pStream->tasks); for (int32_t i = 0; i < level; i++) { @@ -2384,7 +2388,7 @@ void keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecNodeInfo *pExecNode) { } } -void removeStreamTasksInBuf(SStreamObj* pStream, SStreamExecNodeInfo * pExecNode) { +void removeStreamTasksInBuf(SStreamObj* pStream, SStreamExecInfo * pExecNode) { int32_t level = taosArrayGetSize(pStream->tasks); for (int32_t i = 0; i < level; i++) { SArray *pLevel = taosArrayGetP(pStream->tasks, i); @@ -2577,24 +2581,24 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks); - taosThreadMutexLock(&execNodeList.lock); - int32_t numOfExisted = taosHashGetSize(execNodeList.pTaskMap); + taosThreadMutexLock(&execInfo.lock); + int32_t numOfExisted = taosHashGetSize(execInfo.pTaskMap); if (numOfExisted == 
0) { doExtractTasksFromStream(pMnode); } for (int32_t i = 0; i < req.numOfTasks; ++i) { STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i); - STaskStatusEntry *pEntry = taosHashGet(execNodeList.pTaskMap, &p->id, sizeof(p->id)); + STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, &p->id, sizeof(p->id)); if (pEntry == NULL) { mError("s-task:0x%" PRIx64 " not found in mnode task list", p->id.taskId); continue; } if (p->stage != pEntry->stage && pEntry->stage != -1) { - int32_t numOfNodes = taosArrayGetSize(execNodeList.pNodeEntryList); + int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeEntryList); for(int32_t j = 0; j < numOfNodes; ++j) { - SNodeEntry* pNodeEntry = taosArrayGet(execNodeList.pNodeEntryList, j); + SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, j); if (pNodeEntry->nodeId == pEntry->nodeId) { mInfo("vgId:%d stage updated, from %d to %d, nodeUpdate trigger by s-task:0x%" PRIx64, pEntry->nodeId, pEntry->stage, p->stage, pEntry->id.taskId); @@ -2628,16 +2632,16 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { // current checkpoint is failed, rollback from the checkpoint trans // kill the checkpoint trans and then set all tasks status to be normal if (checkpointFailed && activeCheckpointId != 0) { - if (execNodeList.activeCheckpoint != activeCheckpointId) { - mInfo("checkpointId:%"PRId64" failed, issue task-reset trans to reset all tasks status", activeCheckpointId); - execNodeList.activeCheckpoint = activeCheckpointId; - mndResetFromCheckpoint(pMnode); - } else { - mDebug("checkpoint:%"PRId64" reset has issued already, ignore it", activeCheckpointId); - } + ASSERT(execInfo.activeCheckpoint == activeCheckpointId); + mInfo("checkpointId:%" PRId64 " failed, issue task-reset trans to reset all tasks status", activeCheckpointId); + // execInfo.activeCheckpoint = activeCheckpointId; + mndResetFromCheckpoint(pMnode); + // } else { + // mDebug("checkpoint:%"PRId64" reset has issued already, ignore it", activeCheckpointId); + // } } - 
taosThreadMutexUnlock(&execNodeList.lock); + taosThreadMutexUnlock(&execInfo.lock); taosArrayDestroy(req.pTaskStatus); return TSDB_CODE_SUCCESS;