diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index ec095a481f..c6266c1612 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -939,7 +939,7 @@ void initGroupResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pAr void getSessionHashKey(const SSessionKey* pKey, SSessionKey* pHashKey); int32_t deleteSessionWinState(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SSHashObj* pMapUpdate, SSHashObj* pMapDelete, SSHashObj* pPkDelete, bool needAdd); -void getAllSessionWindow(SSHashObj* pHashMap, SSHashObj* pStUpdated); +int32_t getAllSessionWindow(SSHashObj* pHashMap, SSHashObj* pStUpdated); int32_t closeSessionWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SSHashObj* pClosed); int32_t copyUpdateResult(SSHashObj** ppWinUpdated, SArray* pUpdated, __compar_fn_t compar); int32_t sessionKeyCompareAsc(const void* pKey1, const void* pKey2); @@ -991,7 +991,7 @@ void doDeleteSessionWindow(SStreamAggSupporter* pAggSup, SSessionKey* pKey); int32_t saveDeleteInfo(SArray* pWins, SSessionKey key); void removeSessionResults(SStreamAggSupporter* pAggSup, SSHashObj* pHashMap, SArray* pWins); int32_t copyDeleteWindowInfo(SArray* pResWins, SSHashObj* pStDeleted); -void copyDeleteSessionKey(SSHashObj* source, SSHashObj* dest); +int32_t copyDeleteSessionKey(SSHashObj* source, SSHashObj* dest); bool inSlidingWindow(SInterval* pInterval, STimeWindow* pWin, SDataBlockInfo* pBlockInfo); bool inCalSlidingWindow(SInterval* pInterval, STimeWindow* pWin, TSKEY calStart, TSKEY calEnd, EStreamType blockType); diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index a1702885c8..68ff0057a5 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -140,21 +140,21 @@ static void removeCountResult(SSHashObj* pHashMap, SSHashObj* pResMap, SSessionK getSessionHashKey(pKey, &key); int32_t code = tSimpleHashRemove(pHashMap, &key, sizeof(SSessionKey)); if (code != TSDB_CODE_SUCCESS) { - qWarn("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + qInfo("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); } code = tSimpleHashRemove(pResMap, &key, sizeof(SSessionKey)); if (code != TSDB_CODE_SUCCESS) { - qWarn("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + qInfo("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); } } static int32_t updateCountWindowInfo(SStreamAggSupporter* pAggSup, SCountWindowInfo* pWinInfo, TSKEY* pTs, int32_t start, int32_t rows, int32_t maxRows, SSHashObj* pStUpdated, - SSHashObj* pStDeleted, bool* pRebuild) { - SSessionKey sWinKey = pWinInfo->winInfo.sessionWin; + SSHashObj* pStDeleted, bool* pRebuild, int32_t* pWinRows) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; + SSessionKey sWinKey = pWinInfo->winInfo.sessionWin; int32_t num = 0; for (int32_t i = start; i < rows; i++) { if (pTs[i] < pWinInfo->winInfo.sessionWin.win.ekey) { @@ -173,9 +173,7 @@ static int32_t updateCountWindowInfo(SStreamAggSupporter* pAggSup, SCountWindowI needDelState = true; if (pStDeleted && pWinInfo->winInfo.isOutput) { code = saveDeleteRes(pStDeleted, pWinInfo->winInfo.sessionWin); - if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); - } + TSDB_CHECK_CODE(code, lino, _end); } pWinInfo->winInfo.sessionWin.win.skey = pTs[start]; @@ -194,7 +192,13 @@ static int32_t updateCountWindowInfo(SStreamAggSupporter* pAggSup, SCountWindowI } } - return maxNum; + (*pWinRows) = maxNum; + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } void getCountWinRange(SStreamAggSupporter* pAggSup, const SSessionKey* pKey, EStreamType mode, SSessionKey* pDelRange) { @@ -292,8 +296,9 @@ static void doStreamCountAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl setSessionWinOutputInfo(pStUpdated, &curWin.winInfo); slidingRows = *curWin.pWindowCount; if (!buffInfo.rebuildWindow) { - winRows = updateCountWindowInfo(pAggSup, &curWin, startTsCols, i, rows, pAggSup->windowCount, pStUpdated, - pStDeleted, &buffInfo.rebuildWindow); + code = updateCountWindowInfo(pAggSup, &curWin, startTsCols, i, rows, pAggSup->windowCount, pStUpdated, + pStDeleted, &buffInfo.rebuildWindow, &winRows); + TSDB_CHECK_CODE(code, lino, _end); } if (buffInfo.rebuildWindow) { SSessionKey range = {0}; @@ -655,7 +660,8 @@ static SSDataBlock* doStreamCountAgg(SOperatorInfo* pOperator) { continue; } else if (pBlock->info.type == STREAM_GET_ALL) { pInfo->recvGetAll = true; - getAllSessionWindow(pAggSup->pResultRows, pInfo->pStUpdated); + code = getAllSessionWindow(pAggSup->pResultRows, pInfo->pStUpdated); + TSDB_CHECK_CODE(code, lino, _end); continue; } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) { return pBlock; @@ -696,7 +702,8 @@ static SSDataBlock* doStreamCountAgg(SOperatorInfo* pOperator) { TSDB_CHECK_CODE(code, lino, _end); if (pInfo->destHasPrimaryKey && IS_NORMAL_COUNT_OP(pOperator)) { - copyDeleteSessionKey(pInfo->pPkDeleted, pInfo->pStDeleted); + code = copyDeleteSessionKey(pInfo->pPkDeleted, pInfo->pStDeleted); + TSDB_CHECK_CODE(code, lino, _end); } SSDataBlock* opRes = buildCountResult(pOperator); diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index e5f76d4544..2e4ea29437 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -185,12 +185,14 @@ _error: int32_t updateEventWindowInfo(SStreamAggSupporter* pAggSup, SEventWindowInfo* pWinInfo, SSessionKey* pNextWinKey, TSKEY* pTsData, bool* starts, bool* ends, int32_t rows, int32_t start, - SSHashObj* pResultRows, SSHashObj* pStUpdated, SSHashObj* pStDeleted, bool* pRebuild) { + SSHashObj* pResultRows, SSHashObj* pStUpdated, SSHashObj* pStDeleted, bool* pRebuild, + int32_t* pWinRow) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; *pRebuild = false; if (!pWinInfo->pWinFlag->startFlag && !(starts[start])) { - return 1; + (*pWinRow) = 1; + goto _end; } TSKEY maxTs = INT64_MAX; @@ -204,15 +206,14 @@ int32_t updateEventWindowInfo(SStreamAggSupporter* pAggSup, SEventWindowInfo* pW for (int32_t i = start; i < rows; ++i) { if (pTsData[i] >= maxTs) { - return i - start; + (*pWinRow) = i - start; + goto _end; } if (pWin->skey > pTsData[i]) { if (pStDeleted && pWinInfo->winInfo.isOutput) { code = saveDeleteRes(pStDeleted, pWinInfo->winInfo.sessionWin); - if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); - } + TSDB_CHECK_CODE(code, lino, _end); } removeSessionResult(pAggSup, pStUpdated, pResultRows, &pWinInfo->winInfo.sessionWin); pWin->skey = pTsData[i]; @@ -229,7 +230,8 @@ int32_t updateEventWindowInfo(SStreamAggSupporter* pAggSup, SEventWindowInfo* pW } else { *pRebuild = true; pWinInfo->pWinFlag->endFlag |= ends[i]; - return i + 1 - start; + (*pWinRow) = i + 1 - start; + goto _end; } memcpy(pWinInfo->winInfo.pStatePos->pKey, &pWinInfo->winInfo.sessionWin, sizeof(SSessionKey)); @@ -238,10 +240,17 @@ int32_t updateEventWindowInfo(SStreamAggSupporter* pAggSup, SEventWindowInfo* pW if (pWinInfo->pWinFlag->endFlag && pWin->skey <= pTsData[i] && pTsData[i] < pWin->ekey) { *pRebuild = true; } - return i + 1 - start; + (*pWinRow) = i + 1 - start; + goto _end; } } - return rows - start; + (*pWinRow) = rows - start; + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } static int32_t compactEventWindow(SOperatorInfo* pOperator, SEventWindowInfo* pCurWin, SSHashObj* pStUpdated, @@ -362,9 +371,10 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl setSessionWinOutputInfo(pSeUpdated, &curWin.winInfo); bool rebuild = false; - winRows = - updateEventWindowInfo(pAggSup, &curWin, &nextWinKey, tsCols, (bool*)pColStart->pData, (bool*)pColEnd->pData, - rows, i, pAggSup->pResultRows, pSeUpdated, pStDeleted, &rebuild); + code = updateEventWindowInfo(pAggSup, &curWin, &nextWinKey, tsCols, (bool*)pColStart->pData, (bool*)pColEnd->pData, + rows, i, pAggSup->pResultRows, pSeUpdated, pStDeleted, &rebuild, &winRows); + TSDB_CHECK_CODE(code, lino, _end); + ASSERT(winRows >= 1); if (rebuild) { uint64_t uid = 0; @@ -616,7 +626,8 @@ static SSDataBlock* doStreamEventAgg(SOperatorInfo* pOperator) { continue; } else if (pBlock->info.type == STREAM_GET_ALL) { pInfo->recvGetAll = true; - getAllSessionWindow(pInfo->streamAggSup.pResultRows, pInfo->pSeUpdated); + code = getAllSessionWindow(pInfo->streamAggSup.pResultRows, pInfo->pSeUpdated); + TSDB_CHECK_CODE(code, lino, _end); continue; } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) { return pBlock; @@ -668,7 +679,8 @@ static SSDataBlock* doStreamEventAgg(SOperatorInfo* pOperator) { taosArrayDestroy(pHisWins); } if (pInfo->destHasPrimaryKey && IS_NORMAL_EVENT_OP(pOperator)) { - copyDeleteSessionKey(pInfo->pPkDeleted, pInfo->pSeDeleted); + code = copyDeleteSessionKey(pInfo->pPkDeleted, pInfo->pSeDeleted); + TSDB_CHECK_CODE(code, lino, _end); } initGroupResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated); diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index 518f7a33fb..5969bd3bf4 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -150,22 +150,6 @@ void resetPrevAndNextWindow(SStreamFillSupporter* pFillSup, void* pState, SStora resetFillWindow(&pFillSup->nextNext); } -void getCurWindowFromDiscBuf(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId, SStreamFillSupporter* pFillSup) { - SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; - - void* pState = pOperator->pTaskInfo->streamInfo.pState; - resetPrevAndNextWindow(pFillSup, pState, pAPI); - - SWinKey key = {.ts = ts, .groupId = groupId}; - int32_t curVLen = 0; - - int32_t code = pAPI->stateStore.streamStateFillGet(pState, &key, (void**)&pFillSup->cur.pRowVal, &curVLen); - if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); - } - pFillSup->cur.key = key.ts; -} - void getWindowFromDiscBuf(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId, SStreamFillSupporter* pFillSup) { SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; void* pState = pOperator->pTaskInfo->streamInfo.pState; @@ -477,25 +461,40 @@ void setFillValueInfo(SSDataBlock* pBlock, TSKEY ts, int32_t rowId, SStreamFillS ASSERT(pFillInfo->pos != FILL_POS_INVALID); } -static bool checkResult(SStreamFillSupporter* pFillSup, TSKEY ts, uint64_t groupId) { +static int32_t checkResult(SStreamFillSupporter* pFillSup, TSKEY ts, uint64_t groupId, bool* pRes) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SWinKey key = {.groupId = groupId, .ts = ts}; if (tSimpleHashGet(pFillSup->pResMap, &key, sizeof(SWinKey)) != NULL) { - return false; + (*pRes) = false; } - int32_t code = tSimpleHashPut(pFillSup->pResMap, &key, sizeof(SWinKey), NULL, 0); + code = tSimpleHashPut(pFillSup->pResMap, &key, sizeof(SWinKey), NULL, 0); + TSDB_CHECK_CODE(code, lino, _end); + (*pRes) = true; + +_end: if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); } - return true; + return code; } -static bool buildFillResult(SResultRowData* pResRow, SStreamFillSupporter* pFillSup, TSKEY ts, SSDataBlock* pBlock) { +static int32_t buildFillResult(SResultRowData* pResRow, SStreamFillSupporter* pFillSup, TSKEY ts, SSDataBlock* pBlock, + bool* pRes) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; if (pBlock->info.rows >= pBlock->info.capacity) { - return false; + (*pRes) = false; + goto _end; } uint64_t groupId = pBlock->info.id.groupId; - if (pFillSup->hasDelete && !checkResult(pFillSup, ts, groupId)) { - return true; + bool ckRes = true; + code = checkResult(pFillSup, ts, groupId, &ckRes); + TSDB_CHECK_CODE(code, lino, _end); + + if (pFillSup->hasDelete && !ckRes) { + (*pRes) = true; + goto _end; } for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) { SFillColInfo* pFillCol = pFillSup->pAllColInfo + i; @@ -509,14 +508,18 @@ static bool buildFillResult(SResultRowData* pResRow, SStreamFillSupporter* pFill bool filled = fillIfWindowPseudoColumn(&tmpInfo, pFillCol, pColData, pBlock->info.rows); if (!filled) { SResultCellData* pCell = getResultCell(pResRow, slotId); - int32_t code = setRowCell(pColData, pBlock->info.rows, pCell); - if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); - } + code = setRowCell(pColData, pBlock->info.rows, pCell); + TSDB_CHECK_CODE(code, lino, _end); } } pBlock->info.rows++; - return true; + (*pRes) = true; + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } static bool hasRemainCalc(SStreamFillInfo* pFillInfo) { @@ -527,26 +530,37 @@ static bool hasRemainCalc(SStreamFillInfo* pFillInfo) { } static void doStreamFillNormal(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; while (hasRemainCalc(pFillInfo) && pBlock->info.rows < pBlock->info.capacity) { STimeWindow st = {.skey = pFillInfo->current, .ekey = pFillInfo->current}; if (inWinRange(&pFillSup->winRange, &st)) { - bool res = buildFillResult(pFillInfo->pResRow, pFillSup, pFillInfo->current, pBlock); - if (!res) { - int32_t code = TSDB_CODE_FAILED; - qError("%s failed at line %d since block is full", __func__, __LINE__); - } + bool res = true; + code = buildFillResult(pFillInfo->pResRow, pFillSup, pFillInfo->current, pBlock, &res); + TSDB_CHECK_CODE(code, lino, _end); } pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding, pFillSup->interval.slidingUnit, pFillSup->interval.precision); } + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } } static void doStreamFillLinear(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; while (hasRemainCalc(pFillInfo) && pBlock->info.rows < pBlock->info.capacity) { uint64_t groupId = pBlock->info.id.groupId; SWinKey key = {.groupId = groupId, .ts = pFillInfo->current}; STimeWindow st = {.skey = pFillInfo->current, .ekey = pFillInfo->current}; - if ((pFillSup->hasDelete && !checkResult(pFillSup, pFillInfo->current, groupId)) || + bool ckRes = true; + code = checkResult(pFillSup, pFillInfo->current, groupId, &ckRes); + TSDB_CHECK_CODE(code, lino, _end); + + if ((pFillSup->hasDelete && !ckRes) || !inWinRange(&pFillSup->winRange, &st)) { pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding, pFillSup->interval.slidingUnit, pFillSup->interval.precision); @@ -570,10 +584,8 @@ static void doStreamFillLinear(SStreamFillSupporter* pFillSup, SStreamFillInfo* if (pFillCol->notFillCol) { bool filled = fillIfWindowPseudoColumn(&tmp, pFillCol, pColData, index); if (!filled) { - int32_t code = setRowCell(pColData, index, pCell); - if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); - } + code = setRowCell(pColData, index, pCell); + TSDB_CHECK_CODE(code, lino, _end); } } else { if (IS_VAR_DATA_TYPE(type) || type == TSDB_DATA_TYPE_BOOL || pCell->isNull) { @@ -590,10 +602,8 @@ static void doStreamFillLinear(SStreamFillSupporter* pFillSup, SStreamFillInfo* cur.key = pFillInfo->current; cur.val = taosMemoryCalloc(1, pCell->bytes); taosGetLinearInterpolationVal(&cur, pCell->type, &start, pEnd, pCell->type); - int32_t code = colDataSetVal(pColData, index, (const char*)cur.val, false); - if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); - } + code = colDataSetVal(pColData, index, (const char*)cur.val, false); + TSDB_CHECK_CODE(code, lino, _end); destroySPoint(&cur); } } @@ -601,6 +611,11 @@ static void doStreamFillLinear(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillSup->interval.precision); pBlock->info.rows++; } + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } } static void keepResultInDiscBuf(SOperatorInfo* pOperator, uint64_t groupId, SResultRowData* pRow, int32_t len) { @@ -613,17 +628,19 @@ static void keepResultInDiscBuf(SOperatorInfo* pOperator, uint64_t groupId, SRes } static void doStreamFillRange(SStreamFillInfo* pFillInfo, SStreamFillSupporter* pFillSup, SSDataBlock* pRes) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + bool res = false; if (pFillInfo->needFill == false) { - bool res = buildFillResult(&pFillSup->cur, pFillSup, pFillSup->cur.key, pRes); - if (!res) { - int32_t code = TSDB_CODE_FAILED; - qError("%s failed at line %d since block is full", __func__, __LINE__); - } + code = buildFillResult(&pFillSup->cur, pFillSup, pFillSup->cur.key, pRes, &res); + TSDB_CHECK_CODE(code, lino, _end); return; } if (pFillInfo->pos == FILL_POS_START) { - if (buildFillResult(&pFillSup->cur, pFillSup, pFillSup->cur.key, pRes)) { + code = buildFillResult(&pFillSup->cur, pFillSup, pFillSup->cur.key, pRes, &res); + TSDB_CHECK_CODE(code, lino, _end); + if (res) { pFillInfo->pos = FILL_POS_INVALID; } } @@ -633,7 +650,9 @@ static void doStreamFillRange(SStreamFillInfo* pFillInfo, SStreamFillSupporter* doStreamFillLinear(pFillSup, pFillInfo, pRes); if (pFillInfo->pos == FILL_POS_MID) { - if (buildFillResult(&pFillSup->cur, pFillSup, pFillSup->cur.key, pRes)) { + code = buildFillResult(&pFillSup->cur, pFillSup, pFillSup->cur.key, pRes, &res); + TSDB_CHECK_CODE(code, lino, _end); + if (res) { pFillInfo->pos = FILL_POS_INVALID; } } @@ -648,10 +667,17 @@ static void doStreamFillRange(SStreamFillInfo* pFillInfo, SStreamFillSupporter* } } if (pFillInfo->pos == FILL_POS_END) { - if (buildFillResult(&pFillSup->cur, pFillSup, pFillSup->cur.key, pRes)) { + code = buildFillResult(&pFillSup->cur, pFillSup, pFillSup->cur.key, pRes, &res); + TSDB_CHECK_CODE(code, lino, _end); + if (res) { pFillInfo->pos = FILL_POS_INVALID; } } + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } } void keepBlockRowInDiscBuf(SOperatorInfo* pOperator, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock, TSKEY* tsCol, diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 93b8e0af8b..da68c0e2b5 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -514,23 +514,22 @@ void reloadFromDownStream(SOperatorInfo* downstream, SStreamIntervalOperatorInfo pInfo->pUpdateInfo = pScanInfo->pUpdateInfo; } -void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SStreamIntervalOperatorInfo* pInfo) { +int32_t initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SStreamIntervalOperatorInfo* pInfo) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SStateStore* pAPI = &downstream->pTaskInfo->storageAPI.stateStore; if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { - initIntervalDownStream(downstream->pDownstream[0], type, pInfo); - return; + return initIntervalDownStream(downstream->pDownstream[0], type, pInfo); } SStreamScanInfo* pScanInfo = downstream->info; pScanInfo->windowSup.parentType = type; pScanInfo->windowSup.pIntervalAggSup = &pInfo->aggSup; if (!pScanInfo->pUpdateInfo) { - int32_t code = pAPI->updateInfoInitP(&pInfo->interval, pInfo->twAggSup.waterMark, pScanInfo->igCheckUpdate, - pScanInfo->pkColType, pScanInfo->pkColLen, &pScanInfo->pUpdateInfo); - if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at %d since %s", __func__, __LINE__, tstrerror(code)); - } + code = pAPI->updateInfoInitP(&pInfo->interval, pInfo->twAggSup.waterMark, pScanInfo->igCheckUpdate, + pScanInfo->pkColType, pScanInfo->pkColLen, &pScanInfo->pUpdateInfo); + TSDB_CHECK_CODE(code, lino, _end); } pScanInfo->interval = pInfo->interval; @@ -538,6 +537,12 @@ void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SStreamInt pScanInfo->pState = pInfo->pState; pInfo->pUpdateInfo = pScanInfo->pUpdateInfo; pInfo->basic.primaryPkIndex = pScanInfo->primaryKeyIndex; + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } int32_t compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int32_t numOfOutput, @@ -1795,12 +1800,14 @@ _end: SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild, SReadHandle* pHandle) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; SStreamIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamIntervalOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - int32_t code = 0; if (pInfo == NULL || pOperator == NULL) { - goto _error; + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _error); } pOperator->exprSupp.hasWindowOrGroup = true; @@ -1830,9 +1837,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, int32_t numOfScalar = 0; SExprInfo* pScalarExprInfo = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &numOfScalar); code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + TSDB_CHECK_CODE(code, lino, _error); } int32_t numOfCols = 0; @@ -1850,9 +1855,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1, pInfo->primaryTsIndex); code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, pInfo->pState, &pTaskInfo->storageAPI.functionStore); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + TSDB_CHECK_CODE(code, lino, _error); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); initResultRowInfo(&pInfo->binfo.resultRowInfo); @@ -1912,12 +1915,11 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, setOperatorStreamStateFn(pOperator, streamIntervalReleaseState, streamIntervalReloadState); if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL || pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL) { - initIntervalDownStream(downstream, pPhyNode->type, pInfo); + code = initIntervalDownStream(downstream, pPhyNode->type, pInfo); + TSDB_CHECK_CODE(code, lino, _error); } code = appendDownstream(pOperator, &downstream, 1); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + TSDB_CHECK_CODE(code, lino, _error); // for stream void* buff = NULL; @@ -2162,13 +2164,19 @@ void getSessionWinBuf(SStreamAggSupporter* pAggSup, SStreamStateCur* pCur, SResu } int32_t saveDeleteInfo(SArray* pWins, SSessionKey key) { - // key.win.ekey = key.win.skey; - void* res = taosArrayPush(pWins, &key); + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + void* res = taosArrayPush(pWins, &key); if (!res) { - qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); - return TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _end); } - return TSDB_CODE_SUCCESS; + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } int32_t saveDeleteRes(SSHashObj* pStDelete, SSessionKey key) { @@ -2240,17 +2248,19 @@ void removeSessionResults(SStreamAggSupporter* pAggSup, SSHashObj* pHashMap, SAr int32_t updateSessionWindowInfo(SStreamAggSupporter* pAggSup, SResultWindowInfo* pWinInfo, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t groupId, int32_t rows, int32_t start, int64_t gap, - SSHashObj* pResultRows, SSHashObj* pStUpdated, SSHashObj* pStDeleted) { + SSHashObj* pResultRows, SSHashObj* pStUpdated, SSHashObj* pStDeleted, + int32_t* pWinRos) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; for (int32_t i = start; i < rows; ++i) { if (!isInWindow(pWinInfo, pStartTs[i], gap) && (!pEndTs || !isInWindow(pWinInfo, pEndTs[i], gap))) { - return i - start; + (*pWinRos) = i - start; + goto _end; } if (pWinInfo->sessionWin.win.skey > pStartTs[i]) { if (pStDeleted && pWinInfo->isOutput) { - int32_t code = saveDeleteRes(pStDeleted, pWinInfo->sessionWin); - if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); - } + code = saveDeleteRes(pStDeleted, pWinInfo->sessionWin); + TSDB_CHECK_CODE(code, lino, _end); } removeSessionResult(pAggSup, pStUpdated, pResultRows, &pWinInfo->sessionWin); pWinInfo->sessionWin.win.skey = pStartTs[i]; @@ -2261,7 +2271,13 @@ int32_t updateSessionWindowInfo(SStreamAggSupporter* pAggSup, SResultWindowInfo* } memcpy(pWinInfo->pStatePos->pKey, &pWinInfo->sessionWin, sizeof(SSessionKey)); } - return rows - start; + (*pWinRos) = rows - start; + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } static int32_t initSessionOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pResult, SqlFunctionCtx* pCtx, @@ -2357,10 +2373,8 @@ int32_t compactTimeWindow(SExprSupp* pSup, SStreamAggSupporter* pAggSup, STimeWi if (pNextWin->isOutput && pStDeleted) { qDebug("===stream=== save delete window info %" PRId64 ", %" PRIu64, pNextWin->sessionWin.win.skey, pNextWin->sessionWin.groupId); - int32_t code = saveDeleteRes(pStDeleted, pNextWin->sessionWin); - if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); - } + code = saveDeleteRes(pStDeleted, pNextWin->sessionWin); + TSDB_CHECK_CODE(code, lino, _end); } removeSessionResult(pAggSup, pStUpdated, pAggSup->pResultRows, &pNextWin->sessionWin); doDeleteSessionWindow(pAggSup, &pNextWin->sessionWin); @@ -2499,8 +2513,9 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData continue; } setSessionWinOutputInfo(pStUpdated, &winInfo); - winRows = updateSessionWindowInfo(pAggSup, &winInfo, startTsCols, endTsCols, groupId, rows, i, pAggSup->gap, - pAggSup->pResultRows, pStUpdated, pStDeleted); + code = updateSessionWindowInfo(pAggSup, &winInfo, startTsCols, endTsCols, groupId, rows, i, pAggSup->gap, + pAggSup->pResultRows, pStUpdated, pStDeleted, &winRows); + TSDB_CHECK_CODE(code, lino, _end); int64_t winDelta = 0; if (addGap) { @@ -2749,7 +2764,7 @@ static int32_t rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, _end: if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); } return code; } @@ -2797,7 +2812,9 @@ _end: return code; } -void getAllSessionWindow(SSHashObj* pHashMap, SSHashObj* pStUpdated) { +int32_t getAllSessionWindow(SSHashObj* pHashMap, SSHashObj* pStUpdated) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; void* pIte = NULL; int32_t iter = 0; while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) { @@ -2806,12 +2823,15 @@ void getAllSessionWindow(SSHashObj* pHashMap, SSHashObj* pStUpdated) { continue; } pWinInfo->pStatePos->beUpdated = false; - int32_t code = saveResult(*pWinInfo, pStUpdated); - if (code != TSDB_CODE_SUCCESS) { - pWinInfo->pStatePos->beUpdated = true; - qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); - } + code = saveResult(*pWinInfo, pStUpdated); + TSDB_CHECK_CODE(code, lino, _end); } + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } int32_t copyDeleteWindowInfo(SArray* pResWins, SSHashObj* pStDeleted) { @@ -3186,21 +3206,27 @@ void resetUnCloseSessionWinInfo(SSHashObj* winMap) { } } -void copyDeleteSessionKey(SSHashObj* source, SSHashObj* dest) { +int32_t copyDeleteSessionKey(SSHashObj* source, SSHashObj* dest) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; if (tSimpleHashGetSize(source) == 0) { - return; + goto _end; } void* pIte = NULL; int32_t iter = 0; size_t keyLen = 0; while ((pIte = tSimpleHashIterate(source, pIte, &iter)) != NULL) { SSessionKey* pKey = tSimpleHashGetKey(pIte, &keyLen); - int32_t code = saveDeleteRes(dest, *pKey); - if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); - } + code = saveDeleteRes(dest, *pKey); + TSDB_CHECK_CODE(code, lino, _end); } tSimpleHashClear(source); + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { @@ -3281,7 +3307,8 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { continue; } else if (pBlock->info.type == STREAM_GET_ALL) { pInfo->recvGetAll = true; - getAllSessionWindow(pAggSup->pResultRows, pInfo->pStUpdated); + code = getAllSessionWindow(pAggSup->pResultRows, pInfo->pStUpdated); + TSDB_CHECK_CODE(code, lino, _end); continue; } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) { return pBlock; @@ -3350,7 +3377,8 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { TSDB_CHECK_CODE(code, lino, _end); } if (pInfo->destHasPrimaryKey && IS_NORMAL_SESSION_OP(pOperator)) { - copyDeleteSessionKey(pInfo->pPkDeleted, pInfo->pStDeleted); + code = copyDeleteSessionKey(pInfo->pPkDeleted, pInfo->pStDeleted); + TSDB_CHECK_CODE(code, lino, _end); } initGroupResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated); pInfo->pUpdated = NULL; @@ -3518,22 +3546,16 @@ void streamSessionReloadState(SOperatorInfo* pOperator) { winInfo.sessionWin.groupId); if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { code = saveResult(winInfo, pInfo->pStUpdated); - if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); - } + TSDB_CHECK_CODE(code, lino, _end); } else if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { if (!isCloseWindow(&winInfo.sessionWin.win, &pInfo->twAggSup)) { code = saveDeleteRes(pInfo->pStDeleted, winInfo.sessionWin); - if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); - } + TSDB_CHECK_CODE(code, lino, _end); } SSessionKey key = {0}; getSessionHashKey(&winInfo.sessionWin, &key); code = tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &winInfo, sizeof(SResultWindowInfo)); - if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); - } + TSDB_CHECK_CODE(code, lino, _end); } } code = saveSessionOutputBuf(pAggSup, &winInfo); @@ -3775,7 +3797,8 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { pInfo->clearState = true; break; } else if (pBlock->info.type == STREAM_GET_ALL) { - getAllSessionWindow(pInfo->streamAggSup.pResultRows, pInfo->pStUpdated); + code = getAllSessionWindow(pInfo->streamAggSup.pResultRows, pInfo->pStUpdated); + TSDB_CHECK_CODE(code, lino, _end); continue; } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) { return pBlock; @@ -4098,7 +4121,10 @@ _end: int32_t updateStateWindowInfo(SStreamAggSupporter* pAggSup, SStateWindowInfo* pWinInfo, SStateWindowInfo* pNextWin, TSKEY* pTs, uint64_t groupId, SColumnInfoData* pKeyCol, int32_t rows, int32_t start, - bool* allEqual, SSHashObj* pResultRows, SSHashObj* pSeUpdated, SSHashObj* pSeDeleted) { + bool* allEqual, SSHashObj* pResultRows, SSHashObj* pSeUpdated, SSHashObj* pSeDeleted, + int32_t* pWinRows) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; *allEqual = true; for (int32_t i = start; i < rows; ++i) { char* pKeyData = colDataGetData(pKeyCol, i); @@ -4107,20 +4133,20 @@ int32_t updateStateWindowInfo(SStreamAggSupporter* pAggSup, SStateWindowInfo* pW if (IS_VALID_SESSION_WIN(pNextWin->winInfo)) { // ts belongs to the next window if (pTs[i] >= pNextWin->winInfo.sessionWin.win.skey) { - return i - start; + (*pWinRows) = i - start; + goto _end; } } } else { - return i - start; + (*pWinRows) = i - start; + goto _end; } } if (pWinInfo->winInfo.sessionWin.win.skey > pTs[i]) { if (pSeDeleted && pWinInfo->winInfo.isOutput) { - int32_t code = saveDeleteRes(pSeDeleted, pWinInfo->winInfo.sessionWin); - if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); - } + code = saveDeleteRes(pSeDeleted, pWinInfo->winInfo.sessionWin); + TSDB_CHECK_CODE(code, lino, _end); } removeSessionResult(pAggSup, pSeUpdated, pResultRows, &pWinInfo->winInfo.sessionWin); pWinInfo->winInfo.sessionWin.win.skey = pTs[i]; @@ -4131,7 +4157,13 @@ int32_t updateStateWindowInfo(SStreamAggSupporter* pAggSup, SStateWindowInfo* pW *allEqual = false; } } - return rows - start; + (*pWinRows) = rows - start; + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, SSHashObj* pSeUpdated, @@ -4189,8 +4221,10 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl releaseOutputBuf(pAggSup->pState, nextWin.winInfo.pStatePos, &pAPI->stateStore); setSessionWinOutputInfo(pSeUpdated, &curWin.winInfo); - winRows = updateStateWindowInfo(pAggSup, &curWin, &nextWin, tsCols, groupId, pKeyColInfo, rows, i, &allEqual, - pAggSup->pResultRows, pSeUpdated, pStDeleted); + code = updateStateWindowInfo(pAggSup, &curWin, &nextWin, tsCols, groupId, pKeyColInfo, rows, i, &allEqual, + pAggSup->pResultRows, pSeUpdated, pStDeleted, &winRows); + TSDB_CHECK_CODE(code, lino, _end); + if (!allEqual) { uint64_t uid = 0; appendDataToSpecialBlock(pAggSup->pScanBlock, &curWin.winInfo.sessionWin.win.skey, @@ -4442,7 +4476,8 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { continue; } else if (pBlock->info.type == STREAM_GET_ALL) { pInfo->recvGetAll = true; - getAllSessionWindow(pInfo->streamAggSup.pResultRows, pInfo->pSeUpdated); + code = getAllSessionWindow(pInfo->streamAggSup.pResultRows, pInfo->pSeUpdated); + TSDB_CHECK_CODE(code, lino, _end); continue; } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) { return pBlock; @@ -4483,7 +4518,8 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { TSDB_CHECK_CODE(code, lino, _end); } if (pInfo->destHasPrimaryKey && IS_NORMAL_STATE_OP(pOperator)) { - copyDeleteSessionKey(pInfo->pPkDeleted, pInfo->pSeDeleted); + code = copyDeleteSessionKey(pInfo->pPkDeleted, pInfo->pSeDeleted); + TSDB_CHECK_CODE(code, lino, _end); } initGroupResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated); @@ -4582,23 +4618,17 @@ void streamStateReloadState(SOperatorInfo* pOperator) { if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { code = saveResult(curInfo.winInfo, pInfo->pSeUpdated); - if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); - } + TSDB_CHECK_CODE(code, lino, _end); } else if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { if (!isCloseWindow(&curInfo.winInfo.sessionWin.win, &pInfo->twAggSup)) { code = saveDeleteRes(pInfo->pSeDeleted, curInfo.winInfo.sessionWin); - if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); - } + TSDB_CHECK_CODE(code, lino, _end); } SSessionKey key = {0}; getSessionHashKey(&curInfo.winInfo.sessionWin, &key); code = tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curInfo.winInfo, sizeof(SResultWindowInfo)); - if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); - } + TSDB_CHECK_CODE(code, lino, _end); } } else if (IS_VALID_SESSION_WIN(nextInfo.winInfo)) { releaseOutputBuf(pAggSup->pState, nextInfo.winInfo.pStatePos, &pAggSup->pSessionAPI->stateStore); @@ -4896,16 +4926,17 @@ _end: SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SStreamIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamIntervalOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { - goto _error; + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _error); } SStreamIntervalPhysiNode* pIntervalPhyNode = (SStreamIntervalPhysiNode*)pPhyNode; - - int32_t code = TSDB_CODE_SUCCESS; - int32_t numOfCols = 0; - SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols); + int32_t numOfCols = 0; + SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols); SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); pInfo->interval = (SInterval){ @@ -4948,17 +4979,13 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, pInfo->pState, &pTaskInfo->storageAPI.functionStore); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + TSDB_CHECK_CODE(code, lino, _error); if (pIntervalPhyNode->window.pExprs != NULL) { int32_t numOfScalar = 0; SExprInfo* pScalarExprInfo = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &numOfScalar); code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + TSDB_CHECK_CODE(code, lino, _error); } pInfo->invertible = false; @@ -5010,11 +5037,11 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys taosMemoryFree(buff); } - initIntervalDownStream(downstream, pPhyNode->type, pInfo); + code = initIntervalDownStream(downstream, pPhyNode->type, pInfo); + TSDB_CHECK_CODE(code, lino, _error); + code = appendDownstream(pOperator, &downstream, 1); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + TSDB_CHECK_CODE(code, lino, _error); return pOperator; diff --git a/source/libs/stream/src/streamSessionState.c b/source/libs/stream/src/streamSessionState.c index 2c30eff832..dd63dc6115 100644 --- a/source/libs/stream/src/streamSessionState.c +++ b/source/libs/stream/src/streamSessionState.c @@ -936,9 +936,7 @@ int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, C } code = tSimpleHashPut(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES); - if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); - } + TSDB_CHECK_CODE(code, lino, _end); } TSKEY startTs = pWinKey->win.skey; diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index ce4b1800fb..56a74e149d 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -239,7 +239,7 @@ int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal // todo refactor void streamStateFillDel(SStreamState* pState, const SWinKey* key) { int32_t code = streamStateFillDel_rocksdb(pState, key); - qError("%s at line %d res %d", __func__, __LINE__, code); + qTrace("%s at line %d res %d", __func__, __LINE__, code); } void streamStateClear(SStreamState* pState) { streamFileStateClear(pState->pFileState); } diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c index 68e48bb378..e395b3066b 100644 --- a/source/libs/stream/src/streamUpdate.c +++ b/source/libs/stream/src/streamUpdate.c @@ -279,7 +279,8 @@ bool updateInfoIsTableInserted(SUpdateInfo* pInfo, int64_t tbUid) { return false; } -int32_t updateInfoFillBlockData(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t primaryTsCol, int32_t primaryKeyCol, TSKEY* pMaxResTs) { +int32_t updateInfoFillBlockData(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t primaryTsCol, int32_t primaryKeyCol, + TSKEY* pMaxResTs) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; if (pBlock == NULL || pBlock->info.rows == 0) { @@ -652,17 +653,22 @@ _error: } bool isIncrementalTimeStamp(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts, void* pPkVal, int32_t len) { - TSKEY* pMapMaxTs = taosHashGet(pInfo->pMap, &tableId, sizeof(uint64_t)); - bool res = true; + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + TSKEY* pMapMaxTs = taosHashGet(pInfo->pMap, &tableId, sizeof(uint64_t)); + bool res = true; if (pMapMaxTs && pInfo->comparePkRowFn(pMapMaxTs, &ts, pPkVal, pInfo->comparePkCol) == 1) { res = false; } else { int32_t valueLen = getValueBuff(ts, pPkVal, len, pInfo->pValueBuff); - int32_t code = taosHashPut(pInfo->pMap, &tableId, sizeof(uint64_t), pInfo->pValueBuff, valueLen); - if (code != TSDB_CODE_SUCCESS) { - res = false; - uError("%s failed at line %d since %d", __func__, __LINE__, code); - } + code = taosHashPut(pInfo->pMap, &tableId, sizeof(uint64_t), pInfo->pValueBuff, valueLen); + TSDB_CHECK_CODE(code, lino, _error); } return res; + +_error: + if (code != TSDB_CODE_SUCCESS) { + uError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return false; } diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 5e3e3a4654..2f656d9b41 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -500,16 +500,24 @@ _error: } SRowBuffPos* getNewRowPosForWrite(SStreamFileState* pFileState) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SRowBuffPos* newPos = getNewRowPos(pFileState); if (!newPos) { - qError("%s failed at line %d since newPos is null", __func__, __LINE__); - return NULL; + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _error); } newPos->beUsed = true; newPos->beFlushed = false; newPos->needFree = false; newPos->beUpdated = true; return newPos; + +_error: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return NULL; } int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen,