enh(stream): add node stage check.

This commit is contained in:
Haojun Liao 2023-09-20 10:35:01 +08:00
parent b99232fd7a
commit b4419bda65
3 changed files with 61 additions and 25 deletions

View File

@ -556,6 +556,8 @@ int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointRea
typedef struct STaskStatusEntry {
STaskId id;
int32_t status;
int32_t stage;
int32_t nodeId;
} STaskStatusEntry;
typedef struct SStreamHbMsg {

View File

@ -37,17 +37,18 @@
typedef struct SNodeEntry {
int32_t nodeId;
bool stageUpdated; // the stage has been updated due to the leader/follower change or node reboot.
SEpSet epset; // compare the epset to identify the vgroup tranferring between different dnodes.
int64_t hbTimestamp; // second
} SNodeEntry;
typedef struct SStreamVnodeRevertIndex {
typedef struct SStreamExecNodeInfo {
SArray *pNodeEntryList;
int64_t ts; // snapshot ts
SHashObj *pTaskMap;
SArray *pTaskList;
TdThreadMutex lock;
} SStreamVnodeRevertIndex;
} SStreamExecNodeInfo;
typedef struct SVgroupChangeInfo {
SHashObj *pDBMap;
@ -55,7 +56,7 @@ typedef struct SVgroupChangeInfo {
} SVgroupChangeInfo;
static int32_t mndNodeCheckSentinel = 0;
static SStreamVnodeRevertIndex execNodeList;
static SStreamExecNodeInfo execNodeList;
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream);
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream);
@ -75,7 +76,6 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in
int64_t streamId, int32_t taskId);
static int32_t mndProcessNodeCheck(SRpcMsg *pReq);
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg);
static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *pExecNode);
static SArray *doExtractNodeListFromStream(SMnode *pMnode);
static SArray *mndTakeVgroupSnapshot(SMnode *pMnode);
@ -83,8 +83,8 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP
static int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans);
static void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset);
static void removeStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* pExecNode);
static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *pExecNode);
static void removeStreamTasksInBuf(SStreamObj* pStream, SStreamExecNodeInfo * pExecNode);
static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecNodeInfo *pExecNode);
int32_t mndInitStream(SMnode *pMnode) {
SSdbTable table = {
@ -1158,12 +1158,19 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
}
if (taosArrayGetSize(execNodeList.pNodeEntryList) == 0) {
mDebug("end to do stream task node change checking, no vgroup exists, do nothing");
mDebug("stream task node change checking done, no vgroups exist, do nothing");
execNodeList.ts = ts;
atomic_store_32(&mndNodeCheckSentinel, 0);
return 0;
}
for(int32_t i = 0; i < taosArrayGetSize(execNodeList.pNodeEntryList); ++i) {
SNodeEntry* pNodeEntry = taosArrayGet(execNodeList.pNodeEntryList, i);
if (pNodeEntry->stageUpdated) {
mDebug("stream task not ready due to node update detected, checkpoint not issued");
return 0;
}
}
SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode);
SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execNodeList.pNodeEntryList, pNodeSnapshot);
@ -1173,7 +1180,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
taosArrayDestroy(pNodeSnapshot);
if (nodeUpdated) {
mDebug("stream task not ready due to node update, not generate checkpoint");
mDebug("stream task not ready due to node update, checkpoint not issued");
return 0;
}
}
@ -1190,7 +1197,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
}
if (pEntry->status != TASK_STATUS__NORMAL) {
mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s not ready, create checkpoint msg not issued",
mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s not ready, checkpoint msg not issued",
pEntry->id.streamId, (int32_t)pEntry->id.taskId, 0, streamGetTaskStatusStr(pEntry->status));
ready = false;
break;
@ -2028,7 +2035,7 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP
SNodeEntry *pCurrent = taosArrayGet(pNodeList, j);
if (pCurrent->nodeId == pPrevEntry->nodeId) {
if (isNodeEpsetChanged(&pPrevEntry->epset, &pCurrent->epset)) {
if (pPrevEntry->stageUpdated || isNodeEpsetChanged(&pPrevEntry->epset, &pCurrent->epset)) {
const SEp *pPrevEp = GET_ACTIVE_EP(&pPrevEntry->epset);
char buf[256] = {0};
@ -2202,6 +2209,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode);
taosThreadMutexLock(&execNodeList.lock);
SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execNodeList.pNodeEntryList, pNodeSnapshot);
if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) {
code = mndProcessVgroupChange(pMnode, &changeInfo);
@ -2218,6 +2226,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
taosArrayDestroy(pNodeSnapshot);
}
taosThreadMutexUnlock(&execNodeList.lock);
taosArrayDestroy(changeInfo.pUpdateNodeList);
taosHashCleanup(changeInfo.pDBMap);
@ -2244,7 +2253,7 @@ static int32_t mndProcessNodeCheck(SRpcMsg *pReq) {
return 0;
}
void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *pExecNode) {
void keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecNodeInfo *pExecNode) {
int32_t level = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < level; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
@ -2256,8 +2265,11 @@ void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *pExecNod
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id));
if (p == NULL) {
STaskStatusEntry entry = {
.id.streamId = pTask->id.streamId, .id.taskId = pTask->id.taskId, .status = TASK_STATUS__STOP};
STaskStatusEntry entry = {.id.streamId = pTask->id.streamId,
.id.taskId = pTask->id.taskId,
.stage = -1,
.nodeId = pTask->info.nodeId,
.status = TASK_STATUS__STOP};
taosHashPut(pExecNode->pTaskMap, &id, sizeof(id), &entry, sizeof(entry));
taosArrayPush(pExecNode->pTaskList, &id);
}
@ -2265,7 +2277,7 @@ void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *pExecNod
}
}
void removeStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* pExecNode) {
void removeStreamTasksInBuf(SStreamObj* pStream, SStreamExecNodeInfo * pExecNode) {
int32_t level = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < level; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
@ -2298,7 +2310,6 @@ void removeStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* pExecN
int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SStreamHbMsg req = {0};
int32_t code = TSDB_CODE_SUCCESS;
SDecoder decoder = {0};
tDecoderInit(&decoder, pReq->pCont, pReq->contLen);
@ -2326,11 +2337,29 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
continue;
}
if (p->stage != pEntry->stage && pEntry->stage != -1) {
int32_t numOfNodes = taosArrayGetSize(execNodeList.pNodeEntryList);
for(int32_t j = 0; j < numOfNodes; ++j) {
SNodeEntry* pNodeEntry = taosArrayGet(execNodeList.pNodeEntryList, j);
if (pNodeEntry->nodeId == pEntry->nodeId) {
mInfo("vgId:%d stage updated, from %d to %d, nodeUpdate should be trigger by s-task:0x%" PRIx64,
pEntry->nodeId, pEntry->stage, p->stage, pEntry->id.taskId);
pNodeEntry->stageUpdated = true;
pEntry->stage = p->stage;
break;
}
}
} else {
pEntry->stage = p->stage;
}
pEntry->status = p->status;
if (p->status != TASK_STATUS__NORMAL) {
mDebug("received s-task:0x%"PRIx64" not in ready status:%s", p->id.taskId, streamGetTaskStatusStr(p->status));
}
}
taosThreadMutexUnlock(&execNodeList.lock);
taosArrayDestroy(req.pTaskStatus);

View File

@ -766,6 +766,8 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
if (tEncodeI64(pEncoder, ps->id.streamId) < 0) return -1;
if (tEncodeI32(pEncoder, ps->id.taskId) < 0) return -1;
if (tEncodeI32(pEncoder, ps->status) < 0) return -1;
if (tEncodeI32(pEncoder, ps->stage) < 0) return -1;
if (tEncodeI32(pEncoder, ps->nodeId) < 0) return -1;
}
tEndEncode(pEncoder);
return pEncoder->pos;
@ -778,15 +780,17 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
pReq->pTaskStatus = taosArrayInit(pReq->numOfTasks, sizeof(STaskStatusEntry));
for (int32_t i = 0; i < pReq->numOfTasks; ++i) {
STaskStatusEntry hb = {0};
if (tDecodeI64(pDecoder, &hb.id.streamId) < 0) return -1;
int32_t taskId = 0;
STaskStatusEntry entry = {0};
if (tDecodeI64(pDecoder, &entry.id.streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &entry.status) < 0) return -1;
if (tDecodeI32(pDecoder, &entry.stage) < 0) return -1;
if (tDecodeI32(pDecoder, &entry.nodeId) < 0) return -1;
hb.id.taskId = taskId;
if (tDecodeI32(pDecoder, &hb.status) < 0) return -1;
taosArrayPush(pReq->pTaskStatus, &hb);
entry.id.taskId = taskId;
taosArrayPush(pReq->pTaskStatus, &entry);
}
tEndDecode(pDecoder);
@ -855,7 +859,8 @@ void metaHbToMnode(void* param, void* tmrId) {
continue;
}
STaskStatusEntry entry = {.id = *pId, .status = (*pTask)->status.taskStatus};
STaskStatusEntry entry = {
.id = *pId, .status = (*pTask)->status.taskStatus, .nodeId = pMeta->vgId, .stage = pMeta->stage};
taosArrayPush(hbMsg.pTaskStatus, &entry);
if (!hasValEpset) {