Merge pull request #3594 from taosdata/bugfix/stream
[TD-1529]<fix>: CQ create db connection async
This commit is contained in:
commit
e589dc241d
|
@ -236,13 +236,21 @@ TAOS *taos_connect_c(const char *ip, uint8_t ipLen, const char *user, uint8_t us
|
|||
return taos_connect(ipBuf, userBuf, passBuf, dbBuf, port);
|
||||
}
|
||||
|
||||
static void asyncConnCallback(void *param, TAOS_RES *tres, int code) {
|
||||
SSqlObj *pSql = (SSqlObj *) tres;
|
||||
assert(pSql != NULL);
|
||||
|
||||
pSql->fetchFp(pSql->param, tres, code);
|
||||
}
|
||||
|
||||
TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int),
|
||||
void *param, void **taos) {
|
||||
SSqlObj* pSql = taosConnectImpl(ip, user, pass, NULL, db, port, fp, param, taos);
|
||||
SSqlObj* pSql = taosConnectImpl(ip, user, pass, NULL, db, port, asyncConnCallback, param, taos);
|
||||
if (pSql == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pSql->fetchFp = fp;
|
||||
pSql->res.code = tscProcessSql(pSql);
|
||||
tscDebug("%p DB async connection is opening", taos);
|
||||
return taos;
|
||||
|
|
|
@ -515,6 +515,10 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) {
|
|||
return;
|
||||
}
|
||||
|
||||
uint64_t handle = (uint64_t) pSql;
|
||||
pSql->self = taosCachePut(tscObjCache, &handle, sizeof(uint64_t), &pSql, sizeof(uint64_t), 2*3600*1000);
|
||||
T_REF_INC(pSql->pTscObj);
|
||||
|
||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
|
||||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
|
||||
|
@ -608,6 +612,7 @@ void taos_close_stream(TAOS_STREAM *handle) {
|
|||
* Here, we need a check before release memory
|
||||
*/
|
||||
if (pSql->signature == pSql) {
|
||||
T_REF_DEC(pSql->pTscObj);
|
||||
tscRemoveFromStreamList(pStream, pSql);
|
||||
|
||||
taosTmrStopA(&(pStream->pTimer));
|
||||
|
|
|
@ -2,6 +2,8 @@ CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
|
|||
PROJECT(TDengine)
|
||||
|
||||
INCLUDE_DIRECTORIES(inc)
|
||||
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc)
|
||||
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc)
|
||||
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR}/src SRC)
|
||||
|
||||
IF (TD_LINUX)
|
||||
|
|
|
@ -21,6 +21,7 @@
|
|||
#include <string.h>
|
||||
|
||||
#include "taos.h"
|
||||
#include "tsclient.h"
|
||||
#include "taosdef.h"
|
||||
#include "taosmsg.h"
|
||||
#include "ttimer.h"
|
||||
|
@ -238,18 +239,23 @@ void cqDrop(void *handle) {
|
|||
pthread_mutex_unlock(&pContext->mutex);
|
||||
}
|
||||
|
||||
static void doCreateStream(void *param, TAOS_RES *result, int code) {
|
||||
SCqObj* pObj = (SCqObj*)param;
|
||||
SCqContext* pContext = pObj->pContext;
|
||||
SSqlObj* pSql = (SSqlObj*)result;
|
||||
pContext->dbConn = pSql->pTscObj;
|
||||
cqCreateStream(pContext, pObj);
|
||||
}
|
||||
|
||||
static void cqProcessCreateTimer(void *param, void *tmrId) {
|
||||
SCqObj* pObj = (SCqObj*)param;
|
||||
SCqContext* pContext = pObj->pContext;
|
||||
|
||||
if (pContext->dbConn == NULL) {
|
||||
pContext->dbConn = taos_connect("localhost", pContext->user, pContext->pass, pContext->db, 0);
|
||||
if (pContext->dbConn == NULL) {
|
||||
cError("vgId:%d, failed to connect to TDengine(%s)", pContext->vgId, tstrerror(terrno));
|
||||
}
|
||||
taos_connect_a(NULL, pContext->user, pContext->pass, pContext->db, 0, doCreateStream, param, NULL);
|
||||
} else {
|
||||
cqCreateStream(pContext, pObj);
|
||||
}
|
||||
|
||||
cqCreateStream(pContext, pObj);
|
||||
}
|
||||
|
||||
static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
|
||||
|
|
Loading…
Reference in New Issue