fix:[TD-31017]process return value in vnode for tmq
This commit is contained in:
parent
e53c8a35a4
commit
99ac957290
|
@ -345,7 +345,7 @@ int32_t tsdbSnapRAWWriterPrepareClose(STsdbSnapRAWWriter* pWriter);
|
||||||
int32_t tsdbSnapRAWWriterClose(STsdbSnapRAWWriter** ppWriter, int8_t rollback);
|
int32_t tsdbSnapRAWWriterClose(STsdbSnapRAWWriter** ppWriter, int8_t rollback);
|
||||||
// STqSnapshotReader ==
|
// STqSnapshotReader ==
|
||||||
int32_t tqSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, int8_t type, STqSnapReader** ppReader);
|
int32_t tqSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, int8_t type, STqSnapReader** ppReader);
|
||||||
int32_t tqSnapReaderClose(STqSnapReader** ppReader);
|
void tqSnapReaderClose(STqSnapReader** ppReader);
|
||||||
int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData);
|
int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData);
|
||||||
// STqSnapshotWriter ======================================
|
// STqSnapshotWriter ======================================
|
||||||
int32_t tqSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapWriter** ppWriter);
|
int32_t tqSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapWriter** ppWriter);
|
||||||
|
|
|
@ -76,14 +76,10 @@ _err:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqSnapReaderClose(STqSnapReader** ppReader) {
|
void tqSnapReaderClose(STqSnapReader** ppReader) {
|
||||||
int32_t code = 0;
|
(void)tdbTbcClose((*ppReader)->pCur);
|
||||||
|
|
||||||
tdbTbcClose((*ppReader)->pCur);
|
|
||||||
taosMemoryFree(*ppReader);
|
taosMemoryFree(*ppReader);
|
||||||
*ppReader = NULL;
|
*ppReader = NULL;
|
||||||
|
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData) {
|
int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData) {
|
||||||
|
@ -99,14 +95,14 @@ int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData) {
|
||||||
|
|
||||||
*ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen);
|
*ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen);
|
||||||
if (*ppData == NULL) {
|
if (*ppData == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
|
SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
|
||||||
pHdr->type = pReader->type;
|
pHdr->type = pReader->type;
|
||||||
pHdr->size = vLen;
|
pHdr->size = vLen;
|
||||||
memcpy(pHdr->data, pVal, vLen);
|
(void)memcpy(pHdr->data, pVal, vLen);
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
tdbFree(pKey);
|
tdbFree(pKey);
|
||||||
|
@ -131,20 +127,20 @@ struct STqSnapWriter {
|
||||||
|
|
||||||
int32_t tqSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapWriter** ppWriter) {
|
int32_t tqSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapWriter** ppWriter) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
STqSnapWriter* pWriter;
|
STqSnapWriter* pWriter = NULL;
|
||||||
|
|
||||||
// alloc
|
// alloc
|
||||||
pWriter = (STqSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
|
pWriter = (STqSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
|
||||||
if (pWriter == NULL) {
|
if (pWriter == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);;
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
pWriter->pTq = pTq;
|
pWriter->pTq = pTq;
|
||||||
pWriter->sver = sver;
|
pWriter->sver = sver;
|
||||||
pWriter->ever = ever;
|
pWriter->ever = ever;
|
||||||
|
|
||||||
if (tdbBegin(pTq->pMetaDB, &pWriter->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) {
|
code = tdbBegin(pTq->pMetaDB, &pWriter->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0);
|
||||||
code = -1;
|
if (code < 0) {
|
||||||
taosMemoryFree(pWriter);
|
taosMemoryFree(pWriter);
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
@ -164,7 +160,7 @@ int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) {
|
||||||
STQ* pTq = pWriter->pTq;
|
STQ* pTq = pWriter->pTq;
|
||||||
|
|
||||||
if (rollback) {
|
if (rollback) {
|
||||||
tdbAbort(pWriter->pTq->pMetaDB, pWriter->txn);
|
(void)tdbAbort(pWriter->pTq->pMetaDB, pWriter->txn);
|
||||||
} else {
|
} else {
|
||||||
code = tdbCommit(pWriter->pTq->pMetaDB, pWriter->txn);
|
code = tdbCommit(pWriter->pTq->pMetaDB, pWriter->txn);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
@ -207,7 +203,8 @@ int32_t tqSnapCheckInfoWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nD
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
STQ* pTq = pWriter->pTq;
|
STQ* pTq = pWriter->pTq;
|
||||||
STqCheckInfo info = {0};
|
STqCheckInfo info = {0};
|
||||||
if(tqMetaDecodeCheckInfo(&info, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)) != 0){
|
code = tqMetaDecodeCheckInfo(&info, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
|
||||||
|
if(code != 0){
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -217,7 +214,7 @@ int32_t tqSnapCheckInfoWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nD
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
tqError("vgId:%d, vnode check info tq write failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
|
tqError("vgId:%d, vnode check info tq write failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -227,7 +224,8 @@ int32_t tqSnapOffsetWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData
|
||||||
STQ* pTq = pWriter->pTq;
|
STQ* pTq = pWriter->pTq;
|
||||||
|
|
||||||
STqOffset info = {0};
|
STqOffset info = {0};
|
||||||
if(tqMetaDecodeOffsetInfo(&info, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)) != 0){
|
code = tqMetaDecodeOffsetInfo(&info, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
|
||||||
|
if(code != 0){
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -370,8 +370,7 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
|
||||||
goto _exit;
|
goto _exit;
|
||||||
} else {
|
} else {
|
||||||
pReader->tqHandleDone = 1;
|
pReader->tqHandleDone = 1;
|
||||||
code = tqSnapReaderClose(&pReader->pTqSnapReader);
|
tqSnapReaderClose(&pReader->pTqSnapReader);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!pReader->tqCheckInfoDone) {
|
if (!pReader->tqCheckInfoDone) {
|
||||||
|
@ -387,8 +386,7 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
|
||||||
goto _exit;
|
goto _exit;
|
||||||
} else {
|
} else {
|
||||||
pReader->tqCheckInfoDone = 1;
|
pReader->tqCheckInfoDone = 1;
|
||||||
code = tqSnapReaderClose(&pReader->pTqCheckInfoReader);
|
tqSnapReaderClose(&pReader->pTqCheckInfoReader);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!pReader->tqOffsetDone) {
|
if (!pReader->tqOffsetDone) {
|
||||||
|
@ -404,8 +402,7 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
|
||||||
goto _exit;
|
goto _exit;
|
||||||
} else {
|
} else {
|
||||||
pReader->tqOffsetDone = 1;
|
pReader->tqOffsetDone = 1;
|
||||||
code = tqSnapReaderClose(&pReader->pTqOffsetReader);
|
tqSnapReaderClose(&pReader->pTqOffsetReader);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue