From 468094c86a8cf1b665368b803ae25ca1a5a5f939 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 24 Nov 2023 15:37:30 +0800 Subject: [PATCH] fix(stream):set failure for the related fill-history task. --- source/libs/stream/src/streamStart.c | 17 ++++++++++++++--- source/util/src/tdecompress.c | 2 +- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index c997a8086e..2f83a0d6fa 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -468,8 +468,16 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs } addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId); - streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pTask->execInfo.init, - taosGetTimestampMs(), false); + streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, taosGetTimestampMs(), false); + + // automatically set the related fill-history task to be failed. + if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { + STaskId* pId = &pTask->hTaskInfo.id; + + SStreamTask* pHTask = streamMetaAcquireTask(pTask->pMeta, pId->streamId, pId->taskId); + streamMetaUpdateTaskDownstreamStatus(pHTask, pHTask->execInfo.init, taosGetTimestampMs(), false); + streamMetaReleaseTask(pTask->pMeta, pHTask); + } } else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp); @@ -1069,8 +1077,9 @@ int32_t streamMetaUpdateTaskDownstreamStatus(SStreamMeta* pMeta, int64_t streamI taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs)); int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta); + int32_t numOfRecv = taosHashGetSize(pStartInfo->pReadyTaskSet) + taosHashGetSize(pStartInfo->pFailedTaskSet); - if (taosHashGetSize(pStartInfo->pReadyTaskSet) + taosHashGetSize(pStartInfo->pFailedTaskSet) == numOfTotal) { + if (numOfRecv == numOfTotal) { pStartInfo->readyTs = taosGetTimestampMs(); pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0; @@ -1084,6 +1093,8 @@ int32_t streamMetaUpdateTaskDownstreamStatus(SStreamMeta* pMeta, int64_t streamI displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false); streamMetaResetStartInfo(pStartInfo); + } else { + stDebug("vgId:%d recv check down results:%d, total:%d", pMeta->vgId, numOfRecv, numOfTotal); } streamMetaWUnLock(pMeta); diff --git a/source/util/src/tdecompress.c b/source/util/src/tdecompress.c index e31d8f9079..e4595d3047 100644 --- a/source/util/src/tdecompress.c +++ b/source/util/src/tdecompress.c @@ -259,7 +259,7 @@ int32_t tsDecompressTimestampAvx2(const char* const input, const int32_t nelemen int32_t longBytes = LONG_BYTES; // _mm_maskz_loadu_epi8 -#if __AVX2__ +#if __AVX512F__ // _mm_blendv_epi8 int32_t batch = nelements >> 4;