diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 5e3860822e..6ccb6c0dc4 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -64,7 +64,6 @@ typedef struct SRpcInit { int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS int idleTime; // milliseconds, 0 means idle timer is disabled - bool noPool; // create conn pool or not // the following is for client app ecurity only char *user; // user name char spi; // security parameter index @@ -86,7 +85,7 @@ typedef struct SRpcInit { int32_t rpcInit(); void rpcCleanup(); -void *rpcOpen(const SRpcInit *pRpc); +void * rpcOpen(const SRpcInit *pRpc); void rpcClose(void *); void * rpcMallocCont(int contLen); void rpcFreeCont(void *pCont); @@ -99,6 +98,9 @@ void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp) int rpcReportProgress(void *pConn, char *pCont, int contLen); void rpcCancelRequest(int64_t rid); +void rpcRefHandle(void *handle, int8_t type); +void rpcUnrefHandle(void *handle, int8_t type); + #ifdef __cplusplus } #endif diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 9d6b559eee..525c5f9fb8 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -72,8 +72,6 @@ static void deregisterRequest(SRequestObj *pRequest) { taosReleaseRef(clientConnRefPool, pTscObj->id); } - - // todo close the transporter properly void closeTransporter(STscObj *pTscObj) { if (pTscObj == NULL || pTscObj->pAppInfo->pTransporter == NULL) { @@ -241,6 +239,7 @@ void taos_init_imp(void) { clientConnRefPool = taosOpenRef(200, destroyTscObj); clientReqRefPool = taosOpenRef(40960, doDestroyRequest); + // transDestroyBuffer(&conn->readBuf); taosGetAppName(appInfo.appName, NULL); pthread_mutex_init(&appInfo.mutex, NULL); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 2c83e1e113..13a4fc70f7 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -215,8 +215,10 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t } } + int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) { void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter; + SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf}; int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr, &res); if (code != TSDB_CODE_SUCCESS) { @@ -230,12 +232,12 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_CREATE_TABLE == pRequest->type) { pRequest->body.resInfo.numOfRows = res.numOfRows; - + if (pRequest->body.queryJob != 0) { schedulerFreeJob(pRequest->body.queryJob); } } - + pRequest->code = res.code; return pRequest->code; } diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index 78bd71b919..15db36477d 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -25,8 +25,8 @@ #include "dndMnode.h" #include "dndVnodes.h" -#define INTERNAL_USER "_dnd" -#define INTERNAL_CKEY "_key" +#define INTERNAL_USER "_dnd" +#define INTERNAL_CKEY "_key" #define INTERNAL_SECRET "_pwd" static void dndInitMsgFp(STransMgmt *pMgmt) { @@ -156,7 +156,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { } static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) { - SDnode *pDnode = parent; + SDnode * pDnode = parent; STransMgmt *pMgmt = &pDnode->tmgmt; tmsg_t msgType = pRsp->msgType; @@ -194,7 +194,6 @@ static int32_t dndInitClient(SDnode *pDnode) { rpcInit.ckey = INTERNAL_CKEY; rpcInit.spi = 1; rpcInit.parent = pDnode; - rpcInit.noPool = true; char pass[TSDB_PASSWORD_LEN + 1] = {0}; taosEncryptPass_c((uint8_t *)(INTERNAL_SECRET), strlen(INTERNAL_SECRET), pass); @@ -220,7 +219,7 @@ static void dndCleanupClient(SDnode *pDnode) { } static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) { - SDnode *pDnode = param; + SDnode * pDnode = param; STransMgmt *pMgmt = &pDnode->tmgmt; tmsg_t msgType = pReq->msgType; @@ -314,7 +313,7 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char SAuthReq authReq = {0}; tstrncpy(authReq.user, user, TSDB_USER_LEN); int32_t contLen = tSerializeSAuthReq(NULL, 0, &authReq); - void *pReq = rpcMallocCont(contLen); + void * pReq = rpcMallocCont(contLen); tSerializeSAuthReq(pReq, contLen, &authReq); SRpcMsg rpcMsg = {.pCont = pReq, .contLen = contLen, .msgType = TDMT_MND_AUTH, .ahandle = (void *)9528}; diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index ae0a6c775e..744f6ca70b 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -341,6 +341,8 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result // TODO: iterator mem and tidex STermValueType s = kTypeValue; + int64_t st = taosGetTimestampUs(); + SIdxTempResult* tr = sIdxTempResultCreate(); if (0 == indexCacheSearch(cache, query, tr, &s)) { if (s == kTypeDeletion) { @@ -348,17 +350,23 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result // coloum already drop by other oper, no need to query tindex return 0; } else { + st = taosGetTimestampUs(); if (0 != indexTFileSearch(sIdx->tindex, query, tr)) { indexError("corrupt at index(TFile) col:%s val: %s", term->colName, term->colVal); goto END; } + int64_t tfCost = taosGetTimestampUs() - st; + indexInfo("tfile search cost: %" PRIu64 "us", tfCost); } } else { indexError("corrupt at index(cache) col:%s val: %s", term->colName, term->colVal); goto END; } + int64_t cost = taosGetTimestampUs() - st; + indexInfo("search cost: %" PRIu64 "us", cost); sIdxTempResultMergeTo(*result, tr); + sIdxTempResultDestroy(tr); return 0; END: diff --git a/source/libs/index/src/index_cache.c b/source/libs/index/src/index_cache.c index d3b25afdbc..b40ded9e9a 100644 --- a/source/libs/index/src/index_cache.c +++ b/source/libs/index/src/index_cache.c @@ -276,7 +276,12 @@ static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SI } else if (c->operaType == DEL_VALUE) { INDEX_MERGE_ADD_DEL(tr->added, tr->deled, c->uid) } + } else { + break; } + } else if (qtype == QUERY_PREFIX) { + } else if (qtype == QUERY_SUFFIX) { + } else if (qtype == QUERY_RANGE) { } } } @@ -284,6 +289,7 @@ static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SI return 0; } int indexCacheSearch(void* cache, SIndexTermQuery* query, SIdxTempResult* result, STermValueType* s) { + int64_t st = taosGetTimestampUs(); if (cache == NULL) { return 0; } @@ -312,12 +318,14 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, SIdxTempResult* result // continue search in imm ret = indexQueryMem(imm, &ct, qtype, result, s); } + if (hasJson) { tfree(p); } indexMemUnRef(mem); indexMemUnRef(imm); + indexInfo("cache search, time cost %" PRIu64 "us", taosGetTimestampUs() - st); return ret; } diff --git a/source/libs/index/src/index_tfile.c b/source/libs/index/src/index_tfile.c index 89f3f8ba8a..780b7160fc 100644 --- a/source/libs/index/src/index_tfile.c +++ b/source/libs/index/src/index_tfile.c @@ -189,8 +189,8 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTempResul bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON); EIndexQueryType qtype = query->qType; - SArray* result = taosArrayInit(16, sizeof(uint64_t)); - int ret = -1; + // SArray* result = taosArrayInit(16, sizeof(uint64_t)); + int ret = -1; // refactor to callback later if (qtype == QUERY_TERM) { uint64_t offset; @@ -200,11 +200,18 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTempResul p = indexPackJsonData(term); sz = strlen(p); } + int64_t st = taosGetTimestampUs(); FstSlice key = fstSliceCreate(p, sz); if (fstGet(reader->fst, &key, &offset)) { - indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, found table info in tindex", term->suid, term->colName, - term->colVal); - ret = tfileReaderLoadTableIds(reader, offset, result); + int64_t et = taosGetTimestampUs(); + int64_t cost = et - st; + indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, found table info in tindex, time cost: %" PRIu64 "us", + term->suid, term->colName, term->colVal, cost); + + ret = tfileReaderLoadTableIds(reader, offset, tr->total); + cost = taosGetTimestampUs() - et; + indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, load all table info, time cost: %" PRIu64 "us", term->suid, + term->colName, term->colVal, cost); } else { indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, not found table info in tindex", term->suid, term->colName, term->colVal); @@ -225,8 +232,8 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTempResul } tfileReaderUnRef(reader); - taosArrayAddAll(tr->total, result); - taosArrayDestroy(result); + // taosArrayAddAll(tr->total, result); + // taosArrayDestroy(result); return ret; } @@ -391,6 +398,7 @@ int indexTFileSearch(void* tfile, SIndexTermQuery* query, SIdxTempResult* result return ret; } + int64_t st = taosGetTimestampUs(); IndexTFile* pTfile = tfile; SIndexTerm* term = query->term; @@ -399,6 +407,8 @@ int indexTFileSearch(void* tfile, SIndexTermQuery* query, SIdxTempResult* result if (reader == NULL) { return 0; } + int64_t cost = taosGetTimestampUs() - st; + indexInfo("index tfile stage 1 cost: %" PRId64 "", cost); return tfileReaderSearch(reader, query, result); } diff --git a/source/libs/index/test/fstTest.cc b/source/libs/index/test/fstTest.cc index 618e20bc4b..410d5b7021 100644 --- a/source/libs/index/test/fstTest.cc +++ b/source/libs/index/test/fstTest.cc @@ -258,7 +258,7 @@ void checkFstCheckIterator() { // prefix search std::vector result; - AutomationCtx* ctx = automCtxCreate((void*)"ab", AUTOMATION_ALWAYS); + AutomationCtx* ctx = automCtxCreate((void*)"H", AUTOMATION_PREFIX); m->Search(ctx, result); std::cout << "size: " << result.size() << std::endl; // assert(result.size() == count); @@ -328,11 +328,11 @@ void iterTFileReader(char* path, char* uid, char* colName, char* ver) { int main(int argc, char* argv[]) { // tool to check all kind of fst test // if (argc > 1) { validateTFile(argv[1]); } - if (argc > 4) { - // path suid colName ver - iterTFileReader(argv[1], argv[2], argv[3], argv[4]); - } - // checkFstCheckIterator(); + // if (argc > 4) { + // path suid colName ver + // iterTFileReader(argv[1], argv[2], argv[3], argv[4]); + //} + checkFstCheckIterator(); // checkFstLongTerm(); // checkFstPrefixSearch(); diff --git a/source/libs/index/test/indexTests.cc b/source/libs/index/test/indexTests.cc index 699c785be5..5f339f2865 100644 --- a/source/libs/index/test/indexTests.cc +++ b/source/libs/index/test/indexTests.cc @@ -768,7 +768,7 @@ class IndexObj { int64_t s = taosGetTimestampUs(); if (Search(mq, result) == 0) { int64_t e = taosGetTimestampUs(); - std::cout << "search one successfully and time cost:" << e - s << "\tquery col:" << colName + std::cout << "search one successfully and time cost:" << e - s << "us\tquery col:" << colName << "\t val: " << colVal << "\t size:" << taosArrayGetSize(result) << std::endl; } else { } @@ -1106,6 +1106,7 @@ TEST_F(IndexEnv2, testIndex_del) { } index->Del("tag10", "Hello", 12); index->Del("tag10", "Hello", 11); + EXPECT_EQ(98, index->SearchOne("tag10", "Hello")); index->WriteMultiMillonData("tag10", "xxxxxxxxxxxxxx", 100 * 10000); index->Del("tag10", "Hello", 17); diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 985d2f2f2f..f2ac77fe61 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -121,9 +121,8 @@ typedef struct { } SRpcReqContext; typedef struct { - SRpcInfo* pTransInst; // associated SRpcInfo - SEpSet epSet; // ip list provided by app - void* ahandle; // handle provided by app + SEpSet epSet; // ip list provided by app + void* ahandle; // handle provided by app // struct SRpcConn* pConn; // pConn allocated tmsg_t msgType; // message type uint8_t* pCont; // content provided by app @@ -242,8 +241,12 @@ int transDestroyBuffer(SConnBuffer* buf); int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf); bool transReadComplete(SConnBuffer* connBuf); -// int transPackMsg(SRpcMsg *rpcMsg, bool sercured, bool auth, char **msg, int32_t *msgLen); +int transSetConnOption(uv_tcp_t* stream); -// int transUnpackMsg(char *msg, SRpcMsg *pMsg, bool ); +void transRefSrvHandle(void* handle); +void transUnrefSrvHandle(void* handle); + +void transRefCliHandle(void* handle); +void transUnrefCliHandle(void* handle); #endif diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index 73137487eb..4e4dcf7aa4 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -54,7 +54,6 @@ typedef struct { int8_t connType; int64_t index; char label[TSDB_LABEL_LEN]; - bool noPool; // pool or not char user[TSDB_UNI_LEN]; // meter ID char spi; // security parameter index diff --git a/source/libs/transport/src/rpcMain.c b/source/libs/transport/src/rpcMain.c index 615c576a9b..86dd17bb0c 100644 --- a/source/libs/transport/src/rpcMain.c +++ b/source/libs/transport/src/rpcMain.c @@ -64,7 +64,6 @@ typedef struct { void (*cfp)(void *parent, SRpcMsg *, SEpSet *); int (*afp)(void *parent, char *user, char *spi, char *encrypt, char *secret, char *ckey); - bool noPool; int32_t refCount; void * parent; void * idPool; // handle to ID pool diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index b45683617f..4d244665c7 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -41,7 +41,6 @@ void* rpcOpen(const SRpcInit* pInit) { pRpc->numOfThreads = pInit->numOfThreads; } - pRpc->noPool = pInit->noPool; pRpc->connType = pInit->connType; pRpc->idleTime = pInit->idleTime; pRpc->tcphandle = (*taosInitHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc); @@ -122,4 +121,17 @@ void rpcCleanup(void) { // return; } + +void (*taosRefHandle[])(void* handle) = {transRefSrvHandle, transRefCliHandle}; +void (*taosUnRefHandle[])(void* handle) = {transUnrefSrvHandle, transUnrefCliHandle}; + +void rpcRefHandle(void* handle, int8_t type) { + assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT); + (*taosRefHandle[type])(handle); +} +void rpcUnrefHandle(void* handle, int8_t type) { + assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT); + (*taosUnRefHandle[type])(handle); +} + #endif diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index fb76f38fe5..13a5d57dfe 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -20,7 +20,10 @@ #define CONN_HOST_THREAD_INDEX(conn) (conn ? ((SCliConn*)conn)->hThrdIdx : -1) #define CONN_PERSIST_TIME(para) (para * 1000 * 10) +#define CONN_GET_INST_LABEL(conn) (((SRpcInfo*)(((SCliThrdObj*)conn->hostThrd)->pTransInst))->label) + typedef struct SCliConn { + T_REF_DECLARE() uv_connect_t connReq; uv_stream_t* stream; uv_write_t* writeReq; @@ -32,8 +35,7 @@ typedef struct SCliConn { int8_t ctnRdCnt; // continue read count int hThrdIdx; - SRpcPush* push; - int persist; // + int persist; // // spi configure char spi; char secured; @@ -41,6 +43,7 @@ typedef struct SCliConn { // debug and log info struct sockaddr_in addr; struct sockaddr_in locaddr; + } SCliConn; typedef struct SCliMsg { @@ -54,14 +57,17 @@ typedef struct SCliThrdObj { pthread_t thread; uv_loop_t* loop; // uv_async_t* cliAsync; // - SAsyncPool* asyncPool; - uv_timer_t* timer; - void* pool; // conn pool + SAsyncPool* asyncPool; + uv_timer_t* timer; + void* pool; // conn pool + + // msg queue queue msg; pthread_mutex_t msgMtx; - uint64_t nextTimeout; // next timeout - void* pTransInst; // - bool quit; + + uint64_t nextTimeout; // next timeout + void* pTransInst; // + bool quit; } SCliThrdObj; typedef struct SClientObj { @@ -96,7 +102,7 @@ static void clientAsyncCb(uv_async_t* handle); static void clientDestroy(uv_handle_t* handle); static void clientConnDestroy(SCliConn* pConn, bool clear /*clear tcp handle or not*/); -// process data read from server, auth/decompress etc later +// process data read from server, add decompress etc later static void clientHandleResp(SCliConn* conn); // handle except about conn static void clientHandleExcept(SCliConn* conn); @@ -104,9 +110,10 @@ static void clientHandleExcept(SCliConn* conn); static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd); static void clientSendQuit(SCliThrdObj* thrd); - static void destroyUserdata(SRpcMsg* userdata); +static int clientRBChoseIdx(SRpcInfo* pTransInst); + static void destroyCmsg(SCliMsg* cmsg); static void transDestroyConnCtx(STransConnCtx* ctx); // thread obj @@ -115,10 +122,13 @@ static void destroyThrdObj(SCliThrdObj* pThrd); // thread static void* clientThread(void* arg); -static void clientHandleResp(SCliConn* conn) { +static void* clientNotifyApp() {} +static void clientHandleResp(SCliConn* conn) { SCliMsg* pMsg = conn->data; STransConnCtx* pCtx = pMsg->ctx; - SRpcInfo* pRpc = pCtx->pTransInst; + + SCliThrdObj* pThrd = conn->hostThrd; + SRpcInfo* pTransInst = pThrd->pTransInst; STransMsgHead* pHead = (STransMsgHead*)(conn->readBuf.buf); pHead->code = htonl(pHead->code); @@ -134,56 +144,51 @@ static void clientHandleResp(SCliConn* conn) { rpcMsg.msgType = pHead->msgType; rpcMsg.ahandle = pCtx->ahandle; - if (pRpc->pfp != NULL && (pRpc->pfp)(pRpc->parent, rpcMsg.msgType)) { + if (pTransInst->pfp != NULL && (pTransInst->pfp)(pTransInst->parent, rpcMsg.msgType)) { rpcMsg.handle = conn; conn->persist = 1; tDebug("client conn %p persist by app", conn); } - tDebug("%s client conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pRpc->label, conn, + tDebug("%s client conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn, TMSG_INFO(pHead->msgType), inet_ntoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), inet_ntoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), rpcMsg.contLen); conn->secured = pHead->secured; - if (conn->push != NULL && conn->ctnRdCnt != 0) { - (*conn->push->callback)(conn->push->arg, &rpcMsg); - conn->push = NULL; + + if (pCtx->pSem == NULL) { + tTrace("%s client conn %p handle resp", pTransInst->label, conn); + (pTransInst->cfp)(pTransInst->parent, &rpcMsg, NULL); } else { - if (pCtx->pSem == NULL) { - tTrace("%s client conn %p handle resp", pRpc->label, conn); - (pRpc->cfp)(pRpc->parent, &rpcMsg, NULL); - } else { - tTrace("%s client conn(sync) %p handle resp", pRpc->label, conn); - memcpy((char*)pCtx->pRsp, (char*)&rpcMsg, sizeof(rpcMsg)); - tsem_post(pCtx->pSem); - } + tTrace("%s client conn(sync) %p handle resp", pTransInst->label, conn); + memcpy((char*)pCtx->pRsp, (char*)&rpcMsg, sizeof(rpcMsg)); + tsem_post(pCtx->pSem); } conn->ctnRdCnt += 1; uv_read_start((uv_stream_t*)conn->stream, clientAllocBufferCb, clientReadCb); - SCliThrdObj* pThrd = conn->hostThrd; - // user owns conn->persist = 1 - if (conn->push == NULL && conn->persist == 0) { - if (pRpc->noPool == true) { - } else { - addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn); - } + if (conn->persist == 0) { + addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn); } destroyCmsg(conn->data); conn->data = NULL; + // start thread's timer of conn pool if not active - if (!uv_is_active((uv_handle_t*)pThrd->timer) && pRpc->idleTime > 0) { + if (!uv_is_active((uv_handle_t*)pThrd->timer) && pTransInst->idleTime > 0) { // uv_timer_start((uv_timer_t*)pThrd->timer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); } } static void clientHandleExcept(SCliConn* pConn) { - if (pConn->data == NULL && pConn->push == NULL) { + if (pConn->data == NULL) { // handle conn except in conn pool clientConnDestroy(pConn, true); return; } + SCliThrdObj* pThrd = pConn->hostThrd; + SRpcInfo* pTransInst = pThrd->pTransInst; + SCliMsg* pMsg = pConn->data; STransConnCtx* pCtx = pMsg->ctx; @@ -192,29 +197,19 @@ static void clientHandleExcept(SCliConn* pConn) { rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; rpcMsg.msgType = pMsg->msg.msgType + 1; - if (pConn->push != NULL && pConn->ctnRdCnt != 0) { - (*pConn->push->callback)(pConn->push->arg, &rpcMsg); - pConn->push = NULL; + if (pCtx->pSem == NULL) { + tTrace("%s client conn %p handle resp", pTransInst->label, pConn); + (pTransInst->cfp)(pTransInst->parent, &rpcMsg, NULL); } else { - if (pCtx->pSem == NULL) { - (pCtx->pTransInst->cfp)(pCtx->pTransInst->parent, &rpcMsg, NULL); - } else { - memcpy((char*)(pCtx->pRsp), (char*)(&rpcMsg), sizeof(rpcMsg)); - tsem_post(pCtx->pSem); - } - if (pConn->push != NULL) { - (*pConn->push->callback)(pConn->push->arg, &rpcMsg); - } - pConn->push = NULL; + tTrace("%s client conn(sync) %p handle resp", pTransInst->label, pConn); + memcpy((char*)(pCtx->pRsp), (char*)(&rpcMsg), sizeof(rpcMsg)); + tsem_post(pCtx->pSem); } - tTrace("%s client conn %p start to destroy", pCtx->pTransInst->label, pConn); - if (pConn->push == NULL) { - destroyCmsg(pConn->data); - pConn->data = NULL; - } - // transDestroyConnCtx(pCtx); + destroyCmsg(pConn->data); + pConn->data = NULL; + + tTrace("%s client conn %p start to destroy", CONN_GET_INST_LABEL(pConn), pConn); clientConnDestroy(pConn, true); - pConn->ctnRdCnt += 1; } static void clientTimeoutCb(uv_timer_t* handle) { @@ -316,17 +311,14 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf if (nread > 0) { pBuf->len += nread; if (transReadComplete(pBuf)) { - tTrace("client conn %p read complete", conn); + tTrace("%s client conn %p read complete", CONN_GET_INST_LABEL(conn), conn); clientHandleResp(conn); } else { - tTrace("client conn %p read partial packet, continue to read", conn); + tTrace("%s client conn %p read partial packet, continue to read", CONN_GET_INST_LABEL(conn), conn); } return; } - if (nread == UV_EOF) { - tError("client conn %p read error: %s", conn, uv_err_name(nread)); - clientHandleExcept(conn); - } + assert(nread <= 0); if (nread == 0) { // ref http://docs.libuv.org/en/v1.x/stream.html?highlight=uv_read_start#c.uv_read_cb @@ -335,18 +327,16 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf return; } if (nread < 0) { - tError("client conn %p read error: %s", conn, uv_err_name(nread)); + tError("%s client conn %p read error: %s", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread)); clientHandleExcept(conn); } - // tDebug("Read error %s\n", uv_err_name(nread)); - // uv_close((uv_handle_t*)handle, clientDestroy); } static void clientConnDestroy(SCliConn* conn, bool clear) { // conn->ref--; if (conn->ref == 0) { - tTrace("client conn %p remove from conn pool", conn); + tTrace("%s client conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn); QUEUE_REMOVE(&conn->conn); if (clear) { uv_close((uv_handle_t*)conn->stream, clientDestroy); @@ -355,20 +345,18 @@ static void clientConnDestroy(SCliConn* conn, bool clear) { } static void clientDestroy(uv_handle_t* handle) { SCliConn* conn = handle->data; - // transDestroyBuffer(&conn->readBuf); free(conn->stream); free(conn->writeReq); - tTrace("client conn %p destroy successfully", conn); + tTrace("%s client conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn); free(conn); - - // clientConnDestroy(conn, false); } static void clientWriteCb(uv_write_t* req, int status) { SCliConn* pConn = req->data; + if (status == 0) { - tTrace("client conn %p data already was written out", pConn); + tTrace("%s client conn %p data already was written out", CONN_GET_INST_LABEL(pConn), pConn); SCliMsg* pMsg = pConn->data; if (pMsg == NULL) { // handle @@ -376,18 +364,19 @@ static void clientWriteCb(uv_write_t* req, int status) { } destroyUserdata(&pMsg->msg); } else { - tError("client conn %p failed to write: %s", pConn, uv_err_name(status)); + tError("%s client conn %p failed to write: %s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(status)); clientHandleExcept(pConn); return; } - SCliThrdObj* pThrd = pConn->hostThrd; uv_read_start((uv_stream_t*)pConn->stream, clientAllocBufferCb, clientReadCb); } static void clientWrite(SCliConn* pConn) { SCliMsg* pCliMsg = pConn->data; STransConnCtx* pCtx = pCliMsg->ctx; - SRpcInfo* pTransInst = pCtx->pTransInst; + + SCliThrdObj* pThrd = pConn->hostThrd; + SRpcInfo* pTransInst = pThrd->pTransInst; SRpcMsg* pMsg = (SRpcMsg*)(&pCliMsg->msg); @@ -416,20 +405,18 @@ static void clientWrite(SCliConn* pConn) { pHead->msgType = pMsg->msgType; pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); - // if (pHead->msgType == TDMT_VND_QUERY || pHead->msgType == TDMT_VND_) - uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); - tDebug("client conn %p %s is send to %s:%d, local info %s:%d", pConn, TMSG_INFO(pHead->msgType), - inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr), - ntohs(pConn->locaddr.sin_port)); + tDebug("%s client conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn, + TMSG_INFO(pHead->msgType), inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), + inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port)); + uv_write(pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientWriteCb); } static void clientConnCb(uv_connect_t* req, int status) { // impl later SCliConn* pConn = req->data; if (status != 0) { - // tError("failed to connect server(%s, %d), errmsg: %s", pCtx->ip, pCtx->port, uv_strerror(status)); - tError("client conn %p failed to connect server: %s", pConn, uv_strerror(status)); + tError("%s client conn %p failed to connect server: %s", CONN_GET_INST_LABEL(pConn), pConn, uv_strerror(status)); clientHandleExcept(pConn); return; } @@ -439,7 +426,7 @@ static void clientConnCb(uv_connect_t* req, int status) { addrlen = sizeof(pConn->locaddr); uv_tcp_getsockname((uv_tcp_t*)pConn->stream, (struct sockaddr*)&pConn->locaddr, &addrlen); - tTrace("client conn %p connect to server successfully", pConn); + tTrace("%s client conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn); assert(pConn->stream == req->handle); clientWrite(pConn); @@ -458,25 +445,20 @@ static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) { static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { uint64_t et = taosGetTimestampUs(); uint64_t el = et - pMsg->st; - tTrace("client msg tran time cost: %" PRIu64 "us", el); - et = taosGetTimestampUs(); + tTrace("%s client msg tran time cost: %" PRIu64 "us", ((SRpcInfo*)pThrd->pTransInst)->label, el); STransConnCtx* pCtx = pMsg->ctx; + SRpcInfo* pTransInst = pThrd->pTransInst; + SCliConn* conn = NULL; - SCliConn* conn = NULL; - if (pMsg->msg.handle == NULL) { - if (pCtx->pTransInst->noPool == true) { - } else { - conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port); - } - if (conn != NULL) { - tTrace("client conn %p get from conn pool", conn); - } - } else { + if (pMsg->msg.handle != NULL) { conn = (SCliConn*)(pMsg->msg.handle); if (conn != NULL) { - tTrace("client conn %p reused", conn); + tTrace("%s client conn %p reused", CONN_GET_INST_LABEL(conn), conn); } + } else { + conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port); + if (conn != NULL) tTrace("%s client conn %p get from conn pool", CONN_GET_INST_LABEL(conn), conn); } if (conn != NULL) { @@ -489,7 +471,6 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { return; } clientWrite(conn); - } else { conn = calloc(1, sizeof(SCliConn)); conn->ref++; @@ -497,12 +478,7 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t)); uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream)); conn->stream->data = conn; - uv_tcp_nodelay((uv_tcp_t*)conn->stream, 1); - int ret = uv_tcp_keepalive((uv_tcp_t*)conn->stream, 1, 1); - if (ret) { - tTrace("client conn %p failed to set keepalive, %s", conn, uv_err_name(ret)); - } - // write req handle + conn->writeReq = malloc(sizeof(uv_write_t)); conn->writeReq->data = conn; @@ -512,17 +488,17 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { conn->data = pMsg; conn->hostThrd = pThrd; - // conn->push = pMsg->msg.push; - // conn->ctnRdCnt = 0; - + int ret = transSetConnOption((uv_tcp_t*)conn->stream); + if (ret) { + tError("%s client conn %p failed to set conn option, errmsg %s", pTransInst->label, conn, uv_err_name(ret)); + } struct sockaddr_in addr; uv_ip4_addr(pMsg->ctx->ip, pMsg->ctx->port, &addr); // handle error in callback if fail to connect - tTrace("client conn %p try to connect to %s:%d", conn, pMsg->ctx->ip, pMsg->ctx->port); + tTrace("%s client conn %p try to connect to %s:%d", pTransInst->label, conn, pMsg->ctx->ip, pMsg->ctx->port); uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, clientConnCb); } - conn->push = pMsg->msg.push; conn->ctnRdCnt = 0; conn->hThrdIdx = pCtx->hThrdIdx; } @@ -548,7 +524,6 @@ static void clientAsyncCb(uv_async_t* handle) { } else { clientHandleReq(pMsg, pThrd); } - // clientHandleReq(pMsg, pThrd); count++; } if (count >= 2) { @@ -643,8 +618,6 @@ static void transDestroyConnCtx(STransConnCtx* ctx) { static void clientSendQuit(SCliThrdObj* thrd) { // cli can stop gracefully SCliMsg* msg = calloc(1, sizeof(SCliMsg)); - msg->ctx = NULL; // - transSendAsync(thrd->asyncPool, &msg->q); } void taosCloseClient(void* arg) { @@ -656,37 +629,53 @@ void taosCloseClient(void* arg) { free(cli->pThreadObj); free(cli); } -static int clientRBChoseIdx(SRpcInfo* pRpc) { - int64_t index = pRpc->index; - if (pRpc->index++ >= pRpc->numOfThreads) { - pRpc->index = 0; +static int clientRBChoseIdx(SRpcInfo* pTransInst) { + int64_t index = pTransInst->index; + if (pTransInst->index++ >= pTransInst->numOfThreads) { + pTransInst->index = 0; } - return index % pRpc->numOfThreads; + return index % pTransInst->numOfThreads; +} +void transRefCliHandle(void* handle) { + if (handle == NULL) { + return; + } + int ref = T_REF_INC((SCliConn*)handle); + UNUSED(ref); +} +void transUnrefCliHandle(void* handle) { + if (handle == NULL) { + return; + } + int ref = T_REF_DEC((SCliConn*)handle); + if (ref == 0) { + } + + // unref cli handle } void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) { // impl later char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn); uint32_t port = pEpSet->eps[pEpSet->inUse].port; - SRpcInfo* pRpc = (SRpcInfo*)shandle; + SRpcInfo* pTransInst = (SRpcInfo*)shandle; int index = CONN_HOST_THREAD_INDEX(pMsg->handle); if (index == -1) { - index = clientRBChoseIdx(pRpc); + index = clientRBChoseIdx(pTransInst); } int32_t flen = 0; if (transCompressMsg(pMsg->pCont, pMsg->contLen, &flen)) { // imp later } STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx)); - pCtx->pTransInst = (SRpcInfo*)shandle; pCtx->ahandle = pMsg->ahandle; pCtx->msgType = pMsg->msgType; pCtx->ip = strdup(ip); pCtx->port = port; pCtx->hThrdIdx = index; - assert(pRpc->connType == TAOS_CONN_CLIENT); + assert(pTransInst->connType == TAOS_CONN_CLIENT); // atomic or not SCliMsg* cliMsg = malloc(sizeof(SCliMsg)); @@ -694,7 +683,7 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* cliMsg->msg = *pMsg; cliMsg->st = taosGetTimestampUs(); - SCliThrdObj* thrd = ((SClientObj*)pRpc->tcphandle)->pThreadObj[index]; + SCliThrdObj* thrd = ((SClientObj*)pTransInst->tcphandle)->pThreadObj[index]; transSendAsync(thrd->asyncPool, &(cliMsg->q)); } @@ -702,15 +691,14 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) { char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn); uint32_t port = pEpSet->eps[pEpSet->inUse].port; - SRpcInfo* pRpc = (SRpcInfo*)shandle; + SRpcInfo* pTransInst = (SRpcInfo*)shandle; int index = CONN_HOST_THREAD_INDEX(pReq->handle); if (index == -1) { - index = clientRBChoseIdx(pRpc); + index = clientRBChoseIdx(pTransInst); } STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx)); - pCtx->pTransInst = (SRpcInfo*)shandle; pCtx->ahandle = pReq->ahandle; pCtx->msgType = pReq->msgType; pCtx->ip = strdup(ip); @@ -725,7 +713,7 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) { cliMsg->msg = *pReq; cliMsg->st = taosGetTimestampUs(); - SCliThrdObj* thrd = ((SClientObj*)pRpc->tcphandle)->pThreadObj[index]; + SCliThrdObj* thrd = ((SClientObj*)pTransInst->tcphandle)->pThreadObj[index]; transSendAsync(thrd->asyncPool, &(cliMsg->q)); tsem_t* pSem = pCtx->pSem; tsem_wait(pSem); diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 9a8607b0ed..92e42cb380 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -257,6 +257,12 @@ int transDestroyBuffer(SConnBuffer* buf) { transClearBuffer(buf); } +int transSetConnOption(uv_tcp_t* stream) { + uv_tcp_nodelay(stream, 1); + int ret = uv_tcp_keepalive(stream, 5, 5); + return ret; +} + SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) { SAsyncPool* pool = calloc(1, sizeof(SAsyncPool)); pool->index = 0; diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index ce78d83bdf..9e68b0bf7b 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -18,6 +18,7 @@ #include "transComm.h" typedef struct SSrvConn { + T_REF_DECLARE() uv_tcp_t* pTcp; uv_write_t* pWriter; uv_timer_t* pTimer; @@ -32,11 +33,11 @@ typedef struct SSrvConn { void* ahandle; // void* hostThrd; SArray* srvMsgs; - // void* pSrvMsg; + + bool broken; // conn broken; struct sockaddr_in addr; struct sockaddr_in locaddr; - // SRpcMsg sendMsg; // del later char secured; @@ -64,19 +65,23 @@ typedef struct SWorkThrdObj { queue conn; pthread_mutex_t msgMtx; void* pTransInst; + bool stop; } SWorkThrdObj; typedef struct SServerObj { - pthread_t thread; - uv_tcp_t server; - uv_loop_t* loop; + pthread_t thread; + uv_tcp_t server; + uv_loop_t* loop; + + // work thread info int workerIdx; int numOfThreads; SWorkThrdObj** pThreadObj; - uv_pipe_t** pipe; - uint32_t ip; - uint32_t port; - uv_async_t* pAcceptAsync; // just to quit from from accept thread + + uv_pipe_t** pipe; + uint32_t ip; + uint32_t port; + uv_async_t* pAcceptAsync; // just to quit from from accept thread } SServerObj; static const char* notify = "a"; @@ -202,7 +207,6 @@ static void uvHandleReq(SSrvConn* pConn) { } pConn->inType = pHead->msgType; - // assert(transIsReq(pHead->msgType)); SRpcInfo* pRpc = (SRpcInfo*)p->shandle; pHead->code = htonl(pHead->code); @@ -226,7 +230,8 @@ static void uvHandleReq(SSrvConn* pConn) { rpcMsg.handle = pConn; transClearBuffer(&pConn->readBuf); - pConn->ref++; + + transRefSrvHandle(pConn); tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(rpcMsg.msgType), inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), rpcMsg.contLen); @@ -251,23 +256,20 @@ void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { } return; } - if (nread == UV_EOF) { - tError("server conn %p read error: %s", conn, uv_err_name(nread)); - if (conn->ref > 1) { - conn->ref++; // ref > 1 signed that write is in progress - } - destroyConn(conn, true); - return; - } if (nread == 0) { return; } - if (nread < 0 || nread != UV_EOF) { - if (conn->ref > 1) { - conn->ref++; // ref > 1 signed that write is in progress - } - tError("server conn %p read error: %s", conn, uv_err_name(nread)); - destroyConn(conn, true); + + tError("server conn %p read error: %s", conn, uv_err_name(nread)); + if (nread < 0 || nread == UV_EOF) { + conn->broken = true; + transUnrefSrvHandle(conn); + + // if (conn->ref > 1) { + // conn->ref++; // ref > 1 signed that write is in progress + //} + // tError("server conn %p read error: %s", conn, uv_err_name(nread)); + // destroyConn(conn, true); } } void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { @@ -300,10 +302,9 @@ void uvOnWriteCb(uv_write_t* req, int status) { } } else { tError("server conn %p failed to write data, %s", conn, uv_err_name(status)); - // - destroyConn(conn, true); + conn->broken = false; + transUnrefSrvHandle(conn); } - // opt } static void uvOnPipeWriteCb(uv_write_t* req, int status) { if (status == 0) { @@ -349,15 +350,18 @@ static void uvStartSendRespInternal(SSrvMsg* smsg) { SSrvConn* pConn = smsg->pConn; uv_timer_stop(pConn->pTimer); - - // pConn->pSrvMsg = smsg; - // conn->pWriter->data = smsg; uv_write(pConn->pWriter, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnWriteCb); } static void uvStartSendResp(SSrvMsg* smsg) { // impl SSrvConn* pConn = smsg->pConn; - pConn->ref--; // + + if (pConn->broken == true) { + transUnrefSrvHandle(pConn); + return; + } + transUnrefSrvHandle(pConn); + if (taosArrayGetSize(pConn->srvMsgs) > 0) { tDebug("server conn %p push data to client %s:%d, local info: %s:%d", pConn, inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port)); @@ -382,7 +386,7 @@ static void destroyAllConn(SWorkThrdObj* pThrd) { QUEUE_INIT(h); SSrvConn* c = QUEUE_DATA(h, SSrvConn, queue); - destroyConn(c, true); + transUnrefSrvHandle(c); } } void uvWorkerAsyncCb(uv_async_t* handle) { @@ -390,11 +394,11 @@ void uvWorkerAsyncCb(uv_async_t* handle) { SWorkThrdObj* pThrd = item->pThrd; SSrvConn* conn = NULL; queue wq; + // batch process to avoid to lock/unlock frequently pthread_mutex_lock(&item->mtx); QUEUE_MOVE(&item->qmsg, &wq); pthread_mutex_unlock(&item->mtx); - // pthread_mutex_unlock(&mtx); while (!QUEUE_IS_EMPTY(&wq)) { queue* head = QUEUE_HEAD(&wq); @@ -407,11 +411,15 @@ void uvWorkerAsyncCb(uv_async_t* handle) { } if (msg->pConn == NULL) { free(msg); - - destroyAllConn(pThrd); - - uv_loop_close(pThrd->loop); - uv_stop(pThrd->loop); + bool noConn = QUEUE_IS_EMPTY(&pThrd->conn); + if (noConn == true) { + uv_loop_close(pThrd->loop); + uv_stop(pThrd->loop); + } else { + destroyAllConn(pThrd); + uv_loop_close(pThrd->loop); + pThrd->stop = true; + } } else { uvStartSendResp(msg); } @@ -419,12 +427,15 @@ void uvWorkerAsyncCb(uv_async_t* handle) { } static void uvAcceptAsyncCb(uv_async_t* async) { SServerObj* srv = async->data; + tDebug("close server port %d", srv->port); uv_close((uv_handle_t*)&srv->server, NULL); uv_stop(srv->loop); } static void uvShutDownCb(uv_shutdown_t* req, int status) { - tDebug("conn failed to shut down: %s", uv_err_name(status)); + if (status != 0) { + tDebug("conn failed to shut down: %s", uv_err_name(status)); + } uv_close((uv_handle_t*)req->handle, uvDestroyConn); free(req); } @@ -493,13 +504,11 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { uv_tcp_init(pThrd->loop, pConn->pTcp); pConn->pTcp->data = pConn; - // uv_tcp_nodelay(pConn->pTcp, 1); - // uv_tcp_keepalive(pConn->pTcp, 1, 1); - - // init write request, just pConn->pWriter = calloc(1, sizeof(uv_write_t)); pConn->pWriter->data = pConn; + transSetConnOption((uv_tcp_t*)pConn->pTcp); + if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) { uv_os_fd_t fd; uv_fileno((const uv_handle_t*)pConn->pTcp, &fd); @@ -508,14 +517,14 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { int addrlen = sizeof(pConn->addr); if (0 != uv_tcp_getpeername(pConn->pTcp, (struct sockaddr*)&pConn->addr, &addrlen)) { tError("server conn %p failed to get peer info", pConn); - destroyConn(pConn, true); + transUnrefSrvHandle(pConn); return; } addrlen = sizeof(pConn->locaddr); if (0 != uv_tcp_getsockname(pConn->pTcp, (struct sockaddr*)&pConn->locaddr, &addrlen)) { tError("server conn %p failed to get local info", pConn); - destroyConn(pConn, true); + transUnrefSrvHandle(pConn); return; } @@ -523,7 +532,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { } else { tDebug("failed to create new connection"); - destroyConn(pConn, true); + transUnrefSrvHandle(pConn); } } @@ -599,7 +608,10 @@ static SSrvConn* createConn(void* hThrd) { QUEUE_PUSH(&pThrd->conn, &pConn->queue); pConn->srvMsgs = taosArrayInit(2, sizeof(void*)); // tTrace("conn %p created", pConn); - ++pConn->ref; + + pConn->broken = false; + + transRefSrvHandle(pConn); return pConn; } @@ -607,10 +619,6 @@ static void destroyConn(SSrvConn* conn, bool clear) { if (conn == NULL) { return; } - tTrace("server conn %p try to destroy, ref: %d", conn, conn->ref); - if (--conn->ref > 0) { - return; - } transDestroyBuffer(&conn->readBuf); for (int i = 0; i < taosArrayGetSize(conn->srvMsgs); i++) { @@ -618,25 +626,26 @@ static void destroyConn(SSrvConn* conn, bool clear) { destroySmsg(msg); } conn->srvMsgs = taosArrayDestroy(conn->srvMsgs); - QUEUE_REMOVE(&conn->queue); - if (clear) { tTrace("try to destroy conn %p", conn); - uv_tcp_close_reset(conn->pTcp, uvDestroyConn); - // uv_shutdown_t* req = malloc(sizeof(uv_shutdown_t)); - // uv_shutdown(req, (uv_stream_t*)conn->pTcp, uvShutDownCb); - // uv_unref((uv_handle_t*)conn->pTcp); - // uv_close((uv_handle_t*)conn->pTcp, uvDestroyConn); + uv_shutdown_t* req = malloc(sizeof(uv_shutdown_t)); + uv_shutdown(req, (uv_stream_t*)conn->pTcp, uvShutDownCb); } } static void uvDestroyConn(uv_handle_t* handle) { - SSrvConn* conn = handle->data; + SSrvConn* conn = handle->data; + SWorkThrdObj* thrd = conn->hostThrd; + tDebug("server conn %p destroy", conn); uv_timer_stop(conn->pTimer); - // free(conn->pTimer); + QUEUE_REMOVE(&conn->queue); free(conn->pTcp); free(conn->pWriter); free(conn); + + if (thrd->stop && QUEUE_IS_EMPTY(&thrd->conn)) { + uv_stop(thrd->loop); + } } static int transAddAuthPart(SSrvConn* pConn, char* msg, int msgLen) { STransMsgHead* pHead = (STransMsgHead*)msg; @@ -671,6 +680,7 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, for (int i = 0; i < srv->numOfThreads; i++) { SWorkThrdObj* thrd = (SWorkThrdObj*)calloc(1, sizeof(SWorkThrdObj)); + thrd->stop = false; srv->pThreadObj[i] = thrd; srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t)); @@ -720,8 +730,6 @@ void destroyWorkThrd(SWorkThrdObj* pThrd) { pthread_join(pThrd->thread, NULL); free(pThrd->loop); transDestroyAsyncPool(pThrd->asyncPool); - - // free(pThrd->workerAsync); free(pThrd); } void sendQuitToWorkThrd(SWorkThrdObj* pThrd) { @@ -755,6 +763,28 @@ void taosCloseServer(void* arg) { free(srv); } +void transRefSrvHandle(void* handle) { + if (handle == NULL) { + return; + } + SSrvConn* conn = handle; + + int ref = T_REF_INC((SSrvConn*)handle); + UNUSED(ref); +} + +void transUnrefSrvHandle(void* handle) { + if (handle == NULL) { + return; + } + int ref = T_REF_DEC((SSrvConn*)handle); + tDebug("handle %p ref count: %d", handle, ref); + + if (ref == 0) { + destroyConn((SSrvConn*)handle, true); + } + // unref srv handle +} void rpcSendResponse(const SRpcMsg* pMsg) { if (pMsg->handle == NULL) { return; diff --git a/source/libs/transport/test/rclient.c b/source/libs/transport/test/rclient.c index f3940e34f9..f30d725f4c 100644 --- a/source/libs/transport/test/rclient.c +++ b/source/libs/transport/test/rclient.c @@ -124,6 +124,7 @@ int main(int argc, char *argv[]) { rpcInit.ckey = "key"; rpcInit.spi = 1; rpcInit.connType = TAOS_CONN_CLIENT; + rpcDebugFlag = 143; for (int i = 1; i < argc; ++i) { if (strcmp(argv[i], "-p") == 0 && i < argc - 1) { diff --git a/source/libs/transport/test/rserver.c b/source/libs/transport/test/rserver.c index da5284b5c5..3a086371b0 100644 --- a/source/libs/transport/test/rserver.c +++ b/source/libs/transport/test/rserver.c @@ -125,6 +125,8 @@ int main(int argc, char *argv[]) { rpcInit.idleTime = 2 * 1500; rpcInit.afp = retrieveAuthInfo; + rpcDebugFlag = 143; + for (int i = 1; i < argc; ++i) { if (strcmp(argv[i], "-p") == 0 && i < argc - 1) { rpcInit.localPort = atoi(argv[++i]);