enh: rsma checkpoint

This commit is contained in:
kailixu 2023-11-02 15:20:54 +08:00
parent 722777f8c9
commit 2d597659bc
4 changed files with 39 additions and 48 deletions

View File

@ -630,7 +630,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
#endif #endif
for (int32_t i = 0; i < taosArrayGetSize(pResList); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pResList); ++i) {
output = taosArrayGetP(pResList, i); output = taosArrayGetP(pResList, i);
if(output->info.type == STREAM_CHECKPOINT) { if (output->info.type == STREAM_CHECKPOINT) {
if (streamFlushed) *streamFlushed = 1; if (streamFlushed) *streamFlushed = 1;
continue; continue;
} }
@ -1087,7 +1087,6 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
if (taosHashGetSize(pInfoHash) <= 0) { if (taosHashGetSize(pInfoHash) <= 0) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void *infoHash = NULL; void *infoHash = NULL;
// stream state: trigger checkpoint // stream state: trigger checkpoint
while ((infoHash = taosHashIterate(pInfoHash, infoHash))) { while ((infoHash = taosHashIterate(pInfoHash, infoHash))) {
@ -1120,7 +1119,7 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
pRSmaInfo->suid, &pResList, &streamFlushed); pRSmaInfo->suid, &pResList, &streamFlushed);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
if (streamFlushed && (++nStreamFlushed >= nTaskInfo)) { if (streamFlushed && (++nStreamFlushed >= nTaskInfo)) {
smaInfo("%s:%d checkpoint ready, %d us consumed, received/total: %d/%d", __func__, __LINE__, nSleep * 10, smaInfo("vgId:%d checkpoint ready, %d us consumed, received/total: %d/%d", TD_VID(pVnode), nSleep * 10,
nStreamFlushed, nTaskInfo); nStreamFlushed, nTaskInfo);
goto _checkpoint; goto _checkpoint;
} }
@ -1129,14 +1128,16 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
} }
taosUsleep(10); taosUsleep(10);
++nSleep; ++nSleep;
smaInfo("%s:%d wait for checkpoint ready, %d us elapsed, received/total: %d/%d", __func__, __LINE__, nSleep * 10, smaDebug("vgId:%d, wait for checkpoint ready, %d us elapsed, received/total: %d/%d", TD_VID(pVnode), nSleep * 10,
nStreamFlushed, nTaskInfo); nStreamFlushed, nTaskInfo);
} }
// stream state: build checkpoint in backend // stream state: build checkpoint in backend
_checkpoint: _checkpoint:
while ((infoHash = taosHashIterate(pInfoHash, infoHash))) { do {
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash; void *infHash = NULL;
while ((infHash = taosHashIterate(pInfoHash, infHash))) {
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infHash;
if (RSMA_INFO_IS_DEL(pRSmaInfo)) { if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
continue; continue;
} }
@ -1160,7 +1161,7 @@ _checkpoint:
} }
} }
} }
} while (0);
_exit: _exit:
taosArrayDestroy(pResList); taosArrayDestroy(pResList);
if (code) { if (code) {

View File

@ -291,7 +291,7 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) {
if(syncNodeGetConfig(pVnode->sync, &pVnode->config.syncCfg) != 0) goto _exit; if(syncNodeGetConfig(pVnode->sync, &pVnode->config.syncCfg) != 0) goto _exit;
code = smaPrepareAsyncCommit(pVnode->pSma); code = smaPrepareAsyncCommit(pVnode->pSma); // prepare checkpointId and save to vnode.json
if (code) goto _exit; if (code) goto _exit;
pVnode->state.commitTerm = pVnode->state.applyTerm; pVnode->state.commitTerm = pVnode->state.applyTerm;

View File

@ -1967,19 +1967,9 @@ int32_t tableListAddTableInfo(STableListInfo* pTableList, uint64_t uid, uint64_t
pTableList->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); pTableList->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
} }
for(int32_t i=0; i< taosArrayGetSize(pTableList->pTableList); ++i) {
STableKeyInfo* pKeyInfo = taosArrayGet(pTableList->pTableList, i);
if(pKeyInfo->uid == uid) {
assert(0);
}
}
STableKeyInfo keyInfo = {.uid = uid, .groupId = gid}; STableKeyInfo keyInfo = {.uid = uid, .groupId = gid};
taosArrayPush(pTableList->pTableList, &keyInfo); taosArrayPush(pTableList->pTableList, &keyInfo);
if(taosHashGet(pTableList->map, &uid, sizeof(uid))) {
assert(0);
}
int32_t slot = (int32_t)taosArrayGetSize(pTableList->pTableList) - 1; int32_t slot = (int32_t)taosArrayGetSize(pTableList->pTableList) - 1;
taosHashPut(pTableList->map, &uid, sizeof(uid), &slot, sizeof(slot)); taosHashPut(pTableList->map, &uid, sizeof(uid), &slot, sizeof(slot));

View File

@ -2332,7 +2332,7 @@ FETCH_NEXT_BLOCK:
} }
int32_t current = pInfo->validBlockIndex++; int32_t current = pInfo->validBlockIndex++;
qDebug("process %d/%d input data blocks, %s", current, (int32_t)total, id); qDebug("process %d/%d input data blocks, %s", current, (int32_t) total, id);
SPackedData* pData = taosArrayGet(pInfo->pBlockLists, current); SPackedData* pData = taosArrayGet(pInfo->pBlockLists, current);
SSDataBlock* pBlock = taosArrayGet(pData->pDataBlock, 0); SSDataBlock* pBlock = taosArrayGet(pData->pDataBlock, 0);