fix loadTupleData

This commit is contained in:
Ganlin Zhao 2023-01-04 17:59:30 +08:00
parent 6411e717df
commit af0bd9c72b
1 changed files with 44 additions and 25 deletions

View File

@ -776,13 +776,14 @@ int32_t maxFunction(SqlFunctionCtx* pCtx) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void setNullSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t rowIndex); static int32_t setNullSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t rowIndex);
static void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuplePos* pTuplePos, static int32_t setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuplePos* pTuplePos,
int32_t rowIndex); int32_t rowIndex);
int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); int32_t code = TSDB_CODE_SUCCESS;
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
SMinmaxResInfo* pRes = GET_ROWCELL_INTERBUF(pEntryInfo); SMinmaxResInfo* pRes = GET_ROWCELL_INTERBUF(pEntryInfo);
int32_t slotId = pCtx->pExpr->base.resSchema.slotId; int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
@ -799,17 +800,17 @@ int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
} }
if (pEntryInfo->numOfRes > 0) { if (pEntryInfo->numOfRes > 0) {
setSelectivityValue(pCtx, pBlock, &pRes->tuplePos, currentRow); code = setSelectivityValue(pCtx, pBlock, &pRes->tuplePos, currentRow);
} else { } else {
setSelectivityValue(pCtx, pBlock, &pRes->nullTuplePos, currentRow); code = setSelectivityValue(pCtx, pBlock, &pRes->nullTuplePos, currentRow);
} }
return pEntryInfo->numOfRes; return code;
} }
void setNullSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t rowIndex) { int32_t setNullSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t rowIndex) {
if (pCtx->subsidiaries.num <= 0) { if (pCtx->subsidiaries.num <= 0) {
return; return TSDB_CODE_SUCCESS;
} }
for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) { for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) {
@ -819,17 +820,23 @@ void setNullSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t
SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId); SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId);
colDataAppendNULL(pDstCol, rowIndex); colDataAppendNULL(pDstCol, rowIndex);
} }
return TSDB_CODE_SUCCESS;
} }
void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuplePos* pTuplePos, int32_t rowIndex) { int32_t setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuplePos* pTuplePos, int32_t rowIndex) {
if (pCtx->subsidiaries.num <= 0) { if (pCtx->subsidiaries.num <= 0) {
return; return TSDB_CODE_SUCCESS;
} }
if ((pCtx->saveHandle.pBuf != NULL && pTuplePos->pageId != -1) || if ((pCtx->saveHandle.pBuf != NULL && pTuplePos->pageId != -1) ||
(pCtx->saveHandle.pState && pTuplePos->streamTupleKey.ts > 0)) { (pCtx->saveHandle.pState && pTuplePos->streamTupleKey.ts > 0)) {
int32_t numOfCols = pCtx->subsidiaries.num; int32_t numOfCols = pCtx->subsidiaries.num;
const char* p = loadTupleData(pCtx, pTuplePos); const char* p = loadTupleData(pCtx, pTuplePos);
if (p == NULL) {
terrno = TSDB_CODE_NO_AVAIL_DISK;
return terrno;
}
bool* nullList = (bool*)p; bool* nullList = (bool*)p;
char* pStart = (char*)(nullList + numOfCols * sizeof(bool)); char* pStart = (char*)(nullList + numOfCols * sizeof(bool));
@ -853,6 +860,8 @@ void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuple
tdbFree((void*)p); tdbFree((void*)p);
} }
} }
return TSDB_CODE_SUCCESS;
} }
void releaseSource(STuplePos* pPos) { void releaseSource(STuplePos* pPos) {
@ -2406,6 +2415,7 @@ int32_t firstFunctionMerge(SqlFunctionCtx* pCtx) { return firstLastFunctionMerge
int32_t lastFunctionMerge(SqlFunctionCtx* pCtx) { return firstLastFunctionMergeImpl(pCtx, false); } int32_t lastFunctionMerge(SqlFunctionCtx* pCtx) { return firstLastFunctionMergeImpl(pCtx, false); }
int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t slotId = pCtx->pExpr->base.resSchema.slotId; int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
@ -2416,12 +2426,14 @@ int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
colDataAppend(pCol, pBlock->info.rows, pRes->buf, pRes->isNull || pResInfo->isNullRes); colDataAppend(pCol, pBlock->info.rows, pRes->buf, pRes->isNull || pResInfo->isNullRes);
// handle selectivity // handle selectivity
setSelectivityValue(pCtx, pBlock, &pRes->pos, pBlock->info.rows); code = setSelectivityValue(pCtx, pBlock, &pRes->pos, pBlock->info.rows);
return pResInfo->numOfRes; return code;
} }
int32_t firstLastPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t firstLastPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
int32_t code = TSDB_CODE_SUCCESS;
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
SFirstLastRes* pRes = GET_ROWCELL_INTERBUF(pEntryInfo); SFirstLastRes* pRes = GET_ROWCELL_INTERBUF(pEntryInfo);
@ -2437,10 +2449,10 @@ int32_t firstLastPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
colDataAppend(pCol, pBlock->info.rows, res, false); colDataAppend(pCol, pBlock->info.rows, res, false);
setSelectivityValue(pCtx, pBlock, &pRes->pos, pBlock->info.rows); code = setSelectivityValue(pCtx, pBlock, &pRes->pos, pBlock->info.rows);
taosMemoryFree(res); taosMemoryFree(res);
return 1; return code;
} }
int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
@ -3111,6 +3123,9 @@ int32_t updateTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBloc
static char* doLoadTupleData(SSerializeDataHandle* pHandle, const STuplePos* pPos) { static char* doLoadTupleData(SSerializeDataHandle* pHandle, const STuplePos* pPos) {
if (pHandle->pBuf != NULL) { if (pHandle->pBuf != NULL) {
SFilePage* pPage = getBufPage(pHandle->pBuf, pPos->pageId); SFilePage* pPage = getBufPage(pHandle->pBuf, pPos->pageId);
if (pPage == NULL) {
return NULL;
}
char* p = pPage->data + pPos->offset; char* p = pPage->data + pPos->offset;
releaseBufPage(pHandle->pBuf, pPage); releaseBufPage(pHandle->pBuf, pPage);
return p; return p;
@ -3127,6 +3142,8 @@ const char* loadTupleData(SqlFunctionCtx* pCtx, const STuplePos* pPos) {
} }
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
int32_t code = TSDB_CODE_SUCCESS;
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
STopBotRes* pRes = getTopBotOutputInfo(pCtx); STopBotRes* pRes = getTopBotOutputInfo(pCtx);
@ -3139,8 +3156,8 @@ int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
int32_t currentRow = pBlock->info.rows; int32_t currentRow = pBlock->info.rows;
if (pEntryInfo->numOfRes <= 0) { if (pEntryInfo->numOfRes <= 0) {
colDataAppendNULL(pCol, currentRow); colDataAppendNULL(pCol, currentRow);
setSelectivityValue(pCtx, pBlock, &pRes->nullTuplePos, currentRow); code = setSelectivityValue(pCtx, pBlock, &pRes->nullTuplePos, currentRow);
return pEntryInfo->numOfRes; return code;
} }
for (int32_t i = 0; i < pEntryInfo->numOfRes; ++i) { for (int32_t i = 0; i < pEntryInfo->numOfRes; ++i) {
STopBotResItem* pItem = &pRes->pItems[i]; STopBotResItem* pItem = &pRes->pItems[i];
@ -3149,11 +3166,11 @@ int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
qDebug("page_finalize i:%d,item:%p,pageId:%d, offset:%d\n", i, pItem, pItem->tuplePos.pageId, qDebug("page_finalize i:%d,item:%p,pageId:%d, offset:%d\n", i, pItem, pItem->tuplePos.pageId,
pItem->tuplePos.offset); pItem->tuplePos.offset);
#endif #endif
setSelectivityValue(pCtx, pBlock, &pRes->pItems[i].tuplePos, currentRow); code = setSelectivityValue(pCtx, pBlock, &pRes->pItems[i].tuplePos, currentRow);
currentRow += 1; currentRow += 1;
} }
return pEntryInfo->numOfRes; return code;
} }
void addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type, bool isTopQuery) { void addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type, bool isTopQuery) {
@ -4583,6 +4600,7 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) {
} }
int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
int32_t code = TSDB_CODE_SUCCESS;
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
SSampleInfo* pInfo = getSampleOutputInfo(pCtx); SSampleInfo* pInfo = getSampleOutputInfo(pCtx);
@ -4594,15 +4612,15 @@ int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
int32_t currentRow = pBlock->info.rows; int32_t currentRow = pBlock->info.rows;
if (pInfo->numSampled == 0) { if (pInfo->numSampled == 0) {
colDataAppendNULL(pCol, currentRow); colDataAppendNULL(pCol, currentRow);
setSelectivityValue(pCtx, pBlock, &pInfo->nullTuplePos, currentRow); code = setSelectivityValue(pCtx, pBlock, &pInfo->nullTuplePos, currentRow);
return pInfo->numSampled; return code;
} }
for (int32_t i = 0; i < pInfo->numSampled; ++i) { for (int32_t i = 0; i < pInfo->numSampled; ++i) {
colDataAppend(pCol, currentRow + i, pInfo->data + i * pInfo->colBytes, false); colDataAppend(pCol, currentRow + i, pInfo->data + i * pInfo->colBytes, false);
setSelectivityValue(pCtx, pBlock, &pInfo->tuplePos[i], currentRow + i); code = setSelectivityValue(pCtx, pBlock, &pInfo->tuplePos[i], currentRow + i);
} }
return pInfo->numSampled; return code;
} }
bool getTailFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { bool getTailFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
@ -4942,6 +4960,7 @@ int32_t modeFunction(SqlFunctionCtx* pCtx) {
} }
int32_t modeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t modeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
int32_t code = TSDB_CODE_SUCCESS;
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SModeInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo); SModeInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
int32_t slotId = pCtx->pExpr->base.resSchema.slotId; int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
@ -4961,15 +4980,15 @@ int32_t modeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
if (maxCount != 0) { if (maxCount != 0) {
SModeItem* pResItem = (SModeItem*)(pInfo->pItems + resIndex * (sizeof(SModeItem) + pInfo->colBytes)); SModeItem* pResItem = (SModeItem*)(pInfo->pItems + resIndex * (sizeof(SModeItem) + pInfo->colBytes));
colDataAppend(pCol, currentRow, pResItem->data, false); colDataAppend(pCol, currentRow, pResItem->data, false);
setSelectivityValue(pCtx, pBlock, &pResItem->tuplePos, currentRow); code = setSelectivityValue(pCtx, pBlock, &pResItem->tuplePos, currentRow);
} else { } else {
colDataAppendNULL(pCol, currentRow); colDataAppendNULL(pCol, currentRow);
setSelectivityValue(pCtx, pBlock, &pInfo->nullTuplePos, currentRow); code = setSelectivityValue(pCtx, pBlock, &pInfo->nullTuplePos, currentRow);
} }
taosHashCleanup(pInfo->pHash); taosHashCleanup(pInfo->pHash);
return pResInfo->numOfRes; return code;
} }
bool getTwaFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) { bool getTwaFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) {