|
|
|
@ -9,6 +9,17 @@
|
|
|
|
|
#include "tpagedfile.h"
|
|
|
|
|
#include "tref.h"
|
|
|
|
|
#include "parser.h"
|
|
|
|
|
#include "planner.h"
|
|
|
|
|
#include "scheduler.h"
|
|
|
|
|
|
|
|
|
|
#define CHECK_CODE_GOTO(expr, lable) \
|
|
|
|
|
do { \
|
|
|
|
|
int32_t code = expr; \
|
|
|
|
|
if (TSDB_CODE_SUCCESS != code) { \
|
|
|
|
|
terrno = code; \
|
|
|
|
|
goto lable; \
|
|
|
|
|
} \
|
|
|
|
|
} while (0)
|
|
|
|
|
|
|
|
|
|
static int32_t initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet);
|
|
|
|
|
static int32_t buildConnectMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody);
|
|
|
|
@ -113,11 +124,92 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass,
|
|
|
|
|
return taosConnectImpl(ip, user, &secretEncrypt[0], db, port, NULL, NULL, *pInst);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static bool supportedQueryType(int32_t type) {
|
|
|
|
|
return (type == TDMT_MND_CREATE_USER || type == TDMT_MND_SHOW || type == TDMT_MND_DROP_USER ||
|
|
|
|
|
type == TDMT_MND_DROP_ACCT || type == TDMT_MND_CREATE_DB || type == TDMT_MND_CREATE_ACCT ||
|
|
|
|
|
type == TDMT_MND_CREATE_TABLE || type == TDMT_MND_CREATE_STB || type == TDMT_MND_USE_DB ||
|
|
|
|
|
type == TDMT_MND_DROP_DB || type == TDMT_MND_DROP_STB);
|
|
|
|
|
int32_t buildRequest(STscObj *pTscObj, const char *sql, int sqlLen, SRequestObj** pRequest) {
|
|
|
|
|
*pRequest = createRequest(pTscObj, NULL, NULL, TSDB_SQL_SELECT);
|
|
|
|
|
if (*pRequest == NULL) {
|
|
|
|
|
tscError("failed to malloc sqlObj");
|
|
|
|
|
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
(*pRequest)->sqlstr = malloc(sqlLen + 1);
|
|
|
|
|
if ((*pRequest)->sqlstr == NULL) {
|
|
|
|
|
tscError("0x%"PRIx64" failed to prepare sql string buffer", (*pRequest)->self);
|
|
|
|
|
(*pRequest)->msgBuf = strdup("failed to prepare sql string buffer");
|
|
|
|
|
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
strntolower((*pRequest)->sqlstr, sql, (int32_t)sqlLen);
|
|
|
|
|
(*pRequest)->sqlstr[sqlLen] = 0;
|
|
|
|
|
(*pRequest)->sqlLen = sqlLen;
|
|
|
|
|
|
|
|
|
|
tscDebugL("0x%"PRIx64" SQL: %s", (*pRequest)->requestId, (*pRequest)->sqlstr);
|
|
|
|
|
return TSDB_CODE_SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t parseSql(SRequestObj* pRequest, SQueryNode** pQuery) {
|
|
|
|
|
SParseContext cxt = {
|
|
|
|
|
.ctx = {.requestId = pRequest->requestId, .acctId = pRequest->pTscObj->acctId, .db = getConnectionDB(pRequest->pTscObj)},
|
|
|
|
|
.pSql = pRequest->sqlstr,
|
|
|
|
|
.sqlLen = pRequest->sqlLen,
|
|
|
|
|
.pMsg = pRequest->msgBuf,
|
|
|
|
|
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE
|
|
|
|
|
};
|
|
|
|
|
int32_t code = qParseQuerySql(&cxt, pQuery);
|
|
|
|
|
tfree(cxt.ctx.db);
|
|
|
|
|
return code;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) {
|
|
|
|
|
SDclStmtInfo* pDcl = (SDclStmtInfo*)pQuery;
|
|
|
|
|
pRequest->type = pDcl->msgType;
|
|
|
|
|
pRequest->body.requestMsg = (SReqMsgInfo){.pMsg = pDcl->pMsg, .len = pDcl->msgLen};
|
|
|
|
|
|
|
|
|
|
SRequestMsgBody body = buildRequestMsgImpl(pRequest);
|
|
|
|
|
SEpSet* pEpSet = &pRequest->pTscObj->pAppInfo->mgmtEp.epSet;
|
|
|
|
|
|
|
|
|
|
if (pDcl->msgType == TDMT_MND_CREATE_TABLE) {
|
|
|
|
|
struct SCatalog* pCatalog = NULL;
|
|
|
|
|
|
|
|
|
|
char buf[12] = {0};
|
|
|
|
|
sprintf(buf, "%d", pRequest->pTscObj->pAppInfo->clusterId);
|
|
|
|
|
int32_t code = catalogGetHandle(buf, &pCatalog);
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS) {
|
|
|
|
|
return code;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SCreateTableMsg* pMsg = body.msgInfo.pMsg;
|
|
|
|
|
|
|
|
|
|
SName t = {0};
|
|
|
|
|
tNameFromString(&t, pMsg->name, T_NAME_ACCT|T_NAME_DB|T_NAME_TABLE);
|
|
|
|
|
|
|
|
|
|
char db[TSDB_DB_NAME_LEN + TS_PATH_DELIMITER_LEN + TSDB_ACCT_ID_LEN] = {0};
|
|
|
|
|
tNameGetFullDbName(&t, db);
|
|
|
|
|
|
|
|
|
|
SVgroupInfo info = {0};
|
|
|
|
|
catalogGetTableHashVgroup(pCatalog, pRequest->pTscObj->pTransporter, pEpSet, db, tNameGetTableName(&t), &info);
|
|
|
|
|
|
|
|
|
|
int64_t transporterId = 0;
|
|
|
|
|
SEpSet ep = {0};
|
|
|
|
|
ep.inUse = info.inUse;
|
|
|
|
|
ep.numOfEps = info.numOfEps;
|
|
|
|
|
for(int32_t i = 0; i < ep.numOfEps; ++i) {
|
|
|
|
|
ep.port[i] = info.epAddr[i].port;
|
|
|
|
|
tstrncpy(ep.fqdn[i], info.epAddr[i].fqdn, tListLen(ep.fqdn[i]));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
sendMsgToServer(pRequest->pTscObj->pTransporter, &ep, &body, &transporterId);
|
|
|
|
|
} else {
|
|
|
|
|
int64_t transporterId = 0;
|
|
|
|
|
sendMsgToServer(pRequest->pTscObj->pTransporter, pEpSet, &body, &transporterId);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
tsem_wait(&pRequest->body.rspSem);
|
|
|
|
|
destroyRequestMsgBody(&body);
|
|
|
|
|
return TSDB_CODE_SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, void** pJob) {
|
|
|
|
|
return scheduleExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
|
|
|
|
@ -130,93 +222,27 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
|
|
|
|
|
|
|
|
|
|
nPrintTsc("%s", sql)
|
|
|
|
|
|
|
|
|
|
SRequestObj* pRequest = createRequest(pTscObj, NULL, NULL, TSDB_SQL_SELECT);
|
|
|
|
|
if (pRequest == NULL) {
|
|
|
|
|
tscError("failed to malloc sqlObj");
|
|
|
|
|
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pRequest->sqlstr = malloc(sqlLen + 1);
|
|
|
|
|
if (pRequest->sqlstr == NULL) {
|
|
|
|
|
tscError("0x%"PRIx64" failed to prepare sql string buffer", pRequest->self);
|
|
|
|
|
|
|
|
|
|
pRequest->msgBuf = strdup("failed to prepare sql string buffer");
|
|
|
|
|
terrno = pRequest->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
|
|
|
|
return pRequest;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
strntolower(pRequest->sqlstr, sql, (int32_t)sqlLen);
|
|
|
|
|
pRequest->sqlstr[sqlLen] = 0;
|
|
|
|
|
|
|
|
|
|
tscDebugL("0x%"PRIx64" SQL: %s", pRequest->requestId, pRequest->sqlstr);
|
|
|
|
|
|
|
|
|
|
SParseContext cxt = {
|
|
|
|
|
.ctx = {.requestId = pRequest->requestId, .acctId = pTscObj->acctId, .db = getConnectionDB(pTscObj)},
|
|
|
|
|
.pSql = pRequest->sqlstr,
|
|
|
|
|
.sqlLen = sqlLen,
|
|
|
|
|
.pMsg = pRequest->msgBuf,
|
|
|
|
|
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
SRequestObj* pRequest = NULL;
|
|
|
|
|
SQueryNode* pQuery = NULL;
|
|
|
|
|
int32_t code = qParseQuerySql(&cxt, &pQuery);
|
|
|
|
|
if (qIsDclQuery(pQuery)) {
|
|
|
|
|
SDclStmtInfo* pDcl = (SDclStmtInfo*) pQuery;
|
|
|
|
|
pRequest->type = pDcl->msgType;
|
|
|
|
|
pRequest->body.requestMsg = (SReqMsgInfo){.pMsg = pDcl->pMsg, .len = pDcl->msgLen};
|
|
|
|
|
SQueryDag* pDag = NULL;
|
|
|
|
|
void* pJob = NULL;
|
|
|
|
|
|
|
|
|
|
SRequestMsgBody body = buildRequestMsgImpl(pRequest);
|
|
|
|
|
SEpSet* pEpSet = &pTscObj->pAppInfo->mgmtEp.epSet;
|
|
|
|
|
|
|
|
|
|
if (pDcl->msgType == TDMT_MND_CREATE_TABLE) {
|
|
|
|
|
struct SCatalog* pCatalog = NULL;
|
|
|
|
|
|
|
|
|
|
char buf[12] = {0};
|
|
|
|
|
sprintf(buf, "%d", pTscObj->pAppInfo->clusterId);
|
|
|
|
|
code = catalogGetHandle(buf, &pCatalog);
|
|
|
|
|
if (code != 0) {
|
|
|
|
|
pRequest->code = code;
|
|
|
|
|
return pRequest;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SCreateTableMsg* pMsg = body.msgInfo.pMsg;
|
|
|
|
|
|
|
|
|
|
SName t = {0};
|
|
|
|
|
tNameFromString(&t, pMsg->name, T_NAME_ACCT|T_NAME_DB|T_NAME_TABLE);
|
|
|
|
|
|
|
|
|
|
char db[TSDB_DB_NAME_LEN + TS_PATH_DELIMITER_LEN + TSDB_ACCT_ID_LEN] = {0};
|
|
|
|
|
tNameGetFullDbName(&t, db);
|
|
|
|
|
|
|
|
|
|
SVgroupInfo info = {0};
|
|
|
|
|
catalogGetTableHashVgroup(pCatalog, pTscObj->pTransporter, pEpSet, db, tNameGetTableName(&t), &info);
|
|
|
|
|
|
|
|
|
|
int64_t transporterId = 0;
|
|
|
|
|
SEpSet ep = {0};
|
|
|
|
|
ep.inUse = info.inUse;
|
|
|
|
|
ep.numOfEps = info.numOfEps;
|
|
|
|
|
for(int32_t i = 0; i < ep.numOfEps; ++i) {
|
|
|
|
|
ep.port[i] = info.epAddr[i].port;
|
|
|
|
|
tstrncpy(ep.fqdn[i], info.epAddr[i].fqdn, tListLen(ep.fqdn[i]));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
sendMsgToServer(pTscObj->pTransporter, &ep, &body, &transporterId);
|
|
|
|
|
} else {
|
|
|
|
|
int64_t transporterId = 0;
|
|
|
|
|
sendMsgToServer(pTscObj->pTransporter, pEpSet, &body, &transporterId);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
tsem_wait(&pRequest->body.rspSem);
|
|
|
|
|
destroyRequestMsgBody(&body);
|
|
|
|
|
terrno = TSDB_CODE_SUCCESS;
|
|
|
|
|
CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
|
|
|
|
|
CHECK_CODE_GOTO(parseSql(pRequest, &pQuery), _return);
|
|
|
|
|
if (qIsDdlQuery(pQuery)) {
|
|
|
|
|
CHECK_CODE_GOTO(execDdlQuery(pRequest, pQuery), _return);
|
|
|
|
|
goto _return;
|
|
|
|
|
}
|
|
|
|
|
CHECK_CODE_GOTO(qCreateQueryDag(pQuery, &pDag), _return);
|
|
|
|
|
CHECK_CODE_GOTO(scheduleQuery(pRequest, pDag, &pJob), _return);
|
|
|
|
|
|
|
|
|
|
tfree(cxt.ctx.db);
|
|
|
|
|
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS) {
|
|
|
|
|
pRequest->code = code;
|
|
|
|
|
return pRequest;
|
|
|
|
|
_return:
|
|
|
|
|
qDestoryQuery(pQuery);
|
|
|
|
|
qDestroyQueryDag(pDag);
|
|
|
|
|
if (NULL != pRequest && TSDB_CODE_SUCCESS != terrno) {
|
|
|
|
|
pRequest->code = terrno;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return pRequest;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|