From 1f73c106b91e711880d772bbdae9d432bddcac69 Mon Sep 17 00:00:00 2001 From: Bomin Zhang Date: Mon, 21 Sep 2020 11:56:33 +0800 Subject: [PATCH 1/2] TD-1529: stream async create db connection and also fix failed test cases related to stream. --- src/client/src/tscSql.c | 10 +++++++++- src/client/src/tscStream.c | 5 +++++ src/cq/CMakeLists.txt | 2 ++ src/cq/src/cqMain.c | 18 ++++++++++++------ 4 files changed, 28 insertions(+), 7 deletions(-) diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 69bc69cd4a..1df77d1850 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -236,13 +236,21 @@ TAOS *taos_connect_c(const char *ip, uint8_t ipLen, const char *user, uint8_t us return taos_connect(ipBuf, userBuf, passBuf, dbBuf, port); } +static void asyncConnCallback(void *param, TAOS_RES *tres, int code) { + SSqlObj *pSql = (SSqlObj *) tres; + assert(pSql != NULL); + + pSql->fetchFp(pSql->param, tres, code); +} + TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int), void *param, void **taos) { - SSqlObj* pSql = taosConnectImpl(ip, user, pass, NULL, db, port, fp, param, taos); + SSqlObj* pSql = taosConnectImpl(ip, user, pass, NULL, db, port, asyncConnCallback, param, taos); if (pSql == NULL) { return NULL; } + pSql->fetchFp = fp; pSql->res.code = tscProcessSql(pSql); tscDebug("%p DB async connection is opening", taos); return taos; diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 4a1f4d9d87..93a865a78b 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -515,6 +515,10 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) { return; } + uint64_t handle = (uint64_t) pSql; + pSql->self = taosCachePut(tscObjCache, &handle, sizeof(uint64_t), &pSql, sizeof(uint64_t), 2*3600*1000); + T_REF_INC(pSql->pTscObj); + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); @@ -608,6 +612,7 @@ void taos_close_stream(TAOS_STREAM *handle) { * Here, we need a check before release memory */ if (pSql->signature == pSql) { + T_REF_DEC(pSql->pTscObj); tscRemoveFromStreamList(pStream, pSql); taosTmrStopA(&(pStream->pTimer)); diff --git a/src/cq/CMakeLists.txt b/src/cq/CMakeLists.txt index db366639ef..e631397348 100644 --- a/src/cq/CMakeLists.txt +++ b/src/cq/CMakeLists.txt @@ -2,6 +2,8 @@ CMAKE_MINIMUM_REQUIRED(VERSION 2.8) PROJECT(TDengine) INCLUDE_DIRECTORIES(inc) +INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc) +INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc) AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR}/src SRC) IF (TD_LINUX) diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index d8f68f66a5..889cc84374 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -21,6 +21,7 @@ #include #include "taos.h" +#include "tsclient.h" #include "taosdef.h" #include "taosmsg.h" #include "ttimer.h" @@ -238,18 +239,23 @@ void cqDrop(void *handle) { pthread_mutex_unlock(&pContext->mutex); } +static void doCreateStream(void *param, TAOS_RES *result, int code) { + SCqObj* pObj = (SCqObj*)param; + SCqContext* pContext = pObj->pContext; + SSqlObj* pSql = (SSqlObj*)result; + pContext->dbConn = pSql->pTscObj; + cqCreateStream(pContext, pObj); +} + static void cqProcessCreateTimer(void *param, void *tmrId) { SCqObj* pObj = (SCqObj*)param; SCqContext* pContext = pObj->pContext; if (pContext->dbConn == NULL) { - pContext->dbConn = taos_connect("localhost", pContext->user, pContext->pass, pContext->db, 0); - if (pContext->dbConn == NULL) { - cError("vgId:%d, failed to connect to TDengine(%s)", pContext->vgId, tstrerror(terrno)); - } + taos_connect_a(NULL, pContext->user, pContext->pass, pContext->db, 0, doCreateStream, param, NULL); + } else { + cqCreateStream(pContext, pObj); } - - cqCreateStream(pContext, pObj); } static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) { From 1a5272a49f85994c6e7e10685e5ea5e5938fd692 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 21 Sep 2020 04:34:29 +0000 Subject: [PATCH 2/2] TD-1520 --- src/client/src/tscSQLParser.c | 1 + src/common/inc/tglobal.h | 1 + src/common/src/tglobal.c | 14 +++++++++++++- src/cq/src/cqMain.c | 4 ++-- src/mnode/src/mnodeTable.c | 4 ++-- tests/script/sh/deploy.sh | 1 + tests/script/tmp/prepare.sim | 4 ++-- tests/script/unique/cluster/vgroup100.sim | 4 +++- 8 files changed, 25 insertions(+), 8 deletions(-) diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 4555b0e08d..a6efffe436 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -4873,6 +4873,7 @@ int32_t validateDNodeConfig(tDCLSQL* pOptions) { {"cDebugFlag", 10}, {"httpDebugFlag", 13}, {"qDebugflag", 10}, {"sdbDebugFlag", 12}, {"uDebugFlag", 10}, {"tsdbDebugFlag", 13}, {"sDebugflag", 10}, {"rpcDebugFlag", 12}, {"dDebugFlag", 10}, {"mqttDebugFlag", 13}, {"wDebugFlag", 10}, {"tmrDebugFlag", 12}, + {"cqDebugFlag", 11}, }; SStrToken* pOptionToken = &pOptions->a[1]; diff --git a/src/common/inc/tglobal.h b/src/common/inc/tglobal.h index 77e8b76456..798e265455 100644 --- a/src/common/inc/tglobal.h +++ b/src/common/inc/tglobal.h @@ -174,6 +174,7 @@ extern int32_t rpcDebugFlag; extern int32_t odbcDebugFlag; extern int32_t qDebugFlag; extern int32_t wDebugFlag; +extern int32_t cqDebugFlag; extern int32_t debugFlag; #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize) diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index 7e46f58a93..94ec6fde0f 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -203,6 +203,7 @@ int32_t debugFlag = 0; int32_t sDebugFlag = 135; int32_t wDebugFlag = 135; int32_t tsdbDebugFlag = 131; +int32_t cqDebugFlag = 135; int32_t (*monitorStartSystemFp)() = NULL; void (*monitorStopSystemFp)() = NULL; @@ -222,12 +223,13 @@ void taosSetAllDebugFlag() { httpDebugFlag = debugFlag; mqttDebugFlag = debugFlag; monitorDebugFlag = debugFlag; + qDebugFlag = debugFlag; rpcDebugFlag = debugFlag; uDebugFlag = debugFlag; sDebugFlag = debugFlag; wDebugFlag = debugFlag; tsdbDebugFlag = debugFlag; - qDebugFlag = debugFlag; + cqDebugFlag = debugFlag; uInfo("all debug flag are set to %d", debugFlag); } } @@ -1209,6 +1211,16 @@ static void doInitGlobalConfig(void) { cfg.unitType = TAOS_CFG_UTYPE_NONE; taosInitConfigOption(cfg); + cfg.option = "cqDebugFlag"; + cfg.ptr = &cqDebugFlag; + cfg.valType = TAOS_CFG_VTYPE_INT32; + cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_LOG; + cfg.minValue = 0; + cfg.maxValue = 255; + cfg.ptrLength = 0; + cfg.unitType = TAOS_CFG_UTYPE_NONE; + taosInitConfigOption(cfg); + cfg.option = "tscEnableRecordSql"; cfg.ptr = &tsTscEnableRecordSql; cfg.valType = TAOS_CFG_VTYPE_INT32; diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index 889cc84374..1a99a84b8e 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -66,8 +66,6 @@ typedef struct SCqObj { SCqContext * pContext; } SCqObj; -int cqDebugFlag = 135; - static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row); static void cqCreateStream(SCqContext *pContext, SCqObj *pObj); @@ -252,6 +250,7 @@ static void cqProcessCreateTimer(void *param, void *tmrId) { SCqContext* pContext = pObj->pContext; if (pContext->dbConn == NULL) { + cDebug("vgId:%d, try connect to TDengine", pContext->vgId); taos_connect_a(NULL, pContext->user, pContext->pass, pContext->db, 0, doCreateStream, param, NULL); } else { cqCreateStream(pContext, pObj); @@ -262,6 +261,7 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) { pObj->pContext = pContext; if (pContext->dbConn == NULL) { + cDebug("vgId:%d, create dbConn after 1000 ms", pContext->vgId); pObj->tmrId = taosTmrStart(cqProcessCreateTimer, 1000, pObj, pContext->tmrCtrl); return; } diff --git a/src/mnode/src/mnodeTable.c b/src/mnode/src/mnodeTable.c index 4400927e9b..a35f09cd8d 100644 --- a/src/mnode/src/mnodeTable.c +++ b/src/mnode/src/mnodeTable.c @@ -2122,8 +2122,8 @@ static int32_t mnodeDoGetChildTableMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta) { } pMeta->vgroup.vgId = htonl(pMsg->pVgroup->vgId); - mDebug("app:%p:%p, table:%s, uid:%" PRIu64 " table meta is retrieved", pMsg->rpcMsg.ahandle, pMsg, - pTable->info.tableId, pTable->uid); + mDebug("app:%p:%p, table:%s, uid:%" PRIu64 " table meta is retrieved, vgId:%d sid:%d", pMsg->rpcMsg.ahandle, pMsg, + pTable->info.tableId, pTable->uid, pTable->vgId, pTable->sid); return TSDB_CODE_SUCCESS; } diff --git a/tests/script/sh/deploy.sh b/tests/script/sh/deploy.sh index 0d444a5a6e..8fccb1442f 100755 --- a/tests/script/sh/deploy.sh +++ b/tests/script/sh/deploy.sh @@ -128,6 +128,7 @@ echo "tmrDebugFlag 131" >> $TAOS_CFG echo "udebugFlag 135" >> $TAOS_CFG echo "sdebugFlag 135" >> $TAOS_CFG echo "wdebugFlag 135" >> $TAOS_CFG +echo "cqdebugFlag 135" >> $TAOS_CFG echo "monitor 0" >> $TAOS_CFG echo "monitorInterval 1" >> $TAOS_CFG echo "http 0" >> $TAOS_CFG diff --git a/tests/script/tmp/prepare.sim b/tests/script/tmp/prepare.sim index 8b8f206233..343c422e9f 100644 --- a/tests/script/tmp/prepare.sim +++ b/tests/script/tmp/prepare.sim @@ -34,11 +34,11 @@ system sh/cfg.sh -n dnode4 -c http -v 1 return # for crash_gen -system sh/cfg.sh -n dnode1 -c maxtablesPerVnode -v 2 +system sh/cfg.sh -n dnode1 -c maxtablesPerVnode -v 10 system sh/cfg.sh -n dnode1 -c rpcMaxTime -v 101 system sh/cfg.sh -n dnode1 -c cache -v 2 system sh/cfg.sh -n dnode1 -c keep -v 36500 -system sh/cfg.sh -n dnode1 -c walLevel -v 2 +system sh/cfg.sh -n dnode1 -c walLevel -v 1 # for windows diff --git a/tests/script/unique/cluster/vgroup100.sim b/tests/script/unique/cluster/vgroup100.sim index cddb38cefd..bde6dd2462 100644 --- a/tests/script/unique/cluster/vgroup100.sim +++ b/tests/script/unique/cluster/vgroup100.sim @@ -42,9 +42,11 @@ $count = 2 while $count < 102 $db = d . $count $tb = $db . .t + $tb2 = $db . .t2 sql create database $db replica 3 cache 1 blocks 3 sql create table $tb (ts timestamp, i int) sql insert into $tb values(now, 1) + sql create table $tb2 as select count(*) from $tb interval(10s) $count = $count + 1 print insert into $tb values(now, 1) ==> finished endw @@ -74,7 +76,7 @@ print ============================== step6 system sh/exec.sh -n dnode1 -s start system sh/exec.sh -n dnode2 -s start system sh/exec.sh -n dnode3 -s start -sleep 3000 +sleep 10000 print ============================== step7