diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 9b49c1908d..a52e87bbb3 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -198,7 +198,8 @@ int32_t taosGetErrSize(); #define TSDB_CODE_TSC_ENCODE_PARAM_ERROR TAOS_DEF_ERROR_CODE(0, 0X0231) #define TSDB_CODE_TSC_ENCODE_PARAM_NULL TAOS_DEF_ERROR_CODE(0, 0X0232) #define TSDB_CODE_TSC_COMPRESS_PARAM_ERROR TAOS_DEF_ERROR_CODE(0, 0X0233) -#define TSDB_CODE_TSC_COMPRESS_LEVEL_ERROR TAOS_DEF_ERROR_CODE(0, 0X0234) +#define TSDB_CODE_TSC_COMPRESS_LEVEL_ERROR TAOS_DEF_ERROR_CODE(0, 0X0234) +#define TSDB_CODE_TSC_FAIL_GENERATE_JSON TAOS_DEF_ERROR_CODE(0, 0X0235) #define TSDB_CODE_TSC_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0X02FF) // mnode-common diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 507738acc9..95d08829ae 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -348,7 +348,8 @@ __async_send_cb_fn_t getMsgRspHandle(int32_t msgType); SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pReqObj); -void* createTscObj(const char* user, const char* auth, const char* db, int32_t connType, SAppInstInfo* pAppInfo); +int32_t createTscObj(const char *user, const char *auth, const char *db, int32_t connType, SAppInstInfo *pAppInfo, + void **p); void destroyTscObj(void* pObj); STscObj* acquireTscObj(int64_t rid); int32_t releaseTscObj(int64_t rid); @@ -356,7 +357,7 @@ void destroyAppInst(void* pAppInfo); uint64_t generateRequestId(); -void* createRequest(uint64_t connId, int32_t type, int64_t reqid); +int32_t createRequest(uint64_t connId, int32_t type, int64_t reqid, SRequestObj **pRequest); void destroyRequest(SRequestObj* pRequest); SRequestObj* acquireRequest(int64_t rid); int32_t releaseRequest(int64_t rid); @@ -370,7 +371,7 @@ void resetConnectDB(STscObj* pTscObj); int taos_options_imp(TSDB_OPTION option, const char* str); -void* openTransporter(const char* user, const char* auth, int32_t numOfThreads); +int32_t openTransporter(const char* user, const char* auth, int32_t numOfThreads, void **pDnodeConn); void tscStopCrashReport(); typedef struct AsyncArg { @@ -381,8 +382,8 @@ typedef struct AsyncArg { bool persistConnForSpecificMsg(void* parenct, tmsg_t msgType); void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet); -STscObj* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db, - uint16_t port, int connType); +int32_t taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db, + uint16_t port, int connType, STscObj** pObj); int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb); @@ -443,6 +444,30 @@ void freeQueryParam(SSyncQueryParam* param); int32_t clientParseSqlImpl(void* param, const char* dbName, const char* sql, bool parseOnly, const char* effeciveUser, SParseSqlRes* pRes); #endif +#define TSC_ERR_RET(c) \ + do { \ + int32_t _code = c; \ + if (_code != TSDB_CODE_SUCCESS) { \ + terrno = _code; \ + return _code; \ + } \ + } while (0) +#define TSC_RET(c) \ + do { \ + int32_t _code = c; \ + if (_code != TSDB_CODE_SUCCESS) { \ + terrno = _code; \ + } \ + return _code; \ + } while (0) +#define TSC_ERR_JRET(c) \ + do { \ + code = c; \ + if (code != TSDB_CODE_SUCCESS) { \ + terrno = code; \ + goto _return; \ + } \ + } while (0) void slowQueryLog(int64_t rid, bool killed, int32_t code, int32_t cost); diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index f640618897..db1de705ad 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -44,6 +44,26 @@ #define TSC_VAR_NOT_RELEASE 1 #define TSC_VAR_RELEASED 0 +#define ENV_JSON_FALSE_CHECK(c) \ + do { \ + if (!c) { \ + tscError("faild to add item to JSON object");\ + code = TSDB_CODE_TSC_FAIL_GENERATE_JSON; \ + goto _end; \ + } \ + } while (0) + +#define ENV_ERR_RET(c,info) \ + do { \ + int32_t _code = c; \ + if (_code != TSDB_CODE_SUCCESS) { \ + errno = _code; \ + tscInitRes = _code; \ + tscError(info); \ + return; \ + } \ + } while (0) + STscDbg tscDbg = {0}; SAppInfo appInfo; int64_t lastClusterId = 0; @@ -57,8 +77,14 @@ static TdThreadOnce tscinit = PTHREAD_ONCE_INIT; volatile int32_t tscInitRes = 0; static int32_t registerRequest(SRequestObj *pRequest, STscObj *pTscObj) { + int32_t code = TSDB_CODE_SUCCESS; // connection has been released already, abort creating request. pRequest->self = taosAddRef(clientReqRefPool, pRequest); + if (pRequest->self < 0) { + tscError("failed to add ref to request"); + code = terrno; + return code; + } int32_t num = atomic_add_fetch_32(&pTscObj->numOfReqs, 1); @@ -72,19 +98,23 @@ static int32_t registerRequest(SRequestObj *pRequest, STscObj *pTscObj) { pRequest->self, pRequest->pTscObj->id, num, currentInst, total, pRequest->requestId); } - return TSDB_CODE_SUCCESS; + return code; } static void concatStrings(SArray *list, char* buf, int size){ int len = 0; for(int i = 0; i < taosArrayGetSize(list); i++){ char* db = taosArrayGet(list, i); + if (NULL == db) { + tscError("get dbname failed, buf:%s", buf); + break; + } char* dot = strchr(db, '.'); if (dot != NULL) { db = dot + 1; } if (i != 0){ - strcat(buf, ","); + (void)strcat(buf, ","); len += 1; } int ret = snprintf(buf + len, size - len, "%s", db); @@ -100,61 +130,70 @@ static void concatStrings(SArray *list, char* buf, int size){ } } -static void generateWriteSlowLog(STscObj *pTscObj, SRequestObj *pRequest, int32_t reqType, int64_t duration){ +static int32_t generateWriteSlowLog(STscObj *pTscObj, SRequestObj *pRequest, int32_t reqType, int64_t duration){ cJSON* json = cJSON_CreateObject(); + int32_t code = TSDB_CODE_SUCCESS; if (json == NULL) { tscError("[monitor] cJSON_CreateObject failed"); - return; + return TSDB_CODE_OUT_OF_MEMORY; } char clusterId[32] = {0}; if (snprintf(clusterId, sizeof(clusterId), "%" PRId64, pTscObj->pAppInfo->clusterId) < 0){ tscError("failed to generate clusterId:%" PRId64, pTscObj->pAppInfo->clusterId); + code = TSDB_CODE_FAILED; + goto _end; } char startTs[32] = {0}; if (snprintf(startTs, sizeof(startTs), "%" PRId64, pRequest->metric.start/1000) < 0){ tscError("failed to generate startTs:%" PRId64, pRequest->metric.start/1000); + code = TSDB_CODE_FAILED; + goto _end; } char requestId[32] = {0}; if (snprintf(requestId, sizeof(requestId), "%" PRIu64, pRequest->requestId) < 0){ tscError("failed to generate requestId:%" PRIu64, pRequest->requestId); + code = TSDB_CODE_FAILED; + goto _end; } - cJSON_AddItemToObject(json, "cluster_id", cJSON_CreateString(clusterId)); - cJSON_AddItemToObject(json, "start_ts", cJSON_CreateString(startTs)); - cJSON_AddItemToObject(json, "request_id", cJSON_CreateString(requestId)); - cJSON_AddItemToObject(json, "query_time", cJSON_CreateNumber(duration/1000)); - cJSON_AddItemToObject(json, "code", cJSON_CreateNumber(pRequest->code)); - cJSON_AddItemToObject(json, "error_info", cJSON_CreateString(tstrerror(pRequest->code))); - cJSON_AddItemToObject(json, "type", cJSON_CreateNumber(reqType)); - cJSON_AddItemToObject(json, "rows_num", cJSON_CreateNumber(pRequest->body.resInfo.numOfRows + pRequest->body.resInfo.totalRows)); + ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "cluster_id", cJSON_CreateString(clusterId))); + ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "start_ts", cJSON_CreateString(startTs))); + ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "request_id", cJSON_CreateString(requestId))); + ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "query_time", cJSON_CreateNumber(duration/1000))); + ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "code", cJSON_CreateNumber(pRequest->code))); + ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "error_info", cJSON_CreateString(tstrerror(pRequest->code)))); + ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "type", cJSON_CreateNumber(reqType))); + ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "rows_num", cJSON_CreateNumber(pRequest->body.resInfo.numOfRows + pRequest->body.resInfo.totalRows))); if(pRequest->sqlstr != NULL && strlen(pRequest->sqlstr) > pTscObj->pAppInfo->monitorParas.tsSlowLogMaxLen){ char tmp = pRequest->sqlstr[pTscObj->pAppInfo->monitorParas.tsSlowLogMaxLen]; pRequest->sqlstr[pTscObj->pAppInfo->monitorParas.tsSlowLogMaxLen] = '\0'; - cJSON_AddItemToObject(json, "sql", cJSON_CreateString(pRequest->sqlstr)); + ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "sql", cJSON_CreateString(pRequest->sqlstr))); pRequest->sqlstr[pTscObj->pAppInfo->monitorParas.tsSlowLogMaxLen] = tmp; }else{ - cJSON_AddItemToObject(json, "sql", cJSON_CreateString(pRequest->sqlstr)); + ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "sql", cJSON_CreateString(pRequest->sqlstr))); } - cJSON_AddItemToObject(json, "user", cJSON_CreateString(pTscObj->user)); - cJSON_AddItemToObject(json, "process_name", cJSON_CreateString(appInfo.appName)); - cJSON_AddItemToObject(json, "ip", cJSON_CreateString(tsLocalFqdn)); + ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "user", cJSON_CreateString(pTscObj->user))); + ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "process_name", cJSON_CreateString(appInfo.appName))); + ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "ip", cJSON_CreateString(tsLocalFqdn))); char pid[32] = {0}; if (snprintf(pid, sizeof(pid), "%d", appInfo.pid) < 0){ tscError("failed to generate pid:%d", appInfo.pid); + code = TSDB_CODE_FAILED; + goto _end; } - cJSON_AddItemToObject(json, "process_id", cJSON_CreateString(pid)); + ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "process_id", cJSON_CreateString(pid))); if(pRequest->dbList != NULL){ char dbList[1024] = {0}; concatStrings(pRequest->dbList, dbList, sizeof(dbList) - 1); - cJSON_AddItemToObject(json, "db", cJSON_CreateString(dbList)); + ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "db", cJSON_CreateString(dbList))); }else if(pRequest->pDb != NULL){ - cJSON_AddItemToObject(json, "db", cJSON_CreateString(pRequest->pDb)); + ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "db", cJSON_CreateString(pRequest->pDb))); }else{ - cJSON_AddItemToObject(json, "db", cJSON_CreateString("")); + ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "db", cJSON_CreateString(""))); } char* value = cJSON_PrintUnformatted(json); @@ -162,11 +201,15 @@ static void generateWriteSlowLog(STscObj *pTscObj, SRequestObj *pRequest, int32_ data.clusterId = pTscObj->pAppInfo->clusterId; data.type = SLOW_LOG_WRITE; data.data = value; - if(monitorPutData2MonitorQueue(data) < 0){ + code = monitorPutData2MonitorQueue(data); + if (TSDB_CODE_SUCCESS != code) { taosMemoryFree(value); + goto _end; } +_end: cJSON_Delete(json); + return code; } static bool checkSlowLogExceptDb(SRequestObj *pRequest, char* exceptDb) { @@ -176,6 +219,10 @@ static bool checkSlowLogExceptDb(SRequestObj *pRequest, char* exceptDb) { for (int i = 0; i < taosArrayGetSize(pRequest->dbList); i++) { char *db = taosArrayGet(pRequest->dbList, i); + if (NULL == db) { + tscError("get dbname failed, exceptDb:%s", exceptDb); + return false; + } char *dot = strchr(db, '.'); if (dot != NULL) { db = dot + 1; @@ -215,7 +262,7 @@ static void deregisterRequest(SRequestObj *pRequest) { "us, planCost:%" PRId64 "us, exec:%" PRId64 "us", duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs, pRequest->metric.analyseCostUs, pRequest->metric.planCostUs, pRequest->metric.execCostUs); - atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration); + (void)atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration); reqType = SLOW_LOG_TYPE_INSERT; } else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) { tscDebug("query duration %" PRId64 "us: parseCost:%" PRId64 "us, ctgCost:%" PRId64 "us, analyseCost:%" PRId64 @@ -223,11 +270,13 @@ static void deregisterRequest(SRequestObj *pRequest) { duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs, pRequest->metric.analyseCostUs, pRequest->metric.planCostUs, pRequest->metric.execCostUs); - atomic_add_fetch_64((int64_t *)&pActivity->queryElapsedTime, duration); + (void)atomic_add_fetch_64((int64_t *)&pActivity->queryElapsedTime, duration); reqType = SLOW_LOG_TYPE_QUERY; } - nodesSimReleaseAllocator(pRequest->allocatorRefId); + if (TSDB_CODE_SUCCESS != nodesSimReleaseAllocator(pRequest->allocatorRefId)) { + tscError("failed to release allocator"); + } } if(pTscObj->pAppInfo->monitorParas.tsEnableMonitor){ @@ -242,19 +291,23 @@ static void deregisterRequest(SRequestObj *pRequest) { if ((duration >= pTscObj->pAppInfo->monitorParas.tsSlowLogThreshold * 1000000UL || duration >= pTscObj->pAppInfo->monitorParas.tsSlowLogThresholdTest * 1000000UL) && checkSlowLogExceptDb(pRequest, pTscObj->pAppInfo->monitorParas.tsSlowLogExceptDb)) { - atomic_add_fetch_64((int64_t *)&pActivity->numOfSlowQueries, 1); + (void)atomic_add_fetch_64((int64_t *)&pActivity->numOfSlowQueries, 1); if (pTscObj->pAppInfo->monitorParas.tsSlowLogScope & reqType) { taosPrintSlowLog("PID:%d, Conn:%u, QID:0x%" PRIx64 ", Start:%" PRId64 " us, Duration:%" PRId64 "us, SQL:%s", taosGetPId(), pTscObj->connId, pRequest->requestId, pRequest->metric.start, duration, pRequest->sqlstr); if(pTscObj->pAppInfo->monitorParas.tsEnableMonitor){ slowQueryLog(pTscObj->id, pRequest->killed, pRequest->code, duration); - generateWriteSlowLog(pTscObj, pRequest, reqType, duration); + if (TSDB_CODE_SUCCESS != generateWriteSlowLog(pTscObj, pRequest, reqType, duration)) { + tscError("failed to generate write slow log"); + } } } } - releaseTscObj(pTscObj->id); + if (TSDB_CODE_SUCCESS != releaseTscObj(pTscObj->id)) { + tscError("failed to release STscObj"); + } } // todo close the transporter properly @@ -289,9 +342,9 @@ static bool clientRpcTfp(int32_t code, tmsg_t msgType) { } // TODO refactor -void *openTransporter(const char *user, const char *auth, int32_t numOfThread) { +int32_t openTransporter(const char *user, const char *auth, int32_t numOfThread, void **pDnodeConn) { SRpcInit rpcInit; - memset(&rpcInit, 0, sizeof(rpcInit)); + (void)memset(&rpcInit, 0, sizeof(rpcInit)); rpcInit.localPort = 0; rpcInit.label = "TSC"; rpcInit.numOfThreads = tsNumOfRpcThreads; @@ -315,15 +368,19 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) { rpcInit.connLimitNum = connLimitNum; rpcInit.timeToGetConn = tsTimeToGetAvailableConn; - taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); - - void *pDnodeConn = rpcOpen(&rpcInit); - if (pDnodeConn == NULL) { - tscError("failed to init connection to server"); - return NULL; + int32_t code = taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); + if (TSDB_CODE_SUCCESS != code) { + tscError("invalid version string."); + return code; } - return pDnodeConn; + *pDnodeConn = rpcOpen(&rpcInit); + if (*pDnodeConn == NULL) { + tscError("failed to init connection to server."); + code = TSDB_CODE_FAILED; + } + + return code; } void destroyAllRequests(SHashObj *pRequests) { @@ -334,7 +391,7 @@ void destroyAllRequests(SHashObj *pRequests) { SRequestObj *pRequest = acquireRequest(*rid); if (pRequest) { destroyRequest(pRequest); - releaseRequest(*rid); + (void)releaseRequest(*rid); // ignore error } pIter = taosHashIterate(pRequests, pIter); @@ -349,7 +406,7 @@ void stopAllRequests(SHashObj *pRequests) { SRequestObj *pRequest = acquireRequest(*rid); if (pRequest) { taos_stop_query(pRequest); - releaseRequest(*rid); + (void)releaseRequest(*rid); // ignore error } pIter = taosHashIterate(pRequests, pIter); @@ -360,18 +417,31 @@ void destroyAppInst(void *info) { SAppInstInfo* pAppInfo = *(SAppInstInfo**)info; tscDebug("destroy app inst mgr %p", pAppInfo); - taosThreadMutexLock(&appInfo.mutex); + int32_t code = taosThreadMutexLock(&appInfo.mutex); + if (TSDB_CODE_SUCCESS != code) { + tscError("failed to lock app info, code:%s", tstrerror(code)); + } hbRemoveAppHbMrg(&pAppInfo->pAppHbMgr); - taosThreadMutexUnlock(&appInfo.mutex); + code = taosThreadMutexUnlock(&appInfo.mutex); + if (TSDB_CODE_SUCCESS != code) { + tscError("failed to unlock app info, code:%s", tstrerror(code)); + } taosMemoryFreeClear(pAppInfo->instKey); closeTransporter(pAppInfo); - taosThreadMutexLock(&pAppInfo->qnodeMutex); + code = taosThreadMutexLock(&pAppInfo->qnodeMutex); + if (TSDB_CODE_SUCCESS != code) { + tscError("failed to lock qnode mutex, code:%s", tstrerror(code)); + } + taosArrayDestroy(pAppInfo->pQnodeList); - taosThreadMutexUnlock(&pAppInfo->qnodeMutex); + code = taosThreadMutexUnlock(&pAppInfo->qnodeMutex); + if (TSDB_CODE_SUCCESS != code) { + tscError("failed to unlock qnode mutex, code:%s", tstrerror(code)); + } taosMemoryFree(pAppInfo); } @@ -396,97 +466,110 @@ void destroyTscObj(void *pObj) { pTscObj->pAppInfo->numOfConns); // In any cases, we should not free app inst here. Or an race condition rises. - /*int64_t connNum = */ atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); + /*int64_t connNum = */ (void)atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); - taosThreadMutexDestroy(&pTscObj->mutex); + (void)taosThreadMutexDestroy(&pTscObj->mutex); taosMemoryFree(pTscObj); tscTrace("end to destroy tscObj %" PRIx64 " p:%p", tscId, pTscObj); } -void *createTscObj(const char *user, const char *auth, const char *db, int32_t connType, SAppInstInfo *pAppInfo) { - STscObj *pObj = (STscObj *)taosMemoryCalloc(1, sizeof(STscObj)); +int32_t createTscObj(const char *user, const char *auth, const char *db, int32_t connType, SAppInstInfo *pAppInfo, + void **p) { + STscObj *pObj = (STscObj *)*p; + pObj = (STscObj *)taosMemoryCalloc(1, sizeof(STscObj)); if (NULL == pObj) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; + return TSDB_CODE_OUT_OF_MEMORY; } pObj->pRequests = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); if (NULL == pObj->pRequests) { taosMemoryFree(pObj); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; + return TSDB_CODE_OUT_OF_MEMORY; } pObj->connType = connType; pObj->pAppInfo = pAppInfo; pObj->appHbMgrIdx = pAppInfo->pAppHbMgr->idx; tstrncpy(pObj->user, user, sizeof(pObj->user)); - memcpy(pObj->pass, auth, TSDB_PASSWORD_LEN); + (void)memcpy(pObj->pass, auth, TSDB_PASSWORD_LEN); if (db != NULL) { tstrncpy(pObj->db, db, tListLen(pObj->db)); } - taosThreadMutexInit(&pObj->mutex, NULL); - pObj->id = taosAddRef(clientConnRefPool, pObj); + int32_t code = taosThreadMutexInit(&pObj->mutex, NULL); + if (TSDB_CODE_SUCCESS != code) { + return TAOS_SYSTEM_ERROR(code); + } - atomic_add_fetch_64(&pObj->pAppInfo->numOfConns, 1); + pObj->id = taosAddRef(clientConnRefPool, pObj); + if (pObj->id < 0) { + tscError("failed to add object to clientConnRefPool"); + code = terrno; + taosMemoryFree(pObj); + return code; + } + + (void)atomic_add_fetch_64(&pObj->pAppInfo->numOfConns, 1); tscDebug("connObj created, 0x%" PRIx64 ",p:%p", pObj->id, pObj); - return pObj; + return code; } STscObj *acquireTscObj(int64_t rid) { return (STscObj *)taosAcquireRef(clientConnRefPool, rid); } int32_t releaseTscObj(int64_t rid) { return taosReleaseRef(clientConnRefPool, rid); } -void *createRequest(uint64_t connId, int32_t type, int64_t reqid) { - SRequestObj *pRequest = (SRequestObj *)taosMemoryCalloc(1, sizeof(SRequestObj)); - if (NULL == pRequest) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; +int32_t createRequest(uint64_t connId, int32_t type, int64_t reqid, SRequestObj **pRequest) { + int32_t code = TSDB_CODE_SUCCESS; + *pRequest = (SRequestObj *)taosMemoryCalloc(1, sizeof(SRequestObj)); + if (NULL == *pRequest) { + return TSDB_CODE_OUT_OF_MEMORY; } STscObj *pTscObj = acquireTscObj(connId); if (pTscObj == NULL) { - taosMemoryFree(pRequest); - terrno = TSDB_CODE_TSC_DISCONNECTED; - return NULL; + code = TSDB_CODE_TSC_DISCONNECTED; + goto _return; } SSyncQueryParam *interParam = taosMemoryCalloc(1, sizeof(SSyncQueryParam)); if (interParam == NULL) { - releaseTscObj(connId); - doDestroyRequest(pRequest); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; + if (TSDB_CODE_SUCCESS != releaseTscObj(connId)) { + tscError("failed to release TscObj"); + } + code = TSDB_CODE_OUT_OF_MEMORY; + goto _return; } - tsem_init(&interParam->sem, 0, 0); - interParam->pRequest = pRequest; - pRequest->body.interParam = interParam; + TSC_ERR_JRET(tsem_init(&interParam->sem, 0, 0)); + interParam->pRequest = *pRequest; + (*pRequest)->body.interParam = interParam; - pRequest->resType = RES_TYPE__QUERY; - pRequest->requestId = reqid == 0 ? generateRequestId() : reqid; - pRequest->metric.start = taosGetTimestampUs(); + (*pRequest)->resType = RES_TYPE__QUERY; + (*pRequest)->requestId = reqid == 0 ? generateRequestId() : reqid; + (*pRequest)->metric.start = taosGetTimestampUs(); - pRequest->body.resInfo.convertUcs4 = true; // convert ucs4 by default - pRequest->type = type; - pRequest->allocatorRefId = -1; + (*pRequest)->body.resInfo.convertUcs4 = true; // convert ucs4 by default + (*pRequest)->type = type; + (*pRequest)->allocatorRefId = -1; - pRequest->pDb = getDbOfConnection(pTscObj); - pRequest->pTscObj = pTscObj; - pRequest->inCallback = false; + (*pRequest)->pDb = getDbOfConnection(pTscObj); + (*pRequest)->pTscObj = pTscObj; + (*pRequest)->inCallback = false; - pRequest->msgBuf = taosMemoryCalloc(1, ERROR_MSG_BUF_DEFAULT_SIZE); - pRequest->msgBufLen = ERROR_MSG_BUF_DEFAULT_SIZE; - tsem_init(&pRequest->body.rspSem, 0, 0); - - if (registerRequest(pRequest, pTscObj)) { - doDestroyRequest(pRequest); - return NULL; + (*pRequest)->msgBuf = taosMemoryCalloc(1, ERROR_MSG_BUF_DEFAULT_SIZE); + if (NULL == (*pRequest)->msgBuf) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _return; } + (*pRequest)->msgBufLen = ERROR_MSG_BUF_DEFAULT_SIZE; + TSC_ERR_JRET(tsem_init(&(*pRequest)->body.rspSem, 0, 0)); + TSC_ERR_JRET(registerRequest(*pRequest, pTscObj)); - return pRequest; + return TSDB_CODE_SUCCESS; +_return: + doDestroyRequest(*pRequest); + return code; } void doFreeReqResultInfo(SReqResultInfo *pResInfo) { @@ -521,12 +604,12 @@ int64_t removeFromMostPrevReq(SRequestObj* pRequest) { pTmp = acquireRequest(pTmp->relation.prevRefId); if (pTmp) { mostPrevReqRefId = pTmp->self; - releaseRequest(mostPrevReqRefId); + (void)releaseRequest(mostPrevReqRefId); // ignore error } else { break; } } - removeRequest(mostPrevReqRefId); + (void)removeRequest(mostPrevReqRefId); // ignore error return mostPrevReqRefId; } @@ -534,8 +617,8 @@ void destroyNextReq(int64_t nextRefId) { if (nextRefId) { SRequestObj* pObj = acquireRequest(nextRefId); if (pObj) { - releaseRequest(nextRefId); - releaseRequest(nextRefId); + (void)releaseRequest(nextRefId); // ignore error + (void)releaseRequest(nextRefId); // ignore error } } } @@ -555,7 +638,7 @@ void destroySubRequests(SRequestObj *pRequest) { pTmp = acquireRequest(tmpRefId); if (pTmp) { pReqList[++reqIdx] = pTmp; - releaseRequest(tmpRefId); + (void)releaseRequest(tmpRefId); // ignore error } else { tscError("prev req ref 0x%" PRIx64 " is not there", tmpRefId); break; @@ -563,7 +646,7 @@ void destroySubRequests(SRequestObj *pRequest) { } for (int32_t i = reqIdx; i >= 0; i--) { - removeRequest(pReqList[i]->self); + (void)removeRequest(pReqList[i]->self); // ignore error } tmpRefId = pRequest->relation.nextRefId; @@ -571,8 +654,8 @@ void destroySubRequests(SRequestObj *pRequest) { pTmp = acquireRequest(tmpRefId); if (pTmp) { tmpRefId = pTmp->relation.nextRefId; - removeRequest(pTmp->self); - releaseRequest(pTmp->self); + (void)removeRequest(pTmp->self); // ignore error + (void)releaseRequest(pTmp->self); // ignore error } else { tscError("next req ref 0x%" PRIx64 " is not there", tmpRefId); break; @@ -592,8 +675,10 @@ void doDestroyRequest(void *p) { int64_t nextReqRefId = pRequest->relation.nextRefId; - taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self)); - + int32_t code = taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self)); + if (TSDB_CODE_SUCCESS != code) { + tscError("failed to remove request from hash, code:%s", tstrerror(code)); + } schedulerFreeJob(&pRequest->body.queryJob, 0); destorySqlCallbackWrapper(pRequest->pWrapper); @@ -601,7 +686,7 @@ void doDestroyRequest(void *p) { taosMemoryFreeClear(pRequest->msgBuf); doFreeReqResultInfo(&pRequest->body.resInfo); - tsem_destroy(&pRequest->body.rspSem); + (void)tsem_destroy(&pRequest->body.rspSem); taosArrayDestroy(pRequest->tableList); taosArrayDestroy(pRequest->targetTableList); @@ -615,13 +700,15 @@ void doDestroyRequest(void *p) { taosMemoryFreeClear(pRequest->pDb); taosArrayDestroy(pRequest->dbList); if (pRequest->body.interParam) { - tsem_destroy(&((SSyncQueryParam *)pRequest->body.interParam)->sem); + (void)tsem_destroy(&((SSyncQueryParam *)pRequest->body.interParam)->sem); } taosMemoryFree(pRequest->body.interParam); if (TSDB_CODE_SUCCESS == nodesSimAcquireAllocator(pRequest->allocatorRefId)) { qDestroyQuery(pRequest->pQuery); - nodesSimReleaseAllocator(pRequest->allocatorRefId); + if (TSDB_CODE_SUCCESS != nodesSimReleaseAllocator(pRequest->allocatorRefId)) { + tscError("failed to release allocator"); + } } nodesDestroyAllocator(pRequest->allocatorRefId); @@ -638,7 +725,7 @@ void destroyRequest(SRequestObj *pRequest) { } taos_stop_query(pRequest); - removeFromMostPrevReq(pRequest); + (void)removeFromMostPrevReq(pRequest); } void taosStopQueryImpl(SRequestObj *pRequest) { @@ -677,7 +764,7 @@ void stopAllQueries(SRequestObj *pRequest) { for (int32_t i = reqIdx; i >= 0; i--) { taosStopQueryImpl(pReqList[i]); - releaseRequest(pReqList[i]->self); + (void)releaseRequest(pReqList[i]->self); // ignore error } taosStopQueryImpl(pRequest); @@ -688,7 +775,7 @@ void stopAllQueries(SRequestObj *pRequest) { if (pTmp) { tmpRefId = pTmp->relation.nextRefId; taosStopQueryImpl(pTmp); - releaseRequest(pTmp->self); + (void)releaseRequest(pTmp->self); // ignore error } else { tscError("next req ref 0x%" PRIx64 " is not there", tmpRefId); break; @@ -701,7 +788,7 @@ void crashReportThreadFuncUnexpectedStopped(void) { atomic_store_32(&clientStop, static void *tscCrashReportThreadFp(void *param) { setThreadName("client-crashReport"); char filepath[PATH_MAX] = {0}; - snprintf(filepath, sizeof(filepath), "%s%s.taosCrashLog", tsLogDir, TD_DIRSEP); + (void)snprintf(filepath, sizeof(filepath), "%s%s.taosCrashLog", tsLogDir, TD_DIRSEP); char *pMsg = NULL; int64_t msgLen = 0; TdFilePtr pFile = NULL; @@ -774,15 +861,15 @@ int32_t tscCrashReportInit() { } TdThreadAttr thAttr; - taosThreadAttrInit(&thAttr); - taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); + TSC_ERR_RET(taosThreadAttrInit(&thAttr)); + TSC_ERR_RET(taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE)); TdThread crashReportThread; if (taosThreadCreate(&crashReportThread, &thAttr, tscCrashReportThreadFp, NULL) != 0) { tscError("failed to create crashReport thread since %s", strerror(errno)); return -1; } - taosThreadAttrDestroy(&thAttr); + TSC_ERR_RET(taosThreadAttrDestroy(&thAttr)); return 0; } @@ -842,6 +929,11 @@ void taos_init_imp(void) { appInfo.startTime = taosGetTimestampMs(); appInfo.pInstMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); appInfo.pInstMapByClusterId = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); + if (NULL == appInfo.pInstMap || NULL == appInfo.pInstMapByClusterId) { + tscError("failed to allocate memory when init appInfo"); + tscInitRes = TSDB_CODE_OUT_OF_MEMORY; + return; + } taosHashSetFreeFp(appInfo.pInstMap, destroyAppInst); deltaToUtcInitOnce(); @@ -849,7 +941,7 @@ void taos_init_imp(void) { #ifdef CUS_PROMPT snprintf(logDirName, 64, "%slog", CUS_PROMPT); #else - snprintf(logDirName, 64, "taoslog"); + (void)snprintf(logDirName, 64, "taoslog"); #endif if (taosCreateLog(logDirName, 10, configDir, NULL, NULL, NULL, NULL, 1) != 0) { printf(" WARING: Create %s failed:%s. configDir=%s\n", logDirName, strerror(errno), configDir); @@ -857,24 +949,12 @@ void taos_init_imp(void) { return; } - if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 1) != 0) { - tscInitRes = -1; - return; - } + ENV_ERR_RET(taosInitCfg(configDir, NULL, NULL, NULL, NULL, 1), "failed to init cfg"); initQueryModuleMsgHandle(); - - if (taosConvInit() != 0) { - tscInitRes = -1; - tscError("failed to init conv"); - return; - } - if (monitorInit() != 0){ - tscInitRes = -1; - tscError("failed to init monitor"); - return; - } - rpcInit(); + ENV_ERR_RET(taosConvInit(), "failed to init conv"); + ENV_ERR_RET(monitorInit(), "failed to init monitor"); + ENV_ERR_RET(rpcInit(), "failed to init rpc"); if (InitRegexCache() != 0) { tscInitRes = -1; @@ -883,34 +963,25 @@ void taos_init_imp(void) { } SCatalogCfg cfg = {.maxDBCacheNum = 100, .maxTblCacheNum = 100}; - catalogInit(&cfg); + ENV_ERR_RET(catalogInit(&cfg), "failed to init catalog"); + ENV_ERR_RET(schedulerInit(), "failed to init scheduler"); - schedulerInit(); tscDebug("starting to initialize TAOS driver"); #ifndef WINDOWS taosSetCoreDump(true); #endif - if (initTaskQueue() != 0){ - tscInitRes = -1; - tscError("failed to init task queue"); - return; - } - if (fmFuncMgtInit() != TSDB_CODE_SUCCESS) { - tscInitRes = -1; - tscError("failed to init function manager"); - return; - } - nodesInitAllocatorSet(); + ENV_ERR_RET(initTaskQueue(), "failed to init task queue"); + ENV_ERR_RET(fmFuncMgtInit(), "failed to init funcMgt"); + ENV_ERR_RET(nodesInitAllocatorSet(), "failed to init allocator set"); clientConnRefPool = taosOpenRef(200, destroyTscObj); clientReqRefPool = taosOpenRef(40960, doDestroyRequest); - taosGetAppName(appInfo.appName, NULL); - taosThreadMutexInit(&appInfo.mutex, NULL); - - tscCrashReportInit(); + ENV_ERR_RET(taosGetAppName(appInfo.appName, NULL), "failed to get app name"); + ENV_ERR_RET(taosThreadMutexInit(&appInfo.mutex, NULL), "failed to init thread mutex"); + ENV_ERR_RET(tscCrashReportInit(), "failed to init crash report"); tscDebug("client is initialized successfully"); } @@ -957,7 +1028,7 @@ int taos_options_imp(TSDB_OPTION option, const char *str) { return -1; } newstr[0] = '"'; - memcpy(newstr+1, str, len); + (void)memcpy(newstr+1, str, len); newstr[len + 1] = '"'; newstr[len + 2] = '\0'; str = newstr; diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index e45ca2b872..6833adb3bd 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -44,28 +44,34 @@ static int32_t hbMqHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { retur static int32_t hbProcessUserAuthInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog, SAppHbMgr *pAppHbMgr) { - int32_t code = 0; + int32_t code = TSDB_CODE_SUCCESS; SUserAuthBatchRsp batchRsp = {0}; if (tDeserializeSUserAuthBatchRsp(value, valueLen, &batchRsp) != 0) { - terrno = TSDB_CODE_INVALID_MSG; - return -1; + return TSDB_CODE_INVALID_MSG; } int32_t numOfBatchs = taosArrayGetSize(batchRsp.pArray); for (int32_t i = 0; i < numOfBatchs; ++i) { SGetUserAuthRsp *rsp = taosArrayGet(batchRsp.pArray, i); + if (NULL == rsp) { + code = TSDB_CODE_OUT_OF_RANGE; + goto _return; + } tscDebug("hb to update user auth, user:%s, version:%d", rsp->user, rsp->version); - catalogUpdateUserAuthInfo(pCatalog, rsp); + TSC_ERR_JRET(catalogUpdateUserAuthInfo(pCatalog, rsp)); } - if (numOfBatchs > 0) hbUpdateUserAuthInfo(pAppHbMgr, &batchRsp); + if (numOfBatchs > 0) { + TSC_ERR_JRET(hbUpdateUserAuthInfo(pAppHbMgr, &batchRsp)); + } - atomic_val_compare_exchange_8(&pAppHbMgr->connHbFlag, 1, 2); + (void)atomic_val_compare_exchange_8(&pAppHbMgr->connHbFlag, 1, 2); +_return: taosArrayDestroy(batchRsp.pArray); - return TSDB_CODE_SUCCESS; + return code; } static int32_t hbUpdateUserAuthInfo(SAppHbMgr *pAppHbMgr, SUserAuthBatchRsp *batchRsp) { @@ -93,7 +99,9 @@ static int32_t hbUpdateUserAuthInfo(SAppHbMgr *pAppHbMgr, SUserAuthBatchRsp *bat } } if (!pRsp) { - releaseTscObj(pReq->connKey.tscRid); + if (TSDB_CODE_SUCCESS != releaseTscObj(pReq->connKey.tscRid)) { + tscError("failed to release tscObj"); + } taosHashCancelIterate(hbMgr->activeInfo, pReq); break; } @@ -108,7 +116,9 @@ static int32_t hbUpdateUserAuthInfo(SAppHbMgr *pAppHbMgr, SUserAuthBatchRsp *bat } } } - releaseTscObj(pReq->connKey.tscRid); + if (TSDB_CODE_SUCCESS != releaseTscObj(pReq->connKey.tscRid)) { + tscError("failed to release tscObj"); + } continue; } @@ -153,7 +163,9 @@ static int32_t hbUpdateUserAuthInfo(SAppHbMgr *pAppHbMgr, SUserAuthBatchRsp *bat tscDebug("update whitelist version of user %s from %" PRId64 " to %" PRId64 ", tscRid:%" PRIi64, pRsp->user, oldVer, atomic_load_64(&whiteListInfo->ver), pTscObj->id); } - releaseTscObj(pReq->connKey.tscRid); + if (TSDB_CODE_SUCCESS != releaseTscObj(pReq->connKey.tscRid)) { + tscError("failed to release tscObj"); + } } } return 0; @@ -211,6 +223,10 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog int32_t numOfBatchs = taosArrayGetSize(batchRsp.pArray); for (int32_t i = 0; i < numOfBatchs; ++i) { SDbHbRsp *rsp = taosArrayGet(batchRsp.pArray, i); + if (NULL == rsp) { + code = TSDB_CODE_OUT_OF_RANGE; + goto _return; + } if (rsp->useDbRsp) { tscDebug("hb use db rsp, db:%s, vgVersion:%d, stateTs:%" PRId64 ", uid:%" PRIx64, rsp->useDbRsp->db, rsp->useDbRsp->vgVersion, rsp->useDbRsp->stateTs, rsp->useDbRsp->uid); @@ -227,7 +243,7 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog tscDebug("hb to update db vgInfo, db:%s", rsp->useDbRsp->db); - catalogUpdateDBVgInfo(pCatalog, rsp->useDbRsp->db, rsp->useDbRsp->uid, vgInfo); + TSC_ERR_JRET(catalogUpdateDBVgInfo(pCatalog, rsp->useDbRsp->db, rsp->useDbRsp->uid, vgInfo)); if (IS_SYS_DBNAME(rsp->useDbRsp->db)) { code = hbGenerateVgInfoFromRsp(&vgInfo, rsp->useDbRsp); @@ -235,23 +251,28 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog goto _return; } - catalogUpdateDBVgInfo(pCatalog, - (rsp->useDbRsp->db[0] == 'i') ? TSDB_PERFORMANCE_SCHEMA_DB : TSDB_INFORMATION_SCHEMA_DB, - rsp->useDbRsp->uid, vgInfo); + TSC_ERR_JRET(catalogUpdateDBVgInfo(pCatalog, + (rsp->useDbRsp->db[0] == 'i') ? + TSDB_PERFORMANCE_SCHEMA_DB : + TSDB_INFORMATION_SCHEMA_DB, + rsp->useDbRsp->uid, vgInfo)); } } } if (rsp->cfgRsp) { tscDebug("hb db cfg rsp, db:%s, cfgVersion:%d", rsp->cfgRsp->db, rsp->cfgRsp->cfgVersion); - catalogUpdateDbCfg(pCatalog, rsp->cfgRsp->db, rsp->cfgRsp->dbId, rsp->cfgRsp); + code = catalogUpdateDbCfg(pCatalog, rsp->cfgRsp->db, rsp->cfgRsp->dbId, rsp->cfgRsp); rsp->cfgRsp = NULL; } if (rsp->pTsmaRsp) { if (rsp->pTsmaRsp->pTsmas) { for (int32_t i = 0; i < rsp->pTsmaRsp->pTsmas->size; ++i) { STableTSMAInfo *pTsma = taosArrayGetP(rsp->pTsmaRsp->pTsmas, i); - catalogAsyncUpdateTSMA(pCatalog, &pTsma, rsp->dbTsmaVersion); + if (NULL == pTsma) { + TSC_ERR_JRET(TSDB_CODE_OUT_OF_RANGE); + } + TSC_ERR_JRET(catalogAsyncUpdateTSMA(pCatalog, &pTsma, rsp->dbTsmaVersion)); } taosArrayClear(rsp->pTsmaRsp->pTsmas); } @@ -265,7 +286,7 @@ _return: } static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) { - int32_t code = 0; + int32_t code = TSDB_CODE_SUCCESS; SSTbHbRsp hbRsp = {0}; if (tDeserializeSSTbHbRsp(value, valueLen, &hbRsp) != 0) { @@ -276,10 +297,13 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo int32_t numOfMeta = taosArrayGetSize(hbRsp.pMetaRsp); for (int32_t i = 0; i < numOfMeta; ++i) { STableMetaRsp *rsp = taosArrayGet(hbRsp.pMetaRsp, i); - + if (NULL == rsp) { + code = TSDB_CODE_OUT_OF_RANGE; + goto _return; + } if (rsp->numOfColumns < 0) { tscDebug("hb to remove stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName); - catalogRemoveStbMeta(pCatalog, rsp->dbFName, rsp->dbId, rsp->stbName, rsp->suid); + TSC_ERR_JRET(catalogRemoveStbMeta(pCatalog, rsp->dbFName, rsp->dbId, rsp->stbName, rsp->suid)); } else { tscDebug("hb to update stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName); if (rsp->pSchemas[0].colId != PRIMARYKEY_TIMESTAMP_COL_ID) { @@ -288,22 +312,26 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo return TSDB_CODE_TSC_INVALID_VALUE; } - catalogAsyncUpdateTableMeta(pCatalog, rsp); + TSC_ERR_JRET(catalogAsyncUpdateTableMeta(pCatalog, rsp)); } } int32_t numOfIndex = taosArrayGetSize(hbRsp.pIndexRsp); for (int32_t i = 0; i < numOfIndex; ++i) { STableIndexRsp *rsp = taosArrayGet(hbRsp.pIndexRsp, i); - - catalogUpdateTableIndex(pCatalog, rsp); + if (NULL == rsp) { + code = TSDB_CODE_OUT_OF_RANGE; + goto _return; + } + TSC_ERR_JRET(catalogUpdateTableIndex(pCatalog, rsp)); } +_return: taosArrayDestroy(hbRsp.pIndexRsp); hbRsp.pIndexRsp = NULL; tFreeSSTbHbRsp(&hbRsp); - return TSDB_CODE_SUCCESS; + return code; } static int32_t hbProcessDynViewRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) { @@ -320,7 +348,7 @@ static void hbFreeSViewMetaInRsp(void *p) { } static int32_t hbProcessViewInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) { - int32_t code = 0; + int32_t code = TSDB_CODE_SUCCESS; SViewHbRsp hbRsp = {0}; if (tDeserializeSViewHbRsp(value, valueLen, &hbRsp) != 0) { @@ -332,20 +360,25 @@ static int32_t hbProcessViewInfoRsp(void *value, int32_t valueLen, struct SCatal int32_t numOfMeta = taosArrayGetSize(hbRsp.pViewRsp); for (int32_t i = 0; i < numOfMeta; ++i) { SViewMetaRsp *rsp = taosArrayGetP(hbRsp.pViewRsp, i); - + if (NULL == rsp) { + code = TSDB_CODE_OUT_OF_RANGE; + goto _return; + } if (rsp->numOfCols < 0) { tscDebug("hb to remove view, db:%s, view:%s", rsp->dbFName, rsp->name); - catalogRemoveViewMeta(pCatalog, rsp->dbFName, rsp->dbId, rsp->name, rsp->viewId); + code = catalogRemoveViewMeta(pCatalog, rsp->dbFName, rsp->dbId, rsp->name, rsp->viewId); tFreeSViewMetaRsp(rsp); taosMemoryFreeClear(rsp); } else { tscDebug("hb to update view, db:%s, view:%s", rsp->dbFName, rsp->name); - catalogUpdateViewMeta(pCatalog, rsp); + code = catalogUpdateViewMeta(pCatalog, rsp); } + TSC_ERR_JRET(code); } +_return: taosArrayDestroy(hbRsp.pViewRsp); - return TSDB_CODE_SUCCESS; + return code; } static int32_t hbprocessTSMARsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) { @@ -363,30 +396,38 @@ static int32_t hbprocessTSMARsp(void *value, int32_t valueLen, struct SCatalog * if (!pTsmaInfo->pFuncs) { tscDebug("hb to remove tsma: %s.%s", pTsmaInfo->dbFName, pTsmaInfo->name); - catalogRemoveTSMA(pCatalog, pTsmaInfo); + code = catalogRemoveTSMA(pCatalog, pTsmaInfo); tFreeAndClearTableTSMAInfo(pTsmaInfo); } else { tscDebug("hb to update tsma: %s.%s", pTsmaInfo->dbFName, pTsmaInfo->name); - catalogUpdateTSMA(pCatalog, &pTsmaInfo); + code = catalogUpdateTSMA(pCatalog, &pTsmaInfo); tFreeAndClearTableTSMAInfo(pTsmaInfo); } + TSC_ERR_JRET(code); } +_return: taosArrayDestroy(hbRsp.pTsmas); - return TSDB_CODE_SUCCESS; + return code; } static void hbProcessQueryRspKvs(int32_t kvNum, SArray *pKvs, struct SCatalog *pCatalog, SAppHbMgr *pAppHbMgr) { for (int32_t i = 0; i < kvNum; ++i) { SKv *kv = taosArrayGet(pKvs, i); + if (NULL == kv) { + tscError("invalid hb kv, idx:%d", i); + continue; + } switch (kv->key) { case HEARTBEAT_KEY_USER_AUTHINFO: { if (kv->valueLen <= 0 || NULL == kv->value) { tscError("invalid hb user auth info, len:%d, value:%p", kv->valueLen, kv->value); break; } - - hbProcessUserAuthInfoRsp(kv->value, kv->valueLen, pCatalog, pAppHbMgr); + if (TSDB_CODE_SUCCESS != hbProcessUserAuthInfoRsp(kv->value, kv->valueLen, pCatalog, pAppHbMgr)) { + tscError("process user auth info response faild, len:%d, value:%p", kv->valueLen, kv->value); + break; + } break; } case HEARTBEAT_KEY_DBINFO: { @@ -394,8 +435,10 @@ static void hbProcessQueryRspKvs(int32_t kvNum, SArray *pKvs, struct SCatalog *p tscError("invalid hb db info, len:%d, value:%p", kv->valueLen, kv->value); break; } - - hbProcessDBInfoRsp(kv->value, kv->valueLen, pCatalog); + if (TSDB_CODE_SUCCESS != hbProcessDBInfoRsp(kv->value, kv->valueLen, pCatalog)) { + tscError("process db info response faild, len:%d, value:%p", kv->valueLen, kv->value); + break; + } break; } case HEARTBEAT_KEY_STBINFO: { @@ -403,8 +446,10 @@ static void hbProcessQueryRspKvs(int32_t kvNum, SArray *pKvs, struct SCatalog *p tscError("invalid hb stb info, len:%d, value:%p", kv->valueLen, kv->value); break; } - - hbProcessStbInfoRsp(kv->value, kv->valueLen, pCatalog); + if (TSDB_CODE_SUCCESS != hbProcessStbInfoRsp(kv->value, kv->valueLen, pCatalog)) { + tscError("process stb info response faild, len:%d, value:%p", kv->valueLen, kv->value); + break; + } break; } #ifdef TD_ENTERPRISE @@ -413,8 +458,10 @@ static void hbProcessQueryRspKvs(int32_t kvNum, SArray *pKvs, struct SCatalog *p tscError("invalid dyn view info, len:%d, value:%p", kv->valueLen, kv->value); break; } - - hbProcessDynViewRsp(kv->value, kv->valueLen, pCatalog); + if (TSDB_CODE_SUCCESS != hbProcessDynViewRsp(kv->value, kv->valueLen, pCatalog)) { + tscError("Process dyn view response failed, len: %d, value: %p", kv->valueLen, kv->value); + break; + } break; } case HEARTBEAT_KEY_VIEWINFO: { @@ -422,8 +469,10 @@ static void hbProcessQueryRspKvs(int32_t kvNum, SArray *pKvs, struct SCatalog *p tscError("invalid view info, len:%d, value:%p", kv->valueLen, kv->value); break; } - - hbProcessViewInfoRsp(kv->value, kv->valueLen, pCatalog); + if (TSDB_CODE_SUCCESS != hbProcessViewInfoRsp(kv->value, kv->valueLen, pCatalog)) { + tscError("Process view info response failed, len: %d, value: %p", kv->valueLen, kv->value); + break; + } break; } #endif @@ -431,7 +480,9 @@ static void hbProcessQueryRspKvs(int32_t kvNum, SArray *pKvs, struct SCatalog *p if (kv->valueLen <= 0 || !kv->value) { tscError("Invalid tsma info, len: %d, value: %p", kv->valueLen, kv->value); } - hbprocessTSMARsp(kv->value, kv->valueLen, pCatalog); + if (TSDB_CODE_SUCCESS != hbprocessTSMARsp(kv->value, kv->valueLen, pCatalog)) { + tscError("Process tsma info response failed, len: %d, value: %p", kv->valueLen, kv->value); + } break; } default: @@ -478,7 +529,9 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { tscDebug("request 0x%" PRIx64 " not exist to kill", pRsp->query->killRid); } else { taos_stop_query((TAOS_RES *)pRequest); - releaseRequest(pRsp->query->killRid); + if (TSDB_CODE_SUCCESS != releaseRequest(pRsp->query->killRid)) { + tscWarn("release request failed"); + } } } @@ -487,10 +540,14 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { } if (pRsp->query->pQnodeList) { - updateQnodeList(pTscObj->pAppInfo, pRsp->query->pQnodeList); + if (TSDB_CODE_SUCCESS != updateQnodeList(pTscObj->pAppInfo, pRsp->query->pQnodeList)) { + tscWarn("update qnode list failed"); + } } - releaseTscObj(pRsp->connKey.tscRid); + if (TSDB_CODE_SUCCESS != releaseTscObj(pRsp->connKey.tscRid)) { + tscWarn("release tscobj failed"); + } } } @@ -508,11 +565,13 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { } } + taosHashRelease(pAppHbMgr->activeInfo, pReq); return TSDB_CODE_SUCCESS; } +//TODO(smj) static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) { if (0 == atomic_load_8(&clientHbMgr.inited)) { goto _return; diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index a2bae7f449..f5c7751ec2 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -52,7 +52,7 @@ static bool validateDbName(const char* db) { return stringLengthCheck(db, TSDB_D static char* getClusterKey(const char* user, const char* auth, const char* ip, int32_t port) { char key[512] = {0}; - snprintf(key, sizeof(key), "%s:%s:%s:%d", user, auth, ip, port); + (void)snprintf(key, sizeof(key), "%s:%s:%s:%d", user, auth, ip, port); return taosStrdup(key); } @@ -68,36 +68,30 @@ bool chkRequestKilled(void* param) { return killed; } -static STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param, - SAppInstInfo* pAppInfo, int connType); - -STscObj* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db, - uint16_t port, int connType) { - if (taos_init() != TSDB_CODE_SUCCESS) { - return NULL; - } +static int32_t taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param, + SAppInstInfo* pAppInfo, int connType, STscObj** pTscObj); +int32_t taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db, + uint16_t port, int connType, STscObj** pObj) { + TSC_ERR_RET(taos_init()); if (!validateUserName(user)) { - terrno = TSDB_CODE_TSC_INVALID_USER_LENGTH; - return NULL; + TSC_ERR_RET(TSDB_CODE_TSC_INVALID_USER_LENGTH); } char localDb[TSDB_DB_NAME_LEN] = {0}; if (db != NULL && strlen(db) > 0) { if (!validateDbName(db)) { - terrno = TSDB_CODE_TSC_INVALID_DB_LENGTH; - return NULL; + TSC_ERR_RET(TSDB_CODE_TSC_INVALID_DB_LENGTH); } tstrncpy(localDb, db, sizeof(localDb)); - strdequote(localDb); + (void)strdequote(localDb); } char secretEncrypt[TSDB_PASSWORD_LEN + 1] = {0}; if (auth == NULL) { if (!validatePassword(pass)) { - terrno = TSDB_CODE_TSC_INVALID_PASS_LENGTH; - return NULL; + TSC_ERR_RET(TSDB_CODE_TSC_INVALID_PASS_LENGTH); } taosEncryptPass_c((uint8_t*)pass, strlen(pass), secretEncrypt); @@ -107,13 +101,9 @@ STscObj* taos_connect_internal(const char* ip, const char* user, const char* pas SCorEpSet epSet = {0}; if (ip) { - if (initEpSetFromCfg(ip, NULL, &epSet) < 0) { - return NULL; - } + TSC_ERR_RET(initEpSetFromCfg(ip, NULL, &epSet)); } else { - if (initEpSetFromCfg(tsFirst, tsSecond, &epSet) < 0) { - return NULL; - } + TSC_ERR_RET(initEpSetFromCfg(tsFirst, tsSecond, &epSet)); } if (port) { @@ -122,7 +112,9 @@ STscObj* taos_connect_internal(const char* ip, const char* user, const char* pas } char* key = getClusterKey(user, secretEncrypt, ip, port); - + if (NULL == key) { + TSC_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } tscInfo("connecting to server, numOfEps:%d inUse:%d user:%s db:%s key:%s", epSet.epSet.numOfEps, epSet.epSet.inUse, user, db, key); for (int32_t i = 0; i < epSet.epSet.numOfEps; ++i) { @@ -130,29 +122,44 @@ STscObj* taos_connect_internal(const char* ip, const char* user, const char* pas } SAppInstInfo** pInst = NULL; - taosThreadMutexLock(&appInfo.mutex); + int32_t code = taosThreadMutexLock(&appInfo.mutex); + if (TSDB_CODE_SUCCESS != code) { + tscError("failed to lock app info, code:%s", tstrerror(code)); + } pInst = taosHashGet(appInfo.pInstMap, key, strlen(key)); SAppInstInfo* p = NULL; if (pInst == NULL) { p = taosMemoryCalloc(1, sizeof(struct SAppInstInfo)); + if (NULL == p) { + taosThreadMutexUnlock(&appInfo.mutex); + taosMemoryFreeClear(key); + TSC_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } p->mgmtEp = epSet; taosThreadMutexInit(&p->qnodeMutex, NULL); - p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores / 2); - if (p->pTransporter == NULL) { + code = openTransporter(user, secretEncrypt, tsNumOfCores / 2, &p->pTransporter); + if (TSDB_CODE_SUCCESS != code) { taosThreadMutexUnlock(&appInfo.mutex); taosMemoryFreeClear(key); taosMemoryFree(p); - return NULL; + TSC_ERR_RET(code); } p->pAppHbMgr = appHbMgrInit(p, key); if (NULL == p->pAppHbMgr) { destroyAppInst(&p); taosThreadMutexUnlock(&appInfo.mutex); taosMemoryFreeClear(key); - return NULL; + // TODO(smj) : change this to right code. + TSC_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + code = taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES); + if (TSDB_CODE_SUCCESS != code) { + destroyAppInst(&p); + taosThreadMutexUnlock(&appInfo.mutex); + taosMemoryFreeClear(key); + TSC_ERR_RET(code); } - taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES); p->instKey = key; key = NULL; tscDebug("new app inst mgr %p, user:%s, ip:%s, port:%d", p, user, epSet.epSet.eps[0].fqdn, epSet.epSet.eps[0].port); @@ -164,11 +171,14 @@ STscObj* taos_connect_internal(const char* ip, const char* user, const char* pas atomic_store_8(&(*pInst)->pAppHbMgr->connHbFlag, 0); } - taosThreadMutexUnlock(&appInfo.mutex); + code = taosThreadMutexUnlock(&appInfo.mutex); + if (TSDB_CODE_SUCCESS != code) { + tscError("failed to unlock app info, code:%s", tstrerror(code)); + } taosMemoryFreeClear(key); - return taosConnectImpl(user, &secretEncrypt[0], localDb, NULL, NULL, *pInst, connType); + return taosConnectImpl(user, &secretEncrypt[0], localDb, NULL, NULL, *pInst, connType, pObj); } //SAppInstInfo* getAppInstInfo(const char* clusterKey) { @@ -188,10 +198,10 @@ void freeQueryParam(SSyncQueryParam* param) { int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, bool validateSql, SRequestObj** pRequest, int64_t reqid) { - *pRequest = createRequest(connId, TSDB_SQL_SELECT, reqid); - if (*pRequest == NULL) { + int32_t code = createRequest(connId, TSDB_SQL_SELECT, reqid, pRequest); + if (TSDB_CODE_SUCCESS != code) { tscError("failed to malloc sqlObj, %s", sql); - return terrno; + return code; } (*pRequest)->sqlstr = taosMemoryMalloc(sqlLen + 1); @@ -1405,18 +1415,19 @@ int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSe return 0; } -STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param, - SAppInstInfo* pAppInfo, int connType) { - STscObj* pTscObj = createTscObj(user, auth, db, connType, pAppInfo); - if (NULL == pTscObj) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return pTscObj; +int32_t taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param, + SAppInstInfo* pAppInfo, int connType, STscObj** pTscObj) { + *pTscObj = NULL; + int32_t code = createTscObj(user, auth, db, connType, pAppInfo, (void**)&pTscObj); + if (TSDB_CODE_SUCCESS != code) { + return code; } - SRequestObj* pRequest = createRequest(pTscObj->id, TDMT_MND_CONNECT, 0); - if (pRequest == NULL) { - destroyTscObj(pTscObj); - return NULL; + SRequestObj* pRequest = NULL; + code = createRequest((*pTscObj)->id, TDMT_MND_CONNECT, 0, &pRequest); + if (TSDB_CODE_SUCCESS != code) { + destroyTscObj(*pTscObj); + return code; } pRequest->sqlstr = taosStrdup("taos_connect"); @@ -1427,25 +1438,23 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t SMsgSendInfo* body = buildConnectMsg(pRequest); int64_t transporterId = 0; - asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body); + asyncSendMsgToServer((*pTscObj)->pAppInfo->pTransporter, &(*pTscObj)->pAppInfo->mgmtEp.epSet, &transporterId, body); - tsem_wait(&pRequest->body.rspSem); - if (pRequest->code != TSDB_CODE_SUCCESS) { - const char* errorMsg = - (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code); + code = tsem_wait(&pRequest->body.rspSem); + if (code != TSDB_CODE_SUCCESS) { + const char* errorMsg = (code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code); tscError("failed to connect to server, reason: %s", errorMsg); - terrno = pRequest->code; destroyRequest(pRequest); - taos_close_internal(pTscObj); - pTscObj = NULL; + taos_close_internal(*pTscObj); + *pTscObj = NULL; } else { - tscDebug("0x%" PRIx64 " connection is opening, connId:%u, dnodeConn:%p, reqId:0x%" PRIx64, pTscObj->id, - pTscObj->connId, pTscObj->pAppInfo->pTransporter, pRequest->requestId); + tscDebug("0x%" PRIx64 " connection is opening, connId:%u, dnodeConn:%p, reqId:0x%" PRIx64, (*pTscObj)->id, + (*pTscObj)->connId, (*pTscObj)->pAppInfo->pTransporter, pRequest->requestId); destroyRequest(pRequest); } - return pTscObj; + return code; } static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) { @@ -1654,8 +1663,9 @@ TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, cons return NULL; } - STscObj* pObj = taos_connect_internal(ip, user, NULL, auth, db, port, CONN_TYPE__QUERY); - if (pObj) { + STscObj* pObj = NULL; + int32_t code = taos_connect_internal(ip, user, NULL, auth, db, port, CONN_TYPE__QUERY, &pObj); + if (TSDB_CODE_SUCCESS == code) { int64_t* rid = taosMemoryCalloc(1, sizeof(int64_t)); *rid = pObj->id; return (TAOS*)rid; diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 0618e233a0..2b2241b2f8 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -115,8 +115,9 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha pass = TSDB_DEFAULT_PASS; } - STscObj *pObj = taos_connect_internal(ip, user, pass, NULL, db, port, CONN_TYPE__QUERY); - if (pObj) { + STscObj *pObj = NULL; + int32_t code = taos_connect_internal(ip, user, pass, NULL, db, port, CONN_TYPE__QUERY, &pObj); + if (TSDB_CODE_SUCCESS == code) { int64_t *rid = taosMemoryCalloc(1, sizeof(int64_t)); *rid = pObj->id; return (TAOS *)rid; diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index 9e1ff6e5e5..e028f76fe3 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -1537,8 +1537,8 @@ int taos_write_raw_block_with_fields_with_reqid(TAOS* taos, int rows, char* pDat SQuery* pQuery = NULL; SHashObj* pVgHash = NULL; - SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, reqid); - RAW_NULL_CHECK(pRequest); + SRequestObj* pRequest = NULL; + RAW_RETURN_CHECK(createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, reqid, &pRequest)); uDebug(LOG_ID_TAG " write raw block with field, rows:%d, pData:%p, tbname:%s, fields:%p, numFields:%d", LOG_ID_VALUE, rows, pData, tbname, fields, numFields); @@ -1597,8 +1597,8 @@ int taos_write_raw_block_with_reqid(TAOS* taos, int rows, char* pData, const cha SQuery* pQuery = NULL; SHashObj* pVgHash = NULL; - SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, reqid); - RAW_NULL_CHECK(pRequest); + SRequestObj* pRequest = NULL; + RAW_RETURN_CHECK(createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, reqid, &pRequest)); uDebug(LOG_ID_TAG " write raw block, rows:%d, pData:%p, tbname:%s", LOG_ID_VALUE, rows, pData, tbname); @@ -1667,8 +1667,8 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { SDecoder decoder = {0}; STableMeta* pTableMeta = NULL; - SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0); - RAW_NULL_CHECK(pRequest); + SRequestObj* pRequest = NULL; + RAW_RETURN_CHECK(createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0, &pRequest)); uDebug(LOG_ID_TAG " write raw data, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen); pRequest->syncQuery = true; @@ -1778,8 +1778,8 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) STableMeta* pTableMeta = NULL; SVCreateTbReq* pCreateReqDst = NULL; - SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0); - RAW_NULL_CHECK(pRequest); + SRequestObj* pRequest = NULL; + RAW_RETURN_CHECK(createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0, &pRequest)); uDebug(LOG_ID_TAG " write raw metadata, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen); pRequest->syncQuery = true; diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 00445b4d12..11c0bd6d6f 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -2137,8 +2137,8 @@ TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine, SSmlHandle *info = NULL; int cnt = 0; while (1) { - request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, reqid); - if (request == NULL) { + code = createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, reqid, &request); + if (TSDB_CODE_SUCCESS != code) { uError("SML:taos_schemaless_insert error request is null"); return NULL; } diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 650d262870..6d9956122c 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1237,8 +1237,9 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { } // init connection - pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ); - if (pTmq->pTscObj == NULL) { + code = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ, &pTmq->pTscObj); + if (code) { + terrno = code; tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId); (void)tsem2_destroy(&pTmq->rspSem); SET_ERROR_MSG_TMQ("init tscObj failed") diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 3801ae9ffd..61672478c9 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -156,6 +156,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_ENCODE_PARAM_ERROR, "Invalid encode param" TAOS_DEFINE_ERROR(TSDB_CODE_TSC_ENCODE_PARAM_NULL, "Not found compress param") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_COMPRESS_PARAM_ERROR, "Invalid compress param") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_COMPRESS_LEVEL_ERROR, "Invalid compress level param") +TAOS_DEFINE_ERROR(TSDB_CODE_TSC_FAIL_GENERATE_JSON, "failed to generate JSON") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INTERNAL_ERROR, "Internal error")