add self check

This commit is contained in:
Yihao Deng 2024-06-29 12:56:36 +00:00
parent 7290920c6f
commit 6c6bff611a
1 changed files with 9 additions and 6 deletions

View File

@ -75,7 +75,7 @@ _err:
int32_t streamStateSnapReaderClose(SStreamStateReader* pReader) { int32_t streamStateSnapReaderClose(SStreamStateReader* pReader) {
int32_t code = 0; int32_t code = 0;
tqDebug("vgId:%d, vnode %s snapshot reader closed", TD_VID(pReader->pTq->pVnode), STREAM_STATE_TRANSFER); tqDebug("vgId:%d, vnode %s snapshot reader closed", TD_VID(pReader->pTq->pVnode), STREAM_STATE_TRANSFER);
streamSnapReaderClose(pReader->pReaderImpl); code = streamSnapReaderClose(pReader->pReaderImpl);
taosMemoryFree(pReader); taosMemoryFree(pReader);
return code; return code;
} }
@ -138,7 +138,12 @@ int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS
pWriter->sver = sver; pWriter->sver = sver;
pWriter->ever = ever; pWriter->ever = ever;
taosMkDir(pTq->pStreamMeta->path); if (taosMkDir(pTq->pStreamMeta->path) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
tqError("vgId:%d, vnode %s snapshot writer failed to create directory %s since %s", TD_VID(pTq->pVnode),
STREAM_STATE_TRANSFER, pTq->pStreamMeta->path, tstrerror(terrno));
goto _err;
}
SStreamSnapWriter* pSnapWriter = NULL; SStreamSnapWriter* pSnapWriter = NULL;
if (streamSnapWriterOpen(pTq, sver, ever, pTq->pStreamMeta->path, &pSnapWriter) < 0) { if (streamSnapWriterOpen(pTq, sver, ever, pTq->pStreamMeta->path, &pSnapWriter) < 0) {
@ -151,6 +156,7 @@ int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS
*ppWriter = pWriter; *ppWriter = pWriter;
return code; return code;
_err: _err:
tqError("vgId:%d, vnode %s snapshot writer failed to open since %s", TD_VID(pTq->pVnode), STREAM_STATE_TRANSFER, tqError("vgId:%d, vnode %s snapshot writer failed to open since %s", TD_VID(pTq->pVnode), STREAM_STATE_TRANSFER,
tstrerror(terrno)); tstrerror(terrno));
@ -160,11 +166,8 @@ _err:
} }
int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback) { int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback) {
int32_t code = 0;
tqDebug("vgId:%d, vnode %s snapshot writer closed", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER); tqDebug("vgId:%d, vnode %s snapshot writer closed", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER);
code = streamSnapWriterClose(pWriter->pWriterImpl, rollback); return streamSnapWriterClose(pWriter->pWriterImpl, rollback);
return code;
} }
int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData) { int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData) {