refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2023-08-31 14:31:45 +08:00
parent e2bb64eb18
commit 92e258617a
2 changed files with 13 additions and 13 deletions

View File

@ -89,7 +89,6 @@ int32_t streamTaskSnapReaderClose(SStreamTaskReader* pReader) {
taosArrayDestroy(pReader->tdbTbList); taosArrayDestroy(pReader->tdbTbList);
tdbTbcClose(pReader->pCur); tdbTbcClose(pReader->pCur);
taosMemoryFree(pReader); taosMemoryFree(pReader);
return code; return code;
} }

View File

@ -87,6 +87,7 @@ void vnodeSnapReaderClose(SVSnapReader *pReader) {
int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) { int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) {
int32_t code = 0; int32_t code = 0;
SVnode *pVnode = pReader->pVnode; SVnode *pVnode = pReader->pVnode;
int32_t vgId = TD_VID(pReader->pVnode);
// CONFIG ============== // CONFIG ==============
// FIXME: if commit multiple times and the config changed? // FIXME: if commit multiple times and the config changed?
@ -220,30 +221,30 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
} }
// STREAM ============ // STREAM ============
vInfo("stream task start"); vInfo("vgId:%d stream task start", vgId);
if (!pReader->streamTaskDone) { if (!pReader->streamTaskDone) {
if (pReader->pStreamTaskReader == NULL) { if (pReader->pStreamTaskReader == NULL) {
vInfo("stream task start 1");
code = streamTaskSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->sver, &pReader->pStreamTaskReader); code = streamTaskSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->sver, &pReader->pStreamTaskReader);
if (code) { if (code) {
vInfo("stream task start err"); vError("vgId:%d open streamtask snapshot reader failed, code:%s", vgId, tstrerror(code));
goto _err; goto _err;
} }
} }
code = streamTaskSnapRead(pReader->pStreamTaskReader, ppData); code = streamTaskSnapRead(pReader->pStreamTaskReader, ppData);
vInfo("stream task start 2");
if (code) { if (code) {
vInfo("stream task start 3"); vError("vgId:%d error happens during read data from streatask snapshot, code:%s", vgId, tstrerror(code));
goto _err; goto _err;
} else { } else {
if (*ppData) { if (*ppData) {
vInfo("vgId:%d no streamTask snapshot", vgId);
goto _exit; goto _exit;
vInfo("stream task start 4");
} else { } else {
pReader->streamTaskDone = 1; pReader->streamTaskDone = 1;
code = streamTaskSnapReaderClose(pReader->pStreamTaskReader); code = streamTaskSnapReaderClose(pReader->pStreamTaskReader);
vInfo("stream task start 5"); if (code) {
if (code) goto _err; goto _err;
}
pReader->pStreamTaskReader = NULL; pReader->pStreamTaskReader = NULL;
} }
} }
@ -305,15 +306,15 @@ _exit:
pReader->index++; pReader->index++;
*nData = sizeof(SSnapDataHdr) + pHdr->size; *nData = sizeof(SSnapDataHdr) + pHdr->size;
pHdr->index = pReader->index; pHdr->index = pReader->index;
vDebug("vgId:%d, vnode snapshot read data, index:%" PRId64 " type:%d blockLen:%d ", TD_VID(pReader->pVnode), vDebug("vgId:%d, vnode snapshot read data, index:%" PRId64 " type:%d blockLen:%d ", vgId, pReader->index,
pReader->index, pHdr->type, *nData); pHdr->type, *nData);
} else { } else {
vInfo("vgId:%d, vnode snapshot read data end, index:%" PRId64, TD_VID(pReader->pVnode), pReader->index); vInfo("vgId:%d, vnode snapshot read data end, index:%" PRId64, vgId, pReader->index);
} }
return code; return code;
_err: _err:
vError("vgId:%d, vnode snapshot read failed since %s", TD_VID(pReader->pVnode), tstrerror(code)); vError("vgId:%d, vnode snapshot read failed since %s", vgId, tstrerror(code));
return code; return code;
} }