Merge pull request #26499 from taosdata/fix/3_liaohj
fix(stream): fix race-condition during expand stream tasks.
This commit is contained in:
commit
e273173d20
|
@ -198,15 +198,6 @@ int32_t tsCompressBigint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int3
|
||||||
int32_t nBuf);
|
int32_t nBuf);
|
||||||
int32_t tsDecompressBigint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg,
|
int32_t tsDecompressBigint2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg,
|
||||||
void *pBuf, int32_t nBuf);
|
void *pBuf, int32_t nBuf);
|
||||||
// for internal usage
|
|
||||||
int32_t getWordLength(char type);
|
|
||||||
|
|
||||||
int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements, char *const output, const char type);
|
|
||||||
int32_t tsDecompressFloatImplAvx512(const char *const input, const int32_t nelements, char *const output);
|
|
||||||
int32_t tsDecompressFloatImplAvx2(const char *const input, const int32_t nelements, char *const output);
|
|
||||||
int32_t tsDecompressTimestampAvx512(const char *const input, const int32_t nelements, char *const output,
|
|
||||||
bool bigEndian);
|
|
||||||
int32_t tsDecompressTimestampAvx2(const char *const input, const int32_t nelements, char *const output, bool bigEndian);
|
|
||||||
|
|
||||||
/*************************************************************************
|
/*************************************************************************
|
||||||
* STREAM COMPRESSION
|
* STREAM COMPRESSION
|
||||||
|
|
|
@ -1236,8 +1236,15 @@ static void copyIntervalDeleteKey(SSHashObj* pMap, SArray* pWins) {
|
||||||
|
|
||||||
static SSDataBlock* buildIntervalResult(SOperatorInfo* pOperator) {
|
static SSDataBlock* buildIntervalResult(SOperatorInfo* pOperator) {
|
||||||
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
|
||||||
uint16_t opType = pOperator->operatorType;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
uint16_t opType = pOperator->operatorType;
|
||||||
|
|
||||||
|
// check if query task is closed or not
|
||||||
|
if (isTaskKilled(pTaskInfo)) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
if (IS_FINAL_INTERVAL_OP(pOperator)) {
|
if (IS_FINAL_INTERVAL_OP(pOperator)) {
|
||||||
doBuildPullDataBlock(pInfo->pPullWins, &pInfo->pullIndex, pInfo->pPullDataRes);
|
doBuildPullDataBlock(pInfo->pPullWins, &pInfo->pullIndex, pInfo->pPullDataRes);
|
||||||
if (pInfo->pPullDataRes->info.rows != 0) {
|
if (pInfo->pPullDataRes->info.rows != 0) {
|
||||||
|
|
|
@ -1354,11 +1354,18 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(pTask->status.downstreamReady == 0);
|
ASSERT(pTask->status.downstreamReady == 0);
|
||||||
|
|
||||||
|
// avoid initialization and destroy running concurrently.
|
||||||
|
taosThreadMutexLock(&pTask->lock);
|
||||||
if (pTask->pBackend == NULL) {
|
if (pTask->pBackend == NULL) {
|
||||||
code = pMeta->expandTaskFn(pTask);
|
code = pMeta->expandTaskFn(pTask);
|
||||||
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
|
streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
|
|
Loading…
Reference in New Issue