fix:add vnode snaphot for tmq
This commit is contained in:
parent
1dbd322fa0
commit
39fb5f4334
|
@ -65,7 +65,8 @@ set(
|
|||
"src/tq/tqSink.c"
|
||||
"src/tq/tqCommit.c"
|
||||
"src/tq/tqStreamTask.c"
|
||||
"src/tq/tqSnapshot.c"
|
||||
"src/tq/tqHandleSnapshot.c"
|
||||
"src/tq/tqCheckInfoSnapshot.c"
|
||||
"src/tq/tqOffsetSnapshot.c"
|
||||
"src/tq/tqStreamStateSnap.c"
|
||||
"src/tq/tqStreamTaskSnap.c"
|
||||
|
|
|
@ -134,7 +134,7 @@ int32_t tqMetaOpen(STQ* pTq);
|
|||
int32_t tqMetaClose(STQ* pTq);
|
||||
int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle);
|
||||
int32_t tqMetaDeleteHandle(STQ* pTq, const char* key);
|
||||
int32_t tqMetaRestoreHandle(STQ* pTq);
|
||||
//int32_t tqMetaRestoreHandle(STQ* pTq);
|
||||
int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_t vLen);
|
||||
int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key);
|
||||
int32_t tqMetaRestoreCheckInfo(STQ* pTq);
|
||||
|
|
|
@ -69,6 +69,8 @@ typedef struct STqSnapReader STqSnapReader;
|
|||
typedef struct STqSnapWriter STqSnapWriter;
|
||||
typedef struct STqOffsetReader STqOffsetReader;
|
||||
typedef struct STqOffsetWriter STqOffsetWriter;
|
||||
typedef struct STqCheckInfoReader STqCheckInfoReader;
|
||||
typedef struct STqCheckInfoWriter STqCheckInfoWriter;
|
||||
typedef struct SStreamTaskReader SStreamTaskReader;
|
||||
typedef struct SStreamTaskWriter SStreamTaskWriter;
|
||||
typedef struct SStreamStateReader SStreamStateReader;
|
||||
|
@ -308,6 +310,14 @@ int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData);
|
|||
int32_t tqSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapWriter** ppWriter);
|
||||
int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback);
|
||||
int32_t tqSnapWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
|
||||
// STqCheckInfoshotReader ==
|
||||
int32_t tqCheckInfoReaderOpen(STQ* pTq, int64_t sver, int64_t ever, STqCheckInfoReader** ppReader);
|
||||
int32_t tqCheckInfoReaderClose(STqCheckInfoReader** ppReader);
|
||||
int32_t tqCheckInfoRead(STqCheckInfoReader* pReader, uint8_t** ppData);
|
||||
// STqCheckInfoshotWriter ======================================
|
||||
int32_t tqCheckInfoWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqCheckInfoWriter** ppWriter);
|
||||
int32_t tqCheckInfoWriterClose(STqCheckInfoWriter** ppWriter, int8_t rollback);
|
||||
int32_t tqCheckInfoWrite(STqCheckInfoWriter* pWriter, uint8_t* pData, uint32_t nData);
|
||||
// STqOffsetReader ========================================
|
||||
int32_t tqOffsetReaderOpen(STQ* pTq, int64_t sver, int64_t ever, STqOffsetReader** ppReader);
|
||||
int32_t tqOffsetReaderClose(STqOffsetReader** ppReader);
|
||||
|
@ -503,6 +513,7 @@ enum {
|
|||
SNAP_DATA_STREAM_TASK_CHECKPOINT = 10,
|
||||
SNAP_DATA_STREAM_STATE = 11,
|
||||
SNAP_DATA_STREAM_STATE_BACKEND = 12,
|
||||
SNAP_DATA_TQ_CHECKINFO = 13,
|
||||
};
|
||||
|
||||
struct SSnapDataHdr {
|
||||
|
|
|
@ -0,0 +1,211 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "meta.h"
|
||||
#include "tdbInt.h"
|
||||
#include "tq.h"
|
||||
|
||||
// STqCheckInfoReader ========================================
|
||||
struct STqCheckInfoReader {
|
||||
STQ* pTq;
|
||||
int64_t sver;
|
||||
int64_t ever;
|
||||
TBC* pCur;
|
||||
};
|
||||
|
||||
int32_t tqCheckInfoReaderOpen(STQ* pTq, int64_t sver, int64_t ever, STqCheckInfoReader** ppReader) {
|
||||
int32_t code = 0;
|
||||
STqCheckInfoReader* pReader = NULL;
|
||||
|
||||
// alloc
|
||||
pReader = (STqCheckInfoReader*)taosMemoryCalloc(1, sizeof(STqCheckInfoReader));
|
||||
if (pReader == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
pReader->pTq = pTq;
|
||||
pReader->sver = sver;
|
||||
pReader->ever = ever;
|
||||
|
||||
// impl
|
||||
code = tdbTbcOpen(pTq->pCheckStore, &pReader->pCur, NULL);
|
||||
if (code) {
|
||||
taosMemoryFree(pReader);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
code = tdbTbcMoveToFirst(pReader->pCur);
|
||||
if (code) {
|
||||
taosMemoryFree(pReader);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
tqInfo("vgId:%d, vnode checkinfo tq reader opened", TD_VID(pTq->pVnode));
|
||||
|
||||
*ppReader = pReader;
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tqError("vgId:%d, vnode checkinfo tq reader open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
|
||||
*ppReader = NULL;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tqCheckInfoReaderClose(STqCheckInfoReader** ppReader) {
|
||||
int32_t code = 0;
|
||||
|
||||
tdbTbcClose((*ppReader)->pCur);
|
||||
taosMemoryFree(*ppReader);
|
||||
*ppReader = NULL;
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tqCheckInfoRead(STqCheckInfoReader* pReader, uint8_t** ppData) {
|
||||
int32_t code = 0;
|
||||
void* pKey = NULL;
|
||||
void* pVal = NULL;
|
||||
int32_t kLen = 0;
|
||||
int32_t vLen = 0;
|
||||
SDecoder decoder;
|
||||
STqCheckInfo info;
|
||||
|
||||
*ppData = NULL;
|
||||
if (tdbTbcNext(pReader->pCur, &pKey, &kLen, &pVal, &vLen)) {
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
|
||||
if (tDecodeSTqCheckInfo(&decoder, &info) < 0) {
|
||||
tdbFree(pKey);
|
||||
tdbFree(pVal);
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
tdbFree(pKey);
|
||||
tdbFree(pVal);
|
||||
tDecoderClear(&decoder);
|
||||
|
||||
*ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen);
|
||||
if (*ppData == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
|
||||
pHdr->type = SNAP_DATA_TQ_CHECKINFO;
|
||||
pHdr->size = vLen;
|
||||
memcpy(pHdr->data, pVal, vLen);
|
||||
|
||||
tqInfo("vgId:%d, vnode check info tq read data, topic: %s vLen:%d", TD_VID(pReader->pTq->pVnode),
|
||||
info.topic, vLen);
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tqError("vgId:%d, vnode check info tq read data failed since %s", TD_VID(pReader->pTq->pVnode), tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
// STqCheckInfoWriter ========================================
|
||||
struct STqCheckInfoWriter {
|
||||
STQ* pTq;
|
||||
int64_t sver;
|
||||
int64_t ever;
|
||||
TXN* txn;
|
||||
};
|
||||
|
||||
int32_t tqCheckInfoWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqCheckInfoWriter** ppWriter) {
|
||||
int32_t code = 0;
|
||||
STqCheckInfoWriter* pWriter;
|
||||
|
||||
// alloc
|
||||
pWriter = (STqCheckInfoWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
|
||||
if (pWriter == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
pWriter->pTq = pTq;
|
||||
pWriter->sver = sver;
|
||||
pWriter->ever = ever;
|
||||
|
||||
if (tdbBegin(pTq->pMetaDB, &pWriter->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) {
|
||||
code = -1;
|
||||
taosMemoryFree(pWriter);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
*ppWriter = pWriter;
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tqError("vgId:%d, tq check info writer open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
|
||||
*ppWriter = NULL;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tqCheckInfoWriterClose(STqCheckInfoWriter** ppWriter, int8_t rollback) {
|
||||
int32_t code = 0;
|
||||
STqCheckInfoWriter* pWriter = *ppWriter;
|
||||
STQ* pTq = pWriter->pTq;
|
||||
|
||||
if (rollback) {
|
||||
tdbAbort(pWriter->pTq->pMetaDB, pWriter->txn);
|
||||
} else {
|
||||
code = tdbCommit(pWriter->pTq->pMetaDB, pWriter->txn);
|
||||
if (code) goto _err;
|
||||
code = tdbPostCommit(pWriter->pTq->pMetaDB, pWriter->txn);
|
||||
if (code) goto _err;
|
||||
}
|
||||
|
||||
int vgId = TD_VID(pWriter->pTq->pVnode);
|
||||
|
||||
taosMemoryFree(pWriter);
|
||||
*ppWriter = NULL;
|
||||
|
||||
// restore from metastore
|
||||
if (tqMetaRestoreCheckInfo(pTq) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tqError("vgId:%d, tq check info writer close failed since %s", vgId, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tqCheckInfoWrite(STqCheckInfoWriter* pWriter, uint8_t* pData, uint32_t nData) {
|
||||
int32_t code = 0;
|
||||
STQ* pTq = pWriter->pTq;
|
||||
STqCheckInfo info = {0};
|
||||
SDecoder decoder;
|
||||
SDecoder* pDecoder = &decoder;
|
||||
|
||||
|
||||
code = tDecodeSTqCheckInfo(pDecoder, &info);
|
||||
if (code) goto _err;
|
||||
code = tqMetaSaveCheckInfo(pTq, info.topic, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
|
||||
if (code < 0) goto _err;
|
||||
tDecoderClear(pDecoder);
|
||||
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tDecoderClear(pDecoder);
|
||||
tqError("vgId:%d, vnode check info tq write failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
|
||||
return code;
|
||||
}
|
|
@ -173,20 +173,18 @@ int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) {
|
|||
if (code) goto _err;
|
||||
}
|
||||
|
||||
int vgId = TD_VID(pWriter->pTq->pVnode);
|
||||
|
||||
taosMemoryFree(pWriter);
|
||||
*ppWriter = NULL;
|
||||
|
||||
// restore from metastore
|
||||
if (tqMetaRestoreHandle(pTq) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
// // restore from metastore
|
||||
// if (tqMetaRestoreHandle(pTq) < 0) {
|
||||
// goto _err;
|
||||
// }
|
||||
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tqError("vgId:%d, tq snapshot writer close failed since %s", vgId, tstrerror(code));
|
||||
tqError("vgId:%d, tq snapshot writer close failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
|
@ -388,34 +388,34 @@ int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle){
|
|||
return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle));
|
||||
}
|
||||
|
||||
int32_t tqMetaRestoreHandle(STQ* pTq) {
|
||||
int code = 0;
|
||||
TBC* pCur = NULL;
|
||||
if (tdbTbcOpen(pTq->pExecStore, &pCur, NULL) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
void* pKey = NULL;
|
||||
int kLen = 0;
|
||||
void* pVal = NULL;
|
||||
int vLen = 0;
|
||||
|
||||
tdbTbcMoveToFirst(pCur);
|
||||
|
||||
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
|
||||
STqHandle handle = {0};
|
||||
code = restoreHandle(pTq, pVal, vLen, &handle);
|
||||
if (code < 0) {
|
||||
tqDestroyTqHandle(&handle);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
tdbFree(pKey);
|
||||
tdbFree(pVal);
|
||||
tdbTbcClose(pCur);
|
||||
return code;
|
||||
}
|
||||
//int32_t tqMetaRestoreHandle(STQ* pTq) {
|
||||
// int code = 0;
|
||||
// TBC* pCur = NULL;
|
||||
// if (tdbTbcOpen(pTq->pExecStore, &pCur, NULL) < 0) {
|
||||
// return -1;
|
||||
// }
|
||||
//
|
||||
// void* pKey = NULL;
|
||||
// int kLen = 0;
|
||||
// void* pVal = NULL;
|
||||
// int vLen = 0;
|
||||
//
|
||||
// tdbTbcMoveToFirst(pCur);
|
||||
//
|
||||
// while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
|
||||
// STqHandle handle = {0};
|
||||
// code = restoreHandle(pTq, pVal, vLen, &handle);
|
||||
// if (code < 0) {
|
||||
// tqDestroyTqHandle(&handle);
|
||||
// break;
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// tdbFree(pKey);
|
||||
// tdbFree(pVal);
|
||||
// tdbTbcClose(pCur);
|
||||
// return code;
|
||||
//}
|
||||
|
||||
int32_t tqMetaGetHandle(STQ* pTq, const char* key) {
|
||||
void* pVal = NULL;
|
||||
|
|
|
@ -159,6 +159,7 @@ int32_t tqOffsetSnapWrite(STqOffsetWriter* pWriter, uint8_t* pData, uint32_t nDa
|
|||
taosCloseFile(&pFile);
|
||||
return -1;
|
||||
}
|
||||
taosCloseFile(&pFile);
|
||||
} else {
|
||||
return -1;
|
||||
}
|
||||
|
|
|
@ -34,6 +34,8 @@ struct SVSnapReader {
|
|||
STqSnapReader *pTqSnapReader;
|
||||
int8_t tqOffsetDone;
|
||||
STqOffsetReader *pTqOffsetReader;
|
||||
int8_t tqCheckInfoDone;
|
||||
STqCheckInfoReader *pTqCheckInfoReader;
|
||||
// stream
|
||||
int8_t streamTaskDone;
|
||||
SStreamTaskReader *pStreamTaskReader;
|
||||
|
@ -81,6 +83,18 @@ void vnodeSnapReaderClose(SVSnapReader *pReader) {
|
|||
metaSnapReaderClose(&pReader->pMetaReader);
|
||||
}
|
||||
|
||||
if (pReader->pTqSnapReader) {
|
||||
tqSnapReaderClose(&pReader->pTqSnapReader);
|
||||
}
|
||||
|
||||
if (pReader->pTqOffsetReader) {
|
||||
tqOffsetReaderClose(&pReader->pTqOffsetReader);
|
||||
}
|
||||
|
||||
if (pReader->pTqCheckInfoReader) {
|
||||
tqCheckInfoReaderClose(&pReader->pTqCheckInfoReader);
|
||||
}
|
||||
|
||||
taosMemoryFree(pReader);
|
||||
}
|
||||
|
||||
|
@ -181,6 +195,7 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
|
|||
}
|
||||
|
||||
// TQ ================
|
||||
vInfo("vgId:%d tq transform start", vgId);
|
||||
if (!pReader->tqHandleDone) {
|
||||
if (pReader->pTqSnapReader == NULL) {
|
||||
code = tqSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->ever, &pReader->pTqSnapReader);
|
||||
|
@ -200,6 +215,25 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
|
|||
}
|
||||
}
|
||||
}
|
||||
if (!pReader->tqCheckInfoDone) {
|
||||
if (pReader->pTqCheckInfoReader == NULL) {
|
||||
code = tqCheckInfoReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->ever, &pReader->pTqCheckInfoReader);
|
||||
if (code < 0) goto _err;
|
||||
}
|
||||
|
||||
code = tqCheckInfoRead(pReader->pTqCheckInfoReader, ppData);
|
||||
if (code) {
|
||||
goto _err;
|
||||
} else {
|
||||
if (*ppData) {
|
||||
goto _exit;
|
||||
} else {
|
||||
pReader->tqCheckInfoDone = 1;
|
||||
code = tqCheckInfoReaderClose(&pReader->pTqCheckInfoReader);
|
||||
if (code) goto _err;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!pReader->tqOffsetDone) {
|
||||
if (pReader->pTqOffsetReader == NULL) {
|
||||
code = tqOffsetReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->ever, &pReader->pTqOffsetReader);
|
||||
|
@ -334,6 +368,7 @@ struct SVSnapWriter {
|
|||
// tq
|
||||
STqSnapWriter *pTqSnapWriter;
|
||||
STqOffsetWriter *pTqOffsetWriter;
|
||||
STqCheckInfoWriter *pTqCheckInfoWriter;
|
||||
// stream
|
||||
SStreamTaskWriter *pStreamTaskWriter;
|
||||
SStreamStateWriter *pStreamStateWriter;
|
||||
|
@ -411,6 +446,21 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *
|
|||
if (code) goto _exit;
|
||||
}
|
||||
|
||||
if (pWriter->pTqSnapWriter) {
|
||||
code = tqSnapWriterClose(&pWriter->pTqSnapWriter, rollback);
|
||||
if (code) goto _exit;
|
||||
}
|
||||
|
||||
if (pWriter->pTqCheckInfoWriter) {
|
||||
code = tqCheckInfoWriterClose(&pWriter->pTqCheckInfoWriter, rollback);
|
||||
if (code) goto _exit;
|
||||
}
|
||||
|
||||
if (pWriter->pTqOffsetWriter) {
|
||||
code = tqOffsetWriterClose(&pWriter->pTqOffsetWriter, rollback);
|
||||
if (code) goto _exit;
|
||||
}
|
||||
|
||||
if (pWriter->pStreamTaskWriter) {
|
||||
code = streamTaskSnapWriterClose(pWriter->pStreamTaskWriter, rollback);
|
||||
if (code) goto _exit;
|
||||
|
@ -519,8 +569,34 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
|
|||
if (code) goto _err;
|
||||
} break;
|
||||
case SNAP_DATA_TQ_HANDLE: {
|
||||
// tq handle
|
||||
if (pWriter->pTqSnapWriter == NULL) {
|
||||
code = tqSnapWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pTqSnapWriter);
|
||||
if (code) goto _err;
|
||||
}
|
||||
|
||||
code = tqSnapWrite(pWriter->pTqSnapWriter, pData, nData);
|
||||
if (code) goto _err;
|
||||
} break;
|
||||
case SNAP_DATA_TQ_CHECKINFO: {
|
||||
// tq checkinfo
|
||||
if (pWriter->pTqCheckInfoWriter == NULL) {
|
||||
code = tqCheckInfoWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pTqCheckInfoWriter);
|
||||
if (code) goto _err;
|
||||
}
|
||||
|
||||
code = tqCheckInfoWrite(pWriter->pTqCheckInfoWriter, pData, nData);
|
||||
if (code) goto _err;
|
||||
} break;
|
||||
case SNAP_DATA_TQ_OFFSET: {
|
||||
// tq offset
|
||||
if (pWriter->pTqOffsetWriter == NULL) {
|
||||
code = tqOffsetWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pTqOffsetWriter);
|
||||
if (code) goto _err;
|
||||
}
|
||||
|
||||
code = tqOffsetSnapWrite(pWriter->pTqOffsetWriter, pData, nData);
|
||||
if (code) goto _err;
|
||||
} break;
|
||||
case SNAP_DATA_STREAM_TASK:
|
||||
case SNAP_DATA_STREAM_TASK_CHECKPOINT: {
|
||||
|
|
Loading…
Reference in New Issue