enh:[TD-31070] Handling return value of clientHb.c

This commit is contained in:
sima 2024-07-23 17:20:32 +08:00
parent 99b0f1c652
commit 063965d28a
6 changed files with 438 additions and 232 deletions

View File

@ -310,7 +310,7 @@ void setResPrecision(SReqResultInfo* pResInfo, int32_t precision);
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4);
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows,
bool convertUcs4);
void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols);
int32_t setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols);
void doFreeReqResultInfo(SReqResultInfo* pResInfo);
int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, SArray** pReq);
void syncCatalogFn(SMetaData* pResult, void* param, int32_t code);
@ -349,10 +349,10 @@ __async_send_cb_fn_t getMsgRspHandle(int32_t msgType);
SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pReqObj);
int32_t createTscObj(const char *user, const char *auth, const char *db, int32_t connType, SAppInstInfo *pAppInfo,
void **p);
STscObj **p);
void destroyTscObj(void* pObj);
STscObj* acquireTscObj(int64_t rid);
int32_t releaseTscObj(int64_t rid);
void releaseTscObj(int64_t rid);
void destroyAppInst(void* pAppInfo);
uint64_t generateRequestId();
@ -396,11 +396,11 @@ void taos_close_internal(void* taos);
// --- heartbeat
// global, called by mgmt
int hbMgrInit();
void hbMgrCleanUp();
int32_t hbMgrInit();
void hbMgrCleanUp();
// cluster level
SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char* key);
int32_t appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key, SAppHbMgr **pAppHbMgr);
void appHbMgrCleanup(void);
void hbRemoveAppHbMrg(SAppHbMgr** pAppHbMgr);
void destroyAllRequests(SHashObj* pRequests);
@ -409,7 +409,7 @@ void stopAllRequests(SHashObj* pRequests);
//SAppInstInfo* getAppInstInfo(const char* clusterKey);
// conn level
int hbRegisterConn(SAppHbMgr* pAppHbMgr, int64_t tscRefId, int64_t clusterId, int8_t connType);
int32_t hbRegisterConn(SAppHbMgr* pAppHbMgr, int64_t tscRefId, int64_t clusterId, int8_t connType);
void hbDeregisterConn(STscObj* pTscObj, SClientHbKey connKey);
typedef struct SSqlCallbackWrapper {
@ -428,7 +428,7 @@ void doAsyncQuery(SRequestObj* pRequest, bool forceUpdateMeta);
int32_t removeMeta(STscObj* pTscObj, SArray* tbList, bool isView);
int32_t handleAlterTbExecRes(void* res, struct SCatalog* pCatalog);
int32_t handleCreateTbExecRes(void* res, SCatalog* pCatalog);
bool qnodeRequired(SRequestObj* pRequest);
int32_t qnodeRequired(SRequestObj* pRequest, bool *required);
void continueInsertFromCsv(SSqlCallbackWrapper* pWrapper, SRequestObj* pRequest);
void destorySqlCallbackWrapper(SSqlCallbackWrapper* pWrapper);
void handleQueryAnslyseRes(SSqlCallbackWrapper *pWrapper, SMetaData *pResultMeta, int32_t code);

View File

@ -305,9 +305,7 @@ static void deregisterRequest(SRequestObj *pRequest) {
}
}
if (TSDB_CODE_SUCCESS != releaseTscObj(pTscObj->id)) {
tscError("failed to release STscObj");
}
releaseTscObj(pTscObj->id);
}
// todo close the transporter properly
@ -419,14 +417,14 @@ void destroyAppInst(void *info) {
int32_t code = taosThreadMutexLock(&appInfo.mutex);
if (TSDB_CODE_SUCCESS != code) {
tscError("failed to lock app info, code:%s", tstrerror(code));
tscError("failed to lock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
}
hbRemoveAppHbMrg(&pAppInfo->pAppHbMgr);
code = taosThreadMutexUnlock(&appInfo.mutex);
if (TSDB_CODE_SUCCESS != code) {
tscError("failed to unlock app info, code:%s", tstrerror(code));
tscError("failed to unlock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
}
taosMemoryFreeClear(pAppInfo->instKey);
@ -434,13 +432,13 @@ void destroyAppInst(void *info) {
code = taosThreadMutexLock(&pAppInfo->qnodeMutex);
if (TSDB_CODE_SUCCESS != code) {
tscError("failed to lock qnode mutex, code:%s", tstrerror(code));
tscError("failed to lock qnode mutex, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
}
taosArrayDestroy(pAppInfo->pQnodeList);
code = taosThreadMutexUnlock(&pAppInfo->qnodeMutex);
if (TSDB_CODE_SUCCESS != code) {
tscError("failed to unlock qnode mutex, code:%s", tstrerror(code));
tscError("failed to unlock qnode mutex, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
}
taosMemoryFree(pAppInfo);
@ -475,51 +473,55 @@ void destroyTscObj(void *pObj) {
}
int32_t createTscObj(const char *user, const char *auth, const char *db, int32_t connType, SAppInstInfo *pAppInfo,
void **p) {
STscObj *pObj = (STscObj *)*p;
pObj = (STscObj *)taosMemoryCalloc(1, sizeof(STscObj));
if (NULL == pObj) {
STscObj **pObj) {
*pObj = (STscObj *)taosMemoryCalloc(1, sizeof(STscObj));
if (NULL == *pObj) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pObj->pRequests = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
if (NULL == pObj->pRequests) {
taosMemoryFree(pObj);
(*pObj)->pRequests = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
if (NULL == (*pObj)->pRequests) {
taosMemoryFree(*pObj);
return TSDB_CODE_OUT_OF_MEMORY;
}
pObj->connType = connType;
pObj->pAppInfo = pAppInfo;
pObj->appHbMgrIdx = pAppInfo->pAppHbMgr->idx;
tstrncpy(pObj->user, user, sizeof(pObj->user));
(void)memcpy(pObj->pass, auth, TSDB_PASSWORD_LEN);
(*pObj)->connType = connType;
(*pObj)->pAppInfo = pAppInfo;
(*pObj)->appHbMgrIdx = pAppInfo->pAppHbMgr->idx;
tstrncpy((*pObj)->user, user, sizeof((*pObj)->user));
(void)memcpy((*pObj)->pass, auth, TSDB_PASSWORD_LEN);
if (db != NULL) {
tstrncpy(pObj->db, db, tListLen(pObj->db));
tstrncpy((*pObj)->db, db, tListLen((*pObj)->db));
}
int32_t code = taosThreadMutexInit(&pObj->mutex, NULL);
int32_t code = taosThreadMutexInit(&(*pObj)->mutex, NULL);
if (TSDB_CODE_SUCCESS != code) {
return TAOS_SYSTEM_ERROR(code);
}
pObj->id = taosAddRef(clientConnRefPool, pObj);
if (pObj->id < 0) {
(*pObj)->id = taosAddRef(clientConnRefPool, *pObj);
if ((*pObj)->id < 0) {
tscError("failed to add object to clientConnRefPool");
code = terrno;
taosMemoryFree(pObj);
taosMemoryFree(*pObj);
return code;
}
(void)atomic_add_fetch_64(&pObj->pAppInfo->numOfConns, 1);
(void)atomic_add_fetch_64(&(*pObj)->pAppInfo->numOfConns, 1);
tscDebug("connObj created, 0x%" PRIx64 ",p:%p", pObj->id, pObj);
tscDebug("connObj created, 0x%" PRIx64 ",p:%p", (*pObj)->id, *pObj);
return code;
}
STscObj *acquireTscObj(int64_t rid) { return (STscObj *)taosAcquireRef(clientConnRefPool, rid); }
int32_t releaseTscObj(int64_t rid) { return taosReleaseRef(clientConnRefPool, rid); }
void releaseTscObj(int64_t rid) {
int32_t code = taosReleaseRef(clientConnRefPool, rid);
if (TSDB_CODE_SUCCESS != code) {
tscWarn("failed to release TscObj, rid:%lld, code:%s", rid, tstrerror(code));
}
}
int32_t createRequest(uint64_t connId, int32_t type, int64_t reqid, SRequestObj **pRequest) {
int32_t code = TSDB_CODE_SUCCESS;
@ -535,9 +537,7 @@ int32_t createRequest(uint64_t connId, int32_t type, int64_t reqid, SRequestObj
}
SSyncQueryParam *interParam = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
if (interParam == NULL) {
if (TSDB_CODE_SUCCESS != releaseTscObj(connId)) {
tscError("failed to release TscObj");
}
releaseTscObj(connId);
code = TSDB_CODE_OUT_OF_MEMORY;
goto _return;
}
@ -857,20 +857,27 @@ static void *tscCrashReportThreadFp(void *param) {
int32_t tscCrashReportInit() {
if (!tsEnableCrashReport) {
return 0;
return TSDB_CODE_SUCCESS;
}
int32_t code = TSDB_CODE_SUCCESS;
TdThreadAttr thAttr;
TSC_ERR_RET(taosThreadAttrInit(&thAttr));
TSC_ERR_RET(taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE));
TSC_ERR_JRET(taosThreadAttrInit(&thAttr));
TSC_ERR_JRET(taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE));
TdThread crashReportThread;
if (taosThreadCreate(&crashReportThread, &thAttr, tscCrashReportThreadFp, NULL) != 0) {
tscError("failed to create crashReport thread since %s", strerror(errno));
return -1;
terrno = TAOS_SYSTEM_ERROR(errno);
TSC_ERR_RET(errno);
}
TSC_ERR_RET(taosThreadAttrDestroy(&thAttr));
return 0;
(void)taosThreadAttrDestroy(&thAttr);
_return:
if (code) {
terrno = TAOS_SYSTEM_ERROR(errno);
TSC_ERR_RET(terrno);
}
return TSDB_CODE_SUCCESS;
}
void tscStopCrashReport() {

View File

@ -99,9 +99,7 @@ static int32_t hbUpdateUserAuthInfo(SAppHbMgr *pAppHbMgr, SUserAuthBatchRsp *bat
}
}
if (!pRsp) {
if (TSDB_CODE_SUCCESS != releaseTscObj(pReq->connKey.tscRid)) {
tscError("failed to release tscObj");
}
releaseTscObj(pReq->connKey.tscRid);
taosHashCancelIterate(hbMgr->activeInfo, pReq);
break;
}
@ -116,9 +114,7 @@ static int32_t hbUpdateUserAuthInfo(SAppHbMgr *pAppHbMgr, SUserAuthBatchRsp *bat
}
}
}
if (TSDB_CODE_SUCCESS != releaseTscObj(pReq->connKey.tscRid)) {
tscError("failed to release tscObj");
}
releaseTscObj(pReq->connKey.tscRid);
continue;
}
@ -163,9 +159,7 @@ static int32_t hbUpdateUserAuthInfo(SAppHbMgr *pAppHbMgr, SUserAuthBatchRsp *bat
tscDebug("update whitelist version of user %s from %" PRId64 " to %" PRId64 ", tscRid:%" PRIi64, pRsp->user,
oldVer, atomic_load_64(&whiteListInfo->ver), pTscObj->id);
}
if (TSDB_CODE_SUCCESS != releaseTscObj(pReq->connKey.tscRid)) {
tscError("failed to release tscObj");
}
releaseTscObj(pReq->connKey.tscRid);
}
}
return 0;
@ -529,9 +523,7 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
tscDebug("request 0x%" PRIx64 " not exist to kill", pRsp->query->killRid);
} else {
taos_stop_query((TAOS_RES *)pRequest);
if (TSDB_CODE_SUCCESS != releaseRequest(pRsp->query->killRid)) {
tscWarn("release request failed");
}
(void)releaseRequest(pRsp->query->killRid);
}
}
@ -545,9 +537,7 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
}
}
if (TSDB_CODE_SUCCESS != releaseTscObj(pRsp->connKey.tscRid)) {
tscWarn("release tscobj failed");
}
releaseTscObj(pRsp->connKey.tscRid);
}
}
@ -571,7 +561,6 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
return TSDB_CODE_SUCCESS;
}
//TODO(smj)
static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
if (0 == atomic_load_8(&clientHbMgr.inited)) {
goto _return;
@ -581,8 +570,11 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
int32_t idx = *(int32_t *)param;
SClientHbBatchRsp pRsp = {0};
if (TSDB_CODE_SUCCESS == code) {
tDeserializeSClientHbBatchRsp(pMsg->pData, pMsg->len, &pRsp);
if (TSDB_CODE_SUCCESS != tDeserializeSClientHbBatchRsp(pMsg->pData, pMsg->len, &pRsp)) {
code = terrno;
tscError("deserialize hb rsp failed");
goto _return;
}
int32_t now = taosGetTimestampSec();
int32_t delta = abs(now - pRsp.svrTimestamp);
if (delta > timestampDeltaLimit) {
@ -593,27 +585,25 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
int32_t rspNum = taosArrayGetSize(pRsp.rsps);
taosThreadMutexLock(&clientHbMgr.lock);
code = taosThreadMutexLock(&clientHbMgr.lock);
if (TSDB_CODE_SUCCESS != code) {
tscError("lock failed");
code = TAOS_SYSTEM_ERROR(code);
goto _return;
}
SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, idx);
if (pAppHbMgr == NULL) {
taosThreadMutexUnlock(&clientHbMgr.lock);
tscError("appHbMgr not exist, idx:%d", idx);
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
tFreeClientHbBatchRsp(&pRsp);
return -1;
code = TSDB_CODE_OUT_OF_RANGE;
goto _returnunlock;
}
SAppInstInfo *pInst = pAppHbMgr->pAppInstInfo;
if (code != 0) {
pInst->onlineDnodes = pInst->totalDnodes ? 0 : -1;
tscDebug("hb rsp error %s, update server status %d/%d", tstrerror(code), pInst->onlineDnodes, pInst->totalDnodes);
taosThreadMutexUnlock(&clientHbMgr.lock);
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
tFreeClientHbBatchRsp(&pRsp);
return -1;
goto _returnunlock;
}
pInst->monitorParas = pRsp.monitorParas;
@ -624,22 +614,30 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
tscDebug("hb got %d rsp, %d empty rsp received before", rspNum,
atomic_val_compare_exchange_32(&emptyRspNum, emptyRspNum, 0));
} else {
atomic_add_fetch_32(&emptyRspNum, 1);
(void)atomic_add_fetch_32(&emptyRspNum, 1);
}
for (int32_t i = 0; i < rspNum; ++i) {
SClientHbRsp *rsp = taosArrayGet(pRsp.rsps, i);
if (NULL == rsp) {
tscError("invalid hb rsp, idx:%d", i);
break;
}
code = (*clientHbMgr.rspHandle[rsp->connKey.connType])(pAppHbMgr, rsp);
if (code) {
break;
}
}
taosThreadMutexUnlock(&clientHbMgr.lock);
tFreeClientHbBatchRsp(&pRsp);
_returnunlock:
code = taosThreadMutexUnlock(&clientHbMgr.lock);
if (TSDB_CODE_SUCCESS != code) {
tscError("unlock failed");
code = TAOS_SYSTEM_ERROR(code);
}
_return:
tFreeClientHbBatchRsp(&pRsp);
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
return code;
@ -660,7 +658,7 @@ int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) {
}
if (pRequest->killed || 0 == pRequest->body.queryJob) {
releaseRequest(*rid);
(void)releaseRequest(*rid);
pIter = taosHashIterate(pObj->pRequests, pIter);
continue;
}
@ -672,14 +670,19 @@ int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) {
desc.reqRid = pRequest->self;
desc.stableQuery = pRequest->stableQuery;
desc.isSubQuery = pRequest->isSubReq;
taosGetFqdn(desc.fqdn);
code = taosGetFqdn(desc.fqdn);
if (TSDB_CODE_SUCCESS != code) {
(void)releaseRequest(*rid);
tscError("get fqdn failed");
return TSDB_CODE_FAILED;
}
desc.subPlanNum = pRequest->body.subplanNum;
if (desc.subPlanNum) {
desc.subDesc = taosArrayInit(desc.subPlanNum, sizeof(SQuerySubDesc));
if (NULL == desc.subDesc) {
releaseRequest(*rid);
return TSDB_CODE_OUT_OF_MEMORY;
(void)releaseRequest(*rid);
return terrno;
}
code = schedulerGetTasksStatus(pRequest->body.queryJob, desc.subDesc);
@ -692,20 +695,23 @@ int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) {
desc.subDesc = NULL;
}
releaseRequest(*rid);
taosArrayPush(hbBasic->queryDesc, &desc);
(void)releaseRequest(*rid);
if (NULL == taosArrayPush(hbBasic->queryDesc, &desc)) {
taosArrayDestroy(desc.subDesc);
return TSDB_CODE_OUT_OF_MEMORY;
}
pIter = taosHashIterate(pObj->pRequests, pIter);
}
return TSDB_CODE_SUCCESS;
return code;
}
int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
STscObj *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid);
if (NULL == pTscObj) {
tscWarn("tscObj rid %" PRIx64 " not exist", connKey->tscRid);
return TSDB_CODE_APP_ERROR;
return terrno;
}
SQueryHbReqBasic *hbBasic = (SQueryHbReqBasic *)taosMemoryCalloc(1, sizeof(SQueryHbReqBasic));
@ -730,7 +736,7 @@ int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
tscWarn("taosArrayInit %d queryDesc failed", numOfQueries);
releaseTscObj(connKey->tscRid);
taosMemoryFree(hbBasic);
return TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
int32_t code = hbBuildQueryDesc(hbBasic, pTscObj);
@ -753,7 +759,7 @@ static int32_t hbGetUserAuthInfo(SClientHbKey *connKey, SHbParam *param, SClient
STscObj *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid);
if (!pTscObj) {
tscWarn("tscObj rid %" PRIx64 " not exist", connKey->tscRid);
return TSDB_CODE_APP_ERROR;
return terrno;
}
int32_t code = 0;
@ -775,7 +781,7 @@ static int32_t hbGetUserAuthInfo(SClientHbKey *connKey, SHbParam *param, SClient
SUserAuthVersion *qUserAuth =
(SUserAuthVersion *)taosMemoryRealloc(pKv->value, (userNum + 1) * sizeof(SUserAuthVersion));
if (qUserAuth) {
strncpy((qUserAuth + userNum)->user, pTscObj->user, TSDB_USER_LEN);
(void)strncpy((qUserAuth + userNum)->user, pTscObj->user, TSDB_USER_LEN);
(qUserAuth + userNum)->version = htonl(-1); // force get userAuthInfo
pKv->value = qUserAuth;
pKv->valueLen += sizeof(SUserAuthVersion);
@ -801,6 +807,10 @@ static int32_t hbGetUserAuthInfo(SClientHbKey *connKey, SHbParam *param, SClient
if (!req->info) {
req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
if (NULL == req->info) {
code = terrno;
goto _return;
}
}
if (taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)) != 0) {
@ -848,9 +858,17 @@ int32_t hbGetExpiredUserInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, S
if (NULL == req->info) {
req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
if (NULL == req->info) {
taosMemoryFree(users);
return terrno;
}
}
taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));
code = taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));
if (TSDB_CODE_SUCCESS != code) {
taosMemoryFree(users);
return code;
}
return TSDB_CODE_SUCCESS;
}
@ -894,9 +912,17 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl
if (NULL == req->info) {
req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
if (NULL == req->info) {
taosMemoryFree(dbs);
return terrno;
}
}
taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));
code = taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));
if (TSDB_CODE_SUCCESS != code) {
taosMemoryFree(dbs);
return code;
}
return TSDB_CODE_SUCCESS;
}
@ -934,9 +960,17 @@ int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SC
if (NULL == req->info) {
req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
if (NULL == req->info) {
taosMemoryFree(stbs);
return terrno;
}
}
taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));
code = taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));
if (TSDB_CODE_SUCCESS != code) {
taosMemoryFree(stbs);
return code;
}
return TSDB_CODE_SUCCESS;
}
@ -947,12 +981,7 @@ int32_t hbGetExpiredViewInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, S
int32_t code = 0;
SDynViewVersion *pDynViewVer = NULL;
code = catalogGetExpiredViews(pCatalog, &views, &viewNum, &pDynViewVer);
if (TSDB_CODE_SUCCESS != code) {
taosMemoryFree(views);
taosMemoryFree(pDynViewVer);
return code;
}
TSC_ERR_JRET(catalogGetExpiredViews(pCatalog, &views, &viewNum, &pDynViewVer));
if (viewNum <= 0) {
taosMemoryFree(views);
@ -971,6 +1000,9 @@ int32_t hbGetExpiredViewInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, S
if (NULL == req->info) {
req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
if (NULL == req->info) {
TSC_ERR_JRET(terrno);
}
}
SKv kv = {
@ -979,15 +1011,18 @@ int32_t hbGetExpiredViewInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, S
.value = pDynViewVer,
};
taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));
TSC_ERR_JRET(taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)));
kv.key = HEARTBEAT_KEY_VIEWINFO;
kv.valueLen = sizeof(SViewVersion) * viewNum;
kv.value = views;
taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));
TSC_ERR_JRET(taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)));
return TSDB_CODE_SUCCESS;
_return:
taosMemoryFree(views);
taosMemoryFree(pDynViewVer);
return code;
}
int32_t hbGetExpiredTSMAInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *pReq) {
@ -1017,22 +1052,30 @@ int32_t hbGetExpiredTSMAInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, S
if (!pReq->info) {
pReq->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
if (!pReq->info) {
taosMemoryFree(tsmas);
return terrno;
}
}
SKv kv = {.key = HEARTBEAT_KEY_TSMA, .valueLen = sizeof(STSMAVersion) * tsmaNum, .value = tsmas};
taosHashPut(pReq->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));
code = taosHashPut(pReq->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));
if (TSDB_CODE_SUCCESS != code) {
taosMemoryFree(tsmas);
return code;
}
return TSDB_CODE_SUCCESS;
}
int32_t hbGetAppInfo(int64_t clusterId, SClientHbReq *req) {
SAppHbReq *pApp = taosHashGet(clientHbMgr.appSummary, &clusterId, sizeof(clusterId));
if (NULL != pApp) {
memcpy(&req->app, pApp, sizeof(*pApp));
(void)memcpy(&req->app, pApp, sizeof(*pApp));
} else {
memset(&req->app.summary, 0, sizeof(req->app.summary));
(void)memset(&req->app.summary, 0, sizeof(req->app.summary));
req->app.pid = taosGetPId();
req->app.appId = clientHbMgr.appId;
taosGetAppName(req->app.name, NULL);
TSC_ERR_RET(taosGetAppName(req->app.name, NULL));
}
return TSDB_CODE_SUCCESS;
@ -1043,7 +1086,11 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req
SHbParam *hbParam = (SHbParam *)param;
SCatalog *pCatalog = NULL;
hbGetQueryBasicInfo(connKey, req);
code = hbGetQueryBasicInfo(connKey, req);
if (code != TSDB_CODE_SUCCESS) {
tscWarn("hbGetQueryBasicInfo failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
return code;
}
if (hbParam->reqCnt == 0) {
code = catalogGetHandle(hbParam->clusterId, &pCatalog);
@ -1052,20 +1099,30 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req
return code;
}
hbGetAppInfo(hbParam->clusterId, req);
code = hbGetAppInfo(hbParam->clusterId, req);
if (TSDB_CODE_SUCCESS != code) {
tscWarn("getAppInfo failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
return code;
}
if (!taosHashGet(clientHbMgr.appHbHash, &hbParam->clusterId, sizeof(hbParam->clusterId))) {
code = hbGetExpiredUserInfo(connKey, pCatalog, req);
if (TSDB_CODE_SUCCESS != code) {
tscWarn("hbGetExpiredUserInfo failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
return code;
}
code = taosHashPut(clientHbMgr.appHbHash, &hbParam->clusterId, sizeof(uint64_t), NULL, 0);
if (TSDB_CODE_SUCCESS != code) {
tscWarn("hbQueryHbReqHandle put clusterId failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
return code;
}
taosHashPut(clientHbMgr.appHbHash, &hbParam->clusterId, sizeof(uint64_t), NULL, 0);
}
// invoke after hbGetExpiredUserInfo
if (2 != atomic_load_8(&hbParam->pAppHbMgr->connHbFlag)) {
code = hbGetUserAuthInfo(connKey, hbParam, req);
if (TSDB_CODE_SUCCESS != code) {
tscWarn("hbGetUserAuthInfo failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
return code;
}
atomic_store_8(&hbParam->pAppHbMgr->connHbFlag, 1);
@ -1073,23 +1130,34 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req
code = hbGetExpiredDBInfo(connKey, pCatalog, req);
if (TSDB_CODE_SUCCESS != code) {
tscWarn("hbGetExpiredDBInfo failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
return code;
}
code = hbGetExpiredStbInfo(connKey, pCatalog, req);
if (TSDB_CODE_SUCCESS != code) {
tscWarn("hbGetExpiredStbInfo failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
return code;
}
#ifdef TD_ENTERPRISE
code = hbGetExpiredViewInfo(connKey, pCatalog, req);
if (TSDB_CODE_SUCCESS != code) {
tscWarn("hbGetExpiredViewInfo failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
return code;
}
#endif
code = hbGetExpiredTSMAInfo(connKey, pCatalog, req);
if (TSDB_CODE_SUCCESS != code) {
tscWarn("hbGetExpiredTSMAInfo failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
return code;
}
} else {
hbGetAppInfo(hbParam->clusterId, req);
code = hbGetAppInfo(hbParam->clusterId, req);
if (TSDB_CODE_SUCCESS != code) {
tscWarn("hbGetAppInfo failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
return code;
}
}
++hbParam->reqCnt; // success to get catalog info
@ -1106,17 +1174,16 @@ static FORCE_INLINE void hbMgrInitHandle() {
clientHbMgr.rspHandle[CONN_TYPE__TMQ] = hbMqHbRspHandle;
}
SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
SClientHbBatchReq *pBatchReq = taosMemoryCalloc(1, sizeof(SClientHbBatchReq));
int32_t hbGatherAllInfo(SAppHbMgr *pAppHbMgr, SClientHbBatchReq **pBatchReq) {
*pBatchReq = taosMemoryCalloc(1, sizeof(SClientHbBatchReq));
if (pBatchReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t connKeyCnt = atomic_load_32(&pAppHbMgr->connKeyCnt);
pBatchReq->reqs = taosArrayInit(connKeyCnt, sizeof(SClientHbReq));
if (!pBatchReq->reqs) {
tFreeClientHbBatchReq(pBatchReq);
return NULL;
(*pBatchReq)->reqs = taosArrayInit(connKeyCnt, sizeof(SClientHbReq));
if (!(*pBatchReq)->reqs) {
tFreeClientHbBatchReq(*pBatchReq);
return terrno;
}
int64_t maxIpWhiteVer = 0;
@ -1132,7 +1199,11 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
continue;
}
pOneReq = taosArrayPush(pBatchReq->reqs, pOneReq);
pOneReq = taosArrayPush((*pBatchReq)->reqs, pOneReq);
if (NULL == pOneReq) {
releaseTscObj(connKey->tscRid);
continue;
}
switch (connKey->connType) {
case CONN_TYPE__QUERY: {
@ -1159,9 +1230,9 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
maxIpWhiteVer = TMAX(maxIpWhiteVer, ver);
releaseTscObj(connKey->tscRid);
}
pBatchReq->ipWhiteList = maxIpWhiteVer;
(*pBatchReq)->ipWhiteList = maxIpWhiteVer;
return pBatchReq;
return TSDB_CODE_SUCCESS;
}
void hbThreadFuncUnexpectedStopped(void) { atomic_store_8(&clientHbMgr.threadStop, 2); }
@ -1180,11 +1251,12 @@ void hbMergeSummary(SAppClusterSummary *dst, SAppClusterSummary *src) {
int32_t hbGatherAppInfo(void) {
SAppHbReq req = {0};
int32_t code = TSDB_CODE_SUCCESS;
int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
if (sz > 0) {
req.pid = taosGetPId();
req.appId = clientHbMgr.appId;
taosGetAppName(req.name, NULL);
TSC_ERR_RET(taosGetAppName(req.name, NULL));
}
taosHashClear(clientHbMgr.appSummary);
@ -1196,9 +1268,9 @@ int32_t hbGatherAppInfo(void) {
int64_t clusterId = pAppHbMgr->pAppInstInfo->clusterId;
SAppHbReq *pApp = taosHashGet(clientHbMgr.appSummary, &clusterId, sizeof(clusterId));
if (NULL == pApp) {
memcpy(&req.summary, &pAppHbMgr->pAppInstInfo->summary, sizeof(req.summary));
(void)memcpy(&req.summary, &pAppHbMgr->pAppInstInfo->summary, sizeof(req.summary));
req.startTime = pAppHbMgr->startTime;
taosHashPut(clientHbMgr.appSummary, &clusterId, sizeof(clusterId), &req, sizeof(req));
TSC_ERR_RET(taosHashPut(clientHbMgr.appSummary, &clusterId, sizeof(clusterId), &req, sizeof(req)));
} else {
if (pAppHbMgr->startTime < pApp->startTime) {
pApp->startTime = pAppHbMgr->startTime;
@ -1223,14 +1295,24 @@ static void *hbThreadFunc(void *param) {
break;
}
taosThreadMutexLock(&clientHbMgr.lock);
if (TSDB_CODE_SUCCESS != taosThreadMutexLock(&clientHbMgr.lock)) {
tscError("taosThreadMutexLock failed");
return NULL;
}
int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
if (sz > 0) {
hbGatherAppInfo();
if (TSDB_CODE_SUCCESS != hbGatherAppInfo()) {
tscError("hbGatherAppInfo failed");
return NULL;
}
if (sz > 1 && !clientHbMgr.appHbHash) {
clientHbMgr.appHbHash =
taosHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
if (NULL == clientHbMgr.appHbHash) {
tscError("taosHashInit failed");
return NULL;
}
}
taosHashClear(clientHbMgr.appHbHash);
}
@ -1245,8 +1327,10 @@ static void *hbThreadFunc(void *param) {
if (connCnt == 0) {
continue;
}
SClientHbBatchReq *pReq = hbGatherAllInfo(pAppHbMgr);
if (pReq == NULL || taosArrayGetP(clientHbMgr.appHbMgrs, i) == NULL) {
SClientHbBatchReq *pReq = NULL;
int32_t code = hbGatherAllInfo(pAppHbMgr, &pReq);
if (TSDB_CODE_SUCCESS != code || taosArrayGetP(clientHbMgr.appHbMgrs, i) == NULL) {
terrno = code ? code : TSDB_CODE_OUT_OF_RANGE;
tFreeClientHbBatchReq(pReq);
continue;
}
@ -1288,8 +1372,10 @@ static void *hbThreadFunc(void *param) {
atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1);
}
taosThreadMutexUnlock(&clientHbMgr.lock);
if (TSDB_CODE_SUCCESS != taosThreadMutexUnlock(&clientHbMgr.lock)) {
tscError("taosThreadMutexLock failed");
return NULL;
}
taosMsleep(HEARTBEAT_INTERVAL);
}
taosHashCleanup(clientHbMgr.appHbHash);
@ -1297,16 +1383,24 @@ static void *hbThreadFunc(void *param) {
}
static int32_t hbCreateThread() {
int32_t code = TSDB_CODE_SUCCESS;
TdThreadAttr thAttr;
taosThreadAttrInit(&thAttr);
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
TSC_ERR_JRET(taosThreadAttrInit(&thAttr));
TSC_ERR_JRET(taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE));
if (taosThreadCreate(&clientHbMgr.thread, &thAttr, hbThreadFunc, NULL) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
TSC_ERR_RET(terrno);
}
taosThreadAttrDestroy(&thAttr);
return 0;
(void)taosThreadAttrDestroy(&thAttr);
_return:
if (code) {
terrno = TAOS_SYSTEM_ERROR(errno);
TSC_ERR_RET(terrno);
}
return TSDB_CODE_SUCCESS;
}
static void hbStopThread() {
@ -1318,54 +1412,66 @@ static void hbStopThread() {
return;
}
int32_t code = TSDB_CODE_SUCCESS;
// thread quit mode kill or inner exit from self-thread
if (clientHbMgr.quitByKill) {
taosThreadKill(clientHbMgr.thread, 0);
code = taosThreadKill(clientHbMgr.thread, 0);
if (TSDB_CODE_SUCCESS != code) {
tscError("taosThreadKill failed since %s", tstrerror(TAOS_SYSTEM_ERROR(code)));
}
} else {
taosThreadJoin(clientHbMgr.thread, NULL);
code = taosThreadJoin(clientHbMgr.thread, NULL);
if (TSDB_CODE_SUCCESS != code) {
tscError("taosThreadJoin failed since %s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
}
}
tscDebug("hb thread stopped");
}
SAppHbMgr *appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key) {
if (hbMgrInit() != 0) {
terrno = TSDB_CODE_TSC_INTERNAL_ERROR;
return NULL;
}
SAppHbMgr *pAppHbMgr = taosMemoryMalloc(sizeof(SAppHbMgr));
if (pAppHbMgr == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
int32_t appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key, SAppHbMgr **pAppHbMgr) {
int32_t code = TSDB_CODE_SUCCESS;
TSC_ERR_RET(hbMgrInit());
*pAppHbMgr = taosMemoryMalloc(sizeof(SAppHbMgr));
if (*pAppHbMgr == NULL) {
TSC_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
// init stat
pAppHbMgr->startTime = taosGetTimestampMs();
pAppHbMgr->connKeyCnt = 0;
pAppHbMgr->connHbFlag = 0;
pAppHbMgr->reportCnt = 0;
pAppHbMgr->reportBytes = 0;
pAppHbMgr->key = taosStrdup(key);
(*pAppHbMgr)->startTime = taosGetTimestampMs();
(*pAppHbMgr)->connKeyCnt = 0;
(*pAppHbMgr)->connHbFlag = 0;
(*pAppHbMgr)->reportCnt = 0;
(*pAppHbMgr)->reportBytes = 0;
(*pAppHbMgr)->key = taosStrdup(key);
if ((*pAppHbMgr)->key == NULL) {
TSC_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
// init app info
pAppHbMgr->pAppInstInfo = pAppInstInfo;
(*pAppHbMgr)->pAppInstInfo = pAppInstInfo;
// init hash info
pAppHbMgr->activeInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
(*pAppHbMgr)->activeInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
if (pAppHbMgr->activeInfo == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(pAppHbMgr);
return NULL;
if ((*pAppHbMgr)->activeInfo == NULL) {
TSC_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
// taosHashSetFreeFp(pAppHbMgr->activeInfo, tFreeClientHbReq);
taosThreadMutexLock(&clientHbMgr.lock);
taosArrayPush(clientHbMgr.appHbMgrs, &pAppHbMgr);
pAppHbMgr->idx = taosArrayGetSize(clientHbMgr.appHbMgrs) - 1;
taosThreadMutexUnlock(&clientHbMgr.lock);
TSC_ERR_JRET(taosThreadMutexLock(&clientHbMgr.lock));
if (taosArrayPush(clientHbMgr.appHbMgrs, &(*pAppHbMgr)) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
(void)taosThreadMutexUnlock(&clientHbMgr.lock);
goto _return;
}
(*pAppHbMgr)->idx = taosArrayGetSize(clientHbMgr.appHbMgrs) - 1;
TSC_ERR_JRET(taosThreadMutexUnlock(&clientHbMgr.lock));
return pAppHbMgr;
return TSDB_CODE_SUCCESS;
_return:
taosMemoryFree(*pAppHbMgr);
return code;
}
void hbFreeAppHbMgr(SAppHbMgr *pTarget) {
@ -1383,7 +1489,11 @@ void hbFreeAppHbMgr(SAppHbMgr *pTarget) {
}
void hbRemoveAppHbMrg(SAppHbMgr **pAppHbMgr) {
taosThreadMutexLock(&clientHbMgr.lock);
int32_t code = TSDB_CODE_SUCCESS;
code = taosThreadMutexLock(&clientHbMgr.lock);
if (TSDB_CODE_SUCCESS != code) {
tscError("failed to lock clientHbMgr, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
}
int32_t mgrSize = taosArrayGetSize(clientHbMgr.appHbMgrs);
for (int32_t i = 0; i < mgrSize; ++i) {
SAppHbMgr *pItem = taosArrayGetP(clientHbMgr.appHbMgrs, i);
@ -1394,7 +1504,10 @@ void hbRemoveAppHbMrg(SAppHbMgr **pAppHbMgr) {
break;
}
}
taosThreadMutexUnlock(&clientHbMgr.lock);
code = taosThreadMutexUnlock(&clientHbMgr.lock);
if (TSDB_CODE_SUCCESS != code) {
tscError("failed to unlock clientHbMgr, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
}
}
void appHbMgrCleanup(void) {
@ -1406,7 +1519,7 @@ void appHbMgrCleanup(void) {
}
}
int hbMgrInit() {
int32_t hbMgrInit() {
// init once
int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1);
if (old == 1) return 0;
@ -1415,8 +1528,13 @@ int hbMgrInit() {
tscDebug("app %" PRIx64 " initialized", clientHbMgr.appId);
clientHbMgr.appSummary = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
if (NULL == clientHbMgr.appSummary) {
uError("hbMgrInit:taosHashInit error") return terrno;
}
clientHbMgr.appHbMgrs = taosArrayInit(0, sizeof(void *));
if (NULL == clientHbMgr.appHbMgrs) {
uError("hbMgrInit:taosArrayInit error") return terrno;
}
TdThreadMutexAttr attr = {0};
int ret = taosThreadMutexAttrInit(&attr);
@ -1443,7 +1561,10 @@ int hbMgrInit() {
hbMgrInitHandle();
// init backgroud thread
hbCreateThread();
ret = hbCreateThread();
if (ret != 0) {
uError("hbMgrInit:hbCreateThread error") return ret;
}
return 0;
}
@ -1455,14 +1576,20 @@ void hbMgrCleanUp() {
int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 1, 0);
if (old == 0) return;
taosThreadMutexLock(&clientHbMgr.lock);
int32_t code = taosThreadMutexLock(&clientHbMgr.lock);
if (TSDB_CODE_SUCCESS != code) {
tscError("failed to lock clientHbMgr, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
}
appHbMgrCleanup();
taosArrayDestroy(clientHbMgr.appHbMgrs);
clientHbMgr.appHbMgrs = NULL;
taosThreadMutexUnlock(&clientHbMgr.lock);
code = taosThreadMutexUnlock(&clientHbMgr.lock);
if (TSDB_CODE_SUCCESS != code) {
tscError("failed to unlock clientHbMgr, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
}
}
int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, int64_t clusterId) {
int32_t hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, int64_t clusterId) {
// init hash in activeinfo
void *data = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
if (data != NULL) {
@ -1473,13 +1600,13 @@ int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, int64_t clust
hbReq.clusterId = clusterId;
// hbReq.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
taosHashPut(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey), &hbReq, sizeof(SClientHbReq));
TSC_ERR_RET(taosHashPut(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey), &hbReq, sizeof(SClientHbReq)));
atomic_add_fetch_32(&pAppHbMgr->connKeyCnt, 1);
(void)atomic_add_fetch_32(&pAppHbMgr->connKeyCnt, 1);
return 0;
}
int hbRegisterConn(SAppHbMgr *pAppHbMgr, int64_t tscRefId, int64_t clusterId, int8_t connType) {
int32_t hbRegisterConn(SAppHbMgr *pAppHbMgr, int64_t tscRefId, int64_t clusterId, int8_t connType) {
SClientHbKey connKey = {
.tscRid = tscRefId,
.connType = connType,
@ -1498,7 +1625,10 @@ int hbRegisterConn(SAppHbMgr *pAppHbMgr, int64_t tscRefId, int64_t clusterId, in
}
void hbDeregisterConn(STscObj *pTscObj, SClientHbKey connKey) {
taosThreadMutexLock(&clientHbMgr.lock);
int32_t code = taosThreadMutexLock(&clientHbMgr.lock);
if (TSDB_CODE_SUCCESS != code) {
tscError("failed to lock clientHbMgr, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
}
SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, pTscObj->appHbMgrIdx);
if (pAppHbMgr) {
SClientHbReq *pReq = taosHashAcquire(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
@ -1506,10 +1636,13 @@ void hbDeregisterConn(STscObj *pTscObj, SClientHbKey connKey) {
tFreeClientHbReq(pReq);
taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
taosHashRelease(pAppHbMgr->activeInfo, pReq);
atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1);
(void)atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1);
}
}
taosThreadMutexUnlock(&clientHbMgr.lock);
code = taosThreadMutexUnlock(&clientHbMgr.lock);
if (TSDB_CODE_SUCCESS != code) {
tscError("failed to unlock clientHbMgr, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
}
}
// set heart beat thread quit mode , if quicByKill 1 then kill thread else quit from inner

View File

@ -63,7 +63,7 @@ bool chkRequestKilled(void* param) {
killed = true;
}
releaseRequest((int64_t)param);
(void)releaseRequest((int64_t)param);
return killed;
}
@ -124,7 +124,8 @@ int32_t taos_connect_internal(const char* ip, const char* user, const char* pass
SAppInstInfo** pInst = NULL;
int32_t code = taosThreadMutexLock(&appInfo.mutex);
if (TSDB_CODE_SUCCESS != code) {
tscError("failed to lock app info, code:%s", tstrerror(code));
tscError("failed to lock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
TSC_ERR_RET(code);
}
pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
@ -132,33 +133,28 @@ int32_t taos_connect_internal(const char* ip, const char* user, const char* pass
if (pInst == NULL) {
p = taosMemoryCalloc(1, sizeof(struct SAppInstInfo));
if (NULL == p) {
taosThreadMutexUnlock(&appInfo.mutex);
taosMemoryFreeClear(key);
TSC_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
TSC_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
p->mgmtEp = epSet;
taosThreadMutexInit(&p->qnodeMutex, NULL);
code = taosThreadMutexInit(&p->qnodeMutex, NULL);
if (TSDB_CODE_SUCCESS != code) {
taosMemoryFree(p);
TSC_ERR_JRET(code);
}
code = openTransporter(user, secretEncrypt, tsNumOfCores / 2, &p->pTransporter);
if (TSDB_CODE_SUCCESS != code) {
taosThreadMutexUnlock(&appInfo.mutex);
taosMemoryFreeClear(key);
taosMemoryFree(p);
TSC_ERR_RET(code);
TSC_ERR_JRET(code);
}
p->pAppHbMgr = appHbMgrInit(p, key);
if (NULL == p->pAppHbMgr) {
code = appHbMgrInit(p, key, &p->pAppHbMgr);
if (TSDB_CODE_SUCCESS != code) {
destroyAppInst(&p);
taosThreadMutexUnlock(&appInfo.mutex);
taosMemoryFreeClear(key);
// TODO(smj) : change this to right code.
TSC_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
TSC_ERR_JRET(code);
}
code = taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
if (TSDB_CODE_SUCCESS != code) {
destroyAppInst(&p);
taosThreadMutexUnlock(&appInfo.mutex);
taosMemoryFreeClear(key);
TSC_ERR_RET(code);
TSC_ERR_JRET(code);
}
p->instKey = key;
key = NULL;
@ -171,9 +167,12 @@ int32_t taos_connect_internal(const char* ip, const char* user, const char* pass
atomic_store_8(&(*pInst)->pAppHbMgr->connHbFlag, 0);
}
_return:
code = taosThreadMutexUnlock(&appInfo.mutex);
if (TSDB_CODE_SUCCESS != code) {
tscError("failed to unlock app info, code:%s", tstrerror(code));
tscError("failed to unlock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
return code;
}
taosMemoryFreeClear(key);
@ -192,7 +191,7 @@ int32_t taos_connect_internal(const char* ip, const char* user, const char* pass
void freeQueryParam(SSyncQueryParam* param) {
if (param == NULL) return;
tsem_destroy(&param->sem);
(void)tsem_destroy(&param->sem);
taosMemoryFree(param);
}
@ -212,7 +211,7 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param,
return TSDB_CODE_OUT_OF_MEMORY;
}
strntolower((*pRequest)->sqlstr, sql, (int32_t)sqlLen);
(void)strntolower((*pRequest)->sqlstr, sql, (int32_t)sqlLen);
(*pRequest)->sqlstr[sqlLen] = 0;
(*pRequest)->sqlLen = sqlLen;
(*pRequest)->validateOnly = validateSql;
@ -289,7 +288,7 @@ int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtC
code = qParseSql(&cxt, pQuery);
if (TSDB_CODE_SUCCESS == code) {
if ((*pQuery)->haveResultSet) {
setResSchemaInfo(&pRequest->body.resInfo, (*pQuery)->pResSchema, (*pQuery)->numOfResCols);
code = setResSchemaInfo(&pRequest->body.resInfo, (*pQuery)->pResSchema, (*pQuery)->numOfResCols);
setResPrecision(&pRequest->body.resInfo, (*pQuery)->precision);
}
}
@ -332,9 +331,9 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);
int64_t transporterId = 0;
asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pMsgInfo->epSet, &transporterId, pSendMsg);
TSC_ERR_RET(asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pMsgInfo->epSet, &transporterId, pSendMsg));
tsem_wait(&pRequest->body.rspSem);
(void)tsem_wait(&pRequest->body.rspSem);
return TSDB_CODE_SUCCESS;
}
@ -409,7 +408,11 @@ int compareQueryNodeLoad(const void* elem1, const void* elem2) {
}
int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList) {
taosThreadMutexLock(&pInfo->qnodeMutex);
int32_t code = taosThreadMutexLock(&pInfo->qnodeMutex);
if (TSDB_CODE_SUCCESS != code) {
tscError("failed to lock qnode mutex, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
return code;
}
if (pInfo->pQnodeList) {
taosArrayDestroy(pInfo->pQnodeList);
pInfo->pQnodeList = NULL;
@ -422,41 +425,64 @@ int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList) {
tscDebug("QnodeList updated in cluster 0x%" PRIx64 ", num:%ld", pInfo->clusterId,
taosArrayGetSize(pInfo->pQnodeList));
}
taosThreadMutexUnlock(&pInfo->qnodeMutex);
code = taosThreadMutexUnlock(&pInfo->qnodeMutex);
if (TSDB_CODE_SUCCESS != code) {
tscError("failed to unlock qnode mutex, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
return code;
}
return TSDB_CODE_SUCCESS;
}
bool qnodeRequired(SRequestObj* pRequest) {
int32_t qnodeRequired(SRequestObj* pRequest, bool *required) {
if (QUERY_POLICY_VNODE == tsQueryPolicy || QUERY_POLICY_CLIENT == tsQueryPolicy) {
return false;
*required = false;
return TSDB_CODE_SUCCESS;
}
int32_t code = TSDB_CODE_SUCCESS;
SAppInstInfo* pInfo = pRequest->pTscObj->pAppInfo;
bool required = false;
*required = false;
taosThreadMutexLock(&pInfo->qnodeMutex);
required = (NULL == pInfo->pQnodeList);
taosThreadMutexUnlock(&pInfo->qnodeMutex);
return required;
code = taosThreadMutexLock(&pInfo->qnodeMutex);
if (TSDB_CODE_SUCCESS != code) {
tscError("failed to lock qnode mutex, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
return code;
}
*required = (NULL == pInfo->pQnodeList);
code = taosThreadMutexUnlock(&pInfo->qnodeMutex);
if (TSDB_CODE_SUCCESS != code) {
tscError("failed to unlock qnode mutex, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
return code;
}
return TSDB_CODE_SUCCESS;
}
int32_t getQnodeList(SRequestObj* pRequest, SArray** pNodeList) {
SAppInstInfo* pInfo = pRequest->pTscObj->pAppInfo;
int32_t code = 0;
taosThreadMutexLock(&pInfo->qnodeMutex);
code = taosThreadMutexLock(&pInfo->qnodeMutex);
if (TSDB_CODE_SUCCESS != code) {
tscError("failed to lock qnode mutex, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
return code;
}
if (pInfo->pQnodeList) {
*pNodeList = taosArrayDup(pInfo->pQnodeList, NULL);
}
taosThreadMutexUnlock(&pInfo->qnodeMutex);
code = taosThreadMutexUnlock(&pInfo->qnodeMutex);
if (TSDB_CODE_SUCCESS != code) {
tscError("failed to unlock qnode mutex, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
return code;
}
if (NULL == *pNodeList) {
SCatalog* pCatalog = NULL;
code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
if (TSDB_CODE_SUCCESS == code) {
*pNodeList = taosArrayInit(5, sizeof(SQueryNodeLoad));
if (NULL == pNodeList) {
TSC_ERR_RET(terrno);
}
SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
.requestId = pRequest->requestId,
.requestObjRefId = pRequest->self,
@ -489,10 +515,10 @@ int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArra
return qCreateQueryPlan(&cxt, pPlan, pNodeList);
}
void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols) {
int32_t setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols) {
if (pResInfo == NULL || pSchema == NULL || numOfCols <= 0) {
tscError("invalid paras, pResInfo == NULL || pSchema == NULL || numOfCols <= 0");
return;
return TSDB_CODE_INVALID_PARA;
}
pResInfo->numOfCols = numOfCols;
@ -504,9 +530,12 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t
}
pResInfo->fields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
pResInfo->userFields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
if (NULL == pResInfo->fields || NULL == pResInfo->userFields) {
return TSDB_CODE_OUT_OF_MEMORY;
}
if (numOfCols != pResInfo->numOfCols) {
tscError("numOfCols:%d != pResInfo->numOfCols:%d", numOfCols, pResInfo->numOfCols);
return;
return TSDB_CODE_FAILED;
}
for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
@ -526,6 +555,7 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t
tstrncpy(pResInfo->fields[i].name, pSchema[i].name, tListLen(pResInfo->fields[i].name));
tstrncpy(pResInfo->userFields[i].name, pSchema[i].name, tListLen(pResInfo->userFields[i].name));
}
return TSDB_CODE_SUCCESS;
}
void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) {
@ -539,11 +569,17 @@ void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) {
int32_t buildVnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SArray* pDbVgList) {
SArray* nodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
if (NULL == nodeList) {
return terrno;
}
char* policy = (tsQueryPolicy == QUERY_POLICY_VNODE) ? "vnode" : "client";
int32_t dbNum = taosArrayGetSize(pDbVgList);
for (int32_t i = 0; i < dbNum; ++i) {
SArray* pVg = taosArrayGetP(pDbVgList, i);
if (NULL == pVg) {
continue;
}
int32_t vgNum = taosArrayGetSize(pVg);
if (vgNum <= 0) {
continue;
@ -551,11 +587,18 @@ int32_t buildVnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArr
for (int32_t j = 0; j < vgNum; ++j) {
SVgroupInfo* pInfo = taosArrayGet(pVg, j);
if (NULL == pInfo) {
taosArrayDestroy(nodeList);
return TSDB_CODE_OUT_OF_RANGE;
}
SQueryNodeLoad load = {0};
load.addr.nodeId = pInfo->vgId;
load.addr.epSet = pInfo->epSet;
taosArrayPush(nodeList, &load);
if (NULL == taosArrayPush(nodeList, &load)) {
taosArrayDestroy(nodeList);
return TSDB_CODE_OUT_OF_MEMORY;
}
}
}
@ -572,7 +615,14 @@ int32_t buildVnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArr
}
void* pData = taosArrayGet(pMnodeList, 0);
taosArrayAddBatch(nodeList, pData, mnodeNum);
if (NULL == pData) {
taosArrayDestroy(nodeList);
return TSDB_CODE_OUT_OF_RANGE;
}
if (NULL == taosArrayAddBatch(nodeList, pData, mnodeNum)) {
taosArrayDestroy(nodeList);
return TSDB_CODE_OUT_OF_MEMORY;
}
tscDebug("0x%" PRIx64 " %s policy, use mnode list, num:%d", pRequest->requestId, policy, mnodeNum);
@ -585,11 +635,21 @@ _return:
int32_t buildQnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SArray* pQnodeList) {
SArray* nodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
if (NULL == nodeList) {
return terrno;
}
int32_t qNodeNum = taosArrayGetSize(pQnodeList);
if (qNodeNum > 0) {
void* pData = taosArrayGet(pQnodeList, 0);
taosArrayAddBatch(nodeList, pData, qNodeNum);
if (NULL == pData) {
taosArrayDestroy(nodeList);
return TSDB_CODE_OUT_OF_RANGE;
}
if (NULL == taosArrayAddBatch(nodeList, pData, qNodeNum)) {
taosArrayDestroy(nodeList);
return TSDB_CODE_OUT_OF_MEMORY;
}
tscDebug("0x%" PRIx64 " qnode policy, use qnode list, num:%d", pRequest->requestId, qNodeNum);
goto _return;
}
@ -601,7 +661,14 @@ int32_t buildQnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArr
}
void* pData = taosArrayGet(pMnodeList, 0);
taosArrayAddBatch(nodeList, pData, mnodeNum);
if (NULL == pData) {
taosArrayDestroy(nodeList);
return TSDB_CODE_OUT_OF_RANGE;
}
if (NULL == taosArrayAddBatch(nodeList, pData, mnodeNum)) {
taosArrayDestroy(nodeList);
return TSDB_CODE_OUT_OF_MEMORY;
}
tscDebug("0x%" PRIx64 " qnode policy, use mnode list, num:%d", pRequest->requestId, mnodeNum);
@ -714,7 +781,7 @@ int32_t buildSyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray*
}
case QUERY_POLICY_HYBRID:
case QUERY_POLICY_QNODE: {
getQnodeList(pRequest, &pQnodeList);
TSC_ERR_JRET(getQnodeList(pRequest, &pQnodeList));
code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList);
break;
@ -1203,6 +1270,7 @@ static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaDat
if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) {
SArray* pNodeList = NULL;
if (QUERY_NODE_VNODE_MODIFY_STMT != nodeType(pQuery->pRoot)) {
//TODO(smj) fuckthis
buildAsyncExecNodeList(pRequest, &pNodeList, pMnodeList, pResultMeta);
}
@ -1418,7 +1486,7 @@ int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSe
int32_t taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
SAppInstInfo* pAppInfo, int connType, STscObj** pTscObj) {
*pTscObj = NULL;
int32_t code = createTscObj(user, auth, db, connType, pAppInfo, (void**)&pTscObj);
int32_t code = createTscObj(user, auth, db, connType, pAppInfo, pTscObj);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
@ -1440,8 +1508,8 @@ int32_t taosConnectImpl(const char* user, const char* auth, const char* db, __ta
int64_t transporterId = 0;
asyncSendMsgToServer((*pTscObj)->pAppInfo->pTransporter, &(*pTscObj)->pAppInfo->mgmtEp.epSet, &transporterId, body);
code = tsem_wait(&pRequest->body.rspSem);
if (code != TSDB_CODE_SUCCESS) {
(void)tsem_wait(&pRequest->body.rspSem);
if (pRequest->code != TSDB_CODE_SUCCESS) {
const char* errorMsg = (code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code);
tscError("failed to connect to server, reason: %s", errorMsg);
@ -1604,7 +1672,7 @@ int32_t doProcessMsgFromServer(void* param) {
}
}
pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
(void)pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
if (pTscObj) {
taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
@ -2749,7 +2817,7 @@ void doRequestCallback(SRequestObj* pRequest, int32_t code) {
SRequestObj* pReq = acquireRequest(this);
if (pReq != NULL) {
pReq->inCallback = false;
releaseRequest(this);
(void)releaseRequest(this);
}
}

View File

@ -1014,10 +1014,12 @@ void handleQueryAnslyseRes(SSqlCallbackWrapper *pWrapper, SMetaData *pResultMeta
}
if (pQuery->haveResultSet) {
setResSchemaInfo(&pRequest->body.resInfo, pQuery->pResSchema, pQuery->numOfResCols);
code = setResSchemaInfo(&pRequest->body.resInfo, pQuery->pResSchema, pQuery->numOfResCols);
setResPrecision(&pRequest->body.resInfo, pQuery->precision);
}
}
if (code == TSDB_CODE_SUCCESS) {
TSWAP(pRequest->dbList, (pQuery)->pDbList);
TSWAP(pRequest->tableList, (pQuery)->pTableList);
TSWAP(pRequest->targetTableList, (pQuery)->pTargetTableList);
@ -1202,7 +1204,7 @@ int32_t prepareAndParseSqlSyntax(SSqlCallbackWrapper **ppWrapper, SRequestObj *p
code = TSDB_CODE_OUT_OF_MEMORY;
} else {
pWrapper->pCatalogReq->forceUpdate = updateMetaForce;
pWrapper->pCatalogReq->qNodeRequired = qnodeRequired(pRequest);
TSC_ERR_RET(qnodeRequired(pRequest, &pWrapper->pCatalogReq->qNodeRequired));
code = qParseSqlSyntax(pWrapper->pParseCtx, &pRequest->pQuery, pWrapper->pCatalogReq);
}

View File

@ -1009,10 +1009,6 @@ int32_t taosGetSystemUUID(char *uid, int32_t uidlen) {
// it's caller's responsibility to make enough space for `uid`, that's 36-char + 1-null
uuid_unparse_lower(uuid, buf);
int n = snprintf(uid, uidlen, "%.*s", (int)sizeof(buf), buf); // though less performance, much safer
if (n >= uidlen) {
// target buffer is too small
return -1;
}
return 0;
#else
int len = 0;