TD-2415
This commit is contained in:
parent
bf0dac41e9
commit
b34535dedd
|
@ -1312,7 +1312,7 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
|
||||||
}
|
}
|
||||||
|
|
||||||
// always update version
|
// always update version
|
||||||
sTrace("vgId:%d, forward to peer, replica:%d role:%s qtype:%s hver:%" PRIu64, pNode->vgId, pNode->replica,
|
sTrace("vgId:%d, update nodeVersion, replica:%d role:%s qtype:%s hver:%" PRIu64, pNode->vgId, pNode->replica,
|
||||||
syncRole[nodeRole], qtypeStr[qtype], pWalHead->version);
|
syncRole[nodeRole], qtypeStr[qtype], pWalHead->version);
|
||||||
nodeVersion = pWalHead->version;
|
nodeVersion = pWalHead->version;
|
||||||
|
|
||||||
|
|
|
@ -43,7 +43,7 @@ static void syncRemoveExtraFile(SSyncPeer *pPeer, int32_t sindex, int32_t eindex
|
||||||
|
|
||||||
snprintf(fname, sizeof(fname), "%s/%s", pNode->path, name);
|
snprintf(fname, sizeof(fname), "%s/%s", pNode->path, name);
|
||||||
(void)remove(fname);
|
(void)remove(fname);
|
||||||
sDebug("%s, %s is removed", pPeer->id, fname);
|
sInfo("%s, %s is removed for its extra", pPeer->id, fname);
|
||||||
|
|
||||||
index++;
|
index++;
|
||||||
if (index > eindex) break;
|
if (index > eindex) break;
|
||||||
|
|
|
@ -46,6 +46,7 @@ typedef struct {
|
||||||
int8_t isFull;
|
int8_t isFull;
|
||||||
int8_t isCommiting;
|
int8_t isCommiting;
|
||||||
uint64_t version; // current version
|
uint64_t version; // current version
|
||||||
|
uint64_t cversion; // version while commit start
|
||||||
uint64_t fversion; // version on saved data file
|
uint64_t fversion; // version on saved data file
|
||||||
void * wqueue; // write queue
|
void * wqueue; // write queue
|
||||||
void * qqueue; // read query queue
|
void * qqueue; // read query queue
|
||||||
|
|
|
@ -203,8 +203,8 @@ int32_t vnodeOpen(int32_t vgId) {
|
||||||
|
|
||||||
code = vnodeReadVersion(pVnode);
|
code = vnodeReadVersion(pVnode);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
vError("vgId:%d, failed to read version, generate it from data file", pVnode->vgId);
|
vError("vgId:%d, failed to read file version, generate it from data file", pVnode->vgId);
|
||||||
// Allow vnode start even when read version fails, set version as walVersion or zero
|
// Allow vnode start even when read file version fails, set file version as wal version or zero
|
||||||
// vnodeCleanUp(pVnode);
|
// vnodeCleanUp(pVnode);
|
||||||
// return code;
|
// return code;
|
||||||
}
|
}
|
||||||
|
@ -442,6 +442,7 @@ static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno) {
|
||||||
pVnode->fversion, pVnode->version);
|
pVnode->fversion, pVnode->version);
|
||||||
pVnode->isCommiting = 0;
|
pVnode->isCommiting = 0;
|
||||||
pVnode->isFull = 1;
|
pVnode->isFull = 1;
|
||||||
|
pVnode->cversion = pVnode->version;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -457,7 +458,7 @@ static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno) {
|
||||||
if (status == TSDB_STATUS_COMMIT_OVER) {
|
if (status == TSDB_STATUS_COMMIT_OVER) {
|
||||||
pVnode->isCommiting = 0;
|
pVnode->isCommiting = 0;
|
||||||
pVnode->isFull = 0;
|
pVnode->isFull = 0;
|
||||||
pVnode->fversion = pVnode->version;
|
pVnode->fversion = pVnode->cversion;
|
||||||
vDebug("vgId:%d, commit over, fver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId, pVnode->fversion, pVnode->version);
|
vDebug("vgId:%d, commit over, fver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId, pVnode->fversion, pVnode->version);
|
||||||
if (!vnodeInInitStatus(pVnode)) {
|
if (!vnodeInInitStatus(pVnode)) {
|
||||||
walRemoveOneOldFile(pVnode->wal);
|
walRemoveOneOldFile(pVnode->wal);
|
||||||
|
|
|
@ -244,7 +244,7 @@ int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rpar
|
||||||
int32_t queued = atomic_add_fetch_32(&pVnode->queuedWMsg, 1);
|
int32_t queued = atomic_add_fetch_32(&pVnode->queuedWMsg, 1);
|
||||||
if (queued > MAX_QUEUED_MSG_NUM) {
|
if (queued > MAX_QUEUED_MSG_NUM) {
|
||||||
vDebug("vgId:%d, too many msg:%d in vwqueue, flow control", pVnode->vgId, queued);
|
vDebug("vgId:%d, too many msg:%d in vwqueue, flow control", pVnode->vgId, queued);
|
||||||
taosMsleep(1);
|
taosMsleep(3);
|
||||||
}
|
}
|
||||||
|
|
||||||
code = vnodePerformFlowCtrl(pWrite);
|
code = vnodePerformFlowCtrl(pWrite);
|
||||||
|
@ -292,10 +292,8 @@ static void vnodeFlowCtrlMsgToWQueue(void *param, void *tmrId) {
|
||||||
|
|
||||||
static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite) {
|
static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite) {
|
||||||
SVnodeObj *pVnode = pWrite->pVnode;
|
SVnodeObj *pVnode = pWrite->pVnode;
|
||||||
if (pVnode->queuedWMsg < MAX_QUEUED_MSG_NUM) {
|
if (pWrite->qtype != TAOS_QTYPE_RPC) return 0;
|
||||||
if (pVnode->flowctrlLevel <= 0) return 0;
|
if (pVnode->queuedWMsg < MAX_QUEUED_MSG_NUM && pVnode->flowctrlLevel <= 0) return 0;
|
||||||
if (pWrite->qtype != TAOS_QTYPE_RPC) return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tsFlowCtrl == 0) {
|
if (tsFlowCtrl == 0) {
|
||||||
int32_t ms = pow(2, pVnode->flowctrlLevel + 2);
|
int32_t ms = pow(2, pVnode->flowctrlLevel + 2);
|
||||||
|
|
|
@ -91,8 +91,11 @@ while $i < $tblNum
|
||||||
$i = $i + 1
|
$i = $i + 1
|
||||||
endw
|
endw
|
||||||
|
|
||||||
|
sql show db.vgroups;
|
||||||
|
print d1: $data04 $data05 , d2: $data06 $data07
|
||||||
|
|
||||||
sql select count(*) from $stb
|
sql select count(*) from $stb
|
||||||
print rows:$rows data00:$data00
|
print rtest1==> rows:$rows data00:$data00
|
||||||
if $rows != 1 then
|
if $rows != 1 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
@ -103,6 +106,15 @@ endi
|
||||||
|
|
||||||
$totalRows = $data00
|
$totalRows = $data00
|
||||||
|
|
||||||
|
sql select count(*) from $stb
|
||||||
|
print test2==> rows:$rows data00:$data00
|
||||||
|
sql select count(*) from $stb
|
||||||
|
print test3==> rows:$rows data00:$data00
|
||||||
|
sql select count(*) from $stb
|
||||||
|
print test4==> rows:$rows data00:$data00
|
||||||
|
sql select count(*) from $stb
|
||||||
|
print test5==> rows:$rows data00:$data00
|
||||||
|
|
||||||
print ============== step3: insert old data(now-15d) and new data(now+15d), control data rows in order to save in cache, not falling disc
|
print ============== step3: insert old data(now-15d) and new data(now+15d), control data rows in order to save in cache, not falling disc
|
||||||
sql insert into $tb values ( now - 20d , -20 )
|
sql insert into $tb values ( now - 20d , -20 )
|
||||||
sql insert into $tb values ( now - 40d , -40 )
|
sql insert into $tb values ( now - 40d , -40 )
|
||||||
|
@ -153,12 +165,21 @@ if $data00 != $totalRows then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
sql select count(*) from $stb
|
||||||
|
print data00 $data00
|
||||||
|
sql select count(*) from $stb
|
||||||
|
print data00 $data00
|
||||||
|
sql select count(*) from $stb
|
||||||
|
print data00 $data00
|
||||||
|
sql select count(*) from $stb
|
||||||
|
print data00 $data00
|
||||||
|
|
||||||
print ============== step5: insert two data rows: now-16d, now+16d,
|
print ============== step5: insert two data rows: now-16d, now+16d,
|
||||||
sql insert into $tb values ( now - 21d , -21 )
|
sql insert into $tb values ( now - 21d , -21 )
|
||||||
sql insert into $tb values ( now - 41d , -41 )
|
sql insert into $tb values ( now - 41d , -41 )
|
||||||
$totalRows = $totalRows + 2
|
$totalRows = $totalRows + 2
|
||||||
|
|
||||||
print ============== step5: restart dnode2, waiting sync end
|
print ============== step6: restart dnode2, waiting sync end
|
||||||
system sh/exec.sh -n dnode2 -s start
|
system sh/exec.sh -n dnode2 -s start
|
||||||
sleep 3000
|
sleep 3000
|
||||||
$loopCnt = 0
|
$loopCnt = 0
|
||||||
|
@ -199,3 +220,74 @@ if $data00 != $totalRows then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
sql select count(*) from $stb
|
||||||
|
print data00 $data00
|
||||||
|
if $data00 != $totalRows then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select count(*) from $stb
|
||||||
|
print data00 $data00
|
||||||
|
if $data00 != $totalRows then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select count(*) from $stb
|
||||||
|
print data00 $data00
|
||||||
|
if $data00 != $totalRows then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select count(*) from $stb
|
||||||
|
print data00 $data00
|
||||||
|
if $data00 != $totalRows then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select count(*) from $stb
|
||||||
|
print data00 $data00
|
||||||
|
if $data00 != $totalRows then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select count(*) from $stb
|
||||||
|
print data00 $data00
|
||||||
|
if $data00 != $totalRows then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select count(*) from $stb
|
||||||
|
print data00 $data00
|
||||||
|
if $data00 != $totalRows then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select count(*) from $stb
|
||||||
|
print data00 $data00
|
||||||
|
if $data00 != $totalRows then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select count(*) from $stb
|
||||||
|
print data00 $data00
|
||||||
|
if $data00 != $totalRows then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select count(*) from $stb
|
||||||
|
print data00 $data00
|
||||||
|
if $data00 != $totalRows then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select count(*) from $stb
|
||||||
|
print data00 $data00
|
||||||
|
if $data00 != $totalRows then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select count(*) from $stb
|
||||||
|
print data00 $data00
|
||||||
|
if $data00 != $totalRows then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
Loading…
Reference in New Issue