From aff649deb783cf172c2cc083016d9be96825b38d Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sun, 27 Nov 2022 23:00:32 +0800 Subject: [PATCH 01/12] adjust parameer --- include/util/tdef.h | 2 +- source/common/src/tglobal.c | 3 +++ source/libs/transport/src/trans.c | 3 +++ source/libs/transport/src/transSvr.c | 2 +- 4 files changed, 8 insertions(+), 2 deletions(-) diff --git a/include/util/tdef.h b/include/util/tdef.h index 48dedd3e3e..ad44daed46 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -406,7 +406,7 @@ typedef enum ELogicConditionType { #ifdef WINDOWS #define TSDB_MAX_RPC_THREADS 4 // windows pipe only support 4 connections. #else -#define TSDB_MAX_RPC_THREADS 10 +#define TSDB_MAX_RPC_THREADS 20 #endif #define TSDB_QUERY_TYPE_NON_TYPE 0x00u // none type diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index f2d8b9aa7c..e27793bd56 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -306,6 +306,9 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { tsNumOfTaskQueueThreads = tsNumOfCores / 2; tsNumOfTaskQueueThreads = TMAX(tsNumOfTaskQueueThreads, 4); + if (tsNumOfTaskQueueThreads >= 10) { + tsNumOfTaskQueueThreads = 10; + } if (cfgAddInt32(pCfg, "numOfTaskQueueThreads", tsNumOfTaskQueueThreads, 4, 1024, 0) != 0) return -1; return 0; diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 94bc128de9..88888f2f84 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -58,6 +58,9 @@ void* rpcOpen(const SRpcInit* pInit) { pRpc->destroyFp = pInit->dfp; pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; + if (pRpc->numOfThreads <= 0) { + pRpc->numOfThreads = 1; + } uint32_t ip = 0; if (pInit->connType == TAOS_CONN_SERVER) { diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index b7fe404a4e..d00624c4d2 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -846,7 +846,7 @@ static bool addHandleToAcceptloop(void* arg) { return true; } void* transWorkerThread(void* arg) { - setThreadName("trans-worker"); + setThreadName("trans-srv-work"); SWorkThrd* pThrd = (SWorkThrd*)arg; uv_run(pThrd->loop, UV_RUN_DEFAULT); From 0611ecbe313e8d56ed1343da513e98c43744e0ee Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 28 Nov 2022 10:53:27 +0800 Subject: [PATCH 02/12] change paramter --- source/client/src/clientImpl.c | 2 +- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 2 +- source/libs/transport/src/transCli.c | 5 +++-- source/libs/transport/src/transComm.c | 2 +- source/libs/transport/src/transSvr.c | 2 +- 5 files changed, 7 insertions(+), 6 deletions(-) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index c3140371c4..d8d1edc3d3 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -131,7 +131,7 @@ STscObj* taos_connect_internal(const char* ip, const char* user, const char* pas p = taosMemoryCalloc(1, sizeof(struct SAppInstInfo)); p->mgmtEp = epSet; taosThreadMutexInit(&p->qnodeMutex, NULL); - p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores); + p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores / 2); p->pAppHbMgr = appHbMgrInit(p, key); if (NULL == p->pAppHbMgr) { destroyAppInst(p); diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 5546d762f4..f78fd33e47 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -250,7 +250,7 @@ int32_t dmInitClient(SDnode *pDnode) { SRpcInit rpcInit = {0}; rpcInit.label = "DND-C"; - rpcInit.numOfThreads = 4; + rpcInit.numOfThreads = tsNumOfRpcThreads; rpcInit.cfp = (RpcCfp)dmProcessRpcMsg; rpcInit.sessions = 1024; rpcInit.connType = TAOS_CONN_CLIENT; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 55bfb57a82..2b54ce36f5 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -653,9 +653,10 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { static SCliConn* cliCreateConn(SCliThrd* pThrd) { SCliConn* conn = taosMemoryCalloc(1, sizeof(SCliConn)); // read/write stream handle - conn->stream = (uv_stream_t*)taosMemoryMalloc(sizeof(uv_tcp_t)); + conn->stream = (uv_stream_t*)taosMemoryMalloc(sizeof(uv_stream_t)); uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream)); conn->stream->data = conn; + transSetConnOption((uv_tcp_t*)conn->stream); uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL; if (timer == NULL) { @@ -1182,7 +1183,7 @@ bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead) { static void* cliWorkThread(void* arg) { SCliThrd* pThrd = (SCliThrd*)arg; pThrd->pid = taosGetSelfPthreadId(); - setThreadName("trans-cli-work"); + setThreadName("trans-cli-worker"); uv_run(pThrd->loop, UV_RUN_DEFAULT); tDebug("thread quit-thread:%08" PRId64, pThrd->pid); diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 2759fb5aeb..5a5806417e 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -202,7 +202,7 @@ bool transReadComplete(SConnBuffer* connBuf) { } int transSetConnOption(uv_tcp_t* stream) { - uv_tcp_nodelay(stream, 0); + uv_tcp_nodelay(stream, 1); int ret = uv_tcp_keepalive(stream, 5, 60); return ret; } diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index d00624c4d2..f93eb436a5 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -846,7 +846,7 @@ static bool addHandleToAcceptloop(void* arg) { return true; } void* transWorkerThread(void* arg) { - setThreadName("trans-srv-work"); + setThreadName("trans-svr-worker"); SWorkThrd* pThrd = (SWorkThrd*)arg; uv_run(pThrd->loop, UV_RUN_DEFAULT); From 90ae53211fbf1a59e06f9ed41bc3ee848c92258a Mon Sep 17 00:00:00 2001 From: chenhaoran Date: Mon, 28 Nov 2022 16:50:47 +0800 Subject: [PATCH 03/12] test:add testcase of win ci --- tests/system-test/simpletest.bat | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/system-test/simpletest.bat b/tests/system-test/simpletest.bat index cc4ae17955..0365567698 100644 --- a/tests/system-test/simpletest.bat +++ b/tests/system-test/simpletest.bat @@ -7,12 +7,12 @@ python3 .\test.py -f 0-others\taosdMonitor.py @REM python3 .\test.py -f 0-others\udfTest.py @REM python3 .\test.py -f 0-others\udf_create.py @REM python3 .\test.py -f 0-others\udf_restart_taosd.py -@REM python3 .\test.py -f 0-others\cachelast.py +python3 .\test.py -f 0-others\cachelast.py @REM python3 .\test.py -f 0-others\user_control.py @REM python3 .\test.py -f 0-others\fsync.py -@REM python3 .\test.py -f 1-insert\influxdb_line_taosc_insert.py +python3 .\test.py -f 1-insert\influxdb_line_taosc_insert.py @REM python3 .\test.py -f 1-insert\opentsdb_telnet_line_taosc_insert.py @REM python3 .\test.py -f 1-insert\opentsdb_json_taosc_insert.py @REM #python3 .\test.py -f 1-insert\test_stmt_muti_insert_query.py @@ -72,7 +72,7 @@ python3 .\test.py -f 0-others\taosdMonitor.py @REM python3 .\test.py -f 2-query\arcsin.py @REM python3 .\test.py -f 2-query\arccos.py @REM python3 .\test.py -f 2-query\arctan.py -@REM python3 .\test.py -f 2-query\query_cols_tags_and_or.py +python3 .\test.py -f 2-query\query_cols_tags_and_or.py @REM # python3 .\test.py -f 2-query\nestedQuery.py @REM # TD-15983 subquery output duplicate name column. @REM # Please Xiangyang Guo modify the following script @@ -94,7 +94,7 @@ python3 .\test.py -f 0-others\taosdMonitor.py @REM python3 .\test.py -f 7-tmq\subscribeDb.py @REM python3 .\test.py -f 7-tmq\subscribeDb0.py @REM python3 .\test.py -f 7-tmq\subscribeDb1.py -@REM python3 .\test.py -f 7-tmq\subscribeStb.py +python3 .\test.py -f 7-tmq\subscribeStb.py @REM python3 .\test.py -f 7-tmq\subscribeStb0.py @REM python3 .\test.py -f 7-tmq\subscribeStb1.py @REM python3 .\test.py -f 7-tmq\subscribeStb2.py From 2bb59df74be05c7987cf095b5d051623160d0fa9 Mon Sep 17 00:00:00 2001 From: gccgdb1234 Date: Mon, 28 Nov 2022 17:47:49 +0800 Subject: [PATCH 04/12] fix compile --- source/libs/transport/src/transCli.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 2b54ce36f5..fa7c375ee3 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -653,10 +653,10 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { static SCliConn* cliCreateConn(SCliThrd* pThrd) { SCliConn* conn = taosMemoryCalloc(1, sizeof(SCliConn)); // read/write stream handle - conn->stream = (uv_stream_t*)taosMemoryMalloc(sizeof(uv_stream_t)); + conn->stream = (uv_stream_t*)taosMemoryMalloc(sizeof(uv_tcp_t)); uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream)); conn->stream->data = conn; - transSetConnOption((uv_tcp_t*)conn->stream); + //transSetConnOption((uv_tcp_t*)conn->stream); uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL; if (timer == NULL) { From ab87a92f640ce302abe8501f0a973b9103ce58e6 Mon Sep 17 00:00:00 2001 From: chenhaoran Date: Mon, 28 Nov 2022 17:51:12 +0800 Subject: [PATCH 05/12] test:add win testcase in ci --- tests/system-test/simpletest.bat | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system-test/simpletest.bat b/tests/system-test/simpletest.bat index 0365567698..5ae2d3feb3 100644 --- a/tests/system-test/simpletest.bat +++ b/tests/system-test/simpletest.bat @@ -7,7 +7,7 @@ python3 .\test.py -f 0-others\taosdMonitor.py @REM python3 .\test.py -f 0-others\udfTest.py @REM python3 .\test.py -f 0-others\udf_create.py @REM python3 .\test.py -f 0-others\udf_restart_taosd.py -python3 .\test.py -f 0-others\cachelast.py +python3 .\test.py -f 0-others\cachemodel.py @REM python3 .\test.py -f 0-others\user_control.py @REM python3 .\test.py -f 0-others\fsync.py From 25889595d8461b92a162b5310cae71bd53734d55 Mon Sep 17 00:00:00 2001 From: gccgdb1234 Date: Mon, 28 Nov 2022 18:01:15 +0800 Subject: [PATCH 06/12] fix compile --- source/libs/transport/src/transCli.c | 2 +- source/libs/transport/src/transSvr.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index fa7c375ee3..a0619def53 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1183,7 +1183,7 @@ bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead) { static void* cliWorkThread(void* arg) { SCliThrd* pThrd = (SCliThrd*)arg; pThrd->pid = taosGetSelfPthreadId(); - setThreadName("trans-cli-worker"); + setThreadName("trans-cli-work"); uv_run(pThrd->loop, UV_RUN_DEFAULT); tDebug("thread quit-thread:%08" PRId64, pThrd->pid); diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index f93eb436a5..395e28d68f 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -846,7 +846,7 @@ static bool addHandleToAcceptloop(void* arg) { return true; } void* transWorkerThread(void* arg) { - setThreadName("trans-svr-worker"); + setThreadName("trans-svr-work"); SWorkThrd* pThrd = (SWorkThrd*)arg; uv_run(pThrd->loop, UV_RUN_DEFAULT); From 65adb259ff47f61d16efc72d43f7f5df7cd6dbee Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 28 Nov 2022 18:44:59 +0800 Subject: [PATCH 07/12] enh: support multiple groups in exchange operator --- include/libs/parser/parser.h | 2 +- source/client/src/clientImpl.c | 15 +++++----- source/client/src/clientStmt.c | 38 ++++++++++++++++++-------- source/libs/catalog/src/ctgAsync.c | 2 +- source/libs/command/src/explain.c | 19 +++++++++---- source/libs/parser/src/parAstCreater.c | 2 +- source/libs/parser/src/parInsertSql.c | 4 +-- source/util/src/tarray.c | 3 ++ source/util/src/tlog.c | 5 ++-- 9 files changed, 58 insertions(+), 32 deletions(-) diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index 1a7e6dc748..b72d85243b 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -29,7 +29,7 @@ struct SMetaData; typedef struct SStmtCallback { TAOS_STMT* pStmt; int32_t (*getTbNameFn)(TAOS_STMT*, char**); - int32_t (*setInfoFn)(TAOS_STMT*, STableMeta*, void*, char*, bool, SHashObj*, SHashObj*, const char*); + int32_t (*setInfoFn)(TAOS_STMT*, STableMeta*, void*, SName*, bool, SHashObj*, SHashObj*, const char*); int32_t (*getExecInfoFn)(TAOS_STMT*, SHashObj**, SHashObj**); } SStmtCallback; diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index c3140371c4..2fe53d505f 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -2215,11 +2215,16 @@ void syncQueryFn(void* param, void* res, int32_t code) { SSyncQueryParam* pParam = param; pParam->pRequest = res; - if (pParam->pRequest) { + if (pParam->pRequest != NULL) { + pParam->pRequest->syncQuery = true; pParam->pRequest->code = code; } tsem_post(&pParam->sem); + + if (NULL == res) { + taosMemoryFree(param); + } } void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly) { @@ -2293,9 +2298,7 @@ TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly) { taosAsyncQueryImpl(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly); tsem_wait(¶m->sem); - if (param->pRequest != NULL) { - param->pRequest->syncQuery = true; - } + return param->pRequest; } @@ -2310,8 +2313,6 @@ TAOS_RES* taosQueryImplWithReqid(TAOS* taos, const char* sql, bool validateOnly, taosAsyncQueryImplWithReqid(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly, reqid); tsem_wait(¶m->sem); - if (param->pRequest != NULL) { - param->pRequest->syncQuery = true; - } + return param->pRequest; } diff --git a/source/client/src/clientStmt.c b/source/client/src/clientStmt.c index 86c86d52ab..53c151feeb 100644 --- a/source/client/src/clientStmt.c +++ b/source/client/src/clientStmt.c @@ -152,9 +152,12 @@ int32_t stmtRestoreQueryFields(STscStmt* pStmt) { return TSDB_CODE_SUCCESS; } -int32_t stmtUpdateBindInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags, char* tbFName, const char* sTableName, bool autoCreateTbl) { +int32_t stmtUpdateBindInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags, SName* tbName, const char* sTableName, bool autoCreateTbl) { STscStmt* pStmt = (STscStmt*)stmt; + char tbFName[TSDB_TABLE_FNAME_LEN]; + tNameExtractFullName(tbName, tbFName); + memcpy(&pStmt->bInfo.sname, tbName, sizeof(*tbName)); strncpy(pStmt->bInfo.tbFName, tbFName, sizeof(pStmt->bInfo.tbFName) - 1); pStmt->bInfo.tbFName[sizeof(pStmt->bInfo.tbFName) - 1] = 0; @@ -178,11 +181,11 @@ int32_t stmtUpdateExecInfo(TAOS_STMT* stmt, SHashObj* pVgHash, SHashObj* pBlockH return TSDB_CODE_SUCCESS; } -int32_t stmtUpdateInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags, char* tbFName, bool autoCreateTbl, +int32_t stmtUpdateInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags, SName* tbName, bool autoCreateTbl, SHashObj* pVgHash, SHashObj* pBlockHash, const char* sTableName) { STscStmt* pStmt = (STscStmt*)stmt; - STMT_ERR_RET(stmtUpdateBindInfo(stmt, pTableMeta, tags, tbFName, sTableName, autoCreateTbl)); + STMT_ERR_RET(stmtUpdateBindInfo(stmt, pTableMeta, tags, tbName, sTableName, autoCreateTbl)); STMT_ERR_RET(stmtUpdateExecInfo(stmt, pVgHash, pBlockHash, autoCreateTbl)); pStmt->sql.autoCreateTbl = autoCreateTbl; @@ -773,7 +776,9 @@ int stmtAddBatch(TAOS_STMT* stmt) { int stmtUpdateTableUid(STscStmt* pStmt, SSubmitRsp* pRsp) { tscDebug("stmt start to update tbUid, blockNum: %d", pRsp->nBlocks); - size_t keyLen = 0; + int32_t code = 0; + int32_t finalCode = 0; + size_t keyLen = 0; STableDataBlocks** pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL); while (pIter) { STableDataBlocks* pBlock = *pIter; @@ -809,10 +814,20 @@ int stmtUpdateTableUid(STscStmt* pStmt, SSubmitRsp* pRsp) { } else { tscDebug("table %s not found in submit rsp, will update from catalog", pStmt->bInfo.tbFName); if (NULL == pStmt->pCatalog) { - STMT_ERR_RET(catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &pStmt->pCatalog)); + code = catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &pStmt->pCatalog); + if (code) { + pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter); + finalCode = code; + continue; + } } - STMT_ERR_RET(stmtCreateRequest(pStmt)); + code = stmtCreateRequest(pStmt); + if (code) { + pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter); + finalCode = code; + continue; + } STableMeta* pTableMeta = NULL; SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter, @@ -823,10 +838,11 @@ int stmtUpdateTableUid(STscStmt* pStmt, SSubmitRsp* pRsp) { taos_free_result(pStmt->exec.pRequest); pStmt->exec.pRequest = NULL; - - if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) { - tscDebug("tb %s not exist", pStmt->bInfo.tbFName); - return TSDB_CODE_SUCCESS; + + if (code || NULL == pTableMeta) { + pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter); + finalCode = code; + continue; } pMeta->uid = pTableMeta->uid; @@ -836,7 +852,7 @@ int stmtUpdateTableUid(STscStmt* pStmt, SSubmitRsp* pRsp) { pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter); } - return TSDB_CODE_SUCCESS; + return finalCode; } int stmtExec(TAOS_STMT* stmt) { diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index 2a032de003..7138e0ce16 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -1205,7 +1205,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu stbCtx.pName = &stbName; STableMeta* stbMeta = NULL; - ctgReadTbMetaFromCache(pCtg, &stbCtx, &stbMeta); + (void)ctgReadTbMetaFromCache(pCtg, &stbCtx, &stbMeta); if (stbMeta && stbMeta->sversion >= pOut->tbMeta->sversion) { ctgDebug("use cached stb meta, tbName:%s", tNameGetTableName(pName)); exist = 1; diff --git a/source/libs/command/src/explain.c b/source/libs/command/src/explain.c index 915dc08c14..410e62a18b 100644 --- a/source/libs/command/src/explain.c +++ b/source/libs/command/src/explain.c @@ -782,13 +782,18 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i } case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: { SExchangePhysiNode *pExchNode = (SExchangePhysiNode *)pNode; - SExplainGroup *group = taosHashGet(ctx->groupHash, &pExchNode->srcStartGroupId, sizeof(pExchNode->srcStartGroupId)); - if (NULL == group) { - qError("exchange src group %d not in groupHash", pExchNode->srcStartGroupId); - QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR); + int32_t nodeNum = 0; + for (int32_t i = pExchNode->srcStartGroupId; i <= pExchNode->srcEndGroupId; ++i) { + SExplainGroup *group = taosHashGet(ctx->groupHash, &pExchNode->srcStartGroupId, sizeof(pExchNode->srcStartGroupId)); + if (NULL == group) { + qError("exchange src group %d not in groupHash", pExchNode->srcStartGroupId); + QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR); + } + + nodeNum += group->nodeNum; } - EXPLAIN_ROW_NEW(level, EXPLAIN_EXCHANGE_FORMAT, pExchNode->singleChannel ? 1 : group->nodeNum); + EXPLAIN_ROW_NEW(level, EXPLAIN_EXCHANGE_FORMAT, pExchNode->singleChannel ? 1 : nodeNum); EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT); if (pResNode->pExecInfo) { QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen)); @@ -819,7 +824,9 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i } } - QRY_ERR_RET(qExplainAppendGroupResRows(ctx, pExchNode->srcStartGroupId, level + 1, pExchNode->singleChannel)); + for (int32_t i = pExchNode->srcStartGroupId; i <= pExchNode->srcEndGroupId; ++i) { + QRY_ERR_RET(qExplainAppendGroupResRows(ctx, i, level + 1, pExchNode->singleChannel)); + } break; } case QUERY_NODE_PHYSICAL_PLAN_SORT: { diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index 1db52c123c..6a8c79040d 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -1407,7 +1407,7 @@ SNode* createShowTableTagsStmt(SAstCreateContext* pCxt, SNode* pTbName, SNode* p SNode* createCreateUserStmt(SAstCreateContext* pCxt, SToken* pUserName, const SToken* pPassword, int8_t sysinfo) { CHECK_PARSER_STATUS(pCxt); - char password[TSDB_USET_PASSWORD_LEN] = {0}; + char password[TSDB_USET_PASSWORD_LEN + 3] = {0}; if (!checkUserName(pCxt, pUserName) || !checkPassword(pCxt, pPassword, password)) { return NULL; } diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c index 9c39954f09..1a87f626c6 100644 --- a/source/libs/parser/src/parInsertSql.c +++ b/source/libs/parser/src/parInsertSql.c @@ -1529,9 +1529,7 @@ static int32_t setStmtInfo(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) memcpy(tags, &pCxt->tags, sizeof(pCxt->tags)); SStmtCallback* pStmtCb = pCxt->pComCxt->pStmtCb; - char tbFName[TSDB_TABLE_FNAME_LEN]; - tNameExtractFullName(&pStmt->targetTableName, tbFName); - int32_t code = (*pStmtCb->setInfoFn)(pStmtCb->pStmt, pStmt->pTableMeta, tags, tbFName, pStmt->usingTableProcessing, + int32_t code = (*pStmtCb->setInfoFn)(pStmtCb->pStmt, pStmt->pTableMeta, tags, &pStmt->targetTableName, pStmt->usingTableProcessing, pStmt->pVgroupsHashObj, pStmt->pTableBlockHashObj, pStmt->usingTableName.tname); memset(&pCxt->tags, 0, sizeof(pCxt->tags)); diff --git a/source/util/src/tarray.c b/source/util/src/tarray.c index 95065972a3..fa39b1004a 100644 --- a/source/util/src/tarray.c +++ b/source/util/src/tarray.c @@ -192,6 +192,9 @@ void* taosArrayPop(SArray* pArray) { } void* taosArrayGet(const SArray* pArray, size_t index) { + if (NULL == pArray) { + return NULL; + } assert(index < pArray->size); return TARRAY_GET_ELEM(pArray, index); } diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index a2ce5ac08c..fc9d90c985 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -21,7 +21,7 @@ #define LOG_MAX_LINE_SIZE (1024) #define LOG_MAX_LINE_BUFFER_SIZE (LOG_MAX_LINE_SIZE + 3) -#define LOG_MAX_LINE_DUMP_SIZE (65 * 1024) +#define LOG_MAX_LINE_DUMP_SIZE (1024 * 1024) #define LOG_MAX_LINE_DUMP_BUFFER_SIZE (LOG_MAX_LINE_DUMP_SIZE + 3) #define LOG_FILE_NAME_LEN 300 @@ -496,7 +496,7 @@ void taosPrintLongString(const char *flags, ELogLevel level, int32_t dflag, cons if (!osLogSpaceAvailable()) return; if (!(dflag & DEBUG_FILE) && !(dflag & DEBUG_SCREEN)) return; - char buffer[LOG_MAX_LINE_DUMP_BUFFER_SIZE]; + char *buffer = taosMemoryMalloc(LOG_MAX_LINE_DUMP_BUFFER_SIZE); int32_t len = taosBuildLogHead(buffer, flags); va_list argpointer; @@ -509,6 +509,7 @@ void taosPrintLongString(const char *flags, ELogLevel level, int32_t dflag, cons buffer[len] = 0; taosPrintLogImp(level, dflag, buffer, len); + taosMemoryFree(buffer); } void taosDumpData(unsigned char *msg, int32_t len) { From 8daa8f352593998ab32caf9e93ece587bd0cbbdc Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 28 Nov 2022 19:04:15 +0800 Subject: [PATCH 08/12] fix: fix table meta memory leak issue --- source/client/src/clientStmt.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/client/src/clientStmt.c b/source/client/src/clientStmt.c index 53c151feeb..291e0f6ae3 100644 --- a/source/client/src/clientStmt.c +++ b/source/client/src/clientStmt.c @@ -842,11 +842,13 @@ int stmtUpdateTableUid(STscStmt* pStmt, SSubmitRsp* pRsp) { if (code || NULL == pTableMeta) { pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter); finalCode = code; + taosMemoryFree(pTableMeta); continue; } pMeta->uid = pTableMeta->uid; pStmt->bInfo.tbUid = pTableMeta->uid; + taosMemoryFree(pTableMeta); } pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter); From 6e7e02c3cdffcbd47c8da0f1398ba4cc78a68d6b Mon Sep 17 00:00:00 2001 From: gccgdb1234 Date: Mon, 28 Nov 2022 19:13:11 +0800 Subject: [PATCH 09/12] fix compile --- source/libs/transport/src/transCli.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index a0619def53..71cc14493f 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -656,7 +656,7 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) { conn->stream = (uv_stream_t*)taosMemoryMalloc(sizeof(uv_tcp_t)); uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream)); conn->stream->data = conn; - //transSetConnOption((uv_tcp_t*)conn->stream); + transSetConnOption((uv_tcp_t*)conn->stream); uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL; if (timer == NULL) { From 2bfcabda2730ee2ba81dc35517c6218aa3390c23 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 28 Nov 2022 19:36:58 +0800 Subject: [PATCH 10/12] fix: fix client request param memory leak --- source/client/src/clientImpl.c | 27 +++++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 2fe53d505f..55080d5eeb 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -2215,16 +2215,11 @@ void syncQueryFn(void* param, void* res, int32_t code) { SSyncQueryParam* pParam = param; pParam->pRequest = res; - if (pParam->pRequest != NULL) { - pParam->pRequest->syncQuery = true; + if (pParam->pRequest) { pParam->pRequest->code = code; } tsem_post(&pParam->sem); - - if (NULL == res) { - taosMemoryFree(param); - } } void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly) { @@ -2299,7 +2294,15 @@ TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly) { taosAsyncQueryImpl(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly); tsem_wait(¶m->sem); - return param->pRequest; + SRequestObj *pRequest = NULL; + if (param->pRequest != NULL) { + param->pRequest->syncQuery = true; + pRequest = param->pRequest; + } else { + taosMemoryFree(param); + } + + return pRequest; } TAOS_RES* taosQueryImplWithReqid(TAOS* taos, const char* sql, bool validateOnly, int64_t reqid) { @@ -2314,5 +2317,13 @@ TAOS_RES* taosQueryImplWithReqid(TAOS* taos, const char* sql, bool validateOnly, taosAsyncQueryImplWithReqid(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly, reqid); tsem_wait(¶m->sem); - return param->pRequest; + SRequestObj *pRequest = NULL; + if (param->pRequest != NULL) { + param->pRequest->syncQuery = true; + pRequest = param->pRequest; + } else { + taosMemoryFree(param); + } + + return pRequest; } From 3727cedeb3318becf2e725d66964f1b42504cd69 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 28 Nov 2022 20:43:33 +0800 Subject: [PATCH 11/12] change paramter --- source/libs/transport/src/transComm.c | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 5a5806417e..ad8d57c97a 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -202,9 +202,8 @@ bool transReadComplete(SConnBuffer* connBuf) { } int transSetConnOption(uv_tcp_t* stream) { - uv_tcp_nodelay(stream, 1); - int ret = uv_tcp_keepalive(stream, 5, 60); - return ret; + return uv_tcp_nodelay(stream, 1); + // int ret = uv_tcp_keepalive(stream, 5, 60); } SAsyncPool* transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) { From 9d6f7e978c1225be033d8311b68c75d311d4dae1 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Tue, 29 Nov 2022 09:20:33 +0800 Subject: [PATCH 12/12] feat: taosdump for windows (#18529) * feat: taos-tools 2333455 * feat: taosdump for windows * feat: update taos-tools dd298d1 * feat: update taostools d4e4c28 * feat: taosdump for windows d4e4c28 --- cmake/taostools_CMakeLists.txt.in | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmake/taostools_CMakeLists.txt.in b/cmake/taostools_CMakeLists.txt.in index 577166353b..e8fb125db7 100644 --- a/cmake/taostools_CMakeLists.txt.in +++ b/cmake/taostools_CMakeLists.txt.in @@ -2,7 +2,7 @@ # taos-tools ExternalProject_Add(taos-tools GIT_REPOSITORY https://github.com/taosdata/taos-tools.git - GIT_TAG 7e9ce09 + GIT_TAG 2aac500 SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools" BINARY_DIR "" #BUILD_IN_SOURCE TRUE