From d497b525f513bad1068793cf5fa5d9b9af0d3a34 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 22 Apr 2021 16:23:48 +0800 Subject: [PATCH] [td-225] fix bug found by regression test. --- src/client/src/tscLocalMerge.c | 297 +++++---------------------------- src/query/inc/qExecutor.h | 1 + src/query/src/qExecutor.c | 21 ++- 3 files changed, 60 insertions(+), 259 deletions(-) diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index 18b974551a..f6db1459f8 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -102,67 +102,33 @@ void tsCreateSQLFunctionCtx(SQueryInfo* pQueryInfo, SQLFunctionCtx* pCtx, SSchem } pCtx[i].interBufBytes = pExpr->base.interBytes; - pCtx[i].resultInfo = calloc(1, pCtx[i].interBufBytes + sizeof(SResultRowCellInfo)); +// pCtx[i].resultInfo = calloc(1, pCtx[i].interBufBytes + sizeof(SResultRowCellInfo)); pCtx[i].stableQuery = true; } - int16_t n = 0; - int16_t tagLen = 0; - SQLFunctionCtx **pTagCtx = calloc(pQueryInfo->fieldsInfo.numOfOutput, POINTER_BYTES); - - SQLFunctionCtx *pCtx1 = NULL; - for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { - SExprInfo *pExpr = tscSqlExprGet(pQueryInfo, i); - if (pExpr->base.functionId == TSDB_FUNC_TAG_DUMMY || pExpr->base.functionId == TSDB_FUNC_TS_DUMMY) { - tagLen += pExpr->base.resBytes; - pTagCtx[n++] = &pCtx[i]; - } else if ((aAggs[pExpr->base.functionId].status & TSDB_FUNCSTATE_SELECTIVITY) != 0) { - pCtx1 = &pCtx[i]; - } - } - - if (n == 0 || pCtx == NULL) { - free(pTagCtx); - } else { - pCtx1->tagInfo.pTagCtxList = pTagCtx; - pCtx1->tagInfo.numOfTagCols = n; - pCtx1->tagInfo.tagsLen = tagLen; - } -} - -//static SFillColInfo* createFillColInfo(SQueryInfo* pQueryInfo) { -// int32_t numOfCols = (int32_t)tscNumOfFields(pQueryInfo); -// int32_t offset = 0; +// int16_t n = 0; +// int16_t tagLen = 0; +// SQLFunctionCtx **pTagCtx = calloc(pQueryInfo->fieldsInfo.numOfOutput, POINTER_BYTES); // -// SFillColInfo* pFillCol = calloc(numOfCols, sizeof(SFillColInfo)); -// for(int32_t i = 0; i < numOfCols; ++i) { -// SInternalField* pIField = taosArrayGet(pQueryInfo->fieldsInfo.internalField, i); -// -// if (pIField->pExpr->pExpr == NULL) { -// SExprInfo* pExpr = pIField->pExpr; -// -// pFillCol[i].col.bytes = pExpr->base.resBytes; -// pFillCol[i].col.type = (int8_t)pExpr->base.resType; -// pFillCol[i].col.colId = pExpr->base.colInfo.colId; -// pFillCol[i].flag = pExpr->base.colInfo.flag; -// pFillCol[i].col.offset = offset; -// pFillCol[i].functionId = pExpr->base.functionId; -// pFillCol[i].fillVal.i = pQueryInfo->fillVal[i]; -// } else { -// pFillCol[i].col.bytes = pIField->field.bytes; -// pFillCol[i].col.type = (int8_t)pIField->field.type; -// pFillCol[i].col.colId = -100; -// pFillCol[i].flag = TSDB_COL_NORMAL; -// pFillCol[i].col.offset = offset; -// pFillCol[i].functionId = -1; -// pFillCol[i].fillVal.i = pQueryInfo->fillVal[i]; +// SQLFunctionCtx *pCtx1 = NULL; +// for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { +// SExprInfo *pExpr = tscSqlExprGet(pQueryInfo, i); +// if (pExpr->base.functionId == TSDB_FUNC_TAG_DUMMY || pExpr->base.functionId == TSDB_FUNC_TS_DUMMY) { +// tagLen += pExpr->base.resBytes; +// pTagCtx[n++] = &pCtx[i]; +// } else if ((aAggs[pExpr->base.functionId].status & TSDB_FUNCSTATE_SELECTIVITY) != 0) { +// pCtx1 = &pCtx[i]; // } -// -// offset += pFillCol[i].col.bytes; // } // -// return pFillCol; -//} +// if (n == 0 || pCtx == NULL) { +// free(pTagCtx); +// } else { +// pCtx1->tagInfo.pTagCtxList = pTagCtx; +// pCtx1->tagInfo.numOfTagCols = n; +// pCtx1->tagInfo.tagsLen = tagLen; +// } +} void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrderDescriptor *pDesc, SColumnModel *finalmodel, SColumnModel *pFFModel, SSqlObj *pSql) { @@ -330,15 +296,9 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde } assert(finalmodel->rowSize > 0 && finalmodel->rowSize <= pMerger->rowSize); -// pMerger->pFinalRes = calloc(1, pMerger->rowSize * pMerger->resColModel->capacity); - if (pMerger->pTempBuffer == NULL || pMerger->pLoserTree == NULL /*|| pMerger->pResultBuf == NULL || - pMerger->pFinalRes == NULL || pMerger->prevRowOfInput == NULL*/) { + if (pMerger->pTempBuffer == NULL || pMerger->pLoserTree == NULL) { tfree(pMerger->pTempBuffer); -// tfree(pMerger->discardData); -// tfree(pMerger->pResultBuf); -// tfree(pMerger->pFinalRes); -// tfree(pMerger->prevRowOfInput); tfree(pMerger->pLoserTree); tfree(param); tfree(pMerger); @@ -492,50 +452,28 @@ void tscDestroyLocalMerger(SSqlObj *pSql) { return; } -// SSqlCmd * pCmd = &pSql->cmd; -// SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); - // there is no more result, so we release all allocated resource SLocalMerger *pLocalMerge = (SLocalMerger *)atomic_exchange_ptr(&pRes->pLocalMerger, NULL); - if (pLocalMerge != NULL) { -// pLocalMerge->pFillInfo = taosDestroyFillInfo(pLocalMerge->pFillInfo); + tfree(pLocalMerge->pResultBuf); + tfree(pLocalMerge->pCtx); -// if (pLocalMerge->pCtx != NULL) { -// int32_t numOfExprs = (int32_t) tscSqlExprNumOfExprs(pQueryInfo); -// for (int32_t i = 0; i < numOfExprs; ++i) { -// SQLFunctionCtx *pCtx = &pLocalMerge->pCtx[i]; -// -// tVariantDestroy(&pCtx->tag); -// tfree(pCtx->resultInfo); -// -// if (pCtx->tagInfo.pTagCtxList != NULL) { -// tfree(pCtx->tagInfo.pTagCtxList); -// } -// } -// -// tfree(pLocalMerge->pCtx); -// } - - tfree(pLocalMerge->pResultBuf); - - if (pLocalMerge->pLoserTree) { - tfree(pLocalMerge->pLoserTree->param); - tfree(pLocalMerge->pLoserTree); - } - - tscLocalReducerEnvDestroy(pLocalMerge->pExtMemBuffer, pLocalMerge->pDesc, pLocalMerge->resColModel, pLocalMerge->finalModel, - pLocalMerge->numOfVnode); - for (int32_t i = 0; i < pLocalMerge->numOfBuffer; ++i) { - tfree(pLocalMerge->pLocalDataSrc[i]); - } - - pLocalMerge->numOfBuffer = 0; - pLocalMerge->numOfCompleted = 0; - free(pLocalMerge); - } else { - tscDebug("%p already freed or another free function is invoked", pSql); + if (pLocalMerge->pLoserTree) { + tfree(pLocalMerge->pLoserTree->param); + tfree(pLocalMerge->pLoserTree); } + tscLocalReducerEnvDestroy(pLocalMerge->pExtMemBuffer, pLocalMerge->pDesc, pLocalMerge->resColModel, + pLocalMerge->finalModel, pLocalMerge->numOfVnode); + for (int32_t i = 0; i < pLocalMerge->numOfBuffer; ++i) { + tfree(pLocalMerge->pLocalDataSrc[i]); + } + + pLocalMerge->numOfBuffer = 0; + pLocalMerge->numOfCompleted = 0; + tfree(pLocalMerge->pTempBuffer); + + free(pLocalMerge); + tscDebug("%p free local reducer finished", pSql); } @@ -611,49 +549,6 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SSqlCmd *pCm } } -bool isSameGroup(SSqlCmd *pCmd, SLocalMerger *pMerger, char *pPrev, tFilePage *tmpBuffer) { - SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); - - // disable merge procedure for column projection query - int16_t functionId = pMerger->pCtx[0].functionId; - if (pMerger->orderPrjOnSTable) { - return true; - } - - if (functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_ARITHM) { - return false; - } - - tOrderDescriptor *pOrderDesc = pMerger->pDesc; - SColumnOrderInfo* orderInfo = &pOrderDesc->orderInfo; - - // no group by columns, all data belongs to one group - int32_t numOfCols = orderInfo->numOfCols; - if (numOfCols <= 0) { - return true; - } - - if (orderInfo->colIndex[numOfCols - 1] == PRIMARYKEY_TIMESTAMP_COL_INDEX) { - /* - * super table interval query - * if the order columns is the primary timestamp, all result data belongs to one group - */ - assert(pQueryInfo->interval.interval > 0); - if (numOfCols == 1) { - return true; - } - } else { // simple group by query - assert(pQueryInfo->interval.interval == 0); - } - - // only one row exists - int32_t index = orderInfo->colIndex[0]; - int32_t offset = (pOrderDesc->pColumnModel)->pFields[index].offset; - - int32_t ret = memcmp(pPrev + offset, tmpBuffer->data + offset, pOrderDesc->pColumnModel->rowSize - offset); - return ret == 0; -} - int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOrderDescriptor **pOrderDesc, SColumnModel **pFinalModel, SColumnModel** pFFModel, uint32_t nBufferSizes) { SSqlCmd *pCmd = &pSql->cmd; @@ -944,6 +839,12 @@ static void doExecuteFinalMergeRv(SOperatorInfo* pOperator, int32_t numOfExpr, S for(int32_t j = 0; j < numOfExpr; ++j) { pCtx[j].pOutput += (pCtx[j].outputBytes * numOfRows); + if (pCtx[j].functionId == TSDB_FUNC_TOP || pCtx[j].functionId == TSDB_FUNC_BOTTOM) { + pCtx[j].ptsOutputBuf = pCtx[0].pOutput; + } + } + + for(int32_t j = 0; j < numOfExpr; ++j) { aAggs[pCtx[j].functionId].init(&pCtx[j]); } @@ -985,98 +886,6 @@ static void doExecuteFinalMergeRv(SOperatorInfo* pOperator, int32_t numOfExpr, S tfree(add); } -//static int64_t getNumOfResultLocal(SQLFunctionCtx *pCtx, int32_t numOfExprs) { -// int64_t maxOutput = 0; -// -// for (int32_t j = 0; j < numOfExprs; ++j) { -// /* -// * ts, tag, tagprj function can not decide the output number of current query -// * the number of output result is decided by main output -// */ -// int32_t functionId = pCtx[j].functionId; -// if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TAG) { -// continue; -// } -// -// SResultRowCellInfo* pResInfo = GET_RES_INFO(&pCtx[j]); -// if (maxOutput < pResInfo->numOfRes) { -// maxOutput = pResInfo->numOfRes; -// } -// } -// -// return maxOutput; -//} - -/* - * in handling the top/bottom query, which produce more than one rows result, - * the tsdb_func_tags only fill the first row of results, the remain rows need to - * filled with the same result, which is the tags, specified in group by clause - * - */ -//static void fillMultiRowsOfTagsVal(SLocalMerger *pLocalMerge, int32_t numOfRes, int32_t numOfExprs) { -// for (int32_t k = 0; k < numOfExprs; ++k) { -// SQLFunctionCtx *pCtx = &pLocalMerge->pCtx[k]; -// if (pCtx->functionId != TSDB_FUNC_TAG) { -// continue; -// } -// -// int32_t inc = numOfRes - 1; // tsdb_func_tag function only produce one row of result -// memset(pLocalMerge->tagBuf, 0, (size_t)pLocalMerge->tagBufLen); -// memcpy(pLocalMerge->tagBuf, pCtx->pOutput, (size_t)pCtx->outputBytes); -// -// for (int32_t i = 0; i < inc; ++i) { -// pCtx->pOutput += pCtx->outputBytes; -// memcpy(pCtx->pOutput, pLocalMerge->tagBuf, (size_t)pCtx->outputBytes); -// } -// } -//} - -//int32_t finalizeRes(SLocalMerger *pLocalMerge, int32_t numOfExprs) { -// for (int32_t k = 0; k < numOfExprs; ++k) { -// SQLFunctionCtx* pCtx = &pLocalMerge->pCtx[k]; -// aAggs[pCtx->functionId].xFinalize(pCtx); -// } -// -// pLocalMerge->hasPrevRow = false; -// -// int32_t numOfRes = (int32_t)getNumOfResultLocal(pLocalMerge->pCtx, numOfExprs); -// pLocalMerge->pResultBuf->num += numOfRes; -// -// fillMultiRowsOfTagsVal(pLocalMerge, numOfRes, numOfExprs); -// return numOfRes; -//} - -/* - * points merge: - * points are merged according to the sort info, which is tags columns and timestamp column. - * In case of points without either tags columns or timestamp, such as - * results generated by simple aggregation function, we merge them all into one points - * *Exception*: column projection query, required no merge procedure - */ -//bool needToMerge(SQueryInfo *pQueryInfo, SLocalMerger *pLocalMerge, tFilePage *tmpBuffer) { -// int32_t ret = 0; // merge all result by default -// -// int16_t functionId = pLocalMerge->pCtx[0].functionId; -// -// // todo opt performance -// if ((/*functionId == TSDB_FUNC_PRJ || */functionId == TSDB_FUNC_ARITHM) || (tscIsProjectionQueryOnSTable(pQueryInfo, 0) && pQueryInfo->distinctTag == false)) { // column projection query -// ret = 1; // disable merge procedure -// } else { -// tOrderDescriptor *pDesc = pLocalMerge->pDesc; -// if (pDesc->orderInfo.numOfCols > 0) { -// if (pDesc->tsOrder == TSDB_ORDER_ASC) { // asc -// // todo refactor comparator -// ret = compare_a(pLocalMerge->pDesc, 1, 0, pLocalMerge->prevRowOfInput, 1, 0, tmpBuffer->data); -// } else { // desc -// ret = compare_d(pLocalMerge->pDesc, 1, 0, pLocalMerge->prevRowOfInput, 1, 0, tmpBuffer->data); -// } -// } -// } -// -// /* if ret == 0, means the result belongs to the same group */ -// return (ret == 0); -//} - bool needToMergeRv(SSDataBlock* pBlock, SArray* columnIndexList, int32_t index, char **buf) { int32_t ret = 0; size_t size = taosArrayGetSize(columnIndexList); @@ -1087,20 +896,6 @@ bool needToMergeRv(SSDataBlock* pBlock, SArray* columnIndexList, int32_t index, return (ret == 0); } -void resetOutputBuf(SQueryInfo *pQueryInfo, SLocalMerger *pLocalMerge) {// reset output buffer to the beginning - size_t t = tscSqlExprNumOfExprs(pQueryInfo); - for (int32_t i = 0; i < t; ++i) { - SExprInfo* pExpr = tscSqlExprGet(pQueryInfo, i); - pLocalMerge->pCtx[i].pOutput = pLocalMerge->pResultBuf->data + pExpr->base.offset * pLocalMerge->resColModel->capacity; - - if (pExpr->base.functionId == TSDB_FUNC_TOP || pExpr->base.functionId == TSDB_FUNC_BOTTOM || pExpr->base.functionId == TSDB_FUNC_DIFF) { - pLocalMerge->pCtx[i].ptsOutputBuf = pLocalMerge->pCtx[0].pOutput; - } - } - - memset(pLocalMerge->pResultBuf, 0, pLocalMerge->nResultBufSize + sizeof(tFilePage)); -} - static bool isAllSourcesCompleted(SLocalMerger *pLocalMerge) { return (pLocalMerge->numOfBuffer == pLocalMerge->numOfCompleted); } @@ -1363,9 +1158,7 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) { // not belongs to the same group, return the result of current group setInputDataBlock(pOperator, pAggInfo->binfo.pCtx, pBlock, TSDB_ORDER_ASC); - - // todo: it may be overflow handle the output buffer problem - updateOutputBuf(&pAggInfo->binfo, &pAggInfo->bufCapacity, pBlock->info.rows); + updateOutputBuf(&pAggInfo->binfo, &pAggInfo->bufCapacity, pBlock->info.rows * pAggInfo->resultRowFactor); doExecuteFinalMergeRv(pOperator, pOperator->numOfOutput, pBlock); savePrevOrderColumns(pAggInfo->currentGroupColData, pAggInfo->groupColumnList, pBlock, 0, &pAggInfo->hasGroupColData); diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 52e3439c49..fc924c661f 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -476,6 +476,7 @@ typedef struct SMultiwayMergeInfo { int64_t seed; char **prevRow; SArray *orderColumnList; + int32_t resultRowFactor; bool hasGroupColData; char **currentGroupColData; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 8cc8af5d0b..d4a63f8dbb 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -191,7 +191,6 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SGroupbyOp static void initCtxOutputBuffer(SQLFunctionCtx* pCtx, int32_t size); static void getAlignQueryTimeWindow(SQueryAttr *pQueryAttr, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win); -//static bool isPointInterpoQuery(SQueryAttr *pQueryAttr); static void setResultBufSize(SQueryAttr* pQueryAttr, SRspResultInfo* pResultInfo); static void setCtxTagForJoin(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable); static void setParamForStableStddev(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr); @@ -4511,20 +4510,27 @@ SArray* getResultGroupCheckColumns(SQueryAttr* pQuery) { static void destroyGlobalAggOperatorInfo(void* param, int32_t numOfOutput) { SMultiwayMergeInfo *pInfo = (SMultiwayMergeInfo*) param; destroyBasicOperatorInfo(&pInfo->binfo, numOfOutput); + + taosArrayDestroy(pInfo->orderColumnList); + taosArrayDestroy(pInfo->groupColumnList); + tfree(pInfo->prevRow); + tfree(pInfo->currentGroupColData); } SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* param) { SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo)); -// int32_t numOfRows = -// (int32_t)(GET_ROW_PARAM_FOR_MULTIOUTPUT(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery)); + pInfo->resultRowFactor = + (int32_t)(GET_ROW_PARAM_FOR_MULTIOUTPUT(pRuntimeEnv->pQueryAttr, pRuntimeEnv->pQueryAttr->topBotQuery, + false)); pRuntimeEnv->scanFlag = MERGE_STAGE; // TODO init when creating pCtx pInfo->pMerge = param; pInfo->bufCapacity = 4096; - pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pInfo->bufCapacity); + + pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pInfo->bufCapacity * pInfo->resultRowFactor); pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); pInfo->orderColumnList = getOrderCheckColumns(pRuntimeEnv->pQueryAttr); @@ -4533,7 +4539,7 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, // TODO refactor int32_t len = 0; for(int32_t i = 0; i < numOfOutput; ++i) { - len += pExpr[i].base.resBytes; + len += pExpr[i].base.colBytes; } int32_t numOfCols = (pInfo->orderColumnList != NULL)? taosArrayGetSize(pInfo->orderColumnList):0; @@ -4593,7 +4599,7 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx { int32_t len = 0; for(int32_t i = 0; i < numOfOutput; ++i) { - len += pExpr[i].base.resBytes; + len += pExpr[i].base.colBytes; } int32_t numOfCols = (pInfo->orderColumnList != NULL)? taosArrayGetSize(pInfo->orderColumnList):0; @@ -4617,7 +4623,7 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols; pOperator->exec = doMultiwayMergeSort; - + pOperator->cleanup = destroyGlobalAggOperatorInfo; return pOperator; } @@ -7081,6 +7087,7 @@ void freeQInfo(SQInfo *pQInfo) { pQueryAttr->pExpr1 = destroyQueryFuncExpr(pQueryAttr->pExpr1, pQueryAttr->numOfOutput); pQueryAttr->pExpr2 = destroyQueryFuncExpr(pQueryAttr->pExpr2, pQueryAttr->numOfExpr2); + pQueryAttr->pExpr3 = destroyQueryFuncExpr(pQueryAttr->pExpr3, pQueryAttr->numOfExpr3); tfree(pQueryAttr->tagColList); tfree(pQueryAttr->pFilterInfo);