From a12bb2dbf03dd58691047cecd59d5cdd1282ff22 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 7 Mar 2022 19:42:10 +0800 Subject: [PATCH 1/8] add UT --- source/libs/index/src/index.c | 8 ++++++++ source/libs/index/src/index_cache.c | 9 ++++++++- source/libs/index/src/index_tfile.c | 24 +++++++++++++++++------- source/libs/index/test/indexTests.cc | 4 +++- 4 files changed, 36 insertions(+), 9 deletions(-) diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index ae0a6c775e..744f6ca70b 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -341,6 +341,8 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result // TODO: iterator mem and tidex STermValueType s = kTypeValue; + int64_t st = taosGetTimestampUs(); + SIdxTempResult* tr = sIdxTempResultCreate(); if (0 == indexCacheSearch(cache, query, tr, &s)) { if (s == kTypeDeletion) { @@ -348,17 +350,23 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result // coloum already drop by other oper, no need to query tindex return 0; } else { + st = taosGetTimestampUs(); if (0 != indexTFileSearch(sIdx->tindex, query, tr)) { indexError("corrupt at index(TFile) col:%s val: %s", term->colName, term->colVal); goto END; } + int64_t tfCost = taosGetTimestampUs() - st; + indexInfo("tfile search cost: %" PRIu64 "us", tfCost); } } else { indexError("corrupt at index(cache) col:%s val: %s", term->colName, term->colVal); goto END; } + int64_t cost = taosGetTimestampUs() - st; + indexInfo("search cost: %" PRIu64 "us", cost); sIdxTempResultMergeTo(*result, tr); + sIdxTempResultDestroy(tr); return 0; END: diff --git a/source/libs/index/src/index_cache.c b/source/libs/index/src/index_cache.c index d3b25afdbc..e601740e35 100644 --- a/source/libs/index/src/index_cache.c +++ b/source/libs/index/src/index_cache.c @@ -276,14 +276,19 @@ static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SI } else if (c->operaType == DEL_VALUE) { INDEX_MERGE_ADD_DEL(tr->added, tr->deled, c->uid) } + } else { + break; } - } + } else if (qtype == QUERY_PREFIX) { + } else if (qtype == QUERY_SUFFIX) { + } else if (qtype == QUERY_RANGE) } } tSkipListDestroyIter(iter); return 0; } int indexCacheSearch(void* cache, SIndexTermQuery* query, SIdxTempResult* result, STermValueType* s) { + int64_t st = taosGetTimestampUs(); if (cache == NULL) { return 0; } @@ -312,12 +317,14 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, SIdxTempResult* result // continue search in imm ret = indexQueryMem(imm, &ct, qtype, result, s); } + if (hasJson) { tfree(p); } indexMemUnRef(mem); indexMemUnRef(imm); + indexInfo("cache search, time cost %" PRIu64 "us", taosGetTimestampUs() - st); return ret; } diff --git a/source/libs/index/src/index_tfile.c b/source/libs/index/src/index_tfile.c index fd267fbf03..2396b6af5b 100644 --- a/source/libs/index/src/index_tfile.c +++ b/source/libs/index/src/index_tfile.c @@ -189,8 +189,8 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTempResul bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON); EIndexQueryType qtype = query->qType; - SArray* result = taosArrayInit(16, sizeof(uint64_t)); - int ret = -1; + // SArray* result = taosArrayInit(16, sizeof(uint64_t)); + int ret = -1; // refactor to callback later if (qtype == QUERY_TERM) { uint64_t offset; @@ -200,11 +200,18 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTempResul p = indexPackJsonData(term); sz = strlen(p); } + int64_t st = taosGetTimestampUs(); FstSlice key = fstSliceCreate(p, sz); if (fstGet(reader->fst, &key, &offset)) { - indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, found table info in tindex", term->suid, term->colName, - term->colVal); - ret = tfileReaderLoadTableIds(reader, offset, result); + int64_t et = taosGetTimestampUs(); + int64_t cost = et - st; + indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, found table info in tindex, time cost: %" PRIu64 "us", + term->suid, term->colName, term->colVal, cost); + + ret = tfileReaderLoadTableIds(reader, offset, tr->total); + cost = taosGetTimestampUs() - et; + indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, load all table info, time cost: %" PRIu64 "us", term->suid, + term->colName, term->colVal, cost); } else { indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, not found table info in tindex", term->suid, term->colName, term->colVal); @@ -225,8 +232,8 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTempResul } tfileReaderUnRef(reader); - taosArrayAddAll(tr->total, result); - taosArrayDestroy(result); + // taosArrayAddAll(tr->total, result); + // taosArrayDestroy(result); return ret; } @@ -391,6 +398,7 @@ int indexTFileSearch(void* tfile, SIndexTermQuery* query, SIdxTempResult* result return ret; } + int64_t st = taosGetTimestampUs(); IndexTFile* pTfile = tfile; SIndexTerm* term = query->term; @@ -399,6 +407,8 @@ int indexTFileSearch(void* tfile, SIndexTermQuery* query, SIdxTempResult* result if (reader == NULL) { return 0; } + int64_t cost = taosGetTimestampUs() - st; + indexInfo("index tfile stage 1 cost: %" PRId64 "", cost); return tfileReaderSearch(reader, query, result); } diff --git a/source/libs/index/test/indexTests.cc b/source/libs/index/test/indexTests.cc index ce3f7fe25e..90ed1b26b8 100644 --- a/source/libs/index/test/indexTests.cc +++ b/source/libs/index/test/indexTests.cc @@ -768,7 +768,7 @@ class IndexObj { int64_t s = taosGetTimestampUs(); if (Search(mq, result) == 0) { int64_t e = taosGetTimestampUs(); - std::cout << "search one successfully and time cost:" << e - s << "\tquery col:" << colName + std::cout << "search one successfully and time cost:" << e - s << "us\tquery col:" << colName << "\t val: " << colVal << "\t size:" << taosArrayGetSize(result) << std::endl; } else { } @@ -1106,8 +1106,10 @@ TEST_F(IndexEnv2, testIndex_del) { } index->Del("tag10", "Hello", 12); index->Del("tag10", "Hello", 11); + EXPECT_EQ(98, index->SearchOne("tag10", "Hello")); index->WriteMultiMillonData("tag10", "xxxxxxxxxxxxxx", 100 * 10000); index->Del("tag10", "Hello", 17); + EXPECT_EQ(97, index->SearchOne("tag10", "Hello")); } From db2f17e6e298abe5f953fc5f926d0bb89c78d618 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 7 Mar 2022 19:43:39 +0800 Subject: [PATCH 2/8] add UT --- source/libs/index/src/index_cache.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/libs/index/src/index_cache.c b/source/libs/index/src/index_cache.c index e601740e35..b40ded9e9a 100644 --- a/source/libs/index/src/index_cache.c +++ b/source/libs/index/src/index_cache.c @@ -281,7 +281,8 @@ static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SI } } else if (qtype == QUERY_PREFIX) { } else if (qtype == QUERY_SUFFIX) { - } else if (qtype == QUERY_RANGE) + } else if (qtype == QUERY_RANGE) { + } } } tSkipListDestroyIter(iter); From 914b5387c38f8592dade679c6da5d3240cbec6a4 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 8 Mar 2022 15:07:35 +0800 Subject: [PATCH 3/8] make CI/CD happy --- source/libs/index/test/indexTests.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/source/libs/index/test/indexTests.cc b/source/libs/index/test/indexTests.cc index 90ed1b26b8..8ab855fdc2 100644 --- a/source/libs/index/test/indexTests.cc +++ b/source/libs/index/test/indexTests.cc @@ -1110,6 +1110,5 @@ TEST_F(IndexEnv2, testIndex_del) { index->WriteMultiMillonData("tag10", "xxxxxxxxxxxxxx", 100 * 10000); index->Del("tag10", "Hello", 17); - EXPECT_EQ(97, index->SearchOne("tag10", "Hello")); } From 19686d9ab1cb5b1a6f73e6c4d5dd21b1ea09f1ce Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 9 Mar 2022 22:16:59 +0800 Subject: [PATCH 4/8] update transport --- source/libs/index/test/fstTest.cc | 12 +- source/libs/transport/inc/transComm.h | 6 +- source/libs/transport/src/transCli.c | 200 ++++++++++++-------------- source/libs/transport/src/transComm.c | 6 + source/libs/transport/src/transSrv.c | 24 ++-- source/libs/transport/test/rclient.c | 1 + source/libs/transport/test/rserver.c | 2 + 7 files changed, 122 insertions(+), 129 deletions(-) diff --git a/source/libs/index/test/fstTest.cc b/source/libs/index/test/fstTest.cc index cb3206a611..32f11b8af6 100644 --- a/source/libs/index/test/fstTest.cc +++ b/source/libs/index/test/fstTest.cc @@ -258,7 +258,7 @@ void checkFstCheckIterator() { // prefix search std::vector result; - AutomationCtx* ctx = automCtxCreate((void*)"ab", AUTOMATION_ALWAYS); + AutomationCtx* ctx = automCtxCreate((void*)"H", AUTOMATION_PREFIX); m->Search(ctx, result); std::cout << "size: " << result.size() << std::endl; // assert(result.size() == count); @@ -328,11 +328,11 @@ void iterTFileReader(char* path, char* uid, char* colName, char* ver) { int main(int argc, char* argv[]) { // tool to check all kind of fst test // if (argc > 1) { validateTFile(argv[1]); } - if (argc > 4) { - // path suid colName ver - iterTFileReader(argv[1], argv[2], argv[3], argv[4]); - } - // checkFstCheckIterator(); + // if (argc > 4) { + // path suid colName ver + // iterTFileReader(argv[1], argv[2], argv[3], argv[4]); + //} + checkFstCheckIterator(); // checkFstLongTerm(); // checkFstPrefixSearch(); diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index d4d9bff56c..f3e9e88583 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -123,9 +123,8 @@ typedef struct { } SRpcReqContext; typedef struct { - SRpcInfo* pTransInst; // associated SRpcInfo - SEpSet epSet; // ip list provided by app - void* ahandle; // handle provided by app + SEpSet epSet; // ip list provided by app + void* ahandle; // handle provided by app // struct SRpcConn* pConn; // pConn allocated tmsg_t msgType; // message type uint8_t* pCont; // content provided by app @@ -244,6 +243,7 @@ int transDestroyBuffer(SConnBuffer* buf); int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf); bool transReadComplete(SConnBuffer* connBuf); +int transSetConnOption(uv_tcp_t* stream); // int transPackMsg(SRpcMsg *rpcMsg, bool sercured, bool auth, char **msg, int32_t *msgLen); // int transUnpackMsg(char *msg, SRpcMsg *pMsg, bool ); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index fb76f38fe5..821b51c935 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -20,7 +20,10 @@ #define CONN_HOST_THREAD_INDEX(conn) (conn ? ((SCliConn*)conn)->hThrdIdx : -1) #define CONN_PERSIST_TIME(para) (para * 1000 * 10) +#define CONN_GET_INST_LABEL(conn) (((SRpcInfo*)(((SCliThrdObj*)conn->hostThrd)->pTransInst))->label) + typedef struct SCliConn { + T_REF_DECLARE() uv_connect_t connReq; uv_stream_t* stream; uv_write_t* writeReq; @@ -32,8 +35,7 @@ typedef struct SCliConn { int8_t ctnRdCnt; // continue read count int hThrdIdx; - SRpcPush* push; - int persist; // + int persist; // // spi configure char spi; char secured; @@ -41,6 +43,7 @@ typedef struct SCliConn { // debug and log info struct sockaddr_in addr; struct sockaddr_in locaddr; + } SCliConn; typedef struct SCliMsg { @@ -54,14 +57,17 @@ typedef struct SCliThrdObj { pthread_t thread; uv_loop_t* loop; // uv_async_t* cliAsync; // - SAsyncPool* asyncPool; - uv_timer_t* timer; - void* pool; // conn pool + SAsyncPool* asyncPool; + uv_timer_t* timer; + void* pool; // conn pool + + // msg queue queue msg; pthread_mutex_t msgMtx; - uint64_t nextTimeout; // next timeout - void* pTransInst; // - bool quit; + + uint64_t nextTimeout; // next timeout + void* pTransInst; // + bool quit; } SCliThrdObj; typedef struct SClientObj { @@ -96,7 +102,7 @@ static void clientAsyncCb(uv_async_t* handle); static void clientDestroy(uv_handle_t* handle); static void clientConnDestroy(SCliConn* pConn, bool clear /*clear tcp handle or not*/); -// process data read from server, auth/decompress etc later +// process data read from server, add decompress etc later static void clientHandleResp(SCliConn* conn); // handle except about conn static void clientHandleExcept(SCliConn* conn); @@ -104,9 +110,10 @@ static void clientHandleExcept(SCliConn* conn); static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd); static void clientSendQuit(SCliThrdObj* thrd); - static void destroyUserdata(SRpcMsg* userdata); +static int clientRBChoseIdx(SRpcInfo* pTransInst); + static void destroyCmsg(SCliMsg* cmsg); static void transDestroyConnCtx(STransConnCtx* ctx); // thread obj @@ -115,10 +122,13 @@ static void destroyThrdObj(SCliThrdObj* pThrd); // thread static void* clientThread(void* arg); -static void clientHandleResp(SCliConn* conn) { +static void* clientNotifyApp() {} +static void clientHandleResp(SCliConn* conn) { SCliMsg* pMsg = conn->data; STransConnCtx* pCtx = pMsg->ctx; - SRpcInfo* pRpc = pCtx->pTransInst; + + SCliThrdObj* pThrd = conn->hostThrd; + SRpcInfo* pTransInst = pThrd->pTransInst; STransMsgHead* pHead = (STransMsgHead*)(conn->readBuf.buf); pHead->code = htonl(pHead->code); @@ -134,26 +144,24 @@ static void clientHandleResp(SCliConn* conn) { rpcMsg.msgType = pHead->msgType; rpcMsg.ahandle = pCtx->ahandle; - if (pRpc->pfp != NULL && (pRpc->pfp)(pRpc->parent, rpcMsg.msgType)) { + if (pTransInst->pfp != NULL && (pTransInst->pfp)(pTransInst->parent, rpcMsg.msgType)) { rpcMsg.handle = conn; conn->persist = 1; tDebug("client conn %p persist by app", conn); } - tDebug("%s client conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pRpc->label, conn, + tDebug("%s client conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn, TMSG_INFO(pHead->msgType), inet_ntoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), inet_ntoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), rpcMsg.contLen); conn->secured = pHead->secured; - if (conn->push != NULL && conn->ctnRdCnt != 0) { - (*conn->push->callback)(conn->push->arg, &rpcMsg); - conn->push = NULL; - } else { + + if (pCtx->pSem == NULL) { if (pCtx->pSem == NULL) { - tTrace("%s client conn %p handle resp", pRpc->label, conn); - (pRpc->cfp)(pRpc->parent, &rpcMsg, NULL); + tTrace("%s client conn %p handle resp", pTransInst->label, conn); + (pTransInst->cfp)(pTransInst->parent, &rpcMsg, NULL); } else { - tTrace("%s client conn(sync) %p handle resp", pRpc->label, conn); + tTrace("%s client conn(sync) %p handle resp", pTransInst->label, conn); memcpy((char*)pCtx->pRsp, (char*)&rpcMsg, sizeof(rpcMsg)); tsem_post(pCtx->pSem); } @@ -162,28 +170,33 @@ static void clientHandleResp(SCliConn* conn) { uv_read_start((uv_stream_t*)conn->stream, clientAllocBufferCb, clientReadCb); - SCliThrdObj* pThrd = conn->hostThrd; - // user owns conn->persist = 1 - if (conn->push == NULL && conn->persist == 0) { - if (pRpc->noPool == true) { + if (conn->persist == 0) { + if (pTransInst->noPool == true) { + destroyCmsg(conn->data); + clientConnDestroy(conn, true); } else { addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn); + destroyCmsg(conn->data); + conn->data = NULL; } + } else { + // app decide to free or not } - destroyCmsg(conn->data); - conn->data = NULL; // start thread's timer of conn pool if not active - if (!uv_is_active((uv_handle_t*)pThrd->timer) && pRpc->idleTime > 0) { + if (!uv_is_active((uv_handle_t*)pThrd->timer) && pTransInst->idleTime > 0) { // uv_timer_start((uv_timer_t*)pThrd->timer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); } } static void clientHandleExcept(SCliConn* pConn) { - if (pConn->data == NULL && pConn->push == NULL) { + if (pConn->data == NULL) { // handle conn except in conn pool clientConnDestroy(pConn, true); return; } + SCliThrdObj* pThrd = pConn->hostThrd; + SRpcInfo* pTransInst = pThrd->pTransInst; + SCliMsg* pMsg = pConn->data; STransConnCtx* pCtx = pMsg->ctx; @@ -192,29 +205,14 @@ static void clientHandleExcept(SCliConn* pConn) { rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; rpcMsg.msgType = pMsg->msg.msgType + 1; - if (pConn->push != NULL && pConn->ctnRdCnt != 0) { - (*pConn->push->callback)(pConn->push->arg, &rpcMsg); - pConn->push = NULL; + if (pCtx->pSem == NULL) { + (pTransInst->cfp)(pTransInst->parent, &rpcMsg, NULL); } else { - if (pCtx->pSem == NULL) { - (pCtx->pTransInst->cfp)(pCtx->pTransInst->parent, &rpcMsg, NULL); - } else { - memcpy((char*)(pCtx->pRsp), (char*)(&rpcMsg), sizeof(rpcMsg)); - tsem_post(pCtx->pSem); - } - if (pConn->push != NULL) { - (*pConn->push->callback)(pConn->push->arg, &rpcMsg); - } - pConn->push = NULL; + memcpy((char*)(pCtx->pRsp), (char*)(&rpcMsg), sizeof(rpcMsg)); + tsem_post(pCtx->pSem); } - tTrace("%s client conn %p start to destroy", pCtx->pTransInst->label, pConn); - if (pConn->push == NULL) { - destroyCmsg(pConn->data); - pConn->data = NULL; - } - // transDestroyConnCtx(pCtx); + tTrace("%s client conn %p start to destroy", CONN_GET_INST_LABEL(pConn), pConn); clientConnDestroy(pConn, true); - pConn->ctnRdCnt += 1; } static void clientTimeoutCb(uv_timer_t* handle) { @@ -316,17 +314,14 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf if (nread > 0) { pBuf->len += nread; if (transReadComplete(pBuf)) { - tTrace("client conn %p read complete", conn); + tTrace("%s client conn %p read complete", CONN_GET_INST_LABEL(conn), conn); clientHandleResp(conn); } else { - tTrace("client conn %p read partial packet, continue to read", conn); + tTrace("%s client conn %p read partial packet, continue to read", CONN_GET_INST_LABEL(conn), conn); } return; } - if (nread == UV_EOF) { - tError("client conn %p read error: %s", conn, uv_err_name(nread)); - clientHandleExcept(conn); - } + assert(nread <= 0); if (nread == 0) { // ref http://docs.libuv.org/en/v1.x/stream.html?highlight=uv_read_start#c.uv_read_cb @@ -335,18 +330,16 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf return; } if (nread < 0) { - tError("client conn %p read error: %s", conn, uv_err_name(nread)); + tError("%s client conn %p read error: %s", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread)); clientHandleExcept(conn); } - // tDebug("Read error %s\n", uv_err_name(nread)); - // uv_close((uv_handle_t*)handle, clientDestroy); } static void clientConnDestroy(SCliConn* conn, bool clear) { // conn->ref--; if (conn->ref == 0) { - tTrace("client conn %p remove from conn pool", conn); + tTrace("%s client conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn); QUEUE_REMOVE(&conn->conn); if (clear) { uv_close((uv_handle_t*)conn->stream, clientDestroy); @@ -367,8 +360,9 @@ static void clientDestroy(uv_handle_t* handle) { static void clientWriteCb(uv_write_t* req, int status) { SCliConn* pConn = req->data; + if (status == 0) { - tTrace("client conn %p data already was written out", pConn); + tTrace("%s client conn %p data already was written out", CONN_GET_INST_LABEL(pConn), pConn); SCliMsg* pMsg = pConn->data; if (pMsg == NULL) { // handle @@ -376,18 +370,19 @@ static void clientWriteCb(uv_write_t* req, int status) { } destroyUserdata(&pMsg->msg); } else { - tError("client conn %p failed to write: %s", pConn, uv_err_name(status)); + tError("%s client conn %p failed to write: %s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(status)); clientHandleExcept(pConn); return; } - SCliThrdObj* pThrd = pConn->hostThrd; uv_read_start((uv_stream_t*)pConn->stream, clientAllocBufferCb, clientReadCb); } static void clientWrite(SCliConn* pConn) { SCliMsg* pCliMsg = pConn->data; STransConnCtx* pCtx = pCliMsg->ctx; - SRpcInfo* pTransInst = pCtx->pTransInst; + + SCliThrdObj* pThrd = pConn->hostThrd; + SRpcInfo* pTransInst = pThrd->pTransInst; SRpcMsg* pMsg = (SRpcMsg*)(&pCliMsg->msg); @@ -416,20 +411,18 @@ static void clientWrite(SCliConn* pConn) { pHead->msgType = pMsg->msgType; pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); - // if (pHead->msgType == TDMT_VND_QUERY || pHead->msgType == TDMT_VND_) - uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); - tDebug("client conn %p %s is send to %s:%d, local info %s:%d", pConn, TMSG_INFO(pHead->msgType), - inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr), - ntohs(pConn->locaddr.sin_port)); + tDebug("%s client conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn, + TMSG_INFO(pHead->msgType), inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), + inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port)); + uv_write(pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientWriteCb); } static void clientConnCb(uv_connect_t* req, int status) { // impl later SCliConn* pConn = req->data; if (status != 0) { - // tError("failed to connect server(%s, %d), errmsg: %s", pCtx->ip, pCtx->port, uv_strerror(status)); - tError("client conn %p failed to connect server: %s", pConn, uv_strerror(status)); + tError("%s client conn %p failed to connect server: %s", CONN_GET_INST_LABEL(pConn), pConn, uv_strerror(status)); clientHandleExcept(pConn); return; } @@ -439,7 +432,7 @@ static void clientConnCb(uv_connect_t* req, int status) { addrlen = sizeof(pConn->locaddr); uv_tcp_getsockname((uv_tcp_t*)pConn->stream, (struct sockaddr*)&pConn->locaddr, &addrlen); - tTrace("client conn %p connect to server successfully", pConn); + tTrace("%s client conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn); assert(pConn->stream == req->handle); clientWrite(pConn); @@ -462,20 +455,18 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { et = taosGetTimestampUs(); STransConnCtx* pCtx = pMsg->ctx; + SRpcInfo* pTransInst = pThrd->pTransInst; + SCliConn* conn = NULL; - SCliConn* conn = NULL; - if (pMsg->msg.handle == NULL) { - if (pCtx->pTransInst->noPool == true) { - } else { - conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port); - } - if (conn != NULL) { - tTrace("client conn %p get from conn pool", conn); - } - } else { + if (pMsg->msg.handle != NULL) { conn = (SCliConn*)(pMsg->msg.handle); if (conn != NULL) { - tTrace("client conn %p reused", conn); + tTrace("%s client conn %p reused", CONN_GET_INST_LABEL(conn), conn); + } + } else { + if (pTransInst->noPool == false) { + conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port); + if (conn != NULL) tTrace("%s client conn %p get from conn pool", CONN_GET_INST_LABEL(conn), conn); } } @@ -489,7 +480,6 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { return; } clientWrite(conn); - } else { conn = calloc(1, sizeof(SCliConn)); conn->ref++; @@ -497,12 +487,7 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t)); uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream)); conn->stream->data = conn; - uv_tcp_nodelay((uv_tcp_t*)conn->stream, 1); - int ret = uv_tcp_keepalive((uv_tcp_t*)conn->stream, 1, 1); - if (ret) { - tTrace("client conn %p failed to set keepalive, %s", conn, uv_err_name(ret)); - } - // write req handle + conn->writeReq = malloc(sizeof(uv_write_t)); conn->writeReq->data = conn; @@ -512,17 +497,17 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { conn->data = pMsg; conn->hostThrd = pThrd; - // conn->push = pMsg->msg.push; - // conn->ctnRdCnt = 0; - + int ret = transSetConnOption((uv_tcp_t*)conn->stream); + if (ret) { + tError("%s client conn %p failed to set conn option, errmsg %s", pTransInst->label, conn, uv_err_name(ret)); + } struct sockaddr_in addr; uv_ip4_addr(pMsg->ctx->ip, pMsg->ctx->port, &addr); // handle error in callback if fail to connect - tTrace("client conn %p try to connect to %s:%d", conn, pMsg->ctx->ip, pMsg->ctx->port); + tTrace("%s client conn %p try to connect to %s:%d", pTransInst->label, conn, pMsg->ctx->ip, pMsg->ctx->port); uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, clientConnCb); } - conn->push = pMsg->msg.push; conn->ctnRdCnt = 0; conn->hThrdIdx = pCtx->hThrdIdx; } @@ -548,7 +533,6 @@ static void clientAsyncCb(uv_async_t* handle) { } else { clientHandleReq(pMsg, pThrd); } - // clientHandleReq(pMsg, pThrd); count++; } if (count >= 2) { @@ -656,37 +640,36 @@ void taosCloseClient(void* arg) { free(cli->pThreadObj); free(cli); } -static int clientRBChoseIdx(SRpcInfo* pRpc) { - int64_t index = pRpc->index; - if (pRpc->index++ >= pRpc->numOfThreads) { - pRpc->index = 0; +static int clientRBChoseIdx(SRpcInfo* pTransInst) { + int64_t index = pTransInst->index; + if (pTransInst->index++ >= pTransInst->numOfThreads) { + pTransInst->index = 0; } - return index % pRpc->numOfThreads; + return index % pTransInst->numOfThreads; } void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) { // impl later char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn); uint32_t port = pEpSet->eps[pEpSet->inUse].port; - SRpcInfo* pRpc = (SRpcInfo*)shandle; + SRpcInfo* pTransInst = (SRpcInfo*)shandle; int index = CONN_HOST_THREAD_INDEX(pMsg->handle); if (index == -1) { - index = clientRBChoseIdx(pRpc); + index = clientRBChoseIdx(pTransInst); } int32_t flen = 0; if (transCompressMsg(pMsg->pCont, pMsg->contLen, &flen)) { // imp later } STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx)); - pCtx->pTransInst = (SRpcInfo*)shandle; pCtx->ahandle = pMsg->ahandle; pCtx->msgType = pMsg->msgType; pCtx->ip = strdup(ip); pCtx->port = port; pCtx->hThrdIdx = index; - assert(pRpc->connType == TAOS_CONN_CLIENT); + assert(pTransInst->connType == TAOS_CONN_CLIENT); // atomic or not SCliMsg* cliMsg = malloc(sizeof(SCliMsg)); @@ -694,7 +677,7 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* cliMsg->msg = *pMsg; cliMsg->st = taosGetTimestampUs(); - SCliThrdObj* thrd = ((SClientObj*)pRpc->tcphandle)->pThreadObj[index]; + SCliThrdObj* thrd = ((SClientObj*)pTransInst->tcphandle)->pThreadObj[index]; transSendAsync(thrd->asyncPool, &(cliMsg->q)); } @@ -702,15 +685,14 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) { char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn); uint32_t port = pEpSet->eps[pEpSet->inUse].port; - SRpcInfo* pRpc = (SRpcInfo*)shandle; + SRpcInfo* pTransInst = (SRpcInfo*)shandle; int index = CONN_HOST_THREAD_INDEX(pReq->handle); if (index == -1) { - index = clientRBChoseIdx(pRpc); + index = clientRBChoseIdx(pTransInst); } STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx)); - pCtx->pTransInst = (SRpcInfo*)shandle; pCtx->ahandle = pReq->ahandle; pCtx->msgType = pReq->msgType; pCtx->ip = strdup(ip); @@ -725,7 +707,7 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) { cliMsg->msg = *pReq; cliMsg->st = taosGetTimestampUs(); - SCliThrdObj* thrd = ((SClientObj*)pRpc->tcphandle)->pThreadObj[index]; + SCliThrdObj* thrd = ((SClientObj*)pTransInst->tcphandle)->pThreadObj[index]; transSendAsync(thrd->asyncPool, &(cliMsg->q)); tsem_t* pSem = pCtx->pSem; tsem_wait(pSem); diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 9a8607b0ed..92e42cb380 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -257,6 +257,12 @@ int transDestroyBuffer(SConnBuffer* buf) { transClearBuffer(buf); } +int transSetConnOption(uv_tcp_t* stream) { + uv_tcp_nodelay(stream, 1); + int ret = uv_tcp_keepalive(stream, 5, 5); + return ret; +} + SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) { SAsyncPool* pool = calloc(1, sizeof(SAsyncPool)); pool->index = 0; diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index c7b6ca2a2c..3d0da29e2d 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -18,6 +18,7 @@ #include "transComm.h" typedef struct SSrvConn { + T_REF_DECLARE() uv_tcp_t* pTcp; uv_write_t* pWriter; uv_timer_t* pTimer; @@ -67,16 +68,19 @@ typedef struct SWorkThrdObj { } SWorkThrdObj; typedef struct SServerObj { - pthread_t thread; - uv_tcp_t server; - uv_loop_t* loop; + pthread_t thread; + uv_tcp_t server; + uv_loop_t* loop; + + // work thread info int workerIdx; int numOfThreads; SWorkThrdObj** pThreadObj; - uv_pipe_t** pipe; - uint32_t ip; - uint32_t port; - uv_async_t* pAcceptAsync; // just to quit from from accept thread + + uv_pipe_t** pipe; + uint32_t ip; + uint32_t port; + uv_async_t* pAcceptAsync; // just to quit from from accept thread } SServerObj; static const char* notify = "a"; @@ -493,13 +497,11 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { uv_tcp_init(pThrd->loop, pConn->pTcp); pConn->pTcp->data = pConn; - // uv_tcp_nodelay(pConn->pTcp, 1); - // uv_tcp_keepalive(pConn->pTcp, 1, 1); - - // init write request, just pConn->pWriter = calloc(1, sizeof(uv_write_t)); pConn->pWriter->data = pConn; + transSetConnOption((uv_tcp_t*)pConn->pTcp); + if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) { uv_os_fd_t fd; uv_fileno((const uv_handle_t*)pConn->pTcp, &fd); diff --git a/source/libs/transport/test/rclient.c b/source/libs/transport/test/rclient.c index bcdf32bf6a..62ae0101ef 100644 --- a/source/libs/transport/test/rclient.c +++ b/source/libs/transport/test/rclient.c @@ -124,6 +124,7 @@ int main(int argc, char *argv[]) { rpcInit.ckey = "key"; rpcInit.spi = 1; rpcInit.connType = TAOS_CONN_CLIENT; + rpcDebugFlag = 143; for (int i = 1; i < argc; ++i) { if (strcmp(argv[i], "-p") == 0 && i < argc - 1) { diff --git a/source/libs/transport/test/rserver.c b/source/libs/transport/test/rserver.c index 5432a07649..ece3c7a500 100644 --- a/source/libs/transport/test/rserver.c +++ b/source/libs/transport/test/rserver.c @@ -125,6 +125,8 @@ int main(int argc, char *argv[]) { rpcInit.idleTime = 2 * 1500; rpcInit.afp = retrieveAuthInfo; + rpcDebugFlag = 143; + for (int i = 1; i < argc; ++i) { if (strcmp(argv[i], "-p") == 0 && i < argc - 1) { rpcInit.localPort = atoi(argv[++i]); From fdb64e7c5d68877e0eed492d0c7e62769ec94fb6 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 10 Mar 2022 13:50:40 +0800 Subject: [PATCH 5/8] test --- source/client/src/clientImpl.c | 6 +++--- source/libs/transport/src/transCli.c | 27 +++++++++++++++------------ 2 files changed, 18 insertions(+), 15 deletions(-) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index e9febef7e2..c281d93c69 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -225,7 +225,7 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t } int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList) { - void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter; + void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter; SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf}; int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr, &res); if (code != TSDB_CODE_SUCCESS) { @@ -239,12 +239,12 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList) if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) { pRequest->body.resInfo.numOfRows = res.numOfRows; - + if (pRequest->body.queryJob != 0) { schedulerFreeJob(pRequest->body.queryJob); } } - + pRequest->code = res.code; return pRequest->code; } diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 821b51c935..a417a57436 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -157,14 +157,12 @@ static void clientHandleResp(SCliConn* conn) { conn->secured = pHead->secured; if (pCtx->pSem == NULL) { - if (pCtx->pSem == NULL) { - tTrace("%s client conn %p handle resp", pTransInst->label, conn); - (pTransInst->cfp)(pTransInst->parent, &rpcMsg, NULL); - } else { - tTrace("%s client conn(sync) %p handle resp", pTransInst->label, conn); - memcpy((char*)pCtx->pRsp, (char*)&rpcMsg, sizeof(rpcMsg)); - tsem_post(pCtx->pSem); - } + tTrace("%s client conn %p handle resp", pTransInst->label, conn); + (pTransInst->cfp)(pTransInst->parent, &rpcMsg, NULL); + } else { + tTrace("%s client conn(sync) %p handle resp", pTransInst->label, conn); + memcpy((char*)pCtx->pRsp, (char*)&rpcMsg, sizeof(rpcMsg)); + tsem_post(pCtx->pSem); } conn->ctnRdCnt += 1; @@ -175,14 +173,14 @@ static void clientHandleResp(SCliConn* conn) { if (pTransInst->noPool == true) { destroyCmsg(conn->data); clientConnDestroy(conn, true); + return; } else { addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn); - destroyCmsg(conn->data); - conn->data = NULL; } - } else { - // app decide to free or not } + destroyCmsg(conn->data); + conn->data = NULL; + // start thread's timer of conn pool if not active if (!uv_is_active((uv_handle_t*)pThrd->timer) && pTransInst->idleTime > 0) { // uv_timer_start((uv_timer_t*)pThrd->timer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); @@ -206,11 +204,16 @@ static void clientHandleExcept(SCliConn* pConn) { rpcMsg.msgType = pMsg->msg.msgType + 1; if (pCtx->pSem == NULL) { + tTrace("%s client conn %p handle resp", pTransInst->label, pConn); (pTransInst->cfp)(pTransInst->parent, &rpcMsg, NULL); } else { + tTrace("%s client conn(sync) %p handle resp", pTransInst->label, pConn); memcpy((char*)(pCtx->pRsp), (char*)(&rpcMsg), sizeof(rpcMsg)); tsem_post(pCtx->pSem); } + destroyCmsg(pConn->data); + pConn->data = NULL; + tTrace("%s client conn %p start to destroy", CONN_GET_INST_LABEL(pConn), pConn); clientConnDestroy(pConn, true); } From b2c24a0338cf0935c1c85b68a7d9bda9b6a945ca Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 10 Mar 2022 19:32:26 +0800 Subject: [PATCH 6/8] modify transport --- include/libs/transport/trpc.h | 5 +- source/libs/transport/inc/transComm.h | 7 +- source/libs/transport/src/trans.c | 13 ++++ source/libs/transport/src/transCli.c | 27 +++++--- source/libs/transport/src/transSrv.c | 92 ++++++++++++++++----------- 5 files changed, 95 insertions(+), 49 deletions(-) diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 5e3860822e..101092f13e 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -86,7 +86,7 @@ typedef struct SRpcInit { int32_t rpcInit(); void rpcCleanup(); -void *rpcOpen(const SRpcInit *pRpc); +void * rpcOpen(const SRpcInit *pRpc); void rpcClose(void *); void * rpcMallocCont(int contLen); void rpcFreeCont(void *pCont); @@ -99,6 +99,9 @@ void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp) int rpcReportProgress(void *pConn, char *pCont, int contLen); void rpcCancelRequest(int64_t rid); +void rpcRefHandle(void *handle, int8_t type); +void rpcUnrefHandle(void *handle, int8_t type); + #ifdef __cplusplus } #endif diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index cd6c78cbdc..f2ac77fe61 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -242,8 +242,11 @@ int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf); bool transReadComplete(SConnBuffer* connBuf); int transSetConnOption(uv_tcp_t* stream); -// int transPackMsg(SRpcMsg *rpcMsg, bool sercured, bool auth, char **msg, int32_t *msgLen); -// int transUnpackMsg(char *msg, SRpcMsg *pMsg, bool ); +void transRefSrvHandle(void* handle); +void transUnrefSrvHandle(void* handle); + +void transRefCliHandle(void* handle); +void transUnrefCliHandle(void* handle); #endif diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index b45683617f..f3e0417397 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -122,4 +122,17 @@ void rpcCleanup(void) { // return; } + +void (*taosRefHandle[])(void* handle) = {transRefSrvHandle, transRefCliHandle}; +void (*taosUnRefHandle[])(void* handle) = {transUnrefSrvHandle, transUnrefCliHandle}; + +void rpcRefHandle(void* handle, int8_t type) { + assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT); + (*taosRefHandle[type])(handle); +} +void rpcUnrefHandle(void* handle, int8_t type) { + assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT); + (*taosUnRefHandle[type])(handle); +} + #endif diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index a417a57436..0d3946d967 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -351,14 +351,11 @@ static void clientConnDestroy(SCliConn* conn, bool clear) { } static void clientDestroy(uv_handle_t* handle) { SCliConn* conn = handle->data; - // transDestroyBuffer(&conn->readBuf); free(conn->stream); free(conn->writeReq); - tTrace("client conn %p destroy successfully", conn); + tTrace("%s client conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn); free(conn); - - // clientConnDestroy(conn, false); } static void clientWriteCb(uv_write_t* req, int status) { @@ -454,8 +451,7 @@ static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) { static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { uint64_t et = taosGetTimestampUs(); uint64_t el = et - pMsg->st; - tTrace("client msg tran time cost: %" PRIu64 "us", el); - et = taosGetTimestampUs(); + tTrace("%s client msg tran time cost: %" PRIu64 "us", ((SRpcInfo*)pThrd->pTransInst)->label, el); STransConnCtx* pCtx = pMsg->ctx; SRpcInfo* pTransInst = pThrd->pTransInst; @@ -630,8 +626,6 @@ static void transDestroyConnCtx(STransConnCtx* ctx) { static void clientSendQuit(SCliThrdObj* thrd) { // cli can stop gracefully SCliMsg* msg = calloc(1, sizeof(SCliMsg)); - msg->ctx = NULL; // - transSendAsync(thrd->asyncPool, &msg->q); } void taosCloseClient(void* arg) { @@ -650,6 +644,23 @@ static int clientRBChoseIdx(SRpcInfo* pTransInst) { } return index % pTransInst->numOfThreads; } +void transRefCliHandle(void* handle) { + if (handle == NULL) { + return; + } + int ref = T_REF_INC((SCliConn*)handle); + UNUSED(ref); +} +void transUnrefCliHandle(void* handle) { + if (handle == NULL) { + return; + } + int ref = T_REF_DEC((SCliConn*)handle); + if (ref == 0) { + } + + // unref cli handle +} void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) { // impl later char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn); diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index cd02d3bd77..432ec472fb 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -33,11 +33,11 @@ typedef struct SSrvConn { void* ahandle; // void* hostThrd; SArray* srvMsgs; - // void* pSrvMsg; + + bool broken; // conn broken; struct sockaddr_in addr; struct sockaddr_in locaddr; - // SRpcMsg sendMsg; // del later char secured; @@ -206,7 +206,6 @@ static void uvHandleReq(SSrvConn* pConn) { } pConn->inType = pHead->msgType; - // assert(transIsReq(pHead->msgType)); SRpcInfo* pRpc = (SRpcInfo*)p->shandle; pHead->code = htonl(pHead->code); @@ -230,7 +229,8 @@ static void uvHandleReq(SSrvConn* pConn) { rpcMsg.handle = pConn; transClearBuffer(&pConn->readBuf); - pConn->ref++; + + transRefSrvHandle(pConn); tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(rpcMsg.msgType), inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), rpcMsg.contLen); @@ -255,23 +255,20 @@ void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { } return; } - if (nread == UV_EOF) { - tError("server conn %p read error: %s", conn, uv_err_name(nread)); - if (conn->ref > 1) { - conn->ref++; // ref > 1 signed that write is in progress - } - destroyConn(conn, true); - return; - } if (nread == 0) { return; } - if (nread < 0 || nread != UV_EOF) { - if (conn->ref > 1) { - conn->ref++; // ref > 1 signed that write is in progress - } - tError("server conn %p read error: %s", conn, uv_err_name(nread)); - destroyConn(conn, true); + + tError("server conn %p read error: %s", conn, uv_err_name(nread)); + if (nread < 0 || nread == UV_EOF) { + conn->broken = true; + transUnrefSrvHandle(conn); + + // if (conn->ref > 1) { + // conn->ref++; // ref > 1 signed that write is in progress + //} + // tError("server conn %p read error: %s", conn, uv_err_name(nread)); + // destroyConn(conn, true); } } void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { @@ -304,10 +301,9 @@ void uvOnWriteCb(uv_write_t* req, int status) { } } else { tError("server conn %p failed to write data, %s", conn, uv_err_name(status)); - // - destroyConn(conn, true); + conn->broken = false; + transUnrefSrvHandle(conn); } - // opt } static void uvOnPipeWriteCb(uv_write_t* req, int status) { if (status == 0) { @@ -353,15 +349,18 @@ static void uvStartSendRespInternal(SSrvMsg* smsg) { SSrvConn* pConn = smsg->pConn; uv_timer_stop(pConn->pTimer); - - // pConn->pSrvMsg = smsg; - // conn->pWriter->data = smsg; uv_write(pConn->pWriter, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnWriteCb); } static void uvStartSendResp(SSrvMsg* smsg) { // impl SSrvConn* pConn = smsg->pConn; - pConn->ref--; // + + if (pConn->broken == true) { + transUnrefSrvHandle(pConn); + return; + } + transUnrefSrvHandle(pConn); + if (taosArrayGetSize(pConn->srvMsgs) > 0) { tDebug("server conn %p push data to client %s:%d, local info: %s:%d", pConn, inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port)); @@ -386,7 +385,8 @@ static void destroyAllConn(SWorkThrdObj* pThrd) { QUEUE_INIT(h); SSrvConn* c = QUEUE_DATA(h, SSrvConn, queue); - destroyConn(c, true); + transUnrefSrvHandle(c); + // destroyConn(c, true); } } void uvWorkerAsyncCb(uv_async_t* handle) { @@ -394,11 +394,11 @@ void uvWorkerAsyncCb(uv_async_t* handle) { SWorkThrdObj* pThrd = item->pThrd; SSrvConn* conn = NULL; queue wq; + // batch process to avoid to lock/unlock frequently pthread_mutex_lock(&item->mtx); QUEUE_MOVE(&item->qmsg, &wq); pthread_mutex_unlock(&item->mtx); - // pthread_mutex_unlock(&mtx); while (!QUEUE_IS_EMPTY(&wq)) { queue* head = QUEUE_HEAD(&wq); @@ -411,7 +411,6 @@ void uvWorkerAsyncCb(uv_async_t* handle) { } if (msg->pConn == NULL) { free(msg); - destroyAllConn(pThrd); uv_loop_close(pThrd->loop); @@ -601,7 +600,9 @@ static SSrvConn* createConn(void* hThrd) { QUEUE_PUSH(&pThrd->conn, &pConn->queue); pConn->srvMsgs = taosArrayInit(2, sizeof(void*)); // tTrace("conn %p created", pConn); - ++pConn->ref; + + pConn->broken = false; + transRefSrvHandle(pConn); return pConn; } @@ -609,10 +610,6 @@ static void destroyConn(SSrvConn* conn, bool clear) { if (conn == NULL) { return; } - tTrace("server conn %p try to destroy, ref: %d", conn, conn->ref); - if (--conn->ref > 0) { - return; - } transDestroyBuffer(&conn->readBuf); for (int i = 0; i < taosArrayGetSize(conn->srvMsgs); i++) { @@ -624,9 +621,9 @@ static void destroyConn(SSrvConn* conn, bool clear) { if (clear) { tTrace("try to destroy conn %p", conn); - uv_tcp_close_reset(conn->pTcp, uvDestroyConn); - // uv_shutdown_t* req = malloc(sizeof(uv_shutdown_t)); - // uv_shutdown(req, (uv_stream_t*)conn->pTcp, uvShutDownCb); + // uv_tcp_close_reset(conn->pTcp, uvDestroyConn); + uv_shutdown_t* req = malloc(sizeof(uv_shutdown_t)); + uv_shutdown(req, (uv_stream_t*)conn->pTcp, uvShutDownCb); // uv_unref((uv_handle_t*)conn->pTcp); // uv_close((uv_handle_t*)conn->pTcp, uvDestroyConn); } @@ -722,8 +719,6 @@ void destroyWorkThrd(SWorkThrdObj* pThrd) { pthread_join(pThrd->thread, NULL); free(pThrd->loop); transDestroyAsyncPool(pThrd->asyncPool); - - // free(pThrd->workerAsync); free(pThrd); } void sendQuitToWorkThrd(SWorkThrdObj* pThrd) { @@ -757,6 +752,27 @@ void taosCloseServer(void* arg) { free(srv); } +void transRefSrvHandle(void* handle) { + if (handle == NULL) { + return; + } + SSrvConn* conn = handle; + + int ref = T_REF_INC((SSrvConn*)handle); + UNUSED(ref); +} + +void transUnrefSrvHandle(void* handle) { + if (handle == NULL) { + return; + } + int ref = T_REF_DEC((SSrvConn*)handle); + + if (ref == 0) { + destroyConn((SSrvConn*)handle, true); + } + // unref srv handle +} void rpcSendResponse(const SRpcMsg* pMsg) { if (pMsg->handle == NULL) { return; From 5f51fb3a3267f7be77646f58c25c19fcf7ec8a22 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 11 Mar 2022 11:18:05 +0800 Subject: [PATCH 7/8] modify transport --- source/libs/transport/src/transSrv.c | 44 ++++++++++++++++++---------- 1 file changed, 28 insertions(+), 16 deletions(-) diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 432ec472fb..9e68b0bf7b 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -65,6 +65,7 @@ typedef struct SWorkThrdObj { queue conn; pthread_mutex_t msgMtx; void* pTransInst; + bool stop; } SWorkThrdObj; typedef struct SServerObj { @@ -386,7 +387,6 @@ static void destroyAllConn(SWorkThrdObj* pThrd) { SSrvConn* c = QUEUE_DATA(h, SSrvConn, queue); transUnrefSrvHandle(c); - // destroyConn(c, true); } } void uvWorkerAsyncCb(uv_async_t* handle) { @@ -411,10 +411,15 @@ void uvWorkerAsyncCb(uv_async_t* handle) { } if (msg->pConn == NULL) { free(msg); - destroyAllConn(pThrd); - - uv_loop_close(pThrd->loop); - uv_stop(pThrd->loop); + bool noConn = QUEUE_IS_EMPTY(&pThrd->conn); + if (noConn == true) { + uv_loop_close(pThrd->loop); + uv_stop(pThrd->loop); + } else { + destroyAllConn(pThrd); + uv_loop_close(pThrd->loop); + pThrd->stop = true; + } } else { uvStartSendResp(msg); } @@ -422,12 +427,15 @@ void uvWorkerAsyncCb(uv_async_t* handle) { } static void uvAcceptAsyncCb(uv_async_t* async) { SServerObj* srv = async->data; + tDebug("close server port %d", srv->port); uv_close((uv_handle_t*)&srv->server, NULL); uv_stop(srv->loop); } static void uvShutDownCb(uv_shutdown_t* req, int status) { - tDebug("conn failed to shut down: %s", uv_err_name(status)); + if (status != 0) { + tDebug("conn failed to shut down: %s", uv_err_name(status)); + } uv_close((uv_handle_t*)req->handle, uvDestroyConn); free(req); } @@ -509,14 +517,14 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { int addrlen = sizeof(pConn->addr); if (0 != uv_tcp_getpeername(pConn->pTcp, (struct sockaddr*)&pConn->addr, &addrlen)) { tError("server conn %p failed to get peer info", pConn); - destroyConn(pConn, true); + transUnrefSrvHandle(pConn); return; } addrlen = sizeof(pConn->locaddr); if (0 != uv_tcp_getsockname(pConn->pTcp, (struct sockaddr*)&pConn->locaddr, &addrlen)) { tError("server conn %p failed to get local info", pConn); - destroyConn(pConn, true); + transUnrefSrvHandle(pConn); return; } @@ -524,7 +532,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { } else { tDebug("failed to create new connection"); - destroyConn(pConn, true); + transUnrefSrvHandle(pConn); } } @@ -602,6 +610,7 @@ static SSrvConn* createConn(void* hThrd) { tTrace("conn %p created", pConn); pConn->broken = false; + transRefSrvHandle(pConn); return pConn; } @@ -617,25 +626,26 @@ static void destroyConn(SSrvConn* conn, bool clear) { destroySmsg(msg); } conn->srvMsgs = taosArrayDestroy(conn->srvMsgs); - QUEUE_REMOVE(&conn->queue); - if (clear) { tTrace("try to destroy conn %p", conn); - // uv_tcp_close_reset(conn->pTcp, uvDestroyConn); uv_shutdown_t* req = malloc(sizeof(uv_shutdown_t)); uv_shutdown(req, (uv_stream_t*)conn->pTcp, uvShutDownCb); - // uv_unref((uv_handle_t*)conn->pTcp); - // uv_close((uv_handle_t*)conn->pTcp, uvDestroyConn); } } static void uvDestroyConn(uv_handle_t* handle) { - SSrvConn* conn = handle->data; + SSrvConn* conn = handle->data; + SWorkThrdObj* thrd = conn->hostThrd; + tDebug("server conn %p destroy", conn); uv_timer_stop(conn->pTimer); - // free(conn->pTimer); + QUEUE_REMOVE(&conn->queue); free(conn->pTcp); free(conn->pWriter); free(conn); + + if (thrd->stop && QUEUE_IS_EMPTY(&thrd->conn)) { + uv_stop(thrd->loop); + } } static int transAddAuthPart(SSrvConn* pConn, char* msg, int msgLen) { STransMsgHead* pHead = (STransMsgHead*)msg; @@ -670,6 +680,7 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, for (int i = 0; i < srv->numOfThreads; i++) { SWorkThrdObj* thrd = (SWorkThrdObj*)calloc(1, sizeof(SWorkThrdObj)); + thrd->stop = false; srv->pThreadObj[i] = thrd; srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t)); @@ -767,6 +778,7 @@ void transUnrefSrvHandle(void* handle) { return; } int ref = T_REF_DEC((SSrvConn*)handle); + tDebug("handle %p ref count: %d", handle, ref); if (ref == 0) { destroyConn((SSrvConn*)handle, true); From 5262b4ad3b78c0f5cce8ff4a98f1e7f880588386 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 11 Mar 2022 11:46:05 +0800 Subject: [PATCH 8/8] modify transport --- include/libs/transport/trpc.h | 1 - source/client/src/clientEnv.c | 3 +-- source/dnode/mgmt/impl/src/dndTransport.c | 11 +++++------ source/libs/transport/inc/transportInt.h | 1 - source/libs/transport/src/rpcMain.c | 1 - source/libs/transport/src/trans.c | 1 - source/libs/transport/src/transCli.c | 14 +++----------- 7 files changed, 9 insertions(+), 23 deletions(-) diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 101092f13e..6ccb6c0dc4 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -64,7 +64,6 @@ typedef struct SRpcInit { int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS int idleTime; // milliseconds, 0 means idle timer is disabled - bool noPool; // create conn pool or not // the following is for client app ecurity only char *user; // user name char spi; // security parameter index diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 08285c9d26..edb3bf4f11 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -72,8 +72,6 @@ static void deregisterRequest(SRequestObj *pRequest) { taosReleaseRef(clientConnRefPool, pTscObj->id); } - - // todo close the transporter properly void closeTransporter(STscObj *pTscObj) { if (pTscObj == NULL || pTscObj->pAppInfo->pTransporter == NULL) { @@ -241,6 +239,7 @@ void taos_init_imp(void) { clientConnRefPool = taosOpenRef(200, destroyTscObj); clientReqRefPool = taosOpenRef(40960, doDestroyRequest); + // transDestroyBuffer(&conn->readBuf); taosGetAppName(appInfo.appName, NULL); pthread_mutex_init(&appInfo.mutex, NULL); diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index 157bad26a6..db1b15dd58 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -25,8 +25,8 @@ #include "dndMnode.h" #include "dndVnodes.h" -#define INTERNAL_USER "_dnd" -#define INTERNAL_CKEY "_key" +#define INTERNAL_USER "_dnd" +#define INTERNAL_CKEY "_key" #define INTERNAL_SECRET "_pwd" static void dndInitMsgFp(STransMgmt *pMgmt) { @@ -155,7 +155,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { } static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) { - SDnode *pDnode = parent; + SDnode * pDnode = parent; STransMgmt *pMgmt = &pDnode->tmgmt; tmsg_t msgType = pRsp->msgType; @@ -193,7 +193,6 @@ static int32_t dndInitClient(SDnode *pDnode) { rpcInit.ckey = INTERNAL_CKEY; rpcInit.spi = 1; rpcInit.parent = pDnode; - rpcInit.noPool = true; char pass[TSDB_PASSWORD_LEN + 1] = {0}; taosEncryptPass_c((uint8_t *)(INTERNAL_SECRET), strlen(INTERNAL_SECRET), pass); @@ -219,7 +218,7 @@ static void dndCleanupClient(SDnode *pDnode) { } static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) { - SDnode *pDnode = param; + SDnode * pDnode = param; STransMgmt *pMgmt = &pDnode->tmgmt; tmsg_t msgType = pReq->msgType; @@ -313,7 +312,7 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char SAuthReq authReq = {0}; tstrncpy(authReq.user, user, TSDB_USER_LEN); int32_t contLen = tSerializeSAuthReq(NULL, 0, &authReq); - void *pReq = rpcMallocCont(contLen); + void * pReq = rpcMallocCont(contLen); tSerializeSAuthReq(pReq, contLen, &authReq); SRpcMsg rpcMsg = {.pCont = pReq, .contLen = contLen, .msgType = TDMT_MND_AUTH, .ahandle = (void *)9528}; diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index 73137487eb..4e4dcf7aa4 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -54,7 +54,6 @@ typedef struct { int8_t connType; int64_t index; char label[TSDB_LABEL_LEN]; - bool noPool; // pool or not char user[TSDB_UNI_LEN]; // meter ID char spi; // security parameter index diff --git a/source/libs/transport/src/rpcMain.c b/source/libs/transport/src/rpcMain.c index e1319da162..7056d998f9 100644 --- a/source/libs/transport/src/rpcMain.c +++ b/source/libs/transport/src/rpcMain.c @@ -64,7 +64,6 @@ typedef struct { void (*cfp)(void *parent, SRpcMsg *, SEpSet *); int (*afp)(void *parent, char *user, char *spi, char *encrypt, char *secret, char *ckey); - bool noPool; int32_t refCount; void * parent; void * idPool; // handle to ID pool diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index f3e0417397..4d244665c7 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -41,7 +41,6 @@ void* rpcOpen(const SRpcInit* pInit) { pRpc->numOfThreads = pInit->numOfThreads; } - pRpc->noPool = pInit->noPool; pRpc->connType = pInit->connType; pRpc->idleTime = pInit->idleTime; pRpc->tcphandle = (*taosInitHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 0d3946d967..13a5d57dfe 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -170,13 +170,7 @@ static void clientHandleResp(SCliConn* conn) { // user owns conn->persist = 1 if (conn->persist == 0) { - if (pTransInst->noPool == true) { - destroyCmsg(conn->data); - clientConnDestroy(conn, true); - return; - } else { - addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn); - } + addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn); } destroyCmsg(conn->data); conn->data = NULL; @@ -463,10 +457,8 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { tTrace("%s client conn %p reused", CONN_GET_INST_LABEL(conn), conn); } } else { - if (pTransInst->noPool == false) { - conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port); - if (conn != NULL) tTrace("%s client conn %p get from conn pool", CONN_GET_INST_LABEL(conn), conn); - } + conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port); + if (conn != NULL) tTrace("%s client conn %p get from conn pool", CONN_GET_INST_LABEL(conn), conn); } if (conn != NULL) {