diff --git a/examples/c/tmq_taosx.c b/examples/c/tmq_taosx.c index 9dd42bef6e..7ef6415743 100644 --- a/examples/c/tmq_taosx.c +++ b/examples/c/tmq_taosx.c @@ -379,6 +379,8 @@ tmq_t* build_consumer() { tmq_conf_set(conf, "td.connect.pass", "taosdata"); tmq_conf_set(conf, "msg.with.table.name", "true"); tmq_conf_set(conf, "enable.auto.commit", "true"); + tmq_conf_set(conf, "experimental.snapshot.enable", "true"); + /*tmq_conf_set(conf, "experimental.snapshot.enable", "true");*/ diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index e5865c5782..912d3e4eee 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -5562,7 +5562,7 @@ void tFreeSMAlterStbRsp(SMAlterStbRsp *pRsp) { int32_t tEncodeSTqOffsetVal(SEncoder *pEncoder, const STqOffsetVal *pOffsetVal) { if (tEncodeI8(pEncoder, pOffsetVal->type) < 0) return -1; - if (pOffsetVal->type == TMQ_OFFSET__SNAPSHOT_DATA) { + if (pOffsetVal->type == TMQ_OFFSET__SNAPSHOT_DATA || pOffsetVal->type == TMQ_OFFSET__SNAPSHOT_META) { if (tEncodeI64(pEncoder, pOffsetVal->uid) < 0) return -1; if (tEncodeI64(pEncoder, pOffsetVal->ts) < 0) return -1; } else if (pOffsetVal->type == TMQ_OFFSET__LOG) { @@ -5577,7 +5577,7 @@ int32_t tEncodeSTqOffsetVal(SEncoder *pEncoder, const STqOffsetVal *pOffsetVal) int32_t tDecodeSTqOffsetVal(SDecoder *pDecoder, STqOffsetVal *pOffsetVal) { if (tDecodeI8(pDecoder, &pOffsetVal->type) < 0) return -1; - if (pOffsetVal->type == TMQ_OFFSET__SNAPSHOT_DATA) { + if (pOffsetVal->type == TMQ_OFFSET__SNAPSHOT_DATA || pOffsetVal->type == TMQ_OFFSET__SNAPSHOT_META) { if (tDecodeI64(pDecoder, &pOffsetVal->uid) < 0) return -1; if (tDecodeI64(pDecoder, &pOffsetVal->ts) < 0) return -1; } else if (pOffsetVal->type == TMQ_OFFSET__LOG) { @@ -5600,7 +5600,7 @@ int32_t tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) { snprintf(buf, maxLen, "offset(reset to latest)"); } else if (pVal->type == TMQ_OFFSET__LOG) { snprintf(buf, maxLen, "offset(log) ver:%" PRId64, pVal->version); - } else if (pVal->type == TMQ_OFFSET__SNAPSHOT_DATA) { + } else if (pVal->type == TMQ_OFFSET__SNAPSHOT_DATA || pVal->type == TMQ_OFFSET__SNAPSHOT_META) { snprintf(buf, maxLen, "offset(ss data) uid:%" PRId64 ", ts:%" PRId64, pVal->uid, pVal->ts); } else { ASSERT(0); diff --git a/source/dnode/vnode/src/meta/metaSnapshot.c b/source/dnode/vnode/src/meta/metaSnapshot.c index cdadc0a55d..064bad10a9 100644 --- a/source/dnode/vnode/src/meta/metaSnapshot.c +++ b/source/dnode/vnode/src/meta/metaSnapshot.c @@ -267,14 +267,14 @@ int32_t destroySnapContext(SSnapContext* ctx){ static int32_t buildNormalChildTableInfo(SVCreateTbReq *req, void **pBuf, int32_t *contLen){ int32_t ret = 0; - SVCreateTbBatchReq reqs = {}; + SVCreateTbBatchReq reqs = {0}; reqs.pArray = taosArrayInit(1, sizeof(struct SVCreateTbReq)); if (NULL == reqs.pArray){ ret = -1; goto end; } - taosArrayPush(reqs.pArray, &req); + taosArrayPush(reqs.pArray, req); reqs.nReqs = 1; tEncodeSize(tEncodeSVCreateTbBatchReq, &reqs, *contLen, ret); @@ -289,7 +289,7 @@ static int32_t buildNormalChildTableInfo(SVCreateTbReq *req, void **pBuf, int32_ goto end; } SEncoder coder = {0}; - tEncoderInit(&coder, *pBuf + sizeof(SMsgHead), *contLen); + tEncoderInit(&coder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), *contLen); if (tEncodeSVCreateTbBatchReq(&coder, &reqs) < 0) { taosMemoryFreeClear(*pBuf); tEncoderClear(&coder); @@ -317,7 +317,7 @@ static int32_t buildSuperTableInfo(SVCreateStbReq *req, void **pBuf, int32_t *co } SEncoder encoder = {0}; - tEncoderInit(&encoder, *pBuf + sizeof(SMsgHead), *contLen); + tEncoderInit(&encoder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), *contLen); if (tEncodeSVCreateStbReq(&encoder, req) < 0) { taosMemoryFreeClear(*pBuf); tEncoderClear(&encoder); @@ -418,7 +418,10 @@ int32_t getMetafromSnapShot(SSnapContext* ctx, void **pBuf, int32_t *contLen, in || (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.type == TSDB_CHILD_TABLE && me.ctbEntry.suid == ctx->suid)) { STableInfoForChildTable* data = (STableInfoForChildTable*)taosHashGet(ctx->suidInfo, &me.ctbEntry.suid, sizeof(tb_uid_t)); - ASSERT(data); + if(!data){ // if table has been deleted + tDecoderClear(&dc); + continue; + } SVCreateTbReq req = {0}; req.type = TD_CHILD_TABLE; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index f672647b96..76dffab319 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -206,7 +206,7 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen) { } tDecoderClear(&decoder); - if (offset.val.type == TMQ_OFFSET__SNAPSHOT_DATA) { + if (offset.val.type == TMQ_OFFSET__SNAPSHOT_DATA || offset.val.type == TMQ_OFFSET__SNAPSHOT_META) { tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64, offset.subKey, TD_VID(pTq->pVnode), offset.val.uid, offset.val.ts); } else if (offset.val.type == TMQ_OFFSET__LOG) {