From c7d115d8aaa6f20b26575ec73ccdb269f4ec875b Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 4 Mar 2024 14:58:13 +0800 Subject: [PATCH 1/2] fix:add excluded msg for delete in tmq --- include/common/tmsg.h | 3 +++ source/client/src/clientRawBlockWrite.c | 2 ++ source/common/src/tmsg.c | 9 +++++++++ source/dnode/vnode/src/tq/tqUtil.c | 3 +-- source/libs/qworker/src/qwMsg.c | 1 + source/libs/scheduler/src/schRemote.c | 1 + tests/script/sh/deploy.sh | 2 +- utils/test/c/tmq_taosx_ci.c | 1 + 8 files changed, 19 insertions(+), 3 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 587e2f9f3e..e400698f69 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3813,8 +3813,10 @@ typedef struct { uint32_t phyLen; char* sql; char* msg; + int8_t source; } SVDeleteReq; +extern int8_t deleteFromTaosx; int32_t tSerializeSVDeleteReq(void* buf, int32_t bufLen, SVDeleteReq* pReq); int32_t tDeserializeSVDeleteReq(void* buf, int32_t bufLen, SVDeleteReq* pReq); @@ -3834,6 +3836,7 @@ typedef struct SDeleteRes { char tableFName[TSDB_TABLE_NAME_LEN]; char tsColName[TSDB_COL_NAME_LEN]; int64_t ctimeMs; // fill by vnode + int8_t source; } SDeleteRes; int32_t tEncodeDeleteRes(SEncoder* pCoder, const SDeleteRes* pRes); diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index f143624bab..3857e47f57 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -1256,7 +1256,9 @@ static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) { snprintf(sql, sizeof(sql), "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64, req.tableFName, req.tsColName, req.skey, req.tsColName, req.ekey); + deleteFromTaosx = TD_REQ_FROM_TAOX; TAOS_RES* res = taos_query(taos, sql); + deleteFromTaosx = TD_REQ_FROM_APP; SRequestObj* pRequest = (SRequestObj*)res; code = pRequest->code; if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_PAR_GET_META_ERROR) { diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 85f5d462c7..e7414c6004 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -7148,6 +7148,7 @@ int32_t tDecodeSVDropTSmaReq(SDecoder *pCoder, SVDropTSmaReq *pReq) { return 0; } +int8_t deleteFromTaosx = TD_REQ_FROM_APP; int32_t tSerializeSVDeleteReq(void *buf, int32_t bufLen, SVDeleteReq *pReq) { int32_t headLen = sizeof(SMsgHead); if (buf != NULL) { @@ -7165,6 +7166,7 @@ int32_t tSerializeSVDeleteReq(void *buf, int32_t bufLen, SVDeleteReq *pReq) { if (tEncodeU32(&encoder, pReq->sqlLen) < 0) return -1; if (tEncodeCStr(&encoder, pReq->sql) < 0) return -1; if (tEncodeBinary(&encoder, pReq->msg, pReq->phyLen) < 0) return -1; + if (tEncodeI8(&encoder, pReq->source) < 0) return -1; tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -7201,6 +7203,9 @@ int32_t tDeserializeSVDeleteReq(void *buf, int32_t bufLen, SVDeleteReq *pReq) { if (tDecodeBinaryAlloc(&decoder, (void **)&pReq->msg, &msgLen) < 0) return -1; pReq->phyLen = msgLen; + if (!tDecodeIsEnd(&decoder)) { + if (tDecodeI8(&decoder, &pReq->source) < 0) return -1; + } tEndDecode(&decoder); tDecoderClear(&decoder); @@ -8400,6 +8405,7 @@ int32_t tEncodeDeleteRes(SEncoder *pCoder, const SDeleteRes *pRes) { if (tEncodeCStr(pCoder, pRes->tableFName) < 0) return -1; if (tEncodeCStr(pCoder, pRes->tsColName) < 0) return -1; if (tEncodeI64(pCoder, pRes->ctimeMs) < 0) return -1; + if (tEncodeI8(pCoder, pRes->source) < 0) return -1; return 0; } @@ -8424,6 +8430,9 @@ int32_t tDecodeDeleteRes(SDecoder *pCoder, SDeleteRes *pRes) { if (!tDecodeIsEnd(pCoder)) { if (tDecodeI64(pCoder, &pRes->ctimeMs) < 0) return -1; } + if (!tDecodeIsEnd(pCoder)) { + if (tDecodeI8(pCoder, &pRes->source) < 0) return -1; + } return 0; } diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index dad1211326..6029575e2c 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -263,8 +263,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, } else if (pHead->msgType == TDMT_VND_CREATE_STB || pHead->msgType == TDMT_VND_ALTER_STB) { PROCESS_EXCLUDED_MSG(SVCreateStbReq, tDecodeSVCreateStbReq) } else if (pHead->msgType == TDMT_VND_DELETE) { - fetchVer++; - continue; + PROCESS_EXCLUDED_MSG(SDeleteRes, tDecodeDeleteRes) } } diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index 66ec460861..faa90dcbf8 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -715,6 +715,7 @@ int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SD uint64_t tId = req.taskId; int64_t rId = 0; int32_t eId = -1; + pRes->source = req.source; SQWMsg qwMsg = {.node = node, .msg = req.msg, .msgLen = req.phyLen, .connInfo = pMsg->info}; QW_SCH_TASK_DLOG("processDelete start, node:%p, handle:%p, sql:%s", node, pMsg->info.handle, req.sql); diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 1c0b31109e..f6465316d2 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -1086,6 +1086,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, req.sqlLen = strlen(pJob->sql); req.sql = (char *)pJob->sql; req.msg = pTask->msg; + req.source = deleteFromTaosx; msgSize = tSerializeSVDeleteReq(NULL, 0, &req); msg = taosMemoryCalloc(1, msgSize); if (NULL == msg) { diff --git a/tests/script/sh/deploy.sh b/tests/script/sh/deploy.sh index 3b3d275a07..9af2525c74 100755 --- a/tests/script/sh/deploy.sh +++ b/tests/script/sh/deploy.sh @@ -117,7 +117,7 @@ echo "supportVnodes 1024" >> $TAOS_CFG echo "statusInterval 1" >> $TAOS_CFG echo "dataDir $DATA_DIR" >> $TAOS_CFG echo "logDir $LOG_DIR" >> $TAOS_CFG -echo "debugFlag 0" >> $TAOS_CFG +echo "debugFlag 135" >> $TAOS_CFG echo "tmrDebugFlag 131" >> $TAOS_CFG echo "uDebugFlag 143" >> $TAOS_CFG echo "rpcDebugFlag 143" >> $TAOS_CFG diff --git a/utils/test/c/tmq_taosx_ci.c b/utils/test/c/tmq_taosx_ci.c index 2257089f06..5012e50bab 100644 --- a/utils/test/c/tmq_taosx_ci.c +++ b/utils/test/c/tmq_taosx_ci.c @@ -574,6 +574,7 @@ tmq_t* build_consumer() { tmq_conf_set(conf, "msg.with.table.name", "true"); tmq_conf_set(conf, "enable.auto.commit", "true"); tmq_conf_set(conf, "auto.offset.reset", "earliest"); + tmq_conf_set(conf, "msg.consume.excluded", "1"); if (g_conf.snapShot) { tmq_conf_set(conf, "experimental.snapshot.enable", "true"); From af6d6b69c3380385a6effe88d0c557cc4735e693 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 5 Mar 2024 11:25:43 +0800 Subject: [PATCH 2/2] fix:add excluded msg for delete in tmq --- include/common/tmsg.h | 1 - include/libs/scheduler/scheduler.h | 1 + source/client/inc/clientInt.h | 5 +++-- source/client/src/clientImpl.c | 9 ++++++--- source/client/src/clientMain.c | 6 +++--- source/client/src/clientRawBlockWrite.c | 4 +--- source/common/src/tmsg.c | 1 - source/libs/scheduler/inc/schInt.h | 1 + source/libs/scheduler/src/schJob.c | 1 + source/libs/scheduler/src/schRemote.c | 2 +- 10 files changed, 17 insertions(+), 14 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index e400698f69..4fa244a07c 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3816,7 +3816,6 @@ typedef struct { int8_t source; } SVDeleteReq; -extern int8_t deleteFromTaosx; int32_t tSerializeSVDeleteReq(void* buf, int32_t bufLen, SVDeleteReq* pReq); int32_t tDeserializeSVDeleteReq(void* buf, int32_t bufLen, SVDeleteReq* pReq); diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index 958d63349d..952af3c443 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -78,6 +78,7 @@ typedef struct SSchedulerReq { void* chkKillParam; SExecResult* pExecRes; void** pFetchRes; + int8_t source; } SSchedulerReq; int32_t schedulerInit(void); diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index f73d121b15..6cc2a62236 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -284,6 +284,7 @@ typedef struct SRequestObj { void* pWrapper; SMetaData parseMeta; char* effectiveUser; + int8_t source; } SRequestObj; typedef struct SSyncQueryParam { @@ -306,10 +307,10 @@ void doFreeReqResultInfo(SReqResultInfo* pResInfo); int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, SArray** pReq); void syncCatalogFn(SMetaData* pResult, void* param, int32_t code); -TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly); +TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly, int8_t source); TAOS_RES* taosQueryImplWithReqid(TAOS* taos, const char* sql, bool validateOnly, int64_t reqid); -void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly); +void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly, int8_t source); void taosAsyncQueryImplWithReqid(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly, int64_t reqid); void taosAsyncFetchImpl(SRequestObj *pRequest, __taos_async_fn_t fp, void *param); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 971ed407f9..aa18ef1c94 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -729,6 +729,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList .chkKillFp = chkRequestKilled, .chkKillParam = (void*)pRequest->self, .pExecRes = &res, + .source = pRequest->source, }; int32_t code = schedulerExecJob(&req, &pRequest->body.queryJob); @@ -1198,6 +1199,7 @@ static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaDat .chkKillFp = chkRequestKilled, .chkKillParam = (void*)pRequest->self, .pExecRes = NULL, + .source = pRequest->source, }; code = schedulerExecJob(&req, &pRequest->body.queryJob); taosArrayDestroy(pNodeList); @@ -2461,7 +2463,7 @@ void syncQueryFn(void* param, void* res, int32_t code) { tsem_post(&pParam->sem); } -void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly) { +void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly, int8_t source) { if (sql == NULL || NULL == fp) { terrno = TSDB_CODE_INVALID_PARA; if (fp) { @@ -2487,6 +2489,7 @@ void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, return; } + pRequest->source = source; pRequest->body.queryFp = fp; doAsyncQuery(pRequest, false); } @@ -2521,7 +2524,7 @@ void taosAsyncQueryImplWithReqid(uint64_t connId, const char* sql, __taos_async_ doAsyncQuery(pRequest, false); } -TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly) { +TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly, int8_t source) { if (NULL == taos) { terrno = TSDB_CODE_TSC_DISCONNECTED; return NULL; @@ -2536,7 +2539,7 @@ TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly) { } tsem_init(¶m->sem, 0, 0); - taosAsyncQueryImpl(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly); + taosAsyncQueryImpl(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly, source); tsem_wait(¶m->sem); SRequestObj* pRequest = NULL; diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index f6aef5aa26..eacad0f18f 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -401,7 +401,7 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) { return pResInfo->userFields; } -TAOS_RES *taos_query(TAOS *taos, const char *sql) { return taosQueryImpl(taos, sql, false); } +TAOS_RES *taos_query(TAOS *taos, const char *sql) { return taosQueryImpl(taos, sql, false, TD_REQ_FROM_APP); } TAOS_RES *taos_query_with_reqid(TAOS *taos, const char *sql, int64_t reqid) { return taosQueryImplWithReqid(taos, sql, false, reqid); } @@ -827,7 +827,7 @@ int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) { } int taos_validate_sql(TAOS *taos, const char *sql) { - TAOS_RES *pObj = taosQueryImpl(taos, sql, true); + TAOS_RES *pObj = taosQueryImpl(taos, sql, true, TD_REQ_FROM_APP); int code = taos_errno(pObj); @@ -1125,7 +1125,7 @@ void continueInsertFromCsv(SSqlCallbackWrapper *pWrapper, SRequestObj *pRequest) void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param) { int64_t connId = *(int64_t *)taos; tscDebug("taos_query_a start with sql:%s", sql); - taosAsyncQueryImpl(connId, sql, fp, param, false); + taosAsyncQueryImpl(connId, sql, fp, param, false, TD_REQ_FROM_APP); tscDebug("taos_query_a end with sql:%s", sql); } diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index 3857e47f57..adc8c361cd 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -1256,9 +1256,7 @@ static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) { snprintf(sql, sizeof(sql), "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64, req.tableFName, req.tsColName, req.skey, req.tsColName, req.ekey); - deleteFromTaosx = TD_REQ_FROM_TAOX; - TAOS_RES* res = taos_query(taos, sql); - deleteFromTaosx = TD_REQ_FROM_APP; + TAOS_RES* res = taosQueryImpl(taos, sql, false, TD_REQ_FROM_TAOX); SRequestObj* pRequest = (SRequestObj*)res; code = pRequest->code; if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_PAR_GET_META_ERROR) { diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index e7414c6004..467300352c 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -7148,7 +7148,6 @@ int32_t tDecodeSVDropTSmaReq(SDecoder *pCoder, SVDropTSmaReq *pReq) { return 0; } -int8_t deleteFromTaosx = TD_REQ_FROM_APP; int32_t tSerializeSVDeleteReq(void *buf, int32_t bufLen, SVDeleteReq *pReq) { int32_t headLen = sizeof(SMsgHead); if (buf != NULL) { diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index 1c90e61ea3..276b668717 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -304,6 +304,7 @@ typedef struct SSchJob { SSchResInfo userRes; char *sql; SQueryProfileSummary summary; + int8_t source; } SSchJob; typedef struct SSchTaskCtx { diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index e50ec64d54..48aab63ba3 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -746,6 +746,7 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) { pJob->chkKillParam = pReq->chkKillParam; pJob->userRes.execFp = pReq->execFp; pJob->userRes.cbParam = pReq->cbParam; + pJob->source = pReq->source; if (pReq->pNodeList == NULL || taosArrayGetSize(pReq->pNodeList) <= 0) { qDebug("QID:0x%" PRIx64 " input exec nodeList is empty", pReq->pDag->queryId); diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index f6465316d2..39273ffa50 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -1086,7 +1086,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, req.sqlLen = strlen(pJob->sql); req.sql = (char *)pJob->sql; req.msg = pTask->msg; - req.source = deleteFromTaosx; + req.source = pJob->source; msgSize = tSerializeSVDeleteReq(NULL, 0, &req); msg = taosMemoryCalloc(1, msgSize); if (NULL == msg) {