fix(vnd): save vnode info at the begining of vnode prepare commit
This commit is contained in:
parent
028cc52846
commit
3ea9fadfc8
|
@ -184,16 +184,51 @@ _err:
|
|||
return -1;
|
||||
}
|
||||
|
||||
static void vnodePrepareCommit(SVnode *pVnode) {
|
||||
static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
char dir[TSDB_FILENAME_LEN] = {0};
|
||||
|
||||
tsem_wait(&pVnode->canCommit);
|
||||
|
||||
pVnode->state.commitTerm = pVnode->state.applyTerm;
|
||||
|
||||
pInfo->info.config = pVnode->config;
|
||||
pInfo->info.state.committed = pVnode->state.applied;
|
||||
pInfo->info.state.commitTerm = pVnode->state.applyTerm;
|
||||
pInfo->info.state.commitID = pVnode->state.commitID;
|
||||
pInfo->pVnode = pVnode;
|
||||
pInfo->txn = metaGetTxn(pVnode->pMeta);
|
||||
|
||||
// save info
|
||||
if (pVnode->pTfs) {
|
||||
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
|
||||
} else {
|
||||
snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path);
|
||||
}
|
||||
if (vnodeSaveInfo(dir, &pInfo->info) < 0) {
|
||||
code = terrno;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
tsdbPrepareCommit(pVnode->pTsdb);
|
||||
metaPrepareAsyncCommit(pVnode->pMeta);
|
||||
smaPrepareAsyncCommit(pVnode->pSma);
|
||||
|
||||
metaPrepareAsyncCommit(pVnode->pMeta);
|
||||
|
||||
vnodeBufPoolUnRef(pVnode->inUse);
|
||||
pVnode->inUse = NULL;
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
vError("vgId:%d, %s failed at line %d since %s, commit id:%" PRId64, TD_VID(pVnode), __func__, lino,
|
||||
tstrerror(code), pVnode->state.commitID);
|
||||
} else {
|
||||
vDebug("vgId:%d, %s done", TD_VID(pVnode), __func__);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t vnodeCommitTask(void *arg) {
|
||||
int32_t code = 0;
|
||||
|
||||
|
@ -213,27 +248,26 @@ _exit:
|
|||
int vnodeAsyncCommit(SVnode *pVnode) {
|
||||
int32_t code = 0;
|
||||
|
||||
// prepare to commit
|
||||
vnodePrepareCommit(pVnode);
|
||||
|
||||
// schedule the task
|
||||
pVnode->state.commitTerm = pVnode->state.applyTerm;
|
||||
|
||||
SCommitInfo *pInfo = (SCommitInfo *)taosMemoryCalloc(1, sizeof(*pInfo));
|
||||
if (NULL == pInfo) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
pInfo->info.config = pVnode->config;
|
||||
pInfo->info.state.committed = pVnode->state.applied;
|
||||
pInfo->info.state.commitTerm = pVnode->state.applyTerm;
|
||||
pInfo->info.state.commitID = pVnode->state.commitID;
|
||||
pInfo->pVnode = pVnode;
|
||||
pInfo->txn = metaGetTxn(pVnode->pMeta);
|
||||
|
||||
// prepare to commit
|
||||
code = vnodePrepareCommit(pVnode, pInfo);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
// schedule the task
|
||||
vnodeScheduleTask(vnodeCommitTask, pInfo);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
if (NULL != pInfo) {
|
||||
taosMemoryFree(pInfo);
|
||||
}
|
||||
vError("vgId:%d, %s failed since %s, commit id:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code),
|
||||
pVnode->state.commitID);
|
||||
} else {
|
||||
|
@ -265,16 +299,11 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
// save info
|
||||
if (pVnode->pTfs) {
|
||||
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
|
||||
} else {
|
||||
snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path);
|
||||
}
|
||||
if (vnodeSaveInfo(dir, &pInfo->info) < 0) {
|
||||
code = terrno;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
// walBeginSnapshot(pVnode->pWal, pVnode->state.applied);
|
||||
syncBeginSnapshot(pVnode->sync, pVnode->state.applied);
|
||||
|
|
Loading…
Reference in New Issue