Merge pull request #23900 from taosdata/fix/3_liaohj
fix(stream): merge fix from main to 3.0
This commit is contained in:
commit
3194ff41aa
|
@ -853,7 +853,8 @@ void streamMetaNotifyClose(SStreamMeta* pMeta);
|
|||
int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key);
|
||||
void streamMetaStartHb(SStreamMeta* pMeta);
|
||||
bool streamMetaTaskInTimer(SStreamMeta* pMeta);
|
||||
int32_t streamMetaUpdateTaskDownstreamStatus(SStreamTask* pTask, int64_t startTs, int64_t endTs, bool succ);
|
||||
int32_t streamMetaUpdateTaskDownstreamStatus(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs,
|
||||
int64_t endTs, bool ready);
|
||||
void streamMetaRLock(SStreamMeta* pMeta);
|
||||
void streamMetaRUnLock(SStreamMeta* pMeta);
|
||||
void streamMetaWLock(SStreamMeta* pMeta);
|
||||
|
|
|
@ -475,6 +475,7 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe
|
|||
rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status);
|
||||
|
||||
if (!isLeader) {
|
||||
streamMetaUpdateTaskDownstreamStatus(pMeta, rsp.streamId, rsp.upstreamTaskId, 0, taosGetTimestampMs(), false);
|
||||
tqError("vgId:%d not leader, task:0x%x not handle the check rsp, downstream:0x%x (vgId:%d)", vgId,
|
||||
rsp.upstreamTaskId, rsp.downstreamTaskId, rsp.downstreamNodeId);
|
||||
return code;
|
||||
|
@ -482,6 +483,7 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe
|
|||
|
||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, rsp.streamId, rsp.upstreamTaskId);
|
||||
if (pTask == NULL) {
|
||||
streamMetaUpdateTaskDownstreamStatus(pMeta, rsp.streamId, rsp.upstreamTaskId, 0, taosGetTimestampMs(), false);
|
||||
tqError("tq failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped",
|
||||
rsp.streamId, rsp.upstreamTaskId, vgId);
|
||||
terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
||||
|
@ -670,7 +672,8 @@ int32_t startStreamTasks(SStreamMeta* pMeta) {
|
|||
streamLaunchFillHistoryTask(pTask);
|
||||
}
|
||||
|
||||
streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, pTask->execInfo.start, true);
|
||||
streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pTask->execInfo.init,
|
||||
pTask->execInfo.start, true);
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
continue;
|
||||
}
|
||||
|
|
|
@ -67,7 +67,8 @@ int32_t streamTaskSetReady(SStreamTask* pTask) {
|
|||
stDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%" PRId64 "ms, task status:%s",
|
||||
pTask->id.idStr, numOfDowns, el, p);
|
||||
|
||||
streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, pTask->execInfo.start, true);
|
||||
streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pTask->execInfo.init,
|
||||
pTask->execInfo.start, true);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -469,14 +470,16 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
|
|||
addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId);
|
||||
}
|
||||
|
||||
streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, taosGetTimestampMs(), false);
|
||||
streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pTask->execInfo.init,
|
||||
taosGetTimestampMs(), false);
|
||||
|
||||
// automatically set the related fill-history task to be failed.
|
||||
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
||||
STaskId* pId = &pTask->hTaskInfo.id;
|
||||
|
||||
SStreamTask* pHTask = streamMetaAcquireTask(pTask->pMeta, pId->streamId, pId->taskId);
|
||||
streamMetaUpdateTaskDownstreamStatus(pHTask, pHTask->execInfo.init, taosGetTimestampMs(), false);
|
||||
streamMetaUpdateTaskDownstreamStatus(pHTask->pMeta, pId->streamId, pId->taskId, pHTask->execInfo.init,
|
||||
taosGetTimestampMs(), false);
|
||||
streamMetaReleaseTask(pTask->pMeta, pHTask);
|
||||
}
|
||||
} else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms
|
||||
|
@ -1066,14 +1069,12 @@ static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ)
|
|||
}
|
||||
}
|
||||
|
||||
int32_t streamMetaUpdateTaskDownstreamStatus(SStreamTask* pTask, int64_t startTs, int64_t endTs, bool ready) {
|
||||
SStreamMeta* pMeta = pTask->pMeta;
|
||||
int32_t streamMetaUpdateTaskDownstreamStatus(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs,
|
||||
int64_t endTs, bool ready) {
|
||||
STaskStartInfo* pStartInfo = &pMeta->startInfo;
|
||||
STaskId id = {.streamId = streamId, .taskId = taskId};
|
||||
|
||||
streamMetaWLock(pMeta);
|
||||
|
||||
STaskId id = streamTaskExtractKey(pTask);
|
||||
STaskStartInfo* pStartInfo = &pMeta->startInfo;
|
||||
|
||||
SHashObj* pDst = ready ? pStartInfo->pReadyTaskSet : pStartInfo->pFailedTaskSet;
|
||||
|
||||
STaskInitTs initTs = {.start = startTs, .end = endTs, .success = ready};
|
||||
|
@ -1086,15 +1087,14 @@ int32_t streamMetaUpdateTaskDownstreamStatus(SStreamTask* pTask, int64_t startTs
|
|||
pStartInfo->readyTs = taosGetTimestampMs();
|
||||
pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0;
|
||||
|
||||
stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:%s level:%d, startTs:%" PRId64
|
||||
stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:0x%x startTs:%" PRId64
|
||||
", readyTs:%" PRId64 " total elapsed time:%.2fs",
|
||||
pMeta->vgId, numOfTotal, pTask->id.idStr, pTask->info.taskLevel, pStartInfo->startTs, pStartInfo->readyTs,
|
||||
pMeta->vgId, numOfTotal, taskId, pStartInfo->startTs, pStartInfo->readyTs,
|
||||
pStartInfo->elapsedTime / 1000.0);
|
||||
|
||||
// print the initialization elapsed time and info
|
||||
displayStatusInfo(pMeta, pStartInfo->pReadyTaskSet, true);
|
||||
displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false);
|
||||
|
||||
streamMetaResetStartInfo(pStartInfo);
|
||||
} else {
|
||||
stDebug("vgId:%d recv check down results:%d, total:%d", pMeta->vgId, numOfRecv, numOfTotal);
|
||||
|
|
Loading…
Reference in New Issue