From b1a4ea94726f3b339514f9c37778f07efdf00641 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Wed, 24 Jul 2024 19:10:39 +0800 Subject: [PATCH] adj operator result --- source/libs/executor/inc/tfill.h | 6 +- source/libs/executor/src/filloperator.c | 9 +- .../executor/src/streamcountwindowoperator.c | 40 ++-- .../executor/src/streameventwindowoperator.c | 36 +-- source/libs/executor/src/streamfilloperator.c | 23 +- .../executor/src/streamtimewindowoperator.c | 209 +++++++++++------- source/libs/executor/src/sysscanoperator.c | 14 +- source/libs/executor/src/tfill.c | 15 +- 8 files changed, 221 insertions(+), 131 deletions(-) diff --git a/source/libs/executor/inc/tfill.h b/source/libs/executor/inc/tfill.h index cd003ebd15..b659c12315 100644 --- a/source/libs/executor/inc/tfill.h +++ b/source/libs/executor/inc/tfill.h @@ -128,9 +128,9 @@ SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfFillExpr, SExprIn int32_t numOfNotFillCols, const struct SNodeListNode* val); bool taosFillHasMoreResults(struct SFillInfo* pFillInfo); -SFillInfo* taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols, int32_t numOfNotFillCols, int32_t capacity, - SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol, int32_t slotId, - int32_t order, const char* id, SExecTaskInfo* pTaskInfo); +void taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols, int32_t numOfNotFillCols, int32_t capacity, + SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol, int32_t slotId, + int32_t order, const char* id, SExecTaskInfo* pTaskInfo, SFillInfo** ppFillInfo); void* taosDestroyFillInfo(struct SFillInfo* pFillInfo); int32_t taosFillResultDataBlock(struct SFillInfo* pFillInfo, SSDataBlock* p, int32_t capacity); diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index 3d139e1d2e..341b1a01ea 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -317,7 +317,7 @@ _end: } static int32_t doFillNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { - int32_t code = TSDB_CODE_SUCCESS; + int32_t code = TSDB_CODE_SUCCESS; SFillOperatorInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -355,7 +355,7 @@ static int32_t doFillNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { static SSDataBlock* doFill(SOperatorInfo* pOperator) { SSDataBlock* pRes = NULL; - int32_t code = doFillNext(pOperator, &pRes); + int32_t code = doFillNext(pOperator, &pRes); return pRes; } @@ -384,8 +384,9 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t // STimeWindow w = {0}; // getInitialStartTimeWindow(pInterval, startKey, &w, order == TSDB_ORDER_ASC); - pInfo->pFillInfo = taosCreateFillInfo(startKey, numOfCols, numOfNotFillCols, capacity, pInterval, fillType, pColInfo, - pInfo->primaryTsCol, order, id, pTaskInfo); + pInfo->pFillInfo = NULL; + taosCreateFillInfo(startKey, numOfCols, numOfNotFillCols, capacity, pInterval, fillType, pColInfo, + pInfo->primaryTsCol, order, id, pTaskInfo, &pInfo->pFillInfo); if (order == TSDB_ORDER_ASC) { pInfo->win.skey = win.skey; diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index f249a0fe0a..41a5a3505c 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -357,12 +357,13 @@ static void doStreamCountAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl _end: if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); } destroySBuffInfo(pAggSup, &buffInfo); } -static SSDataBlock* buildCountResult(SOperatorInfo* pOperator) { +static int32_t buildCountResult(SOperatorInfo* pOperator, SSDataBlock** ppRes) { + int32_t code = TSDB_CODE_SUCCESS; SStreamCountAggOperatorInfo* pInfo = pOperator->info; SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; SOptrBasicInfo* pBInfo = &pInfo->binfo; @@ -370,15 +371,18 @@ static SSDataBlock* buildCountResult(SOperatorInfo* pOperator) { doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); if (pInfo->pDelRes->info.rows > 0) { printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); - return pInfo->pDelRes; + (*ppRes) = pInfo->pDelRes; + return code; } doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes); if (pBInfo->pRes->info.rows > 0) { printDataBlock(pBInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); - return pBInfo->pRes; + (*ppRes) = pBInfo->pRes; + return code; } - return NULL; + (*ppRes) = NULL; + return code; } int32_t doStreamCountEncodeOpState(void** buf, int32_t len, SOperatorInfo* pOperator, bool isParent) { @@ -425,6 +429,7 @@ int32_t doStreamCountDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SStreamCountAggOperatorInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; if (!pInfo) { code = TSDB_CODE_FAILED; QUERY_CHECK_CODE(code, lino, _end); @@ -465,7 +470,7 @@ int32_t doStreamCountDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera _end: if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); } return code; } @@ -475,6 +480,7 @@ void doStreamCountSaveCheckpoint(SOperatorInfo* pOperator) { int32_t lino = 0; void* pBuf = NULL; SStreamCountAggOperatorInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; if (needSaveStreamOperatorInfo(&pInfo->basic)) { int32_t len = doStreamCountEncodeOpState(NULL, 0, pOperator, true); pBuf = taosMemoryCalloc(1, len); @@ -492,7 +498,7 @@ void doStreamCountSaveCheckpoint(SOperatorInfo* pOperator) { _end: taosMemoryFreeClear(pBuf); if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); } } @@ -617,7 +623,9 @@ static int32_t doStreamCountAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe (*ppRes) = NULL; return code; } else if (pOperator->status == OP_RES_TO_RETURN) { - SSDataBlock* opRes = buildCountResult(pOperator); + SSDataBlock* opRes = NULL; + code = buildCountResult(pOperator, &opRes); + QUERY_CHECK_CODE(code, lino, _end); if (opRes) { (*ppRes) = opRes; return code; @@ -715,7 +723,9 @@ static int32_t doStreamCountAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe QUERY_CHECK_CODE(code, lino, _end); } - SSDataBlock* opRes = buildCountResult(pOperator); + SSDataBlock* opRes = NULL; + code = buildCountResult(pOperator, &opRes); + QUERY_CHECK_CODE(code, lino, _end); if (opRes) { (*ppRes) = opRes; return code; @@ -724,7 +734,7 @@ static int32_t doStreamCountAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe _end: if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = code; - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); } setStreamOperatorCompleted(pOperator); (*ppRes) = NULL; @@ -733,7 +743,7 @@ _end: static SSDataBlock* doStreamCountAgg(SOperatorInfo* pOperator) { SSDataBlock* pRes = NULL; - int32_t code = doStreamCountAggNext(pOperator, &pRes); + int32_t code = doStreamCountAggNext(pOperator, &pRes); return pRes; } @@ -741,6 +751,7 @@ void streamCountReleaseState(SOperatorInfo* pOperator) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SStreamEventAggOperatorInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; int32_t resSize = sizeof(TSKEY); char* pBuff = taosMemoryCalloc(1, resSize); QUERY_CHECK_NULL(pBuff, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY); @@ -758,7 +769,7 @@ void streamCountReleaseState(SOperatorInfo* pOperator) { _end: if (code != TSDB_CODE_SUCCESS) { terrno = code; - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); } } @@ -766,6 +777,7 @@ void streamCountReloadState(SOperatorInfo* pOperator) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SStreamCountAggOperatorInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; int32_t size = 0; void* pBuf = NULL; @@ -787,7 +799,7 @@ void streamCountReloadState(SOperatorInfo* pOperator) { _end: if (code != TSDB_CODE_SUCCESS) { terrno = code; - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); } } @@ -896,6 +908,6 @@ _error: taosMemoryFreeClear(pOperator); pTaskInfo->code = code; - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); return NULL; } diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index 842c93491d..1181b1d981 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -285,7 +285,7 @@ static int32_t compactEventWindow(SOperatorInfo* pOperator, SEventWindowInfo* pC _end: if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); } return code; } @@ -444,7 +444,7 @@ _end: colDataDestroy(pColEnd); taosMemoryFree(pColEnd); if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); } } @@ -490,6 +490,7 @@ int32_t doStreamEventDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SStreamEventAggOperatorInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; if (!pInfo) { code = TSDB_CODE_FAILED; QUERY_CHECK_CODE(code, lino, _end); @@ -532,7 +533,7 @@ int32_t doStreamEventDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera _end: if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); } return code; } @@ -555,7 +556,8 @@ void doStreamEventSaveCheckpoint(SOperatorInfo* pOperator) { } } -static SSDataBlock* buildEventResult(SOperatorInfo* pOperator) { +static int32_t buildEventResult(SOperatorInfo* pOperator, SSDataBlock** ppRes) { + int32_t code = TSDB_CODE_SUCCESS; SStreamEventAggOperatorInfo* pInfo = pOperator->info; SOptrBasicInfo* pBInfo = &pInfo->binfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -563,15 +565,18 @@ static SSDataBlock* buildEventResult(SOperatorInfo* pOperator) { doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator); if (pInfo->pDelRes->info.rows > 0) { printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); - return pInfo->pDelRes; + (*ppRes) = pInfo->pDelRes; + return code; } doBuildSessionResult(pOperator, pInfo->streamAggSup.pState, &pInfo->groupResInfo, pBInfo->pRes); if (pBInfo->pRes->info.rows > 0) { printDataBlock(pBInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); - return pBInfo->pRes; + (*ppRes) = pBInfo->pRes; + return code; } - return NULL; + (*ppRes) = NULL; + return code; } static int32_t doStreamEventAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { @@ -588,7 +593,9 @@ static int32_t doStreamEventAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; qDebug("===stream=== stream event agg"); if (pOperator->status == OP_RES_TO_RETURN) { - SSDataBlock* resBlock = buildEventResult(pOperator); + SSDataBlock* resBlock = NULL; + code = buildEventResult(pOperator, &resBlock); + QUERY_CHECK_CODE(code, lino, _end); if (resBlock != NULL) { (*ppRes) = resBlock; return code; @@ -700,7 +707,9 @@ static int32_t doStreamEventAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); QUERY_CHECK_CODE(code, lino, _end); - SSDataBlock* resBlock = buildEventResult(pOperator); + SSDataBlock* resBlock = NULL; + code = buildEventResult(pOperator, &resBlock); + QUERY_CHECK_CODE(code, lino, _end); if (resBlock != NULL) { (*ppRes) = resBlock; return code; @@ -709,7 +718,7 @@ static int32_t doStreamEventAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe _end: if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = code; - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); } setStreamOperatorCompleted(pOperator); (*ppRes) = NULL; @@ -718,7 +727,7 @@ _end: static SSDataBlock* doStreamEventAgg(SOperatorInfo* pOperator) { SSDataBlock* pRes = NULL; - int32_t code = doStreamEventAggNext(pOperator, &pRes); + int32_t code = doStreamEventAggNext(pOperator, &pRes); return pRes; } @@ -747,6 +756,7 @@ void streamEventReloadState(SOperatorInfo* pOperator) { int32_t lino = 0; SStreamEventAggOperatorInfo* pInfo = pOperator->info; SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; resetWinRange(&pAggSup->winRange); SSessionKey seKey = {.win.skey = INT64_MIN, .win.ekey = INT64_MIN, .groupId = 0}; @@ -830,7 +840,7 @@ void streamEventReloadState(SOperatorInfo* pOperator) { _end: if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); } } @@ -951,6 +961,6 @@ _error: destroyStreamEventOperatorInfo(pInfo); taosMemoryFreeClear(pOperator); pTaskInfo->code = code; - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); return NULL; } diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index 6b21e481c7..335bfd286c 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -742,7 +742,7 @@ static void doStreamFillImpl(SOperatorInfo* pOperator) { _end: if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); } } @@ -751,6 +751,7 @@ static int32_t buildDeleteRange(SOperatorInfo* pOp, TSKEY start, TSKEY end, uint int32_t lino = 0; SStorageAPI* pAPI = &pOp->pTaskInfo->storageAPI; void* pState = pOp->pTaskInfo->streamInfo.pState; + SExecTaskInfo* pTaskInfo = pOp->pTaskInfo; SSDataBlock* pBlock = delRes; SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); @@ -793,7 +794,7 @@ static int32_t buildDeleteRange(SOperatorInfo* pOp, TSKEY start, TSKEY end, uint _end: if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); } return code; } @@ -804,6 +805,7 @@ static int32_t buildDeleteResult(SOperatorInfo* pOperator, TSKEY startTs, TSKEY int32_t lino = 0; SStreamFillOperatorInfo* pInfo = pOperator->info; SStreamFillSupporter* pFillSup = pInfo->pFillSup; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; if (hasPrevWindow(pFillSup)) { TSKEY start = getNextWindowTs(pFillSup->prev.key, &pFillSup->interval); code = buildDeleteRange(pOperator, start, endTs, groupId, delRes); @@ -819,7 +821,7 @@ static int32_t buildDeleteResult(SOperatorInfo* pOperator, TSKEY startTs, TSKEY _end: if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); } return code; } @@ -829,6 +831,7 @@ static int32_t doDeleteFillResultImpl(SOperatorInfo* pOperator, TSKEY startTs, T int32_t lino = 0; SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; SStreamFillOperatorInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; getWindowFromDiscBuf(pOperator, startTs, groupId, pInfo->pFillSup); setDeleteFillValueInfo(startTs, endTs, pInfo->pFillSup, pInfo->pFillInfo); SWinKey key = {.ts = startTs, .groupId = groupId}; @@ -851,7 +854,7 @@ static int32_t doDeleteFillResultImpl(SOperatorInfo* pOperator, TSKEY startTs, T _end: if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); } return code; } @@ -910,6 +913,7 @@ static int32_t doDeleteFillResult(SOperatorInfo* pOperator) { SStreamFillOperatorInfo* pInfo = pOperator->info; SStreamFillInfo* pFillInfo = pInfo->pFillInfo; SSDataBlock* pBlock = pInfo->pSrcDelBlock; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); TSKEY* tsStarts = (TSKEY*)pStartCol->pData; @@ -966,7 +970,7 @@ static int32_t doDeleteFillResult(SOperatorInfo* pOperator) { _end: if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); } return code; } @@ -984,6 +988,7 @@ static int32_t doApplyStreamScalarCalculation(SOperatorInfo* pOperator, SSDataBl int32_t lino = 0; SStreamFillOperatorInfo* pInfo = pOperator->info; SExprSupp* pSup = &pOperator->exprSupp; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; blockDataCleanup(pDstBlock); code = blockDataEnsureCapacity(pDstBlock, pSrcBlock->info.rows); @@ -1007,7 +1012,7 @@ static int32_t doApplyStreamScalarCalculation(SOperatorInfo* pOperator, SSDataBl _end: if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); } return code; } @@ -1140,7 +1145,7 @@ static int32_t doStreamFillNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { _end: if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = code; - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); } setOperatorCompleted(pOperator); resetStreamFillInfo(pInfo); @@ -1150,7 +1155,7 @@ _end: static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) { SSDataBlock* pRes = NULL; - int32_t code = doStreamFillNext(pOperator, &pRes); + int32_t code = doStreamFillNext(pOperator, &pRes); return pRes; } @@ -1423,7 +1428,7 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi _error: if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); } destroyStreamFillOperatorInfo(pInfo); taosMemoryFreeClear(pOperator); diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index bf4b16fe8b..5aa7cac45d 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -236,6 +236,7 @@ static int32_t doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, S int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SStreamIntervalOperatorInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); TSKEY* startTsCols = (TSKEY*)pStartTsCol->pData; SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); @@ -301,7 +302,7 @@ static int32_t doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, S } _end: if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); } return code; } @@ -335,6 +336,7 @@ static int32_t closeStreamIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp void* pIte = NULL; int32_t iter = 0; SStreamIntervalOperatorInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; int32_t delSize = taosArrayGetSize(pDelWins); while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) { void* key = tSimpleHashGetKey(pIte, NULL); @@ -376,7 +378,7 @@ static int32_t closeStreamIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp _end: if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); } return code; } @@ -517,9 +519,10 @@ void reloadFromDownStream(SOperatorInfo* downstream, SStreamIntervalOperatorInfo } int32_t initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SStreamIntervalOperatorInfo* pInfo) { - int32_t code = TSDB_CODE_SUCCESS; - int32_t lino = 0; - SStateStore* pAPI = &downstream->pTaskInfo->storageAPI.stateStore; + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + SStateStore* pAPI = &downstream->pTaskInfo->storageAPI.stateStore; + SExecTaskInfo* pTaskInfo = downstream->pTaskInfo; if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { return initIntervalDownStream(downstream->pDownstream[0], type, pInfo); @@ -542,7 +545,7 @@ int32_t initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SStream _end: if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); } return code; } @@ -584,7 +587,7 @@ int32_t compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, i _end: if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); } return code; } @@ -946,7 +949,7 @@ void buildDataBlockFromGroupRes(SOperatorInfo* pOperator, void* pState, SSDataBl _end: if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); } } @@ -1149,7 +1152,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDat } _end: if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); } } @@ -1332,6 +1335,7 @@ void doStreamIntervalDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SStreamIntervalOperatorInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; if (!pInfo) { code = TSDB_CODE_FAILED; QUERY_CHECK_CODE(code, lino, _end); @@ -1401,7 +1405,7 @@ void doStreamIntervalDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera _end: if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); } } @@ -1445,15 +1449,16 @@ _end: return code; } -static SSDataBlock* buildIntervalResult(SOperatorInfo* pOperator) { +static int32_t buildIntervalResult(SOperatorInfo* pOperator, SSDataBlock** ppRes) { SStreamIntervalOperatorInfo* pInfo = pOperator->info; - - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - uint16_t opType = pOperator->operatorType; + int32_t code = TSDB_CODE_SUCCESS; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + uint16_t opType = pOperator->operatorType; // check if query task is closed or not if (isTaskKilled(pTaskInfo)) { - return NULL; + (*ppRes) = NULL; + return code; } if (IS_FINAL_INTERVAL_OP(pOperator)) { @@ -1461,7 +1466,8 @@ static SSDataBlock* buildIntervalResult(SOperatorInfo* pOperator) { if (pInfo->pPullDataRes->info.rows != 0) { // process the rest of the data printDataBlock(pInfo->pPullDataRes, getStreamOpName(opType), GET_TASKID(pTaskInfo)); - return pInfo->pPullDataRes; + (*ppRes) = pInfo->pPullDataRes; + return code; } } @@ -1469,16 +1475,19 @@ static SSDataBlock* buildIntervalResult(SOperatorInfo* pOperator) { if (pInfo->pDelRes->info.rows != 0) { // process the rest of the data printDataBlock(pInfo->pDelRes, getStreamOpName(opType), GET_TASKID(pTaskInfo)); - return pInfo->pDelRes; + (*ppRes) = pInfo->pDelRes; + return code; } doBuildStreamIntervalResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo); if (pInfo->binfo.pRes->info.rows != 0) { printDataBlock(pInfo->binfo.pRes, getStreamOpName(opType), GET_TASKID(pTaskInfo)); - return pInfo->binfo.pRes; + (*ppRes) = pInfo->binfo.pRes; + return code; } - return NULL; + (*ppRes) = NULL; + return code; } int32_t copyUpdateResult(SSHashObj** ppWinUpdated, SArray* pUpdated, __compar_fn_t compar) { @@ -1520,7 +1529,9 @@ static int32_t doStreamFinalIntervalAggNext(SOperatorInfo* pOperator, SSDataBloc (*ppRes) = NULL; return code; } else if (pOperator->status == OP_RES_TO_RETURN) { - SSDataBlock* resBlock = buildIntervalResult(pOperator); + SSDataBlock* resBlock = NULL; + code = buildIntervalResult(pOperator, &resBlock); + QUERY_CHECK_CODE(code, lino, _end); if (resBlock != NULL) { (*ppRes) = resBlock; return code; @@ -1549,7 +1560,9 @@ static int32_t doStreamFinalIntervalAggNext(SOperatorInfo* pOperator, SSDataBloc return code; } else { if (!IS_FINAL_INTERVAL_OP(pOperator)) { - SSDataBlock* resBlock = buildIntervalResult(pOperator); + SSDataBlock* resBlock = NULL; + code = buildIntervalResult(pOperator, &resBlock); + QUERY_CHECK_CODE(code, lino, _end); if (resBlock != NULL) { (*ppRes) = resBlock; return code; @@ -1712,7 +1725,9 @@ static int32_t doStreamFinalIntervalAggNext(SOperatorInfo* pOperator, SSDataBloc code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); QUERY_CHECK_CODE(code, lino, _end); - SSDataBlock* resBlock = buildIntervalResult(pOperator); + SSDataBlock* resBlock = NULL; + code = buildIntervalResult(pOperator, &resBlock); + QUERY_CHECK_CODE(code, lino, _end); if (resBlock != NULL) { (*ppRes) = resBlock; return code; @@ -1728,7 +1743,7 @@ static int32_t doStreamFinalIntervalAggNext(SOperatorInfo* pOperator, SSDataBloc _end: if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = code; - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); } setStreamOperatorCompleted(pOperator); (*ppRes) = NULL; @@ -1737,7 +1752,7 @@ _end: static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { SSDataBlock* pRes = NULL; - int32_t code = doStreamFinalIntervalAggNext(pOperator, &pRes); + int32_t code = doStreamFinalIntervalAggNext(pOperator, &pRes); return pRes; } @@ -1799,6 +1814,7 @@ void streamIntervalReloadState(SOperatorInfo* pOperator) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SStreamIntervalOperatorInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL && pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL) { int32_t size = 0; @@ -1820,7 +1836,7 @@ void streamIntervalReloadState(SOperatorInfo* pOperator) { _end: if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); } } @@ -2036,8 +2052,9 @@ void initDummyFunction(SqlFunctionCtx* pDummy, SqlFunctionCtx* pCtx, int32_t num int32_t initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uint16_t type, int32_t tsColIndex, STimeWindowAggSupp* pTwSup, struct SSteamOpBasicInfo* pBasic) { - int32_t code = TSDB_CODE_SUCCESS; - int32_t lino = 0; + SExecTaskInfo* pTaskInfo = downstream->pTaskInfo; + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION) { SStreamPartitionOperatorInfo* pScanInfo = downstream->info; pScanInfo->tsColIndex = tsColIndex; @@ -2062,7 +2079,7 @@ int32_t initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, _end: if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); } return code; } @@ -2328,7 +2345,7 @@ int32_t doOneWindowAggImpl(SColumnInfoData* pTimeWindowData, SResultWindowInfo* _end: if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); } return code; } @@ -2404,7 +2421,7 @@ int32_t compactTimeWindow(SExprSupp* pSup, SStreamAggSupporter* pAggSup, STimeWi _end: if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); } return code; } @@ -2442,7 +2459,7 @@ static int32_t compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* _end: if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); } return code; } @@ -2574,7 +2591,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData _end: if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); } } @@ -2631,9 +2648,10 @@ inline int32_t sessionKeyCompareAsc(const void* pKey1, const void* pKey2) { } void doBuildDeleteDataBlock(SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlock* pBlock, void** Ite) { - int32_t code = TSDB_CODE_SUCCESS; - int32_t lino = 0; - SStorageAPI* pAPI = &pOp->pTaskInfo->storageAPI; + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + SStorageAPI* pAPI = &pOp->pTaskInfo->storageAPI; + SExecTaskInfo* pTaskInfo = pOp->pTaskInfo; blockDataCleanup(pBlock); int32_t size = tSimpleHashGetSize(pStDeleted); @@ -2699,7 +2717,7 @@ _end: } if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); } } @@ -2786,7 +2804,7 @@ static int32_t rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, _end: if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); } return code; } @@ -2974,7 +2992,7 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, void* pState, SSDa _end: if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); } return code; } @@ -3003,11 +3021,12 @@ void doBuildSessionResult(SOperatorInfo* pOperator, void* pState, SGroupResInfo* _end: if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); } } -static SSDataBlock* buildSessionResult(SOperatorInfo* pOperator) { +static int32_t buildSessionResult(SOperatorInfo* pOperator, SSDataBlock** ppRes) { + int32_t code = TSDB_CODE_SUCCESS; SStreamSessionAggOperatorInfo* pInfo = pOperator->info; SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; SOptrBasicInfo* pBInfo = &pInfo->binfo; @@ -3015,15 +3034,18 @@ static SSDataBlock* buildSessionResult(SOperatorInfo* pOperator) { doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); if (pInfo->pDelRes->info.rows > 0) { printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); - return pInfo->pDelRes; + (*ppRes) = pInfo->pDelRes; + return code; } doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes); if (pBInfo->pRes->info.rows > 0) { printDataBlock(pBInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); - return pBInfo->pRes; + (*ppRes) = pBInfo->pRes; + return code; } - return NULL; + (*ppRes) = NULL; + return code; } int32_t getMaxTsWins(const SArray* pAllWins, SArray* pMaxWins) { @@ -3143,6 +3165,7 @@ int32_t doStreamSessionDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpe int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SStreamSessionAggOperatorInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; if (!pInfo) { code = TSDB_CODE_FAILED; QUERY_CHECK_CODE(code, lino, _end); @@ -3200,7 +3223,7 @@ int32_t doStreamSessionDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpe _end: if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); } return code; } @@ -3268,7 +3291,9 @@ static int32_t doStreamSessionAggNext(SOperatorInfo* pOperator, SSDataBlock** pp (*ppRes) = NULL; return code; } else if (pOperator->status == OP_RES_TO_RETURN) { - SSDataBlock* opRes = buildSessionResult(pOperator); + SSDataBlock* opRes = NULL; + code = buildSessionResult(pOperator, &opRes); + QUERY_CHECK_CODE(code, lino, _end); if (opRes) { (*ppRes) = opRes; return code; @@ -3419,7 +3444,9 @@ static int32_t doStreamSessionAggNext(SOperatorInfo* pOperator, SSDataBlock** pp code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); QUERY_CHECK_CODE(code, lino, _end); - SSDataBlock* opRes = buildSessionResult(pOperator); + SSDataBlock* opRes = NULL; + code = buildSessionResult(pOperator, &opRes); + QUERY_CHECK_CODE(code, lino, _end); if (opRes) { (*ppRes) = opRes; return code; @@ -3428,7 +3455,7 @@ static int32_t doStreamSessionAggNext(SOperatorInfo* pOperator, SSDataBlock** pp _end: if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = code; - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); } setStreamOperatorCompleted(pOperator); (*ppRes) = NULL; @@ -3437,7 +3464,7 @@ _end: static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { SSDataBlock* pRes = NULL; - int32_t code = doStreamSessionAggNext(pOperator, &pRes); + int32_t code = doStreamSessionAggNext(pOperator, &pRes); return pRes; } @@ -3506,6 +3533,7 @@ void streamSessionSemiReloadState(SOperatorInfo* pOperator) { int32_t lino = 0; SStreamSessionAggOperatorInfo* pInfo = pOperator->info; SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; resetWinRange(&pAggSup->winRange); SResultWindowInfo winInfo = {0}; @@ -3542,7 +3570,7 @@ void streamSessionSemiReloadState(SOperatorInfo* pOperator) { _end: if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); } } @@ -3551,6 +3579,7 @@ void streamSessionReloadState(SOperatorInfo* pOperator) { int32_t lino = 0; SStreamSessionAggOperatorInfo* pInfo = pOperator->info; SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; resetWinRange(&pAggSup->winRange); int32_t size = 0; @@ -3614,7 +3643,7 @@ void streamSessionReloadState(SOperatorInfo* pOperator) { _end: if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); } } @@ -3736,7 +3765,7 @@ _error: taosMemoryFreeClear(pOperator); pTaskInfo->code = code; - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); return NULL; } @@ -3792,7 +3821,9 @@ static int32_t doStreamSessionSemiAggNext(SOperatorInfo* pOperator, SSDataBlock* } { - SSDataBlock* opRes = buildSessionResult(pOperator); + SSDataBlock* opRes = NULL; + code = buildSessionResult(pOperator, &opRes); + QUERY_CHECK_CODE(code, lino, _end); if (opRes) { (*ppRes) = opRes; return code; @@ -3889,7 +3920,9 @@ static int32_t doStreamSessionSemiAggNext(SOperatorInfo* pOperator, SSDataBlock* code = blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); QUERY_CHECK_CODE(code, lino, _end); - SSDataBlock* opRes = buildSessionResult(pOperator); + SSDataBlock* opRes = NULL; + code = buildSessionResult(pOperator, &opRes); + QUERY_CHECK_CODE(code, lino, _end); if (opRes) { (*ppRes) = opRes; return code; @@ -3898,7 +3931,7 @@ static int32_t doStreamSessionSemiAggNext(SOperatorInfo* pOperator, SSDataBlock* _end: if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = code; - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); } clearFunctionContext(&pOperator->exprSupp); @@ -3911,7 +3944,7 @@ _end: static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { SSDataBlock* pRes = NULL; - int32_t code = doStreamSessionSemiAggNext(pOperator, &pRes); + int32_t code = doStreamSessionSemiAggNext(pOperator, &pRes); return pRes; } @@ -3982,7 +4015,7 @@ _error: taosMemoryFreeClear(pOperator); pTaskInfo->code = code; if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); } return NULL; } @@ -4326,7 +4359,7 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl _end: if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); } } @@ -4383,6 +4416,7 @@ int32_t doStreamStateDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; if (!pInfo) { code = TSDB_CODE_FAILED; QUERY_CHECK_CODE(code, lino, _end); @@ -4440,7 +4474,7 @@ int32_t doStreamStateDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera _end: if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); } return code; } @@ -4463,7 +4497,8 @@ void doStreamStateSaveCheckpoint(SOperatorInfo* pOperator) { } } -static SSDataBlock* buildStateResult(SOperatorInfo* pOperator) { +static int32_t buildStateResult(SOperatorInfo* pOperator, SSDataBlock** ppRes) { + int32_t code = TSDB_CODE_SUCCESS; SStreamStateAggOperatorInfo* pInfo = pOperator->info; SOptrBasicInfo* pBInfo = &pInfo->binfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -4471,15 +4506,18 @@ static SSDataBlock* buildStateResult(SOperatorInfo* pOperator) { doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator); if (pInfo->pDelRes->info.rows > 0) { printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); - return pInfo->pDelRes; + (*ppRes) = pInfo->pDelRes; + return code; } doBuildSessionResult(pOperator, pInfo->streamAggSup.pState, &pInfo->groupResInfo, pBInfo->pRes); if (pBInfo->pRes->info.rows > 0) { printDataBlock(pBInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); - return pBInfo->pRes; + (*ppRes) = pBInfo->pRes; + return code; } - return NULL; + (*ppRes) = NULL; + return code; } static int32_t doStreamStateAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { @@ -4496,7 +4534,9 @@ static int32_t doStreamStateAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; qDebug("===stream=== stream state agg"); if (pOperator->status == OP_RES_TO_RETURN) { - SSDataBlock* resBlock = buildStateResult(pOperator); + SSDataBlock* resBlock = NULL; + code = buildStateResult(pOperator, &resBlock); + QUERY_CHECK_CODE(code, lino, _end); if (resBlock != NULL) { (*ppRes) = resBlock; return code; @@ -4597,7 +4637,9 @@ static int32_t doStreamStateAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); QUERY_CHECK_CODE(code, lino, _end); - SSDataBlock* resBlock = buildStateResult(pOperator); + SSDataBlock* resBlock = NULL; + code = buildStateResult(pOperator, &resBlock); + QUERY_CHECK_CODE(code, lino, _end); if (resBlock != NULL) { (*ppRes) = resBlock; return code; @@ -4606,7 +4648,7 @@ static int32_t doStreamStateAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe _end: if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = code; - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); } setStreamOperatorCompleted(pOperator); (*ppRes) = NULL; @@ -4615,7 +4657,7 @@ _end: static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { SSDataBlock* pRes = NULL; - int32_t code = doStreamStateAggNext(pOperator, &pRes); + int32_t code = doStreamStateAggNext(pOperator, &pRes); return pRes; } @@ -4652,6 +4694,7 @@ void streamStateReloadState(SOperatorInfo* pOperator) { int32_t lino = 0; SStreamStateAggOperatorInfo* pInfo = pOperator->info; SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; resetWinRange(&pAggSup->winRange); SSessionKey seKey = {.win.skey = INT64_MIN, .win.ekey = INT64_MIN, .groupId = 0}; @@ -4728,7 +4771,7 @@ void streamStateReloadState(SOperatorInfo* pOperator) { _end: if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); } } @@ -4837,7 +4880,7 @@ _error: destroyStreamStateOperatorInfo(pInfo); taosMemoryFreeClear(pOperator); pTaskInfo->code = code; - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); return NULL; } @@ -4869,7 +4912,9 @@ static int32_t doStreamIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** p } if (pOperator->status == OP_RES_TO_RETURN) { - SSDataBlock* resBlock = buildIntervalResult(pOperator); + SSDataBlock* resBlock = NULL; + code = buildIntervalResult(pOperator, &resBlock); + QUERY_CHECK_CODE(code, lino, _end); if (resBlock != NULL) { (*ppRes) = resBlock; return code; @@ -5000,13 +5045,15 @@ static int32_t doStreamIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** p tSimpleHashCleanup(pInfo->pUpdatedMap); pInfo->pUpdatedMap = NULL; - (*ppRes) = buildIntervalResult(pOperator); + code = buildIntervalResult(pOperator, ppRes); + QUERY_CHECK_CODE(code, lino, _end); + return code; _end: if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = code; - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); } setStreamOperatorCompleted(pOperator); (*ppRes) = NULL; @@ -5015,7 +5062,7 @@ _end: static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { SSDataBlock* pRes = NULL; - int32_t code = doStreamIntervalAggNext(pOperator, &pRes); + int32_t code = doStreamIntervalAggNext(pOperator, &pRes); return pRes; } @@ -5247,7 +5294,7 @@ static void doStreamMidIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pS _end: if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); } } @@ -5314,7 +5361,9 @@ static int32_t doStreamMidIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock* (*ppRes) = NULL; return code; } else if (pOperator->status == OP_RES_TO_RETURN) { - SSDataBlock* resBlock = buildIntervalResult(pOperator); + SSDataBlock* resBlock = NULL; + code = buildIntervalResult(pOperator, &resBlock); + QUERY_CHECK_CODE(code, lino, _end); if (resBlock != NULL) { (*ppRes) = resBlock; return code; @@ -5327,7 +5376,9 @@ static int32_t doStreamMidIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock* (*ppRes) = NULL; return code; } else { - SSDataBlock* resBlock = buildIntervalResult(pOperator); + SSDataBlock* resBlock = NULL; + code = buildIntervalResult(pOperator, &resBlock); + QUERY_CHECK_CODE(code, lino, _end); if (resBlock != NULL) { (*ppRes) = resBlock; return code; @@ -5490,7 +5541,9 @@ static int32_t doStreamMidIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock* code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); QUERY_CHECK_CODE(code, lino, _end); - SSDataBlock* resBlock = buildIntervalResult(pOperator); + SSDataBlock* resBlock = NULL; + code = buildIntervalResult(pOperator, &resBlock); + QUERY_CHECK_CODE(code, lino, _end); if (resBlock != NULL) { (*ppRes) = resBlock; return code; @@ -5511,7 +5564,7 @@ static int32_t doStreamMidIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock* _end: if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = code; - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); } (*ppRes) = NULL; return code; @@ -5519,7 +5572,7 @@ _end: static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) { SSDataBlock* pRes = NULL; - int32_t code = doStreamMidIntervalAggNext(pOperator, &pRes); + int32_t code = doStreamMidIntervalAggNext(pOperator, &pRes); return pRes; } diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index 094e6575d1..b502baa8ec 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -1984,7 +1984,11 @@ static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableSca int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &pInfo->req); char* buf1 = taosMemoryCalloc(1, contLen); - (void)tSerializeSRetrieveTableReq(buf1, contLen, &pInfo->req); + int32_t tempRes = tSerializeSRetrieveTableReq(buf1, contLen, &pInfo->req); + if (tempRes < 0) { + code = terrno; + return NULL; + } // send the fetch remote task result reques SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); @@ -2589,7 +2593,11 @@ static int32_t doBlockInfoScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes char* p = taosMemoryCalloc(1, len + VARSTR_HEADER_SIZE); QUERY_CHECK_NULL(p, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY); - (void)tSerializeBlockDistInfo(varDataVal(p), len, &blockDistInfo); + int32_t tempRes = tSerializeBlockDistInfo(varDataVal(p), len, &blockDistInfo); + if (tempRes < 0) { + code = terrno; + QUERY_CHECK_CODE(code, lino, _end); + } varDataSetLen(p, len); code = colDataSetVal(pColInfo, 0, p, false); @@ -2619,7 +2627,7 @@ _end: static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) { SSDataBlock* pRes = NULL; - int32_t code = doBlockInfoScanNext(pOperator, &pRes); + int32_t code = doBlockInfoScanNext(pOperator, &pRes); return pRes; } diff --git a/source/libs/executor/src/tfill.c b/source/libs/executor/src/tfill.c index bfb9c5a5c1..a9bc6e8c0f 100644 --- a/source/libs/executor/src/tfill.c +++ b/source/libs/executor/src/tfill.c @@ -47,9 +47,9 @@ static void setNotFillColumn(SFillInfo* pFillInfo, SColumnInfoData* pDstColInfo, } SGroupKeys* pKey = taosArrayGet(p->pRowVal, colIdx); - int32_t code = doSetVal(pDstColInfo, rowIndex, pKey); + int32_t code = doSetVal(pDstColInfo, rowIndex, pKey); if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); T_LONG_JMP(pFillInfo->pTaskInfo->env, code); } } @@ -511,13 +511,14 @@ static int32_t taosNumOfRemainRows(SFillInfo* pFillInfo) { return pFillInfo->numOfRows - pFillInfo->index; } -struct SFillInfo* taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols, int32_t numOfNotFillCols, int32_t capacity, - SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol, - int32_t primaryTsSlotId, int32_t order, const char* id, SExecTaskInfo* pTaskInfo) { +void taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols, int32_t numOfNotFillCols, int32_t capacity, + SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol, int32_t primaryTsSlotId, + int32_t order, const char* id, SExecTaskInfo* pTaskInfo, SFillInfo** ppFillInfo) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; if (fillType == TSDB_FILL_NONE) { - return NULL; + (*ppFillInfo) = NULL; + return; } SFillInfo* pFillInfo = taosMemoryCalloc(1, sizeof(SFillInfo)); @@ -562,7 +563,7 @@ _end: terrno = code; T_LONG_JMP(pTaskInfo->env, code); } - return pFillInfo; + (*ppFillInfo) = pFillInfo; } void taosResetFillInfo(SFillInfo* pFillInfo, TSKEY startTimestamp) {