fix(sma): double free
This commit is contained in:
parent
443234076d
commit
9847abf166
|
@ -30,7 +30,7 @@ typedef struct SRSmaQTaskInfoIter SRSmaQTaskInfoIter;
|
||||||
static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid);
|
static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid);
|
||||||
static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids);
|
static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids);
|
||||||
static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo,
|
static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo,
|
||||||
SReadHandle *handle, int8_t idx);
|
int8_t idx);
|
||||||
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfoItem *rsmaItem,
|
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfoItem *rsmaItem,
|
||||||
STSchema *pTSchema, tb_uid_t suid, int8_t level);
|
STSchema *pTSchema, tb_uid_t suid, int8_t level);
|
||||||
static SRSmaInfo *tdGetRSmaInfoBySuid(SSma *pSma, int64_t suid);
|
static SRSmaInfo *tdGetRSmaInfoBySuid(SSma *pSma, int64_t suid);
|
||||||
|
@ -256,14 +256,20 @@ int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_ui
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo,
|
static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo,
|
||||||
SReadHandle *pReadHandle, int8_t idx) {
|
int8_t idx) {
|
||||||
SRetention *pRetention = SMA_RETENTION(pSma);
|
SRetention *pRetention = SMA_RETENTION(pSma);
|
||||||
STsdbCfg *pTsdbCfg = SMA_TSDB_CFG(pSma);
|
STsdbCfg *pTsdbCfg = SMA_TSDB_CFG(pSma);
|
||||||
|
|
||||||
|
SReadHandle handle = {
|
||||||
|
.meta = pSma->pVnode->pMeta,
|
||||||
|
.vnode = pSma->pVnode,
|
||||||
|
.initTqReader = 1,
|
||||||
|
};
|
||||||
|
|
||||||
if (param->qmsg[idx]) {
|
if (param->qmsg[idx]) {
|
||||||
SRSmaInfoItem *pItem = &(pRSmaInfo->items[idx]);
|
SRSmaInfoItem *pItem = &(pRSmaInfo->items[idx]);
|
||||||
pItem->refId = RSMA_REF_ID(pStat);
|
pItem->refId = RSMA_REF_ID(pStat);
|
||||||
pItem->taskInfo = qCreateStreamExecTaskInfo(param->qmsg[idx], pReadHandle);
|
pItem->taskInfo = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle);
|
||||||
if (!pItem->taskInfo) {
|
if (!pItem->taskInfo) {
|
||||||
terrno = TSDB_CODE_RSMA_QTASKINFO_CREATE;
|
terrno = TSDB_CODE_RSMA_QTASKINFO_CREATE;
|
||||||
goto _err;
|
goto _err;
|
||||||
|
@ -299,10 +305,6 @@ _err:
|
||||||
* @return int32_t
|
* @return int32_t
|
||||||
*/
|
*/
|
||||||
int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName) {
|
int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName) {
|
||||||
SVnode *pVnode = pSma->pVnode;
|
|
||||||
SMeta *pMeta = pVnode->pMeta;
|
|
||||||
SMsgCb *pMsgCb = &pVnode->msgCb;
|
|
||||||
|
|
||||||
if ((param->qmsgLen[0] == 0) && (param->qmsgLen[1] == 0)) {
|
if ((param->qmsgLen[0] == 0) && (param->qmsgLen[1] == 0)) {
|
||||||
smaDebug("vgId:%d, no qmsg1/qmsg2 for rollup table %s %" PRIi64, SMA_VID(pSma), tbName, suid);
|
smaDebug("vgId:%d, no qmsg1/qmsg2 for rollup table %s %" PRIi64, SMA_VID(pSma), tbName, suid);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -331,19 +333,6 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
STqReader *pReader = tqOpenReader(pVnode);
|
|
||||||
if (!pReader) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
SReadHandle handle = {
|
|
||||||
.tqReader = pReader,
|
|
||||||
.meta = pMeta,
|
|
||||||
.pMsgCb = pMsgCb,
|
|
||||||
.vnode = pVnode,
|
|
||||||
};
|
|
||||||
|
|
||||||
STSchema *pTSchema = metaGetTbTSchema(SMA_META(pSma), suid, -1);
|
STSchema *pTSchema = metaGetTbTSchema(SMA_META(pSma), suid, -1);
|
||||||
if (!pTSchema) {
|
if (!pTSchema) {
|
||||||
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
|
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
|
||||||
|
@ -352,11 +341,11 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
|
||||||
pRSmaInfo->pTSchema = pTSchema;
|
pRSmaInfo->pTSchema = pTSchema;
|
||||||
pRSmaInfo->suid = suid;
|
pRSmaInfo->suid = suid;
|
||||||
|
|
||||||
if (tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, &handle, 0) < 0) {
|
if (tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 0) < 0) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, &handle, 1) < 0) {
|
if (tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 1) < 0) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -369,7 +358,6 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
_err:
|
_err:
|
||||||
tdFreeRSmaInfo(pSma, pRSmaInfo);
|
tdFreeRSmaInfo(pSma, pRSmaInfo);
|
||||||
taosMemoryFree(pReader);
|
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -404,7 +392,7 @@ int32_t tdProcessRSmaCreate(SSma *pSma, SVCreateStbReq *pReq) {
|
||||||
* @param pReq
|
* @param pReq
|
||||||
* @return int32_t
|
* @return int32_t
|
||||||
*/
|
*/
|
||||||
int32_t tdProcessRSmaDrop(SSma *pSma, SVDropStbReq *pReq) {
|
int32_t tdProcessRSmaDrop(SSma *pSma, SVDropStbReq *pReq) {
|
||||||
SVnode *pVnode = pSma->pVnode;
|
SVnode *pVnode = pSma->pVnode;
|
||||||
if (!VND_IS_RSMA(pVnode)) {
|
if (!VND_IS_RSMA(pVnode)) {
|
||||||
smaTrace("vgId:%d, not create rsma for stable %s %" PRIi64 " since vnd is not rsma", TD_VID(pVnode), pReq->name,
|
smaTrace("vgId:%d, not create rsma for stable %s %" PRIi64 " since vnd is not rsma", TD_VID(pVnode), pReq->name,
|
||||||
|
@ -412,11 +400,9 @@ int32_t tdProcessRSmaDrop(SSma *pSma, SVDropStbReq *pReq) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
smaDebug("vgId:%d, drop rsma for table %" PRIi64 " succeed", TD_VID(pVnode), pReq->suid);
|
smaDebug("vgId:%d, drop rsma for table %" PRIi64 " succeed", TD_VID(pVnode), pReq->suid);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief store suid/[uids], prefer to use array and then hash
|
* @brief store suid/[uids], prefer to use array and then hash
|
||||||
|
|
|
@ -39,7 +39,7 @@ static int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capac
|
||||||
static int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size,
|
static int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size,
|
||||||
const char* dbName);
|
const char* dbName);
|
||||||
|
|
||||||
static bool processBlockWithProbability(const SSampleExecInfo* pInfo);
|
static bool processBlockWithProbability(const SSampleExecInfo* pInfo);
|
||||||
|
|
||||||
bool processBlockWithProbability(const SSampleExecInfo* pInfo) {
|
bool processBlockWithProbability(const SSampleExecInfo* pInfo) {
|
||||||
#if 0
|
#if 0
|
||||||
|
@ -1178,10 +1178,9 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
|
||||||
for (int32_t j = 0; j < blockDataGetNumOfCols(pBlock); ++j) {
|
for (int32_t j = 0; j < blockDataGetNumOfCols(pBlock); ++j) {
|
||||||
SColumnInfoData* pResCol = bdGetColumnInfoData(pBlock, j);
|
SColumnInfoData* pResCol = bdGetColumnInfoData(pBlock, j);
|
||||||
if (pResCol->info.colId == pColMatchInfo->colId) {
|
if (pResCol->info.colId == pColMatchInfo->colId) {
|
||||||
|
|
||||||
SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->targetSlotId);
|
SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->targetSlotId);
|
||||||
colDataAssign(pDst, pResCol, pBlock->info.rows, &pInfo->pRes->info);
|
colDataAssign(pDst, pResCol, pBlock->info.rows, &pInfo->pRes->info);
|
||||||
// taosArraySet(pInfo->pRes->pDataBlock, pColMatchInfo->targetSlotId, pResCol);
|
// taosArraySet(pInfo->pRes->pDataBlock, pColMatchInfo->targetSlotId, pResCol);
|
||||||
colExists = true;
|
colExists = true;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -1201,14 +1200,14 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
|
||||||
int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes,
|
int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes,
|
||||||
GET_TASKID(pTaskInfo));
|
GET_TASKID(pTaskInfo));
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
blockDataFreeRes((SSDataBlock*) pBlock);
|
blockDataFreeRes((SSDataBlock*)pBlock);
|
||||||
longjmp(pTaskInfo->env, code);
|
longjmp(pTaskInfo->env, code);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
doFilter(pInfo->pCondition, pInfo->pRes);
|
doFilter(pInfo->pCondition, pInfo->pRes);
|
||||||
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
|
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
|
||||||
blockDataFreeRes((SSDataBlock*) pBlock);
|
blockDataFreeRes((SSDataBlock*)pBlock);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1444,7 +1443,7 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNo
|
||||||
|
|
||||||
static void destroyStreamScanOperatorInfo(void* param, int32_t numOfOutput) {
|
static void destroyStreamScanOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
SStreamScanInfo* pStreamScan = (SStreamScanInfo*)param;
|
SStreamScanInfo* pStreamScan = (SStreamScanInfo*)param;
|
||||||
#if 0
|
#if 1
|
||||||
if (pStreamScan->pTableScanOp && pStreamScan->pTableScanOp->info) {
|
if (pStreamScan->pTableScanOp && pStreamScan->pTableScanOp->info) {
|
||||||
STableScanInfo* pTableScanInfo = pStreamScan->pTableScanOp->info;
|
STableScanInfo* pTableScanInfo = pStreamScan->pTableScanOp->info;
|
||||||
destroyTableScanOperatorInfo(pTableScanInfo, 1);
|
destroyTableScanOperatorInfo(pTableScanInfo, 1);
|
||||||
|
@ -1456,6 +1455,7 @@ static void destroyStreamScanOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
if (pStreamScan->pColMatchInfo) {
|
if (pStreamScan->pColMatchInfo) {
|
||||||
taosArrayDestroy(pStreamScan->pColMatchInfo);
|
taosArrayDestroy(pStreamScan->pColMatchInfo);
|
||||||
}
|
}
|
||||||
|
updateInfoDestroy(pStreamScan->pUpdateInfo);
|
||||||
blockDataDestroy(pStreamScan->pRes);
|
blockDataDestroy(pStreamScan->pRes);
|
||||||
blockDataDestroy(pStreamScan->pUpdateRes);
|
blockDataDestroy(pStreamScan->pUpdateRes);
|
||||||
blockDataDestroy(pStreamScan->pPullDataRes);
|
blockDataDestroy(pStreamScan->pPullDataRes);
|
||||||
|
|
|
@ -1537,7 +1537,6 @@ void destroyStreamFinalIntervalOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
for (int32_t i = 0; i < size; i++) {
|
for (int32_t i = 0; i < size; i++) {
|
||||||
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, i);
|
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, i);
|
||||||
destroyStreamFinalIntervalOperatorInfo(pChildOp->info, numOfOutput);
|
destroyStreamFinalIntervalOperatorInfo(pChildOp->info, numOfOutput);
|
||||||
/*taosMemoryFreeClear(pChildOp->info);*/
|
|
||||||
taosMemoryFreeClear(pChildOp);
|
taosMemoryFreeClear(pChildOp);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue