From b223010bdd69e917c0dd5db9f0c14305a2fb1dc4 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 11 Apr 2022 14:54:16 +0800 Subject: [PATCH] fix[query]:fix memory invalid write in group by nchar data. --- source/libs/executor/inc/executorimpl.h | 2 +- source/libs/executor/src/executorimpl.c | 45 ++++++++++++++---------- source/libs/executor/src/groupoperator.c | 17 ++++++--- 3 files changed, 39 insertions(+), 25 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 9a75830ec4..f6f863c82d 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -671,7 +671,7 @@ int32_t operatorDummyOpenFn(SOperatorInfo* pOperator); void operatorDummyCloseFn(void* param, int32_t numOfCols); int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num); int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, - int32_t numOfRows, SSDataBlock* pResultBlock, const char* pkey); + int32_t numOfRows, SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey); void toSDatablock(SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf, SSDataBlock* pBlock, int32_t rowCapacity, int32_t* rowCellOffset); void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset); void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 1bc84f85d7..1c9ca87563 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1687,12 +1687,13 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) { if (IS_VAR_DATA_TYPE(type)) { - if (pResultRow->key == NULL) { - pResultRow->key = taosMemoryMalloc(varDataTLen(pData)); - varDataCopy(pResultRow->key, pData); - } else { - assert(memcmp(pResultRow->key, pData, varDataTLen(pData)) == 0); - } + // todo disable this +// if (pResultRow->key == NULL) { +// pResultRow->key = taosMemoryMalloc(varDataTLen(pData)); +// varDataCopy(pResultRow->key, pData); +// } else { +// ASSERT(memcmp(pResultRow->key, pData, varDataTLen(pData)) == 0); +// } } else { int64_t v = -1; GET_TYPED_DATA(v, int64_t, type, pData); @@ -4467,7 +4468,7 @@ SSDataBlock* createResultDataBlock(const SArray* pExprInfo) { return pResBlock; } -static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, const char* pKey); +static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize, const char* pKey); static void cleanupAggSup(SAggSupporter* pAggSup); static void destroySortedMergeOperatorInfo(void* param, int32_t numOfOutput) { @@ -4826,7 +4827,8 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t goto _error; } - int32_t code = doInitAggInfoSup(&pInfo->aggSup, pInfo->binfo.pCtx, num, pTaskInfo->id.str); + size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; + int32_t code = doInitAggInfoSup(&pInfo->aggSup, pInfo->binfo.pCtx, num, keyBufSize, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -5890,11 +5892,11 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) { taosMemoryFreeClear(pOperator); } -int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, const char* pKey) { +int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize, const char* pKey) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput); - pAggSup->keyBuf = taosMemoryCalloc(1, sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES); + pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize); pAggSup->pResultRowHashTable = taosHashInit(10, hashFn, true, HASH_NO_LOCK); pAggSup->pResultRowListSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK); pAggSup->pResultRowArrayList = taosArrayInit(10, sizeof(SResultRowCell)); @@ -5921,12 +5923,12 @@ static void cleanupAggSup(SAggSupporter* pAggSup) { } int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, - int32_t numOfRows, SSDataBlock* pResultBlock, const char* pkey) { + int32_t numOfRows, SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey) { pBasicInfo->pCtx = createSqlFunctionCtx_rv(pExprInfo, numOfCols, &pBasicInfo->rowCellInfoOffset); pBasicInfo->pRes = pResultBlock; pBasicInfo->capacity = numOfRows; - doInitAggInfoSup(pAggSup, pBasicInfo->pCtx, numOfCols, pkey); + doInitAggInfoSup(pAggSup, pBasicInfo->pCtx, numOfCols, keyBufSize, pkey); return TSDB_CODE_SUCCESS; } @@ -5965,7 +5967,8 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* //(int32_t)(getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery)); int32_t numOfRows = 1; - int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResultBlock, pTaskInfo->id.str); + size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; + int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResultBlock, keyBufSize, pTaskInfo->id.str); pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo); if (code != TSDB_CODE_SUCCESS || pInfo->pTableQueryInfo == NULL) { goto _error; @@ -6081,8 +6084,9 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SExprI SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo)); int32_t numOfRows = 1; + size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; int32_t code = - initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, pTaskInfo->id.str); + initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, keyBufSize, pTaskInfo->id.str); pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo); if (code != TSDB_CODE_SUCCESS || pInfo->pTableQueryInfo == NULL) { goto _error; @@ -6142,7 +6146,8 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p int32_t numOfCols = num; int32_t numOfRows = 4096; - initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, pTaskInfo->id.str); + size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; + initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, keyBufSize, pTaskInfo->id.str); setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, pTaskInfo); pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pInfo->binfo.pCtx, numOfCols); @@ -6190,7 +6195,8 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pInfo->primaryTsIndex = primaryTsSlot; int32_t numOfRows = 4096; - int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, pTaskInfo->id.str); + size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; + int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, keyBufSize, pTaskInfo->id.str); initExecTimeWindowInfo(&pInfo->timeWindowData, &pInfo->win); // pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo); @@ -6266,8 +6272,8 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf } pInfo->colIndex = -1; - - initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExpr, numOfCols, 4096, pResBlock, pTaskInfo->id.str); + size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; + initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExpr, numOfCols, 4096, pResBlock, keyBufSize, pTaskInfo->id.str); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); pOperator->name = "StateWindowOperator"; @@ -6299,7 +6305,8 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo } int32_t numOfRows = 4096; - int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, pTaskInfo->id.str); + size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; + int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, keyBufSize, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; } diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 3eb8ff1b72..9410d1384d 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -46,7 +46,7 @@ static int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char** int32_t numOfGroupCols = taosArrayGetSize(pGroupColList); for (int32_t i = 0; i < numOfGroupCols; ++i) { SColumn* pCol = taosArrayGet(pGroupColList, i); - (*keyLen) += pCol->bytes; + (*keyLen) += pCol->bytes; // actual data + null_flag SGroupKeys key = {0}; key.bytes = pCol->bytes; @@ -61,8 +61,9 @@ static int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char** } int32_t nullFlagSize = sizeof(int8_t) * numOfGroupCols; + (*keyLen) += nullFlagSize; - (*keyBuf) = taosMemoryCalloc(1, (*keyLen) + nullFlagSize); + (*keyBuf) = taosMemoryCalloc(1, (*keyLen)); if ((*keyBuf) == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -169,11 +170,16 @@ static void doAssignGroupKeys(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pCtx[i]); SColumnInfoData* pColInfoData = pCtx[i].input.pData[0]; + // todo OPT all/all not NULL if (!colDataIsNull(pColInfoData, totalRows, rowIndex, NULL)) { char* dest = GET_ROWCELL_INTERBUF(pEntryInfo); char* data = colDataGetData(pColInfoData, rowIndex); - memcpy(dest, data, pColInfoData->info.bytes); + if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { + varDataCopy(dest, data); + } else { + memcpy(dest, data, pColInfoData->info.bytes); + } } else { // it is a NULL value pEntryInfo->isNullRes = 1; } @@ -326,14 +332,15 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx pInfo->pGroupCols = pGroupColList; pInfo->pCondition = pCondition; - initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, 4096, pResultBlock, pTaskInfo->id.str); - initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); int32_t code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList); if (code != TSDB_CODE_SUCCESS) { goto _error; } + initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, 4096, pResultBlock, pInfo->groupKeyLen, pTaskInfo->id.str); + initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); + pOperator->name = "GroupbyAggOperator"; pOperator->blockingOptr = true; pOperator->status = OP_NOT_OPENED;