vnode snapshot read
This commit is contained in:
parent
e25df41536
commit
93391f73e5
|
@ -52,20 +52,20 @@ int32_t streamStateSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS
|
|||
}
|
||||
pReader->pReaderImpl = pSnapReader;
|
||||
|
||||
tqInfo("vgId:%d, vnode stream-state snapshot reader opened", TD_VID(pTq->pVnode));
|
||||
tqDebug("vgId:%d, vnode stream-state snapshot reader opened", TD_VID(pTq->pVnode));
|
||||
|
||||
*ppReader = pReader;
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tqError("vgId:%d, vnode stream-state snapshot reader open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
|
||||
tqError("vgId:%d, vnode stream-state snapshot reader failed to open since %s", TD_VID(pTq->pVnode), tstrerror(code));
|
||||
*ppReader = NULL;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t streamStateSnapReaderClose(SStreamStateReader* pReader) {
|
||||
int32_t code = 0;
|
||||
tqInfo("vgId:%d, vnode stream-state snapshot reader closed", TD_VID(pReader->pTq->pVnode));
|
||||
tqDebug("vgId:%d, vnode stream-state snapshot reader closed", TD_VID(pReader->pTq->pVnode));
|
||||
streamSnapReaderClose(pReader->pReaderImpl);
|
||||
taosMemoryFree(pReader);
|
||||
return code;
|
||||
|
@ -93,11 +93,11 @@ int32_t streamStateSnapRead(SStreamStateReader* pReader, uint8_t** ppData) {
|
|||
pHdr->type = SNAP_DATA_STREAM_STATE_BACKEND;
|
||||
pHdr->size = len;
|
||||
memcpy(pHdr->data, rowData, len);
|
||||
tqInfo("vgId:%d, vnode stream-state snapshot read data", TD_VID(pReader->pTq->pVnode));
|
||||
tqDebug("vgId:%d, vnode stream-state snapshot read data success", TD_VID(pReader->pTq->pVnode));
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tqError("vgId:%d, vnode stream-state snapshot read data failed since %s", TD_VID(pReader->pTq->pVnode),
|
||||
tqError("vgId:%d, vnode stream-state snapshot failed to read since %s", TD_VID(pReader->pTq->pVnode),
|
||||
tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
@ -129,20 +129,23 @@ int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS
|
|||
|
||||
sprintf(tdir, "%s%s%s", pTq->path, TD_DIRSEP, VNODE_TQ_STREAM);
|
||||
SStreamSnapWriter* pSnapWriter = NULL;
|
||||
streamSnapWriterOpen(pTq, sver, ever, tdir, &pSnapWriter);
|
||||
if (streamSnapWriterOpen(pTq, sver, ever, tdir, &pSnapWriter) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
tqInfo("vgId:%d, snapshot writer opened", TD_VID(pTq->pVnode));
|
||||
tqDebug("vgId:%d, vnode stream-state snapshot writer opened", TD_VID(pTq->pVnode));
|
||||
pWriter->pWriterImpl = pSnapWriter;
|
||||
return code;
|
||||
_err:
|
||||
tqError("vgId:%d, vnode snapshot writer open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
|
||||
tqError("vgId:%d, vnode stream-state snapshot writer failed to open since %s", TD_VID(pTq->pVnode), tstrerror(code));
|
||||
taosMemoryFree(pWriter);
|
||||
*ppWriter = NULL;
|
||||
return code;
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback) {
|
||||
int32_t code = 0;
|
||||
tqError("vgId:%d, vnode snapshot writer close", TD_VID(pWriter->pTq->pVnode));
|
||||
tqDebug("vgId:%d, vnode stream-state snapshot writer closed", TD_VID(pWriter->pTq->pVnode));
|
||||
code = streamSnapWriterClose(pWriter->pWriterImpl, rollback);
|
||||
taosMemoryFree(pWriter);
|
||||
return code;
|
||||
|
@ -150,7 +153,7 @@ int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback)
|
|||
|
||||
int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData) {
|
||||
int32_t code = 0;
|
||||
tqError("vgId:%d, vnode snapshot write", TD_VID(pWriter->pTq->pVnode));
|
||||
tqDebug("vgId:%d, vnode stream-state snapshot write", TD_VID(pWriter->pTq->pVnode));
|
||||
code = streamSnapWrite(pWriter->pWriterImpl, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -72,7 +72,7 @@ int32_t streamTaskSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTa
|
|||
goto _err;
|
||||
}
|
||||
|
||||
tqInfo("vgId:%d, vnode stream-task snapshot reader opened", TD_VID(pTq->pVnode));
|
||||
tqDebug("vgId:%d, vnode stream-task snapshot reader opened", TD_VID(pTq->pVnode));
|
||||
|
||||
*ppReader = pReader;
|
||||
return code;
|
||||
|
@ -105,7 +105,7 @@ int32_t streamTaskSnapRead(SStreamTaskReader* pReader, uint8_t** ppData) {
|
|||
|
||||
*ppData = NULL;
|
||||
int8_t except = 0;
|
||||
tqInfo("vgId:%d, vnode stream-task snapshot start read data", TD_VID(pReader->pTq->pVnode));
|
||||
tqDebug("vgId:%d, vnode stream-task snapshot start read data", TD_VID(pReader->pTq->pVnode));
|
||||
|
||||
STablePair* pPair = taosArrayGet(pReader->tdbTbList, pReader->pos);
|
||||
NextTbl:
|
||||
|
@ -132,7 +132,7 @@ NextTbl:
|
|||
}
|
||||
if (pVal == NULL || vLen == 0) {
|
||||
*ppData = NULL;
|
||||
tqInfo("vgId:%d, vnode stream-task snapshot finished read data", TD_VID(pReader->pTq->pVnode));
|
||||
tqDebug("vgId:%d, vnode stream-task snapshot finished read data", TD_VID(pReader->pTq->pVnode));
|
||||
return code;
|
||||
}
|
||||
*ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen);
|
||||
|
@ -146,7 +146,7 @@ NextTbl:
|
|||
pHdr->size = vLen;
|
||||
memcpy(pHdr->data, pVal, vLen);
|
||||
|
||||
tqInfo("vgId:%d, vnode stream-task snapshot read data vLen:%d", TD_VID(pReader->pTq->pVnode), vLen);
|
||||
tqDebug("vgId:%d, vnode stream-task snapshot read data vLen:%d", TD_VID(pReader->pTq->pVnode), vLen);
|
||||
|
||||
return code;
|
||||
_err:
|
||||
|
@ -184,11 +184,11 @@ int32_t streamTaskSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTa
|
|||
}
|
||||
|
||||
*ppWriter = pWriter;
|
||||
tqInfo("vgId:%d, stream-task snapshot writer opened", TD_VID(pTq->pVnode));
|
||||
tqDebug("vgId:%d, vnode stream-task snapshot writer opened", TD_VID(pTq->pVnode));
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tqError("vgId:%d, stream-task snapshot writer open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
|
||||
tqError("vgId:%d, vnode stream-task snapshot writer failed to write since %s", TD_VID(pTq->pVnode), tstrerror(code));
|
||||
*ppWriter = NULL;
|
||||
return code;
|
||||
return 0;
|
||||
|
@ -198,7 +198,7 @@ int32_t streamTaskSnapWriterClose(SStreamTaskWriter* pWriter, int8_t rollback) {
|
|||
int32_t code = 0;
|
||||
STQ* pTq = pWriter->pTq;
|
||||
|
||||
tqInfo("vgId:%d, stream-task snapshot writer closed", TD_VID(pTq->pVnode));
|
||||
tqDebug("vgId:%d, vnode stream-task snapshot writer closed", TD_VID(pTq->pVnode));
|
||||
if (rollback) {
|
||||
tdbAbort(pWriter->pTq->pStreamMeta->db, pWriter->txn);
|
||||
} else {
|
||||
|
@ -218,7 +218,8 @@ int32_t streamTaskSnapWriterClose(SStreamTaskWriter* pWriter, int8_t rollback) {
|
|||
return code;
|
||||
|
||||
_err:
|
||||
tqError("vgId:%d, tq snapshot writer close failed since %s", TD_VID(pWriter->pTq->pVnode), tstrerror(code));
|
||||
tqError("vgId:%d, vnode stream-task snapshot writer failed to close since %s", TD_VID(pWriter->pTq->pVnode),
|
||||
tstrerror(code));
|
||||
return code;
|
||||
return 0;
|
||||
}
|
||||
|
@ -253,11 +254,11 @@ int32_t streamTaskSnapWrite(SStreamTaskWriter* pWriter, uint8_t* pData, uint32_t
|
|||
} else if (pHdr->type == SNAP_DATA_STREAM_TASK_CHECKPOINT) {
|
||||
// do nothing
|
||||
}
|
||||
tqInfo("vgId:%d, stream-task snapshot write", TD_VID(pTq->pVnode));
|
||||
tqDebug("vgId:%d, vnode stream-task snapshot write", TD_VID(pTq->pVnode));
|
||||
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tqError("vgId:%d, stream-task snapshot tq write failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
|
||||
tqError("vgId:%d, vnode stream-task snapshot failed to write since %s", TD_VID(pTq->pVnode), tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue