fix(stream):set correct version for sink/agg tasks.

This commit is contained in:
Haojun Liao 2024-03-14 10:04:43 +08:00
parent 2b7fd0d15c
commit 190b02dd1a
2 changed files with 40 additions and 35 deletions

View File

@ -860,6 +860,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
pTask->info.selfChildId, pTask->info.taskLevel, p, pNext, pTask->info.fillHistory,
(int32_t)pTask->hTaskInfo.id.taskId, pTask->info.triggerParam, nextProcessVer);
ASSERT(pChkInfo->checkpointVer <= pChkInfo->nextProcessVer);
}
return 0;

View File

@ -458,12 +458,49 @@ void tFreeStreamTask(SStreamTask* pTask) {
stDebug("s-task:0x%x free task completed", taskId);
}
static void setInitialVersionInfo(SStreamTask* pTask, int64_t ver) {
SCheckpointInfo* pChkInfo = &pTask->chkInfo;
SDataRange* pRange = &pTask->dataRange;
// only set the version info for stream tasks without fill-history task
if ((pTask->info.fillHistory == 0) && (!HAS_RELATED_FILLHISTORY_TASK(pTask))) {
pChkInfo->checkpointVer = ver - 1; // only update when generating checkpoint
pChkInfo->processedVer = ver - 1; // already processed version
pChkInfo->nextProcessVer = ver; // next processed version
pRange->range.maxVer = ver;
pRange->range.minVer = ver;
} else {
// the initial value of processedVer/nextProcessVer/checkpointVer for stream task with related fill-history task
// is set at the mnode.
if (pTask->info.fillHistory == 1) {
pChkInfo->checkpointVer = pRange->range.maxVer;
pChkInfo->processedVer = pRange->range.maxVer;
pChkInfo->nextProcessVer = pRange->range.maxVer + 1;
} else {
pChkInfo->checkpointVer = pRange->range.minVer - 1;
pChkInfo->processedVer = pRange->range.minVer - 1;
pChkInfo->nextProcessVer = pRange->range.minVer;
{ // for compatible purpose, remove it later
if (pRange->range.minVer == 0) {
pChkInfo->checkpointVer = 0;
pChkInfo->processedVer = 0;
pChkInfo->nextProcessVer = 1;
stDebug("s-task:%s update the processedVer to 0 from -1 due to compatible purpose", pTask->id.idStr);
}
}
}
}
}
int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver) {
pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId);
pTask->refCnt = 1;
pTask->inputq.status = TASK_INPUT_STATUS__NORMAL;
pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL;
pTask->inputq.queue = streamQueueOpen(512 << 10);
pTask->outputq.queue = streamQueueOpen(512 << 10);
if (pTask->inputq.queue == NULL || pTask->outputq.queue == NULL) {
@ -481,41 +518,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
}
pTask->execInfo.created = taosGetTimestampMs();
SCheckpointInfo* pChkInfo = &pTask->chkInfo;
SDataRange* pRange = &pTask->dataRange;
// only set the version info for stream tasks without fill-history task
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
if ((pTask->info.fillHistory == 0) && (!HAS_RELATED_FILLHISTORY_TASK(pTask))) {
pChkInfo->checkpointVer = ver - 1; // only update when generating checkpoint
pChkInfo->processedVer = ver - 1; // already processed version
pChkInfo->nextProcessVer = ver; // next processed version
pRange->range.maxVer = ver;
pRange->range.minVer = ver;
} else {
// the initial value of processedVer/nextProcessVer/checkpointVer for stream task with related fill-history task
// is set at the mnode.
if (pTask->info.fillHistory == 1) {
pChkInfo->checkpointVer = pRange->range.maxVer;
pChkInfo->processedVer = pRange->range.maxVer;
pChkInfo->nextProcessVer = pRange->range.maxVer + 1;
} else {
pChkInfo->checkpointVer = pRange->range.minVer - 1;
pChkInfo->processedVer = pRange->range.minVer - 1;
pChkInfo->nextProcessVer = pRange->range.minVer;
{ // for compatible purpose, remove it later
if (pRange->range.minVer == 0) {
pChkInfo->checkpointVer = 0;
pChkInfo->processedVer = 0;
pChkInfo->nextProcessVer = 1;
stDebug("s-task:%s update the processedVer to 0 from -1 due to compatible purpose", pTask->id.idStr);
}
}
}
}
}
setInitialVersionInfo(pTask, ver);
pTask->pMeta = pMeta;
pTask->pMsgCb = pMsgCb;