chore: reopen rsma stream commit

This commit is contained in:
kailixu 2023-10-30 19:45:18 +08:00
parent 59dd4eaa39
commit 89651626e3
4 changed files with 6 additions and 4 deletions

View File

@ -218,7 +218,7 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree);
int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer, int8_t rollback); int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer, int8_t rollback);
int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName); int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName);
int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type); int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type);
// int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash); int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash);
int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer, int8_t rollback); int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer, int8_t rollback);
void tdRSmaQTaskInfoGetFullPath(SVnode *pVnode, tb_uid_t suid, int8_t level, STfs *pTfs, char *outputName); void tdRSmaQTaskInfoGetFullPath(SVnode *pVnode, tb_uid_t suid, int8_t level, STfs *pTfs, char *outputName);

View File

@ -178,7 +178,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) {
if (!isCommit) goto _exit; if (!isCommit) goto _exit;
// code = tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)); code = tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat));
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
smaInfo("vgId:%d, rsma commit, operator state committed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); smaInfo("vgId:%d, rsma commit, operator state committed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());

View File

@ -1052,13 +1052,14 @@ _err:
return code; return code;
} }
#if 0 #if 1
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
SSma *pSma = pRSmaStat->pSma; SSma *pSma = pRSmaStat->pSma;
SVnode *pVnode = pSma->pVnode; SVnode *pVnode = pSma->pVnode;
SRSmaFS fs = {0}; SRSmaFS fs = {0};
smaInfo("prop:%s:%d start", __func__, __LINE__);
if (taosHashGetSize(pInfoHash) <= 0) { if (taosHashGetSize(pInfoHash) <= 0) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -1075,6 +1076,7 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pRSmaInfo, i); SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pRSmaInfo, i);
if (pItem && pItem->pStreamState) { if (pItem && pItem->pStreamState) {
smaInfo("prop:%s:%d streamState commit", __func__, __LINE__);
if (streamStateCommit(pItem->pStreamState) < 0) { if (streamStateCommit(pItem->pStreamState) < 0) {
code = TSDB_CODE_RSMA_STREAM_STATE_COMMIT; code = TSDB_CODE_RSMA_STREAM_STATE_COMMIT;
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);

View File

@ -5,7 +5,7 @@ sleep 50
sql connect sql connect
#todo wait for streamState checkpoint #todo wait for streamState checkpoint
return 1 #return 1
print =============== create database with retentions print =============== create database with retentions
sql create database d0 retentions -:7d,5m:21d,15m:365d; sql create database d0 retentions -:7d,5m:21d,15m:365d;