fix(stream): add the new node info when adding stream tasks.

This commit is contained in:
Haojun Liao 2024-06-04 23:28:08 +08:00
parent 4fc21cc8b0
commit 29648be30d
4 changed files with 83 additions and 42 deletions

View File

@ -120,6 +120,7 @@ void destroyStreamTaskIter(SStreamTaskIter *pIter);
bool streamTaskIterNextTask(SStreamTaskIter *pIter);
SStreamTask *streamTaskIterGetCurrent(SStreamTaskIter *pIter);
void mndInitExecInfo();
void removeExpiredNodeInfo(const SArray *pNodeSnapshot);
#ifdef __cplusplus
}

View File

@ -62,7 +62,7 @@ static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq);
static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList);
static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot);
static int32_t removeExpiredNodeEntryAndTask(SArray *pNodeSnapshot);
static int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len);
static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
@ -692,6 +692,8 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
SStreamObj streamObj = {0};
char *sql = NULL;
int32_t sqlLen = 0;
const char* pMsg = "create stream tasks on dnodes";
terrno = TSDB_CODE_SUCCESS;
SCMCreateStreamReq createReq = {0};
@ -704,8 +706,8 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
terrno = TSDB_CODE_MND_INVALID_PLATFORM;
goto _OVER;
#endif
mInfo("stream:%s, start to create stream, sql:%s", createReq.name, createReq.sql);
mInfo("stream:%s, start to create stream, sql:%s", createReq.name, createReq.sql);
if (mndCheckCreateStreamReq(&createReq) != 0) {
mError("stream:%s, failed to create since %s", createReq.name, terrstr());
goto _OVER;
@ -745,8 +747,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
goto _OVER;
}
STrans *pTrans =
doCreateTrans(pMnode, &streamObj, pReq, TRN_CONFLICT_DB, MND_STREAM_CREATE_NAME, "create stream tasks on dnodes");
STrans *pTrans = doCreateTrans(pMnode, &streamObj, pReq, TRN_CONFLICT_DB, MND_STREAM_CREATE_NAME, pMsg);
if (pTrans == NULL) {
goto _OVER;
}
@ -789,7 +790,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
// add into buffer firstly
// to make sure when the hb from vnode arrived, the newly created tasks have been in the task map already.
taosThreadMutexLock(&execInfo.lock);
mDebug("stream stream:%s start to register tasks into task_node_list", createReq.name);
mDebug("stream stream:%s start to register tasks into task nodeList", createReq.name);
saveStreamTasksInfo(&streamObj, &execInfo);
taosThreadMutexUnlock(&execInfo.lock);
@ -1030,7 +1031,7 @@ _ERR:
}
int32_t initStreamNodeList(SMnode *pMnode) {
if (execInfo.pNodeList == NULL || (taosArrayGetSize(execInfo.pNodeList) == 0)) {
if (taosArrayGetSize(execInfo.pNodeList) == 0) {
execInfo.pNodeList = taosArrayDestroy(execInfo.pNodeList);
execInfo.pNodeList = extractNodeListFromStream(pMnode);
}
@ -2203,8 +2204,8 @@ static SArray *extractNodeListFromStream(SMnode *pMnode) {
epsetToStr(&pEntry->epset, buf, tListLen(buf));
mDebug("extract nodeInfo from stream obj, nodeId:%d, %s", pEntry->nodeId, buf);
}
taosHashCleanup(pHash);
taosHashCleanup(pHash);
return plist;
}
@ -2242,15 +2243,17 @@ static bool taskNodeExists(SArray *pList, int32_t nodeId) {
return false;
}
int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) {
int32_t removeExpiredNodeEntryAndTask(SArray *pNodeSnapshot) {
SArray *pRemovedTasks = taosArrayInit(4, sizeof(STaskId));
int32_t numOfTask = taosArrayGetSize(execInfo.pTaskList);
for (int32_t i = 0; i < numOfTask; ++i) {
STaskId *pId = taosArrayGet(execInfo.pTaskList, i);
STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId));
STaskId *pId = taosArrayGet(execInfo.pTaskList, i);
if (pEntry->nodeId == SNODE_HANDLE) continue;
STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId));
if (pEntry->nodeId == SNODE_HANDLE) {
continue;
}
bool existed = taskNodeExists(pNodeSnapshot, pEntry->nodeId);
if (!existed) {
@ -2266,24 +2269,8 @@ int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) {
mDebug("remove invalid stream tasks:%d, remain:%d", (int32_t)taosArrayGetSize(pRemovedTasks),
(int32_t)taosArrayGetSize(execInfo.pTaskList));
int32_t size = taosArrayGetSize(pNodeSnapshot);
SArray *pValidNodeEntryList = taosArrayInit(4, sizeof(SNodeEntry));
for (int32_t i = 0; i < taosArrayGetSize(execInfo.pNodeList); ++i) {
SNodeEntry *p = taosArrayGet(execInfo.pNodeList, i);
removeExpiredNodeInfo(pNodeSnapshot);
for (int32_t j = 0; j < size; ++j) {
SNodeEntry *pEntry = taosArrayGet(pNodeSnapshot, j);
if (pEntry->nodeId == p->nodeId) {
taosArrayPush(pValidNodeEntryList, p);
break;
}
}
}
taosArrayDestroy(execInfo.pNodeList);
execInfo.pNodeList = pValidNodeEntryList;
mDebug("remain %d valid node entries", (int32_t)taosArrayGetSize(pValidNodeEntryList));
taosArrayDestroy(pRemovedTasks);
return 0;
}
@ -2314,9 +2301,9 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
return 0;
}
bool allVgroupsReady = true;
SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode, &allVgroupsReady);
if (!allVgroupsReady) {
bool allReady = true;
SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode, &allReady);
if (!allReady) {
taosArrayDestroy(pNodeSnapshot);
atomic_store_32(&mndNodeCheckSentinel, 0);
mWarn("not all vnodes are ready, ignore the exec nodeUpdate check");
@ -2324,31 +2311,30 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
}
taosThreadMutexLock(&execInfo.lock);
removeExpirednodeEntryAndTask(pNodeSnapshot);
removeExpiredNodeEntryAndTask(pNodeSnapshot);
SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot);
if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) {
// kill current active checkpoint transaction, since the transaction is vnode wide.
killAllCheckpointTrans(pMnode, &changeInfo);
code = mndProcessVgroupChange(pMnode, &changeInfo);
// keep the new vnode snapshot if success
if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {
mDebug("create trans successfully, update cached node list");
taosArrayDestroy(execInfo.pNodeList);
execInfo.pNodeList = pNodeSnapshot;
execInfo.pNodeList = extractNodeListFromStream(pMnode);
execInfo.ts = ts;
mDebug("create trans successfully, update cached node list, numOfNodes:%d", taosArrayGetSize(execInfo.pNodeList));
} else {
mError("unexpected code during create nodeUpdate trans, code:%s", tstrerror(code));
taosArrayDestroy(pNodeSnapshot);
}
} else {
mDebug("no update found in nodeList");
taosArrayDestroy(pNodeSnapshot);
}
taosArrayDestroy(pNodeSnapshot);
taosThreadMutexUnlock(&execInfo.lock);
taosArrayDestroy(changeInfo.pUpdateNodeList);
taosHashCleanup(changeInfo.pDBMap);
@ -2385,8 +2371,27 @@ void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
taosHashPut(pExecNode->pTaskMap, &id, sizeof(id), &entry, sizeof(entry));
taosArrayPush(pExecNode->pTaskList, &id);
mInfo("s-task:0x%x add into task buffer, total:%d", (int32_t)entry.id.taskId,
(int32_t)taosArrayGetSize(pExecNode->pTaskList));
int32_t num = (int32_t)taosArrayGetSize(pExecNode->pTaskList);
mInfo("s-task:0x%x add into task buffer, total:%d", (int32_t)entry.id.taskId, num);
// add the new vgroups if not added yet
bool exist = false;
for(int32_t j = 0; j < taosArrayGetSize(pExecNode->pNodeList); ++j) {
SNodeEntry* pEntry = taosArrayGet(pExecNode->pNodeList, j);
if (pEntry->nodeId == pTask->info.nodeId) {
exist = true;
break;
}
}
if (!exist) {
SNodeEntry nodeEntry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId};
epsetAssign(&nodeEntry.epset, &pTask->info.epSet);
taosArrayPush(pExecNode->pNodeList, &nodeEntry);
mInfo("vgId:%d added into nodeList, total:%d", nodeEntry.nodeId, (int)taosArrayGetSize(pExecNode->pNodeList));
}
}
}
@ -2394,6 +2399,8 @@ void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
}
void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
taosThreadMutexLock(&pExecNode->lock);
SStreamTaskIter *pIter = createStreamTaskIter(pStream);
while (streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = streamTaskIterGetCurrent(pIter);
@ -2416,8 +2423,10 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
}
}
destroyStreamTaskIter(pIter);
ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList));
taosThreadMutexUnlock(&pExecNode->lock);
destroyStreamTaskIter(pIter);
}
static void doAddTaskId(SArray *pList, int32_t taskId, int64_t uid, int32_t numOfTotal) {

View File

@ -131,18 +131,26 @@ static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList) {
int32_t *pVgId = taosArrayGet(pNodeList, k);
mInfo("set node expired for nodeId:%d, total:%d", *pVgId, num);
bool setFlag = false;
int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeList);
for (int i = 0; i < numOfNodes; ++i) {
SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeList, i);
if (pNodeEntry->nodeId == *pVgId) {
mInfo("vgId:%d expired for some stream tasks, needs update nodeEp", *pVgId);
pNodeEntry->stageUpdated = true;
setFlag = true;
break;
}
}
}
if (!setFlag) {
mError("failed to set nodeUpdate flag, nodeId:%d not exists in nodelist, update it", *pVgId);
ASSERT(0);
return TSDB_CODE_FAILED;
}
}
return TSDB_CODE_SUCCESS;
}
@ -361,7 +369,6 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
pHead->vgId = htonl(req.vgId);
tmsgSendRsp(&rsp);
pReq->info.handle = NULL; // disable auto rsp
}

View File

@ -135,6 +135,7 @@ SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) {
char buf[256] = {0};
epsetToStr(&entry.epset, buf, tListLen(buf));
mDebug("take snode snapshot, nodeId:%d %s", entry.nodeId, buf);
taosArrayPush(pVgroupListSnapshot, &entry);
sdbRelease(pSdb, pObj);
}
@ -571,6 +572,29 @@ void mndInitExecInfo() {
execInfo.pTaskMap = taosHashInit(64, fn, true, HASH_NO_LOCK);
execInfo.transMgmt.pDBTrans = taosHashInit(32, fn, true, HASH_NO_LOCK);
execInfo.pTransferStateStreams = taosHashInit(32, fn, true, HASH_NO_LOCK);
execInfo.pNodeList = taosArrayInit(4, sizeof(SNodeEntry));
taosHashSetFreeFp(execInfo.pTransferStateStreams, freeTaskList);
}
/**
 * Drop every entry of execInfo.pNodeList whose nodeId does not appear in pNodeSnapshot.
 *
 * A new list containing only the surviving entries (copied in their original order) is
 * built first and then swapped in for execInfo.pNodeList; the previous list is destroyed
 * only after the new one has been built successfully. If the new list cannot be allocated
 * or filled, the current execInfo.pNodeList is left untouched so the cached node
 * information is never lost because of an allocation failure.
 *
 * NOTE(review): this reads and replaces the global execInfo.pNodeList without taking any
 * lock itself; presumably the caller holds execInfo.lock -- confirm for every call site.
 *
 * @param pNodeSnapshot  array of SNodeEntry describing the nodes that are currently valid.
 */
void removeExpiredNodeInfo(const SArray *pNodeSnapshot) {
  SArray *pValidList = taosArrayInit(4, sizeof(SNodeEntry));
  if (pValidList == NULL) {
    mError("failed to allocate the valid node list, keep the current node entries");
    return;
  }

  // both sizes are invariant while the lists are scanned, evaluate them once
  int32_t numOfSnapshot = (int32_t)taosArrayGetSize(pNodeSnapshot);
  int32_t numOfNodes = (int32_t)taosArrayGetSize(execInfo.pNodeList);

  for (int32_t i = 0; i < numOfNodes; ++i) {
    SNodeEntry *p = taosArrayGet(execInfo.pNodeList, i);

    for (int32_t j = 0; j < numOfSnapshot; ++j) {
      SNodeEntry *pEntry = taosArrayGet(pNodeSnapshot, j);
      if (pEntry->nodeId != p->nodeId) {
        continue;
      }

      // the cached entry is still present in the snapshot, keep it
      if (taosArrayPush(pValidList, p) == NULL) {
        // do not replace the cached list with a partially filled one
        mError("failed to save valid node entry, nodeId:%d, keep the current node entries", p->nodeId);
        taosArrayDestroy(pValidList);
        return;
      }
      break;
    }
  }

  taosArrayDestroy(execInfo.pNodeList);
  execInfo.pNodeList = pValidList;

  mDebug("remain %d valid node entries after clean expired nodes info", (int32_t)taosArrayGetSize(pValidList));
}