diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 8686d91a10..4e89beabcb 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -70,13 +70,11 @@ typedef uint16_t tmsg_t; #define TSDB_IE_TYPE_DNODE_EXT 6 #define TSDB_IE_TYPE_DNODE_STATE 7 -typedef enum { - HEARTBEAT_TYPE_MQ = 0, - HEARTBEAT_TYPE_QUERY, - // types can be added here - // - HEARTBEAT_TYPE_MAX -} EHbType; +enum { + CONN_TYPE__QUERY = 1, + CONN_TYPE__TMQ, + CONN_TYPE__MAX +}; enum { HEARTBEAT_KEY_DBINFO = 1, @@ -346,7 +344,7 @@ int32_t tDeserializeSConnectReq(void* buf, int32_t bufLen, SConnectReq* pReq); typedef struct { int32_t acctId; int64_t clusterId; - int32_t connId; + uint32_t connId; int8_t superUser; int8_t connType; SEpSet epSet; @@ -1048,40 +1046,6 @@ typedef struct { int32_t tSerializeSDCreateMnodeReq(void* buf, int32_t bufLen, SDCreateMnodeReq* pReq); int32_t tDeserializeSDCreateMnodeReq(void* buf, int32_t bufLen, SDCreateMnodeReq* pReq); -typedef struct { - char sql[TSDB_SHOW_SQL_LEN]; - int32_t queryId; - int64_t useconds; - int64_t stime; - int64_t qId; - int64_t sqlObjId; - int32_t pid; - char fqdn[TSDB_FQDN_LEN]; - int8_t stableQuery; - int32_t numOfSub; - char subSqlInfo[TSDB_SHOW_SUBQUERY_LEN]; // include subqueries' index, Obj IDs and states(C-complete/I-imcomplete) -} SQueryDesc; - -typedef struct { - int32_t connId; - int32_t pid; - int32_t numOfQueries; - int32_t numOfStreams; - char app[TSDB_APP_NAME_LEN]; - char pData[]; -} SHeartBeatReq; - -typedef struct { - int32_t connId; - int32_t queryId; - int32_t streamId; - int32_t totalDnodes; - int32_t onlineDnodes; - int8_t killConnection; - int8_t align[3]; - SEpSet epSet; -} SHeartBeatRsp; - typedef struct { int32_t connId; int32_t queryId; @@ -1684,13 +1648,48 @@ typedef struct { } SKv; typedef struct { - int32_t connId; - int32_t hbType; + int64_t tscRid; + int8_t connType; } SClientHbKey; typedef struct { - SClientHbKey connKey; - SHashObj* info; // hash + int64_t tid; + int32_t status; +} SQuerySubDesc; + +typedef struct { + char sql[TSDB_SHOW_SQL_LEN]; + uint64_t queryId; + int64_t useconds; + int64_t stime; + int64_t reqRid; + int32_t pid; + char fqdn[TSDB_FQDN_LEN]; + int32_t subPlanNum; + SArray* subDesc; // SArray +} SQueryDesc; + +typedef struct { + uint32_t connId; + int32_t pid; + char app[TSDB_APP_NAME_LEN]; + SArray* queryDesc; // SArray +} SQueryHbReqBasic; + +typedef struct { + uint32_t connId; + uint64_t killRid; + int32_t totalDnodes; + int32_t onlineDnodes; + int8_t killConnection; + int8_t align[3]; + SEpSet epSet; +} SQueryHbRspBasic; + +typedef struct { + SClientHbKey connKey; + SQueryHbReqBasic* query; + SHashObj* info; // hash } SClientHbReq; typedef struct { @@ -1699,9 +1698,10 @@ typedef struct { } SClientHbBatchReq; typedef struct { - SClientHbKey connKey; - int32_t status; - SArray* info; // Array + SClientHbKey connKey; + int32_t status; + SQueryHbRspBasic* query; + SArray* info; // Array } SClientHbRsp; typedef struct { @@ -1721,8 +1721,23 @@ static FORCE_INLINE void tFreeReqKvHash(SHashObj* info) { } } +static FORCE_INLINE void tFreeClientHbQueryDesc(void* pDesc) { + SQueryDesc* desc = (SQueryDesc*)pDesc; + if (desc->subDesc) { + taosArrayDestroy(desc->subDesc); + desc->subDesc = NULL; + } +} + static FORCE_INLINE void tFreeClientHbReq(void* pReq) { SClientHbReq* req = (SClientHbReq*)pReq; + if (req->query) { + if (req->query->queryDesc) { + taosArrayDestroyEx(req->query->queryDesc, tFreeClientHbQueryDesc); + } + taosMemoryFreeClear(req->query); + } + if (req->info) { tFreeReqKvHash(req->info); taosHashCleanup(req->info); @@ -1751,6 +1766,7 @@ static FORCE_INLINE void tFreeClientKv(void* pKv) { static FORCE_INLINE void tFreeClientHbRsp(void* pRsp) { SClientHbRsp* rsp = (SClientHbRsp*)pRsp; + taosMemoryFreeClear(rsp->query); if (rsp->info) taosArrayDestroyEx(rsp->info, tFreeClientKv); } @@ -1779,14 +1795,14 @@ static FORCE_INLINE int32_t tDecodeSKv(SCoder* pDecoder, SKv* pKv) { } static FORCE_INLINE int32_t tEncodeSClientHbKey(SCoder* pEncoder, const SClientHbKey* pKey) { - if (tEncodeI32(pEncoder, pKey->connId) < 0) return -1; - if (tEncodeI32(pEncoder, pKey->hbType) < 0) return -1; + if (tEncodeI64(pEncoder, pKey->tscRid) < 0) return -1; + if (tEncodeI8(pEncoder, pKey->connType) < 0) return -1; return 0; } static FORCE_INLINE int32_t tDecodeSClientHbKey(SCoder* pDecoder, SClientHbKey* pKey) { - if (tDecodeI32(pDecoder, &pKey->connId) < 0) return -1; - if (tDecodeI32(pDecoder, &pKey->hbType) < 0) return -1; + if (tDecodeI64(pDecoder, &pKey->tscRid) < 0) return -1; + if (tDecodeI8(pDecoder, &pKey->connType) < 0) return -1; return 0; } diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index 5ab4ead89c..460749243c 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -89,6 +89,8 @@ int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryPlan* pD */ int32_t schedulerFetchRows(int64_t job, void **data); +int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub); + /** * Cancel query job diff --git a/include/util/tarray.h b/include/util/tarray.h index 521e54040d..383af8309d 100644 --- a/include/util/tarray.h +++ b/include/util/tarray.h @@ -205,6 +205,14 @@ SArray* taosArrayDup(const SArray* pSrc); */ void taosArrayClear(SArray* pArray); +/** + * clear the array (remove all element) + * @param pArray + * @param fp + */ +void taosArrayClearEx(SArray* pArray, void (*fp)(void*)); + + /** * destroy array list * @param pArray diff --git a/include/util/tdef.h b/include/util/tdef.h index 6baf784fe3..5fc30540ee 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -128,6 +128,13 @@ extern const int32_t TYPE_BYTES[15]; #define TSDB_INS_TABLE_QUERIES "queries" #define TSDB_INS_TABLE_VNODES "vnodes" +#define TSDB_PERFORMANCE_SCHEMA_DB "performance_schema" +#define TSDB_PERFS_TABLE_CONNECTIONS "connections" +#define TSDB_PERFS_TABLE_QUERIES "queries" +#define TSDB_PERFS_TABLE_TOPICS "topics" +#define TSDB_PERFS_TABLE_CONSUMERS "consumers" +#define TSDB_PERFS_TABLE_SUBSCRIBES "subscribes" + #define TSDB_INDEX_TYPE_SMA "SMA" #define TSDB_INDEX_TYPE_FULLTEXT "FULLTEXT" diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 0f12880272..79d6b2fdf1 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -45,11 +45,6 @@ extern "C" { #define HEARTBEAT_INTERVAL 1500 // ms -enum { - CONN_TYPE__QUERY = 1, - CONN_TYPE__TMQ, -}; - typedef struct SAppInstInfo SAppInstInfo; typedef struct { @@ -84,8 +79,8 @@ typedef struct { TdThread thread; TdThreadMutex lock; // used when app init and cleanup SArray* appHbMgrs; // SArray one for each cluster - FHbReqHandle reqHandle[HEARTBEAT_TYPE_MAX]; - FHbRspHandle rspHandle[HEARTBEAT_TYPE_MAX]; + FHbReqHandle reqHandle[CONN_TYPE__MAX]; + FHbRspHandle rspHandle[CONN_TYPE__MAX]; } SClientHbMgr; typedef struct SQueryExecMetric { @@ -144,6 +139,7 @@ typedef struct STscObj { TdThreadMutex mutex; // used to protect the operation on db int32_t numOfReqs; // number of sqlObj bound to this connection SAppInstInfo* pAppInfo; + SHashObj* pRequests; } STscObj; typedef struct SResultColumn { @@ -256,11 +252,15 @@ int taos_init(); void* createTscObj(const char* user, const char* auth, const char* db, SAppInstInfo* pAppInfo); void destroyTscObj(void* pObj); +STscObj *acquireTscObj(int64_t rid); +int32_t releaseTscObj(int64_t rid); uint64_t generateRequestId(); void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t type); void destroyRequest(SRequestObj* pRequest); +SRequestObj *acquireRequest(int64_t rid); +int32_t releaseRequest(int64_t rid); char* getDbOfConnection(STscObj* pObj); void setConnectionDB(STscObj* pTscObj, const char* db); @@ -302,7 +302,7 @@ SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char* key); void appHbMgrCleanup(void); // conn level -int hbRegisterConn(SAppHbMgr* pAppHbMgr, int32_t connId, int64_t clusterId, int32_t hbType); +int hbRegisterConn(SAppHbMgr *pAppHbMgr, int64_t tscRefId, int64_t clusterId, int8_t connType); void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey); int hbAddConnInfo(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen); diff --git a/source/client/inc/clientStmt.h b/source/client/inc/clientStmt.h new file mode 100644 index 0000000000..c29361758d --- /dev/null +++ b/source/client/inc/clientStmt.h @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_CLIENTSTMT_H +#define TDENGINE_CLIENTSTMT_H + +#ifdef __cplusplus +extern "C" { +#endif + +typedef enum { + STMT_TYPE_INSERT = 1, + STMT_TYPE_MULTI_INSERT, + STMT_TYPE_QUERY, +} STMT_TYPE; + +typedef struct STscStmt { + STMT_TYPE type; + //int16_t last; + //STscObj* taos; + //SSqlObj* pSql; + //SMultiTbStmt mtb; + //SNormalStmt normal; + + //int numOfRows; +} STscStmt; + +#define STMT_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0) +#define STMT_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0) +#define STMT_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0) + +TAOS_STMT *stmtInit(TAOS *taos); +int stmtClose(TAOS_STMT *stmt); +int stmtExec(TAOS_STMT *stmt); +char *stmtErrstr(TAOS_STMT *stmt); +int stmtAffectedRows(TAOS_STMT *stmt); +int stmtBind(TAOS_STMT *stmt, TAOS_BIND *bind); +int stmtPrepare(TAOS_STMT *stmt, const char *sql, unsigned long length); +int stmtSetTbNameTags(TAOS_STMT *stmt, const char *name, TAOS_BIND *tags); +int stmtIsInsert(TAOS_STMT *stmt, int *insert); +int stmtGetParamNum(TAOS_STMT *stmt, int *nums); +int stmtAddBatch(TAOS_STMT *stmt); +TAOS_RES *stmtUseResult(TAOS_STMT *stmt); +int stmtBindBatch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind); + + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_CLIENTSTMT_H diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index fec6c8e5db..969b18f067 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -37,7 +37,8 @@ static TdThreadOnce tscinit = PTHREAD_ONCE_INIT; volatile int32_t tscInitRes = 0; static void registerRequest(SRequestObj *pRequest) { - STscObj *pTscObj = (STscObj *)taosAcquireRef(clientConnRefPool, pRequest->pTscObj->id); + STscObj *pTscObj = acquireTscObj(pRequest->pTscObj->id); + assert(pTscObj != NULL); // connection has been released already, abort creating request. @@ -69,7 +70,7 @@ static void deregisterRequest(SRequestObj *pRequest) { tscDebug("0x%" PRIx64 " free Request from connObj: 0x%" PRIx64 ", reqId:0x%" PRIx64 " elapsed:%" PRIu64 " ms, current:%d, app current:%d", pRequest->self, pTscObj->id, pRequest->requestId, duration/1000, num, currentInst); - taosReleaseRef(clientConnRefPool, pTscObj->id); + releaseTscObj(pTscObj->id); } // todo close the transporter properly @@ -107,12 +108,24 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) { return pDnodeConn; } +void closeAllRequests(SHashObj *pRequests) { + void *pIter = taosHashIterate(pRequests, NULL); + while (pIter != NULL) { + int64_t *rid = pIter; + + releaseRequest(*rid); + + pIter = taosHashIterate(pRequests, pIter); + } +} + void destroyTscObj(void *pObj) { STscObj *pTscObj = pObj; - SClientHbKey connKey = {.connId = pTscObj->connId, .hbType = pTscObj->connType}; + SClientHbKey connKey = {.tscRid = pTscObj->id, .connType = pTscObj->connType}; hbDeregisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey); atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); + closeAllRequests(pTscObj->pRequests); tscDebug("connObj 0x%" PRIx64 " destroyed, totalConn:%" PRId64, pTscObj->id, pTscObj->pAppInfo->numOfConns); taosThreadMutexDestroy(&pTscObj->mutex); taosMemoryFreeClear(pTscObj); @@ -125,6 +138,13 @@ void *createTscObj(const char *user, const char *auth, const char *db, SAppInstI return NULL; } + pObj->pRequests = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); + if (NULL == pObj->pRequests) { + taosMemoryFree(pObj); + terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; + return NULL; + } + pObj->pAppInfo = pAppInfo; tstrncpy(pObj->user, user, sizeof(pObj->user)); memcpy(pObj->pass, auth, TSDB_PASSWORD_LEN); @@ -140,6 +160,14 @@ void *createTscObj(const char *user, const char *auth, const char *db, SAppInstI return pObj; } +STscObj *acquireTscObj(int64_t rid) { + return (STscObj *)taosAcquireRef(clientConnRefPool, rid); +} + +int32_t releaseTscObj(int64_t rid) { + return taosReleaseRef(clientConnRefPool, rid); +} + void *createRequest(STscObj *pObj, __taos_async_fn_t fp, void *param, int32_t type) { assert(pObj != NULL); @@ -161,6 +189,7 @@ void *createRequest(STscObj *pObj, __taos_async_fn_t fp, void *param, int32_t ty tsem_init(&pRequest->body.rspSem, 0, 0); registerRequest(pRequest); + return pRequest; } @@ -186,6 +215,8 @@ static void doDestroyRequest(void *p) { assert(RID_VALID(pRequest->self)); + taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self)); + taosMemoryFreeClear(pRequest->msgBuf); taosMemoryFreeClear(pRequest->sqlstr); taosMemoryFreeClear(pRequest->pInfo); @@ -214,9 +245,18 @@ void destroyRequest(SRequestObj *pRequest) { return; } - taosReleaseRef(clientReqRefPool, pRequest->self); + taosRemoveRef(clientReqRefPool, pRequest->self); } +SRequestObj *acquireRequest(int64_t rid) { + return (SRequestObj *)taosAcquireRef(clientReqRefPool, rid); +} + +int32_t releaseRequest(int64_t rid) { + return taosReleaseRef(clientReqRefPool, rid); +} + + void taos_init_imp(void) { // In the APIs of other program language, taos_cleanup is not available yet. // So, to make sure taos_cleanup will be invoked to clean up the allocated resource to suppress the valgrind warning. @@ -457,11 +497,18 @@ uint64_t generateRequestId() { } } - int64_t ts = taosGetTimestampMs(); - uint64_t pid = taosGetPId(); - int32_t val = atomic_add_fetch_32(&requestSerialId, 1); + uint64_t id = 0; + + while (true) { + int64_t ts = taosGetTimestampMs(); + uint64_t pid = taosGetPId(); + int32_t val = atomic_add_fetch_32(&requestSerialId, 1); - uint64_t id = ((hashId & 0x0FFF) << 52) | ((pid & 0x0FFF) << 40) | ((ts & 0xFFFFFF) << 16) | (val & 0xFFFF); + id = ((hashId & 0x0FFF) << 52) | ((pid & 0x0FFF) << 40) | ((ts & 0xFFFFFF) << 16) | (val & 0xFFFF); + if (id) { + break; + } + } return id; } diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 82788b2e11..a6678b2ec0 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -14,6 +14,7 @@ */ #include "catalog.h" +#include "scheduler.h" #include "clientInt.h" #include "clientLog.h" #include "trpc.h" @@ -109,10 +110,36 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { SHbConnInfo *info = taosHashGet(pAppHbMgr->connInfo, &pRsp->connKey, sizeof(SClientHbKey)); if (NULL == info) { - tscWarn("fail to get connInfo, may be dropped, connId:%d, type:%d", pRsp->connKey.connId, pRsp->connKey.hbType); + tscWarn("fail to get connInfo, may be dropped, refId:%" PRIx64 ", type:%d", pRsp->connKey.tscRid, pRsp->connKey.connType); return TSDB_CODE_SUCCESS; } + if (pRsp->query) { + STscObj *pTscObj = (STscObj *)acquireTscObj(pRsp->connKey.tscRid); + if (NULL == pTscObj) { + tscDebug("tscObj rid %" PRIx64 " not exist", pRsp->connKey.tscRid); + } else { + updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &pRsp->query->epSet); + pTscObj->connId = pRsp->query->connId; + + if (pRsp->query->killRid) { + SRequestObj *pRequest = acquireRequest(pRsp->query->killRid); + if (NULL == pRequest) { + tscDebug("request 0x%" PRIx64 " not exist to kill", pRsp->query->killRid); + } else { + taos_stop_query((TAOS_RES *)pRequest); + releaseRequest(pRsp->query->killRid); + } + } + + if (pRsp->query->killConnection) { + taos_close(pTscObj); + } + + releaseTscObj(pRsp->connKey.tscRid); + } + } + int32_t kvNum = pRsp->info ? taosArrayGetSize(pRsp->info) : 0; tscDebug("hb got %d rsp kv", kvNum); @@ -197,7 +224,7 @@ static int32_t hbAsyncCallBack(void *param, const SDataBuf *pMsg, int32_t code) for (int32_t i = 0; i < rspNum; ++i) { SClientHbRsp *rsp = taosArrayGet(pRsp.rsps, i); - code = (*clientHbMgr.rspHandle[rsp->connKey.hbType])((*pInst)->pAppHbMgr, rsp); + code = (*clientHbMgr.rspHandle[rsp->connKey.connType])((*pInst)->pAppHbMgr, rsp); if (code) { break; } @@ -208,6 +235,97 @@ static int32_t hbAsyncCallBack(void *param, const SDataBuf *pMsg, int32_t code) return code; } +int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) { + int64_t now = taosGetTimestampUs(); + SQueryDesc desc = {0}; + int32_t code = 0; + + void *pIter = taosHashIterate(pObj->pRequests, NULL); + while (pIter != NULL) { + int64_t *rid = pIter; + SRequestObj *pRequest = acquireRequest(*rid); + if (NULL == pRequest) { + continue; + } + + tstrncpy(desc.sql, pRequest->sqlstr, sizeof(desc.sql)); + desc.stime = pRequest->metric.start; + desc.queryId = pRequest->requestId; + desc.useconds = now - pRequest->metric.start; + desc.reqRid = pRequest->self; + desc.pid = hbBasic->pid; + taosGetFqdn(desc.fqdn); + desc.subPlanNum = pRequest->body.pDag ? pRequest->body.pDag->numOfSubplans : 0; + + if (desc.subPlanNum) { + desc.subDesc = taosArrayInit(desc.subPlanNum, sizeof(SQuerySubDesc)); + if (NULL == desc.subDesc) { + releaseRequest(*rid); + return TSDB_CODE_QRY_OUT_OF_MEMORY; + } + + code = schedulerGetTasksStatus(pRequest->body.queryJob, desc.subDesc); + if (code) { + taosArrayDestroy(desc.subDesc); + desc.subDesc = NULL; + } + } + + releaseRequest(*rid); + taosArrayPush(hbBasic->queryDesc, &desc); + + pIter = taosHashIterate(pObj->pRequests, pIter); + } + + return TSDB_CODE_SUCCESS; +} + +int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) { + STscObj *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid); + if (NULL == pTscObj) { + tscWarn("tscObj rid %" PRIx64 " not exist", connKey->tscRid); + return TSDB_CODE_QRY_APP_ERROR; + } + + int32_t numOfQueries = pTscObj->pRequests ? taosHashGetSize(pTscObj->pRequests) : 0; + if (numOfQueries <= 0) { + releaseTscObj(connKey->tscRid); + tscDebug("no queries on connection"); + return TSDB_CODE_QRY_APP_ERROR; + } + + SQueryHbReqBasic *hbBasic = (SQueryHbReqBasic *)taosMemoryCalloc(1, sizeof(SQueryHbReqBasic)); + if (NULL == hbBasic) { + tscError("calloc %d failed", (int32_t)sizeof(SQueryHbReqBasic)); + releaseTscObj(connKey->tscRid); + return TSDB_CODE_QRY_OUT_OF_MEMORY; + } + + hbBasic->queryDesc = taosArrayInit(numOfQueries, sizeof(SQueryDesc)); + if (NULL == hbBasic->queryDesc) { + tscWarn("taosArrayInit %d queryDesc failed", numOfQueries); + releaseTscObj(connKey->tscRid); + taosMemoryFree(hbBasic); + return TSDB_CODE_QRY_OUT_OF_MEMORY; + } + + hbBasic->connId = pTscObj->connId; + hbBasic->pid = taosGetPId(); + taosGetAppName(hbBasic->app, NULL); + + int32_t code = hbBuildQueryDesc(hbBasic, pTscObj); + if (code) { + releaseTscObj(connKey->tscRid); + taosMemoryFree(hbBasic); + return code; + } + + req->query = hbBasic; + releaseTscObj(connKey->tscRid); + + return TSDB_CODE_SUCCESS; +} + int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) { SDbVgVersion *dbs = NULL; uint32_t dbNum = 0; @@ -286,6 +404,8 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req return code; } + hbGetQueryBasicInfo(connKey, req); + code = hbGetExpiredDBInfo(connKey, pCatalog, req); if (TSDB_CODE_SUCCESS != code) { return code; @@ -300,11 +420,11 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req } void hbMgrInitMqHbHandle() { - clientHbMgr.reqHandle[HEARTBEAT_TYPE_QUERY] = hbQueryHbReqHandle; - clientHbMgr.reqHandle[HEARTBEAT_TYPE_MQ] = hbMqHbReqHandle; + clientHbMgr.reqHandle[CONN_TYPE__QUERY] = hbQueryHbReqHandle; + clientHbMgr.reqHandle[CONN_TYPE__TMQ] = hbMqHbReqHandle; - clientHbMgr.rspHandle[HEARTBEAT_TYPE_QUERY] = hbQueryHbRspHandle; - clientHbMgr.rspHandle[HEARTBEAT_TYPE_MQ] = hbMqHbRspHandle; + clientHbMgr.rspHandle[CONN_TYPE__QUERY] = hbQueryHbRspHandle; + clientHbMgr.rspHandle[CONN_TYPE__TMQ] = hbMqHbRspHandle; } static FORCE_INLINE void hbMgrInitHandle() { @@ -317,6 +437,11 @@ void hbFreeReq(void *req) { tFreeReqKvHash(pReq->info); } +void hbClearClientHbReq(SClientHbReq *pReq) { + pReq->query = NULL; + pReq->info = NULL; +} + SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { SClientHbBatchReq *pBatchReq = taosMemoryCalloc(1, sizeof(SClientHbBatchReq)); if (pBatchReq == NULL) { @@ -333,22 +458,23 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { SHbConnInfo *info = taosHashGet(pAppHbMgr->connInfo, &pOneReq->connKey, sizeof(SClientHbKey)); if (info) { - code = (*clientHbMgr.reqHandle[pOneReq->connKey.hbType])(&pOneReq->connKey, info->param, pOneReq); + code = (*clientHbMgr.reqHandle[pOneReq->connKey.connType])(&pOneReq->connKey, info->param, pOneReq); if (code) { - taosHashCancelIterate(pAppHbMgr->activeInfo, pIter); - break; + pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter); + continue; } } taosArrayPush(pBatchReq->reqs, pOneReq); + hbClearClientHbReq(pOneReq); pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter); } - if (code) { - taosArrayDestroyEx(pBatchReq->reqs, hbFreeReq); - taosMemoryFreeClear(pBatchReq); - } +// if (code) { +// taosArrayDestroyEx(pBatchReq->reqs, hbFreeReq); +// taosMemoryFreeClear(pBatchReq); +// } return pBatchReq; } @@ -523,13 +649,13 @@ int hbMgrInit() { hbMgrInitHandle(); // init backgroud thread - hbCreateThread(); + //hbCreateThread(); return 0; } void hbMgrCleanUp() { - hbStopThread(); + //hbStopThread(); // destroy all appHbMgr int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 1, 0); @@ -549,7 +675,7 @@ int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, SHbConnInfo * if (data != NULL) { return 0; } - SClientHbReq hbReq; + SClientHbReq hbReq = {0}; hbReq.connKey = connKey; hbReq.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); @@ -566,22 +692,22 @@ int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, SHbConnInfo * return 0; } -int hbRegisterConn(SAppHbMgr *pAppHbMgr, int32_t connId, int64_t clusterId, int32_t hbType) { +int hbRegisterConn(SAppHbMgr *pAppHbMgr, int64_t tscRefId, int64_t clusterId, int8_t connType) { SClientHbKey connKey = { - .connId = connId, - .hbType = hbType, + .tscRid = tscRefId, + .connType = connType, }; SHbConnInfo info = {0}; - switch (hbType) { - case HEARTBEAT_TYPE_QUERY: { + switch (connType) { + case CONN_TYPE__QUERY: { int64_t *pClusterId = taosMemoryMalloc(sizeof(int64_t)); *pClusterId = clusterId; info.param = pClusterId; return hbRegisterConnImpl(pAppHbMgr, connKey, &info); } - case HEARTBEAT_TYPE_MQ: { + case CONN_TYPE__TMQ: { return 0; } default: diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 96a7230ff3..75b4537364 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -132,6 +132,13 @@ int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj* (*pRequest)->sqlstr[sqlLen] = 0; (*pRequest)->sqlLen = sqlLen; + if (taosHashPut(pTscObj->pRequests, &(*pRequest)->self, sizeof((*pRequest)->self), &(*pRequest)->self, sizeof((*pRequest)->self))) { + destroyRequest(*pRequest); + *pRequest = NULL; + tscError("put request to request hash failed"); + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + tscDebugL("0x%" PRIx64 " SQL: %s, reqId:0x%" PRIx64, (*pRequest)->self, (*pRequest)->sqlstr, (*pRequest)->requestId); return TSDB_CODE_SUCCESS; } @@ -447,7 +454,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t taos_close(pTscObj); pTscObj = NULL; } else { - tscDebug("0x%" PRIx64 " connection is opening, connId:%d, dnodeConn:%p, reqId:0x%" PRIx64, pTscObj->id, + tscDebug("0x%" PRIx64 " connection is opening, connId:%u, dnodeConn:%p, reqId:0x%" PRIx64, pTscObj->id, pTscObj->connId, pTscObj->pAppInfo->pTransporter, pRequest->requestId); destroyRequest(pRequest); } diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 42d204284e..ae8a31f367 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -14,7 +14,9 @@ */ #include "catalog.h" +#include "scheduler.h" #include "clientInt.h" +#include "clientStmt.h" #include "clientLog.h" #include "os.h" #include "query.h" @@ -66,6 +68,7 @@ void taos_cleanup(void) { rpcCleanup(); catalogDestroy(); + schedulerDestroy(); taosCloseLog(); tscInfo("all local resources released"); @@ -98,7 +101,7 @@ void taos_close(TAOS *taos) { STscObj *pTscObj = (STscObj *)taos; tscDebug("0x%" PRIx64 " try to close connection, numOfReq:%d", pTscObj->id, pTscObj->numOfReqs); - /*taosRemoveRef(clientConnRefPool, pTscObj->id);*/ + taosRemoveRef(clientConnRefPool, pTscObj->id); } int taos_errno(TAOS_RES *tres) { @@ -400,7 +403,7 @@ void taos_stop_query(TAOS_RES *res) { return; } - // scheduleCancelJob(pRequest->body.pQueryJob); + schedulerFreeJob(pRequest->body.queryJob); } bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) { @@ -565,76 +568,149 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { } TAOS_STMT *taos_stmt_init(TAOS *taos) { - // TODO - return NULL; + if (taos == NULL) { + tscError("NULL parameter for %s", __FUNCTION__); + terrno = TSDB_CODE_INVALID_PARA; + return NULL; + } + + return stmtInit(taos); } int taos_stmt_close(TAOS_STMT *stmt) { - // TODO - return -1; + if (stmt == NULL) { + tscError("NULL parameter for %s", __FUNCTION__); + terrno = TSDB_CODE_INVALID_PARA; + return terrno; + } + + return stmtClose(stmt); } int taos_stmt_execute(TAOS_STMT *stmt) { - // TODO - return -1; + if (stmt == NULL) { + tscError("NULL parameter for %s", __FUNCTION__); + terrno = TSDB_CODE_INVALID_PARA; + return terrno; + } + + return stmtExec(stmt); } char *taos_stmt_errstr(TAOS_STMT *stmt) { - // TODO - return NULL; + if (stmt == NULL) { + tscError("NULL parameter for %s", __FUNCTION__); + terrno = TSDB_CODE_INVALID_PARA; + return NULL; + } + + return stmtErrstr(stmt); } int taos_stmt_affected_rows(TAOS_STMT *stmt) { - // TODO - return -1; + if (stmt == NULL) { + tscError("NULL parameter for %s", __FUNCTION__); + terrno = TSDB_CODE_INVALID_PARA; + return 0; + } + + return stmtAffectedRows(stmt); } +int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND *bind) { + if (stmt == NULL || bind == NULL) { + tscError("NULL parameter for %s", __FUNCTION__); + terrno = TSDB_CODE_INVALID_PARA; + return terrno; + } + + return stmtBind(stmt, bind); +} + +int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length) { + if (stmt == NULL || sql == NULL) { + tscError("NULL parameter for %s", __FUNCTION__); + terrno = TSDB_CODE_INVALID_PARA; + return terrno; + } + + return stmtPrepare(stmt, sql, length); +} + +int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_BIND *tags) { + if (stmt == NULL || name == NULL || tags == NULL) { + tscError("NULL parameter for %s", __FUNCTION__); + terrno = TSDB_CODE_INVALID_PARA; + return terrno; + } + + return stmtSetTbNameTags(stmt, name, tags); +} + +int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name) { + if (stmt == NULL || name == NULL) { + tscError("NULL parameter for %s", __FUNCTION__); + terrno = TSDB_CODE_INVALID_PARA; + return terrno; + } + + return stmtSetTbNameTags(stmt, name, NULL); +} + +int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert) { + if (stmt == NULL || insert == NULL) { + tscError("NULL parameter for %s", __FUNCTION__); + terrno = TSDB_CODE_INVALID_PARA; + return terrno; + } + + return stmtIsInsert(stmt, insert); +} + +int taos_stmt_num_params(TAOS_STMT *stmt, int *nums) { + if (stmt == NULL || nums == NULL) { + tscError("NULL parameter for %s", __FUNCTION__); + terrno = TSDB_CODE_INVALID_PARA; + return terrno; + } + + return stmtGetParamNum(stmt, nums); +} + +int taos_stmt_add_batch(TAOS_STMT *stmt) { + if (stmt == NULL) { + tscError("NULL parameter for %s", __FUNCTION__); + terrno = TSDB_CODE_INVALID_PARA; + return terrno; + } + + return stmtAddBatch(stmt); +} + +TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt) { + if (stmt == NULL) { + tscError("NULL parameter for %s", __FUNCTION__); + terrno = TSDB_CODE_INVALID_PARA; + return NULL; + } + + return stmtUseResult(stmt); +} + +int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) { + if (stmt == NULL || bind == NULL) { + tscError("NULL parameter for %s", __FUNCTION__); + terrno = TSDB_CODE_INVALID_PARA; + return terrno; + } + + return stmtBindBatch(stmt, bind); +} + + TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision) { // TODO return NULL; } -int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND *bind) { - // TODO - return -1; -} -int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length) { - // TODO - return -1; -} - -int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_BIND *tags) { - // TODO - return -1; -} - -int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name) { - // TODO - return -1; -} - -int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert) { - // TODO - return -1; -} - -int taos_stmt_num_params(TAOS_STMT *stmt, int *nums) { - // TODO - return -1; -} - -int taos_stmt_add_batch(TAOS_STMT *stmt) { - // TODO - return -1; -} - -TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt) { - // TODO - return NULL; -} - -int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) { - // TODO - return -1; -} diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index 67c5679cac..f822da7c8d 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -71,7 +71,7 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) { pTscObj->connType = connectRsp.connType; - hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, connectRsp.connId, connectRsp.clusterId, connectRsp.connType); + hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, pTscObj->id, connectRsp.clusterId, connectRsp.connType); // pRequest->body.resInfo.pRspMsg = pMsg->pData; tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, connectRsp.clusterId, diff --git a/source/client/src/clientStmt.c b/source/client/src/clientStmt.c new file mode 100644 index 0000000000..8c4cff9251 --- /dev/null +++ b/source/client/src/clientStmt.c @@ -0,0 +1,99 @@ + +#include "clientInt.h" +#include "clientLog.h" +#include "clientStmt.h" +#include "tdef.h" + +TAOS_STMT *stmtInit(TAOS *taos) { + STscObj* pObj = (STscObj*)taos; + STscStmt* pStmt = NULL; + +#if 0 + pStmt = taosMemoryCalloc(1, sizeof(STscStmt)); + if (pStmt == NULL) { + terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; + tscError("failed to allocate memory for statement"); + return NULL; + } + pStmt->taos = pObj; + + SSqlObj* pSql = calloc(1, sizeof(SSqlObj)); + + if (pSql == NULL) { + free(pStmt); + terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; + tscError("failed to allocate memory for statement"); + return NULL; + } + + if (TSDB_CODE_SUCCESS != tscAllocPayload(&pSql->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) { + free(pSql); + free(pStmt); + terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; + tscError("failed to malloc payload buffer"); + return NULL; + } + + tsem_init(&pSql->rspSem, 0, 0); + pSql->signature = pSql; + pSql->pTscObj = pObj; + pSql->maxRetry = TSDB_MAX_REPLICA; + pStmt->pSql = pSql; + pStmt->last = STMT_INIT; + pStmt->numOfRows = 0; + registerSqlObj(pSql); +#endif + + return pStmt; +} + +int stmtClose(TAOS_STMT *stmt) { + return TSDB_CODE_SUCCESS; +} + +int stmtExec(TAOS_STMT *stmt) { + return TSDB_CODE_SUCCESS; +} + +char *stmtErrstr(TAOS_STMT *stmt) { + return NULL; +} + +int stmtAffectedRows(TAOS_STMT *stmt) { + return TSDB_CODE_SUCCESS; +} + +int stmtBind(TAOS_STMT *stmt, TAOS_BIND *bind) { + return TSDB_CODE_SUCCESS; +} + +int stmtPrepare(TAOS_STMT *stmt, const char *sql, unsigned long length) { + return TSDB_CODE_SUCCESS; +} + +int stmtSetTbNameTags(TAOS_STMT *stmt, const char *name, TAOS_BIND *tags) { + return TSDB_CODE_SUCCESS; +} + +int stmtIsInsert(TAOS_STMT *stmt, int *insert) { + return TSDB_CODE_SUCCESS; +} + +int stmtGetParamNum(TAOS_STMT *stmt, int *nums) { + return TSDB_CODE_SUCCESS; +} + +int stmtAddBatch(TAOS_STMT *stmt) { + return TSDB_CODE_SUCCESS; +} + +TAOS_RES *stmtUseResult(TAOS_STMT *stmt) { + return NULL; +} + +int stmtBindBatch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) { + return TSDB_CODE_SUCCESS; +} + + + diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 031ebdaf49..3e37c52c26 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -134,6 +134,42 @@ void *taosDecodeSEpSet(void *buf, SEpSet *pEp) { static int32_t tSerializeSClientHbReq(SCoder *pEncoder, const SClientHbReq *pReq) { if (tEncodeSClientHbKey(pEncoder, &pReq->connKey) < 0) return -1; + if (pReq->connKey.connType == CONN_TYPE__QUERY) { + int32_t queryNum = 0; + if (pReq->query) { + queryNum = 1; + if (tEncodeI32(pEncoder, queryNum) < 0) return -1; + if (tEncodeU32(pEncoder, pReq->query->connId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->query->pid) < 0) return -1; + if (tEncodeCStr(pEncoder, pReq->query->app) < 0) return -1; + + int32_t num = taosArrayGetSize(pReq->query->queryDesc); + if (tEncodeI32(pEncoder, num) < 0) return -1; + + for (int32_t i = 0; i < num; ++i) { + SQueryDesc *desc = taosArrayGet(pReq->query->queryDesc, i); + if (tEncodeCStr(pEncoder, desc->sql) < 0) return -1; + if (tEncodeU64(pEncoder, desc->queryId) < 0) return -1; + if (tEncodeI64(pEncoder, desc->useconds) < 0) return -1; + if (tEncodeI64(pEncoder, desc->stime) < 0) return -1; + if (tEncodeI64(pEncoder, desc->reqRid) < 0) return -1; + if (tEncodeI32(pEncoder, desc->pid) < 0) return -1; + if (tEncodeCStr(pEncoder, desc->fqdn) < 0) return -1; + if (tEncodeI32(pEncoder, desc->subPlanNum) < 0) return -1; + + int32_t snum = desc->subDesc ? taosArrayGetSize(desc->subDesc) : 0; + if (tEncodeI32(pEncoder, snum) < 0) return -1; + for (int32_t m = 0; m < snum; ++m) { + SQuerySubDesc *sDesc = taosArrayGet(desc->subDesc, m); + if (tEncodeI64(pEncoder, sDesc->tid) < 0) return -1; + if (tEncodeI32(pEncoder, sDesc->status) < 0) return -1; + } + } + } else { + if (tEncodeI32(pEncoder, queryNum) < 0) return -1; + } + } + int32_t kvNum = taosHashGetSize(pReq->info); if (tEncodeI32(pEncoder, kvNum) < 0) return -1; void *pIter = taosHashIterate(pReq->info, NULL); @@ -149,6 +185,53 @@ static int32_t tSerializeSClientHbReq(SCoder *pEncoder, const SClientHbReq *pReq static int32_t tDeserializeSClientHbReq(SCoder *pDecoder, SClientHbReq *pReq) { if (tDecodeSClientHbKey(pDecoder, &pReq->connKey) < 0) return -1; + if (pReq->connKey.connType == CONN_TYPE__QUERY) { + int32_t queryNum = 0; + if (tDecodeI32(pDecoder, &queryNum) < 0) return -1; + if (queryNum) { + pReq->query = taosMemoryCalloc(1, sizeof(*pReq->query)); + if (NULL == pReq->query) return -1; + if (tDecodeU32(pDecoder, &pReq->query->connId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->query->pid) < 0) return -1; + if (tDecodeCStrTo(pDecoder, pReq->query->app) < 0) return -1; + + int32_t num = 0; + if (tDecodeI32(pDecoder, &num) < 0) return -1; + if (num > 0) { + pReq->query->queryDesc = taosArrayInit(num, sizeof(SQueryDesc)); + if (NULL == pReq->query->queryDesc) return -1; + + for (int32_t i = 0; i < num; ++i) { + SQueryDesc desc = {0}; + if (tDecodeCStrTo(pDecoder, desc.sql) < 0) return -1; + if (tDecodeU64(pDecoder, &desc.queryId) < 0) return -1; + if (tDecodeI64(pDecoder, &desc.useconds) < 0) return -1; + if (tDecodeI64(pDecoder, &desc.stime) < 0) return -1; + if (tDecodeI64(pDecoder, &desc.reqRid) < 0) return -1; + if (tDecodeI32(pDecoder, &desc.pid) < 0) return -1; + if (tDecodeCStrTo(pDecoder, desc.fqdn) < 0) return -1; + if (tDecodeI32(pDecoder, &desc.subPlanNum) < 0) return -1; + + int32_t snum = 0; + if (tDecodeI32(pDecoder, &snum) < 0) return -1; + if (snum > 0) { + desc.subDesc = taosArrayInit(snum, sizeof(SQuerySubDesc)); + if (NULL == desc.subDesc) return -1; + + for (int32_t m = 0; m < snum; ++m) { + SQuerySubDesc sDesc = {0}; + if (tDecodeI64(pDecoder, &sDesc.tid) < 0) return -1; + if (tDecodeI32(pDecoder, &sDesc.status) < 0) return -1; + taosArrayPush(desc.subDesc, &sDesc); + } + } + + taosArrayPush(pReq->query->queryDesc, &desc); + } + } + } + } + int32_t kvNum = 0; if (tDecodeI32(pDecoder, &kvNum) < 0) return -1; if (pReq->info == NULL) { @@ -168,6 +251,20 @@ static int32_t tSerializeSClientHbRsp(SCoder *pEncoder, const SClientHbRsp *pRsp if (tEncodeSClientHbKey(pEncoder, &pRsp->connKey) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->status) < 0) return -1; + int32_t queryNum = 0; + if (pRsp->query) { + queryNum = 1; + if (tEncodeI32(pEncoder, queryNum) < 0) return -1; + if (tEncodeU32(pEncoder, pRsp->query->connId) < 0) return -1; + if (tEncodeU64(pEncoder, pRsp->query->killRid) < 0) return -1; + if (tEncodeI32(pEncoder, pRsp->query->totalDnodes) < 0) return -1; + if (tEncodeI32(pEncoder, pRsp->query->onlineDnodes) < 0) return -1; + if (tEncodeI8(pEncoder, pRsp->query->killConnection) < 0) return -1; + if (tEncodeSEpSet(pEncoder, &pRsp->query->epSet) < 0) return -1; + } else { + if (tEncodeI32(pEncoder, queryNum) < 0) return -1; + } + int32_t kvNum = taosArrayGetSize(pRsp->info); if (tEncodeI32(pEncoder, kvNum) < 0) return -1; for (int32_t i = 0; i < kvNum; i++) { @@ -182,6 +279,19 @@ static int32_t tDeserializeSClientHbRsp(SCoder *pDecoder, SClientHbRsp *pRsp) { if (tDecodeSClientHbKey(pDecoder, &pRsp->connKey) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->status) < 0) return -1; + int32_t queryNum = 0; + if (tDecodeI32(pDecoder, &queryNum) < 0) return -1; + if (queryNum) { + pRsp->query = taosMemoryCalloc(1, sizeof(*pRsp->query)); + if (NULL == pRsp->query) return -1; + if (tDecodeU32(pDecoder, &pRsp->query->connId) < 0) return -1; + if (tDecodeU64(pDecoder, &pRsp->query->killRid) < 0) return -1; + if (tDecodeI32(pDecoder, &pRsp->query->totalDnodes) < 0) return -1; + if (tDecodeI32(pDecoder, &pRsp->query->onlineDnodes) < 0) return -1; + if (tDecodeI8(pDecoder, &pRsp->query->killConnection) < 0) return -1; + if (tDecodeSEpSet(pDecoder, &pRsp->query->epSet) < 0) return -1; + } + int32_t kvNum = 0; if (tDecodeI32(pDecoder, &kvNum) < 0) return -1; pRsp->info = taosArrayInit(kvNum, sizeof(SKv)); @@ -224,8 +334,9 @@ int32_t tDeserializeSClientHbBatchReq(void *buf, int32_t bufLen, SClientHbBatchR int32_t reqNum = 0; if (tDecodeI32(&decoder, &reqNum) < 0) return -1; - if (pBatchReq->reqs == NULL) { + if (reqNum > 0) { pBatchReq->reqs = taosArrayInit(reqNum, sizeof(SClientHbReq)); + if (NULL == pBatchReq->reqs) return -1; } for (int32_t i = 0; i < reqNum; i++) { SClientHbReq req = {0}; @@ -2567,7 +2678,7 @@ int32_t tSerializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) { if (tStartEncode(&encoder) < 0) return -1; if (tEncodeI32(&encoder, pRsp->acctId) < 0) return -1; if (tEncodeI64(&encoder, pRsp->clusterId) < 0) return -1; - if (tEncodeI32(&encoder, pRsp->connId) < 0) return -1; + if (tEncodeU32(&encoder, pRsp->connId) < 0) return -1; if (tEncodeI8(&encoder, pRsp->superUser) < 0) return -1; if (tEncodeI8(&encoder, pRsp->connType) < 0) return -1; if (tEncodeSEpSet(&encoder, &pRsp->epSet) < 0) return -1; @@ -2586,7 +2697,7 @@ int32_t tDeserializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) { if (tStartDecode(&decoder) < 0) return -1; if (tDecodeI32(&decoder, &pRsp->acctId) < 0) return -1; if (tDecodeI64(&decoder, &pRsp->clusterId) < 0) return -1; - if (tDecodeI32(&decoder, &pRsp->connId) < 0) return -1; + if (tDecodeU32(&decoder, &pRsp->connId) < 0) return -1; if (tDecodeI8(&decoder, &pRsp->superUser) < 0) return -1; if (tDecodeI8(&decoder, &pRsp->connType) < 0) return -1; if (tDecodeSEpSet(&decoder, &pRsp->epSet) < 0) return -1; diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index 11c1b09cc9..7b67308876 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -38,6 +38,10 @@ extern "C" { #define mDebug(...) { if (mDebugFlag & DEBUG_DEBUG) { taosPrintLog("MND ", DEBUG_DEBUG, mDebugFlag, __VA_ARGS__); }} #define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND ", DEBUG_TRACE, mDebugFlag, __VA_ARGS__); }} +#define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE) +#define SYSTABLE_SCH_DB_NAME_LEN ((TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE) +#define SYSTABLE_SCH_COL_NAME_LEN ((TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE) + typedef int32_t (*MndMsgFp)(SNodeMsg *pMsg); typedef int32_t (*MndInitFp)(SMnode *pMnode); typedef void (*MndCleanupFp)(SMnode *pMnode); @@ -74,7 +78,6 @@ typedef struct { } SShowMgmt; typedef struct { - int32_t connId; SCacheObj *cache; } SProfileMgmt; @@ -118,6 +121,7 @@ struct SMnode { STelemMgmt telemMgmt; SSyncMgmt syncMgmt; SHashObj *infosMeta; + SHashObj *perfsMeta; SGrantInfo grant; MndMsgFp msgFp[TDMT_MAX]; SMsgCb msgCb; diff --git a/source/dnode/mnode/impl/inc/mndPerfSchema.h b/source/dnode/mnode/impl/inc/mndPerfSchema.h new file mode 100644 index 0000000000..2c613e253c --- /dev/null +++ b/source/dnode/mnode/impl/inc/mndPerfSchema.h @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_MND_PERF_SCHEMA_H_ +#define _TD_MND_PERF_SCHEMA_H_ + +#include "mndInt.h" + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct SPerfsTableSchema { + char *name; + int32_t type; + int32_t bytes; +} SPerfsTableSchema; + +typedef struct SPerfsTableMeta { + char *name; + const SPerfsTableSchema *schema; + int32_t colNum; +} SPerfsTableMeta; + +int32_t mndBuildPerfsTableSchema(SMnode *pMnode, const char *dbFName, const char *tbName, STableMetaRsp *pRsp); +int32_t mndInitPerfs(SMnode *pMnode); +void mndCleanupPerfs(SMnode *pMnode); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_MND_PERF_SCHEMA_H_*/ diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 3db4e9870e..43d5814ffb 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -1128,6 +1128,8 @@ static int32_t mndProcessUseDbReq(SNodeMsg *pReq) { if (taosArrayGetSize(usedbRsp.pVgroupInfos) <= 0) { terrno = TSDB_CODE_MND_DB_NOT_EXIST; + } else { + code = 0; } } else { usedbRsp.vgVersion = usedbReq.vgVersion; diff --git a/source/dnode/mnode/impl/src/mndInfoSchema.c b/source/dnode/mnode/impl/src/mndInfoSchema.c index c21a6e61df..55eb1ca4bd 100644 --- a/source/dnode/mnode/impl/src/mndInfoSchema.c +++ b/source/dnode/mnode/impl/src/mndInfoSchema.c @@ -15,6 +15,7 @@ #define _DEFAULT_SOURCE #include "mndInfoSchema.h" +#include "mndInt.h" #define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE) #define SYSTABLE_SCH_DB_NAME_LEN ((TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE) diff --git a/source/dnode/mnode/impl/src/mndPerfSchema.c b/source/dnode/mnode/impl/src/mndPerfSchema.c new file mode 100644 index 0000000000..38e9e79228 --- /dev/null +++ b/source/dnode/mnode/impl/src/mndPerfSchema.c @@ -0,0 +1,174 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "mndPerfSchema.h" +#include "mndInt.h" + +//!!!! Note: only APPEND columns in below tables, NO insert !!!! +static const SPerfsTableSchema connectionsSchema[] = { + {.name = "conn_id", .bytes = 4, .type = TSDB_DATA_TYPE_UINT}, + {.name = "user", .bytes = TSDB_USER_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "program", .bytes = TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, + {.name = "end_point", .bytes = TSDB_EP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "login_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, + {.name = "last_access", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, +}; +static const SPerfsTableSchema queriesSchema[] = { + {.name = "query_id", .bytes = 4, .type = TSDB_DATA_TYPE_UBIGINT}, + {.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "user", .bytes = TSDB_USER_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, + {.name = "fqdn", .bytes = TSDB_FQDN_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "exec_time", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT}, + {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, + {.name = "sub_queries", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, + {.name = "sub_query_info", .bytes = TSDB_SHOW_SUBQUERY_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, +}; + +static const SPerfsTableSchema topicSchema[] = { + {.name = "topic_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, + {.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "row_len", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, +}; + +static const SPerfsTableSchema consumerSchema[] = { + {.name = "client_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "group_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, + {.name = "status", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, + // ep + // up time + // topics +}; + +static const SPerfsTableSchema subscribeSchema[] = { + {.name = "topic_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "group_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, + {.name = "client_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, +}; + +static const SPerfsTableMeta perfsMeta[] = { + {TSDB_PERFS_TABLE_CONNECTIONS, connectionsSchema, tListLen(connectionsSchema)}, + {TSDB_PERFS_TABLE_QUERIES, queriesSchema, tListLen(queriesSchema)}, + {TSDB_PERFS_TABLE_TOPICS, topicSchema, tListLen(topicSchema)}, + {TSDB_PERFS_TABLE_CONSUMERS, consumerSchema, tListLen(consumerSchema)}, + {TSDB_PERFS_TABLE_SUBSCRIBES, subscribeSchema, tListLen(subscribeSchema)}, +}; + +// connection/application/ +int32_t mndInitPerfsTableSchema(const SPerfsTableSchema *pSrc, int32_t colNum, SSchema **pDst) { + SSchema *schema = taosMemoryCalloc(colNum, sizeof(SSchema)); + if (NULL == schema) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + for (int32_t i = 0; i < colNum; ++i) { + strcpy(schema[i].name, pSrc[i].name); + + schema[i].type = pSrc[i].type; + schema[i].colId = i + 1; + schema[i].bytes = pSrc[i].bytes; + } + + *pDst = schema; + return TSDB_CODE_SUCCESS; +} + +int32_t mndPerfsInitMeta(SHashObj *hash) { + STableMetaRsp meta = {0}; + + strcpy(meta.dbFName, TSDB_INFORMATION_SCHEMA_DB); + meta.tableType = TSDB_SYSTEM_TABLE; + meta.sversion = 1; + meta.tversion = 1; + + for (int32_t i = 0; i < tListLen(perfsMeta); ++i) { + strcpy(meta.tbName, perfsMeta[i].name); + meta.numOfColumns = perfsMeta[i].colNum; + + if (mndInitPerfsTableSchema(perfsMeta[i].schema, perfsMeta[i].colNum, &meta.pSchemas)) { + return -1; + } + + if (taosHashPut(hash, meta.tbName, strlen(meta.tbName), &meta, sizeof(meta))) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + } + + return TSDB_CODE_SUCCESS; +} + +int32_t mndBuildPerfsTableSchema(SMnode *pMnode, const char *dbFName, const char *tbName, STableMetaRsp *pRsp) { + if (NULL == pMnode->perfsMeta) { + terrno = TSDB_CODE_MND_NOT_READY; + return -1; + } + + STableMetaRsp *meta = (STableMetaRsp *)taosHashGet(pMnode->perfsMeta, tbName, strlen(tbName)); + if (NULL == meta) { + mError("invalid performance schema table name:%s", tbName); + terrno = TSDB_CODE_MND_INVALID_INFOS_TBL; + return -1; + } + + *pRsp = *meta; + + pRsp->pSchemas = taosMemoryCalloc(meta->numOfColumns, sizeof(SSchema)); + if (pRsp->pSchemas == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + pRsp->pSchemas = NULL; + return -1; + } + + memcpy(pRsp->pSchemas, meta->pSchemas, meta->numOfColumns * sizeof(SSchema)); + + return 0; +} + +int32_t mndInitPerfs(SMnode *pMnode) { + pMnode->perfsMeta = taosHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + if (pMnode->perfsMeta == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + return mndPerfsInitMeta(pMnode->perfsMeta); +} + +void mndCleanupPerfs(SMnode *pMnode) { + if (NULL == pMnode->perfsMeta) { + return; + } + + void *pIter = taosHashIterate(pMnode->perfsMeta, NULL); + while (pIter) { + STableMetaRsp *meta = (STableMetaRsp *)pIter; + + taosMemoryFreeClear(meta->pSchemas); + + pIter = taosHashIterate(pMnode->perfsMeta, pIter); + } + + taosHashCleanup(pMnode->perfsMeta); + pMnode->perfsMeta = NULL; +} + diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 353a147a92..826a73afc6 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -24,7 +24,7 @@ #include "version.h" typedef struct { - int32_t id; + uint32_t id; int8_t connType; char user[TSDB_USER_LEN]; char app[TSDB_APP_NAME_LEN]; // app name that invokes taosc @@ -35,15 +35,15 @@ typedef struct { int8_t killed; int64_t loginTimeMs; int64_t lastAccessTimeMs; - int32_t queryId; + uint64_t killId; int32_t numOfQueries; - SQueryDesc *pQueries; + SArray *pQueries; //SArray } SConnObj; static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType, uint32_t ip, uint16_t port, int32_t pid, const char *app, int64_t startTime); static void mndFreeConn(SConnObj *pConn); -static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId); +static SConnObj *mndAcquireConn(SMnode *pMnode, uint32_t connId); static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn); static void *mndGetNextConn(SMnode *pMnode, SCacheIter *pIter); static void mndCancelGetNextConn(SMnode *pMnode, void *pIter); @@ -91,8 +91,9 @@ static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType int32_t pid, const char *app, int64_t startTime) { SProfileMgmt *pMgmt = &pMnode->profileMgmt; - int32_t connId = atomic_add_fetch_32(&pMgmt->connId, 1); - if (connId == 0) atomic_add_fetch_32(&pMgmt->connId, 1); + char connStr[255] = {0}; + int32_t len = snprintf(connStr, sizeof(connStr), "%s%d%d%d%s", user, ip, port, pid, app); + int32_t connId = mndGenerateUid(connStr, len); if (startTime == 0) startTime = taosGetTimestampMs(); SConnObj connObj = {.id = connId, @@ -104,7 +105,7 @@ static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType .killed = 0, .loginTimeMs = taosGetTimestampMs(), .lastAccessTimeMs = 0, - .queryId = 0, + .killId = 0, .numOfQueries = 0, .pQueries = NULL}; @@ -119,35 +120,35 @@ static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType mError("conn:%d, failed to put into cache since %s, user:%s", connId, user, terrstr()); return NULL; } else { - mTrace("conn:%d, is created, data:%p user:%s", pConn->id, pConn, user); + mTrace("conn:%u, is created, data:%p user:%s", pConn->id, pConn, user); return pConn; } } static void mndFreeConn(SConnObj *pConn) { taosMemoryFreeClear(pConn->pQueries); - mTrace("conn:%d, is destroyed, data:%p", pConn->id, pConn); + mTrace("conn:%u, is destroyed, data:%p", pConn->id, pConn); } -static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId) { +static SConnObj *mndAcquireConn(SMnode *pMnode, uint32_t connId) { SProfileMgmt *pMgmt = &pMnode->profileMgmt; - SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &connId, sizeof(int32_t)); + SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &connId, sizeof(connId)); if (pConn == NULL) { - mDebug("conn:%d, already destroyed", connId); + mDebug("conn:%u, already destroyed", connId); return NULL; } int32_t keepTime = tsShellActivityTimer * 3; pConn->lastAccessTimeMs = keepTime * 1000 + (uint64_t)taosGetTimestampMs(); - mTrace("conn:%d, acquired from cache, data:%p", pConn->id, pConn); + mTrace("conn:%u, acquired from cache, data:%p", pConn->id, pConn); return pConn; } static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn) { if (pConn == NULL) return; - mTrace("conn:%d, released from cache, data:%p", pConn->id, pConn); + mTrace("conn:%u, released from cache, data:%p", pConn->id, pConn); SProfileMgmt *pMgmt = &pMnode->profileMgmt; taosCacheRelease(pMgmt->cache, (void **)&pConn, false); @@ -212,6 +213,8 @@ static int32_t mndProcessConnectReq(SNodeMsg *pReq) { goto CONN_OVER; } + mndAcquireConn(pMnode, pConn->id); + SConnectRsp connectRsp = {0}; connectRsp.acctId = pUser->acctId; connectRsp.superUser = pUser->superUser; @@ -232,7 +235,7 @@ static int32_t mndProcessConnectReq(SNodeMsg *pReq) { pReq->rspLen = contLen; pReq->pRsp = pRsp; - mDebug("user:%s, login from %s, conn:%d, app:%s", pReq->user, ip, pConn->id, connReq.app); + mDebug("user:%s, login from %s:%d, conn:%u, app:%s", pReq->user, ip, pConn->port, pConn->id, connReq.app); code = 0; @@ -245,22 +248,13 @@ CONN_OVER: return code; } -static int32_t mndSaveQueryStreamList(SConnObj *pConn, SHeartBeatReq *pReq) { - pConn->numOfQueries = 0; - int32_t numOfQueries = htonl(pReq->numOfQueries); +static int32_t mndSaveQueryList(SConnObj *pConn, SQueryHbReqBasic *pBasic) { + taosArrayDestroyEx(pConn->pQueries, tFreeClientHbQueryDesc); - if (numOfQueries > 0) { - if (pConn->pQueries == NULL) { - pConn->pQueries = taosMemoryCalloc(sizeof(SQueryDesc), QUERY_SAVE_SIZE); - } - - pConn->numOfQueries = TMIN(QUERY_SAVE_SIZE, numOfQueries); - - int32_t saveSize = pConn->numOfQueries * sizeof(SQueryDesc); - if (saveSize > 0 && pConn->pQueries != NULL) { - memcpy(pConn->pQueries, pReq->pData, saveSize); - } - } + pConn->pQueries = pBasic->queryDesc; + pBasic->queryDesc = NULL; + + pConn->numOfQueries = pBasic->queryDesc ? taosArrayGetSize(pBasic->queryDesc) : 0; return TSDB_CODE_SUCCESS; } @@ -330,6 +324,111 @@ static SClientHbRsp *mndMqHbBuildRsp(SMnode *pMnode, SClientHbReq *pReq) { return NULL; } +static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHbReq *pHbReq, SClientHbBatchRsp *pBatchRsp) { + SProfileMgmt *pMgmt = &pMnode->profileMgmt; + SClientHbRsp hbRsp = {.connKey = pHbReq->connKey, .status = 0, .info = NULL, .query = NULL}; + + if (pHbReq->query) { + SQueryHbReqBasic *pBasic = pHbReq->query; + + SRpcConnInfo connInfo = {0}; + rpcGetConnInfo(pMsg->handle, &connInfo); + + SConnObj *pConn = mndAcquireConn(pMnode, pBasic->connId); + if (pConn == NULL) { + pConn = mndCreateConn(pMnode, connInfo.user, CONN_TYPE__QUERY, connInfo.clientIp, connInfo.clientPort, pBasic->pid, pBasic->app, 0); + if (pConn == NULL) { + mError("user:%s, conn:%u is freed and failed to create new since %s", connInfo.user, pBasic->connId, terrstr()); + return -1; + } else { + mDebug("user:%s, conn:%u is freed and create a new conn:%u", connInfo.user, pBasic->connId, pConn->id); + } + } else if (pConn->killed) { + mError("user:%s, conn:%u is already killed", connInfo.user, pConn->id); + mndReleaseConn(pMnode, pConn); + terrno = TSDB_CODE_MND_INVALID_CONNECTION; + return -1; + } + + SQueryHbRspBasic *rspBasic = taosMemoryCalloc(1, sizeof(SQueryHbRspBasic)); + if (rspBasic == NULL) { + mndReleaseConn(pMnode, pConn); + terrno = TSDB_CODE_OUT_OF_MEMORY; + mError("user:%s, conn:%u failed to process hb while since %s", pConn->user, pBasic->connId, terrstr()); + return -1; + } + + mndSaveQueryList(pConn, pBasic); + if (pConn->killed != 0) { + rspBasic->killConnection = 1; + } + + if (pConn->killId != 0) { + rspBasic->killRid = pConn->killId; + pConn->killId = 0; + } + + rspBasic->connId = pConn->id; + rspBasic->totalDnodes = 1; //TODO + rspBasic->onlineDnodes = 1; //TODO + mndGetMnodeEpSet(pMnode, &rspBasic->epSet); + mndReleaseConn(pMnode, pConn); + + hbRsp.query = rspBasic; + } + + int32_t kvNum = taosHashGetSize(pHbReq->info); + if (NULL == pHbReq->info || kvNum <= 0) { + taosArrayPush(pBatchRsp->rsps, &hbRsp); + return TSDB_CODE_SUCCESS; + } + + hbRsp.info = taosArrayInit(kvNum, sizeof(SKv)); + if (NULL == hbRsp.info) { + mError("taosArrayInit %d rsp kv failed", kvNum); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + void *pIter = taosHashIterate(pHbReq->info, NULL); + while (pIter != NULL) { + SKv *kv = pIter; + + switch (kv->key) { + case HEARTBEAT_KEY_DBINFO: { + void *rspMsg = NULL; + int32_t rspLen = 0; + mndValidateDbInfo(pMnode, kv->value, kv->valueLen / sizeof(SDbVgVersion), &rspMsg, &rspLen); + if (rspMsg && rspLen > 0) { + SKv kv1 = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = rspLen, .value = rspMsg}; + taosArrayPush(hbRsp.info, &kv1); + } + break; + } + case HEARTBEAT_KEY_STBINFO: { + void *rspMsg = NULL; + int32_t rspLen = 0; + mndValidateStbInfo(pMnode, kv->value, kv->valueLen / sizeof(SSTableMetaVersion), &rspMsg, &rspLen); + if (rspMsg && rspLen > 0) { + SKv kv1 = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = rspLen, .value = rspMsg}; + taosArrayPush(hbRsp.info, &kv1); + } + break; + } + default: + mError("invalid kv key:%d", kv->key); + hbRsp.status = TSDB_CODE_MND_APP_ERROR; + break; + } + + pIter = taosHashIterate(pHbReq->info, pIter); + } + + taosArrayPush(pBatchRsp->rsps, &hbRsp); + + return TSDB_CODE_SUCCESS; +} + static int32_t mndProcessHeartBeatReq(SNodeMsg *pReq) { SMnode *pMnode = pReq->pNode; @@ -345,50 +444,9 @@ static int32_t mndProcessHeartBeatReq(SNodeMsg *pReq) { int32_t sz = taosArrayGetSize(batchReq.reqs); for (int i = 0; i < sz; i++) { SClientHbReq *pHbReq = taosArrayGet(batchReq.reqs, i); - if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_QUERY) { - int32_t kvNum = taosHashGetSize(pHbReq->info); - if (NULL == pHbReq->info || kvNum <= 0) { - continue; - } - - SClientHbRsp hbRsp = {.connKey = pHbReq->connKey, .status = 0, .info = taosArrayInit(kvNum, sizeof(SKv))}; - - void *pIter = taosHashIterate(pHbReq->info, NULL); - while (pIter != NULL) { - SKv *kv = pIter; - - switch (kv->key) { - case HEARTBEAT_KEY_DBINFO: { - void *rspMsg = NULL; - int32_t rspLen = 0; - mndValidateDbInfo(pMnode, kv->value, kv->valueLen / sizeof(SDbVgVersion), &rspMsg, &rspLen); - if (rspMsg && rspLen > 0) { - SKv kv1 = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = rspLen, .value = rspMsg}; - taosArrayPush(hbRsp.info, &kv1); - } - break; - } - case HEARTBEAT_KEY_STBINFO: { - void *rspMsg = NULL; - int32_t rspLen = 0; - mndValidateStbInfo(pMnode, kv->value, kv->valueLen / sizeof(SSTableMetaVersion), &rspMsg, &rspLen); - if (rspMsg && rspLen > 0) { - SKv kv1 = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = rspLen, .value = rspMsg}; - taosArrayPush(hbRsp.info, &kv1); - } - break; - } - default: - mError("invalid kv key:%d", kv->key); - hbRsp.status = TSDB_CODE_MND_APP_ERROR; - break; - } - - pIter = taosHashIterate(pHbReq->info, pIter); - } - - taosArrayPush(batchRsp.rsps, &hbRsp); - } else if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_MQ) { + if (pHbReq->connKey.connType == CONN_TYPE__QUERY) { + mndProcessQueryHeartBeat(pMnode, &pReq->rpcMsg, pHbReq, &batchRsp); + } else if (pHbReq->connKey.connType == CONN_TYPE__TMQ) { SClientHbRsp *pRsp = mndMqHbBuildRsp(pMnode, pHbReq); if (pRsp != NULL) { taosArrayPush(batchRsp.rsps, pRsp); @@ -416,73 +474,8 @@ static int32_t mndProcessHeartBeatReq(SNodeMsg *pReq) { taosArrayDestroy(batchRsp.rsps); pReq->rspLen = tlen; pReq->pRsp = buf; + return 0; - -#if 0 - SMnode *pMnode = pReq->pNode; - SProfileMgmt *pMgmt = &pMnode->profileMgmt; - - SHeartBeatReq *pHeartbeat = pReq->rpcMsg.pCont; - pHeartbeat->connId = htonl(pHeartbeat->connId); - pHeartbeat->pid = htonl(pHeartbeat->pid); - - SConnObj *pConn = mndAcquireConn(pMnode, pHeartbeat->connId); - if (pConn == NULL) { - pConn = mndCreateConn(pMnode, &info, pHeartbeat->pid, pHeartbeat->app, 0); - if (pConn == NULL) { - mError("user:%s, conn:%d is freed and failed to create new since %s", pReq->user, pHeartbeat->connId, terrstr()); - return -1; - } else { - mDebug("user:%s, conn:%d is freed and create a new conn:%d", pReq->user, pHeartbeat->connId, pConn->id); - } - } else if (pConn->killed) { - mError("user:%s, conn:%d is already killed", pReq->user, pConn->id); - terrno = TSDB_CODE_MND_INVALID_CONNECTION; - return -1; - } else { - if (pConn->ip != info.clientIp || pConn->port != info.clientPort /* || strcmp(pConn->user, info.user) != 0 */) { - char oldIpStr[40]; - char newIpStr[40]; - taosIpPort2String(pConn->ip, pConn->port, oldIpStr); - taosIpPort2String(info.clientIp, info.clientPort, newIpStr); - mError("conn:%d, incoming conn user:%s ip:%s, not match exist user:%s ip:%s", pConn->id, info.user, newIpStr, - pConn->user, oldIpStr); - - if (pMgmt->connId < pConn->id) pMgmt->connId = pConn->id + 1; - taosCacheRelease(pMgmt->cache, (void **)&pConn, false); - terrno = TSDB_CODE_MND_INVALID_CONNECTION; - return -1; - } - } - - SHeartBeatRsp *pRsp = rpcMallocCont(sizeof(SHeartBeatRsp)); - if (pRsp == NULL) { - mndReleaseConn(pMnode, pConn); - terrno = TSDB_CODE_OUT_OF_MEMORY; - mError("user:%s, conn:%d failed to process hb while since %s", pReq->user, pHeartbeat->connId, terrstr()); - return -1; - } - - mndSaveQueryStreamList(pConn, pHeartbeat); - if (pConn->killed != 0) { - pRsp->killConnection = 1; - } - - if (pConn->queryId != 0) { - pRsp->queryId = htonl(pConn->queryId); - pConn->queryId = 0; - } - - pRsp->connId = htonl(pConn->id); - pRsp->totalDnodes = htonl(1); - pRsp->onlineDnodes = htonl(1); - mndGetMnodeEpSet(pMnode, &pRsp->epSet); - mndReleaseConn(pMnode, pConn); - - pReq->contLen = sizeof(SConnectRsp); - pReq->pRsp = pRsp; - return 0; -#endif } static int32_t mndProcessKillQueryReq(SNodeMsg *pReq) { @@ -513,7 +506,7 @@ static int32_t mndProcessKillQueryReq(SNodeMsg *pReq) { return -1; } else { mInfo("connId:%d, queryId:%d is killed by user:%s", killReq.connId, killReq.queryId, pReq->user); - pConn->queryId = killReq.queryId; + pConn->killId = killReq.queryId; taosCacheRelease(pMgmt->cache, (void **)&pConn, false); return 0; } @@ -571,7 +564,7 @@ static int32_t mndRetrieveConns(SNodeMsg *pReq, SShowObj *pShow, char *data, int cols = 0; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int32_t *)pWrite = pConn->id; + *(uint32_t *)pWrite = pConn->id; cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; @@ -613,6 +606,7 @@ static int32_t mndRetrieveConns(SNodeMsg *pReq, SShowObj *pShow, char *data, int static int32_t mndRetrieveQueries(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { SMnode *pMnode = pReq->pNode; int32_t numOfRows = 0; +#if 0 SConnObj *pConn = NULL; int32_t cols = 0; char *pWrite; @@ -709,6 +703,7 @@ static int32_t mndRetrieveQueries(SNodeMsg *pReq, SShowObj *pShow, char *data, i } pShow->numOfRows += numOfRows; +#endif return numOfRows; } diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 92a67fd7e9..0ce73005e8 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -18,6 +18,7 @@ #include "mndDb.h" #include "mndDnode.h" #include "mndInfoSchema.h" +#include "mndPerfSchema.h" #include "mndMnode.h" #include "mndShow.h" #include "mndTrans.h" @@ -1516,6 +1517,11 @@ static int32_t mndProcessTableMetaReq(SNodeMsg *pReq) { if (mndBuildInsTableSchema(pMnode, infoReq.dbFName, infoReq.tbName, &metaRsp) != 0) { goto RETRIEVE_META_OVER; } + } else if (0 == strcmp(infoReq.dbFName, TSDB_PERFORMANCE_SCHEMA_DB)) { + mDebug("performance_schema table:%s.%s, start to retrieve meta", infoReq.dbFName, infoReq.tbName); + if (mndBuildPerfsTableSchema(pMnode, infoReq.dbFName, infoReq.tbName, &metaRsp) != 0) { + goto RETRIEVE_META_OVER; + } } else { mDebug("stb:%s.%s, start to retrieve meta", infoReq.dbFName, infoReq.tbName); if (mndBuildStbSchema(pMnode, infoReq.dbFName, infoReq.tbName, &metaRsp) != 0) { diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index f765b0ce11..3ac1c522b2 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -23,6 +23,7 @@ #include "mndDnode.h" #include "mndFunc.h" #include "mndInfoSchema.h" +#include "mndPerfSchema.h" #include "mndMnode.h" #include "mndOffset.h" #include "mndProfile.h" @@ -210,6 +211,7 @@ static int32_t mndInitSteps(SMnode *pMnode, bool deploy) { if (mndAllocStep(pMnode, "mnode-stb", mndInitStb, mndCleanupStb) != 0) return -1; if (mndAllocStep(pMnode, "mnode-stb", mndInitSma, mndCleanupSma) != 0) return -1; if (mndAllocStep(pMnode, "mnode-infos", mndInitInfos, mndCleanupInfos) != 0) return -1; + if (mndAllocStep(pMnode, "mnode-perfs", mndInitPerfs, mndCleanupPerfs) != 0) return -1; if (mndAllocStep(pMnode, "mnode-db", mndInitDb, mndCleanupDb) != 0) return -1; if (mndAllocStep(pMnode, "mnode-func", mndInitFunc, mndCleanupFunc) != 0) return -1; if (deploy) { diff --git a/source/dnode/mnode/impl/test/profile/profile.cpp b/source/dnode/mnode/impl/test/profile/profile.cpp index 14b31e5282..b06c98466b 100644 --- a/source/dnode/mnode/impl/test/profile/profile.cpp +++ b/source/dnode/mnode/impl/test/profile/profile.cpp @@ -99,7 +99,7 @@ TEST_F(MndTestProfile, 04_HeartBeatMsg) { SClientHbBatchReq batchReq = {0}; batchReq.reqs = taosArrayInit(0, sizeof(SClientHbReq)); SClientHbReq req = {0}; - req.connKey = {.connId = 123, .hbType = HEARTBEAT_TYPE_MQ}; + req.connKey = {.connId = 123, .hbType = CONN_TYPE__TMQ}; req.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); SKv kv = {0}; kv.key = 123; diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 09f51dc03e..8938084724 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -227,20 +227,20 @@ typedef struct SCtgAction { #define CTG_FLAG_STB 0x1 #define CTG_FLAG_NOT_STB 0x2 #define CTG_FLAG_UNKNOWN_STB 0x4 -#define CTG_FLAG_INF_DB 0x8 +#define CTG_FLAG_SYS_DB 0x8 #define CTG_FLAG_FORCE_UPDATE 0x10 #define CTG_FLAG_IS_STB(_flag) ((_flag) & CTG_FLAG_STB) #define CTG_FLAG_IS_NOT_STB(_flag) ((_flag) & CTG_FLAG_NOT_STB) #define CTG_FLAG_IS_UNKNOWN_STB(_flag) ((_flag) & CTG_FLAG_UNKNOWN_STB) -#define CTG_FLAG_IS_INF_DB(_flag) ((_flag) & CTG_FLAG_INF_DB) +#define CTG_FLAG_IS_SYS_DB(_flag) ((_flag) & CTG_FLAG_SYS_DB) #define CTG_FLAG_IS_FORCE_UPDATE(_flag) ((_flag) & CTG_FLAG_FORCE_UPDATE) -#define CTG_FLAG_SET_INF_DB(_flag) ((_flag) |= CTG_FLAG_INF_DB) +#define CTG_FLAG_SET_SYS_DB(_flag) ((_flag) |= CTG_FLAG_SYS_DB) #define CTG_FLAG_SET_STB(_flag, tbType) do { (_flag) |= ((tbType) == TSDB_SUPER_TABLE) ? CTG_FLAG_STB : ((tbType) > TSDB_SUPER_TABLE ? CTG_FLAG_NOT_STB : CTG_FLAG_UNKNOWN_STB); } while (0) #define CTG_FLAG_MAKE_STB(_isStb) (((_isStb) == 1) ? CTG_FLAG_STB : ((_isStb) == 0 ? CTG_FLAG_NOT_STB : CTG_FLAG_UNKNOWN_STB)) #define CTG_FLAG_MATCH_STB(_flag, tbType) (CTG_FLAG_IS_UNKNOWN_STB(_flag) || (CTG_FLAG_IS_STB(_flag) && (tbType) == TSDB_SUPER_TABLE) || (CTG_FLAG_IS_NOT_STB(_flag) && (tbType) != TSDB_SUPER_TABLE)) -#define CTG_IS_INF_DBNAME(_dbname) ((*(_dbname) == 'i') && (0 == strcmp(_dbname, TSDB_INFORMATION_SCHEMA_DB))) +#define CTG_IS_SYS_DBNAME(_dbname) (((*(_dbname) == 'i') && (0 == strcmp(_dbname, TSDB_INFORMATION_SCHEMA_DB))) || ((*(_dbname) == 'p') && (0 == strcmp(_dbname, TSDB_PERFORMANCE_SCHEMA_DB)))) #define CTG_META_SIZE(pMeta) (sizeof(STableMeta) + ((pMeta)->tableInfo.numOfTags + (pMeta)->tableInfo.numOfColumns) * sizeof(SSchema)) diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 2aa858fe06..21e0be40a4 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -217,7 +217,7 @@ int32_t ctgPushRmDBMsgInQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId) } char *p = strchr(dbFName, '.'); - if (p && CTG_IS_INF_DBNAME(p + 1)) { + if (p && CTG_IS_SYS_DBNAME(p + 1)) { dbFName = p + 1; } @@ -304,7 +304,7 @@ int32_t ctgPushUpdateVgMsgInQueue(SCatalog* pCtg, const char *dbFName, int64_t d } char *p = strchr(dbFName, '.'); - if (p && CTG_IS_INF_DBNAME(p + 1)) { + if (p && CTG_IS_SYS_DBNAME(p + 1)) { dbFName = p + 1; } @@ -336,7 +336,7 @@ int32_t ctgPushUpdateTblMsgInQueue(SCatalog* pCtg, STableMetaOutput *output, boo } char *p = strchr(output->dbFName, '.'); - if (p && CTG_IS_INF_DBNAME(p + 1)) { + if (p && CTG_IS_SYS_DBNAME(p + 1)) { memmove(output->dbFName, p + 1, strlen(p + 1)); } @@ -410,7 +410,7 @@ void ctgWReleaseVgInfo(SCtgDBCache *dbCache) { int32_t ctgAcquireDBCacheImpl(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache, bool acquire) { char *p = strchr(dbFName, '.'); - if (p && CTG_IS_INF_DBNAME(p + 1)) { + if (p && CTG_IS_SYS_DBNAME(p + 1)) { dbFName = p + 1; } @@ -688,7 +688,7 @@ int32_t ctgGetTableMetaFromCache(SCatalog* pCtg, const SName* pTableName, STable } char dbFName[TSDB_DB_FNAME_LEN] = {0}; - if (CTG_FLAG_IS_INF_DB(flag)) { + if (CTG_FLAG_IS_SYS_DB(flag)) { strcpy(dbFName, pTableName->dbname); } else { tNameGetFullDbName(pTableName, dbFName); @@ -1721,7 +1721,7 @@ int32_t ctgRefreshTblMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, SVgroupInfo vgroupInfo = {0}; int32_t code = 0; - if (!CTG_FLAG_IS_INF_DB(flag)) { + if (!CTG_FLAG_IS_SYS_DB(flag)) { CTG_ERR_RET(catalogGetTableHashVgroup(pCtg, pTrans, pMgmtEps, pTableName, &vgroupInfo)); } @@ -1732,7 +1732,7 @@ int32_t ctgRefreshTblMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } - if (CTG_FLAG_IS_INF_DB(flag)) { + if (CTG_FLAG_IS_SYS_DB(flag)) { ctgDebug("will refresh tbmeta, supposed in information_schema, tbName:%s", tNameGetTableName(pTableName)); CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCtg, pTrans, pMgmtEps, (char *)pTableName->dbname, (char *)pTableName->tname, output)); @@ -1820,8 +1820,8 @@ int32_t ctgGetTableMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons uint64_t suid = 0; STableMetaOutput *output = NULL; - if (CTG_IS_INF_DBNAME(pTableName->dbname)) { - CTG_FLAG_SET_INF_DB(flag); + if (CTG_IS_SYS_DBNAME(pTableName->dbname)) { + CTG_FLAG_SET_SYS_DB(flag); } CTG_ERR_RET(ctgGetTableMetaFromCache(pCtg, pTableName, pTableMeta, &inCache, flag, &dbId)); @@ -1829,7 +1829,7 @@ int32_t ctgGetTableMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons int32_t tbType = 0; if (inCache) { - if (CTG_FLAG_MATCH_STB(flag, (*pTableMeta)->tableType) && ((!CTG_FLAG_IS_FORCE_UPDATE(flag)) || (CTG_FLAG_IS_INF_DB(flag)))) { + if (CTG_FLAG_MATCH_STB(flag, (*pTableMeta)->tableType) && ((!CTG_FLAG_IS_FORCE_UPDATE(flag)) || (CTG_FLAG_IS_SYS_DB(flag)))) { goto _return; } @@ -1885,7 +1885,7 @@ _return: if (CTG_TABLE_NOT_EXIST(code) && inCache) { char dbFName[TSDB_DB_FNAME_LEN] = {0}; - if (CTG_FLAG_IS_INF_DB(flag)) { + if (CTG_FLAG_IS_SYS_DB(flag)) { strcpy(dbFName, pTableName->dbname); } else { tNameGetFullDbName(pTableName, dbFName); @@ -2633,7 +2633,7 @@ int32_t catalogGetTableDistVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgm CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); } - if (CTG_IS_INF_DBNAME(pTableName->dbname)) { + if (CTG_IS_SYS_DBNAME(pTableName->dbname)) { ctgError("no valid vgInfo for db, dbname:%s", pTableName->dbname); CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); } @@ -2666,7 +2666,7 @@ _return: int32_t catalogGetTableHashVgroup(SCatalog *pCtg, void *pTrans, const SEpSet *pMgmtEps, const SName *pTableName, SVgroupInfo *pVgroup) { CTG_API_ENTER(); - if (CTG_IS_INF_DBNAME(pTableName->dbname)) { + if (CTG_IS_SYS_DBNAME(pTableName->dbname)) { ctgError("no valid vgInfo for db, dbname:%s", pTableName->dbname); CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); } diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index e1a486d8b1..8731b26c4a 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -1201,7 +1201,7 @@ void valueNodeToVariant(const SValueNode* pNode, SVariant* pVal) { case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_VARCHAR: case TSDB_DATA_TYPE_VARBINARY: - pVal->pz = pNode->datum.p; + pVal->pz = pNode->datum.p + VARSTR_HEADER_SIZE; break; case TSDB_DATA_TYPE_JSON: case TSDB_DATA_TYPE_DECIMAL: diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index 9e53eee5c8..acc597d61b 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -223,11 +223,15 @@ static int32_t createSName(SName* pName, SToken* pTableName, SParseContext* pPar return code; } -static int32_t getTableMeta(SInsertParseContext* pCxt, SToken* pTname) { +static int32_t getTableMetaImpl(SInsertParseContext* pCxt, SToken* pTname, bool isStb) { SParseContext* pBasicCtx = pCxt->pComCxt; SName name = {0}; createSName(&name, pTname, pBasicCtx, &pCxt->msg); - CHECK_CODE(catalogGetTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name, &pCxt->pTableMeta)); + if (isStb) { + CHECK_CODE(catalogGetSTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name, &pCxt->pTableMeta)); + } else { + CHECK_CODE(catalogGetTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name, &pCxt->pTableMeta)); + } SVgroupInfo vg; CHECK_CODE(catalogGetTableHashVgroup(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name, &vg)); CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg))); @@ -235,6 +239,15 @@ static int32_t getTableMeta(SInsertParseContext* pCxt, SToken* pTname) { return TSDB_CODE_SUCCESS; } +static int32_t getTableMeta(SInsertParseContext* pCxt, SToken* pTname) { + return getTableMetaImpl(pCxt, pTname, false); +} + +static int32_t getSTableMeta(SInsertParseContext* pCxt, SToken* pTname) { + return getTableMetaImpl(pCxt, pTname, true); +} + + static int32_t findCol(SToken* pColname, int32_t start, int32_t end, SSchema* pSchema) { while (start < end) { if (strlen(pSchema[start].name) == pColname->n && strncmp(pColname->z, pSchema[start].name, pColname->n) == 0) { @@ -818,7 +831,7 @@ static int32_t parseUsingClause(SInsertParseContext* pCxt, SToken* pTbnameToken) SToken sToken; // pSql -> stb_name [(tag1_name, ...)] TAGS (tag1_value, ...) NEXT_TOKEN(pCxt->pSql, sToken); - CHECK_CODE(getTableMeta(pCxt, &sToken)); + CHECK_CODE(getSTableMeta(pCxt, &sToken)); if (TSDB_SUPER_TABLE != pCxt->pTableMeta->tableType) { return buildInvalidOperationMsg(&pCxt->msg, "create table only from super table is allowed"); } diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 62a96b6438..d1def1bef1 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -156,8 +156,8 @@ typedef struct SSchJob { int32_t levelNum; int32_t taskNum; void *transport; - SArray *nodeList; // qnode/vnode list, element is SQueryNodeAddr - SArray *levels; // Element is SQueryLevel, starting from 0. SArray + SArray *nodeList; // qnode/vnode list, SArray + SArray *levels; // starting from 0. SArray SNodeList *subPlans; // subplan pointer copied from DAG, no need to free it in scheduler int32_t levelIdx; diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index c5258afcc1..93bc16a7a2 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -2655,6 +2655,34 @@ _return: SCH_RET(code); } +int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub) { + int32_t code = 0; + SSchJob *pJob = schAcquireJob(job); + if (NULL == pJob) { + qDebug("acquire job from jobRef list failed, may not started or dropped, refId:%" PRIx64, job); + SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); + } + + if (pJob->status < JOB_TASK_STATUS_NOT_START || pJob->levelNum <= 0 || NULL == pJob->levels) { + qDebug("job not initialized or not executable job, refId:%" PRIx64, job); + SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); + } + + for (int32_t i = pJob->levelNum - 1; i >= 0; --i) { + SSchLevel *pLevel = taosArrayGet(pJob->levels, i); + + for (int32_t m = 0; m < pLevel->taskNum; ++m) { + SSchTask *pTask = taosArrayGet(pLevel->subTasks, m); + SQuerySubDesc subDesc = {.tid = pTask->taskId, .status = pTask->status}; + + taosArrayPush(pSub, &subDesc); + } + } + + return TSDB_CODE_SUCCESS; +} + + int32_t scheduleCancelJob(int64_t job) { SSchJob *pJob = schAcquireJob(job); if (NULL == pJob) { @@ -2672,7 +2700,7 @@ int32_t scheduleCancelJob(int64_t job) { void schedulerFreeJob(int64_t job) { SSchJob *pJob = schAcquireJob(job); if (NULL == pJob) { - qError("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, job); + qDebug("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, job); return; } diff --git a/source/util/src/tarray.c b/source/util/src/tarray.c index a74b26a386..4477a5cacd 100644 --- a/source/util/src/tarray.c +++ b/source/util/src/tarray.c @@ -303,6 +303,21 @@ void taosArrayClear(SArray* pArray) { pArray->size = 0; } +void taosArrayClearEx(SArray* pArray, void (*fp)(void*)) { + if (pArray == NULL) return; + if (fp == NULL) { + pArray->size = 0; + return; + } + + for (int32_t i = 0; i < pArray->size; ++i) { + fp(TARRAY_GET_ELEM(pArray, i)); + } + + pArray->size = 0; +} + + void* taosArrayDestroy(SArray* pArray) { if (pArray) { taosMemoryFree(pArray->pData); diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index 1809d99209..ac2010efa3 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -344,6 +344,8 @@ void shellRunCommandOnServer(TAOS *con, char command[]) { atomic_store_64(&result, 0); freeResultWithRid(oresult); + taos_free_result(pSql); + return; }