enh: rsma checkpoint

This commit is contained in:
kailixu 2023-11-02 18:56:36 +08:00
parent 2d597659bc
commit fa5d896787
1 changed files with 93 additions and 58 deletions

View File

@ -92,7 +92,9 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) {
streamStateClose(pItem->pStreamState, false);
}
taosMemoryFreeClear(pItem->pStreamTask);
if (pItem->pStreamTask) {
tFreeStreamTask(pItem->pStreamTask);
}
tdRSmaQTaskInfoFree(&pInfo->taskInfo[i], SMA_VID(pSma), i + 1);
}
@ -173,8 +175,8 @@ int32_t tdUpdateTbUidList(SSma *pSma, STbUidStore *pStore, bool isAdd) {
return TSDB_CODE_FAILED;
}
void *pIter = taosHashIterate(pStore->uidHash, NULL);
while (pIter) {
void *pIter = NULL;
while ((pIter = taosHashIterate(pStore->uidHash, pIter))) {
tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
SArray *pTbUids = *(SArray **)pIter;
@ -182,8 +184,6 @@ int32_t tdUpdateTbUidList(SSma *pSma, STbUidStore *pStore, bool isAdd) {
taosHashCancelIterate(pStore->uidHash, pIter);
return TSDB_CODE_FAILED;
}
pIter = taosHashIterate(pStore->uidHash, pIter);
}
return TSDB_CODE_SUCCESS;
}
@ -234,11 +234,12 @@ int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_ui
static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo,
int8_t idx) {
if ((param->qmsgLen > 0) && param->qmsg[idx]) {
SRetention *pRetention = SMA_RETENTION(pSma);
STsdbCfg *pTsdbCfg = SMA_TSDB_CFG(pSma);
SVnode *pVnode = pSma->pVnode;
char taskInfDir[TSDB_FILENAME_LEN] = {0};
void *pStreamState = NULL;
SRSmaInfoItem *pItem = &(pRSmaInfo->items[idx]);
SRetention *pRetention = SMA_RETENTION(pSma);
STsdbCfg *pTsdbCfg = SMA_TSDB_CFG(pSma);
SVnode *pVnode = pSma->pVnode;
char taskInfDir[TSDB_FILENAME_LEN] = {0};
void *pStreamState = NULL;
// set the backend of stream state
tdRSmaQTaskInfoGetFullPath(pVnode, pRSmaInfo->suid, idx + 1, pVnode->pTfs, taskInfDir);
@ -258,32 +259,30 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
}
pItem->pStreamTask = pStreamTask;
pStreamTask->id.taskId = 0;
pStreamTask->id.streamId = pRSmaInfo->suid + idx;
pStreamTask->chkInfo.startTs = taosGetTimestampMs();
pStreamTask->pMeta = pVnode->pTq->pStreamMeta;
pStreamTask->exec.qmsg = taosMemoryMalloc(2);
sprintf(pStreamTask->exec.qmsg, "%d", idx);
pStreamTask->chkInfo.checkpointId = pTsdbCfg->retentions[idx + 1].checkpointId;
pStreamState = streamStateOpen(taskInfDir, pStreamTask, true, -1, -1);
if (!pStreamState) {
terrno = TSDB_CODE_RSMA_STREAM_STATE_OPEN;
taosMemoryFreeClear(pStreamTask);
return TSDB_CODE_FAILED;
}
pItem->pStreamState = pStreamState;
SReadHandle handle = {.vnode = pVnode, .initTqReader = 1, .pStateBackend = pStreamState};
initStorageAPI(&handle.api);
pRSmaInfo->taskInfo[idx] = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle, TD_VID(pVnode), 0);
if (!pRSmaInfo->taskInfo[idx]) {
terrno = TSDB_CODE_RSMA_QTASKINFO_CREATE;
taosMemoryFreeClear(pStreamTask);
return TSDB_CODE_FAILED;
}
SRSmaInfoItem *pItem = &(pRSmaInfo->items[idx]);
pItem->triggerStat = TASK_TRIGGER_STAT_ACTIVE; // fetch the data when reboot
pItem->pStreamState = pStreamState;
pItem->pStreamTask = pStreamTask;
if (param->maxdelay[idx] < TSDB_MIN_ROLLUP_MAX_DELAY) {
int64_t msInterval =
convertTimeFromPrecisionToUnit(pRetention[idx + 1].freq, pTsdbCfg->precision, TIME_UNIT_MILLISECOND);
@ -509,11 +508,10 @@ static void tdUidStoreDestory(STbUidStore *pStore) {
if (pStore->uidHash) {
if (pStore->tbUids) {
// When pStore->tbUids not NULL, the pStore->uidHash has k/v; otherwise pStore->uidHash only has keys.
void *pIter = taosHashIterate(pStore->uidHash, NULL);
while (pIter) {
void *pIter = NULL;
while ((pIter = taosHashIterate(pStore->uidHash, pIter))) {
SArray *arr = *(SArray **)pIter;
taosArrayDestroy(arr);
pIter = taosHashIterate(pStore->uidHash, pIter);
}
}
taosHashCleanup(pStore->uidHash);
@ -1082,62 +1080,77 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
SSma *pSma = pRSmaStat->pSma;
SVnode *pVnode = pSma->pVnode;
SArray *pResList = NULL;
SRSmaFS fs = {0};
if (taosHashGetSize(pInfoHash) <= 0) {
return TSDB_CODE_SUCCESS;
}
void *infoHash = NULL;
// stream state: trigger checkpoint
while ((infoHash = taosHashIterate(pInfoHash, infoHash))) {
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash;
if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
continue;
}
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if (pRSmaInfo->taskInfo[i]) {
code = qSetSMAInput(pRSmaInfo->taskInfo[i], pRSmaStat->blocks, 1, STREAM_INPUT__CHECKPOINT);
TSDB_CHECK_CODE(code, lino, _exit);
pRSmaInfo->items[i].streamFlushed = 0;
++nTaskInfo;
}
}
}
// stream state: process checkpoint response in async mode
int32_t nStreamFlushed = 0;
int32_t nSleep = 0;
while (true) {
do {
void *infoHash = NULL;
while ((infoHash = taosHashIterate(pInfoHash, infoHash))) {
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash;
if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
continue;
}
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if (pRSmaInfo->taskInfo[i] && (0 == pRSmaInfo->items[i].streamFlushed)) {
int8_t streamFlushed = 0;
code = tdRSmaExecAndSubmitResult(pSma, pRSmaInfo->taskInfo[i], &pRSmaInfo->items[i], pRSmaInfo->pTSchema,
pRSmaInfo->suid, &pResList, &streamFlushed);
TSDB_CHECK_CODE(code, lino, _exit);
if (streamFlushed && (++nStreamFlushed >= nTaskInfo)) {
smaInfo("vgId:%d checkpoint ready, %d us consumed, received/total: %d/%d", TD_VID(pVnode), nSleep * 10,
nStreamFlushed, nTaskInfo);
goto _checkpoint;
if (pRSmaInfo->taskInfo[i]) {
code = qSetSMAInput(pRSmaInfo->taskInfo[i], pRSmaStat->blocks, 1, STREAM_INPUT__CHECKPOINT);
if (code) {
taosHashCancelIterate(pInfoHash, infoHash);
TSDB_CHECK_CODE(code, lino, _exit);
}
pRSmaInfo->items[i].streamFlushed = 0;
++nTaskInfo;
}
}
}
taosUsleep(10);
++nSleep;
smaDebug("vgId:%d, wait for checkpoint ready, %d us elapsed, received/total: %d/%d", TD_VID(pVnode), nSleep * 10,
nStreamFlushed, nTaskInfo);
}
// stream state: build checkpoint in backend
} while (0);
// stream state: wait checkpoint ready in async mode
do {
int32_t nStreamFlushed = 0;
int32_t nSleep = 0;
void *infoHash = NULL;
while (true) {
while ((infoHash = taosHashIterate(pInfoHash, infoHash))) {
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash;
if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
continue;
}
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if (pRSmaInfo->taskInfo[i] && (0 == pRSmaInfo->items[i].streamFlushed)) {
int8_t streamFlushed = 0;
code = tdRSmaExecAndSubmitResult(pSma, pRSmaInfo->taskInfo[i], &pRSmaInfo->items[i], pRSmaInfo->pTSchema,
pRSmaInfo->suid, &pResList, &streamFlushed);
if (code) {
taosHashCancelIterate(pInfoHash, infoHash);
TSDB_CHECK_CODE(code, lino, _exit);
}
if (streamFlushed && (++nStreamFlushed >= nTaskInfo)) {
smaInfo("vgId:%d checkpoint ready, %d us consumed, received/total: %d/%d", TD_VID(pVnode), nSleep * 10,
nStreamFlushed, nTaskInfo);
taosHashCancelIterate(pInfoHash, infoHash);
goto _checkpoint;
}
}
}
}
taosUsleep(10);
++nSleep;
smaDebug("vgId:%d, wait for checkpoint ready, %d us elapsed, received/total: %d/%d", TD_VID(pVnode), nSleep * 10,
nStreamFlushed, nTaskInfo);
}
} while (0);
_checkpoint:
// stream state: build checkpoint in backend
do {
void *infHash = NULL;
while ((infHash = taosHashIterate(pInfoHash, infHash))) {
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infHash;
void *infoHash = NULL;
while ((infoHash = taosHashIterate(pInfoHash, infoHash))) {
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash;
if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
continue;
}
@ -1150,7 +1163,26 @@ _checkpoint:
pTask->checkpointingId = taosGetTimestampNs();
pTask->chkInfo.checkpointId = pTask->checkpointingId;
code = streamTaskBuildCheckpoint(pTask);
TSDB_CHECK_CODE(code, lino, _exit);
if (code) {
taosHashCancelIterate(pInfoHash, infoHash);
TSDB_CHECK_CODE(code, lino, _exit);
}
taosWLockLatch(&pTask->pMeta->lock);
if (streamMetaSaveTask(pTask->pMeta, pTask) != 0) {
taosWUnLockLatch(&pTask->pMeta->lock);
code = TSDB_CODE_OUT_OF_MEMORY;
taosHashCancelIterate(pInfoHash, infoHash);
TSDB_CHECK_CODE(code, lino, _exit);
}
if (streamMetaCommit(pTask->pMeta) != 0) {
taosWUnLockLatch(&pTask->pMeta->lock);
code = TSDB_CODE_OUT_OF_MEMORY;
taosHashCancelIterate(pInfoHash, infoHash);
TSDB_CHECK_CODE(code, lino, _exit);
}
taosWUnLockLatch(&pTask->pMeta->lock);
// save checkpointId to vnode.json
(pVnode->config.tsdbCfg.retentions + i + 1)->checkpointId = pTask->checkpointingId;
@ -1158,6 +1190,8 @@ _checkpoint:
smaInfo("vgId:%d, commit task:%p, build stream checkpoint success, table:%" PRIi64
", level:%d, checkpointId:%" PRIi64,
TD_VID(pVnode), pTask, pRSmaInfo->suid, i + 1, pTask->checkpointingId);
}
}
}
@ -1452,6 +1486,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
if (ASSERTS(oldVal >= 0, "oldVal of nFetchAll: %d < 0", oldVal)) {
code = TSDB_CODE_APP_ERROR;
taosHashCancelIterate(infoHash, pIter);
TSDB_CHECK_CODE(code, lino, _exit);
}