vnode snapshot read

This commit is contained in:
yihaoDeng 2023-06-29 09:35:09 +00:00
parent 5cebd36c1c
commit 9eb6d8cf1e
4 changed files with 31 additions and 9 deletions

View File

@ -26,6 +26,7 @@ struct SStreamStateReader {
TBC* pCur; TBC* pCur;
SStreamSnapReader* pReaderImpl; SStreamSnapReader* pReaderImpl;
int32_t complete;
}; };
int32_t streamStateSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamStateReader** ppReader) { int32_t streamStateSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamStateReader** ppReader) {
@ -46,8 +47,9 @@ int32_t streamStateSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS
SStreamSnapReader* pSnapReader = NULL; SStreamSnapReader* pSnapReader = NULL;
sprintf(tdir, "%s%s%s%s%s", pTq->path, TD_DIRSEP, VNODE_TQ_STREAM, TD_DIRSEP, "checkpoints"); sprintf(tdir, "%s%s%s%s%s", pTq->path, TD_DIRSEP, VNODE_TQ_STREAM, TD_DIRSEP, "checkpoints");
streamSnapReaderOpen(pTq, sver, ever, tdir, &pSnapReader); if (streamSnapReaderOpen(pTq, sver, ever, tdir, &pSnapReader) == 0) {
pReader->complete = 1;
}
pReader->pReaderImpl = pSnapReader; pReader->pReaderImpl = pSnapReader;
tqInfo("vgId:%d, vnode stream-state snapshot reader opened", TD_VID(pTq->pVnode)); tqInfo("vgId:%d, vnode stream-state snapshot reader opened", TD_VID(pTq->pVnode));
@ -63,6 +65,7 @@ _err:
int32_t streamStateSnapReaderClose(SStreamStateReader* pReader) { int32_t streamStateSnapReaderClose(SStreamStateReader* pReader) {
int32_t code = 0; int32_t code = 0;
tqInfo("vgId:%d, vnode stream-state snapshot reader closed", TD_VID(pReader->pTq->pVnode));
streamSnapReaderClose(pReader->pReaderImpl); streamSnapReaderClose(pReader->pReaderImpl);
taosMemoryFree(pReader); taosMemoryFree(pReader);
return code; return code;
@ -70,10 +73,16 @@ int32_t streamStateSnapReaderClose(SStreamStateReader* pReader) {
int32_t streamStateSnapRead(SStreamStateReader* pReader, uint8_t** ppData) { int32_t streamStateSnapRead(SStreamStateReader* pReader, uint8_t** ppData) {
int32_t code = 0; int32_t code = 0;
if (pReader->complete == 0) {
return 0;
}
uint8_t* rowData = NULL; uint8_t* rowData = NULL;
int64_t len; int64_t len;
code = streamSnapRead(pReader->pReaderImpl, &rowData, &len); code = streamSnapRead(pReader->pReaderImpl, &rowData, &len);
if (rowData == NULL || len == 0) {
return code;
}
*ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + len); *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + len);
if (*ppData == NULL) { if (*ppData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
@ -122,16 +131,18 @@ int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS
SStreamSnapWriter* pSnapWriter = NULL; SStreamSnapWriter* pSnapWriter = NULL;
streamSnapWriterOpen(pTq, sver, ever, tdir, &pSnapWriter); streamSnapWriterOpen(pTq, sver, ever, tdir, &pSnapWriter);
tqInfo("vgId:%d, snapshot writer opened", TD_VID(pTq->pVnode));
pWriter->pWriterImpl = pSnapWriter; pWriter->pWriterImpl = pSnapWriter;
return code;
_err: _err:
tqError("vgId:%d, tq snapshot writer open failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); tqError("vgId:%d, vnode snapshot writer open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
*ppWriter = NULL; *ppWriter = NULL;
return code; return code;
} }
int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback) { int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback) {
int32_t code = 0; int32_t code = 0;
tqError("vgId:%d, vnode snapshot writer close", TD_VID(pWriter->pTq->pVnode));
code = streamSnapWriterClose(pWriter->pWriterImpl, rollback); code = streamSnapWriterClose(pWriter->pWriterImpl, rollback);
taosMemoryFree(pWriter); taosMemoryFree(pWriter);
return code; return code;
@ -139,6 +150,7 @@ int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback)
int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData) { int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0; int32_t code = 0;
tqError("vgId:%d, vnode snapshot write", TD_VID(pWriter->pTq->pVnode));
code = streamSnapWrite(pWriter->pWriterImpl, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); code = streamSnapWrite(pWriter->pWriterImpl, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
return code; return code;
} }

View File

@ -182,6 +182,7 @@ int32_t streamTaskSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTa
} }
*ppWriter = pWriter; *ppWriter = pWriter;
tqInfo("vgId:%d, stream-task snapshot writer opened", TD_VID(pTq->pVnode));
return code; return code;
_err: _err:
@ -195,6 +196,7 @@ int32_t streamTaskSnapWriterClose(SStreamTaskWriter* pWriter, int8_t rollback) {
int32_t code = 0; int32_t code = 0;
STQ* pTq = pWriter->pTq; STQ* pTq = pWriter->pTq;
tqInfo("vgId:%d, stream-task snapshot writer closed", TD_VID(pTq->pVnode));
if (rollback) { if (rollback) {
tdbAbort(pWriter->pTq->pStreamMeta->db, pWriter->txn); tdbAbort(pWriter->pTq->pStreamMeta->db, pWriter->txn);
} else { } else {
@ -249,6 +251,7 @@ int32_t streamTaskSnapWrite(SStreamTaskWriter* pWriter, uint8_t* pData, uint32_t
} else if (pHdr->type == SNAP_DATA_STREAM_TASK_CHECKPOINT) { } else if (pHdr->type == SNAP_DATA_STREAM_TASK_CHECKPOINT) {
// do nothing // do nothing
} }
tqInfo("vgId:%d, stream-task snapshot write", TD_VID(pTq->pVnode));
return code; return code;

View File

@ -584,7 +584,9 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
vTrace("message in vnode query queue is processing"); vTrace("message in vnode query queue is processing");
if ((pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_VND_TMQ_CONSUME || pMsg->msgType == TDMT_VND_TMQ_CONSUME_PUSH) && !syncIsReadyForRead(pVnode->sync)) { if ((pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_VND_TMQ_CONSUME ||
pMsg->msgType == TDMT_VND_TMQ_CONSUME_PUSH) &&
!syncIsReadyForRead(pVnode->sync)) {
vnodeRedirectRpcMsg(pVnode, pMsg, terrno); vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
return 0; return 0;
} }
@ -640,8 +642,8 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
return vnodeGetTableCfg(pVnode, pMsg, true); return vnodeGetTableCfg(pVnode, pMsg, true);
case TDMT_VND_BATCH_META: case TDMT_VND_BATCH_META:
return vnodeGetBatchMeta(pVnode, pMsg); return vnodeGetBatchMeta(pVnode, pMsg);
// case TDMT_VND_TMQ_CONSUME: // case TDMT_VND_TMQ_CONSUME:
// return tqProcessPollReq(pVnode->pTq, pMsg); // return tqProcessPollReq(pVnode->pTq, pMsg);
case TDMT_VND_TMQ_VG_WALINFO: case TDMT_VND_TMQ_VG_WALINFO:
return tqProcessVgWalInfoReq(pVnode->pTq, pMsg); return tqProcessVgWalInfoReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_RUN: case TDMT_STREAM_TASK_RUN:
@ -1566,7 +1568,7 @@ static int32_t vnodeConsolidateAlterHashRange(SVnode *pVnode, int64_t ver) {
} }
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) { static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
vInfo("vgId:%d, vnode handle msgType:alter-confirm, alter confim msg is processed", TD_VID(pVnode)); vInfo("vgId:%d, vnode handle msgType:alter-confirm, alter confirm msg is processed", TD_VID(pVnode));
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if (!pVnode->config.hashChange) { if (!pVnode->config.hashChange) {
goto _exit; goto _exit;

View File

@ -131,6 +131,10 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path) {
} }
taosCloseDir(&pDir); taosCloseDir(&pDir);
if (pFile->pCurrent == NULL) {
code = -1;
return code;
}
SArray* list = taosArrayInit(64, sizeof(SBackendFileItem)); SArray* list = taosArrayInit(64, sizeof(SBackendFileItem));
SBackendFileItem item; SBackendFileItem item;
@ -239,7 +243,8 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si
int64_t nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset); int64_t nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset);
if (nread == -1) { if (nread == -1) {
code = TAOS_SYSTEM_ERROR(terrno); code = TAOS_SYSTEM_ERROR(terrno);
qError("stream snap failed to read snap, file name:%s, reason:%s", item->name, tstrerror(code)); qError("stream snap failed to read snap, file name:%s, type:%d, reason:%s", item->name, item->type,
tstrerror(code));
return code; return code;
// handle later // handle later
return -1; return -1;