From d5f21eda176d13f91414406ebfd6c0dd0295657e Mon Sep 17 00:00:00 2001 From: Bomin Zhang Date: Mon, 21 Sep 2020 11:56:33 +0800 Subject: [PATCH 1/2] TD-1529: stream async create db connection and also fix failed test cases related to stream. --- src/client/src/tscSql.c | 10 +++++++++- src/client/src/tscStream.c | 5 +++++ src/cq/CMakeLists.txt | 2 ++ src/cq/src/cqMain.c | 18 ++++++++++++------ 4 files changed, 28 insertions(+), 7 deletions(-) diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 69bc69cd4a..1df77d1850 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -236,13 +236,21 @@ TAOS *taos_connect_c(const char *ip, uint8_t ipLen, const char *user, uint8_t us return taos_connect(ipBuf, userBuf, passBuf, dbBuf, port); } +static void asyncConnCallback(void *param, TAOS_RES *tres, int code) { + SSqlObj *pSql = (SSqlObj *) tres; + assert(pSql != NULL); + + pSql->fetchFp(pSql->param, tres, code); +} + TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int), void *param, void **taos) { - SSqlObj* pSql = taosConnectImpl(ip, user, pass, NULL, db, port, fp, param, taos); + SSqlObj* pSql = taosConnectImpl(ip, user, pass, NULL, db, port, asyncConnCallback, param, taos); if (pSql == NULL) { return NULL; } + pSql->fetchFp = fp; pSql->res.code = tscProcessSql(pSql); tscDebug("%p DB async connection is opening", taos); return taos; diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 4a1f4d9d87..93a865a78b 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -515,6 +515,10 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) { return; } + uint64_t handle = (uint64_t) pSql; + pSql->self = taosCachePut(tscObjCache, &handle, sizeof(uint64_t), &pSql, sizeof(uint64_t), 2*3600*1000); + T_REF_INC(pSql->pTscObj); + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); @@ -608,6 +612,7 @@ void taos_close_stream(TAOS_STREAM *handle) { * Here, we need a check before release memory */ if (pSql->signature == pSql) { + T_REF_DEC(pSql->pTscObj); tscRemoveFromStreamList(pStream, pSql); taosTmrStopA(&(pStream->pTimer)); diff --git a/src/cq/CMakeLists.txt b/src/cq/CMakeLists.txt index db366639ef..e631397348 100644 --- a/src/cq/CMakeLists.txt +++ b/src/cq/CMakeLists.txt @@ -2,6 +2,8 @@ CMAKE_MINIMUM_REQUIRED(VERSION 2.8) PROJECT(TDengine) INCLUDE_DIRECTORIES(inc) +INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc) +INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc) AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR}/src SRC) IF (TD_LINUX) diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index d8f68f66a5..889cc84374 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -21,6 +21,7 @@ #include #include "taos.h" +#include "tsclient.h" #include "taosdef.h" #include "taosmsg.h" #include "ttimer.h" @@ -238,18 +239,23 @@ void cqDrop(void *handle) { pthread_mutex_unlock(&pContext->mutex); } +static void doCreateStream(void *param, TAOS_RES *result, int code) { + SCqObj* pObj = (SCqObj*)param; + SCqContext* pContext = pObj->pContext; + SSqlObj* pSql = (SSqlObj*)result; + pContext->dbConn = pSql->pTscObj; + cqCreateStream(pContext, pObj); +} + static void cqProcessCreateTimer(void *param, void *tmrId) { SCqObj* pObj = (SCqObj*)param; SCqContext* pContext = pObj->pContext; if (pContext->dbConn == NULL) { - pContext->dbConn = taos_connect("localhost", pContext->user, pContext->pass, pContext->db, 0); - if (pContext->dbConn == NULL) { - cError("vgId:%d, failed to connect to TDengine(%s)", pContext->vgId, tstrerror(terrno)); - } + taos_connect_a(NULL, pContext->user, pContext->pass, pContext->db, 0, doCreateStream, param, NULL); + } else { + cqCreateStream(pContext, pObj); } - - cqCreateStream(pContext, pObj); } static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) { From fbc4ebb915ea1e94b885fe2ce89d6f350a683642 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 21 Sep 2020 12:53:11 +0800 Subject: [PATCH 2/2] [td-225] fix bugs in regression test. --- src/client/src/tscServer.c | 9 ++++++--- src/rpc/src/rpcMain.c | 2 ++ src/util/src/tcache.c | 10 ++++++++-- 3 files changed, 16 insertions(+), 5 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index c6e9cbafd7..12712c6542 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -1959,6 +1959,7 @@ int tscProcessShowRsp(SSqlObj *pSql) { return 0; } +// TODO multithread problem static void createHBObj(STscObj* pObj) { if (pObj->pHb != NULL) { return; @@ -1987,10 +1988,13 @@ static void createHBObj(STscObj* pObj) { pSql->pTscObj = pObj; pSql->signature = pSql; pObj->pHb = pSql; - T_REF_INC(pObj); tscAddSubqueryInfo(&pObj->pHb->cmd); + int64_t ad = (int64_t) pSql; + pSql->self = taosCachePut(tscObjCache, &ad, sizeof(int64_t), &pSql, sizeof(int64_t), 2 * 60 * 1000); + T_REF_INC(pObj); + tscDebug("%p HB is allocated, pObj:%p", pObj->pHb, pObj); } @@ -2017,8 +2021,7 @@ int tscProcessConnectRsp(SSqlObj *pSql) { pObj->connId = htonl(pConnect->connId); createHBObj(pObj); - -// taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer); + taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer); return 0; } diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index f59bf62ec5..cb318d5c24 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -580,6 +580,8 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort, void *shandle = (connType & RPC_CONN_TCP)? pRpc->tcphandle:pRpc->udphandle; pConn->chandle = (*taosOpenConn[connType])(shandle, pConn, pConn->peerIp, pConn->peerPort); if (pConn->chandle == NULL) { + tError("failed to connect to:0x%x:%d", pConn->peerIp, pConn->peerPort); + terrno = TSDB_CODE_RPC_NETWORK_UNAVAIL; rpcCloseConn(pConn); pConn = NULL; diff --git a/src/util/src/tcache.c b/src/util/src/tcache.c index 4d737ebe66..e1dd521547 100644 --- a/src/util/src/tcache.c +++ b/src/util/src/tcache.c @@ -260,7 +260,13 @@ static void incRefFn(void* ptNode) { } void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen) { - if (pCacheObj == NULL || taosHashGetSize(pCacheObj->pHashTable) == 0 || pCacheObj->deleting == 1) { + if (pCacheObj == NULL || pCacheObj->deleting == 1) { + return NULL; + } + + if (taosHashGetSize(pCacheObj->pHashTable) == 0) { + atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1); + uError("cache:%s, key:%p, not in cache, retrieved failed, reason: empty sqlObj cache", pCacheObj->name, key); return NULL; } @@ -274,7 +280,7 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen uDebug("cache:%s, key:%p, %p is retrieved from cache, refcnt:%d", pCacheObj->name, key, pData, T_REF_VAL_GET(ptNode)); } else { atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1); - uDebug("cache:%s, key:%p, not in cache, retrieved failed", pCacheObj->name, key); + uError("cache:%s, key:%p, not in cache, retrieved failed", pCacheObj->name, key); } atomic_add_fetch_32(&pCacheObj->statistics.totalAccess, 1);