Merge pull request #18830 from taosdata/fix/td-21076
Fix: vnode snapshot problem
This commit is contained in:
commit
bf85790e0d
|
@ -155,7 +155,7 @@ static int32_t tsdbSnapReadOpenFile(STsdbSnapReader* pReader) {
|
|||
|
||||
if (rowVer >= pReader->sver && rowVer <= pReader->ever) {
|
||||
pIter->rInfo.suid = pIter->bData.suid;
|
||||
pIter->rInfo.uid = pIter->bData.uid;
|
||||
pIter->rInfo.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[pIter->iRow];
|
||||
pIter->rInfo.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow);
|
||||
goto _add_iter;
|
||||
}
|
||||
|
@ -179,16 +179,14 @@ _err:
|
|||
return code;
|
||||
}
|
||||
|
||||
static SRowInfo* tsdbSnapGetRow(STsdbSnapReader* pReader) { return pReader->pIter ? &pReader->pIter->rInfo : NULL; }
|
||||
|
||||
static int32_t tsdbSnapNextRow(STsdbSnapReader* pReader) {
|
||||
int32_t code = 0;
|
||||
|
||||
if (pReader->pIter) {
|
||||
SFDataIter* pIter = pReader->pIter;
|
||||
|
||||
SFDataIter* pIter = NULL;
|
||||
while (true) {
|
||||
_find_row:
|
||||
pIter = pReader->pIter;
|
||||
for (pIter->iRow++; pIter->iRow < pIter->bData.nRow; pIter->iRow++) {
|
||||
int64_t rowVer = pIter->bData.aVersion[pIter->iRow];
|
||||
|
||||
|
@ -224,6 +222,7 @@ static int32_t tsdbSnapNextRow(STsdbSnapReader* pReader) {
|
|||
}
|
||||
|
||||
pReader->pIter = NULL;
|
||||
break;
|
||||
} else if (pIter->type == SNAP_STT_FILE_ITER) {
|
||||
for (pIter->iSttBlk++; pIter->iSttBlk < taosArrayGetSize(pIter->aSttBlk); pIter->iSttBlk++) {
|
||||
SSttBlk* pSttBlk = (SSttBlk*)taosArrayGet(pIter->aSttBlk, pIter->iSttBlk);
|
||||
|
@ -238,6 +237,7 @@ static int32_t tsdbSnapNextRow(STsdbSnapReader* pReader) {
|
|||
}
|
||||
|
||||
pReader->pIter = NULL;
|
||||
break;
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
@ -269,6 +269,20 @@ _err:
|
|||
return code;
|
||||
}
|
||||
|
||||
static SRowInfo* tsdbSnapGetRow(STsdbSnapReader* pReader) {
|
||||
if (pReader->pIter) {
|
||||
return &pReader->pIter->rInfo;
|
||||
} else {
|
||||
tsdbSnapNextRow(pReader);
|
||||
|
||||
if (pReader->pIter) {
|
||||
return &pReader->pIter->rInfo;
|
||||
} else {
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t tsdbSnapCmprData(STsdbSnapReader* pReader, uint8_t** ppData) {
|
||||
int32_t code = 0;
|
||||
|
||||
|
@ -1356,7 +1370,7 @@ _exit:
|
|||
taosMemoryFree(pWriter);
|
||||
}
|
||||
} else {
|
||||
tsdbDebug("vgId:%d, tsdb snapshot writer open for %s succeed", TD_VID(pTsdb->pVnode), pTsdb->path);
|
||||
tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
|
||||
*ppWriter = pWriter;
|
||||
}
|
||||
return code;
|
||||
|
@ -1421,7 +1435,7 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) {
|
|||
for (int32_t iBuf = 0; iBuf < sizeof(pWriter->aBuf) / sizeof(uint8_t*); iBuf++) {
|
||||
tFree(pWriter->aBuf[iBuf]);
|
||||
}
|
||||
tsdbInfo("vgId:%d, vnode snapshot tsdb writer close for %s", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path);
|
||||
tsdbInfo("vgId:%d %s done", TD_VID(pWriter->pTsdb->pVnode), __func__);
|
||||
taosMemoryFree(pWriter);
|
||||
*ppWriter = NULL;
|
||||
return code;
|
||||
|
|
|
@ -151,7 +151,7 @@ int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) {
|
|||
}
|
||||
|
||||
syncNodeStartHeartbeatTimer(pSyncNode);
|
||||
syncNodeReplicate(pSyncNode);
|
||||
//syncNodeReplicate(pSyncNode);
|
||||
}
|
||||
|
||||
syncNodeRelease(pSyncNode);
|
||||
|
|
|
@ -345,6 +345,8 @@ bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) { return pReceive
|
|||
void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) {
|
||||
// force close, abandon incomplete data
|
||||
if (pReceiver->pWriter != NULL) {
|
||||
// event log
|
||||
sRTrace(pReceiver, "snapshot receiver force stop");
|
||||
int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false,
|
||||
&(pReceiver->snapshot));
|
||||
ASSERT(ret == 0);
|
||||
|
@ -354,7 +356,7 @@ void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) {
|
|||
pReceiver->start = false;
|
||||
|
||||
// event log
|
||||
sRTrace(pReceiver, "snapshot receiver force stop");
|
||||
// sRTrace(pReceiver, "snapshot receiver force stop");
|
||||
}
|
||||
|
||||
int32_t snapshotReceiverStartWriter(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) {
|
||||
|
@ -675,6 +677,7 @@ static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMs
|
|||
sNTrace(pSyncNode, "snapshot receiver finish waitting for true time, now:%" PRId64 ", stime:%" PRId64, timeNow,
|
||||
pMsg->startTime);
|
||||
taosMsleep(10);
|
||||
timeNow = taosGetTimestampMs();
|
||||
}
|
||||
|
||||
int32_t code = snapshotReceiverFinish(pReceiver, pMsg);
|
||||
|
|
Loading…
Reference in New Issue