Merge pull request #13851 from taosdata/feature/showqueries
feat: support some show and kill commands
This commit is contained in:
commit
69eb9daddb
|
@ -2001,16 +2001,17 @@ typedef struct {
|
|||
|
||||
typedef struct {
|
||||
int64_t tid;
|
||||
int32_t status;
|
||||
char status[TSDB_JOB_STATUS_LEN];
|
||||
} SQuerySubDesc;
|
||||
|
||||
typedef struct {
|
||||
char sql[TSDB_SHOW_SQL_LEN];
|
||||
uint64_t queryId;
|
||||
int64_t useconds;
|
||||
int64_t stime;
|
||||
int64_t stime; // timestamp precision ms
|
||||
int64_t reqRid;
|
||||
int32_t pid;
|
||||
bool stableQuery;
|
||||
char fqdn[TSDB_FQDN_LEN];
|
||||
int32_t subPlanNum;
|
||||
SArray* subDesc; // SArray<SQuerySubDesc>
|
||||
|
|
|
@ -351,6 +351,7 @@ typedef struct SQuery {
|
|||
int32_t placeholderNum;
|
||||
SArray* pPlaceholderValues;
|
||||
SNode* pPrepareRoot;
|
||||
bool stableQuery;
|
||||
} SQuery;
|
||||
|
||||
void nodesWalkSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeWalker walker, void* pContext);
|
||||
|
|
|
@ -243,6 +243,7 @@ typedef enum ELogicConditionType {
|
|||
#define TSDB_USET_PASSWORD_LEN 129
|
||||
#define TSDB_VERSION_LEN 12
|
||||
#define TSDB_LABEL_LEN 8
|
||||
#define TSDB_JOB_STATUS_LEN 32
|
||||
|
||||
#define TSDB_CLUSTER_ID_LEN 40
|
||||
#define TSDB_FQDN_LEN 128
|
||||
|
|
|
@ -213,6 +213,7 @@ typedef struct SRequestObj {
|
|||
SArray* tableList;
|
||||
SQueryExecMetric metric;
|
||||
SRequestSendRecvBody body;
|
||||
bool stableQuery;
|
||||
|
||||
uint32_t prevCode; //previous error code: todo refactor, add update flag for catalog
|
||||
uint32_t retry;
|
||||
|
@ -294,7 +295,7 @@ void* openTransporter(const char* user, const char* auth, int32_t numOfThreads);
|
|||
bool persistConnForSpecificMsg(void* parenct, tmsg_t msgType);
|
||||
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet);
|
||||
|
||||
TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
|
||||
STscObj* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
|
||||
uint16_t port, int connType);
|
||||
|
||||
SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen);
|
||||
|
@ -305,6 +306,8 @@ int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArra
|
|||
|
||||
int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest);
|
||||
|
||||
void taos_close_internal(void *taos);
|
||||
|
||||
// --- heartbeat
|
||||
// global, called by mgmt
|
||||
int hbMgrInit();
|
||||
|
|
|
@ -107,7 +107,7 @@ typedef struct STscStmt {
|
|||
#define STMT_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
|
||||
#define STMT_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
|
||||
|
||||
TAOS_STMT *stmtInit(TAOS *taos);
|
||||
TAOS_STMT *stmtInit(STscObj* taos);
|
||||
int stmtClose(TAOS_STMT *stmt);
|
||||
int stmtExec(TAOS_STMT *stmt);
|
||||
const char *stmtErrstr(TAOS_STMT *stmt);
|
||||
|
|
|
@ -174,7 +174,7 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
|
|||
}
|
||||
|
||||
if (pRsp->query->killConnection) {
|
||||
taos_close(pTscObj);
|
||||
taos_close_internal(pTscObj);
|
||||
}
|
||||
|
||||
if (pRsp->query->pQnodeList) {
|
||||
|
@ -310,11 +310,12 @@ int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) {
|
|||
}
|
||||
|
||||
tstrncpy(desc.sql, pRequest->sqlstr, sizeof(desc.sql));
|
||||
desc.stime = pRequest->metric.start;
|
||||
desc.stime = pRequest->metric.start / 1000;
|
||||
desc.queryId = pRequest->requestId;
|
||||
desc.useconds = now - pRequest->metric.start;
|
||||
desc.reqRid = pRequest->self;
|
||||
desc.pid = hbBasic->pid;
|
||||
desc.stableQuery = pRequest->stableQuery;
|
||||
taosGetFqdn(desc.fqdn);
|
||||
desc.subPlanNum = pRequest->body.pDag ? pRequest->body.pDag->numOfSubplans : 0;
|
||||
|
||||
|
@ -329,6 +330,7 @@ int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) {
|
|||
if (code) {
|
||||
taosArrayDestroy(desc.subDesc);
|
||||
desc.subDesc = NULL;
|
||||
desc.subPlanNum = 0;
|
||||
}
|
||||
} else {
|
||||
desc.subDesc = NULL;
|
||||
|
@ -350,19 +352,24 @@ int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
|
|||
return TSDB_CODE_QRY_APP_ERROR;
|
||||
}
|
||||
|
||||
int32_t numOfQueries = pTscObj->pRequests ? taosHashGetSize(pTscObj->pRequests) : 0;
|
||||
if (numOfQueries <= 0) {
|
||||
releaseTscObj(connKey->tscRid);
|
||||
tscDebug("no queries on connection");
|
||||
return TSDB_CODE_QRY_APP_ERROR;
|
||||
}
|
||||
|
||||
SQueryHbReqBasic *hbBasic = (SQueryHbReqBasic *)taosMemoryCalloc(1, sizeof(SQueryHbReqBasic));
|
||||
if (NULL == hbBasic) {
|
||||
tscError("calloc %d failed", (int32_t)sizeof(SQueryHbReqBasic));
|
||||
releaseTscObj(connKey->tscRid);
|
||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
hbBasic->connId = pTscObj->connId;
|
||||
hbBasic->pid = taosGetPId();
|
||||
taosGetAppName(hbBasic->app, NULL);
|
||||
|
||||
int32_t numOfQueries = pTscObj->pRequests ? taosHashGetSize(pTscObj->pRequests) : 0;
|
||||
if (numOfQueries <= 0) {
|
||||
req->query = hbBasic;
|
||||
releaseTscObj(connKey->tscRid);
|
||||
tscDebug("no queries on connection");
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
hbBasic->queryDesc = taosArrayInit(numOfQueries, sizeof(SQueryDesc));
|
||||
if (NULL == hbBasic->queryDesc) {
|
||||
|
@ -372,9 +379,6 @@ int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
|
|||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
hbBasic->connId = pTscObj->connId;
|
||||
hbBasic->pid = taosGetPId();
|
||||
taosGetAppName(hbBasic->app, NULL);
|
||||
|
||||
int32_t code = hbBuildQueryDesc(hbBasic, pTscObj);
|
||||
if (code) {
|
||||
|
|
|
@ -58,7 +58,7 @@ static char* getClusterKey(const char* user, const char* auth, const char* ip, i
|
|||
static STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
|
||||
SAppInstInfo* pAppInfo, int connType);
|
||||
|
||||
TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
|
||||
STscObj* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
|
||||
uint16_t port, int connType) {
|
||||
if (taos_init() != TSDB_CODE_SUCCESS) {
|
||||
return NULL;
|
||||
|
@ -692,6 +692,8 @@ SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
|
|||
return pRequest;
|
||||
}
|
||||
|
||||
pRequest->stableQuery = pQuery->stableQuery;
|
||||
|
||||
return launchQueryImpl(pRequest, pQuery, false, NULL);
|
||||
}
|
||||
|
||||
|
@ -917,7 +919,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t
|
|||
|
||||
terrno = pRequest->code;
|
||||
destroyRequest(pRequest);
|
||||
taos_close(pTscObj);
|
||||
taos_close_internal(pTscObj);
|
||||
pTscObj = NULL;
|
||||
} else {
|
||||
tscDebug("0x%" PRIx64 " connection is opening, connId:%u, dnodeConn:%p, reqId:0x%" PRIx64, pTscObj->id,
|
||||
|
@ -952,8 +954,8 @@ static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) {
|
|||
taosMemoryFreeClear(db);
|
||||
|
||||
connectReq.connType = pObj->connType;
|
||||
connectReq.pid = htonl(appInfo.pid);
|
||||
connectReq.startTime = htobe64(appInfo.startTime);
|
||||
connectReq.pid = appInfo.pid;
|
||||
connectReq.startTime = appInfo.startTime;
|
||||
|
||||
tstrncpy(connectReq.app, appInfo.appName, sizeof(connectReq.app));
|
||||
tstrncpy(connectReq.user, pObj->user, sizeof(connectReq.user));
|
||||
|
@ -1081,7 +1083,12 @@ TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, cons
|
|||
return NULL;
|
||||
}
|
||||
|
||||
return taos_connect_internal(ip, user, NULL, auth, db, port, CONN_TYPE__QUERY);
|
||||
STscObj* pObj = taos_connect_internal(ip, user, NULL, auth, db, port, CONN_TYPE__QUERY);
|
||||
if (pObj) {
|
||||
return (TAOS*)pObj->id;
|
||||
}
|
||||
|
||||
return (TAOS*)0;
|
||||
}
|
||||
|
||||
TAOS* taos_connect_l(const char* ip, int ipLen, const char* user, int userLen, const char* pass, int passLen,
|
||||
|
|
|
@ -97,10 +97,15 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha
|
|||
pass = TSDB_DEFAULT_PASS;
|
||||
}
|
||||
|
||||
return taos_connect_internal(ip, user, pass, NULL, db, port, CONN_TYPE__QUERY);
|
||||
STscObj* pObj = taos_connect_internal(ip, user, pass, NULL, db, port, CONN_TYPE__QUERY);
|
||||
if (pObj) {
|
||||
return (TAOS*)pObj->id;
|
||||
}
|
||||
|
||||
return (TAOS*)0;
|
||||
}
|
||||
|
||||
void taos_close(TAOS *taos) {
|
||||
void taos_close_internal(void *taos) {
|
||||
if (taos == NULL) {
|
||||
return;
|
||||
}
|
||||
|
@ -111,6 +116,17 @@ void taos_close(TAOS *taos) {
|
|||
taosRemoveRef(clientConnRefPool, pTscObj->id);
|
||||
}
|
||||
|
||||
void taos_close(TAOS *taos) {
|
||||
STscObj* pObj = acquireTscObj((int64_t)taos);
|
||||
if (NULL == pObj) {
|
||||
return;
|
||||
}
|
||||
|
||||
taos_close_internal(pObj);
|
||||
releaseTscObj((int64_t)taos);
|
||||
}
|
||||
|
||||
|
||||
int taos_errno(TAOS_RES *tres) {
|
||||
if (tres == NULL) {
|
||||
return terrno;
|
||||
|
@ -190,29 +206,36 @@ static void syncQueryFn(void *param, void *res, int32_t code) {
|
|||
}
|
||||
|
||||
TAOS_RES *taos_query(TAOS *taos, const char *sql) {
|
||||
if (taos == NULL || sql == NULL) {
|
||||
STscObj* pTscObj = acquireTscObj((int64_t)taos);
|
||||
if (pTscObj == NULL || sql == NULL) {
|
||||
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
STscObj *pTscObj = (STscObj *)taos;
|
||||
|
||||
#if SYNC_ON_TOP_OF_ASYNC
|
||||
SSyncQueryParam *param = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
|
||||
tsem_init(¶m->sem, 0, 0);
|
||||
|
||||
taos_query_a(pTscObj, sql, syncQueryFn, param);
|
||||
taos_query_a(taos, sql, syncQueryFn, param);
|
||||
tsem_wait(¶m->sem);
|
||||
|
||||
releaseTscObj((int64_t)taos);
|
||||
|
||||
return param->pRequest;
|
||||
#else
|
||||
size_t sqlLen = strlen(sql);
|
||||
if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) {
|
||||
releaseTscObj((int64_t)taos);
|
||||
tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
|
||||
terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return execQuery(pTscObj, sql, sqlLen);
|
||||
TAOS_RES* pRes = execQuery(pTscObj, sql, sqlLen);
|
||||
|
||||
releaseTscObj((int64_t)taos);
|
||||
|
||||
return pRes;
|
||||
#endif
|
||||
}
|
||||
|
||||
|
@ -429,13 +452,15 @@ int taos_result_precision(TAOS_RES *res) {
|
|||
}
|
||||
|
||||
int taos_select_db(TAOS *taos, const char *db) {
|
||||
STscObj *pObj = (STscObj *)taos;
|
||||
STscObj* pObj = acquireTscObj((int64_t)taos);
|
||||
if (pObj == NULL) {
|
||||
releaseTscObj((int64_t)taos);
|
||||
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
||||
return TSDB_CODE_TSC_DISCONNECTED;
|
||||
}
|
||||
|
||||
if (db == NULL || strlen(db) == 0) {
|
||||
releaseTscObj((int64_t)taos);
|
||||
terrno = TSDB_CODE_TSC_INVALID_INPUT;
|
||||
return terrno;
|
||||
}
|
||||
|
@ -447,6 +472,7 @@ int taos_select_db(TAOS *taos, const char *db) {
|
|||
int32_t code = taos_errno(pRequest);
|
||||
|
||||
taos_free_result(pRequest);
|
||||
releaseTscObj((int64_t)taos);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -593,19 +619,26 @@ int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) {
|
|||
int taos_validate_sql(TAOS *taos, const char *sql) { return true; }
|
||||
|
||||
void taos_reset_current_db(TAOS *taos) {
|
||||
if (taos == NULL) {
|
||||
STscObj* pTscObj = acquireTscObj((int64_t)taos);
|
||||
if (pTscObj == NULL) {
|
||||
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
||||
return;
|
||||
}
|
||||
|
||||
resetConnectDB(taos);
|
||||
resetConnectDB(pTscObj);
|
||||
|
||||
releaseTscObj((int64_t)taos);
|
||||
}
|
||||
|
||||
const char *taos_get_server_info(TAOS *taos) {
|
||||
if (taos == NULL) {
|
||||
STscObj* pTscObj = acquireTscObj((int64_t)taos);
|
||||
if (pTscObj == NULL) {
|
||||
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
STscObj *pTscObj = (STscObj *)taos;
|
||||
releaseTscObj((int64_t)taos);
|
||||
|
||||
return pTscObj->ver;
|
||||
}
|
||||
|
||||
|
@ -637,6 +670,7 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
|
|||
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
code = qAnalyseSqlSemantic(pWrapper->pCtx, &pWrapper->catalogReq, pResultMeta, pQuery);
|
||||
pRequest->stableQuery = pQuery->stableQuery;
|
||||
}
|
||||
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
|
@ -670,10 +704,14 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
|
|||
}
|
||||
|
||||
void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param) {
|
||||
ASSERT(fp != NULL);
|
||||
|
||||
if (taos == NULL || sql == NULL) {
|
||||
STscObj* pTscObj = acquireTscObj((int64_t)taos);
|
||||
if (pTscObj == NULL || sql == NULL || NULL == fp) {
|
||||
terrno = TSDB_CODE_INVALID_PARA;
|
||||
if (pTscObj) {
|
||||
releaseTscObj((int64_t)taos);
|
||||
} else {
|
||||
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
||||
}
|
||||
fp(param, NULL, terrno);
|
||||
return;
|
||||
}
|
||||
|
@ -688,7 +726,7 @@ void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param
|
|||
}
|
||||
|
||||
SRequestObj *pRequest = NULL;
|
||||
int32_t code = buildRequest(taos, sql, sqlLen, &pRequest);
|
||||
int32_t code = buildRequest(pTscObj, sql, sqlLen, &pRequest);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = code;
|
||||
fp(param, NULL, terrno);
|
||||
|
@ -888,13 +926,18 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
|
|||
}
|
||||
|
||||
TAOS_STMT *taos_stmt_init(TAOS *taos) {
|
||||
if (taos == NULL) {
|
||||
tscError("NULL parameter for %s", __FUNCTION__);
|
||||
terrno = TSDB_CODE_INVALID_PARA;
|
||||
STscObj* pObj = acquireTscObj((int64_t)taos);
|
||||
if (NULL == pObj) {
|
||||
tscError("invalid parameter for %s", __FUNCTION__);
|
||||
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return stmtInit(taos);
|
||||
TAOS_STMT* pStmt = stmtInit(pObj);
|
||||
|
||||
releaseTscObj((int64_t)taos);
|
||||
|
||||
return pStmt;
|
||||
}
|
||||
|
||||
int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length) {
|
||||
|
|
|
@ -309,7 +309,7 @@ static int32_t smlApplySchemaAction(SSmlHandle *info, SSchemaAction *action) {
|
|||
case SCHEMA_ACTION_ADD_COLUMN: {
|
||||
int n = sprintf(result, "alter stable `%s` add column ", action->alterSTable.sTableName);
|
||||
smlBuildColumnDescription(action->alterSTable.field, result + n, capacity - n, &outBytes);
|
||||
TAOS_RES *res = taos_query(info->taos, result); // TODO async doAsyncQuery
|
||||
TAOS_RES *res = taos_query((TAOS*)info->taos->id, result); // TODO async doAsyncQuery
|
||||
code = taos_errno(res);
|
||||
const char *errStr = taos_errstr(res);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -323,7 +323,7 @@ static int32_t smlApplySchemaAction(SSmlHandle *info, SSchemaAction *action) {
|
|||
case SCHEMA_ACTION_ADD_TAG: {
|
||||
int n = sprintf(result, "alter stable `%s` add tag ", action->alterSTable.sTableName);
|
||||
smlBuildColumnDescription(action->alterSTable.field, result + n, capacity - n, &outBytes);
|
||||
TAOS_RES *res = taos_query(info->taos, result); // TODO async doAsyncQuery
|
||||
TAOS_RES *res = taos_query((TAOS*)info->taos->id, result); // TODO async doAsyncQuery
|
||||
code = taos_errno(res);
|
||||
const char *errStr = taos_errstr(res);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -337,7 +337,7 @@ static int32_t smlApplySchemaAction(SSmlHandle *info, SSchemaAction *action) {
|
|||
case SCHEMA_ACTION_CHANGE_COLUMN_SIZE: {
|
||||
int n = sprintf(result, "alter stable `%s` modify column ", action->alterSTable.sTableName);
|
||||
smlBuildColumnDescription(action->alterSTable.field, result + n, capacity - n, &outBytes);
|
||||
TAOS_RES *res = taos_query(info->taos, result); // TODO async doAsyncQuery
|
||||
TAOS_RES *res = taos_query((TAOS*)info->taos->id, result); // TODO async doAsyncQuery
|
||||
code = taos_errno(res);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
uError("SML:0x%" PRIx64 " apply schema action. error : %s", info->id, taos_errstr(res));
|
||||
|
@ -350,7 +350,7 @@ static int32_t smlApplySchemaAction(SSmlHandle *info, SSchemaAction *action) {
|
|||
case SCHEMA_ACTION_CHANGE_TAG_SIZE: {
|
||||
int n = sprintf(result, "alter stable `%s` modify tag ", action->alterSTable.sTableName);
|
||||
smlBuildColumnDescription(action->alterSTable.field, result + n, capacity - n, &outBytes);
|
||||
TAOS_RES *res = taos_query(info->taos, result); // TODO async doAsyncQuery
|
||||
TAOS_RES *res = taos_query((TAOS*)info->taos->id, result); // TODO async doAsyncQuery
|
||||
code = taos_errno(res);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
uError("SML:0x%" PRIx64 " apply schema action. error : %s", info->id, taos_errstr(res));
|
||||
|
@ -405,7 +405,7 @@ static int32_t smlApplySchemaAction(SSmlHandle *info, SSchemaAction *action) {
|
|||
pos--;
|
||||
++freeBytes;
|
||||
outBytes = snprintf(pos, freeBytes, ")");
|
||||
TAOS_RES *res = taos_query(info->taos, result);
|
||||
TAOS_RES *res = taos_query((TAOS*)info->taos->id, result);
|
||||
code = taos_errno(res);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
uError("SML:0x%" PRIx64 " apply schema action. error : %s", info->id, taos_errstr(res));
|
||||
|
@ -1453,9 +1453,9 @@ static void smlDestroyInfo(SSmlHandle *info) {
|
|||
taosMemoryFreeClear(info);
|
||||
}
|
||||
|
||||
static SSmlHandle *smlBuildSmlInfo(TAOS *taos, SRequestObj *request, SMLProtocolType protocol, int8_t precision) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SSmlHandle *info = (SSmlHandle *)taosMemoryCalloc(1, sizeof(SSmlHandle));
|
||||
static SSmlHandle* smlBuildSmlInfo(STscObj* pTscObj, SRequestObj* request, SMLProtocolType protocol, int8_t precision){
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SSmlHandle* info = (SSmlHandle*)taosMemoryCalloc(1, sizeof(SSmlHandle));
|
||||
if (NULL == info) {
|
||||
return NULL;
|
||||
}
|
||||
|
@ -1476,7 +1476,7 @@ static SSmlHandle *smlBuildSmlInfo(TAOS *taos, SRequestObj *request, SMLProtocol
|
|||
}
|
||||
((SVnodeModifOpStmt *)(info->pQuery->pRoot))->payloadType = PAYLOAD_TYPE_KV;
|
||||
|
||||
info->taos = (STscObj *)taos;
|
||||
info->taos = pTscObj;
|
||||
code = catalogGetHandle(info->taos->pAppInfo->clusterId, &info->pCatalog);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
uError("SML:0x%" PRIx64 " get catalog error %d", info->id, code);
|
||||
|
@ -2433,14 +2433,22 @@ static void smlInsertCallback(void *param, void *res, int32_t code) {
|
|||
*
|
||||
*/
|
||||
|
||||
TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision) {
|
||||
SRequestObj *request = (SRequestObj *)createRequest((STscObj *)taos, TSDB_SQL_INSERT);
|
||||
if (!request) {
|
||||
TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision) {
|
||||
STscObj* pTscObj = acquireTscObj((int64_t)taos);
|
||||
if (NULL == pTscObj) {
|
||||
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
||||
uError("SML:taos_schemaless_insert invalid taos");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SRequestObj* request = (SRequestObj*)createRequest(pTscObj, TSDB_SQL_INSERT);
|
||||
if(!request){
|
||||
releaseTscObj((int64_t)taos);
|
||||
uError("SML:taos_schemaless_insert error request is null");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
((STscObj *)taos)->schemalessType = 1;
|
||||
pTscObj->schemalessType = 1;
|
||||
SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf};
|
||||
|
||||
int cnt = ceil(((double)numLines) / LINE_BATCH);
|
||||
|
@ -2455,7 +2463,7 @@ TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int pr
|
|||
goto end;
|
||||
}
|
||||
|
||||
if (isSchemalessDb(((STscObj *)taos), request) != TSDB_CODE_SUCCESS) {
|
||||
if(isSchemalessDb(pTscObj, request) != TSDB_CODE_SUCCESS){
|
||||
request->code = TSDB_CODE_SML_INVALID_DB_CONF;
|
||||
smlBuildInvalidDataMsg(&msg, "Cannot write data to a non schemaless database", NULL);
|
||||
goto end;
|
||||
|
@ -2481,14 +2489,14 @@ TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int pr
|
|||
}
|
||||
|
||||
for (int i = 0; i < cnt; ++i) {
|
||||
SRequestObj *req = (SRequestObj *)createRequest((STscObj *)taos, TSDB_SQL_INSERT);
|
||||
if (!req) {
|
||||
SRequestObj* req = (SRequestObj*)createRequest(pTscObj, TSDB_SQL_INSERT);
|
||||
if(!req){
|
||||
request->code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
uError("SML:taos_schemaless_insert error request is null");
|
||||
goto end;
|
||||
}
|
||||
SSmlHandle *info = smlBuildSmlInfo(taos, req, (SMLProtocolType)protocol, precision);
|
||||
if (!info) {
|
||||
SSmlHandle* info = smlBuildSmlInfo(pTscObj, req, (SMLProtocolType)protocol, precision);
|
||||
if(!info){
|
||||
request->code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
uError("SML:taos_schemaless_insert error SSmlHandle is null");
|
||||
goto end;
|
||||
|
@ -2520,8 +2528,9 @@ TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int pr
|
|||
end:
|
||||
taosThreadSpinDestroy(¶ms.lock);
|
||||
tsem_destroy(¶ms.sem);
|
||||
// ((STscObj *)taos)->schemalessType = 0;
|
||||
((STscObj *)taos)->schemalessType = 1;
|
||||
// ((STscObj *)taos)->schemalessType = 0;
|
||||
pTscObj->schemalessType = 1;
|
||||
uDebug("resultend:%s", request->msgBuf);
|
||||
return (TAOS_RES *)request;
|
||||
releaseTscObj((int64_t)taos);
|
||||
return (TAOS_RES*)request;
|
||||
}
|
||||
|
|
|
@ -478,7 +478,7 @@ int32_t stmtResetStmt(STscStmt* pStmt) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
TAOS_STMT* stmtInit(TAOS* taos) {
|
||||
TAOS_STMT* stmtInit(STscObj* taos) {
|
||||
STscObj* pObj = (STscObj*)taos;
|
||||
STscStmt* pStmt = NULL;
|
||||
|
||||
|
|
|
@ -257,8 +257,8 @@ static const SSysTableMeta infosMeta[] = {
|
|||
static const SSysDbTableSchema connectionsSchema[] = {
|
||||
{.name = "conn_id", .bytes = 4, .type = TSDB_DATA_TYPE_UINT},
|
||||
{.name = "user", .bytes = TSDB_USER_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
||||
{.name = "program", .bytes = TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
||||
{.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||
{.name = "app", .bytes = TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
||||
{.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_UINT},
|
||||
{.name = "end_point", .bytes = TSDB_EP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
||||
{.name = "login_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
||||
{.name = "last_access", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
||||
|
@ -302,19 +302,18 @@ static const SSysDbTableSchema offsetSchema[] = {
|
|||
};
|
||||
|
||||
static const SSysDbTableSchema querySchema[] = {
|
||||
{.name = "query_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||
{.name = "connId", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||
{.name = "query_id", .bytes = 26 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||
{.name = "req_id", .bytes = 8, .type = TSDB_DATA_TYPE_UBIGINT},
|
||||
{.name = "connId", .bytes = 4, .type = TSDB_DATA_TYPE_UINT},
|
||||
{.name = "app", .bytes = TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||
{.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||
{.name = "user", .bytes = TSDB_USER_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||
{.name = "end_point", .bytes = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||
{.name = "qid", .bytes = 22 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
||||
{.name = "time", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
|
||||
{.name = "sql_obj_id", .bytes = QUERY_OBJ_ID_SIZE + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||
{.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||
{.name = "ep", .bytes = TSDB_EP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||
{.name = "exec_usec", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
|
||||
{.name = "stable_query", .bytes = 1, .type = TSDB_DATA_TYPE_BOOL},
|
||||
{.name = "sub_queries", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||
{.name = "sub_query_info", .bytes = TSDB_SHOW_SUBQUERY_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||
{.name = "sub_num", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||
{.name = "sub_status", .bytes = TSDB_SHOW_SUBQUERY_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||
{.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||
};
|
||||
|
||||
|
|
|
@ -210,6 +210,7 @@ static int32_t tSerializeSClientHbReq(SEncoder *pEncoder, const SClientHbReq *pR
|
|||
if (tEncodeI64(pEncoder, desc->stime) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, desc->reqRid) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, desc->pid) < 0) return -1;
|
||||
if (tEncodeI8(pEncoder, desc->stableQuery) < 0) return -1;
|
||||
if (tEncodeCStr(pEncoder, desc->fqdn) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, desc->subPlanNum) < 0) return -1;
|
||||
|
||||
|
@ -218,7 +219,7 @@ static int32_t tSerializeSClientHbReq(SEncoder *pEncoder, const SClientHbReq *pR
|
|||
for (int32_t m = 0; m < snum; ++m) {
|
||||
SQuerySubDesc *sDesc = taosArrayGet(desc->subDesc, m);
|
||||
if (tEncodeI64(pEncoder, sDesc->tid) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, sDesc->status) < 0) return -1;
|
||||
if (tEncodeCStr(pEncoder, sDesc->status) < 0) return -1;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
@ -265,6 +266,7 @@ static int32_t tDeserializeSClientHbReq(SDecoder *pDecoder, SClientHbReq *pReq)
|
|||
if (tDecodeI64(pDecoder, &desc.stime) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &desc.reqRid) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &desc.pid) < 0) return -1;
|
||||
if (tDecodeI8(pDecoder, (int8_t*)&desc.stableQuery) < 0) return -1;
|
||||
if (tDecodeCStrTo(pDecoder, desc.fqdn) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &desc.subPlanNum) < 0) return -1;
|
||||
|
||||
|
@ -277,7 +279,7 @@ static int32_t tDeserializeSClientHbReq(SDecoder *pDecoder, SClientHbReq *pReq)
|
|||
for (int32_t m = 0; m < snum; ++m) {
|
||||
SQuerySubDesc sDesc = {0};
|
||||
if (tDecodeI64(pDecoder, &sDesc.tid) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &sDesc.status) < 0) return -1;
|
||||
if (tDecodeCStrTo(pDecoder, sDesc.status) < 0) return -1;
|
||||
taosArrayPush(desc.subDesc, &sDesc);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -39,6 +39,7 @@ typedef struct {
|
|||
int64_t lastAccessTimeMs;
|
||||
uint64_t killId;
|
||||
int32_t numOfQueries;
|
||||
SRWLatch queryLock;
|
||||
SArray *pQueries; // SArray<SQueryDesc>
|
||||
} SConnObj;
|
||||
|
||||
|
@ -53,8 +54,8 @@ static int32_t mndProcessHeartBeatReq(SRpcMsg *pReq);
|
|||
static int32_t mndProcessConnectReq(SRpcMsg *pReq);
|
||||
static int32_t mndProcessKillQueryReq(SRpcMsg *pReq);
|
||||
static int32_t mndProcessKillConnReq(SRpcMsg *pReq);
|
||||
static int32_t mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
|
||||
static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
|
||||
static int32_t mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
||||
static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
||||
static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter);
|
||||
|
||||
int32_t mndInitProfile(SMnode *pMnode) {
|
||||
|
@ -74,9 +75,9 @@ int32_t mndInitProfile(SMnode *pMnode) {
|
|||
mndSetMsgHandle(pMnode, TDMT_MND_KILL_QUERY, mndProcessKillQueryReq);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_KILL_CONN, mndProcessKillConnReq);
|
||||
|
||||
// mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndRetrieveConns);
|
||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndRetrieveConns);
|
||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndCancelGetNextConn);
|
||||
// mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndRetrieveQueries);
|
||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndRetrieveQueries);
|
||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndCancelGetNextQuery);
|
||||
|
||||
return 0;
|
||||
|
@ -129,7 +130,9 @@ static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType
|
|||
}
|
||||
|
||||
static void mndFreeConn(SConnObj *pConn) {
|
||||
taosWLockLatch(&pConn->queryLock);
|
||||
taosArrayDestroyEx(pConn->pQueries, tFreeClientHbQueryDesc);
|
||||
taosWUnLockLatch(&pConn->queryLock);
|
||||
|
||||
mTrace("conn:%u, is destroyed, data:%p", pConn->id, pConn);
|
||||
}
|
||||
|
@ -222,8 +225,6 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
|
|||
goto CONN_OVER;
|
||||
}
|
||||
|
||||
mndAcquireConn(pMnode, pConn->id);
|
||||
|
||||
SConnectRsp connectRsp = {0};
|
||||
connectRsp.acctId = pUser->acctId;
|
||||
connectRsp.superUser = pUser->superUser;
|
||||
|
@ -259,12 +260,17 @@ CONN_OVER:
|
|||
}
|
||||
|
||||
static int32_t mndSaveQueryList(SConnObj *pConn, SQueryHbReqBasic *pBasic) {
|
||||
taosWLockLatch(&pConn->queryLock);
|
||||
|
||||
taosArrayDestroyEx(pConn->pQueries, tFreeClientHbQueryDesc);
|
||||
|
||||
pConn->pQueries = pBasic->queryDesc;
|
||||
pConn->numOfQueries = pBasic->queryDesc ? taosArrayGetSize(pBasic->queryDesc) : 0;
|
||||
pBasic->queryDesc = NULL;
|
||||
|
||||
pConn->numOfQueries = pBasic->queryDesc ? taosArrayGetSize(pBasic->queryDesc) : 0;
|
||||
mDebug("queries updated in conn %d, num:%d", pConn->id, pConn->numOfQueries);
|
||||
|
||||
taosWUnLockLatch(&pConn->queryLock);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -354,13 +360,8 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
|
|||
} else {
|
||||
mDebug("user:%s, conn:%u is freed and create a new conn:%u", connInfo.user, pBasic->connId, pConn->id);
|
||||
}
|
||||
} else if (pConn->killed) {
|
||||
mError("user:%s, conn:%u is already killed", connInfo.user, pConn->id);
|
||||
mndReleaseConn(pMnode, pConn);
|
||||
terrno = TSDB_CODE_MND_INVALID_CONNECTION;
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
||||
SQueryHbRspBasic *rspBasic = taosMemoryCalloc(1, sizeof(SQueryHbRspBasic));
|
||||
if (rspBasic == NULL) {
|
||||
mndReleaseConn(pMnode, pConn);
|
||||
|
@ -389,6 +390,8 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
|
|||
mndReleaseConn(pMnode, pConn);
|
||||
|
||||
hbRsp.query = rspBasic;
|
||||
} else {
|
||||
mDebug("no query info in hb msg");
|
||||
}
|
||||
|
||||
int32_t kvNum = taosHashGetSize(pHbReq->info);
|
||||
|
@ -559,76 +562,13 @@ static int32_t mndProcessKillConnReq(SRpcMsg *pReq) {
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
int32_t numOfRows = 0;
|
||||
SConnObj *pConn = NULL;
|
||||
int32_t cols = 0;
|
||||
char *pWrite;
|
||||
char ipStr[TSDB_IPv4ADDR_LEN + 6];
|
||||
|
||||
if (pShow->pIter == NULL) {
|
||||
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
|
||||
pShow->pIter = taosCacheCreateIter(pMgmt->cache);
|
||||
}
|
||||
|
||||
while (numOfRows < rows) {
|
||||
pConn = mndGetNextConn(pMnode, pShow->pIter);
|
||||
if (pConn == NULL) break;
|
||||
|
||||
cols = 0;
|
||||
#if 0
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
|
||||
*(uint32_t *)pWrite = pConn->id;
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
|
||||
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConn->user, pShow->pMeta->pSchemas[cols].bytes);
|
||||
cols++;
|
||||
|
||||
// app name
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
|
||||
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConn->app, pShow->pMeta->pSchemas[cols].bytes);
|
||||
cols++;
|
||||
|
||||
// app pid
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
|
||||
*(int32_t *)pWrite = pConn->pid;
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
|
||||
taosIpPort2String(pConn->ip, pConn->port, ipStr);
|
||||
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, ipStr, pShow->pMeta->pSchemas[cols].bytes);
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
|
||||
*(int64_t *)pWrite = pConn->loginTimeMs;
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
|
||||
if (pConn->lastAccessTimeMs < pConn->loginTimeMs) pConn->lastAccessTimeMs = pConn->loginTimeMs;
|
||||
*(int64_t *)pWrite = pConn->lastAccessTimeMs;
|
||||
cols++;
|
||||
#endif
|
||||
|
||||
numOfRows++;
|
||||
}
|
||||
|
||||
pShow->numOfRows += numOfRows;
|
||||
|
||||
return numOfRows;
|
||||
}
|
||||
|
||||
static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
int32_t numOfRows = 0;
|
||||
#if 0
|
||||
SConnObj *pConn = NULL;
|
||||
int32_t cols = 0;
|
||||
char *pWrite;
|
||||
void *pIter;
|
||||
char str[TSDB_IPv4ADDR_LEN + 6] = {0};
|
||||
|
||||
static int32_t mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
int32_t numOfRows = 0;
|
||||
int32_t cols = 0;
|
||||
SConnObj *pConn = NULL;
|
||||
|
||||
if (pShow->pIter == NULL) {
|
||||
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
|
||||
pShow->pIter = taosCacheCreateIter(pMgmt->cache);
|
||||
|
@ -641,85 +581,142 @@ static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, char *data, in
|
|||
break;
|
||||
}
|
||||
|
||||
if (numOfRows + pConn->numOfQueries >= rows) {
|
||||
taosCacheDestroyIter(pShow->pIter);
|
||||
cols = 0;
|
||||
|
||||
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pConn->id, false);
|
||||
|
||||
char user[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
STR_TO_VARSTR(user, pConn->user);
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)user, false);
|
||||
|
||||
char app[TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE];
|
||||
STR_TO_VARSTR(app, pConn->app);
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)app, false);
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pConn->pid, false);
|
||||
|
||||
char endpoint[TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE] = {0};
|
||||
sprintf(&endpoint[VARSTR_HEADER_SIZE], "%s:%d", taosIpStr(pConn->ip), pConn->port);
|
||||
varDataLen(endpoint) = strlen(&endpoint[VARSTR_HEADER_SIZE]);
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)endpoint, false);
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pConn->loginTimeMs, false);
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pConn->lastAccessTimeMs, false);
|
||||
|
||||
numOfRows++;
|
||||
}
|
||||
|
||||
pShow->numOfRows += numOfRows;
|
||||
return numOfRows;
|
||||
}
|
||||
|
||||
static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
int32_t numOfRows = 0;
|
||||
int32_t cols = 0;
|
||||
SConnObj *pConn = NULL;
|
||||
|
||||
if (pShow->pIter == NULL) {
|
||||
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
|
||||
pShow->pIter = taosCacheCreateIter(pMgmt->cache);
|
||||
}
|
||||
|
||||
while (numOfRows < rows) {
|
||||
pConn = mndGetNextConn(pMnode, pShow->pIter);
|
||||
if (pConn == NULL) {
|
||||
pShow->pIter = NULL;
|
||||
break;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pConn->numOfQueries; ++i) {
|
||||
SQueryDesc *pDesc = pConn->pQueries + i;
|
||||
taosRLockLatch(&pConn->queryLock);
|
||||
if (NULL == pConn->pQueries || taosArrayGetSize(pConn->pQueries) <= 0) {
|
||||
taosRUnLockLatch(&pConn->queryLock);
|
||||
continue;
|
||||
}
|
||||
|
||||
int32_t numOfQueries = taosArrayGetSize(pConn->pQueries);
|
||||
for (int32_t i = 0; i < numOfQueries; ++i) {
|
||||
SQueryDesc* pQuery = taosArrayGet(pConn->pQueries, i);
|
||||
cols = 0;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
|
||||
*(int64_t *)pWrite = htobe64(pDesc->queryId);
|
||||
cols++;
|
||||
char queryId[26 + VARSTR_HEADER_SIZE] = {0};
|
||||
sprintf(&queryId[VARSTR_HEADER_SIZE], "%x:%" PRIx64, pConn->id, pQuery->reqRid);
|
||||
varDataLen(queryId) = strlen(&queryId[VARSTR_HEADER_SIZE]);
|
||||
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)queryId, false);
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
|
||||
*(int64_t *)pWrite = htobe64(pConn->id);
|
||||
cols++;
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pQuery->queryId, false);
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
|
||||
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConn->user, pShow->pMeta->pSchemas[cols].bytes);
|
||||
cols++;
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pConn->id, false);
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
|
||||
snprintf(str, tListLen(str), "%s:%u", taosIpStr(pConn->ip), pConn->port);
|
||||
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, str, pShow->pMeta->pSchemas[cols].bytes);
|
||||
cols++;
|
||||
char app[TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE];
|
||||
STR_TO_VARSTR(app, pConn->app);
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)app, false);
|
||||
|
||||
char handleBuf[24] = {0};
|
||||
snprintf(handleBuf, tListLen(handleBuf), "%" PRIu64, htobe64(pDesc->qId));
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pQuery->pid, false);
|
||||
|
||||
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, handleBuf, pShow->pMeta->pSchemas[cols].bytes);
|
||||
cols++;
|
||||
char user[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
STR_TO_VARSTR(user, pConn->user);
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)user, false);
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
|
||||
*(int64_t *)pWrite = htobe64(pDesc->stime);
|
||||
cols++;
|
||||
char endpoint[TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE] = {0};
|
||||
sprintf(&endpoint[VARSTR_HEADER_SIZE], "%s:%d", taosIpStr(pConn->ip), pConn->port);
|
||||
varDataLen(endpoint) = strlen(&endpoint[VARSTR_HEADER_SIZE]);
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)endpoint, false);
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
|
||||
*(int64_t *)pWrite = htobe64(pDesc->useconds);
|
||||
cols++;
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pQuery->stime, false);
|
||||
|
||||
snprintf(str, tListLen(str), "0x%" PRIx64, htobe64(pDesc->sqlObjId));
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
|
||||
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, str, pShow->pMeta->pSchemas[cols].bytes);
|
||||
cols++;
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pQuery->useconds, false);
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
|
||||
*(int32_t *)pWrite = htonl(pDesc->pid);
|
||||
cols++;
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pQuery->stableQuery, false);
|
||||
|
||||
char epBuf[TSDB_EP_LEN + 1] = {0};
|
||||
snprintf(epBuf, tListLen(epBuf), "%s:%u", pDesc->fqdn, pConn->port);
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
|
||||
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, epBuf, pShow->pMeta->pSchemas[cols].bytes);
|
||||
cols++;
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pQuery->subPlanNum, false);
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
|
||||
*(bool *)pWrite = pDesc->stableQuery;
|
||||
cols++;
|
||||
char subStatus[TSDB_SHOW_SUBQUERY_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
int32_t strSize = sizeof(subStatus);
|
||||
int32_t offset = VARSTR_HEADER_SIZE;
|
||||
for (int32_t i = 0; i < pQuery->subPlanNum && offset < strSize; ++i) {
|
||||
if (i) {
|
||||
offset += snprintf(subStatus + offset, strSize - offset - 1, ",");
|
||||
}
|
||||
SQuerySubDesc* pDesc = taosArrayGet(pQuery->subDesc, i);
|
||||
offset += snprintf(subStatus + offset, strSize - offset - 1, "%" PRIu64 ":%s", pDesc->tid, pDesc->status);
|
||||
}
|
||||
varDataLen(subStatus) = strlen(&subStatus[VARSTR_HEADER_SIZE]);
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, subStatus, false);
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
|
||||
*(int32_t *)pWrite = htonl(pDesc->numOfSub);
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
|
||||
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pDesc->subSqlInfo, pShow->pMeta->pSchemas[cols].bytes);
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->pMeta->pSchemas[cols].bytes * numOfRows;
|
||||
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pDesc->sql, pShow->pMeta->pSchemas[cols].bytes);
|
||||
cols++;
|
||||
char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
STR_TO_VARSTR(sql, pQuery->sql);
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)sql, false);
|
||||
|
||||
numOfRows++;
|
||||
}
|
||||
|
||||
taosRUnLockLatch(&pConn->queryLock);
|
||||
}
|
||||
|
||||
pShow->numOfRows += numOfRows;
|
||||
#endif
|
||||
return numOfRows;
|
||||
}
|
||||
|
||||
|
|
|
@ -1284,7 +1284,7 @@ static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char
|
|||
SStbObj *pStb = mndAcquireStb(pMnode, tbFName);
|
||||
if (pStb == NULL) {
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
terrno = TSDB_CODE_MND_INVALID_STB;
|
||||
terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
|
|
@ -88,7 +88,7 @@ TEST_F(MndTestProfile, 02_ConnectMsg_InvalidDB) {
|
|||
|
||||
TEST_F(MndTestProfile, 03_ConnectMsg_Show) {
|
||||
test.SendShowReq(TSDB_MGMT_TABLE_CONNS, "connections", "");
|
||||
EXPECT_EQ(test.GetShowRows(), 0);
|
||||
EXPECT_EQ(test.GetShowRows(), 1);
|
||||
}
|
||||
|
||||
TEST_F(MndTestProfile, 04_HeartBeatMsg) {
|
||||
|
|
|
@ -468,8 +468,8 @@ int32_t udfdConnectToMnode() {
|
|||
char pass[TSDB_PASSWORD_LEN + 1] = {0};
|
||||
taosEncryptPass_c((uint8_t *)(TSDB_DEFAULT_PASS), strlen(TSDB_DEFAULT_PASS), pass);
|
||||
tstrncpy(connReq.passwd, pass, sizeof(connReq.passwd));
|
||||
connReq.pid = htonl(taosGetPId());
|
||||
connReq.startTime = htobe64(taosGetTimestampMs());
|
||||
connReq.pid = taosGetPId();
|
||||
connReq.startTime = taosGetTimestampMs();
|
||||
|
||||
int32_t contLen = tSerializeSConnectReq(NULL, 0, &connReq);
|
||||
void * pReq = rpcMallocCont(contLen);
|
||||
|
|
|
@ -199,6 +199,7 @@ SNode* nodesMakeNode(ENodeType type) {
|
|||
case QUERY_NODE_SHOW_SUBSCRIBES_STMT:
|
||||
case QUERY_NODE_SHOW_SMAS_STMT:
|
||||
case QUERY_NODE_SHOW_CONFIGS_STMT:
|
||||
case QUERY_NODE_SHOW_CONNECTIONS_STMT:
|
||||
case QUERY_NODE_SHOW_QUERIES_STMT:
|
||||
case QUERY_NODE_SHOW_VNODES_STMT:
|
||||
case QUERY_NODE_SHOW_APPS_STMT:
|
||||
|
|
|
@ -377,6 +377,16 @@ static int32_t collectMetaKeyFromShowTopics(SCollectMetaKeyCxt* pCxt, SShowStmt*
|
|||
pCxt->pMetaCache);
|
||||
}
|
||||
|
||||
static int32_t collectMetaKeyFromShowConnections(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
|
||||
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_PERFORMANCE_SCHEMA_DB, TSDB_PERFS_TABLE_CONNECTIONS,
|
||||
pCxt->pMetaCache);
|
||||
}
|
||||
|
||||
static int32_t collectMetaKeyFromShowQueries(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
|
||||
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_PERFORMANCE_SCHEMA_DB, TSDB_PERFS_TABLE_QUERIES,
|
||||
pCxt->pMetaCache);
|
||||
}
|
||||
|
||||
static int32_t collectMetaKeyFromShowTransactions(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
|
||||
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_PERFORMANCE_SCHEMA_DB, TSDB_PERFS_TABLE_TRANS,
|
||||
pCxt->pMetaCache);
|
||||
|
@ -447,6 +457,10 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) {
|
|||
return collectMetaKeyFromShowVgroups(pCxt, (SShowStmt*)pStmt);
|
||||
case QUERY_NODE_SHOW_TOPICS_STMT:
|
||||
return collectMetaKeyFromShowTopics(pCxt, (SShowStmt*)pStmt);
|
||||
case QUERY_NODE_SHOW_CONNECTIONS_STMT:
|
||||
return collectMetaKeyFromShowConnections(pCxt, (SShowStmt*)pStmt);
|
||||
case QUERY_NODE_SHOW_QUERIES_STMT:
|
||||
return collectMetaKeyFromShowQueries(pCxt, (SShowStmt*)pStmt);
|
||||
case QUERY_NODE_SHOW_TRANSACTIONS_STMT:
|
||||
return collectMetaKeyFromShowTransactions(pCxt, (SShowStmt*)pStmt);
|
||||
case QUERY_NODE_DELETE_STMT:
|
||||
|
|
|
@ -42,6 +42,7 @@ typedef struct STranslateContext {
|
|||
SExplainOptions* pExplainOpt;
|
||||
SParseMetaCache* pMetaCache;
|
||||
bool createStream;
|
||||
bool stableQuery;
|
||||
} STranslateContext;
|
||||
|
||||
typedef struct SFullDatabaseName {
|
||||
|
@ -1508,6 +1509,9 @@ static int32_t translateTable(STranslateContext* pCxt, SNode* pTable) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = addNamespace(pCxt, pRealTable);
|
||||
}
|
||||
if (TSDB_SUPER_TABLE == pRealTable->pMeta->tableType) {
|
||||
pCxt->stableQuery = true;
|
||||
}
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_TEMP_TABLE: {
|
||||
|
@ -4805,6 +4809,7 @@ static int32_t buildDropTableVgroupHashmap(STranslateContext* pCxt, SDropTableCl
|
|||
|
||||
if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code && pClause->ignoreNotExists) {
|
||||
code = TSDB_CODE_SUCCESS;
|
||||
goto over;
|
||||
}
|
||||
|
||||
*pIsSuperTable = false;
|
||||
|
@ -4898,7 +4903,7 @@ static int32_t rewriteDropTable(STranslateContext* pCxt, SQuery* pQuery) {
|
|||
}
|
||||
}
|
||||
|
||||
if (isSuperTable) {
|
||||
if (isSuperTable || 0 == taosHashGetSize(pVgroupHashmap)) {
|
||||
taosHashCleanup(pVgroupHashmap);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -5343,6 +5348,8 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) {
|
|||
break;
|
||||
}
|
||||
|
||||
pQuery->stableQuery = pCxt->stableQuery;
|
||||
|
||||
if (pQuery->haveResultSet) {
|
||||
if (TSDB_CODE_SUCCESS != extractResultSchema(pQuery->pRoot, &pQuery->numOfResCols, &pQuery->pResSchema)) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
|
|
@ -151,7 +151,9 @@ int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub) {
|
|||
|
||||
for (int32_t m = 0; m < pLevel->taskNum; ++m) {
|
||||
SSchTask *pTask = taosArrayGet(pLevel->subTasks, m);
|
||||
SQuerySubDesc subDesc = {.tid = pTask->taskId, .status = pTask->status};
|
||||
SQuerySubDesc subDesc = {0};
|
||||
subDesc.tid = pTask->taskId;
|
||||
strcpy(subDesc.status, jobTaskStatusStr(pTask->status));
|
||||
|
||||
taosArrayPush(pSub, &subDesc);
|
||||
}
|
||||
|
|
|
@ -931,15 +931,15 @@ bool taosCacheIterNext(SCacheIter *pIter) {
|
|||
SCacheObj *pCacheObj = pIter->pCacheObj;
|
||||
|
||||
if (pIter->index + 1 >= pIter->numOfObj) {
|
||||
if (pIter->entryIndex + 1 >= pCacheObj->capacity) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// release the reference for all objects in the snapshot
|
||||
for (int32_t i = 0; i < pIter->numOfObj; ++i) {
|
||||
char *p = pIter->pCurrent[i]->data;
|
||||
taosCacheRelease(pCacheObj, (void **)&p, false);
|
||||
pIter->pCurrent[i] = NULL;
|
||||
}
|
||||
|
||||
if (pIter->entryIndex + 1 >= pCacheObj->capacity) {
|
||||
return false;
|
||||
}
|
||||
|
||||
while (1) {
|
||||
|
|
Loading…
Reference in New Issue