From a2fa41ccc77797da594464221768c43cabc51448 Mon Sep 17 00:00:00 2001 From: Yu Chen <74105241+yu285@users.noreply.github.com> Date: Wed, 9 Oct 2024 09:51:19 +0800 Subject: [PATCH 01/28] Update 01-arch.md --- docs/zh/26-tdinternal/01-arch.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/zh/26-tdinternal/01-arch.md b/docs/zh/26-tdinternal/01-arch.md index 04e47797a8..8aa69e45d5 100644 --- a/docs/zh/26-tdinternal/01-arch.md +++ b/docs/zh/26-tdinternal/01-arch.md @@ -178,7 +178,7 @@ TDengine 集群可以容纳单个、多个甚至几千个数据节点。应用 TDengine 存储的数据包括采集的时序数据以及库、表相关的元数据、标签数据等,这些数据具体分为三部分: -- 时序数据:TDengine 的核心存储对象,存放于 vnode 里,由 data、head 和 last 三个文件组成,数据量大,查询量取决于应用场景。允许乱序写入,但暂时不支持删除操作,并且仅在 update 参数设置为 1 时允许更新操作。通过采用一个采集点一张表的模型,一个时间段的数据是连续存储,对单张表的写入是简单的追加操作,一次读,可以读到多条记录,这样保证对单个采集点的插入和查询操作,性能达到最优。 +- 时序数据:时序数据是 TDengine 的核心存储对象,它们被存储在 vnode 中。时序数据由 data、head、sma 和 stt 4 类文件组成,这些文件共同构成了时序数据的完整存储结构。由于时序数据的特点是数据量大且查询需求取决于具体应用场景,因此 TDengine 采用了“一个数据采集点一张表”的模型来优化存储和查询性能。在这种模型下,一个时间段内的数据是连续存储的,对单张表的写入是简单的追加操作,一次读取可以获取多条记录。这种设计确保了单个数据采集点的写入和查询操作都能达到最优性能。 - 数据表元数据:包含标签信息和 Table Schema 信息,存放于 vnode 里的 meta 文件,支持增删改查四个标准操作。数据量很大,有 N 张表,就有 N 条记录,因此采用 LRU 存储,支持标签数据的索引。TDengine 支持多核多线程并发查询。只要计算内存足够,元数据全内存存储,千万级别规模的标签数据过滤结果能毫秒级返回。在内存资源不足的情况下,仍然可以支持数千万张表的快速查询。 - 数据库元数据:存放于 mnode 里,包含系统节点、用户、DB、STable Schema 等信息,支持增删改查四个标准操作。这部分数据的量不大,可以全内存保存,而且由于客户端有缓存,查询量也不大。因此目前的设计虽是集中式存储管理,但不会构成性能瓶颈。 @@ -321,4 +321,4 @@ TDengine 采用了一种数据驱动的策略来实现缓存数据的持久化 此外,TDengine 还提供了数据分级存储的功能,允许用户将不同时间段的数据存储在不同存储设备的目录中,以此实现将“热”数据和“冷”数据分开存储。这样做可以充分利用各种存储资源,同时节约成本。例如,对于最新采集且需要频繁访问的数据,由于其读取性能要求较高,用户可以配置将这些数据存储在高性能的固态硬盘上。而对于超过一定期限、查询需求较低的数据,则可以将其存储在成本相对较低的机械硬盘上。 -为了进一步降低存储成本,TDengine 还支持将时序数据存储在对象存储系统中。通过其创新性的设计,在大多数情况下,从对象存储系统中查询时序数据的性能接近本地硬盘的一半,而在某些场景下,性能甚至可以与本地硬盘相媲美。同时,TDengine 还允许用户对存储在对象存储中的时序数据执行删除和更新操作。 \ No newline at end of file +为了进一步降低存储成本,TDengine 还支持将时序数据存储在对象存储系统中。通过其创新性的设计,在大多数情况下,从对象存储系统中查询时序数据的性能接近本地硬盘的一半,而在某些场景下,性能甚至可以与本地硬盘相媲美。同时,TDengine 还允许用户对存储在对象存储中的时序数据执行删除和更新操作。 From 9a3184578dc0c352aa96bbb22179881a8aeae2dc Mon Sep 17 00:00:00 2001 From: Yu Chen <74105241+yu285@users.noreply.github.com> Date: Wed, 23 Oct 2024 09:26:28 +0800 Subject: [PATCH 02/28] docs:Update 07-supported.md --- docs/zh/14-reference/07-supported.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/zh/14-reference/07-supported.md b/docs/zh/14-reference/07-supported.md index 10ca237653..fcb20b8ac1 100644 --- a/docs/zh/14-reference/07-supported.md +++ b/docs/zh/14-reference/07-supported.md @@ -14,7 +14,7 @@ description: "TDengine 服务端、客户端和连接器支持的平台列表" | M1 | | | | | | | | ● | 注:1) ● 表示经过官方测试验证, ○ 表示非官方测试验证,E 表示仅企业版支持。 - 2) 社区版仅支持主流操作系统的较新版本,包括 Ubuntu 18+/CentOS 7+/RedHat/Debian/CoreOS/FreeBSD/OpenSUSE/SUSE Linux/Fedora/macOS 等。如果有其他操作系统及版本的需求,请联系企业版支持。 + 2) 社区版仅支持主流操作系统的较新版本,包括 Ubuntu 18+/CentOS 7+/CentOS Stream/RedHat/Debian/CoreOS/FreeBSD/OpenSUSE/SUSE Linux/Fedora/macOS 等。如果有其他操作系统及版本的需求,请联系企业版支持。 ## TDengine 客户端和连接器支持的平台列表 From d4b0550a01fc787d8e92c84caafc2f4145ab3430 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 29 Oct 2024 20:04:37 +0800 Subject: [PATCH 03/28] fix double free --- source/libs/transport/src/transCli.c | 34 +++++++++++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index c3e214b5e3..4887240f5d 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1300,12 +1300,22 @@ static void cliBatchSendCb(uv_write_t* req, int status) { SCliThrd* pThrd = conn->hostThrd; STrans* pInst = pThrd->pInst; + while (!QUEUE_IS_EMPTY(&wrapper->node)) { + queue* h = QUEUE_HEAD(&wrapper->node); + QUEUE_REMOVE(h); + + SCliReq* pReq = QUEUE_DATA(h, SCliReq, q); + transQueuePush(&conn->reqsSentOut, &pReq->q); + } + QUEUE_INIT(&wrapper->node); + freeWReqToWQ(&conn->wq, wrapper); int32_t ref = transUnrefCliHandle(conn); if (ref <= 0) { return; } + cliConnRmReqs(conn); if (status != 0) { tDebug("%s conn %p failed to send msg since %s", CONN_GET_INST_LABEL(conn), conn, uv_err_name(status)); @@ -1398,6 +1408,10 @@ int32_t cliBatchSend(SCliConn* pConn, int8_t direct) { int j = 0; int32_t batchLimit = 64; + + queue reqToSend; + QUEUE_INIT(&reqToSend); + while (!transQueueEmpty(&pConn->reqsToSend)) { queue* h = transQueuePop(&pConn->reqsToSend); SCliReq* pCliMsg = QUEUE_DATA(h, SCliReq, q); @@ -1453,24 +1467,42 @@ int32_t cliBatchSend(SCliConn* pConn, int8_t direct) { STraceId* trace = &pCliMsg->msg.info.traceId; tGDebug("%s conn %p %s is sent to %s, local info:%s, seq:%" PRId64 ", sid:%" PRId64 "", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pReq->msgType), pConn->dst, pConn->src, pConn->seq, pReq->info.qId); - transQueuePush(&pConn->reqsSentOut, &pCliMsg->q); + + QUEUE_PUSH(&reqToSend, &pCliMsg->q); if (j >= batchLimit) { break; } } transRefCliHandle(pConn); uv_write_t* req = allocWReqFromWQ(&pConn->wq, pConn); + if (req == NULL) { tError("%s conn %p failed to send msg since %s", CONN_GET_INST_LABEL(pConn), pConn, tstrerror(terrno)); + while (!QUEUE_IS_EMPTY(&reqToSend)) { + queue* h = QUEUE_HEAD(&reqToSend); + QUEUE_REMOVE(h); + SCliReq* pReq = QUEUE_DATA(h, SCliReq, q); + transQueuePush(&pConn->reqsToSend, &pReq->q); + } transRefCliHandle(pConn); return terrno; } + SWReqsWrapper* pWreq = req->data; + QUEUE_MOVE(&reqToSend, &pWreq->node); tDebug("%s conn %p start to send msg, batch size:%d, len:%d", CONN_GET_INST_LABEL(pConn), pConn, j, totalLen); int32_t ret = uv_write(req, (uv_stream_t*)pConn->stream, wb, j, cliBatchSendCb); if (ret != 0) { tError("%s conn %p failed to send msg since %s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(ret)); + while (!QUEUE_IS_EMPTY(&pWreq->node)) { + queue* h = QUEUE_HEAD(&pWreq->node); + QUEUE_REMOVE(h); + SCliReq* pReq = QUEUE_DATA(h, SCliReq, q); + transQueuePush(&pConn->reqsToSend, &pReq->q); + } + QUEUE_INIT(&pWreq->node); + freeWReqToWQ(&pConn->wq, req->data); code = TSDB_CODE_THIRDPARTY_ERROR; TAOS_UNUSED(transUnrefCliHandle(pConn)); From f0ec5736113153a0618f532611373c0b63c50b4d Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 29 Oct 2024 20:23:28 +0800 Subject: [PATCH 04/28] fix double free --- source/libs/transport/src/transCli.c | 4 ---- 1 file changed, 4 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 4887240f5d..eda45f3d6b 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1307,8 +1307,6 @@ static void cliBatchSendCb(uv_write_t* req, int status) { SCliReq* pReq = QUEUE_DATA(h, SCliReq, q); transQueuePush(&conn->reqsSentOut, &pReq->q); } - QUEUE_INIT(&wrapper->node); - freeWReqToWQ(&conn->wq, wrapper); int32_t ref = transUnrefCliHandle(conn); @@ -1501,8 +1499,6 @@ int32_t cliBatchSend(SCliConn* pConn, int8_t direct) { SCliReq* pReq = QUEUE_DATA(h, SCliReq, q); transQueuePush(&pConn->reqsToSend, &pReq->q); } - QUEUE_INIT(&pWreq->node); - freeWReqToWQ(&pConn->wq, req->data); code = TSDB_CODE_THIRDPARTY_ERROR; TAOS_UNUSED(transUnrefCliHandle(pConn)); From 1dfad47bfa91ce403ec769be51ee19b48f0fc0cf Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 30 Oct 2024 10:01:23 +0800 Subject: [PATCH 05/28] fix quit error --- source/libs/transport/src/transCli.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index eda45f3d6b..8255956f62 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1246,8 +1246,8 @@ static void cliHandleException(SCliConn* conn) { } cliDestroyAllQidFromThrd(conn); - QUEUE_REMOVE(&conn->q); - if (conn->list) { + if (pThrd->quit == false && conn->list) { + QUEUE_REMOVE(&conn->q); conn->list->totalSize -= 1; conn->list = NULL; } From a3b797031d3799e11be0555b223ca4a2a6833dc6 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 30 Oct 2024 10:23:28 +0800 Subject: [PATCH 06/28] fix quit error --- source/libs/transport/src/transCli.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 8255956f62..9964750d17 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -3142,7 +3142,7 @@ static int32_t transInitMsg(void* pInstRef, const SEpSet* pEpSet, STransMsg* pRe if (ctx != NULL) pCtx->userCtx = *ctx; pCliReq = taosMemoryCalloc(1, sizeof(SCliReq)); - if (pReq == NULL) { + if (pCliReq == NULL) { TAOS_CHECK_GOTO(terrno, NULL, _exception); } From 542a6d3672adffa0a985ea905f85700f44a66098 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 30 Oct 2024 10:33:50 +0800 Subject: [PATCH 07/28] opt transport quit --- source/libs/transport/inc/transComm.h | 1 + source/libs/transport/src/transCli.c | 1 + source/libs/transport/src/transComm.c | 2 ++ 3 files changed, 4 insertions(+) diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index d835d12c79..5c79b379ed 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -353,6 +353,7 @@ typedef struct { queue node; void (*freeFunc)(void* arg); int32_t size; + int8_t inited; } STransQueue; /* diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 9964750d17..b5bdbd5ada 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1092,6 +1092,7 @@ _failed: transQueueDestroy(&conn->reqsToSend); transQueueDestroy(&conn->reqsSentOut); taosMemoryFree(conn->dstAddr); + taosMemoryFree(conn->ipStr); } tError("failed to create conn, code:%d", code); taosMemoryFree(conn); diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 35ca6678b8..66bd4a08f3 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -423,6 +423,7 @@ int32_t transQueueInit(STransQueue* wq, void (*freeFunc)(void* arg)) { QUEUE_INIT(&wq->node); wq->freeFunc = (void (*)(void*))freeFunc; wq->size = 0; + wq->inited = 1; return 0; } void transQueuePush(STransQueue* q, void* arg) { @@ -497,6 +498,7 @@ void transQueueRemove(STransQueue* q, void* e) { bool transQueueEmpty(STransQueue* q) { return q->size == 0 ? true : false; } void transQueueClear(STransQueue* q) { + if (q->inited == 0) return; while (!QUEUE_IS_EMPTY(&q->node)) { queue* h = QUEUE_HEAD(&q->node); QUEUE_REMOVE(h); From 69b36fee446ffcd0c14b20a0126878c361ccb1a6 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 30 Oct 2024 10:58:47 +0800 Subject: [PATCH 08/28] handle mem failure --- source/libs/transport/src/transCli.c | 7 +++++++ source/libs/transport/src/transSvr.c | 7 ++++--- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index b5bdbd5ada..c11b85ec87 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1349,6 +1349,9 @@ bool cliConnMayAddUserInfo(SCliConn* pConn, STransMsgHead** ppHead, int32_t* msg } STransMsgHead* pHead = *ppHead; STransMsgHead* tHead = taosMemoryCalloc(1, *msgLen + sizeof(pInst->user)); + if (tHead == NULL) { + return false; + } memcpy((char*)tHead, (char*)pHead, TRANS_MSG_OVERHEAD); memcpy((char*)tHead + TRANS_MSG_OVERHEAD, pInst->user, sizeof(pInst->user)); @@ -1435,6 +1438,10 @@ int32_t cliBatchSend(SCliConn* pConn, int8_t direct) { if (cliConnMayAddUserInfo(pConn, &pHead, &msgLen)) { content = transContFromHead(pHead); contLen = transContLenFromMsg(msgLen); + } else { + if (pConn->userInited == 0) { + return terrno; + } } if (pHead->comp == 0) { pHead->noResp = REQUEST_NO_RESP(pReq) ? 1 : 0; diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 5723f2ff23..d02bfb8281 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -1289,8 +1289,8 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) { int32_t code = 0; SWorkThrd* pThrd = hThrd; int32_t lino; - - SSvrConn* pConn = (SSvrConn*)taosMemoryCalloc(1, sizeof(SSvrConn)); + int8_t wqInited = 0; + SSvrConn* pConn = (SSvrConn*)taosMemoryCalloc(1, sizeof(SSvrConn)); if (pConn == NULL) { TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _end); } @@ -1340,6 +1340,7 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) { code = initWQ(&pConn->wq); TAOS_CHECK_GOTO(code, &lino, _end); + wqInited = 1; // init client handle pConn->pTcp = (uv_tcp_t*)taosMemoryMalloc(sizeof(uv_tcp_t)); @@ -1372,7 +1373,7 @@ _end: transDestroyBuffer(&pConn->readBuf); taosHashCleanup(pConn->pQTable); taosMemoryFree(pConn->pTcp); - destroyWQ(&pConn->wq); + if (wqInited) destroyWQ(&pConn->wq); taosMemoryFree(pConn->buf); taosMemoryFree(pConn); pConn = NULL; From 52f9ea59639f8cd6851f3af3d1226906b17244fe Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 30 Oct 2024 11:06:09 +0800 Subject: [PATCH 09/28] handle mem failure --- source/libs/transport/src/transCli.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index c11b85ec87..a2db392dcc 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -3219,6 +3219,7 @@ int32_t transSendRequestWithId(void* pInstRef, const SEpSet* pEpSet, STransMsg* return TSDB_CODE_INVALID_PARA; } int32_t code = 0; + int8_t transIdInited = 0; STrans* pInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)pInstRef); if (pInst == NULL) { @@ -3236,6 +3237,7 @@ int32_t transSendRequestWithId(void* pInstRef, const SEpSet* pEpSet, STransMsg* if (exh == NULL) { TAOS_CHECK_GOTO(TSDB_CODE_RPC_MODULE_QUIT, NULL, _exception); } + transIdInited = 1; pReq->info.handle = (void*)(*transpointId); pReq->info.qId = *transpointId; @@ -3252,9 +3254,6 @@ int32_t transSendRequestWithId(void* pInstRef, const SEpSet* pEpSet, STransMsg* return (code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code); } - // if (pReq->msgType == TDMT_SCH_DROP_TASK) { - // TAOS_UNUSED(transReleaseCliHandle(pReq->info.handle)); - // } transReleaseExHandle(transGetRefMgt(), *transpointId); transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef); return 0; @@ -3262,6 +3261,7 @@ int32_t transSendRequestWithId(void* pInstRef, const SEpSet* pEpSet, STransMsg* _exception: transFreeMsg(pReq->pCont); pReq->pCont = NULL; + if (transIdInited) transReleaseExHandle(transGetRefMgt(), *transpointId); transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef); tError("failed to send request since %s", tstrerror(code)); From 9f9b22e436a4bd1e68d79ad7dcfd4a2084322b82 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 30 Oct 2024 11:56:24 +0800 Subject: [PATCH 10/28] handle quit error --- source/libs/transport/src/transCli.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index a2db392dcc..8a1710b609 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -905,6 +905,10 @@ static void addConnToPool(void* pool, SCliConn* conn) { } SCliThrd* thrd = conn->hostThrd; + if (thrd->quit == false) { + return; + } + cliResetConnTimer(conn); if (conn->list == NULL && conn->dstAddr != NULL) { conn->list = taosHashGet((SHashObj*)pool, conn->dstAddr, strlen(conn->dstAddr)); From 989457969ede3f425216074af5f15c7a79950851 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 30 Oct 2024 11:56:36 +0800 Subject: [PATCH 11/28] handle quit error --- source/libs/transport/src/transCli.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 8a1710b609..f40ff3eee2 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -905,7 +905,7 @@ static void addConnToPool(void* pool, SCliConn* conn) { } SCliThrd* thrd = conn->hostThrd; - if (thrd->quit == false) { + if (thrd->quit == true) { return; } From 5ba90c2388770d5727774dd5810fee4c935f927c Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 30 Oct 2024 13:35:34 +0800 Subject: [PATCH 12/28] handle quit error --- source/libs/transport/src/transCli.c | 24 ++++++++---------------- 1 file changed, 8 insertions(+), 16 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index f40ff3eee2..36fc9a7846 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -127,6 +127,7 @@ typedef struct { typedef struct SCliReq { SReqCtx* ctx; queue q; + queue sendQ; STransMsgType type; uint64_t st; int64_t seq; @@ -1309,8 +1310,8 @@ static void cliBatchSendCb(uv_write_t* req, int status) { queue* h = QUEUE_HEAD(&wrapper->node); QUEUE_REMOVE(h); - SCliReq* pReq = QUEUE_DATA(h, SCliReq, q); - transQueuePush(&conn->reqsSentOut, &pReq->q); + SCliReq* pReq = QUEUE_DATA(h, SCliReq, sendQ); + pReq->sent = 1; } freeWReqToWQ(&conn->wq, wrapper); @@ -1471,14 +1472,16 @@ int32_t cliBatchSend(SCliConn* pConn, int8_t direct) { wb[j++] = uv_buf_init((char*)pHead, msgLen); totalLen += msgLen; - pCliMsg->sent = 1; pCliMsg->seq = pConn->seq; STraceId* trace = &pCliMsg->msg.info.traceId; tGDebug("%s conn %p %s is sent to %s, local info:%s, seq:%" PRId64 ", sid:%" PRId64 "", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pReq->msgType), pConn->dst, pConn->src, pConn->seq, pReq->info.qId); - QUEUE_PUSH(&reqToSend, &pCliMsg->q); + // QUEUE_PUSH(&reqToSend, &pCliMsg->q); + transQueuePush(&pConn->reqsSentOut, &pCliMsg->q); + QUEUE_INIT(&pCliMsg->sendQ); + QUEUE_PUSH(&reqToSend, &pCliMsg->sendQ); if (j >= batchLimit) { break; } @@ -1488,29 +1491,18 @@ int32_t cliBatchSend(SCliConn* pConn, int8_t direct) { if (req == NULL) { tError("%s conn %p failed to send msg since %s", CONN_GET_INST_LABEL(pConn), pConn, tstrerror(terrno)); - while (!QUEUE_IS_EMPTY(&reqToSend)) { - queue* h = QUEUE_HEAD(&reqToSend); - QUEUE_REMOVE(h); - SCliReq* pReq = QUEUE_DATA(h, SCliReq, q); - transQueuePush(&pConn->reqsToSend, &pReq->q); - } transRefCliHandle(pConn); return terrno; } SWReqsWrapper* pWreq = req->data; + QUEUE_MOVE(&reqToSend, &pWreq->node); tDebug("%s conn %p start to send msg, batch size:%d, len:%d", CONN_GET_INST_LABEL(pConn), pConn, j, totalLen); int32_t ret = uv_write(req, (uv_stream_t*)pConn->stream, wb, j, cliBatchSendCb); if (ret != 0) { tError("%s conn %p failed to send msg since %s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(ret)); - while (!QUEUE_IS_EMPTY(&pWreq->node)) { - queue* h = QUEUE_HEAD(&pWreq->node); - QUEUE_REMOVE(h); - SCliReq* pReq = QUEUE_DATA(h, SCliReq, q); - transQueuePush(&pConn->reqsToSend, &pReq->q); - } freeWReqToWQ(&pConn->wq, req->data); code = TSDB_CODE_THIRDPARTY_ERROR; TAOS_UNUSED(transUnrefCliHandle(pConn)); From 4b21fd866df6ea3a95659a9e724cac5ce747218e Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 30 Oct 2024 15:21:32 +0800 Subject: [PATCH 13/28] doc: update os support info --- docs/zh/14-reference/07-supported.md | 47 ++++++++++++++++++---------- 1 file changed, 30 insertions(+), 17 deletions(-) diff --git a/docs/zh/14-reference/07-supported.md b/docs/zh/14-reference/07-supported.md index 10ca237653..828c8717e2 100644 --- a/docs/zh/14-reference/07-supported.md +++ b/docs/zh/14-reference/07-supported.md @@ -6,12 +6,25 @@ description: "TDengine 服务端、客户端和连接器支持的平台列表" ## TDengine 服务端支持的平台列表 -| | **Windows server 2016/2019** | **Windows 10/11** | **CentOS 7.9/8** | **Ubuntu 18 以上** | **统信 UOS** | **银河/中标麒麟** | **凝思 V60/V80** | **macOS** | -| ------------ | ---------------------------- | ----------------- | ---------------- | ------------------ | ------------ | ----------------- | ---------------- | --------- | -| X64 | ●/E | ●/E | ● | ● | ●/E | ●/E | ●/E | ● | -| 树莓派 ARM64 | | | ● | | | | | | -| 华为云 ARM64 | | | | ● | | | | | -| M1 | | | | | | | | ● | +| | **版本** | **X64 64bit** | **ARM64** | **M1** | +| ----------------------|----------------| ------------- | --------- | ------ | +| **CentOS** | **7.9 以上** | ● | ● | | +| **Ubuntu** | **18 以上** | ● | ● | | +| **RedHat** | **RHEL 6 以上** | ● | ● | | +| **Debian** | **6.0 以上** | ● | ● | | +| **FreeBSD** | **12 以上** | ● | ● | | +| **OpenSUSE** | **全部版本** | ● | ● | | +| **SUSE Linux** | **11 以上** | ● | ● | | +| **Fedora** | **21 以上** | ● | ● | | +| **Windows Server** | **2016/2019** | ●/E | | | +| **Windows** | **10/11** | ●/E | | | +| **银河麒麟** | **V10 以上** | ●/E | | | +| **中标麒麟** | **V7.0 以上** | ●/E | | | +| **通信 UOS** | **V20 以上** | ●/E | | | +| **凝思磐石** | **V6.0 以上** | ●/E | | | +| **华为欧拉 openEuler** | **V20.03 以上** | ●/E | | | +| **龙蜥 Anolis OS** | **V8.6 以上** | ●/E | | | +| **macOS** | **11.0 以上** | | | ● | 注:1) ● 表示经过官方测试验证, ○ 表示非官方测试验证,E 表示仅企业版支持。 2) 社区版仅支持主流操作系统的较新版本,包括 Ubuntu 18+/CentOS 7+/RedHat/Debian/CoreOS/FreeBSD/OpenSUSE/SUSE Linux/Fedora/macOS 等。如果有其他操作系统及版本的需求,请联系企业版支持。 @@ -22,16 +35,16 @@ description: "TDengine 服务端、客户端和连接器支持的平台列表" 对照矩阵如下: -| **CPU** | **X64 64bit** | **X64 64bit** | **ARM64** | **X64 64bit** | **ARM64** | -| ----------- | ------------- | ------------- | --------- | ------------- | --------- | -| **OS** | **Linux** | **Win64** | **Linux** | **macOS** | **macOS** | -| **C/C++** | ● | ● | ● | ● | ● | -| **JDBC** | ● | ● | ● | ○ | ○ | -| **Python** | ● | ● | ● | ● | ● | -| **Go** | ● | ● | ● | ● | ● | -| **NodeJs** | ● | ● | ● | ○ | ○ | -| **C#** | ● | ● | ○ | ○ | ○ | -| **Rust** | ● | ● | ○ | ● | ● | -| **RESTful** | ● | ● | ● | ● | ● | +| **CPU** | **X64 64bit** | **X64 64bit** | **X64 64bit** | **ARM64** | **ARM64** | +| ----------- | ------------- | ------------- | ------------- | --------- | --------- | +| **OS** | **Linux** | **Win64** | **macOS** | **Linux** | **macOS** | +| **C/C++** | ● | ● | ● | ● | ● | +| **JDBC** | ● | ● | ○ | ● | ○ | +| **Python** | ● | ● | ● | ● | ● | +| **Go** | ● | ● | ● | ● | ● | +| **NodeJs** | ● | ● | ○ | ● | ○ | +| **C#** | ● | ● | ○ | ○ | ○ | +| **Rust** | ● | ● | ● | ○ | ● | +| **RESTful** | ● | ● | ● | ● | ● | 注:● 表示官方测试验证通过,○ 表示非官方测试验证通过,-- 表示未经验证。 From 65f11a24633d7457e41aec32c6a81be5e79b0c09 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 30 Oct 2024 15:38:29 +0800 Subject: [PATCH 14/28] doc: minor changes --- docs/zh/14-reference/07-supported.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/zh/14-reference/07-supported.md b/docs/zh/14-reference/07-supported.md index 828c8717e2..96d12311be 100644 --- a/docs/zh/14-reference/07-supported.md +++ b/docs/zh/14-reference/07-supported.md @@ -18,10 +18,10 @@ description: "TDengine 服务端、客户端和连接器支持的平台列表" | **Fedora** | **21 以上** | ● | ● | | | **Windows Server** | **2016/2019** | ●/E | | | | **Windows** | **10/11** | ●/E | | | -| **银河麒麟** | **V10 以上** | ●/E | | | -| **中标麒麟** | **V7.0 以上** | ●/E | | | +| **银河麒麟** | **V10 以上** | ●/E | ●/E | | +| **中标麒麟** | **V7.0 以上** | ●/E | ●/E | | | **通信 UOS** | **V20 以上** | ●/E | | | -| **凝思磐石** | **V6.0 以上** | ●/E | | | +| **凝思磐石** | **V8.0 以上** | ●/E | | | | **华为欧拉 openEuler** | **V20.03 以上** | ●/E | | | | **龙蜥 Anolis OS** | **V8.6 以上** | ●/E | | | | **macOS** | **11.0 以上** | | | ● | From 016c4fa9d7009b00e5231cb9c55da2d53e28e62d Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 30 Oct 2024 16:14:14 +0800 Subject: [PATCH 15/28] doc: update minor errors --- docs/zh/14-reference/07-supported.md | 38 ++++++++++++++-------------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/docs/zh/14-reference/07-supported.md b/docs/zh/14-reference/07-supported.md index 96d12311be..a062257143 100644 --- a/docs/zh/14-reference/07-supported.md +++ b/docs/zh/14-reference/07-supported.md @@ -6,25 +6,25 @@ description: "TDengine 服务端、客户端和连接器支持的平台列表" ## TDengine 服务端支持的平台列表 -| | **版本** | **X64 64bit** | **ARM64** | **M1** | -| ----------------------|----------------| ------------- | --------- | ------ | -| **CentOS** | **7.9 以上** | ● | ● | | -| **Ubuntu** | **18 以上** | ● | ● | | -| **RedHat** | **RHEL 6 以上** | ● | ● | | -| **Debian** | **6.0 以上** | ● | ● | | -| **FreeBSD** | **12 以上** | ● | ● | | -| **OpenSUSE** | **全部版本** | ● | ● | | -| **SUSE Linux** | **11 以上** | ● | ● | | -| **Fedora** | **21 以上** | ● | ● | | -| **Windows Server** | **2016/2019** | ●/E | | | -| **Windows** | **10/11** | ●/E | | | -| **银河麒麟** | **V10 以上** | ●/E | ●/E | | -| **中标麒麟** | **V7.0 以上** | ●/E | ●/E | | -| **通信 UOS** | **V20 以上** | ●/E | | | -| **凝思磐石** | **V8.0 以上** | ●/E | | | -| **华为欧拉 openEuler** | **V20.03 以上** | ●/E | | | -| **龙蜥 Anolis OS** | **V8.6 以上** | ●/E | | | -| **macOS** | **11.0 以上** | | | ● | +| | **版本** | **X64 64bit** | **ARM64** | +| ----------------------|----------------| ------------- | --------- | +| **CentOS** | **7.9 以上** | ● | ● | +| **Ubuntu** | **18 以上** | ● | ● | +| **RedHat** | **RHEL 7 以上** | ● | ● | +| **Debian** | **6.0 以上** | ● | ● | +| **FreeBSD** | **12 以上** | ● | ● | +| **OpenSUSE** | **全部版本** | ● | ● | +| **SUSE Linux** | **11 以上** | ● | ● | +| **Fedora** | **21 以上** | ● | ● | +| **Windows Server** | **2016 以上** | ●/E | | +| **Windows** | **10/11** | ●/E | | +| **银河麒麟** | **V10 以上** | ●/E | ●/E | +| **中标麒麟** | **V7.0 以上** | ●/E | ●/E | +| **统信 UOS** | **V20 以上** | ●/E | | +| **凝思磐石** | **V8.0 以上** | ●/E | | +| **华为欧拉 openEuler** | **V20.03 以上** | ●/E | | +| **龙蜥 Anolis OS** | **V8.6 以上** | ●/E | | +| **macOS** | **11.0 以上** | | ● | 注:1) ● 表示经过官方测试验证, ○ 表示非官方测试验证,E 表示仅企业版支持。 2) 社区版仅支持主流操作系统的较新版本,包括 Ubuntu 18+/CentOS 7+/RedHat/Debian/CoreOS/FreeBSD/OpenSUSE/SUSE Linux/Fedora/macOS 等。如果有其他操作系统及版本的需求,请联系企业版支持。 From 6aba3e9cac2a2de018161c82376e78a2bceea996 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 30 Oct 2024 16:17:53 +0800 Subject: [PATCH 16/28] doc: minor changes --- docs/zh/14-reference/07-supported.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/zh/14-reference/07-supported.md b/docs/zh/14-reference/07-supported.md index a062257143..f98f96755a 100644 --- a/docs/zh/14-reference/07-supported.md +++ b/docs/zh/14-reference/07-supported.md @@ -43,7 +43,7 @@ description: "TDengine 服务端、客户端和连接器支持的平台列表" | **Python** | ● | ● | ● | ● | ● | | **Go** | ● | ● | ● | ● | ● | | **NodeJs** | ● | ● | ○ | ● | ○ | -| **C#** | ● | ● | ○ | ○ | ○ | +| **C#** | ● | ● | ○ | ● | ○ | | **Rust** | ● | ● | ● | ○ | ● | | **RESTful** | ● | ● | ● | ● | ● | From 57743282dbdeef25e8760018190bb9dae8d1e705 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 30 Oct 2024 16:23:57 +0800 Subject: [PATCH 17/28] handle quit error --- source/libs/transport/src/transCli.c | 38 ++++++++++++++++++++++++++-- 1 file changed, 36 insertions(+), 2 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 36fc9a7846..6f843d7fc3 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -132,6 +132,7 @@ typedef struct SCliReq { uint64_t st; int64_t seq; int32_t sent; //(0: no send, 1: alread sent) + int8_t inSendQ; STransMsg msg; int8_t inRetry; @@ -275,6 +276,8 @@ static FORCE_INLINE void destroyReqAndAhanlde(void* cmsg); static FORCE_INLINE int cliRBChoseIdx(STrans* pInst); static FORCE_INLINE void destroyReqCtx(SReqCtx* ctx); +static FORCE_INLINE void removeReqFromSendQ(SCliReq* pReq); + static int32_t cliHandleState_mayUpdateState(SCliConn* pConn, SCliReq* pReq); static int32_t cliHandleState_mayHandleReleaseResp(SCliConn* conn, STransMsgHead* pHead); static int32_t cliHandleState_mayCreateAhandle(SCliConn* conn, STransMsgHead* pHead, STransMsg* pResp); @@ -701,6 +704,7 @@ void cliHandleResp(SCliConn* conn) { tstrerror(code)); } } + removeReqFromSendQ(pReq); code = cliBuildRespFromCont(pReq, &resp, pHead); STraceId* trace = &resp.info.traceId; @@ -1279,7 +1283,7 @@ static void cliHandleException(SCliConn* conn) { bool filterToRmReq(void* h, void* arg) { queue* el = h; SCliReq* pReq = QUEUE_DATA(el, SCliReq, q); - if (pReq->sent == 1 && REQUEST_NO_RESP(&pReq->msg)) { + if (pReq->sent == 1 && pReq->inSendQ == 0 && REQUEST_NO_RESP(&pReq->msg)) { return true; } return false; @@ -1311,7 +1315,7 @@ static void cliBatchSendCb(uv_write_t* req, int status) { QUEUE_REMOVE(h); SCliReq* pReq = QUEUE_DATA(h, SCliReq, sendQ); - pReq->sent = 1; + pReq->inSendQ = 0; } freeWReqToWQ(&conn->wq, wrapper); @@ -1473,6 +1477,7 @@ int32_t cliBatchSend(SCliConn* pConn, int8_t direct) { totalLen += msgLen; pCliMsg->seq = pConn->seq; + pCliMsg->inSendQ = 1; STraceId* trace = &pCliMsg->msg.info.traceId; tGDebug("%s conn %p %s is sent to %s, local info:%s, seq:%" PRId64 ", sid:%" PRId64 "", CONN_GET_INST_LABEL(pConn), @@ -1482,6 +1487,8 @@ int32_t cliBatchSend(SCliConn* pConn, int8_t direct) { transQueuePush(&pConn->reqsSentOut, &pCliMsg->q); QUEUE_INIT(&pCliMsg->sendQ); QUEUE_PUSH(&reqToSend, &pCliMsg->sendQ); + + pCliMsg->inSendQ = 1; if (j >= batchLimit) { break; } @@ -1491,6 +1498,14 @@ int32_t cliBatchSend(SCliConn* pConn, int8_t direct) { if (req == NULL) { tError("%s conn %p failed to send msg since %s", CONN_GET_INST_LABEL(pConn), pConn, tstrerror(terrno)); + while (!QUEUE_IS_EMPTY(&reqToSend)) { + queue* h = QUEUE_HEAD(&reqToSend); + QUEUE_REMOVE(h); + + SCliReq* pCliMsg = QUEUE_DATA(h, SCliReq, sendQ); + pCliMsg->inSendQ = 0; + } + transRefCliHandle(pConn); return terrno; } @@ -1503,6 +1518,14 @@ int32_t cliBatchSend(SCliConn* pConn, int8_t direct) { int32_t ret = uv_write(req, (uv_stream_t*)pConn->stream, wb, j, cliBatchSendCb); if (ret != 0) { tError("%s conn %p failed to send msg since %s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(ret)); + while (!QUEUE_IS_EMPTY(&pWreq->node)) { + queue* h = QUEUE_HEAD(&pWreq->node); + QUEUE_REMOVE(h); + + SCliReq* pCliMsg = QUEUE_DATA(h, SCliReq, sendQ); + pCliMsg->inSendQ = 0; + } + freeWReqToWQ(&pConn->wq, req->data); code = TSDB_CODE_THIRDPARTY_ERROR; TAOS_UNUSED(transUnrefCliHandle(pConn)); @@ -2214,11 +2237,21 @@ static void cliAsyncCb(uv_async_t* handle) { if (pThrd->stopMsg != NULL) cliHandleQuit(pThrd, pThrd->stopMsg); } +static FORCE_INLINE void removeReqFromSendQ(SCliReq* pReq) { + if (pReq == NULL || pReq->inSendQ == 0) { + return; + } + QUEUE_REMOVE(&pReq->sendQ); + pReq->inSendQ = 0; +} + static FORCE_INLINE void destroyReq(void* arg) { SCliReq* pReq = arg; if (pReq == NULL) { return; } + + removeReqFromSendQ(pReq); STraceId* trace = &pReq->msg.info.traceId; tGDebug("free memory:%p, free ctx: %p", pReq, pReq->ctx); @@ -2993,6 +3026,7 @@ int32_t cliNotifyCb(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp) { STrans* pInst = pThrd->pInst; if (pReq != NULL) { + removeReqFromSendQ(pReq); if (pResp->code != TSDB_CODE_SUCCESS) { if (cliMayRetry(pConn, pReq, pResp)) { return TSDB_CODE_RPC_ASYNC_IN_PROCESS; From 8d4813590fcb3e6d5a1a1420686cc87d7b19f32e Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 30 Oct 2024 16:27:44 +0800 Subject: [PATCH 18/28] handle quit error --- source/libs/transport/src/transCli.c | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 6f843d7fc3..9d4608c099 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1477,13 +1477,12 @@ int32_t cliBatchSend(SCliConn* pConn, int8_t direct) { totalLen += msgLen; pCliMsg->seq = pConn->seq; - pCliMsg->inSendQ = 1; + pCliMsg->sent = 1; STraceId* trace = &pCliMsg->msg.info.traceId; tGDebug("%s conn %p %s is sent to %s, local info:%s, seq:%" PRId64 ", sid:%" PRId64 "", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pReq->msgType), pConn->dst, pConn->src, pConn->seq, pReq->info.qId); - // QUEUE_PUSH(&reqToSend, &pCliMsg->q); transQueuePush(&pConn->reqsSentOut, &pCliMsg->q); QUEUE_INIT(&pCliMsg->sendQ); QUEUE_PUSH(&reqToSend, &pCliMsg->sendQ); @@ -1500,10 +1499,9 @@ int32_t cliBatchSend(SCliConn* pConn, int8_t direct) { tError("%s conn %p failed to send msg since %s", CONN_GET_INST_LABEL(pConn), pConn, tstrerror(terrno)); while (!QUEUE_IS_EMPTY(&reqToSend)) { queue* h = QUEUE_HEAD(&reqToSend); - QUEUE_REMOVE(h); SCliReq* pCliMsg = QUEUE_DATA(h, SCliReq, sendQ); - pCliMsg->inSendQ = 0; + removeReqFromSendQ(pCliMsg); } transRefCliHandle(pConn); @@ -1519,11 +1517,9 @@ int32_t cliBatchSend(SCliConn* pConn, int8_t direct) { if (ret != 0) { tError("%s conn %p failed to send msg since %s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(ret)); while (!QUEUE_IS_EMPTY(&pWreq->node)) { - queue* h = QUEUE_HEAD(&pWreq->node); - QUEUE_REMOVE(h); - + queue* h = QUEUE_HEAD(&pWreq->node); SCliReq* pCliMsg = QUEUE_DATA(h, SCliReq, sendQ); - pCliMsg->inSendQ = 0; + removeReqFromSendQ(pCliMsg); } freeWReqToWQ(&pConn->wq, req->data); From 70c9d9847f8a56d46a7a51ab77183671b855dcf5 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 30 Oct 2024 16:28:26 +0800 Subject: [PATCH 19/28] handle quit error --- source/libs/transport/src/transCli.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 9d4608c099..02bc6cd2d5 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1498,8 +1498,7 @@ int32_t cliBatchSend(SCliConn* pConn, int8_t direct) { if (req == NULL) { tError("%s conn %p failed to send msg since %s", CONN_GET_INST_LABEL(pConn), pConn, tstrerror(terrno)); while (!QUEUE_IS_EMPTY(&reqToSend)) { - queue* h = QUEUE_HEAD(&reqToSend); - + queue* h = QUEUE_HEAD(&reqToSend); SCliReq* pCliMsg = QUEUE_DATA(h, SCliReq, sendQ); removeReqFromSendQ(pCliMsg); } From a191e9993a4a309e5be89249ddd085a69dc2a24d Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 30 Oct 2024 16:30:46 +0800 Subject: [PATCH 20/28] handle quit error --- source/libs/transport/src/transCli.c | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 02bc6cd2d5..1e4276c438 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1311,11 +1311,9 @@ static void cliBatchSendCb(uv_write_t* req, int status) { STrans* pInst = pThrd->pInst; while (!QUEUE_IS_EMPTY(&wrapper->node)) { - queue* h = QUEUE_HEAD(&wrapper->node); - QUEUE_REMOVE(h); - + queue* h = QUEUE_HEAD(&wrapper->node); SCliReq* pReq = QUEUE_DATA(h, SCliReq, sendQ); - pReq->inSendQ = 0; + removeReqFromSendQ(pReq); } freeWReqToWQ(&conn->wq, wrapper); From 296eb9a14665247838a9166a43651507924a1d50 Mon Sep 17 00:00:00 2001 From: Yu Chen <74105241+yu285@users.noreply.github.com> Date: Wed, 30 Oct 2024 17:11:20 +0800 Subject: [PATCH 21/28] =?UTF-8?q?docs=EF=BC=9AUpdate=20get-started=20descr?= =?UTF-8?q?iption?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/zh/04-get-started/03-package.md | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/docs/zh/04-get-started/03-package.md b/docs/zh/04-get-started/03-package.md index 2a1f594b4f..cef3cb6943 100644 --- a/docs/zh/04-get-started/03-package.md +++ b/docs/zh/04-get-started/03-package.md @@ -14,7 +14,9 @@ TDengine 完整的软件包包括服务端(taosd)、应用驱动(taosc) 为方便使用,标准的服务端安装包包含了 taosd、taosAdapter、taosc、taos、taosdump、taosBenchmark、TDinsight 安装脚本和示例代码;如果您只需要用到服务端程序和客户端连接的 C/C++ 语言支持,也可以仅下载 Lite 版本的安装包。 -在 Linux 系统上,TDengine 社区版提供 Deb 和 RPM 格式安装包,用户可以根据自己的运行环境选择合适的安装包。其中 Deb 支持 Debian/Ubuntu 及其衍生系统,RPM 支持 CentOS/RHEL/SUSE 及其衍生系统。同时我们也为企业用户提供 tar.gz 格式安装包,也支持通过 `apt-get` 工具从线上进行安装。需要注意的是,RPM 和 Deb 包不含 `taosdump` 和 TDinsight 安装脚本,这些工具需要通过安装 taosTools 包获得。TDengine 也提供 Windows x64 平台和 macOS x64/m1 平台的安装包。 +在 Linux 系统上,TDengine 社区版提供 Deb 和 RPM 格式安装包,其中 Deb 支持 Debian/Ubuntu 及其衍生系统,RPM 支持 CentOS/RHEL/SUSE 及其衍生系统,用户可以根据自己的运行环境自行选择。同时我们也提供了 tar.gz 格式安装包,以及 `apt-get` 工具从线上进行安装。 + +此外,TDengine 也提供 macOS x64/m1 平台的 pkg 安装包。 ## 运行环境要求 在linux系统中,运行环境最低要求如下: @@ -317,4 +319,4 @@ SELECT AVG(current), MAX(voltage), MIN(phase) FROM test.meters WHERE groupId = 1 SELECT _wstart, AVG(current), MAX(voltage), MIN(phase) FROM test.d1001 INTERVAL(10s); ``` -在上面的查询中,使用系统提供的伪列_wstart 来给出每个窗口的开始时间。 \ No newline at end of file +在上面的查询中,使用系统提供的伪列_wstart 来给出每个窗口的开始时间。 From 456ed98e1707361406e9136f2c4b7a4cbf8b0415 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 30 Oct 2024 17:36:55 +0800 Subject: [PATCH 22/28] handle quit error --- source/libs/transport/src/transCli.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 1e4276c438..b3f325c5b8 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -604,7 +604,7 @@ int32_t cliHandleState_mayHandleReleaseResp(SCliConn* conn, STransMsgHead* pHead queue* el = QUEUE_HEAD(&set); QUEUE_REMOVE(el); SCliReq* pReq = QUEUE_DATA(el, SCliReq, q); - + removeReqFromSendQ(pReq); STraceId* trace = &pReq->msg.info.traceId; tGDebug("start to free msg %p", pReq); destroyReqWrapper(pReq, pThrd); From 68be4a7c3baab3969dd63bd1d0e414b6a2556880 Mon Sep 17 00:00:00 2001 From: Yu Chen <74105241+yu285@users.noreply.github.com> Date: Wed, 18 Sep 2024 16:13:04 +0800 Subject: [PATCH 23/28] Update 14-stream.md --- docs/zh/14-reference/03-taos-sql/14-stream.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/zh/14-reference/03-taos-sql/14-stream.md b/docs/zh/14-reference/03-taos-sql/14-stream.md index cd5c76a4ad..6baad76e23 100644 --- a/docs/zh/14-reference/03-taos-sql/14-stream.md +++ b/docs/zh/14-reference/03-taos-sql/14-stream.md @@ -212,11 +212,11 @@ TDengine 对于修改数据提供两种处理方式,由 IGNORE UPDATE 选项 ```sql [field1_name,...] ``` -用来指定stb_name的列与subquery输出结果的对应关系。如果stb_name的列与subquery输出结果的位置、数量全部匹配,则不需要显示指定对应关系。如果stb_name的列与subquery输出结果的数据类型不匹配,会把subquery输出结果的类型转换成对应的stb_name的列的类型。 +在本页文档顶部的 [field1_name,...] 是用来指定 stb_name 的列与 subquery 输出结果的对应关系的。如果 stb_name 的列与 subquery 输出结果的位置、数量全部匹配,则不需要显示指定对应关系。如果 stb_name 的列与 subquery 输出结果的数据类型不匹配,会把 subquery 输出结果的类型转换成对应的 stb_name 的列的类型。 对于已经存在的超级表,检查列的schema信息 -1. 检查列的schema信息是否匹配,对于不匹配的,则自动进行类型转换,当前只有数据长度大于4096byte时才报错,其余场景都能进行类型转换。 -2. 检查列的个数是否相同,如果不同,需要显示的指定超级表与subquery的列的对应关系,否则报错;如果相同,可以指定对应关系,也可以不指定,不指定则按位置顺序对应。 +1. 检查列的 schema 信息是否匹配,对于不匹配的,则自动进行类型转换,当前只有数据长度大于 4096byte 时才报错,其余场景都能进行类型转换。 +2. 检查列的个数是否相同,如果不同,需要显示的指定超级表与 subquery 的列的对应关系,否则报错;如果相同,可以指定对应关系,也可以不指定,不指定则按位置顺序对应。 ## 自定义TAG From 97c9dba167a629efea6ba0abc47109c0c1a80312 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 30 Oct 2024 17:51:59 +0800 Subject: [PATCH 24/28] add test case --- source/libs/transport/test/CMakeLists.txt | 19 + source/libs/transport/test/cliBench.c | 54 ++- source/libs/transport/test/svrBench.c | 38 +- source/libs/transport/test/transUT.cpp | 30 +- source/libs/transport/test/transUT2.cpp | 529 ++++++++++++++++++++++ 5 files changed, 602 insertions(+), 68 deletions(-) create mode 100644 source/libs/transport/test/transUT2.cpp diff --git a/source/libs/transport/test/CMakeLists.txt b/source/libs/transport/test/CMakeLists.txt index e68e93c48e..3a5213a726 100644 --- a/source/libs/transport/test/CMakeLists.txt +++ b/source/libs/transport/test/CMakeLists.txt @@ -1,5 +1,6 @@ add_executable(transportTest "") add_executable(transUT "") +add_executable(transUT2 "") add_executable(svrBench "") add_executable(cliBench "") add_executable(httpBench "") @@ -9,6 +10,10 @@ target_sources(transUT "transUT.cpp" ) +target_sources(transUT2 + PRIVATE + "transUT2.cpp" +) target_sources(transportTest PRIVATE "transportTests.cpp" @@ -56,6 +61,20 @@ target_include_directories(transUT "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) +target_link_libraries (transUT2 + os + util + common + gtest_main + transport +) + +target_include_directories(transUT2 + PUBLIC + "${TD_SOURCE_DIR}/include/libs/transport" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) + target_include_directories(svrBench PUBLIC "${TD_SOURCE_DIR}/include/libs/transport" diff --git a/source/libs/transport/test/cliBench.c b/source/libs/transport/test/cliBench.c index e73c209d55..6330b2d2c6 100644 --- a/source/libs/transport/test/cliBench.c +++ b/source/libs/transport/test/cliBench.c @@ -53,8 +53,6 @@ static void processResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, pMsg->code); - if (pEpSet) pInfo->epSet = *pEpSet; - rpcFreeCont(pMsg->pCont); tsem_post(&pInfo->rspSem); } @@ -72,12 +70,12 @@ static void *sendRequest(void *param) { rpcMsg.pCont = rpcMallocCont(pInfo->msgSize); rpcMsg.contLen = pInfo->msgSize; rpcMsg.info.ahandle = pInfo; - rpcMsg.info.noResp = 1; + rpcMsg.info.noResp = 0; rpcMsg.msgType = 1; tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num); rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL); if (pInfo->num % 20000 == 0) tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num); - // tsem_wait(&pInfo->rspSem); + tsem_wait(&pInfo->rspSem); } tDebug("thread:%d, it is over", pInfo->index); @@ -110,17 +108,15 @@ int main(int argc, char *argv[]) { rpcInit.label = "APP"; rpcInit.numOfThreads = 1; rpcInit.cfp = processResponse; - rpcInit.sessions = 100; + rpcInit.sessions = 1000; rpcInit.idleTime = tsShellActivityTimer * 1000; rpcInit.user = "michael"; rpcInit.connType = TAOS_CONN_CLIENT; - rpcInit.connLimitNum = 10; - rpcInit.connLimitLock = 1; - rpcInit.shareConnLimit = 16 * 1024; + rpcInit.shareConnLimit = tsShareConnLimit; rpcInit.supportBatch = 1; - - rpcDebugFlag = 135; + rpcInit.compressSize = -1; + rpcDebugFlag = 143; for (int i = 1; i < argc; ++i) { if (strcmp(argv[i], "-p") == 0 && i < argc - 1) { } else if (strcmp(argv[i], "-i") == 0 && i < argc - 1) { @@ -139,6 +135,10 @@ int main(int argc, char *argv[]) { } else if (strcmp(argv[i], "-u") == 0 && i < argc - 1) { } else if (strcmp(argv[i], "-k") == 0 && i < argc - 1) { } else if (strcmp(argv[i], "-spi") == 0 && i < argc - 1) { + } else if (strcmp(argv[i], "-l") == 0 && i < argc - 1) { + rpcInit.shareConnLimit = atoi(argv[++i]); + } else if (strcmp(argv[i], "-c") == 0 && i < argc - 1) { + rpcInit.compressSize = atoi(argv[++i]); } else if (strcmp(argv[i], "-d") == 0 && i < argc - 1) { rpcDebugFlag = atoi(argv[++i]); } else { @@ -150,6 +150,8 @@ int main(int argc, char *argv[]) { printf(" [-n requests]: number of requests per thread, default is:%d\n", numOfReqs); printf(" [-u user]: user name for the connection, default is:%s\n", rpcInit.user); printf(" [-d debugFlag]: debug flag, default:%d\n", rpcDebugFlag); + printf(" [-c compressSize]: compress size, default:%d\n", tsCompressMsgSize); + printf(" [-l shareConnLimit]: share conn limit, default:%d\n", tsShareConnLimit); printf(" [-h help]: print out this help\n\n"); exit(0); } @@ -168,18 +170,18 @@ int main(int argc, char *argv[]) { int64_t now = taosGetTimestampUs(); - SInfo *pInfo = (SInfo *)taosMemoryCalloc(1, sizeof(SInfo) * appThreads); - SInfo *p = pInfo; + SInfo **pInfo = (SInfo **)taosMemoryCalloc(1, sizeof(SInfo *) * appThreads); for (int i = 0; i < appThreads; ++i) { - pInfo->index = i; - pInfo->epSet = epSet; - pInfo->numOfReqs = numOfReqs; - pInfo->msgSize = msgSize; - tsem_init(&pInfo->rspSem, 0, 0); - pInfo->pRpc = pRpc; + SInfo *p = taosMemoryCalloc(1, sizeof(SInfo)); + p->index = i; + p->epSet = epSet; + p->numOfReqs = numOfReqs; + p->msgSize = msgSize; + tsem_init(&p->rspSem, 0, 0); + p->pRpc = pRpc; + pInfo[i] = p; - taosThreadCreate(&pInfo->thread, NULL, sendRequest, pInfo); - pInfo++; + taosThreadCreate(&p->thread, NULL, sendRequest, pInfo[i]); } do { @@ -192,12 +194,14 @@ int main(int argc, char *argv[]) { tInfo("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0 * numOfReqs * appThreads / usedTime, msgSize); for (int i = 0; i < appThreads; i++) { - SInfo *pInfo = p; - taosThreadJoin(pInfo->thread, NULL); - p++; + SInfo *p = pInfo[i]; + taosThreadJoin(p->thread, NULL); + taosMemoryFree(p); } - int ch = getchar(); - UNUSED(ch); + taosMemoryFree(pInfo); + + // int ch = getchar(); + // UNUSED(ch); taosCloseLog(); diff --git a/source/libs/transport/test/svrBench.c b/source/libs/transport/test/svrBench.c index 6408e4dcb2..dacde982ed 100644 --- a/source/libs/transport/test/svrBench.c +++ b/source/libs/transport/test/svrBench.c @@ -76,23 +76,6 @@ void *processShellMsg(void *arg) { for (int i = 0; i < numOfMsgs; ++i) { taosGetQitem(qall, (void **)&pRpcMsg); - - if (pDataFile != NULL) { - if (taosWriteFile(pDataFile, pRpcMsg->pCont, pRpcMsg->contLen) < 0) { - tInfo("failed to write data file, reason:%s", strerror(errno)); - } - } - } - - if (commit >= 2) { - num += numOfMsgs; - // if (taosFsync(pDataFile) < 0) { - // tInfo("failed to flush data to file, reason:%s", strerror(errno)); - //} - - if (num % 10000 == 0) { - tInfo("%d request have been written into disk", num); - } } taosResetQitems(qall); @@ -107,16 +90,7 @@ void *processShellMsg(void *arg) { rpcMsg.code = 0; rpcSendResponse(&rpcMsg); - void *handle = pRpcMsg->info.handle; taosFreeQitem(pRpcMsg); - //{ - // SRpcMsg nRpcMsg = {0}; - // nRpcMsg.pCont = rpcMallocCont(msgSize); - // nRpcMsg.contLen = msgSize; - // nRpcMsg.info.handle = handle; - // nRpcMsg.code = TSDB_CODE_CTG_NOT_READY; - // rpcSendResponse(&nRpcMsg); - //} } taosUpdateItemSize(qinfo.queue, numOfMsgs); @@ -149,12 +123,13 @@ int main(int argc, char *argv[]) { rpcInit.localPort = 7000; memcpy(rpcInit.localFqdn, "localhost", strlen("localhost")); rpcInit.label = "SER"; - rpcInit.numOfThreads = 1; + rpcInit.numOfThreads = 10; rpcInit.cfp = processRequestMsg; rpcInit.idleTime = 2 * 1500; taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); rpcDebugFlag = 131; + rpcInit.compressSize = -1; for (int i = 1; i < argc; ++i) { if (strcmp(argv[i], "-p") == 0 && i < argc - 1) { @@ -205,8 +180,8 @@ int main(int argc, char *argv[]) { if (pDataFile == NULL) tInfo("failed to open data file, reason:%s", strerror(errno)); } - int32_t numOfAthread = 5; - multiQ = taosMemoryMalloc(sizeof(numOfAthread)); + int32_t numOfAthread = 1; + multiQ = taosMemoryMalloc(sizeof(MultiThreadQhandle)); multiQ->numOfThread = numOfAthread; multiQ->qhandle = (STaosQueue **)taosMemoryMalloc(sizeof(STaosQueue *) * numOfAthread); multiQ->qset = (STaosQset **)taosMemoryMalloc(sizeof(STaosQset *) * numOfAthread); @@ -221,11 +196,6 @@ int main(int argc, char *argv[]) { threads[i].idx = i; taosThreadCreate(&(threads[i].thread), NULL, processShellMsg, (void *)&threads[i]); } - // qhandle = taosOpenQueue(); - // qset = taosOpenQset(); - // taosAddIntoQset(qset, qhandle, NULL); - - // processShellMsg(); if (pDataFile != NULL) { taosCloseFile(&pDataFile); diff --git a/source/libs/transport/test/transUT.cpp b/source/libs/transport/test/transUT.cpp index e57d01bcbc..cac8abf857 100644 --- a/source/libs/transport/test/transUT.cpp +++ b/source/libs/transport/test/transUT.cpp @@ -54,6 +54,7 @@ class Client { rpcInit_.user = (char *)user; rpcInit_.parent = this; rpcInit_.connType = TAOS_CONN_CLIENT; + rpcInit_.shareConnLimit = 200; taosVersionStrToInt(version, &(rpcInit_.compatibilityVer)); this->transCli = rpcOpen(&rpcInit_); @@ -85,6 +86,14 @@ class Client { SemWait(); *resp = this->resp; } + void sendReq(SRpcMsg *req) { + SEpSet epSet = {0}; + epSet.inUse = 0; + addEpIntoEpSet(&epSet, "127.0.0.1", 7000); + + rpcSendRequest(this->transCli, &epSet, req, NULL); + + } void SendAndRecvNoHandle(SRpcMsg *req, SRpcMsg *resp) { if (req->info.handle != NULL) { rpcReleaseHandle(req->info.handle, TAOS_CONN_CLIENT); @@ -160,6 +169,7 @@ static void processReq(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { rpcMsg.contLen = 100; rpcMsg.info = pMsg->info; rpcMsg.code = 0; + rpcFreeCont(pMsg->pCont); rpcSendResponse(&rpcMsg); } @@ -264,6 +274,7 @@ class TransObj { cli->Stop(); } void cliSendAndRecv(SRpcMsg *req, SRpcMsg *resp) { cli->SendAndRecv(req, resp); } + void cliSendReq(SRpcMsg *req) { cli->sendReq(req); } void cliSendAndRecvNoHandle(SRpcMsg *req, SRpcMsg *resp) { cli->SendAndRecvNoHandle(req, resp); } ~TransObj() { @@ -492,15 +503,16 @@ TEST_F(TransEnv, queryExcept) { TEST_F(TransEnv, noResp) { SRpcMsg resp = {0}; SRpcMsg req = {0}; - // for (int i = 0; i < 5; i++) { - // memset(&req, 0, sizeof(req)); - // req.info.noResp = 1; - // req.msgType = 1; - // req.pCont = rpcMallocCont(10); - // req.contLen = 10; - // tr->cliSendAndRecv(&req, &resp); - //} - // taosMsleep(2000); + for (int i = 0; i < 500000; i++) { + memset(&req, 0, sizeof(req)); + req.info.noResp = 1; + req.msgType = 3; + req.pCont = rpcMallocCont(10); + req.contLen = 10; + tr->cliSendReq(&req); + //tr->cliSendAndRecv(&req, &resp); + } + taosMsleep(2000); // no resp } diff --git a/source/libs/transport/test/transUT2.cpp b/source/libs/transport/test/transUT2.cpp new file mode 100644 index 0000000000..22faf0a3ed --- /dev/null +++ b/source/libs/transport/test/transUT2.cpp @@ -0,0 +1,529 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free + * Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#include +#include +#include +#include "tdatablock.h" +#include "tglobal.h" +#include "tlog.h" +#include "tmisce.h" +#include "transLog.h" +#include "trpc.h" +#include "tversion.h" +using namespace std; + +const char *label = "APP"; +const char *secret = "secret"; +const char *user = "user"; +const char *ckey = "ckey"; + +class Server; +int port = 7000; +// server process +// server except + +typedef void (*CB)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet); + +static void processContinueSend(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet); +static void processReleaseHandleCb(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet); +static void processRegisterFailure(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet); +static void processReq(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet); +// client process; +static void processResp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet); +class Client { + public: + void Init(int nThread) { + memcpy(tsTempDir, TD_TMP_DIR_PATH, strlen(TD_TMP_DIR_PATH)); + memset(&rpcInit_, 0, sizeof(rpcInit_)); + rpcInit_.localPort = 0; + rpcInit_.label = (char *)"client"; + rpcInit_.numOfThreads = nThread; + rpcInit_.cfp = processResp; + rpcInit_.user = (char *)user; + rpcInit_.parent = this; + rpcInit_.connType = TAOS_CONN_CLIENT; + rpcInit_.shareConnLimit = 200; + + taosVersionStrToInt(version, &(rpcInit_.compatibilityVer)); + this->transCli = rpcOpen(&rpcInit_); + //tsem_init(&this->sem, 0, 0); + } + void SetResp(SRpcMsg *pMsg) { + // set up resp; + this->resp = *pMsg; + } + SRpcMsg *Resp() { return &this->resp; } + + void Restart(CB cb) { + rpcClose(this->transCli); + rpcInit_.cfp = cb; + taosVersionStrToInt(version, &(rpcInit_.compatibilityVer)); + this->transCli = rpcOpen(&rpcInit_); + } + void Stop() { + rpcClose(this->transCli); + this->transCli = NULL; + } + + void SendAndRecv(SRpcMsg *req, SRpcMsg *resp) { + SEpSet epSet = {0}; + epSet.inUse = 0; + addEpIntoEpSet(&epSet, "127.0.0.1", 7000); + + rpcSendRequest(this->transCli, &epSet, req, NULL); + SemWait(); + *resp = this->resp; + } + void sendReq(SRpcMsg *req) { + SEpSet epSet = {0}; + epSet.inUse = 0; + addEpIntoEpSet(&epSet, "127.0.0.1", 7000); + + rpcSendRequest(this->transCli, &epSet, req, NULL); + } + + void sendReqWithId(SRpcMsg *req, int64_t *id) { + SEpSet epSet = {0}; + epSet.inUse = 0; + addEpIntoEpSet(&epSet, "127.0.0.1",7000); + rpcSendRequestWithCtx(this->transCli, &epSet, req, id, NULL); + + } + void freeId(int64_t *id) { + rpcFreeConnById(this->transCli, *id); + } + void SendAndRecvNoHandle(SRpcMsg *req, SRpcMsg *resp) { + if (req->info.handle != NULL) { + rpcReleaseHandle(req->info.handle, TAOS_CONN_CLIENT); + req->info.handle = NULL; + } + SendAndRecv(req, resp); + } + + void SemWait() { tsem_wait(&this->sem); } + void SemPost() { tsem_post(&this->sem); } + void Reset() {} + + ~Client() { + if (this->transCli) rpcClose(this->transCli); + } + + private: + tsem_t sem; + SRpcInit rpcInit_; + void *transCli; + SRpcMsg resp; +}; +class Server { + public: + Server() { + memcpy(tsTempDir, TD_TMP_DIR_PATH, strlen(TD_TMP_DIR_PATH)); + memset(&rpcInit_, 0, sizeof(rpcInit_)); + + memcpy(rpcInit_.localFqdn, "localhost", strlen("localhost")); + rpcInit_.localPort = port; + rpcInit_.label = (char *)"server"; + rpcInit_.numOfThreads = 5; + rpcInit_.cfp = processReq; + rpcInit_.user = (char *)user; + rpcInit_.connType = TAOS_CONN_SERVER; + taosVersionStrToInt(version, &(rpcInit_.compatibilityVer)); + } + void Start() { + this->transSrv = rpcOpen(&this->rpcInit_); + taosMsleep(1000); + } + void SetSrvContinueSend(CB cb) { + this->Stop(); + rpcInit_.cfp = cb; + this->Start(); + } + void Stop() { + if (this->transSrv == NULL) return; + rpcClose(this->transSrv); + this->transSrv = NULL; + } + void SetSrvSend(void (*cfp)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet)) { + this->Stop(); + rpcInit_.cfp = cfp; + this->Start(); + } + void Restart() { + this->Stop(); + this->Start(); + } + ~Server() { + if (this->transSrv) rpcClose(this->transSrv); + this->transSrv = NULL; + } + + private: + SRpcInit rpcInit_; + void *transSrv; +}; +static void processReq(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { + SRpcMsg rpcMsg = {0}; + rpcMsg.pCont = rpcMallocCont(100); + rpcMsg.contLen = 100; + rpcMsg.info = pMsg->info; + rpcMsg.code = 0; + rpcFreeCont(pMsg->pCont); + rpcSendResponse(&rpcMsg); +} + +static void processContinueSend(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { + // for (int i = 0; i < 10; i++) { + // SRpcMsg rpcMsg = {0}; + // rpcMsg.pCont = rpcMallocCont(100); + // rpcMsg.contLen = 100; + // rpcMsg.info = pMsg->info; + // rpcMsg.code = 0; + // rpcSendResponse(&rpcMsg); + // } +} +static void processReleaseHandleCb(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { + SRpcMsg rpcMsg = {0}; + rpcMsg.pCont = rpcMallocCont(100); + rpcMsg.contLen = 100; + rpcMsg.info = pMsg->info; + rpcMsg.code = 0; + rpcSendResponse(&rpcMsg); + + rpcReleaseHandle(&pMsg->info, TAOS_CONN_SERVER); +} +static void processRegisterFailure(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { + // { + // SRpcMsg rpcMsg1 = {0}; + // rpcMsg1.pCont = rpcMallocCont(100); + // rpcMsg1.contLen = 100; + // rpcMsg1.info = pMsg->info; + // rpcMsg1.code = 0; + // rpcRegisterBrokenLinkArg(&rpcMsg1); + // } + // taosMsleep(10); + + // SRpcMsg rpcMsg = {0}; + // rpcMsg.pCont = rpcMallocCont(100); + // rpcMsg.contLen = 100; + // rpcMsg.info = pMsg->info; + // rpcMsg.code = 0; + // rpcSendResponse(&rpcMsg); +} +// client process; +static void processResp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { + Client *client = (Client *)parent; + rpcFreeCont(pMsg->pCont); + STraceId *trace = (STraceId *)&pMsg->info.traceId; + tGDebug("received resp %s",tstrerror(pMsg->code)); +} + +static void initEnv() { + dDebugFlag = 143; + vDebugFlag = 0; + mDebugFlag = 143; + cDebugFlag = 0; + jniDebugFlag = 0; + tmrDebugFlag = 143; + uDebugFlag = 143; + rpcDebugFlag = 143; + qDebugFlag = 0; + wDebugFlag = 0; + sDebugFlag = 0; + tsdbDebugFlag = 0; + tsLogEmbedded = 1; + tsAsyncLog = 0; + + std::string path = TD_TMP_DIR_PATH "transport"; + // taosRemoveDir(path.c_str()); + taosMkDir(path.c_str()); + + tstrncpy(tsLogDir, path.c_str(), PATH_MAX); + if (taosInitLog("taosdlog", 1, false) != 0) { + printf("failed to init log file\n"); + } +} + +class TransObj { + public: + TransObj() { + initEnv(); + cli = new Client; + cli->Init(1); + srv = new Server; + srv->Start(); + } + + void RestartCli(CB cb) { + // + cli->Restart(cb); + } + void StopSrv() { + // + srv->Stop(); + } + // call when link broken, and notify query or fetch stop + void SetSrvContinueSend(void (*cfp)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet)) { + /////// + srv->SetSrvContinueSend(cfp); + } + void RestartSrv() { srv->Restart(); } + void StopCli() { + /////// + cli->Stop(); + } + void cliSendAndRecv(SRpcMsg *req, SRpcMsg *resp) { cli->SendAndRecv(req, resp); } + void cliSendReq(SRpcMsg *req) { cli->sendReq(req); } + + void cliSendReqWithId(SRpcMsg *req, int64_t *id) { cli->sendReqWithId(req, id);} + void cliFreeReqId(int64_t *id) { cli->freeId(id);} + void cliSendAndRecvNoHandle(SRpcMsg *req, SRpcMsg *resp) { cli->SendAndRecvNoHandle(req, resp); } + + ~TransObj() { + delete cli; + delete srv; + } + + private: + Client *cli; + Server *srv; +}; +class TransEnv : public ::testing::Test { + protected: + virtual void SetUp() { + // set up trans obj + tr = new TransObj(); + } + virtual void TearDown() { + // tear down + delete tr; + } + + TransObj *tr = NULL; +}; + +TEST_F(TransEnv, 01sendAndRec) { + // for (int i = 0; i < 10; i++) { + // SRpcMsg req = {0}, resp = {0}; + // req.msgType = 0; + // req.pCont = rpcMallocCont(10); + // req.contLen = 10; + // tr->cliSendAndRecv(&req, &resp); + // assert(resp.code == 0); + // } +} + +TEST_F(TransEnv, 02StopServer) { + // for (int i = 0; i < 1; i++) { + // SRpcMsg req = {0}, resp = {0}; + // req.msgType = 0; + // req.info.ahandle = (void *)0x35; + // req.pCont = rpcMallocCont(10); + // req.contLen = 10; + // tr->cliSendAndRecv(&req, &resp); + // assert(resp.code == 0); + // } + // SRpcMsg req = {0}, resp = {0}; + // req.info.ahandle = (void *)0x35; + // req.msgType = 1; + // req.pCont = rpcMallocCont(10); + // req.contLen = 10; + // tr->StopSrv(); + // // tr->RestartSrv(); + // tr->cliSendAndRecv(&req, &resp); + // assert(resp.code != 0); +} +TEST_F(TransEnv, clientUserDefined) { + // tr->RestartSrv(); + // for (int i = 0; i < 10; i++) { + // SRpcMsg req = {0}, resp = {0}; + // req.msgType = 0; + // req.pCont = rpcMallocCont(10); + // req.contLen = 10; + // tr->cliSendAndRecv(&req, &resp); + // assert(resp.code == 0); + // } + + ////////////////// +} + +TEST_F(TransEnv, cliPersistHandle) { + // SRpcMsg resp = {0}; + // void *handle = NULL; + // for (int i = 0; i < 10; i++) { + // SRpcMsg req = {0}; + // req.info = resp.info; + // req.info.persistHandle = 1; + + // req.msgType = 1; + // req.pCont = rpcMallocCont(10); + // req.contLen = 10; + // tr->cliSendAndRecv(&req, &resp); + // // if (i == 5) { + // // std::cout << "stop server" << std::endl; + // // tr->StopSrv(); + // //} + // // if (i >= 6) { + // // EXPECT_TRUE(resp.code != 0); + // //} + // handle = resp.info.handle; + // } + // rpcReleaseHandle(handle, TAOS_CONN_CLIENT); + // for (int i = 0; i < 10; i++) { + // SRpcMsg req = {0}; + // req.msgType = 1; + // req.pCont = rpcMallocCont(10); + // req.contLen = 10; + // tr->cliSendAndRecv(&req, &resp); + // } + + // taosMsleep(1000); + ////////////////// +} + +TEST_F(TransEnv, srvReleaseHandle) { + // SRpcMsg resp = {0}; + // tr->SetSrvContinueSend(processReleaseHandleCb); + // // tr->Restart(processReleaseHandleCb); + // void *handle = NULL; + // SRpcMsg req = {0}; + // for (int i = 0; i < 1; i++) { + // memset(&req, 0, sizeof(req)); + // req.info = resp.info; + // req.info.persistHandle = 1; + // req.msgType = 1; + // req.pCont = rpcMallocCont(10); + // req.contLen = 10; + // tr->cliSendAndRecv(&req, &resp); + // // tr->cliSendAndRecvNoHandle(&req, &resp); + // EXPECT_TRUE(resp.code == 0); + // } + ////////////////// +} +// reopen later +// TEST_F(TransEnv, cliReleaseHandleExcept) { +// SRpcMsg resp = {0}; +// SRpcMsg req = {0}; +// for (int i = 0; i < 3; i++) { +// memset(&req, 0, sizeof(req)); +// req.info = resp.info; +// req.info.persistHandle = 1; +// req.info.ahandle = (void *)1234; +// req.msgType = 1; +// req.pCont = rpcMallocCont(10); +// req.contLen = 10; +// tr->cliSendAndRecv(&req, &resp); +// if (i == 1) { +// std::cout << "stop server" << std::endl; +// tr->StopSrv(); +// } +// if (i > 1) { +// EXPECT_TRUE(resp.code != 0); +// } +// } +// ////////////////// +//} +TEST_F(TransEnv, srvContinueSend) { + // tr->SetSrvContinueSend(processContinueSend); + // SRpcMsg req = {0}, resp = {0}; + // for (int i = 0; i < 10; i++) { + // // memset(&req, 0, sizeof(req)); + // // memset(&resp, 0, sizeof(resp)); + // // req.msgType = 1; + // // req.pCont = rpcMallocCont(10); + // // req.contLen = 10; + // // tr->cliSendAndRecv(&req, &resp); + // } + // taosMsleep(1000); +} + +TEST_F(TransEnv, srvPersistHandleExcept) { + // tr->SetSrvContinueSend(processContinueSend); + // // tr->SetCliPersistFp(cliPersistHandle); + // SRpcMsg resp = {0}; + // SRpcMsg req = {0}; + // for (int i = 0; i < 5; i++) { + // // memset(&req, 0, sizeof(req)); + // // req.info = resp.info; + // // req.msgType = 1; + // // req.pCont = rpcMallocCont(10); + // // req.contLen = 10; + // // tr->cliSendAndRecv(&req, &resp); + // // if (i > 2) { + // // tr->StopCli(); + // // break; + // //} + // } + // taosMsleep(2000); + // conn broken + // +} +TEST_F(TransEnv, cliPersistHandleExcept) { + // tr->SetSrvContinueSend(processContinueSend); + // SRpcMsg resp = {0}; + // SRpcMsg req = {0}; + // for (int i = 0; i < 5; i++) { + // // memset(&req, 0, sizeof(req)); + // // req.info = resp.info; + // // req.msgType = 1; + // // req.pCont = rpcMallocCont(10); + // // req.contLen = 10; + // // tr->cliSendAndRecv(&req, &resp); + // // if (i > 2) { + // // tr->StopSrv(); + // // break; + // //} + // } + // taosMsleep(2000); + // // conn broken + // +} + +TEST_F(TransEnv, multiCliPersistHandleExcept) { + // conn broken +} +TEST_F(TransEnv, queryExcept) { + //taosMsleep(4 * 1000); +} +TEST_F(TransEnv, idTest) { + SRpcMsg resp = {0}; + SRpcMsg req = {0}; + for (int i = 0; i < 50000; i++) { + memset(&req, 0, sizeof(req)); + req.info.noResp = 0; + req.msgType = 3; + req.pCont = rpcMallocCont(10); + req.contLen = 10; + int64_t id; + tr->cliSendReqWithId(&req, &id); + tr->cliFreeReqId(&id); + } + taosMsleep(1000); + // no resp +} +TEST_F(TransEnv, noResp) { + SRpcMsg resp = {0}; + SRpcMsg req = {0}; + for (int i = 0; i < 500000; i++) { + memset(&req, 0, sizeof(req)); + req.info.noResp = 0; + req.msgType = 3; + req.pCont = rpcMallocCont(10); + req.contLen = 10; + tr->cliSendReq(&req); + //tr->cliSendAndRecv(&req, &resp); + } + taosMsleep(100000); + // no resp +} From 411bcd04e1b8b43bb23a9d0340d3407ddf8c9dde Mon Sep 17 00:00:00 2001 From: Yu Chen <74105241+yu285@users.noreply.github.com> Date: Fri, 21 Jun 2024 16:22:30 +0800 Subject: [PATCH 25/28] docs:Update README-CN.md --- README-CN.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README-CN.md b/README-CN.md index 06ac087859..1f785eb458 100644 --- a/README-CN.md +++ b/README-CN.md @@ -348,7 +348,7 @@ TDengine 提供了丰富的应用程序开发接口,其中包括 C/C++、Java # 成为社区贡献者 -点击 [这里](https://www.taosdata.com/cn/contributor/),了解如何成为 TDengine 的贡献者。 +点击 [这里](https://www.taosdata.com/contributor),了解如何成为 TDengine 的贡献者。 # 加入技术交流群 From a881ff3edfc8e7382513d480684a996bbeb5e909 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 30 Oct 2024 18:06:30 +0800 Subject: [PATCH 26/28] add test case --- source/libs/transport/src/transCli.c | 5 +++++ source/libs/transport/test/transUT2.cpp | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index b3f325c5b8..43f64924e1 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -457,6 +457,7 @@ static bool filteBySeq(void* key, void* arg) { SFiterArg* targ = arg; SCliReq* pReq = QUEUE_DATA(key, SCliReq, q); if (pReq->seq == targ->seq && pReq->msg.msgType + 1 == targ->msgType) { + removeReqFromSendQ(pReq); return true; } else { return false; @@ -543,6 +544,7 @@ bool filterByQid(void* key, void* arg) { SCliReq* pReq = QUEUE_DATA(key, SCliReq, q); if (pReq->msg.info.qId == *qid) { + removeReqFromSendQ(pReq); return true; } else { return false; @@ -1226,6 +1228,7 @@ static FORCE_INLINE void destroyReqInQueue(SCliConn* conn, queue* set, int32_t c QUEUE_REMOVE(el); SCliReq* pReq = QUEUE_DATA(el, SCliReq, q); + removeReqFromSendQ(pReq); notifyAndDestroyReq(conn, pReq, code); } } @@ -1284,6 +1287,7 @@ bool filterToRmReq(void* h, void* arg) { queue* el = h; SCliReq* pReq = QUEUE_DATA(el, SCliReq, q); if (pReq->sent == 1 && pReq->inSendQ == 0 && REQUEST_NO_RESP(&pReq->msg)) { + removeReqFromSendQ(pReq); return true; } return false; @@ -3700,6 +3704,7 @@ bool filterTimeoutReq(void* key, void* arg) { if (pReq->msg.info.qId == 0 && !REQUEST_NO_RESP(&pReq->msg) && pReq->ctx) { int64_t elapse = ((st - pReq->st) / 1000000); if (listArg && elapse >= listArg->pInst->readTimeout) { + removeReqFromSendQ(pReq); return true; } else { return false; diff --git a/source/libs/transport/test/transUT2.cpp b/source/libs/transport/test/transUT2.cpp index 22faf0a3ed..f69151813e 100644 --- a/source/libs/transport/test/transUT2.cpp +++ b/source/libs/transport/test/transUT2.cpp @@ -524,6 +524,6 @@ TEST_F(TransEnv, noResp) { tr->cliSendReq(&req); //tr->cliSendAndRecv(&req, &resp); } - taosMsleep(100000); + taosMsleep(10000); // no resp } From 23cd94c3564bd6746e6b3d25c8d7fbab0a3fee42 Mon Sep 17 00:00:00 2001 From: Yaming Pei Date: Wed, 30 Oct 2024 19:38:31 +0800 Subject: [PATCH 27/28] docs: modify the taosBenchmark documentation for the float min problem --- docs/en/14-reference/01-components/10-taosbenchmark | 3 +++ docs/zh/14-reference/02-tools/10-taosbenchmark.md | 2 ++ 2 files changed, 5 insertions(+) diff --git a/docs/en/14-reference/01-components/10-taosbenchmark b/docs/en/14-reference/01-components/10-taosbenchmark index e4884b889c..45c5cd2fb8 100644 --- a/docs/en/14-reference/01-components/10-taosbenchmark +++ b/docs/en/14-reference/01-components/10-taosbenchmark @@ -364,6 +364,9 @@ The configuration parameters for specifying super table tag columns and data col - **min**: The minimum value of the column/label of the data type. The generated value will equal or large than the minimum value. - **max**: The maximum value of the column/label of the data type. The generated value will less than the maximum value. + +- **scalingFactor**: Floating-point precision enhancement factor, which takes effect only when the data type is float/double. It has a valid range of positive integers from 1 to 1,000,000. It is used to enhance the precision of generated floating-point numbers, particularly when the min or max values are small. This property enhances the precision after the decimal point by powers of 10: scalingFactor of 10 indicates an enhancement of 1 decimal precision, 100 indicates an enhancement of 2 decimal precision, and so on. + - **fun**: This column of data is filled with functions. Currently, only the sin and cos functions are supported. The input parameter is the timestamp and converted to an angle value. The conversion formula is: angle x=input time column ts value % 360. At the same time, it supports coefficient adjustment and random fluctuation factor adjustment, presented in a fixed format expression, such as fun="10\*sin(x)+100\*random(5)", where x represents the angle, ranging from 0 to 360 degrees, and the growth step size is consistent with the time column step size. 10 represents the coefficient of multiplication, 100 represents the coefficient of addition or subtraction, and 5 represents the fluctuation range within a random range of 5%. The currently supported data types are int, bigint, float, and double. Note: The expression is fixed and cannot be reversed. - **values**: The value field of the nchar/binary column/label, which will be chosen randomly from the values. diff --git a/docs/zh/14-reference/02-tools/10-taosbenchmark.md b/docs/zh/14-reference/02-tools/10-taosbenchmark.md index 3c43c58915..303cfaa395 100644 --- a/docs/zh/14-reference/02-tools/10-taosbenchmark.md +++ b/docs/zh/14-reference/02-tools/10-taosbenchmark.md @@ -364,6 +364,8 @@ taosBenchmark -A INT,DOUBLE,NCHAR,BINARY\(16\) - **max** : 数据类型的 列/标签 的最大值。生成的值将小于最小值。 +- **scalingFactor** : 浮点数精度增强因子,仅当数据类型是float/double时生效,有效值范围为1至1000000的正整数。用于增强生成浮点数的精度,特别是在min或max值较小的情况下。此属性按10的幂次增强小数点后的精度:scalingFactor为10表示增强1位小数精度,100表示增强2位,依此类推。 + - **fun** : 此列数据以函数填充,目前只支持 sin 和 cos 两函数,输入参数为时间戳换算成角度值,换算公式: 角度 x = 输入的时间列ts值 % 360。同时支持系数调节,随机波动因子调节,以固定格式的表达式展现,如 fun=“10\*sin(x)+100\*random(5)” , x 表示角度,取值 0 ~ 360度,增长步长与时间列步长一致。10 表示乘的系数,100 表示加或减的系数,5 表示波动幅度在 5% 的随机范围内。目前支持的数据类型为 int, bigint, float, double 四种数据类型。注意:表达式为固定模式,不可前后颠倒。 - **values** : nchar/binary 列/标签的值域,将从值中随机选择。 From 25767e7fb3ffb7121a035fd6c8ef4d46f9123ed2 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 30 Oct 2024 20:36:39 +0800 Subject: [PATCH 28/28] doc: minor changes --- docs/zh/14-reference/02-tools/10-taosbenchmark.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/zh/14-reference/02-tools/10-taosbenchmark.md b/docs/zh/14-reference/02-tools/10-taosbenchmark.md index 303cfaa395..d655290577 100644 --- a/docs/zh/14-reference/02-tools/10-taosbenchmark.md +++ b/docs/zh/14-reference/02-tools/10-taosbenchmark.md @@ -364,7 +364,7 @@ taosBenchmark -A INT,DOUBLE,NCHAR,BINARY\(16\) - **max** : 数据类型的 列/标签 的最大值。生成的值将小于最小值。 -- **scalingFactor** : 浮点数精度增强因子,仅当数据类型是float/double时生效,有效值范围为1至1000000的正整数。用于增强生成浮点数的精度,特别是在min或max值较小的情况下。此属性按10的幂次增强小数点后的精度:scalingFactor为10表示增强1位小数精度,100表示增强2位,依此类推。 +- **scalingFactor** : 浮点数精度增强因子,仅当数据类型是 float/double 时生效,有效值范围为 1 至 1000000 的正整数。用于增强生成浮点数的精度,特别是在 min 或 max 值较小的情况下。此属性按 10 的幂次增强小数点后的精度:scalingFactor 为 10 表示增强 1 位小数精度,100 表示增强 2 位,依此类推。 - **fun** : 此列数据以函数填充,目前只支持 sin 和 cos 两函数,输入参数为时间戳换算成角度值,换算公式: 角度 x = 输入的时间列ts值 % 360。同时支持系数调节,随机波动因子调节,以固定格式的表达式展现,如 fun=“10\*sin(x)+100\*random(5)” , x 表示角度,取值 0 ~ 360度,增长步长与时间列步长一致。10 表示乘的系数,100 表示加或减的系数,5 表示波动幅度在 5% 的随机范围内。目前支持的数据类型为 int, bigint, float, double 四种数据类型。注意:表达式为固定模式,不可前后颠倒。