diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 36b1abf48a..f74795a250 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -91,7 +91,6 @@ typedef struct STuplePos { }; STupleKey streamTupleKey; }; - bool isValid; } STuplePos; typedef struct SFirstLastRes { diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index cbda8dc472..dc884a0581 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -44,9 +44,10 @@ typedef struct SMinmaxResInfo { bool nullTupleSaved; int16_t type; } SMinmaxResInfo; -int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc); -STuplePos saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock); +int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc, int32_t* nElems); + +int32_t saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos); int32_t updateTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos); const char* loadTupleData(SqlFunctionCtx* pCtx, const STuplePos* pPos); diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 70aa7b4c0b..8f099d7275 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -757,13 +757,21 @@ bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { } int32_t minFunction(SqlFunctionCtx* pCtx) { - int32_t numOfElems = doMinMaxHelper(pCtx, 1); + int32_t numOfElems = 0; + int32_t code = doMinMaxHelper(pCtx, 1, &numOfElems); + if (code != TSDB_CODE_SUCCESS) { + return code; + } SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1); return TSDB_CODE_SUCCESS; } int32_t maxFunction(SqlFunctionCtx* pCtx) { - int32_t numOfElems = doMinMaxHelper(pCtx, 0); + int32_t numOfElems = 0; + int32_t code = doMinMaxHelper(pCtx, 0, &numOfElems); + if (code != TSDB_CODE_SUCCESS) { + return code; + } SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1); return TSDB_CODE_SUCCESS; } @@ -2021,10 +2029,9 @@ static int32_t firstlastSaveTupleData(const SSDataBlock* pSrcBlock, int32_t rowI } if (!pInfo->hasResult) { - pInfo->pos = saveTupleData(pCtx, rowIndex, pSrcBlock); - if (!pInfo->pos.isValid) { - terrno = TSDB_CODE_NO_AVAIL_DISK; - return terrno; + int32_t code = saveTupleData(pCtx, rowIndex, pSrcBlock, &pInfo->pos); + if (code != TSDB_CODE_SUCCESS) { + return code; } } else { updateTupleData(pCtx, rowIndex, pSrcBlock, &pInfo->pos); @@ -2804,8 +2811,8 @@ static STopBotRes* getTopBotOutputInfo(SqlFunctionCtx* pCtx) { return pRes; } -static void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type, - uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery); +static int32_t doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type, + uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery); static void addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type, bool isTopQuery); @@ -2827,11 +2834,17 @@ int32_t topFunction(SqlFunctionCtx* pCtx) { numOfElems++; char* data = colDataGetData(pCol, i); - doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, pRes->type, pInput->uid, pResInfo, true); + int32_t code = doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, pRes->type, pInput->uid, pResInfo, true); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pRes->nullTupleSaved) { - pRes->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock); + int32_t code = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, &pRes->nullTuplePos); + if (code != TSDB_CODE_SUCCESS) { + return code; + } pRes->nullTupleSaved = true; } return TSDB_CODE_SUCCESS; @@ -2855,11 +2868,17 @@ int32_t bottomFunction(SqlFunctionCtx* pCtx) { numOfElems++; char* data = colDataGetData(pCol, i); - doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, pRes->type, pInput->uid, pResInfo, false); + int32_t code = doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, pRes->type, pInput->uid, pResInfo, false); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pRes->nullTupleSaved) { - pRes->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock); + int32_t code = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, &pRes->nullTuplePos); + if (code != TSDB_CODE_SUCCESS) { + return code; + } pRes->nullTupleSaved = true; } @@ -2899,7 +2918,7 @@ static int32_t topBotResComparFn(const void* p1, const void* p2, const void* par return (val1->v.d > val2->v.d) ? 1 : -1; } -void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type, +int32_t doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type, uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery) { STopBotRes* pRes = getTopBotOutputInfo(pCtx); @@ -2917,7 +2936,10 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData // save the data of this tuple if (pCtx->subsidiaries.num > 0) { - pItem->tuplePos = saveTupleData(pCtx, rowIndex, pSrcBlock); + int32_t code = saveTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } #ifdef BUF_PAGE_DEBUG qDebug("page_saveTuple i:%d, item:%p,pageId:%d, offset:%d\n", pEntryInfo->numOfRes, pItem, pItem->tuplePos.pageId, @@ -2952,6 +2974,8 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData topBotResComparFn, NULL, !isTopQuery); } } + + return TSDB_CODE_SUCCESS; } /* @@ -2991,7 +3015,8 @@ void* serializeTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SSubsid return buf; } -static STuplePos doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf, size_t length, STupleKey key) { +static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf, size_t length, STupleKey key, + STuplePos* pPos) { STuplePos p = {0}; if (pHandle->pBuf != NULL) { SFilePage* pPage = NULL; @@ -2999,26 +3024,29 @@ static STuplePos doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf if (pHandle->currentPage == -1) { pPage = getNewBufPage(pHandle->pBuf, &pHandle->currentPage); if (pPage == NULL) { - return p; + terrno = TSDB_CODE_NO_AVAIL_DISK; + return terrno; } pPage->num = sizeof(SFilePage); } else { pPage = getBufPage(pHandle->pBuf, pHandle->currentPage); if (pPage == NULL) { - return p; + terrno = TSDB_CODE_NO_AVAIL_DISK; + return terrno; } if (pPage->num + length > getBufPageSize(pHandle->pBuf)) { // current page is all used, let's prepare a new buffer page releaseBufPage(pHandle->pBuf, pPage); pPage = getNewBufPage(pHandle->pBuf, &pHandle->currentPage); if (pPage == NULL) { - return p; + terrno = TSDB_CODE_NO_AVAIL_DISK; + return terrno; } pPage->num = sizeof(SFilePage); } } - p = (STuplePos){.pageId = pHandle->currentPage, .offset = pPage->num, .isValid = true}; + p = (STuplePos){.pageId = pHandle->currentPage, .offset = pPage->num}; memcpy(pPage->data + pPage->num, pBuf, length); pPage->num += length; @@ -3032,10 +3060,11 @@ static STuplePos doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf p.streamTupleKey = key; } - return p; + *pPos = p; + return TSDB_CODE_SUCCESS; } -STuplePos saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock) { +int32_t saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) { prepareBuf(pCtx); STupleKey key; @@ -3050,7 +3079,7 @@ STuplePos saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBloc } char* buf = serializeTupleData(pSrcBlock, rowIndex, &pCtx->subsidiaries, pCtx->subsidiaries.buf); - return doSaveTupleData(&pCtx->saveHandle, buf, pCtx->subsidiaries.rowLen, key); + return doSaveTupleData(&pCtx->saveHandle, buf, pCtx->subsidiaries.rowLen, key, pPos); } static int32_t doUpdateTupleData(SSerializeDataHandle* pHandle, const void* pBuf, size_t length, STuplePos* pPos) { @@ -4490,12 +4519,15 @@ static void sampleAssignResult(SSampleInfo* pInfo, char* data, int32_t index) { assignVal(pInfo->data + index * pInfo->colBytes, data, pInfo->colBytes, pInfo->colType); } -static void doReservoirSample(SqlFunctionCtx* pCtx, SSampleInfo* pInfo, char* data, int32_t index) { +static int32_t doReservoirSample(SqlFunctionCtx* pCtx, SSampleInfo* pInfo, char* data, int32_t index) { pInfo->totalPoints++; if (pInfo->numSampled < pInfo->samples) { sampleAssignResult(pInfo, data, pInfo->numSampled); if (pCtx->subsidiaries.num > 0) { - pInfo->tuplePos[pInfo->numSampled] = saveTupleData(pCtx, index, pCtx->pSrcBlock); + int32_t code = saveTupleData(pCtx, index, pCtx->pSrcBlock, &pInfo->tuplePos[pInfo->numSampled]); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } pInfo->numSampled++; } else { @@ -4507,6 +4539,8 @@ static void doReservoirSample(SqlFunctionCtx* pCtx, SSampleInfo* pInfo, char* da } } } + + return TSDB_CODE_SUCCESS; } int32_t sampleFunction(SqlFunctionCtx* pCtx) { @@ -4522,11 +4556,17 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) { } char* data = colDataGetData(pInputCol, i); - doReservoirSample(pCtx, pInfo, data, i); + int32_t code = doReservoirSample(pCtx, pInfo, data, i); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } if (pInfo->numSampled == 0 && pCtx->subsidiaries.num > 0 && !pInfo->nullTupleSaved) { - pInfo->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock); + int32_t code = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, &pInfo->nullTuplePos); + if (code != TSDB_CODE_SUCCESS) { + return code; + } pInfo->nullTupleSaved = true; } @@ -4820,7 +4860,7 @@ bool modeFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) { return true; } -static void doModeAdd(SModeInfo* pInfo, int32_t rowIndex, SqlFunctionCtx* pCtx, char* data) { +static int32_t doModeAdd(SModeInfo* pInfo, int32_t rowIndex, SqlFunctionCtx* pCtx, char* data) { int32_t hashKeyBytes = IS_STR_DATA_TYPE(pInfo->colType) ? varDataTLen(data) : pInfo->colBytes; SModeItem** pHashItem = taosHashGet(pInfo->pHash, data, hashKeyBytes); if (pHashItem == NULL) { @@ -4830,7 +4870,10 @@ static void doModeAdd(SModeInfo* pInfo, int32_t rowIndex, SqlFunctionCtx* pCtx, pItem->count += 1; if (pCtx->subsidiaries.num > 0) { - pItem->tuplePos = saveTupleData(pCtx, rowIndex, pCtx->pSrcBlock); + int32_t code = saveTupleData(pCtx, rowIndex, pCtx->pSrcBlock, &pItem->tuplePos); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } taosHashPut(pInfo->pHash, data, hashKeyBytes, &pItem, sizeof(SModeItem*)); @@ -4841,6 +4884,8 @@ static void doModeAdd(SModeInfo* pInfo, int32_t rowIndex, SqlFunctionCtx* pCtx, updateTupleData(pCtx, rowIndex, pCtx->pSrcBlock, &((*pHashItem)->tuplePos)); } } + + return TSDB_CODE_SUCCESS; } int32_t modeFunction(SqlFunctionCtx* pCtx) { @@ -4861,7 +4906,10 @@ int32_t modeFunction(SqlFunctionCtx* pCtx) { numOfElems++; char* data = colDataGetData(pInputCol, i); - doModeAdd(pInfo, i, pCtx, data); + int32_t code = doModeAdd(pInfo, i, pCtx, data); + if (code != TSDB_CODE_SUCCESS) { + return code; + } if (sizeof(SModeInfo) + pInfo->numOfPoints * (sizeof(SModeItem) + pInfo->colBytes) >= MODE_MAX_RESULT_SIZE) { taosHashCleanup(pInfo->pHash); @@ -4870,7 +4918,10 @@ int32_t modeFunction(SqlFunctionCtx* pCtx) { } if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pInfo->nullTupleSaved) { - pInfo->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock); + int32_t code = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, &pInfo->nullTuplePos); + if (code != TSDB_CODE_SUCCESS) { + return code; + } pInfo->nullTupleSaved = true; } diff --git a/source/libs/function/src/detail/tminmax.c b/source/libs/function/src/detail/tminmax.c index 3660ec272f..cb66fd36fb 100644 --- a/source/libs/function/src/detail/tminmax.c +++ b/source/libs/function/src/detail/tminmax.c @@ -700,7 +700,7 @@ static void doExtractVal(SColumnInfoData* pCol, int32_t i, int32_t end, SqlFunct } } -int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { +int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc, int32_t* nElems) { int32_t numOfElems = 0; SInputColumnInfoData* pInput = &pCtx->input; @@ -746,7 +746,10 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if (pCtx->subsidiaries.num > 0) { index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); if (index >= 0) { - pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock); + int32_t code = saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } } } else { @@ -760,7 +763,10 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if (pCtx->subsidiaries.num > 0) { index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); if (index >= 0) { - pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock); + int32_t code = saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } } } @@ -774,7 +780,10 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if (pCtx->subsidiaries.num > 0) { index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); if (index >= 0) { - pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock); + int32_t code = saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } } } @@ -788,7 +797,10 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if (pCtx->subsidiaries.num > 0) { index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); if (index >= 0) { - pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock); + int32_t code = saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } } } @@ -804,7 +816,10 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if (pCtx->subsidiaries.num > 0) { index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); if (index >= 0) { - pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock); + int32_t code = saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } } } @@ -825,7 +840,10 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { memcpy(&pBuf->v, pCol->pData + (pCol->info.bytes * i), pCol->info.bytes); if (pCtx->subsidiaries.num > 0) { - pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock); + int32_t code = saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } pBuf->assign = true; numOfElems = 1; @@ -889,9 +907,13 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { _over: if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pBuf->nullTupleSaved) { - pBuf->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock); + int32_t code = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, &pBuf->nullTuplePos); + if (code != TSDB_CODE_SUCCESS) { + return code; + } pBuf->nullTupleSaved = true; } - return numOfElems; + *nElems = numOfElems; + return TSDB_CODE_SUCCESS; }