From 60fa674be8f869bb9f66f3d60af176b046de9685 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 10 Jul 2024 17:46:33 +0800 Subject: [PATCH 1/2] fix(stream): fix race-condition during expand stream tasks. --- include/util/tcompression.h | 9 --------- source/libs/stream/src/streamMeta.c | 7 +++++++ 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/include/util/tcompression.h b/include/util/tcompression.h index 683a8f2cee..cea648a3a6 100644 --- a/include/util/tcompression.h +++ b/include/util/tcompression.h @@ -198,15 +198,6 @@ int32_t tsCompressBigint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int3 int32_t nBuf); int32_t tsDecompressBigint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, int32_t nBuf); -// for internal usage -int32_t getWordLength(char type); - -int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements, char *const output, const char type); -int32_t tsDecompressFloatImplAvx512(const char *const input, const int32_t nelements, char *const output); -int32_t tsDecompressFloatImplAvx2(const char *const input, const int32_t nelements, char *const output); -int32_t tsDecompressTimestampAvx512(const char *const input, const int32_t nelements, char *const output, - bool bigEndian); -int32_t tsDecompressTimestampAvx2(const char *const input, const int32_t nelements, char *const output, bool bigEndian); /************************************************************************* * STREAM COMPRESSION diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index e7fdb7ae2a..55bb7d1710 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1345,11 +1345,18 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas } ASSERT(pTask->status.downstreamReady == 0); + + // avoid initialization and destroy running concurrently. + taosThreadMutexLock(&pTask->lock); if (pTask->pBackend == NULL) { code = pMeta->expandTaskFn(pTask); + taosThreadMutexUnlock(&pTask->lock); + if (code != TSDB_CODE_SUCCESS) { streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs); } + } else { + taosThreadMutexUnlock(&pTask->lock); } if (code == TSDB_CODE_SUCCESS) { From 7214cea808dff1c0ffbe932b2e52465e180813f9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 10 Jul 2024 18:16:51 +0800 Subject: [PATCH 2/2] fix(stream): check if task is killed during generated interval results. --- source/libs/executor/src/streamtimewindowoperator.c | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 5b9c018bba..d609fdf219 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -1236,8 +1236,15 @@ static void copyIntervalDeleteKey(SSHashObj* pMap, SArray* pWins) { static SSDataBlock* buildIntervalResult(SOperatorInfo* pOperator) { SStreamIntervalOperatorInfo* pInfo = pOperator->info; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - uint16_t opType = pOperator->operatorType; + + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + uint16_t opType = pOperator->operatorType; + + // check if query task is closed or not + if (isTaskKilled(pTaskInfo)) { + return NULL; + } + if (IS_FINAL_INTERVAL_OP(pOperator)) { doBuildPullDataBlock(pInfo->pPullWins, &pInfo->pullIndex, pInfo->pPullDataRes); if (pInfo->pPullDataRes->info.rows != 0) {