From ae5f6ef1752832028276870c550187130068293c Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 3 Nov 2022 16:28:48 +0800 Subject: [PATCH 1/3] enh: add show rewrite mask in query msg --- include/common/tmsg.h | 5 +++++ source/common/src/tglobal.c | 3 +++ source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 3 ++- source/libs/scheduler/src/schRemote.c | 1 + 4 files changed, 11 insertions(+), 1 deletion(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 7bc56daab0..1781374165 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -216,9 +216,14 @@ typedef struct SEp { uint16_t port; } SEp; +#define SHOW_REWRITE_MASK() (1 << 0) + +#define TEST_SHOW_REWRITE_MASK(m) ((m) & SHOW_REWRITE_MASK() != 0) + typedef struct { int32_t contLen; int32_t vgId; + int32_t msgMask; } SMsgHead; // Submit message for one table diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 7f4a826c5e..de1347a52c 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -786,6 +786,9 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) { case 'd': { if (strcasecmp("dDebugFlag", name) == 0) { dDebugFlag = cfgGetItem(pCfg, "dDebugFlag")->i32; + } else if (strcasecmp("debugFlag", name) == 0) { + int32_t flag = cfgGetItem(pCfg, "debugFlag")->i32; + taosSetAllDebugFlag(flag, true); } break; } diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 40d0e32c2b..0c9f09644f 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -145,6 +145,7 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp pHead->contLen = ntohl(pHead->contLen); pHead->vgId = ntohl(pHead->vgId); + pHead->msgMask = ntohl(pHead->msgMask); SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId); if (pVnode == NULL) { @@ -155,7 +156,7 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp switch (qtype) { case QUERY_QUEUE: - if ((pMsg->msgType == TDMT_SCH_QUERY) && (grantCheck(TSDB_GRANT_TIME) != TSDB_CODE_SUCCESS)) { + if ((pMsg->msgType == TDMT_SCH_QUERY) && (grantCheck(TSDB_GRANT_TIME) != TSDB_CODE_SUCCESS) && !TEST_SHOW_REWRITE_MASK(pHead->msgMask)) { terrno = TSDB_CODE_GRANT_EXPIRED; code = terrno; dDebug("vgId:%d, msg:%p put into vnode-query queue failed since %s", pVnode->vgId, pMsg, terrstr(code)); diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 47de2528fa..7aa70175da 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -1047,6 +1047,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, SSubQueryMsg *pMsg = msg; pMsg->header.vgId = htonl(addr->nodeId); + pMsg->header.msgMask = htonl((pTask->plan->showRewrite) ? SHOW_REWRITE_MASK() : 0); pMsg->sId = htobe64(schMgmt.sId); pMsg->queryId = htobe64(pJob->queryId); pMsg->taskId = htobe64(pTask->taskId); From 65636154ea38cba8e0ba8f30e84d4cb5d41144df Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 4 Nov 2022 10:47:49 +0800 Subject: [PATCH 2/3] fix: fix msg head issue --- source/dnode/mnode/impl/src/mndStb.c | 5 +++-- source/libs/sync/src/syncRaftEntry.c | 1 + 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 641a8af437..b6f7e31638 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -466,7 +466,7 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt contLen += sizeof(SMsgHead); - SMsgHead *pHead = taosMemoryMalloc(contLen); + SMsgHead *pHead = taosMemoryCalloc(1, contLen); if (pHead == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto _err; @@ -519,6 +519,7 @@ static void *mndBuildVDropStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, pHead->contLen = htonl(contLen); pHead->vgId = htonl(pVgroup->vgId); + pHead->msgMask = htonl(0); void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead)); @@ -2596,4 +2597,4 @@ const char *mndGetStbStr(const char *src) { if (posStb != NULL) ++posStb; if (posStb == NULL) return posDb; return posStb; -} \ No newline at end of file +} diff --git a/source/libs/sync/src/syncRaftEntry.c b/source/libs/sync/src/syncRaftEntry.c index 6d372acf2f..aba61edf0d 100644 --- a/source/libs/sync/src/syncRaftEntry.c +++ b/source/libs/sync/src/syncRaftEntry.c @@ -72,6 +72,7 @@ SSyncRaftEntry* syncEntryBuildNoop(SyncTerm term, SyncIndex index, int32_t vgId) SMsgHead head; head.vgId = vgId; head.contLen = sizeof(SMsgHead); + head.msgMask = 0; SRpcMsg rpcMsg; memset(&rpcMsg, 0, sizeof(SRpcMsg)); rpcMsg.contLen = head.contLen; From 85520d8c70ebc43c9d6b98224ce6b1d510680c3a Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 4 Nov 2022 15:10:42 +0800 Subject: [PATCH 3/3] fix: fix msg mask check issue --- include/common/tmsg.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 1781374165..c0ac7da5bf 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -218,7 +218,7 @@ typedef struct SEp { #define SHOW_REWRITE_MASK() (1 << 0) -#define TEST_SHOW_REWRITE_MASK(m) ((m) & SHOW_REWRITE_MASK() != 0) +#define TEST_SHOW_REWRITE_MASK(m) (((m) & SHOW_REWRITE_MASK()) != 0) typedef struct { int32_t contLen;