fix(stream): fix race-condition during expand stream tasks.

This commit is contained in:
Haojun Liao 2024-07-10 17:46:33 +08:00
parent 8db4722be4
commit 60fa674be8
2 changed files with 7 additions and 9 deletions

View File

@ -198,15 +198,6 @@ int32_t tsCompressBigint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int3
int32_t nBuf);
int32_t tsDecompressBigint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg,
void *pBuf, int32_t nBuf);
// for internal usage
int32_t getWordLength(char type);
int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements, char *const output, const char type);
int32_t tsDecompressFloatImplAvx512(const char *const input, const int32_t nelements, char *const output);
int32_t tsDecompressFloatImplAvx2(const char *const input, const int32_t nelements, char *const output);
int32_t tsDecompressTimestampAvx512(const char *const input, const int32_t nelements, char *const output,
bool bigEndian);
int32_t tsDecompressTimestampAvx2(const char *const input, const int32_t nelements, char *const output, bool bigEndian);
/*************************************************************************
* STREAM COMPRESSION

View File

@ -1345,11 +1345,18 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas
}
ASSERT(pTask->status.downstreamReady == 0);
// avoid initialization and destroy running concurrently.
taosThreadMutexLock(&pTask->lock);
if (pTask->pBackend == NULL) {
code = pMeta->expandTaskFn(pTask);
taosThreadMutexUnlock(&pTask->lock);
if (code != TSDB_CODE_SUCCESS) {
streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
}
} else {
taosThreadMutexUnlock(&pTask->lock);
}
if (code == TSDB_CODE_SUCCESS) {