fix(stream):set failure for the related fill-history task.

This commit is contained in:
Haojun Liao 2023-11-24 15:37:30 +08:00
parent 8515a2f9b6
commit 468094c86a
2 changed files with 15 additions and 4 deletions

View File

@ -468,8 +468,16 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
}
addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId);
streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pTask->execInfo.init,
taosGetTimestampMs(), false);
streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, taosGetTimestampMs(), false);
// automatically set the related fill-history task to be failed.
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
STaskId* pId = &pTask->hTaskInfo.id;
SStreamTask* pHTask = streamMetaAcquireTask(pTask->pMeta, pId->streamId, pId->taskId);
streamMetaUpdateTaskDownstreamStatus(pHTask, pHTask->execInfo.init, taosGetTimestampMs(), false);
streamMetaReleaseTask(pTask->pMeta, pHTask);
}
} else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms
STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp);
@ -1069,8 +1077,9 @@ int32_t streamMetaUpdateTaskDownstreamStatus(SStreamMeta* pMeta, int64_t streamI
taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs));
int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta);
int32_t numOfRecv = taosHashGetSize(pStartInfo->pReadyTaskSet) + taosHashGetSize(pStartInfo->pFailedTaskSet);
if (taosHashGetSize(pStartInfo->pReadyTaskSet) + taosHashGetSize(pStartInfo->pFailedTaskSet) == numOfTotal) {
if (numOfRecv == numOfTotal) {
pStartInfo->readyTs = taosGetTimestampMs();
pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0;
@ -1084,6 +1093,8 @@ int32_t streamMetaUpdateTaskDownstreamStatus(SStreamMeta* pMeta, int64_t streamI
displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false);
streamMetaResetStartInfo(pStartInfo);
} else {
stDebug("vgId:%d recv check down results:%d, total:%d", pMeta->vgId, numOfRecv, numOfTotal);
}
streamMetaWUnLock(pMeta);

View File

@ -259,7 +259,7 @@ int32_t tsDecompressTimestampAvx2(const char* const input, const int32_t nelemen
int32_t longBytes = LONG_BYTES;
// _mm_maskz_loadu_epi8
#if __AVX2__
#if __AVX512F__
// _mm_blendv_epi8
int32_t batch = nelements >> 4;