diff --git a/include/util/tcompression.h b/include/util/tcompression.h index 683a8f2cee..cea648a3a6 100644 --- a/include/util/tcompression.h +++ b/include/util/tcompression.h @@ -198,15 +198,6 @@ int32_t tsCompressBigint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int3 int32_t nBuf); int32_t tsDecompressBigint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, int32_t nBuf); -// for internal usage -int32_t getWordLength(char type); - -int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements, char *const output, const char type); -int32_t tsDecompressFloatImplAvx512(const char *const input, const int32_t nelements, char *const output); -int32_t tsDecompressFloatImplAvx2(const char *const input, const int32_t nelements, char *const output); -int32_t tsDecompressTimestampAvx512(const char *const input, const int32_t nelements, char *const output, - bool bigEndian); -int32_t tsDecompressTimestampAvx2(const char *const input, const int32_t nelements, char *const output, bool bigEndian); /************************************************************************* * STREAM COMPRESSION diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 1948fe998f..96c08f19f9 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -1236,8 +1236,15 @@ static void copyIntervalDeleteKey(SSHashObj* pMap, SArray* pWins) { static SSDataBlock* buildIntervalResult(SOperatorInfo* pOperator) { SStreamIntervalOperatorInfo* pInfo = pOperator->info; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - uint16_t opType = pOperator->operatorType; + + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + uint16_t opType = pOperator->operatorType; + + // check if query task is closed or not + if (isTaskKilled(pTaskInfo)) { + return NULL; + } + if (IS_FINAL_INTERVAL_OP(pOperator)) { doBuildPullDataBlock(pInfo->pPullWins, &pInfo->pullIndex, pInfo->pPullDataRes); if (pInfo->pPullDataRes->info.rows != 0) { diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index fa3a3ea07d..d0b1f6ca93 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1354,11 +1354,18 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas } ASSERT(pTask->status.downstreamReady == 0); + + // avoid initialization and destroy running concurrently. + taosThreadMutexLock(&pTask->lock); if (pTask->pBackend == NULL) { code = pMeta->expandTaskFn(pTask); + taosThreadMutexUnlock(&pTask->lock); + if (code != TSDB_CODE_SUCCESS) { streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs); } + } else { + taosThreadMutexUnlock(&pTask->lock); } if (code == TSDB_CODE_SUCCESS) {