fix:error in tmp for snapshot
This commit is contained in:
parent
9f9788c205
commit
507adf9e7b
|
@ -379,6 +379,8 @@ tmq_t* build_consumer() {
|
|||
tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
||||
tmq_conf_set(conf, "msg.with.table.name", "true");
|
||||
tmq_conf_set(conf, "enable.auto.commit", "true");
|
||||
tmq_conf_set(conf, "experimental.snapshot.enable", "true");
|
||||
|
||||
|
||||
/*tmq_conf_set(conf, "experimental.snapshot.enable", "true");*/
|
||||
|
||||
|
|
|
@ -5562,7 +5562,7 @@ void tFreeSMAlterStbRsp(SMAlterStbRsp *pRsp) {
|
|||
|
||||
int32_t tEncodeSTqOffsetVal(SEncoder *pEncoder, const STqOffsetVal *pOffsetVal) {
|
||||
if (tEncodeI8(pEncoder, pOffsetVal->type) < 0) return -1;
|
||||
if (pOffsetVal->type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
||||
if (pOffsetVal->type == TMQ_OFFSET__SNAPSHOT_DATA || pOffsetVal->type == TMQ_OFFSET__SNAPSHOT_META) {
|
||||
if (tEncodeI64(pEncoder, pOffsetVal->uid) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pOffsetVal->ts) < 0) return -1;
|
||||
} else if (pOffsetVal->type == TMQ_OFFSET__LOG) {
|
||||
|
@ -5577,7 +5577,7 @@ int32_t tEncodeSTqOffsetVal(SEncoder *pEncoder, const STqOffsetVal *pOffsetVal)
|
|||
|
||||
int32_t tDecodeSTqOffsetVal(SDecoder *pDecoder, STqOffsetVal *pOffsetVal) {
|
||||
if (tDecodeI8(pDecoder, &pOffsetVal->type) < 0) return -1;
|
||||
if (pOffsetVal->type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
||||
if (pOffsetVal->type == TMQ_OFFSET__SNAPSHOT_DATA || pOffsetVal->type == TMQ_OFFSET__SNAPSHOT_META) {
|
||||
if (tDecodeI64(pDecoder, &pOffsetVal->uid) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pOffsetVal->ts) < 0) return -1;
|
||||
} else if (pOffsetVal->type == TMQ_OFFSET__LOG) {
|
||||
|
@ -5600,7 +5600,7 @@ int32_t tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) {
|
|||
snprintf(buf, maxLen, "offset(reset to latest)");
|
||||
} else if (pVal->type == TMQ_OFFSET__LOG) {
|
||||
snprintf(buf, maxLen, "offset(log) ver:%" PRId64, pVal->version);
|
||||
} else if (pVal->type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
||||
} else if (pVal->type == TMQ_OFFSET__SNAPSHOT_DATA || pVal->type == TMQ_OFFSET__SNAPSHOT_META) {
|
||||
snprintf(buf, maxLen, "offset(ss data) uid:%" PRId64 ", ts:%" PRId64, pVal->uid, pVal->ts);
|
||||
} else {
|
||||
ASSERT(0);
|
||||
|
|
|
@ -267,14 +267,14 @@ int32_t destroySnapContext(SSnapContext* ctx){
|
|||
|
||||
static int32_t buildNormalChildTableInfo(SVCreateTbReq *req, void **pBuf, int32_t *contLen){
|
||||
int32_t ret = 0;
|
||||
SVCreateTbBatchReq reqs = {};
|
||||
SVCreateTbBatchReq reqs = {0};
|
||||
|
||||
reqs.pArray = taosArrayInit(1, sizeof(struct SVCreateTbReq));
|
||||
if (NULL == reqs.pArray){
|
||||
ret = -1;
|
||||
goto end;
|
||||
}
|
||||
taosArrayPush(reqs.pArray, &req);
|
||||
taosArrayPush(reqs.pArray, req);
|
||||
reqs.nReqs = 1;
|
||||
|
||||
tEncodeSize(tEncodeSVCreateTbBatchReq, &reqs, *contLen, ret);
|
||||
|
@ -289,7 +289,7 @@ static int32_t buildNormalChildTableInfo(SVCreateTbReq *req, void **pBuf, int32_
|
|||
goto end;
|
||||
}
|
||||
SEncoder coder = {0};
|
||||
tEncoderInit(&coder, *pBuf + sizeof(SMsgHead), *contLen);
|
||||
tEncoderInit(&coder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), *contLen);
|
||||
if (tEncodeSVCreateTbBatchReq(&coder, &reqs) < 0) {
|
||||
taosMemoryFreeClear(*pBuf);
|
||||
tEncoderClear(&coder);
|
||||
|
@ -317,7 +317,7 @@ static int32_t buildSuperTableInfo(SVCreateStbReq *req, void **pBuf, int32_t *co
|
|||
}
|
||||
|
||||
SEncoder encoder = {0};
|
||||
tEncoderInit(&encoder, *pBuf + sizeof(SMsgHead), *contLen);
|
||||
tEncoderInit(&encoder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), *contLen);
|
||||
if (tEncodeSVCreateStbReq(&encoder, req) < 0) {
|
||||
taosMemoryFreeClear(*pBuf);
|
||||
tEncoderClear(&encoder);
|
||||
|
@ -418,7 +418,10 @@ int32_t getMetafromSnapShot(SSnapContext* ctx, void **pBuf, int32_t *contLen, in
|
|||
|| (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.type == TSDB_CHILD_TABLE && me.ctbEntry.suid == ctx->suid)) {
|
||||
|
||||
STableInfoForChildTable* data = (STableInfoForChildTable*)taosHashGet(ctx->suidInfo, &me.ctbEntry.suid, sizeof(tb_uid_t));
|
||||
ASSERT(data);
|
||||
if(!data){ // if table has been deleted
|
||||
tDecoderClear(&dc);
|
||||
continue;
|
||||
}
|
||||
SVCreateTbReq req = {0};
|
||||
|
||||
req.type = TD_CHILD_TABLE;
|
||||
|
|
|
@ -206,7 +206,7 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen) {
|
|||
}
|
||||
tDecoderClear(&decoder);
|
||||
|
||||
if (offset.val.type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
||||
if (offset.val.type == TMQ_OFFSET__SNAPSHOT_DATA || offset.val.type == TMQ_OFFSET__SNAPSHOT_META) {
|
||||
tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64,
|
||||
offset.subKey, TD_VID(pTq->pVnode), offset.val.uid, offset.val.ts);
|
||||
} else if (offset.val.type == TMQ_OFFSET__LOG) {
|
||||
|
|
Loading…
Reference in New Issue