Merge pull request #17528 from taosdata/fix/TD-19682-D
fix: commit txn for rsma
This commit is contained in:
commit
780e4a489c
|
@ -211,6 +211,7 @@ int32_t smaSyncCommit(SSma* pSma);
|
||||||
int32_t smaSyncPostCommit(SSma* pSma);
|
int32_t smaSyncPostCommit(SSma* pSma);
|
||||||
int32_t smaPreCommit(SSma* pSma);
|
int32_t smaPreCommit(SSma* pSma);
|
||||||
int32_t smaCommit(SSma* pSma);
|
int32_t smaCommit(SSma* pSma);
|
||||||
|
int32_t smaFinishCommit(SSma* pSma);
|
||||||
int32_t smaPostCommit(SSma* pSma);
|
int32_t smaPostCommit(SSma* pSma);
|
||||||
int32_t smaDoRetention(SSma* pSma, int64_t now);
|
int32_t smaDoRetention(SSma* pSma, int64_t now);
|
||||||
|
|
||||||
|
|
|
@ -78,38 +78,69 @@ int32_t smaCommit(SSma *pSma) { return tdProcessRSmaAsyncCommitImpl(pSma); }
|
||||||
int32_t smaPostCommit(SSma *pSma) { return tdProcessRSmaAsyncPostCommitImpl(pSma); }
|
int32_t smaPostCommit(SSma *pSma) { return tdProcessRSmaAsyncPostCommitImpl(pSma); }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief set rsma trigger stat active
|
* @brief prepare rsma1/2, and set rsma trigger stat active
|
||||||
*
|
*
|
||||||
* @param pSma
|
* @param pSma
|
||||||
* @return int32_t
|
* @return int32_t
|
||||||
*/
|
*/
|
||||||
int32_t smaBegin(SSma *pSma) {
|
int32_t smaBegin(SSma *pSma) {
|
||||||
SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma);
|
int32_t code = 0;
|
||||||
if (!pSmaEnv) {
|
SVnode *pVnode = pSma->pVnode;
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
|
if ((code = tsdbBegin(VND_RSMA1(pVnode))) < 0) {
|
||||||
|
smaError("vgId:%d, failed to begin rsma1 since %s", TD_VID(pVnode), tstrerror(code));
|
||||||
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
|
||||||
SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv);
|
if ((code = tsdbBegin(VND_RSMA2(pVnode))) < 0) {
|
||||||
|
smaError("vgId:%d, failed to begin rsma2 since %s", TD_VID(pVnode), tstrerror(code));
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
// set trigger stat
|
||||||
|
SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma);
|
||||||
|
if (!pSmaEnv) {
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv);
|
||||||
int8_t rsmaTriggerStat =
|
int8_t rsmaTriggerStat =
|
||||||
atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED, TASK_TRIGGER_STAT_ACTIVE);
|
atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED, TASK_TRIGGER_STAT_ACTIVE);
|
||||||
switch (rsmaTriggerStat) {
|
switch (rsmaTriggerStat) {
|
||||||
case TASK_TRIGGER_STAT_PAUSED: {
|
case TASK_TRIGGER_STAT_PAUSED: {
|
||||||
smaDebug("vgId:%d, rsma trigger stat from paused to active", SMA_VID(pSma));
|
smaDebug("vgId:%d, rsma trigger stat from paused to active", TD_VID(pVnode));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case TASK_TRIGGER_STAT_INIT: {
|
case TASK_TRIGGER_STAT_INIT: {
|
||||||
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_ACTIVE);
|
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_ACTIVE);
|
||||||
smaDebug("vgId:%d, rsma trigger stat from init to active", SMA_VID(pSma));
|
smaDebug("vgId:%d, rsma trigger stat from init to active", TD_VID(pVnode));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
default: {
|
default: {
|
||||||
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_ACTIVE);
|
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_ACTIVE);
|
||||||
smaError("vgId:%d, rsma trigger stat %" PRIi8 " is unexpected", SMA_VID(pSma), rsmaTriggerStat);
|
smaWarn("vgId:%d, rsma trigger stat %" PRIi8 " is unexpected", TD_VID(pVnode), rsmaTriggerStat);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return TSDB_CODE_SUCCESS;
|
_exit:
|
||||||
|
terrno = code;
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t smaFinishCommit(SSma *pSma) {
|
||||||
|
int32_t code = 0;
|
||||||
|
SVnode *pVnode = pSma->pVnode;
|
||||||
|
|
||||||
|
if (VND_RSMA1(pVnode) && (code = tsdbFinishCommit(VND_RSMA1(pVnode))) < 0) {
|
||||||
|
smaError("vgId:%d, failed to finish commit tsdb rsma1 since %s", TD_VID(pVnode), tstrerror(code));
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
if (VND_RSMA2(pVnode) && (code = tsdbFinishCommit(VND_RSMA2(pVnode))) < 0) {
|
||||||
|
smaError("vgId:%d, failed to finish commit tsdb rsma2 since %s", TD_VID(pVnode), tstrerror(code));
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
_exit:
|
||||||
|
terrno = code;
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
|
@ -309,15 +340,6 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
|
||||||
* 1) This is high cost task and should not put in asyncPreCommit originally.
|
* 1) This is high cost task and should not put in asyncPreCommit originally.
|
||||||
* 2) But, if put in asyncCommit, would trigger taskInfo cloning frequently.
|
* 2) But, if put in asyncCommit, would trigger taskInfo cloning frequently.
|
||||||
*/
|
*/
|
||||||
nLoops = 0;
|
|
||||||
while (atomic_load_64(&pRSmaStat->nBufItems) > 0) {
|
|
||||||
++nLoops;
|
|
||||||
if (nLoops > 1000) {
|
|
||||||
sched_yield();
|
|
||||||
nLoops = 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
smaInfo("vgId:%d, rsma commit, wait for all items to be consumed, TID:%p", SMA_VID(pSma),
|
smaInfo("vgId:%d, rsma commit, wait for all items to be consumed, TID:%p", SMA_VID(pSma),
|
||||||
(void *)taosGetSelfPthreadId());
|
(void *)taosGetSelfPthreadId());
|
||||||
nLoops = 0;
|
nLoops = 0;
|
||||||
|
@ -368,10 +390,6 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
|
||||||
static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma) {
|
static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SVnode *pVnode = pSma->pVnode;
|
SVnode *pVnode = pSma->pVnode;
|
||||||
SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma);
|
|
||||||
if (!pSmaEnv) {
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
#if 0
|
#if 0
|
||||||
SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv);
|
SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv);
|
||||||
|
|
||||||
|
@ -380,10 +398,7 @@ static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma) {
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
if ((code = tsdbCommit(VND_RSMA0(pVnode))) < 0) {
|
|
||||||
smaError("vgId:%d, failed to commit tsdb rsma0 since %s", TD_VID(pVnode), tstrerror(code));
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
if ((code = tsdbCommit(VND_RSMA1(pVnode))) < 0) {
|
if ((code = tsdbCommit(VND_RSMA1(pVnode))) < 0) {
|
||||||
smaError("vgId:%d, failed to commit tsdb rsma1 since %s", TD_VID(pVnode), tstrerror(code));
|
smaError("vgId:%d, failed to commit tsdb rsma1 since %s", TD_VID(pVnode), tstrerror(code));
|
||||||
goto _exit;
|
goto _exit;
|
||||||
|
|
|
@ -51,20 +51,11 @@ int vnodeBegin(SVnode *pVnode) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pVnode->pSma) {
|
|
||||||
if (VND_RSMA1(pVnode) && tsdbBegin(VND_RSMA1(pVnode)) < 0) {
|
|
||||||
vError("vgId:%d, failed to begin rsma1 since %s", TD_VID(pVnode), tstrerror(terrno));
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (VND_RSMA2(pVnode) && tsdbBegin(VND_RSMA2(pVnode)) < 0) {
|
|
||||||
vError("vgId:%d, failed to begin rsma2 since %s", TD_VID(pVnode), tstrerror(terrno));
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// begin sma
|
// begin sma
|
||||||
smaBegin(pVnode->pSma); // TODO: refactor to include the rsma1/rsma2 tsdbBegin() after tsdb_refact branch merged
|
if (VND_IS_RSMA(pVnode) && smaBegin(pVnode->pSma) < 0) {
|
||||||
|
vError("vgId:%d, failed to begin sma since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -239,10 +230,8 @@ int vnodeCommit(SVnode *pVnode) {
|
||||||
}
|
}
|
||||||
walBeginSnapshot(pVnode->pWal, pVnode->state.applied);
|
walBeginSnapshot(pVnode->pWal, pVnode->state.applied);
|
||||||
|
|
||||||
if (smaPreCommit(pVnode->pSma) < 0) {
|
code = smaPreCommit(pVnode->pSma);
|
||||||
vError("vgId:%d, failed to pre-commit sma since %s", TD_VID(pVnode), tstrerror(terrno));
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
vnodeBufPoolUnRef(pVnode->inUse);
|
vnodeBufPoolUnRef(pVnode->inUse);
|
||||||
pVnode->inUse = NULL;
|
pVnode->inUse = NULL;
|
||||||
|
@ -253,14 +242,12 @@ int vnodeCommit(SVnode *pVnode) {
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (VND_IS_RSMA(pVnode)) {
|
|
||||||
if (smaCommit(pVnode->pSma) < 0) {
|
|
||||||
vError("vgId:%d, failed to commit sma since %s", TD_VID(pVnode), tstrerror(terrno));
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
code = tsdbCommit(pVnode->pTsdb);
|
code = tsdbCommit(pVnode->pTsdb);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
|
if (VND_IS_RSMA(pVnode)) {
|
||||||
|
code = smaCommit(pVnode->pSma);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tqCommit(pVnode->pTq) < 0) {
|
if (tqCommit(pVnode->pTq) < 0) {
|
||||||
|
@ -274,7 +261,13 @@ int vnodeCommit(SVnode *pVnode) {
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbFinishCommit(pVnode->pTsdb);
|
code = tsdbFinishCommit(pVnode->pTsdb);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
|
if (VND_IS_RSMA(pVnode)) {
|
||||||
|
code = smaFinishCommit(pVnode->pSma);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
if (metaFinishCommit(pVnode->pMeta) < 0) {
|
if (metaFinishCommit(pVnode->pMeta) < 0) {
|
||||||
code = terrno;
|
code = terrno;
|
||||||
|
|
Loading…
Reference in New Issue