enh:[TD-31070] Handling return value of clientEnv.c
This commit is contained in:
parent
99db448f18
commit
99b0f1c652
|
@ -198,7 +198,8 @@ int32_t taosGetErrSize();
|
|||
#define TSDB_CODE_TSC_ENCODE_PARAM_ERROR TAOS_DEF_ERROR_CODE(0, 0X0231)
|
||||
#define TSDB_CODE_TSC_ENCODE_PARAM_NULL TAOS_DEF_ERROR_CODE(0, 0X0232)
|
||||
#define TSDB_CODE_TSC_COMPRESS_PARAM_ERROR TAOS_DEF_ERROR_CODE(0, 0X0233)
|
||||
#define TSDB_CODE_TSC_COMPRESS_LEVEL_ERROR TAOS_DEF_ERROR_CODE(0, 0X0234)
|
||||
#define TSDB_CODE_TSC_COMPRESS_LEVEL_ERROR TAOS_DEF_ERROR_CODE(0, 0X0234)
|
||||
#define TSDB_CODE_TSC_FAIL_GENERATE_JSON TAOS_DEF_ERROR_CODE(0, 0X0235)
|
||||
#define TSDB_CODE_TSC_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0X02FF)
|
||||
|
||||
// mnode-common
|
||||
|
|
|
@ -348,7 +348,8 @@ __async_send_cb_fn_t getMsgRspHandle(int32_t msgType);
|
|||
|
||||
SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pReqObj);
|
||||
|
||||
void* createTscObj(const char* user, const char* auth, const char* db, int32_t connType, SAppInstInfo* pAppInfo);
|
||||
int32_t createTscObj(const char *user, const char *auth, const char *db, int32_t connType, SAppInstInfo *pAppInfo,
|
||||
void **p);
|
||||
void destroyTscObj(void* pObj);
|
||||
STscObj* acquireTscObj(int64_t rid);
|
||||
int32_t releaseTscObj(int64_t rid);
|
||||
|
@ -356,7 +357,7 @@ void destroyAppInst(void* pAppInfo);
|
|||
|
||||
uint64_t generateRequestId();
|
||||
|
||||
void* createRequest(uint64_t connId, int32_t type, int64_t reqid);
|
||||
int32_t createRequest(uint64_t connId, int32_t type, int64_t reqid, SRequestObj **pRequest);
|
||||
void destroyRequest(SRequestObj* pRequest);
|
||||
SRequestObj* acquireRequest(int64_t rid);
|
||||
int32_t releaseRequest(int64_t rid);
|
||||
|
@ -370,7 +371,7 @@ void resetConnectDB(STscObj* pTscObj);
|
|||
|
||||
int taos_options_imp(TSDB_OPTION option, const char* str);
|
||||
|
||||
void* openTransporter(const char* user, const char* auth, int32_t numOfThreads);
|
||||
int32_t openTransporter(const char* user, const char* auth, int32_t numOfThreads, void **pDnodeConn);
|
||||
void tscStopCrashReport();
|
||||
|
||||
typedef struct AsyncArg {
|
||||
|
@ -381,8 +382,8 @@ typedef struct AsyncArg {
|
|||
bool persistConnForSpecificMsg(void* parenct, tmsg_t msgType);
|
||||
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet);
|
||||
|
||||
STscObj* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
|
||||
uint16_t port, int connType);
|
||||
int32_t taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
|
||||
uint16_t port, int connType, STscObj** pObj);
|
||||
|
||||
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb);
|
||||
|
||||
|
@ -443,6 +444,30 @@ void freeQueryParam(SSyncQueryParam* param);
|
|||
int32_t clientParseSqlImpl(void* param, const char* dbName, const char* sql, bool parseOnly, const char* effeciveUser, SParseSqlRes* pRes);
|
||||
#endif
|
||||
|
||||
#define TSC_ERR_RET(c) \
|
||||
do { \
|
||||
int32_t _code = c; \
|
||||
if (_code != TSDB_CODE_SUCCESS) { \
|
||||
terrno = _code; \
|
||||
return _code; \
|
||||
} \
|
||||
} while (0)
|
||||
#define TSC_RET(c) \
|
||||
do { \
|
||||
int32_t _code = c; \
|
||||
if (_code != TSDB_CODE_SUCCESS) { \
|
||||
terrno = _code; \
|
||||
} \
|
||||
return _code; \
|
||||
} while (0)
|
||||
#define TSC_ERR_JRET(c) \
|
||||
do { \
|
||||
code = c; \
|
||||
if (code != TSDB_CODE_SUCCESS) { \
|
||||
terrno = code; \
|
||||
goto _return; \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
void slowQueryLog(int64_t rid, bool killed, int32_t code, int32_t cost);
|
||||
|
||||
|
|
|
@ -44,6 +44,26 @@
|
|||
#define TSC_VAR_NOT_RELEASE 1
|
||||
#define TSC_VAR_RELEASED 0
|
||||
|
||||
#define ENV_JSON_FALSE_CHECK(c) \
|
||||
do { \
|
||||
if (!c) { \
|
||||
tscError("faild to add item to JSON object");\
|
||||
code = TSDB_CODE_TSC_FAIL_GENERATE_JSON; \
|
||||
goto _end; \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#define ENV_ERR_RET(c,info) \
|
||||
do { \
|
||||
int32_t _code = c; \
|
||||
if (_code != TSDB_CODE_SUCCESS) { \
|
||||
errno = _code; \
|
||||
tscInitRes = _code; \
|
||||
tscError(info); \
|
||||
return; \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
STscDbg tscDbg = {0};
|
||||
SAppInfo appInfo;
|
||||
int64_t lastClusterId = 0;
|
||||
|
@ -57,8 +77,14 @@ static TdThreadOnce tscinit = PTHREAD_ONCE_INIT;
|
|||
volatile int32_t tscInitRes = 0;
|
||||
|
||||
static int32_t registerRequest(SRequestObj *pRequest, STscObj *pTscObj) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
// connection has been released already, abort creating request.
|
||||
pRequest->self = taosAddRef(clientReqRefPool, pRequest);
|
||||
if (pRequest->self < 0) {
|
||||
tscError("failed to add ref to request");
|
||||
code = terrno;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t num = atomic_add_fetch_32(&pTscObj->numOfReqs, 1);
|
||||
|
||||
|
@ -72,19 +98,23 @@ static int32_t registerRequest(SRequestObj *pRequest, STscObj *pTscObj) {
|
|||
pRequest->self, pRequest->pTscObj->id, num, currentInst, total, pRequest->requestId);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return code;
|
||||
}
|
||||
|
||||
static void concatStrings(SArray *list, char* buf, int size){
|
||||
int len = 0;
|
||||
for(int i = 0; i < taosArrayGetSize(list); i++){
|
||||
char* db = taosArrayGet(list, i);
|
||||
if (NULL == db) {
|
||||
tscError("get dbname failed, buf:%s", buf);
|
||||
break;
|
||||
}
|
||||
char* dot = strchr(db, '.');
|
||||
if (dot != NULL) {
|
||||
db = dot + 1;
|
||||
}
|
||||
if (i != 0){
|
||||
strcat(buf, ",");
|
||||
(void)strcat(buf, ",");
|
||||
len += 1;
|
||||
}
|
||||
int ret = snprintf(buf + len, size - len, "%s", db);
|
||||
|
@ -100,61 +130,70 @@ static void concatStrings(SArray *list, char* buf, int size){
|
|||
}
|
||||
}
|
||||
|
||||
static void generateWriteSlowLog(STscObj *pTscObj, SRequestObj *pRequest, int32_t reqType, int64_t duration){
|
||||
static int32_t generateWriteSlowLog(STscObj *pTscObj, SRequestObj *pRequest, int32_t reqType, int64_t duration){
|
||||
cJSON* json = cJSON_CreateObject();
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
if (json == NULL) {
|
||||
tscError("[monitor] cJSON_CreateObject failed");
|
||||
return;
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
char clusterId[32] = {0};
|
||||
if (snprintf(clusterId, sizeof(clusterId), "%" PRId64, pTscObj->pAppInfo->clusterId) < 0){
|
||||
tscError("failed to generate clusterId:%" PRId64, pTscObj->pAppInfo->clusterId);
|
||||
code = TSDB_CODE_FAILED;
|
||||
goto _end;
|
||||
}
|
||||
|
||||
char startTs[32] = {0};
|
||||
if (snprintf(startTs, sizeof(startTs), "%" PRId64, pRequest->metric.start/1000) < 0){
|
||||
tscError("failed to generate startTs:%" PRId64, pRequest->metric.start/1000);
|
||||
code = TSDB_CODE_FAILED;
|
||||
goto _end;
|
||||
}
|
||||
|
||||
char requestId[32] = {0};
|
||||
if (snprintf(requestId, sizeof(requestId), "%" PRIu64, pRequest->requestId) < 0){
|
||||
tscError("failed to generate requestId:%" PRIu64, pRequest->requestId);
|
||||
code = TSDB_CODE_FAILED;
|
||||
goto _end;
|
||||
}
|
||||
cJSON_AddItemToObject(json, "cluster_id", cJSON_CreateString(clusterId));
|
||||
cJSON_AddItemToObject(json, "start_ts", cJSON_CreateString(startTs));
|
||||
cJSON_AddItemToObject(json, "request_id", cJSON_CreateString(requestId));
|
||||
cJSON_AddItemToObject(json, "query_time", cJSON_CreateNumber(duration/1000));
|
||||
cJSON_AddItemToObject(json, "code", cJSON_CreateNumber(pRequest->code));
|
||||
cJSON_AddItemToObject(json, "error_info", cJSON_CreateString(tstrerror(pRequest->code)));
|
||||
cJSON_AddItemToObject(json, "type", cJSON_CreateNumber(reqType));
|
||||
cJSON_AddItemToObject(json, "rows_num", cJSON_CreateNumber(pRequest->body.resInfo.numOfRows + pRequest->body.resInfo.totalRows));
|
||||
ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "cluster_id", cJSON_CreateString(clusterId)));
|
||||
ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "start_ts", cJSON_CreateString(startTs)));
|
||||
ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "request_id", cJSON_CreateString(requestId)));
|
||||
ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "query_time", cJSON_CreateNumber(duration/1000)));
|
||||
ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "code", cJSON_CreateNumber(pRequest->code)));
|
||||
ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "error_info", cJSON_CreateString(tstrerror(pRequest->code))));
|
||||
ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "type", cJSON_CreateNumber(reqType)));
|
||||
ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "rows_num", cJSON_CreateNumber(pRequest->body.resInfo.numOfRows + pRequest->body.resInfo.totalRows)));
|
||||
if(pRequest->sqlstr != NULL && strlen(pRequest->sqlstr) > pTscObj->pAppInfo->monitorParas.tsSlowLogMaxLen){
|
||||
char tmp = pRequest->sqlstr[pTscObj->pAppInfo->monitorParas.tsSlowLogMaxLen];
|
||||
pRequest->sqlstr[pTscObj->pAppInfo->monitorParas.tsSlowLogMaxLen] = '\0';
|
||||
cJSON_AddItemToObject(json, "sql", cJSON_CreateString(pRequest->sqlstr));
|
||||
ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "sql", cJSON_CreateString(pRequest->sqlstr)));
|
||||
pRequest->sqlstr[pTscObj->pAppInfo->monitorParas.tsSlowLogMaxLen] = tmp;
|
||||
}else{
|
||||
cJSON_AddItemToObject(json, "sql", cJSON_CreateString(pRequest->sqlstr));
|
||||
ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "sql", cJSON_CreateString(pRequest->sqlstr)));
|
||||
}
|
||||
|
||||
cJSON_AddItemToObject(json, "user", cJSON_CreateString(pTscObj->user));
|
||||
cJSON_AddItemToObject(json, "process_name", cJSON_CreateString(appInfo.appName));
|
||||
cJSON_AddItemToObject(json, "ip", cJSON_CreateString(tsLocalFqdn));
|
||||
ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "user", cJSON_CreateString(pTscObj->user)));
|
||||
ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "process_name", cJSON_CreateString(appInfo.appName)));
|
||||
ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "ip", cJSON_CreateString(tsLocalFqdn)));
|
||||
|
||||
char pid[32] = {0};
|
||||
if (snprintf(pid, sizeof(pid), "%d", appInfo.pid) < 0){
|
||||
tscError("failed to generate pid:%d", appInfo.pid);
|
||||
code = TSDB_CODE_FAILED;
|
||||
goto _end;
|
||||
}
|
||||
|
||||
cJSON_AddItemToObject(json, "process_id", cJSON_CreateString(pid));
|
||||
ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "process_id", cJSON_CreateString(pid)));
|
||||
if(pRequest->dbList != NULL){
|
||||
char dbList[1024] = {0};
|
||||
concatStrings(pRequest->dbList, dbList, sizeof(dbList) - 1);
|
||||
cJSON_AddItemToObject(json, "db", cJSON_CreateString(dbList));
|
||||
ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "db", cJSON_CreateString(dbList)));
|
||||
}else if(pRequest->pDb != NULL){
|
||||
cJSON_AddItemToObject(json, "db", cJSON_CreateString(pRequest->pDb));
|
||||
ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "db", cJSON_CreateString(pRequest->pDb)));
|
||||
}else{
|
||||
cJSON_AddItemToObject(json, "db", cJSON_CreateString(""));
|
||||
ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "db", cJSON_CreateString("")));
|
||||
}
|
||||
|
||||
char* value = cJSON_PrintUnformatted(json);
|
||||
|
@ -162,11 +201,15 @@ static void generateWriteSlowLog(STscObj *pTscObj, SRequestObj *pRequest, int32_
|
|||
data.clusterId = pTscObj->pAppInfo->clusterId;
|
||||
data.type = SLOW_LOG_WRITE;
|
||||
data.data = value;
|
||||
if(monitorPutData2MonitorQueue(data) < 0){
|
||||
code = monitorPutData2MonitorQueue(data);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
taosMemoryFree(value);
|
||||
goto _end;
|
||||
}
|
||||
|
||||
_end:
|
||||
cJSON_Delete(json);
|
||||
return code;
|
||||
}
|
||||
|
||||
static bool checkSlowLogExceptDb(SRequestObj *pRequest, char* exceptDb) {
|
||||
|
@ -176,6 +219,10 @@ static bool checkSlowLogExceptDb(SRequestObj *pRequest, char* exceptDb) {
|
|||
|
||||
for (int i = 0; i < taosArrayGetSize(pRequest->dbList); i++) {
|
||||
char *db = taosArrayGet(pRequest->dbList, i);
|
||||
if (NULL == db) {
|
||||
tscError("get dbname failed, exceptDb:%s", exceptDb);
|
||||
return false;
|
||||
}
|
||||
char *dot = strchr(db, '.');
|
||||
if (dot != NULL) {
|
||||
db = dot + 1;
|
||||
|
@ -215,7 +262,7 @@ static void deregisterRequest(SRequestObj *pRequest) {
|
|||
"us, planCost:%" PRId64 "us, exec:%" PRId64 "us",
|
||||
duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs, pRequest->metric.analyseCostUs,
|
||||
pRequest->metric.planCostUs, pRequest->metric.execCostUs);
|
||||
atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration);
|
||||
(void)atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration);
|
||||
reqType = SLOW_LOG_TYPE_INSERT;
|
||||
} else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) {
|
||||
tscDebug("query duration %" PRId64 "us: parseCost:%" PRId64 "us, ctgCost:%" PRId64 "us, analyseCost:%" PRId64
|
||||
|
@ -223,11 +270,13 @@ static void deregisterRequest(SRequestObj *pRequest) {
|
|||
duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs, pRequest->metric.analyseCostUs,
|
||||
pRequest->metric.planCostUs, pRequest->metric.execCostUs);
|
||||
|
||||
atomic_add_fetch_64((int64_t *)&pActivity->queryElapsedTime, duration);
|
||||
(void)atomic_add_fetch_64((int64_t *)&pActivity->queryElapsedTime, duration);
|
||||
reqType = SLOW_LOG_TYPE_QUERY;
|
||||
}
|
||||
|
||||
nodesSimReleaseAllocator(pRequest->allocatorRefId);
|
||||
if (TSDB_CODE_SUCCESS != nodesSimReleaseAllocator(pRequest->allocatorRefId)) {
|
||||
tscError("failed to release allocator");
|
||||
}
|
||||
}
|
||||
|
||||
if(pTscObj->pAppInfo->monitorParas.tsEnableMonitor){
|
||||
|
@ -242,19 +291,23 @@ static void deregisterRequest(SRequestObj *pRequest) {
|
|||
|
||||
if ((duration >= pTscObj->pAppInfo->monitorParas.tsSlowLogThreshold * 1000000UL || duration >= pTscObj->pAppInfo->monitorParas.tsSlowLogThresholdTest * 1000000UL) &&
|
||||
checkSlowLogExceptDb(pRequest, pTscObj->pAppInfo->monitorParas.tsSlowLogExceptDb)) {
|
||||
atomic_add_fetch_64((int64_t *)&pActivity->numOfSlowQueries, 1);
|
||||
(void)atomic_add_fetch_64((int64_t *)&pActivity->numOfSlowQueries, 1);
|
||||
if (pTscObj->pAppInfo->monitorParas.tsSlowLogScope & reqType) {
|
||||
taosPrintSlowLog("PID:%d, Conn:%u, QID:0x%" PRIx64 ", Start:%" PRId64 " us, Duration:%" PRId64 "us, SQL:%s",
|
||||
taosGetPId(), pTscObj->connId, pRequest->requestId, pRequest->metric.start, duration,
|
||||
pRequest->sqlstr);
|
||||
if(pTscObj->pAppInfo->monitorParas.tsEnableMonitor){
|
||||
slowQueryLog(pTscObj->id, pRequest->killed, pRequest->code, duration);
|
||||
generateWriteSlowLog(pTscObj, pRequest, reqType, duration);
|
||||
if (TSDB_CODE_SUCCESS != generateWriteSlowLog(pTscObj, pRequest, reqType, duration)) {
|
||||
tscError("failed to generate write slow log");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
releaseTscObj(pTscObj->id);
|
||||
if (TSDB_CODE_SUCCESS != releaseTscObj(pTscObj->id)) {
|
||||
tscError("failed to release STscObj");
|
||||
}
|
||||
}
|
||||
|
||||
// todo close the transporter properly
|
||||
|
@ -289,9 +342,9 @@ static bool clientRpcTfp(int32_t code, tmsg_t msgType) {
|
|||
}
|
||||
|
||||
// TODO refactor
|
||||
void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
|
||||
int32_t openTransporter(const char *user, const char *auth, int32_t numOfThread, void **pDnodeConn) {
|
||||
SRpcInit rpcInit;
|
||||
memset(&rpcInit, 0, sizeof(rpcInit));
|
||||
(void)memset(&rpcInit, 0, sizeof(rpcInit));
|
||||
rpcInit.localPort = 0;
|
||||
rpcInit.label = "TSC";
|
||||
rpcInit.numOfThreads = tsNumOfRpcThreads;
|
||||
|
@ -315,15 +368,19 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
|
|||
rpcInit.connLimitNum = connLimitNum;
|
||||
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
|
||||
|
||||
taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
|
||||
|
||||
void *pDnodeConn = rpcOpen(&rpcInit);
|
||||
if (pDnodeConn == NULL) {
|
||||
tscError("failed to init connection to server");
|
||||
return NULL;
|
||||
int32_t code = taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
tscError("invalid version string.");
|
||||
return code;
|
||||
}
|
||||
|
||||
return pDnodeConn;
|
||||
*pDnodeConn = rpcOpen(&rpcInit);
|
||||
if (*pDnodeConn == NULL) {
|
||||
tscError("failed to init connection to server.");
|
||||
code = TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
void destroyAllRequests(SHashObj *pRequests) {
|
||||
|
@ -334,7 +391,7 @@ void destroyAllRequests(SHashObj *pRequests) {
|
|||
SRequestObj *pRequest = acquireRequest(*rid);
|
||||
if (pRequest) {
|
||||
destroyRequest(pRequest);
|
||||
releaseRequest(*rid);
|
||||
(void)releaseRequest(*rid); // ignore error
|
||||
}
|
||||
|
||||
pIter = taosHashIterate(pRequests, pIter);
|
||||
|
@ -349,7 +406,7 @@ void stopAllRequests(SHashObj *pRequests) {
|
|||
SRequestObj *pRequest = acquireRequest(*rid);
|
||||
if (pRequest) {
|
||||
taos_stop_query(pRequest);
|
||||
releaseRequest(*rid);
|
||||
(void)releaseRequest(*rid); // ignore error
|
||||
}
|
||||
|
||||
pIter = taosHashIterate(pRequests, pIter);
|
||||
|
@ -360,18 +417,31 @@ void destroyAppInst(void *info) {
|
|||
SAppInstInfo* pAppInfo = *(SAppInstInfo**)info;
|
||||
tscDebug("destroy app inst mgr %p", pAppInfo);
|
||||
|
||||
taosThreadMutexLock(&appInfo.mutex);
|
||||
int32_t code = taosThreadMutexLock(&appInfo.mutex);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
tscError("failed to lock app info, code:%s", tstrerror(code));
|
||||
}
|
||||
|
||||
hbRemoveAppHbMrg(&pAppInfo->pAppHbMgr);
|
||||
|
||||
taosThreadMutexUnlock(&appInfo.mutex);
|
||||
code = taosThreadMutexUnlock(&appInfo.mutex);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
tscError("failed to unlock app info, code:%s", tstrerror(code));
|
||||
}
|
||||
|
||||
taosMemoryFreeClear(pAppInfo->instKey);
|
||||
closeTransporter(pAppInfo);
|
||||
|
||||
taosThreadMutexLock(&pAppInfo->qnodeMutex);
|
||||
code = taosThreadMutexLock(&pAppInfo->qnodeMutex);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
tscError("failed to lock qnode mutex, code:%s", tstrerror(code));
|
||||
}
|
||||
|
||||
taosArrayDestroy(pAppInfo->pQnodeList);
|
||||
taosThreadMutexUnlock(&pAppInfo->qnodeMutex);
|
||||
code = taosThreadMutexUnlock(&pAppInfo->qnodeMutex);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
tscError("failed to unlock qnode mutex, code:%s", tstrerror(code));
|
||||
}
|
||||
|
||||
taosMemoryFree(pAppInfo);
|
||||
}
|
||||
|
@ -396,97 +466,110 @@ void destroyTscObj(void *pObj) {
|
|||
pTscObj->pAppInfo->numOfConns);
|
||||
|
||||
// In any cases, we should not free app inst here. Or an race condition rises.
|
||||
/*int64_t connNum = */ atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
|
||||
/*int64_t connNum = */ (void)atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
|
||||
|
||||
taosThreadMutexDestroy(&pTscObj->mutex);
|
||||
(void)taosThreadMutexDestroy(&pTscObj->mutex);
|
||||
taosMemoryFree(pTscObj);
|
||||
|
||||
tscTrace("end to destroy tscObj %" PRIx64 " p:%p", tscId, pTscObj);
|
||||
}
|
||||
|
||||
void *createTscObj(const char *user, const char *auth, const char *db, int32_t connType, SAppInstInfo *pAppInfo) {
|
||||
STscObj *pObj = (STscObj *)taosMemoryCalloc(1, sizeof(STscObj));
|
||||
int32_t createTscObj(const char *user, const char *auth, const char *db, int32_t connType, SAppInstInfo *pAppInfo,
|
||||
void **p) {
|
||||
STscObj *pObj = (STscObj *)*p;
|
||||
pObj = (STscObj *)taosMemoryCalloc(1, sizeof(STscObj));
|
||||
if (NULL == pObj) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pObj->pRequests = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
|
||||
if (NULL == pObj->pRequests) {
|
||||
taosMemoryFree(pObj);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pObj->connType = connType;
|
||||
pObj->pAppInfo = pAppInfo;
|
||||
pObj->appHbMgrIdx = pAppInfo->pAppHbMgr->idx;
|
||||
tstrncpy(pObj->user, user, sizeof(pObj->user));
|
||||
memcpy(pObj->pass, auth, TSDB_PASSWORD_LEN);
|
||||
(void)memcpy(pObj->pass, auth, TSDB_PASSWORD_LEN);
|
||||
|
||||
if (db != NULL) {
|
||||
tstrncpy(pObj->db, db, tListLen(pObj->db));
|
||||
}
|
||||
|
||||
taosThreadMutexInit(&pObj->mutex, NULL);
|
||||
pObj->id = taosAddRef(clientConnRefPool, pObj);
|
||||
int32_t code = taosThreadMutexInit(&pObj->mutex, NULL);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return TAOS_SYSTEM_ERROR(code);
|
||||
}
|
||||
|
||||
atomic_add_fetch_64(&pObj->pAppInfo->numOfConns, 1);
|
||||
pObj->id = taosAddRef(clientConnRefPool, pObj);
|
||||
if (pObj->id < 0) {
|
||||
tscError("failed to add object to clientConnRefPool");
|
||||
code = terrno;
|
||||
taosMemoryFree(pObj);
|
||||
return code;
|
||||
}
|
||||
|
||||
(void)atomic_add_fetch_64(&pObj->pAppInfo->numOfConns, 1);
|
||||
|
||||
tscDebug("connObj created, 0x%" PRIx64 ",p:%p", pObj->id, pObj);
|
||||
return pObj;
|
||||
return code;
|
||||
}
|
||||
|
||||
STscObj *acquireTscObj(int64_t rid) { return (STscObj *)taosAcquireRef(clientConnRefPool, rid); }
|
||||
|
||||
int32_t releaseTscObj(int64_t rid) { return taosReleaseRef(clientConnRefPool, rid); }
|
||||
|
||||
void *createRequest(uint64_t connId, int32_t type, int64_t reqid) {
|
||||
SRequestObj *pRequest = (SRequestObj *)taosMemoryCalloc(1, sizeof(SRequestObj));
|
||||
if (NULL == pRequest) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
int32_t createRequest(uint64_t connId, int32_t type, int64_t reqid, SRequestObj **pRequest) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
*pRequest = (SRequestObj *)taosMemoryCalloc(1, sizeof(SRequestObj));
|
||||
if (NULL == *pRequest) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
STscObj *pTscObj = acquireTscObj(connId);
|
||||
if (pTscObj == NULL) {
|
||||
taosMemoryFree(pRequest);
|
||||
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
||||
return NULL;
|
||||
code = TSDB_CODE_TSC_DISCONNECTED;
|
||||
goto _return;
|
||||
}
|
||||
SSyncQueryParam *interParam = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
|
||||
if (interParam == NULL) {
|
||||
releaseTscObj(connId);
|
||||
doDestroyRequest(pRequest);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
if (TSDB_CODE_SUCCESS != releaseTscObj(connId)) {
|
||||
tscError("failed to release TscObj");
|
||||
}
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _return;
|
||||
}
|
||||
tsem_init(&interParam->sem, 0, 0);
|
||||
interParam->pRequest = pRequest;
|
||||
pRequest->body.interParam = interParam;
|
||||
TSC_ERR_JRET(tsem_init(&interParam->sem, 0, 0));
|
||||
interParam->pRequest = *pRequest;
|
||||
(*pRequest)->body.interParam = interParam;
|
||||
|
||||
pRequest->resType = RES_TYPE__QUERY;
|
||||
pRequest->requestId = reqid == 0 ? generateRequestId() : reqid;
|
||||
pRequest->metric.start = taosGetTimestampUs();
|
||||
(*pRequest)->resType = RES_TYPE__QUERY;
|
||||
(*pRequest)->requestId = reqid == 0 ? generateRequestId() : reqid;
|
||||
(*pRequest)->metric.start = taosGetTimestampUs();
|
||||
|
||||
pRequest->body.resInfo.convertUcs4 = true; // convert ucs4 by default
|
||||
pRequest->type = type;
|
||||
pRequest->allocatorRefId = -1;
|
||||
(*pRequest)->body.resInfo.convertUcs4 = true; // convert ucs4 by default
|
||||
(*pRequest)->type = type;
|
||||
(*pRequest)->allocatorRefId = -1;
|
||||
|
||||
pRequest->pDb = getDbOfConnection(pTscObj);
|
||||
pRequest->pTscObj = pTscObj;
|
||||
pRequest->inCallback = false;
|
||||
(*pRequest)->pDb = getDbOfConnection(pTscObj);
|
||||
(*pRequest)->pTscObj = pTscObj;
|
||||
(*pRequest)->inCallback = false;
|
||||
|
||||
pRequest->msgBuf = taosMemoryCalloc(1, ERROR_MSG_BUF_DEFAULT_SIZE);
|
||||
pRequest->msgBufLen = ERROR_MSG_BUF_DEFAULT_SIZE;
|
||||
tsem_init(&pRequest->body.rspSem, 0, 0);
|
||||
|
||||
if (registerRequest(pRequest, pTscObj)) {
|
||||
doDestroyRequest(pRequest);
|
||||
return NULL;
|
||||
(*pRequest)->msgBuf = taosMemoryCalloc(1, ERROR_MSG_BUF_DEFAULT_SIZE);
|
||||
if (NULL == (*pRequest)->msgBuf) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _return;
|
||||
}
|
||||
(*pRequest)->msgBufLen = ERROR_MSG_BUF_DEFAULT_SIZE;
|
||||
TSC_ERR_JRET(tsem_init(&(*pRequest)->body.rspSem, 0, 0));
|
||||
TSC_ERR_JRET(registerRequest(*pRequest, pTscObj));
|
||||
|
||||
return pRequest;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
_return:
|
||||
doDestroyRequest(*pRequest);
|
||||
return code;
|
||||
}
|
||||
|
||||
void doFreeReqResultInfo(SReqResultInfo *pResInfo) {
|
||||
|
@ -521,12 +604,12 @@ int64_t removeFromMostPrevReq(SRequestObj* pRequest) {
|
|||
pTmp = acquireRequest(pTmp->relation.prevRefId);
|
||||
if (pTmp) {
|
||||
mostPrevReqRefId = pTmp->self;
|
||||
releaseRequest(mostPrevReqRefId);
|
||||
(void)releaseRequest(mostPrevReqRefId); // ignore error
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
removeRequest(mostPrevReqRefId);
|
||||
(void)removeRequest(mostPrevReqRefId); // ignore error
|
||||
return mostPrevReqRefId;
|
||||
}
|
||||
|
||||
|
@ -534,8 +617,8 @@ void destroyNextReq(int64_t nextRefId) {
|
|||
if (nextRefId) {
|
||||
SRequestObj* pObj = acquireRequest(nextRefId);
|
||||
if (pObj) {
|
||||
releaseRequest(nextRefId);
|
||||
releaseRequest(nextRefId);
|
||||
(void)releaseRequest(nextRefId); // ignore error
|
||||
(void)releaseRequest(nextRefId); // ignore error
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -555,7 +638,7 @@ void destroySubRequests(SRequestObj *pRequest) {
|
|||
pTmp = acquireRequest(tmpRefId);
|
||||
if (pTmp) {
|
||||
pReqList[++reqIdx] = pTmp;
|
||||
releaseRequest(tmpRefId);
|
||||
(void)releaseRequest(tmpRefId); // ignore error
|
||||
} else {
|
||||
tscError("prev req ref 0x%" PRIx64 " is not there", tmpRefId);
|
||||
break;
|
||||
|
@ -563,7 +646,7 @@ void destroySubRequests(SRequestObj *pRequest) {
|
|||
}
|
||||
|
||||
for (int32_t i = reqIdx; i >= 0; i--) {
|
||||
removeRequest(pReqList[i]->self);
|
||||
(void)removeRequest(pReqList[i]->self); // ignore error
|
||||
}
|
||||
|
||||
tmpRefId = pRequest->relation.nextRefId;
|
||||
|
@ -571,8 +654,8 @@ void destroySubRequests(SRequestObj *pRequest) {
|
|||
pTmp = acquireRequest(tmpRefId);
|
||||
if (pTmp) {
|
||||
tmpRefId = pTmp->relation.nextRefId;
|
||||
removeRequest(pTmp->self);
|
||||
releaseRequest(pTmp->self);
|
||||
(void)removeRequest(pTmp->self); // ignore error
|
||||
(void)releaseRequest(pTmp->self); // ignore error
|
||||
} else {
|
||||
tscError("next req ref 0x%" PRIx64 " is not there", tmpRefId);
|
||||
break;
|
||||
|
@ -592,8 +675,10 @@ void doDestroyRequest(void *p) {
|
|||
|
||||
int64_t nextReqRefId = pRequest->relation.nextRefId;
|
||||
|
||||
taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self));
|
||||
|
||||
int32_t code = taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self));
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
tscError("failed to remove request from hash, code:%s", tstrerror(code));
|
||||
}
|
||||
schedulerFreeJob(&pRequest->body.queryJob, 0);
|
||||
|
||||
destorySqlCallbackWrapper(pRequest->pWrapper);
|
||||
|
@ -601,7 +686,7 @@ void doDestroyRequest(void *p) {
|
|||
taosMemoryFreeClear(pRequest->msgBuf);
|
||||
|
||||
doFreeReqResultInfo(&pRequest->body.resInfo);
|
||||
tsem_destroy(&pRequest->body.rspSem);
|
||||
(void)tsem_destroy(&pRequest->body.rspSem);
|
||||
|
||||
taosArrayDestroy(pRequest->tableList);
|
||||
taosArrayDestroy(pRequest->targetTableList);
|
||||
|
@ -615,13 +700,15 @@ void doDestroyRequest(void *p) {
|
|||
taosMemoryFreeClear(pRequest->pDb);
|
||||
taosArrayDestroy(pRequest->dbList);
|
||||
if (pRequest->body.interParam) {
|
||||
tsem_destroy(&((SSyncQueryParam *)pRequest->body.interParam)->sem);
|
||||
(void)tsem_destroy(&((SSyncQueryParam *)pRequest->body.interParam)->sem);
|
||||
}
|
||||
taosMemoryFree(pRequest->body.interParam);
|
||||
|
||||
if (TSDB_CODE_SUCCESS == nodesSimAcquireAllocator(pRequest->allocatorRefId)) {
|
||||
qDestroyQuery(pRequest->pQuery);
|
||||
nodesSimReleaseAllocator(pRequest->allocatorRefId);
|
||||
if (TSDB_CODE_SUCCESS != nodesSimReleaseAllocator(pRequest->allocatorRefId)) {
|
||||
tscError("failed to release allocator");
|
||||
}
|
||||
}
|
||||
nodesDestroyAllocator(pRequest->allocatorRefId);
|
||||
|
||||
|
@ -638,7 +725,7 @@ void destroyRequest(SRequestObj *pRequest) {
|
|||
}
|
||||
|
||||
taos_stop_query(pRequest);
|
||||
removeFromMostPrevReq(pRequest);
|
||||
(void)removeFromMostPrevReq(pRequest);
|
||||
}
|
||||
|
||||
void taosStopQueryImpl(SRequestObj *pRequest) {
|
||||
|
@ -677,7 +764,7 @@ void stopAllQueries(SRequestObj *pRequest) {
|
|||
|
||||
for (int32_t i = reqIdx; i >= 0; i--) {
|
||||
taosStopQueryImpl(pReqList[i]);
|
||||
releaseRequest(pReqList[i]->self);
|
||||
(void)releaseRequest(pReqList[i]->self); // ignore error
|
||||
}
|
||||
|
||||
taosStopQueryImpl(pRequest);
|
||||
|
@ -688,7 +775,7 @@ void stopAllQueries(SRequestObj *pRequest) {
|
|||
if (pTmp) {
|
||||
tmpRefId = pTmp->relation.nextRefId;
|
||||
taosStopQueryImpl(pTmp);
|
||||
releaseRequest(pTmp->self);
|
||||
(void)releaseRequest(pTmp->self); // ignore error
|
||||
} else {
|
||||
tscError("next req ref 0x%" PRIx64 " is not there", tmpRefId);
|
||||
break;
|
||||
|
@ -701,7 +788,7 @@ void crashReportThreadFuncUnexpectedStopped(void) { atomic_store_32(&clientStop,
|
|||
static void *tscCrashReportThreadFp(void *param) {
|
||||
setThreadName("client-crashReport");
|
||||
char filepath[PATH_MAX] = {0};
|
||||
snprintf(filepath, sizeof(filepath), "%s%s.taosCrashLog", tsLogDir, TD_DIRSEP);
|
||||
(void)snprintf(filepath, sizeof(filepath), "%s%s.taosCrashLog", tsLogDir, TD_DIRSEP);
|
||||
char *pMsg = NULL;
|
||||
int64_t msgLen = 0;
|
||||
TdFilePtr pFile = NULL;
|
||||
|
@ -774,15 +861,15 @@ int32_t tscCrashReportInit() {
|
|||
}
|
||||
|
||||
TdThreadAttr thAttr;
|
||||
taosThreadAttrInit(&thAttr);
|
||||
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
|
||||
TSC_ERR_RET(taosThreadAttrInit(&thAttr));
|
||||
TSC_ERR_RET(taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE));
|
||||
TdThread crashReportThread;
|
||||
if (taosThreadCreate(&crashReportThread, &thAttr, tscCrashReportThreadFp, NULL) != 0) {
|
||||
tscError("failed to create crashReport thread since %s", strerror(errno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
taosThreadAttrDestroy(&thAttr);
|
||||
TSC_ERR_RET(taosThreadAttrDestroy(&thAttr));
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -842,6 +929,11 @@ void taos_init_imp(void) {
|
|||
appInfo.startTime = taosGetTimestampMs();
|
||||
appInfo.pInstMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
||||
appInfo.pInstMapByClusterId = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
|
||||
if (NULL == appInfo.pInstMap || NULL == appInfo.pInstMapByClusterId) {
|
||||
tscError("failed to allocate memory when init appInfo");
|
||||
tscInitRes = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return;
|
||||
}
|
||||
taosHashSetFreeFp(appInfo.pInstMap, destroyAppInst);
|
||||
deltaToUtcInitOnce();
|
||||
|
||||
|
@ -849,7 +941,7 @@ void taos_init_imp(void) {
|
|||
#ifdef CUS_PROMPT
|
||||
snprintf(logDirName, 64, "%slog", CUS_PROMPT);
|
||||
#else
|
||||
snprintf(logDirName, 64, "taoslog");
|
||||
(void)snprintf(logDirName, 64, "taoslog");
|
||||
#endif
|
||||
if (taosCreateLog(logDirName, 10, configDir, NULL, NULL, NULL, NULL, 1) != 0) {
|
||||
printf(" WARING: Create %s failed:%s. configDir=%s\n", logDirName, strerror(errno), configDir);
|
||||
|
@ -857,24 +949,12 @@ void taos_init_imp(void) {
|
|||
return;
|
||||
}
|
||||
|
||||
if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 1) != 0) {
|
||||
tscInitRes = -1;
|
||||
return;
|
||||
}
|
||||
ENV_ERR_RET(taosInitCfg(configDir, NULL, NULL, NULL, NULL, 1), "failed to init cfg");
|
||||
|
||||
initQueryModuleMsgHandle();
|
||||
|
||||
if (taosConvInit() != 0) {
|
||||
tscInitRes = -1;
|
||||
tscError("failed to init conv");
|
||||
return;
|
||||
}
|
||||
if (monitorInit() != 0){
|
||||
tscInitRes = -1;
|
||||
tscError("failed to init monitor");
|
||||
return;
|
||||
}
|
||||
rpcInit();
|
||||
ENV_ERR_RET(taosConvInit(), "failed to init conv");
|
||||
ENV_ERR_RET(monitorInit(), "failed to init monitor");
|
||||
ENV_ERR_RET(rpcInit(), "failed to init rpc");
|
||||
|
||||
if (InitRegexCache() != 0) {
|
||||
tscInitRes = -1;
|
||||
|
@ -883,34 +963,25 @@ void taos_init_imp(void) {
|
|||
}
|
||||
|
||||
SCatalogCfg cfg = {.maxDBCacheNum = 100, .maxTblCacheNum = 100};
|
||||
catalogInit(&cfg);
|
||||
ENV_ERR_RET(catalogInit(&cfg), "failed to init catalog");
|
||||
ENV_ERR_RET(schedulerInit(), "failed to init scheduler");
|
||||
|
||||
schedulerInit();
|
||||
tscDebug("starting to initialize TAOS driver");
|
||||
|
||||
#ifndef WINDOWS
|
||||
taosSetCoreDump(true);
|
||||
#endif
|
||||
|
||||
if (initTaskQueue() != 0){
|
||||
tscInitRes = -1;
|
||||
tscError("failed to init task queue");
|
||||
return;
|
||||
}
|
||||
if (fmFuncMgtInit() != TSDB_CODE_SUCCESS) {
|
||||
tscInitRes = -1;
|
||||
tscError("failed to init function manager");
|
||||
return;
|
||||
}
|
||||
nodesInitAllocatorSet();
|
||||
ENV_ERR_RET(initTaskQueue(), "failed to init task queue");
|
||||
ENV_ERR_RET(fmFuncMgtInit(), "failed to init funcMgt");
|
||||
ENV_ERR_RET(nodesInitAllocatorSet(), "failed to init allocator set");
|
||||
|
||||
clientConnRefPool = taosOpenRef(200, destroyTscObj);
|
||||
clientReqRefPool = taosOpenRef(40960, doDestroyRequest);
|
||||
|
||||
taosGetAppName(appInfo.appName, NULL);
|
||||
taosThreadMutexInit(&appInfo.mutex, NULL);
|
||||
|
||||
tscCrashReportInit();
|
||||
ENV_ERR_RET(taosGetAppName(appInfo.appName, NULL), "failed to get app name");
|
||||
ENV_ERR_RET(taosThreadMutexInit(&appInfo.mutex, NULL), "failed to init thread mutex");
|
||||
ENV_ERR_RET(tscCrashReportInit(), "failed to init crash report");
|
||||
|
||||
tscDebug("client is initialized successfully");
|
||||
}
|
||||
|
@ -957,7 +1028,7 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
|
|||
return -1;
|
||||
}
|
||||
newstr[0] = '"';
|
||||
memcpy(newstr+1, str, len);
|
||||
(void)memcpy(newstr+1, str, len);
|
||||
newstr[len + 1] = '"';
|
||||
newstr[len + 2] = '\0';
|
||||
str = newstr;
|
||||
|
|
|
@ -44,28 +44,34 @@ static int32_t hbMqHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { retur
|
|||
|
||||
static int32_t hbProcessUserAuthInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog,
|
||||
SAppHbMgr *pAppHbMgr) {
|
||||
int32_t code = 0;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
SUserAuthBatchRsp batchRsp = {0};
|
||||
if (tDeserializeSUserAuthBatchRsp(value, valueLen, &batchRsp) != 0) {
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
return -1;
|
||||
return TSDB_CODE_INVALID_MSG;
|
||||
}
|
||||
|
||||
int32_t numOfBatchs = taosArrayGetSize(batchRsp.pArray);
|
||||
for (int32_t i = 0; i < numOfBatchs; ++i) {
|
||||
SGetUserAuthRsp *rsp = taosArrayGet(batchRsp.pArray, i);
|
||||
if (NULL == rsp) {
|
||||
code = TSDB_CODE_OUT_OF_RANGE;
|
||||
goto _return;
|
||||
}
|
||||
tscDebug("hb to update user auth, user:%s, version:%d", rsp->user, rsp->version);
|
||||
|
||||
catalogUpdateUserAuthInfo(pCatalog, rsp);
|
||||
TSC_ERR_JRET(catalogUpdateUserAuthInfo(pCatalog, rsp));
|
||||
}
|
||||
|
||||
if (numOfBatchs > 0) hbUpdateUserAuthInfo(pAppHbMgr, &batchRsp);
|
||||
if (numOfBatchs > 0) {
|
||||
TSC_ERR_JRET(hbUpdateUserAuthInfo(pAppHbMgr, &batchRsp));
|
||||
}
|
||||
|
||||
atomic_val_compare_exchange_8(&pAppHbMgr->connHbFlag, 1, 2);
|
||||
(void)atomic_val_compare_exchange_8(&pAppHbMgr->connHbFlag, 1, 2);
|
||||
|
||||
_return:
|
||||
taosArrayDestroy(batchRsp.pArray);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t hbUpdateUserAuthInfo(SAppHbMgr *pAppHbMgr, SUserAuthBatchRsp *batchRsp) {
|
||||
|
@ -93,7 +99,9 @@ static int32_t hbUpdateUserAuthInfo(SAppHbMgr *pAppHbMgr, SUserAuthBatchRsp *bat
|
|||
}
|
||||
}
|
||||
if (!pRsp) {
|
||||
releaseTscObj(pReq->connKey.tscRid);
|
||||
if (TSDB_CODE_SUCCESS != releaseTscObj(pReq->connKey.tscRid)) {
|
||||
tscError("failed to release tscObj");
|
||||
}
|
||||
taosHashCancelIterate(hbMgr->activeInfo, pReq);
|
||||
break;
|
||||
}
|
||||
|
@ -108,7 +116,9 @@ static int32_t hbUpdateUserAuthInfo(SAppHbMgr *pAppHbMgr, SUserAuthBatchRsp *bat
|
|||
}
|
||||
}
|
||||
}
|
||||
releaseTscObj(pReq->connKey.tscRid);
|
||||
if (TSDB_CODE_SUCCESS != releaseTscObj(pReq->connKey.tscRid)) {
|
||||
tscError("failed to release tscObj");
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -153,7 +163,9 @@ static int32_t hbUpdateUserAuthInfo(SAppHbMgr *pAppHbMgr, SUserAuthBatchRsp *bat
|
|||
tscDebug("update whitelist version of user %s from %" PRId64 " to %" PRId64 ", tscRid:%" PRIi64, pRsp->user,
|
||||
oldVer, atomic_load_64(&whiteListInfo->ver), pTscObj->id);
|
||||
}
|
||||
releaseTscObj(pReq->connKey.tscRid);
|
||||
if (TSDB_CODE_SUCCESS != releaseTscObj(pReq->connKey.tscRid)) {
|
||||
tscError("failed to release tscObj");
|
||||
}
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
|
@ -211,6 +223,10 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
|
|||
int32_t numOfBatchs = taosArrayGetSize(batchRsp.pArray);
|
||||
for (int32_t i = 0; i < numOfBatchs; ++i) {
|
||||
SDbHbRsp *rsp = taosArrayGet(batchRsp.pArray, i);
|
||||
if (NULL == rsp) {
|
||||
code = TSDB_CODE_OUT_OF_RANGE;
|
||||
goto _return;
|
||||
}
|
||||
if (rsp->useDbRsp) {
|
||||
tscDebug("hb use db rsp, db:%s, vgVersion:%d, stateTs:%" PRId64 ", uid:%" PRIx64, rsp->useDbRsp->db,
|
||||
rsp->useDbRsp->vgVersion, rsp->useDbRsp->stateTs, rsp->useDbRsp->uid);
|
||||
|
@ -227,7 +243,7 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
|
|||
|
||||
tscDebug("hb to update db vgInfo, db:%s", rsp->useDbRsp->db);
|
||||
|
||||
catalogUpdateDBVgInfo(pCatalog, rsp->useDbRsp->db, rsp->useDbRsp->uid, vgInfo);
|
||||
TSC_ERR_JRET(catalogUpdateDBVgInfo(pCatalog, rsp->useDbRsp->db, rsp->useDbRsp->uid, vgInfo));
|
||||
|
||||
if (IS_SYS_DBNAME(rsp->useDbRsp->db)) {
|
||||
code = hbGenerateVgInfoFromRsp(&vgInfo, rsp->useDbRsp);
|
||||
|
@ -235,23 +251,28 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
|
|||
goto _return;
|
||||
}
|
||||
|
||||
catalogUpdateDBVgInfo(pCatalog,
|
||||
(rsp->useDbRsp->db[0] == 'i') ? TSDB_PERFORMANCE_SCHEMA_DB : TSDB_INFORMATION_SCHEMA_DB,
|
||||
rsp->useDbRsp->uid, vgInfo);
|
||||
TSC_ERR_JRET(catalogUpdateDBVgInfo(pCatalog,
|
||||
(rsp->useDbRsp->db[0] == 'i') ?
|
||||
TSDB_PERFORMANCE_SCHEMA_DB :
|
||||
TSDB_INFORMATION_SCHEMA_DB,
|
||||
rsp->useDbRsp->uid, vgInfo));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (rsp->cfgRsp) {
|
||||
tscDebug("hb db cfg rsp, db:%s, cfgVersion:%d", rsp->cfgRsp->db, rsp->cfgRsp->cfgVersion);
|
||||
catalogUpdateDbCfg(pCatalog, rsp->cfgRsp->db, rsp->cfgRsp->dbId, rsp->cfgRsp);
|
||||
code = catalogUpdateDbCfg(pCatalog, rsp->cfgRsp->db, rsp->cfgRsp->dbId, rsp->cfgRsp);
|
||||
rsp->cfgRsp = NULL;
|
||||
}
|
||||
if (rsp->pTsmaRsp) {
|
||||
if (rsp->pTsmaRsp->pTsmas) {
|
||||
for (int32_t i = 0; i < rsp->pTsmaRsp->pTsmas->size; ++i) {
|
||||
STableTSMAInfo *pTsma = taosArrayGetP(rsp->pTsmaRsp->pTsmas, i);
|
||||
catalogAsyncUpdateTSMA(pCatalog, &pTsma, rsp->dbTsmaVersion);
|
||||
if (NULL == pTsma) {
|
||||
TSC_ERR_JRET(TSDB_CODE_OUT_OF_RANGE);
|
||||
}
|
||||
TSC_ERR_JRET(catalogAsyncUpdateTSMA(pCatalog, &pTsma, rsp->dbTsmaVersion));
|
||||
}
|
||||
taosArrayClear(rsp->pTsmaRsp->pTsmas);
|
||||
}
|
||||
|
@ -265,7 +286,7 @@ _return:
|
|||
}
|
||||
|
||||
static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
|
||||
int32_t code = 0;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
SSTbHbRsp hbRsp = {0};
|
||||
if (tDeserializeSSTbHbRsp(value, valueLen, &hbRsp) != 0) {
|
||||
|
@ -276,10 +297,13 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo
|
|||
int32_t numOfMeta = taosArrayGetSize(hbRsp.pMetaRsp);
|
||||
for (int32_t i = 0; i < numOfMeta; ++i) {
|
||||
STableMetaRsp *rsp = taosArrayGet(hbRsp.pMetaRsp, i);
|
||||
|
||||
if (NULL == rsp) {
|
||||
code = TSDB_CODE_OUT_OF_RANGE;
|
||||
goto _return;
|
||||
}
|
||||
if (rsp->numOfColumns < 0) {
|
||||
tscDebug("hb to remove stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName);
|
||||
catalogRemoveStbMeta(pCatalog, rsp->dbFName, rsp->dbId, rsp->stbName, rsp->suid);
|
||||
TSC_ERR_JRET(catalogRemoveStbMeta(pCatalog, rsp->dbFName, rsp->dbId, rsp->stbName, rsp->suid));
|
||||
} else {
|
||||
tscDebug("hb to update stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName);
|
||||
if (rsp->pSchemas[0].colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
|
||||
|
@ -288,22 +312,26 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo
|
|||
return TSDB_CODE_TSC_INVALID_VALUE;
|
||||
}
|
||||
|
||||
catalogAsyncUpdateTableMeta(pCatalog, rsp);
|
||||
TSC_ERR_JRET(catalogAsyncUpdateTableMeta(pCatalog, rsp));
|
||||
}
|
||||
}
|
||||
|
||||
int32_t numOfIndex = taosArrayGetSize(hbRsp.pIndexRsp);
|
||||
for (int32_t i = 0; i < numOfIndex; ++i) {
|
||||
STableIndexRsp *rsp = taosArrayGet(hbRsp.pIndexRsp, i);
|
||||
|
||||
catalogUpdateTableIndex(pCatalog, rsp);
|
||||
if (NULL == rsp) {
|
||||
code = TSDB_CODE_OUT_OF_RANGE;
|
||||
goto _return;
|
||||
}
|
||||
TSC_ERR_JRET(catalogUpdateTableIndex(pCatalog, rsp));
|
||||
}
|
||||
|
||||
_return:
|
||||
taosArrayDestroy(hbRsp.pIndexRsp);
|
||||
hbRsp.pIndexRsp = NULL;
|
||||
|
||||
tFreeSSTbHbRsp(&hbRsp);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t hbProcessDynViewRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
|
||||
|
@ -320,7 +348,7 @@ static void hbFreeSViewMetaInRsp(void *p) {
|
|||
}
|
||||
|
||||
static int32_t hbProcessViewInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
|
||||
int32_t code = 0;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
SViewHbRsp hbRsp = {0};
|
||||
if (tDeserializeSViewHbRsp(value, valueLen, &hbRsp) != 0) {
|
||||
|
@ -332,20 +360,25 @@ static int32_t hbProcessViewInfoRsp(void *value, int32_t valueLen, struct SCatal
|
|||
int32_t numOfMeta = taosArrayGetSize(hbRsp.pViewRsp);
|
||||
for (int32_t i = 0; i < numOfMeta; ++i) {
|
||||
SViewMetaRsp *rsp = taosArrayGetP(hbRsp.pViewRsp, i);
|
||||
|
||||
if (NULL == rsp) {
|
||||
code = TSDB_CODE_OUT_OF_RANGE;
|
||||
goto _return;
|
||||
}
|
||||
if (rsp->numOfCols < 0) {
|
||||
tscDebug("hb to remove view, db:%s, view:%s", rsp->dbFName, rsp->name);
|
||||
catalogRemoveViewMeta(pCatalog, rsp->dbFName, rsp->dbId, rsp->name, rsp->viewId);
|
||||
code = catalogRemoveViewMeta(pCatalog, rsp->dbFName, rsp->dbId, rsp->name, rsp->viewId);
|
||||
tFreeSViewMetaRsp(rsp);
|
||||
taosMemoryFreeClear(rsp);
|
||||
} else {
|
||||
tscDebug("hb to update view, db:%s, view:%s", rsp->dbFName, rsp->name);
|
||||
catalogUpdateViewMeta(pCatalog, rsp);
|
||||
code = catalogUpdateViewMeta(pCatalog, rsp);
|
||||
}
|
||||
TSC_ERR_JRET(code);
|
||||
}
|
||||
|
||||
_return:
|
||||
taosArrayDestroy(hbRsp.pViewRsp);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t hbprocessTSMARsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
|
||||
|
@ -363,30 +396,38 @@ static int32_t hbprocessTSMARsp(void *value, int32_t valueLen, struct SCatalog *
|
|||
|
||||
if (!pTsmaInfo->pFuncs) {
|
||||
tscDebug("hb to remove tsma: %s.%s", pTsmaInfo->dbFName, pTsmaInfo->name);
|
||||
catalogRemoveTSMA(pCatalog, pTsmaInfo);
|
||||
code = catalogRemoveTSMA(pCatalog, pTsmaInfo);
|
||||
tFreeAndClearTableTSMAInfo(pTsmaInfo);
|
||||
} else {
|
||||
tscDebug("hb to update tsma: %s.%s", pTsmaInfo->dbFName, pTsmaInfo->name);
|
||||
catalogUpdateTSMA(pCatalog, &pTsmaInfo);
|
||||
code = catalogUpdateTSMA(pCatalog, &pTsmaInfo);
|
||||
tFreeAndClearTableTSMAInfo(pTsmaInfo);
|
||||
}
|
||||
TSC_ERR_JRET(code);
|
||||
}
|
||||
|
||||
_return:
|
||||
taosArrayDestroy(hbRsp.pTsmas);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return code;
|
||||
}
|
||||
|
||||
static void hbProcessQueryRspKvs(int32_t kvNum, SArray *pKvs, struct SCatalog *pCatalog, SAppHbMgr *pAppHbMgr) {
|
||||
for (int32_t i = 0; i < kvNum; ++i) {
|
||||
SKv *kv = taosArrayGet(pKvs, i);
|
||||
if (NULL == kv) {
|
||||
tscError("invalid hb kv, idx:%d", i);
|
||||
continue;
|
||||
}
|
||||
switch (kv->key) {
|
||||
case HEARTBEAT_KEY_USER_AUTHINFO: {
|
||||
if (kv->valueLen <= 0 || NULL == kv->value) {
|
||||
tscError("invalid hb user auth info, len:%d, value:%p", kv->valueLen, kv->value);
|
||||
break;
|
||||
}
|
||||
|
||||
hbProcessUserAuthInfoRsp(kv->value, kv->valueLen, pCatalog, pAppHbMgr);
|
||||
if (TSDB_CODE_SUCCESS != hbProcessUserAuthInfoRsp(kv->value, kv->valueLen, pCatalog, pAppHbMgr)) {
|
||||
tscError("process user auth info response faild, len:%d, value:%p", kv->valueLen, kv->value);
|
||||
break;
|
||||
}
|
||||
break;
|
||||
}
|
||||
case HEARTBEAT_KEY_DBINFO: {
|
||||
|
@ -394,8 +435,10 @@ static void hbProcessQueryRspKvs(int32_t kvNum, SArray *pKvs, struct SCatalog *p
|
|||
tscError("invalid hb db info, len:%d, value:%p", kv->valueLen, kv->value);
|
||||
break;
|
||||
}
|
||||
|
||||
hbProcessDBInfoRsp(kv->value, kv->valueLen, pCatalog);
|
||||
if (TSDB_CODE_SUCCESS != hbProcessDBInfoRsp(kv->value, kv->valueLen, pCatalog)) {
|
||||
tscError("process db info response faild, len:%d, value:%p", kv->valueLen, kv->value);
|
||||
break;
|
||||
}
|
||||
break;
|
||||
}
|
||||
case HEARTBEAT_KEY_STBINFO: {
|
||||
|
@ -403,8 +446,10 @@ static void hbProcessQueryRspKvs(int32_t kvNum, SArray *pKvs, struct SCatalog *p
|
|||
tscError("invalid hb stb info, len:%d, value:%p", kv->valueLen, kv->value);
|
||||
break;
|
||||
}
|
||||
|
||||
hbProcessStbInfoRsp(kv->value, kv->valueLen, pCatalog);
|
||||
if (TSDB_CODE_SUCCESS != hbProcessStbInfoRsp(kv->value, kv->valueLen, pCatalog)) {
|
||||
tscError("process stb info response faild, len:%d, value:%p", kv->valueLen, kv->value);
|
||||
break;
|
||||
}
|
||||
break;
|
||||
}
|
||||
#ifdef TD_ENTERPRISE
|
||||
|
@ -413,8 +458,10 @@ static void hbProcessQueryRspKvs(int32_t kvNum, SArray *pKvs, struct SCatalog *p
|
|||
tscError("invalid dyn view info, len:%d, value:%p", kv->valueLen, kv->value);
|
||||
break;
|
||||
}
|
||||
|
||||
hbProcessDynViewRsp(kv->value, kv->valueLen, pCatalog);
|
||||
if (TSDB_CODE_SUCCESS != hbProcessDynViewRsp(kv->value, kv->valueLen, pCatalog)) {
|
||||
tscError("Process dyn view response failed, len: %d, value: %p", kv->valueLen, kv->value);
|
||||
break;
|
||||
}
|
||||
break;
|
||||
}
|
||||
case HEARTBEAT_KEY_VIEWINFO: {
|
||||
|
@ -422,8 +469,10 @@ static void hbProcessQueryRspKvs(int32_t kvNum, SArray *pKvs, struct SCatalog *p
|
|||
tscError("invalid view info, len:%d, value:%p", kv->valueLen, kv->value);
|
||||
break;
|
||||
}
|
||||
|
||||
hbProcessViewInfoRsp(kv->value, kv->valueLen, pCatalog);
|
||||
if (TSDB_CODE_SUCCESS != hbProcessViewInfoRsp(kv->value, kv->valueLen, pCatalog)) {
|
||||
tscError("Process view info response failed, len: %d, value: %p", kv->valueLen, kv->value);
|
||||
break;
|
||||
}
|
||||
break;
|
||||
}
|
||||
#endif
|
||||
|
@ -431,7 +480,9 @@ static void hbProcessQueryRspKvs(int32_t kvNum, SArray *pKvs, struct SCatalog *p
|
|||
if (kv->valueLen <= 0 || !kv->value) {
|
||||
tscError("Invalid tsma info, len: %d, value: %p", kv->valueLen, kv->value);
|
||||
}
|
||||
hbprocessTSMARsp(kv->value, kv->valueLen, pCatalog);
|
||||
if (TSDB_CODE_SUCCESS != hbprocessTSMARsp(kv->value, kv->valueLen, pCatalog)) {
|
||||
tscError("Process tsma info response failed, len: %d, value: %p", kv->valueLen, kv->value);
|
||||
}
|
||||
break;
|
||||
}
|
||||
default:
|
||||
|
@ -478,7 +529,9 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
|
|||
tscDebug("request 0x%" PRIx64 " not exist to kill", pRsp->query->killRid);
|
||||
} else {
|
||||
taos_stop_query((TAOS_RES *)pRequest);
|
||||
releaseRequest(pRsp->query->killRid);
|
||||
if (TSDB_CODE_SUCCESS != releaseRequest(pRsp->query->killRid)) {
|
||||
tscWarn("release request failed");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -487,10 +540,14 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
|
|||
}
|
||||
|
||||
if (pRsp->query->pQnodeList) {
|
||||
updateQnodeList(pTscObj->pAppInfo, pRsp->query->pQnodeList);
|
||||
if (TSDB_CODE_SUCCESS != updateQnodeList(pTscObj->pAppInfo, pRsp->query->pQnodeList)) {
|
||||
tscWarn("update qnode list failed");
|
||||
}
|
||||
}
|
||||
|
||||
releaseTscObj(pRsp->connKey.tscRid);
|
||||
if (TSDB_CODE_SUCCESS != releaseTscObj(pRsp->connKey.tscRid)) {
|
||||
tscWarn("release tscobj failed");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -508,11 +565,13 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
taosHashRelease(pAppHbMgr->activeInfo, pReq);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
//TODO(smj)
|
||||
static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
|
||||
if (0 == atomic_load_8(&clientHbMgr.inited)) {
|
||||
goto _return;
|
||||
|
|
|
@ -52,7 +52,7 @@ static bool validateDbName(const char* db) { return stringLengthCheck(db, TSDB_D
|
|||
|
||||
static char* getClusterKey(const char* user, const char* auth, const char* ip, int32_t port) {
|
||||
char key[512] = {0};
|
||||
snprintf(key, sizeof(key), "%s:%s:%s:%d", user, auth, ip, port);
|
||||
(void)snprintf(key, sizeof(key), "%s:%s:%s:%d", user, auth, ip, port);
|
||||
return taosStrdup(key);
|
||||
}
|
||||
|
||||
|
@ -68,36 +68,30 @@ bool chkRequestKilled(void* param) {
|
|||
return killed;
|
||||
}
|
||||
|
||||
static STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
|
||||
SAppInstInfo* pAppInfo, int connType);
|
||||
|
||||
STscObj* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
|
||||
uint16_t port, int connType) {
|
||||
if (taos_init() != TSDB_CODE_SUCCESS) {
|
||||
return NULL;
|
||||
}
|
||||
static int32_t taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
|
||||
SAppInstInfo* pAppInfo, int connType, STscObj** pTscObj);
|
||||
|
||||
int32_t taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
|
||||
uint16_t port, int connType, STscObj** pObj) {
|
||||
TSC_ERR_RET(taos_init());
|
||||
if (!validateUserName(user)) {
|
||||
terrno = TSDB_CODE_TSC_INVALID_USER_LENGTH;
|
||||
return NULL;
|
||||
TSC_ERR_RET(TSDB_CODE_TSC_INVALID_USER_LENGTH);
|
||||
}
|
||||
|
||||
char localDb[TSDB_DB_NAME_LEN] = {0};
|
||||
if (db != NULL && strlen(db) > 0) {
|
||||
if (!validateDbName(db)) {
|
||||
terrno = TSDB_CODE_TSC_INVALID_DB_LENGTH;
|
||||
return NULL;
|
||||
TSC_ERR_RET(TSDB_CODE_TSC_INVALID_DB_LENGTH);
|
||||
}
|
||||
|
||||
tstrncpy(localDb, db, sizeof(localDb));
|
||||
strdequote(localDb);
|
||||
(void)strdequote(localDb);
|
||||
}
|
||||
|
||||
char secretEncrypt[TSDB_PASSWORD_LEN + 1] = {0};
|
||||
if (auth == NULL) {
|
||||
if (!validatePassword(pass)) {
|
||||
terrno = TSDB_CODE_TSC_INVALID_PASS_LENGTH;
|
||||
return NULL;
|
||||
TSC_ERR_RET(TSDB_CODE_TSC_INVALID_PASS_LENGTH);
|
||||
}
|
||||
|
||||
taosEncryptPass_c((uint8_t*)pass, strlen(pass), secretEncrypt);
|
||||
|
@ -107,13 +101,9 @@ STscObj* taos_connect_internal(const char* ip, const char* user, const char* pas
|
|||
|
||||
SCorEpSet epSet = {0};
|
||||
if (ip) {
|
||||
if (initEpSetFromCfg(ip, NULL, &epSet) < 0) {
|
||||
return NULL;
|
||||
}
|
||||
TSC_ERR_RET(initEpSetFromCfg(ip, NULL, &epSet));
|
||||
} else {
|
||||
if (initEpSetFromCfg(tsFirst, tsSecond, &epSet) < 0) {
|
||||
return NULL;
|
||||
}
|
||||
TSC_ERR_RET(initEpSetFromCfg(tsFirst, tsSecond, &epSet));
|
||||
}
|
||||
|
||||
if (port) {
|
||||
|
@ -122,7 +112,9 @@ STscObj* taos_connect_internal(const char* ip, const char* user, const char* pas
|
|||
}
|
||||
|
||||
char* key = getClusterKey(user, secretEncrypt, ip, port);
|
||||
|
||||
if (NULL == key) {
|
||||
TSC_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
tscInfo("connecting to server, numOfEps:%d inUse:%d user:%s db:%s key:%s", epSet.epSet.numOfEps, epSet.epSet.inUse,
|
||||
user, db, key);
|
||||
for (int32_t i = 0; i < epSet.epSet.numOfEps; ++i) {
|
||||
|
@ -130,29 +122,44 @@ STscObj* taos_connect_internal(const char* ip, const char* user, const char* pas
|
|||
}
|
||||
|
||||
SAppInstInfo** pInst = NULL;
|
||||
taosThreadMutexLock(&appInfo.mutex);
|
||||
int32_t code = taosThreadMutexLock(&appInfo.mutex);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
tscError("failed to lock app info, code:%s", tstrerror(code));
|
||||
}
|
||||
|
||||
pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
|
||||
SAppInstInfo* p = NULL;
|
||||
if (pInst == NULL) {
|
||||
p = taosMemoryCalloc(1, sizeof(struct SAppInstInfo));
|
||||
if (NULL == p) {
|
||||
taosThreadMutexUnlock(&appInfo.mutex);
|
||||
taosMemoryFreeClear(key);
|
||||
TSC_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
p->mgmtEp = epSet;
|
||||
taosThreadMutexInit(&p->qnodeMutex, NULL);
|
||||
p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores / 2);
|
||||
if (p->pTransporter == NULL) {
|
||||
code = openTransporter(user, secretEncrypt, tsNumOfCores / 2, &p->pTransporter);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
taosThreadMutexUnlock(&appInfo.mutex);
|
||||
taosMemoryFreeClear(key);
|
||||
taosMemoryFree(p);
|
||||
return NULL;
|
||||
TSC_ERR_RET(code);
|
||||
}
|
||||
p->pAppHbMgr = appHbMgrInit(p, key);
|
||||
if (NULL == p->pAppHbMgr) {
|
||||
destroyAppInst(&p);
|
||||
taosThreadMutexUnlock(&appInfo.mutex);
|
||||
taosMemoryFreeClear(key);
|
||||
return NULL;
|
||||
// TODO(smj) : change this to right code.
|
||||
TSC_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
code = taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
destroyAppInst(&p);
|
||||
taosThreadMutexUnlock(&appInfo.mutex);
|
||||
taosMemoryFreeClear(key);
|
||||
TSC_ERR_RET(code);
|
||||
}
|
||||
taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
|
||||
p->instKey = key;
|
||||
key = NULL;
|
||||
tscDebug("new app inst mgr %p, user:%s, ip:%s, port:%d", p, user, epSet.epSet.eps[0].fqdn, epSet.epSet.eps[0].port);
|
||||
|
@ -164,11 +171,14 @@ STscObj* taos_connect_internal(const char* ip, const char* user, const char* pas
|
|||
atomic_store_8(&(*pInst)->pAppHbMgr->connHbFlag, 0);
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&appInfo.mutex);
|
||||
code = taosThreadMutexUnlock(&appInfo.mutex);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
tscError("failed to unlock app info, code:%s", tstrerror(code));
|
||||
}
|
||||
|
||||
taosMemoryFreeClear(key);
|
||||
|
||||
return taosConnectImpl(user, &secretEncrypt[0], localDb, NULL, NULL, *pInst, connType);
|
||||
return taosConnectImpl(user, &secretEncrypt[0], localDb, NULL, NULL, *pInst, connType, pObj);
|
||||
}
|
||||
|
||||
//SAppInstInfo* getAppInstInfo(const char* clusterKey) {
|
||||
|
@ -188,10 +198,10 @@ void freeQueryParam(SSyncQueryParam* param) {
|
|||
|
||||
int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, bool validateSql,
|
||||
SRequestObj** pRequest, int64_t reqid) {
|
||||
*pRequest = createRequest(connId, TSDB_SQL_SELECT, reqid);
|
||||
if (*pRequest == NULL) {
|
||||
int32_t code = createRequest(connId, TSDB_SQL_SELECT, reqid, pRequest);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
tscError("failed to malloc sqlObj, %s", sql);
|
||||
return terrno;
|
||||
return code;
|
||||
}
|
||||
|
||||
(*pRequest)->sqlstr = taosMemoryMalloc(sqlLen + 1);
|
||||
|
@ -1405,18 +1415,19 @@ int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSe
|
|||
return 0;
|
||||
}
|
||||
|
||||
STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
|
||||
SAppInstInfo* pAppInfo, int connType) {
|
||||
STscObj* pTscObj = createTscObj(user, auth, db, connType, pAppInfo);
|
||||
if (NULL == pTscObj) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return pTscObj;
|
||||
int32_t taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
|
||||
SAppInstInfo* pAppInfo, int connType, STscObj** pTscObj) {
|
||||
*pTscObj = NULL;
|
||||
int32_t code = createTscObj(user, auth, db, connType, pAppInfo, (void**)&pTscObj);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
SRequestObj* pRequest = createRequest(pTscObj->id, TDMT_MND_CONNECT, 0);
|
||||
if (pRequest == NULL) {
|
||||
destroyTscObj(pTscObj);
|
||||
return NULL;
|
||||
SRequestObj* pRequest = NULL;
|
||||
code = createRequest((*pTscObj)->id, TDMT_MND_CONNECT, 0, &pRequest);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
destroyTscObj(*pTscObj);
|
||||
return code;
|
||||
}
|
||||
|
||||
pRequest->sqlstr = taosStrdup("taos_connect");
|
||||
|
@ -1427,25 +1438,23 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t
|
|||
SMsgSendInfo* body = buildConnectMsg(pRequest);
|
||||
|
||||
int64_t transporterId = 0;
|
||||
asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);
|
||||
asyncSendMsgToServer((*pTscObj)->pAppInfo->pTransporter, &(*pTscObj)->pAppInfo->mgmtEp.epSet, &transporterId, body);
|
||||
|
||||
tsem_wait(&pRequest->body.rspSem);
|
||||
if (pRequest->code != TSDB_CODE_SUCCESS) {
|
||||
const char* errorMsg =
|
||||
(pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code);
|
||||
code = tsem_wait(&pRequest->body.rspSem);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
const char* errorMsg = (code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code);
|
||||
tscError("failed to connect to server, reason: %s", errorMsg);
|
||||
|
||||
terrno = pRequest->code;
|
||||
destroyRequest(pRequest);
|
||||
taos_close_internal(pTscObj);
|
||||
pTscObj = NULL;
|
||||
taos_close_internal(*pTscObj);
|
||||
*pTscObj = NULL;
|
||||
} else {
|
||||
tscDebug("0x%" PRIx64 " connection is opening, connId:%u, dnodeConn:%p, reqId:0x%" PRIx64, pTscObj->id,
|
||||
pTscObj->connId, pTscObj->pAppInfo->pTransporter, pRequest->requestId);
|
||||
tscDebug("0x%" PRIx64 " connection is opening, connId:%u, dnodeConn:%p, reqId:0x%" PRIx64, (*pTscObj)->id,
|
||||
(*pTscObj)->connId, (*pTscObj)->pAppInfo->pTransporter, pRequest->requestId);
|
||||
destroyRequest(pRequest);
|
||||
}
|
||||
|
||||
return pTscObj;
|
||||
return code;
|
||||
}
|
||||
|
||||
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) {
|
||||
|
@ -1654,8 +1663,9 @@ TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, cons
|
|||
return NULL;
|
||||
}
|
||||
|
||||
STscObj* pObj = taos_connect_internal(ip, user, NULL, auth, db, port, CONN_TYPE__QUERY);
|
||||
if (pObj) {
|
||||
STscObj* pObj = NULL;
|
||||
int32_t code = taos_connect_internal(ip, user, NULL, auth, db, port, CONN_TYPE__QUERY, &pObj);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
int64_t* rid = taosMemoryCalloc(1, sizeof(int64_t));
|
||||
*rid = pObj->id;
|
||||
return (TAOS*)rid;
|
||||
|
|
|
@ -115,8 +115,9 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha
|
|||
pass = TSDB_DEFAULT_PASS;
|
||||
}
|
||||
|
||||
STscObj *pObj = taos_connect_internal(ip, user, pass, NULL, db, port, CONN_TYPE__QUERY);
|
||||
if (pObj) {
|
||||
STscObj *pObj = NULL;
|
||||
int32_t code = taos_connect_internal(ip, user, pass, NULL, db, port, CONN_TYPE__QUERY, &pObj);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
int64_t *rid = taosMemoryCalloc(1, sizeof(int64_t));
|
||||
*rid = pObj->id;
|
||||
return (TAOS *)rid;
|
||||
|
|
|
@ -1537,8 +1537,8 @@ int taos_write_raw_block_with_fields_with_reqid(TAOS* taos, int rows, char* pDat
|
|||
SQuery* pQuery = NULL;
|
||||
SHashObj* pVgHash = NULL;
|
||||
|
||||
SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, reqid);
|
||||
RAW_NULL_CHECK(pRequest);
|
||||
SRequestObj* pRequest = NULL;
|
||||
RAW_RETURN_CHECK(createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, reqid, &pRequest));
|
||||
|
||||
uDebug(LOG_ID_TAG " write raw block with field, rows:%d, pData:%p, tbname:%s, fields:%p, numFields:%d", LOG_ID_VALUE,
|
||||
rows, pData, tbname, fields, numFields);
|
||||
|
@ -1597,8 +1597,8 @@ int taos_write_raw_block_with_reqid(TAOS* taos, int rows, char* pData, const cha
|
|||
SQuery* pQuery = NULL;
|
||||
SHashObj* pVgHash = NULL;
|
||||
|
||||
SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, reqid);
|
||||
RAW_NULL_CHECK(pRequest);
|
||||
SRequestObj* pRequest = NULL;
|
||||
RAW_RETURN_CHECK(createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, reqid, &pRequest));
|
||||
|
||||
uDebug(LOG_ID_TAG " write raw block, rows:%d, pData:%p, tbname:%s", LOG_ID_VALUE, rows, pData, tbname);
|
||||
|
||||
|
@ -1667,8 +1667,8 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
|
|||
SDecoder decoder = {0};
|
||||
STableMeta* pTableMeta = NULL;
|
||||
|
||||
SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0);
|
||||
RAW_NULL_CHECK(pRequest);
|
||||
SRequestObj* pRequest = NULL;
|
||||
RAW_RETURN_CHECK(createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0, &pRequest));
|
||||
|
||||
uDebug(LOG_ID_TAG " write raw data, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
|
||||
pRequest->syncQuery = true;
|
||||
|
@ -1778,8 +1778,8 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
|
|||
STableMeta* pTableMeta = NULL;
|
||||
SVCreateTbReq* pCreateReqDst = NULL;
|
||||
|
||||
SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0);
|
||||
RAW_NULL_CHECK(pRequest);
|
||||
SRequestObj* pRequest = NULL;
|
||||
RAW_RETURN_CHECK(createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0, &pRequest));
|
||||
|
||||
uDebug(LOG_ID_TAG " write raw metadata, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
|
||||
pRequest->syncQuery = true;
|
||||
|
|
|
@ -2137,8 +2137,8 @@ TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine,
|
|||
SSmlHandle *info = NULL;
|
||||
int cnt = 0;
|
||||
while (1) {
|
||||
request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, reqid);
|
||||
if (request == NULL) {
|
||||
code = createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, reqid, &request);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
uError("SML:taos_schemaless_insert error request is null");
|
||||
return NULL;
|
||||
}
|
||||
|
|
|
@ -1237,8 +1237,9 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
|
|||
}
|
||||
|
||||
// init connection
|
||||
pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ);
|
||||
if (pTmq->pTscObj == NULL) {
|
||||
code = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ, &pTmq->pTscObj);
|
||||
if (code) {
|
||||
terrno = code;
|
||||
tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId);
|
||||
(void)tsem2_destroy(&pTmq->rspSem);
|
||||
SET_ERROR_MSG_TMQ("init tscObj failed")
|
||||
|
|
|
@ -156,6 +156,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_ENCODE_PARAM_ERROR, "Invalid encode param"
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_ENCODE_PARAM_NULL, "Not found compress param")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_COMPRESS_PARAM_ERROR, "Invalid compress param")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_COMPRESS_LEVEL_ERROR, "Invalid compress level param")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_FAIL_GENERATE_JSON, "failed to generate JSON")
|
||||
|
||||
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INTERNAL_ERROR, "Internal error")
|
||||
|
|
Loading…
Reference in New Issue