enh: rsma result

This commit is contained in:
kailixu 2023-11-09 11:23:50 +08:00
parent 3495efaac7
commit 349e190120
3 changed files with 39 additions and 56 deletions

View File

@ -149,6 +149,7 @@ struct SRSmaInfoItem {
tmr_h tmrId;
void *pStreamState;
void *pStreamTask; // SStreamTask
SArray *pResList;
};
struct SRSmaInfo {
@ -218,7 +219,7 @@ static FORCE_INLINE void tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat) {
int32_t smaPreClose(SSma *pSma);
// rsma
void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree);
void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo);
int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer, int8_t rollback);
int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName);
int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type);

View File

@ -179,7 +179,7 @@ static void tRSmaInfoHashFreeNode(void *data) {
if ((pItem = RSMA_INFO_ITEM((SRSmaInfo *)pRSmaInfo, 1)) && pItem->level) {
taosHashRemove(smaMgmt.refHash, &pItem, POINTER_BYTES);
}
tdFreeRSmaInfo(pRSmaInfo->pSma, pRSmaInfo, true);
tdFreeRSmaInfo(pRSmaInfo->pSma, pRSmaInfo);
}
}

View File

@ -45,7 +45,7 @@ static void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo);
static void tdFreeRSmaSubmitItems(SArray *pItems);
static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo);
static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, SRSmaInfo *pInfo,
int32_t execType, SArray **ppResList, int8_t *streamFlushed);
int32_t execType, int8_t *streamFlushed);
static void tdRSmaFetchTrigger(void *param, void *tmrId);
static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level);
static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables);
@ -74,41 +74,39 @@ static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t l
*
* @param pSma
* @param pInfo
* @param isDeepFree Only stop tmrId and free pTSchema for deep free
* @return void*
*/
void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) {
void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) {
if (pInfo) {
if (isDeepFree) {
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
SRSmaInfoItem *pItem = &pInfo->items[i];
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
SRSmaInfoItem *pItem = &pInfo->items[i];
if (pItem->tmrId) {
smaDebug("vgId:%d, stop fetch timer %p for table %" PRIi64 " level %d", SMA_VID(pSma), pItem->tmrId,
pInfo->suid, i + 1);
taosTmrStopA(&pItem->tmrId);
}
if (pItem->pStreamState) {
streamStateClose(pItem->pStreamState, false);
}
if (pItem->pStreamTask) {
tFreeStreamTask(pItem->pStreamTask);
}
tdRSmaQTaskInfoFree(&pInfo->taskInfo[i], SMA_VID(pSma), i + 1);
if (pItem->tmrId) {
smaDebug("vgId:%d, stop fetch timer %p for table %" PRIi64 " level %d", SMA_VID(pSma), pItem->tmrId,
pInfo->suid, i + 1);
taosTmrStopA(&pItem->tmrId);
}
taosMemoryFreeClear(pInfo->pTSchema);
if (pItem->pStreamState) {
streamStateClose(pItem->pStreamState, false);
}
if (pInfo->queue) {
taosCloseQueue(pInfo->queue);
pInfo->queue = NULL;
}
if (pInfo->qall) {
taosFreeQall(pInfo->qall);
pInfo->qall = NULL;
if (pItem->pStreamTask) {
tFreeStreamTask(pItem->pStreamTask);
}
taosArrayDestroy(pItem->pResList);
tdRSmaQTaskInfoFree(&pInfo->taskInfo[i], SMA_VID(pSma), i + 1);
}
taosMemoryFreeClear(pInfo->pTSchema);
if (pInfo->queue) {
taosCloseQueue(pInfo->queue);
pInfo->queue = NULL;
}
if (pInfo->qall) {
taosFreeQall(pInfo->qall);
pInfo->qall = NULL;
}
taosMemoryFree(pInfo);
@ -311,6 +309,10 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
return TSDB_CODE_FAILED;
}
if (!(pItem->pResList = taosArrayInit(1, POINTER_BYTES))) {
return TSDB_CODE_FAILED;
}
if (pItem->fetchResultVer < pItem->submitReqVer) {
// fetch the data when reboot
pItem->triggerStat = TASK_TRIGGER_STAT_ACTIVE;
@ -406,7 +408,7 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
return TSDB_CODE_SUCCESS;
_err:
tdFreeRSmaInfo(pSma, pRSmaInfo, true);
tdFreeRSmaInfo(pSma, pRSmaInfo);
return TSDB_CODE_FAILED;
}
@ -623,27 +625,14 @@ _end:
}
static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, SRSmaInfo *pInfo,
int32_t execType, SArray **ppResList, int8_t *streamFlushed) {
int32_t execType, int8_t *streamFlushed) {
int32_t code = 0;
int32_t lino = 0;
SSDataBlock *output = NULL;
SArray *pResList = NULL;
SArray *pResList = pItem->pResList;
STSchema *pTSchema = pInfo->pTSchema;
int64_t suid = pInfo->suid;
if (!(*ppResList)) {
pResList = taosArrayInit(1, POINTER_BYTES);
if (pResList == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
*ppResList = pResList;
} else {
pResList = *ppResList;
}
taosArrayClear(pResList);
while (1) {
uint64_t ts;
bool hasMore = false;
@ -812,7 +801,6 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize,
int32_t idx = level - 1;
void *qTaskInfo = RSMA_INFO_QTASK(pInfo, idx);
SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, idx);
SArray *pResList = NULL;
if (!qTaskInfo) {
smaDebug("vgId:%d, no qTaskInfo to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level,
@ -845,9 +833,8 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize,
atomic_store_64(&pItem->submitReqVer, packData->ver);
}
terrno = tdRSmaExecAndSubmitResult(pSma, qTaskInfo, pItem, pInfo, STREAM_NORMAL, &pResList, NULL);
terrno = tdRSmaExecAndSubmitResult(pSma, qTaskInfo, pItem, pInfo, STREAM_NORMAL, NULL);
taosArrayDestroy(pResList);
return terrno ? TSDB_CODE_FAILED : TDB_CODE_SUCCESS;
}
@ -1135,7 +1122,6 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
int32_t nTaskInfo = 0;
SSma *pSma = pRSmaStat->pSma;
SVnode *pVnode = pSma->pVnode;
SArray *pResList = NULL;
if (taosHashGetSize(pInfoHash) <= 0) {
return TSDB_CODE_SUCCESS;
@ -1178,7 +1164,7 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
if (pRSmaInfo->taskInfo[i] && (0 == pRSmaInfo->items[i].streamFlushed)) {
int8_t streamFlushed = 0;
code = tdRSmaExecAndSubmitResult(pSma, pRSmaInfo->taskInfo[i], &pRSmaInfo->items[i], pRSmaInfo,
STREAM_CHECKPOINT, &pResList, &streamFlushed);
STREAM_CHECKPOINT, &streamFlushed);
if (code) {
taosHashCancelIterate(pInfoHash, infoHash);
TSDB_CHECK_CODE(code, lino, _exit);
@ -1265,7 +1251,6 @@ _checkpoint:
}
} while (0);
_exit:
taosArrayDestroy(pResList);
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
}
@ -1391,7 +1376,6 @@ static void tdFreeRSmaSubmitItems(SArray *pItems) {
*/
static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) {
SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL};
SArray *pResList = NULL;
for (int8_t i = 1; i <= TSDB_RETENTION_L2; ++i) {
SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, i - 1);
@ -1422,7 +1406,7 @@ static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) {
if ((terrno = qSetSMAInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK)) < 0) {
goto _err;
}
if (tdRSmaExecAndSubmitResult(pSma, taskInfo, pItem, pInfo, STREAM_GET_ALL, &pResList, NULL) < 0) {
if (tdRSmaExecAndSubmitResult(pSma, taskInfo, pItem, pInfo, STREAM_GET_ALL, NULL) < 0) {
atomic_store_32(&SMA_RSMA_STAT(pSma)->execStat, terrno);
goto _err;
}
@ -1437,10 +1421,8 @@ static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) {
}
_end:
taosArrayDestroy(pResList);
return TSDB_CODE_SUCCESS;
_err:
taosArrayDestroy(pResList);
return TSDB_CODE_FAILED;
}