From d6fca91f9f54531f6c51124c4d2f2d03046f95f4 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 13 May 2022 17:02:12 +0800 Subject: [PATCH 1/9] fix: crash when executing create database test vgroups 1024 --- source/dnode/mgmt/mgmt_vnode/inc/vmInt.h | 4 --- source/dnode/mgmt/mgmt_vnode/src/vmFile.c | 31 ++++++++------------- source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 2 -- source/dnode/mgmt/mgmt_vnode/src/vmInt.c | 5 +--- 4 files changed, 13 insertions(+), 29 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h index 51b3860461..5d8ec50e81 100644 --- a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h +++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h @@ -46,8 +46,6 @@ typedef struct { int32_t vgId; int32_t vgVersion; int8_t dropped; - uint64_t dbUid; - char db[TSDB_DB_FNAME_LEN]; char path[PATH_MAX + 20]; } SWrapperCfg; @@ -57,8 +55,6 @@ typedef struct { int32_t vgVersion; int8_t dropped; int8_t accessState; - uint64_t dbUid; - char *db; char *path; SVnode *pImpl; STaosQueue *pWriteQ; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmFile.c b/source/dnode/mgmt/mgmt_vnode/src/vmFile.c index f251dd120e..ba4293eeb2 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmFile.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmFile.c @@ -47,7 +47,7 @@ SVnodeObj **vmGetVnodeListFromHash(SVnodesMgmt *pMgmt, int32_t *numOfVnodes) { int32_t vmGetVnodeListFromFile(SVnodesMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes) { int32_t code = TSDB_CODE_INVALID_JSON_FORMAT; int32_t len = 0; - int32_t maxLen = 30000; + int32_t maxLen = 1024 * 1024; char *content = taosMemoryCalloc(1, maxLen + 1); cJSON *root = NULL; FILE *fp = NULL; @@ -64,6 +64,11 @@ int32_t vmGetVnodeListFromFile(SVnodesMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t goto _OVER; } + if (content == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + len = (int32_t)taosReadFile(pFile, content, maxLen); if (len <= 0) { dError("failed to read %s since content is null", file); @@ -116,20 +121,6 @@ int32_t vmGetVnodeListFromFile(SVnodesMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t goto _OVER; } pCfg->vgVersion = vgVersion->valueint; - - cJSON *dbUid = cJSON_GetObjectItem(vnode, "dbUid"); - if (!dbUid || dbUid->type != cJSON_String) { - dError("failed to read %s since dbUid not found", file); - goto _OVER; - } - pCfg->dbUid = atoll(dbUid->valuestring); - - cJSON *db = cJSON_GetObjectItem(vnode, "db"); - if (!db || db->type != cJSON_String) { - dError("failed to read %s since db not found", file); - goto _OVER; - } - tstrncpy(pCfg->db, db->valuestring, TSDB_DB_FNAME_LEN); } *ppCfgs = pCfgs; @@ -165,8 +156,12 @@ int32_t vmWriteVnodeListToFile(SVnodesMgmt *pMgmt) { SVnodeObj **pVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes); int32_t len = 0; - int32_t maxLen = 65536; + int32_t maxLen = 1024 * 1024; char *content = taosMemoryCalloc(1, maxLen + 1); + if (content == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } len += snprintf(content + len, maxLen - len, "{\n"); len += snprintf(content + len, maxLen - len, " \"vnodes\": [\n"); @@ -175,9 +170,7 @@ int32_t vmWriteVnodeListToFile(SVnodesMgmt *pMgmt) { len += snprintf(content + len, maxLen - len, " {\n"); len += snprintf(content + len, maxLen - len, " \"vgId\": %d,\n", pVnode->vgId); len += snprintf(content + len, maxLen - len, " \"dropped\": %d,\n", pVnode->dropped); - len += snprintf(content + len, maxLen - len, " \"vgVersion\": %d,\n", pVnode->vgVersion); - len += snprintf(content + len, maxLen - len, " \"dbUid\": \"%" PRIu64 "\",\n", pVnode->dbUid); - len += snprintf(content + len, maxLen - len, " \"db\": \"%s\"\n", pVnode->db); + len += snprintf(content + len, maxLen - len, " \"vgVersion\": %d\n", pVnode->vgVersion); if (i < numOfVnodes - 1) { len += snprintf(content + len, maxLen - len, " },\n"); } else { diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 914acce2ea..be4cfb95e2 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -174,8 +174,6 @@ static void vmGenerateWrapperCfg(SVnodesMgmt *pMgmt, SCreateVnodeReq *pCreate, S pCfg->vgId = pCreate->vgId; pCfg->vgVersion = pCreate->vgVersion; pCfg->dropped = 0; - pCfg->dbUid = pCreate->dbUid; - tstrncpy(pCfg->db, pCreate->db, TSDB_DB_FNAME_LEN); snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%d", pMgmt->path, TD_DIRSEP, pCreate->vgId); } diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index af439fcc03..9a0a524267 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -57,13 +57,11 @@ int32_t vmOpenVnode(SVnodesMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) { pVnode->vgVersion = pCfg->vgVersion; pVnode->dropped = 0; pVnode->accessState = TSDB_VN_ALL_ACCCESS; - pVnode->dbUid = pCfg->dbUid; - pVnode->db = tstrdup(pCfg->db); pVnode->path = tstrdup(pCfg->path); pVnode->pImpl = pImpl; pVnode->pWrapper = pMgmt->pWrapper; - if (pVnode->path == NULL || pVnode->db == NULL) { + if (pVnode->path == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -109,7 +107,6 @@ void vmCloseVnode(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { } taosMemoryFree(pVnode->path); - taosMemoryFree(pVnode->db); taosMemoryFree(pVnode); } From 56c1b070160925985be2ad42b5af2634257d8ef4 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 13 May 2022 17:18:35 +0800 Subject: [PATCH 2/9] fix: error msg incorrect when no enough memory to create vnode --- source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 1 + source/dnode/vnode/src/vnd/vnodeOpen.c | 3 +++ 2 files changed, 4 insertions(+) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index be4cfb95e2..528f6a0ffe 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -225,6 +225,7 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, msgCb); if (pImpl == NULL) { dError("vgId:%d, failed to create vnode since %s", createReq.vgId, terrstr()); + code = terrno; goto _OVER; } diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index ae134e6496..7a0561327f 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -137,18 +137,21 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { // open query if (vnodeQueryOpen(pVnode)) { vError("vgId:%d failed to open vnode query since %s", TD_VID(pVnode), tstrerror(terrno)); + terrno = TAOS_SYSTEM_ERROR(terrno); goto _err; } // vnode begin if (vnodeBegin(pVnode) < 0) { vError("vgId:%d failed to begin since %s", TD_VID(pVnode), tstrerror(terrno)); + terrno = TAOS_SYSTEM_ERROR(terrno); goto _err; } // open sync if (vnodeSyncOpen(pVnode, dir)) { vError("vgId:%d failed to open sync since %s", TD_VID(pVnode), tstrerror(terrno)); + terrno = TAOS_SYSTEM_ERROR(terrno); goto _err; } From 2178eabcd8bb30b67a673bb7b8e550051ceab268 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 13 May 2022 17:36:32 +0800 Subject: [PATCH 3/9] fix: error msg incorrect when no enough memory to create vnode --- source/dnode/mnode/impl/src/mndTrans.c | 2 +- source/dnode/vnode/src/vnd/vnodeOpen.c | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 5085de8610..4828b9f523 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -1051,7 +1051,7 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans) { int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->redoActions); if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { - mError("failed to execute redoActions since %s", terrstr()); + mError("failed to execute redoActions since:%s, code:0x%x", terrstr(), terrno); } return code; } diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 7a0561327f..3f29700cb7 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -137,21 +137,21 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { // open query if (vnodeQueryOpen(pVnode)) { vError("vgId:%d failed to open vnode query since %s", TD_VID(pVnode), tstrerror(terrno)); - terrno = TAOS_SYSTEM_ERROR(terrno); + terrno = TAOS_SYSTEM_ERROR(errno); goto _err; } // vnode begin if (vnodeBegin(pVnode) < 0) { vError("vgId:%d failed to begin since %s", TD_VID(pVnode), tstrerror(terrno)); - terrno = TAOS_SYSTEM_ERROR(terrno); + terrno = TAOS_SYSTEM_ERROR(errno); goto _err; } // open sync if (vnodeSyncOpen(pVnode, dir)) { vError("vgId:%d failed to open sync since %s", TD_VID(pVnode), tstrerror(terrno)); - terrno = TAOS_SYSTEM_ERROR(terrno); + terrno = TAOS_SYSTEM_ERROR(errno); goto _err; } From 558d50fef2326946596491f438947a79c0e1d8b6 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 13 May 2022 17:40:22 +0800 Subject: [PATCH 4/9] fix: error msg incorrect when no enough memory to create vnode --- source/dnode/vnode/src/vnd/vnodeOpen.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 3f29700cb7..7476da2a0f 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -137,21 +137,21 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { // open query if (vnodeQueryOpen(pVnode)) { vError("vgId:%d failed to open vnode query since %s", TD_VID(pVnode), tstrerror(terrno)); - terrno = TAOS_SYSTEM_ERROR(errno); + terrno = TSDB_CODE_OUT_OF_MEMORY; goto _err; } // vnode begin if (vnodeBegin(pVnode) < 0) { vError("vgId:%d failed to begin since %s", TD_VID(pVnode), tstrerror(terrno)); - terrno = TAOS_SYSTEM_ERROR(errno); + terrno = TSDB_CODE_OUT_OF_MEMORY; goto _err; } // open sync if (vnodeSyncOpen(pVnode, dir)) { vError("vgId:%d failed to open sync since %s", TD_VID(pVnode), tstrerror(terrno)); - terrno = TAOS_SYSTEM_ERROR(errno); + terrno = TSDB_CODE_OUT_OF_MEMORY; goto _err; } From d7e34a66424414cd36f8dbca72a8d0a55c0bc7c1 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 13 May 2022 18:39:37 +0800 Subject: [PATCH 5/9] fix(tmq): set config --- example/src/tmq.c | 6 +++--- source/client/src/tmq.c | 28 ++++++++++++++++++---------- source/dnode/vnode/src/tq/tq.c | 1 + 3 files changed, 22 insertions(+), 13 deletions(-) diff --git a/example/src/tmq.c b/example/src/tmq.c index e867f17e78..b4013f26ee 100644 --- a/example/src/tmq.c +++ b/example/src/tmq.c @@ -61,7 +61,7 @@ int32_t init_env() { taos_free_result(pRes); pRes = - taos_query(pConn, "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)"); + taos_query(pConn, "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int)"); if (taos_errno(pRes) != 0) { printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes)); return -1; @@ -106,8 +106,8 @@ int32_t create_topic() { } taos_free_result(pRes); - /*pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");*/ - pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1"); + pRes = taos_query(pConn, "create topic topic_ctb_column as abc1"); + /*pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");*/ if (taos_errno(pRes) != 0) { printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes)); return -1; diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index a6b8b842f9..639f00ab86 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -185,6 +185,7 @@ typedef struct { int32_t async; tsem_t rspSem; tmq_resp_err_t rspErr; + SArray* offsets; } SMqCommitCbParam; tmq_conf_t* tmq_conf_new() { @@ -246,10 +247,13 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value if (strcmp(key, "msg.with.table.name") == 0) { if (strcmp(value, "true") == 0) { conf->withTbName = 1; + return TMQ_CONF_OK; } else if (strcmp(value, "false") == 0) { conf->withTbName = 0; + return TMQ_CONF_OK; } else if (strcmp(value, "none") == 0) { conf->withTbName = -1; + return TMQ_CONF_OK; } else { return TMQ_CONF_INVALID; } @@ -395,6 +399,9 @@ int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) { if (!pParam->async) tsem_post(&pParam->rspSem); else { + if (pParam->offsets) { + taosArrayDestroy(pParam->offsets); + } tsem_destroy(&pParam->rspSem); /*if (pParam->pArray) {*/ /*taosArrayDestroy(pParam->pArray);*/ @@ -540,10 +547,10 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in // build msg // send to mnode SMqCMCommitOffsetReq req; - SArray* pArray = NULL; + SArray* pOffsets = NULL; if (offsets == NULL) { - pArray = taosArrayInit(0, sizeof(SMqOffset)); + pOffsets = taosArrayInit(0, sizeof(SMqOffset)); for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) { @@ -553,11 +560,11 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in strcpy(offset.cgroup, tmq->groupId); offset.vgId = pVg->vgId; offset.offset = pVg->currentOffset; - taosArrayPush(pArray, &offset); + taosArrayPush(pOffsets, &offset); } } - req.num = pArray->size; - req.offsets = pArray->pData; + req.num = pOffsets->size; + req.offsets = pOffsets->pData; } else { req.num = taosArrayGetSize(&offsets->container); req.offsets = (SMqOffset*)offsets->container.pData; @@ -591,6 +598,7 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in pParam->tmq = tmq; tsem_init(&pParam->rspSem, 0, 0); pParam->async = async; + pParam->offsets = pOffsets; pRequest->body.requestMsg = (SDataBuf){ .pData = buf, @@ -613,8 +621,8 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in tsem_destroy(&pParam->rspSem); taosMemoryFree(pParam); - if (pArray) { - taosArrayDestroy(pArray); + if (pOffsets) { + taosArrayDestroy(pOffsets); } } @@ -1015,7 +1023,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) { atomic_store_32(&tmq->epSkipCnt, 0); #endif int32_t tlen = sizeof(SMqAskEpReq); - SMqAskEpReq* req = taosMemoryMalloc(tlen); + SMqAskEpReq* req = taosMemoryCalloc(1, tlen); if (req == NULL) { tscError("failed to malloc get subscribe ep buf"); /*atomic_store_8(&tmq->epStatus, 0);*/ @@ -1025,7 +1033,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) { req->epoch = htonl(tmq->epoch); strcpy(req->cgroup, tmq->groupId); - SMqAskEpCbParam* pParam = taosMemoryMalloc(sizeof(SMqAskEpCbParam)); + SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam)); if (pParam == NULL) { tscError("failed to malloc subscribe param"); taosMemoryFree(req); @@ -1107,7 +1115,7 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t waitTime, SMqClientTopic* reqOffset = tmq->resetOffsetCfg; } - SMqPollReq* pReq = taosMemoryMalloc(sizeof(SMqPollReq)); + SMqPollReq* pReq = taosMemoryCalloc(1, sizeof(SMqPollReq)); if (pReq == NULL) { return NULL; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 28cdb39bd5..9526451907 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -559,6 +559,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { } // db subscribe } else if (pExec->subType == TOPIC_SUB_TYPE__DB) { + rsp.withSchema = 1; STqReadHandle* pReader = pExec->pExecReader[workerId]; tqReadHandleSetMsg(pReader, pCont, 0); while (tqNextDataBlock(pReader)) { From eba47b9ef4d34da14b64360d5449c20cf317cc42 Mon Sep 17 00:00:00 2001 From: plum-lihui Date: Fri, 13 May 2022 18:54:29 +0800 Subject: [PATCH 6/9] [test: add valgrind run tmq] --- tests/system-test/99-TDcase/TD-15557.py | 206 +++++++++++++++++++++++- 1 file changed, 198 insertions(+), 8 deletions(-) diff --git a/tests/system-test/99-TDcase/TD-15557.py b/tests/system-test/99-TDcase/TD-15557.py index 4798bb7c8d..e005985fe0 100644 --- a/tests/system-test/99-TDcase/TD-15557.py +++ b/tests/system-test/99-TDcase/TD-15557.py @@ -49,6 +49,19 @@ class TDTestCase: print(cur) return cur + def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg,showRow,cdbName,valgrind): + shellCmd = 'nohup ' + if valgrind == 1: + logFile = cfgPath + '/../log/valgrind-tmq.log' + shellCmd = 'nohup valgrind --log-file=' + logFile + shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes ' + + shellCmd += buildPath + '/build/bin/tmq_sim -c ' + cfgPath + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += "> /dev/null 2>&1 &" + tdLog.info(shellCmd) + os.system(shellCmd) + def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum,rowsPerTbl): tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups)) tsql.execute("use %s" %dbName) @@ -113,9 +126,8 @@ class TDTestCase: parameterDict["startTs"]) return - def tmqCase1(self, cfgPath, buildPath): - tdLog.printNoPrefix("======== test case 1: Produce while consume to subscribe one db") + tdLog.printNoPrefix("======== test case 1: Produce while one consume to subscribe one db") tdLog.info("step 1: create database, stb, ctb and insert data") # create and start thread parameterDict = {'cfg': '', \ @@ -153,7 +165,7 @@ class TDTestCase: auto.offset.reset:earliest' sql = "insert into %s.consumeinfo values "%cdbName sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata) - tdSql.query(sql) + tdSql.query(sql) event.wait() @@ -162,11 +174,8 @@ class TDTestCase: showMsg = 1 showRow = 1 - shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath - shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName) - shellCmd += "> /dev/null 2>&1 &" - tdLog.info(shellCmd) - os.system(shellCmd) + valgrind = 1 + self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow, cdbName,valgrind) # wait for data ready prepareEnvThread.join() @@ -190,6 +199,187 @@ class TDTestCase: tdLog.printNoPrefix("======== test case 1 end ...... ") + def tmqCase2(self, cfgPath, buildPath): + tdLog.printNoPrefix("======== test case 2: Produce while two consumers to subscribe one db") + tdLog.info("step 1: create database, stb, ctb and insert data") + # create and start thread + parameterDict = {'cfg': '', \ + 'dbName': 'db2', \ + 'vgroups': 4, \ + 'stbName': 'stb', \ + 'ctbNum': 10, \ + 'rowsPerTbl': 100000, \ + 'batchNum': 100, \ + 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 + parameterDict['cfg'] = cfgPath + + tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict['dbName'], parameterDict['vgroups'])) + + prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) + prepareEnvThread.start() + + tdLog.info("create topics from db") + topicName1 = 'topic_db1' + + tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName'])) + + tdLog.info("create consume info table and consume result table") + cdbName = parameterDict["dbName"] + tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName) + tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName) + + consumerId = 0 + expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + topicList = topicName1 + ifcheckdata = 0 + keyList = 'group.id:cgrp1,\ + enable.auto.commit:false,\ + auto.commit.interval.ms:6000,\ + auto.offset.reset:earliest' + sql = "insert into %s.consumeinfo values "%cdbName + sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata) + tdSql.query(sql) + + consumerId = 1 + sql = "insert into %s.consumeinfo values "%cdbName + sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata) + tdSql.query(sql) + + event.wait() + + tdLog.info("start consume processor") + pollDelay = 5 + showMsg = 1 + showRow = 1 + + valgrind = 1 + self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow, cdbName,valgrind) + + # wait for data ready + prepareEnvThread.join() + + tdLog.info("insert process end, and start to check consume result") + while 1: + tdSql.query("select * from %s.consumeresult"%cdbName) + #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3)) + if tdSql.getRows() == 2: + break + else: + time.sleep(5) + + consumerId0 = tdSql.getData(0 , 1) + consumerId1 = tdSql.getData(1 , 1) + actConsumeRows0 = tdSql.getData(0 , 3) + actConsumeRows1 = tdSql.getData(1 , 3) + + tdLog.info("consumer %d rows: %d"%(consumerId0, actConsumeRows0)) + tdLog.info("consumer %d rows: %d"%(consumerId1, actConsumeRows1)) + + totalConsumeRows = actConsumeRows0 + actConsumeRows1 + if totalConsumeRows != expectrowcnt: + tdLog.exit("tmq consume rows error!") + + tdSql.query("drop topic %s"%topicName1) + + tdLog.printNoPrefix("======== test case 2 end ...... ") + + def tmqCase3(self, cfgPath, buildPath): + tdLog.printNoPrefix("======== test case 3: Produce while one consumers to subscribe one db, include 2 stb") + tdLog.info("step 1: create database, stb, ctb and insert data") + # create and start thread + parameterDict = {'cfg': '', \ + 'dbName': 'db3', \ + 'vgroups': 4, \ + 'stbName': 'stb', \ + 'ctbNum': 10, \ + 'rowsPerTbl': 100000, \ + 'batchNum': 100, \ + 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 + parameterDict['cfg'] = cfgPath + + tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict['dbName'], parameterDict['vgroups'])) + + prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) + prepareEnvThread.start() + + parameterDict2 = {'cfg': '', \ + 'dbName': 'db3', \ + 'vgroups': 4, \ + 'stbName': 'stb2', \ + 'ctbNum': 10, \ + 'rowsPerTbl': 100000, \ + 'batchNum': 100, \ + 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 + parameterDict['cfg'] = cfgPath + + prepareEnvThread2 = threading.Thread(target=self.prepareEnv, kwargs=parameterDict2) + prepareEnvThread2.start() + + tdLog.info("create topics from db") + topicName1 = 'topic_db1' + + tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName'])) + + tdLog.info("create consume info table and consume result table") + cdbName = parameterDict["dbName"] + tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName) + tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName) + + consumerId = 0 + expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"] + topicList = topicName1 + ifcheckdata = 0 + keyList = 'group.id:cgrp1,\ + enable.auto.commit:false,\ + auto.commit.interval.ms:6000,\ + auto.offset.reset:earliest' + sql = "insert into %s.consumeinfo values "%cdbName + sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata) + tdSql.query(sql) + + # consumerId = 1 + # sql = "insert into %s.consumeinfo values "%cdbName + # sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata) + # tdSql.query(sql) + + event.wait() + + tdLog.info("start consume processor") + pollDelay = 5 + showMsg = 1 + showRow = 1 + valgrind = 1 + self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow, cdbName,valgrind) + + # wait for data ready + prepareEnvThread.join() + prepareEnvThread2.join() + + tdLog.info("insert process end, and start to check consume result") + while 1: + tdSql.query("select * from %s.consumeresult"%cdbName) + #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3)) + if tdSql.getRows() == 1: + break + else: + time.sleep(5) + + consumerId0 = tdSql.getData(0 , 1) + #consumerId1 = tdSql.getData(1 , 1) + actConsumeRows0 = tdSql.getData(0 , 3) + #actConsumeRows1 = tdSql.getData(1 , 3) + + tdLog.info("consumer %d rows: %d"%(consumerId0, actConsumeRows0)) + #tdLog.info("consumer %d rows: %d"%(consumerId1, actConsumeRows1)) + + #totalConsumeRows = actConsumeRows0 + actConsumeRows1 + if actConsumeRows0 != expectrowcnt: + tdLog.exit("tmq consume rows error!") + + tdSql.query("drop topic %s"%topicName1) + + tdLog.printNoPrefix("======== test case 3 end ...... ") + def run(self): tdSql.prepare() From 8873e392fb361d5787e785fdb67dbbadb2641258 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 13 May 2022 18:59:44 +0800 Subject: [PATCH 7/9] enh: enable limit in shell --- tools/shell/src/shellArguments.c | 2 +- tools/shell/src/shellEngine.c | 79 ++++++++++++++++++++++++-------- 2 files changed, 62 insertions(+), 19 deletions(-) diff --git a/tools/shell/src/shellArguments.c b/tools/shell/src/shellArguments.c index 13f8cde3e3..1639fd1ca6 100644 --- a/tools/shell/src/shellArguments.c +++ b/tools/shell/src/shellArguments.c @@ -332,7 +332,7 @@ int32_t shellParseArgs(int32_t argc, char *argv[]) { shellInitArgs(argc, argv); shell.info.clientVersion = "Welcome to the TDengine shell from %s, Client Version:%s\n" - "Copyright (c) 2020 by TAOS Data, Inc. All rights reserved.\n\n"; + "Copyright (c) 2022 by TAOS Data, Inc. All rights reserved.\n\n"; shell.info.promptHeader = "taos> "; shell.info.promptContinue = " -> "; shell.info.promptSize = 6; diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index 21fd3d0359..8f0d39377e 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -29,11 +29,11 @@ static void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD static int32_t shellDumpResultToFile(const char *fname, TAOS_RES *tres); static void shellPrintNChar(const char *str, int32_t length, int32_t width); static void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t length, int32_t precision); -static int32_t shellVerticalPrintResult(TAOS_RES *tres); +static int32_t shellVerticalPrintResult(TAOS_RES *tres, const char *sql); static int32_t shellCalcColWidth(TAOS_FIELD *field, int32_t precision); static void shellPrintHeader(TAOS_FIELD *fields, int32_t *width, int32_t num_fields); -static int32_t shellHorizontalPrintResult(TAOS_RES *tres); -static int32_t shellDumpResult(TAOS_RES *tres, char *fname, int32_t *error_no, bool vertical); +static int32_t shellHorizontalPrintResult(TAOS_RES *tres, const char *sql); +static int32_t shellDumpResult(TAOS_RES *tres, char *fname, int32_t *error_no, bool vertical, const char *sql); static void shellReadHistory(); static void shellWriteHistory(); static void shellPrintError(TAOS_RES *tres, int64_t st); @@ -121,7 +121,7 @@ int32_t shellRunCommand(char *command) { char quote = 0, *cmd = command; for (char c = *command++; c != 0; c = *command++) { if (c == '\\' && (*command == '\'' || *command == '"' || *command == '`')) { - command ++; + command++; continue; } @@ -190,7 +190,7 @@ void shellRunSingleCommandImp(char *command) { if (pFields != NULL) { // select and show kinds of commands int32_t error_no = 0; - int32_t numOfRows = shellDumpResult(pSql, fname, &error_no, printMode); + int32_t numOfRows = shellDumpResult(pSql, fname, &error_no, printMode, command); if (numOfRows < 0) return; et = taosGetTimestampUs(); @@ -272,6 +272,7 @@ void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *field, i return; } + int n; char buf[TSDB_MAX_BYTES_PER_ROW]; switch (field->type) { case TSDB_DATA_TYPE_BOOL: @@ -280,20 +281,37 @@ void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *field, i case TSDB_DATA_TYPE_TINYINT: taosFprintfFile(pFile, "%d", *((int8_t *)val)); break; + case TSDB_DATA_TYPE_UTINYINT: + taosFprintfFile(pFile, "%u", *((uint8_t *)val)); + break; case TSDB_DATA_TYPE_SMALLINT: taosFprintfFile(pFile, "%d", *((int16_t *)val)); break; + case TSDB_DATA_TYPE_USMALLINT: + taosFprintfFile(pFile, "%u", *((uint16_t *)val)); + break; case TSDB_DATA_TYPE_INT: taosFprintfFile(pFile, "%d", *((int32_t *)val)); break; + case TSDB_DATA_TYPE_UINT: + taosFprintfFile(pFile, "%u", *((uint32_t *)val)); + break; case TSDB_DATA_TYPE_BIGINT: taosFprintfFile(pFile, "%" PRId64, *((int64_t *)val)); break; + case TSDB_DATA_TYPE_UBIGINT: + taosFprintfFile(pFile, "%" PRIu64, *((uint64_t *)val)); + break; case TSDB_DATA_TYPE_FLOAT: taosFprintfFile(pFile, "%.5f", GET_FLOAT_VAL(val)); break; case TSDB_DATA_TYPE_DOUBLE: - taosFprintfFile(pFile, "%.9f", GET_DOUBLE_VAL(val)); + n = snprintf(buf, TSDB_MAX_BYTES_PER_ROW, "%*.9f", length, GET_DOUBLE_VAL(val)); + if (n > MAX(25, length)) { + taosFprintfFile(pFile, "%*.15e", length, GET_DOUBLE_VAL(val)); + } else { + taosFprintfFile(pFile, "%s", buf); + } break; case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_NCHAR: @@ -435,6 +453,7 @@ void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t return; } + int n; char buf[TSDB_MAX_BYTES_PER_ROW]; switch (field->type) { case TSDB_DATA_TYPE_BOOL: @@ -468,7 +487,12 @@ void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t printf("%*.5f", width, GET_FLOAT_VAL(val)); break; case TSDB_DATA_TYPE_DOUBLE: - printf("%*.9f", width, GET_DOUBLE_VAL(val)); + n = snprintf(buf, TSDB_MAX_BYTES_PER_ROW, "%*.9f", width, GET_DOUBLE_VAL(val)); + if (n > MAX(25, width)) { + printf("%*.15e", width, GET_DOUBLE_VAL(val)); + } else { + printf("%s", buf); + } break; case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_NCHAR: @@ -483,7 +507,16 @@ void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t } } -int32_t shellVerticalPrintResult(TAOS_RES *tres) { +bool shellIsLimitQuery(const char *sql) { + //todo refactor + if (strstr(sql, "limit") != NULL || strstr(sql, "LIMIT") != NULL) { + return true; + } + + return false; +} + +int32_t shellVerticalPrintResult(TAOS_RES *tres, const char *sql) { TAOS_ROW row = taos_fetch_row(tres); if (row == NULL) { return 0; @@ -503,7 +536,7 @@ int32_t shellVerticalPrintResult(TAOS_RES *tres) { uint64_t resShowMaxNum = UINT64_MAX; - if (shell.args.commands == NULL && shell.args.file[0] == 0) { + if (shell.args.commands == NULL && shell.args.file[0] == 0 && !shellIsLimitQuery(sql)) { resShowMaxNum = SHELL_DEFAULT_RES_SHOW_NUM; } @@ -525,8 +558,13 @@ int32_t shellVerticalPrintResult(TAOS_RES *tres) { putchar('\n'); } } else if (showMore) { - printf("[100 Rows showed, and more rows are fetching but will not be showed. You can ctrl+c to stop or wait.]\n"); - printf("[You can add limit statement to get more or redirect results to specific file to get all.]\n"); + printf("\n"); + printf(" Notice: The result shows only the first %d rows.\n", SHELL_DEFAULT_RES_SHOW_NUM); + printf(" You can use the `LIMIT` clause to get fewer result to show.\n"); + printf(" Or use '>>' to redirect the whole set of the result to a specified file.\n"); + printf("\n"); + printf(" You can use Ctrl+C to stop the underway fetching.\n"); + printf("\n"); showMore = 0; } @@ -618,7 +656,7 @@ void shellPrintHeader(TAOS_FIELD *fields, int32_t *width, int32_t num_fields) { putchar('\n'); } -int32_t shellHorizontalPrintResult(TAOS_RES *tres) { +int32_t shellHorizontalPrintResult(TAOS_RES *tres, const char *sql) { TAOS_ROW row = taos_fetch_row(tres); if (row == NULL) { return 0; @@ -637,7 +675,7 @@ int32_t shellHorizontalPrintResult(TAOS_RES *tres) { uint64_t resShowMaxNum = UINT64_MAX; - if (shell.args.commands == NULL && shell.args.file[0] == 0) { + if (shell.args.commands == NULL && shell.args.file[0] == 0 && !shellIsLimitQuery(sql)) { resShowMaxNum = SHELL_DEFAULT_RES_SHOW_NUM; } @@ -655,8 +693,13 @@ int32_t shellHorizontalPrintResult(TAOS_RES *tres) { } putchar('\n'); } else if (showMore) { - printf("[100 Rows showed, and more rows are fetching but will not be showed. You can ctrl+c to stop or wait.]\n"); - printf("[You can add limit statement to show more or redirect results to specific file to get all.]\n"); + printf("\n"); + printf(" Notice: The result shows only the first %d rows.\n", SHELL_DEFAULT_RES_SHOW_NUM); + printf(" You can use the `LIMIT` clause to get fewer result to show.\n"); + printf(" Or use '>>' to redirect the whole set of the result to a specified file.\n"); + printf("\n"); + printf(" You can use Ctrl+C to stop the underway fetching.\n"); + printf("\n"); showMore = 0; } @@ -667,14 +710,14 @@ int32_t shellHorizontalPrintResult(TAOS_RES *tres) { return numOfRows; } -int32_t shellDumpResult(TAOS_RES *tres, char *fname, int32_t *error_no, bool vertical) { +int32_t shellDumpResult(TAOS_RES *tres, char *fname, int32_t *error_no, bool vertical, const char *sql) { int32_t numOfRows = 0; if (fname != NULL) { numOfRows = shellDumpResultToFile(fname, tres); } else if (vertical) { - numOfRows = shellVerticalPrintResult(tres); + numOfRows = shellVerticalPrintResult(tres, sql); } else { - numOfRows = shellHorizontalPrintResult(tres); + numOfRows = shellHorizontalPrintResult(tres, sql); } *error_no = taos_errno(tres); From b8f7526361bceda08972371088d995f0a7034964 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 13 May 2022 19:09:36 +0800 Subject: [PATCH 8/9] fix(tmq): msg schema --- source/dnode/vnode/src/inc/tq.h | 1 + source/dnode/vnode/src/tq/tqRead.c | 8 +++++--- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 5eb89e8bb7..38dedee5a2 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -93,6 +93,7 @@ struct STqReadHandle { SMeta* pVnodeMeta; SArray* pColIdList; // SArray int32_t sver; + int64_t cachedSchemaUid; SSchemaWrapper* pSchemaWrapper; STSchema* pSchema; }; diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index f531d3f5fb..996d789e24 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -25,6 +25,7 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) { pReadHandle->ver = -1; pReadHandle->pColIdList = NULL; pReadHandle->sver = -1; + pReadHandle->cachedSchemaUid = -1; pReadHandle->pSchema = NULL; pReadHandle->pSchemaWrapper = NULL; pReadHandle->tbIdHash = NULL; @@ -84,19 +85,20 @@ bool tqNextDataBlock(STqReadHandle* pHandle) { return false; } -int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* pGroupId, uint64_t* pUid, int32_t* pNumOfRows, - int16_t* pNumOfCols) { +int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* pGroupId, uint64_t* pUid, + int32_t* pNumOfRows, int16_t* pNumOfCols) { /*int32_t sversion = pHandle->pBlock->sversion;*/ // TODO set to real sversion *pUid = 0; int32_t sversion = 0; - if (pHandle->sver != sversion) { + if (pHandle->sver != sversion || pHandle->cachedSchemaUid != pHandle->msgIter.suid) { pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->msgIter.uid, sversion); // this interface use suid instead of uid pHandle->pSchemaWrapper = metaGetTableSchema(pHandle->pVnodeMeta, pHandle->msgIter.suid, sversion, true); pHandle->sver = sversion; + pHandle->cachedSchemaUid = pHandle->msgIter.suid; } STSchema* pTschema = pHandle->pSchema; From 1e8ac7b06c7ac75425440ea91522670537278174 Mon Sep 17 00:00:00 2001 From: plum-lihui Date: Fri, 13 May 2022 19:55:05 +0800 Subject: [PATCH 9/9] test: modify debug flag --- tests/system-test/99-TDcase/TD-15554.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/tests/system-test/99-TDcase/TD-15554.py b/tests/system-test/99-TDcase/TD-15554.py index fb3817ed89..890580ca2c 100644 --- a/tests/system-test/99-TDcase/TD-15554.py +++ b/tests/system-test/99-TDcase/TD-15554.py @@ -13,12 +13,11 @@ from util.dnodes import * class TDTestCase: hostname = socket.gethostname() - #rpcDebugFlagVal = '143' - #clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''} - #clientCfgDict["rpcDebugFlag"] = rpcDebugFlagVal - #updatecfgDict = {'clientCfg': {}, 'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''} - #updatecfgDict["rpcDebugFlag"] = rpcDebugFlagVal - #print ("===================: ", updatecfgDict) + + clientCfgDict = {'qdebugflag':'143'} + updatecfgDict = {'clientCfg': {}, 'qdebugflag':'143'} + updatecfgDict["clientCfg"] = clientCfgDict + print ("===================: ", updatecfgDict) def init(self, conn, logSql): tdLog.debug(f"start to excute {__file__}")