commit
1f1f391860
|
@ -36,6 +36,8 @@ static void syncRemoveExtraFile(SSyncPeer *pPeer, int32_t sindex, int32_t eindex
|
||||||
|
|
||||||
if (sindex < 0 || eindex < sindex) return;
|
if (sindex < 0 || eindex < sindex) return;
|
||||||
|
|
||||||
|
sDebug("%s, extra files will be removed between sindex:%d and eindex:%d", pPeer->id, sindex, eindex);
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
name[0] = 0;
|
name[0] = 0;
|
||||||
magic = (*pNode->getFileInfo)(pNode->vgId, name, &index, eindex, &size, &fversion);
|
magic = (*pNode->getFileInfo)(pNode->vgId, name, &index, eindex, &size, &fversion);
|
||||||
|
@ -61,11 +63,12 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
|
||||||
bool fileChanged = false;
|
bool fileChanged = false;
|
||||||
|
|
||||||
*fversion = 0;
|
*fversion = 0;
|
||||||
sinfo.index = 0;
|
sinfo.index = -1;
|
||||||
while (1) {
|
while (1) {
|
||||||
// read file info
|
// read file info
|
||||||
int32_t ret = taosReadMsg(pPeer->syncFd, &(minfo), sizeof(SFileInfo));
|
minfo.index = -1;
|
||||||
if (ret != sizeof(SFileInfo)) {
|
int32_t ret = taosReadMsg(pPeer->syncFd, &minfo, sizeof(SFileInfo));
|
||||||
|
if (ret != sizeof(SFileInfo) || minfo.index == -1) {
|
||||||
sError("%s, failed to read file info while restore file since %s", pPeer->id, strerror(errno));
|
sError("%s, failed to read file info while restore file since %s", pPeer->id, strerror(errno));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -75,7 +78,7 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
|
||||||
sDebug("%s, no more files to restore", pPeer->id);
|
sDebug("%s, no more files to restore", pPeer->id);
|
||||||
|
|
||||||
// remove extra files after the current index
|
// remove extra files after the current index
|
||||||
syncRemoveExtraFile(pPeer, sinfo.index + 1, TAOS_SYNC_MAX_INDEX);
|
if (sinfo.index != -1) syncRemoveExtraFile(pPeer, sinfo.index + 1, TAOS_SYNC_MAX_INDEX);
|
||||||
code = 0;
|
code = 0;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -243,8 +243,10 @@ int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rpar
|
||||||
|
|
||||||
int32_t queued = atomic_add_fetch_32(&pVnode->queuedWMsg, 1);
|
int32_t queued = atomic_add_fetch_32(&pVnode->queuedWMsg, 1);
|
||||||
if (queued > MAX_QUEUED_MSG_NUM) {
|
if (queued > MAX_QUEUED_MSG_NUM) {
|
||||||
vDebug("vgId:%d, too many msg:%d in vwqueue, flow control", pVnode->vgId, queued);
|
int32_t ms = (queued / MAX_QUEUED_MSG_NUM) * 10 + 3;
|
||||||
taosMsleep(3);
|
if (ms > 100) ms = 100;
|
||||||
|
vDebug("vgId:%d, too many msg:%d in vwqueue, flow control %dms", pVnode->vgId, queued, ms);
|
||||||
|
taosMsleep(ms);
|
||||||
}
|
}
|
||||||
|
|
||||||
code = vnodePerformFlowCtrl(pWrite);
|
code = vnodePerformFlowCtrl(pWrite);
|
||||||
|
|
Loading…
Reference in New Issue