From 466f325514f67e4ba4217a60cccfcb067d62b4bc Mon Sep 17 00:00:00 2001 From: danielclow <106956386+danielclow@users.noreply.github.com> Date: Mon, 24 Oct 2022 11:02:03 +0800 Subject: [PATCH 1/8] Update README.md Update corporate twitter account link --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 8d2567a816..44aeaecf07 100644 --- a/README.md +++ b/README.md @@ -349,6 +349,6 @@ Please follow the [contribution guidelines](CONTRIBUTING.md) to contribute to th For more information about TDengine, you can follow us on social media and join our Discord server: - [Discord](https://discord.com/invite/VZdSuUg4pS) -- [Twitter](https://twitter.com/TaosData) +- [Twitter](https://twitter.com/TDengineDB) - [LinkedIn](https://www.linkedin.com/company/tdengine/) - [YouTube](https://www.youtube.com/channel/UCmp-1U6GS_3V3hjir6Uq5DQ) From 68db9b1cc919cbb0b0b70efe2afeda506b74cbf4 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 1 Nov 2022 18:48:57 +0800 Subject: [PATCH 2/8] fix(query): optimize the performance of tsdbread. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 150 ++++++++++++++++++------- 1 file changed, 109 insertions(+), 41 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index b6e8362092..d4dcee52ea 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -132,7 +132,7 @@ typedef struct SReaderStatus { bool loadFromFile; // check file stage bool composedDataBlock; // the returned data block is a composed block or not SHashObj* pTableMap; // SHash - STableBlockScanInfo* pTableIter; // table iterator used in building in-memory buffer data blocks. + STableBlockScanInfo**pTableIter; // table iterator used in building in-memory buffer data blocks. SUidOrderCheckInfo uidCheckInfo; // check all table in uid order SFileBlockDumpInfo fBlockDumpInfo; SDFileSet* pCurrentFileset; // current opened file set @@ -141,6 +141,12 @@ typedef struct SReaderStatus { SDataBlockIter blockIter; } SReaderStatus; +typedef struct SBlockInfoBuf { + int32_t currentIndex; + SArray* pData; + int32_t numPerBucket; +} SBlockInfoBuf; + struct STsdbReader { STsdb* pTsdb; uint64_t suid; @@ -158,9 +164,9 @@ struct STsdbReader { STSchema* pMemSchema; // the previous schema for in-memory data, to avoid load schema too many times SDataFReader* pFileReader; SVersionRange verRange; - - int32_t step; - STsdbReader* innerReader[2]; + SBlockInfoBuf blockInfoBuf; + int32_t step; + STsdbReader* innerReader[2]; }; static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter); @@ -226,6 +232,50 @@ static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) { return TSDB_CODE_SUCCESS; } +static int32_t initBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) { + int32_t num = numOfTables / pBuf->numPerBucket; + int32_t remainder = numOfTables % pBuf->numPerBucket; + if (pBuf->pData == NULL) { + pBuf->pData = taosArrayInit(num + 1, POINTER_BYTES); + } + + for(int32_t i = 0; i < num; ++i) { + char* p = taosMemoryCalloc(pBuf->numPerBucket, sizeof(STableBlockScanInfo)); + if (p == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + taosArrayPush(pBuf->pData, &p); + } + + if (remainder > 0) { + char* p = taosMemoryCalloc(remainder, sizeof(STableBlockScanInfo)); + if (p == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + taosArrayPush(pBuf->pData, &p); + } + + return TSDB_CODE_SUCCESS; +} + +static void clearBlockScanInfoBuf(SBlockInfoBuf* pBuf) { + size_t num = taosArrayGetSize(pBuf->pData); + for(int32_t i = 0; i < num; ++i) { + char** p = taosArrayGet(pBuf->pData, i); + taosMemoryFree(*p); + } + + taosArrayDestroy(pBuf->pData); +} + +static void* getPosInBlockInfoBuf(SBlockInfoBuf* pBuf, int32_t index) { + int32_t bucketIndex = index / pBuf->numPerBucket; + char** pBucket = taosArrayGet(pBuf->pData, bucketIndex); + return (*pBucket) + (index % pBuf->numPerBucket) * sizeof(STableBlockScanInfo); +} + +// NOTE: speedup the whole processing by preparing the buffer for STableBlockScanInfo in batch model static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableKeyInfo* idList, int32_t numOfTables) { // allocate buffer in order to load data blocks from file // todo use simple hash instead, optimize the memory consumption @@ -236,9 +286,23 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK } int64_t st = taosGetTimestampUs(); + initBlockScanInfoBuf(&pTsdbReader->blockInfoBuf, numOfTables); for (int32_t j = 0; j < numOfTables; ++j) { - STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid}; + STableBlockScanInfo* pScanInfo = getPosInBlockInfoBuf(&pTsdbReader->blockInfoBuf, j); + pScanInfo->uid = idList[j].uid; + if (ASCENDING_TRAVERSE(pTsdbReader->order)) { + int64_t skey = pTsdbReader->window.skey; + pScanInfo->lastKey = (skey > INT64_MIN) ? (skey - 1) : skey; + } else { + int64_t ekey = pTsdbReader->window.ekey; + pScanInfo->lastKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey; + } + + taosHashPut(pTableMap, &pScanInfo->uid, sizeof(uint64_t), &pScanInfo, POINTER_BYTES); + +#if 0 +// STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid}; if (ASCENDING_TRAVERSE(pTsdbReader->order)) { int64_t skey = pTsdbReader->window.skey; info.lastKey = (skey > INT64_MIN) ? (skey - 1) : skey; @@ -248,7 +312,9 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK } taosHashPut(pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info)); - tsdbDebug("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReader, info.uid, info.lastKey, +#endif + + tsdbTrace("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReader, pScanInfo->uid, pScanInfo->lastKey, pTsdbReader->idStr); } @@ -260,18 +326,20 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK return pTableMap; } -static void resetDataBlockScanInfo(SHashObj* pTableMap, int64_t ts) { - STableBlockScanInfo* p = NULL; +static void resetAllDataBlockScanInfo(SHashObj* pTableMap, int64_t ts) { + STableBlockScanInfo** p = NULL; while ((p = taosHashIterate(pTableMap, p)) != NULL) { - p->iterInit = false; - p->iiter.hasVal = false; - if (p->iter.iter != NULL) { - p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter); + STableBlockScanInfo* pInfo = *(STableBlockScanInfo**) p; + + pInfo->iterInit = false; + pInfo->iiter.hasVal = false; + if (pInfo->iter.iter != NULL) { + pInfo->iter.iter = tsdbTbDataIterDestroy(pInfo->iter.iter); } - p->delSkyline = taosArrayDestroy(p->delSkyline); - p->lastKey = ts; + pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline); + pInfo->lastKey = ts; } } @@ -292,10 +360,10 @@ static void clearBlockScanInfo(STableBlockScanInfo* p) { tMapDataClear(&p->mapData); } -static void destroyBlockScanInfo(SHashObj* pTableMap) { +static void destroyAllBlockScanInfo(SHashObj* pTableMap) { STableBlockScanInfo* p = NULL; while ((p = taosHashIterate(pTableMap, p)) != NULL) { - clearBlockScanInfo(p); + clearBlockScanInfo(*(STableBlockScanInfo**)p); } taosHashCleanup(pTableMap); @@ -500,7 +568,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd pReader->verRange = getQueryVerRange(pVnode, pCond, level); pReader->type = pCond->type; pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows); - + pReader->blockInfoBuf.numPerBucket = 1000; // 1000 tables per bucket ASSERT(pCond->numOfCols > 0); limitOutputBufferSize(pCond, &pReader->capacity); @@ -566,7 +634,7 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, } // this block belongs to a table that is not queried. - void* p = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(uint64_t)); + void* p = *(STableBlockScanInfo**)taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(uint64_t)); if (p == NULL) { continue; } @@ -591,7 +659,7 @@ _end: } static void cleanupTableScanInfo(SHashObj* pTableMap) { - STableBlockScanInfo* px = NULL; + STableBlockScanInfo** px = NULL; while (1) { px = taosHashIterate(pTableMap, px); if (px == NULL) { @@ -599,8 +667,8 @@ static void cleanupTableScanInfo(SHashObj* pTableMap) { } // reset the index in last block when handing a new file - tMapDataClear(&px->mapData); - taosArrayClear(px->pBlockList); + tMapDataClear(&(*px)->mapData); + taosArrayClear((*px)->pBlockList); } } @@ -1101,7 +1169,7 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte break; } - STableBlockScanInfo* pTableScanInfo = (STableBlockScanInfo*)ptr; + STableBlockScanInfo* pTableScanInfo = *(STableBlockScanInfo**)ptr; if (pTableScanInfo->pBlockList == NULL || taosArrayGetSize(pTableScanInfo->pBlockList) == 0) { continue; } @@ -2233,7 +2301,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { STableBlockScanInfo* pBlockScanInfo = NULL; if (pBlockInfo != NULL) { - pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); + pBlockScanInfo = *(STableBlockScanInfo**)taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); if (pBlockScanInfo == NULL) { code = TSDB_CODE_INVALID_PARA; tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid, @@ -2253,7 +2321,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { } } } else { // file blocks not exist - pBlockScanInfo = pReader->status.pTableIter; + pBlockScanInfo = *pReader->status.pTableIter; } SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; @@ -2478,7 +2546,7 @@ static void extractOrderedTableUidList(SUidOrderCheckInfo* pOrderCheckInfo, SRea void* p = taosHashIterate(pStatus->pTableMap, NULL); while (p != NULL) { - STableBlockScanInfo* pScanInfo = p; + STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**) p; pOrderCheckInfo->tableUidList[index++] = pScanInfo->uid; p = taosHashIterate(pStatus->pTableMap, p); } @@ -2552,7 +2620,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { while (1) { // load the last data block of current table - STableBlockScanInfo* pScanInfo = pStatus->pTableIter; + STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**) pStatus->pTableIter; bool hasVal = initLastBlockReader(pLastBlockReader, pScanInfo, pReader); if (!hasVal) { bool hasNexTable = moveToNextTable(pOrderedCheckInfo, pStatus); @@ -2590,9 +2658,9 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader; if (pBlockInfo != NULL) { - pScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); + pScanInfo = *(STableBlockScanInfo**)taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); } else { - pScanInfo = pReader->status.pTableIter; + pScanInfo = *pReader->status.pTableIter; } if (pScanInfo == NULL) { @@ -2657,11 +2725,11 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) { } } - STableBlockScanInfo* pBlockScanInfo = pStatus->pTableIter; - initMemDataIterator(pBlockScanInfo, pReader); + STableBlockScanInfo** pBlockScanInfo = pStatus->pTableIter; + initMemDataIterator(*pBlockScanInfo, pReader); int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? INT64_MAX : INT64_MIN; - int32_t code = buildDataBlockFromBuf(pReader, pBlockScanInfo, endKey); + int32_t code = buildDataBlockFromBuf(pReader, *pBlockScanInfo, endKey); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -3464,9 +3532,9 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t num) { ASSERT(pReader != NULL); - STableBlockScanInfo* p = NULL; + STableBlockScanInfo** p = NULL; while ((p = taosHashIterate(pReader->status.pTableMap, p)) != NULL) { - clearBlockScanInfo(p); + clearBlockScanInfo(*p); } taosHashClear(pReader->status.pTableMap); @@ -3501,7 +3569,6 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) { initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader); resetDataBlockIterator(&pReader->status.blockIter, pReader->order); -// resetDataBlockScanInfo(pReader->status.pTableMap, pReader->window.skey); // no data in files, let's try buffer in memory if (pReader->status.fileIter.numOfFiles == 0) { @@ -3679,8 +3746,9 @@ void tsdbReaderClose(STsdbReader* pReader) { cleanupDataBlockIterator(&pReader->status.blockIter); size_t numOfTables = taosHashGetSize(pReader->status.pTableMap); - destroyBlockScanInfo(pReader->status.pTableMap); + destroyAllBlockScanInfo(pReader->status.pTableMap); blockDataDestroy(pReader->pResBlock); + clearBlockScanInfoBuf(&pReader->blockInfoBuf); if (pReader->pFileReader != NULL) { tsdbDataFReaderClose(&pReader->pFileReader); @@ -3764,7 +3832,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { if (pReader->step == EXTERNAL_ROWS_PREV) { // prepare for the main scan int32_t code = doOpenReaderImpl(pReader); - resetDataBlockScanInfo(pReader->status.pTableMap, pReader->innerReader[0]->window.ekey); + resetAllDataBlockScanInfo(pReader->status.pTableMap, pReader->innerReader[0]->window.ekey); if (code != TSDB_CODE_SUCCESS) { return code; @@ -3781,7 +3849,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { if (pReader->innerReader[1] != NULL && pReader->step == EXTERNAL_ROWS_MAIN) { // prepare for the next row scan int32_t code = doOpenReaderImpl(pReader->innerReader[1]); - resetDataBlockScanInfo(pReader->innerReader[1]->status.pTableMap, pReader->window.ekey); + resetAllDataBlockScanInfo(pReader->innerReader[1]->status.pTableMap, pReader->window.ekey); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -3797,7 +3865,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { } bool tsdbTableNextDataBlock(STsdbReader* pReader, uint64_t uid) { - STableBlockScanInfo* pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &uid, sizeof(uid)); + STableBlockScanInfo* pBlockScanInfo = *(STableBlockScanInfo**)taosHashGet(pReader->status.pTableMap, &uid, sizeof(uid)); if (pBlockScanInfo == NULL) { // no data block for the table of given uid return false; } @@ -3912,7 +3980,7 @@ static SArray* doRetrieveDataBlock(STsdbReader* pReader) { } SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pStatus->blockIter); - STableBlockScanInfo* pBlockScanInfo = taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); + STableBlockScanInfo* pBlockScanInfo = *(STableBlockScanInfo**)taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); if (pBlockScanInfo == NULL) { terrno = TSDB_CODE_INVALID_PARA; tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid, @@ -3967,7 +4035,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) { resetDataBlockIterator(&pReader->status.blockIter, pReader->order); int64_t ts = ASCENDING_TRAVERSE(pReader->order) ? pReader->window.skey - 1 : pReader->window.ekey + 1; - resetDataBlockScanInfo(pReader->status.pTableMap, ts); + resetAllDataBlockScanInfo(pReader->status.pTableMap, ts); int32_t code = 0; SDataBlockIter* pBlockIter = &pReader->status.blockIter; @@ -4072,7 +4140,7 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) { pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL); while (pStatus->pTableIter != NULL) { - STableBlockScanInfo* pBlockScanInfo = pStatus->pTableIter; + STableBlockScanInfo* pBlockScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter; STbData* d = NULL; if (pReader->pTsdb->mem != NULL) { From 3605a345cdb3b378d9271a1417ac821bc0a03984 Mon Sep 17 00:00:00 2001 From: chenhaoran Date: Tue, 1 Nov 2022 20:12:26 +0800 Subject: [PATCH 3/8] test:add replica 3 for cluster test case --- .../6-cluster/5dnode3mnodeAdd1Ddnoe.py | 8 +++-- .../5dnode3mnodeRestartDnodeInsertData.py | 22 ++++++++----- ...5dnode3mnodeRestartDnodeInsertDataAsync.py | 4 ++- .../5dnode3mnodeSep1VnodeStopDnodeCreateDb.py | 5 +-- ...5dnode3mnodeSep1VnodeStopDnodeCreateStb.py | 5 +-- .../5dnode3mnodeSep1VnodeStopMnodeCreateDb.py | 8 +++-- ...5dnode3mnodeSep1VnodeStopMnodeCreateStb.py | 5 +-- .../5dnode3mnodeSep1VnodeStopVnodeCreateDb.py | 4 ++- ...5dnode3mnodeSep1VnodeStopVnodeCreateStb.py | 15 +++++---- .../6-cluster/clusterCommonCheck.py | 23 +++++++------ tests/system-test/fulltest.sh | 33 +++++++++++-------- 11 files changed, 79 insertions(+), 53 deletions(-) diff --git a/tests/system-test/6-cluster/5dnode3mnodeAdd1Ddnoe.py b/tests/system-test/6-cluster/5dnode3mnodeAdd1Ddnoe.py index e591114ef4..f174975a8e 100644 --- a/tests/system-test/6-cluster/5dnode3mnodeAdd1Ddnoe.py +++ b/tests/system-test/6-cluster/5dnode3mnodeAdd1Ddnoe.py @@ -31,6 +31,7 @@ class TDTestCase: self.TDDnodes = None tdSql.init(conn.cursor()) self.host = socket.gethostname() + self.replicaVar = int(replicaVar) def getBuildPath(self): @@ -118,6 +119,7 @@ class TDTestCase: rowsPerStb=paraDict["ctbNum"]*paraDict["rowsPerTbl"] rowsall=rowsPerStb*paraDict['stbNumbers'] dbNumbers = 1 + paraDict['replica'] = self.replicaVar tdLog.info("first check dnode and mnode") tdSql.query("select * from information_schema.ins_dnodes;") @@ -167,8 +169,8 @@ class TDTestCase: threads.append(threading.Thread(target=clusterComCreate.insert_data, args=(newTdSql, paraDict["dbName"],stableName,paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"]))) for tr in threads: tr.start() - dnode6Port=int(6030+5*100) - tdSql.execute("create dnode '%s:%d'"%(hostname,dnode6Port)) + dnode7Port=int(6030+6*100) + tdSql.execute("create dnode '%s:%d'"%(hostname,dnode7Port)) clusterComCheck.checkDnodes(dnodeNumbers) for tr in threads: tr.join() @@ -219,7 +221,7 @@ class TDTestCase: tdSql.checkRows(rowsPerStb) def run(self): # print(self.master_dnode.cfgDict) - self.fiveDnodeThreeMnode(dnodeNumbers=5,mnodeNums=3,restartNumbers=2,stopRole='dnode') + self.fiveDnodeThreeMnode(dnodeNumbers=6,mnodeNums=3,restartNumbers=2,stopRole='dnode') def stop(self): tdSql.close() diff --git a/tests/system-test/6-cluster/5dnode3mnodeRestartDnodeInsertData.py b/tests/system-test/6-cluster/5dnode3mnodeRestartDnodeInsertData.py index 52c9773544..5af9e55472 100644 --- a/tests/system-test/6-cluster/5dnode3mnodeRestartDnodeInsertData.py +++ b/tests/system-test/6-cluster/5dnode3mnodeRestartDnodeInsertData.py @@ -31,6 +31,7 @@ class TDTestCase: self.TDDnodes = None tdSql.init(conn.cursor()) self.host = socket.gethostname() + self.replicaVar = int(replicaVar) def getBuildPath(self): @@ -118,6 +119,7 @@ class TDTestCase: rowsPerStb=paraDict["ctbNum"]*paraDict["rowsPerTbl"] rowsall=rowsPerStb*paraDict['stbNumbers'] dbNumbers = 1 + paraDict['replica'] = self.replicaVar tdLog.info("first check dnode and mnode") tdSql.query("select * from information_schema.ins_dnodes;") @@ -157,7 +159,7 @@ class TDTestCase: stableName= '%s_%d'%(paraDict['stbName'],i) newTdSql=tdCom.newTdSql() clusterComCreate.create_ctable(newTdSql, paraDict["dbName"],stableName,stableName, paraDict['ctbNum']) - #insert date + #insert data for i in range(paraDict['stbNumbers']): stableName= '%s_%d'%(paraDict['stbName'],i) newTdSql=tdCom.newTdSql() @@ -203,17 +205,19 @@ class TDTestCase: clusterComCheck.checkDbRows(dbNumbers) # clusterComCheck.checkDb(dbNumbers,1,paraDict["dbName"]) - tdSql.execute("use %s" %(paraDict["dbName"])) - tdSql.query("show stables") - tdSql.checkRows(paraDict["stbNumbers"]) - # for i in range(paraDict['stbNumbers']): - # stableName= '%s_%d'%(paraDict['stbName'],i) - # tdSql.query("select * from %s"%stableName) - # tdSql.checkRows(rowsPerStb) + newTdSql=tdCom.newTdSql() + newTdSql.execute("reset query cache") + newTdSql.execute("use %s" %(paraDict["dbName"])) + newTdSql.query("show %s.stables"%(paraDict["dbName"])) + newTdSql.checkRows(paraDict["stbNumbers"]) + for i in range(paraDict['stbNumbers']): + stableName= '%s_%d'%(paraDict['stbName'],i) + newTdSql.query("select * from %s"%stableName) + newTdSql.checkRows(rowsPerStb) def run(self): # print(self.master_dnode.cfgDict) - self.fiveDnodeThreeMnode(dnodeNumbers=5,mnodeNums=3,restartNumbers=1,stopRole='dnode') + self.fiveDnodeThreeMnode(dnodeNumbers=6,mnodeNums=3,restartNumbers=2,stopRole='dnode') def stop(self): tdSql.close() diff --git a/tests/system-test/6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py b/tests/system-test/6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py index 4d641b0e27..1c488cab5f 100644 --- a/tests/system-test/6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py +++ b/tests/system-test/6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py @@ -31,6 +31,7 @@ class TDTestCase: self.TDDnodes = None tdSql.init(conn.cursor()) self.host = socket.gethostname() + self.replicaVar = int(replicaVar) def getBuildPath(self): @@ -118,6 +119,7 @@ class TDTestCase: rowsPerStb=paraDict["ctbNum"]*paraDict["rowsPerTbl"] rowsall=rowsPerStb*paraDict['stbNumbers'] dbNumbers = 1 + paraDict['replica'] = self.replicaVar tdLog.info("first check dnode and mnode") tdSql.query("select * from information_schema.ins_dnodes;") @@ -214,7 +216,7 @@ class TDTestCase: def run(self): # print(self.master_dnode.cfgDict) - self.fiveDnodeThreeMnode(dnodeNumbers=5,mnodeNums=3,restartNumbers=1,stopRole='dnode') + self.fiveDnodeThreeMnode(dnodeNumbers=6,mnodeNums=3,restartNumbers=1,stopRole='dnode') def stop(self): tdSql.close() diff --git a/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py b/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py index eba456d1c2..93d60e6561 100644 --- a/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py +++ b/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py @@ -30,7 +30,7 @@ class TDTestCase: self.TDDnodes = None tdSql.init(conn.cursor()) self.host = socket.gethostname() - + self.replicaVar=int(replicaVar) def getBuildPath(self): selfPath = os.path.dirname(os.path.realpath(__file__)) @@ -113,6 +113,7 @@ class TDTestCase: vnodeNumbers = int(dnodeNumbers-mnodeNums) allDbNumbers=(paraDict['dbNumbers']*restartNumbers) allStbNumbers=(paraDict['stbNumbers']*restartNumbers) + paraDict['replica'] = self.replicaVar tdLog.info("first check dnode and mnode") tdSql.query("select * from information_schema.ins_dnodes;") @@ -198,7 +199,7 @@ class TDTestCase: def run(self): # print(self.master_dnode.cfgDict) - self.fiveDnodeThreeMnode(dnodeNumbers=5,mnodeNums=3,restartNumbers=10,stopRole='dnode') + self.fiveDnodeThreeMnode(dnodeNumbers=6,mnodeNums=3,restartNumbers=10,stopRole='dnode') def stop(self): tdSql.close() diff --git a/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py b/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py index 1949157191..62f4b248f9 100644 --- a/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py +++ b/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py @@ -30,7 +30,7 @@ class TDTestCase: self.TDDnodes = None tdSql.init(conn.cursor()) self.host = socket.gethostname() - + self.replicaVar = int(replicaVar) def getBuildPath(self): selfPath = os.path.dirname(os.path.realpath(__file__)) @@ -87,6 +87,7 @@ class TDTestCase: vnodeNumbers = int(dnodeNumbers-mnodeNums) allStbNumbers=(paraDict['stbNumbers']*restartNumbers) dbNumbers = 1 + paraDict['replica'] = self.replicaVar tdLog.info("first check dnode and mnode") tdSql.query("select * from information_schema.ins_dnodes;") @@ -171,7 +172,7 @@ class TDTestCase: def run(self): # print(self.master_dnode.cfgDict) - self.fiveDnodeThreeMnode(dnodeNumbers=5,mnodeNums=3,restartNumbers=2,stopRole='dnode') + self.fiveDnodeThreeMnode(dnodeNumbers=6,mnodeNums=3,restartNumbers=2,stopRole='dnode') def stop(self): tdSql.close() diff --git a/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDb.py b/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDb.py index 9acb313203..397c6f5ccc 100644 --- a/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDb.py +++ b/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDb.py @@ -30,7 +30,8 @@ class TDTestCase: self.TDDnodes = None tdSql.init(conn.cursor()) self.host = socket.gethostname() - self.replicaVar = replicaVar + self.replicaVar = int(replicaVar) + def getBuildPath(self): selfPath = os.path.dirname(os.path.realpath(__file__)) @@ -87,7 +88,8 @@ class TDTestCase: vnodeNumbers = int(dnodeNumbers-mnodeNums) allDbNumbers=(paraDict['dbNumbers']*restartNumbers) allStbNumbers=(paraDict['stbNumbers']*restartNumbers) - paraDict['replica']=int(self.replicaVar) + paraDict['replica'] = self.replicaVar + tdLog.info("first check dnode and mnode") tdSql.query("select * from information_schema.ins_dnodes;") tdSql.checkData(0,1,'%s:6030'%self.host) @@ -171,7 +173,7 @@ class TDTestCase: def run(self): # print(self.master_dnode.cfgDict) - self.fiveDnodeThreeMnode(dnodeNumbers=5,mnodeNums=3,restartNumbers=1,stopRole='mnode') + self.fiveDnodeThreeMnode(dnodeNumbers=6,mnodeNums=3,restartNumbers=1,stopRole='mnode') def stop(self): tdSql.close() diff --git a/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateStb.py b/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateStb.py index 14ae1a8a48..e3f1a91de8 100644 --- a/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateStb.py +++ b/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateStb.py @@ -30,7 +30,7 @@ class TDTestCase: self.TDDnodes = None tdSql.init(conn.cursor()) self.host = socket.gethostname() - + self.replicaVar = int(replicaVar) def getBuildPath(self): selfPath = os.path.dirname(os.path.realpath(__file__)) @@ -112,6 +112,7 @@ class TDTestCase: vnodeNumbers = int(dnodeNumbers-mnodeNums) allStbNumbers=(paraDict['stbNumbers']*restartNumbers) dbNumbers = 1 + paraDict['replica'] = self.replicaVar tdLog.info("first check dnode and mnode") tdSql.query("select * from information_schema.ins_dnodes;") @@ -197,7 +198,7 @@ class TDTestCase: def run(self): # print(self.master_dnode.cfgDict) - self.fiveDnodeThreeMnode(dnodeNumbers=5,mnodeNums=3,restartNumbers=2,stopRole='mnode') + self.fiveDnodeThreeMnode(dnodeNumbers=6,mnodeNums=3,restartNumbers=2,stopRole='mnode') def stop(self): tdSql.close() diff --git a/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py b/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py index 056e3225e8..4ff6bffc07 100644 --- a/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py +++ b/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py @@ -30,6 +30,7 @@ class TDTestCase: self.TDDnodes = None tdSql.init(conn.cursor()) self.host = socket.gethostname() + self.replicaVar = int(replicaVar) def getBuildPath(self): @@ -88,6 +89,7 @@ class TDTestCase: vnodeNumbers = int(dnodeNumbers-mnodeNums) allDbNumbers=(paraDict['dbNumbers']*restartNumbers) allStbNumbers=(paraDict['stbNumbers']*restartNumbers) + paraDict['replica'] = self.replicaVar tdLog.info("first check dnode and mnode") tdSql.query("select * from information_schema.ins_dnodes;") @@ -171,7 +173,7 @@ class TDTestCase: def run(self): # print(self.master_dnode.cfgDict) - self.fiveDnodeThreeMnode(dnodeNumbers=5,mnodeNums=3,restartNumbers=10,stopRole='vnode') + self.fiveDnodeThreeMnode(dnodeNumbers=6,mnodeNums=3,restartNumbers=10,stopRole='vnode') def stop(self): tdSql.close() diff --git a/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateStb.py b/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateStb.py index 5fd79e4296..4bd8628a66 100644 --- a/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateStb.py +++ b/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateStb.py @@ -30,7 +30,7 @@ class TDTestCase: self.TDDnodes = None tdSql.init(conn.cursor()) self.host = socket.gethostname() - print(tdSql) + self.replicaVar = int(replicaVar) def getBuildPath(self): selfPath = os.path.dirname(os.path.realpath(__file__)) @@ -112,8 +112,8 @@ class TDTestCase: vnodeNumbers = int(dnodeNumbers-mnodeNums) allStbNumbers=(paraDict['stbNumbers']*restartNumbers) dbNumbers = 1 + paraDict['replica'] = self.replicaVar - print(tdSql) tdLog.info("first check dnode and mnode") tdSql.query("select * from information_schema.ins_dnodes;") tdSql.checkData(0,1,'%s:6030'%self.host) @@ -175,7 +175,7 @@ class TDTestCase: # dnodeNumbers don't include database of schema if clusterComCheck.checkDnodes(dnodeNumbers): - tdLog.info("123") + tdLog.info("check numbers of dnodes right ") else: print("456") @@ -192,14 +192,15 @@ class TDTestCase: tdSql.execute("use %s" %(paraDict["dbName"])) tdSql.query("show stables") - tdLog.debug("we find %d stables but exepect to create %d stables "%(tdSql.queryRows,allStbNumbers)) # # tdLog.info("check Stable Rows:") - tdSql.checkRows(allStbNumbers) - + if self.replicaVar==1: + tdSql.checkRows(allStbNumbers) + else: + tdLog.debug("we find %d stables but exepect to create %d stables "%(tdSql.queryRows,allStbNumbers)) def run(self): # print(self.master_dnode.cfgDict) - self.fiveDnodeThreeMnode(dnodeNumbers=5,mnodeNums=3,restartNumbers=2,stopRole='vnode') + self.fiveDnodeThreeMnode(dnodeNumbers=6,mnodeNums=3,restartNumbers=2,stopRole='vnode') def stop(self): tdSql.close() diff --git a/tests/system-test/6-cluster/clusterCommonCheck.py b/tests/system-test/6-cluster/clusterCommonCheck.py index 28c4810833..7cbe9d9680 100644 --- a/tests/system-test/6-cluster/clusterCommonCheck.py +++ b/tests/system-test/6-cluster/clusterCommonCheck.py @@ -47,13 +47,14 @@ class ClusterComCheck: for i in range(dnodeNumbers): if tdSql.queryResult[i][4] == "ready": status+=1 - tdLog.info(status) + # tdLog.info(status) if status == dnodeNumbers: - tdLog.success("it find cluster with %d dnodes and check that all cluster dnodes are ready within %ds! " % (dnodeNumbers, count)) + tdLog.success("it find cluster with %d dnodes and check that all cluster dnodes are ready within %ds! " % (dnodeNumbers, count+1)) return True - count+=1 time.sleep(1) + count+=1 + else: tdSql.query("select * from information_schema.ins_dnodes") tdLog.debug(tdSql.queryResult) @@ -74,10 +75,10 @@ class ClusterComCheck: tdLog.debug(tdSql.queryResult) tdLog.exit("we find %d databases but expect %d in clusters! " %(tdSql.queryRows,dbNumbers)) - def checkDb(self,dbNumbers,restartNumber,dbNameIndex): + def checkDb(self,dbNumbers,restartNumber,dbNameIndex, timeout=100): count=0 alldbNumbers=(dbNumbers*restartNumber)+2 - while count < 5: + while count < timeout: query_status=0 for j in range(dbNumbers): for i in range(alldbNumbers): @@ -87,22 +88,24 @@ class ClusterComCheck: query_status+=1 tdLog.debug("check %s_%d that status is ready "%(dbNameIndex,j)) else: + sleep(1) continue # print(query_status) - count+=1 if query_status == dbNumbers: - tdLog.success(" check %d database and all databases are ready within 5s! " %dbNumbers) + tdLog.success(" check %d database and all databases are ready within %ds! " %(dbNumbers,count+1)) return True + count+=1 + else: tdLog.debug(tdSql.queryResult) tdLog.debug("query status is %d"%query_status) - tdLog.exit("database is not ready within 5s") + tdLog.exit("database is not ready within %ds"%(timeout+1)) def checkData(self,dbname,stbname,stableCount,CtableCount,rowsPerSTable,): tdSql.execute("use %s"%dbname) - tdSql.query("show stables") + tdSql.query("show %s.stables"%dbname) tdSql.checkRows(stableCount) - tdSql.query("show tables") + tdSql.query("show %s.tables"%dbname) tdSql.checkRows(CtableCount) for i in range(stableCount): tdSql.query("select count(*) from %s%d"%(stbname,i)) diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index 8529f8d99b..febcc4b728 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -236,22 +236,29 @@ python3 ./test.py -f 6-cluster/5dnode2mnode.py -N 5 -M 3 python3 ./test.py -f 6-cluster/5dnode3mnodeStop.py -N 5 -M 3 python3 ./test.py -f 6-cluster/5dnode3mnodeStop2Follower.py -N 5 -M 3 python3 ./test.py -f 6-cluster/5dnode3mnodeStopLoop.py -N 5 -M 3 -python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py -N 5 -M 3 -python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDb.py -N 5 -M 3 -python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py -N 5 -M 3 -python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDbRep3.py -N 6 -M 3 -python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDbRep3.py -N 6 -M 3 -n 3 +python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py -N 6 -M 3 +python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py -N 6 -M 3 -n 3 +python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDb.py -N 6 -M 3 +python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDb.py -N 6 -M 3 -n 3 +python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py -N 6 -M 3 +python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py -N 6 -M 3 -n 3 -python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py -N 5 -M 3 -python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateStb.py -N 5 -M 3 -python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateStb.py -N 5 -M 3 +python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py -N 6 -M 3 +python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py -N 6 -M 3 -n 3 +python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateStb.py -N 6 -M 3 +python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateStb.py -N 6 -M 3 -n 3 +python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateStb.py -N 6 -M 3 +python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateStb.py -N 6 -M 3 -n 3 -python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertData.py -N 5 -M 3 -python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py -N 5 -M 3 -# python3 ./test.py -f 6-cluster/5dnode3mnodeRestartMnodeInsertData.py -N 5 -M 3 -# python3 ./test.py -f 6-cluster/5dnode3mnodeRestartVnodeInsertData.py -N 5 -M 3 +python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertData.py -N 6 -M 3 +python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertData.py -N 6 -M 3 -n 3 +python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py -N 6 -M 3 +python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py -N 6 -M 3 -n 3 + + +python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 7 -M 3 -C 6 +python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 7 -M 3 -C 6 -n 3 -python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 6 -M 3 -C 5 # BUG python3 ./test.py -f 6-cluster/5dnode3mnodeStopInsert.py python3 ./test.py -f 6-cluster/5dnode3mnodeDrop.py -N 5 # TD-19646 python3 test.py -f 6-cluster/5dnode3mnodeStopConnect.py -N 5 -M 3 From b610b239819b3081698aebfcb4a36fbf27fcb959 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 1 Nov 2022 21:47:12 +0800 Subject: [PATCH 4/8] fix(query): check for null value. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 894f5b97ed..ddd468b7a1 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -640,12 +640,12 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, } // this block belongs to a table that is not queried. - void* p = *(STableBlockScanInfo**)taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(uint64_t)); + void* p = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(uint64_t)); if (p == NULL) { continue; } - STableBlockScanInfo* pScanInfo = p; + STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)p; if (pScanInfo->pBlockList == NULL) { pScanInfo->pBlockList = taosArrayInit(4, sizeof(SBlockIndex)); } @@ -689,7 +689,8 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN for (int32_t i = 0; i < numOfTables; ++i) { SBlockIdx* pBlockIdx = taosArrayGet(pIndexList, i); - STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(int64_t)); + STableBlockScanInfo* pScanInfo = + *(STableBlockScanInfo**)taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(int64_t)); tMapDataReset(&pScanInfo->mapData); tsdbReadDataBlk(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData); @@ -1137,14 +1138,14 @@ static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, v static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter, const char* idStr) { SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); if (pBlockInfo != NULL) { - STableBlockScanInfo* pScanInfo = taosHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); + STableBlockScanInfo** pScanInfo = taosHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); if (pScanInfo == NULL) { tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, %s", pBlockInfo->uid, idStr); return TSDB_CODE_INVALID_PARA; } - SBlockIndex* pIndex = taosArrayGet(pScanInfo->pBlockList, pBlockInfo->tbBlockIdx); - tMapDataGetItemByIdx(&pScanInfo->mapData, pIndex->ordinalIndex, &pBlockIter->block, tGetDataBlk); + SBlockIndex* pIndex = taosArrayGet((*pScanInfo)->pBlockList, pBlockInfo->tbBlockIdx); + tMapDataGetItemByIdx(&(*pScanInfo)->mapData, pIndex->ordinalIndex, &pBlockIter->block, tGetDataBlk); } #if 0 From 698ae90e5b0fdd8cad7a96d8e2eedbdfeb449620 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 1 Nov 2022 23:37:51 +0800 Subject: [PATCH 5/8] fix(query): set correct value in hash map reset. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index ddd468b7a1..dee565874b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -138,7 +138,7 @@ typedef struct SReaderStatus { bool loadFromFile; // check file stage bool composedDataBlock; // the returned data block is a composed block or not SHashObj* pTableMap; // SHash - STableBlockScanInfo**pTableIter; // table iterator used in building in-memory buffer data blocks. + STableBlockScanInfo** pTableIter; // table iterator used in building in-memory buffer data blocks. SUidOrderCheckInfo uidCheckInfo; // check all table in uid order SFileBlockDumpInfo fBlockDumpInfo; SDFileSet* pCurrentFileset; // current opened file set @@ -334,7 +334,6 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK static void resetAllDataBlockScanInfo(SHashObj* pTableMap, int64_t ts) { STableBlockScanInfo** p = NULL; - while ((p = taosHashIterate(pTableMap, p)) != NULL) { STableBlockScanInfo* pInfo = *(STableBlockScanInfo**) p; @@ -367,7 +366,7 @@ static void clearBlockScanInfo(STableBlockScanInfo* p) { } static void destroyAllBlockScanInfo(SHashObj* pTableMap) { - STableBlockScanInfo* p = NULL; + void* p = NULL; while ((p = taosHashIterate(pTableMap, p)) != NULL) { clearBlockScanInfo(*(STableBlockScanInfo**)p); } @@ -3534,18 +3533,23 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e // TODO refactor: with createDataBlockScanInfo int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t num) { ASSERT(pReader != NULL); + int32_t size = taosHashGetSize(pReader->status.pTableMap); STableBlockScanInfo** p = NULL; while ((p = taosHashIterate(pReader->status.pTableMap, p)) != NULL) { clearBlockScanInfo(*p); } + // todo handle the case where size is less than the value of num + ASSERT(size >= num); + taosHashClear(pReader->status.pTableMap); STableKeyInfo* pList = (STableKeyInfo*) pTableList; for(int32_t i = 0; i < num; ++i) { - STableBlockScanInfo info = {.lastKey = 0, .uid = pList[i].uid}; - taosHashPut(pReader->status.pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info)); + STableBlockScanInfo* pInfo = getPosInBlockInfoBuf(&pReader->blockInfoBuf, i); + pInfo->uid = pList[i].uid; + taosHashPut(pReader->status.pTableMap, &pInfo->uid, sizeof(uint64_t), &pInfo, POINTER_BYTES); } return TDB_CODE_SUCCESS; @@ -4017,6 +4021,8 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) { return TSDB_CODE_SUCCESS; } + SDataBlockIter* pBlockIter = &pReader->status.blockIter; + pReader->order = pCond->order; pReader->type = TIMEWINDOW_RANGE_CONTAINED; pReader->status.loadFromFile = true; @@ -4033,13 +4039,12 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) { int32_t numOfTables = taosHashGetSize(pReader->status.pTableMap); initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader); - resetDataBlockIterator(&pReader->status.blockIter, pReader->order); + resetDataBlockIterator(pBlockIter, pReader->order); int64_t ts = ASCENDING_TRAVERSE(pReader->order) ? pReader->window.skey - 1 : pReader->window.ekey + 1; resetAllDataBlockScanInfo(pReader->status.pTableMap, ts); - int32_t code = 0; - SDataBlockIter* pBlockIter = &pReader->status.blockIter; + int32_t code = 0; // no data in files, let's try buffer in memory if (pReader->status.fileIter.numOfFiles == 0) { From 8b1f230d131f0023f8a055c2edb575bb57da6031 Mon Sep 17 00:00:00 2001 From: "wenzhouwww@live.cn" Date: Wed, 2 Nov 2022 09:38:04 +0800 Subject: [PATCH 6/8] build: release ver-3.0.1.6 --- cmake/cmake.version | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmake/cmake.version b/cmake/cmake.version index 1a83126cd7..92f7f8b895 100644 --- a/cmake/cmake.version +++ b/cmake/cmake.version @@ -2,7 +2,7 @@ IF (DEFINED VERNUMBER) SET(TD_VER_NUMBER ${VERNUMBER}) ELSE () - SET(TD_VER_NUMBER "3.0.1.5") + SET(TD_VER_NUMBER "3.0.1.6") ENDIF () IF (DEFINED VERCOMPATIBLE) From c5ac8cb09fb6c39eaa9c11131f5e3a01b74623f0 Mon Sep 17 00:00:00 2001 From: wenzhouwww Date: Wed, 2 Nov 2022 09:45:36 +0800 Subject: [PATCH 7/8] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 44aeaecf07..8d2567a816 100644 --- a/README.md +++ b/README.md @@ -349,6 +349,6 @@ Please follow the [contribution guidelines](CONTRIBUTING.md) to contribute to th For more information about TDengine, you can follow us on social media and join our Discord server: - [Discord](https://discord.com/invite/VZdSuUg4pS) -- [Twitter](https://twitter.com/TDengineDB) +- [Twitter](https://twitter.com/TaosData) - [LinkedIn](https://www.linkedin.com/company/tdengine/) - [YouTube](https://www.youtube.com/channel/UCmp-1U6GS_3V3hjir6Uq5DQ) From 7a537bba09295ae5a9215250e9e0d3c9f26576a8 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 2 Nov 2022 10:24:55 +0800 Subject: [PATCH 8/8] refact: adjust sync.h and syncState --- include/libs/sync/sync.h | 44 ++++----- source/dnode/mnode/impl/src/mndMain.c | 17 ++-- source/dnode/mnode/impl/src/mndSync.c | 12 ++- source/dnode/vnode/src/vnd/vnodeQuery.c | 6 +- source/dnode/vnode/src/vnd/vnodeSync.c | 27 ++++-- source/libs/sync/inc/syncEnv.h | 1 + source/libs/sync/inc/syncInt.h | 4 +- .../sync => source/libs/sync/inc}/syncTools.h | 4 +- source/libs/sync/inc/syncUtil.h | 2 +- source/libs/sync/src/syncMain.c | 89 +++++-------------- source/libs/sync/src/syncMessage.c | 2 +- source/libs/sync/src/syncUtil.c | 12 --- .../test/syncConfigChangeSnapshotTest.cpp | 15 ++-- .../libs/sync/test/syncConfigChangeTest.cpp | 14 ++- source/libs/sync/test/syncReplicateTest.cpp | 6 +- source/libs/sync/test/syncSnapshotTest.cpp | 6 +- source/libs/sync/test/syncTestTool.cpp | 16 ++-- source/libs/sync/test/syncWriteTest.cpp | 6 +- 18 files changed, 113 insertions(+), 170 deletions(-) rename {include/libs/sync => source/libs/sync/inc}/syncTools.h (99%) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index dbe8a347da..67c1ffba50 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -25,8 +25,6 @@ extern "C" { #include "tlrucache.h" #include "tmsgcb.h" -extern bool gRaftDetailLog; - #define SYNC_RESP_TTL_MS 10000000 #define SYNC_SPEED_UP_HB_TIMER 400 #define SYNC_SPEED_UP_AFTER_MS (1000 * 20) @@ -132,7 +130,7 @@ typedef struct SSnapshotMeta { typedef struct SSyncFSM { void* data; - void (*FpCommitCb)(const struct SSyncFSM* pFsm, const SRpcMsg* pMsg, const SFsmCbMeta *pMeta); + void (*FpCommitCb)(const struct SSyncFSM* pFsm, const SRpcMsg* pMsg, const SFsmCbMeta* pMeta); void (*FpPreCommitCb)(const struct SSyncFSM* pFsm, const SRpcMsg* pMsg, const SFsmCbMeta* pMeta); void (*FpRollBackCb)(const struct SSyncFSM* pFsm, const SRpcMsg* pMsg, const SFsmCbMeta* pMeta); @@ -202,37 +200,27 @@ typedef struct SSyncInfo { int32_t (*syncEqCtrlMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg); } SSyncInfo; -int32_t syncInit(); -void syncCleanUp(); -bool syncIsInit(); -int64_t syncOpen(SSyncInfo* pSyncInfo); -void syncStart(int64_t rid); -void syncStop(int64_t rid); -ESyncState syncGetMyRole(int64_t rid); -bool syncIsReady(int64_t rid); -const char* syncGetMyRoleStr(int64_t rid); -bool syncRestoreFinish(int64_t rid); -SyncTerm syncGetMyTerm(int64_t rid); -SyncIndex syncGetLastIndex(int64_t rid); -SyncIndex syncGetCommitIndex(int64_t rid); -SyncGroupId syncGetVgId(int64_t rid); -void syncGetEpSet(int64_t rid, SEpSet* pEpSet); -void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet); -int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak); -// int32_t syncProposeBatch(int64_t rid, SRpcMsg** pMsgPArr, bool* pIsWeakArr, int32_t arrSize); -const char* syncStr(ESyncState state); -bool syncIsRestoreFinish(int64_t rid); -int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapshot); +typedef struct SSyncState { + ESyncState state; + bool restored; +} SSyncState; +int32_t syncInit(); +void syncCleanUp(); +int64_t syncOpen(SSyncInfo* pSyncInfo); +void syncStart(int64_t rid); +void syncStop(int64_t rid); +int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak); +int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg); int32_t syncReconfig(int64_t rid, SSyncCfg* pCfg); -int32_t syncLeaderTransfer(int64_t rid); int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex); int32_t syncEndSnapshot(int64_t rid); +int32_t syncLeaderTransfer(int64_t rid); int32_t syncStepDown(int64_t rid, SyncTerm newTerm); -int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg); - -const char* syncUtilState2String(ESyncState state); +SSyncState syncGetState(int64_t rid); +void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet); +const char* syncStr(ESyncState state); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 5668802412..f47637bb50 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -487,14 +487,14 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) { } if (mndAcquireRpc(pMsg->info.node) == 0) return 0; - SMnode *pMnode = pMsg->info.node; - const char *role = syncGetMyRoleStr(pMnode->syncMgmt.sync); - bool restored = syncIsRestoreFinish(pMnode->syncMgmt.sync); + SMnode *pMnode = pMsg->info.node; + SSyncState state = syncGetState(pMnode->syncMgmt.sync); + if (pMsg->msgType == TDMT_MND_MQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER || pMsg->msgType == TDMT_MND_TRANS_TIMER || pMsg->msgType == TDMT_MND_TTL_TIMER || pMsg->msgType == TDMT_MND_UPTIME_TIMER) { mTrace("timer not process since mnode restored:%d stopped:%d, sync restored:%d role:%s ", pMnode->restored, - pMnode->stopped, restored, role); + pMnode->stopped, state.restored, syncStr(state.restored)); return -1; } @@ -505,8 +505,8 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) { mDebug( "msg:%p, type:%s failed to process since %s, mnode restored:%d stopped:%d, sync restored:%d " "role:%s, redirect numOfEps:%d inUse:%d", - pMsg, TMSG_INFO(pMsg->msgType), terrstr(), pMnode->restored, pMnode->stopped, restored, role, epSet.numOfEps, - epSet.inUse); + pMsg, TMSG_INFO(pMsg->msgType), terrstr(), pMnode->restored, pMnode->stopped, state.restored, + syncStr(state.restored), epSet.numOfEps, epSet.inUse); if (epSet.numOfEps > 0) { for (int32_t i = 0; i < epSet.numOfEps; ++i) { @@ -729,8 +729,9 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr } int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) { - pLoad->syncState = syncGetMyRole(pMnode->syncMgmt.sync); - pLoad->syncRestore = pMnode->restored; + SSyncState state = syncGetState(pMnode->syncMgmt.sync); + pLoad->syncState = state.state; + pLoad->syncRestore = state.restored; mTrace("mnode current syncState is %s, syncRestore:%d", syncStr(pLoad->syncState), pLoad->syncRestore); return 0; } diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 3fbe92b264..0f50391ac5 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -349,11 +349,15 @@ void mndSyncStop(SMnode *pMnode) { } bool mndIsLeader(SMnode *pMnode) { - SSyncMgmt *pMgmt = &pMnode->syncMgmt; + SSyncState state = syncGetState(pMnode->syncMgmt.sync); - if (!syncIsReady(pMgmt->sync)) { - // get terrno from syncIsReady - // terrno = TSDB_CODE_SYN_NOT_LEADER; + if (state.state != TAOS_SYNC_STATE_LEADER || !state.restored) { + if (state.state != TAOS_SYNC_STATE_LEADER) { + terrno = TSDB_CODE_SYN_NOT_LEADER; + } else { + terrno = TSDB_CODE_APP_NOT_READY; + } + mDebug("vgId:1, mnode not ready, state:%s, restore:%d", syncStr(state.state), state.restored); return false; } diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index f1bbb2d4b3..527f6188df 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -414,9 +414,11 @@ _exit: } int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { + SSyncState state = syncGetState(pVnode->sync); + pLoad->vgId = TD_VID(pVnode); - pLoad->syncState = syncGetMyRole(pVnode->sync); - pLoad->syncRestore = pVnode->restored; + pLoad->syncState = state.state; + pLoad->syncRestore = state.restored; pLoad->cacheUsage = tsdbCacheGetUsage(pVnode); pLoad->numOfTables = metaGetTbNum(pVnode->pMeta); pLoad->numOfTimeSeries = metaGetTimeSeriesNum(pVnode->pMeta); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index f9755bcd12..e27ae07460 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -309,13 +309,13 @@ static void vnodeSyncApplyMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const S const STraceId *trace = &pMsg->info.traceId; vGTrace("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", term:%" PRIu64 ", msg-index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s", - syncGetVgId(pVnode->sync), pFsm, pMeta->index, pMeta->term, rpcMsg.info.conn.applyIndex, pMeta->isWeak, - pMeta->code, pMeta->state, syncUtilState2String(pMeta->state), TMSG_INFO(pMsg->msgType)); + pVnode->config.vgId, pFsm, pMeta->index, pMeta->term, rpcMsg.info.conn.applyIndex, pMeta->isWeak, + pMeta->code, pMeta->state, syncStr(pMeta->state), TMSG_INFO(pMsg->msgType)); tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg); } else { SRpcMsg rsp = {.code = pMeta->code, .info = pMsg->info}; - vError("vgId:%d, commit-cb execute error, type:%s, index:%" PRId64 ", error:0x%x %s", syncGetVgId(pVnode->sync), + vError("vgId:%d, commit-cb execute error, type:%s, index:%" PRId64 ", error:0x%x %s", pVnode->config.vgId, TMSG_INFO(pMsg->msgType), pMeta->index, pMeta->code, tstrerror(pMeta->code)); if (rsp.info.handle != NULL) { tmsgSendRsp(&rsp); @@ -338,8 +338,8 @@ static void vnodeSyncPreCommitMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, con static void vnodeSyncRollBackMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { SVnode *pVnode = pFsm->data; vTrace("vgId:%d, rollback-cb is excuted, fsm:%p, index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s", - syncGetVgId(pVnode->sync), pFsm, pMeta->index, pMeta->isWeak, pMeta->code, pMeta->state, - syncUtilState2String(pMeta->state), TMSG_INFO(pMsg->msgType)); + pVnode->config.vgId, pFsm, pMeta->index, pMeta->isWeak, pMeta->code, pMeta->state, syncStr(pMeta->state), + TMSG_INFO(pMsg->msgType)); } #define USE_TSDB_SNAPSHOT @@ -552,12 +552,21 @@ void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); } -bool vnodeIsRoleLeader(SVnode *pVnode) { return syncGetMyRole(pVnode->sync) == TAOS_SYNC_STATE_LEADER; } +bool vnodeIsRoleLeader(SVnode *pVnode) { + SSyncState state = syncGetState(pVnode->sync); + return state.state == TAOS_SYNC_STATE_LEADER; +} bool vnodeIsLeader(SVnode *pVnode) { - if (!syncIsReady(pVnode->sync)) { - vDebug("vgId:%d, vnode not ready, state:%s, restore:%d", pVnode->config.vgId, syncGetMyRoleStr(pVnode->sync), - syncRestoreFinish(pVnode->sync)); + SSyncState state = syncGetState(pVnode->sync); + + if (state.state != TAOS_SYNC_STATE_LEADER || !state.restored) { + if (state.state != TAOS_SYNC_STATE_LEADER) { + terrno = TSDB_CODE_SYN_NOT_LEADER; + } else { + terrno = TSDB_CODE_APP_NOT_READY; + } + vDebug("vgId:%d, vnode not ready, state:%s, restore:%d", pVnode->config.vgId, syncStr(state.state), state.restored); return false; } diff --git a/source/libs/sync/inc/syncEnv.h b/source/libs/sync/inc/syncEnv.h index 55ce1470ce..cf4e3309f1 100644 --- a/source/libs/sync/inc/syncEnv.h +++ b/source/libs/sync/inc/syncEnv.h @@ -52,6 +52,7 @@ typedef struct SSyncEnv { } SSyncEnv; SSyncEnv* syncEnv(); +bool syncIsInit(); int64_t syncNodeAdd(SSyncNode* pNode); void syncNodeRemove(int64_t rid); diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 63aa6d81c9..1ceb45e452 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -22,9 +22,9 @@ extern "C" { #include "sync.h" #include "syncTools.h" -#include "tlog.h" -#include "ttimer.h" #include "taosdef.h" +#include "tlog.h" +#include "trpc.h" #include "ttimer.h" // clang-format off diff --git a/include/libs/sync/syncTools.h b/source/libs/sync/inc/syncTools.h similarity index 99% rename from include/libs/sync/syncTools.h rename to source/libs/sync/inc/syncTools.h index 9586d1febb..54fa15002a 100644 --- a/include/libs/sync/syncTools.h +++ b/source/libs/sync/inc/syncTools.h @@ -20,15 +20,13 @@ extern "C" { #endif -#include "trpc.h" - // ------------------ ds ------------------- typedef struct SRaftId { SyncNodeId addr; SyncGroupId vgId; } SRaftId; -char* sync2SimpleStr(int64_t rid); +char* sync2SimpleStr(int64_t rid); // for compatibility, the same as syncPropose int32_t syncForwardToPeer(int64_t rid, SRpcMsg* pMsg, bool isWeak); diff --git a/source/libs/sync/inc/syncUtil.h b/source/libs/sync/inc/syncUtil.h index 96e22720e8..ac8dd0928c 100644 --- a/source/libs/sync/inc/syncUtil.h +++ b/source/libs/sync/inc/syncUtil.h @@ -49,7 +49,7 @@ int32_t syncUtilQuorum(int32_t replicaNum); cJSON* syncUtilNodeInfo2Json(const SNodeInfo* p); cJSON* syncUtilRaftId2Json(const SRaftId* p); char* syncUtilRaftId2Str(const SRaftId* p); -const char* syncUtilState2String(ESyncState state); +const char* syncStr(ESyncState state); bool syncUtilCanPrint(char c); char* syncUtilprintBin(char* ptr, uint32_t len); char* syncUtilprintBin2(char* ptr, uint32_t len); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 3089e9b39b..eff752f1a5 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -505,50 +505,20 @@ int32_t syncForwardToPeer(int64_t rid, SRpcMsg* pMsg, bool isWeak) { return ret; } -ESyncState syncGetMyRole(int64_t rid) { - SSyncNode* pSyncNode = syncNodeAcquire(rid); - if (pSyncNode == NULL) { - return TAOS_SYNC_STATE_ERROR; - } - ASSERT(rid == pSyncNode->rid); - ESyncState state = pSyncNode->state; +SSyncState syncGetState(int64_t rid) { + SSyncState state = {.state = TAOS_SYNC_STATE_ERROR}; + + SSyncNode* pSyncNode = syncNodeAcquire(rid); + if (pSyncNode != NULL) { + state.state = pSyncNode->state; + state.restored = pSyncNode->restoreFinish; + syncNodeRelease(pSyncNode); + } - syncNodeRelease(pSyncNode); return state; } -bool syncIsReady(int64_t rid) { - SSyncNode* pSyncNode = syncNodeAcquire(rid); - if (pSyncNode == NULL) { - return false; - } - ASSERT(rid == pSyncNode->rid); - bool b = (pSyncNode->state == TAOS_SYNC_STATE_LEADER) && pSyncNode->restoreFinish; - syncNodeRelease(pSyncNode); - - // if false, set error code - if (false == b) { - if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) { - terrno = TSDB_CODE_SYN_NOT_LEADER; - } else { - terrno = TSDB_CODE_APP_NOT_READY; - } - } - return b; -} - -bool syncIsRestoreFinish(int64_t rid) { - SSyncNode* pSyncNode = syncNodeAcquire(rid); - if (pSyncNode == NULL) { - return false; - } - ASSERT(rid == pSyncNode->rid); - bool b = pSyncNode->restoreFinish; - - syncNodeRelease(pSyncNode); - return b; -} - +#if 0 int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapshot) { if (index < SYNC_INDEX_BEGIN) { return -1; @@ -618,6 +588,7 @@ int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct syncNodeRelease(pSyncNode); return 0; } +#endif SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex) { ASSERT(pSyncNode->pRaftCfg->configIndexCount >= 1); @@ -635,23 +606,7 @@ SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapsho return lastIndex; } -const char* syncGetMyRoleStr(int64_t rid) { - const char* s = syncUtilState2String(syncGetMyRole(rid)); - return s; -} - -bool syncRestoreFinish(int64_t rid) { - SSyncNode* pSyncNode = syncNodeAcquire(rid); - if (pSyncNode == NULL) { - return false; - } - ASSERT(rid == pSyncNode->rid); - bool restoreFinish = pSyncNode->restoreFinish; - - syncNodeRelease(pSyncNode); - return restoreFinish; -} - +#if 0 SyncTerm syncGetMyTerm(int64_t rid) { SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { @@ -719,6 +674,7 @@ void syncGetEpSet(int64_t rid, SEpSet* pEpSet) { syncNodeRelease(pSyncNode); } +#endif void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) { SSyncNode* pSyncNode = syncNodeAcquire(rid); @@ -742,7 +698,6 @@ void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) { syncNodeRelease(pSyncNode); } - static void syncGetAndDelRespRpc(SSyncNode* pSyncNode, uint64_t index, SRpcHandleInfo* pInfo) { SRespStub stub; int32_t ret = syncRespMgrGetAndDel(pSyncNode->pSyncRespMgr, index, &stub); @@ -877,7 +832,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) { } else { ret = -1; terrno = TSDB_CODE_SYN_NOT_LEADER; - sError("vgId:%d, sync propose not leader, %s, type:%s", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), + sError("vgId:%d, sync propose not leader, %s, type:%s", pSyncNode->vgId, syncStr(pSyncNode->state), TMSG_INFO(pMsg->msgType)); goto _END; } @@ -1603,7 +1558,7 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) { // tla+ server vars cJSON_AddNumberToObject(pRoot, "state", pSyncNode->state); - cJSON_AddStringToObject(pRoot, "state_str", syncUtilState2String(pSyncNode->state)); + cJSON_AddStringToObject(pRoot, "state_str", syncStr(pSyncNode->state)); cJSON_AddItemToObject(pRoot, "pRaftStore", raftStore2Json(pSyncNode->pRaftStore)); // tla+ candidate vars @@ -1743,7 +1698,7 @@ inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) { "stgy:%d, bch:%d, " "r-num:%d, " "lcfg:%" PRId64 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s", - pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm, + pSyncNode->vgId, syncStr(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, logBeginIndex, logLastIndex, pSyncNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy, pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, @@ -1767,7 +1722,7 @@ inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) { "stgy:%d, bch:%d, " "r-num:%d, " "lcfg:%" PRId64 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s", - pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm, + pSyncNode->vgId, syncStr(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, logBeginIndex, logLastIndex, pSyncNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy, pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, @@ -1821,7 +1776,7 @@ inline void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) { "stgy:%d, bch:%d, " "r-num:%d, " "lcfg:%" PRId64 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s", - pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm, + pSyncNode->vgId, syncStr(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, logBeginIndex, logLastIndex, pSyncNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy, pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, @@ -1843,7 +1798,7 @@ inline void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) { "stgy:%d, bch:%d, " "r-num:%d, " "lcfg:%" PRId64 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s", - pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm, + pSyncNode->vgId, syncStr(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, logBeginIndex, logLastIndex, pSyncNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy, pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, @@ -1875,9 +1830,9 @@ inline char* syncNode2SimpleStr(const SSyncNode* pSyncNode) { ", sby:%d, " "r-num:%d, " "lcfg:%" PRId64 ", chging:%d, rsto:%d", - pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, - pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex, pSyncNode->pRaftCfg->isStandBy, - pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, pSyncNode->changing, pSyncNode->restoreFinish); + pSyncNode->vgId, syncStr(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, + logBeginIndex, logLastIndex, snapshot.lastApplyIndex, pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, + pSyncNode->pRaftCfg->lastConfigIndex, pSyncNode->changing, pSyncNode->restoreFinish); return s; } diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index f9609d9c39..fa99d3380f 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -2413,7 +2413,7 @@ cJSON* syncApplyMsg2Json(const SyncApplyMsg* pMsg) { cJSON_AddNumberToObject(pRoot, "fsmMeta.isWeak", pMsg->fsmMeta.isWeak); cJSON_AddNumberToObject(pRoot, "fsmMeta.code", pMsg->fsmMeta.code); cJSON_AddNumberToObject(pRoot, "fsmMeta.state", pMsg->fsmMeta.state); - cJSON_AddStringToObject(pRoot, "fsmMeta.state.str", syncUtilState2String(pMsg->fsmMeta.state)); + cJSON_AddStringToObject(pRoot, "fsmMeta.state.str", syncStr(pMsg->fsmMeta.state)); snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->fsmMeta.seqNum); cJSON_AddStringToObject(pRoot, "fsmMeta.seqNum", u64buf); diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index 1750dce5ec..164e050930 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -176,18 +176,6 @@ char* syncUtilRaftId2Str(const SRaftId* p) { return serialized; } -const char* syncUtilState2String(ESyncState state) { - if (state == TAOS_SYNC_STATE_FOLLOWER) { - return "follower"; - } else if (state == TAOS_SYNC_STATE_CANDIDATE) { - return "candidate"; - } else if (state == TAOS_SYNC_STATE_LEADER) { - return "leader"; - } else { - return "state_error"; - } -} - bool syncUtilCanPrint(char c) { if (c >= 32 && c <= 126) { return true; diff --git a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp index 96e0b6c483..472aa55712 100644 --- a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp +++ b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp @@ -47,8 +47,8 @@ void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { snprintf(logBuf, sizeof(logBuf), "==callback== ==CommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s, flag:%" PRIu64 ", term:%" PRIu64 " \n", - pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), - cbMeta.flag, cbMeta.term); + pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncStr(cbMeta.state), cbMeta.flag, + cbMeta.term); syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg); } else { sTrace("==callback== ==CommitCb== do not apply again %" PRId64, cbMeta.index); @@ -57,10 +57,10 @@ void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { void PreCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { char logBuf[256] = {0}; - snprintf( - logBuf, sizeof(logBuf), - "==callback== ==PreCommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s flag:%" PRIu64 "\n", - pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), cbMeta.flag); + snprintf(logBuf, sizeof(logBuf), + "==callback== ==PreCommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s flag:%" PRIu64 + "\n", + pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncStr(cbMeta.state), cbMeta.flag); syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg); } @@ -68,8 +68,7 @@ void RollBackCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { char logBuf[256]; snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s flag:%" PRIu64 "\n", - pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), - cbMeta.flag); + pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncStr(cbMeta.state), cbMeta.flag); syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg); } diff --git a/source/libs/sync/test/syncConfigChangeTest.cpp b/source/libs/sync/test/syncConfigChangeTest.cpp index bf01f76607..9c6f966fff 100644 --- a/source/libs/sync/test/syncConfigChangeTest.cpp +++ b/source/libs/sync/test/syncConfigChangeTest.cpp @@ -45,8 +45,7 @@ void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { char logBuf[256] = {0}; snprintf(logBuf, sizeof(logBuf), "==callback== ==CommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s flag:%" PRIu64 "\n", - pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), - cbMeta.flag); + pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncStr(cbMeta.state), cbMeta.flag); syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg); } else { sTrace("==callback== ==CommitCb== do not apply again %" PRId64, cbMeta.index); @@ -55,10 +54,10 @@ void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { void PreCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { char logBuf[256] = {0}; - snprintf( - logBuf, sizeof(logBuf), - "==callback== ==PreCommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s flag:%" PRIu64 "\n", - pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), cbMeta.flag); + snprintf(logBuf, sizeof(logBuf), + "==callback== ==PreCommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s flag:%" PRIu64 + "\n", + pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncStr(cbMeta.state), cbMeta.flag); syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg); } @@ -66,8 +65,7 @@ void RollBackCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { char logBuf[256]; snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s flag:%" PRIu64 "\n", - pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), - cbMeta.flag); + pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncStr(cbMeta.state), cbMeta.flag); syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg); } diff --git a/source/libs/sync/test/syncReplicateTest.cpp b/source/libs/sync/test/syncReplicateTest.cpp index 7552fc7ae3..8c5267b324 100644 --- a/source/libs/sync/test/syncReplicateTest.cpp +++ b/source/libs/sync/test/syncReplicateTest.cpp @@ -42,7 +42,7 @@ void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { char logBuf[256]; snprintf(logBuf, sizeof(logBuf), "==callback== ==CommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", pFsm, - cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); + cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncStr(cbMeta.state)); syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg); } else { sTrace("==callback== ==CommitCb== do not apply again %" PRId64, cbMeta.index); @@ -53,7 +53,7 @@ void PreCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) char logBuf[256]; snprintf(logBuf, sizeof(logBuf), "==callback== ==PreCommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", pFsm, - cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); + cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncStr(cbMeta.state)); syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg); } @@ -61,7 +61,7 @@ void RollBackCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { char logBuf[256]; snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", pFsm, - cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); + cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncStr(cbMeta.state)); syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg); } diff --git a/source/libs/sync/test/syncSnapshotTest.cpp b/source/libs/sync/test/syncSnapshotTest.cpp index 2aff0aad93..34039700c3 100644 --- a/source/libs/sync/test/syncSnapshotTest.cpp +++ b/source/libs/sync/test/syncSnapshotTest.cpp @@ -45,7 +45,7 @@ void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { char logBuf[256]; snprintf(logBuf, sizeof(logBuf), "==callback== ==CommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", pFsm, - cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); + cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncStr(cbMeta.state)); syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); } else { sTrace("==callback== ==CommitCb== do not apply again %" PRId64, cbMeta.index); @@ -56,7 +56,7 @@ void PreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) char logBuf[256]; snprintf(logBuf, sizeof(logBuf), "==callback== ==PreCommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", pFsm, - cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); + cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncStr(cbMeta.state)); syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); } @@ -64,7 +64,7 @@ void RollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { char logBuf[256]; snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", pFsm, - cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); + cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncStr(cbMeta.state)); syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); } diff --git a/source/libs/sync/test/syncTestTool.cpp b/source/libs/sync/test/syncTestTool.cpp index a36f3af450..3a9ca7ad4d 100644 --- a/source/libs/sync/test/syncTestTool.cpp +++ b/source/libs/sync/test/syncTestTool.cpp @@ -44,8 +44,8 @@ void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { ", term:%" PRIu64 " " "currentTerm:%" PRIu64 " \n", - pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), - cbMeta.flag, cbMeta.term, cbMeta.currentTerm); + pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncStr(cbMeta.state), cbMeta.flag, + cbMeta.term, cbMeta.currentTerm); syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg); } @@ -56,8 +56,8 @@ void PreCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) ", term:%" PRIu64 " " "currentTerm:%" PRIu64 " \n", - pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), - cbMeta.flag, cbMeta.term, cbMeta.currentTerm); + pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncStr(cbMeta.state), cbMeta.flag, + cbMeta.term, cbMeta.currentTerm); syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg); } @@ -68,8 +68,8 @@ void RollBackCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { ", term:%" PRIu64 " " "currentTerm:%" PRIu64 " \n", - pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), - cbMeta.flag, cbMeta.term, cbMeta.currentTerm); + pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncStr(cbMeta.state), cbMeta.flag, + cbMeta.term, cbMeta.currentTerm); syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg); } @@ -168,8 +168,8 @@ void LeaderTransferCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbM ", isWeak:%d, code:%d, state:%d %s, flag:%" PRIu64 ", term:%" PRIu64 " " "currentTerm:%" PRIu64 " \n", - pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), - cbMeta.flag, cbMeta.term, cbMeta.currentTerm); + pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncStr(cbMeta.state), cbMeta.flag, + cbMeta.term, cbMeta.currentTerm); syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg); } diff --git a/source/libs/sync/test/syncWriteTest.cpp b/source/libs/sync/test/syncWriteTest.cpp index 0547f39bee..8290871131 100644 --- a/source/libs/sync/test/syncWriteTest.cpp +++ b/source/libs/sync/test/syncWriteTest.cpp @@ -35,7 +35,7 @@ void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { char logBuf[256]; snprintf(logBuf, sizeof(logBuf), "==callback== ==CommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", pFsm, - cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); + cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncStr(cbMeta.state)); syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); } @@ -43,7 +43,7 @@ void PreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) char logBuf[256]; snprintf(logBuf, sizeof(logBuf), "==callback== ==PreCommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", pFsm, - cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); + cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncStr(cbMeta.state)); syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); } @@ -51,7 +51,7 @@ void RollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { char logBuf[256]; snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", pFsm, - cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); + cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncStr(cbMeta.state)); syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); }