enh:[TD-31088] Bug fix.

This commit is contained in:
sima 2024-07-25 11:12:27 +08:00
parent a1c323b690
commit d350adf69b
4 changed files with 32 additions and 37 deletions

View File

@ -482,7 +482,7 @@ int32_t createTscObj(const char *user, const char *auth, const char *db, int32_t
(*pObj)->pRequests = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); (*pObj)->pRequests = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
if (NULL == (*pObj)->pRequests) { if (NULL == (*pObj)->pRequests) {
taosMemoryFree(*pObj); taosMemoryFree(*pObj);
return TSDB_CODE_OUT_OF_MEMORY; return terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
} }
(*pObj)->connType = connType; (*pObj)->connType = connType;
@ -495,10 +495,9 @@ int32_t createTscObj(const char *user, const char *auth, const char *db, int32_t
tstrncpy((*pObj)->db, db, tListLen((*pObj)->db)); tstrncpy((*pObj)->db, db, tListLen((*pObj)->db));
} }
int32_t code = taosThreadMutexInit(&(*pObj)->mutex, NULL); TSC_ERR_RET(taosThreadMutexInit(&(*pObj)->mutex, NULL));
if (TSDB_CODE_SUCCESS != code) {
return TAOS_SYSTEM_ERROR(code); int32_t code = TSDB_CODE_SUCCESS;
}
(*pObj)->id = taosAddRef(clientConnRefPool, *pObj); (*pObj)->id = taosAddRef(clientConnRefPool, *pObj);
if ((*pObj)->id < 0) { if ((*pObj)->id < 0) {

View File

@ -570,10 +570,9 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
int32_t idx = *(int32_t *)param; int32_t idx = *(int32_t *)param;
SClientHbBatchRsp pRsp = {0}; SClientHbBatchRsp pRsp = {0};
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
if (TSDB_CODE_SUCCESS != tDeserializeSClientHbBatchRsp(pMsg->pData, pMsg->len, &pRsp)) { code = tDeserializeSClientHbBatchRsp(pMsg->pData, pMsg->len, &pRsp);
code = terrno; if (TSDB_CODE_SUCCESS != code) {
tscError("deserialize hb rsp failed"); tscError("deserialize hb rsp failed");
goto _return;
} }
int32_t now = taosGetTimestampSec(); int32_t now = taosGetTimestampSec();
int32_t delta = abs(now - pRsp.svrTimestamp); int32_t delta = abs(now - pRsp.svrTimestamp);
@ -585,25 +584,27 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
int32_t rspNum = taosArrayGetSize(pRsp.rsps); int32_t rspNum = taosArrayGetSize(pRsp.rsps);
code = taosThreadMutexLock(&clientHbMgr.lock); (void)taosThreadMutexLock(&clientHbMgr.lock);
if (TSDB_CODE_SUCCESS != code) {
tscError("lock failed");
code = TAOS_SYSTEM_ERROR(code);
goto _return;
}
SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, idx); SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, idx);
if (pAppHbMgr == NULL) { if (pAppHbMgr == NULL) {
(void)taosThreadMutexUnlock(&clientHbMgr.lock);
tscError("appHbMgr not exist, idx:%d", idx); tscError("appHbMgr not exist, idx:%d", idx);
code = TSDB_CODE_OUT_OF_RANGE; taosMemoryFree(pMsg->pData);
goto _returnunlock; taosMemoryFree(pMsg->pEpSet);
tFreeClientHbBatchRsp(&pRsp);
return TSDB_CODE_OUT_OF_RANGE;
} }
SAppInstInfo *pInst = pAppHbMgr->pAppInstInfo; SAppInstInfo *pInst = pAppHbMgr->pAppInstInfo;
if (code != 0) { if (code != 0) {
pInst->onlineDnodes = pInst->totalDnodes ? 0 : -1; pInst->onlineDnodes = pInst->totalDnodes ? 0 : -1;
tscDebug("hb rsp error %s, update server status %d/%d", tstrerror(code), pInst->onlineDnodes, pInst->totalDnodes); tscDebug("hb rsp error %s, update server status %d/%d", tstrerror(code), pInst->onlineDnodes, pInst->totalDnodes);
goto _returnunlock; (void)taosThreadMutexUnlock(&clientHbMgr.lock);
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
tFreeClientHbBatchRsp(&pRsp);
return code;
} }
pInst->monitorParas = pRsp.monitorParas; pInst->monitorParas = pRsp.monitorParas;
@ -619,25 +620,17 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
for (int32_t i = 0; i < rspNum; ++i) { for (int32_t i = 0; i < rspNum; ++i) {
SClientHbRsp *rsp = taosArrayGet(pRsp.rsps, i); SClientHbRsp *rsp = taosArrayGet(pRsp.rsps, i);
if (NULL == rsp) {
tscError("invalid hb rsp, idx:%d", i);
break;
}
code = (*clientHbMgr.rspHandle[rsp->connKey.connType])(pAppHbMgr, rsp); code = (*clientHbMgr.rspHandle[rsp->connKey.connType])(pAppHbMgr, rsp);
if (code) { if (code) {
break; break;
} }
} }
_returnunlock: (void)taosThreadMutexUnlock(&clientHbMgr.lock);
code = taosThreadMutexUnlock(&clientHbMgr.lock);
if (TSDB_CODE_SUCCESS != code) { tFreeClientHbBatchRsp(&pRsp);
tscError("unlock failed");
code = TAOS_SYSTEM_ERROR(code);
}
_return: _return:
tFreeClientHbBatchRsp(&pRsp);
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet); taosMemoryFree(pMsg->pEpSet);
return code; return code;
@ -1111,10 +1104,12 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req
tscWarn("hbGetExpiredUserInfo failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code)); tscWarn("hbGetExpiredUserInfo failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
return code; return code;
} }
code = taosHashPut(clientHbMgr.appHbHash, &hbParam->clusterId, sizeof(uint64_t), NULL, 0); if (clientHbMgr.appHbHash) {
if (TSDB_CODE_SUCCESS != code) { code = taosHashPut(clientHbMgr.appHbHash, &hbParam->clusterId, sizeof(uint64_t), NULL, 0);
tscWarn("hbQueryHbReqHandle put clusterId failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code)); if (TSDB_CODE_SUCCESS != code) {
return code; tscWarn("hbQueryHbReqHandle put clusterId failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
return code;
}
} }
} }
@ -1308,7 +1303,7 @@ static void *hbThreadFunc(void *param) {
} }
if (sz > 1 && !clientHbMgr.appHbHash) { if (sz > 1 && !clientHbMgr.appHbHash) {
clientHbMgr.appHbHash = clientHbMgr.appHbHash =
taosHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK); taosHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_NO_LOCK);
if (NULL == clientHbMgr.appHbHash) { if (NULL == clientHbMgr.appHbHash) {
tscError("taosHashInit failed"); tscError("taosHashInit failed");
return NULL; return NULL;
@ -1527,7 +1522,7 @@ int32_t hbMgrInit() {
clientHbMgr.appId = tGenIdPI64(); clientHbMgr.appId = tGenIdPI64();
tscDebug("app %" PRIx64 " initialized", clientHbMgr.appId); tscDebug("app %" PRIx64 " initialized", clientHbMgr.appId);
clientHbMgr.appSummary = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); clientHbMgr.appSummary = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
if (NULL == clientHbMgr.appSummary) { if (NULL == clientHbMgr.appSummary) {
uError("hbMgrInit:taosHashInit error") return terrno; uError("hbMgrInit:taosHashInit error") return terrno;
} }

View File

@ -1569,15 +1569,16 @@ int32_t taosConnectImpl(const char* user, const char* auth, const char* db, __ta
const char* errorMsg = (code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code); const char* errorMsg = (code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code);
tscError("failed to connect to server, reason: %s", errorMsg); tscError("failed to connect to server, reason: %s", errorMsg);
terrno = pRequest->code;
destroyRequest(pRequest); destroyRequest(pRequest);
taos_close_internal(*pTscObj); taos_close_internal(*pTscObj);
*pTscObj = NULL; *pTscObj = NULL;
return terrno;
} else { } else {
tscDebug("0x%" PRIx64 " connection is opening, connId:%u, dnodeConn:%p, reqId:0x%" PRIx64, (*pTscObj)->id, tscDebug("0x%" PRIx64 " connection is opening, connId:%u, dnodeConn:%p, reqId:0x%" PRIx64, (*pTscObj)->id,
(*pTscObj)->connId, (*pTscObj)->pAppInfo->pTransporter, pRequest->requestId); (*pTscObj)->connId, (*pTscObj)->pAppInfo->pTransporter, pRequest->requestId);
destroyRequest(pRequest); destroyRequest(pRequest);
} }
return code; return code;
} }

View File

@ -897,7 +897,7 @@ int taos_get_current_db(TAOS *taos, char *database, int len, int *required) {
} }
int code = TSDB_CODE_SUCCESS; int code = TSDB_CODE_SUCCESS;
TSC_ERR_JRET(taosThreadMutexLock(&pTscObj->mutex)); (void)taosThreadMutexLock(&pTscObj->mutex);
if (database == NULL || len <= 0) { if (database == NULL || len <= 0) {
if (required != NULL) *required = strlen(pTscObj->db) + 1; if (required != NULL) *required = strlen(pTscObj->db) + 1;
TSC_ERR_JRET(TSDB_CODE_INVALID_PARA); TSC_ERR_JRET(TSDB_CODE_INVALID_PARA);
@ -910,7 +910,7 @@ int taos_get_current_db(TAOS *taos, char *database, int len, int *required) {
code = 0; code = 0;
} }
_return: _return:
code = taosThreadMutexUnlock(&pTscObj->mutex); (void)taosThreadMutexUnlock(&pTscObj->mutex);
releaseTscObj(*(int64_t *)taos); releaseTscObj(*(int64_t *)taos);
return code; return code;
} }