Merge pull request #16222 from taosdata/feature/stream
fix(query): memory leak
This commit is contained in:
commit
240747abed
|
@ -489,7 +489,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
|
|||
smaObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
|
||||
ASSERT(smaObj.uid != 0);
|
||||
char resultTbName[TSDB_TABLE_FNAME_LEN + 16] = {0};
|
||||
snprintf(resultTbName, TSDB_TABLE_FNAME_LEN + 16, "%s_td_tsma_rst_tb",pCreate->name);
|
||||
snprintf(resultTbName, TSDB_TABLE_FNAME_LEN + 16, "%s_td_tsma_rst_tb", pCreate->name);
|
||||
memcpy(smaObj.dstTbName, resultTbName, TSDB_TABLE_FNAME_LEN);
|
||||
smaObj.dstTbUid = mndGenerateUid(smaObj.dstTbName, TSDB_TABLE_FNAME_LEN);
|
||||
smaObj.stbUid = pStb->uid;
|
||||
|
@ -530,7 +530,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
|
|||
streamObj.sourceDbUid = pDb->uid;
|
||||
streamObj.targetDbUid = pDb->uid;
|
||||
streamObj.version = 1;
|
||||
streamObj.sql = pCreate->sql;
|
||||
streamObj.sql = strdup(pCreate->sql);
|
||||
streamObj.smaId = smaObj.uid;
|
||||
streamObj.watermark = pCreate->watermark;
|
||||
streamObj.trigger = STREAM_TRIGGER_WINDOW_CLOSE;
|
||||
|
@ -585,6 +585,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
|
|||
return -1;
|
||||
}
|
||||
if (pAst != NULL) nodesDestroyNode(pAst);
|
||||
nodesDestroyNode((SNode *)pPlan);
|
||||
|
||||
int32_t code = -1;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq);
|
||||
|
@ -609,6 +610,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
|
|||
code = 0;
|
||||
|
||||
_OVER:
|
||||
tFreeStreamObj(&streamObj);
|
||||
mndDestroySmaObj(&smaObj);
|
||||
mndTransDrop(pTrans);
|
||||
return code;
|
||||
|
|
|
@ -509,6 +509,7 @@ int32_t mndAllocSmaVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup) {
|
|||
pVgroup->replica = 1;
|
||||
|
||||
if (mndGetAvailableDnode(pMnode, pDb, pVgroup, pArray) != 0) return -1;
|
||||
taosArrayDestroy(pArray);
|
||||
|
||||
mInfo("db:%s, sma vgId:%d is alloced", pDb->name, pVgroup->vgId);
|
||||
return 0;
|
||||
|
@ -1862,4 +1863,4 @@ _OVER:
|
|||
#endif
|
||||
}
|
||||
|
||||
bool mndVgroupInDb(SVgObj *pVgroup, int64_t dbUid) { return !pVgroup->isTsma && pVgroup->dbUid == dbUid; }
|
||||
bool mndVgroupInDb(SVgObj *pVgroup, int64_t dbUid) { return !pVgroup->isTsma && pVgroup->dbUid == dbUid; }
|
||||
|
|
|
@ -341,7 +341,7 @@ FAIL:
|
|||
return -1;
|
||||
}
|
||||
|
||||
void tqReaderSetColIdList(STqReader* pReadHandle, SArray* pColIdList) { pReadHandle->pColIdList = pColIdList; }
|
||||
void tqReaderSetColIdList(STqReader* pReader, SArray* pColIdList) { pReader->pColIdList = pColIdList; }
|
||||
|
||||
int tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList) {
|
||||
if (pReader->tbIdHash) {
|
||||
|
|
|
@ -3217,8 +3217,8 @@ int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDa
|
|||
}
|
||||
|
||||
static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag);
|
||||
static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo *pOperator, SFillOperatorInfo* pInfo, SResultInfo* pResultInfo,
|
||||
SExecTaskInfo* pTaskInfo) {
|
||||
static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo,
|
||||
SResultInfo* pResultInfo, SExecTaskInfo* pTaskInfo) {
|
||||
pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
|
||||
SSDataBlock* pResBlock = pInfo->pFinalRes;
|
||||
|
||||
|
@ -3242,8 +3242,8 @@ static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo *pOperator, SFillOp
|
|||
pInfo->existNewGroupBlock = NULL;
|
||||
}
|
||||
|
||||
static void doHandleRemainBlockFromNewGroup(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo, SResultInfo* pResultInfo,
|
||||
SExecTaskInfo* pTaskInfo) {
|
||||
static void doHandleRemainBlockFromNewGroup(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo,
|
||||
SResultInfo* pResultInfo, SExecTaskInfo* pTaskInfo) {
|
||||
if (taosFillHasMoreResults(pInfo->pFillInfo)) {
|
||||
int32_t numOfResultRows = pResultInfo->capacity - pInfo->pFinalRes->info.rows;
|
||||
taosFillResultDataBlock(pInfo->pFillInfo, pInfo->pFinalRes, numOfResultRows);
|
||||
|
@ -3259,8 +3259,8 @@ static void doHandleRemainBlockFromNewGroup(SOperatorInfo* pOperator, SFillOpera
|
|||
|
||||
static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag) {
|
||||
SFillOperatorInfo* pInfo = pOperator->info;
|
||||
SExprSupp* pSup = &pOperator->exprSupp;
|
||||
SSDataBlock* pResBlock = pInfo->pFinalRes;
|
||||
SExprSupp* pSup = &pOperator->exprSupp;
|
||||
SSDataBlock* pResBlock = pInfo->pFinalRes;
|
||||
|
||||
setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, false);
|
||||
projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs, NULL);
|
||||
|
@ -3270,13 +3270,13 @@ static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlo
|
|||
SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, pInfo->primarySrcSlotId);
|
||||
colDataAssign(pDst, pSrc, pInfo->pRes->info.rows, &pResBlock->info);
|
||||
|
||||
for(int32_t i = 0; i < pInfo->numOfNotFillExpr; ++i) {
|
||||
for (int32_t i = 0; i < pInfo->numOfNotFillExpr; ++i) {
|
||||
SFillColInfo* pCol = &pInfo->pFillInfo->pFillCol[i + pInfo->numOfExpr];
|
||||
ASSERT(pCol->notFillCol);
|
||||
|
||||
SExprInfo* pExpr = pCol->pExpr;
|
||||
int32_t srcSlotId = pExpr->base.pParam[0].pCol->slotId;
|
||||
int32_t dstSlotId = pExpr->base.resSchema.slotId;
|
||||
int32_t srcSlotId = pExpr->base.pParam[0].pCol->slotId;
|
||||
int32_t dstSlotId = pExpr->base.resSchema.slotId;
|
||||
|
||||
SColumnInfoData* pDst1 = taosArrayGet(pInfo->pRes->pDataBlock, dstSlotId);
|
||||
SColumnInfoData* pSrc1 = taosArrayGet(pBlock->pDataBlock, srcSlotId);
|
||||
|
@ -3664,7 +3664,7 @@ void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
taosRemoveRef(exchangeObjRefPool, pExInfo->self);
|
||||
}
|
||||
|
||||
void freeSourceDataInfo(void *p) {
|
||||
void freeSourceDataInfo(void* p) {
|
||||
SSourceDataInfo* pInfo = (SSourceDataInfo*)p;
|
||||
taosMemoryFreeClear(pInfo->pRsp);
|
||||
}
|
||||
|
@ -3694,8 +3694,8 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t
|
|||
STimeWindow w = getAlignQueryTimeWindow(pInterval, pInterval->precision, win.skey);
|
||||
w = getFirstQualifiedTimeWindow(win.skey, &w, pInterval, TSDB_ORDER_ASC);
|
||||
|
||||
pInfo->pFillInfo =
|
||||
taosCreateFillInfo(w.skey, numOfCols, numOfNotFillCols, capacity, pInterval, fillType, pColInfo, pInfo->primaryTsCol, order, id);
|
||||
pInfo->pFillInfo = taosCreateFillInfo(w.skey, numOfCols, numOfNotFillCols, capacity, pInterval, fillType, pColInfo,
|
||||
pInfo->primaryTsCol, order, id);
|
||||
|
||||
pInfo->win = win;
|
||||
pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES);
|
||||
|
@ -3721,10 +3721,10 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
|
|||
SExprInfo* pExprInfo = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pInfo->numOfExpr);
|
||||
pInfo->pNotFillExprInfo = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &pInfo->numOfNotFillExpr);
|
||||
|
||||
SInterval* pInterval =
|
||||
SInterval* pInterval =
|
||||
QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == downstream->operatorType
|
||||
? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval
|
||||
: &((SIntervalAggOperatorInfo*)downstream->info)->interval;
|
||||
? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval
|
||||
: &((SIntervalAggOperatorInfo*)downstream->info)->interval;
|
||||
|
||||
int32_t order = (pPhyFillNode->inputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
|
||||
int32_t type = convertFillType(pPhyFillNode->mode);
|
||||
|
@ -3741,9 +3741,9 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
|
|||
SArray* pColMatchColInfo = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc,
|
||||
&numOfOutputCols, COL_MATCH_FROM_SLOT_ID);
|
||||
|
||||
int32_t code =
|
||||
initFillInfo(pInfo, pExprInfo, pInfo->numOfExpr, pInfo->pNotFillExprInfo, pInfo->numOfNotFillExpr, (SNodeListNode*)pPhyFillNode->pValues,
|
||||
pPhyFillNode->timeRange, pResultInfo->capacity, pTaskInfo->id.str, pInterval, type, order);
|
||||
int32_t code = initFillInfo(pInfo, pExprInfo, pInfo->numOfExpr, pInfo->pNotFillExprInfo, pInfo->numOfNotFillExpr,
|
||||
(SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange, pResultInfo->capacity,
|
||||
pTaskInfo->id.str, pInterval, type, order);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
|
|
@ -1649,6 +1649,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
|||
}
|
||||
taosArrayDestroy(tableIdList);
|
||||
memcpy(&pTaskInfo->streamInfo.tableCond, &pTSInfo->cond, sizeof(SQueryTableDataCond));
|
||||
} else {
|
||||
taosArrayDestroy(pColIds);
|
||||
}
|
||||
|
||||
// create the pseduo columns info
|
||||
|
|
|
@ -1706,14 +1706,19 @@ void destroyStreamFinalIntervalOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
blockDataDestroy(pInfo->pPullDataRes);
|
||||
taosArrayDestroy(pInfo->pRecycledPages);
|
||||
blockDataDestroy(pInfo->pUpdateRes);
|
||||
taosArrayDestroy(pInfo->pDelWins);
|
||||
blockDataDestroy(pInfo->pDelRes);
|
||||
|
||||
if (pInfo->pChildren) {
|
||||
int32_t size = taosArrayGetSize(pInfo->pChildren);
|
||||
for (int32_t i = 0; i < size; i++) {
|
||||
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, i);
|
||||
destroyStreamFinalIntervalOperatorInfo(pChildOp->info, numOfOutput);
|
||||
taosMemoryFree(pChildOp->pDownstream);
|
||||
cleanupExprSupp(&pChildOp->exprSupp);
|
||||
taosMemoryFreeClear(pChildOp);
|
||||
}
|
||||
taosArrayDestroy(pInfo->pChildren);
|
||||
}
|
||||
nodesDestroyNode((SNode*)pInfo->pPhyNode);
|
||||
colDataDestroy(&pInfo->twAggSup.timeWindowData);
|
||||
|
@ -2644,7 +2649,6 @@ void destroyTimeSliceOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
taosMemoryFreeClear(param);
|
||||
}
|
||||
|
||||
|
||||
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo) {
|
||||
STimeSliceOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STimeSliceOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
|
|
|
@ -152,11 +152,12 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
|
|||
}
|
||||
|
||||
void tFreeSStreamTask(SStreamTask* pTask) {
|
||||
qDebug("free stream task %d", pTask->taskId);
|
||||
if (pTask->inputQueue) streamQueueClose(pTask->inputQueue);
|
||||
if (pTask->outputQueue) streamQueueClose(pTask->outputQueue);
|
||||
if (pTask->exec.qmsg) taosMemoryFree(pTask->exec.qmsg);
|
||||
if (pTask->exec.executor) qDestroyTask(pTask->exec.executor);
|
||||
taosArrayDestroy(pTask->childEpInfo);
|
||||
taosArrayDestroyP(pTask->childEpInfo, taosMemoryFree);
|
||||
if (pTask->outputType == TASK_OUTPUT__TABLE) {
|
||||
tDeleteSSchemaWrapper(pTask->tbSink.pSchemaWrapper);
|
||||
taosMemoryFree(pTask->tbSink.pTSchema);
|
||||
|
|
Loading…
Reference in New Issue