refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2024-06-07 10:09:40 +08:00
parent 385e1a8b0d
commit 34205ba62c
4 changed files with 57 additions and 53 deletions

View File

@ -106,7 +106,7 @@ int32_t mndStreamSetUpdateEpsetAction(SMnode *pMnode, SStreamObj *pStream, SVgr
SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId);
int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId);
int32_t mndProcessStreamHb(SRpcMsg *pReq);
void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode);
void saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
int32_t extractStreamNodeList(SMnode *pMnode);
int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream, int8_t igUntreated);
int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
@ -121,9 +121,7 @@ bool streamTaskIterNextTask(SStreamTaskIter *pIter);
SStreamTask *streamTaskIterGetCurrent(SStreamTaskIter *pIter);
void mndInitExecInfo();
void removeExpiredNodeInfo(const SArray *pNodeSnapshot);
int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId);
void removeInvalidTasks(SArray* pTaskIds);
void removeTasksInBuf(SArray* pTaskIds, SStreamExecInfo* pExecInfo);
#ifdef __cplusplus
}

View File

@ -56,12 +56,12 @@ static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int
int64_t streamId, int32_t taskId, int32_t transId, int8_t mndTrigger);
static int32_t mndProcessNodeCheck(SRpcMsg *pReq);
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg);
static SArray *extractNodeListFromStream(SMnode *pMnode);
static SArray *extractNodeListFromStream(SMnode *pMnode, SArray* pNodeList);
static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq);
static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList);
static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
static int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot);
static int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len);
static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
@ -792,7 +792,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
// to make sure when the hb from vnode arrived, the newly created tasks have been in the task map already.
taosThreadMutexLock(&execInfo.lock);
mDebug("stream stream:%s start to register tasks into task nodeList", createReq.name);
saveStreamTasksInfo(&streamObj, &execInfo);
saveTaskAndNodeInfoIntoBuf(&streamObj, &execInfo);
taosThreadMutexUnlock(&execInfo.lock);
// execute creation
@ -815,7 +815,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
auditRecord(pReq, pMnode->clusterId, "createStream", dbname.dbname, name.dbname, sql, sqlLen);
} else {
char detail[1000] = {0};
sprintf(detail, "dbname:%s, stream name:%s", dbname.dbname, name.dbname);
snprintf(detail, tListLen(detail), "dbname:%s, stream name:%s", dbname.dbname, name.dbname);
auditRecord(pReq, pMnode->clusterId, "createStream", dbname.dbname, name.dbname, detail, strlen(detail));
}
@ -827,6 +827,7 @@ _OVER:
mndReleaseStream(pMnode, pStream);
tFreeSCMCreateStreamReq(&createReq);
tFreeStreamObj(&streamObj);
if (sql != NULL) {
taosMemoryFreeClear(sql);
}
@ -839,6 +840,7 @@ int64_t mndStreamGenChkptId(SMnode *pMnode, bool lock) {
void *pIter = NULL;
SSdb *pSdb = pMnode->pSdb;
int64_t maxChkptId = 0;
while (1) {
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) break;
@ -1033,8 +1035,7 @@ _ERR:
int32_t extractStreamNodeList(SMnode *pMnode) {
if (taosArrayGetSize(execInfo.pNodeList) == 0) {
execInfo.pNodeList = taosArrayDestroy(execInfo.pNodeList);
execInfo.pNodeList = extractNodeListFromStream(pMnode);
extractNodeListFromStream(pMnode, execInfo.pNodeList);
}
return taosArrayGetSize(execInfo.pNodeList);
@ -1071,22 +1072,23 @@ static bool taskNodeIsUpdated(SMnode *pMnode) {
}
SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot);
bool nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0);
taosArrayDestroy(changeInfo.pUpdateNodeList);
taosHashCleanup(changeInfo.pDBMap);
taosArrayDestroy(pNodeSnapshot);
if (nodeUpdated) {
mDebug("stream task not ready due to node update");
mDebug("stream tasks not ready due to node update");
}
taosThreadMutexUnlock(&execInfo.lock);
return nodeUpdated;
}
static int32_t mndCheckNodeStatus(SMnode *pMnode) {
static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) {
bool ready = true;
int64_t ts = taosGetTimestampSec();
if (taskNodeIsUpdated(pMnode)) {
return -1;
}
@ -1094,7 +1096,7 @@ static int32_t mndCheckNodeStatus(SMnode *pMnode) {
taosThreadMutexLock(&execInfo.lock);
if (taosArrayGetSize(execInfo.pNodeList) == 0) {
mDebug("stream task node change checking done, no vgroups exist, do nothing");
execInfo.ts = ts;
ASSERT(taosArrayGetSize(execInfo.pTaskList) == 0);
}
for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
@ -1148,12 +1150,13 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
int32_t code = 0;
int32_t numOfCheckpointTrans = 0;
if ((code = mndCheckNodeStatus(pMnode)) != 0) {
if ((code = mndCheckTaskAndNodeStatus(pMnode)) != 0) {
return code;
}
SArray* pList = taosArrayInit(4, sizeof(SCheckpointInterval));
int64_t now = taosGetTimestampMs();
while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
int64_t duration = now - pStream->checkpointFreq;
if (duration < tsStreamCheckpointInterval * 1000) {
@ -2166,7 +2169,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
return 0;
}
static SArray *extractNodeListFromStream(SMnode *pMnode) {
static SArray *extractNodeListFromStream(SMnode *pMnode, SArray* pNodeList) {
SSdb *pSdb = pMnode->pSdb;
SStreamObj *pStream = NULL;
void *pIter = NULL;
@ -2195,13 +2198,13 @@ static SArray *extractNodeListFromStream(SMnode *pMnode) {
sdbRelease(pSdb, pStream);
}
SArray *plist = taosArrayInit(taosHashGetSize(pHash), sizeof(SNodeEntry));
taosArrayClear(pNodeList);
// convert to list
pIter = NULL;
while ((pIter = taosHashIterate(pHash, pIter)) != NULL) {
SNodeEntry *pEntry = (SNodeEntry *)pIter;
taosArrayPush(plist, pEntry);
taosArrayPush(pNodeList, pEntry);
char buf[256] = {0};
epsetToStr(&pEntry->epset, buf, tListLen(buf));
@ -2209,7 +2212,7 @@ static SArray *extractNodeListFromStream(SMnode *pMnode) {
}
taosHashCleanup(pHash);
return plist;
return TSDB_CODE_SUCCESS;
}
static bool taskNodeExists(SArray *pList, int32_t nodeId) {
@ -2243,7 +2246,7 @@ int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot) {
}
}
removeInvalidTasks(pRemovedTasks);
removeTasksInBuf(pRemovedTasks, &execInfo);
mDebug("remove invalid stream tasks:%d, remain:%d", (int32_t)taosArrayGetSize(pRemovedTasks),
(int32_t)taosArrayGetSize(execInfo.pTaskList));
@ -2301,8 +2304,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
// keep the new vnode snapshot if success
if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {
taosArrayDestroy(execInfo.pNodeList);
execInfo.pNodeList = extractNodeListFromStream(pMnode);
extractNodeListFromStream(pMnode, execInfo.pNodeList);
execInfo.ts = ts;
mDebug("create trans successfully, update cached node list, numOfNodes:%d",
(int)taosArrayGetSize(execInfo.pNodeList));
@ -2339,7 +2341,7 @@ static int32_t mndProcessNodeCheck(SRpcMsg *pReq) {
return 0;
}
void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
void saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
SStreamTaskIter *pIter = createStreamTaskIter(pStream);
while (streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = streamTaskIterGetCurrent(pIter);
@ -2379,23 +2381,6 @@ void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
destroyStreamTaskIter(pIter);
}
void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
taosThreadMutexLock(&pExecNode->lock);
SStreamTaskIter *pIter = createStreamTaskIter(pStream);
while (streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = streamTaskIterGetCurrent(pIter);
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
doRemoveTasks(pExecNode, &id);
}
ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList));
taosThreadMutexUnlock(&pExecNode->lock);
destroyStreamTaskIter(pIter);
}
static void doAddTaskId(SArray *pList, int32_t taskId, int64_t uid, int32_t numOfTotal) {
int32_t num = taosArrayGetSize(pList);
for (int32_t i = 0; i < num; ++i) {

View File

@ -22,7 +22,7 @@ typedef struct SFailedCheckpointInfo {
int32_t transId;
} SFailedCheckpointInfo;
static void extractStreamTasks(SMnode *pMnode) {
static void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo* pExecInfo) {
SSdb *pSdb = pMnode->pSdb;
SStreamObj *pStream = NULL;
void *pIter = NULL;
@ -33,17 +33,17 @@ static void extractStreamTasks(SMnode *pMnode) {
break;
}
saveStreamTasksInfo(pStream, &execInfo);
saveTaskAndNodeInfoIntoBuf(pStream, pExecInfo);
sdbRelease(pSdb, pStream);
}
}
static void removeDroppedStreams(SMnode *pMnode, SStreamExecInfo *pExecInfo) {
static void removeDroppedStreamTasksInBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo) {
int32_t num = taosArrayGetSize(pExecInfo->pTaskList);
SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
SArray *pIdList = taosArrayInit(4, sizeof(STaskId));
SArray* pInvalid = taosArrayInit(4, sizeof(STaskId));
for (int32_t i = 0; i < num; ++i) {
STaskId* pId = taosArrayGet(pExecInfo->pTaskList, i);
@ -57,11 +57,13 @@ static void removeDroppedStreams(SMnode *pMnode, SStreamExecInfo *pExecInfo) {
mndReleaseStream(pMnode, pObj);
taosHashPut(pHash, &pId->streamId, sizeof(int64_t), NULL, 0);
} else {
taosArrayPush(pInvalid, pId);
taosArrayPush(pIdList, pId);
}
}
removeInvalidTasks(pInvalid);
removeTasksInBuf(pIdList, &execInfo);
taosArrayDestroy(pIdList);
taosHashCleanup(pHash);
}
@ -286,11 +288,11 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
// extract stream task list
if (taosHashGetSize(execInfo.pTaskMap) == 0) {
extractStreamTasks(pMnode);
addAllStreamTasksIntoBuf(pMnode, &execInfo);
} else {
// the already dropped tasks may be added by hb from vnode at the time when the pTaskMap happens to be empty.
// let's drop them here.
removeDroppedStreams(pMnode, &execInfo);
removeDroppedStreamTasksInBuf(pMnode, &execInfo);
}
extractStreamNodeList(pMnode);

View File

@ -26,6 +26,8 @@ struct SStreamTaskIter {
SStreamTask *pTask;
};
int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId);
SStreamTaskIter* createStreamTaskIter(SStreamObj* pStream) {
SStreamTaskIter* pIter = taosMemoryCalloc(1, sizeof(SStreamTaskIter));
if (pIter == NULL) {
@ -613,7 +615,7 @@ int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId) {
taosArrayRemove(pExecNode->pTaskList, k);
int32_t num = taosArrayGetSize(pExecNode->pTaskList);
mInfo("s-task:0x%x removed from buffer, remain:%d", (int32_t)pRemovedId->taskId, num);
mInfo("s-task:0x%x removed from buffer, remain:%d in buffer list", (int32_t)pRemovedId->taskId, num);
break;
}
}
@ -621,9 +623,26 @@ int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId) {
return TSDB_CODE_SUCCESS;
}
void removeInvalidTasks(SArray *pTaskIds) {
void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo* pExecInfo) {
for (int32_t i = 0; i < taosArrayGetSize(pTaskIds); ++i) {
STaskId *pId = taosArrayGet(pTaskIds, i);
doRemoveTasks(&execInfo, pId);
doRemoveTasks(pExecInfo, pId);
}
}
void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
taosThreadMutexLock(&pExecNode->lock);
SStreamTaskIter *pIter = createStreamTaskIter(pStream);
while (streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = streamTaskIterGetCurrent(pIter);
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
doRemoveTasks(pExecNode, &id);
}
ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList));
taosThreadMutexUnlock(&pExecNode->lock);
destroyStreamTaskIter(pIter);
}