enh: rsma code optimization
This commit is contained in:
parent
4b3d989168
commit
65e1cce6df
|
@ -162,8 +162,8 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) {
|
||||||
pInfo->iQall = NULL;
|
pInfo->iQall = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosMemoryFree(pInfo);
|
taosMemoryFree(pInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -1524,8 +1524,8 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
|
||||||
SRSmaStat *pStat = (SRSmaStat *)tdAcquireSmaRef(smaMgmt.rsetId, pRSmaInfo->refId);
|
SRSmaStat *pStat = (SRSmaStat *)tdAcquireSmaRef(smaMgmt.rsetId, pRSmaInfo->refId);
|
||||||
|
|
||||||
if (!pStat) {
|
if (!pStat) {
|
||||||
smaDebug("rsma fetch task not start since rsma stat already destroyed, rsetId:%" PRIi64 " refId:%d)", smaMgmt.rsetId,
|
smaDebug("rsma fetch task not start since rsma stat already destroyed, rsetId:%" PRIi64 " refId:%d)",
|
||||||
pRSmaInfo->refId);
|
smaMgmt.rsetId, pRSmaInfo->refId);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1741,7 +1741,10 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
|
||||||
tdRSmaBatchExec(pSma, pInfo, pInfo->qall, pSubmitArr, type);
|
tdRSmaBatchExec(pSma, pInfo, pInfo->qall, pSubmitArr, type);
|
||||||
}
|
}
|
||||||
|
|
||||||
tdRSmaFetchAllResult(pSma, pInfo, pSubmitArr);
|
if (type == RSMA_EXEC_OVERFLOW) {
|
||||||
|
tdRSmaFetchAllResult(pSma, pInfo, pSubmitArr);
|
||||||
|
}
|
||||||
|
|
||||||
if (qallItemSize > 0) {
|
if (qallItemSize > 0) {
|
||||||
// subtract the item size after the task finished, commit should wait for all items be consumed
|
// subtract the item size after the task finished, commit should wait for all items be consumed
|
||||||
atomic_fetch_sub_64(&pRSmaStat->nBufItems, qallItemSize);
|
atomic_fetch_sub_64(&pRSmaStat->nBufItems, qallItemSize);
|
||||||
|
|
Loading…
Reference in New Issue