From b5ebd55be7826cad315ac82ed8681e23e2423a91 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Thu, 28 Jul 2022 14:36:11 +0800 Subject: [PATCH 1/2] refactor(sync): add test case --- source/libs/sync/test/sh/a.sh | 12 +- .../script/tsim/sync/vnodeLogAnalyzeTest.sim | 135 ++++++++++++++++++ 2 files changed, 141 insertions(+), 6 deletions(-) create mode 100644 tests/script/tsim/sync/vnodeLogAnalyzeTest.sim diff --git a/source/libs/sync/test/sh/a.sh b/source/libs/sync/test/sh/a.sh index 44cd2edbec..751b42b9c2 100644 --- a/source/libs/sync/test/sh/a.sh +++ b/source/libs/sync/test/sh/a.sh @@ -22,25 +22,25 @@ done echo "" echo "generate vgId ..." -cat ${logpath}/log.dnode* | grep "vgId:" | grep -v ERROR | awk '{print $5}' | sort | uniq > ${logpath}/log.vgIds.tmp +cat ${logpath}/log.dnode* | grep "vgId:" | grep -v ERROR | awk '{print $5}' | awk -F, '{print $1}' | sort -T. | uniq | awk -F: '{print $2, $0}' | sort -T. -k1 -n | awk '{print $2}' > ${logpath}/log.vgIds.tmp echo "all vgIds:" > ${logpath}/log.vgIds -cat ${logpath}/log.dnode* | grep "vgId:" | grep -v ERROR | awk '{print $5}' | awk -F, '{print $1}' | sort | uniq >> ${logpath}/log.vgIds +cat ${logpath}/log.dnode* | grep "vgId:" | grep -v ERROR | awk '{print $5}' | awk -F, '{print $1}' | sort -T. | uniq | awk -F: '{print $2, $0}' | sort -T. -k1 -n | awk '{print $2}' >> ${logpath}/log.vgIds for dnode in `ls ${logpath} | grep dnode | grep -v log`;do echo "" >> ${logpath}/log.vgIds echo "" >> ${logpath}/log.vgIds echo "${dnode}:" >> ${logpath}/log.vgIds - cat ${logpath}/${dnode}/log/taosdlog.* | grep SYN | grep "vgId:" | grep -v ERROR | awk '{print $5}' | awk -F, '{print $1}' | sort | uniq >> ${logpath}/log.vgIds + cat ${logpath}/${dnode}/log/taosdlog.* | grep SYN | grep "vgId:" | grep -v ERROR | awk '{print $5}' | awk -F, '{print $1}' | sort -T. | uniq | awk -F: '{print $2, $0}' | sort -T. -k1 -n | awk '{print $2}' >> ${logpath}/log.vgIds done echo "" echo "generate log.dnode.vgId ..." for logdnode in `ls ${logpath}/log.dnode*`;do for vgId in `cat ${logpath}/log.vgIds.tmp`;do - rowNum=`cat ${logdnode} | grep "${vgId}" | awk 'BEGIN{rowNum=0}{rowNum++}END{print rowNum}'` + rowNum=`cat ${logdnode} | grep "${vgId}," | awk 'BEGIN{rowNum=0}{rowNum++}END{print rowNum}'` #echo "-----${rowNum}" if [ $rowNum -gt 0 ] ; then echo "generate ${logdnode}.${vgId}" - cat ${logdnode} | grep "${vgId}" > ${logdnode}.${vgId} + cat ${logdnode} | grep "${vgId}," > ${logdnode}.${vgId} fi done done @@ -54,7 +54,7 @@ done echo "" echo "generate log.leader.term ..." -cat ${logpath}/*.main | grep "become leader" | grep -v "config change" | awk '{print $5,$0}' | awk -F, '{print $4"_"$0}' | sort -k1 > ${logpath}/log.leader.term +cat ${logpath}/*.main | grep "become leader" | grep -v "config change" | awk '{print $5,$0}' | awk -F, '{print $4"_"$0}' | sort -T. -k1 > ${logpath}/log.leader.term echo "" echo "generate log.index, log.snapshot, log.records, log.actions ..." diff --git a/tests/script/tsim/sync/vnodeLogAnalyzeTest.sim b/tests/script/tsim/sync/vnodeLogAnalyzeTest.sim new file mode 100644 index 0000000000..f159ac66b2 --- /dev/null +++ b/tests/script/tsim/sync/vnodeLogAnalyzeTest.sim @@ -0,0 +1,135 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/deploy.sh -n dnode2 -i 2 +system sh/deploy.sh -n dnode3 -i 3 +system sh/deploy.sh -n dnode4 -i 4 + +system sh/cfg.sh -n dnode1 -c supportVnodes -v 0 + +system sh/exec.sh -n dnode1 -s start +system sh/exec.sh -n dnode2 -s start +system sh/exec.sh -n dnode3 -s start +system sh/exec.sh -n dnode4 -s start + +sql connect +sql create dnode $hostname port 7200 +sql create dnode $hostname port 7300 +sql create dnode $hostname port 7400 + +$x = 0 +step1: + $x = $x + 1 + sleep 1000 + if $x == 10 then + print ====> dnode not ready! + return -1 + endi +sql show dnodes +print ===> $data00 $data01 $data02 $data03 $data04 $data05 +print ===> $data10 $data11 $data12 $data13 $data14 $data15 +print ===> $data20 $data21 $data22 $data23 $data24 $data25 +print ===> $data30 $data31 $data32 $data33 $data34 $data35 +if $rows != 4 then + return -1 +endi +if $data(1)[4] != ready then + goto step1 +endi +if $data(2)[4] != ready then + goto step1 +endi +if $data(3)[4] != ready then + goto step1 +endi +if $data(4)[4] != ready then + goto step1 +endi + +$replica = 3 +$vgroups = 30 + +print ============= create database +sql create database db replica $replica vgroups $vgroups + +$loop_cnt = 0 +check_db_ready: +$loop_cnt = $loop_cnt + 1 +sleep 200 +if $loop_cnt == 100 then + print ====> db not ready! + return -1 +endi +sql show databases +print ===> rows: $rows +print $data[2][0] $data[2][1] $data[2][2] $data[2][3] $data[2][4] $data[2][5] $data[2][6] $data[2][7] $data[2][8] $data[2][9] $data[2][6] $data[2][11] $data[2][12] $data[2][13] $data[2][14] $data[2][15] $data[2][16] $data[2][17] $data[2][18] $data[2][19] +if $rows != 3 then + return -1 +endi +if $data[2][15] != ready then + goto check_db_ready +endi + +sql use db + +$loop_cnt = 0 +check_vg_ready: +$loop_cnt = $loop_cnt + 1 +sleep 200 +if $loop_cnt == 300 then + print ====> vgroups not ready! + return -1 +endi + +sql show vgroups +print ===> rows: $rows +print $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] $data[0][7] $data[0][8] $data[0][9] $data[0][10] $data[0][11] + +if $rows != $vgroups then + return -1 +endi + +if $data[0][4] == leader then + if $data[0][6] == follower then + if $data[0][8] == follower then + print ---- vgroup $data[0][0] leader locate on dnode $data[0][3] + endi + endi +elif $data[0][6] == leader then + if $data[0][4] == follower then + if $data[0][8] == follower then + print ---- vgroup $data[0][0] leader locate on dnode $data[0][5] + endi + endi +elif $data[0][8] == leader then + if $data[0][4] == follower then + if $data[0][6] == follower then + print ---- vgroup $data[0][0] leader locate on dnode $data[0][7] + endi + endi +else + goto check_vg_ready +endi + + +vg_ready: +print ====> create stable/child table +sql create table stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int) + +sql show stables +if $rows != 1 then + return -1 +endi + +sql create table ct1 using stb tags(1000) + + +print ===> write 1000 records +$N = 10000 +$count = 0 +while $count < $N + $ms = 1591200000000 + $count + sql insert into ct1 values( $ms , $count , 2.1, 3.1) + $count = $count + 1 +endw + + From 64f4325bd77f5761249467177192983d15f58727 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Thu, 28 Jul 2022 17:55:15 +0800 Subject: [PATCH 2/2] refactor(sync): modify propose batch interface --- include/libs/sync/sync.h | 2 +- include/libs/sync/syncTools.h | 2 +- source/libs/sync/inc/syncInt.h | 2 +- source/libs/sync/src/syncMain.c | 23 ++++++++++--------- source/libs/sync/src/syncMessage.c | 6 ++--- .../sync/test/syncClientRequestBatchTest.cpp | 10 ++++---- 6 files changed, 23 insertions(+), 22 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index d767b3521e..aec8a1f73e 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -210,7 +210,7 @@ SyncGroupId syncGetVgId(int64_t rid); void syncGetEpSet(int64_t rid, SEpSet* pEpSet); void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet); int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak); -int32_t syncProposeBatch(int64_t rid, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_t arrSize); +int32_t syncProposeBatch(int64_t rid, SRpcMsg** pMsgPArr, bool* pIsWeakArr, int32_t arrSize); bool syncEnvIsStart(); const char* syncStr(ESyncState state); bool syncIsRestoreFinish(int64_t rid); diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h index 7e95623740..cd2c2d4a4f 100644 --- a/include/libs/sync/syncTools.h +++ b/include/libs/sync/syncTools.h @@ -238,7 +238,7 @@ typedef struct SyncClientRequestBatch { char data[]; // block2, block3 } SyncClientRequestBatch; -SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg* rpcMsgArr, SRaftMeta* raftArr, int32_t arrSize, +SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg** rpcMsgPArr, SRaftMeta* raftArr, int32_t arrSize, int32_t vgId); void syncClientRequestBatch2RpcMsg(const SyncClientRequestBatch* pSyncMsg, SRpcMsg* pRpcMsg); void syncClientRequestBatchDestroy(SyncClientRequestBatch* pMsg); diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index b802d94bea..586cfc0f15 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -170,7 +170,7 @@ void syncNodeStart(SSyncNode* pSyncNode); void syncNodeStartStandBy(SSyncNode* pSyncNode); void syncNodeClose(SSyncNode* pSyncNode); int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak); -int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_t arrSize); +int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg** pMsgPArr, bool* pIsWeakArr, int32_t arrSize); // option bool syncNodeSnapshotEnable(SSyncNode* pSyncNode); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index ceca201506..ef9cf1fe8f 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -677,7 +677,7 @@ int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) { return ret; } -int32_t syncProposeBatch(int64_t rid, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_t arrSize) { +int32_t syncProposeBatch(int64_t rid, SRpcMsg** pMsgPArr, bool* pIsWeakArr, int32_t arrSize) { if (arrSize < 0) { terrno = TSDB_CODE_SYN_INTERNAL_ERROR; return -1; @@ -690,18 +690,18 @@ int32_t syncProposeBatch(int64_t rid, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_ } ASSERT(rid == pSyncNode->rid); - int32_t ret = syncNodeProposeBatch(pSyncNode, pMsgArr, pIsWeakArr, arrSize); + int32_t ret = syncNodeProposeBatch(pSyncNode, pMsgPArr, pIsWeakArr, arrSize); taosReleaseRef(tsNodeRefId, pSyncNode->rid); return ret; } -static bool syncNodeBatchOK(SRpcMsg* pMsgArr, int32_t arrSize) { +static bool syncNodeBatchOK(SRpcMsg** pMsgPArr, int32_t arrSize) { for (int32_t i = 0; i < arrSize; ++i) { - if (pMsgArr[i].msgType == TDMT_SYNC_CONFIG_CHANGE) { + if (pMsgPArr[i]->msgType == TDMT_SYNC_CONFIG_CHANGE) { return false; } - if (pMsgArr[i].msgType == TDMT_SYNC_CONFIG_CHANGE_FINISH) { + if (pMsgPArr[i]->msgType == TDMT_SYNC_CONFIG_CHANGE_FINISH) { return false; } } @@ -709,8 +709,8 @@ static bool syncNodeBatchOK(SRpcMsg* pMsgArr, int32_t arrSize) { return true; } -int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_t arrSize) { - if (!syncNodeBatchOK(pMsgArr, arrSize)) { +int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg** pMsgPArr, bool* pIsWeakArr, int32_t arrSize) { + if (!syncNodeBatchOK(pMsgPArr, arrSize)) { syncNodeErrorLog(pSyncNode, "sync propose batch error"); terrno = TSDB_CODE_SYN_BATCH_ERROR; return -1; @@ -738,14 +738,14 @@ int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg* pMsgArr, bool* pIsWe for (int i = 0; i < arrSize; ++i) { SRespStub stub; stub.createTime = taosGetTimestampMs(); - stub.rpcMsg = pMsgArr[i]; + stub.rpcMsg = *(pMsgPArr[i]); uint64_t seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub); raftArr[i].isWeak = pIsWeakArr[i]; raftArr[i].seqNum = seqNum; } - SyncClientRequestBatch* pSyncMsg = syncClientRequestBatchBuild(pMsgArr, raftArr, arrSize, pSyncNode->vgId); + SyncClientRequestBatch* pSyncMsg = syncClientRequestBatchBuild(pMsgPArr, raftArr, arrSize, pSyncNode->vgId); ASSERT(pSyncMsg != NULL); SRpcMsg rpcMsg; @@ -759,7 +759,7 @@ int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg* pMsgArr, bool* pIsWe SRpcMsg* msgArr = syncClientRequestBatchRpcMsgArr(pSyncMsg); ASSERT(arrSize == pSyncMsg->dataCount); for (int i = 0; i < arrSize; ++i) { - pMsgArr[i].info.conn.applyIndex = msgArr[i].info.conn.applyIndex; + pMsgPArr[i]->info.conn.applyIndex = msgArr[i].info.conn.applyIndex; syncRespMgrDel(pSyncNode->pSyncRespMgr, raftArr[i].seqNum); } @@ -860,7 +860,8 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) { } else { ret = -1; terrno = TSDB_CODE_SYN_NOT_LEADER; - sError("vgId:%d, sync propose not leader, %s", pSyncNode->vgId, syncUtilState2String(pSyncNode->state)); + sError("vgId:%d, sync propose not leader, %s, msgtype:%s,%d", pSyncNode->vgId, + syncUtilState2String(pSyncNode->state), TMSG_INFO(pMsg->msgType), pMsg->msgType); goto _END; } diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 42a3290d5b..13adaf055c 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -963,9 +963,9 @@ void syncClientRequestLog2(char* s, const SyncClientRequest* pMsg) { // block2: SRaftMeta array // block3: rpc msg array (with pCont) -SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg* rpcMsgArr, SRaftMeta* raftArr, int32_t arrSize, +SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg** rpcMsgPArr, SRaftMeta* raftArr, int32_t arrSize, int32_t vgId) { - ASSERT(rpcMsgArr != NULL); + ASSERT(rpcMsgPArr != NULL); ASSERT(arrSize > 0); int32_t dataLen = 0; @@ -991,7 +991,7 @@ SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg* rpcMsgArr, SRaftMet raftMetaArr[i].seqNum = raftArr[i].seqNum; // init msgArr - msgArr[i] = rpcMsgArr[i]; + msgArr[i] = *(rpcMsgPArr[i]); } return pMsg; diff --git a/source/libs/sync/test/syncClientRequestBatchTest.cpp b/source/libs/sync/test/syncClientRequestBatchTest.cpp index ae74baeda4..84d037be01 100644 --- a/source/libs/sync/test/syncClientRequestBatchTest.cpp +++ b/source/libs/sync/test/syncClientRequestBatchTest.cpp @@ -28,12 +28,12 @@ SRpcMsg *createRpcMsg(int32_t i, int32_t dataLen) { } SyncClientRequestBatch *createMsg() { - SRpcMsg rpcMsgArr[5]; - memset(rpcMsgArr, 0, sizeof(rpcMsgArr)); + SRpcMsg *rpcMsgPArr[5]; + memset(rpcMsgPArr, 0, sizeof(rpcMsgPArr)); for (int32_t i = 0; i < 5; ++i) { SRpcMsg *pRpcMsg = createRpcMsg(i, 20); - rpcMsgArr[i] = *pRpcMsg; - taosMemoryFree(pRpcMsg); + rpcMsgPArr[i] = pRpcMsg; + //taosMemoryFree(pRpcMsg); } SRaftMeta raftArr[5]; @@ -43,7 +43,7 @@ SyncClientRequestBatch *createMsg() { raftArr[i].isWeak = i % 2; } - SyncClientRequestBatch *pMsg = syncClientRequestBatchBuild(rpcMsgArr, raftArr, 5, 1234); + SyncClientRequestBatch *pMsg = syncClientRequestBatchBuild(rpcMsgPArr, raftArr, 5, 1234); return pMsg; }