fix[query]:fix memory invalid write in group by nchar data.
This commit is contained in:
parent
ca5d5ca7bb
commit
b223010bdd
|
@ -671,7 +671,7 @@ int32_t operatorDummyOpenFn(SOperatorInfo* pOperator);
|
|||
void operatorDummyCloseFn(void* param, int32_t numOfCols);
|
||||
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num);
|
||||
int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
int32_t numOfRows, SSDataBlock* pResultBlock, const char* pkey);
|
||||
int32_t numOfRows, SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey);
|
||||
void toSDatablock(SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf, SSDataBlock* pBlock, int32_t rowCapacity, int32_t* rowCellOffset);
|
||||
void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset);
|
||||
void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order);
|
||||
|
|
|
@ -1687,12 +1687,13 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
|
|||
|
||||
static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) {
|
||||
if (IS_VAR_DATA_TYPE(type)) {
|
||||
if (pResultRow->key == NULL) {
|
||||
pResultRow->key = taosMemoryMalloc(varDataTLen(pData));
|
||||
varDataCopy(pResultRow->key, pData);
|
||||
} else {
|
||||
assert(memcmp(pResultRow->key, pData, varDataTLen(pData)) == 0);
|
||||
}
|
||||
// todo disable this
|
||||
// if (pResultRow->key == NULL) {
|
||||
// pResultRow->key = taosMemoryMalloc(varDataTLen(pData));
|
||||
// varDataCopy(pResultRow->key, pData);
|
||||
// } else {
|
||||
// ASSERT(memcmp(pResultRow->key, pData, varDataTLen(pData)) == 0);
|
||||
// }
|
||||
} else {
|
||||
int64_t v = -1;
|
||||
GET_TYPED_DATA(v, int64_t, type, pData);
|
||||
|
@ -4467,7 +4468,7 @@ SSDataBlock* createResultDataBlock(const SArray* pExprInfo) {
|
|||
return pResBlock;
|
||||
}
|
||||
|
||||
static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, const char* pKey);
|
||||
static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize, const char* pKey);
|
||||
static void cleanupAggSup(SAggSupporter* pAggSup);
|
||||
|
||||
static void destroySortedMergeOperatorInfo(void* param, int32_t numOfOutput) {
|
||||
|
@ -4826,7 +4827,8 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
|
|||
goto _error;
|
||||
}
|
||||
|
||||
int32_t code = doInitAggInfoSup(&pInfo->aggSup, pInfo->binfo.pCtx, num, pTaskInfo->id.str);
|
||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||
int32_t code = doInitAggInfoSup(&pInfo->aggSup, pInfo->binfo.pCtx, num, keyBufSize, pTaskInfo->id.str);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
@ -5890,11 +5892,11 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) {
|
|||
taosMemoryFreeClear(pOperator);
|
||||
}
|
||||
|
||||
int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, const char* pKey) {
|
||||
int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize, const char* pKey) {
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
|
||||
pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
|
||||
pAggSup->keyBuf = taosMemoryCalloc(1, sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES);
|
||||
pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize);
|
||||
pAggSup->pResultRowHashTable = taosHashInit(10, hashFn, true, HASH_NO_LOCK);
|
||||
pAggSup->pResultRowListSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK);
|
||||
pAggSup->pResultRowArrayList = taosArrayInit(10, sizeof(SResultRowCell));
|
||||
|
@ -5921,12 +5923,12 @@ static void cleanupAggSup(SAggSupporter* pAggSup) {
|
|||
}
|
||||
|
||||
int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
int32_t numOfRows, SSDataBlock* pResultBlock, const char* pkey) {
|
||||
int32_t numOfRows, SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey) {
|
||||
pBasicInfo->pCtx = createSqlFunctionCtx_rv(pExprInfo, numOfCols, &pBasicInfo->rowCellInfoOffset);
|
||||
pBasicInfo->pRes = pResultBlock;
|
||||
pBasicInfo->capacity = numOfRows;
|
||||
|
||||
doInitAggInfoSup(pAggSup, pBasicInfo->pCtx, numOfCols, pkey);
|
||||
doInitAggInfoSup(pAggSup, pBasicInfo->pCtx, numOfCols, keyBufSize, pkey);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -5965,7 +5967,8 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
|||
|
||||
//(int32_t)(getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery));
|
||||
int32_t numOfRows = 1;
|
||||
int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResultBlock, pTaskInfo->id.str);
|
||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||
int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResultBlock, keyBufSize, pTaskInfo->id.str);
|
||||
pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo);
|
||||
if (code != TSDB_CODE_SUCCESS || pInfo->pTableQueryInfo == NULL) {
|
||||
goto _error;
|
||||
|
@ -6081,8 +6084,9 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SExprI
|
|||
SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo));
|
||||
|
||||
int32_t numOfRows = 1;
|
||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||
int32_t code =
|
||||
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, pTaskInfo->id.str);
|
||||
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, keyBufSize, pTaskInfo->id.str);
|
||||
pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo);
|
||||
if (code != TSDB_CODE_SUCCESS || pInfo->pTableQueryInfo == NULL) {
|
||||
goto _error;
|
||||
|
@ -6142,7 +6146,8 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p
|
|||
|
||||
int32_t numOfCols = num;
|
||||
int32_t numOfRows = 4096;
|
||||
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, pTaskInfo->id.str);
|
||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, keyBufSize, pTaskInfo->id.str);
|
||||
setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, pTaskInfo);
|
||||
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pInfo->binfo.pCtx, numOfCols);
|
||||
|
||||
|
@ -6190,7 +6195,8 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
|||
pInfo->primaryTsIndex = primaryTsSlot;
|
||||
|
||||
int32_t numOfRows = 4096;
|
||||
int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, pTaskInfo->id.str);
|
||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||
int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, keyBufSize, pTaskInfo->id.str);
|
||||
initExecTimeWindowInfo(&pInfo->timeWindowData, &pInfo->win);
|
||||
|
||||
// pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo);
|
||||
|
@ -6266,8 +6272,8 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf
|
|||
}
|
||||
|
||||
pInfo->colIndex = -1;
|
||||
|
||||
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExpr, numOfCols, 4096, pResBlock, pTaskInfo->id.str);
|
||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExpr, numOfCols, 4096, pResBlock, keyBufSize, pTaskInfo->id.str);
|
||||
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
|
||||
|
||||
pOperator->name = "StateWindowOperator";
|
||||
|
@ -6299,7 +6305,8 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo
|
|||
}
|
||||
|
||||
int32_t numOfRows = 4096;
|
||||
int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, pTaskInfo->id.str);
|
||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||
int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, keyBufSize, pTaskInfo->id.str);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
|
|
@ -46,7 +46,7 @@ static int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char**
|
|||
int32_t numOfGroupCols = taosArrayGetSize(pGroupColList);
|
||||
for (int32_t i = 0; i < numOfGroupCols; ++i) {
|
||||
SColumn* pCol = taosArrayGet(pGroupColList, i);
|
||||
(*keyLen) += pCol->bytes;
|
||||
(*keyLen) += pCol->bytes; // actual data + null_flag
|
||||
|
||||
SGroupKeys key = {0};
|
||||
key.bytes = pCol->bytes;
|
||||
|
@ -61,8 +61,9 @@ static int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char**
|
|||
}
|
||||
|
||||
int32_t nullFlagSize = sizeof(int8_t) * numOfGroupCols;
|
||||
(*keyLen) += nullFlagSize;
|
||||
|
||||
(*keyBuf) = taosMemoryCalloc(1, (*keyLen) + nullFlagSize);
|
||||
(*keyBuf) = taosMemoryCalloc(1, (*keyLen));
|
||||
if ((*keyBuf) == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
@ -169,11 +170,16 @@ static void doAssignGroupKeys(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t
|
|||
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pCtx[i]);
|
||||
|
||||
SColumnInfoData* pColInfoData = pCtx[i].input.pData[0];
|
||||
// todo OPT all/all not NULL
|
||||
if (!colDataIsNull(pColInfoData, totalRows, rowIndex, NULL)) {
|
||||
char* dest = GET_ROWCELL_INTERBUF(pEntryInfo);
|
||||
char* data = colDataGetData(pColInfoData, rowIndex);
|
||||
|
||||
memcpy(dest, data, pColInfoData->info.bytes);
|
||||
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
|
||||
varDataCopy(dest, data);
|
||||
} else {
|
||||
memcpy(dest, data, pColInfoData->info.bytes);
|
||||
}
|
||||
} else { // it is a NULL value
|
||||
pEntryInfo->isNullRes = 1;
|
||||
}
|
||||
|
@ -326,14 +332,15 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
|
|||
|
||||
pInfo->pGroupCols = pGroupColList;
|
||||
pInfo->pCondition = pCondition;
|
||||
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, 4096, pResultBlock, pTaskInfo->id.str);
|
||||
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
|
||||
|
||||
int32_t code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, 4096, pResultBlock, pInfo->groupKeyLen, pTaskInfo->id.str);
|
||||
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
|
||||
|
||||
pOperator->name = "GroupbyAggOperator";
|
||||
pOperator->blockingOptr = true;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
|
|
Loading…
Reference in New Issue