diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 95d08829ae..928afdaecf 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -310,7 +310,7 @@ void setResPrecision(SReqResultInfo* pResInfo, int32_t precision); int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4); int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows, bool convertUcs4); -void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols); +int32_t setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols); void doFreeReqResultInfo(SReqResultInfo* pResInfo); int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, SArray** pReq); void syncCatalogFn(SMetaData* pResult, void* param, int32_t code); @@ -349,10 +349,10 @@ __async_send_cb_fn_t getMsgRspHandle(int32_t msgType); SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pReqObj); int32_t createTscObj(const char *user, const char *auth, const char *db, int32_t connType, SAppInstInfo *pAppInfo, - void **p); + STscObj **p); void destroyTscObj(void* pObj); STscObj* acquireTscObj(int64_t rid); -int32_t releaseTscObj(int64_t rid); +void releaseTscObj(int64_t rid); void destroyAppInst(void* pAppInfo); uint64_t generateRequestId(); @@ -396,11 +396,11 @@ void taos_close_internal(void* taos); // --- heartbeat // global, called by mgmt -int hbMgrInit(); -void hbMgrCleanUp(); +int32_t hbMgrInit(); +void hbMgrCleanUp(); // cluster level -SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char* key); +int32_t appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key, SAppHbMgr **pAppHbMgr); void appHbMgrCleanup(void); void hbRemoveAppHbMrg(SAppHbMgr** pAppHbMgr); void destroyAllRequests(SHashObj* pRequests); @@ -409,7 +409,7 @@ void stopAllRequests(SHashObj* pRequests); //SAppInstInfo* getAppInstInfo(const char* clusterKey); // conn level -int hbRegisterConn(SAppHbMgr* pAppHbMgr, int64_t tscRefId, int64_t clusterId, int8_t connType); +int32_t hbRegisterConn(SAppHbMgr* pAppHbMgr, int64_t tscRefId, int64_t clusterId, int8_t connType); void hbDeregisterConn(STscObj* pTscObj, SClientHbKey connKey); typedef struct SSqlCallbackWrapper { @@ -428,7 +428,7 @@ void doAsyncQuery(SRequestObj* pRequest, bool forceUpdateMeta); int32_t removeMeta(STscObj* pTscObj, SArray* tbList, bool isView); int32_t handleAlterTbExecRes(void* res, struct SCatalog* pCatalog); int32_t handleCreateTbExecRes(void* res, SCatalog* pCatalog); -bool qnodeRequired(SRequestObj* pRequest); +int32_t qnodeRequired(SRequestObj* pRequest, bool *required); void continueInsertFromCsv(SSqlCallbackWrapper* pWrapper, SRequestObj* pRequest); void destorySqlCallbackWrapper(SSqlCallbackWrapper* pWrapper); void handleQueryAnslyseRes(SSqlCallbackWrapper *pWrapper, SMetaData *pResultMeta, int32_t code); diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index db1de705ad..e65d9acd8b 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -305,9 +305,7 @@ static void deregisterRequest(SRequestObj *pRequest) { } } - if (TSDB_CODE_SUCCESS != releaseTscObj(pTscObj->id)) { - tscError("failed to release STscObj"); - } + releaseTscObj(pTscObj->id); } // todo close the transporter properly @@ -419,14 +417,14 @@ void destroyAppInst(void *info) { int32_t code = taosThreadMutexLock(&appInfo.mutex); if (TSDB_CODE_SUCCESS != code) { - tscError("failed to lock app info, code:%s", tstrerror(code)); + tscError("failed to lock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code))); } hbRemoveAppHbMrg(&pAppInfo->pAppHbMgr); code = taosThreadMutexUnlock(&appInfo.mutex); if (TSDB_CODE_SUCCESS != code) { - tscError("failed to unlock app info, code:%s", tstrerror(code)); + tscError("failed to unlock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code))); } taosMemoryFreeClear(pAppInfo->instKey); @@ -434,13 +432,13 @@ void destroyAppInst(void *info) { code = taosThreadMutexLock(&pAppInfo->qnodeMutex); if (TSDB_CODE_SUCCESS != code) { - tscError("failed to lock qnode mutex, code:%s", tstrerror(code)); + tscError("failed to lock qnode mutex, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code))); } taosArrayDestroy(pAppInfo->pQnodeList); code = taosThreadMutexUnlock(&pAppInfo->qnodeMutex); if (TSDB_CODE_SUCCESS != code) { - tscError("failed to unlock qnode mutex, code:%s", tstrerror(code)); + tscError("failed to unlock qnode mutex, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code))); } taosMemoryFree(pAppInfo); @@ -475,51 +473,55 @@ void destroyTscObj(void *pObj) { } int32_t createTscObj(const char *user, const char *auth, const char *db, int32_t connType, SAppInstInfo *pAppInfo, - void **p) { - STscObj *pObj = (STscObj *)*p; - pObj = (STscObj *)taosMemoryCalloc(1, sizeof(STscObj)); - if (NULL == pObj) { + STscObj **pObj) { + *pObj = (STscObj *)taosMemoryCalloc(1, sizeof(STscObj)); + if (NULL == *pObj) { return TSDB_CODE_OUT_OF_MEMORY; } - pObj->pRequests = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); - if (NULL == pObj->pRequests) { - taosMemoryFree(pObj); + (*pObj)->pRequests = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); + if (NULL == (*pObj)->pRequests) { + taosMemoryFree(*pObj); return TSDB_CODE_OUT_OF_MEMORY; } - pObj->connType = connType; - pObj->pAppInfo = pAppInfo; - pObj->appHbMgrIdx = pAppInfo->pAppHbMgr->idx; - tstrncpy(pObj->user, user, sizeof(pObj->user)); - (void)memcpy(pObj->pass, auth, TSDB_PASSWORD_LEN); + (*pObj)->connType = connType; + (*pObj)->pAppInfo = pAppInfo; + (*pObj)->appHbMgrIdx = pAppInfo->pAppHbMgr->idx; + tstrncpy((*pObj)->user, user, sizeof((*pObj)->user)); + (void)memcpy((*pObj)->pass, auth, TSDB_PASSWORD_LEN); if (db != NULL) { - tstrncpy(pObj->db, db, tListLen(pObj->db)); + tstrncpy((*pObj)->db, db, tListLen((*pObj)->db)); } - int32_t code = taosThreadMutexInit(&pObj->mutex, NULL); + int32_t code = taosThreadMutexInit(&(*pObj)->mutex, NULL); if (TSDB_CODE_SUCCESS != code) { return TAOS_SYSTEM_ERROR(code); } - pObj->id = taosAddRef(clientConnRefPool, pObj); - if (pObj->id < 0) { + (*pObj)->id = taosAddRef(clientConnRefPool, *pObj); + if ((*pObj)->id < 0) { tscError("failed to add object to clientConnRefPool"); code = terrno; - taosMemoryFree(pObj); + taosMemoryFree(*pObj); return code; } - (void)atomic_add_fetch_64(&pObj->pAppInfo->numOfConns, 1); + (void)atomic_add_fetch_64(&(*pObj)->pAppInfo->numOfConns, 1); - tscDebug("connObj created, 0x%" PRIx64 ",p:%p", pObj->id, pObj); + tscDebug("connObj created, 0x%" PRIx64 ",p:%p", (*pObj)->id, *pObj); return code; } STscObj *acquireTscObj(int64_t rid) { return (STscObj *)taosAcquireRef(clientConnRefPool, rid); } -int32_t releaseTscObj(int64_t rid) { return taosReleaseRef(clientConnRefPool, rid); } +void releaseTscObj(int64_t rid) { + int32_t code = taosReleaseRef(clientConnRefPool, rid); + if (TSDB_CODE_SUCCESS != code) { + tscWarn("failed to release TscObj, rid:%lld, code:%s", rid, tstrerror(code)); + } +} int32_t createRequest(uint64_t connId, int32_t type, int64_t reqid, SRequestObj **pRequest) { int32_t code = TSDB_CODE_SUCCESS; @@ -535,9 +537,7 @@ int32_t createRequest(uint64_t connId, int32_t type, int64_t reqid, SRequestObj } SSyncQueryParam *interParam = taosMemoryCalloc(1, sizeof(SSyncQueryParam)); if (interParam == NULL) { - if (TSDB_CODE_SUCCESS != releaseTscObj(connId)) { - tscError("failed to release TscObj"); - } + releaseTscObj(connId); code = TSDB_CODE_OUT_OF_MEMORY; goto _return; } @@ -857,20 +857,27 @@ static void *tscCrashReportThreadFp(void *param) { int32_t tscCrashReportInit() { if (!tsEnableCrashReport) { - return 0; + return TSDB_CODE_SUCCESS; } - + int32_t code = TSDB_CODE_SUCCESS; TdThreadAttr thAttr; - TSC_ERR_RET(taosThreadAttrInit(&thAttr)); - TSC_ERR_RET(taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE)); + TSC_ERR_JRET(taosThreadAttrInit(&thAttr)); + TSC_ERR_JRET(taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE)); TdThread crashReportThread; if (taosThreadCreate(&crashReportThread, &thAttr, tscCrashReportThreadFp, NULL) != 0) { tscError("failed to create crashReport thread since %s", strerror(errno)); - return -1; + terrno = TAOS_SYSTEM_ERROR(errno); + TSC_ERR_RET(errno); } - TSC_ERR_RET(taosThreadAttrDestroy(&thAttr)); - return 0; + (void)taosThreadAttrDestroy(&thAttr); +_return: + if (code) { + terrno = TAOS_SYSTEM_ERROR(errno); + TSC_ERR_RET(terrno); + } + + return TSDB_CODE_SUCCESS; } void tscStopCrashReport() { diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 6833adb3bd..77a2f3b036 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -99,9 +99,7 @@ static int32_t hbUpdateUserAuthInfo(SAppHbMgr *pAppHbMgr, SUserAuthBatchRsp *bat } } if (!pRsp) { - if (TSDB_CODE_SUCCESS != releaseTscObj(pReq->connKey.tscRid)) { - tscError("failed to release tscObj"); - } + releaseTscObj(pReq->connKey.tscRid); taosHashCancelIterate(hbMgr->activeInfo, pReq); break; } @@ -116,9 +114,7 @@ static int32_t hbUpdateUserAuthInfo(SAppHbMgr *pAppHbMgr, SUserAuthBatchRsp *bat } } } - if (TSDB_CODE_SUCCESS != releaseTscObj(pReq->connKey.tscRid)) { - tscError("failed to release tscObj"); - } + releaseTscObj(pReq->connKey.tscRid); continue; } @@ -163,9 +159,7 @@ static int32_t hbUpdateUserAuthInfo(SAppHbMgr *pAppHbMgr, SUserAuthBatchRsp *bat tscDebug("update whitelist version of user %s from %" PRId64 " to %" PRId64 ", tscRid:%" PRIi64, pRsp->user, oldVer, atomic_load_64(&whiteListInfo->ver), pTscObj->id); } - if (TSDB_CODE_SUCCESS != releaseTscObj(pReq->connKey.tscRid)) { - tscError("failed to release tscObj"); - } + releaseTscObj(pReq->connKey.tscRid); } } return 0; @@ -529,9 +523,7 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { tscDebug("request 0x%" PRIx64 " not exist to kill", pRsp->query->killRid); } else { taos_stop_query((TAOS_RES *)pRequest); - if (TSDB_CODE_SUCCESS != releaseRequest(pRsp->query->killRid)) { - tscWarn("release request failed"); - } + (void)releaseRequest(pRsp->query->killRid); } } @@ -545,9 +537,7 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { } } - if (TSDB_CODE_SUCCESS != releaseTscObj(pRsp->connKey.tscRid)) { - tscWarn("release tscobj failed"); - } + releaseTscObj(pRsp->connKey.tscRid); } } @@ -571,7 +561,6 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { return TSDB_CODE_SUCCESS; } -//TODO(smj) static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) { if (0 == atomic_load_8(&clientHbMgr.inited)) { goto _return; @@ -581,8 +570,11 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) { int32_t idx = *(int32_t *)param; SClientHbBatchRsp pRsp = {0}; if (TSDB_CODE_SUCCESS == code) { - tDeserializeSClientHbBatchRsp(pMsg->pData, pMsg->len, &pRsp); - + if (TSDB_CODE_SUCCESS != tDeserializeSClientHbBatchRsp(pMsg->pData, pMsg->len, &pRsp)) { + code = terrno; + tscError("deserialize hb rsp failed"); + goto _return; + } int32_t now = taosGetTimestampSec(); int32_t delta = abs(now - pRsp.svrTimestamp); if (delta > timestampDeltaLimit) { @@ -593,27 +585,25 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) { int32_t rspNum = taosArrayGetSize(pRsp.rsps); - taosThreadMutexLock(&clientHbMgr.lock); + code = taosThreadMutexLock(&clientHbMgr.lock); + if (TSDB_CODE_SUCCESS != code) { + tscError("lock failed"); + code = TAOS_SYSTEM_ERROR(code); + goto _return; + } SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, idx); if (pAppHbMgr == NULL) { - taosThreadMutexUnlock(&clientHbMgr.lock); tscError("appHbMgr not exist, idx:%d", idx); - taosMemoryFree(pMsg->pData); - taosMemoryFree(pMsg->pEpSet); - tFreeClientHbBatchRsp(&pRsp); - return -1; + code = TSDB_CODE_OUT_OF_RANGE; + goto _returnunlock; } SAppInstInfo *pInst = pAppHbMgr->pAppInstInfo; if (code != 0) { pInst->onlineDnodes = pInst->totalDnodes ? 0 : -1; tscDebug("hb rsp error %s, update server status %d/%d", tstrerror(code), pInst->onlineDnodes, pInst->totalDnodes); - taosThreadMutexUnlock(&clientHbMgr.lock); - taosMemoryFree(pMsg->pData); - taosMemoryFree(pMsg->pEpSet); - tFreeClientHbBatchRsp(&pRsp); - return -1; + goto _returnunlock; } pInst->monitorParas = pRsp.monitorParas; @@ -624,22 +614,30 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) { tscDebug("hb got %d rsp, %d empty rsp received before", rspNum, atomic_val_compare_exchange_32(&emptyRspNum, emptyRspNum, 0)); } else { - atomic_add_fetch_32(&emptyRspNum, 1); + (void)atomic_add_fetch_32(&emptyRspNum, 1); } for (int32_t i = 0; i < rspNum; ++i) { SClientHbRsp *rsp = taosArrayGet(pRsp.rsps, i); + if (NULL == rsp) { + tscError("invalid hb rsp, idx:%d", i); + break; + } code = (*clientHbMgr.rspHandle[rsp->connKey.connType])(pAppHbMgr, rsp); if (code) { break; } } - taosThreadMutexUnlock(&clientHbMgr.lock); - - tFreeClientHbBatchRsp(&pRsp); +_returnunlock: + code = taosThreadMutexUnlock(&clientHbMgr.lock); + if (TSDB_CODE_SUCCESS != code) { + tscError("unlock failed"); + code = TAOS_SYSTEM_ERROR(code); + } _return: + tFreeClientHbBatchRsp(&pRsp); taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pEpSet); return code; @@ -660,7 +658,7 @@ int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) { } if (pRequest->killed || 0 == pRequest->body.queryJob) { - releaseRequest(*rid); + (void)releaseRequest(*rid); pIter = taosHashIterate(pObj->pRequests, pIter); continue; } @@ -672,14 +670,19 @@ int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) { desc.reqRid = pRequest->self; desc.stableQuery = pRequest->stableQuery; desc.isSubQuery = pRequest->isSubReq; - taosGetFqdn(desc.fqdn); + code = taosGetFqdn(desc.fqdn); + if (TSDB_CODE_SUCCESS != code) { + (void)releaseRequest(*rid); + tscError("get fqdn failed"); + return TSDB_CODE_FAILED; + } desc.subPlanNum = pRequest->body.subplanNum; if (desc.subPlanNum) { desc.subDesc = taosArrayInit(desc.subPlanNum, sizeof(SQuerySubDesc)); if (NULL == desc.subDesc) { - releaseRequest(*rid); - return TSDB_CODE_OUT_OF_MEMORY; + (void)releaseRequest(*rid); + return terrno; } code = schedulerGetTasksStatus(pRequest->body.queryJob, desc.subDesc); @@ -692,20 +695,23 @@ int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) { desc.subDesc = NULL; } - releaseRequest(*rid); - taosArrayPush(hbBasic->queryDesc, &desc); + (void)releaseRequest(*rid); + if (NULL == taosArrayPush(hbBasic->queryDesc, &desc)) { + taosArrayDestroy(desc.subDesc); + return TSDB_CODE_OUT_OF_MEMORY; + } pIter = taosHashIterate(pObj->pRequests, pIter); } - return TSDB_CODE_SUCCESS; + return code; } int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) { STscObj *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid); if (NULL == pTscObj) { tscWarn("tscObj rid %" PRIx64 " not exist", connKey->tscRid); - return TSDB_CODE_APP_ERROR; + return terrno; } SQueryHbReqBasic *hbBasic = (SQueryHbReqBasic *)taosMemoryCalloc(1, sizeof(SQueryHbReqBasic)); @@ -730,7 +736,7 @@ int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) { tscWarn("taosArrayInit %d queryDesc failed", numOfQueries); releaseTscObj(connKey->tscRid); taosMemoryFree(hbBasic); - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } int32_t code = hbBuildQueryDesc(hbBasic, pTscObj); @@ -753,7 +759,7 @@ static int32_t hbGetUserAuthInfo(SClientHbKey *connKey, SHbParam *param, SClient STscObj *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid); if (!pTscObj) { tscWarn("tscObj rid %" PRIx64 " not exist", connKey->tscRid); - return TSDB_CODE_APP_ERROR; + return terrno; } int32_t code = 0; @@ -775,7 +781,7 @@ static int32_t hbGetUserAuthInfo(SClientHbKey *connKey, SHbParam *param, SClient SUserAuthVersion *qUserAuth = (SUserAuthVersion *)taosMemoryRealloc(pKv->value, (userNum + 1) * sizeof(SUserAuthVersion)); if (qUserAuth) { - strncpy((qUserAuth + userNum)->user, pTscObj->user, TSDB_USER_LEN); + (void)strncpy((qUserAuth + userNum)->user, pTscObj->user, TSDB_USER_LEN); (qUserAuth + userNum)->version = htonl(-1); // force get userAuthInfo pKv->value = qUserAuth; pKv->valueLen += sizeof(SUserAuthVersion); @@ -801,6 +807,10 @@ static int32_t hbGetUserAuthInfo(SClientHbKey *connKey, SHbParam *param, SClient if (!req->info) { req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); + if (NULL == req->info) { + code = terrno; + goto _return; + } } if (taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)) != 0) { @@ -848,9 +858,17 @@ int32_t hbGetExpiredUserInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, S if (NULL == req->info) { req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); + if (NULL == req->info) { + taosMemoryFree(users); + return terrno; + } } - taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)); + code = taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)); + if (TSDB_CODE_SUCCESS != code) { + taosMemoryFree(users); + return code; + } return TSDB_CODE_SUCCESS; } @@ -894,9 +912,17 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl if (NULL == req->info) { req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); + if (NULL == req->info) { + taosMemoryFree(dbs); + return terrno; + } } - taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)); + code = taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)); + if (TSDB_CODE_SUCCESS != code) { + taosMemoryFree(dbs); + return code; + } return TSDB_CODE_SUCCESS; } @@ -934,9 +960,17 @@ int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SC if (NULL == req->info) { req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); + if (NULL == req->info) { + taosMemoryFree(stbs); + return terrno; + } } - taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)); + code = taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)); + if (TSDB_CODE_SUCCESS != code) { + taosMemoryFree(stbs); + return code; + } return TSDB_CODE_SUCCESS; } @@ -947,12 +981,7 @@ int32_t hbGetExpiredViewInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, S int32_t code = 0; SDynViewVersion *pDynViewVer = NULL; - code = catalogGetExpiredViews(pCatalog, &views, &viewNum, &pDynViewVer); - if (TSDB_CODE_SUCCESS != code) { - taosMemoryFree(views); - taosMemoryFree(pDynViewVer); - return code; - } + TSC_ERR_JRET(catalogGetExpiredViews(pCatalog, &views, &viewNum, &pDynViewVer)); if (viewNum <= 0) { taosMemoryFree(views); @@ -971,6 +1000,9 @@ int32_t hbGetExpiredViewInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, S if (NULL == req->info) { req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); + if (NULL == req->info) { + TSC_ERR_JRET(terrno); + } } SKv kv = { @@ -979,15 +1011,18 @@ int32_t hbGetExpiredViewInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, S .value = pDynViewVer, }; - taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)); + TSC_ERR_JRET(taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv))); kv.key = HEARTBEAT_KEY_VIEWINFO; kv.valueLen = sizeof(SViewVersion) * viewNum; kv.value = views; - taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)); - + TSC_ERR_JRET(taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv))); return TSDB_CODE_SUCCESS; +_return: + taosMemoryFree(views); + taosMemoryFree(pDynViewVer); + return code; } int32_t hbGetExpiredTSMAInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *pReq) { @@ -1017,22 +1052,30 @@ int32_t hbGetExpiredTSMAInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, S if (!pReq->info) { pReq->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); + if (!pReq->info) { + taosMemoryFree(tsmas); + return terrno; + } } SKv kv = {.key = HEARTBEAT_KEY_TSMA, .valueLen = sizeof(STSMAVersion) * tsmaNum, .value = tsmas}; - taosHashPut(pReq->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)); + code = taosHashPut(pReq->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)); + if (TSDB_CODE_SUCCESS != code) { + taosMemoryFree(tsmas); + return code; + } return TSDB_CODE_SUCCESS; } int32_t hbGetAppInfo(int64_t clusterId, SClientHbReq *req) { SAppHbReq *pApp = taosHashGet(clientHbMgr.appSummary, &clusterId, sizeof(clusterId)); if (NULL != pApp) { - memcpy(&req->app, pApp, sizeof(*pApp)); + (void)memcpy(&req->app, pApp, sizeof(*pApp)); } else { - memset(&req->app.summary, 0, sizeof(req->app.summary)); + (void)memset(&req->app.summary, 0, sizeof(req->app.summary)); req->app.pid = taosGetPId(); req->app.appId = clientHbMgr.appId; - taosGetAppName(req->app.name, NULL); + TSC_ERR_RET(taosGetAppName(req->app.name, NULL)); } return TSDB_CODE_SUCCESS; @@ -1043,7 +1086,11 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req SHbParam *hbParam = (SHbParam *)param; SCatalog *pCatalog = NULL; - hbGetQueryBasicInfo(connKey, req); + code = hbGetQueryBasicInfo(connKey, req); + if (code != TSDB_CODE_SUCCESS) { + tscWarn("hbGetQueryBasicInfo failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code)); + return code; + } if (hbParam->reqCnt == 0) { code = catalogGetHandle(hbParam->clusterId, &pCatalog); @@ -1052,20 +1099,30 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req return code; } - hbGetAppInfo(hbParam->clusterId, req); + code = hbGetAppInfo(hbParam->clusterId, req); + if (TSDB_CODE_SUCCESS != code) { + tscWarn("getAppInfo failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code)); + return code; + } if (!taosHashGet(clientHbMgr.appHbHash, &hbParam->clusterId, sizeof(hbParam->clusterId))) { code = hbGetExpiredUserInfo(connKey, pCatalog, req); if (TSDB_CODE_SUCCESS != code) { + tscWarn("hbGetExpiredUserInfo failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code)); + return code; + } + code = taosHashPut(clientHbMgr.appHbHash, &hbParam->clusterId, sizeof(uint64_t), NULL, 0); + if (TSDB_CODE_SUCCESS != code) { + tscWarn("hbQueryHbReqHandle put clusterId failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code)); return code; } - taosHashPut(clientHbMgr.appHbHash, &hbParam->clusterId, sizeof(uint64_t), NULL, 0); } // invoke after hbGetExpiredUserInfo if (2 != atomic_load_8(&hbParam->pAppHbMgr->connHbFlag)) { code = hbGetUserAuthInfo(connKey, hbParam, req); if (TSDB_CODE_SUCCESS != code) { + tscWarn("hbGetUserAuthInfo failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code)); return code; } atomic_store_8(&hbParam->pAppHbMgr->connHbFlag, 1); @@ -1073,23 +1130,34 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req code = hbGetExpiredDBInfo(connKey, pCatalog, req); if (TSDB_CODE_SUCCESS != code) { + tscWarn("hbGetExpiredDBInfo failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code)); return code; } code = hbGetExpiredStbInfo(connKey, pCatalog, req); if (TSDB_CODE_SUCCESS != code) { + tscWarn("hbGetExpiredStbInfo failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code)); return code; } #ifdef TD_ENTERPRISE code = hbGetExpiredViewInfo(connKey, pCatalog, req); if (TSDB_CODE_SUCCESS != code) { + tscWarn("hbGetExpiredViewInfo failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code)); return code; } #endif code = hbGetExpiredTSMAInfo(connKey, pCatalog, req); + if (TSDB_CODE_SUCCESS != code) { + tscWarn("hbGetExpiredTSMAInfo failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code)); + return code; + } } else { - hbGetAppInfo(hbParam->clusterId, req); + code = hbGetAppInfo(hbParam->clusterId, req); + if (TSDB_CODE_SUCCESS != code) { + tscWarn("hbGetAppInfo failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code)); + return code; + } } ++hbParam->reqCnt; // success to get catalog info @@ -1106,17 +1174,16 @@ static FORCE_INLINE void hbMgrInitHandle() { clientHbMgr.rspHandle[CONN_TYPE__TMQ] = hbMqHbRspHandle; } -SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { - SClientHbBatchReq *pBatchReq = taosMemoryCalloc(1, sizeof(SClientHbBatchReq)); +int32_t hbGatherAllInfo(SAppHbMgr *pAppHbMgr, SClientHbBatchReq **pBatchReq) { + *pBatchReq = taosMemoryCalloc(1, sizeof(SClientHbBatchReq)); if (pBatchReq == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; + return TSDB_CODE_OUT_OF_MEMORY; } int32_t connKeyCnt = atomic_load_32(&pAppHbMgr->connKeyCnt); - pBatchReq->reqs = taosArrayInit(connKeyCnt, sizeof(SClientHbReq)); - if (!pBatchReq->reqs) { - tFreeClientHbBatchReq(pBatchReq); - return NULL; + (*pBatchReq)->reqs = taosArrayInit(connKeyCnt, sizeof(SClientHbReq)); + if (!(*pBatchReq)->reqs) { + tFreeClientHbBatchReq(*pBatchReq); + return terrno; } int64_t maxIpWhiteVer = 0; @@ -1132,7 +1199,11 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { continue; } - pOneReq = taosArrayPush(pBatchReq->reqs, pOneReq); + pOneReq = taosArrayPush((*pBatchReq)->reqs, pOneReq); + if (NULL == pOneReq) { + releaseTscObj(connKey->tscRid); + continue; + } switch (connKey->connType) { case CONN_TYPE__QUERY: { @@ -1159,9 +1230,9 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { maxIpWhiteVer = TMAX(maxIpWhiteVer, ver); releaseTscObj(connKey->tscRid); } - pBatchReq->ipWhiteList = maxIpWhiteVer; + (*pBatchReq)->ipWhiteList = maxIpWhiteVer; - return pBatchReq; + return TSDB_CODE_SUCCESS; } void hbThreadFuncUnexpectedStopped(void) { atomic_store_8(&clientHbMgr.threadStop, 2); } @@ -1180,11 +1251,12 @@ void hbMergeSummary(SAppClusterSummary *dst, SAppClusterSummary *src) { int32_t hbGatherAppInfo(void) { SAppHbReq req = {0}; + int32_t code = TSDB_CODE_SUCCESS; int sz = taosArrayGetSize(clientHbMgr.appHbMgrs); if (sz > 0) { req.pid = taosGetPId(); req.appId = clientHbMgr.appId; - taosGetAppName(req.name, NULL); + TSC_ERR_RET(taosGetAppName(req.name, NULL)); } taosHashClear(clientHbMgr.appSummary); @@ -1196,9 +1268,9 @@ int32_t hbGatherAppInfo(void) { int64_t clusterId = pAppHbMgr->pAppInstInfo->clusterId; SAppHbReq *pApp = taosHashGet(clientHbMgr.appSummary, &clusterId, sizeof(clusterId)); if (NULL == pApp) { - memcpy(&req.summary, &pAppHbMgr->pAppInstInfo->summary, sizeof(req.summary)); + (void)memcpy(&req.summary, &pAppHbMgr->pAppInstInfo->summary, sizeof(req.summary)); req.startTime = pAppHbMgr->startTime; - taosHashPut(clientHbMgr.appSummary, &clusterId, sizeof(clusterId), &req, sizeof(req)); + TSC_ERR_RET(taosHashPut(clientHbMgr.appSummary, &clusterId, sizeof(clusterId), &req, sizeof(req))); } else { if (pAppHbMgr->startTime < pApp->startTime) { pApp->startTime = pAppHbMgr->startTime; @@ -1223,14 +1295,24 @@ static void *hbThreadFunc(void *param) { break; } - taosThreadMutexLock(&clientHbMgr.lock); + if (TSDB_CODE_SUCCESS != taosThreadMutexLock(&clientHbMgr.lock)) { + tscError("taosThreadMutexLock failed"); + return NULL; + } int sz = taosArrayGetSize(clientHbMgr.appHbMgrs); if (sz > 0) { - hbGatherAppInfo(); + if (TSDB_CODE_SUCCESS != hbGatherAppInfo()) { + tscError("hbGatherAppInfo failed"); + return NULL; + } if (sz > 1 && !clientHbMgr.appHbHash) { clientHbMgr.appHbHash = taosHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK); + if (NULL == clientHbMgr.appHbHash) { + tscError("taosHashInit failed"); + return NULL; + } } taosHashClear(clientHbMgr.appHbHash); } @@ -1245,8 +1327,10 @@ static void *hbThreadFunc(void *param) { if (connCnt == 0) { continue; } - SClientHbBatchReq *pReq = hbGatherAllInfo(pAppHbMgr); - if (pReq == NULL || taosArrayGetP(clientHbMgr.appHbMgrs, i) == NULL) { + SClientHbBatchReq *pReq = NULL; + int32_t code = hbGatherAllInfo(pAppHbMgr, &pReq); + if (TSDB_CODE_SUCCESS != code || taosArrayGetP(clientHbMgr.appHbMgrs, i) == NULL) { + terrno = code ? code : TSDB_CODE_OUT_OF_RANGE; tFreeClientHbBatchReq(pReq); continue; } @@ -1288,8 +1372,10 @@ static void *hbThreadFunc(void *param) { atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1); } - taosThreadMutexUnlock(&clientHbMgr.lock); - + if (TSDB_CODE_SUCCESS != taosThreadMutexUnlock(&clientHbMgr.lock)) { + tscError("taosThreadMutexLock failed"); + return NULL; + } taosMsleep(HEARTBEAT_INTERVAL); } taosHashCleanup(clientHbMgr.appHbHash); @@ -1297,16 +1383,24 @@ static void *hbThreadFunc(void *param) { } static int32_t hbCreateThread() { + int32_t code = TSDB_CODE_SUCCESS; TdThreadAttr thAttr; - taosThreadAttrInit(&thAttr); - taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); + TSC_ERR_JRET(taosThreadAttrInit(&thAttr)); + TSC_ERR_JRET(taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE)); if (taosThreadCreate(&clientHbMgr.thread, &thAttr, hbThreadFunc, NULL) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); - return -1; + TSC_ERR_RET(terrno); } - taosThreadAttrDestroy(&thAttr); - return 0; + (void)taosThreadAttrDestroy(&thAttr); +_return: + + if (code) { + terrno = TAOS_SYSTEM_ERROR(errno); + TSC_ERR_RET(terrno); + } + + return TSDB_CODE_SUCCESS; } static void hbStopThread() { @@ -1318,54 +1412,66 @@ static void hbStopThread() { return; } + int32_t code = TSDB_CODE_SUCCESS; // thread quit mode kill or inner exit from self-thread if (clientHbMgr.quitByKill) { - taosThreadKill(clientHbMgr.thread, 0); + code = taosThreadKill(clientHbMgr.thread, 0); + if (TSDB_CODE_SUCCESS != code) { + tscError("taosThreadKill failed since %s", tstrerror(TAOS_SYSTEM_ERROR(code))); + } } else { - taosThreadJoin(clientHbMgr.thread, NULL); + code = taosThreadJoin(clientHbMgr.thread, NULL); + if (TSDB_CODE_SUCCESS != code) { + tscError("taosThreadJoin failed since %s", tstrerror(TAOS_SYSTEM_ERROR(errno))); + } } tscDebug("hb thread stopped"); } -SAppHbMgr *appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key) { - if (hbMgrInit() != 0) { - terrno = TSDB_CODE_TSC_INTERNAL_ERROR; - return NULL; - } - SAppHbMgr *pAppHbMgr = taosMemoryMalloc(sizeof(SAppHbMgr)); - if (pAppHbMgr == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; +int32_t appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key, SAppHbMgr **pAppHbMgr) { + int32_t code = TSDB_CODE_SUCCESS; + TSC_ERR_RET(hbMgrInit()); + *pAppHbMgr = taosMemoryMalloc(sizeof(SAppHbMgr)); + if (*pAppHbMgr == NULL) { + TSC_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } // init stat - pAppHbMgr->startTime = taosGetTimestampMs(); - pAppHbMgr->connKeyCnt = 0; - pAppHbMgr->connHbFlag = 0; - pAppHbMgr->reportCnt = 0; - pAppHbMgr->reportBytes = 0; - pAppHbMgr->key = taosStrdup(key); + (*pAppHbMgr)->startTime = taosGetTimestampMs(); + (*pAppHbMgr)->connKeyCnt = 0; + (*pAppHbMgr)->connHbFlag = 0; + (*pAppHbMgr)->reportCnt = 0; + (*pAppHbMgr)->reportBytes = 0; + (*pAppHbMgr)->key = taosStrdup(key); + if ((*pAppHbMgr)->key == NULL) { + TSC_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); + } // init app info - pAppHbMgr->pAppInstInfo = pAppInstInfo; + (*pAppHbMgr)->pAppInstInfo = pAppInstInfo; // init hash info - pAppHbMgr->activeInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); + (*pAppHbMgr)->activeInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); - if (pAppHbMgr->activeInfo == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - taosMemoryFree(pAppHbMgr); - return NULL; + if ((*pAppHbMgr)->activeInfo == NULL) { + TSC_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } // taosHashSetFreeFp(pAppHbMgr->activeInfo, tFreeClientHbReq); - taosThreadMutexLock(&clientHbMgr.lock); - taosArrayPush(clientHbMgr.appHbMgrs, &pAppHbMgr); - pAppHbMgr->idx = taosArrayGetSize(clientHbMgr.appHbMgrs) - 1; - taosThreadMutexUnlock(&clientHbMgr.lock); + TSC_ERR_JRET(taosThreadMutexLock(&clientHbMgr.lock)); + if (taosArrayPush(clientHbMgr.appHbMgrs, &(*pAppHbMgr)) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + (void)taosThreadMutexUnlock(&clientHbMgr.lock); + goto _return; + } + (*pAppHbMgr)->idx = taosArrayGetSize(clientHbMgr.appHbMgrs) - 1; + TSC_ERR_JRET(taosThreadMutexUnlock(&clientHbMgr.lock)); - return pAppHbMgr; + return TSDB_CODE_SUCCESS; +_return: + taosMemoryFree(*pAppHbMgr); + return code; } void hbFreeAppHbMgr(SAppHbMgr *pTarget) { @@ -1383,7 +1489,11 @@ void hbFreeAppHbMgr(SAppHbMgr *pTarget) { } void hbRemoveAppHbMrg(SAppHbMgr **pAppHbMgr) { - taosThreadMutexLock(&clientHbMgr.lock); + int32_t code = TSDB_CODE_SUCCESS; + code = taosThreadMutexLock(&clientHbMgr.lock); + if (TSDB_CODE_SUCCESS != code) { + tscError("failed to lock clientHbMgr, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code))); + } int32_t mgrSize = taosArrayGetSize(clientHbMgr.appHbMgrs); for (int32_t i = 0; i < mgrSize; ++i) { SAppHbMgr *pItem = taosArrayGetP(clientHbMgr.appHbMgrs, i); @@ -1394,7 +1504,10 @@ void hbRemoveAppHbMrg(SAppHbMgr **pAppHbMgr) { break; } } - taosThreadMutexUnlock(&clientHbMgr.lock); + code = taosThreadMutexUnlock(&clientHbMgr.lock); + if (TSDB_CODE_SUCCESS != code) { + tscError("failed to unlock clientHbMgr, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code))); + } } void appHbMgrCleanup(void) { @@ -1406,7 +1519,7 @@ void appHbMgrCleanup(void) { } } -int hbMgrInit() { +int32_t hbMgrInit() { // init once int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1); if (old == 1) return 0; @@ -1415,8 +1528,13 @@ int hbMgrInit() { tscDebug("app %" PRIx64 " initialized", clientHbMgr.appId); clientHbMgr.appSummary = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); + if (NULL == clientHbMgr.appSummary) { + uError("hbMgrInit:taosHashInit error") return terrno; + } clientHbMgr.appHbMgrs = taosArrayInit(0, sizeof(void *)); - + if (NULL == clientHbMgr.appHbMgrs) { + uError("hbMgrInit:taosArrayInit error") return terrno; + } TdThreadMutexAttr attr = {0}; int ret = taosThreadMutexAttrInit(&attr); @@ -1443,7 +1561,10 @@ int hbMgrInit() { hbMgrInitHandle(); // init backgroud thread - hbCreateThread(); + ret = hbCreateThread(); + if (ret != 0) { + uError("hbMgrInit:hbCreateThread error") return ret; + } return 0; } @@ -1455,14 +1576,20 @@ void hbMgrCleanUp() { int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 1, 0); if (old == 0) return; - taosThreadMutexLock(&clientHbMgr.lock); + int32_t code = taosThreadMutexLock(&clientHbMgr.lock); + if (TSDB_CODE_SUCCESS != code) { + tscError("failed to lock clientHbMgr, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code))); + } appHbMgrCleanup(); taosArrayDestroy(clientHbMgr.appHbMgrs); clientHbMgr.appHbMgrs = NULL; - taosThreadMutexUnlock(&clientHbMgr.lock); + code = taosThreadMutexUnlock(&clientHbMgr.lock); + if (TSDB_CODE_SUCCESS != code) { + tscError("failed to unlock clientHbMgr, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code))); + } } -int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, int64_t clusterId) { +int32_t hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, int64_t clusterId) { // init hash in activeinfo void *data = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); if (data != NULL) { @@ -1473,13 +1600,13 @@ int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, int64_t clust hbReq.clusterId = clusterId; // hbReq.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); - taosHashPut(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey), &hbReq, sizeof(SClientHbReq)); + TSC_ERR_RET(taosHashPut(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey), &hbReq, sizeof(SClientHbReq))); - atomic_add_fetch_32(&pAppHbMgr->connKeyCnt, 1); + (void)atomic_add_fetch_32(&pAppHbMgr->connKeyCnt, 1); return 0; } -int hbRegisterConn(SAppHbMgr *pAppHbMgr, int64_t tscRefId, int64_t clusterId, int8_t connType) { +int32_t hbRegisterConn(SAppHbMgr *pAppHbMgr, int64_t tscRefId, int64_t clusterId, int8_t connType) { SClientHbKey connKey = { .tscRid = tscRefId, .connType = connType, @@ -1498,7 +1625,10 @@ int hbRegisterConn(SAppHbMgr *pAppHbMgr, int64_t tscRefId, int64_t clusterId, in } void hbDeregisterConn(STscObj *pTscObj, SClientHbKey connKey) { - taosThreadMutexLock(&clientHbMgr.lock); + int32_t code = taosThreadMutexLock(&clientHbMgr.lock); + if (TSDB_CODE_SUCCESS != code) { + tscError("failed to lock clientHbMgr, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code))); + } SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, pTscObj->appHbMgrIdx); if (pAppHbMgr) { SClientHbReq *pReq = taosHashAcquire(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); @@ -1506,10 +1636,13 @@ void hbDeregisterConn(STscObj *pTscObj, SClientHbKey connKey) { tFreeClientHbReq(pReq); taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); taosHashRelease(pAppHbMgr->activeInfo, pReq); - atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1); + (void)atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1); } } - taosThreadMutexUnlock(&clientHbMgr.lock); + code = taosThreadMutexUnlock(&clientHbMgr.lock); + if (TSDB_CODE_SUCCESS != code) { + tscError("failed to unlock clientHbMgr, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code))); + } } // set heart beat thread quit mode , if quicByKill 1 then kill thread else quit from inner diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index f5c7751ec2..42ed8ca9a7 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -63,7 +63,7 @@ bool chkRequestKilled(void* param) { killed = true; } - releaseRequest((int64_t)param); + (void)releaseRequest((int64_t)param); return killed; } @@ -124,7 +124,8 @@ int32_t taos_connect_internal(const char* ip, const char* user, const char* pass SAppInstInfo** pInst = NULL; int32_t code = taosThreadMutexLock(&appInfo.mutex); if (TSDB_CODE_SUCCESS != code) { - tscError("failed to lock app info, code:%s", tstrerror(code)); + tscError("failed to lock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code))); + TSC_ERR_RET(code); } pInst = taosHashGet(appInfo.pInstMap, key, strlen(key)); @@ -132,33 +133,28 @@ int32_t taos_connect_internal(const char* ip, const char* user, const char* pass if (pInst == NULL) { p = taosMemoryCalloc(1, sizeof(struct SAppInstInfo)); if (NULL == p) { - taosThreadMutexUnlock(&appInfo.mutex); - taosMemoryFreeClear(key); - TSC_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + TSC_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } p->mgmtEp = epSet; - taosThreadMutexInit(&p->qnodeMutex, NULL); + code = taosThreadMutexInit(&p->qnodeMutex, NULL); + if (TSDB_CODE_SUCCESS != code) { + taosMemoryFree(p); + TSC_ERR_JRET(code); + } code = openTransporter(user, secretEncrypt, tsNumOfCores / 2, &p->pTransporter); if (TSDB_CODE_SUCCESS != code) { - taosThreadMutexUnlock(&appInfo.mutex); - taosMemoryFreeClear(key); taosMemoryFree(p); - TSC_ERR_RET(code); + TSC_ERR_JRET(code); } - p->pAppHbMgr = appHbMgrInit(p, key); - if (NULL == p->pAppHbMgr) { + code = appHbMgrInit(p, key, &p->pAppHbMgr); + if (TSDB_CODE_SUCCESS != code) { destroyAppInst(&p); - taosThreadMutexUnlock(&appInfo.mutex); - taosMemoryFreeClear(key); - // TODO(smj) : change this to right code. - TSC_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + TSC_ERR_JRET(code); } code = taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES); if (TSDB_CODE_SUCCESS != code) { destroyAppInst(&p); - taosThreadMutexUnlock(&appInfo.mutex); - taosMemoryFreeClear(key); - TSC_ERR_RET(code); + TSC_ERR_JRET(code); } p->instKey = key; key = NULL; @@ -171,9 +167,12 @@ int32_t taos_connect_internal(const char* ip, const char* user, const char* pass atomic_store_8(&(*pInst)->pAppHbMgr->connHbFlag, 0); } +_return: + code = taosThreadMutexUnlock(&appInfo.mutex); if (TSDB_CODE_SUCCESS != code) { - tscError("failed to unlock app info, code:%s", tstrerror(code)); + tscError("failed to unlock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code))); + return code; } taosMemoryFreeClear(key); @@ -192,7 +191,7 @@ int32_t taos_connect_internal(const char* ip, const char* user, const char* pass void freeQueryParam(SSyncQueryParam* param) { if (param == NULL) return; - tsem_destroy(¶m->sem); + (void)tsem_destroy(¶m->sem); taosMemoryFree(param); } @@ -212,7 +211,7 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, return TSDB_CODE_OUT_OF_MEMORY; } - strntolower((*pRequest)->sqlstr, sql, (int32_t)sqlLen); + (void)strntolower((*pRequest)->sqlstr, sql, (int32_t)sqlLen); (*pRequest)->sqlstr[sqlLen] = 0; (*pRequest)->sqlLen = sqlLen; (*pRequest)->validateOnly = validateSql; @@ -289,7 +288,7 @@ int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtC code = qParseSql(&cxt, pQuery); if (TSDB_CODE_SUCCESS == code) { if ((*pQuery)->haveResultSet) { - setResSchemaInfo(&pRequest->body.resInfo, (*pQuery)->pResSchema, (*pQuery)->numOfResCols); + code = setResSchemaInfo(&pRequest->body.resInfo, (*pQuery)->pResSchema, (*pQuery)->numOfResCols); setResPrecision(&pRequest->body.resInfo, (*pQuery)->precision); } } @@ -332,9 +331,9 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) { SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest); int64_t transporterId = 0; - asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pMsgInfo->epSet, &transporterId, pSendMsg); + TSC_ERR_RET(asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pMsgInfo->epSet, &transporterId, pSendMsg)); - tsem_wait(&pRequest->body.rspSem); + (void)tsem_wait(&pRequest->body.rspSem); return TSDB_CODE_SUCCESS; } @@ -409,7 +408,11 @@ int compareQueryNodeLoad(const void* elem1, const void* elem2) { } int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList) { - taosThreadMutexLock(&pInfo->qnodeMutex); + int32_t code = taosThreadMutexLock(&pInfo->qnodeMutex); + if (TSDB_CODE_SUCCESS != code) { + tscError("failed to lock qnode mutex, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code))); + return code; + } if (pInfo->pQnodeList) { taosArrayDestroy(pInfo->pQnodeList); pInfo->pQnodeList = NULL; @@ -422,41 +425,64 @@ int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList) { tscDebug("QnodeList updated in cluster 0x%" PRIx64 ", num:%ld", pInfo->clusterId, taosArrayGetSize(pInfo->pQnodeList)); } - taosThreadMutexUnlock(&pInfo->qnodeMutex); + code = taosThreadMutexUnlock(&pInfo->qnodeMutex); + if (TSDB_CODE_SUCCESS != code) { + tscError("failed to unlock qnode mutex, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code))); + return code; + } return TSDB_CODE_SUCCESS; } -bool qnodeRequired(SRequestObj* pRequest) { +int32_t qnodeRequired(SRequestObj* pRequest, bool *required) { if (QUERY_POLICY_VNODE == tsQueryPolicy || QUERY_POLICY_CLIENT == tsQueryPolicy) { - return false; + *required = false; + return TSDB_CODE_SUCCESS; } + int32_t code = TSDB_CODE_SUCCESS; SAppInstInfo* pInfo = pRequest->pTscObj->pAppInfo; - bool required = false; + *required = false; - taosThreadMutexLock(&pInfo->qnodeMutex); - required = (NULL == pInfo->pQnodeList); - taosThreadMutexUnlock(&pInfo->qnodeMutex); - - return required; + code = taosThreadMutexLock(&pInfo->qnodeMutex); + if (TSDB_CODE_SUCCESS != code) { + tscError("failed to lock qnode mutex, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code))); + return code; + } + *required = (NULL == pInfo->pQnodeList); + code = taosThreadMutexUnlock(&pInfo->qnodeMutex); + if (TSDB_CODE_SUCCESS != code) { + tscError("failed to unlock qnode mutex, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code))); + return code; + } + return TSDB_CODE_SUCCESS; } int32_t getQnodeList(SRequestObj* pRequest, SArray** pNodeList) { SAppInstInfo* pInfo = pRequest->pTscObj->pAppInfo; int32_t code = 0; - taosThreadMutexLock(&pInfo->qnodeMutex); + code = taosThreadMutexLock(&pInfo->qnodeMutex); + if (TSDB_CODE_SUCCESS != code) { + tscError("failed to lock qnode mutex, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code))); + return code; + } if (pInfo->pQnodeList) { *pNodeList = taosArrayDup(pInfo->pQnodeList, NULL); } - taosThreadMutexUnlock(&pInfo->qnodeMutex); - + code = taosThreadMutexUnlock(&pInfo->qnodeMutex); + if (TSDB_CODE_SUCCESS != code) { + tscError("failed to unlock qnode mutex, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code))); + return code; + } if (NULL == *pNodeList) { SCatalog* pCatalog = NULL; code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); if (TSDB_CODE_SUCCESS == code) { *pNodeList = taosArrayInit(5, sizeof(SQueryNodeLoad)); + if (NULL == pNodeList) { + TSC_ERR_RET(terrno); + } SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter, .requestId = pRequest->requestId, .requestObjRefId = pRequest->self, @@ -489,10 +515,10 @@ int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArra return qCreateQueryPlan(&cxt, pPlan, pNodeList); } -void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols) { +int32_t setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols) { if (pResInfo == NULL || pSchema == NULL || numOfCols <= 0) { tscError("invalid paras, pResInfo == NULL || pSchema == NULL || numOfCols <= 0"); - return; + return TSDB_CODE_INVALID_PARA; } pResInfo->numOfCols = numOfCols; @@ -504,9 +530,12 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t } pResInfo->fields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD)); pResInfo->userFields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD)); + if (NULL == pResInfo->fields || NULL == pResInfo->userFields) { + return TSDB_CODE_OUT_OF_MEMORY; + } if (numOfCols != pResInfo->numOfCols) { tscError("numOfCols:%d != pResInfo->numOfCols:%d", numOfCols, pResInfo->numOfCols); - return; + return TSDB_CODE_FAILED; } for (int32_t i = 0; i < pResInfo->numOfCols; ++i) { @@ -526,6 +555,7 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t tstrncpy(pResInfo->fields[i].name, pSchema[i].name, tListLen(pResInfo->fields[i].name)); tstrncpy(pResInfo->userFields[i].name, pSchema[i].name, tListLen(pResInfo->userFields[i].name)); } + return TSDB_CODE_SUCCESS; } void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) { @@ -539,11 +569,17 @@ void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) { int32_t buildVnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SArray* pDbVgList) { SArray* nodeList = taosArrayInit(4, sizeof(SQueryNodeLoad)); + if (NULL == nodeList) { + return terrno; + } char* policy = (tsQueryPolicy == QUERY_POLICY_VNODE) ? "vnode" : "client"; int32_t dbNum = taosArrayGetSize(pDbVgList); for (int32_t i = 0; i < dbNum; ++i) { SArray* pVg = taosArrayGetP(pDbVgList, i); + if (NULL == pVg) { + continue; + } int32_t vgNum = taosArrayGetSize(pVg); if (vgNum <= 0) { continue; @@ -551,11 +587,18 @@ int32_t buildVnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArr for (int32_t j = 0; j < vgNum; ++j) { SVgroupInfo* pInfo = taosArrayGet(pVg, j); + if (NULL == pInfo) { + taosArrayDestroy(nodeList); + return TSDB_CODE_OUT_OF_RANGE; + } SQueryNodeLoad load = {0}; load.addr.nodeId = pInfo->vgId; load.addr.epSet = pInfo->epSet; - taosArrayPush(nodeList, &load); + if (NULL == taosArrayPush(nodeList, &load)) { + taosArrayDestroy(nodeList); + return TSDB_CODE_OUT_OF_MEMORY; + } } } @@ -572,7 +615,14 @@ int32_t buildVnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArr } void* pData = taosArrayGet(pMnodeList, 0); - taosArrayAddBatch(nodeList, pData, mnodeNum); + if (NULL == pData) { + taosArrayDestroy(nodeList); + return TSDB_CODE_OUT_OF_RANGE; + } + if (NULL == taosArrayAddBatch(nodeList, pData, mnodeNum)) { + taosArrayDestroy(nodeList); + return TSDB_CODE_OUT_OF_MEMORY; + } tscDebug("0x%" PRIx64 " %s policy, use mnode list, num:%d", pRequest->requestId, policy, mnodeNum); @@ -585,11 +635,21 @@ _return: int32_t buildQnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SArray* pQnodeList) { SArray* nodeList = taosArrayInit(4, sizeof(SQueryNodeLoad)); + if (NULL == nodeList) { + return terrno; + } int32_t qNodeNum = taosArrayGetSize(pQnodeList); if (qNodeNum > 0) { void* pData = taosArrayGet(pQnodeList, 0); - taosArrayAddBatch(nodeList, pData, qNodeNum); + if (NULL == pData) { + taosArrayDestroy(nodeList); + return TSDB_CODE_OUT_OF_RANGE; + } + if (NULL == taosArrayAddBatch(nodeList, pData, qNodeNum)) { + taosArrayDestroy(nodeList); + return TSDB_CODE_OUT_OF_MEMORY; + } tscDebug("0x%" PRIx64 " qnode policy, use qnode list, num:%d", pRequest->requestId, qNodeNum); goto _return; } @@ -601,7 +661,14 @@ int32_t buildQnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArr } void* pData = taosArrayGet(pMnodeList, 0); - taosArrayAddBatch(nodeList, pData, mnodeNum); + if (NULL == pData) { + taosArrayDestroy(nodeList); + return TSDB_CODE_OUT_OF_RANGE; + } + if (NULL == taosArrayAddBatch(nodeList, pData, mnodeNum)) { + taosArrayDestroy(nodeList); + return TSDB_CODE_OUT_OF_MEMORY; + } tscDebug("0x%" PRIx64 " qnode policy, use mnode list, num:%d", pRequest->requestId, mnodeNum); @@ -714,7 +781,7 @@ int32_t buildSyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* } case QUERY_POLICY_HYBRID: case QUERY_POLICY_QNODE: { - getQnodeList(pRequest, &pQnodeList); + TSC_ERR_JRET(getQnodeList(pRequest, &pQnodeList)); code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList); break; @@ -1203,6 +1270,7 @@ static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaDat if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) { SArray* pNodeList = NULL; if (QUERY_NODE_VNODE_MODIFY_STMT != nodeType(pQuery->pRoot)) { + //TODO(smj) fuckthis buildAsyncExecNodeList(pRequest, &pNodeList, pMnodeList, pResultMeta); } @@ -1418,7 +1486,7 @@ int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSe int32_t taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param, SAppInstInfo* pAppInfo, int connType, STscObj** pTscObj) { *pTscObj = NULL; - int32_t code = createTscObj(user, auth, db, connType, pAppInfo, (void**)&pTscObj); + int32_t code = createTscObj(user, auth, db, connType, pAppInfo, pTscObj); if (TSDB_CODE_SUCCESS != code) { return code; } @@ -1440,8 +1508,8 @@ int32_t taosConnectImpl(const char* user, const char* auth, const char* db, __ta int64_t transporterId = 0; asyncSendMsgToServer((*pTscObj)->pAppInfo->pTransporter, &(*pTscObj)->pAppInfo->mgmtEp.epSet, &transporterId, body); - code = tsem_wait(&pRequest->body.rspSem); - if (code != TSDB_CODE_SUCCESS) { + (void)tsem_wait(&pRequest->body.rspSem); + if (pRequest->code != TSDB_CODE_SUCCESS) { const char* errorMsg = (code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code); tscError("failed to connect to server, reason: %s", errorMsg); @@ -1604,7 +1672,7 @@ int32_t doProcessMsgFromServer(void* param) { } } - pSendInfo->fp(pSendInfo->param, &buf, pMsg->code); + (void)pSendInfo->fp(pSendInfo->param, &buf, pMsg->code); if (pTscObj) { taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId); @@ -2749,7 +2817,7 @@ void doRequestCallback(SRequestObj* pRequest, int32_t code) { SRequestObj* pReq = acquireRequest(this); if (pReq != NULL) { pReq->inCallback = false; - releaseRequest(this); + (void)releaseRequest(this); } } diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 2b2241b2f8..9594c45366 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -1014,10 +1014,12 @@ void handleQueryAnslyseRes(SSqlCallbackWrapper *pWrapper, SMetaData *pResultMeta } if (pQuery->haveResultSet) { - setResSchemaInfo(&pRequest->body.resInfo, pQuery->pResSchema, pQuery->numOfResCols); + code = setResSchemaInfo(&pRequest->body.resInfo, pQuery->pResSchema, pQuery->numOfResCols); setResPrecision(&pRequest->body.resInfo, pQuery->precision); } + } + if (code == TSDB_CODE_SUCCESS) { TSWAP(pRequest->dbList, (pQuery)->pDbList); TSWAP(pRequest->tableList, (pQuery)->pTableList); TSWAP(pRequest->targetTableList, (pQuery)->pTargetTableList); @@ -1202,7 +1204,7 @@ int32_t prepareAndParseSqlSyntax(SSqlCallbackWrapper **ppWrapper, SRequestObj *p code = TSDB_CODE_OUT_OF_MEMORY; } else { pWrapper->pCatalogReq->forceUpdate = updateMetaForce; - pWrapper->pCatalogReq->qNodeRequired = qnodeRequired(pRequest); + TSC_ERR_RET(qnodeRequired(pRequest, &pWrapper->pCatalogReq->qNodeRequired)); code = qParseSqlSyntax(pWrapper->pParseCtx, &pRequest->pQuery, pWrapper->pCatalogReq); } diff --git a/source/os/src/osSysinfo.c b/source/os/src/osSysinfo.c index b0d9f076d9..aaaa9e2c2d 100644 --- a/source/os/src/osSysinfo.c +++ b/source/os/src/osSysinfo.c @@ -1009,10 +1009,6 @@ int32_t taosGetSystemUUID(char *uid, int32_t uidlen) { // it's caller's responsibility to make enough space for `uid`, that's 36-char + 1-null uuid_unparse_lower(uuid, buf); int n = snprintf(uid, uidlen, "%.*s", (int)sizeof(buf), buf); // though less performance, much safer - if (n >= uidlen) { - // target buffer is too small - return -1; - } return 0; #else int len = 0;