diff --git a/include/util/tcompression.h b/include/util/tcompression.h index 683a8f2cee..cea648a3a6 100644 --- a/include/util/tcompression.h +++ b/include/util/tcompression.h @@ -198,15 +198,6 @@ int32_t tsCompressBigint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int3 int32_t nBuf); int32_t tsDecompressBigint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, int32_t nBuf); -// for internal usage -int32_t getWordLength(char type); - -int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements, char *const output, const char type); -int32_t tsDecompressFloatImplAvx512(const char *const input, const int32_t nelements, char *const output); -int32_t tsDecompressFloatImplAvx2(const char *const input, const int32_t nelements, char *const output); -int32_t tsDecompressTimestampAvx512(const char *const input, const int32_t nelements, char *const output, - bool bigEndian); -int32_t tsDecompressTimestampAvx2(const char *const input, const int32_t nelements, char *const output, bool bigEndian); /************************************************************************* * STREAM COMPRESSION diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index e7fdb7ae2a..55bb7d1710 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1345,11 +1345,18 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas } ASSERT(pTask->status.downstreamReady == 0); + + // avoid initialization and destroy running concurrently. + taosThreadMutexLock(&pTask->lock); if (pTask->pBackend == NULL) { code = pMeta->expandTaskFn(pTask); + taosThreadMutexUnlock(&pTask->lock); + if (code != TSDB_CODE_SUCCESS) { streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs); } + } else { + taosThreadMutexUnlock(&pTask->lock); } if (code == TSDB_CODE_SUCCESS) {