enh:[TD-31070] Handling return value of clientMain.c

This commit is contained in:
sima 2024-07-24 16:28:45 +08:00
parent 47be646de4
commit a1c323b690
1 changed files with 82 additions and 46 deletions

View File

@ -69,15 +69,21 @@ void taos_cleanup(void) {
qCleanupKeywordsTable();
nodesDestroyAllocatorSet();
cleanupTaskQueue();
if (TSDB_CODE_SUCCESS != cleanupTaskQueue()) {
tscWarn("failed to cleanup task queue");
}
int32_t id = clientReqRefPool;
clientReqRefPool = -1;
taosCloseRef(id);
if (TSDB_CODE_SUCCESS != taosCloseRef(id)) {
tscWarn("failed to close clientReqRefPool");
}
id = clientConnRefPool;
clientConnRefPool = -1;
taosCloseRef(id);
if (TSDB_CODE_SUCCESS != taosCloseRef(id)) {
tscWarn("failed to close clientReqRefPool");
}
DestroyRegexCache();
rpcCleanup();
@ -121,10 +127,13 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha
int64_t *rid = taosMemoryCalloc(1, sizeof(int64_t));
if (NULL == rid) {
tscError("out of memory when taos connect to %s:%u, user:%s db:%s", ip, port, user, db);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
*rid = pObj->id;
return (TAOS *)rid;
} else {
terrno = code;
}
return NULL;
@ -145,24 +154,24 @@ int taos_set_notify_cb(TAOS *taos, __taos_notify_fn_t fp, void *param, int type)
switch (type) {
case TAOS_NOTIFY_PASSVER: {
taosThreadMutexLock(&pObj->mutex);
TSC_ERR_RET(taosThreadMutexLock(&pObj->mutex));
pObj->passInfo.fp = fp;
pObj->passInfo.param = param;
taosThreadMutexUnlock(&pObj->mutex);
TSC_ERR_RET(taosThreadMutexUnlock(&pObj->mutex));
break;
}
case TAOS_NOTIFY_WHITELIST_VER: {
taosThreadMutexLock(&pObj->mutex);
TSC_ERR_RET(taosThreadMutexLock(&pObj->mutex));
pObj->whiteListInfo.fp = fp;
pObj->whiteListInfo.param = param;
taosThreadMutexUnlock(&pObj->mutex);
TSC_ERR_RET(taosThreadMutexUnlock(&pObj->mutex));
break;
}
case TAOS_NOTIFY_USER_DROPPED: {
taosThreadMutexLock(&pObj->mutex);
TSC_ERR_RET(taosThreadMutexLock(&pObj->mutex));
pObj->userDroppedInfo.fp = fp;
pObj->userDroppedInfo.param = param;
taosThreadMutexUnlock(&pObj->mutex);
TSC_ERR_RET(taosThreadMutexUnlock(&pObj->mutex));
break;
}
default: {
@ -194,7 +203,13 @@ int32_t fetchWhiteListCallbackFn(void* param, SDataBuf* pMsg, int32_t code) {
}
SGetUserWhiteListRsp wlRsp;
tDeserializeSGetUserWhiteListRsp(pMsg->pData, pMsg->len, &wlRsp);
if (TSDB_CODE_SUCCESS != tDeserializeSGetUserWhiteListRsp(pMsg->pData, pMsg->len, &wlRsp)) {
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
taosMemoryFree(pInfo);
tFreeSGetUserWhiteListRsp(&wlRsp);
return terrno;
}
uint64_t* pWhiteLists = taosMemoryMalloc(wlRsp.numWhiteLists * sizeof(uint64_t));
if (pWhiteLists == NULL) {
@ -234,8 +249,14 @@ void taos_fetch_whitelist_a(TAOS *taos, __taos_async_whitelist_fn_t fp, void *pa
}
SGetUserWhiteListReq req;
memcpy(req.user, pTsc->user, TSDB_USER_LEN);
(void)memcpy(req.user, pTsc->user, TSDB_USER_LEN);
int32_t msgLen = tSerializeSGetUserWhiteListReq(NULL, 0, &req);
if (msgLen < 0) {
fp(param, TSDB_CODE_INVALID_PARA, taos, 0, NULL);
releaseTscObj(connId);
return;
}
void* pReq = taosMemoryMalloc(msgLen);
if (pReq == NULL) {
fp(param, TSDB_CODE_OUT_OF_MEMORY, taos, 0, NULL);
@ -279,7 +300,9 @@ void taos_fetch_whitelist_a(TAOS *taos, __taos_async_whitelist_fn_t fp, void *pa
int64_t transportId = 0;
SEpSet epSet = getEpSet_s(&pTsc->pAppInfo->mgmtEp);
asyncSendMsgToServer(pTsc->pAppInfo->pTransporter, &epSet, &transportId, pSendInfo);
if (TSDB_CODE_SUCCESS != asyncSendMsgToServer(pTsc->pAppInfo->pTransporter, &epSet, &transportId, pSendInfo)) {
tscWarn("failed to async send msg to server");
}
releaseTscObj(connId);
return;
}
@ -292,7 +315,7 @@ void taos_close_internal(void *taos) {
STscObj *pTscObj = (STscObj *)taos;
tscDebug("0x%" PRIx64 " try to close connection, numOfReq:%d", pTscObj->id, pTscObj->numOfReqs);
taosRemoveRef(clientConnRefPool, pTscObj->id);
(void)taosRemoveRef(clientConnRefPool, pTscObj->id);
}
void taos_close(TAOS *taos) {
@ -458,6 +481,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
} else {
// assert to avoid un-initialization error
tscError("invalid result passed to taos_fetch_row");
terrno = TSDB_CODE_TSC_INTERNAL_ERROR;
return NULL;
}
}
@ -526,7 +550,7 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields)
if(taosAscii2Hex(row[i], charLen, &data, &size) < 0){
break;
}
memcpy(str + len, data, size);
(void)memcpy(str + len, data, size);
len += size;
taosMemoryFree(data);
}break;
@ -544,7 +568,7 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields)
}
}
memcpy(str + len, row[i], charLen);
(void)memcpy(str + len, row[i], charLen);
len += charLen;
} break;
@ -691,7 +715,7 @@ int taos_select_db(TAOS *taos, const char *db) {
}
char sql[256] = {0};
snprintf(sql, tListLen(sql), "use %s", db);
(void)snprintf(sql, tListLen(sql), "use %s", db);
TAOS_RES *pRequest = taos_query(taos, sql);
int32_t code = taos_errno(pRequest);
@ -747,10 +771,10 @@ int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) {
if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pRequest->type == TSDB_SQL_INSERT ||
pRequest->code != TSDB_CODE_SUCCESS || taos_num_fields(res) == 0) {
return 0;
return pRequest->code;
}
doAsyncFetchRows(pRequest, false, true);
(void)doAsyncFetchRows(pRequest, false, true);
// TODO refactor
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
@ -800,10 +824,10 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) {
if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pRequest->type == TSDB_SQL_INSERT ||
pRequest->code != TSDB_CODE_SUCCESS || taos_num_fields(res) == 0) {
return 0;
return pRequest->code;
}
doAsyncFetchRows(pRequest, false, false);
(void)doAsyncFetchRows(pRequest, false, false);
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
@ -811,7 +835,7 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) {
(*numOfRows) = pResultInfo->numOfRows;
(*pData) = (void *)pResultInfo->pData;
return 0;
return pRequest->code;
}
int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) {
@ -869,26 +893,24 @@ const char *taos_get_server_info(TAOS *taos) {
int taos_get_current_db(TAOS *taos, char *database, int len, int *required) {
STscObj *pTscObj = acquireTscObj(*(int64_t *)taos);
if (pTscObj == NULL) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
return -1;
return TSDB_CODE_TSC_DISCONNECTED;
}
int code = TSDB_CODE_SUCCESS;
taosThreadMutexLock(&pTscObj->mutex);
TSC_ERR_JRET(taosThreadMutexLock(&pTscObj->mutex));
if (database == NULL || len <= 0) {
if (required != NULL) *required = strlen(pTscObj->db) + 1;
terrno = TSDB_CODE_INVALID_PARA;
code = -1;
TSC_ERR_JRET(TSDB_CODE_INVALID_PARA);
} else if (len < strlen(pTscObj->db) + 1) {
tstrncpy(database, pTscObj->db, len);
if (required) *required = strlen(pTscObj->db) + 1;
terrno = TSDB_CODE_INVALID_PARA;
code = -1;
TSC_ERR_JRET(TSDB_CODE_INVALID_PARA);
} else {
strcpy(database, pTscObj->db);
(void)strcpy(database, pTscObj->db);
code = 0;
}
taosThreadMutexUnlock(&pTscObj->mutex);
_return:
code = taosThreadMutexUnlock(&pTscObj->mutex);
releaseTscObj(*(int64_t *)taos);
return code;
}
@ -929,8 +951,8 @@ static void doAsyncQueryFromAnalyse(SMetaData *pResultMeta, void *param, int32_t
pRequest->metric.analyseCostUs += taosGetTimestampUs() - analyseStart;
if (pRequest->parseOnly) {
memcpy(&pRequest->parseMeta, pResultMeta, sizeof(*pResultMeta));
memset(pResultMeta, 0, sizeof(*pResultMeta));
(void)memcpy(&pRequest->parseMeta, pResultMeta, sizeof(*pResultMeta));
(void)memset(pResultMeta, 0, sizeof(*pResultMeta));
}
handleQueryAnslyseRes(pWrapper, pResultMeta, code);
@ -1266,13 +1288,13 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
void restartAsyncQuery(SRequestObj *pRequest, int32_t code) {
tscInfo("restart request: %s p: %p", pRequest->sqlstr, pRequest);
SRequestObj* pUserReq = pRequest;
acquireRequest(pRequest->self);
(void)acquireRequest(pRequest->self);
while (pUserReq) {
if (pUserReq->self == pUserReq->relation.userRefId || pUserReq->relation.userRefId == 0) {
break;
} else {
int64_t nextRefId = pUserReq->relation.nextRefId;
releaseRequest(pUserReq->self);
(void)releaseRequest(pUserReq->self);
if (nextRefId) {
pUserReq = acquireRequest(nextRefId);
}
@ -1282,16 +1304,16 @@ void restartAsyncQuery(SRequestObj *pRequest, int32_t code) {
if (pUserReq) {
destroyCtxInRequest(pUserReq);
pUserReq->prevCode = code;
memset(&pUserReq->relation, 0, sizeof(pUserReq->relation));
(void)memset(&pUserReq->relation, 0, sizeof(pUserReq->relation));
} else {
tscError("User req is missing");
removeFromMostPrevReq(pRequest);
(void)removeFromMostPrevReq(pRequest);
return;
}
if (hasSubRequest)
removeFromMostPrevReq(pRequest);
(void)removeFromMostPrevReq(pRequest);
else
releaseRequest(pUserReq->self);
(void)releaseRequest(pUserReq->self);
doAsyncQuery(pUserReq, true);
}
@ -1381,7 +1403,7 @@ int taos_get_db_route_info(TAOS *taos, const char *db, TAOS_DB_ROUTE_INFO *dbInf
conn.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
char dbFName[TSDB_DB_FNAME_LEN] = {0};
snprintf(dbFName, sizeof(dbFName), "%d.%s", pTscObj->acctId, db);
(void)snprintf(dbFName, sizeof(dbFName), "%d.%s", pTscObj->acctId, db);
code = catalogGetDBVgInfo(pCtg, &conn, dbFName, dbInfo);
if (code) {
@ -1431,7 +1453,7 @@ int taos_get_table_vgId(TAOS *taos, const char *db, const char *table, int *vgId
conn.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
SName tableName;
toName(pTscObj->acctId, db, table, &tableName);
(void)toName(pTscObj->acctId, db, table, &tableName);
SVgroupInfo vgInfo;
code = catalogGetTableHashVgroup(pCtg, &conn, &tableName, &vgInfo);
@ -1552,7 +1574,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
}
SSyncQueryParam *pParam = pRequest->body.interParam;
tsem_wait(&pParam->sem);
(void)tsem_wait(&pParam->sem);
_return:
destoryCatalogReq(&catalogReq);
@ -1569,7 +1591,9 @@ TAOS_STMT *taos_stmt_init(TAOS *taos) {
}
TAOS_STMT *pStmt = stmtInit(pObj, 0, NULL);
if (NULL == pStmt) {
tscError("stmt init failed, errcode:%s", terrstr());
}
releaseTscObj(*(int64_t *)taos);
return pStmt;
@ -1584,7 +1608,9 @@ TAOS_STMT *taos_stmt_init_with_reqid(TAOS *taos, int64_t reqid) {
}
TAOS_STMT *pStmt = stmtInit(pObj, reqid, NULL);
if (NULL == pStmt) {
tscError("stmt init failed, errcode:%s", terrstr());
}
releaseTscObj(*(int64_t *)taos);
return pStmt;
@ -1599,7 +1625,9 @@ TAOS_STMT *taos_stmt_init_with_options(TAOS *taos, TAOS_STMT_OPTIONS *options) {
}
TAOS_STMT *pStmt = stmtInit(pObj, options->reqId, options);
if (NULL == pStmt) {
tscError("stmt init failed, errcode:%s", terrstr());
}
releaseTscObj(*(int64_t *)taos);
return pStmt;
@ -1714,7 +1742,11 @@ int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) {
}
int32_t insert = 0;
stmtIsInsert(stmt, &insert);
int32_t code = stmtIsInsert(stmt, &insert);
if (TSDB_CODE_SUCCESS != code) {
tscError("stmt insert failed, errcode:%s", tstrerror(code));
return code;
}
if (0 == insert && bind->num > 1) {
tscError("only one row data allowed for query");
terrno = TSDB_CODE_INVALID_PARA;
@ -1738,7 +1770,11 @@ int taos_stmt_bind_single_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind, in
}
int32_t insert = 0;
stmtIsInsert(stmt, &insert);
int32_t code = stmtIsInsert(stmt, &insert);
if (TSDB_CODE_SUCCESS != code) {
tscError("stmt insert failed, errcode:%s", tstrerror(code));
return code;
}
if (0 == insert && bind->num > 1) {
tscError("only one row data allowed for query");
terrno = TSDB_CODE_INVALID_PARA;