fix(stream): check return value.

This commit is contained in:
Haojun Liao 2024-11-29 14:03:54 +08:00
parent 9a9a1828c1
commit dee5017c53
1 changed files with 41 additions and 10 deletions

View File

@ -362,18 +362,32 @@ static int32_t buildSourceTask(SStreamObj* pStream, SEpSet* pEpset, bool isFillh
return code;
}
static void addNewTaskList(SStreamObj* pStream) {
static int32_t addNewTaskList(SStreamObj* pStream) {
SArray* pTaskList = taosArrayInit(0, POINTER_BYTES);
if (pTaskList == NULL) {
mError("failed init task list, code:%s", tstrerror(terrno));
return terrno;
}
if (taosArrayPush(pStream->tasks, &pTaskList) == NULL) {
mError("failed to put into array");
mError("failed to put into array, code:%s", tstrerror(terrno));
return terrno;
}
if (pStream->conf.fillHistory) {
pTaskList = taosArrayInit(0, POINTER_BYTES);
if (pTaskList == NULL) {
mError("failed init task list, code:%s", tstrerror(terrno));
return terrno;
}
if (taosArrayPush(pStream->pHTasksList, &pTaskList) == NULL) {
mError("failed to put into array");
mError("failed to put into array, code:%s", tstrerror(terrno));
return terrno;
}
}
return TSDB_CODE_SUCCESS;
}
// set the history task id
@ -454,10 +468,11 @@ static SSubplan* getAggSubPlan(const SQueryPlan* pPlan, int index) {
static int32_t addSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset,
int64_t nextWindowSkey, SArray* pVerList, bool useTriggerParam) {
void* pIter = NULL;
int32_t code = 0;
SSdb* pSdb = pMnode->pSdb;
addNewTaskList(pStream);
int32_t code = addNewTaskList(pStream);
if (code) {
return code;
}
while (1) {
SVgObj* pVgroup = NULL;
@ -570,8 +585,10 @@ END:
}
static int32_t addSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset) {
int32_t code = 0;
addNewTaskList(pStream);
int32_t code = addNewTaskList(pStream);
if (code) {
return code;
}
if (pStream->fixedSinkVgId == 0) {
code = doAddShuffleSinkTask(pMnode, pStream, pEpset);
@ -676,8 +693,13 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
mDebug("doScheduleStream numOfPlanLevel:%d, exDb:%d, multiTarget:%d, fix vgId:%d, physicalPlan:%s", numOfPlanLevel,
externalTargetDB, multiTarget, pStream->fixedSinkVgId, pStream->physicalPlan);
pStream->tasks = taosArrayInit(numOfPlanLevel + 1, POINTER_BYTES);
pStream->pHTasksList = taosArrayInit(numOfPlanLevel + 1, POINTER_BYTES);
if (pStream->tasks == NULL || pStream->pHTasksList == NULL) {
mError("failed to create stream obj, code:%s", tstrerror(terrno));
return terrno;
}
if (numOfPlanLevel > 1 || externalTargetDB || multiTarget || pStream->fixedSinkVgId) {
// add extra sink
@ -717,6 +739,7 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
if (terrno != 0) code = terrno;
TAOS_RETURN(code);
}
do {
SArray** list = taosArrayGetLast(pStream->tasks);
float size = (float)taosArrayGetSize(*list);
@ -724,7 +747,10 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
if (cnt <= 1) break;
mDebug("doScheduleStream add middle agg, size:%d, cnt:%d", (int)size, (int)cnt);
addNewTaskList(pStream);
code = addNewTaskList(pStream);
if (code) {
return code;
}
for (int j = 0; j < cnt; j++) {
code = addAggTask(pStream, pMnode, plan, pEpset, false);
@ -750,7 +776,12 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
mDebug("doScheduleStream add final agg");
SArray** list = taosArrayGetLast(pStream->tasks);
size_t size = taosArrayGetSize(*list);
addNewTaskList(pStream);
code = addNewTaskList(pStream);
if (code) {
return code;
}
code = addAggTask(pStream, pMnode, plan, pEpset, true);
if (code != TSDB_CODE_SUCCESS) {
TAOS_RETURN(code);