adj stream operator result

This commit is contained in:
54liuyao 2024-07-18 11:44:15 +08:00
parent 99f4484328
commit 455c9d32c4
1 changed files with 10 additions and 7 deletions

View File

@ -911,10 +911,10 @@ void buildDataBlockFromGroupRes(SOperatorInfo* pOperator, void* pState, SSDataBl
pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
if (pCtx[j].fpSet.finalize) {
int32_t code1 = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
if (TAOS_FAILED(code1)) {
qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code1));
T_LONG_JMP(pTaskInfo->env, code1);
int32_t tmpRes = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
if (TAOS_FAILED(tmpRes)) {
qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(tmpRes));
TSDB_CHECK_CODE(code, lino, _end);
}
} else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
// do nothing, todo refactor
@ -2299,7 +2299,7 @@ void doDeleteSessionWindow(SStreamAggSupporter* pAggSup, SSessionKey* pKey) {
getSessionHashKey(pKey, &hashKey);
int32_t code = tSimpleHashRemove(pAggSup->pResultRows, &hashKey, sizeof(SSessionKey));
if (code != TSDB_CODE_SUCCESS) {
qWarn("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
qTrace("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
}
}
@ -2905,8 +2905,11 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, void* pState, SSDa
pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
if (pCtx[j].fpSet.finalize) {
code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
TSDB_CHECK_CODE(code, lino, _end);
int32_t tmpRes = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
if (TAOS_FAILED(tmpRes)) {
qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(tmpRes));
TSDB_CHECK_CODE(code, lino, _end);
}
} else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
// do nothing, todo refactor
} else {