enh: rsma level 2/3 submitReq msg use wal version
This commit is contained in:
parent
39b768153f
commit
00a9598ac8
|
@ -67,8 +67,6 @@ struct STSmaStat {
|
|||
struct SRSmaStat {
|
||||
SSma *pSma;
|
||||
int64_t commitAppliedVer; // vnode applied version for async commit
|
||||
int64_t commitSubmitVer; // rsma submit version for async commit
|
||||
int64_t submitVer; // latest submit version
|
||||
int64_t refId; // shared by fetch tasks
|
||||
int8_t triggerStat; // shared by fetch tasks
|
||||
int8_t commitStat; // 0 not in committing, 1 in committing
|
||||
|
@ -91,7 +89,6 @@ struct SSmaStat {
|
|||
#define RSMA_TRIGGER_STAT(r) (&(r)->triggerStat)
|
||||
#define RSMA_COMMIT_STAT(r) (&(r)->commitStat)
|
||||
#define RSMA_REF_ID(r) ((r)->refId)
|
||||
#define RSMA_SUBMIT_VER(r) ((r)->submitVer)
|
||||
|
||||
struct SRSmaInfoItem {
|
||||
void *taskInfo; // qTaskInfo_t
|
||||
|
@ -223,13 +220,6 @@ struct STFInfo {
|
|||
uint32_t ftype;
|
||||
uint32_t fver;
|
||||
int64_t fsize;
|
||||
|
||||
// specific fields
|
||||
union {
|
||||
struct {
|
||||
int64_t submitVer;
|
||||
} qTaskInfo;
|
||||
};
|
||||
};
|
||||
|
||||
enum {
|
||||
|
|
|
@ -177,7 +177,6 @@ int32_t smaAsyncPostCommit(SSma* pSma);
|
|||
|
||||
int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg);
|
||||
int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg);
|
||||
int64_t tdRSmaGetMaxSubmitVer(SSma* pSma, int8_t level);
|
||||
|
||||
int32_t tdProcessRSmaCreate(SSma* pSma, SVCreateStbReq* pReq);
|
||||
int32_t tdProcessRSmaSubmit(SSma* pSma, void* pMsg, int32_t inputType);
|
||||
|
|
|
@ -146,7 +146,6 @@ static int32_t tdProcessRSmaSyncPreCommitImpl(SSma *pSma) {
|
|||
|
||||
// step 3: perform persist task for qTaskInfo
|
||||
pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied;
|
||||
pRSmaStat->commitSubmitVer = pRSmaStat->submitVer;
|
||||
tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat));
|
||||
|
||||
smaDebug("vgId:%d, rsma pre commit success", SMA_VID(pSma));
|
||||
|
@ -317,7 +316,6 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
|
|||
|
||||
// step 4: others
|
||||
pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied;
|
||||
pRSmaStat->commitSubmitVer = pRSmaStat->submitVer;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -560,17 +560,6 @@ static void tdDestroySDataBlockArray(SArray *pArray) {
|
|||
taosArrayDestroy(pArray);
|
||||
}
|
||||
|
||||
int64_t tdRSmaGetMaxSubmitVer(SSma *pSma, int8_t level) {
|
||||
if (level == TSDB_RETENTION_L0) {
|
||||
return pSma->pVnode->state.applied;
|
||||
}
|
||||
|
||||
SSmaEnv *pRSmaEnv = SMA_RSMA_ENV(pSma);
|
||||
SRSmaStat *pRSmaStat = (SRSmaStat *)(SMA_ENV_STAT(pRSmaEnv));
|
||||
|
||||
return atomic_load_64(&pRSmaStat->submitVer);
|
||||
}
|
||||
|
||||
static int32_t tdRSmaFetchAndSubmitResult(SRSmaInfoItem *pItem, STSchema *pTSchema, int64_t suid, SRSmaStat *pStat,
|
||||
int8_t blkType) {
|
||||
SArray *pResult = NULL;
|
||||
|
@ -615,13 +604,16 @@ static int32_t tdRSmaFetchAndSubmitResult(SRSmaInfoItem *pItem, STSchema *pTSche
|
|||
goto _err;
|
||||
}
|
||||
|
||||
if (pReq && tdProcessSubmitReq(sinkTsdb, atomic_add_fetch_64(&pStat->submitVer, 1), pReq) < 0) {
|
||||
if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) {
|
||||
taosMemoryFreeClear(pReq);
|
||||
smaError("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " failed since %s",
|
||||
SMA_VID(pSma), suid, pItem->level, terrstr());
|
||||
goto _err;
|
||||
}
|
||||
|
||||
smaDebug("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " version:%"PRIi64, SMA_VID(pSma),
|
||||
suid, pItem->level, output->info.version);
|
||||
|
||||
taosMemoryFreeClear(pReq);
|
||||
taosArrayClear(pResult);
|
||||
} else if (terrno == 0) {
|
||||
|
@ -908,12 +900,8 @@ static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int64_t *committed) {
|
|||
goto _err;
|
||||
}
|
||||
|
||||
ASSERT(tFileInfo.qTaskInfo.submitVer > 0);
|
||||
|
||||
SSmaEnv *pRSmaEnv = pSma->pRSmaEnv;
|
||||
SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pRSmaEnv);
|
||||
atomic_store_64(&pRSmaStat->submitVer, tFileInfo.qTaskInfo.submitVer);
|
||||
smaDebug("%s:%d tFileInfo.qTaskInfo.submitVer = %" PRIi64, __func__, __LINE__, tFileInfo.qTaskInfo.submitVer);
|
||||
|
||||
SRSmaQTaskInfoIter fIter = {0};
|
||||
if (tdRSmaQTaskInfoIterInit(&fIter, &tFile) < 0) {
|
||||
|
@ -1266,7 +1254,6 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
|
|||
}
|
||||
|
||||
if (isFileCreated) {
|
||||
tFile.info.qTaskInfo.submitVer = atomic_load_64(&pRSmaStat->commitSubmitVer);
|
||||
if (tdUpdateTFileHeader(&tFile) < 0) {
|
||||
smaError("vgId:%d, rsma, failed to update tfile %s header since %s", vid, TD_TFILE_FULL_NAME(&tFile),
|
||||
tstrerror(terrno));
|
||||
|
@ -1346,6 +1333,8 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
|
|||
tdRSmaFetchAndSubmitResult(pItem, pRSmaInfo->pTSchema, pRSmaInfo->suid, pStat, STREAM_INPUT__DATA_BLOCK);
|
||||
|
||||
tdUnRefRSmaInfo(pSma, pRSmaInfo);
|
||||
// atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE);
|
||||
// taosTmrReset(tdRSmaFetchTrigger, 5000, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
|
||||
} break;
|
||||
case TASK_TRIGGER_STAT_PAUSED: {
|
||||
smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is paused",
|
||||
|
|
|
@ -32,9 +32,6 @@ static int32_t tdEncodeTFInfo(void **buf, STFInfo *pInfo) {
|
|||
tlen += taosEncodeFixedU32(buf, pInfo->ftype);
|
||||
tlen += taosEncodeFixedU32(buf, pInfo->fver);
|
||||
tlen += taosEncodeFixedI64(buf, pInfo->fsize);
|
||||
if (pInfo->ftype == TD_FTYPE_RSMA_QTASKINFO) {
|
||||
tlen += taosEncodeFixedI64(buf, pInfo->qTaskInfo.submitVer);
|
||||
}
|
||||
|
||||
return tlen;
|
||||
}
|
||||
|
@ -44,10 +41,6 @@ static void *tdDecodeTFInfo(void *buf, STFInfo *pInfo) {
|
|||
buf = taosDecodeFixedU32(buf, &(pInfo->ftype));
|
||||
buf = taosDecodeFixedU32(buf, &(pInfo->fver));
|
||||
buf = taosDecodeFixedI64(buf, &(pInfo->fsize));
|
||||
// specific
|
||||
if (pInfo->ftype == TD_FTYPE_RSMA_QTASKINFO) {
|
||||
buf = taosDecodeFixedI64(buf, &(pInfo->qTaskInfo.submitVer));
|
||||
}
|
||||
|
||||
return buf;
|
||||
}
|
||||
|
|
|
@ -2265,10 +2265,6 @@ static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* ret
|
|||
SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level) {
|
||||
int64_t startVer = (pCond->startVersion == -1) ? 0 : pCond->startVersion;
|
||||
|
||||
if (VND_IS_RSMA(pVnode)) {
|
||||
return (SVersionRange){.minVer = startVer, .maxVer = tdRSmaGetMaxSubmitVer(pVnode->pSma, level)};
|
||||
}
|
||||
|
||||
int64_t endVer = 0;
|
||||
if (pCond->endVersion ==
|
||||
-1) { // user not specified end version, set current maximum version of vnode as the endVersion
|
||||
|
|
Loading…
Reference in New Issue