Merge pull request #17930 from taosdata/enh/showWrite
enh: support grant check in vnode query
This commit is contained in:
commit
3e3354782c
|
@ -216,14 +216,9 @@ typedef struct SEp {
|
|||
uint16_t port;
|
||||
} SEp;
|
||||
|
||||
#define SHOW_REWRITE_MASK() (1 << 0)
|
||||
|
||||
#define TEST_SHOW_REWRITE_MASK(m) (((m) & SHOW_REWRITE_MASK()) != 0)
|
||||
|
||||
typedef struct {
|
||||
int32_t contLen;
|
||||
int32_t vgId;
|
||||
int32_t msgMask;
|
||||
} SMsgHead;
|
||||
|
||||
// Submit message for one table
|
||||
|
@ -1616,6 +1611,7 @@ typedef struct SSubQueryMsg {
|
|||
int8_t needFetch;
|
||||
uint32_t sqlLen; // the query sql,
|
||||
uint32_t phyLen;
|
||||
int32_t msgMask;
|
||||
char msg[];
|
||||
} SSubQueryMsg;
|
||||
|
||||
|
|
|
@ -57,6 +57,10 @@ typedef enum {
|
|||
#define QUERY_RSP_POLICY_DELAY 0
|
||||
#define QUERY_RSP_POLICY_QUICK 1
|
||||
|
||||
#define QUERY_MSG_MASK_SHOW_REWRITE() (1 << 0)
|
||||
#define TEST_SHOW_REWRITE_MASK(m) (((m) & QUERY_MSG_MASK_SHOW_REWRITE()) != 0)
|
||||
|
||||
|
||||
typedef struct STableComInfo {
|
||||
uint8_t numOfTags; // the number of tags in schema
|
||||
uint8_t precision; // the number of precision
|
||||
|
|
|
@ -76,7 +76,7 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const S
|
|||
|
||||
int32_t qWorkerAbortPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg);
|
||||
|
||||
int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg);
|
||||
int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg, bool chkGrant);
|
||||
|
||||
int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts);
|
||||
|
||||
|
|
|
@ -145,7 +145,6 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
|
|||
|
||||
pHead->contLen = ntohl(pHead->contLen);
|
||||
pHead->vgId = ntohl(pHead->vgId);
|
||||
pHead->msgMask = ntohl(pHead->msgMask);
|
||||
|
||||
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
|
||||
if (pVnode == NULL) {
|
||||
|
@ -156,15 +155,9 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
|
|||
|
||||
switch (qtype) {
|
||||
case QUERY_QUEUE:
|
||||
if ((pMsg->msgType == TDMT_SCH_QUERY) && (grantCheck(TSDB_GRANT_TIME) != TSDB_CODE_SUCCESS) && !TEST_SHOW_REWRITE_MASK(pHead->msgMask)) {
|
||||
terrno = TSDB_CODE_GRANT_EXPIRED;
|
||||
code = terrno;
|
||||
dDebug("vgId:%d, msg:%p put into vnode-query queue failed since %s", pVnode->vgId, pMsg, terrstr(code));
|
||||
} else {
|
||||
vnodePreprocessQueryMsg(pVnode->pImpl, pMsg);
|
||||
dGTrace("vgId:%d, msg:%p put into vnode-query queue", pVnode->vgId, pMsg);
|
||||
taosWriteQitem(pVnode->pQueryQ, pMsg);
|
||||
}
|
||||
break;
|
||||
case STREAM_QUEUE:
|
||||
dGTrace("vgId:%d, msg:%p put into vnode-stream queue", pVnode->vgId, pMsg);
|
||||
|
|
|
@ -21,7 +21,7 @@
|
|||
int32_t mndPreProcessQueryMsg(SRpcMsg *pMsg) {
|
||||
if (TDMT_SCH_QUERY != pMsg->msgType && TDMT_SCH_MERGE_QUERY != pMsg->msgType) return 0;
|
||||
SMnode *pMnode = pMsg->info.node;
|
||||
return qWorkerPreprocessQueryMsg(pMnode->pQuery, pMsg);
|
||||
return qWorkerPreprocessQueryMsg(pMnode->pQuery, pMsg, false);
|
||||
}
|
||||
|
||||
void mndPostProcessQueryMsg(SRpcMsg *pMsg) {
|
||||
|
|
|
@ -519,7 +519,6 @@ static void *mndBuildVDropStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb,
|
|||
|
||||
pHead->contLen = htonl(contLen);
|
||||
pHead->vgId = htonl(pVgroup->vgId);
|
||||
pHead->msgMask = htonl(0);
|
||||
|
||||
void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
|
||||
|
||||
|
|
|
@ -69,7 +69,7 @@ int32_t qndPreprocessQueryMsg(SQnode *pQnode, SRpcMsg *pMsg) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
return qWorkerPreprocessQueryMsg(pQnode->pQuery, pMsg);
|
||||
return qWorkerPreprocessQueryMsg(pQnode->pQuery, pMsg, false);
|
||||
}
|
||||
|
||||
int32_t qndProcessQueryMsg(SQnode *pQnode, int64_t ts, SRpcMsg *pMsg) {
|
||||
|
|
|
@ -329,7 +329,7 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
return qWorkerPreprocessQueryMsg(pVnode->pQuery, pMsg);
|
||||
return qWorkerPreprocessQueryMsg(pVnode->pQuery, pMsg, TDMT_SCH_QUERY == pMsg->msgType);
|
||||
}
|
||||
|
||||
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||
|
|
|
@ -8,6 +8,7 @@
|
|||
#include "tcommon.h"
|
||||
#include "tmsg.h"
|
||||
#include "tname.h"
|
||||
#include "tgrant.h"
|
||||
|
||||
int32_t qwMallocFetchRsp(int8_t rpcMalloc, int32_t length, SRetrieveTableRsp **rsp) {
|
||||
int32_t msgSize = sizeof(SRetrieveTableRsp) + length;
|
||||
|
@ -305,7 +306,7 @@ int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SRpcHandleInfo *
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) {
|
||||
int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg, bool chkGrant) {
|
||||
if (NULL == qWorkerMgmt || NULL == pMsg) {
|
||||
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
@ -326,6 +327,12 @@ int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) {
|
|||
msg->execId = ntohl(msg->execId);
|
||||
msg->phyLen = ntohl(msg->phyLen);
|
||||
msg->sqlLen = ntohl(msg->sqlLen);
|
||||
msg->msgMask = ntohl(msg->msgMask);
|
||||
|
||||
if (chkGrant && (!TEST_SHOW_REWRITE_MASK(msg->msgMask)) && (grantCheck(TSDB_GRANT_TIME) != TSDB_CODE_SUCCESS)) {
|
||||
QW_ELOG("query failed cause of grant expired, msgMask:%d", msg->msgMask);
|
||||
QW_ERR_RET(TSDB_CODE_GRANT_EXPIRED);
|
||||
}
|
||||
|
||||
uint64_t sId = msg->sId;
|
||||
uint64_t qId = msg->queryId;
|
||||
|
|
|
@ -1047,7 +1047,6 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
|
|||
|
||||
SSubQueryMsg *pMsg = msg;
|
||||
pMsg->header.vgId = htonl(addr->nodeId);
|
||||
pMsg->header.msgMask = htonl((pTask->plan->showRewrite) ? SHOW_REWRITE_MASK() : 0);
|
||||
pMsg->sId = htobe64(schMgmt.sId);
|
||||
pMsg->queryId = htobe64(pJob->queryId);
|
||||
pMsg->taskId = htobe64(pTask->taskId);
|
||||
|
@ -1058,6 +1057,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
|
|||
pMsg->needFetch = SCH_TASK_NEED_FETCH(pTask);
|
||||
pMsg->phyLen = htonl(pTask->msgLen);
|
||||
pMsg->sqlLen = htonl(len);
|
||||
pMsg->msgMask = htonl((pTask->plan->showRewrite) ? QUERY_MSG_MASK_SHOW_REWRITE() : 0);
|
||||
|
||||
memcpy(pMsg->msg, pJob->sql, len);
|
||||
memcpy(pMsg->msg + len, pTask->msg, pTask->msgLen);
|
||||
|
|
|
@ -72,7 +72,6 @@ SSyncRaftEntry* syncEntryBuildNoop(SyncTerm term, SyncIndex index, int32_t vgId)
|
|||
SMsgHead head;
|
||||
head.vgId = vgId;
|
||||
head.contLen = sizeof(SMsgHead);
|
||||
head.msgMask = 0;
|
||||
SRpcMsg rpcMsg;
|
||||
memset(&rpcMsg, 0, sizeof(SRpcMsg));
|
||||
rpcMsg.contLen = head.contLen;
|
||||
|
|
Loading…
Reference in New Issue