fix(stream): halt the correct task.

This commit is contained in:
Haojun Liao 2024-02-22 15:01:04 +08:00
parent 3c7923a77e
commit 7c8eb21ee5
1 changed files with 13 additions and 11 deletions

View File

@ -44,9 +44,8 @@ static bool hasCountWindowNode(SPhysiNode* pNode) {
}
}
static bool countWindowStreamTask(SSubplan* pPlan) {
SPhysiNode* pNode = pPlan->pNode;
return hasCountWindowNode(pNode);
static bool isCountWindowStreamTask(SSubplan* pPlan) {
return hasCountWindowNode((SPhysiNode*)pPlan->pNode);
}
int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType,
@ -342,13 +341,13 @@ static void streamTaskSetDataRange(SStreamTask* pTask, int64_t skey, SArray* pVe
}
}
static void haltInitialTaskStatus(SStreamTask* pTask, SSubplan* pPlan) {
bool hasCountWindowNode = countWindowStreamTask(pPlan);
bool isRelStreamTask = (pTask->hTaskInfo.id.taskId != 0);
if (hasCountWindowNode && isRelStreamTask) {
static void haltInitialTaskStatus(SStreamTask* pTask, SSubplan* pPlan, bool isFillhistoryTask) {
bool hasCountWindowNode = isCountWindowStreamTask(pPlan);
if (hasCountWindowNode && (!isFillhistoryTask)) {
SStreamStatus* pStatus = &pTask->status;
mDebug("s-task:0x%x status is set to %s from %s for count window agg task with fill-history option set",
pTask->id.taskId, streamTaskGetStatusStr(pStatus->taskStatus), streamTaskGetStatusStr(TASK_STATUS__HALT));
mDebug("s-task:0x%x status set %s from %s for count window agg task with fill-history option set",
pTask->id.taskId, streamTaskGetStatusStr(TASK_STATUS__HALT), streamTaskGetStatusStr(pStatus->taskStatus));
pStatus->taskStatus = TASK_STATUS__HALT;
}
}
@ -398,15 +397,17 @@ static void setHTasksId(SStreamObj* pStream) {
static int32_t doAddSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset, int64_t skey,
SArray* pVerList, SVgObj* pVgroup, bool isFillhistory, bool useTriggerParam) {
// new stream task
SStreamTask* pTask = buildSourceTask(pStream, pEpset, isFillhistory, useTriggerParam);
if (pTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
mDebug("doAddSourceTask taskId:%s, vgId:%d, isFillHistory:%d", pTask->id.idStr, pVgroup->vgId, isFillhistory);
haltInitialTaskStatus(pTask, plan);
if (pStream->conf.fillHistory) {
haltInitialTaskStatus(pTask, plan, isFillhistory);
}
streamTaskSetDataRange(pTask, skey, pVerList, pVgroup->vgId);
@ -415,6 +416,7 @@ static int32_t doAddSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStre
terrno = code;
return terrno;
}
return TDB_CODE_SUCCESS;
}