fix: write snapshot after apply queue is empty
This commit is contained in:
parent
99f29f6455
commit
2b2e1c79c9
|
@ -30,6 +30,7 @@ static inline void vnodeWaitBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
|
|||
if (vnodeIsMsgBlock(pMsg->msgType)) {
|
||||
const STraceId *trace = &pMsg->info.traceId;
|
||||
vGTrace("vgId:%d, msg:%p wait block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));
|
||||
pVnode->blockCount = 1;
|
||||
tsem_wait(&pVnode->syncSem);
|
||||
}
|
||||
}
|
||||
|
@ -37,8 +38,11 @@ static inline void vnodeWaitBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
|
|||
static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
|
||||
if (vnodeIsMsgBlock(pMsg->msgType)) {
|
||||
const STraceId *trace = &pMsg->info.traceId;
|
||||
vGTrace("vgId:%d, msg:%p post block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));
|
||||
tsem_post(&pVnode->syncSem);
|
||||
if (pVnode->blockCount) {
|
||||
vGTrace("vgId:%d, msg:%p post block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));
|
||||
pVnode->blockCount = 0;
|
||||
tsem_post(&pVnode->syncSem);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -281,14 +285,15 @@ void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
|||
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
||||
if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
|
||||
const STraceId *trace = &pMsg->info.traceId;
|
||||
vGInfo("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p index:%ld", vgId, pMsg,
|
||||
TMSG_INFO(pMsg->msgType), pMsg->info.handle, pMsg->info.conn.applyIndex);
|
||||
vGTrace("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p index:%" PRId64, vgId, pMsg,
|
||||
TMSG_INFO(pMsg->msgType), pMsg->info.handle, pMsg->info.conn.applyIndex);
|
||||
|
||||
SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
|
||||
if (rsp.code == 0) {
|
||||
if (vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
|
||||
rsp.code = terrno;
|
||||
vError("vgId:%d, msg:%p failed to apply since %s", vgId, pMsg, terrstr());
|
||||
vGError("vgId:%d, msg:%p failed to apply since %s, index:%" PRId64, vgId, pMsg, terrstr(),
|
||||
pMsg->info.conn.applyIndex);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -297,7 +302,7 @@ void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
|||
tmsgSendRsp(&rsp);
|
||||
}
|
||||
|
||||
vGTrace("vgId:%d, msg:%p is freed, code:0x%x", vgId, pMsg, rsp.code);
|
||||
vGTrace("vgId:%d, msg:%p is freed, code:0x%x index:%" PRId64, vgId, pMsg, rsp.code, pMsg->info.conn.applyIndex);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
taosFreeQitem(pMsg);
|
||||
}
|
||||
|
@ -611,6 +616,18 @@ static int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void
|
|||
#ifdef USE_TSDB_SNAPSHOT
|
||||
SVnode *pVnode = pFsm->data;
|
||||
SSnapshotParam *pSnapshotParam = pParam;
|
||||
|
||||
do {
|
||||
int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
|
||||
if (itemSize == 0) {
|
||||
vDebug("vgId:%d, apply queue is empty, start write snapshot", pVnode->config.vgId);
|
||||
break;
|
||||
} else {
|
||||
vDebug("vgId:%d, %d items in apply queue, write snapshot later", pVnode->config.vgId);
|
||||
taosMsleep(10);
|
||||
}
|
||||
} while (true);
|
||||
|
||||
int32_t code = vnodeSnapWriterOpen(pVnode, pSnapshotParam->start, pSnapshotParam->end, (SVSnapWriter **)ppWriter);
|
||||
return code;
|
||||
#else
|
||||
|
@ -622,7 +639,10 @@ static int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void
|
|||
static int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) {
|
||||
#ifdef USE_TSDB_SNAPSHOT
|
||||
SVnode *pVnode = pFsm->data;
|
||||
vDebug("vgId:%d, stop write snapshot, isApply:%d", pVnode->config.vgId, isApply);
|
||||
|
||||
int32_t code = vnodeSnapWriterClose(pWriter, !isApply, pSnapshot);
|
||||
vDebug("vgId:%d, apply snapshot to vnode, code:0x%x", pVnode->config.vgId, code);
|
||||
return code;
|
||||
#else
|
||||
taosMemoryFree(pWriter);
|
||||
|
@ -634,6 +654,7 @@ static int32_t vnodeSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void *
|
|||
#ifdef USE_TSDB_SNAPSHOT
|
||||
SVnode *pVnode = pFsm->data;
|
||||
int32_t code = vnodeSnapWrite(pWriter, pBuf, len);
|
||||
vTrace("vgId:%d, write snapshot, len:%d", pVnode->config.vgId, len);
|
||||
return code;
|
||||
#else
|
||||
return 0;
|
||||
|
|
Loading…
Reference in New Issue