chore: rsma checkpoint verify

This commit is contained in:
kailixu 2023-11-01 20:05:00 +08:00
parent c32e60d199
commit 3803f952f9
6 changed files with 156 additions and 42 deletions

View File

@ -115,6 +115,7 @@ struct SRSmaStat {
SRSmaFS fs; // for recovery/snapshot r/w
SHashObj *infoHash; // key: suid, value: SRSmaInfo
tsem_t notEmpty; // has items in queue buffer
SSDataBlock dataBlock;
};
struct SSmaStat {
@ -140,7 +141,8 @@ struct SRSmaInfoItem {
int8_t fetchLevel : 4;
int8_t triggerStat;
uint16_t nScanned;
int32_t maxDelay; // ms
int32_t streamFlushed : 1;
int32_t maxDelay : 31; // ms
tmr_h tmrId;
void *pStreamState;
void *pStreamTask; // SStreamTask
@ -159,7 +161,6 @@ struct SRSmaInfo {
void *taskInfo[TSDB_RETENTION_L2]; // qTaskInfo_t
STaosQueue *queue; // buffer queue of SubmitReq
STaosQall *qall; // buffer qall of SubmitReq
SSDataBlock dataBlock;
};
#define RSMA_INFO_HEAD_LEN offsetof(SRSmaInfo, items)

View File

@ -178,6 +178,8 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) {
if (!isCommit) goto _exit;
code = tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat));
TSDB_CHECK_CODE(code, lino, _exit);

View File

@ -209,6 +209,7 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS
pRSmaStat->pSma = (SSma *)pSma;
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_INIT);
tsem_init(&pRSmaStat->notEmpty, 0, 0);
pRSmaStat->dataBlock.info.type = STREAM_CHECKPOINT;
// init smaMgmt
smaInit();

View File

@ -78,32 +78,26 @@ static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t l
*/
void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) {
if (pInfo) {
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
SRSmaInfoItem *pItem = &pInfo->items[i];
if (isDeepFree) {
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
SRSmaInfoItem *pItem = &pInfo->items[i];
if (isDeepFree && pItem->tmrId) {
smaDebug("vgId:%d, stop fetch timer %p for table %" PRIi64 " level %d", SMA_VID(pSma), pItem->tmrId,
pInfo->suid, i + 1);
taosTmrStopA(&pItem->tmrId);
}
if (pItem->tmrId) {
smaDebug("vgId:%d, stop fetch timer %p for table %" PRIi64 " level %d", SMA_VID(pSma), pItem->tmrId,
pInfo->suid, i + 1);
taosTmrStopA(&pItem->tmrId);
}
if (isDeepFree && pItem->pStreamState) {
streamStateClose(pItem->pStreamState, false);
}
if (pItem->pStreamState) {
streamStateClose(pItem->pStreamState, false);
}
if(isDeepFree && pItem->pStreamTask) {
taosMemoryFreeClear(pItem->pStreamTask);
}
if (isDeepFree && pInfo->taskInfo[i]) {
tdRSmaQTaskInfoFree(&pInfo->taskInfo[i], SMA_VID(pSma), i + 1);
}
}
if (isDeepFree) {
taosMemoryFreeClear(pInfo->pTSchema);
}
if (isDeepFree) {
taosMemoryFreeClear(pInfo->pTSchema);
if (pInfo->queue) {
taosCloseQueue(pInfo->queue);
pInfo->queue = NULL;
@ -286,13 +280,6 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
return TSDB_CODE_FAILED;
}
if (pStreamTask->chkInfo.checkpointId != -1) {
SSDataBlock *pDataBlock = &pRSmaInfo->dataBlock;
if ((terrno = qSetSMAInput(pRSmaInfo->taskInfo[idx], pDataBlock, 1, STREAM_INPUT__CHECKPOINT)) < 0) {
return TSDB_CODE_FAILED;
}
}
SRSmaInfoItem *pItem = &(pRSmaInfo->items[idx]);
pItem->triggerStat = TASK_TRIGGER_STAT_ACTIVE; // fetch the data when reboot
pItem->pStreamState = pStreamState;
@ -370,7 +357,6 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
pRSmaInfo->pSma = pSma;
pRSmaInfo->pTSchema = pTSchema;
pRSmaInfo->suid = suid;
pRSmaInfo->dataBlock.info.type = STREAM_CHECKPOINT;
T_REF_INIT_VAL(pRSmaInfo, 1);
if (!(pRSmaInfo->queue = taosOpenQueue()) || !(pRSmaInfo->qall = taosAllocateQall()) ||
@ -1075,22 +1061,145 @@ _err:
return code;
}
#if 1
static int32_t tdRSmaExecVerifyCheckPoint(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema,
int64_t suid, SArray **ppResList, int8_t *streamFlushed) {
int32_t code = 0;
int32_t lino = 0;
SSDataBlock *output = NULL;
SArray *pResList = NULL;
if (!(*ppResList)) {
pResList = taosArrayInit(1, POINTER_BYTES);
if (pResList == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
*ppResList = pResList;
} else {
pResList = *ppResList;
}
while (1) {
uint64_t ts;
bool hasMore = false;
code = qExecTaskOpt(taskInfo, pResList, &ts, &hasMore, NULL);
if (code == TSDB_CODE_QRY_IN_EXEC) {
code = 0;
break;
}
TSDB_CHECK_CODE(code, lino, _exit);
if (taosArrayGetSize(pResList) == 0) {
break;
}
#if 0
char flag[10] = {0};
snprintf(flag, 10, "level %" PRIi8, pItem->level);
blockDebugShowDataBlocks(pResList, flag);
#endif
for (int32_t i = 0; i < taosArrayGetSize(pResList); ++i) {
output = taosArrayGetP(pResList, i);
if(output->info.type == STREAM_CHECKPOINT) {
if (streamFlushed) *streamFlushed = 1;
continue;
}
smaDebug("vgId:%d, result block, uid:%" PRIu64 ", groupid:%" PRIu64 ", rows:%" PRIi64, SMA_VID(pSma),
output->info.id.uid, output->info.id.groupId, output->info.rows);
STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]);
SSubmitReq2 *pReq = NULL;
// TODO: the schema update should be handled later(TD-17965)
if (buildSubmitReqFromDataBlock(&pReq, output, pTSchema, output->info.id.groupId, SMA_VID(pSma), suid) < 0) {
code = terrno ? terrno : TSDB_CODE_RSMA_RESULT;
TSDB_CHECK_CODE(code, lino, _exit);
}
if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) {
code = terrno ? terrno : TSDB_CODE_RSMA_RESULT;
tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
taosMemoryFree(pReq);
TSDB_CHECK_CODE(code, lino, _exit);
}
smaDebug("vgId:%d, process submit req for rsma suid:%" PRIu64 ",uid:%" PRIu64 ", level %" PRIi8 " ver %" PRIi64,
SMA_VID(pSma), suid, output->info.id.groupId, pItem->level, output->info.version);
if (pReq) {
tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
taosMemoryFree(pReq);
}
}
}
_exit:
if (code) {
smaError("vgId:%d, %s failed at line %d since %s, suid:%" PRIi64 ", level:%" PRIi8 ", uid:%" PRIi64
", ver:%" PRIi64,
SMA_VID(pSma), __func__, lino, tstrerror(code), suid, pItem->level, output ? output->info.id.uid : -1,
output ? output->info.version : -1);
} else {
smaDebug("vgId:%d, %s succeed, suid:%" PRIi64 ", level:%" PRIi8, SMA_VID(pSma), __func__, suid, pItem->level);
}
taosArrayDestroy(pResList);
qCleanExecTaskBlockBuf(taskInfo);
return code;
}
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
int32_t code = 0;
int32_t lino = 0;
SSma *pSma = pRSmaStat->pSma;
SVnode *pVnode = pSma->pVnode;
SRSmaFS fs = {0};
int32_t code = 0;
int32_t lino = 0;
int32_t nTaskInfo = 0;
SSma *pSma = pRSmaStat->pSma;
SVnode *pVnode = pSma->pVnode;
SSDataBlock *pDataBlock = &pRSmaStat->dataBlock;
SArray *pResList = NULL;
SRSmaFS fs = {0};
if (taosHashGetSize(pInfoHash) <= 0) {
return TSDB_CODE_SUCCESS;
}
void *infoHash = NULL;
// stream state: trigger checkpoint
while ((infoHash = taosHashIterate(pInfoHash, infoHash))) {
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash;
if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
continue;
}
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if (pRSmaInfo->taskInfo[i]) {
code = qSetSMAInput(pRSmaInfo->taskInfo[i], pDataBlock, 1, STREAM_INPUT__CHECKPOINT);
TSDB_CHECK_CODE(code, lino, _exit);
pRSmaInfo->items[i].streamFlushed = 0;
++nTaskInfo;
}
}
}
// stream state: process checkpoint response in async mode
int32_t nStreamFlushed = 0;
while ((infoHash = taosHashIterate(pInfoHash, infoHash))) {
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash;
if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
continue;
}
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if (pRSmaInfo->taskInfo[i] && (0 == pRSmaInfo->items[i].streamFlushed)) {
int8_t streamFlushed = 0;
code = tdRSmaExecVerifyCheckPoint(pSma, pRSmaInfo->taskInfo[i], &pRSmaInfo->items[i], pRSmaInfo->pTSchema,
pRSmaInfo->suid, &pResList, &streamFlushed);
TSDB_CHECK_CODE(code, lino, _exit);
if (streamFlushed && (++nStreamFlushed >= nTaskInfo)) {
goto _checkpoint;
}
}
}
}
// stream state: build checkpoint in backend
_checkpoint:
while ((infoHash = taosHashIterate(pInfoHash, infoHash))) {
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash;
if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
continue;
}
@ -1100,11 +1209,11 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
if (pItem && pItem->pStreamTask) {
SStreamTask *pTask = pItem->pStreamTask;
// adaption for API streamTaskBuildCheckpoint
atomic_store_32(&pTask->pMeta->chkptNotReadyTasks, 1);
atomic_store_32(&pTask->pMeta->chkptNotReadyTasks, 1);
pTask->checkpointingId = taosGetTimestampNs();
code = streamTaskBuildCheckpoint(pTask);
TSDB_CHECK_CODE(code, lino, _exit);
// save checkpointId to vnode.json
(pVnode->config.tsdbCfg.retentions + i + 1)->checkpointId = pTask->checkpointingId;
@ -1123,7 +1232,7 @@ _exit:
terrno = code;
return code;
}
#endif
/**
* @brief trigger to get rsma result in async mode
*

View File

@ -646,7 +646,7 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo
blockIndex += 1;
current += p->info.rows;
ASSERT(p->info.rows > 0);
ASSERT(p->info.rows > 0 || p->info.type == STREAM_CHECKPOINT);
taosArrayPush(pResList, &p);
if (current >= rowsThreshold) {

View File

@ -2331,11 +2331,12 @@ FETCH_NEXT_BLOCK:
return NULL;
}
int32_t current = pInfo->validBlockIndex++;
qDebug("process %d/%d input data blocks, %s", current, (int32_t) total, id);
int32_t current = pInfo->validBlockIndex++;
qDebug("process %d/%d input data blocks, %s", current, (int32_t)total, id);
SPackedData* pData = taosArrayGet(pInfo->pBlockLists, current);
SSDataBlock* pBlock = taosArrayGet(pData->pDataBlock, 0);
// SSDataBlock* pBlock = taosArrayGet(pData->pDataBlock, 0);
SSDataBlock* pBlock = pData->pDataBlock;
if (pBlock->info.type == STREAM_CHECKPOINT) {
streamScanOperatorSaveCheckpoint(pInfo);