diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index cbda8dc472..dc884a0581 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -44,9 +44,10 @@ typedef struct SMinmaxResInfo { bool nullTupleSaved; int16_t type; } SMinmaxResInfo; -int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc); -STuplePos saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock); +int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc, int32_t* nElems); + +int32_t saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos); int32_t updateTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos); const char* loadTupleData(SqlFunctionCtx* pCtx, const STuplePos* pPos); diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index e28e10de3c..8fde27e046 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -754,24 +754,33 @@ bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { } int32_t minFunction(SqlFunctionCtx* pCtx) { - int32_t numOfElems = doMinMaxHelper(pCtx, 1); + int32_t numOfElems = 0; + int32_t code = doMinMaxHelper(pCtx, 1, &numOfElems); + if (code != TSDB_CODE_SUCCESS) { + return code; + } SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1); return TSDB_CODE_SUCCESS; } int32_t maxFunction(SqlFunctionCtx* pCtx) { - int32_t numOfElems = doMinMaxHelper(pCtx, 0); + int32_t numOfElems = 0; + int32_t code = doMinMaxHelper(pCtx, 0, &numOfElems); + if (code != TSDB_CODE_SUCCESS) { + return code; + } SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1); return TSDB_CODE_SUCCESS; } -static void setNullSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t rowIndex); -static void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuplePos* pTuplePos, +static int32_t setNullSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t rowIndex); +static int32_t setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuplePos* pTuplePos, int32_t rowIndex); int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { - SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); + int32_t code = TSDB_CODE_SUCCESS; + SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); SMinmaxResInfo* pRes = GET_ROWCELL_INTERBUF(pEntryInfo); int32_t slotId = pCtx->pExpr->base.resSchema.slotId; @@ -788,17 +797,17 @@ int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { } if (pEntryInfo->numOfRes > 0) { - setSelectivityValue(pCtx, pBlock, &pRes->tuplePos, currentRow); + code = setSelectivityValue(pCtx, pBlock, &pRes->tuplePos, currentRow); } else { - setSelectivityValue(pCtx, pBlock, &pRes->nullTuplePos, currentRow); + code = setSelectivityValue(pCtx, pBlock, &pRes->nullTuplePos, currentRow); } - return pEntryInfo->numOfRes; + return code; } -void setNullSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t rowIndex) { +int32_t setNullSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t rowIndex) { if (pCtx->subsidiaries.num <= 0) { - return; + return TSDB_CODE_SUCCESS; } for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) { @@ -808,17 +817,23 @@ void setNullSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId); colDataAppendNULL(pDstCol, rowIndex); } + + return TSDB_CODE_SUCCESS; } -void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuplePos* pTuplePos, int32_t rowIndex) { +int32_t setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuplePos* pTuplePos, int32_t rowIndex) { if (pCtx->subsidiaries.num <= 0) { - return; + return TSDB_CODE_SUCCESS; } if ((pCtx->saveHandle.pBuf != NULL && pTuplePos->pageId != -1) || (pCtx->saveHandle.pState && pTuplePos->streamTupleKey.ts > 0)) { int32_t numOfCols = pCtx->subsidiaries.num; const char* p = loadTupleData(pCtx, pTuplePos); + if (p == NULL) { + terrno = TSDB_CODE_NO_AVAIL_DISK; + return terrno; + } bool* nullList = (bool*)p; char* pStart = (char*)(nullList + numOfCols * sizeof(bool)); @@ -841,6 +856,8 @@ void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuple tdbFree((void*)p); } } + + return TSDB_CODE_SUCCESS; } void releaseSource(STuplePos* pPos) { @@ -1567,7 +1584,7 @@ int32_t percentileFunction(SqlFunctionCtx* pCtx) { // all data are null, set it completed if (pInfo->numOfElems == 0) { pResInfo->complete = true; - return 0; + return TSDB_CODE_SUCCESS; } else { pInfo->pMemBucket = tMemBucketCreate(pCol->info.bytes, type, pInfo->minval, pInfo->maxval); } @@ -1630,7 +1647,11 @@ int32_t percentileFunction(SqlFunctionCtx* pCtx) { char* data = colDataGetData(pCol, i); numOfElems += 1; - tMemBucketPut(pInfo->pMemBucket, data, 1); + int32_t code = tMemBucketPut(pInfo->pMemBucket, data, 1); + if (code != TSDB_CODE_SUCCESS) { + tMemBucketDestroy(pInfo->pMemBucket); + return code; + } } SET_VAL(pResInfo, numOfElems, 1); @@ -1653,6 +1674,11 @@ int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { } tMemBucketDestroy(pMemBucket); + + if (ppInfo->result < 0) { + return TSDB_CODE_NO_AVAIL_DISK; + } + return functionFinalize(pCtx, pBlock); } @@ -2013,20 +2039,24 @@ static void prepareBuf(SqlFunctionCtx* pCtx) { } } -static void firstlastSaveTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SqlFunctionCtx* pCtx, +static int32_t firstlastSaveTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SqlFunctionCtx* pCtx, SFirstLastRes* pInfo) { + int32_t code = TSDB_CODE_SUCCESS; + if (pCtx->subsidiaries.num <= 0) { - return; + return TSDB_CODE_SUCCESS; } if (!pInfo->hasResult) { - pInfo->pos = saveTupleData(pCtx, rowIndex, pSrcBlock); + code = saveTupleData(pCtx, rowIndex, pSrcBlock, &pInfo->pos); } else { - updateTupleData(pCtx, rowIndex, pSrcBlock, &pInfo->pos); + code = updateTupleData(pCtx, rowIndex, pSrcBlock, &pInfo->pos); } + + return code; } -static void doSaveCurrentVal(SqlFunctionCtx* pCtx, int32_t rowIndex, int64_t currentTs, int32_t type, char* pData) { +static int32_t doSaveCurrentVal(SqlFunctionCtx* pCtx, int32_t rowIndex, int64_t currentTs, int32_t type, char* pData) { SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SFirstLastRes* pInfo = GET_ROWCELL_INTERBUF(pResInfo); @@ -2036,9 +2066,13 @@ static void doSaveCurrentVal(SqlFunctionCtx* pCtx, int32_t rowIndex, int64_t cur memcpy(pInfo->buf, pData, pInfo->bytes); pInfo->ts = currentTs; - firstlastSaveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pInfo); + int32_t code = firstlastSaveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pInfo); + if (code != TSDB_CODE_SUCCESS) { + return code; + } pInfo->hasResult = true; + return TSDB_CODE_SUCCESS; } // This ordinary first function does not care if current scan is ascending order or descending order scan @@ -2061,7 +2095,10 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) { // All null data column, return directly. if (pInput->colDataSMAIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows) && pInputCol->hasNull == true) { // save selectivity value for column consisted of all null values - firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo); + int32_t code = firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo); + if (code != TSDB_CODE_SUCCESS) { + return code; + } return TSDB_CODE_SUCCESS; } @@ -2134,7 +2171,10 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) { char* data = colDataGetData(pInputCol, i); TSKEY cts = pts[i]; if (pResInfo->numOfRes == 0 || pInfo->ts > cts) { - doSaveCurrentVal(pCtx, i, cts, pInputCol->info.type, data); + int32_t code = doSaveCurrentVal(pCtx, i, cts, pInputCol->info.type, data); + if (code != TSDB_CODE_SUCCESS) { + return code; + } pResInfo->numOfRes = 1; } } @@ -2142,7 +2182,10 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) { if (numOfElems == 0) { // save selectivity value for column consisted of all null values - firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo); + int32_t code = firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } SET_VAL(pResInfo, numOfElems, 1); return TSDB_CODE_SUCCESS; @@ -2168,7 +2211,10 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { // All null data column, return directly. if (pInput->colDataSMAIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows) && pInputCol->hasNull == true) { // save selectivity value for column consisted of all null values - firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo); + int32_t code = firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo); + if (code != TSDB_CODE_SUCCESS) { + return code; + } return TSDB_CODE_SUCCESS; } @@ -2258,7 +2304,10 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { if (pResInfo->numOfRes == 0 || pInfo->ts < cts) { char* data = colDataGetData(pInputCol, chosen); - doSaveCurrentVal(pCtx, i, cts, type, data); + int32_t code = doSaveCurrentVal(pCtx, i, cts, type, data); + if (code != TSDB_CODE_SUCCESS) { + return code; + } pResInfo->numOfRes = 1; } } @@ -2266,7 +2315,10 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { for (int32_t i = pInput->startRowIndex + round * 4; i < pInput->startRowIndex + pInput->numOfRows; ++i) { if (pResInfo->numOfRes == 0 || pInfo->ts < pts[i]) { char* data = colDataGetData(pInputCol, i); - doSaveCurrentVal(pCtx, i, pts[i], type, data); + int32_t code = doSaveCurrentVal(pCtx, i, pts[i], type, data); + if (code != TSDB_CODE_SUCCESS) { + return code; + } pResInfo->numOfRes = 1; } } @@ -2280,7 +2332,10 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { if (pResInfo->numOfRes == 0 || pInfo->ts < pts[i]) { char* data = colDataGetData(pInputCol, i); - doSaveCurrentVal(pCtx, i, pts[i], type, data); + int32_t code = doSaveCurrentVal(pCtx, i, pts[i], type, data); + if (code != TSDB_CODE_SUCCESS) { + return code; + } pResInfo->numOfRes = 1; } } @@ -2291,7 +2346,10 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { // save selectivity value for column consisted of all null values if (numOfElems == 0) { - firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo); + int32_t code = firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } // SET_VAL(pResInfo, numOfElems, 1); @@ -2319,12 +2377,17 @@ static int32_t firstLastTransferInfoImpl(SFirstLastRes* pInput, SFirstLastRes* p return TSDB_CODE_SUCCESS; } -static void firstLastTransferInfo(SqlFunctionCtx* pCtx, SFirstLastRes* pInput, SFirstLastRes* pOutput, bool isFirst, +static int32_t firstLastTransferInfo(SqlFunctionCtx* pCtx, SFirstLastRes* pInput, SFirstLastRes* pOutput, bool isFirst, int32_t rowIndex) { if (TSDB_CODE_SUCCESS == firstLastTransferInfoImpl(pInput, pOutput, isFirst)) { - firstlastSaveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pOutput); + int32_t code = firstlastSaveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pOutput); + if (code != TSDB_CODE_SUCCESS) { + return code; + } pOutput->hasResult = true; } + + return TSDB_CODE_SUCCESS; } static int32_t firstLastFunctionMergeImpl(SqlFunctionCtx* pCtx, bool isFirstQuery) { @@ -2342,7 +2405,10 @@ static int32_t firstLastFunctionMergeImpl(SqlFunctionCtx* pCtx, bool isFirstQuer for (int32_t i = start; i < start + pInput->numOfRows; ++i) { char* data = colDataGetData(pCol, i); SFirstLastRes* pInputInfo = (SFirstLastRes*)varDataVal(data); - firstLastTransferInfo(pCtx, pInputInfo, pInfo, isFirstQuery, i); + int32_t code = firstLastTransferInfo(pCtx, pInputInfo, pInfo, isFirstQuery, i); + if (code != TSDB_CODE_SUCCESS) { + return code; + } if (!numOfElems) { numOfElems = pInputInfo->hasResult ? 1 : 0; } @@ -2357,6 +2423,7 @@ int32_t firstFunctionMerge(SqlFunctionCtx* pCtx) { return firstLastFunctionMerge int32_t lastFunctionMerge(SqlFunctionCtx* pCtx) { return firstLastFunctionMergeImpl(pCtx, false); } int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { + int32_t code = TSDB_CODE_SUCCESS; int32_t slotId = pCtx->pExpr->base.resSchema.slotId; SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); @@ -2367,12 +2434,14 @@ int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { colDataAppend(pCol, pBlock->info.rows, pRes->buf, pRes->isNull || pResInfo->isNullRes); // handle selectivity - setSelectivityValue(pCtx, pBlock, &pRes->pos, pBlock->info.rows); + code = setSelectivityValue(pCtx, pBlock, &pRes->pos, pBlock->info.rows); - return pResInfo->numOfRes; + return code; } int32_t firstLastPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { + int32_t code = TSDB_CODE_SUCCESS; + SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); SFirstLastRes* pRes = GET_ROWCELL_INTERBUF(pEntryInfo); @@ -2388,10 +2457,10 @@ int32_t firstLastPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); colDataAppend(pCol, pBlock->info.rows, res, false); - setSelectivityValue(pCtx, pBlock, &pRes->pos, pBlock->info.rows); + code = setSelectivityValue(pCtx, pBlock, &pRes->pos, pBlock->info.rows); taosMemoryFree(res); - return 1; + return code; } int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { @@ -2411,7 +2480,7 @@ int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { return TSDB_CODE_SUCCESS; } -static void doSaveLastrow(SqlFunctionCtx* pCtx, char* pData, int32_t rowIndex, int64_t cts, SFirstLastRes* pInfo) { +static int32_t doSaveLastrow(SqlFunctionCtx* pCtx, char* pData, int32_t rowIndex, int64_t cts, SFirstLastRes* pInfo) { SInputColumnInfoData* pInput = &pCtx->input; SColumnInfoData* pInputCol = pInput->pData[0]; @@ -2428,9 +2497,14 @@ static void doSaveLastrow(SqlFunctionCtx* pCtx, char* pData, int32_t rowIndex, i } pInfo->ts = cts; - firstlastSaveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pInfo); + int32_t code = firstlastSaveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pInfo); + if (code != TSDB_CODE_SUCCESS) { + return code; + } pInfo->hasResult = true; + + return TSDB_CODE_SUCCESS; } int32_t lastRowFunction(SqlFunctionCtx* pCtx) { @@ -2492,7 +2566,10 @@ int32_t lastRowFunction(SqlFunctionCtx* pCtx) { numOfElems++; if (pResInfo->numOfRes == 0 || pInfo->ts < cts) { - doSaveLastrow(pCtx, data, i, cts, pInfo); + int32_t code = doSaveLastrow(pCtx, data, i, cts, pInfo); + if (code != TSDB_CODE_SUCCESS) { + return code; + } pResInfo->numOfRes = 1; } } @@ -2769,8 +2846,8 @@ static STopBotRes* getTopBotOutputInfo(SqlFunctionCtx* pCtx) { return pRes; } -static void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type, - uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery); +static int32_t doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type, + uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery); static void addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type, bool isTopQuery); @@ -2792,11 +2869,17 @@ int32_t topFunction(SqlFunctionCtx* pCtx) { numOfElems++; char* data = colDataGetData(pCol, i); - doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, pRes->type, pInput->uid, pResInfo, true); + int32_t code = doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, pRes->type, pInput->uid, pResInfo, true); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pRes->nullTupleSaved) { - pRes->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock); + int32_t code = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, &pRes->nullTuplePos); + if (code != TSDB_CODE_SUCCESS) { + return code; + } pRes->nullTupleSaved = true; } return TSDB_CODE_SUCCESS; @@ -2820,11 +2903,17 @@ int32_t bottomFunction(SqlFunctionCtx* pCtx) { numOfElems++; char* data = colDataGetData(pCol, i); - doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, pRes->type, pInput->uid, pResInfo, false); + int32_t code = doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, pRes->type, pInput->uid, pResInfo, false); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pRes->nullTupleSaved) { - pRes->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock); + int32_t code = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, &pRes->nullTuplePos); + if (code != TSDB_CODE_SUCCESS) { + return code; + } pRes->nullTupleSaved = true; } @@ -2864,7 +2953,7 @@ static int32_t topBotResComparFn(const void* p1, const void* p2, const void* par return (val1->v.d > val2->v.d) ? 1 : -1; } -void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type, +int32_t doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type, uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery) { STopBotRes* pRes = getTopBotOutputInfo(pCtx); @@ -2881,7 +2970,10 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData // save the data of this tuple if (pCtx->subsidiaries.num > 0) { - pItem->tuplePos = saveTupleData(pCtx, rowIndex, pSrcBlock); + int32_t code = saveTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } #ifdef BUF_PAGE_DEBUG qDebug("page_saveTuple i:%d, item:%p,pageId:%d, offset:%d\n", pEntryInfo->numOfRes, pItem, pItem->tuplePos.pageId, @@ -2907,7 +2999,10 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData // save the data of this tuple by over writing the old data if (pCtx->subsidiaries.num > 0) { - updateTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos); + int32_t code = updateTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } #ifdef BUF_PAGE_DEBUG qDebug("page_copyTuple pageId:%d, offset:%d", pItem->tuplePos.pageId, pItem->tuplePos.offset); @@ -2916,6 +3011,8 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData topBotResComparFn, NULL, !isTopQuery); } } + + return TSDB_CODE_SUCCESS; } /* @@ -2955,20 +3052,33 @@ void* serializeTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SSubsid return buf; } -static STuplePos doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf, size_t length, STupleKey key) { +static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf, size_t length, STupleKey key, + STuplePos* pPos) { STuplePos p = {0}; if (pHandle->pBuf != NULL) { SFilePage* pPage = NULL; if (pHandle->currentPage == -1) { pPage = getNewBufPage(pHandle->pBuf, &pHandle->currentPage); + if (pPage == NULL) { + terrno = TSDB_CODE_NO_AVAIL_DISK; + return terrno; + } pPage->num = sizeof(SFilePage); } else { pPage = getBufPage(pHandle->pBuf, pHandle->currentPage); + if (pPage == NULL) { + terrno = TSDB_CODE_NO_AVAIL_DISK; + return terrno; + } if (pPage->num + length > getBufPageSize(pHandle->pBuf)) { // current page is all used, let's prepare a new buffer page releaseBufPage(pHandle->pBuf, pPage); pPage = getNewBufPage(pHandle->pBuf, &pHandle->currentPage); + if (pPage == NULL) { + terrno = TSDB_CODE_NO_AVAIL_DISK; + return terrno; + } pPage->num = sizeof(SFilePage); } } @@ -2986,10 +3096,11 @@ static STuplePos doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf } } - return p; + *pPos = p; + return TSDB_CODE_SUCCESS; } -STuplePos saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock) { +int32_t saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) { prepareBuf(pCtx); STupleKey key; @@ -3005,12 +3116,16 @@ STuplePos saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBloc } char* buf = serializeTupleData(pSrcBlock, rowIndex, &pCtx->subsidiaries, pCtx->subsidiaries.buf); - return doSaveTupleData(&pCtx->saveHandle, buf, pCtx->subsidiaries.rowLen, key); + return doSaveTupleData(&pCtx->saveHandle, buf, pCtx->subsidiaries.rowLen, key, pPos); } static int32_t doUpdateTupleData(SSerializeDataHandle* pHandle, const void* pBuf, size_t length, STuplePos* pPos) { if (pHandle->pBuf != NULL) { SFilePage* pPage = getBufPage(pHandle->pBuf, pPos->pageId); + if (pPage == NULL) { + terrno = TSDB_CODE_NO_AVAIL_DISK; + return terrno; + } memcpy(pPage->data + pPos->offset, pBuf, length); setBufPageDirty(pPage, true); releaseBufPage(pHandle->pBuf, pPage); @@ -3025,13 +3140,15 @@ int32_t updateTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBloc prepareBuf(pCtx); char* buf = serializeTupleData(pSrcBlock, rowIndex, &pCtx->subsidiaries, pCtx->subsidiaries.buf); - doUpdateTupleData(&pCtx->saveHandle, buf, pCtx->subsidiaries.rowLen, pPos); - return TSDB_CODE_SUCCESS; + return doUpdateTupleData(&pCtx->saveHandle, buf, pCtx->subsidiaries.rowLen, pPos); } static char* doLoadTupleData(SSerializeDataHandle* pHandle, const STuplePos* pPos) { if (pHandle->pBuf != NULL) { SFilePage* pPage = getBufPage(pHandle->pBuf, pPos->pageId); + if (pPage == NULL) { + return NULL; + } char* p = pPage->data + pPos->offset; releaseBufPage(pHandle->pBuf, pPage); return p; @@ -3048,6 +3165,8 @@ const char* loadTupleData(SqlFunctionCtx* pCtx, const STuplePos* pPos) { } int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { + int32_t code = TSDB_CODE_SUCCESS; + SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); STopBotRes* pRes = getTopBotOutputInfo(pCtx); @@ -3060,8 +3179,8 @@ int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t currentRow = pBlock->info.rows; if (pEntryInfo->numOfRes <= 0) { colDataAppendNULL(pCol, currentRow); - setSelectivityValue(pCtx, pBlock, &pRes->nullTuplePos, currentRow); - return pEntryInfo->numOfRes; + code = setSelectivityValue(pCtx, pBlock, &pRes->nullTuplePos, currentRow); + return code; } for (int32_t i = 0; i < pEntryInfo->numOfRes; ++i) { STopBotResItem* pItem = &pRes->pItems[i]; @@ -3070,11 +3189,11 @@ int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { qDebug("page_finalize i:%d,item:%p,pageId:%d, offset:%d\n", i, pItem, pItem->tuplePos.pageId, pItem->tuplePos.offset); #endif - setSelectivityValue(pCtx, pBlock, &pRes->pItems[i].tuplePos, currentRow); + code = setSelectivityValue(pCtx, pBlock, &pRes->pItems[i].tuplePos, currentRow); currentRow += 1; } - return pEntryInfo->numOfRes; + return code; } void addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type, bool isTopQuery) { @@ -4452,12 +4571,15 @@ static void sampleAssignResult(SSampleInfo* pInfo, char* data, int32_t index) { assignVal(pInfo->data + index * pInfo->colBytes, data, pInfo->colBytes, pInfo->colType); } -static void doReservoirSample(SqlFunctionCtx* pCtx, SSampleInfo* pInfo, char* data, int32_t index) { +static int32_t doReservoirSample(SqlFunctionCtx* pCtx, SSampleInfo* pInfo, char* data, int32_t index) { pInfo->totalPoints++; if (pInfo->numSampled < pInfo->samples) { sampleAssignResult(pInfo, data, pInfo->numSampled); if (pCtx->subsidiaries.num > 0) { - pInfo->tuplePos[pInfo->numSampled] = saveTupleData(pCtx, index, pCtx->pSrcBlock); + int32_t code = saveTupleData(pCtx, index, pCtx->pSrcBlock, &pInfo->tuplePos[pInfo->numSampled]); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } pInfo->numSampled++; } else { @@ -4465,10 +4587,15 @@ static void doReservoirSample(SqlFunctionCtx* pCtx, SSampleInfo* pInfo, char* da if (j < pInfo->samples) { sampleAssignResult(pInfo, data, j); if (pCtx->subsidiaries.num > 0) { - updateTupleData(pCtx, index, pCtx->pSrcBlock, &pInfo->tuplePos[j]); + int32_t code = updateTupleData(pCtx, index, pCtx->pSrcBlock, &pInfo->tuplePos[j]); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } } } + + return TSDB_CODE_SUCCESS; } int32_t sampleFunction(SqlFunctionCtx* pCtx) { @@ -4484,11 +4611,17 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) { } char* data = colDataGetData(pInputCol, i); - doReservoirSample(pCtx, pInfo, data, i); + int32_t code = doReservoirSample(pCtx, pInfo, data, i); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } if (pInfo->numSampled == 0 && pCtx->subsidiaries.num > 0 && !pInfo->nullTupleSaved) { - pInfo->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock); + int32_t code = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, &pInfo->nullTuplePos); + if (code != TSDB_CODE_SUCCESS) { + return code; + } pInfo->nullTupleSaved = true; } @@ -4497,6 +4630,7 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) { } int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { + int32_t code = TSDB_CODE_SUCCESS; SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); SSampleInfo* pInfo = getSampleOutputInfo(pCtx); @@ -4508,15 +4642,15 @@ int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t currentRow = pBlock->info.rows; if (pInfo->numSampled == 0) { colDataAppendNULL(pCol, currentRow); - setSelectivityValue(pCtx, pBlock, &pInfo->nullTuplePos, currentRow); - return pInfo->numSampled; + code = setSelectivityValue(pCtx, pBlock, &pInfo->nullTuplePos, currentRow); + return code; } for (int32_t i = 0; i < pInfo->numSampled; ++i) { colDataAppend(pCol, currentRow + i, pInfo->data + i * pInfo->colBytes, false); - setSelectivityValue(pCtx, pBlock, &pInfo->tuplePos[i], currentRow + i); + code = setSelectivityValue(pCtx, pBlock, &pInfo->tuplePos[i], currentRow + i); } - return pInfo->numSampled; + return code; } bool getTailFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { @@ -4782,7 +4916,7 @@ bool modeFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) { return true; } -static void doModeAdd(SModeInfo* pInfo, int32_t rowIndex, SqlFunctionCtx* pCtx, char* data) { +static int32_t doModeAdd(SModeInfo* pInfo, int32_t rowIndex, SqlFunctionCtx* pCtx, char* data) { int32_t hashKeyBytes = IS_STR_DATA_TYPE(pInfo->colType) ? varDataTLen(data) : pInfo->colBytes; SModeItem** pHashItem = taosHashGet(pInfo->pHash, data, hashKeyBytes); if (pHashItem == NULL) { @@ -4792,7 +4926,10 @@ static void doModeAdd(SModeInfo* pInfo, int32_t rowIndex, SqlFunctionCtx* pCtx, pItem->count += 1; if (pCtx->subsidiaries.num > 0) { - pItem->tuplePos = saveTupleData(pCtx, rowIndex, pCtx->pSrcBlock); + int32_t code = saveTupleData(pCtx, rowIndex, pCtx->pSrcBlock, &pItem->tuplePos); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } taosHashPut(pInfo->pHash, data, hashKeyBytes, &pItem, sizeof(SModeItem*)); @@ -4800,9 +4937,14 @@ static void doModeAdd(SModeInfo* pInfo, int32_t rowIndex, SqlFunctionCtx* pCtx, } else { (*pHashItem)->count += 1; if (pCtx->subsidiaries.num > 0) { - updateTupleData(pCtx, rowIndex, pCtx->pSrcBlock, &((*pHashItem)->tuplePos)); + int32_t code = updateTupleData(pCtx, rowIndex, pCtx->pSrcBlock, &((*pHashItem)->tuplePos)); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } } + + return TSDB_CODE_SUCCESS; } int32_t modeFunction(SqlFunctionCtx* pCtx) { @@ -4823,7 +4965,10 @@ int32_t modeFunction(SqlFunctionCtx* pCtx) { numOfElems++; char* data = colDataGetData(pInputCol, i); - doModeAdd(pInfo, i, pCtx, data); + int32_t code = doModeAdd(pInfo, i, pCtx, data); + if (code != TSDB_CODE_SUCCESS) { + return code; + } if (sizeof(SModeInfo) + pInfo->numOfPoints * (sizeof(SModeItem) + pInfo->colBytes) >= MODE_MAX_RESULT_SIZE) { taosHashCleanup(pInfo->pHash); @@ -4832,7 +4977,10 @@ int32_t modeFunction(SqlFunctionCtx* pCtx) { } if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pInfo->nullTupleSaved) { - pInfo->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock); + int32_t code = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, &pInfo->nullTuplePos); + if (code != TSDB_CODE_SUCCESS) { + return code; + } pInfo->nullTupleSaved = true; } @@ -4842,6 +4990,7 @@ int32_t modeFunction(SqlFunctionCtx* pCtx) { } int32_t modeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { + int32_t code = TSDB_CODE_SUCCESS; SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SModeInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo); int32_t slotId = pCtx->pExpr->base.resSchema.slotId; @@ -4861,15 +5010,15 @@ int32_t modeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { if (maxCount != 0) { SModeItem* pResItem = (SModeItem*)(pInfo->pItems + resIndex * (sizeof(SModeItem) + pInfo->colBytes)); colDataAppend(pCol, currentRow, pResItem->data, false); - setSelectivityValue(pCtx, pBlock, &pResItem->tuplePos, currentRow); + code = setSelectivityValue(pCtx, pBlock, &pResItem->tuplePos, currentRow); } else { colDataAppendNULL(pCol, currentRow); - setSelectivityValue(pCtx, pBlock, &pInfo->nullTuplePos, currentRow); + code = setSelectivityValue(pCtx, pBlock, &pInfo->nullTuplePos, currentRow); } taosHashCleanup(pInfo->pHash); - return pResInfo->numOfRes; + return code; } bool getTwaFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) { @@ -5713,7 +5862,10 @@ int32_t cachedLastRowFunction(SqlFunctionCtx* pCtx) { TSKEY cts = getRowPTs(pInput->pPTS, i); if (pResInfo->numOfRes == 0 || pInfo->ts < cts) { - doSaveLastrow(pCtx, data, i, cts, pInfo); + int32_t code = doSaveLastrow(pCtx, data, i, cts, pInfo); + if (code != TSDB_CODE_SUCCESS) { + return code; + } pResInfo->numOfRes = 1; } } diff --git a/source/libs/function/src/detail/tminmax.c b/source/libs/function/src/detail/tminmax.c index 813e66b80d..847c738655 100644 --- a/source/libs/function/src/detail/tminmax.c +++ b/source/libs/function/src/detail/tminmax.c @@ -700,7 +700,7 @@ static void doExtractVal(SColumnInfoData* pCol, int32_t i, int32_t end, SqlFunct } } -int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { +int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc, int32_t* nElems) { int32_t numOfElems = 0; SInputColumnInfoData* pInput = &pCtx->input; @@ -745,7 +745,10 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if (pCtx->subsidiaries.num > 0) { index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); if (index >= 0) { - pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock); + int32_t code = saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } } } else { @@ -759,7 +762,10 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if (pCtx->subsidiaries.num > 0) { index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); if (index >= 0) { - pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock); + int32_t code = saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } } } @@ -773,7 +779,10 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if (pCtx->subsidiaries.num > 0) { index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); if (index >= 0) { - pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock); + int32_t code = saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } } } @@ -787,7 +796,10 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if (pCtx->subsidiaries.num > 0) { index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); if (index >= 0) { - pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock); + int32_t code = saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } } } @@ -803,14 +815,17 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if (pCtx->subsidiaries.num > 0) { index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); if (index >= 0) { - pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock); + int32_t code = saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } } } } pBuf->assign = true; - return numOfElems; + return TSDB_CODE_SUCCESS; } int32_t start = pInput->startRowIndex; @@ -824,7 +839,10 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { memcpy(&pBuf->v, pCol->pData + (pCol->info.bytes * i), pCol->info.bytes); if (pCtx->subsidiaries.num > 0) { - pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock); + int32_t code = saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } pBuf->assign = true; numOfElems = 1; @@ -887,9 +905,13 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { _over: if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pBuf->nullTupleSaved) { - pBuf->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock); + int32_t code = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, &pBuf->nullTuplePos); + if (code != TSDB_CODE_SUCCESS) { + return code; + } pBuf->nullTupleSaved = true; } - return numOfElems; + *nElems = numOfElems; + return TSDB_CODE_SUCCESS; } diff --git a/source/libs/function/src/tpercentile.c b/source/libs/function/src/tpercentile.c index f14406fdac..04472c42ec 100644 --- a/source/libs/function/src/tpercentile.c +++ b/source/libs/function/src/tpercentile.c @@ -40,6 +40,9 @@ static SFilePage *loadDataFromFilePage(tMemBucket *pMemBucket, int32_t slotIdx) int32_t *pageId = taosArrayGet(pIdList, i); SFilePage *pg = getBufPage(pMemBucket->pBuffer, *pageId); + if (pg == NULL) { + return NULL; + } memcpy(buffer->data + offset, pg->data, (size_t)(pg->num * pMemBucket->bytes)); offset += (int32_t)(pg->num * pMemBucket->bytes); @@ -104,6 +107,9 @@ double findOnlyResult(tMemBucket *pMemBucket) { int32_t *pageId = taosArrayGet(list, 0); SFilePage *pPage = getBufPage(pMemBucket->pBuffer, *pageId); + if (pPage == NULL) { + return -1; + } ASSERT(pPage->num == 1); double v = 0; @@ -380,6 +386,9 @@ int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) { } pSlot->info.data = getNewBufPage(pBucket->pBuffer, &pageId); + if (pSlot->info.data == NULL) { + return TSDB_CODE_NO_AVAIL_DISK; + } pSlot->info.pageId = pageId; taosArrayPush(pPageIdList, &pageId); } @@ -391,7 +400,7 @@ int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) { } pBucket->total += count; - return 0; + return TSDB_CODE_SUCCESS; } //////////////////////////////////////////////////////////////////////////////////////////// @@ -471,6 +480,9 @@ double getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction) if (pSlot->info.size <= pMemBucket->maxCapacity) { // data in buffer and file are merged together to be processed. SFilePage *buffer = loadDataFromFilePage(pMemBucket, i); + if (buffer == NULL) { + return -1; + } int32_t currentIdx = count - num; char *thisVal = buffer->data + pMemBucket->bytes * currentIdx; @@ -505,8 +517,14 @@ double getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction) for (int32_t f = 0; f < list->size; ++f) { int32_t *pageId = taosArrayGet(list, f); SFilePage *pg = getBufPage(pMemBucket->pBuffer, *pageId); + if (pg == NULL) { + return -1; + } - tMemBucketPut(pMemBucket, pg->data, (int32_t)pg->num); + int32_t code = tMemBucketPut(pMemBucket, pg->data, (int32_t)pg->num); + if (code != TSDB_CODE_SUCCESS) { + return -1; + } setBufPageDirty(pg, true); releaseBufPage(pMemBucket->pBuffer, pg); } @@ -528,7 +546,9 @@ double getPercentile(tMemBucket *pMemBucket, double percent) { // if only one elements exists, return it if (pMemBucket->total == 1) { - return findOnlyResult(pMemBucket); + if (findOnlyResult(pMemBucket) < 0) { + return -1; + } } percent = fabs(percent);