Merge pull request #18458 from taosdata/fix/TD-20654-3.0
fix: asan problems for rsma
This commit is contained in:
commit
5cce33b45b
|
@ -660,6 +660,13 @@ _end:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void tdBlockDataDestroy(SArray *pBlockArr) {
|
||||||
|
for (int32_t i = 0; i < taosArrayGetSize(pBlockArr); ++i) {
|
||||||
|
blockDataDestroy(taosArrayGetP(pBlockArr, i));
|
||||||
|
}
|
||||||
|
taosArrayDestroy(pBlockArr);
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema,
|
static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema,
|
||||||
int64_t suid) {
|
int64_t suid) {
|
||||||
SArray *pResList = taosArrayInit(1, POINTER_BYTES);
|
SArray *pResList = taosArrayInit(1, POINTER_BYTES);
|
||||||
|
@ -701,38 +708,42 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
|
||||||
#endif
|
#endif
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pResList); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(pResList); ++i) {
|
||||||
SSDataBlock *output = taosArrayGetP(pResList, i);
|
SSDataBlock *output = taosArrayGetP(pResList, i);
|
||||||
smaDebug("result block, uid:%"PRIu64", groupid:%"PRIu64", rows:%d", output->info.uid, output->info.groupId,
|
smaDebug("result block, uid:%" PRIu64 ", groupid:%" PRIu64 ", rows:%d", output->info.uid, output->info.groupId,
|
||||||
output->info.rows);
|
output->info.rows);
|
||||||
|
|
||||||
STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]);
|
STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]);
|
||||||
SSubmitReq *pReq = NULL;
|
SSubmitReq *pReq = NULL;
|
||||||
|
|
||||||
// TODO: the schema update should be handled later(TD-17965)
|
// TODO: the schema update should be handled later(TD-17965)
|
||||||
if (buildSubmitReqFromDataBlock(&pReq, output, pTSchema, SMA_VID(pSma), suid) < 0) {
|
if (buildSubmitReqFromDataBlock(&pReq, output, pTSchema, SMA_VID(pSma), suid) < 0) {
|
||||||
smaError("vgId:%d, build submit req for rsma table suid:%" PRIu64 ", uid:%"PRIu64", level %" PRIi8 " failed since %s", SMA_VID(pSma),
|
smaError("vgId:%d, build submit req for rsma table suid:%" PRIu64 ", uid:%" PRIu64 ", level %" PRIi8
|
||||||
suid, output->info.groupId, pItem->level, terrstr());
|
" failed since %s",
|
||||||
|
SMA_VID(pSma), suid, output->info.groupId, pItem->level, terrstr());
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) {
|
if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) {
|
||||||
taosMemoryFreeClear(pReq);
|
taosMemoryFreeClear(pReq);
|
||||||
smaError("vgId:%d, process submit req for rsma suid:%"PRIu64", uid:%" PRIu64 " level %" PRIi8 " failed since %s",
|
smaError("vgId:%d, process submit req for rsma suid:%" PRIu64 ", uid:%" PRIu64 " level %" PRIi8
|
||||||
|
" failed since %s",
|
||||||
SMA_VID(pSma), suid, output->info.groupId, pItem->level, terrstr());
|
SMA_VID(pSma), suid, output->info.groupId, pItem->level, terrstr());
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
smaDebug("vgId:%d, process submit req for rsma suid:%" PRIu64 ",uid:%"PRIu64", level %" PRIi8 " ver %" PRIi64 " len %" PRIu32,
|
smaDebug("vgId:%d, process submit req for rsma suid:%" PRIu64 ",uid:%" PRIu64 ", level %" PRIi8 " ver %" PRIi64
|
||||||
SMA_VID(pSma), suid, output->info.groupId, pItem->level, output->info.version, htonl(pReq->header.contLen));
|
" len %" PRIu32,
|
||||||
|
SMA_VID(pSma), suid, output->info.groupId, pItem->level, output->info.version,
|
||||||
|
htonl(pReq->header.contLen));
|
||||||
|
|
||||||
taosMemoryFreeClear(pReq);
|
taosMemoryFreeClear(pReq);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayDestroy(pResList);
|
tdBlockDataDestroy(pResList);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
taosArrayDestroy(pResList);
|
tdBlockDataDestroy(pResList);
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -820,8 +831,7 @@ static int32_t tdRsmaPrintSubmitReq(SSma *pSma, SSubmitReq *pReq) {
|
||||||
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int32_t inputType, SRSmaInfo *pInfo,
|
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int32_t inputType, SRSmaInfo *pInfo,
|
||||||
ERsmaExecType type, int8_t level) {
|
ERsmaExecType type, int8_t level) {
|
||||||
int32_t idx = level - 1;
|
int32_t idx = level - 1;
|
||||||
|
void *qTaskInfo = (type == RSMA_EXEC_COMMIT) ? RSMA_INFO_IQTASK(pInfo, idx) : RSMA_INFO_QTASK(pInfo, idx);
|
||||||
void *qTaskInfo = (type == RSMA_EXEC_COMMIT) ? RSMA_INFO_IQTASK(pInfo, idx) : RSMA_INFO_QTASK(pInfo, idx);
|
|
||||||
if (!qTaskInfo) {
|
if (!qTaskInfo) {
|
||||||
smaDebug("vgId:%d, no qTaskInfo to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level,
|
smaDebug("vgId:%d, no qTaskInfo to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level,
|
||||||
pInfo->suid);
|
pInfo->suid);
|
||||||
|
|
|
@ -1510,10 +1510,14 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
|
||||||
if (pInfo->numOfPseudoExpr > 0) {
|
if (pInfo->numOfPseudoExpr > 0) {
|
||||||
int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes,
|
int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes,
|
||||||
pInfo->pRes->info.rows, GET_TASKID(pTaskInfo), NULL);
|
pInfo->pRes->info.rows, GET_TASKID(pTaskInfo), NULL);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
// ignore the table not exists error, since this table may have been dropped during the scan procedure.
|
||||||
|
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_PAR_TABLE_NOT_EXIST) {
|
||||||
blockDataFreeRes((SSDataBlock*)pBlock);
|
blockDataFreeRes((SSDataBlock*)pBlock);
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// reset the error code.
|
||||||
|
terrno = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (filter) {
|
if (filter) {
|
||||||
|
|
Loading…
Reference in New Issue