[td-11818] refactor.
This commit is contained in:
parent
9ea9444b36
commit
de6fa20810
|
@ -82,7 +82,6 @@ typedef struct STscObj {
|
|||
int32_t acctId;
|
||||
uint32_t connId;
|
||||
uint64_t id; // ref ID returned by taosAddRef
|
||||
void *pTransporter;
|
||||
pthread_mutex_t mutex; // used to protect the operation on db
|
||||
int32_t numOfReqs; // number of sqlObj bound to this connection
|
||||
SAppInstInfo *pAppInfo;
|
||||
|
|
|
@ -89,13 +89,12 @@ static void tscInitLogFile() {
|
|||
|
||||
// todo close the transporter properly
|
||||
void closeTransporter(STscObj* pTscObj) {
|
||||
if (pTscObj == NULL || pTscObj->pTransporter == NULL) {
|
||||
if (pTscObj == NULL || pTscObj->pAppInfo->pTransporter == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
tscDebug("free transporter:%p in connObj: 0x%"PRIx64, pTscObj->pTransporter, pTscObj->id);
|
||||
rpcClose(pTscObj->pTransporter);
|
||||
pTscObj->pTransporter = NULL;
|
||||
tscDebug("free transporter:%p in connObj: 0x%"PRIx64, pTscObj->pAppInfo->pTransporter, pTscObj->id);
|
||||
rpcClose(pTscObj->pAppInfo->pTransporter);
|
||||
}
|
||||
|
||||
// TODO refactor
|
||||
|
@ -140,10 +139,6 @@ void* createTscObj(const char* user, const char* auth, const char *db, SAppInstI
|
|||
}
|
||||
|
||||
pObj->pAppInfo = pAppInfo;
|
||||
if (pAppInfo != NULL) {
|
||||
pObj->pTransporter = pAppInfo->pTransporter;
|
||||
}
|
||||
|
||||
tstrncpy(pObj->user, user, sizeof(pObj->user));
|
||||
memcpy(pObj->pass, auth, TSDB_PASSWORD_LEN);
|
||||
|
||||
|
|
|
@ -158,7 +158,7 @@ int32_t parseSql(SRequestObj* pRequest, SQueryNode** pQuery) {
|
|||
.sqlLen = pRequest->sqlLen,
|
||||
.pMsg = pRequest->msgBuf,
|
||||
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
|
||||
.pTransporter = pTscObj->pTransporter,
|
||||
.pTransporter = pTscObj->pAppInfo->pTransporter,
|
||||
};
|
||||
|
||||
cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
|
||||
|
@ -191,10 +191,10 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) {
|
|||
pShowReqInfo->pArray = pDcl->pExtension;
|
||||
}
|
||||
}
|
||||
asyncSendMsgToServer(pTscObj->pTransporter, &pDcl->epSet, &transporterId, pSendMsg);
|
||||
asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pDcl->epSet, &transporterId, pSendMsg);
|
||||
} else {
|
||||
SEpSet* pEpSet = &pTscObj->pAppInfo->mgmtEp.epSet;
|
||||
asyncSendMsgToServer(pTscObj->pTransporter, pEpSet, &transporterId, pSendMsg);
|
||||
asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, pEpSet, &transporterId, pSendMsg);
|
||||
}
|
||||
|
||||
tsem_wait(&pRequest->body.rspSem);
|
||||
|
@ -241,7 +241,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag) {
|
|||
if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) {
|
||||
SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
|
||||
|
||||
int32_t code = scheduleExecJob(pRequest->pTscObj->pTransporter, NULL, pDag, &pRequest->body.pQueryJob, &res);
|
||||
int32_t code = scheduleExecJob(pRequest->pTscObj->pAppInfo->pTransporter, NULL, pDag, &pRequest->body.pQueryJob, &res);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
// handle error and retry
|
||||
} else {
|
||||
|
@ -255,7 +255,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag) {
|
|||
return pRequest->code;
|
||||
}
|
||||
|
||||
return scheduleAsyncExecJob(pRequest->pTscObj->pTransporter, NULL, pDag, &pRequest->body.pQueryJob);
|
||||
return scheduleAsyncExecJob(pRequest->pTscObj->pAppInfo->pTransporter, NULL, pDag, &pRequest->body.pQueryJob);
|
||||
}
|
||||
|
||||
TAOS_RES *tmq_create_topic(TAOS* taos, const char* name, const char* sql, int sqlLen) {
|
||||
|
@ -305,7 +305,7 @@ TAOS_RES *tmq_create_topic(TAOS* taos, const char* name, const char* sql, int sq
|
|||
SEpSet* pEpSet = &pTscObj->pAppInfo->mgmtEp.epSet;
|
||||
|
||||
int64_t transporterId = 0;
|
||||
asyncSendMsgToServer(pTscObj->pTransporter, pEpSet, &transporterId, body);
|
||||
asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, pEpSet, &transporterId, body);
|
||||
|
||||
tsem_wait(&pRequest->body.rspSem);
|
||||
|
||||
|
@ -406,7 +406,7 @@ STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, con
|
|||
SMsgSendInfo* body = buildConnectMsg(pRequest);
|
||||
|
||||
int64_t transporterId = 0;
|
||||
asyncSendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);
|
||||
asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);
|
||||
|
||||
tsem_wait(&pRequest->body.rspSem);
|
||||
if (pRequest->code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -417,7 +417,7 @@ STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, con
|
|||
taos_close(pTscObj);
|
||||
pTscObj = NULL;
|
||||
} else {
|
||||
tscDebug("0x%"PRIx64" connection is opening, connId:%d, dnodeConn:%p, reqId:0x%"PRIx64, pTscObj->id, pTscObj->connId, pTscObj->pTransporter, pRequest->requestId);
|
||||
tscDebug("0x%"PRIx64" connection is opening, connId:%d, dnodeConn:%p, reqId:0x%"PRIx64, pTscObj->id, pTscObj->connId, pTscObj->pAppInfo->pTransporter, pRequest->requestId);
|
||||
destroyRequest(pRequest);
|
||||
}
|
||||
|
||||
|
@ -585,7 +585,7 @@ void* doFetchRow(SRequestObj* pRequest) {
|
|||
|
||||
int64_t transporterId = 0;
|
||||
STscObj *pTscObj = pRequest->pTscObj;
|
||||
asyncSendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);
|
||||
asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);
|
||||
tsem_wait(&pRequest->body.rspSem);
|
||||
|
||||
pRequest->type = TDMT_VND_SHOW_TABLES_FETCH;
|
||||
|
@ -595,7 +595,7 @@ void* doFetchRow(SRequestObj* pRequest) {
|
|||
|
||||
int64_t transporterId = 0;
|
||||
STscObj *pTscObj = pRequest->pTscObj;
|
||||
asyncSendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);
|
||||
asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);
|
||||
|
||||
tsem_wait(&pRequest->body.rspSem);
|
||||
|
||||
|
|
Loading…
Reference in New Issue