Merge pull request #24840 from taosdata/fix/nullcheck

fix(stream): fix the compatible issue when the fill-history exists.
This commit is contained in:
Haojun Liao 2024-02-24 00:40:09 +08:00 committed by GitHub
commit 98ff21c879
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 12 additions and 1 deletions

View File

@ -58,7 +58,7 @@ int32_t tsNumOfMnodeQueryThreads = 4;
int32_t tsNumOfMnodeFetchThreads = 1;
int32_t tsNumOfMnodeReadThreads = 1;
int32_t tsNumOfVnodeQueryThreads = 4;
float tsRatioOfVnodeStreamThreads = 0.5F;
float tsRatioOfVnodeStreamThreads = 1.5F;
int32_t tsNumOfVnodeFetchThreads = 4;
int32_t tsNumOfVnodeRsmaThreads = 2;
int32_t tsNumOfQnodeQueryThreads = 4;

View File

@ -494,6 +494,8 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
pRange->range.maxVer = ver;
pRange->range.minVer = ver;
} else {
// the initial value of processedVer/nextProcessVer/checkpointVer for stream task with related fill-history task
// is set at the mnode.
if (pTask->info.fillHistory == 1) {
pChkInfo->checkpointVer = pRange->range.maxVer;
pChkInfo->processedVer = pRange->range.maxVer;
@ -502,6 +504,15 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
pChkInfo->checkpointVer = pRange->range.minVer - 1;
pChkInfo->processedVer = pRange->range.minVer - 1;
pChkInfo->nextProcessVer = pRange->range.minVer;
{ // for compatible purpose, remove it later
if (pRange->range.minVer == 0) {
pChkInfo->checkpointVer = 0;
pChkInfo->processedVer = 0;
pChkInfo->nextProcessVer = 1;
stDebug("s-task:%s update the processedVer to 0 from -1 due to compatible purpose", pTask->id.idStr);
}
}
}
}
}