enh(stream): add node stage check.

This commit is contained in:
Haojun Liao 2023-09-20 10:35:01 +08:00
parent 927f2d896c
commit e27111ecd1
3 changed files with 61 additions and 25 deletions

View File

@ -556,6 +556,8 @@ int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointRea
typedef struct STaskStatusEntry { typedef struct STaskStatusEntry {
STaskId id; STaskId id;
int32_t status; int32_t status;
int32_t stage;
int32_t nodeId;
} STaskStatusEntry; } STaskStatusEntry;
typedef struct SStreamHbMsg { typedef struct SStreamHbMsg {

View File

@ -37,17 +37,18 @@
typedef struct SNodeEntry { typedef struct SNodeEntry {
int32_t nodeId; int32_t nodeId;
bool stageUpdated; // the stage has been updated due to the leader/follower change or node reboot.
SEpSet epset; // compare the epset to identify the vgroup tranferring between different dnodes. SEpSet epset; // compare the epset to identify the vgroup tranferring between different dnodes.
int64_t hbTimestamp; // second int64_t hbTimestamp; // second
} SNodeEntry; } SNodeEntry;
typedef struct SStreamVnodeRevertIndex { typedef struct SStreamExecNodeInfo {
SArray *pNodeEntryList; SArray *pNodeEntryList;
int64_t ts; // snapshot ts int64_t ts; // snapshot ts
SHashObj *pTaskMap; SHashObj *pTaskMap;
SArray *pTaskList; SArray *pTaskList;
TdThreadMutex lock; TdThreadMutex lock;
} SStreamVnodeRevertIndex; } SStreamExecNodeInfo;
typedef struct SVgroupChangeInfo { typedef struct SVgroupChangeInfo {
SHashObj *pDBMap; SHashObj *pDBMap;
@ -55,7 +56,7 @@ typedef struct SVgroupChangeInfo {
} SVgroupChangeInfo; } SVgroupChangeInfo;
static int32_t mndNodeCheckSentinel = 0; static int32_t mndNodeCheckSentinel = 0;
static SStreamVnodeRevertIndex execNodeList; static SStreamExecNodeInfo execNodeList;
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream); static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream);
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream); static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream);
@ -75,7 +76,6 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in
int64_t streamId, int32_t taskId); int64_t streamId, int32_t taskId);
static int32_t mndProcessNodeCheck(SRpcMsg *pReq); static int32_t mndProcessNodeCheck(SRpcMsg *pReq);
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg); static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg);
static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *pExecNode);
static SArray *doExtractNodeListFromStream(SMnode *pMnode); static SArray *doExtractNodeListFromStream(SMnode *pMnode);
static SArray *mndTakeVgroupSnapshot(SMnode *pMnode); static SArray *mndTakeVgroupSnapshot(SMnode *pMnode);
@ -83,8 +83,8 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP
static int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans); static int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans);
static void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset); static void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset);
static void removeStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* pExecNode); static void removeStreamTasksInBuf(SStreamObj* pStream, SStreamExecNodeInfo * pExecNode);
static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *pExecNode); static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecNodeInfo *pExecNode);
int32_t mndInitStream(SMnode *pMnode) { int32_t mndInitStream(SMnode *pMnode) {
SSdbTable table = { SSdbTable table = {
@ -1158,12 +1158,19 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
} }
if (taosArrayGetSize(execNodeList.pNodeEntryList) == 0) { if (taosArrayGetSize(execNodeList.pNodeEntryList) == 0) {
mDebug("end to do stream task node change checking, no vgroup exists, do nothing"); mDebug("stream task node change checking done, no vgroups exist, do nothing");
execNodeList.ts = ts; execNodeList.ts = ts;
atomic_store_32(&mndNodeCheckSentinel, 0);
return 0; return 0;
} }
for(int32_t i = 0; i < taosArrayGetSize(execNodeList.pNodeEntryList); ++i) {
SNodeEntry* pNodeEntry = taosArrayGet(execNodeList.pNodeEntryList, i);
if (pNodeEntry->stageUpdated) {
mDebug("stream task not ready due to node update detected, checkpoint not issued");
return 0;
}
}
SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode); SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode);
SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execNodeList.pNodeEntryList, pNodeSnapshot); SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execNodeList.pNodeEntryList, pNodeSnapshot);
@ -1173,7 +1180,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
taosArrayDestroy(pNodeSnapshot); taosArrayDestroy(pNodeSnapshot);
if (nodeUpdated) { if (nodeUpdated) {
mDebug("stream task not ready due to node update, not generate checkpoint"); mDebug("stream task not ready due to node update, checkpoint not issued");
return 0; return 0;
} }
} }
@ -1190,7 +1197,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
} }
if (pEntry->status != TASK_STATUS__NORMAL) { if (pEntry->status != TASK_STATUS__NORMAL) {
mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s not ready, create checkpoint msg not issued", mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s not ready, checkpoint msg not issued",
pEntry->id.streamId, (int32_t)pEntry->id.taskId, 0, streamGetTaskStatusStr(pEntry->status)); pEntry->id.streamId, (int32_t)pEntry->id.taskId, 0, streamGetTaskStatusStr(pEntry->status));
ready = false; ready = false;
break; break;
@ -2028,7 +2035,7 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP
SNodeEntry *pCurrent = taosArrayGet(pNodeList, j); SNodeEntry *pCurrent = taosArrayGet(pNodeList, j);
if (pCurrent->nodeId == pPrevEntry->nodeId) { if (pCurrent->nodeId == pPrevEntry->nodeId) {
if (isNodeEpsetChanged(&pPrevEntry->epset, &pCurrent->epset)) { if (pPrevEntry->stageUpdated || isNodeEpsetChanged(&pPrevEntry->epset, &pCurrent->epset)) {
const SEp *pPrevEp = GET_ACTIVE_EP(&pPrevEntry->epset); const SEp *pPrevEp = GET_ACTIVE_EP(&pPrevEntry->epset);
char buf[256] = {0}; char buf[256] = {0};
@ -2202,6 +2209,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode); SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode);
taosThreadMutexLock(&execNodeList.lock);
SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execNodeList.pNodeEntryList, pNodeSnapshot); SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execNodeList.pNodeEntryList, pNodeSnapshot);
if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) { if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) {
code = mndProcessVgroupChange(pMnode, &changeInfo); code = mndProcessVgroupChange(pMnode, &changeInfo);
@ -2218,6 +2226,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
taosArrayDestroy(pNodeSnapshot); taosArrayDestroy(pNodeSnapshot);
} }
taosThreadMutexUnlock(&execNodeList.lock);
taosArrayDestroy(changeInfo.pUpdateNodeList); taosArrayDestroy(changeInfo.pUpdateNodeList);
taosHashCleanup(changeInfo.pDBMap); taosHashCleanup(changeInfo.pDBMap);
@ -2244,7 +2253,7 @@ static int32_t mndProcessNodeCheck(SRpcMsg *pReq) {
return 0; return 0;
} }
void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *pExecNode) { void keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecNodeInfo *pExecNode) {
int32_t level = taosArrayGetSize(pStream->tasks); int32_t level = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < level; i++) { for (int32_t i = 0; i < level; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i); SArray *pLevel = taosArrayGetP(pStream->tasks, i);
@ -2256,8 +2265,11 @@ void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *pExecNod
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id)); void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id));
if (p == NULL) { if (p == NULL) {
STaskStatusEntry entry = { STaskStatusEntry entry = {.id.streamId = pTask->id.streamId,
.id.streamId = pTask->id.streamId, .id.taskId = pTask->id.taskId, .status = TASK_STATUS__STOP}; .id.taskId = pTask->id.taskId,
.stage = -1,
.nodeId = pTask->info.nodeId,
.status = TASK_STATUS__STOP};
taosHashPut(pExecNode->pTaskMap, &id, sizeof(id), &entry, sizeof(entry)); taosHashPut(pExecNode->pTaskMap, &id, sizeof(id), &entry, sizeof(entry));
taosArrayPush(pExecNode->pTaskList, &id); taosArrayPush(pExecNode->pTaskList, &id);
} }
@ -2265,7 +2277,7 @@ void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *pExecNod
} }
} }
void removeStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* pExecNode) { void removeStreamTasksInBuf(SStreamObj* pStream, SStreamExecNodeInfo * pExecNode) {
int32_t level = taosArrayGetSize(pStream->tasks); int32_t level = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < level; i++) { for (int32_t i = 0; i < level; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i); SArray *pLevel = taosArrayGetP(pStream->tasks, i);
@ -2296,9 +2308,8 @@ void removeStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* pExecN
// todo: this process should be executed by the write queue worker of the mnode // todo: this process should be executed by the write queue worker of the mnode
int32_t mndProcessStreamHb(SRpcMsg *pReq) { int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
SStreamHbMsg req = {0}; SStreamHbMsg req = {0};
int32_t code = TSDB_CODE_SUCCESS;
SDecoder decoder = {0}; SDecoder decoder = {0};
tDecoderInit(&decoder, pReq->pCont, pReq->contLen); tDecoderInit(&decoder, pReq->pCont, pReq->contLen);
@ -2326,11 +2337,29 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
continue; continue;
} }
if (p->stage != pEntry->stage && pEntry->stage != -1) {
int32_t numOfNodes = taosArrayGetSize(execNodeList.pNodeEntryList);
for(int32_t j = 0; j < numOfNodes; ++j) {
SNodeEntry* pNodeEntry = taosArrayGet(execNodeList.pNodeEntryList, j);
if (pNodeEntry->nodeId == pEntry->nodeId) {
mInfo("vgId:%d stage updated, from %d to %d, nodeUpdate should be trigger by s-task:0x%" PRIx64,
pEntry->nodeId, pEntry->stage, p->stage, pEntry->id.taskId);
pNodeEntry->stageUpdated = true;
pEntry->stage = p->stage;
break;
}
}
} else {
pEntry->stage = p->stage;
}
pEntry->status = p->status; pEntry->status = p->status;
if (p->status != TASK_STATUS__NORMAL) { if (p->status != TASK_STATUS__NORMAL) {
mDebug("received s-task:0x%"PRIx64" not in ready status:%s", p->id.taskId, streamGetTaskStatusStr(p->status)); mDebug("received s-task:0x%"PRIx64" not in ready status:%s", p->id.taskId, streamGetTaskStatusStr(p->status));
} }
} }
taosThreadMutexUnlock(&execNodeList.lock); taosThreadMutexUnlock(&execNodeList.lock);
taosArrayDestroy(req.pTaskStatus); taosArrayDestroy(req.pTaskStatus);

View File

@ -767,6 +767,8 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
if (tEncodeI64(pEncoder, ps->id.streamId) < 0) return -1; if (tEncodeI64(pEncoder, ps->id.streamId) < 0) return -1;
if (tEncodeI32(pEncoder, ps->id.taskId) < 0) return -1; if (tEncodeI32(pEncoder, ps->id.taskId) < 0) return -1;
if (tEncodeI32(pEncoder, ps->status) < 0) return -1; if (tEncodeI32(pEncoder, ps->status) < 0) return -1;
if (tEncodeI32(pEncoder, ps->stage) < 0) return -1;
if (tEncodeI32(pEncoder, ps->nodeId) < 0) return -1;
} }
tEndEncode(pEncoder); tEndEncode(pEncoder);
return pEncoder->pos; return pEncoder->pos;
@ -779,15 +781,17 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
pReq->pTaskStatus = taosArrayInit(pReq->numOfTasks, sizeof(STaskStatusEntry)); pReq->pTaskStatus = taosArrayInit(pReq->numOfTasks, sizeof(STaskStatusEntry));
for (int32_t i = 0; i < pReq->numOfTasks; ++i) { for (int32_t i = 0; i < pReq->numOfTasks; ++i) {
STaskStatusEntry hb = {0}; int32_t taskId = 0;
if (tDecodeI64(pDecoder, &hb.id.streamId) < 0) return -1; STaskStatusEntry entry = {0};
int32_t taskId = 0;
if (tDecodeI64(pDecoder, &entry.id.streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &taskId) < 0) return -1; if (tDecodeI32(pDecoder, &taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &entry.status) < 0) return -1;
if (tDecodeI32(pDecoder, &entry.stage) < 0) return -1;
if (tDecodeI32(pDecoder, &entry.nodeId) < 0) return -1;
hb.id.taskId = taskId; entry.id.taskId = taskId;
if (tDecodeI32(pDecoder, &hb.status) < 0) return -1; taosArrayPush(pReq->pTaskStatus, &entry);
taosArrayPush(pReq->pTaskStatus, &hb);
} }
tEndDecode(pDecoder); tEndDecode(pDecoder);
@ -856,7 +860,8 @@ void metaHbToMnode(void* param, void* tmrId) {
continue; continue;
} }
STaskStatusEntry entry = {.id = *pId, .status = (*pTask)->status.taskStatus}; STaskStatusEntry entry = {
.id = *pId, .status = (*pTask)->status.taskStatus, .nodeId = pMeta->vgId, .stage = pMeta->stage};
taosArrayPush(hbMsg.pTaskStatus, &entry); taosArrayPush(hbMsg.pTaskStatus, &entry);
if (!hasValEpset) { if (!hasValEpset) {