fix operator double free

This commit is contained in:
yihaoDeng 2022-08-12 11:22:41 +08:00
parent 46bd5e8685
commit bd929582f5
2 changed files with 67 additions and 73 deletions

View File

@ -1433,7 +1433,8 @@ static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, u
pAggInfo->groupId = groupId;
}
static void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, const int32_t* rowCellOffset) {
static void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs,
const int32_t* rowCellOffset) {
bool returnNotNull = false;
for (int32_t j = 0; j < numOfExprs; ++j) {
struct SResultRowEntryInfo* pResInfo = getResultEntryInfo(pRow, j, rowCellOffset);
@ -1613,7 +1614,7 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
if (!pbInfo->mergeResultBlock) {
doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo);
} else {
while(hasRemainResults(pGroupResInfo)) {
while (hasRemainResults(pGroupResInfo)) {
doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo);
if (pBlock->info.rows >= pOperator->resultInfo.threshold) {
break;
@ -2065,7 +2066,7 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, int32_t num
char** pNextStart) {
if (pColList == NULL) { // data from other sources
blockDataCleanup(pRes);
*pNextStart = (char*) blockDecode(pRes, pData);
*pNextStart = (char*)blockDecode(pRes, pData);
} else { // extract data according to pColList
ASSERT(numOfOutput == taosArrayGetSize(pColList));
char* pStart = pData;
@ -2163,7 +2164,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
int32_t index = 0;
char* pStart = pRetrieveRsp->data;
while(index++ < pRetrieveRsp->numOfBlocks) {
while (index++ < pRetrieveRsp->numOfBlocks) {
SSDataBlock* pb = createOneDataBlock(pExchangeInfo->pDummyBlock, false);
code = extractDataBlockFromFetchRsp(pb, pStart, pRetrieveRsp->numOfCols, NULL, &pStart);
if (code != 0) {
@ -2177,8 +2178,10 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator);
if (pRsp->completed == 1) {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d"
" index:%d completed, blocks:%d, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", total:%.2f Kb,"
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
" execId:%d"
" index:%d completed, blocks:%d, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64
", total:%.2f Kb,"
" completed:%d try next %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pRsp->numOfBlocks,
pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0,
@ -2186,9 +2189,10 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
completed += 1;
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
} else {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d blocks:%d, numOfRows:%d, totalRows:%" PRIu64
", total:%.2f Kb", GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId,
pRsp->numOfBlocks, pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize/1024.0);
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
" execId:%d blocks:%d, numOfRows:%d, totalRows:%" PRIu64 ", total:%.2f Kb",
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRsp->numOfBlocks,
pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0);
}
taosMemoryFreeClear(pDataInfo->pRsp);
@ -3562,7 +3566,6 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
return pOperator;
_error:
destroyAggOperatorInfo(pInfo, numOfCols);
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
@ -4175,7 +4178,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
pOptr = createMergeAlignedIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId,
pPhyNode->pConditions, pIntervalPhyNode->window.mergeDataBlock, pTaskInfo);
pPhyNode->pConditions, pIntervalPhyNode->window.mergeDataBlock,
pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL == type) {
SMergeIntervalPhysiNode* pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode;
@ -4190,7 +4194,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
.precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};
int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
pOptr = createMergeIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, pIntervalPhyNode->window.mergeDataBlock, pTaskInfo);
pOptr = createMergeIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId,
pIntervalPhyNode->window.mergeDataBlock, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) {
int32_t children = 0;
pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);

View File

@ -97,7 +97,8 @@ static void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, uint64_t groupId) {
pRowSup->groupId = groupId;
}
static void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex, uint64_t groupId) {
static void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex,
uint64_t groupId) {
pRowSup->startRowIndex = rowIndex;
pRowSup->numOfRows = 0;
pRowSup->win.skey = tsList[rowIndex];
@ -869,7 +870,8 @@ static int32_t saveWinResult(int64_t ts, int32_t pageId, int32_t offset, uint64_
}
static int32_t saveWinResultRow(SResultRow* result, uint64_t groupId, SHashObj* pUpdatedMap) {
return saveWinResult(result->win.skey, result->pageId, result->offset, groupId, pUpdatedMap);;
return saveWinResult(result->win.skey, result->pageId, result->offset, groupId, pUpdatedMap);
;
}
static int32_t saveResultRow(SResultRow* result, uint64_t groupId, SArray* pUpdated) {
@ -1874,7 +1876,6 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
_error:
destroyIntervalOperatorInfo(pInfo, numOfCols);
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
pTaskInfo->code = code;
return NULL;
@ -1931,7 +1932,6 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr
_error:
destroyIntervalOperatorInfo(pInfo, numOfCols);
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
pTaskInfo->code = code;
return NULL;
@ -1965,7 +1965,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
doKeepTuple(pRowSup, tsList[j], gid);
} else if ((tsList[j] - pRowSup->prevTs >= 0) && tsList[j] - pRowSup->prevTs <= gap ||
(pRowSup->prevTs - tsList[j] >= 0 ) && (pRowSup->prevTs - tsList[j] <= gap)) {
(pRowSup->prevTs - tsList[j] >= 0) && (pRowSup->prevTs - tsList[j] <= gap)) {
// The gap is less than the threshold, so it belongs to current session window that has been opened already.
doKeepTuple(pRowSup, tsList[j], gid);
if (j == 0 && pRowSup->startRowIndex != 0) {
@ -2150,11 +2150,9 @@ static void doKeepLinearInfo(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlo
}
pSliceInfo->fillLastPoint = isLastRow ? true : false;
}
static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup,
SSDataBlock* pResBlock) {
static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pResBlock) {
int32_t rows = pResBlock->info.rows;
// todo set the correct primary timestamp column
@ -2165,7 +2163,7 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
int32_t dstSlot = pExprInfo->base.resSchema.slotId;
//SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, srcSlot);
// SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, srcSlot);
SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot);
switch (pSliceInfo->fillType) {
@ -2181,15 +2179,15 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
if (pDst->info.type == TSDB_DATA_TYPE_FLOAT) {
float v = 0;
GET_TYPED_DATA(v, float, pVar->nType, &pVar->i);
colDataAppend(pDst, rows, (char *)&v, false);
colDataAppend(pDst, rows, (char*)&v, false);
} else if (pDst->info.type == TSDB_DATA_TYPE_DOUBLE) {
double v = 0;
GET_TYPED_DATA(v, double, pVar->nType, &pVar->i);
colDataAppend(pDst, rows, (char *)&v, false);
colDataAppend(pDst, rows, (char*)&v, false);
} else if (IS_SIGNED_NUMERIC_TYPE(pDst->info.type)) {
int64_t v = 0;
GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i);
colDataAppend(pDst, rows, (char *)&v, false);
colDataAppend(pDst, rows, (char*)&v, false);
}
pResBlock->info.rows += 1;
break;
@ -2212,7 +2210,7 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
colDataAppendNULL(pDst, rows);
} else {
taosGetLinearInterpolationVal(&current, pLinearInfo->type, &start, &end, pLinearInfo->type);
colDataAppend(pDst, rows, (char *)current.val, false);
colDataAppend(pDst, rows, (char*)current.val, false);
}
pResBlock->info.rows += 1;
@ -2445,8 +2443,8 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
if (nextTs > pSliceInfo->current) {
while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) {
genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock);
pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
pSliceInfo->current = taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit,
pInterval->precision);
if (pResBlock->info.rows >= pResBlock->info.capacity) {
break;
}
@ -2457,8 +2455,8 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
break;
}
}
} else {// it is the last row of current block
//store ts value as start, and calculate interp value when processing next block
} else { // it is the last row of current block
// store ts value as start, and calculate interp value when processing next block
doKeepLinearInfo(pSliceInfo, pBlock, i, true);
}
} else { // non-linear interpolation
@ -2480,15 +2478,15 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
if (pSliceInfo->fillType == TSDB_FILL_LINEAR) {
doKeepLinearInfo(pSliceInfo, pBlock, i, false);
// no need to increate pSliceInfo->current here
//pSliceInfo->current =
// pSliceInfo->current =
// taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
if (i < pBlock->info.rows - 1) {
int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, i + 1);
if (nextTs > pSliceInfo->current) {
while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) {
genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock);
pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
pSliceInfo->current = taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit,
pInterval->precision);
if (pResBlock->info.rows >= pResBlock->info.capacity) {
break;
}
@ -2508,8 +2506,8 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
if (nextTs > pSliceInfo->current) {
while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) {
genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock);
pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
pSliceInfo->current = taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit,
pInterval->precision);
if (pResBlock->info.rows >= pResBlock->info.capacity) {
break;
}
@ -2556,7 +2554,6 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
pResBlock->info.rows += 1;
doKeepPrevRows(pSliceInfo, pBlock, i);
if (pSliceInfo->fillType == TSDB_FILL_LINEAR) {
doKeepLinearInfo(pSliceInfo, pBlock, i, false);
pSliceInfo->current =
@ -2566,8 +2563,8 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
if (nextTs > pSliceInfo->current) {
while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) {
genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock);
pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
pSliceInfo->current = taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit,
pInterval->precision);
if (pResBlock->info.rows >= pResBlock->info.capacity) {
break;
}
@ -2595,12 +2592,12 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
}
}
}
}
// check if need to interpolate after last datablock
// except for fill(next), fill(linear)
while (pSliceInfo->current <= pSliceInfo->win.ekey && pSliceInfo->fillType != TSDB_FILL_NEXT && pSliceInfo->fillType != TSDB_FILL_LINEAR) {
while (pSliceInfo->current <= pSliceInfo->win.ekey && pSliceInfo->fillType != TSDB_FILL_NEXT &&
pSliceInfo->fillType != TSDB_FILL_LINEAR) {
genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock);
pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
@ -2795,7 +2792,6 @@ _error:
destroySWindowOperatorInfo(pInfo, numOfCols);
}
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
pTaskInfo->code = code;
return NULL;
@ -3432,7 +3428,6 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
_error:
destroyStreamFinalIntervalOperatorInfo(pInfo, numOfCols);
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
pTaskInfo->code = code;
return NULL;
@ -3617,7 +3612,6 @@ _error:
destroyStreamSessionAggOperatorInfo(pInfo, numOfCols);
}
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
pTaskInfo->code = code;
return NULL;
@ -3735,8 +3729,8 @@ SResultWindowInfo* getSessionTimeWindow(SStreamAggSupporter* pAggSup, TSKEY star
return insertNewSessionWindow(pWinInfos, startTs, index + 1);
}
int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t groupId,int32_t rows,
int32_t start, int64_t gap, SHashObj* pStDeleted) {
int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t groupId,
int32_t rows, int32_t start, int64_t gap, SHashObj* pStDeleted) {
for (int32_t i = start; i < rows; ++i) {
if (!isInWindow(pWinInfo, pStartTs[i], gap) && (!pEndTs || !isInWindow(pWinInfo, pEndTs[i], gap))) {
return i - start;
@ -3913,8 +3907,8 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
}
int32_t winIndex = 0;
SResultWindowInfo* pCurWin = getSessionTimeWindow(pAggSup, startTsCols[i], endTsCols[i], groupId, gap, &winIndex);
winRows =
updateSessionWindowInfo(pCurWin, startTsCols, endTsCols, groupId, pSDataBlock->info.rows, i, pInfo->gap, pStDeleted);
winRows = updateSessionWindowInfo(pCurWin, startTsCols, endTsCols, groupId, pSDataBlock->info.rows, i, pInfo->gap,
pStDeleted);
code = doOneWindowAgg(pInfo, pSDataBlock, pCurWin, &pResult, i, winRows, numOfOutput, pOperator);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
@ -4449,7 +4443,6 @@ _error:
destroyStreamSessionAggOperatorInfo(pInfo, pOperator->exprSupp.numOfExprs);
}
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
pTaskInfo->code = code;
return NULL;
@ -4886,7 +4879,6 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
_error:
destroyStreamStateOperatorInfo(pInfo, numOfCols);
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
pTaskInfo->code = code;
return NULL;
@ -4996,7 +4988,6 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
tsCols, pBlock->info.rows, numOfOutput, iaInfo->inputOrder);
}
static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
@ -5071,7 +5062,7 @@ static SSDataBlock* mergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
blockDataCleanup(pRes);
if (iaInfo->binfo.mergeResultBlock) {
while(1) {
while (1) {
if (pOperator->status == OP_EXEC_DONE) {
break;
}
@ -5161,7 +5152,6 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
_error:
destroyMergeAlignedIntervalOperatorInfo(miaInfo, numOfCols);
taosMemoryFreeClear(miaInfo);
taosMemoryFreeClear(pOperator);
pTaskInfo->code = code;
return NULL;
@ -5466,7 +5456,6 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI
_error:
destroyMergeIntervalOperatorInfo(miaInfo, numOfCols);
taosMemoryFreeClear(miaInfo);
taosMemoryFreeClear(pOperator);
pTaskInfo->code = code;
return NULL;