Merge pull request #15524 from taosdata/feature/3.0_mhli
refactor(sync): modify propose batch interface
This commit is contained in:
commit
3bb89e7097
|
@ -210,7 +210,7 @@ SyncGroupId syncGetVgId(int64_t rid);
|
|||
void syncGetEpSet(int64_t rid, SEpSet* pEpSet);
|
||||
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet);
|
||||
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak);
|
||||
int32_t syncProposeBatch(int64_t rid, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_t arrSize);
|
||||
int32_t syncProposeBatch(int64_t rid, SRpcMsg** pMsgPArr, bool* pIsWeakArr, int32_t arrSize);
|
||||
bool syncEnvIsStart();
|
||||
const char* syncStr(ESyncState state);
|
||||
bool syncIsRestoreFinish(int64_t rid);
|
||||
|
|
|
@ -238,7 +238,7 @@ typedef struct SyncClientRequestBatch {
|
|||
char data[]; // block2, block3
|
||||
} SyncClientRequestBatch;
|
||||
|
||||
SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg* rpcMsgArr, SRaftMeta* raftArr, int32_t arrSize,
|
||||
SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg** rpcMsgPArr, SRaftMeta* raftArr, int32_t arrSize,
|
||||
int32_t vgId);
|
||||
void syncClientRequestBatch2RpcMsg(const SyncClientRequestBatch* pSyncMsg, SRpcMsg* pRpcMsg);
|
||||
void syncClientRequestBatchDestroy(SyncClientRequestBatch* pMsg);
|
||||
|
|
|
@ -170,7 +170,7 @@ void syncNodeStart(SSyncNode* pSyncNode);
|
|||
void syncNodeStartStandBy(SSyncNode* pSyncNode);
|
||||
void syncNodeClose(SSyncNode* pSyncNode);
|
||||
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak);
|
||||
int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_t arrSize);
|
||||
int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg** pMsgPArr, bool* pIsWeakArr, int32_t arrSize);
|
||||
|
||||
// option
|
||||
bool syncNodeSnapshotEnable(SSyncNode* pSyncNode);
|
||||
|
|
|
@ -677,7 +677,7 @@ int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) {
|
|||
return ret;
|
||||
}
|
||||
|
||||
int32_t syncProposeBatch(int64_t rid, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_t arrSize) {
|
||||
int32_t syncProposeBatch(int64_t rid, SRpcMsg** pMsgPArr, bool* pIsWeakArr, int32_t arrSize) {
|
||||
if (arrSize < 0) {
|
||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
return -1;
|
||||
|
@ -690,18 +690,18 @@ int32_t syncProposeBatch(int64_t rid, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_
|
|||
}
|
||||
ASSERT(rid == pSyncNode->rid);
|
||||
|
||||
int32_t ret = syncNodeProposeBatch(pSyncNode, pMsgArr, pIsWeakArr, arrSize);
|
||||
int32_t ret = syncNodeProposeBatch(pSyncNode, pMsgPArr, pIsWeakArr, arrSize);
|
||||
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||
return ret;
|
||||
}
|
||||
|
||||
static bool syncNodeBatchOK(SRpcMsg* pMsgArr, int32_t arrSize) {
|
||||
static bool syncNodeBatchOK(SRpcMsg** pMsgPArr, int32_t arrSize) {
|
||||
for (int32_t i = 0; i < arrSize; ++i) {
|
||||
if (pMsgArr[i].msgType == TDMT_SYNC_CONFIG_CHANGE) {
|
||||
if (pMsgPArr[i]->msgType == TDMT_SYNC_CONFIG_CHANGE) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (pMsgArr[i].msgType == TDMT_SYNC_CONFIG_CHANGE_FINISH) {
|
||||
if (pMsgPArr[i]->msgType == TDMT_SYNC_CONFIG_CHANGE_FINISH) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
@ -709,8 +709,8 @@ static bool syncNodeBatchOK(SRpcMsg* pMsgArr, int32_t arrSize) {
|
|||
return true;
|
||||
}
|
||||
|
||||
int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_t arrSize) {
|
||||
if (!syncNodeBatchOK(pMsgArr, arrSize)) {
|
||||
int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg** pMsgPArr, bool* pIsWeakArr, int32_t arrSize) {
|
||||
if (!syncNodeBatchOK(pMsgPArr, arrSize)) {
|
||||
syncNodeErrorLog(pSyncNode, "sync propose batch error");
|
||||
terrno = TSDB_CODE_SYN_BATCH_ERROR;
|
||||
return -1;
|
||||
|
@ -738,14 +738,14 @@ int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg* pMsgArr, bool* pIsWe
|
|||
for (int i = 0; i < arrSize; ++i) {
|
||||
SRespStub stub;
|
||||
stub.createTime = taosGetTimestampMs();
|
||||
stub.rpcMsg = pMsgArr[i];
|
||||
stub.rpcMsg = *(pMsgPArr[i]);
|
||||
uint64_t seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);
|
||||
|
||||
raftArr[i].isWeak = pIsWeakArr[i];
|
||||
raftArr[i].seqNum = seqNum;
|
||||
}
|
||||
|
||||
SyncClientRequestBatch* pSyncMsg = syncClientRequestBatchBuild(pMsgArr, raftArr, arrSize, pSyncNode->vgId);
|
||||
SyncClientRequestBatch* pSyncMsg = syncClientRequestBatchBuild(pMsgPArr, raftArr, arrSize, pSyncNode->vgId);
|
||||
ASSERT(pSyncMsg != NULL);
|
||||
|
||||
SRpcMsg rpcMsg;
|
||||
|
@ -759,7 +759,7 @@ int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg* pMsgArr, bool* pIsWe
|
|||
SRpcMsg* msgArr = syncClientRequestBatchRpcMsgArr(pSyncMsg);
|
||||
ASSERT(arrSize == pSyncMsg->dataCount);
|
||||
for (int i = 0; i < arrSize; ++i) {
|
||||
pMsgArr[i].info.conn.applyIndex = msgArr[i].info.conn.applyIndex;
|
||||
pMsgPArr[i]->info.conn.applyIndex = msgArr[i].info.conn.applyIndex;
|
||||
syncRespMgrDel(pSyncNode->pSyncRespMgr, raftArr[i].seqNum);
|
||||
}
|
||||
|
||||
|
@ -860,7 +860,8 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
|
|||
} else {
|
||||
ret = -1;
|
||||
terrno = TSDB_CODE_SYN_NOT_LEADER;
|
||||
sError("vgId:%d, sync propose not leader, %s", pSyncNode->vgId, syncUtilState2String(pSyncNode->state));
|
||||
sError("vgId:%d, sync propose not leader, %s, msgtype:%s,%d", pSyncNode->vgId,
|
||||
syncUtilState2String(pSyncNode->state), TMSG_INFO(pMsg->msgType), pMsg->msgType);
|
||||
goto _END;
|
||||
}
|
||||
|
||||
|
|
|
@ -963,9 +963,9 @@ void syncClientRequestLog2(char* s, const SyncClientRequest* pMsg) {
|
|||
// block2: SRaftMeta array
|
||||
// block3: rpc msg array (with pCont)
|
||||
|
||||
SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg* rpcMsgArr, SRaftMeta* raftArr, int32_t arrSize,
|
||||
SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg** rpcMsgPArr, SRaftMeta* raftArr, int32_t arrSize,
|
||||
int32_t vgId) {
|
||||
ASSERT(rpcMsgArr != NULL);
|
||||
ASSERT(rpcMsgPArr != NULL);
|
||||
ASSERT(arrSize > 0);
|
||||
|
||||
int32_t dataLen = 0;
|
||||
|
@ -991,7 +991,7 @@ SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg* rpcMsgArr, SRaftMet
|
|||
raftMetaArr[i].seqNum = raftArr[i].seqNum;
|
||||
|
||||
// init msgArr
|
||||
msgArr[i] = rpcMsgArr[i];
|
||||
msgArr[i] = *(rpcMsgPArr[i]);
|
||||
}
|
||||
|
||||
return pMsg;
|
||||
|
|
|
@ -22,25 +22,25 @@ done
|
|||
|
||||
echo ""
|
||||
echo "generate vgId ..."
|
||||
cat ${logpath}/log.dnode* | grep "vgId:" | grep -v ERROR | awk '{print $5}' | sort | uniq > ${logpath}/log.vgIds.tmp
|
||||
cat ${logpath}/log.dnode* | grep "vgId:" | grep -v ERROR | awk '{print $5}' | awk -F, '{print $1}' | sort -T. | uniq | awk -F: '{print $2, $0}' | sort -T. -k1 -n | awk '{print $2}' > ${logpath}/log.vgIds.tmp
|
||||
echo "all vgIds:" > ${logpath}/log.vgIds
|
||||
cat ${logpath}/log.dnode* | grep "vgId:" | grep -v ERROR | awk '{print $5}' | awk -F, '{print $1}' | sort | uniq >> ${logpath}/log.vgIds
|
||||
cat ${logpath}/log.dnode* | grep "vgId:" | grep -v ERROR | awk '{print $5}' | awk -F, '{print $1}' | sort -T. | uniq | awk -F: '{print $2, $0}' | sort -T. -k1 -n | awk '{print $2}' >> ${logpath}/log.vgIds
|
||||
for dnode in `ls ${logpath} | grep dnode | grep -v log`;do
|
||||
echo "" >> ${logpath}/log.vgIds
|
||||
echo "" >> ${logpath}/log.vgIds
|
||||
echo "${dnode}:" >> ${logpath}/log.vgIds
|
||||
cat ${logpath}/${dnode}/log/taosdlog.* | grep SYN | grep "vgId:" | grep -v ERROR | awk '{print $5}' | awk -F, '{print $1}' | sort | uniq >> ${logpath}/log.vgIds
|
||||
cat ${logpath}/${dnode}/log/taosdlog.* | grep SYN | grep "vgId:" | grep -v ERROR | awk '{print $5}' | awk -F, '{print $1}' | sort -T. | uniq | awk -F: '{print $2, $0}' | sort -T. -k1 -n | awk '{print $2}' >> ${logpath}/log.vgIds
|
||||
done
|
||||
|
||||
echo ""
|
||||
echo "generate log.dnode.vgId ..."
|
||||
for logdnode in `ls ${logpath}/log.dnode*`;do
|
||||
for vgId in `cat ${logpath}/log.vgIds.tmp`;do
|
||||
rowNum=`cat ${logdnode} | grep "${vgId}" | awk 'BEGIN{rowNum=0}{rowNum++}END{print rowNum}'`
|
||||
rowNum=`cat ${logdnode} | grep "${vgId}," | awk 'BEGIN{rowNum=0}{rowNum++}END{print rowNum}'`
|
||||
#echo "-----${rowNum}"
|
||||
if [ $rowNum -gt 0 ] ; then
|
||||
echo "generate ${logdnode}.${vgId}"
|
||||
cat ${logdnode} | grep "${vgId}" > ${logdnode}.${vgId}
|
||||
cat ${logdnode} | grep "${vgId}," > ${logdnode}.${vgId}
|
||||
fi
|
||||
done
|
||||
done
|
||||
|
@ -54,7 +54,7 @@ done
|
|||
|
||||
echo ""
|
||||
echo "generate log.leader.term ..."
|
||||
cat ${logpath}/*.main | grep "become leader" | grep -v "config change" | awk '{print $5,$0}' | awk -F, '{print $4"_"$0}' | sort -k1 > ${logpath}/log.leader.term
|
||||
cat ${logpath}/*.main | grep "become leader" | grep -v "config change" | awk '{print $5,$0}' | awk -F, '{print $4"_"$0}' | sort -T. -k1 > ${logpath}/log.leader.term
|
||||
|
||||
echo ""
|
||||
echo "generate log.index, log.snapshot, log.records, log.actions ..."
|
||||
|
|
|
@ -28,12 +28,12 @@ SRpcMsg *createRpcMsg(int32_t i, int32_t dataLen) {
|
|||
}
|
||||
|
||||
SyncClientRequestBatch *createMsg() {
|
||||
SRpcMsg rpcMsgArr[5];
|
||||
memset(rpcMsgArr, 0, sizeof(rpcMsgArr));
|
||||
SRpcMsg *rpcMsgPArr[5];
|
||||
memset(rpcMsgPArr, 0, sizeof(rpcMsgPArr));
|
||||
for (int32_t i = 0; i < 5; ++i) {
|
||||
SRpcMsg *pRpcMsg = createRpcMsg(i, 20);
|
||||
rpcMsgArr[i] = *pRpcMsg;
|
||||
taosMemoryFree(pRpcMsg);
|
||||
rpcMsgPArr[i] = pRpcMsg;
|
||||
//taosMemoryFree(pRpcMsg);
|
||||
}
|
||||
|
||||
SRaftMeta raftArr[5];
|
||||
|
@ -43,7 +43,7 @@ SyncClientRequestBatch *createMsg() {
|
|||
raftArr[i].isWeak = i % 2;
|
||||
}
|
||||
|
||||
SyncClientRequestBatch *pMsg = syncClientRequestBatchBuild(rpcMsgArr, raftArr, 5, 1234);
|
||||
SyncClientRequestBatch *pMsg = syncClientRequestBatchBuild(rpcMsgPArr, raftArr, 5, 1234);
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,135 @@
|
|||
system sh/stop_dnodes.sh
|
||||
system sh/deploy.sh -n dnode1 -i 1
|
||||
system sh/deploy.sh -n dnode2 -i 2
|
||||
system sh/deploy.sh -n dnode3 -i 3
|
||||
system sh/deploy.sh -n dnode4 -i 4
|
||||
|
||||
system sh/cfg.sh -n dnode1 -c supportVnodes -v 0
|
||||
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
system sh/exec.sh -n dnode2 -s start
|
||||
system sh/exec.sh -n dnode3 -s start
|
||||
system sh/exec.sh -n dnode4 -s start
|
||||
|
||||
sql connect
|
||||
sql create dnode $hostname port 7200
|
||||
sql create dnode $hostname port 7300
|
||||
sql create dnode $hostname port 7400
|
||||
|
||||
$x = 0
|
||||
step1:
|
||||
$x = $x + 1
|
||||
sleep 1000
|
||||
if $x == 10 then
|
||||
print ====> dnode not ready!
|
||||
return -1
|
||||
endi
|
||||
sql show dnodes
|
||||
print ===> $data00 $data01 $data02 $data03 $data04 $data05
|
||||
print ===> $data10 $data11 $data12 $data13 $data14 $data15
|
||||
print ===> $data20 $data21 $data22 $data23 $data24 $data25
|
||||
print ===> $data30 $data31 $data32 $data33 $data34 $data35
|
||||
if $rows != 4 then
|
||||
return -1
|
||||
endi
|
||||
if $data(1)[4] != ready then
|
||||
goto step1
|
||||
endi
|
||||
if $data(2)[4] != ready then
|
||||
goto step1
|
||||
endi
|
||||
if $data(3)[4] != ready then
|
||||
goto step1
|
||||
endi
|
||||
if $data(4)[4] != ready then
|
||||
goto step1
|
||||
endi
|
||||
|
||||
$replica = 3
|
||||
$vgroups = 30
|
||||
|
||||
print ============= create database
|
||||
sql create database db replica $replica vgroups $vgroups
|
||||
|
||||
$loop_cnt = 0
|
||||
check_db_ready:
|
||||
$loop_cnt = $loop_cnt + 1
|
||||
sleep 200
|
||||
if $loop_cnt == 100 then
|
||||
print ====> db not ready!
|
||||
return -1
|
||||
endi
|
||||
sql show databases
|
||||
print ===> rows: $rows
|
||||
print $data[2][0] $data[2][1] $data[2][2] $data[2][3] $data[2][4] $data[2][5] $data[2][6] $data[2][7] $data[2][8] $data[2][9] $data[2][6] $data[2][11] $data[2][12] $data[2][13] $data[2][14] $data[2][15] $data[2][16] $data[2][17] $data[2][18] $data[2][19]
|
||||
if $rows != 3 then
|
||||
return -1
|
||||
endi
|
||||
if $data[2][15] != ready then
|
||||
goto check_db_ready
|
||||
endi
|
||||
|
||||
sql use db
|
||||
|
||||
$loop_cnt = 0
|
||||
check_vg_ready:
|
||||
$loop_cnt = $loop_cnt + 1
|
||||
sleep 200
|
||||
if $loop_cnt == 300 then
|
||||
print ====> vgroups not ready!
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql show vgroups
|
||||
print ===> rows: $rows
|
||||
print $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] $data[0][7] $data[0][8] $data[0][9] $data[0][10] $data[0][11]
|
||||
|
||||
if $rows != $vgroups then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data[0][4] == leader then
|
||||
if $data[0][6] == follower then
|
||||
if $data[0][8] == follower then
|
||||
print ---- vgroup $data[0][0] leader locate on dnode $data[0][3]
|
||||
endi
|
||||
endi
|
||||
elif $data[0][6] == leader then
|
||||
if $data[0][4] == follower then
|
||||
if $data[0][8] == follower then
|
||||
print ---- vgroup $data[0][0] leader locate on dnode $data[0][5]
|
||||
endi
|
||||
endi
|
||||
elif $data[0][8] == leader then
|
||||
if $data[0][4] == follower then
|
||||
if $data[0][6] == follower then
|
||||
print ---- vgroup $data[0][0] leader locate on dnode $data[0][7]
|
||||
endi
|
||||
endi
|
||||
else
|
||||
goto check_vg_ready
|
||||
endi
|
||||
|
||||
|
||||
vg_ready:
|
||||
print ====> create stable/child table
|
||||
sql create table stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int)
|
||||
|
||||
sql show stables
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql create table ct1 using stb tags(1000)
|
||||
|
||||
|
||||
print ===> write 1000 records
|
||||
$N = 10000
|
||||
$count = 0
|
||||
while $count < $N
|
||||
$ms = 1591200000000 + $count
|
||||
sql insert into ct1 values( $ms , $count , 2.1, 3.1)
|
||||
$count = $count + 1
|
||||
endw
|
||||
|
||||
|
Loading…
Reference in New Issue