Merge pull request #14215 from taosdata/feature/3.0_wxy
feat: subplan adds 'user' field
This commit is contained in:
commit
c3aabbdd5a
|
@ -458,6 +458,7 @@ typedef struct SSubplan {
|
|||
int32_t msgType; // message type for subplan, used to denote the send message type to vnode.
|
||||
int32_t level; // the execution level of current subplan, starting from 0 in a top-down manner.
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
char user[TSDB_USER_LEN];
|
||||
SQueryNodeAddr execNode; // for the scan/modify subplan, the optional execution node
|
||||
SQueryNodeStat execNodeStat; // only for scan subplan
|
||||
SNodeList* pChildren; // the datasource subplan,from which to fetch the result
|
||||
|
|
|
@ -36,6 +36,7 @@ typedef struct SPlanContext {
|
|||
int64_t watermark;
|
||||
char* pMsg;
|
||||
int32_t msgLen;
|
||||
const char* pUser;
|
||||
} SPlanContext;
|
||||
|
||||
// Create the physical plan for the query, according to the AST.
|
||||
|
|
|
@ -376,7 +376,8 @@ int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArra
|
|||
.pAstRoot = pQuery->pRoot,
|
||||
.showRewrite = pQuery->showRewrite,
|
||||
.pMsg = pRequest->msgBuf,
|
||||
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE};
|
||||
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
|
||||
.pUser = pRequest->pTscObj->user};
|
||||
|
||||
return qCreateQueryPlan(&cxt, pPlan, pNodeList);
|
||||
}
|
||||
|
@ -495,7 +496,6 @@ _return:
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SMetaData* pResultMeta) {
|
||||
SArray* pDbVgList = NULL;
|
||||
SArray* pQnodeList = NULL;
|
||||
|
@ -609,7 +609,6 @@ _return:
|
|||
return code;
|
||||
}
|
||||
|
||||
|
||||
int32_t scheduleAsyncQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) {
|
||||
tsem_init(&schdRspSem, 0, 0);
|
||||
|
||||
|
@ -833,8 +832,8 @@ void schedulerExecCb(SQueryResult* pResult, void* param, int32_t code) {
|
|||
}
|
||||
}
|
||||
|
||||
tscDebug("0x%" PRIx64 " enter scheduler exec cb, code:%d - %s, reqId:0x%" PRIx64,
|
||||
pRequest->self, code, tstrerror(code), pRequest->requestId);
|
||||
tscDebug("0x%" PRIx64 " enter scheduler exec cb, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code,
|
||||
tstrerror(code), pRequest->requestId);
|
||||
|
||||
STscObj* pTscObj = pRequest->pTscObj;
|
||||
if (code != TSDB_CODE_SUCCESS && NEED_CLIENT_HANDLE_ERROR(code)) {
|
||||
|
@ -956,7 +955,8 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData *pResultM
|
|||
.pAstRoot = pQuery->pRoot,
|
||||
.showRewrite = pQuery->showRewrite,
|
||||
.pMsg = pRequest->msgBuf,
|
||||
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE};
|
||||
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
|
||||
.pUser = pRequest->pTscObj->user};
|
||||
|
||||
SAppInstInfo* pAppInfo = getAppInfo(pRequest);
|
||||
code = qCreateQueryPlan(&cxt, &pRequest->body.pDag, pMnodeList);
|
||||
|
@ -1538,7 +1538,6 @@ static int32_t estimateJsonLen(SReqResultInfo* pResultInfo, int32_t numOfCols, i
|
|||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
}
|
||||
} else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
|
||||
int32_t lenTmp = numOfRows * sizeof(int32_t);
|
||||
|
@ -1655,7 +1654,6 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int
|
|||
pStart += len;
|
||||
pStart1 += len;
|
||||
memcpy(pStart1, pStart, colLen);
|
||||
|
||||
}
|
||||
pStart += colLen;
|
||||
pStart1 += colLen1;
|
||||
|
@ -1840,7 +1838,8 @@ _OVER:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t appendTbToReq(SArray* pList, int32_t pos1, int32_t len1, int32_t pos2, int32_t len2, const char* str, int32_t acctId, char* db) {
|
||||
int32_t appendTbToReq(SArray* pList, int32_t pos1, int32_t len1, int32_t pos2, int32_t len2, const char* str,
|
||||
int32_t acctId, char* db) {
|
||||
SName name;
|
||||
|
||||
if (len1 <= 0) {
|
||||
|
@ -1966,8 +1965,7 @@ int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName,
|
|||
continue;
|
||||
}
|
||||
|
||||
if (('a' <= *(tbList + i) && 'z' >= *(tbList + i)) ||
|
||||
('A' <= *(tbList + i) && 'Z' >= *(tbList + i)) ||
|
||||
if (('a' <= *(tbList + i) && 'z' >= *(tbList + i)) || ('A' <= *(tbList + i) && 'Z' >= *(tbList + i)) ||
|
||||
('0' <= *(tbList + i) && '9' >= *(tbList + i))) {
|
||||
if (vLen[vIdx] > 0) {
|
||||
goto _return;
|
||||
|
@ -2000,7 +1998,6 @@ void syncCatalogFn(SMetaData* pResult, void* param, int32_t code) {
|
|||
tsem_post(&pParam->sem);
|
||||
}
|
||||
|
||||
|
||||
void syncQueryFn(void* param, void* res, int32_t code) {
|
||||
SSyncQueryParam* pParam = param;
|
||||
pParam->pRequest = res;
|
||||
|
@ -2045,7 +2042,6 @@ void taosAsyncQueryImpl(TAOS *taos, const char *sql, __taos_async_fn_t fp, void
|
|||
doAsyncQuery(pRequest, false);
|
||||
}
|
||||
|
||||
|
||||
TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly) {
|
||||
if (NULL == taos) {
|
||||
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
||||
|
@ -2084,6 +2080,3 @@ TAOS_RES *taosQueryImpl(TAOS *taos, const char *sql, bool validateOnly) {
|
|||
return pRes;
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
|
|
@ -2294,6 +2294,7 @@ static const char* jkSubplanType = "SubplanType";
|
|||
static const char* jkSubplanMsgType = "MsgType";
|
||||
static const char* jkSubplanLevel = "Level";
|
||||
static const char* jkSubplanDbFName = "DbFName";
|
||||
static const char* jkSubplanUser = "User";
|
||||
static const char* jkSubplanNodeAddr = "NodeAddr";
|
||||
static const char* jkSubplanRootNode = "RootNode";
|
||||
static const char* jkSubplanDataSink = "DataSink";
|
||||
|
@ -2316,6 +2317,9 @@ static int32_t subplanToJson(const void* pObj, SJson* pJson) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddStringToObject(pJson, jkSubplanDbFName, pNode->dbFName);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddStringToObject(pJson, jkSubplanUser, pNode->user);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddObject(pJson, jkSubplanNodeAddr, queryNodeAddrToJson, &pNode->execNode);
|
||||
}
|
||||
|
@ -2352,6 +2356,9 @@ static int32_t jsonToSubplan(const SJson* pJson, void* pObj) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetStringValue(pJson, jkSubplanDbFName, pNode->dbFName);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetStringValue(pJson, jkSubplanUser, pNode->user);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonToObject(pJson, jkSubplanNodeAddr, jsonToQueryNodeAddr, &pNode->execNode);
|
||||
}
|
||||
|
|
|
@ -1453,6 +1453,9 @@ static SSubplan* makeSubplan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubpl
|
|||
pSubplan->id = pLogicSubplan->id;
|
||||
pSubplan->subplanType = pLogicSubplan->subplanType;
|
||||
pSubplan->level = pLogicSubplan->level;
|
||||
if (NULL != pCxt->pPlanCxt->pUser) {
|
||||
strcpy(pSubplan->user, pCxt->pPlanCxt->pUser);
|
||||
}
|
||||
return pSubplan;
|
||||
}
|
||||
|
||||
|
|
|
@ -85,8 +85,9 @@ class PlannerTestBaseImpl {
|
|||
public:
|
||||
PlannerTestBaseImpl() : sqlNo_(0) {}
|
||||
|
||||
void useDb(const string& acctId, const string& db) {
|
||||
caseEnv_.acctId_ = acctId;
|
||||
void useDb(const string& user, const string& db) {
|
||||
caseEnv_.acctId_ = 0;
|
||||
caseEnv_.user_ = user;
|
||||
caseEnv_.db_ = db;
|
||||
caseEnv_.nsql_ = g_skipSql;
|
||||
}
|
||||
|
@ -193,7 +194,8 @@ class PlannerTestBaseImpl {
|
|||
|
||||
private:
|
||||
struct caseEnv {
|
||||
string acctId_;
|
||||
int32_t acctId_;
|
||||
string user_;
|
||||
string db_;
|
||||
int32_t nsql_;
|
||||
|
||||
|
@ -295,7 +297,7 @@ class PlannerTestBaseImpl {
|
|||
transform(stmtEnv_.sql_.begin(), stmtEnv_.sql_.end(), stmtEnv_.sql_.begin(), ::tolower);
|
||||
|
||||
SParseContext cxt = {0};
|
||||
cxt.acctId = atoi(caseEnv_.acctId_.c_str());
|
||||
cxt.acctId = caseEnv_.acctId_;
|
||||
cxt.db = caseEnv_.db_.c_str();
|
||||
cxt.pSql = stmtEnv_.sql_.c_str();
|
||||
cxt.sqlLen = stmtEnv_.sql_.length();
|
||||
|
@ -319,12 +321,13 @@ class PlannerTestBaseImpl {
|
|||
|
||||
void doParseBoundSql(SQuery* pQuery) {
|
||||
SParseContext cxt = {0};
|
||||
cxt.acctId = atoi(caseEnv_.acctId_.c_str());
|
||||
cxt.acctId = caseEnv_.acctId_;
|
||||
cxt.db = caseEnv_.db_.c_str();
|
||||
cxt.pSql = stmtEnv_.sql_.c_str();
|
||||
cxt.sqlLen = stmtEnv_.sql_.length();
|
||||
cxt.pMsg = stmtEnv_.msgBuf_.data();
|
||||
cxt.msgLen = stmtEnv_.msgBuf_.max_size();
|
||||
cxt.pUser = caseEnv_.user_.c_str();
|
||||
|
||||
DO_WITH_THROW(qStmtParseQuerySql, &cxt, pQuery);
|
||||
res_.ast_ = toString(pQuery->pRoot);
|
||||
|
@ -364,6 +367,7 @@ class PlannerTestBaseImpl {
|
|||
|
||||
void setPlanContext(SQuery* pQuery, SPlanContext* pCxt) {
|
||||
pCxt->queryId = 1;
|
||||
pCxt->pUser = caseEnv_.user_.c_str();
|
||||
if (QUERY_NODE_CREATE_TOPIC_STMT == nodeType(pQuery->pRoot)) {
|
||||
pCxt->pAstRoot = ((SCreateTopicStmt*)pQuery->pRoot)->pQuery;
|
||||
pCxt->topicQuery = true;
|
||||
|
@ -403,7 +407,7 @@ PlannerTestBase::PlannerTestBase() : impl_(new PlannerTestBaseImpl()) {}
|
|||
|
||||
PlannerTestBase::~PlannerTestBase() {}
|
||||
|
||||
void PlannerTestBase::useDb(const std::string& acctId, const std::string& db) { impl_->useDb(acctId, db); }
|
||||
void PlannerTestBase::useDb(const std::string& user, const std::string& db) { impl_->useDb(user, db); }
|
||||
|
||||
void PlannerTestBase::run(const std::string& sql) { return impl_->run(sql); }
|
||||
|
||||
|
|
|
@ -30,7 +30,7 @@ class PlannerTestBase : public testing::Test {
|
|||
PlannerTestBase();
|
||||
virtual ~PlannerTestBase();
|
||||
|
||||
void useDb(const std::string& acctId, const std::string& db);
|
||||
void useDb(const std::string& user, const std::string& db);
|
||||
void run(const std::string& sql);
|
||||
// stmt mode APIs
|
||||
void prepare(const std::string& sql);
|
||||
|
|
Loading…
Reference in New Issue