fix(stream): extact stream nodes list if not initialized.

This commit is contained in:
Haojun Liao 2023-11-10 16:24:20 +08:00
parent e957e4ad5f
commit 3bacd7516e
2 changed files with 48 additions and 39 deletions

View File

@ -43,7 +43,7 @@ typedef struct SNodeEntry {
} SNodeEntry; } SNodeEntry;
typedef struct SStreamExecInfo { typedef struct SStreamExecInfo {
SArray *pNodeEntryList; SArray *pNodeList;
int64_t ts; // snapshot ts int64_t ts; // snapshot ts
int64_t activeCheckpoint; // active check point id int64_t activeCheckpoint; // active check point id
SHashObj *pTaskMap; SHashObj *pTaskMap;
@ -850,7 +850,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
mndTransDrop(pTrans); mndTransDrop(pTrans);
taosThreadMutexLock(&execInfo.lock); taosThreadMutexLock(&execInfo.lock);
mDebug("register to stream task node list"); mDebug("stream tasks register into node list");
keepStreamTasksInBuf(&streamObj, &execInfo); keepStreamTasksInBuf(&streamObj, &execInfo);
taosThreadMutexUnlock(&execInfo.lock); taosThreadMutexUnlock(&execInfo.lock);
@ -1125,6 +1125,15 @@ static const char *mndGetStreamDB(SMnode *pMnode) {
return p; return p;
} }
static int32_t initStreamNodeList(SMnode* pMnode) {
if (execInfo.pNodeList == NULL || (taosArrayGetSize(execInfo.pNodeList) == 0)) {
execInfo.pNodeList = taosArrayDestroy(execInfo.pNodeList);
execInfo.pNodeList = extractNodeListFromStream(pMnode);
}
return taosArrayGetSize(execInfo.pNodeList);
}
static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
@ -1135,22 +1144,18 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
{ // check if the node update happens or not { // check if the node update happens or not
int64_t ts = taosGetTimestampSec(); int64_t ts = taosGetTimestampSec();
if (execInfo.pNodeEntryList == NULL || (taosArrayGetSize(execInfo.pNodeEntryList) == 0)) { taosThreadMutexLock(&execInfo.lock);
if (execInfo.pNodeEntryList != NULL) { int32_t numOfNodes = initStreamNodeList(pMnode);
execInfo.pNodeEntryList = taosArrayDestroy(execInfo.pNodeEntryList); taosThreadMutexUnlock(&execInfo.lock);
}
execInfo.pNodeEntryList = extractNodeListFromStream(pMnode); if (numOfNodes == 0) {
}
if (taosArrayGetSize(execInfo.pNodeEntryList) == 0) {
mDebug("stream task node change checking done, no vgroups exist, do nothing"); mDebug("stream task node change checking done, no vgroups exist, do nothing");
execInfo.ts = ts; execInfo.ts = ts;
return 0; return 0;
} }
for(int32_t i = 0; i < taosArrayGetSize(execInfo.pNodeEntryList); ++i) { for(int32_t i = 0; i < numOfNodes; ++i) {
SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, i); SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeList, i);
if (pNodeEntry->stageUpdated) { if (pNodeEntry->stageUpdated) {
mDebug("stream task not ready due to node update detected, checkpoint not issued"); mDebug("stream task not ready due to node update detected, checkpoint not issued");
return 0; return 0;
@ -1165,7 +1170,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
return 0; return 0;
} }
SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeEntryList, pNodeSnapshot); SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot);
bool nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0); bool nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0);
taosArrayDestroy(changeInfo.pUpdateNodeList); taosArrayDestroy(changeInfo.pUpdateNodeList);
taosHashCleanup(changeInfo.pDBMap); taosHashCleanup(changeInfo.pDBMap);
@ -2080,20 +2085,21 @@ static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool* allReady) {
break; break;
} }
SNodeEntry entry = {0}; SNodeEntry entry = {.nodeId = pVgroup->vgId, .hbTimestamp = pVgroup->updateTime};
entry.epset = mndGetVgroupEpset(pMnode, pVgroup); entry.epset = mndGetVgroupEpset(pMnode, pVgroup);
entry.nodeId = pVgroup->vgId;
entry.hbTimestamp = pVgroup->updateTime;
// if not all ready till now, no need to check the remaining vgroups.
if (*allReady) { if (*allReady) {
for (int32_t i = 0; i < pVgroup->replica; ++i) { for (int32_t i = 0; i < pVgroup->replica; ++i) {
if (!pVgroup->vnodeGid[i].syncRestore) { if (!pVgroup->vnodeGid[i].syncRestore) {
mInfo("vgId:%d not restored, not ready for checkpoint or other operations", pVgroup->vgId);
*allReady = false; *allReady = false;
break; break;
} }
ESyncState state = pVgroup->vnodeGid[i].syncState; ESyncState state = pVgroup->vnodeGid[i].syncState;
if (state == TAOS_SYNC_STATE_OFFLINE || state == TAOS_SYNC_STATE_ERROR) { if (state == TAOS_SYNC_STATE_OFFLINE || state == TAOS_SYNC_STATE_ERROR) {
mInfo("vgId:%d offline/err, not ready for checkpoint or other operations", pVgroup->vgId);
*allReady = false; *allReady = false;
break; break;
} }
@ -2300,8 +2306,8 @@ int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) {
int32_t size = taosArrayGetSize(pNodeSnapshot); int32_t size = taosArrayGetSize(pNodeSnapshot);
SArray* pValidNodeEntryList = taosArrayInit(4, sizeof(SNodeEntry)); SArray* pValidNodeEntryList = taosArrayInit(4, sizeof(SNodeEntry));
for(int32_t i = 0; i < taosArrayGetSize(execInfo.pNodeEntryList); ++i) { for(int32_t i = 0; i < taosArrayGetSize(execInfo.pNodeList); ++i) {
SNodeEntry* p = taosArrayGet(execInfo.pNodeEntryList, i); SNodeEntry* p = taosArrayGet(execInfo.pNodeList, i);
for(int32_t j = 0; j < size; ++j) { for(int32_t j = 0; j < size; ++j) {
SNodeEntry* pEntry = taosArrayGet(pNodeSnapshot, j); SNodeEntry* pEntry = taosArrayGet(pNodeSnapshot, j);
@ -2312,8 +2318,8 @@ int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) {
} }
} }
execInfo.pNodeEntryList = taosArrayDestroy(execInfo.pNodeEntryList); taosArrayDestroy(execInfo.pNodeList);
execInfo.pNodeEntryList = pValidNodeEntryList; execInfo.pNodeList = pValidNodeEntryList;
mDebug("remain %d valid node entries", (int32_t) taosArrayGetSize(pValidNodeEntryList)); mDebug("remain %d valid node entries", (int32_t) taosArrayGetSize(pValidNodeEntryList));
taosArrayDestroy(pRemovedTasks); taosArrayDestroy(pRemovedTasks);
@ -2323,6 +2329,7 @@ int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) {
// this function runs by only one thread, so it is not multi-thread safe // this function runs by only one thread, so it is not multi-thread safe
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
int32_t code = 0; int32_t code = 0;
int32_t old = atomic_val_compare_exchange_32(&mndNodeCheckSentinel, 0, 1); int32_t old = atomic_val_compare_exchange_32(&mndNodeCheckSentinel, 0, 1);
if (old != 0) { if (old != 0) {
mDebug("still in checking node change"); mDebug("still in checking node change");
@ -2333,23 +2340,21 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
int64_t ts = taosGetTimestampSec(); int64_t ts = taosGetTimestampSec();
SMnode *pMnode = pMsg->info.node; SMnode *pMnode = pMsg->info.node;
if (execInfo.pNodeEntryList == NULL || (taosArrayGetSize(execInfo.pNodeEntryList) == 0)) {
if (execInfo.pNodeEntryList != NULL) {
execInfo.pNodeEntryList = taosArrayDestroy(execInfo.pNodeEntryList);
}
execInfo.pNodeEntryList = extractNodeListFromStream(pMnode);
}
if (taosArrayGetSize(execInfo.pNodeEntryList) == 0) { taosThreadMutexLock(&execInfo.lock);
int32_t numOfNodes = initStreamNodeList(pMnode);
taosThreadMutexUnlock(&execInfo.lock);
if (numOfNodes == 0) {
mDebug("end to do stream task node change checking, no vgroup exists, do nothing"); mDebug("end to do stream task node change checking, no vgroup exists, do nothing");
execInfo.ts = ts; execInfo.ts = ts;
atomic_store_32(&mndNodeCheckSentinel, 0); atomic_store_32(&mndNodeCheckSentinel, 0);
return 0; return 0;
} }
bool allVnodeReady = true; bool allVgroupsReady = true;
SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode, &allVnodeReady); SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode, &allVgroupsReady);
if (!allVnodeReady) { if (!allVgroupsReady) {
taosArrayDestroy(pNodeSnapshot); taosArrayDestroy(pNodeSnapshot);
atomic_store_32(&mndNodeCheckSentinel, 0); atomic_store_32(&mndNodeCheckSentinel, 0);
mWarn("not all vnodes are ready, ignore the exec nodeUpdate check"); mWarn("not all vnodes are ready, ignore the exec nodeUpdate check");
@ -2359,7 +2364,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
taosThreadMutexLock(&execInfo.lock); taosThreadMutexLock(&execInfo.lock);
removeExpirednodeEntryAndTask(pNodeSnapshot); removeExpirednodeEntryAndTask(pNodeSnapshot);
SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeEntryList, pNodeSnapshot); SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot);
if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) { if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) {
// kill current active checkpoint transaction, since the transaction is vnode wide. // kill current active checkpoint transaction, since the transaction is vnode wide.
@ -2369,8 +2374,8 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
// keep the new vnode snapshot // keep the new vnode snapshot
if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) { if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {
mDebug("create trans successfully, update cached node list"); mDebug("create trans successfully, update cached node list");
taosArrayDestroy(execInfo.pNodeEntryList); taosArrayDestroy(execInfo.pNodeList);
execInfo.pNodeEntryList = pNodeSnapshot; execInfo.pNodeList = pNodeSnapshot;
execInfo.ts = ts; execInfo.ts = ts;
} else { } else {
mDebug("unexpect code during create nodeUpdate trans, code:%s", tstrerror(code)); mDebug("unexpect code during create nodeUpdate trans, code:%s", tstrerror(code));
@ -2616,9 +2621,9 @@ int32_t setNodeEpsetExpiredFlag(const SArray* pNodeList) {
int32_t* pVgId = taosArrayGet(pNodeList, k); int32_t* pVgId = taosArrayGet(pNodeList, k);
mInfo("set node expired for nodeId:%d, total:%d", *pVgId, num); mInfo("set node expired for nodeId:%d, total:%d", *pVgId, num);
int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeEntryList); int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeList);
for (int i = 0; i < numOfNodes; ++i) { for (int i = 0; i < numOfNodes; ++i) {
SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, i); SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeList, i);
if (pNodeEntry->nodeId == *pVgId) { if (pNodeEntry->nodeId == *pVgId) {
mInfo("vgId:%d expired for some stream tasks, needs update nodeEp", *pVgId); mInfo("vgId:%d expired for some stream tasks, needs update nodeEp", *pVgId);
@ -2632,9 +2637,9 @@ int32_t setNodeEpsetExpiredFlag(const SArray* pNodeList) {
} }
static void updateStageInfo(STaskStatusEntry* pTaskEntry, int32_t stage) { static void updateStageInfo(STaskStatusEntry* pTaskEntry, int32_t stage) {
int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeEntryList); int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeList);
for(int32_t j = 0; j < numOfNodes; ++j) { for(int32_t j = 0; j < numOfNodes; ++j) {
SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, j); SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeList, j);
if (pNodeEntry->nodeId == pTaskEntry->nodeId) { if (pNodeEntry->nodeId == pTaskEntry->nodeId) {
mInfo("vgId:%d stage updated from %d to %d, nodeUpdate trigger by s-task:0x%" PRIx64, pTaskEntry->nodeId, mInfo("vgId:%d stage updated from %d to %d, nodeUpdate trigger by s-task:0x%" PRIx64, pTaskEntry->nodeId,
@ -2667,14 +2672,18 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks); mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks);
taosThreadMutexLock(&execInfo.lock); taosThreadMutexLock(&execInfo.lock);
// extract stream task list
int32_t numOfExisted = taosHashGetSize(execInfo.pTaskMap); int32_t numOfExisted = taosHashGetSize(execInfo.pTaskMap);
if (numOfExisted == 0) { if (numOfExisted == 0) {
doExtractTasksFromStream(pMnode); doExtractTasksFromStream(pMnode);
} }
initStreamNodeList(pMnode);
int32_t numOfUpdated = taosArrayGetSize(req.pUpdateNodes); int32_t numOfUpdated = taosArrayGetSize(req.pUpdateNodes);
if (numOfUpdated > 0) { if (numOfUpdated > 0) {
mDebug("%d stream nodes needs updated from tasks' report", (int32_t)taosArrayGetSize(req.pUpdateNodes)); mDebug("%d stream node(s) need updated from report of hbMsg(vgId:%d)", numOfUpdated, req.vgId);
setNodeEpsetExpiredFlag(req.pUpdateNodes); setNodeEpsetExpiredFlag(req.pUpdateNodes);
} }

View File

@ -1020,7 +1020,7 @@ void metaHbToMnode(void* param, void* tmrId) {
pMeta->pHbInfo->hbCount += 1; pMeta->pHbInfo->hbCount += 1;
stDebug("vgId:%d, build and send hb to mnode, numOfTasks:%d total:%d", pMeta->vgId, hbMsg.numOfTasks, stDebug("vgId:%d build and send hb to mnode, numOfTasks:%d total:%d", pMeta->vgId, hbMsg.numOfTasks,
pMeta->pHbInfo->hbCount); pMeta->pHbInfo->hbCount);
tmsgSendReq(&epset, &msg); tmsgSendReq(&epset, &msg);
} else { } else {