From 455c9d32c42a5519e6ae21306dab9ed1d8c38c05 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Thu, 18 Jul 2024 11:44:15 +0800 Subject: [PATCH] adj stream operator result --- .../executor/src/streamtimewindowoperator.c | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 3feb3156d8..93b8e0af8b 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -911,10 +911,10 @@ void buildDataBlockFromGroupRes(SOperatorInfo* pOperator, void* pState, SSDataBl pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset); if (pCtx[j].fpSet.finalize) { - int32_t code1 = pCtx[j].fpSet.finalize(&pCtx[j], pBlock); - if (TAOS_FAILED(code1)) { - qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code1)); - T_LONG_JMP(pTaskInfo->env, code1); + int32_t tmpRes = pCtx[j].fpSet.finalize(&pCtx[j], pBlock); + if (TAOS_FAILED(tmpRes)) { + qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(tmpRes)); + TSDB_CHECK_CODE(code, lino, _end); } } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) { // do nothing, todo refactor @@ -2299,7 +2299,7 @@ void doDeleteSessionWindow(SStreamAggSupporter* pAggSup, SSessionKey* pKey) { getSessionHashKey(pKey, &hashKey); int32_t code = tSimpleHashRemove(pAggSup->pResultRows, &hashKey, sizeof(SSessionKey)); if (code != TSDB_CODE_SUCCESS) { - qWarn("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + qTrace("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); } } @@ -2905,8 +2905,11 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, void* pState, SSDa pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset); if (pCtx[j].fpSet.finalize) { - code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock); - TSDB_CHECK_CODE(code, lino, _end); + int32_t tmpRes = pCtx[j].fpSet.finalize(&pCtx[j], pBlock); + if (TAOS_FAILED(tmpRes)) { + qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(tmpRes)); + TSDB_CHECK_CODE(code, lino, _end); + } } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) { // do nothing, todo refactor } else {