From ebe7276c2c9f57e8d222a9bca80af87e94ba6527 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 25 Jul 2024 17:15:41 +0800 Subject: [PATCH] fix:[TD-31017]process return value in client for tmq --- include/util/tutil.h | 3 + source/client/src/clientEnv.c | 9 +- source/client/src/clientImpl.c | 4 +- source/client/src/clientMonitor.c | 147 ++++++++------- source/client/src/clientMonitorSlow.c | 2 +- source/client/src/clientMonitorSql.c | 2 +- source/client/src/clientMsgHandler.c | 245 ++++++++++++++++--------- source/client/src/clientStmt.c | 103 +++++++---- source/client/src/clientTmqConnector.c | 8 +- source/libs/parser/src/parInsertStmt.c | 8 + 10 files changed, 331 insertions(+), 200 deletions(-) diff --git a/include/util/tutil.h b/include/util/tutil.h index 305af76dea..cd40daf8aa 100644 --- a/include/util/tutil.h +++ b/include/util/tutil.h @@ -152,6 +152,9 @@ static FORCE_INLINE int32_t taosGetTbHashVal(const char *tbname, int32_t tblen, #define TCONTAINER_OF(ptr, type, member) ((type *)((char *)(ptr)-offsetof(type, member))) +#define TAOS_GET_TERRNO(code) \ + (terrno == 0 ? code : terrno) + #define TAOS_RETURN(CODE) \ do { \ return (terrno = (CODE)); \ diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index f640618897..4153282489 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -162,7 +162,7 @@ static void generateWriteSlowLog(STscObj *pTscObj, SRequestObj *pRequest, int32_ data.clusterId = pTscObj->pAppInfo->clusterId; data.type = SLOW_LOG_WRITE; data.data = value; - if(monitorPutData2MonitorQueue(data) < 0){ + if(monitorPutData2MonitorQueue(data) != 0){ taosMemoryFree(value); } @@ -479,7 +479,10 @@ void *createRequest(uint64_t connId, int32_t type, int64_t reqid) { pRequest->msgBuf = taosMemoryCalloc(1, ERROR_MSG_BUF_DEFAULT_SIZE); pRequest->msgBufLen = ERROR_MSG_BUF_DEFAULT_SIZE; - tsem_init(&pRequest->body.rspSem, 0, 0); + if (tsem2_init(&pRequest->body.rspSem, 0, 0) != 0) { + doDestroyRequest(pRequest); + return NULL; + } if (registerRequest(pRequest, pTscObj)) { doDestroyRequest(pRequest); @@ -601,7 +604,7 @@ void doDestroyRequest(void *p) { taosMemoryFreeClear(pRequest->msgBuf); doFreeReqResultInfo(&pRequest->body.resInfo); - tsem_destroy(&pRequest->body.rspSem); + (void)tsem2_destroy(&pRequest->body.rspSem); taosArrayDestroy(pRequest->tableList); taosArrayDestroy(pRequest->targetTableList); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 3fb61b6902..26e5d83b56 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -324,7 +324,7 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) { int64_t transporterId = 0; asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pMsgInfo->epSet, &transporterId, pSendMsg); - tsem_wait(&pRequest->body.rspSem); + (void)tsem2_wait(&pRequest->body.rspSem); return TSDB_CODE_SUCCESS; } @@ -1427,7 +1427,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t int64_t transporterId = 0; asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body); - tsem_wait(&pRequest->body.rspSem); + (void)tsem_wait(&pRequest->body.rspSem); if (pRequest->code != TSDB_CODE_SUCCESS) { const char* errorMsg = (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code); diff --git a/source/client/src/clientMonitor.c b/source/client/src/clientMonitor.c index e3b073dbc8..d47b074658 100644 --- a/source/client/src/clientMonitor.c +++ b/source/client/src/clientMonitor.c @@ -21,13 +21,10 @@ SHashObj* monitorSlowLogHash; char tmpSlowLogPath[PATH_MAX] = {0}; static int32_t getSlowLogTmpDir(char* tmpPath, int32_t size) { - if (tsTempDir == NULL) { - return -1; - } int ret = snprintf(tmpPath, size, "%s/tdengine_slow_log/", tsTempDir); if (ret < 0) { tscError("failed to get tmp path ret:%d", ret); - return ret; + return TSDB_CODE_TSC_INTERNAL_ERROR; } return 0; } @@ -71,10 +68,9 @@ static void destroyMonitorClient(void* data) { if (pMonitor == NULL) { return; } - taosTmrStopA(&pMonitor->timer); + (void)taosTmrStopA(&pMonitor->timer); taosHashCleanup(pMonitor->counters); - taos_collector_registry_destroy(pMonitor->registry); - // taos_collector_destroy(pMonitor->colector); + (void)taos_collector_registry_destroy(pMonitor->registry); taosMemoryFree(pMonitor); } @@ -142,15 +138,17 @@ static int32_t sendReport(void* pTransporter, SEpSet* epSet, char* pCont, MONITO void* buf = taosMemoryMalloc(tlen); if (buf == NULL) { tscError("sendReport failed, out of memory, len:%d", tlen); - terrno = TSDB_CODE_OUT_OF_MEMORY; goto FAILED; } - tSerializeSStatisReq(buf, tlen, &sStatisReq); + tlen = tSerializeSStatisReq(buf, tlen, &sStatisReq); + if (tlen < 0) { + taosMemoryFree(buf); + goto FAILED; + } SMsgSendInfo* pInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); if (pInfo == NULL) { tscError("sendReport failed, out of memory send info"); - terrno = TSDB_CODE_OUT_OF_MEMORY; taosMemoryFree(buf); goto FAILED; } @@ -168,12 +166,12 @@ static int32_t sendReport(void* pTransporter, SEpSet* epSet, char* pCont, MONITO FAILED: monitorFreeSlowLogDataEx(param); - return -1; + return TAOS_GET_TERRNO(TSDB_CODE_TSC_INTERNAL_ERROR); } static void generateClusterReport(taos_collector_registry_t* registry, void* pTransporter, SEpSet* epSet) { char ts[50] = {0}; - sprintf(ts, "%" PRId64, taosGetTimestamp(TSDB_TIME_PRECISION_MILLI)); + (void)sprintf(ts, "%" PRId64, taosGetTimestamp(TSDB_TIME_PRECISION_MILLI)); char* pCont = (char*)taos_collector_registry_bridge_new(registry, ts, "%" PRId64, NULL); if (NULL == pCont) { tscError("generateClusterReport failed, get null content."); @@ -181,7 +179,7 @@ static void generateClusterReport(taos_collector_registry_t* registry, void* pTr } if (strlen(pCont) != 0 && sendReport(pTransporter, epSet, pCont, MONITOR_TYPE_COUNTER, NULL) == 0) { - taos_collector_registry_clear_batch(registry); + (void)taos_collector_registry_clear_batch(registry); } taosMemoryFreeClear(pCont); } @@ -202,7 +200,7 @@ static void reportSendProcess(void* param, void* tmrId) { SEpSet ep = getEpSet_s(&pInst->mgmtEp); generateClusterReport(pMonitor->registry, pInst->pTransporter, &ep); - taosTmrReset(reportSendProcess, pInst->monitorParas.tsMonitorInterval * 1000, param, monitorTimer, &tmrId); + (void)taosTmrReset(reportSendProcess, pInst->monitorParas.tsMonitorInterval * 1000, param, monitorTimer, &tmrId); taosRUnLockLatch(&monitorLock); } @@ -257,7 +255,6 @@ void monitorCreateClient(int64_t clusterId) { tscError("failed to create monitor counters"); goto fail; } - // taosHashSetFreeFp(pMonitor->counters, destroyCounter); if (taosHashPut(monitorCounterHash, &clusterId, LONG_BYTES, &pMonitor, POINTER_BYTES) != 0) { tscError("failed to put monitor client to hash"); @@ -300,10 +297,14 @@ void monitorCreateClientCounter(int64_t clusterId, const char* name, const char* taos_counter_t* newCounter = taos_counter_new(name, help, label_key_count, label_keys); if (newCounter == NULL) return; MonitorClient* pMonitor = *ppMonitor; - taos_collector_add_metric(pMonitor->colector, newCounter); + if (taos_collector_add_metric(pMonitor->colector, newCounter) != 0){ + tscError("failed to add metric to collector"); + (void)taos_counter_destroy(newCounter); + goto end; +} if (taosHashPut(pMonitor->counters, name, strlen(name), &newCounter, POINTER_BYTES) != 0) { tscError("failed to put counter to monitor"); - taos_counter_destroy(newCounter); + (void)taos_counter_destroy(newCounter); goto end; } tscInfo("[monitor] monitorCreateClientCounter %" PRIx64 "(%p):%s : %p.", pMonitor->clusterId, pMonitor, name, @@ -332,7 +333,10 @@ void monitorCounterInc(int64_t clusterId, const char* counterName, const char** tscError("monitorCounterInc not found pCounter %" PRIx64 ":%s.", clusterId, counterName); goto end; } - taos_counter_inc(*ppCounter, label_values); + if (taos_counter_inc(*ppCounter, label_values) != 0){ + tscError("monitorCounterInc failed to inc %" PRIx64 ":%s.", clusterId, counterName); + goto end; + } tscDebug("[monitor] monitorCounterInc %" PRIx64 "(%p):%s", pMonitor->clusterId, pMonitor, counterName); end: @@ -360,24 +364,23 @@ static void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char* tmpP tscInfo("[monitor] create slow log file:%s", path); pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC); if (pFile == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); - tscError("failed to open file:%s since %s", path, terrstr()); + tscError("failed to open file:%s since %d", path, errno); return; } SlowLogClient* pClient = taosMemoryCalloc(1, sizeof(SlowLogClient)); if (pClient == NULL) { tscError("failed to allocate memory for slow log client"); - taosCloseFile(&pFile); + (void)taosCloseFile(&pFile); return; } pClient->lastCheckTime = taosGetMonoTimestampMs(); - strcpy(pClient->path, path); + (void)strcpy(pClient->path, path); pClient->offset = 0; pClient->pFile = pFile; if (taosHashPut(monitorSlowLogHash, &slowLogData->clusterId, LONG_BYTES, &pClient, POINTER_BYTES) != 0) { tscError("failed to put clusterId:%" PRId64 " to hash table", slowLogData->clusterId); - taosCloseFile(&pFile); + (void)taosCloseFile(&pFile); taosMemoryFree(pClient); return; } @@ -423,7 +426,7 @@ static char* readFile(TdFilePtr pFile, int64_t* offset, int64_t size) { return NULL; } char* buf = pCont; - strcat(buf++, "["); + (void)strcat(buf++, "["); int64_t readSize = taosReadFile(pFile, buf, SLOW_LOG_SEND_SIZE_MAX); if (readSize <= 0) { if (readSize < 0) { @@ -454,7 +457,7 @@ static char* readFile(TdFilePtr pFile, int64_t* offset, int64_t size) { static int64_t getFileSize(char* path) { int64_t fileSize = 0; if (taosStatFile(path, &fileSize, NULL, NULL) < 0) { - return -1; + return TSDB_CODE_TSC_INTERNAL_ERROR; } return fileSize; @@ -464,13 +467,13 @@ static int32_t sendSlowLog(int64_t clusterId, char* data, TdFilePtr pFile, int64 char* fileName, void* pTransporter, SEpSet* epSet) { if (data == NULL) { taosMemoryFree(fileName); - return -1; + return TSDB_CODE_INVALID_PARA; } MonitorSlowLogData* pParam = taosMemoryMalloc(sizeof(MonitorSlowLogData)); if (pParam == NULL) { taosMemoryFree(data); taosMemoryFree(fileName); - return -1; + return terrno; } pParam->data = data; pParam->offset = offset; @@ -486,7 +489,7 @@ static int32_t monitorReadSend(int64_t clusterId, TdFilePtr pFile, int64_t* offs SAppInstInfo* pInst = getAppInstByClusterId(clusterId); if (pInst == NULL) { tscError("failed to get app instance by clusterId:%" PRId64, clusterId); - return -1; + return terrno; } SEpSet ep = getEpSet_s(&pInst->mgmtEp); char* data = readFile(pFile, offset, size); @@ -495,13 +498,20 @@ static int32_t monitorReadSend(int64_t clusterId, TdFilePtr pFile, int64_t* offs } static void monitorSendSlowLogAtBeginning(int64_t clusterId, char** fileName, TdFilePtr pFile, int64_t offset) { + if (fileName == NULL){ + return; + } int64_t size = getFileSize(*fileName); if (size <= offset) { processFileInTheEnd(pFile, *fileName); tscDebug("[monitor] monitorSendSlowLogAtBeginning delete file:%s", *fileName); } else { int32_t code = monitorReadSend(clusterId, pFile, &offset, size, SLOW_LOG_READ_BEGINNIG, *fileName); - tscDebug("[monitor] monitorSendSlowLogAtBeginning send slow log clusterId:%" PRId64 ",ret:%d", clusterId, code); + if (code == 0){ + tscDebug("[monitor] monitorSendSlowLogAtBeginning send slow log succ, clusterId:%" PRId64, clusterId); + }else{ + tscError("[monitor] monitorSendSlowLogAtBeginning send slow log failed, clusterId:%" PRId64 ",ret:%d", clusterId, code); + } *fileName = NULL; } } @@ -509,10 +519,12 @@ static void monitorSendSlowLogAtBeginning(int64_t clusterId, char** fileName, Td static void monitorSendSlowLogAtRunning(int64_t clusterId) { void* tmp = taosHashGet(monitorSlowLogHash, &clusterId, LONG_BYTES); if (tmp == NULL) { + tscError("failed to get slow log client by clusterId:%" PRId64, clusterId); return; } SlowLogClient* pClient = (*(SlowLogClient**)tmp); if (pClient == NULL) { + tscError("failed to get slow log client by clusterId:%" PRId64, clusterId); return; } int64_t size = getFileSize(pClient->path); @@ -574,14 +586,16 @@ static void monitorSendAllSlowLogAtQuit() { } static void processFileRemoved(SlowLogClient* pClient) { - taosUnLockFile(pClient->pFile); - taosCloseFile(&(pClient->pFile)); + if (taosUnLockFile(pClient->pFile) != 0) { + tscError("failed to unlock file:%s since %d", pClient->path, errno); + return; + } + (void)taosCloseFile(&(pClient->pFile)); TdFilePtr pFile = taosOpenFile(pClient->path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC); if (pFile == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); - tscError("failed to open file:%s since %s", pClient->path, terrstr()); + tscError("failed to open file:%s since %d", pClient->path, errno); } else { pClient->pFile = pFile; } @@ -594,7 +608,7 @@ static void monitorSendAllSlowLog() { int64_t* clusterId = (int64_t*)taosHashGetKey(pIter, NULL); SAppInstInfo* pInst = getAppInstByClusterId(*clusterId); SlowLogClient* pClient = (*(SlowLogClient**)pIter); - if (pClient == NULL) { + if (pClient == NULL || pInst == NULL) { taosHashCancelIterate(monitorSlowLogHash, pIter); return; } @@ -604,7 +618,7 @@ static void monitorSendAllSlowLog() { continue; } - if (pInst != NULL && pClient->offset == 0) { + if (pClient->offset == 0) { int64_t size = getFileSize(pClient->path); if (size <= 0) { if (size < 0) { @@ -657,7 +671,7 @@ static void monitorSendAllSlowLogFromTempDir(int64_t clusterId) { } char filename[PATH_MAX] = {0}; - snprintf(filename, sizeof(filename), "%s%s", tmpPath, name); + (void)snprintf(filename, sizeof(filename), "%s%s", tmpPath, name); TdFilePtr pFile = taosOpenFile(filename, TD_FILE_READ | TD_FILE_WRITE); if (pFile == NULL) { tscError("failed to open file:%s since %s", filename, terrstr()); @@ -665,7 +679,7 @@ static void monitorSendAllSlowLogFromTempDir(int64_t clusterId) { } if (taosLockFile(pFile) < 0) { tscError("failed to lock file:%s since %s, maybe used by other process", filename, terrstr()); - taosCloseFile(&pFile); + (void)taosCloseFile(&pFile); continue; } char* tmp = taosStrdup(filename); @@ -673,7 +687,7 @@ static void monitorSendAllSlowLogFromTempDir(int64_t clusterId) { taosMemoryFree(tmp); } - taosCloseDir(&pDir); + (void)taosCloseDir(&pDir); } static void* monitorThreadFunc(void* param) { @@ -707,7 +721,7 @@ static void* monitorThreadFunc(void* param) { } MonitorSlowLogData* slowLogData = NULL; - taosReadQitem(monitorQueue, (void**)&slowLogData); + (void)taosReadQitem(monitorQueue, (void**)&slowLogData); if (slowLogData != NULL) { if (slowLogData->type == SLOW_LOG_READ_BEGINNIG) { if (slowLogData->pFile != NULL) { @@ -735,7 +749,7 @@ static void* monitorThreadFunc(void* param) { if (quitCnt == 0) { monitorSendAllSlowLog(); } - tsem2_timewait(&monitorSem, 100); + (void)tsem2_timewait(&monitorSem, 100); } atomic_store_32(&slowLogFlag, -2); return NULL; @@ -743,12 +757,19 @@ static void* monitorThreadFunc(void* param) { static int32_t tscMonitortInit() { TdThreadAttr thAttr; - taosThreadAttrInit(&thAttr); - taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); + if (taosThreadAttrInit(&thAttr) != 0) { + tscError("failed to init thread attr since %s", strerror(errno)); + return TSDB_CODE_TSC_INTERNAL_ERROR; + } + if (taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE) != 0) { + tscError("failed to set thread attr since %s", strerror(errno)); + return TSDB_CODE_TSC_INTERNAL_ERROR; + } + TdThread monitorThread; if (taosThreadCreate(&monitorThread, &thAttr, monitorThreadFunc, NULL) != 0) { tscError("failed to create monitor thread since %s", strerror(errno)); - return -1; + return TSDB_CODE_TSC_INTERNAL_ERROR; } taosThreadAttrDestroy(&thAttr); @@ -767,15 +788,14 @@ static void tscMonitorStop() { } int32_t monitorInit() { - int32_t code; + int32_t code = 0; tscInfo("[monitor] tscMonitor init"); monitorCounterHash = (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); if (monitorCounterHash == NULL) { tscError("failed to create monitorCounterHash"); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY); } taosHashSetFreeFp(monitorCounterHash, destroyMonitorClient); @@ -783,46 +803,39 @@ int32_t monitorInit() { (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); if (monitorSlowLogHash == NULL) { tscError("failed to create monitorSlowLogHash"); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY); } taosHashSetFreeFp(monitorSlowLogHash, destroySlowLogClient); monitorTimer = taosTmrInit(0, 0, 0, "MONITOR"); if (monitorTimer == NULL) { tscError("failed to create monitor timer"); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY); } - if (getSlowLogTmpDir(tmpSlowLogPath, sizeof(tmpSlowLogPath)) < 0) { - terrno = TSDB_CODE_TSC_INTERNAL_ERROR; - return -1; + code = getSlowLogTmpDir(tmpSlowLogPath, sizeof(tmpSlowLogPath)); + if (code != 0) { + return code; } if (taosMulModeMkDir(tmpSlowLogPath, 0777, true) != 0) { tscError("failed to create dir:%s since %s", tmpSlowLogPath, terrstr()); - return terrno; + return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY); } if (tsem2_init(&monitorSem, 0, 0) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); tscError("sem init error since %s", terrstr()); - return -1; + return TAOS_SYSTEM_ERROR(errno); } code = taosOpenQueue(&monitorQueue); if (code) { - terrno = code; tscError("open queue error since %s", terrstr()); - return -1; + return TAOS_GET_TERRNO(code); } taosInitRWLatch(&monitorLock); - if (tscMonitortInit() != 0) { - return -1; - } - return 0; + return tscMonitortInit(); } void monitorClose() { @@ -838,13 +851,13 @@ void monitorClose() { taosHashCleanup(monitorSlowLogHash); taosTmrCleanUp(monitorTimer); taosCloseQueue(monitorQueue); - tsem2_destroy(&monitorSem); + (void)tsem2_destroy(&monitorSem); taosWUnLockLatch(&monitorLock); } int32_t monitorPutData2MonitorQueue(MonitorSlowLogData data) { - int32_t code; - MonitorSlowLogData* slowLogData; + int32_t code = 0; + MonitorSlowLogData* slowLogData = NULL; if (atomic_load_32(&slowLogFlag) == -2) { tscError("[monitor] slow log thread is exiting"); @@ -854,13 +867,13 @@ int32_t monitorPutData2MonitorQueue(MonitorSlowLogData data) { code = taosAllocateQitem(sizeof(MonitorSlowLogData), DEF_QITEM, 0, (void**)&slowLogData); if (code) { tscError("[monitor] failed to allocate slow log data"); - return terrno = code; + return code; } *slowLogData = data; tscDebug("[monitor] write slow log to queue, clusterId:%" PRIx64 " type:%s, data:%s", slowLogData->clusterId, queueTypeStr[slowLogData->type], slowLogData->data); if (taosWriteQitem(monitorQueue, slowLogData) == 0) { - tsem2_post(&monitorSem); + (void)tsem2_post(&monitorSem); } else { monitorFreeSlowLogData(slowLogData); taosFreeQitem(slowLogData); diff --git a/source/client/src/clientMonitorSlow.c b/source/client/src/clientMonitorSlow.c index 192792f43e..5945f28884 100644 --- a/source/client/src/clientMonitorSlow.c +++ b/source/client/src/clientMonitorSlow.c @@ -69,7 +69,7 @@ void slowQueryLog(int64_t rid, bool killed, int32_t code, int32_t cost) { } else { clientSlowQueryLog(pTscObj->pAppInfo->clusterId, pTscObj->user, result, cost); } - releaseTscObj(rid); + (void)releaseTscObj(rid); } else { tscLog("slowQueryLog, not found rid"); } diff --git a/source/client/src/clientMonitorSql.c b/source/client/src/clientMonitorSql.c index 19d5b7506e..2556cf123a 100644 --- a/source/client/src/clientMonitorSql.c +++ b/source/client/src/clientMonitorSql.c @@ -65,7 +65,7 @@ void sqlReqLog(int64_t rid, bool killed, int32_t code, int8_t type) { } else { clientSQLReqLog(pTscObj->pAppInfo->clusterId, pTscObj->user, result, type); } - releaseTscObj(rid); + (void)releaseTscObj(rid); } else { tscLog("sqlReqLog, not found rid"); } diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index d587deffc5..1e8c2720bf 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -40,7 +40,10 @@ int32_t genericRspCallback(void* param, SDataBuf* pMsg, int32_t code) { setErrno(pRequest, code); if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type)) { - removeMeta(pRequest->pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type)); + code = removeMeta(pRequest->pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type)); + if (code != 0){ + setErrno(pRequest, code); + } } taosMemoryFree(pMsg->pEpSet); @@ -48,7 +51,7 @@ int32_t genericRspCallback(void* param, SDataBuf* pMsg, int32_t code) { if (pRequest->body.queryFp != NULL) { doRequestCallback(pRequest, code); } else { - tsem_post(&pRequest->body.rspSem); + (void)tsem2_post(&pRequest->body.rspSem); } return code; } @@ -61,7 +64,7 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) { if (code != TSDB_CODE_SUCCESS) { setErrno(pRequest, code); - tsem_post(&pRequest->body.rspSem); + (void)tsem2_post(&pRequest->body.rspSem); goto End; } @@ -70,7 +73,7 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) { if (NULL == pTscObj->pAppInfo) { code = TSDB_CODE_TSC_DISCONNECTED; setErrno(pRequest, code); - tsem_post(&pRequest->body.rspSem); + (void)tsem2_post(&pRequest->body.rspSem); goto End; } @@ -78,14 +81,14 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) { if (tDeserializeSConnectRsp(pMsg->pData, pMsg->len, &connectRsp) != 0) { code = TSDB_CODE_TSC_INVALID_VERSION; setErrno(pRequest, code); - tsem_post(&pRequest->body.rspSem); + (void)tsem2_post(&pRequest->body.rspSem); goto End; } if ((code = taosCheckVersionCompatibleFromStr(version, connectRsp.sVer, 3)) != 0) { tscError("version not compatible. client version: %s, server version: %s", version, connectRsp.sVer); setErrno(pRequest, code); - tsem_post(&pRequest->body.rspSem); + (void)tsem2_post(&pRequest->body.rspSem); goto End; } @@ -95,14 +98,14 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) { code = TSDB_CODE_TIME_UNSYNCED; tscError("time diff:%ds is too big", delta); setErrno(pRequest, code); - tsem_post(&pRequest->body.rspSem); + (void)tsem2_post(&pRequest->body.rspSem); goto End; } if (connectRsp.epSet.numOfEps == 0) { code = TSDB_CODE_APP_ERROR; setErrno(pRequest, code); - tsem_post(&pRequest->body.rspSem); + (void)tsem2_post(&pRequest->body.rspSem); goto End; } @@ -111,8 +114,13 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) { SEpSet srcEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); SEpSet dstEpSet = connectRsp.epSet; if (srcEpSet.numOfEps == 1) { - rpcSetDefaultAddr(pTscObj->pAppInfo->pTransporter, srcEpSet.eps[srcEpSet.inUse].fqdn, + code = rpcSetDefaultAddr(pTscObj->pAppInfo->pTransporter, srcEpSet.eps[srcEpSet.inUse].fqdn, dstEpSet.eps[dstEpSet.inUse].fqdn); + if (code != 0){ + setErrno(pRequest, code); + (void)tsem2_post(&pRequest->body.rspSem); + goto End; + } updateEpSet = 0; } } @@ -158,7 +166,7 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) { MonitorSlowLogData data = {0}; data.clusterId = pTscObj->pAppInfo->clusterId; data.type = SLOW_LOG_READ_BEGINNIG; - monitorPutData2MonitorQueue(data); + (void)monitorPutData2MonitorQueue(data); monitorClientSlowQueryInit(connectRsp.clusterId); monitorClientSQLReqInit(connectRsp.clusterId); } @@ -167,12 +175,17 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) { taosThreadMutexLock(&clientHbMgr.lock); SAppHbMgr* pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, pTscObj->appHbMgrIdx); if (pAppHbMgr) { - hbRegisterConn(pAppHbMgr, pTscObj->id, connectRsp.clusterId, connectRsp.connType); + code = hbRegisterConn(pAppHbMgr, pTscObj->id, connectRsp.clusterId, connectRsp.connType); + if (code != 0){ + setErrno(pRequest, code); + (void)tsem2_post(&pRequest->body.rspSem); + goto End; + } } else { taosThreadMutexUnlock(&clientHbMgr.lock); code = TSDB_CODE_TSC_DISCONNECTED; setErrno(pRequest, code); - tsem_post(&pRequest->body.rspSem); + (void)tsem2_post(&pRequest->body.rspSem); goto End; } taosThreadMutexUnlock(&clientHbMgr.lock); @@ -180,7 +193,7 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) { tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, connectRsp.clusterId, pTscObj->pAppInfo->numOfConns); - tsem_post(&pRequest->body.rspSem); + (void)tsem2_post(&pRequest->body.rspSem); End: if (pRequest) { @@ -216,7 +229,7 @@ int32_t processCreateDbRsp(void* param, SDataBuf* pMsg, int32_t code) { setErrno(pRequest, code); } else { struct SCatalog* pCatalog = NULL; - int32_t code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); + code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); if (TSDB_CODE_SUCCESS == code) { STscObj* pTscObj = pRequest->pTscObj; @@ -225,28 +238,39 @@ int32_t processCreateDbRsp(void* param, SDataBuf* pMsg, int32_t code) { .requestObjRefId = pRequest->self, .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; char dbFName[TSDB_DB_FNAME_LEN]; - snprintf(dbFName, sizeof(dbFName) - 1, "%d.%s", pTscObj->acctId, TSDB_INFORMATION_SCHEMA_DB); - catalogRefreshDBVgInfo(pCatalog, &conn, dbFName); - snprintf(dbFName, sizeof(dbFName) - 1, "%d.%s", pTscObj->acctId, TSDB_PERFORMANCE_SCHEMA_DB); - catalogRefreshDBVgInfo(pCatalog, &conn, dbFName); + (void)snprintf(dbFName, sizeof(dbFName) - 1, "%d.%s", pTscObj->acctId, TSDB_INFORMATION_SCHEMA_DB); + code = catalogRefreshDBVgInfo(pCatalog, &conn, dbFName); + if (code != TSDB_CODE_SUCCESS) { + setErrno(pRequest, code); + } + if (code == TSDB_CODE_SUCCESS) { + (void)snprintf(dbFName, sizeof(dbFName) - 1, "%d.%s", pTscObj->acctId, TSDB_PERFORMANCE_SCHEMA_DB); + code = catalogRefreshDBVgInfo(pCatalog, &conn, dbFName); + } + if (code != TSDB_CODE_SUCCESS) { + setErrno(pRequest, code); + } } } if (pRequest->body.queryFp) { doRequestCallback(pRequest, code); } else { - tsem_post(&pRequest->body.rspSem); + (void)tsem2_post(&pRequest->body.rspSem); } return code; } int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) { SRequestObj* pRequest = param; + SUseDbRsp usedbRsp = {0}; + code = tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp); + if (code != 0){ + goto END; + } if (TSDB_CODE_MND_DB_NOT_EXIST == code || TSDB_CODE_MND_DB_IN_CREATING == code || TSDB_CODE_MND_DB_IN_DROPPING == code) { - SUseDbRsp usedbRsp = {0}; - tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp); struct SCatalog* pCatalog = NULL; if (usedbRsp.vgVersion >= 0) { // cached in local @@ -256,42 +280,35 @@ int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) { tscWarn("0x%" PRIx64 "catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->requestId, clusterId, tstrerror(code1)); } else { - catalogRemoveDB(pCatalog, usedbRsp.db, usedbRsp.uid); + code = catalogRemoveDB(pCatalog, usedbRsp.db, usedbRsp.uid); + if (code != 0){ + goto END; + } } } - tFreeSUsedbRsp(&usedbRsp); } if (code != TSDB_CODE_SUCCESS) { - taosMemoryFree(pMsg->pData); - taosMemoryFree(pMsg->pEpSet); - setErrno(pRequest, code); - - if (pRequest->body.queryFp != NULL) { - doRequestCallback(pRequest, pRequest->code); - - } else { - tsem_post(&pRequest->body.rspSem); - } - - return code; + goto END; } - SUseDbRsp usedbRsp = {0}; - tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp); - if (strlen(usedbRsp.db) == 0) { if (usedbRsp.errCode != 0) { - return usedbRsp.errCode; + code = usedbRsp.errCode; } else { - return TSDB_CODE_APP_ERROR; + code = TSDB_CODE_APP_ERROR; } + goto END; } tscTrace("db:%s, usedbRsp received, numOfVgroups:%d", usedbRsp.db, usedbRsp.vgNum); for (int32_t i = 0; i < usedbRsp.vgNum; ++i) { SVgroupInfo* pInfo = taosArrayGet(usedbRsp.pVgroupInfos, i); + if (pInfo == NULL){ + code = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY); + goto END; + } tscTrace("vgId:%d, numOfEps:%d inUse:%d ", pInfo->vgId, pInfo->epSet.numOfEps, pInfo->epSet.inUse); for (int32_t j = 0; j < pInfo->epSet.numOfEps; ++j) { tscTrace("vgId:%d, index:%d epset:%s:%u", pInfo->vgId, j, pInfo->epSet.eps[j].fqdn, pInfo->epSet.eps[j].port); @@ -299,11 +316,13 @@ int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) { } SName name = {0}; - tNameFromString(&name, usedbRsp.db, T_NAME_ACCT | T_NAME_DB); + code = tNameFromString(&name, usedbRsp.db, T_NAME_ACCT | T_NAME_DB); + if (code != 0){ + goto END; + } SUseDbOutput output = {0}; code = queryBuildUseDbOutput(&output, &usedbRsp); - if (code != 0) { terrno = code; if (output.dbVgroup) taosHashCleanup(output.dbVgroup->vgHash); @@ -317,28 +336,32 @@ int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) { tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->pTscObj->pAppInfo->clusterId, tstrerror(code1)); } else { - catalogUpdateDBVgInfo(pCatalog, output.db, output.dbId, output.dbVgroup); - output.dbVgroup = NULL; + code = catalogUpdateDBVgInfo(pCatalog, output.db, output.dbId, output.dbVgroup); + if (code == 0){ + output.dbVgroup = NULL; + } } } taosMemoryFreeClear(output.dbVgroup); - tFreeSUsedbRsp(&usedbRsp); - char db[TSDB_DB_NAME_LEN] = {0}; - tNameGetDbName(&name, db); + (void)tNameGetDbName(&name, db); setConnectionDB(pRequest->pTscObj, db); + +END: + setErrno(pRequest, code); + tFreeSUsedbRsp(&usedbRsp); taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pEpSet); if (pRequest->body.queryFp != NULL) { doRequestCallback(pRequest, pRequest->code); } else { - tsem_post(&pRequest->body.rspSem); + (void)tsem2_post(&pRequest->body.rspSem); } - return 0; + return code; } int32_t processCreateSTableRsp(void* param, SDataBuf* pMsg, int32_t code) { @@ -353,7 +376,10 @@ int32_t processCreateSTableRsp(void* param, SDataBuf* pMsg, int32_t code) { SMCreateStbRsp createRsp = {0}; SDecoder coder = {0}; tDecoderInit(&coder, pMsg->pData, pMsg->len); - tDecodeSMCreateStbRsp(&coder, &createRsp); + code = tDecodeSMCreateStbRsp(&coder, &createRsp); + if (code != 0){ + setErrno(pRequest, TAOS_GET_TERRNO(code)); + } tDecoderClear(&coder); pRequest->body.resInfo.execRes.msgType = TDMT_MND_CREATE_STB; @@ -380,7 +406,7 @@ int32_t processCreateSTableRsp(void* param, SDataBuf* pMsg, int32_t code) { doRequestCallback(pRequest, code); } else { - tsem_post(&pRequest->body.rspSem); + (void)tsem2_post(&pRequest->body.rspSem); } return code; } @@ -391,33 +417,49 @@ int32_t processDropDbRsp(void* param, SDataBuf* pMsg, int32_t code) { setErrno(pRequest, code); } else { SDropDbRsp dropdbRsp = {0}; - tDeserializeSDropDbRsp(pMsg->pData, pMsg->len, &dropdbRsp); - + code = tDeserializeSDropDbRsp(pMsg->pData, pMsg->len, &dropdbRsp); + if (code != 0){ + setErrno(pRequest, TAOS_GET_TERRNO(code)); + goto END; + } struct SCatalog* pCatalog = NULL; - int32_t code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); + code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); if (TSDB_CODE_SUCCESS == code) { - catalogRemoveDB(pCatalog, dropdbRsp.db, dropdbRsp.uid); + code = catalogRemoveDB(pCatalog, dropdbRsp.db, dropdbRsp.uid); + if (code != 0){ + setErrno(pRequest, TAOS_GET_TERRNO(code)); + goto END; + } STscObj* pTscObj = pRequest->pTscObj; SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter, .requestId = pRequest->requestId, .requestObjRefId = pRequest->self, .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; - char dbFName[TSDB_DB_FNAME_LEN]; - snprintf(dbFName, sizeof(dbFName) - 1, "%d.%s", pTscObj->acctId, TSDB_INFORMATION_SCHEMA_DB); - catalogRefreshDBVgInfo(pCatalog, &conn, dbFName); - snprintf(dbFName, sizeof(dbFName) - 1, "%d.%s", pTscObj->acctId, TSDB_PERFORMANCE_SCHEMA_DB); - catalogRefreshDBVgInfo(pCatalog, &conn, dbFName); + char dbFName[TSDB_DB_FNAME_LEN] = {0}; + (void)snprintf(dbFName, sizeof(dbFName) - 1, "%d.%s", pTscObj->acctId, TSDB_INFORMATION_SCHEMA_DB); + code = catalogRefreshDBVgInfo(pCatalog, &conn, dbFName); + if (code != 0){ + setErrno(pRequest, TAOS_GET_TERRNO(code)); + goto END; + } + (void)snprintf(dbFName, sizeof(dbFName) - 1, "%d.%s", pTscObj->acctId, TSDB_PERFORMANCE_SCHEMA_DB); + code = catalogRefreshDBVgInfo(pCatalog, &conn, dbFName); + if (code != 0){ + setErrno(pRequest, TAOS_GET_TERRNO(code)); + goto END; + } } } +END: taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pEpSet); if (pRequest->body.queryFp != NULL) { doRequestCallback(pRequest, code); } else { - tsem_post(&pRequest->body.rspSem); + (void)tsem2_post(&pRequest->body.rspSem); } return code; } @@ -430,7 +472,10 @@ int32_t processAlterStbRsp(void* param, SDataBuf* pMsg, int32_t code) { SMAlterStbRsp alterRsp = {0}; SDecoder coder = {0}; tDecoderInit(&coder, pMsg->pData, pMsg->len); - tDecodeSMAlterStbRsp(&coder, &alterRsp); + code = tDecodeSMAlterStbRsp(&coder, &alterRsp); + if (code != 0){ + setErrno(pRequest, TAOS_GET_TERRNO(code)); + } tDecoderClear(&coder); pRequest->body.resInfo.execRes.msgType = TDMT_MND_ALTER_STB; @@ -457,57 +502,72 @@ int32_t processAlterStbRsp(void* param, SDataBuf* pMsg, int32_t code) { doRequestCallback(pRequest, code); } else { - tsem_post(&pRequest->body.rspSem); + (void)tsem2_post(&pRequest->body.rspSem); } return code; } static int32_t buildShowVariablesBlock(SArray* pVars, SSDataBlock** block) { + int32_t code = 0; + int32_t line = 0; SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); + TSDB_CHECK_NULL(pBlock, code, line, END, TSDB_CODE_OUT_OF_MEMORY); pBlock->info.hasVarCol = true; pBlock->pDataBlock = taosArrayInit(SHOW_VARIABLES_RESULT_COLS, sizeof(SColumnInfoData)); - + TSDB_CHECK_NULL(pBlock->pDataBlock, code, line, END, TSDB_CODE_OUT_OF_MEMORY); SColumnInfoData infoData = {0}; infoData.info.type = TSDB_DATA_TYPE_VARCHAR; infoData.info.bytes = SHOW_VARIABLES_RESULT_FIELD1_LEN; - taosArrayPush(pBlock->pDataBlock, &infoData); + TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, TSDB_CODE_OUT_OF_MEMORY); infoData.info.type = TSDB_DATA_TYPE_VARCHAR; infoData.info.bytes = SHOW_VARIABLES_RESULT_FIELD2_LEN; - taosArrayPush(pBlock->pDataBlock, &infoData); + TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, TSDB_CODE_OUT_OF_MEMORY); infoData.info.type = TSDB_DATA_TYPE_VARCHAR; infoData.info.bytes = SHOW_VARIABLES_RESULT_FIELD3_LEN; - taosArrayPush(pBlock->pDataBlock, &infoData); + TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, TSDB_CODE_OUT_OF_MEMORY); int32_t numOfCfg = taosArrayGetSize(pVars); - blockDataEnsureCapacity(pBlock, numOfCfg); + code = blockDataEnsureCapacity(pBlock, numOfCfg); + TSDB_CHECK_CODE(code, line, END); for (int32_t i = 0, c = 0; i < numOfCfg; ++i, c = 0) { SVariablesInfo* pInfo = taosArrayGet(pVars, i); + TSDB_CHECK_NULL(pInfo, code, line, END, TSDB_CODE_OUT_OF_MEMORY); char name[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0}; STR_WITH_MAXSIZE_TO_VARSTR(name, pInfo->name, TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE); SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, c++); - colDataSetVal(pColInfo, i, name, false); + TSDB_CHECK_NULL(pColInfo, code, line, END, TSDB_CODE_OUT_OF_MEMORY); + code = colDataSetVal(pColInfo, i, name, false); + TSDB_CHECK_CODE(code, line, END); char value[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0}; STR_WITH_MAXSIZE_TO_VARSTR(value, pInfo->value, TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE); pColInfo = taosArrayGet(pBlock->pDataBlock, c++); - colDataSetVal(pColInfo, i, value, false); + TSDB_CHECK_NULL(pColInfo, code, line, END, TSDB_CODE_OUT_OF_MEMORY); + code = colDataSetVal(pColInfo, i, value, false); + TSDB_CHECK_CODE(code, line, END); char scope[TSDB_CONFIG_SCOPE_LEN + VARSTR_HEADER_SIZE] = {0}; STR_WITH_MAXSIZE_TO_VARSTR(scope, pInfo->scope, TSDB_CONFIG_SCOPE_LEN + VARSTR_HEADER_SIZE); pColInfo = taosArrayGet(pBlock->pDataBlock, c++); - colDataSetVal(pColInfo, i, scope, false); + TSDB_CHECK_NULL(pColInfo, code, line, END, TSDB_CODE_OUT_OF_MEMORY); + code = colDataSetVal(pColInfo, i, scope, false); + TSDB_CHECK_CODE(code, line, END); } pBlock->info.rows = numOfCfg; *block = pBlock; + return code; - return TSDB_CODE_SUCCESS; +END: + taosArrayDestroy(pBlock->pDataBlock); + taosMemoryFree(pBlock); + return code; } static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) { @@ -577,55 +637,72 @@ int32_t processShowVariablesRsp(void* param, SDataBuf* pMsg, int32_t code) { if (pRequest->body.queryFp != NULL) { doRequestCallback(pRequest, code); } else { - tsem_post(&pRequest->body.rspSem); + (void)tsem2_post(&pRequest->body.rspSem); } return code; } static int32_t buildCompactDbBlock(SCompactDbRsp* pRsp, SSDataBlock** block) { + int32_t code = 0; + int32_t line = 0; SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); + TSDB_CHECK_NULL(pBlock, code, line, END, TSDB_CODE_OUT_OF_MEMORY); pBlock->info.hasVarCol = true; pBlock->pDataBlock = taosArrayInit(COMPACT_DB_RESULT_COLS, sizeof(SColumnInfoData)); - + TSDB_CHECK_NULL(pBlock->pDataBlock, code, line, END, TSDB_CODE_OUT_OF_MEMORY); SColumnInfoData infoData = {0}; infoData.info.type = TSDB_DATA_TYPE_VARCHAR; infoData.info.bytes = COMPACT_DB_RESULT_FIELD1_LEN; - taosArrayPush(pBlock->pDataBlock, &infoData); + TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, TSDB_CODE_OUT_OF_MEMORY); infoData.info.type = TSDB_DATA_TYPE_INT; infoData.info.bytes = tDataTypes[TSDB_DATA_TYPE_INT].bytes; - taosArrayPush(pBlock->pDataBlock, &infoData); + TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, TSDB_CODE_OUT_OF_MEMORY); infoData.info.type = TSDB_DATA_TYPE_VARCHAR; infoData.info.bytes = COMPACT_DB_RESULT_FIELD3_LEN; - taosArrayPush(pBlock->pDataBlock, &infoData); + TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, TSDB_CODE_OUT_OF_MEMORY); - blockDataEnsureCapacity(pBlock, 1); + code = blockDataEnsureCapacity(pBlock, 1); + TSDB_CHECK_CODE(code, line, END); SColumnInfoData* pResultCol = taosArrayGet(pBlock->pDataBlock, 0); + TSDB_CHECK_NULL(pResultCol, code, line, END, TSDB_CODE_OUT_OF_MEMORY); SColumnInfoData* pIdCol = taosArrayGet(pBlock->pDataBlock, 1); + TSDB_CHECK_NULL(pIdCol, code, line, END, TSDB_CODE_OUT_OF_MEMORY); SColumnInfoData* pReasonCol = taosArrayGet(pBlock->pDataBlock, 2); + TSDB_CHECK_NULL(pReasonCol, code, line, END, TSDB_CODE_OUT_OF_MEMORY); + char result[COMPACT_DB_RESULT_FIELD1_LEN] = {0}; char reason[COMPACT_DB_RESULT_FIELD3_LEN] = {0}; if (pRsp->bAccepted) { STR_TO_VARSTR(result, "accepted"); - colDataSetVal(pResultCol, 0, result, false); - colDataSetVal(pIdCol, 0, (void*)&pRsp->compactId, false); + code = colDataSetVal(pResultCol, 0, result, false); + TSDB_CHECK_CODE(code, line, END); + code = colDataSetVal(pIdCol, 0, (void*)&pRsp->compactId, false); + TSDB_CHECK_CODE(code, line, END); STR_TO_VARSTR(reason, "success"); - colDataSetVal(pReasonCol, 0, reason, false); + code = colDataSetVal(pReasonCol, 0, reason, false); + TSDB_CHECK_CODE(code, line, END); } else { STR_TO_VARSTR(result, "rejected"); - colDataSetVal(pResultCol, 0, result, false); + code = colDataSetVal(pResultCol, 0, result, false); + TSDB_CHECK_CODE(code, line, END); colDataSetNULL(pIdCol, 0); STR_TO_VARSTR(reason, "compaction is ongoing"); - colDataSetVal(pReasonCol, 0, reason, false); + code = colDataSetVal(pReasonCol, 0, reason, false); + TSDB_CHECK_CODE(code, line, END); } pBlock->info.rows = 1; *block = pBlock; return TSDB_CODE_SUCCESS; +END: + taosMemoryFree(pBlock); + taosArrayDestroy(pBlock->pDataBlock); + return code; } static int32_t buildRetriveTableRspForCompactDb(SCompactDbRsp* pCompactDb, SRetrieveTableRsp** pRsp) { @@ -696,7 +773,7 @@ int32_t processCompactDbRsp(void* param, SDataBuf* pMsg, int32_t code) { if (pRequest->body.queryFp != NULL) { pRequest->body.queryFp(((SSyncQueryParam *)pRequest->body.interParam)->userParam, pRequest, code); } else { - tsem_post(&pRequest->body.rspSem); + (void)tsem2_post(&pRequest->body.rspSem); } return code; } diff --git a/source/client/src/clientStmt.c b/source/client/src/clientStmt.c index 17b52521b8..11c27b409d 100644 --- a/source/client/src/clientStmt.c +++ b/source/client/src/clientStmt.c @@ -14,15 +14,20 @@ static FORCE_INLINE int32_t stmtAllocQNodeFromBuf(STableBufInfo* pTblBuf, void** pTblBuf->buffOffset += pTblBuf->buffUnit; } else if (pTblBuf->buffIdx < taosArrayGetSize(pTblBuf->pBufList)) { pTblBuf->pCurBuff = taosArrayGetP(pTblBuf->pBufList, pTblBuf->buffIdx++); + if (NULL == pTblBuf->pCurBuff) { + return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY); + } *pBuf = pTblBuf->pCurBuff; pTblBuf->buffOffset = pTblBuf->buffUnit; } else { void* buff = taosMemoryMalloc(pTblBuf->buffSize); if (NULL == buff) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } - taosArrayPush(pTblBuf->pBufList, &buff); + if(taosArrayPush(pTblBuf->pBufList, &buff) == NULL){ + return terrno; + } pTblBuf->buffIdx++; pTblBuf->pCurBuff = buff; @@ -48,7 +53,7 @@ bool stmtDequeue(STscStmt* pStmt, SStmtQNode** param) { *param = node; - atomic_sub_fetch_64(&pStmt->queue.qRemainNum, 1); + (void)atomic_sub_fetch_64(&pStmt->queue.qRemainNum, 1); return true; } @@ -58,7 +63,7 @@ void stmtEnqueue(STscStmt* pStmt, SStmtQNode* param) { pStmt->queue.tail = param; pStmt->stat.bindDataNum++; - atomic_add_fetch_64(&pStmt->queue.qRemainNum, 1); + (void)atomic_add_fetch_64(&pStmt->queue.qRemainNum, 1); } static int32_t stmtCreateRequest(STscStmt* pStmt) { @@ -183,8 +188,8 @@ int32_t stmtBackupQueryFields(STscStmt* pStmt) { if (NULL == pRes->fields || NULL == pRes->userFields) { STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } - memcpy(pRes->fields, pStmt->exec.pRequest->body.resInfo.fields, size); - memcpy(pRes->userFields, pStmt->exec.pRequest->body.resInfo.userFields, size); + (void)memcpy(pRes->fields, pStmt->exec.pRequest->body.resInfo.fields, size); + (void)memcpy(pRes->userFields, pStmt->exec.pRequest->body.resInfo.userFields, size); return TSDB_CODE_SUCCESS; } @@ -201,7 +206,7 @@ int32_t stmtRestoreQueryFields(STscStmt* pStmt) { if (NULL == pStmt->exec.pRequest->body.resInfo.fields) { STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } - memcpy(pStmt->exec.pRequest->body.resInfo.fields, pRes->fields, size); + (void)memcpy(pStmt->exec.pRequest->body.resInfo.fields, pRes->fields, size); } if (NULL == pStmt->exec.pRequest->body.resInfo.userFields) { @@ -209,7 +214,7 @@ int32_t stmtRestoreQueryFields(STscStmt* pStmt) { if (NULL == pStmt->exec.pRequest->body.resInfo.userFields) { STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } - memcpy(pStmt->exec.pRequest->body.resInfo.userFields, pRes->userFields, size); + (void)memcpy(pStmt->exec.pRequest->body.resInfo.userFields, pRes->userFields, size); } return TSDB_CODE_SUCCESS; @@ -219,10 +224,13 @@ int32_t stmtUpdateBindInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags, bool autoCreateTbl) { STscStmt* pStmt = (STscStmt*)stmt; char tbFName[TSDB_TABLE_FNAME_LEN]; - tNameExtractFullName(tbName, tbFName); + int32_t code = tNameExtractFullName(tbName, tbFName); + if (code != 0){ + return code; + } - memcpy(&pStmt->bInfo.sname, tbName, sizeof(*tbName)); - strncpy(pStmt->bInfo.tbFName, tbFName, sizeof(pStmt->bInfo.tbFName) - 1); + (void)memcpy(&pStmt->bInfo.sname, tbName, sizeof(*tbName)); + (void)strncpy(pStmt->bInfo.tbFName, tbFName, sizeof(pStmt->bInfo.tbFName) - 1); pStmt->bInfo.tbFName[sizeof(pStmt->bInfo.tbFName) - 1] = 0; pStmt->bInfo.tbUid = autoCreateTbl ? 0 : pTableMeta->uid; @@ -383,17 +391,21 @@ int32_t stmtCleanBindInfo(STscStmt* pStmt) { taosMemoryFreeClear(pStmt->bInfo.boundTags); } pStmt->bInfo.stbFName[0] = 0; - ; + return TSDB_CODE_SUCCESS; } void stmtFreeTableBlkList(STableColsData* pTb) { - qResetStmtColumns(pTb->aCol, true); + (void)qResetStmtColumns(pTb->aCol, true); taosArrayDestroy(pTb->aCol); } void stmtResetQueueTableBuf(STableBufInfo* pTblBuf, SStmtQueue* pQueue) { pTblBuf->pCurBuff = taosArrayGetP(pTblBuf->pBufList, 0); + if (NULL == pTblBuf->pCurBuff) { + tscError("QInfo:%p, failed to get buffer from list", pTblBuf); + return; + } pTblBuf->buffIdx = 1; pTblBuf->buffOffset = sizeof(*pQueue->head); @@ -505,7 +517,7 @@ int32_t stmtCleanSQLInfo(STscStmt* pStmt) { qDestroyStmtDataBlock(pStmt->sql.siInfo.pDataCtx); taosArrayDestroyEx(pStmt->sql.siInfo.pTableCols, stmtFreeTbCols); - memset(&pStmt->sql, 0, sizeof(pStmt->sql)); + (void)memset(&pStmt->sql, 0, sizeof(pStmt->sql)); pStmt->sql.siInfo.tableColsReady = true; STMT_DLOG_E("end to free SQL info"); @@ -726,6 +738,9 @@ int32_t stmtAsyncOutput(STscStmt* pStmt, void* param) { for (int32_t i = 0; i < pStmt->sql.siInfo.pTableColsIdx; ++i) { SArray** p = (SArray**)TARRAY_GET_ELEM(pStmt->sql.siInfo.pTableCols, i); *p = taosArrayInit(20, POINTER_BYTES); + if (*p == NULL) { + STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } } atomic_store_8((int8_t*)&pStmt->sql.siInfo.tableColsReady, true); @@ -735,7 +750,7 @@ int32_t stmtAsyncOutput(STscStmt* pStmt, void* param) { // taosMemoryFree(pParam->pTbData); - atomic_sub_fetch_64(&pStmt->sql.siInfo.tbRemainNum, 1); + (void)atomic_sub_fetch_64(&pStmt->sql.siInfo.tbRemainNum, 1); } return TSDB_CODE_SUCCESS; } @@ -757,7 +772,7 @@ void* stmtBindThreadFunc(void* param) { continue; } - stmtAsyncOutput(pStmt, asyncParam); + (void)stmtAsyncOutput(pStmt, asyncParam); } qInfo("stmt bind thread stopped"); @@ -767,8 +782,12 @@ void* stmtBindThreadFunc(void* param) { int32_t stmtStartBindThread(STscStmt* pStmt) { TdThreadAttr thAttr; - taosThreadAttrInit(&thAttr); - taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); + if (taosThreadAttrInit(&thAttr) != 0) { + return TSDB_CODE_TSC_INTERNAL_ERROR; + } + if (taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE) != 0) { + return TSDB_CODE_TSC_INTERNAL_ERROR; + } if (taosThreadCreate(&pStmt->bindThread, &thAttr, stmtBindThreadFunc, pStmt) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); @@ -777,7 +796,7 @@ int32_t stmtStartBindThread(STscStmt* pStmt) { pStmt->bindThreadInUse = true; - taosThreadAttrDestroy(&thAttr); + (void)taosThreadAttrDestroy(&thAttr); return TSDB_CODE_SUCCESS; } @@ -800,7 +819,9 @@ int32_t stmtInitTableBuf(STableBufInfo* pTblBuf) { return TSDB_CODE_OUT_OF_MEMORY; } - taosArrayPush(pTblBuf->pBufList, &buff); + if (taosArrayPush(pTblBuf->pBufList, &buff) == NULL){ + return TSDB_CODE_OUT_OF_MEMORY; + } pTblBuf->pCurBuff = buff; pTblBuf->buffIdx = 1; @@ -834,7 +855,7 @@ TAOS_STMT* stmtInit(STscObj* taos, int64_t reqid, TAOS_STMT_OPTIONS* pOptions) { pStmt->errCode = TSDB_CODE_SUCCESS; if (NULL != pOptions) { - memcpy(&pStmt->options, pOptions, sizeof(pStmt->options)); + (void)memcpy(&pStmt->options, pOptions, sizeof(pStmt->options)); if (pOptions->singleStbInsert && pOptions->singleTableBindOnce) { pStmt->stbInterlaceMode = true; } @@ -848,24 +869,26 @@ TAOS_STMT* stmtInit(STscObj* taos, int64_t reqid, TAOS_STMT_OPTIONS* pOptions) { pStmt->sql.siInfo.pTableHash = tSimpleHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY)); if (NULL == pStmt->sql.siInfo.pTableHash) { terrno = TSDB_CODE_OUT_OF_MEMORY; - stmtClose(pStmt); + (void)stmtClose(pStmt); return NULL; } pStmt->sql.siInfo.pTableCols = taosArrayInit(STMT_TABLE_COLS_NUM, POINTER_BYTES); if (NULL == pStmt->sql.siInfo.pTableCols) { terrno = TSDB_CODE_OUT_OF_MEMORY; - stmtClose(pStmt); + (void)stmtClose(pStmt); return NULL; } code = stmtInitTableBuf(&pStmt->sql.siInfo.tbBuf); if (TSDB_CODE_SUCCESS == code) { - stmtInitQueue(pStmt); + code = stmtInitQueue(pStmt); + } + if (TSDB_CODE_SUCCESS == code) { code = stmtStartBindThread(pStmt); } if (TSDB_CODE_SUCCESS != code) { terrno = code; - stmtClose(pStmt); + (void)stmtClose(pStmt); return NULL; } } @@ -904,7 +927,7 @@ int stmtPrepare(TAOS_STMT* stmt, const char* sql, unsigned long length) { char* dbName = NULL; if (qParseDbName(sql, length, &dbName)) { - stmtSetDbName(stmt, dbName); + STMT_ERR_RET(stmtSetDbName(stmt, dbName)); taosMemoryFreeClear(dbName); } @@ -928,7 +951,9 @@ int32_t stmtInitStbInterlaceTableInfo(STscStmt* pStmt) { return TSDB_CODE_OUT_OF_MEMORY; } - taosArrayPush(pStmt->sql.siInfo.pTableCols, &pTblCols); + if (taosArrayPush(pStmt->sql.siInfo.pTableCols, &pTblCols)) { + return TSDB_CODE_OUT_OF_MEMORY; + } } pStmt->sql.siInfo.boundTags = pStmt->bInfo.boundTags; @@ -977,18 +1002,18 @@ int stmtSetTbName(TAOS_STMT* stmt, const char* tbName) { STMT_ERR_RET(qCreateSName(&pStmt->bInfo.sname, tbName, pStmt->taos->acctId, pStmt->exec.pRequest->pDb, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen)); - tNameExtractFullName(&pStmt->bInfo.sname, pStmt->bInfo.tbFName); + STMT_ERR_RET(tNameExtractFullName(&pStmt->bInfo.sname, pStmt->bInfo.tbFName)); STMT_ERR_RET(stmtGetFromCache(pStmt)); if (pStmt->bInfo.needParse) { - strncpy(pStmt->bInfo.tbName, tbName, sizeof(pStmt->bInfo.tbName) - 1); + (void)strncpy(pStmt->bInfo.tbName, tbName, sizeof(pStmt->bInfo.tbName) - 1); pStmt->bInfo.tbName[sizeof(pStmt->bInfo.tbName) - 1] = 0; STMT_ERR_RET(stmtParseSql(pStmt)); } } else { - strncpy(pStmt->bInfo.tbName, tbName, sizeof(pStmt->bInfo.tbName) - 1); + (void)strncpy(pStmt->bInfo.tbName, tbName, sizeof(pStmt->bInfo.tbName) - 1); pStmt->bInfo.tbName[sizeof(pStmt->bInfo.tbName) - 1] = 0; pStmt->exec.pRequest->requestId++; pStmt->bInfo.needParse = false; @@ -1142,7 +1167,7 @@ int32_t stmtAppendTablePostHandle(STscStmt* pStmt, SStmtQNode* param) { param->tblData.getFromHash = pStmt->sql.siInfo.tbFromHash; param->next = NULL; - atomic_add_fetch_64(&pStmt->sql.siInfo.tbRemainNum, 1); + (void)atomic_add_fetch_64(&pStmt->sql.siInfo.tbRemainNum, 1); stmtEnqueue(pStmt, param); @@ -1162,7 +1187,9 @@ static FORCE_INLINE int32_t stmtGetTableColsFromCache(STscStmt* pStmt, SArray** return TSDB_CODE_OUT_OF_MEMORY; } - taosArrayPush(pStmt->sql.siInfo.pTableCols, &pTblCols); + if (taosArrayPush(pStmt->sql.siInfo.pTableCols, &pTblCols) == NULL){ + return TSDB_CODE_OUT_OF_MEMORY; + } } } } @@ -1428,7 +1455,7 @@ int stmtUpdateTableUid(STscStmt* pStmt, SSubmitRsp* pRsp) { .requestId = pStmt->exec.pRequest->requestId, .requestObjRefId = pStmt->exec.pRequest->self, .mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)}; - int32_t code = catalogGetTableMeta(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &pTableMeta); + code = catalogGetTableMeta(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &pTableMeta); pStmt->stat.ctgGetTbMetaNum++; @@ -1515,7 +1542,7 @@ int stmtExec(TAOS_STMT* stmt) { STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE)); if (STMT_TYPE_QUERY == pStmt->sql.type) { - launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL); + (void)launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL); } else { if (pStmt->sql.stbInterlaceMode) { int64_t startTs = taosGetTimestampUs(); @@ -1537,7 +1564,7 @@ int stmtExec(TAOS_STMT* stmt) { STMT_ERR_RET(qBuildStmtOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->exec.pBlockHash)); } - launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL); + (void)launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL); } if (pStmt->exec.pRequest->code && NEED_CLIENT_HANDLE_ERROR(pStmt->exec.pRequest->code)) { @@ -1562,7 +1589,7 @@ _return: taosUsleep(1); } - stmtCleanExecInfo(pStmt, (code ? false : true), false); + STMT_ERR_RET(stmtCleanExecInfo(pStmt, (code ? false : true), false)); tFreeSSubmitRsp(pRsp); @@ -1582,7 +1609,7 @@ int stmtClose(TAOS_STMT* stmt) { pStmt->queue.stopQueue = true; if (pStmt->bindThreadInUse) { - taosThreadJoin(pStmt->bindThread, NULL); + (void)taosThreadJoin(pStmt->bindThread, NULL); pStmt->bindThreadInUse = false; } @@ -1597,7 +1624,7 @@ int stmtClose(TAOS_STMT* stmt) { pStmt->stat.bindDataUs1, pStmt->stat.bindDataUs2, pStmt->stat.bindDataUs3, pStmt->stat.bindDataUs4, pStmt->stat.addBatchUs, pStmt->stat.execWaitUs, pStmt->stat.execUseUs); - stmtCleanSQLInfo(pStmt); + STMT_ERR_RET(stmtCleanSQLInfo(pStmt)); taosMemoryFree(stmt); return TSDB_CODE_SUCCESS; diff --git a/source/client/src/clientTmqConnector.c b/source/client/src/clientTmqConnector.c index 2bea738c23..be00ec34c9 100644 --- a/source/client/src/clientTmqConnector.c +++ b/source/client/src/clientTmqConnector.c @@ -41,7 +41,7 @@ void tmqGlobalMethod(JNIEnv *env) { } if (g_vm == NULL) { - (*env)->GetJavaVM(env, &g_vm); + (void)((*env)->GetJavaVM(env, &g_vm)); } jclass offset = (*env)->FindClass(env, "com/taosdata/jdbc/tmq/OffsetWaitCallback"); @@ -68,7 +68,7 @@ void tmqAssignmentMethod(JNIEnv *env) { } if (g_vm == NULL) { - (*env)->GetJavaVM(env, &g_vm); + (void)((*env)->GetJavaVM(env, &g_vm)); } jclass assignment = (*env)->FindClass(env, "com/taosdata/jdbc/tmq/Assignment"); @@ -104,7 +104,7 @@ void commit_cb(tmq_t *tmq, int32_t code, void *param) { param = NULL; if (needDetach) { - (*g_vm)->DetachCurrentThread(g_vm); + (void)((*g_vm)->DetachCurrentThread(g_vm)); } env = NULL; } @@ -126,7 +126,7 @@ void consumer_callback(tmq_t *tmq, int32_t code, void *param) { param = NULL; if (needDetach) { - (*g_vm)->DetachCurrentThread(g_vm); + (void)((*g_vm)->DetachCurrentThread(g_vm)); } env = NULL; } diff --git a/source/libs/parser/src/parInsertStmt.c b/source/libs/parser/src/parInsertStmt.c index 2bcc7501fb..6ae6898af2 100644 --- a/source/libs/parser/src/parInsertStmt.c +++ b/source/libs/parser/src/parInsertStmt.c @@ -518,6 +518,10 @@ int32_t qResetStmtColumns(SArray* pCols, bool deepClear) { for (int32_t i = 0; i < colNum; ++i) { SColData* pCol = (SColData*)taosArrayGet(pCols, i); + if (pCol == NULL){ + qError("qResetStmtColumns column is NULL"); + return TSDB_CODE_OUT_OF_MEMORY; + } if (deepClear) { tColDataDeepClear(pCol); } else { @@ -534,6 +538,10 @@ int32_t qResetStmtDataBlock(STableDataCxt* block, bool deepClear) { for (int32_t i = 0; i < colNum; ++i) { SColData* pCol = (SColData*)taosArrayGet(pBlock->pData->aCol, i); + if (pCol == NULL){ + qError("qResetStmtDataBlock column is NULL"); + return TSDB_CODE_OUT_OF_MEMORY; + } if (deepClear) { tColDataDeepClear(pCol); } else {