Merge pull request #1841 from taosdata/feature/query
[td-225] use the dnodeConn instead of mgmtConn and vnodeConn
This commit is contained in:
commit
f3189193bd
|
@ -304,6 +304,7 @@ typedef struct STscObj {
|
||||||
struct SSqlObj * pHb;
|
struct SSqlObj * pHb;
|
||||||
struct SSqlObj * sqlList;
|
struct SSqlObj * sqlList;
|
||||||
struct SSqlStream *streamList;
|
struct SSqlStream *streamList;
|
||||||
|
void* pDnodeConn;
|
||||||
pthread_mutex_t mutex;
|
pthread_mutex_t mutex;
|
||||||
} STscObj;
|
} STscObj;
|
||||||
|
|
||||||
|
@ -359,7 +360,7 @@ typedef struct SSqlStream {
|
||||||
struct SSqlStream *prev, *next;
|
struct SSqlStream *prev, *next;
|
||||||
} SSqlStream;
|
} SSqlStream;
|
||||||
|
|
||||||
int32_t tscInitRpc(const char *user, const char *secret);
|
int32_t tscInitRpc(const char *user, const char *secret, void** pDnodeConn);
|
||||||
void tscInitMsgsFp();
|
void tscInitMsgsFp();
|
||||||
|
|
||||||
int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion);
|
int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion);
|
||||||
|
@ -425,7 +426,6 @@ void tscQueueAsyncFreeResult(SSqlObj *pSql);
|
||||||
int32_t tscToSQLCmd(SSqlObj *pSql, struct SSqlInfo *pInfo);
|
int32_t tscToSQLCmd(SSqlObj *pSql, struct SSqlInfo *pInfo);
|
||||||
void tscGetResultColumnChr(SSqlRes *pRes, SFieldInfo* pFieldInfo, int32_t column);
|
void tscGetResultColumnChr(SSqlRes *pRes, SFieldInfo* pFieldInfo, int32_t column);
|
||||||
|
|
||||||
extern void * pDnodeConn;
|
|
||||||
extern void * tscCacheHandle;
|
extern void * tscCacheHandle;
|
||||||
extern void * tscTmr;
|
extern void * tscTmr;
|
||||||
extern void * tscQhandle;
|
extern void * tscQhandle;
|
||||||
|
|
|
@ -191,6 +191,7 @@ void tscProcessActivityTimer(void *handle, void *tmrId) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int tscSendMsgToServer(SSqlObj *pSql) {
|
int tscSendMsgToServer(SSqlObj *pSql) {
|
||||||
|
STscObj* pObj = pSql->pTscObj;
|
||||||
SSqlCmd* pCmd = &pSql->cmd;
|
SSqlCmd* pCmd = &pSql->cmd;
|
||||||
|
|
||||||
char *pMsg = rpcMallocCont(pCmd->payloadLen);
|
char *pMsg = rpcMallocCont(pCmd->payloadLen);
|
||||||
|
@ -215,7 +216,7 @@ int tscSendMsgToServer(SSqlObj *pSql) {
|
||||||
.handle = pSql,
|
.handle = pSql,
|
||||||
.code = 0
|
.code = 0
|
||||||
};
|
};
|
||||||
rpcSendRequest(pDnodeConn, &pSql->ipList, &rpcMsg);
|
rpcSendRequest(pObj->pDnodeConn, &pSql->ipList, &rpcMsg);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
|
@ -66,7 +66,8 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tscInitRpc(user, pass) != 0) {
|
void *pDnodeConn = NULL;
|
||||||
|
if (tscInitRpc(user, pass, &pDnodeConn) != 0) {
|
||||||
terrno = TSDB_CODE_NETWORK_UNAVAIL;
|
terrno = TSDB_CODE_NETWORK_UNAVAIL;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -93,6 +94,7 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
|
||||||
STscObj *pObj = (STscObj *)calloc(1, sizeof(STscObj));
|
STscObj *pObj = (STscObj *)calloc(1, sizeof(STscObj));
|
||||||
if (NULL == pObj) {
|
if (NULL == pObj) {
|
||||||
terrno = TSDB_CODE_CLI_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_CLI_OUT_OF_MEMORY;
|
||||||
|
rpcClose(pDnodeConn);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -106,8 +108,9 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
|
||||||
int32_t len = strlen(db);
|
int32_t len = strlen(db);
|
||||||
/* db name is too long */
|
/* db name is too long */
|
||||||
if (len > TSDB_DB_NAME_LEN) {
|
if (len > TSDB_DB_NAME_LEN) {
|
||||||
free(pObj);
|
|
||||||
terrno = TSDB_CODE_INVALID_DB;
|
terrno = TSDB_CODE_INVALID_DB;
|
||||||
|
rpcClose(pDnodeConn);
|
||||||
|
free(pObj);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -123,6 +126,7 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
|
||||||
SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
|
SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
|
||||||
if (NULL == pSql) {
|
if (NULL == pSql) {
|
||||||
terrno = TSDB_CODE_CLI_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_CLI_OUT_OF_MEMORY;
|
||||||
|
rpcClose(pDnodeConn);
|
||||||
free(pObj);
|
free(pObj);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -134,6 +138,8 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
|
||||||
tsem_init(&pSql->rspSem, 0, 0);
|
tsem_init(&pSql->rspSem, 0, 0);
|
||||||
|
|
||||||
pObj->pSql = pSql;
|
pObj->pSql = pSql;
|
||||||
|
pObj->pDnodeConn = pDnodeConn;
|
||||||
|
|
||||||
pSql->fp = fp;
|
pSql->fp = fp;
|
||||||
pSql->param = param;
|
pSql->param = param;
|
||||||
if (taos != NULL) {
|
if (taos != NULL) {
|
||||||
|
@ -143,6 +149,7 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
|
||||||
pSql->cmd.command = TSDB_SQL_CONNECT;
|
pSql->cmd.command = TSDB_SQL_CONNECT;
|
||||||
if (TSDB_CODE_SUCCESS != tscAllocPayload(&pSql->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) {
|
if (TSDB_CODE_SUCCESS != tscAllocPayload(&pSql->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) {
|
||||||
terrno = TSDB_CODE_CLI_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_CLI_OUT_OF_MEMORY;
|
||||||
|
rpcClose(pDnodeConn);
|
||||||
free(pSql);
|
free(pSql);
|
||||||
free(pObj);
|
free(pObj);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
|
|
@ -30,7 +30,6 @@
|
||||||
#include "tlocale.h"
|
#include "tlocale.h"
|
||||||
|
|
||||||
// global, not configurable
|
// global, not configurable
|
||||||
void * pDnodeConn;
|
|
||||||
void * tscCacheHandle;
|
void * tscCacheHandle;
|
||||||
void * tscTmr;
|
void * tscTmr;
|
||||||
void * tscQhandle;
|
void * tscQhandle;
|
||||||
|
@ -48,12 +47,12 @@ void tscCheckDiskUsage(void *UNUSED_PARAM(para), void* UNUSED_PARAM(param)) {
|
||||||
taosTmrReset(tscCheckDiskUsage, 1000, NULL, tscTmr, &tscCheckDiskUsageTmr);
|
taosTmrReset(tscCheckDiskUsage, 1000, NULL, tscTmr, &tscCheckDiskUsageTmr);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tscInitRpc(const char *user, const char *secret) {
|
int32_t tscInitRpc(const char *user, const char *secret, void** pDnodeConn) {
|
||||||
SRpcInit rpcInit;
|
SRpcInit rpcInit;
|
||||||
char secretEncrypt[32] = {0};
|
char secretEncrypt[32] = {0};
|
||||||
taosEncryptPass((uint8_t *)secret, strlen(secret), secretEncrypt);
|
taosEncryptPass((uint8_t *)secret, strlen(secret), secretEncrypt);
|
||||||
|
|
||||||
if (pDnodeConn == NULL) {
|
if (*pDnodeConn == NULL) {
|
||||||
memset(&rpcInit, 0, sizeof(rpcInit));
|
memset(&rpcInit, 0, sizeof(rpcInit));
|
||||||
rpcInit.localPort = 0;
|
rpcInit.localPort = 0;
|
||||||
rpcInit.label = "TSC";
|
rpcInit.label = "TSC";
|
||||||
|
@ -66,9 +65,9 @@ int32_t tscInitRpc(const char *user, const char *secret) {
|
||||||
rpcInit.ckey = "key";
|
rpcInit.ckey = "key";
|
||||||
rpcInit.secret = secretEncrypt;
|
rpcInit.secret = secretEncrypt;
|
||||||
|
|
||||||
pDnodeConn = rpcOpen(&rpcInit);
|
*pDnodeConn = rpcOpen(&rpcInit);
|
||||||
if (pDnodeConn == NULL) {
|
if (*pDnodeConn == NULL) {
|
||||||
tscError("failed to init connection to vnode");
|
tscError("failed to init connection to TDengine");
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -168,11 +167,6 @@ void taos_cleanup() {
|
||||||
|
|
||||||
taosCloseLog();
|
taosCloseLog();
|
||||||
|
|
||||||
if (pDnodeConn != NULL) {
|
|
||||||
rpcClose(pDnodeConn);
|
|
||||||
pDnodeConn = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
taosTmrCleanUp(tscTmr);
|
taosTmrCleanUp(tscTmr);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -762,6 +762,10 @@ void tscCloseTscObj(STscObj* pObj) {
|
||||||
|
|
||||||
pthread_mutex_destroy(&pObj->mutex);
|
pthread_mutex_destroy(&pObj->mutex);
|
||||||
|
|
||||||
|
if (pObj->pDnodeConn != NULL) {
|
||||||
|
rpcClose(pObj->pDnodeConn);
|
||||||
|
}
|
||||||
|
|
||||||
tscTrace("%p DB connection is closed", pObj);
|
tscTrace("%p DB connection is closed", pObj);
|
||||||
tfree(pObj);
|
tfree(pObj);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue