From 9847abf166ce71c600798fc90e28c6b5cb86a35d Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 14 Jul 2022 17:34:06 +0800 Subject: [PATCH] fix(sma): double free --- source/dnode/vnode/src/sma/smaRollup.c | 40 ++++++------------- source/libs/executor/src/scanoperator.c | 12 +++--- source/libs/executor/src/timewindowoperator.c | 1 - 3 files changed, 19 insertions(+), 34 deletions(-) diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 06ffb639de..a6d56acbfb 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -30,7 +30,7 @@ typedef struct SRSmaQTaskInfoIter SRSmaQTaskInfoIter; static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid); static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids); static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo, - SReadHandle *handle, int8_t idx); + int8_t idx); static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfoItem *rsmaItem, STSchema *pTSchema, tb_uid_t suid, int8_t level); static SRSmaInfo *tdGetRSmaInfoBySuid(SSma *pSma, int64_t suid); @@ -256,14 +256,20 @@ int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_ui } static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo, - SReadHandle *pReadHandle, int8_t idx) { + int8_t idx) { SRetention *pRetention = SMA_RETENTION(pSma); STsdbCfg *pTsdbCfg = SMA_TSDB_CFG(pSma); + SReadHandle handle = { + .meta = pSma->pVnode->pMeta, + .vnode = pSma->pVnode, + .initTqReader = 1, + }; + if (param->qmsg[idx]) { SRSmaInfoItem *pItem = &(pRSmaInfo->items[idx]); pItem->refId = RSMA_REF_ID(pStat); - pItem->taskInfo = qCreateStreamExecTaskInfo(param->qmsg[idx], pReadHandle); + pItem->taskInfo = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle); if (!pItem->taskInfo) { terrno = TSDB_CODE_RSMA_QTASKINFO_CREATE; goto _err; @@ -299,10 +305,6 @@ _err: * @return int32_t */ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName) { - SVnode *pVnode = pSma->pVnode; - SMeta *pMeta = pVnode->pMeta; - SMsgCb *pMsgCb = &pVnode->msgCb; - if ((param->qmsgLen[0] == 0) && (param->qmsgLen[1] == 0)) { smaDebug("vgId:%d, no qmsg1/qmsg2 for rollup table %s %" PRIi64, SMA_VID(pSma), tbName, suid); return TSDB_CODE_SUCCESS; @@ -331,19 +333,6 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con return TSDB_CODE_FAILED; } - STqReader *pReader = tqOpenReader(pVnode); - if (!pReader) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - - SReadHandle handle = { - .tqReader = pReader, - .meta = pMeta, - .pMsgCb = pMsgCb, - .vnode = pVnode, - }; - STSchema *pTSchema = metaGetTbTSchema(SMA_META(pSma), suid, -1); if (!pTSchema) { terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION; @@ -352,11 +341,11 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con pRSmaInfo->pTSchema = pTSchema; pRSmaInfo->suid = suid; - if (tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, &handle, 0) < 0) { + if (tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 0) < 0) { goto _err; } - if (tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, &handle, 1) < 0) { + if (tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 1) < 0) { goto _err; } @@ -369,7 +358,6 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con return TSDB_CODE_SUCCESS; _err: tdFreeRSmaInfo(pSma, pRSmaInfo); - taosMemoryFree(pReader); return TSDB_CODE_FAILED; } @@ -404,7 +392,7 @@ int32_t tdProcessRSmaCreate(SSma *pSma, SVCreateStbReq *pReq) { * @param pReq * @return int32_t */ -int32_t tdProcessRSmaDrop(SSma *pSma, SVDropStbReq *pReq) { +int32_t tdProcessRSmaDrop(SSma *pSma, SVDropStbReq *pReq) { SVnode *pVnode = pSma->pVnode; if (!VND_IS_RSMA(pVnode)) { smaTrace("vgId:%d, not create rsma for stable %s %" PRIi64 " since vnd is not rsma", TD_VID(pVnode), pReq->name, @@ -412,11 +400,9 @@ int32_t tdProcessRSmaDrop(SSma *pSma, SVDropStbReq *pReq) { return TSDB_CODE_SUCCESS; } - - smaDebug("vgId:%d, drop rsma for table %" PRIi64 " succeed", TD_VID(pVnode), pReq->suid); return TSDB_CODE_SUCCESS; - } +} /** * @brief store suid/[uids], prefer to use array and then hash diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index aa84500fdc..0f44ac48a4 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -39,7 +39,7 @@ static int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capac static int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size, const char* dbName); -static bool processBlockWithProbability(const SSampleExecInfo* pInfo); +static bool processBlockWithProbability(const SSampleExecInfo* pInfo); bool processBlockWithProbability(const SSampleExecInfo* pInfo) { #if 0 @@ -1178,10 +1178,9 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock for (int32_t j = 0; j < blockDataGetNumOfCols(pBlock); ++j) { SColumnInfoData* pResCol = bdGetColumnInfoData(pBlock, j); if (pResCol->info.colId == pColMatchInfo->colId) { - SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->targetSlotId); colDataAssign(pDst, pResCol, pBlock->info.rows, &pInfo->pRes->info); -// taosArraySet(pInfo->pRes->pDataBlock, pColMatchInfo->targetSlotId, pResCol); + // taosArraySet(pInfo->pRes->pDataBlock, pColMatchInfo->targetSlotId, pResCol); colExists = true; break; } @@ -1201,14 +1200,14 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes, GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { - blockDataFreeRes((SSDataBlock*) pBlock); + blockDataFreeRes((SSDataBlock*)pBlock); longjmp(pTaskInfo->env, code); } } doFilter(pInfo->pCondition, pInfo->pRes); blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); - blockDataFreeRes((SSDataBlock*) pBlock); + blockDataFreeRes((SSDataBlock*)pBlock); return 0; } @@ -1444,7 +1443,7 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNo static void destroyStreamScanOperatorInfo(void* param, int32_t numOfOutput) { SStreamScanInfo* pStreamScan = (SStreamScanInfo*)param; -#if 0 +#if 1 if (pStreamScan->pTableScanOp && pStreamScan->pTableScanOp->info) { STableScanInfo* pTableScanInfo = pStreamScan->pTableScanOp->info; destroyTableScanOperatorInfo(pTableScanInfo, 1); @@ -1456,6 +1455,7 @@ static void destroyStreamScanOperatorInfo(void* param, int32_t numOfOutput) { if (pStreamScan->pColMatchInfo) { taosArrayDestroy(pStreamScan->pColMatchInfo); } + updateInfoDestroy(pStreamScan->pUpdateInfo); blockDataDestroy(pStreamScan->pRes); blockDataDestroy(pStreamScan->pUpdateRes); blockDataDestroy(pStreamScan->pPullDataRes); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 22c09de64f..acc56af5cd 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1537,7 +1537,6 @@ void destroyStreamFinalIntervalOperatorInfo(void* param, int32_t numOfOutput) { for (int32_t i = 0; i < size; i++) { SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, i); destroyStreamFinalIntervalOperatorInfo(pChildOp->info, numOfOutput); - /*taosMemoryFreeClear(pChildOp->info);*/ taosMemoryFreeClear(pChildOp); } }