fix(stream): init the in-mem task list when recv checkpoint-report. check the number of complete downstream check by checking it in hashmap, not just number.

This commit is contained in:
Haojun Liao 2024-08-13 23:20:37 +08:00
parent fdfc8d8310
commit 57a0c7b487
2 changed files with 29 additions and 3 deletions

View File

@ -2564,6 +2564,10 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) {
}
tDecoderClear(&decoder);
streamMutexLock(&execInfo.lock);
mndInitStreamExecInfo(pMnode, &execInfo);
streamMutexUnlock(&execInfo.lock);
mDebug("receive stream task checkpoint-report msg, vgId:%d, s-task:0x%x, checkpointId:%" PRId64
" checkpointVer:%" PRId64 " transId:%d",
req.nodeId, req.taskId, req.checkpointId, req.checkpointVer, req.transId);

View File

@ -1589,11 +1589,33 @@ static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ)
}
}
// check all existed tasks are received rsp
static bool allCheckDownstreamRsp(SStreamMeta* pMeta, STaskStartInfo* pStartInfo, int32_t numOfTotal) {
for (int32_t i = 0; i < numOfTotal; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i);
if (pTaskId == NULL) {
continue;
}
STaskId idx = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId};
void* px = taosHashGet(pStartInfo->pReadyTaskSet, &idx, sizeof(idx));
if (px == NULL) {
px = taosHashGet(pStartInfo->pFailedTaskSet, &idx, sizeof(idx));
if (px == NULL) {
return false;
}
}
}
return true;
}
int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs,
int64_t endTs, bool ready) {
STaskStartInfo* pStartInfo = &pMeta->startInfo;
STaskId id = {.streamId = streamId, .taskId = taskId};
int32_t vgId = pMeta->vgId;
bool allRsp = true;
streamMetaWLock(pMeta);
SStreamTask** p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
@ -1618,9 +1640,8 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3
return 0;
}
SHashObj* pDst = ready ? pStartInfo->pReadyTaskSet : pStartInfo->pFailedTaskSet;
STaskInitTs initTs = {.start = startTs, .end = endTs, .success = ready};
SHashObj* pDst = ready ? pStartInfo->pReadyTaskSet : pStartInfo->pFailedTaskSet;
int32_t code = taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs));
if (code) {
if (code == TSDB_CODE_DUP_KEY) {
@ -1637,7 +1658,8 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3
int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta);
int32_t numOfRecv = taosHashGetSize(pStartInfo->pReadyTaskSet) + taosHashGetSize(pStartInfo->pFailedTaskSet);
if (numOfRecv == numOfTotal) {
allRsp = allCheckDownstreamRsp(pMeta, pStartInfo, numOfTotal);
if (allRsp) {
pStartInfo->readyTs = taosGetTimestampMs();
pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0;