fix sma merge error

This commit is contained in:
Hongze Cheng 2023-07-14 13:46:01 +08:00
parent 5a89b96e91
commit 25120cede4
3 changed files with 16 additions and 15 deletions

View File

@ -179,8 +179,8 @@ SArray* metaGetSmaTbUids(SMeta* pMeta);
void* metaGetIdx(SMeta* pMeta); void* metaGetIdx(SMeta* pMeta);
void* metaGetIvtIdx(SMeta* pMeta); void* metaGetIvtIdx(SMeta* pMeta);
int64_t metaGetTbNum(SMeta *pMeta); int64_t metaGetTbNum(SMeta* pMeta);
void metaReaderDoInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags); void metaReaderDoInit(SMetaReader* pReader, SMeta* pMeta, int32_t flags);
int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg); int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg);
int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid); int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid);
@ -197,12 +197,12 @@ int32_t metaGetInfo(SMeta* pMeta, int64_t uid, SMetaInfo* pInfo, SMetaReader* pR
int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg, int8_t rollback); int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg, int8_t rollback);
int tsdbClose(STsdb** pTsdb); int tsdbClose(STsdb** pTsdb);
int32_t tsdbBegin(STsdb* pTsdb); int32_t tsdbBegin(STsdb* pTsdb);
int32_t tsdbPrepareCommit(STsdb* pTsdb); // int32_t tsdbPrepareCommit(STsdb* pTsdb);
int32_t tsdbCommit(STsdb* pTsdb, SCommitInfo* pInfo); // int32_t tsdbCommit(STsdb* pTsdb, SCommitInfo* pInfo);
int32_t tsdbCacheCommit(STsdb* pTsdb); int32_t tsdbCacheCommit(STsdb* pTsdb);
int32_t tsdbCompact(STsdb* pTsdb, SCompactInfo* pInfo); int32_t tsdbCompact(STsdb* pTsdb, SCompactInfo* pInfo);
int32_t tsdbFinishCommit(STsdb* pTsdb); // int32_t tsdbFinishCommit(STsdb* pTsdb);
int32_t tsdbRollbackCommit(STsdb* pTsdb); // int32_t tsdbRollbackCommit(STsdb* pTsdb);
int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq2* pMsg); int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq2* pMsg);
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq2* pMsg, SSubmitRsp2* pRsp); int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq2* pMsg, SSubmitRsp2* pRsp);
int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitTbData* pSubmitTbData, int32_t* affectedRows); int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitTbData* pSubmitTbData, int32_t* affectedRows);

View File

@ -103,15 +103,16 @@ _exit:
return code; return code;
} }
extern int32_t tsdbCommitCommit(STsdb *tsdb);
int32_t smaFinishCommit(SSma *pSma) { int32_t smaFinishCommit(SSma *pSma) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
SVnode *pVnode = pSma->pVnode; SVnode *pVnode = pSma->pVnode;
if (VND_RSMA1(pVnode) && (code = tsdbFinishCommit(VND_RSMA1(pVnode))) < 0) { if (VND_RSMA1(pVnode) && (code = tsdbCommitCommit(VND_RSMA1(pVnode))) < 0) {
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
if (VND_RSMA2(pVnode) && (code = tsdbFinishCommit(VND_RSMA2(pVnode))) < 0) { if (VND_RSMA2(pVnode) && (code = tsdbCommitCommit(VND_RSMA2(pVnode))) < 0) {
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
_exit: _exit:
@ -130,6 +131,7 @@ _exit:
* @param isCommit * @param isCommit
* @return int32_t * @return int32_t
*/ */
extern int32_t tsdbPreCommit(STsdb *tsdb);
static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) { static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
@ -186,11 +188,11 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) {
// all rsma results are written completely // all rsma results are written completely
STsdb *pTsdb = NULL; STsdb *pTsdb = NULL;
if ((pTsdb = VND_RSMA1(pSma->pVnode))) { if ((pTsdb = VND_RSMA1(pSma->pVnode))) {
code = tsdbPrepareCommit(pTsdb); code = tsdbPreCommit(pTsdb);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
if ((pTsdb = VND_RSMA2(pSma->pVnode))) { if ((pTsdb = VND_RSMA2(pSma->pVnode))) {
code = tsdbPrepareCommit(pTsdb); code = tsdbPreCommit(pTsdb);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
@ -207,6 +209,7 @@ _exit:
* @param pSma * @param pSma
* @return int32_t * @return int32_t
*/ */
extern int32_t tsdbCommitBegin(STsdb *tsdb, SCommitInfo *info);
static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma, SCommitInfo *pInfo) { static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma, SCommitInfo *pInfo) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
@ -217,10 +220,10 @@ static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma, SCommitInfo *pInfo) {
goto _exit; goto _exit;
} }
code = tsdbCommit(VND_RSMA1(pVnode), pInfo); code = tsdbCommitBegin(VND_RSMA1(pVnode), pInfo);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbCommit(VND_RSMA2(pVnode), pInfo); code = tsdbCommitBegin(VND_RSMA2(pVnode), pInfo);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
_exit: _exit:

View File

@ -21,8 +21,6 @@
#include "tsdbSttFileRW.h" #include "tsdbSttFileRW.h"
extern int32_t tsdbUpdateTableSchema(SMeta* pMeta, int64_t suid, int64_t uid, SSkmInfo* pSkmInfo); extern int32_t tsdbUpdateTableSchema(SMeta* pMeta, int64_t suid, int64_t uid, SSkmInfo* pSkmInfo);
extern int32_t tsdbWriteDataBlock(SDataFWriter* pWriter, SBlockData* pBlockData, SMapData* mDataBlk, int8_t cmprAlg);
extern int32_t tsdbWriteSttBlock(SDataFWriter* pWriter, SBlockData* pBlockData, SArray* aSttBlk, int8_t cmprAlg);
// STsdbSnapReader ======================================== // STsdbSnapReader ========================================
struct STsdbSnapReader { struct STsdbSnapReader {