From c7d115d8aaa6f20b26575ec73ccdb269f4ec875b Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 4 Mar 2024 14:58:13 +0800 Subject: [PATCH] fix:add excluded msg for delete in tmq --- include/common/tmsg.h | 3 +++ source/client/src/clientRawBlockWrite.c | 2 ++ source/common/src/tmsg.c | 9 +++++++++ source/dnode/vnode/src/tq/tqUtil.c | 3 +-- source/libs/qworker/src/qwMsg.c | 1 + source/libs/scheduler/src/schRemote.c | 1 + tests/script/sh/deploy.sh | 2 +- utils/test/c/tmq_taosx_ci.c | 1 + 8 files changed, 19 insertions(+), 3 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 587e2f9f3e..e400698f69 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3813,8 +3813,10 @@ typedef struct { uint32_t phyLen; char* sql; char* msg; + int8_t source; } SVDeleteReq; +extern int8_t deleteFromTaosx; int32_t tSerializeSVDeleteReq(void* buf, int32_t bufLen, SVDeleteReq* pReq); int32_t tDeserializeSVDeleteReq(void* buf, int32_t bufLen, SVDeleteReq* pReq); @@ -3834,6 +3836,7 @@ typedef struct SDeleteRes { char tableFName[TSDB_TABLE_NAME_LEN]; char tsColName[TSDB_COL_NAME_LEN]; int64_t ctimeMs; // fill by vnode + int8_t source; } SDeleteRes; int32_t tEncodeDeleteRes(SEncoder* pCoder, const SDeleteRes* pRes); diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index f143624bab..3857e47f57 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -1256,7 +1256,9 @@ static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) { snprintf(sql, sizeof(sql), "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64, req.tableFName, req.tsColName, req.skey, req.tsColName, req.ekey); + deleteFromTaosx = TD_REQ_FROM_TAOX; TAOS_RES* res = taos_query(taos, sql); + deleteFromTaosx = TD_REQ_FROM_APP; SRequestObj* pRequest = (SRequestObj*)res; code = pRequest->code; if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_PAR_GET_META_ERROR) { diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 85f5d462c7..e7414c6004 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -7148,6 +7148,7 @@ int32_t tDecodeSVDropTSmaReq(SDecoder *pCoder, SVDropTSmaReq *pReq) { return 0; } +int8_t deleteFromTaosx = TD_REQ_FROM_APP; int32_t tSerializeSVDeleteReq(void *buf, int32_t bufLen, SVDeleteReq *pReq) { int32_t headLen = sizeof(SMsgHead); if (buf != NULL) { @@ -7165,6 +7166,7 @@ int32_t tSerializeSVDeleteReq(void *buf, int32_t bufLen, SVDeleteReq *pReq) { if (tEncodeU32(&encoder, pReq->sqlLen) < 0) return -1; if (tEncodeCStr(&encoder, pReq->sql) < 0) return -1; if (tEncodeBinary(&encoder, pReq->msg, pReq->phyLen) < 0) return -1; + if (tEncodeI8(&encoder, pReq->source) < 0) return -1; tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -7201,6 +7203,9 @@ int32_t tDeserializeSVDeleteReq(void *buf, int32_t bufLen, SVDeleteReq *pReq) { if (tDecodeBinaryAlloc(&decoder, (void **)&pReq->msg, &msgLen) < 0) return -1; pReq->phyLen = msgLen; + if (!tDecodeIsEnd(&decoder)) { + if (tDecodeI8(&decoder, &pReq->source) < 0) return -1; + } tEndDecode(&decoder); tDecoderClear(&decoder); @@ -8400,6 +8405,7 @@ int32_t tEncodeDeleteRes(SEncoder *pCoder, const SDeleteRes *pRes) { if (tEncodeCStr(pCoder, pRes->tableFName) < 0) return -1; if (tEncodeCStr(pCoder, pRes->tsColName) < 0) return -1; if (tEncodeI64(pCoder, pRes->ctimeMs) < 0) return -1; + if (tEncodeI8(pCoder, pRes->source) < 0) return -1; return 0; } @@ -8424,6 +8430,9 @@ int32_t tDecodeDeleteRes(SDecoder *pCoder, SDeleteRes *pRes) { if (!tDecodeIsEnd(pCoder)) { if (tDecodeI64(pCoder, &pRes->ctimeMs) < 0) return -1; } + if (!tDecodeIsEnd(pCoder)) { + if (tDecodeI8(pCoder, &pRes->source) < 0) return -1; + } return 0; } diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index dad1211326..6029575e2c 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -263,8 +263,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, } else if (pHead->msgType == TDMT_VND_CREATE_STB || pHead->msgType == TDMT_VND_ALTER_STB) { PROCESS_EXCLUDED_MSG(SVCreateStbReq, tDecodeSVCreateStbReq) } else if (pHead->msgType == TDMT_VND_DELETE) { - fetchVer++; - continue; + PROCESS_EXCLUDED_MSG(SDeleteRes, tDecodeDeleteRes) } } diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index 66ec460861..faa90dcbf8 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -715,6 +715,7 @@ int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SD uint64_t tId = req.taskId; int64_t rId = 0; int32_t eId = -1; + pRes->source = req.source; SQWMsg qwMsg = {.node = node, .msg = req.msg, .msgLen = req.phyLen, .connInfo = pMsg->info}; QW_SCH_TASK_DLOG("processDelete start, node:%p, handle:%p, sql:%s", node, pMsg->info.handle, req.sql); diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 1c0b31109e..f6465316d2 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -1086,6 +1086,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, req.sqlLen = strlen(pJob->sql); req.sql = (char *)pJob->sql; req.msg = pTask->msg; + req.source = deleteFromTaosx; msgSize = tSerializeSVDeleteReq(NULL, 0, &req); msg = taosMemoryCalloc(1, msgSize); if (NULL == msg) { diff --git a/tests/script/sh/deploy.sh b/tests/script/sh/deploy.sh index 3b3d275a07..9af2525c74 100755 --- a/tests/script/sh/deploy.sh +++ b/tests/script/sh/deploy.sh @@ -117,7 +117,7 @@ echo "supportVnodes 1024" >> $TAOS_CFG echo "statusInterval 1" >> $TAOS_CFG echo "dataDir $DATA_DIR" >> $TAOS_CFG echo "logDir $LOG_DIR" >> $TAOS_CFG -echo "debugFlag 0" >> $TAOS_CFG +echo "debugFlag 135" >> $TAOS_CFG echo "tmrDebugFlag 131" >> $TAOS_CFG echo "uDebugFlag 143" >> $TAOS_CFG echo "rpcDebugFlag 143" >> $TAOS_CFG diff --git a/utils/test/c/tmq_taosx_ci.c b/utils/test/c/tmq_taosx_ci.c index 2257089f06..5012e50bab 100644 --- a/utils/test/c/tmq_taosx_ci.c +++ b/utils/test/c/tmq_taosx_ci.c @@ -574,6 +574,7 @@ tmq_t* build_consumer() { tmq_conf_set(conf, "msg.with.table.name", "true"); tmq_conf_set(conf, "enable.auto.commit", "true"); tmq_conf_set(conf, "auto.offset.reset", "earliest"); + tmq_conf_set(conf, "msg.consume.excluded", "1"); if (g_conf.snapShot) { tmq_conf_set(conf, "experimental.snapshot.enable", "true");