enable hb
This commit is contained in:
parent
06a370c8de
commit
ecf00f7f57
|
@ -60,7 +60,6 @@ int32_t init_env() {
|
|||
pRes =
|
||||
taos_query(pConn, "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
assert(0);
|
||||
printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
|
|
|
@ -14,9 +14,9 @@
|
|||
*/
|
||||
|
||||
#include "catalog.h"
|
||||
#include "scheduler.h"
|
||||
#include "clientInt.h"
|
||||
#include "clientLog.h"
|
||||
#include "scheduler.h"
|
||||
#include "trpc.h"
|
||||
|
||||
static SClientHbMgr clientHbMgr = {0};
|
||||
|
@ -110,7 +110,8 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo
|
|||
static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
|
||||
SHbConnInfo *info = taosHashGet(pAppHbMgr->connInfo, &pRsp->connKey, sizeof(SClientHbKey));
|
||||
if (NULL == info) {
|
||||
tscWarn("fail to get connInfo, may be dropped, refId:%" PRIx64 ", type:%d", pRsp->connKey.tscRid, pRsp->connKey.connType);
|
||||
tscWarn("fail to get connInfo, may be dropped, refId:%" PRIx64 ", type:%d", pRsp->connKey.tscRid,
|
||||
pRsp->connKey.connType);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -121,7 +122,7 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
|
|||
} else {
|
||||
updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &pRsp->query->epSet);
|
||||
pTscObj->connId = pRsp->query->connId;
|
||||
|
||||
|
||||
if (pRsp->query->killRid) {
|
||||
SRequestObj *pRequest = acquireRequest(pRsp->query->killRid);
|
||||
if (NULL == pRequest) {
|
||||
|
@ -131,7 +132,7 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
|
|||
releaseRequest(pRsp->query->killRid);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
if (pRsp->query->killConnection) {
|
||||
taos_close(pTscObj);
|
||||
}
|
||||
|
@ -139,7 +140,7 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
|
|||
releaseTscObj(pRsp->connKey.tscRid);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
int32_t kvNum = pRsp->info ? taosArrayGetSize(pRsp->info) : 0;
|
||||
|
||||
tscDebug("hb got %d rsp kv", kvNum);
|
||||
|
@ -236,24 +237,24 @@ static int32_t hbAsyncCallBack(void *param, const SDataBuf *pMsg, int32_t code)
|
|||
}
|
||||
|
||||
int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) {
|
||||
int64_t now = taosGetTimestampUs();
|
||||
int64_t now = taosGetTimestampUs();
|
||||
SQueryDesc desc = {0};
|
||||
int32_t code = 0;
|
||||
int32_t code = 0;
|
||||
|
||||
void *pIter = taosHashIterate(pObj->pRequests, NULL);
|
||||
void *pIter = taosHashIterate(pObj->pRequests, NULL);
|
||||
while (pIter != NULL) {
|
||||
int64_t *rid = pIter;
|
||||
int64_t *rid = pIter;
|
||||
SRequestObj *pRequest = acquireRequest(*rid);
|
||||
if (NULL == pRequest) {
|
||||
continue;
|
||||
}
|
||||
|
||||
tstrncpy(desc.sql, pRequest->sqlstr, sizeof(desc.sql));
|
||||
desc.stime = pRequest->metric.start;
|
||||
desc.queryId = pRequest->requestId;
|
||||
desc.stime = pRequest->metric.start;
|
||||
desc.queryId = pRequest->requestId;
|
||||
desc.useconds = now - pRequest->metric.start;
|
||||
desc.reqRid = pRequest->self;
|
||||
desc.pid = hbBasic->pid;
|
||||
desc.reqRid = pRequest->self;
|
||||
desc.pid = hbBasic->pid;
|
||||
taosGetFqdn(desc.fqdn);
|
||||
desc.subPlanNum = pRequest->body.pDag ? pRequest->body.pDag->numOfSubplans : 0;
|
||||
|
||||
|
@ -271,9 +272,9 @@ int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) {
|
|||
}
|
||||
}
|
||||
|
||||
releaseRequest(*rid);
|
||||
releaseRequest(*rid);
|
||||
taosArrayPush(hbBasic->queryDesc, &desc);
|
||||
|
||||
|
||||
pIter = taosHashIterate(pObj->pRequests, pIter);
|
||||
}
|
||||
|
||||
|
@ -286,14 +287,14 @@ int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
|
|||
tscWarn("tscObj rid %" PRIx64 " not exist", connKey->tscRid);
|
||||
return TSDB_CODE_QRY_APP_ERROR;
|
||||
}
|
||||
|
||||
|
||||
int32_t numOfQueries = pTscObj->pRequests ? taosHashGetSize(pTscObj->pRequests) : 0;
|
||||
if (numOfQueries <= 0) {
|
||||
releaseTscObj(connKey->tscRid);
|
||||
tscDebug("no queries on connection");
|
||||
return TSDB_CODE_QRY_APP_ERROR;
|
||||
}
|
||||
|
||||
|
||||
SQueryHbReqBasic *hbBasic = (SQueryHbReqBasic *)taosMemoryCalloc(1, sizeof(SQueryHbReqBasic));
|
||||
if (NULL == hbBasic) {
|
||||
tscError("calloc %d failed", (int32_t)sizeof(SQueryHbReqBasic));
|
||||
|
@ -308,7 +309,7 @@ int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
|
|||
taosMemoryFree(hbBasic);
|
||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
|
||||
hbBasic->connId = pTscObj->connId;
|
||||
hbBasic->pid = taosGetPId();
|
||||
taosGetAppName(hbBasic->app, NULL);
|
||||
|
@ -405,7 +406,7 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req
|
|||
}
|
||||
|
||||
hbGetQueryBasicInfo(connKey, req);
|
||||
|
||||
|
||||
code = hbGetExpiredDBInfo(connKey, pCatalog, req);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
|
@ -471,10 +472,10 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
|
|||
pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
|
||||
}
|
||||
|
||||
// if (code) {
|
||||
// taosArrayDestroyEx(pBatchReq->reqs, hbFreeReq);
|
||||
// taosMemoryFreeClear(pBatchReq);
|
||||
// }
|
||||
// if (code) {
|
||||
// taosArrayDestroyEx(pBatchReq->reqs, hbFreeReq);
|
||||
// taosMemoryFreeClear(pBatchReq);
|
||||
// }
|
||||
|
||||
return pBatchReq;
|
||||
}
|
||||
|
@ -630,24 +631,23 @@ void appHbMgrCleanup(void) {
|
|||
int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
|
||||
for (int i = 0; i < sz; i++) {
|
||||
SAppHbMgr *pTarget = taosArrayGetP(clientHbMgr.appHbMgrs, i);
|
||||
|
||||
void *pIter = taosHashIterate(pTarget->activeInfo, NULL);
|
||||
|
||||
void *pIter = taosHashIterate(pTarget->activeInfo, NULL);
|
||||
while (pIter != NULL) {
|
||||
SClientHbReq *pOneReq = pIter;
|
||||
hbFreeReq(pOneReq);
|
||||
taosHashCleanup(pOneReq->info);
|
||||
pIter = taosHashIterate(pTarget->activeInfo, pIter);
|
||||
}
|
||||
}
|
||||
taosHashCleanup(pTarget->activeInfo);
|
||||
pTarget->activeInfo = NULL;
|
||||
|
||||
|
||||
pIter = taosHashIterate(pTarget->connInfo, NULL);
|
||||
while (pIter != NULL) {
|
||||
SHbConnInfo *info = pIter;
|
||||
taosMemoryFree(info->param);
|
||||
pIter = taosHashIterate(pTarget->connInfo, pIter);
|
||||
}
|
||||
}
|
||||
taosHashCleanup(pTarget->connInfo);
|
||||
pTarget->connInfo = NULL;
|
||||
|
||||
|
@ -668,13 +668,13 @@ int hbMgrInit() {
|
|||
hbMgrInitHandle();
|
||||
|
||||
// init backgroud thread
|
||||
//hbCreateThread();
|
||||
hbCreateThread();
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void hbMgrCleanUp() {
|
||||
//hbStopThread();
|
||||
// hbStopThread();
|
||||
|
||||
// destroy all appHbMgr
|
||||
int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 1, 0);
|
||||
|
@ -747,11 +747,11 @@ void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) {
|
|||
taosMemoryFree(info->param);
|
||||
taosHashRemove(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey));
|
||||
}
|
||||
|
||||
|
||||
if (NULL == pReq || NULL == info) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue