feat: support uniq grant

This commit is contained in:
kailixu 2024-02-04 12:28:00 +08:00
parent f53d0bce5d
commit a9316c3710
18 changed files with 70 additions and 20 deletions

View File

@ -92,6 +92,8 @@ typedef struct SScanLogicNode {
STimeWindow scanRange;
SName tableName;
bool showRewrite;
bool isView;
bool isAudit;
double ratio;
SNodeList* pDynamicScanFuncs;
int32_t dataRequired;
@ -713,8 +715,10 @@ typedef struct SSubplan {
SNode* pTagCond;
SNode* pTagIndexCond;
bool showRewrite;
int32_t rowsThreshold;
bool isView;
bool isAudit;
bool dynamicRowThreshold;
int32_t rowsThreshold;
} SSubplan;
typedef enum EExplainMode { EXPLAIN_MODE_DISABLE = 1, EXPLAIN_MODE_STATIC, EXPLAIN_MODE_ANALYZE } EExplainMode;

View File

@ -517,6 +517,8 @@ typedef struct SQuery {
SArray* pTableList;
SArray* pDbList;
bool showRewrite;
// bool isView;
// bool isAudit;
int32_t placeholderNum;
SArray* pPlaceholderValues;
SNode* pPrepareRoot;

View File

@ -86,8 +86,10 @@ typedef struct SParseContext {
bool enableSysInfo;
bool async;
bool hasInvisibleCol;
const char* svrVer;
bool isView;
bool isAudit;
bool nodeOffline;
const char* svrVer;
SArray* pTableMetaPos; // sql table pos => catalog data pos
SArray* pTableVgroupPos; // sql table pos => catalog data pos
int64_t allocatorId;

View File

@ -32,6 +32,8 @@ typedef struct SPlanContext {
bool streamQuery;
bool rSmaQuery;
bool showRewrite;
bool isView;
bool isAudit;
int8_t triggerType;
int64_t watermark;
int64_t deleteMark;

View File

@ -66,7 +66,12 @@ typedef enum {
#define QUERY_RSP_POLICY_QUICK 1
#define QUERY_MSG_MASK_SHOW_REWRITE() (1 << 0)
#define QUERY_MSG_MASK_VIEW() (1 << 1)
#define QUERY_MSG_MASK_AUDIT() (1 << 2)
#define TEST_SHOW_REWRITE_MASK(m) (((m)&QUERY_MSG_MASK_SHOW_REWRITE()) != 0)
#define TEST_VIEW_MASK(m) (((m)&QUERY_MSG_MASK_VIEW()) != 0)
#define TEST_AUDIT_MASK(m) (((m)&QUERY_MSG_MASK_MASK()) != 0)
typedef struct STableComInfo {
uint8_t numOfTags; // the number of tags in schema

View File

@ -404,8 +404,6 @@ static void hbProcessQueryRspKvs(int32_t kvNum, SArray* pKvs, struct SCatalog *p
tscError("invalid grant info, len:%d, value:%p", kv->valueLen, kv->value);
break;
}
assert(0);
hbProcessGrantInfoRsp(kv->value, kv->valueLen, pCatalog);
break;
}
@ -917,7 +915,7 @@ int32_t hbGetExpiredGrantInfo(SClientHbKey *connKey, struct SCatalog *pCatalog,
gv->version = htonl(gv->version);
}
tscDebug("hb got %d expired grant, valueLen:%u", grantNum, (int32_t)sizeof(SGrantVersion) * grantNum);
tscDebug("hb got %d expired grant, valueLen:%d", grantNum, (int32_t)(sizeof(SGrantVersion) * grantNum));
if (!req->info) {
req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);

View File

@ -458,6 +458,8 @@ int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArra
.mgmtEpSet = getEpSet_s(&pAppInfo->mgmtEp),
.pAstRoot = pQuery->pRoot,
.showRewrite = pQuery->showRewrite,
.isView = pQuery->isView;
.isAudit = pQuery->isAudit;
.pMsg = pRequest->msgBuf,
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
.pUser = pRequest->pTscObj->user,
@ -1154,6 +1156,8 @@ static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaDat
.mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp),
.pAstRoot = pQuery->pRoot,
.showRewrite = pQuery->showRewrite,
.isView = pWrapper->pParseCtx->isView,
.isAudit = pWrapper->pParseCtx->isAudit,
.pMsg = pRequest->msgBuf,
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
.pUser = pRequest->pTscObj->user,

View File

@ -423,6 +423,8 @@ static int32_t logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) {
COPY_OBJECT_FIELD(scanRange, sizeof(STimeWindow));
COPY_OBJECT_FIELD(tableName, sizeof(SName));
COPY_SCALAR_FIELD(showRewrite);
COPY_SCALAR_FIELD(isView);
COPY_SCALAR_FIELD(isAudit);
COPY_SCALAR_FIELD(ratio);
CLONE_NODE_LIST_FIELD(pDynamicScanFuncs);
COPY_SCALAR_FIELD(dataRequired);

View File

@ -3332,6 +3332,12 @@ static int32_t subplanToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkSubplanShowRewrite, pNode->showRewrite);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkSubplanShowRewrite, pNode->isView);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkSubplanShowRewrite, pNode->isAudit);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkSubplanRowsThreshold, pNode->rowsThreshold);
}
@ -3379,6 +3385,12 @@ static int32_t jsonToSubplan(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkSubplanShowRewrite, &pNode->showRewrite);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkSubplanShowRewrite, &pNode->isView);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkSubplanShowRewrite, &pNode->isAudit);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkSubplanRowsThreshold, &pNode->rowsThreshold);
}

View File

@ -3930,6 +3930,12 @@ static int32_t subplanInlineToMsg(const void* pObj, STlvEncoder* pEncoder) {
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueBool(pEncoder, pNode->dynamicRowThreshold);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueBool(pEncoder, pNode->isView);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueBool(pEncoder, pNode->isAudit);
}
return code;
}
@ -3985,7 +3991,12 @@ static int32_t msgToSubplanInline(STlvDecoder* pDecoder, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = tlvDecodeValueBool(pDecoder, &pNode->dynamicRowThreshold);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvDecodeValueBool(pDecoder, &pNode->isView);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvDecodeValueBool(pDecoder, &pNode->isAudit);
}
return code;
}

View File

@ -48,8 +48,7 @@ int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo *pInfo);
void insInitColValues(STableMeta* pTableMeta, SArray* aColValues);
void insCheckTableDataOrder(STableDataCxt *pTableCxt, TSKEY tsKey);
int32_t insGetTableDataCxt(SHashObj *pHash, void *id, int32_t idLen, STableMeta *pTableMeta,
SVCreateTbReq **pCreateTbReq, STableDataCxt **pTableCxt, bool colMode, bool ignoreColVals,
uint32_t insertType);
SVCreateTbReq **pCreateTbReq, STableDataCxt **pTableCxt, bool colMode, bool ignoreColVals);
int32_t initTableColSubmitData(STableDataCxt *pTableCxt);
int32_t insMergeTableDataCxt(SHashObj *pTableHash, SArray **pVgDataBlocks, bool isRebuild);
int32_t insBuildVgDataBlocks(SHashObj *pVgroupsHashObj, SArray *pVgDataBlocks, SArray **pDataBlocks);

View File

@ -43,6 +43,8 @@ typedef struct STranslateContext {
bool createStream;
bool stableQuery;
bool showRewrite;
bool isView;
bool isAudit;
SNode* pPrevRoot;
SNode* pPostRoot;
} STranslateContext;

View File

@ -169,8 +169,7 @@ STableDataCxt* smlInitTableDataCtx(SQuery* query, STableMeta* pTableMeta) {
STableDataCxt* pTableCxt = NULL;
SVCreateTbReq* pCreateTbReq = NULL;
int ret = insGetTableDataCxt(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid,
sizeof(pTableMeta->uid), pTableMeta, &pCreateTbReq, &pTableCxt, false, false,
((SVnodeModifyOpStmt*)(query->pRoot))->insertType);
sizeof(pTableMeta->uid), pTableMeta, &pCreateTbReq, &pTableCxt, false, false);
if (ret != TSDB_CODE_SUCCESS) {
return NULL;
}
@ -314,8 +313,7 @@ int32_t smlBindData(SQuery* query, bool dataFormat, SArray* tags, SArray* colsSc
STableDataCxt* pTableCxt = NULL;
ret = insGetTableDataCxt(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid,
sizeof(pTableMeta->uid), pTableMeta, &pCreateTblReq, &pTableCxt, false, false,
((SVnodeModifyOpStmt*)(query->pRoot))->insertType);
sizeof(pTableMeta->uid), pTableMeta, &pCreateTblReq, &pTableCxt, false, false);
if (ret != TSDB_CODE_SUCCESS) {
buildInvalidOperationMsg(&pBuf, "insGetTableDataCxt error");
goto end;

View File

@ -1333,7 +1333,7 @@ static int32_t preParseBoundColumnsClause(SInsertParseContext* pCxt, SVnodeModif
static int32_t getTableDataCxt(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, STableDataCxt** pTableCxt) {
if (pCxt->pComCxt->async) {
return insGetTableDataCxt(pStmt->pTableBlockHashObj, &pStmt->pTableMeta->uid, sizeof(pStmt->pTableMeta->uid),
pStmt->pTableMeta, &pStmt->pCreateTblReq, pTableCxt, false, false, pStmt->insertType);
pStmt->pTableMeta, &pStmt->pCreateTblReq, pTableCxt, false, false);
}
char tbFName[TSDB_TABLE_FNAME_LEN];
@ -1342,7 +1342,7 @@ static int32_t getTableDataCxt(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pS
pStmt->pTableMeta->uid = 0;
}
return insGetTableDataCxt(pStmt->pTableBlockHashObj, tbFName, strlen(tbFName), pStmt->pTableMeta,
&pStmt->pCreateTblReq, pTableCxt, NULL != pCxt->pComCxt->pStmtCb, false, pStmt->insertType);
&pStmt->pCreateTblReq, pTableCxt, NULL != pCxt->pComCxt->pStmtCb, false);
}
static int32_t parseBoundColumnsClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, STableDataCxt* pTableCxt) {
@ -1931,7 +1931,7 @@ static int32_t parseOneStbRow(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pSt
}
code = insGetTableDataCxt(pStmt->pTableBlockHashObj, &pStbRowsCxt->pCtbMeta->uid, sizeof(pStbRowsCxt->pCtbMeta->uid),
pStbRowsCxt->pCtbMeta, &pStbRowsCxt->pCreateCtbReq, ppTableDataCxt, false, true, pStmt->insertType);
pStbRowsCxt->pCtbMeta, &pStbRowsCxt->pCreateCtbReq, ppTableDataCxt, false, true);
initTableColSubmitData(*ppTableDataCxt);
if (code == TSDB_CODE_SUCCESS) {
SRow** pRow = taosArrayReserve((*ppTableDataCxt)->pData->aRowP, 1);
@ -2157,6 +2157,9 @@ static int32_t parseDataFromFileImpl(SInsertParseContext* pCxt, SVnodeModifyOpSt
pStmt->totalRowsNum += numOfRows;
pStmt->totalTbNum += 1;
TSDB_QUERY_SET_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_FILE_INSERT);
if (rowsDataCxt.pTableDataCxt && rowsDataCxt.pTableDataCxt->pData) {
rowsDataCxt.pTableDataCxt->pData->flags |= SUBMIT_REQ_FROM_FILE;
}
if (!pStmt->fileProcessing) {
taosCloseFile(&pStmt->fp);
} else {

View File

@ -208,7 +208,7 @@ void insCheckTableDataOrder(STableDataCxt* pTableCxt, TSKEY tsKey) {
void insDestroyBoundColInfo(SBoundColInfo* pInfo) { taosMemoryFreeClear(pInfo->pColIndex); }
static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreateTbReq, STableDataCxt** pOutput,
bool colMode, bool ignoreColVals, uint32_t insertType) {
bool colMode, bool ignoreColVals) {
STableDataCxt* pTableCxt = taosMemoryCalloc(1, sizeof(STableDataCxt));
if (NULL == pTableCxt) {
return TSDB_CODE_OUT_OF_MEMORY;
@ -249,7 +249,6 @@ static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreat
} else {
pTableCxt->pData->flags = (pCreateTbReq != NULL && NULL != *pCreateTbReq) ? SUBMIT_REQ_AUTO_CREATE_TABLE : 0;
pTableCxt->pData->flags |= colMode ? SUBMIT_REQ_COLUMN_DATA_FORMAT : 0;
pTableCxt->pData->flags |= insertType == TSDB_QUERY_TYPE_FILE_INSERT ? SUBMIT_REQ_FROM_FILE : 0;
pTableCxt->pData->suid = pTableMeta->suid;
pTableCxt->pData->uid = pTableMeta->uid;
pTableCxt->pData->sver = pTableMeta->sversion;
@ -331,7 +330,7 @@ static void resetColValues(SArray* pValues) {
}
int32_t insGetTableDataCxt(SHashObj* pHash, void* id, int32_t idLen, STableMeta* pTableMeta,
SVCreateTbReq** pCreateTbReq, STableDataCxt** pTableCxt, bool colMode, bool ignoreColVals, uint32_t insertType) {
SVCreateTbReq** pCreateTbReq, STableDataCxt** pTableCxt, bool colMode, bool ignoreColVals) {
STableDataCxt** tmp = (STableDataCxt**)taosHashGet(pHash, id, idLen);
if (NULL != tmp) {
*pTableCxt = *tmp;
@ -340,7 +339,7 @@ int32_t insGetTableDataCxt(SHashObj* pHash, void* id, int32_t idLen, STableMeta*
}
return TSDB_CODE_SUCCESS;
}
int32_t code = createTableDataCxt(pTableMeta, pCreateTbReq, pTableCxt, colMode, ignoreColVals, insertType);
int32_t code = createTableDataCxt(pTableMeta, pCreateTbReq, pTableCxt, colMode, ignoreColVals);
if (TSDB_CODE_SUCCESS == code) {
void* pData = *pTableCxt; // deal scan coverity
code = taosHashPut(pHash, id, idLen, &pData, POINTER_BYTES);
@ -646,7 +645,7 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
void* tmp = taosHashGet(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid));
STableDataCxt* pTableCxt = NULL;
int ret = insGetTableDataCxt(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid,
sizeof(pTableMeta->uid), pTableMeta, pCreateTb, &pTableCxt, true, false, ((SVnodeModifyOpStmt*)(query->pRoot))->insertType);
sizeof(pTableMeta->uid), pTableMeta, pCreateTb, &pTableCxt, true, false);
if (ret != TSDB_CODE_SUCCESS) {
uError("insGetTableDataCxt error");
goto end;

View File

@ -192,6 +192,8 @@ static char* getSyntaxErrFormat(int32_t errCode) {
return "Out of memory";
case TSDB_CODE_PAR_ORDERBY_AMBIGUOUS:
return "ORDER BY \"%s\" is ambiguous";
case TSDB_CODE_GRANT_EXPIRED:
return "License expired";
default:
return "Unknown error";
}

View File

@ -340,6 +340,8 @@ static int32_t makeScanLogicNode(SLogicPlanContext* pCxt, SRealTableNode* pRealT
strcpy(pScan->tableName.dbname, pRealTable->table.dbName);
strcpy(pScan->tableName.tname, pRealTable->table.tableName);
pScan->showRewrite = pCxt->pPlanCxt->showRewrite;
pScan->isView = pCxt->pPlanCxt->isView;
pScan->isAudit = pCxt->pPlanCxt->isAudit;
pScan->ratio = pRealTable->ratio;
pScan->dataRequired = FUNC_DATA_REQUIRED_DATA_LOAD;
pScan->cacheLastMode = pRealTable->cacheLastMode;

View File

@ -1109,6 +1109,9 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
qMsg.refId = pJob->refId;
qMsg.execId = pTask->execId;
qMsg.msgMask = (pTask->plan->showRewrite) ? QUERY_MSG_MASK_SHOW_REWRITE() : 0;
qMsg.msgMask |= (pTask->plan->isView) ? QUERY_MSG_MASK_VIEW() : 0;
qMsg.msgMask |= (pTask->plan->isAudit) ? QUERY_MSG_MASK_AUDIT() : 0;
qMsg.taskType = TASK_TYPE_TEMP;
qMsg.explain = SCH_IS_EXPLAIN_JOB(pJob);
qMsg.needFetch = SCH_TASK_NEED_FETCH(pTask);