From b8391fad62eb5d23544f7218a272685325e40aea Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 12 Apr 2022 19:10:52 +0800 Subject: [PATCH] fix heartbeat msg --- include/common/tmsg.h | 114 ++++++---- include/libs/scheduler/scheduler.h | 2 + include/util/tarray.h | 8 + source/client/inc/clientInt.h | 10 +- source/client/src/clientEnv.c | 70 +++++- source/client/src/clientHb.c | 146 +++++++++++- source/client/src/clientImpl.c | 2 +- source/client/src/clientMain.c | 6 +- source/client/src/clientMsgHandler.c | 2 +- source/common/src/tmsg.c | 117 +++++++++- source/dnode/mnode/impl/inc/mndInt.h | 1 - source/dnode/mnode/impl/src/mndDb.c | 2 + source/dnode/mnode/impl/src/mndProfile.c | 277 +++++++++++------------ source/libs/scheduler/inc/schedulerInt.h | 4 +- source/libs/scheduler/src/scheduler.c | 30 ++- source/util/src/tarray.c | 15 ++ 16 files changed, 586 insertions(+), 220 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index ba9147dcdd..dd7d904255 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -341,12 +341,12 @@ int32_t tSerializeSConnectReq(void* buf, int32_t bufLen, SConnectReq* pReq); int32_t tDeserializeSConnectReq(void* buf, int32_t bufLen, SConnectReq* pReq); typedef struct { - int32_t acctId; - int64_t clusterId; - int32_t connId; - int8_t superUser; - SEpSet epSet; - char sVersion[128]; + int32_t acctId; + int64_t clusterId; + uint32_t connId; + int8_t superUser; + SEpSet epSet; + char sVersion[128]; } SConnectRsp; int32_t tSerializeSConnectRsp(void* buf, int32_t bufLen, SConnectRsp* pRsp); @@ -1038,40 +1038,6 @@ typedef struct { int32_t tSerializeSDCreateMnodeReq(void* buf, int32_t bufLen, SDCreateMnodeReq* pReq); int32_t tDeserializeSDCreateMnodeReq(void* buf, int32_t bufLen, SDCreateMnodeReq* pReq); -typedef struct { - char sql[TSDB_SHOW_SQL_LEN]; - int32_t queryId; - int64_t useconds; - int64_t stime; - int64_t qId; - int64_t sqlObjId; - int32_t pid; - char fqdn[TSDB_FQDN_LEN]; - int8_t stableQuery; - int32_t numOfSub; - char subSqlInfo[TSDB_SHOW_SUBQUERY_LEN]; // include subqueries' index, Obj IDs and states(C-complete/I-imcomplete) -} SQueryDesc; - -typedef struct { - int32_t connId; - int32_t pid; - int32_t numOfQueries; - int32_t numOfStreams; - char app[TSDB_APP_NAME_LEN]; - char pData[]; -} SHeartBeatReq; - -typedef struct { - int32_t connId; - int32_t queryId; - int32_t streamId; - int32_t totalDnodes; - int32_t onlineDnodes; - int8_t killConnection; - int8_t align[3]; - SEpSet epSet; -} SHeartBeatRsp; - typedef struct { int32_t connId; int32_t queryId; @@ -1674,13 +1640,48 @@ typedef struct { } SKv; typedef struct { - int32_t connId; + int64_t tscRid; int32_t hbType; } SClientHbKey; typedef struct { - SClientHbKey connKey; - SHashObj* info; // hash + int64_t tid; + int32_t status; +} SQuerySubDesc; + +typedef struct { + char sql[TSDB_SHOW_SQL_LEN]; + uint64_t queryId; + int64_t useconds; + int64_t stime; + int64_t reqRid; + int32_t pid; + char fqdn[TSDB_FQDN_LEN]; + int32_t subPlanNum; + SArray* subDesc; // SArray +} SQueryDesc; + +typedef struct { + uint32_t connId; + int32_t pid; + char app[TSDB_APP_NAME_LEN]; + SArray* queryDesc; // SArray +} SQueryHbReqBasic; + +typedef struct { + uint32_t connId; + uint64_t killRid; + int32_t totalDnodes; + int32_t onlineDnodes; + int8_t killConnection; + int8_t align[3]; + SEpSet epSet; +} SQueryHbRspBasic; + +typedef struct { + SClientHbKey connKey; + SQueryHbReqBasic* query; + SHashObj* info; // hash } SClientHbReq; typedef struct { @@ -1689,9 +1690,10 @@ typedef struct { } SClientHbBatchReq; typedef struct { - SClientHbKey connKey; - int32_t status; - SArray* info; // Array + SClientHbKey connKey; + int32_t status; + SQueryHbRspBasic* query; + SArray* info; // Array } SClientHbRsp; typedef struct { @@ -1711,8 +1713,23 @@ static FORCE_INLINE void tFreeReqKvHash(SHashObj* info) { } } +static FORCE_INLINE void tFreeClientHbQueryDesc(void* pDesc) { + SQueryDesc* desc = (SQueryDesc*)pDesc; + if (desc->subDesc) { + taosArrayDestroy(desc->subDesc); + desc->subDesc = NULL; + } +} + static FORCE_INLINE void tFreeClientHbReq(void* pReq) { SClientHbReq* req = (SClientHbReq*)pReq; + if (req->query) { + if (req->query->queryDesc) { + taosArrayDestroyEx(req->query->queryDesc, tFreeClientHbQueryDesc); + } + taosMemoryFreeClear(req->query); + } + if (req->info) { tFreeReqKvHash(req->info); taosHashCleanup(req->info); @@ -1741,6 +1758,7 @@ static FORCE_INLINE void tFreeClientKv(void* pKv) { static FORCE_INLINE void tFreeClientHbRsp(void* pRsp) { SClientHbRsp* rsp = (SClientHbRsp*)pRsp; + taosMemoryFreeClear(rsp->query); if (rsp->info) taosArrayDestroyEx(rsp->info, tFreeClientKv); } @@ -1769,13 +1787,13 @@ static FORCE_INLINE int32_t tDecodeSKv(SCoder* pDecoder, SKv* pKv) { } static FORCE_INLINE int32_t tEncodeSClientHbKey(SCoder* pEncoder, const SClientHbKey* pKey) { - if (tEncodeI32(pEncoder, pKey->connId) < 0) return -1; + if (tEncodeI64(pEncoder, pKey->tscRid) < 0) return -1; if (tEncodeI32(pEncoder, pKey->hbType) < 0) return -1; return 0; } static FORCE_INLINE int32_t tDecodeSClientHbKey(SCoder* pDecoder, SClientHbKey* pKey) { - if (tDecodeI32(pDecoder, &pKey->connId) < 0) return -1; + if (tDecodeI64(pDecoder, &pKey->tscRid) < 0) return -1; if (tDecodeI32(pDecoder, &pKey->hbType) < 0) return -1; return 0; } diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index 5ab4ead89c..460749243c 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -89,6 +89,8 @@ int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryPlan* pD */ int32_t schedulerFetchRows(int64_t job, void **data); +int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub); + /** * Cancel query job diff --git a/include/util/tarray.h b/include/util/tarray.h index 521e54040d..383af8309d 100644 --- a/include/util/tarray.h +++ b/include/util/tarray.h @@ -205,6 +205,14 @@ SArray* taosArrayDup(const SArray* pSrc); */ void taosArrayClear(SArray* pArray); +/** + * clear the array (remove all element) + * @param pArray + * @param fp + */ +void taosArrayClearEx(SArray* pArray, void (*fp)(void*)); + + /** * destroy array list * @param pArray diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 96d7cead68..4bb1d8e3ff 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -43,7 +43,8 @@ extern "C" { } \ } while (0) -#define HEARTBEAT_INTERVAL 1500 // ms +//#define HEARTBEAT_INTERVAL 1500 // ms +#define HEARTBEAT_INTERVAL 15000 // ms TODO typedef struct SAppInstInfo SAppInstInfo; @@ -139,6 +140,7 @@ typedef struct STscObj { TdThreadMutex mutex; // used to protect the operation on db int32_t numOfReqs; // number of sqlObj bound to this connection SAppInstInfo* pAppInfo; + SHashObj* pRequests; } STscObj; typedef struct SResultColumn { @@ -215,11 +217,15 @@ int taos_init(); void* createTscObj(const char* user, const char* auth, const char* db, SAppInstInfo* pAppInfo); void destroyTscObj(void* pObj); +STscObj *acquireTscObj(int64_t rid); +int32_t releaseTscObj(int64_t rid); uint64_t generateRequestId(); void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t type); void destroyRequest(SRequestObj* pRequest); +SRequestObj *acquireRequest(int64_t rid); +int32_t releaseRequest(int64_t rid); char* getDbOfConnection(STscObj* pObj); void setConnectionDB(STscObj* pTscObj, const char* db); @@ -258,7 +264,7 @@ SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char* key); void appHbMgrCleanup(void); // conn level -int hbRegisterConn(SAppHbMgr* pAppHbMgr, int32_t connId, int64_t clusterId, int32_t hbType); +int hbRegisterConn(SAppHbMgr *pAppHbMgr, int64_t tscRefId, int64_t clusterId, int32_t hbType); void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey); int hbAddConnInfo(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen); diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 359649884f..f997d4cff2 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -37,7 +37,8 @@ static TdThreadOnce tscinit = PTHREAD_ONCE_INIT; volatile int32_t tscInitRes = 0; static void registerRequest(SRequestObj *pRequest) { - STscObj *pTscObj = (STscObj *)taosAcquireRef(clientConnRefPool, pRequest->pTscObj->id); + STscObj *pTscObj = acquireTscObj(pRequest->pTscObj->id); + assert(pTscObj != NULL); // connection has been released already, abort creating request. @@ -69,7 +70,7 @@ static void deregisterRequest(SRequestObj *pRequest) { tscDebug("0x%" PRIx64 " free Request from connObj: 0x%" PRIx64 ", reqId:0x%" PRIx64 " elapsed:%" PRIu64 " ms, current:%d, app current:%d", pRequest->self, pTscObj->id, pRequest->requestId, duration/1000, num, currentInst); - taosReleaseRef(clientConnRefPool, pTscObj->id); + releaseTscObj(pTscObj->id); } // todo close the transporter properly @@ -107,12 +108,24 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) { return pDnodeConn; } +void closeAllRequests(SHashObj *pRequests) { + void *pIter = taosHashIterate(pRequests, NULL); + while (pIter != NULL) { + int64_t *rid = pIter; + + releaseRequest(*rid); + + pIter = taosHashIterate(pRequests, pIter); + } +} + void destroyTscObj(void *pObj) { STscObj *pTscObj = pObj; - SClientHbKey connKey = {.connId = pTscObj->connId, .hbType = pTscObj->connType}; + SClientHbKey connKey = {.tscRid = pTscObj->id, .hbType = pTscObj->connType}; hbDeregisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey); atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); + closeAllRequests(pTscObj->pRequests); tscDebug("connObj 0x%" PRIx64 " destroyed, totalConn:%" PRId64, pTscObj->id, pTscObj->pAppInfo->numOfConns); taosThreadMutexDestroy(&pTscObj->mutex); taosMemoryFreeClear(pTscObj); @@ -125,6 +138,13 @@ void *createTscObj(const char *user, const char *auth, const char *db, SAppInstI return NULL; } + pObj->pRequests = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); + if (NULL == pObj->pRequests) { + taosMemoryFree(pObj); + terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; + return NULL; + } + pObj->pAppInfo = pAppInfo; tstrncpy(pObj->user, user, sizeof(pObj->user)); memcpy(pObj->pass, auth, TSDB_PASSWORD_LEN); @@ -140,6 +160,14 @@ void *createTscObj(const char *user, const char *auth, const char *db, SAppInstI return pObj; } +STscObj *acquireTscObj(int64_t rid) { + return (STscObj *)taosAcquireRef(clientConnRefPool, rid); +} + +int32_t releaseTscObj(int64_t rid) { + return taosReleaseRef(clientConnRefPool, rid); +} + void *createRequest(STscObj *pObj, __taos_async_fn_t fp, void *param, int32_t type) { assert(pObj != NULL); @@ -160,6 +188,14 @@ void *createRequest(STscObj *pObj, __taos_async_fn_t fp, void *param, int32_t ty tsem_init(&pRequest->body.rspSem, 0, 0); registerRequest(pRequest); + + if (taosHashPut(pObj->pRequests, &pRequest->self, sizeof(pRequest->self), &pRequest->self, sizeof(pRequest->self))) { + destroyRequest(pRequest); + releaseTscObj(pObj->id); + terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; + return NULL; + } + return pRequest; } @@ -185,6 +221,8 @@ static void doDestroyRequest(void *p) { assert(RID_VALID(pRequest->self)); + taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self)); + taosMemoryFreeClear(pRequest->msgBuf); taosMemoryFreeClear(pRequest->sqlstr); taosMemoryFreeClear(pRequest->pInfo); @@ -213,9 +251,18 @@ void destroyRequest(SRequestObj *pRequest) { return; } - taosReleaseRef(clientReqRefPool, pRequest->self); + taosRemoveRef(clientReqRefPool, pRequest->self); } +SRequestObj *acquireRequest(int64_t rid) { + return (SRequestObj *)taosAcquireRef(clientReqRefPool, rid); +} + +int32_t releaseRequest(int64_t rid) { + return taosReleaseRef(clientReqRefPool, rid); +} + + void taos_init_imp(void) { // In the APIs of other program language, taos_cleanup is not available yet. // So, to make sure taos_cleanup will be invoked to clean up the allocated resource to suppress the valgrind warning. @@ -456,11 +503,18 @@ uint64_t generateRequestId() { } } - int64_t ts = taosGetTimestampMs(); - uint64_t pid = taosGetPId(); - int32_t val = atomic_add_fetch_32(&requestSerialId, 1); + uint64_t id = 0; + + while (true) { + int64_t ts = taosGetTimestampMs(); + uint64_t pid = taosGetPId(); + int32_t val = atomic_add_fetch_32(&requestSerialId, 1); - uint64_t id = ((hashId & 0x0FFF) << 52) | ((pid & 0x0FFF) << 40) | ((ts & 0xFFFFFF) << 16) | (val & 0xFFFF); + id = ((hashId & 0x0FFF) << 52) | ((pid & 0x0FFF) << 40) | ((ts & 0xFFFFFF) << 16) | (val & 0xFFFF); + if (id) { + break; + } + } return id; } diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index d389fc34c6..642ba46336 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -14,6 +14,7 @@ */ #include "catalog.h" +#include "scheduler.h" #include "clientInt.h" #include "clientLog.h" #include "trpc.h" @@ -107,10 +108,36 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { SHbConnInfo *info = taosHashGet(pAppHbMgr->connInfo, &pRsp->connKey, sizeof(SClientHbKey)); if (NULL == info) { - tscWarn("fail to get connInfo, may be dropped, connId:%d, type:%d", pRsp->connKey.connId, pRsp->connKey.hbType); + tscWarn("fail to get connInfo, may be dropped, refId:%" PRIx64 ", type:%d", pRsp->connKey.tscRid, pRsp->connKey.hbType); return TSDB_CODE_SUCCESS; } + if (pRsp->query) { + STscObj *pTscObj = (STscObj *)acquireTscObj(pRsp->connKey.tscRid); + if (NULL == pTscObj) { + tscDebug("tscObj rid %" PRIx64 " not exist", pRsp->connKey.tscRid); + } else { + updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &pRsp->query->epSet); + pTscObj->connId = pRsp->query->connId; + + if (pRsp->query->killRid) { + SRequestObj *pRequest = acquireRequest(pRsp->query->killRid); + if (NULL == pRequest) { + tscDebug("request 0x%" PRIx64 " not exist to kill", pRsp->query->killRid); + } else { + taos_stop_query((TAOS_RES *)pRequest); + releaseRequest(pRsp->query->killRid); + } + } + + if (pRsp->query->killConnection) { + taos_close(pTscObj); + } + + releaseTscObj(pRsp->connKey.tscRid); + } + } + int32_t kvNum = pRsp->info ? taosArrayGetSize(pRsp->info) : 0; tscDebug("hb got %d rsp kv", kvNum); @@ -206,6 +233,97 @@ static int32_t hbAsyncCallBack(void *param, const SDataBuf *pMsg, int32_t code) return code; } +int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) { + int64_t now = taosGetTimestampUs(); + SQueryDesc desc = {0}; + int32_t code = 0; + + void *pIter = taosHashIterate(pObj->pRequests, NULL); + while (pIter != NULL) { + int64_t *rid = pIter; + SRequestObj *pRequest = acquireRequest(*rid); + if (NULL == pRequest) { + continue; + } + + tstrncpy(desc.sql, pRequest->sqlstr, sizeof(desc.sql)); + desc.stime = pRequest->metric.start; + desc.queryId = pRequest->requestId; + desc.useconds = now - pRequest->metric.start; + desc.reqRid = pRequest->self; + desc.pid = hbBasic->pid; + taosGetFqdn(desc.fqdn); + desc.subPlanNum = pRequest->body.pDag ? pRequest->body.pDag->numOfSubplans : 0; + + if (desc.subPlanNum) { + desc.subDesc = taosArrayInit(desc.subPlanNum, sizeof(SQuerySubDesc)); + if (NULL == desc.subDesc) { + releaseRequest(*rid); + return TSDB_CODE_QRY_OUT_OF_MEMORY; + } + + code = schedulerGetTasksStatus(pRequest->body.queryJob, desc.subDesc); + if (code) { + taosArrayDestroy(desc.subDesc); + desc.subDesc = NULL; + } + } + + releaseRequest(*rid); + taosArrayPush(hbBasic->queryDesc, &desc); + + pIter = taosHashIterate(pObj->pRequests, pIter); + } + + return TSDB_CODE_SUCCESS; +} + +int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) { + STscObj *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid); + if (NULL == pTscObj) { + tscWarn("tscObj rid %" PRIx64 " not exist", connKey->tscRid); + return TSDB_CODE_QRY_APP_ERROR; + } + + int32_t numOfQueries = pTscObj->pRequests ? taosHashGetSize(pTscObj->pRequests) : 0; + if (numOfQueries <= 0) { + releaseTscObj(connKey->tscRid); + tscDebug("no queries on connection"); + return TSDB_CODE_QRY_APP_ERROR; + } + + SQueryHbReqBasic *hbBasic = (SQueryHbReqBasic *)taosMemoryCalloc(1, sizeof(SQueryHbReqBasic)); + if (NULL == hbBasic) { + tscError("calloc %d failed", (int32_t)sizeof(SQueryHbReqBasic)); + releaseTscObj(connKey->tscRid); + return TSDB_CODE_QRY_OUT_OF_MEMORY; + } + + hbBasic->queryDesc = taosArrayInit(numOfQueries, sizeof(SQueryDesc)); + if (NULL == hbBasic->queryDesc) { + tscWarn("taosArrayInit %d queryDesc failed", numOfQueries); + releaseTscObj(connKey->tscRid); + taosMemoryFree(hbBasic); + return TSDB_CODE_QRY_OUT_OF_MEMORY; + } + + hbBasic->connId = pTscObj->connId; + hbBasic->pid = taosGetPId(); + taosGetAppName(hbBasic->app, NULL); + + int32_t code = hbBuildQueryDesc(hbBasic, pTscObj); + if (code) { + releaseTscObj(connKey->tscRid); + taosMemoryFree(hbBasic); + return code; + } + + req->query = hbBasic; + releaseTscObj(connKey->tscRid); + + return TSDB_CODE_SUCCESS; +} + int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) { SDbVgVersion *dbs = NULL; uint32_t dbNum = 0; @@ -284,6 +402,8 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req return code; } + hbGetQueryBasicInfo(connKey, req); + code = hbGetExpiredDBInfo(connKey, pCatalog, req); if (TSDB_CODE_SUCCESS != code) { return code; @@ -316,6 +436,11 @@ void hbFreeReq(void *req) { tFreeReqKvHash(pReq->info); } +void hbClearClientHbReq(SClientHbReq *pReq) { + pReq->query = NULL; + pReq->info = NULL; +} + SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { SClientHbBatchReq *pBatchReq = taosMemoryCalloc(1, sizeof(SClientHbBatchReq)); if (pBatchReq == NULL) { @@ -334,20 +459,21 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { if (info) { code = (*clientHbMgr.reqHandle[pOneReq->connKey.hbType])(&pOneReq->connKey, info->param, pOneReq); if (code) { - taosHashCancelIterate(pAppHbMgr->activeInfo, pIter); - break; + pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter); + continue; } } taosArrayPush(pBatchReq->reqs, pOneReq); + hbClearClientHbReq(pOneReq); pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter); } - if (code) { - taosArrayDestroyEx(pBatchReq->reqs, hbFreeReq); - taosMemoryFreeClear(pBatchReq); - } +// if (code) { +// taosArrayDestroyEx(pBatchReq->reqs, hbFreeReq); +// taosMemoryFreeClear(pBatchReq); +// } return pBatchReq; } @@ -548,7 +674,7 @@ int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, SHbConnInfo * if (data != NULL) { return 0; } - SClientHbReq hbReq; + SClientHbReq hbReq = {0}; hbReq.connKey = connKey; hbReq.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); @@ -565,9 +691,9 @@ int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, SHbConnInfo * return 0; } -int hbRegisterConn(SAppHbMgr *pAppHbMgr, int32_t connId, int64_t clusterId, int32_t hbType) { +int hbRegisterConn(SAppHbMgr *pAppHbMgr, int64_t tscRefId, int64_t clusterId, int32_t hbType) { SClientHbKey connKey = { - .connId = connId, + .tscRid = tscRefId, .hbType = HEARTBEAT_TYPE_QUERY, }; SHbConnInfo info = {0}; diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index b311060ea9..2c58094b4d 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -458,7 +458,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t taos_close(pTscObj); pTscObj = NULL; } else { - tscDebug("0x%" PRIx64 " connection is opening, connId:%d, dnodeConn:%p, reqId:0x%" PRIx64, pTscObj->id, + tscDebug("0x%" PRIx64 " connection is opening, connId:%u, dnodeConn:%p, reqId:0x%" PRIx64, pTscObj->id, pTscObj->connId, pTscObj->pAppInfo->pTransporter, pRequest->requestId); destroyRequest(pRequest); } diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index e10cf5179e..76257a7c0e 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -14,6 +14,7 @@ */ #include "catalog.h" +#include "scheduler.h" #include "clientInt.h" #include "clientLog.h" #include "os.h" @@ -66,6 +67,7 @@ void taos_cleanup(void) { rpcCleanup(); catalogDestroy(); + schedulerDestroy(); taosCloseLog(); tscInfo("all local resources released"); @@ -98,7 +100,7 @@ void taos_close(TAOS *taos) { STscObj *pTscObj = (STscObj *)taos; tscDebug("0x%" PRIx64 " try to close connection, numOfReq:%d", pTscObj->id, pTscObj->numOfReqs); - /*taosRemoveRef(clientConnRefPool, pTscObj->id);*/ + taosRemoveRef(clientConnRefPool, pTscObj->id); } int taos_errno(TAOS_RES *tres) { @@ -366,7 +368,7 @@ void taos_stop_query(TAOS_RES *res) { return; } - // scheduleCancelJob(pRequest->body.pQueryJob); + schedulerFreeJob(pRequest->body.queryJob); } bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) { diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index 4314391743..ca4c681304 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -71,7 +71,7 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) { pTscObj->connType = HEARTBEAT_TYPE_QUERY; - hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, connectRsp.connId, connectRsp.clusterId, HEARTBEAT_TYPE_QUERY); + hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, pTscObj->id, connectRsp.clusterId, HEARTBEAT_TYPE_QUERY); // pRequest->body.resInfo.pRspMsg = pMsg->pData; tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, connectRsp.clusterId, diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 5973a70b59..92fa9b8e97 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -134,6 +134,42 @@ void *taosDecodeSEpSet(void *buf, SEpSet *pEp) { static int32_t tSerializeSClientHbReq(SCoder *pEncoder, const SClientHbReq *pReq) { if (tEncodeSClientHbKey(pEncoder, &pReq->connKey) < 0) return -1; + if (pReq->connKey.hbType == HEARTBEAT_TYPE_QUERY) { + int32_t queryNum = 0; + if (pReq->query) { + queryNum = 1; + if (tEncodeI32(pEncoder, queryNum) < 0) return -1; + if (tEncodeU32(pEncoder, pReq->query->connId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->query->pid) < 0) return -1; + if (tEncodeCStr(pEncoder, pReq->query->app) < 0) return -1; + + int32_t num = taosArrayGetSize(pReq->query->queryDesc); + if (tEncodeI32(pEncoder, num) < 0) return -1; + + for (int32_t i = 0; i < num; ++i) { + SQueryDesc *desc = taosArrayGet(pReq->query->queryDesc, i); + if (tEncodeCStr(pEncoder, desc->sql) < 0) return -1; + if (tEncodeU64(pEncoder, desc->queryId) < 0) return -1; + if (tEncodeI64(pEncoder, desc->useconds) < 0) return -1; + if (tEncodeI64(pEncoder, desc->stime) < 0) return -1; + if (tEncodeI64(pEncoder, desc->reqRid) < 0) return -1; + if (tEncodeI32(pEncoder, desc->pid) < 0) return -1; + if (tEncodeCStr(pEncoder, desc->fqdn) < 0) return -1; + if (tEncodeI32(pEncoder, desc->subPlanNum) < 0) return -1; + + int32_t snum = desc->subDesc ? taosArrayGetSize(desc->subDesc) : 0; + if (tEncodeI32(pEncoder, snum) < 0) return -1; + for (int32_t m = 0; m < snum; ++m) { + SQuerySubDesc *sDesc = taosArrayGet(desc->subDesc, m); + if (tEncodeI64(pEncoder, sDesc->tid) < 0) return -1; + if (tEncodeI32(pEncoder, sDesc->status) < 0) return -1; + } + } + } else { + if (tEncodeI32(pEncoder, queryNum) < 0) return -1; + } + } + int32_t kvNum = taosHashGetSize(pReq->info); if (tEncodeI32(pEncoder, kvNum) < 0) return -1; void *pIter = taosHashIterate(pReq->info, NULL); @@ -149,6 +185,53 @@ static int32_t tSerializeSClientHbReq(SCoder *pEncoder, const SClientHbReq *pReq static int32_t tDeserializeSClientHbReq(SCoder *pDecoder, SClientHbReq *pReq) { if (tDecodeSClientHbKey(pDecoder, &pReq->connKey) < 0) return -1; + if (pReq->connKey.hbType == HEARTBEAT_TYPE_QUERY) { + int32_t queryNum = 0; + if (tDecodeI32(pDecoder, &queryNum) < 0) return -1; + if (queryNum) { + pReq->query = taosMemoryCalloc(1, sizeof(*pReq->query)); + if (NULL == pReq->query) return -1; + if (tDecodeU32(pDecoder, &pReq->query->connId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->query->pid) < 0) return -1; + if (tDecodeCStrTo(pDecoder, pReq->query->app) < 0) return -1; + + int32_t num = 0; + if (tDecodeI32(pDecoder, &num) < 0) return -1; + if (num > 0) { + pReq->query->queryDesc = taosArrayInit(num, sizeof(SQueryDesc)); + if (NULL == pReq->query->queryDesc) return -1; + + for (int32_t i = 0; i < num; ++i) { + SQueryDesc desc = {0}; + if (tDecodeCStrTo(pDecoder, desc.sql) < 0) return -1; + if (tDecodeU64(pDecoder, &desc.queryId) < 0) return -1; + if (tDecodeI64(pDecoder, &desc.useconds) < 0) return -1; + if (tDecodeI64(pDecoder, &desc.stime) < 0) return -1; + if (tDecodeI64(pDecoder, &desc.reqRid) < 0) return -1; + if (tDecodeI32(pDecoder, &desc.pid) < 0) return -1; + if (tDecodeCStrTo(pDecoder, desc.fqdn) < 0) return -1; + if (tDecodeI32(pDecoder, &desc.subPlanNum) < 0) return -1; + + int32_t snum = 0; + if (tDecodeI32(pDecoder, &snum) < 0) return -1; + if (snum > 0) { + desc.subDesc = taosArrayInit(snum, sizeof(SQuerySubDesc)); + if (NULL == desc.subDesc) return -1; + + for (int32_t m = 0; m < snum; ++m) { + SQuerySubDesc sDesc = {0}; + if (tDecodeI64(pDecoder, &sDesc.tid) < 0) return -1; + if (tDecodeI32(pDecoder, &sDesc.status) < 0) return -1; + taosArrayPush(desc.subDesc, &sDesc); + } + } + + taosArrayPush(pReq->query->queryDesc, &desc); + } + } + } + } + int32_t kvNum = 0; if (tDecodeI32(pDecoder, &kvNum) < 0) return -1; if (pReq->info == NULL) { @@ -168,6 +251,20 @@ static int32_t tSerializeSClientHbRsp(SCoder *pEncoder, const SClientHbRsp *pRsp if (tEncodeSClientHbKey(pEncoder, &pRsp->connKey) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->status) < 0) return -1; + int32_t queryNum = 0; + if (pRsp->query) { + queryNum = 1; + if (tEncodeI32(pEncoder, queryNum) < 0) return -1; + if (tEncodeU32(pEncoder, pRsp->query->connId) < 0) return -1; + if (tEncodeU64(pEncoder, pRsp->query->killRid) < 0) return -1; + if (tEncodeI32(pEncoder, pRsp->query->totalDnodes) < 0) return -1; + if (tEncodeI32(pEncoder, pRsp->query->onlineDnodes) < 0) return -1; + if (tEncodeI8(pEncoder, pRsp->query->killConnection) < 0) return -1; + if (tEncodeSEpSet(pEncoder, &pRsp->query->epSet) < 0) return -1; + } else { + if (tEncodeI32(pEncoder, queryNum) < 0) return -1; + } + int32_t kvNum = taosArrayGetSize(pRsp->info); if (tEncodeI32(pEncoder, kvNum) < 0) return -1; for (int32_t i = 0; i < kvNum; i++) { @@ -182,6 +279,19 @@ static int32_t tDeserializeSClientHbRsp(SCoder *pDecoder, SClientHbRsp *pRsp) { if (tDecodeSClientHbKey(pDecoder, &pRsp->connKey) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->status) < 0) return -1; + int32_t queryNum = 0; + if (tDecodeI32(pDecoder, &queryNum) < 0) return -1; + if (queryNum) { + pRsp->query = taosMemoryCalloc(1, sizeof(*pRsp->query)); + if (NULL == pRsp->query) return -1; + if (tDecodeU32(pDecoder, &pRsp->query->connId) < 0) return -1; + if (tDecodeU64(pDecoder, &pRsp->query->killRid) < 0) return -1; + if (tDecodeI32(pDecoder, &pRsp->query->totalDnodes) < 0) return -1; + if (tDecodeI32(pDecoder, &pRsp->query->onlineDnodes) < 0) return -1; + if (tDecodeI8(pDecoder, &pRsp->query->killConnection) < 0) return -1; + if (tDecodeSEpSet(pDecoder, &pRsp->query->epSet) < 0) return -1; + } + int32_t kvNum = 0; if (tDecodeI32(pDecoder, &kvNum) < 0) return -1; pRsp->info = taosArrayInit(kvNum, sizeof(SKv)); @@ -224,8 +334,9 @@ int32_t tDeserializeSClientHbBatchReq(void *buf, int32_t bufLen, SClientHbBatchR int32_t reqNum = 0; if (tDecodeI32(&decoder, &reqNum) < 0) return -1; - if (pBatchReq->reqs == NULL) { + if (reqNum > 0) { pBatchReq->reqs = taosArrayInit(reqNum, sizeof(SClientHbReq)); + if (NULL == pBatchReq->reqs) return -1; } for (int32_t i = 0; i < reqNum; i++) { SClientHbReq req = {0}; @@ -2564,7 +2675,7 @@ int32_t tSerializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) { if (tStartEncode(&encoder) < 0) return -1; if (tEncodeI32(&encoder, pRsp->acctId) < 0) return -1; if (tEncodeI64(&encoder, pRsp->clusterId) < 0) return -1; - if (tEncodeI32(&encoder, pRsp->connId) < 0) return -1; + if (tEncodeU32(&encoder, pRsp->connId) < 0) return -1; if (tEncodeI8(&encoder, pRsp->superUser) < 0) return -1; if (tEncodeSEpSet(&encoder, &pRsp->epSet) < 0) return -1; if (tEncodeCStr(&encoder, pRsp->sVersion) < 0) return -1; @@ -2582,7 +2693,7 @@ int32_t tDeserializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) { if (tStartDecode(&decoder) < 0) return -1; if (tDecodeI32(&decoder, &pRsp->acctId) < 0) return -1; if (tDecodeI64(&decoder, &pRsp->clusterId) < 0) return -1; - if (tDecodeI32(&decoder, &pRsp->connId) < 0) return -1; + if (tDecodeU32(&decoder, &pRsp->connId) < 0) return -1; if (tDecodeI8(&decoder, &pRsp->superUser) < 0) return -1; if (tDecodeSEpSet(&decoder, &pRsp->epSet) < 0) return -1; if (tDecodeCStrTo(&decoder, pRsp->sVersion) < 0) return -1; diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index fa1502fe10..ad42eebc1b 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -75,7 +75,6 @@ typedef struct { } SShowMgmt; typedef struct { - int32_t connId; SCacheObj *cache; } SProfileMgmt; diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 49e9ccaba6..fe75e6c60c 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -1128,6 +1128,8 @@ static int32_t mndProcessUseDbReq(SNodeMsg *pReq) { if (taosArrayGetSize(usedbRsp.pVgroupInfos) <= 0) { terrno = TSDB_CODE_MND_DB_NOT_EXIST; + } else { + code = 0; } } else { usedbRsp.vgVersion = usedbReq.vgVersion; diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 320671c332..b036b857e5 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -29,7 +29,7 @@ #define QUERY_SAVE_SIZE 20 typedef struct { - int32_t id; + uint32_t id; char user[TSDB_USER_LEN]; char app[TSDB_APP_NAME_LEN]; // app name that invokes taosc int64_t appStartTimeMs; // app start time @@ -39,15 +39,15 @@ typedef struct { int8_t killed; int64_t loginTimeMs; int64_t lastAccessTimeMs; - int32_t queryId; + uint64_t killId; int32_t numOfQueries; - SQueryDesc *pQueries; + SArray *pQueries; //SArray } SConnObj; static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, uint32_t ip, uint16_t port, int32_t pid, const char *app, int64_t startTime); static void mndFreeConn(SConnObj *pConn); -static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId); +static SConnObj *mndAcquireConn(SMnode *pMnode, uint32_t connId); static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn); static void *mndGetNextConn(SMnode *pMnode, SCacheIter *pIter); static void mndCancelGetNextConn(SMnode *pMnode, void *pIter); @@ -97,8 +97,9 @@ static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, uint32_t ip, ui const char *app, int64_t startTime) { SProfileMgmt *pMgmt = &pMnode->profileMgmt; - int32_t connId = atomic_add_fetch_32(&pMgmt->connId, 1); - if (connId == 0) atomic_add_fetch_32(&pMgmt->connId, 1); + char connStr[255] = {0}; + int32_t len = snprintf(connStr, sizeof(connStr), "%s%d%d%d%s", user, ip, port, pid, app); + int32_t connId = mndGenerateUid(connStr, len); if (startTime == 0) startTime = taosGetTimestampMs(); SConnObj connObj = {.id = connId, @@ -109,7 +110,7 @@ static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, uint32_t ip, ui .killed = 0, .loginTimeMs = taosGetTimestampMs(), .lastAccessTimeMs = 0, - .queryId = 0, + .killId = 0, .numOfQueries = 0, .pQueries = NULL}; @@ -124,35 +125,35 @@ static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, uint32_t ip, ui mError("conn:%d, failed to put into cache since %s, user:%s", connId, user, terrstr()); return NULL; } else { - mTrace("conn:%d, is created, data:%p user:%s", pConn->id, pConn, user); + mTrace("conn:%u, is created, data:%p user:%s", pConn->id, pConn, user); return pConn; } } static void mndFreeConn(SConnObj *pConn) { taosMemoryFreeClear(pConn->pQueries); - mTrace("conn:%d, is destroyed, data:%p", pConn->id, pConn); + mTrace("conn:%u, is destroyed, data:%p", pConn->id, pConn); } -static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId) { +static SConnObj *mndAcquireConn(SMnode *pMnode, uint32_t connId) { SProfileMgmt *pMgmt = &pMnode->profileMgmt; - SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &connId, sizeof(int32_t)); + SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &connId, sizeof(connId)); if (pConn == NULL) { - mDebug("conn:%d, already destroyed", connId); + mDebug("conn:%u, already destroyed", connId); return NULL; } int32_t keepTime = tsShellActivityTimer * 3; pConn->lastAccessTimeMs = keepTime * 1000 + (uint64_t)taosGetTimestampMs(); - mTrace("conn:%d, acquired from cache, data:%p", pConn->id, pConn); + mTrace("conn:%u, acquired from cache, data:%p", pConn->id, pConn); return pConn; } static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn) { if (pConn == NULL) return; - mTrace("conn:%d, released from cache, data:%p", pConn->id, pConn); + mTrace("conn:%u, released from cache, data:%p", pConn->id, pConn); SProfileMgmt *pMgmt = &pMnode->profileMgmt; taosCacheRelease(pMgmt->cache, (void **)&pConn, false); @@ -217,6 +218,8 @@ static int32_t mndProcessConnectReq(SNodeMsg *pReq) { goto CONN_OVER; } + mndAcquireConn(pMnode, pConn->id); + SConnectRsp connectRsp = {0}; connectRsp.acctId = pUser->acctId; connectRsp.superUser = pUser->superUser; @@ -236,7 +239,7 @@ static int32_t mndProcessConnectReq(SNodeMsg *pReq) { pReq->rspLen = contLen; pReq->pRsp = pRsp; - mDebug("user:%s, login from %s, conn:%d, app:%s", pReq->user, ip, pConn->id, connReq.app); + mDebug("user:%s, login from %s:%d, conn:%u, app:%s", pReq->user, ip, pConn->port, pConn->id, connReq.app); code = 0; @@ -249,22 +252,13 @@ CONN_OVER: return code; } -static int32_t mndSaveQueryStreamList(SConnObj *pConn, SHeartBeatReq *pReq) { - pConn->numOfQueries = 0; - int32_t numOfQueries = htonl(pReq->numOfQueries); +static int32_t mndSaveQueryList(SConnObj *pConn, SQueryHbReqBasic *pBasic) { + taosArrayDestroyEx(pConn->pQueries, tFreeClientHbQueryDesc); - if (numOfQueries > 0) { - if (pConn->pQueries == NULL) { - pConn->pQueries = taosMemoryCalloc(sizeof(SQueryDesc), QUERY_SAVE_SIZE); - } - - pConn->numOfQueries = TMIN(QUERY_SAVE_SIZE, numOfQueries); - - int32_t saveSize = pConn->numOfQueries * sizeof(SQueryDesc); - if (saveSize > 0 && pConn->pQueries != NULL) { - memcpy(pConn->pQueries, pReq->pData, saveSize); - } - } + pConn->pQueries = pBasic->queryDesc; + pBasic->queryDesc = NULL; + + pConn->numOfQueries = pBasic->queryDesc ? taosArrayGetSize(pBasic->queryDesc) : 0; return TSDB_CODE_SUCCESS; } @@ -334,6 +328,111 @@ static SClientHbRsp *mndMqHbBuildRsp(SMnode *pMnode, SClientHbReq *pReq) { return NULL; } +static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHbReq *pHbReq, SClientHbBatchRsp *pBatchRsp) { + SProfileMgmt *pMgmt = &pMnode->profileMgmt; + SClientHbRsp hbRsp = {.connKey = pHbReq->connKey, .status = 0, .info = NULL, .query = NULL}; + + if (pHbReq->query) { + SQueryHbReqBasic *pBasic = pHbReq->query; + + SRpcConnInfo connInfo = {0}; + rpcGetConnInfo(pMsg->handle, &connInfo); + + SConnObj *pConn = mndAcquireConn(pMnode, pBasic->connId); + if (pConn == NULL) { + pConn = mndCreateConn(pMnode, connInfo.user, connInfo.clientIp, connInfo.clientPort, pBasic->pid, pBasic->app, 0); + if (pConn == NULL) { + mError("user:%s, conn:%u is freed and failed to create new since %s", connInfo.user, pBasic->connId, terrstr()); + return -1; + } else { + mDebug("user:%s, conn:%u is freed and create a new conn:%u", connInfo.user, pBasic->connId, pConn->id); + } + } else if (pConn->killed) { + mError("user:%s, conn:%u is already killed", connInfo.user, pConn->id); + mndReleaseConn(pMnode, pConn); + terrno = TSDB_CODE_MND_INVALID_CONNECTION; + return -1; + } + + SQueryHbRspBasic *rspBasic = taosMemoryCalloc(1, sizeof(SQueryHbRspBasic)); + if (rspBasic == NULL) { + mndReleaseConn(pMnode, pConn); + terrno = TSDB_CODE_OUT_OF_MEMORY; + mError("user:%s, conn:%u failed to process hb while since %s", pConn->user, pBasic->connId, terrstr()); + return -1; + } + + mndSaveQueryList(pConn, pBasic); + if (pConn->killed != 0) { + rspBasic->killConnection = 1; + } + + if (pConn->killId != 0) { + rspBasic->killRid = pConn->killId; + pConn->killId = 0; + } + + rspBasic->connId = pConn->id; + rspBasic->totalDnodes = 1; //TODO + rspBasic->onlineDnodes = 1; //TODO + mndGetMnodeEpSet(pMnode, &rspBasic->epSet); + mndReleaseConn(pMnode, pConn); + + hbRsp.query = rspBasic; + } + + int32_t kvNum = taosHashGetSize(pHbReq->info); + if (NULL == pHbReq->info || kvNum <= 0) { + taosArrayPush(pBatchRsp->rsps, &hbRsp); + return TSDB_CODE_SUCCESS; + } + + hbRsp.info = taosArrayInit(kvNum, sizeof(SKv)); + if (NULL == hbRsp.info) { + mError("taosArrayInit %d rsp kv failed", kvNum); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + void *pIter = taosHashIterate(pHbReq->info, NULL); + while (pIter != NULL) { + SKv *kv = pIter; + + switch (kv->key) { + case HEARTBEAT_KEY_DBINFO: { + void *rspMsg = NULL; + int32_t rspLen = 0; + mndValidateDbInfo(pMnode, kv->value, kv->valueLen / sizeof(SDbVgVersion), &rspMsg, &rspLen); + if (rspMsg && rspLen > 0) { + SKv kv1 = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = rspLen, .value = rspMsg}; + taosArrayPush(hbRsp.info, &kv1); + } + break; + } + case HEARTBEAT_KEY_STBINFO: { + void *rspMsg = NULL; + int32_t rspLen = 0; + mndValidateStbInfo(pMnode, kv->value, kv->valueLen / sizeof(SSTableMetaVersion), &rspMsg, &rspLen); + if (rspMsg && rspLen > 0) { + SKv kv1 = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = rspLen, .value = rspMsg}; + taosArrayPush(hbRsp.info, &kv1); + } + break; + } + default: + mError("invalid kv key:%d", kv->key); + hbRsp.status = TSDB_CODE_MND_APP_ERROR; + break; + } + + pIter = taosHashIterate(pHbReq->info, pIter); + } + + taosArrayPush(pBatchRsp->rsps, &hbRsp); + + return TSDB_CODE_SUCCESS; +} + static int32_t mndProcessHeartBeatReq(SNodeMsg *pReq) { SMnode *pMnode = pReq->pNode; @@ -351,48 +450,7 @@ static int32_t mndProcessHeartBeatReq(SNodeMsg *pReq) { for (int i = 0; i < sz; i++) { SClientHbReq *pHbReq = taosArrayGet(batchReq.reqs, i); if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_QUERY) { - int32_t kvNum = taosHashGetSize(pHbReq->info); - if (NULL == pHbReq->info || kvNum <= 0) { - continue; - } - - SClientHbRsp hbRsp = {.connKey = pHbReq->connKey, .status = 0, .info = taosArrayInit(kvNum, sizeof(SKv))}; - - void *pIter = taosHashIterate(pHbReq->info, NULL); - while (pIter != NULL) { - SKv *kv = pIter; - - switch (kv->key) { - case HEARTBEAT_KEY_DBINFO: { - void *rspMsg = NULL; - int32_t rspLen = 0; - mndValidateDbInfo(pMnode, kv->value, kv->valueLen / sizeof(SDbVgVersion), &rspMsg, &rspLen); - if (rspMsg && rspLen > 0) { - SKv kv1 = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = rspLen, .value = rspMsg}; - taosArrayPush(hbRsp.info, &kv1); - } - break; - } - case HEARTBEAT_KEY_STBINFO: { - void *rspMsg = NULL; - int32_t rspLen = 0; - mndValidateStbInfo(pMnode, kv->value, kv->valueLen / sizeof(SSTableMetaVersion), &rspMsg, &rspLen); - if (rspMsg && rspLen > 0) { - SKv kv1 = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = rspLen, .value = rspMsg}; - taosArrayPush(hbRsp.info, &kv1); - } - break; - } - default: - mError("invalid kv key:%d", kv->key); - hbRsp.status = TSDB_CODE_MND_APP_ERROR; - break; - } - - pIter = taosHashIterate(pHbReq->info, pIter); - } - - taosArrayPush(batchRsp.rsps, &hbRsp); + mndProcessQueryHeartBeat(pMnode, &pReq->rpcMsg, pHbReq, &batchRsp); } else if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_MQ) { SClientHbRsp *pRsp = mndMqHbBuildRsp(pMnode, pHbReq); if (pRsp != NULL) { @@ -421,73 +479,8 @@ static int32_t mndProcessHeartBeatReq(SNodeMsg *pReq) { taosArrayDestroy(batchRsp.rsps); pReq->rspLen = tlen; pReq->pRsp = buf; + return 0; - -#if 0 - SMnode *pMnode = pReq->pNode; - SProfileMgmt *pMgmt = &pMnode->profileMgmt; - - SHeartBeatReq *pHeartbeat = pReq->rpcMsg.pCont; - pHeartbeat->connId = htonl(pHeartbeat->connId); - pHeartbeat->pid = htonl(pHeartbeat->pid); - - SConnObj *pConn = mndAcquireConn(pMnode, pHeartbeat->connId); - if (pConn == NULL) { - pConn = mndCreateConn(pMnode, &info, pHeartbeat->pid, pHeartbeat->app, 0); - if (pConn == NULL) { - mError("user:%s, conn:%d is freed and failed to create new since %s", pReq->user, pHeartbeat->connId, terrstr()); - return -1; - } else { - mDebug("user:%s, conn:%d is freed and create a new conn:%d", pReq->user, pHeartbeat->connId, pConn->id); - } - } else if (pConn->killed) { - mError("user:%s, conn:%d is already killed", pReq->user, pConn->id); - terrno = TSDB_CODE_MND_INVALID_CONNECTION; - return -1; - } else { - if (pConn->ip != info.clientIp || pConn->port != info.clientPort /* || strcmp(pConn->user, info.user) != 0 */) { - char oldIpStr[40]; - char newIpStr[40]; - taosIpPort2String(pConn->ip, pConn->port, oldIpStr); - taosIpPort2String(info.clientIp, info.clientPort, newIpStr); - mError("conn:%d, incoming conn user:%s ip:%s, not match exist user:%s ip:%s", pConn->id, info.user, newIpStr, - pConn->user, oldIpStr); - - if (pMgmt->connId < pConn->id) pMgmt->connId = pConn->id + 1; - taosCacheRelease(pMgmt->cache, (void **)&pConn, false); - terrno = TSDB_CODE_MND_INVALID_CONNECTION; - return -1; - } - } - - SHeartBeatRsp *pRsp = rpcMallocCont(sizeof(SHeartBeatRsp)); - if (pRsp == NULL) { - mndReleaseConn(pMnode, pConn); - terrno = TSDB_CODE_OUT_OF_MEMORY; - mError("user:%s, conn:%d failed to process hb while since %s", pReq->user, pHeartbeat->connId, terrstr()); - return -1; - } - - mndSaveQueryStreamList(pConn, pHeartbeat); - if (pConn->killed != 0) { - pRsp->killConnection = 1; - } - - if (pConn->queryId != 0) { - pRsp->queryId = htonl(pConn->queryId); - pConn->queryId = 0; - } - - pRsp->connId = htonl(pConn->id); - pRsp->totalDnodes = htonl(1); - pRsp->onlineDnodes = htonl(1); - mndGetMnodeEpSet(pMnode, &pRsp->epSet); - mndReleaseConn(pMnode, pConn); - - pReq->contLen = sizeof(SConnectRsp); - pReq->pRsp = pRsp; - return 0; -#endif } static int32_t mndProcessKillQueryReq(SNodeMsg *pReq) { @@ -518,7 +511,7 @@ static int32_t mndProcessKillQueryReq(SNodeMsg *pReq) { return -1; } else { mInfo("connId:%d, queryId:%d is killed by user:%s", killReq.connId, killReq.queryId, pReq->user); - pConn->queryId = killReq.queryId; + pConn->killId = killReq.queryId; taosCacheRelease(pMgmt->cache, (void **)&pConn, false); return 0; } @@ -651,7 +644,7 @@ static int32_t mndRetrieveConns(SNodeMsg *pReq, SShowObj *pShow, char *data, int cols = 0; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int32_t *)pWrite = pConn->id; + *(uint32_t *)pWrite = pConn->id; cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; @@ -808,6 +801,7 @@ static int32_t mndGetQueryMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *p static int32_t mndRetrieveQueries(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { SMnode *pMnode = pReq->pNode; int32_t numOfRows = 0; +#if 0 SConnObj *pConn = NULL; int32_t cols = 0; char *pWrite; @@ -905,6 +899,7 @@ static int32_t mndRetrieveQueries(SNodeMsg *pReq, SShowObj *pShow, char *data, i mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); pShow->numOfReads += numOfRows; +#endif return numOfRows; } @@ -917,4 +912,4 @@ static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter) { int32_t mndGetNumOfConnections(SMnode *pMnode) { SProfileMgmt *pMgmt = &pMnode->profileMgmt; return taosCacheGetNumOfObj(pMgmt->cache); -} \ No newline at end of file +} diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 62a96b6438..d1def1bef1 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -156,8 +156,8 @@ typedef struct SSchJob { int32_t levelNum; int32_t taskNum; void *transport; - SArray *nodeList; // qnode/vnode list, element is SQueryNodeAddr - SArray *levels; // Element is SQueryLevel, starting from 0. SArray + SArray *nodeList; // qnode/vnode list, SArray + SArray *levels; // starting from 0. SArray SNodeList *subPlans; // subplan pointer copied from DAG, no need to free it in scheduler int32_t levelIdx; diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 11f8e880a6..8dd7625325 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -2655,6 +2655,34 @@ _return: SCH_RET(code); } +int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub) { + int32_t code = 0; + SSchJob *pJob = schAcquireJob(job); + if (NULL == pJob) { + qDebug("acquire job from jobRef list failed, may not started or dropped, refId:%" PRIx64, job); + SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); + } + + if (pJob->status < JOB_TASK_STATUS_NOT_START || pJob->levelNum <= 0 || NULL == pJob->levels) { + qDebug("job not initialized or not executable job, refId:%" PRIx64, job); + SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); + } + + for (int32_t i = pJob->levelNum - 1; i >= 0; --i) { + SSchLevel *pLevel = taosArrayGet(pJob->levels, i); + + for (int32_t m = 0; m < pLevel->taskNum; ++m) { + SSchTask *pTask = taosArrayGet(pLevel->subTasks, m); + SQuerySubDesc subDesc = {.tid = pTask->taskId, .status = pTask->status}; + + taosArrayPush(pSub, &subDesc); + } + } + + return TSDB_CODE_SUCCESS; +} + + int32_t scheduleCancelJob(int64_t job) { SSchJob *pJob = schAcquireJob(job); if (NULL == pJob) { @@ -2672,7 +2700,7 @@ int32_t scheduleCancelJob(int64_t job) { void schedulerFreeJob(int64_t job) { SSchJob *pJob = schAcquireJob(job); if (NULL == pJob) { - qError("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, job); + qDebug("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, job); return; } diff --git a/source/util/src/tarray.c b/source/util/src/tarray.c index a74b26a386..4477a5cacd 100644 --- a/source/util/src/tarray.c +++ b/source/util/src/tarray.c @@ -303,6 +303,21 @@ void taosArrayClear(SArray* pArray) { pArray->size = 0; } +void taosArrayClearEx(SArray* pArray, void (*fp)(void*)) { + if (pArray == NULL) return; + if (fp == NULL) { + pArray->size = 0; + return; + } + + for (int32_t i = 0; i < pArray->size; ++i) { + fp(TARRAY_GET_ELEM(pArray, i)); + } + + pArray->size = 0; +} + + void* taosArrayDestroy(SArray* pArray) { if (pArray) { taosMemoryFree(pArray->pData);