diff --git a/include/common/tcommon.h b/include/common/tcommon.h index f74795a250..36b1abf48a 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -91,6 +91,7 @@ typedef struct STuplePos { }; STupleKey streamTupleKey; }; + bool isValid; } STuplePos; typedef struct SFirstLastRes { diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 695f62b7d9..70aa7b4c0b 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2014,20 +2014,26 @@ static void prepareBuf(SqlFunctionCtx* pCtx) { ASSERT(pCtx->subsidiaries.rowLen > 0); } -static void firstlastSaveTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SqlFunctionCtx* pCtx, +static int32_t firstlastSaveTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SqlFunctionCtx* pCtx, SFirstLastRes* pInfo) { if (pCtx->subsidiaries.num <= 0) { - return; + return TSDB_CODE_SUCCESS; } if (!pInfo->hasResult) { pInfo->pos = saveTupleData(pCtx, rowIndex, pSrcBlock); + if (!pInfo->pos.isValid) { + terrno = TSDB_CODE_NO_AVAIL_DISK; + return terrno; + } } else { updateTupleData(pCtx, rowIndex, pSrcBlock, &pInfo->pos); } + + return TSDB_CODE_SUCCESS; } -static void doSaveCurrentVal(SqlFunctionCtx* pCtx, int32_t rowIndex, int64_t currentTs, int32_t type, char* pData) { +static int32_t doSaveCurrentVal(SqlFunctionCtx* pCtx, int32_t rowIndex, int64_t currentTs, int32_t type, char* pData) { SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SFirstLastRes* pInfo = GET_ROWCELL_INTERBUF(pResInfo); @@ -2037,9 +2043,13 @@ static void doSaveCurrentVal(SqlFunctionCtx* pCtx, int32_t rowIndex, int64_t cur memcpy(pInfo->buf, pData, pInfo->bytes); pInfo->ts = currentTs; - firstlastSaveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pInfo); + int32_t code = firstlastSaveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pInfo); + if (code != TSDB_CODE_SUCCESS) { + return code; + } pInfo->hasResult = true; + return TSDB_CODE_SUCCESS; } // This ordinary first function does not care if current scan is ascending order or descending order scan @@ -2063,7 +2073,10 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) { if (pInput->colDataSMAIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) { ASSERT(pInputCol->hasNull == true); // save selectivity value for column consisted of all null values - firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo); + int32_t code = firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo); + if (code != TSDB_CODE_SUCCESS) { + return code; + } return TSDB_CODE_SUCCESS; } @@ -2136,7 +2149,10 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) { char* data = colDataGetData(pInputCol, i); TSKEY cts = pts[i]; if (pResInfo->numOfRes == 0 || pInfo->ts > cts) { - doSaveCurrentVal(pCtx, i, cts, pInputCol->info.type, data); + int32_t code = doSaveCurrentVal(pCtx, i, cts, pInputCol->info.type, data); + if (code != TSDB_CODE_SUCCESS) { + return code; + } pResInfo->numOfRes = 1; } } @@ -2144,7 +2160,10 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) { if (numOfElems == 0) { // save selectivity value for column consisted of all null values - firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo); + int32_t code = firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } SET_VAL(pResInfo, numOfElems, 1); return TSDB_CODE_SUCCESS; @@ -2171,7 +2190,10 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { if (pInput->colDataSMAIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) { ASSERT(pInputCol->hasNull == true); // save selectivity value for column consisted of all null values - firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo); + int32_t code = firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo); + if (code != TSDB_CODE_SUCCESS) { + return code; + } return TSDB_CODE_SUCCESS; } @@ -2261,7 +2283,10 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { if (pResInfo->numOfRes == 0 || pInfo->ts < cts) { char* data = colDataGetData(pInputCol, chosen); - doSaveCurrentVal(pCtx, i, cts, type, data); + int32_t code = doSaveCurrentVal(pCtx, i, cts, type, data); + if (code != TSDB_CODE_SUCCESS) { + return code; + } pResInfo->numOfRes = 1; } } @@ -2269,7 +2294,10 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { for (int32_t i = pInput->startRowIndex + round * 4; i < pInput->startRowIndex + pInput->numOfRows; ++i) { if (pResInfo->numOfRes == 0 || pInfo->ts < pts[i]) { char* data = colDataGetData(pInputCol, i); - doSaveCurrentVal(pCtx, i, pts[i], type, data); + int32_t code = doSaveCurrentVal(pCtx, i, pts[i], type, data); + if (code != TSDB_CODE_SUCCESS) { + return code; + } pResInfo->numOfRes = 1; } } @@ -2283,7 +2311,10 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { if (pResInfo->numOfRes == 0 || pInfo->ts < pts[i]) { char* data = colDataGetData(pInputCol, i); - doSaveCurrentVal(pCtx, i, pts[i], type, data); + int32_t code = doSaveCurrentVal(pCtx, i, pts[i], type, data); + if (code != TSDB_CODE_SUCCESS) { + return code; + } pResInfo->numOfRes = 1; } } @@ -2294,7 +2325,10 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { // save selectivity value for column consisted of all null values if (numOfElems == 0) { - firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo); + int32_t code = firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } // SET_VAL(pResInfo, numOfElems, 1); @@ -2322,12 +2356,17 @@ static int32_t firstLastTransferInfoImpl(SFirstLastRes* pInput, SFirstLastRes* p return TSDB_CODE_SUCCESS; } -static void firstLastTransferInfo(SqlFunctionCtx* pCtx, SFirstLastRes* pInput, SFirstLastRes* pOutput, bool isFirst, +static int32_t firstLastTransferInfo(SqlFunctionCtx* pCtx, SFirstLastRes* pInput, SFirstLastRes* pOutput, bool isFirst, int32_t rowIndex) { if (TSDB_CODE_SUCCESS == firstLastTransferInfoImpl(pInput, pOutput, isFirst)) { - firstlastSaveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pOutput); + int32_t code = firstlastSaveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pOutput); + if (code != TSDB_CODE_SUCCESS) { + return code; + } pOutput->hasResult = true; } + + return TSDB_CODE_SUCCESS; } static int32_t firstLastFunctionMergeImpl(SqlFunctionCtx* pCtx, bool isFirstQuery) { @@ -2343,7 +2382,10 @@ static int32_t firstLastFunctionMergeImpl(SqlFunctionCtx* pCtx, bool isFirstQuer for (int32_t i = start; i < start + pInput->numOfRows; ++i) { char* data = colDataGetData(pCol, i); SFirstLastRes* pInputInfo = (SFirstLastRes*)varDataVal(data); - firstLastTransferInfo(pCtx, pInputInfo, pInfo, isFirstQuery, i); + int32_t code = firstLastTransferInfo(pCtx, pInputInfo, pInfo, isFirstQuery, i); + if (code != TSDB_CODE_SUCCESS) { + return code; + } if (!numOfElems) { numOfElems = pInputInfo->hasResult ? 1 : 0; } @@ -2412,7 +2454,7 @@ int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { return TSDB_CODE_SUCCESS; } -static void doSaveLastrow(SqlFunctionCtx* pCtx, char* pData, int32_t rowIndex, int64_t cts, SFirstLastRes* pInfo) { +static int32_t doSaveLastrow(SqlFunctionCtx* pCtx, char* pData, int32_t rowIndex, int64_t cts, SFirstLastRes* pInfo) { SInputColumnInfoData* pInput = &pCtx->input; SColumnInfoData* pInputCol = pInput->pData[0]; @@ -2429,9 +2471,14 @@ static void doSaveLastrow(SqlFunctionCtx* pCtx, char* pData, int32_t rowIndex, i } pInfo->ts = cts; - firstlastSaveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pInfo); + int32_t code = firstlastSaveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pInfo); + if (code != TSDB_CODE_SUCCESS) { + return code; + } pInfo->hasResult = true; + + return TSDB_CODE_SUCCESS; } int32_t lastRowFunction(SqlFunctionCtx* pCtx) { @@ -2493,7 +2540,10 @@ int32_t lastRowFunction(SqlFunctionCtx* pCtx) { numOfElems++; if (pResInfo->numOfRes == 0 || pInfo->ts < cts) { - doSaveLastrow(pCtx, data, i, cts, pInfo); + int32_t code = doSaveLastrow(pCtx, data, i, cts, pInfo); + if (code != TSDB_CODE_SUCCESS) { + return code; + } pResInfo->numOfRes = 1; } } @@ -2948,18 +2998,27 @@ static STuplePos doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf if (pHandle->currentPage == -1) { pPage = getNewBufPage(pHandle->pBuf, &pHandle->currentPage); + if (pPage == NULL) { + return p; + } pPage->num = sizeof(SFilePage); } else { pPage = getBufPage(pHandle->pBuf, pHandle->currentPage); + if (pPage == NULL) { + return p; + } if (pPage->num + length > getBufPageSize(pHandle->pBuf)) { // current page is all used, let's prepare a new buffer page releaseBufPage(pHandle->pBuf, pPage); pPage = getNewBufPage(pHandle->pBuf, &pHandle->currentPage); + if (pPage == NULL) { + return p; + } pPage->num = sizeof(SFilePage); } } - p = (STuplePos){.pageId = pHandle->currentPage, .offset = pPage->num}; + p = (STuplePos){.pageId = pHandle->currentPage, .offset = pPage->num, .isValid = true}; memcpy(pPage->data + pPage->num, pBuf, length); pPage->num += length; @@ -5697,7 +5756,10 @@ int32_t cachedLastRowFunction(SqlFunctionCtx* pCtx) { TSKEY cts = getRowPTs(pInput->pPTS, i); if (pResInfo->numOfRes == 0 || pInfo->ts < cts) { - doSaveLastrow(pCtx, data, i, cts, pInfo); + int32_t code = doSaveLastrow(pCtx, data, i, cts, pInfo); + if (code != TSDB_CODE_SUCCESS) { + return code; + } pResInfo->numOfRes = 1; } }