Merge remote-tracking branch 'origin/main' into fix/main_bugfix_wxy
This commit is contained in:
commit
108863e454
|
@ -192,6 +192,8 @@ SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore) {
|
||||||
return SYNC_TERM_INVALID;
|
return SYNC_TERM_INVALID;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static inline bool raftLogForceSync(SSyncRaftEntry* pEntry) { return (pEntry->originalRpcType == TDMT_VND_COMMIT); }
|
||||||
|
|
||||||
static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
|
static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
|
||||||
SSyncLogStoreData* pData = pLogStore->data;
|
SSyncLogStoreData* pData = pLogStore->data;
|
||||||
SWal* pWal = pData->pWal;
|
SWal* pWal = pData->pWal;
|
||||||
|
@ -219,9 +221,8 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
|
||||||
|
|
||||||
ASSERT(pEntry->index == index);
|
ASSERT(pEntry->index == index);
|
||||||
|
|
||||||
if (pEntry->originalRpcType == TDMT_VND_COMMIT) {
|
bool forceSync = raftLogForceSync(pEntry);
|
||||||
walFsync(pWal, true);
|
walFsync(pWal, forceSync);
|
||||||
}
|
|
||||||
|
|
||||||
sNTrace(pData->pSyncNode, "write index:%" PRId64 ", type:%s, origin type:%s, elapsed:%" PRId64, pEntry->index,
|
sNTrace(pData->pSyncNode, "write index:%" PRId64 ", type:%s, origin type:%s, elapsed:%" PRId64, pEntry->index,
|
||||||
TMSG_INFO(pEntry->msgType), TMSG_INFO(pEntry->originalRpcType), tsElapsed);
|
TMSG_INFO(pEntry->msgType), TMSG_INFO(pEntry->originalRpcType), tsElapsed);
|
||||||
|
|
|
@ -1316,11 +1316,11 @@ static int tdbBtreeDecodePayload(SPage *pPage, const SCell *pCell, int nHeader,
|
||||||
}
|
}
|
||||||
TDB_CELLDECODER_SET_FREE_KEY(pDecoder);
|
TDB_CELLDECODER_SET_FREE_KEY(pDecoder);
|
||||||
|
|
||||||
memcpy(pDecoder->pKey, pCell + nHeader, nLocal - 4);
|
memcpy(pDecoder->pKey, pCell + nHeader, nLocal - nHeader - sizeof(pgno));
|
||||||
nLeft -= nLocal - 4;
|
nLeft -= nLocal - nHeader - sizeof(pgno);
|
||||||
nLeftKey -= nLocal - 4;
|
nLeftKey -= nLocal - nHeader - sizeof(pgno);
|
||||||
|
|
||||||
memcpy(&pgno, pCell + nHeader + nLocal - 4, sizeof(pgno));
|
memcpy(&pgno, pCell + nLocal - sizeof(pgno), sizeof(pgno));
|
||||||
|
|
||||||
int lastKeyPageSpace = 0;
|
int lastKeyPageSpace = 0;
|
||||||
// load left key & val to ovpages
|
// load left key & val to ovpages
|
||||||
|
@ -1346,9 +1346,11 @@ static int tdbBtreeDecodePayload(SPage *pPage, const SCell *pCell, int nHeader,
|
||||||
|
|
||||||
if (lastKeyPage) {
|
if (lastKeyPage) {
|
||||||
if (lastKeyPageSpace >= vLen) {
|
if (lastKeyPageSpace >= vLen) {
|
||||||
|
if (vLen > 0) {
|
||||||
pDecoder->pVal = ofpCell + kLen - nLeftKey;
|
pDecoder->pVal = ofpCell + kLen - nLeftKey;
|
||||||
|
|
||||||
nLeft -= vLen;
|
nLeft -= vLen;
|
||||||
|
}
|
||||||
pgno = 0;
|
pgno = 0;
|
||||||
} else {
|
} else {
|
||||||
// read partial val to local
|
// read partial val to local
|
||||||
|
|
|
@ -637,11 +637,6 @@ int32_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, in
|
||||||
void walFsync(SWal *pWal, bool forceFsync) {
|
void walFsync(SWal *pWal, bool forceFsync) {
|
||||||
taosThreadMutexLock(&pWal->mutex);
|
taosThreadMutexLock(&pWal->mutex);
|
||||||
if (forceFsync || (pWal->cfg.level == TAOS_WAL_FSYNC && pWal->cfg.fsyncPeriod == 0)) {
|
if (forceFsync || (pWal->cfg.level == TAOS_WAL_FSYNC && pWal->cfg.fsyncPeriod == 0)) {
|
||||||
wTrace("vgId:%d, fileId:%" PRId64 ".idx, do fsync", pWal->cfg.vgId, walGetCurFileFirstVer(pWal));
|
|
||||||
if (taosFsyncFile(pWal->pIdxFile) < 0) {
|
|
||||||
wError("vgId:%d, file:%" PRId64 ".idx, fsync failed since %s", pWal->cfg.vgId, walGetCurFileFirstVer(pWal),
|
|
||||||
strerror(errno));
|
|
||||||
}
|
|
||||||
wTrace("vgId:%d, fileId:%" PRId64 ".log, do fsync", pWal->cfg.vgId, walGetCurFileFirstVer(pWal));
|
wTrace("vgId:%d, fileId:%" PRId64 ".log, do fsync", pWal->cfg.vgId, walGetCurFileFirstVer(pWal));
|
||||||
if (taosFsyncFile(pWal->pLogFile) < 0) {
|
if (taosFsyncFile(pWal->pLogFile) < 0) {
|
||||||
wError("vgId:%d, file:%" PRId64 ".log, fsync failed since %s", pWal->cfg.vgId, walGetCurFileFirstVer(pWal),
|
wError("vgId:%d, file:%" PRId64 ".log, fsync failed since %s", pWal->cfg.vgId, walGetCurFileFirstVer(pWal),
|
||||||
|
|
|
@ -112,7 +112,8 @@ class TDTestCase:
|
||||||
dnode_first_port = dnode.cfgDict["firstEp"].split(":")[-1]
|
dnode_first_port = dnode.cfgDict["firstEp"].split(":")[-1]
|
||||||
cmd = f" taos -h {dnode_first_host} -P {dnode_first_port} -s ' create dnode \"{dnode_id} \" ' ;"
|
cmd = f" taos -h {dnode_first_host} -P {dnode_first_port} -s ' create dnode \"{dnode_id} \" ' ;"
|
||||||
tdLog.debug(cmd)
|
tdLog.debug(cmd)
|
||||||
os.system(cmd)
|
if os.system(cmd) != 0:
|
||||||
|
raise Exception("failed to execute system command. cmd: %s" % cmd)
|
||||||
|
|
||||||
time.sleep(2)
|
time.sleep(2)
|
||||||
tdLog.info(" create cluster with %d dnode done! " %dnodes_nums)
|
tdLog.info(" create cluster with %d dnode done! " %dnodes_nums)
|
||||||
|
@ -292,6 +293,8 @@ class TDTestCase:
|
||||||
tdLog.debug("drop mnode %d successfully"%(i+1))
|
tdLog.debug("drop mnode %d successfully"%(i+1))
|
||||||
break
|
break
|
||||||
count+=1
|
count+=1
|
||||||
|
self.wait_for_transactions(20)
|
||||||
|
|
||||||
tdLog.debug("create mnode on dnode %d"%(i+1))
|
tdLog.debug("create mnode on dnode %d"%(i+1))
|
||||||
tdSql.execute("create mnode on dnode %d"%(i+1))
|
tdSql.execute("create mnode on dnode %d"%(i+1))
|
||||||
count=0
|
count=0
|
||||||
|
@ -299,12 +302,24 @@ class TDTestCase:
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
tdSql.query("select * from information_schema.ins_mnodes;")
|
tdSql.query("select * from information_schema.ins_mnodes;")
|
||||||
if tdSql.checkRows(3):
|
if tdSql.checkRows(3):
|
||||||
tdLog.debug("drop mnode %d successfully"%(i+1))
|
tdLog.debug("create mnode %d successfully"%(i+1))
|
||||||
break
|
break
|
||||||
count+=1
|
count+=1
|
||||||
|
self.wait_for_transactions(20)
|
||||||
dropcount+=1
|
dropcount+=1
|
||||||
self.check3mnode()
|
self.check3mnode()
|
||||||
|
|
||||||
|
def wait_for_transactions(self, timeout):
|
||||||
|
count=0
|
||||||
|
while count<timeout:
|
||||||
|
time.sleep(1)
|
||||||
|
tdSql.query("show transactions;")
|
||||||
|
if tdSql.checkRows(0):
|
||||||
|
tdLog.debug("transactions completed successfully")
|
||||||
|
break
|
||||||
|
count+=1
|
||||||
|
if count >= timeout:
|
||||||
|
tdLog.debug("transactions not finished before timeout (%d secs)", timeout)
|
||||||
|
|
||||||
def getConnection(self, dnode):
|
def getConnection(self, dnode):
|
||||||
host = dnode.cfgDict["fqdn"]
|
host = dnode.cfgDict["fqdn"]
|
||||||
|
|
Loading…
Reference in New Issue