diff --git a/cmake/cmake.options b/cmake/cmake.options index e84d02800c..e19c10f6b2 100644 --- a/cmake/cmake.options +++ b/cmake/cmake.options @@ -47,13 +47,13 @@ option( option( BUILD_WITH_UV "If build with libuv" - OFF + ON ) option( BUILD_WITH_UV_TRANS "If build with libuv_trans " - OFF + ON ) option( diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index f913ba06d0..538aeb1a0e 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -64,6 +64,7 @@ typedef struct SRpcInit { int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS int idleTime; // milliseconds, 0 means idle timer is disabled + bool noPool; // create conn pool or not // the following is for client app ecurity only char *user; // user name char spi; // security parameter index diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index a0ba71a1eb..4e35baf905 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -155,7 +155,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { } static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) { - SDnode *pDnode = parent; + SDnode * pDnode = parent; STransMgmt *pMgmt = &pDnode->tmgmt; tmsg_t msgType = pRsp->msgType; @@ -193,6 +193,7 @@ static int32_t dndInitClient(SDnode *pDnode) { rpcInit.ckey = INTERNAL_CKEY; rpcInit.spi = 1; rpcInit.parent = pDnode; + rpcInit.noPool = true; char pass[TSDB_PASSWORD_LEN + 1] = {0}; taosEncryptPass_c((uint8_t *)(INTERNAL_SECRET), strlen(INTERNAL_SECRET), pass); @@ -218,7 +219,7 @@ static void dndCleanupClient(SDnode *pDnode) { } static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) { - SDnode *pDnode = param; + SDnode * pDnode = param; STransMgmt *pMgmt = &pDnode->tmgmt; tmsg_t msgType = pReq->msgType; @@ -312,7 +313,7 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char SAuthReq authReq = {0}; tstrncpy(authReq.user, user, TSDB_USER_LEN); int32_t contLen = tSerializeSAuthReq(NULL, 0, &authReq); - void *pReq = rpcMallocCont(contLen); + void * pReq = rpcMallocCont(contLen); tSerializeSAuthReq(pReq, contLen, &authReq); SRpcMsg rpcMsg = {.pCont = pReq, .contLen = contLen, .msgType = TDMT_MND_AUTH, .ahandle = (void *)9528}; diff --git a/source/dnode/mgmt/impl/test/sut/inc/client.h b/source/dnode/mgmt/impl/test/sut/inc/client.h index 9cf688fc02..925680d528 100644 --- a/source/dnode/mgmt/impl/test/sut/inc/client.h +++ b/source/dnode/mgmt/impl/test/sut/inc/client.h @@ -21,16 +21,21 @@ class TestClient { bool Init(const char* user, const char* pass, const char* fqdn, uint16_t port); void Cleanup(); + void DoInit(); + SRpcMsg* SendReq(SRpcMsg* pReq); void SetRpcRsp(SRpcMsg* pRsp); tsem_t* GetSem(); + void Restart(); private: char fqdn[TSDB_FQDN_LEN]; uint16_t port; + char user[128]; + char pass[128]; void* clientRpc; SRpcMsg* pRsp; tsem_t sem; }; -#endif /* _TD_TEST_CLIENT_H_ */ \ No newline at end of file +#endif /* _TD_TEST_CLIENT_H_ */ diff --git a/source/dnode/mgmt/impl/test/sut/inc/sut.h b/source/dnode/mgmt/impl/test/sut/inc/sut.h index 23913b0531..250d563a8b 100644 --- a/source/dnode/mgmt/impl/test/sut/inc/sut.h +++ b/source/dnode/mgmt/impl/test/sut/inc/sut.h @@ -20,10 +20,10 @@ #include "os.h" #include "dnode.h" -#include "tmsg.h" #include "tconfig.h" #include "tdataformat.h" #include "tglobal.h" +#include "tmsg.h" #include "tnote.h" #include "trpc.h" #include "tthread.h" @@ -39,6 +39,7 @@ class Testbase { void Restart(); void ServerStop(); void ServerStart(); + void ClientRestart(); SRpcMsg* SendReq(tmsg_t msgType, void* pCont, int32_t contLen); private: @@ -100,7 +101,7 @@ class Testbase { { \ char* bytes = (char*)calloc(1, len); \ for (int32_t i = 0; i < len - 1; ++i) { \ - bytes[i] = b; \ + bytes[i] = b; \ } \ EXPECT_STREQ(test.GetShowBinary(len), bytes); \ } @@ -138,4 +139,4 @@ class Testbase { #define IgnoreTimestamp() \ { test.GetShowTimestamp(); } -#endif /* _TD_TEST_BASE_H_ */ \ No newline at end of file +#endif /* _TD_TEST_BASE_H_ */ diff --git a/source/dnode/mgmt/impl/test/sut/src/client.cpp b/source/dnode/mgmt/impl/test/sut/src/client.cpp index 8403dbf034..589c015013 100644 --- a/source/dnode/mgmt/impl/test/sut/src/client.cpp +++ b/source/dnode/mgmt/impl/test/sut/src/client.cpp @@ -13,33 +13,38 @@ * along with this program. If not, see . */ -#include "tep.h" #include "sut.h" +#include "tep.h" static void processClientRsp(void* parent, SRpcMsg* pRsp, SEpSet* pEpSet) { TestClient* client = (TestClient*)parent; client->SetRpcRsp(pRsp); - uInfo("response:%s from dnode, code:0x%x", TMSG_INFO(pRsp->msgType), pRsp->code); + uInfo("x response:%s from dnode, code:0x%x, msgSize: %d", TMSG_INFO(pRsp->msgType), pRsp->code, pRsp->contLen); tsem_post(client->GetSem()); } -void TestClient::SetRpcRsp(SRpcMsg* pRsp) { this->pRsp = pRsp; }; +void TestClient::SetRpcRsp(SRpcMsg* rsp) { + this->pRsp = (SRpcMsg*)calloc(1, sizeof(SRpcMsg)); + this->pRsp->msgType = rsp->msgType; + this->pRsp->code = rsp->code; + this->pRsp->pCont = rsp->pCont; + this->pRsp->contLen = rsp->contLen; +}; tsem_t* TestClient::GetSem() { return &sem; } -bool TestClient::Init(const char* user, const char* pass, const char* fqdn, uint16_t port) { +void TestClient::DoInit() { char secretEncrypt[TSDB_PASSWORD_LEN + 1] = {0}; taosEncryptPass_c((uint8_t*)pass, strlen(pass), secretEncrypt); - SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); - rpcInit.label = (char*)"DND-C"; + rpcInit.label = (char*)"shell"; rpcInit.numOfThreads = 1; rpcInit.cfp = processClientRsp; rpcInit.sessions = 1024; rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.idleTime = 30 * 1000; - rpcInit.user = (char*)user; + rpcInit.user = (char*)this->user; rpcInit.ckey = (char*)"key"; rpcInit.parent = this; rpcInit.secret = (char*)secretEncrypt; @@ -47,11 +52,16 @@ bool TestClient::Init(const char* user, const char* pass, const char* fqdn, uint clientRpc = rpcOpen(&rpcInit); ASSERT(clientRpc); + tsem_init(&this->sem, 0, 0); +} - tsem_init(&sem, 0, 0); +bool TestClient::Init(const char* user, const char* pass, const char* fqdn, uint16_t port) { strcpy(this->fqdn, fqdn); + strcpy(this->user, user); + strcpy(this->pass, pass); this->port = port; - + // this->pRsp = NULL; + this->DoInit(); return true; } @@ -60,11 +70,16 @@ void TestClient::Cleanup() { rpcClose(clientRpc); } +void TestClient::Restart() { + this->Cleanup(); + this->DoInit(); +} SRpcMsg* TestClient::SendReq(SRpcMsg* pReq) { SEpSet epSet = {0}; addEpIntoEpSet(&epSet, fqdn, port); rpcSendRequest(clientRpc, &epSet, pReq, NULL); tsem_wait(&sem); + uInfo("y response:%s from dnode, code:0x%x, msgSize: %d", TMSG_INFO(pRsp->msgType), pRsp->code, pRsp->contLen); return pRsp; } diff --git a/source/dnode/mgmt/impl/test/sut/src/sut.cpp b/source/dnode/mgmt/impl/test/sut/src/sut.cpp index 09a738be3b..771c5886ef 100644 --- a/source/dnode/mgmt/impl/test/sut/src/sut.cpp +++ b/source/dnode/mgmt/impl/test/sut/src/sut.cpp @@ -21,9 +21,9 @@ void Testbase::InitLog(const char* path) { mDebugFlag = 143; cDebugFlag = 0; jniDebugFlag = 0; - tmrDebugFlag = 0; - uDebugFlag = 0; - rpcDebugFlag = 0; + tmrDebugFlag = 143; + uDebugFlag = 143; + rpcDebugFlag = 143; qDebugFlag = 0; wDebugFlag = 0; sDebugFlag = 0; @@ -66,16 +66,21 @@ void Testbase::Init(const char* path, int16_t port) { void Testbase::Cleanup() { tFreeSTableMetaRsp(&metaRsp); - server.Stop(); client.Cleanup(); + taosMsleep(10); + server.Stop(); dndCleanup(); } -void Testbase::Restart() { server.Restart(); } +void Testbase::Restart() { + server.Restart(); + client.Restart(); +} void Testbase::ServerStop() { server.Stop(); } void Testbase::ServerStart() { server.DoStart(); } +void Testbase::ClientRestart() { client.Restart(); } SRpcMsg* Testbase::SendReq(tmsg_t msgType, void* pCont, int32_t contLen) { SRpcMsg rpcMsg = {0}; @@ -194,4 +199,4 @@ int32_t Testbase::GetShowRows() { return pRetrieveRsp->numOfRows; } STableMetaRsp* Testbase::GetShowMeta() { return &metaRsp; } -SRetrieveTableRsp* Testbase::GetRetrieveRsp() { return pRetrieveRsp; } \ No newline at end of file +SRetrieveTableRsp* Testbase::GetRetrieveRsp() { return pRetrieveRsp; } diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 699ccab92c..64b4aa6dd7 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -77,7 +77,7 @@ static void mndTransReExecute(void *param, void *tmrId) { SMnode *pMnode = param; if (mndIsMaster(pMnode)) { int32_t contLen = 0; - void *pReq = mndBuildTimerMsg(&contLen); + void * pReq = mndBuildTimerMsg(&contLen); SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS, .pCont = pReq, .contLen = contLen}; pMnode->putReqToMWriteQFp(pMnode->pDnode, &rpcMsg); } @@ -89,7 +89,7 @@ static void mndCalMqRebalance(void *param, void *tmrId) { SMnode *pMnode = param; if (mndIsMaster(pMnode)) { int32_t contLen = 0; - void *pReq = mndBuildTimerMsg(&contLen); + void * pReq = mndBuildTimerMsg(&contLen); SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pReq, .contLen = contLen}; pMnode->putReqToMReadQFp(pMnode->pDnode, &rpcMsg); } @@ -404,7 +404,8 @@ SMnodeMsg *mndInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) { return NULL; } - if (pRpcMsg->msgType != TDMT_MND_TRANS && pRpcMsg->msgType != TDMT_MND_MQ_TIMER && pRpcMsg->msgType != TDMT_MND_MQ_DO_REBALANCE) { + if (pRpcMsg->msgType != TDMT_MND_TRANS && pRpcMsg->msgType != TDMT_MND_MQ_TIMER && + pRpcMsg->msgType != TDMT_MND_MQ_DO_REBALANCE) { SRpcConnInfo connInfo = {0}; if ((pRpcMsg->msgType & 1U) && rpcGetConnInfo(pRpcMsg->handle, &connInfo) != 0) { taosFreeQitem(pMsg); @@ -439,7 +440,7 @@ void mndProcessMsg(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; int32_t code = 0; tmsg_t msgType = pMsg->rpcMsg.msgType; - void *ahandle = pMsg->rpcMsg.ahandle; + void * ahandle = pMsg->rpcMsg.ahandle; bool isReq = (msgType & 1U); mTrace("msg:%p, type:%s will be processed, app:%p", pMsg, TMSG_INFO(msgType), ahandle); diff --git a/source/dnode/mnode/impl/test/qnode/qnode.cpp b/source/dnode/mnode/impl/test/qnode/qnode.cpp index d4e308268a..b8a0e61ca3 100644 --- a/source/dnode/mnode/impl/test/qnode/qnode.cpp +++ b/source/dnode/mnode/impl/test/qnode/qnode.cpp @@ -190,6 +190,9 @@ TEST_F(MndTestQnode, 03_Create_Qnode_Rollback) { tSerializeSMCreateDropQSBNodeReq(pReq, contLen, &createReq); server2.Stop(); + taosMsleep(1000); + // test.ClientRestart(); + SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_QNODE, pReq, contLen); ASSERT_NE(pRsp, nullptr); ASSERT_EQ(pRsp->code, TSDB_CODE_RPC_NETWORK_UNAVAIL); @@ -226,6 +229,7 @@ TEST_F(MndTestQnode, 03_Create_Qnode_Rollback) { { // server start, wait until the rollback finished server2.DoStart(); + test.ClientRestart(); taosMsleep(1000); int32_t retry = 0; @@ -248,7 +252,6 @@ TEST_F(MndTestQnode, 03_Create_Qnode_Rollback) { ASSERT_NE(retry, retryMax); } } - TEST_F(MndTestQnode, 04_Drop_Qnode_Rollback) { { // send message first, then dnode2 crash, result is returned, and rollback is started @@ -315,4 +318,4 @@ TEST_F(MndTestQnode, 04_Drop_Qnode_Rollback) { ASSERT_NE(retry, retryMax); } -} \ No newline at end of file +} diff --git a/source/dnode/mnode/impl/test/trans/trans.cpp b/source/dnode/mnode/impl/test/trans/trans.cpp index d4c40dd428..88d4fc4f75 100644 --- a/source/dnode/mnode/impl/test/trans/trans.cpp +++ b/source/dnode/mnode/impl/test/trans/trans.cpp @@ -46,8 +46,10 @@ class MndTestTrans : public ::testing::Test { free(buffer); taosFsyncFile(fd); taosCloseFile(fd); + taosMsleep(1000); test.ServerStart(); + test.ClientRestart(); } static Testbase test; @@ -284,6 +286,7 @@ TEST_F(MndTestTrans, 03_Create_Qnode2_Crash) { } } + // create db // partial create stb // drop db failed diff --git a/source/dnode/mnode/impl/test/user/user.cpp b/source/dnode/mnode/impl/test/user/user.cpp index d8ce599be1..61b99beeb7 100644 --- a/source/dnode/mnode/impl/test/user/user.cpp +++ b/source/dnode/mnode/impl/test/user/user.cpp @@ -617,6 +617,7 @@ TEST_F(MndTestUser, 06_Create_Drop_Alter_User) { // restart test.Restart(); + taosMsleep(1000); test.SendShowMetaReq(TSDB_MGMT_TABLE_USER, ""); CHECK_META("show users", 4); @@ -631,4 +632,4 @@ TEST_F(MndTestUser, 06_Create_Drop_Alter_User) { CheckTimestamp(); CheckBinary("root", TSDB_USER_LEN); CheckBinary("root", TSDB_USER_LEN); -} \ No newline at end of file +} diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index d5a8cf5f84..2078a218ee 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -217,7 +217,7 @@ typedef struct SConnBuffer { char* buf; int len; int cap; - int left; + int total; } SConnBuffer; typedef void (*AsyncCB)(uv_async_t* handle); diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index 3c8c922d83..a36b671eb4 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -56,6 +56,7 @@ typedef struct { int8_t connType; int64_t index; char label[TSDB_LABEL_LEN]; + bool noPool; // pool or not char user[TSDB_UNI_LEN]; // meter ID char spi; // security parameter index diff --git a/source/libs/transport/src/rpcMain.c b/source/libs/transport/src/rpcMain.c index 3bb7d103d7..72c1ff6893 100644 --- a/source/libs/transport/src/rpcMain.c +++ b/source/libs/transport/src/rpcMain.c @@ -64,6 +64,7 @@ typedef struct { void (*cfp)(void *parent, SRpcMsg *, SEpSet *); int (*afp)(void *parent, char *user, char *spi, char *encrypt, char *secret, char *ckey); + bool noPool; int32_t refCount; void * parent; void * idPool; // handle to ID pool diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index c3d3cfa2ab..48c15ca286 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -27,7 +27,7 @@ void* rpcOpen(const SRpcInit* pInit) { return NULL; } if (pInit->label) { - tstrncpy(pRpc->label, pInit->label, strlen(pInit->label)); + tstrncpy(pRpc->label, pInit->label, strlen(pInit->label) + 1); } pRpc->cfp = pInit->cfp; if (pInit->connType == TAOS_CONN_SERVER) { @@ -35,6 +35,8 @@ void* rpcOpen(const SRpcInit* pInit) { } else { pRpc->numOfThreads = pInit->numOfThreads; } + + pRpc->noPool = pInit->noPool; pRpc->connType = pInit->connType; pRpc->idleTime = pInit->idleTime; pRpc->tcphandle = (*taosInitHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index f1bd1ba980..c70cd933d5 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -126,6 +126,9 @@ static void clientHandleResp(SCliConn* conn) { pHead->code = htonl(pHead->code); pHead->msgLen = htonl(pHead->msgLen); + // buf's mem alread translated to rpcMsg.pCont + transClearBuffer(&conn->readBuf); + SRpcMsg rpcMsg; rpcMsg.contLen = transContLenFromMsg(pHead->msgLen); rpcMsg.pCont = transContFromHead((char*)pHead); @@ -134,15 +137,15 @@ static void clientHandleResp(SCliConn* conn) { rpcMsg.ahandle = pCtx->ahandle; if (rpcMsg.msgType == TDMT_VND_QUERY_RSP || rpcMsg.msgType == TDMT_VND_FETCH_RSP || - rpcMsg.msgType == TDMT_VND_RES_READY) { + rpcMsg.msgType == TDMT_VND_RES_READY_RSP) { rpcMsg.handle = conn; conn->persist = 1; tDebug("client conn %p persist by app", conn); } - tDebug("client conn %p %s received from %s:%d, local info: %s:%d", conn, TMSG_INFO(pHead->msgType), - inet_ntoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), inet_ntoa(conn->locaddr.sin_addr), - ntohs(conn->locaddr.sin_port)); + tDebug("%s client conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pRpc->label, conn, + TMSG_INFO(pHead->msgType), inet_ntoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), + inet_ntoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), rpcMsg.contLen); conn->secured = pHead->secured; if (conn->push != NULL && conn->ctnRdCnt != 0) { @@ -150,26 +153,26 @@ static void clientHandleResp(SCliConn* conn) { conn->push = NULL; } else { if (pCtx->pSem == NULL) { - tTrace("client conn %p handle resp", conn); + tTrace("%s client conn %p handle resp", pRpc->label, conn); (pRpc->cfp)(pRpc->parent, &rpcMsg, NULL); } else { - tTrace("client conn(sync) %p handle resp", conn); + tTrace("%s client conn(sync) %p handle resp", pRpc->label, conn); memcpy((char*)pCtx->pRsp, (char*)&rpcMsg, sizeof(rpcMsg)); tsem_post(pCtx->pSem); } } conn->ctnRdCnt += 1; - // buf's mem alread translated to rpcMsg.pCont - transClearBuffer(&conn->readBuf); - uv_read_start((uv_stream_t*)conn->stream, clientAllocBufferCb, clientReadCb); SCliThrdObj* pThrd = conn->hostThrd; // user owns conn->persist = 1 if (conn->push == NULL && conn->persist == 0) { - addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn); + if (pRpc->noPool == true) { + } else { + addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn); + } } destroyCmsg(conn->data); conn->data = NULL; @@ -184,7 +187,6 @@ static void clientHandleExcept(SCliConn* pConn) { clientConnDestroy(pConn, true); return; } - tTrace("client conn %p start to destroy", pConn); SCliMsg* pMsg = pConn->data; tmsg_t msgType = TDMT_MND_CONNECT; @@ -213,6 +215,7 @@ static void clientHandleExcept(SCliConn* pConn) { } pConn->push = NULL; } + tTrace("%s client conn %p start to destroy", pCtx->pTransInst->label, pConn); if (pConn->push == NULL) { destroyCmsg(pConn->data); pConn->data = NULL; @@ -226,7 +229,7 @@ static void clientTimeoutCb(uv_timer_t* handle) { SCliThrdObj* pThrd = handle->data; SRpcInfo* pRpc = pThrd->pTransInst; int64_t currentTime = pThrd->nextTimeout; - tTrace("client conn timeout, try to remove expire conn from conn pool"); + tTrace("%s, client conn timeout, try to remove expire conn from conn pool", pRpc->label); SConnList* p = taosHashIterate((SHashObj*)pThrd->pool, NULL); while (p != NULL) { @@ -307,21 +310,30 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) { QUEUE_PUSH(&plist->conn, &conn->conn); } static bool clientReadComplete(SConnBuffer* data) { - STransMsgHead head; - int32_t headLen = sizeof(head); - if (data->len >= headLen) { - memcpy((char*)&head, data->buf, headLen); - int32_t msgLen = (int32_t)htonl((uint32_t)head.msgLen); - if (msgLen > data->len) { - data->left = msgLen - data->len; - return false; - } else if (msgLen == data->len) { - data->left = 0; - return true; - } - } else { - return false; + if (data->len >= sizeof(STransMsgHead)) { + STransMsgHead head; + memcpy((char*)&head, data->buf, sizeof(head)); + int32_t msgLen = (int32_t)htonl(head.msgLen); + data->total = msgLen; } + + if (data->len == data->cap && data->total == data->cap) { + return true; + } + return false; + // if (data->len >= headLen) { + // memcpy((char*)&head, data->buf, headLen); + // int32_t msgLen = (int32_t)htonl((uint32_t)head.msgLen); + // if (msgLen > data->len) { + // data->left = msgLen - data->len; + // return false; + // } else if (msgLen == data->len) { + // data->left = 0; + // return true; + // } + //} else { + // return false; + //} } static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { SCliConn* conn = handle->data; @@ -338,7 +350,6 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf if (nread > 0) { pBuf->len += nread; if (clientReadComplete(pBuf)) { - uv_read_stop((uv_stream_t*)conn->stream); tTrace("client conn %p read complete", conn); clientHandleResp(conn); } else { @@ -346,6 +357,10 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf } return; } + if (nread == UV_EOF) { + tError("client conn %p read error: %s", conn, uv_err_name(nread)); + clientHandleExcept(conn); + } assert(nread <= 0); if (nread == 0) { // ref http://docs.libuv.org/en/v1.x/stream.html?highlight=uv_read_start#c.uv_read_cb @@ -353,7 +368,7 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf // read(2). return; } - if (nread < 0 || nread == UV_EOF) { + if (nread < 0) { tError("client conn %p read error: %s", conn, uv_err_name(nread)); clientHandleExcept(conn); } @@ -467,6 +482,7 @@ static void clientConnCb(uv_connect_t* req, int status) { static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) { tDebug("client work thread %p start to quit", pThrd); destroyCmsg(pMsg); + destroyConnPool(pThrd->pool); // transDestroyAsyncPool(pThr) uv_close((uv_handle_t*)pThrd->cliAsync, NULL); uv_timer_stop(pThrd->timer); pThrd->quit = true; @@ -483,7 +499,10 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { SCliConn* conn = NULL; if (pMsg->msg.handle == NULL) { - conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port); + if (pCtx->pTransInst->noPool == true) { + } else { + conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port); + } if (conn != NULL) { tTrace("client conn %p get from conn pool", conn); } @@ -512,7 +531,11 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t)); uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream)); conn->stream->data = conn; - + uv_tcp_nodelay((uv_tcp_t*)conn->stream, 1); + int ret = uv_tcp_keepalive((uv_tcp_t*)conn->stream, 1, 1); + if (ret) { + tTrace("client conn %p failed to set keepalive, %s", conn, uv_err_name(ret)); + } // write req handle conn->writeReq = malloc(sizeof(uv_write_t)); conn->writeReq->data = conn; diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 7aa5aa16f1..388e0da4e0 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -205,6 +205,7 @@ int transInitBuffer(SConnBuffer* buf) { } int transClearBuffer(SConnBuffer* buf) { memset(buf, 0, sizeof(*buf)); + buf->total = -1; return 0; } int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) { @@ -214,27 +215,25 @@ int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) { * |<------STransMsgHead------->|<-------------------userdata--------------->|<-----auth data----->|<----user * info--->| */ - static const int CAPACITY = 1024; + static const int CAPACITY = sizeof(STransMsgHead); SConnBuffer* p = connBuf; if (p->cap == 0) { p->buf = (char*)calloc(CAPACITY, sizeof(char)); p->len = 0; p->cap = CAPACITY; - p->left = -1; + p->total = 0; uvBuf->base = p->buf; uvBuf->len = CAPACITY; } else { - if (p->len >= p->cap) { - if (p->left == -1) { - p->cap *= 2; - p->buf = realloc(p->buf, p->cap); - } else if (p->len + p->left > p->cap) { - p->cap = p->len + p->left; - p->buf = realloc(p->buf, p->len + p->left); - } - } + STransMsgHead head; + memcpy((char*)&head, p->buf, sizeof(head)); + int32_t msgLen = (int32_t)htonl(head.msgLen); + + p->total = msgLen; + p->cap = msgLen; + p->buf = realloc(p->buf, p->cap); uvBuf->base = p->buf + p->len; uvBuf->len = p->cap - p->len; } diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 7ddeb99c9d..a038f72adc 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -61,6 +61,7 @@ typedef struct SWorkThrdObj { SAsyncPool* asyncPool; // uv_async_t* workerAsync; // queue msg; + queue conn; pthread_mutex_t msgMtx; void* pTransInst; } SWorkThrdObj; @@ -95,6 +96,7 @@ static void uvOnAcceptCb(uv_stream_t* stream, int status); static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf); static void uvWorkerAsyncCb(uv_async_t* handle); static void uvAcceptAsyncCb(uv_async_t* handle); +static void uvShutDownCb(uv_shutdown_t* req, int status); static void uvStartSendRespInternal(SSrvMsg* smsg); static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb); @@ -103,7 +105,7 @@ static void uvStartSendResp(SSrvMsg* msg); static void destroySmsg(SSrvMsg* smsg); // check whether already read complete packet static bool readComplete(SConnBuffer* buf); -static SSrvConn* createConn(); +static SSrvConn* createConn(void* hThrd); static void destroyConn(SSrvConn* conn, bool clear /*clear handle or not*/); static void uvDestroyConn(uv_handle_t* handle); @@ -117,11 +119,6 @@ static bool addHandleToWorkloop(void* arg); static bool addHandleToAcceptloop(void* arg); void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { - /* - * formate of data buffer: - * |<--------------------------data from socket------------------------------->| - * |<------STransMsgHead------->|<-------------------other data--------------->| - */ SSrvConn* conn = handle->data; SConnBuffer* pBuf = &conn->readBuf; transAllocBuffer(pBuf, buf); @@ -131,23 +128,27 @@ void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b // static bool readComplete(SConnBuffer* data) { // TODO(yihao): handle pipeline later - STransMsgHead head; - int32_t headLen = sizeof(head); - if (data->len >= headLen) { - memcpy((char*)&head, data->buf, headLen); - int32_t msgLen = (int32_t)htonl((uint32_t)head.msgLen); - if (msgLen > data->len) { - data->left = msgLen - data->len; - return false; - } else if (msgLen == data->len) { - return true; - } else if (msgLen < data->len) { - return false; - // handle other packet later - } - } else { - return false; + if (data->len == data->cap && data->total == data->cap) { + return true; } + return false; + // STransMsgHead head; + // int32_t headLen = sizeof(head); + // if (data->len >= headLen) { + // memcpy((char*)&head, data->buf, headLen); + // int32_t msgLen = (int32_t)htonl((uint32_t)head.msgLen); + // if (msgLen > data->len) { + // data->left = msgLen - data->len; + // return false; + // } else if (msgLen == data->len) { + // return true; + // } else if (msgLen < data->len) { + // return false; + // // handle other packet later + // } + //} else { + // return false; + //} } // static void uvDoProcess(SRecvInfo* pRecv) { @@ -241,7 +242,7 @@ static void uvHandleReq(SSrvConn* pConn) { } pConn->inType = pHead->msgType; - assert(transIsReq(pHead->msgType)); + // assert(transIsReq(pHead->msgType)); SRpcInfo* pRpc = (SRpcInfo*)p->shandle; pHead->code = htonl(pHead->code); @@ -266,9 +267,9 @@ static void uvHandleReq(SSrvConn* pConn) { transClearBuffer(&pConn->readBuf); pConn->ref++; - tDebug("server conn %p %s received from %s:%d, local info: %s:%d", pConn, TMSG_INFO(rpcMsg.msgType), + tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(rpcMsg.msgType), inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr), - ntohs(pConn->locaddr.sin_port)); + ntohs(pConn->locaddr.sin_port), rpcMsg.contLen); (*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL); // uv_timer_start(pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0); // auth @@ -290,6 +291,14 @@ void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { } return; } + if (nread == UV_EOF) { + tError("server conn %p read error: %s", conn, uv_err_name(nread)); + if (conn->ref > 1) { + conn->ref++; // ref > 1 signed that write is in progress + } + destroyConn(conn, true); + return; + } if (nread == 0) { return; } @@ -302,8 +311,8 @@ void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { } } void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { - buf->base = malloc(sizeof(char)); buf->len = 2; + buf->base = calloc(1, sizeof(char) * buf->len); } void uvOnTimeoutCb(uv_timer_t* handle) { @@ -386,6 +395,7 @@ static void uvStartSendRespInternal(SSrvMsg* smsg) { static void uvStartSendResp(SSrvMsg* smsg) { // impl SSrvConn* pConn = smsg->pConn; + pConn->ref--; // if (taosArrayGetSize(pConn->srvMsgs) > 0) { tDebug("server conn %p push data to client %s:%d, local info: %s:%d", pConn, inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port)); @@ -403,6 +413,16 @@ static void destroySmsg(SSrvMsg* smsg) { transFreeMsg(smsg->msg.pCont); free(smsg); } +static void destroyAllConn(SWorkThrdObj* pThrd) { + while (!QUEUE_IS_EMPTY(&pThrd->conn)) { + queue* h = QUEUE_HEAD(&pThrd->conn); + QUEUE_REMOVE(h); + QUEUE_INIT(h); + + SSrvConn* c = QUEUE_DATA(h, SSrvConn, queue); + destroyConn(c, true); + } +} void uvWorkerAsyncCb(uv_async_t* handle) { SAsyncItem* item = handle->data; SWorkThrdObj* pThrd = item->pThrd; @@ -424,8 +444,11 @@ void uvWorkerAsyncCb(uv_async_t* handle) { continue; } if (msg->pConn == NULL) { - // free(msg); + + destroyAllConn(pThrd); + + uv_loop_close(pThrd->loop); uv_stop(pThrd->loop); } else { uvStartSendResp(msg); @@ -439,9 +462,16 @@ void uvWorkerAsyncCb(uv_async_t* handle) { } static void uvAcceptAsyncCb(uv_async_t* async) { SServerObj* srv = async->data; + uv_close((uv_handle_t*)&srv->server, NULL); uv_stop(srv->loop); } +static void uvShutDownCb(uv_shutdown_t* req, int status) { + tDebug("conn failed to shut down: %s", uv_err_name(status)); + uv_close((uv_handle_t*)req->handle, uvDestroyConn); + free(req); +} + void uvOnAcceptCb(uv_stream_t* stream, int status) { if (status == -1) { return; @@ -491,7 +521,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { uv_handle_type pending = uv_pipe_pending_type(pipe); assert(pending == UV_TCP); - SSrvConn* pConn = createConn(); + SSrvConn* pConn = createConn(pThrd); pConn->pTransInst = pThrd->pTransInst; /* init conn timer*/ @@ -507,6 +537,9 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { uv_tcp_init(pThrd->loop, pConn->pTcp); pConn->pTcp->data = pConn; + // uv_tcp_nodelay(pConn->pTcp, 1); + // uv_tcp_keepalive(pConn->pTcp, 1, 1); + // init write request, just pConn->pWriter = calloc(1, sizeof(uv_write_t)); pConn->pWriter->data = pConn; @@ -560,6 +593,9 @@ static bool addHandleToWorkloop(void* arg) { QUEUE_INIT(&pThrd->msg); pthread_mutex_init(&pThrd->msgMtx, NULL); + // conn set + QUEUE_INIT(&pThrd->conn); + pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 4, pThrd, uvWorkerAsyncCb); uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb); return true; @@ -598,8 +634,13 @@ void* workerThread(void* arg) { uv_run(pThrd->loop, UV_RUN_DEFAULT); } -static SSrvConn* createConn() { +static SSrvConn* createConn(void* hThrd) { + SWorkThrdObj* pThrd = hThrd; + SSrvConn* pConn = (SSrvConn*)calloc(1, sizeof(SSrvConn)); + QUEUE_INIT(&pConn->queue); + + QUEUE_PUSH(&pThrd->conn, &pConn->queue); pConn->srvMsgs = taosArrayInit(2, sizeof(void*)); // tTrace("conn %p created", pConn); ++pConn->ref; @@ -610,7 +651,7 @@ static void destroyConn(SSrvConn* conn, bool clear) { if (conn == NULL) { return; } - tTrace("server conn %p try to destroy", conn); + tTrace("server conn %p try to destroy, ref: %d", conn, conn->ref); if (--conn->ref > 0) { return; } @@ -621,20 +662,23 @@ static void destroyConn(SSrvConn* conn, bool clear) { destroySmsg(msg); } taosArrayDestroy(conn->srvMsgs); - - // destroySmsg(conn->pSrvMsg); - // conn->pSrvMsg = NULL; + QUEUE_REMOVE(&conn->queue); if (clear) { - uv_close((uv_handle_t*)conn->pTcp, uvDestroyConn); + tTrace("try to destroy conn %p", conn); + uv_tcp_close_reset(conn->pTcp, uvDestroyConn); + // uv_shutdown_t* req = malloc(sizeof(uv_shutdown_t)); + // uv_shutdown(req, (uv_stream_t*)conn->pTcp, uvShutDownCb); + // uv_unref((uv_handle_t*)conn->pTcp); + // uv_close((uv_handle_t*)conn->pTcp, uvDestroyConn); } } static void uvDestroyConn(uv_handle_t* handle) { SSrvConn* conn = handle->data; tDebug("server conn %p destroy", conn); uv_timer_stop(conn->pTimer); - free(conn->pTimer); - // free(conn->pTcp); + // free(conn->pTimer); + free(conn->pTcp); free(conn->pWriter); free(conn); } diff --git a/source/libs/transport/test/transUT.cc b/source/libs/transport/test/transUT.cc index 6f80ea42ac..d1fefe2c72 100644 --- a/source/libs/transport/test/transUT.cc +++ b/source/libs/transport/test/transUT.cc @@ -16,69 +16,168 @@ #include #include #include "tep.h" +#include "tglobal.h" #include "trpc.h" +#include "ulog.h" using namespace std; -class TransObj { - public: - TransObj() { - const char *label = "APP"; - const char *secret = "secret"; - const char *user = "user"; - const char *ckey = "ckey"; +const char *label = "APP"; +const char *secret = "secret"; +const char *user = "user"; +const char *ckey = "ckey"; +class Server; +int port = 7000; +// server process +static void processReq(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet); +// client process; +static void processResp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet); +class Client { + public: + void Init(int nThread) { memset(&rpcInit, 0, sizeof(rpcInit)); rpcInit.localPort = 0; rpcInit.label = (char *)label; - rpcInit.numOfThreads = 5; - rpcInit.cfp = NULL; - rpcInit.sessions = 100; - rpcInit.idleTime = 100; + rpcInit.numOfThreads = nThread; + rpcInit.cfp = processResp; rpcInit.user = (char *)user; rpcInit.secret = (char *)secret; rpcInit.ckey = (char *)ckey; rpcInit.spi = 1; - } - bool startCli() { - trans = NULL; + rpcInit.parent = this; rpcInit.connType = TAOS_CONN_CLIENT; - trans = rpcOpen(&rpcInit); - return trans != NULL ? true : false; + this->transCli = rpcOpen(&rpcInit); + tsem_init(&this->sem, 0, 0); } - bool startSrv() { - trans = NULL; - rpcInit.connType = TAOS_CONN_SERVER; - trans = rpcOpen(&rpcInit); - return trans != NULL ? true : false; + void SetResp(SRpcMsg *pMsg) { + // set up resp; + this->resp = *pMsg; + } + SRpcMsg *Resp() { return &this->resp; } + + void Restart() { + rpcClose(this->transCli); + this->transCli = rpcOpen(&rpcInit); } - bool sendAndRecv() { + void SendAndRecv(SRpcMsg *req, SRpcMsg *resp) { SEpSet epSet = {0}; epSet.inUse = 0; - addEpIntoEpSet(&epSet, "192.168.1.1", 7000); - addEpIntoEpSet(&epSet, "192.168.0.1", 7000); + addEpIntoEpSet(&epSet, "127.0.0.1", 7000); - if (trans == NULL) { - return false; - } - SRpcMsg rpcMsg = {0}, reqMsg = {0}; - reqMsg.pCont = rpcMallocCont(10); - reqMsg.contLen = 10; - reqMsg.ahandle = NULL; - rpcSendRecv(trans, &epSet, &reqMsg, &rpcMsg); - int code = rpcMsg.code; - std::cout << tstrerror(code) << std::endl; - return true; + rpcSendRequest(this->transCli, &epSet, req, NULL); + SemWait(); + *resp = this->resp; } - bool stop() { - rpcClose(trans); - trans = NULL; - return true; + void SemWait() { tsem_wait(&this->sem); } + void SemPost() { tsem_post(&this->sem); } + void Reset() {} + + ~Client() { + if (this->transCli) rpcClose(this->transCli); } private: - void * trans; + tsem_t sem; SRpcInit rpcInit; + void * transCli; + SRpcMsg resp; +}; +class Server { + public: + Server() { + memset(&rpcInit, 0, sizeof(rpcInit)); + rpcInit.localPort = port; + rpcInit.label = (char *)label; + rpcInit.numOfThreads = 5; + rpcInit.cfp = processReq; + rpcInit.user = (char *)user; + rpcInit.secret = (char *)secret; + rpcInit.ckey = (char *)ckey; + rpcInit.spi = 1; + rpcInit.connType = TAOS_CONN_SERVER; + } + void Start() { + this->transSrv = rpcOpen(&this->rpcInit); + taosMsleep(1000); + } + void Stop() { + if (this->transSrv == NULL) return; + rpcClose(this->transSrv); + this->transSrv = NULL; + } + void Restart() { + this->Stop(); + this->Start(); + } + ~Server() { + if (this->transSrv) rpcClose(this->transSrv); + this->transSrv = NULL; + } + + private: + SRpcInit rpcInit; + void * transSrv; +}; +static void processReq(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { + SRpcMsg rpcMsg = {0}; + rpcMsg.pCont = rpcMallocCont(100); + rpcMsg.contLen = 100; + rpcMsg.handle = pMsg->handle; + rpcMsg.code = 0; + rpcSendResponse(&rpcMsg); +} +// client process; +static void processResp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { + Client *client = (Client *)parent; + client->SetResp(pMsg); + client->SemPost(); +} +class TransObj { + public: + TransObj() { + dDebugFlag = 143; + vDebugFlag = 0; + mDebugFlag = 143; + cDebugFlag = 0; + jniDebugFlag = 0; + tmrDebugFlag = 143; + uDebugFlag = 143; + rpcDebugFlag = 143; + qDebugFlag = 0; + wDebugFlag = 0; + sDebugFlag = 0; + tsdbDebugFlag = 0; + cqDebugFlag = 0; + tscEmbeddedInUtil = 1; + tsAsyncLog = 0; + + std::string path = "/tmp/transport"; + taosRemoveDir(path.c_str()); + taosMkDir(path.c_str()); + + char temp[PATH_MAX]; + snprintf(temp, PATH_MAX, "%s/taosdlog", path.c_str()); + if (taosInitLog(temp, tsNumOfLogLines, 1) != 0) { + printf("failed to init log file\n"); + } + cli = new Client; + cli->Init(1); + srv = new Server; + srv->Start(); + } + void RestartCli() { cli->Restart(); } + void StopSrv() { srv->Stop(); } + void RestartSrv() { srv->Restart(); } + void cliSendAndRecv(SRpcMsg *req, SRpcMsg *resp) { cli->SendAndRecv(req, resp); } + ~TransObj() { + delete cli; + delete srv; + } + + private: + Client *cli; + Server *srv; }; class TransEnv : public ::testing::Test { protected: @@ -93,11 +192,34 @@ class TransEnv : public ::testing::Test { TransObj *tr = NULL; }; -TEST_F(TransEnv, test_start_stop) { - assert(tr->startCli()); - assert(tr->sendAndRecv()); - assert(tr->stop()); - assert(tr->startSrv()); - assert(tr->stop()); +// TEST_F(TransEnv, 01sendAndRec) { +// for (int i = 0; i < 1; i++) { +// SRpcMsg req = {0}, resp = {0}; +// req.msgType = 0; +// req.pCont = rpcMallocCont(10); +// req.contLen = 10; +// tr->cliSendAndRecv(&req, &resp); +// assert(resp.code == 0); +// } +//} + +TEST_F(TransEnv, 02StopServer) { + for (int i = 0; i < 1; i++) { + SRpcMsg req = {0}, resp = {0}; + req.msgType = 0; + req.pCont = rpcMallocCont(10); + req.contLen = 10; + tr->cliSendAndRecv(&req, &resp); + assert(resp.code == 0); + } + SRpcMsg req = {0}, resp = {0}; + req.msgType = 1; + req.pCont = rpcMallocCont(10); + req.contLen = 10; + tr->StopSrv(); + // tr->RestartSrv(); + tr->cliSendAndRecv(&req, &resp); + + assert(resp.code != 0); } diff --git a/source/os/src/osTimer.c b/source/os/src/osTimer.c index 7e542ef80f..bb526e0ba0 100644 --- a/source/os/src/osTimer.c +++ b/source/os/src/osTimer.c @@ -22,13 +22,12 @@ * windows implementation */ - -#include #include -#include +#include #include +#include -#pragma warning( disable : 4244 ) +#pragma warning(disable : 4244) typedef void (*win_timer_f)(int signo); @@ -40,8 +39,8 @@ void WINAPI taosWinOnTimer(UINT wTimerID, UINT msg, DWORD_PTR dwUser, DWORD_PTR } static MMRESULT timerId; -int taosInitTimer(win_timer_f callback, int ms) { - DWORD_PTR param = *((int64_t *) & callback); +int taosInitTimer(win_timer_f callback, int ms) { + DWORD_PTR param = *((int64_t *)&callback); timerId = timeSetEvent(ms, 1, (LPTIMECALLBACK)taosWinOnTimer, param, TIME_PERIODIC); if (timerId == 0) { @@ -50,9 +49,7 @@ int taosInitTimer(win_timer_f callback, int ms) { return 0; } -void taosUninitTimer() { - timeKillEvent(timerId); -} +void taosUninitTimer() { timeKillEvent(timerId); } #elif defined(_TD_DARWIN_64) @@ -60,32 +57,32 @@ void taosUninitTimer() { * darwin implementation */ -#include #include +#include #include static void (*timer_callback)(int); -static int timer_ms = 0; -static pthread_t timer_thread; -static int timer_kq = -1; -static volatile int timer_stop = 0; +static int timer_ms = 0; +static pthread_t timer_thread; +static int timer_kq = -1; +static volatile int timer_stop = 0; -static void* timer_routine(void *arg) { +static void* timer_routine(void* arg) { (void)arg; setThreadName("timer"); - int r = 0; + int r = 0; struct timespec to = {0}; - to.tv_sec = timer_ms / 1000; - to.tv_nsec = (timer_ms % 1000) * 1000000; + to.tv_sec = timer_ms / 1000; + to.tv_nsec = (timer_ms % 1000) * 1000000; while (!timer_stop) { struct kevent64_s kev[10] = {0}; - r = kevent64(timer_kq, NULL, 0, kev, sizeof(kev)/sizeof(kev[0]), 0, &to); - if (r!=0) { + r = kevent64(timer_kq, NULL, 0, kev, sizeof(kev) / sizeof(kev[0]), 0, &to); + if (r != 0) { fprintf(stderr, "==%s[%d]%s()==kevent64 failed\n", basename(__FILE__), __LINE__, __func__); abort(); } - timer_callback(SIGALRM); // just mock + timer_callback(SIGALRM); // just mock } return NULL; @@ -93,11 +90,13 @@ static void* timer_routine(void *arg) { int taosInitTimer(void (*callback)(int), int ms) { int r = 0; - timer_ms = ms; + timer_kq = -1; + timer_stop = 0; + timer_ms = ms; timer_callback = callback; timer_kq = kqueue(); - if (timer_kq==-1) { + if (timer_kq == -1) { fprintf(stderr, "==%s[%d]%s()==failed to create timer kq\n", basename(__FILE__), __LINE__, __func__); // since no caller of this func checks the return value for the moment abort(); @@ -144,10 +143,10 @@ static void taosDeleteTimer(void *tharg) { timer_delete(*pTimer); } -static pthread_t timerThread; -static timer_t timerId; +static pthread_t timerThread; +static timer_t timerId; static volatile bool stopTimer = false; -static void *taosProcessAlarmSignal(void *tharg) { +static void * taosProcessAlarmSignal(void *tharg) { // Block the signal sigset_t sigset; sigemptyset(&sigset); @@ -159,18 +158,18 @@ static void *taosProcessAlarmSignal(void *tharg) { setThreadName("tmr"); - #ifdef _ALPINE - sevent.sigev_notify = SIGEV_THREAD; - sevent.sigev_value.sival_int = syscall(__NR_gettid); - #else - sevent.sigev_notify = SIGEV_THREAD_ID; - sevent._sigev_un._tid = syscall(__NR_gettid); - #endif - +#ifdef _ALPINE + sevent.sigev_notify = SIGEV_THREAD; + sevent.sigev_value.sival_int = syscall(__NR_gettid); +#else + sevent.sigev_notify = SIGEV_THREAD_ID; + sevent._sigev_un._tid = syscall(__NR_gettid); +#endif + sevent.sigev_signo = SIGALRM; if (timer_create(CLOCK_REALTIME, &sevent, &timerId) == -1) { - //printf("Failed to create timer"); + // printf("Failed to create timer"); } pthread_cleanup_push(taosDeleteTimer, &timerId); @@ -182,36 +181,37 @@ static void *taosProcessAlarmSignal(void *tharg) { ts.it_interval.tv_nsec = 1000000 * MSECONDS_PER_TICK; if (timer_settime(timerId, 0, &ts, NULL)) { - //printf("Failed to init timer"); + // printf("Failed to init timer"); return NULL; } int signo; while (!stopTimer) { if (sigwait(&sigset, &signo)) { - //printf("Failed to wait signal: number %d", signo); + // printf("Failed to wait signal: number %d", signo); continue; } /* //printf("Signal handling: number %d ......\n", signo); */ callback(0); } - + pthread_cleanup_pop(1); return NULL; } int taosInitTimer(void (*callback)(int), int ms) { + stopTimer = false; pthread_attr_t tattr; pthread_attr_init(&tattr); int code = pthread_create(&timerThread, &tattr, taosProcessAlarmSignal, callback); pthread_attr_destroy(&tattr); if (code != 0) { - //printf("failed to create timer thread"); + // printf("failed to create timer thread"); return -1; } else { - //printf("timer thread:0x%08" PRIx64 " is created", taosGetPthreadId(timerThread)); + // printf("timer thread:0x%08" PRIx64 " is created", taosGetPthreadId(timerThread)); } return 0; @@ -220,7 +220,7 @@ int taosInitTimer(void (*callback)(int), int ms) { void taosUninitTimer() { stopTimer = true; - //printf("join timer thread:0x%08" PRIx64, taosGetPthreadId(timerThread)); + // printf("join timer thread:0x%08" PRIx64, taosGetPthreadId(timerThread)); pthread_join(timerThread, NULL); } diff --git a/source/util/src/ttimer.c b/source/util/src/ttimer.c index 1fdc2257d7..65101a5e07 100644 --- a/source/util/src/ttimer.c +++ b/source/util/src/ttimer.c @@ -13,19 +13,49 @@ * along with this program. If not, see . */ +#include "ttimer.h" #include "os.h" +#include "taoserror.h" #include "tlog.h" #include "tsched.h" -#include "ttimer.h" #include "tutil.h" -#include "taoserror.h" -#define tmrFatal(...) { if (tmrDebugFlag & DEBUG_FATAL) { taosPrintLog("TMR FATAL ", tmrDebugFlag, __VA_ARGS__); }} -#define tmrError(...) { if (tmrDebugFlag & DEBUG_ERROR) { taosPrintLog("TMR ERROR ", tmrDebugFlag, __VA_ARGS__); }} -#define tmrWarn(...) { if (tmrDebugFlag & DEBUG_WARN) { taosPrintLog("TMR WARN ", tmrDebugFlag, __VA_ARGS__); }} -#define tmrInfo(...) { if (tmrDebugFlag & DEBUG_INFO) { taosPrintLog("TMR ", tmrDebugFlag, __VA_ARGS__); }} -#define tmrDebug(...) { if (tmrDebugFlag & DEBUG_DEBUG) { taosPrintLog("TMR ", tmrDebugFlag, __VA_ARGS__); }} -#define tmrTrace(...) { if (tmrDebugFlag & DEBUG_TRACE) { taosPrintLog("TMR ", tmrDebugFlag, __VA_ARGS__); }} +#define tmrFatal(...) \ + { \ + if (tmrDebugFlag & DEBUG_FATAL) { \ + taosPrintLog("TMR FATAL ", tmrDebugFlag, __VA_ARGS__); \ + } \ + } +#define tmrError(...) \ + { \ + if (tmrDebugFlag & DEBUG_ERROR) { \ + taosPrintLog("TMR ERROR ", tmrDebugFlag, __VA_ARGS__); \ + } \ + } +#define tmrWarn(...) \ + { \ + if (tmrDebugFlag & DEBUG_WARN) { \ + taosPrintLog("TMR WARN ", tmrDebugFlag, __VA_ARGS__); \ + } \ + } +#define tmrInfo(...) \ + { \ + if (tmrDebugFlag & DEBUG_INFO) { \ + taosPrintLog("TMR ", tmrDebugFlag, __VA_ARGS__); \ + } \ + } +#define tmrDebug(...) \ + { \ + if (tmrDebugFlag & DEBUG_DEBUG) { \ + taosPrintLog("TMR ", tmrDebugFlag, __VA_ARGS__); \ + } \ + } +#define tmrTrace(...) \ + { \ + if (tmrDebugFlag & DEBUG_TRACE) { \ + taosPrintLog("TMR ", tmrDebugFlag, __VA_ARGS__); \ + } \ + } #define TIMER_STATE_WAITING 0 #define TIMER_STATE_EXPIRED 1 @@ -81,7 +111,7 @@ typedef struct time_wheel_t { tmr_obj_t** slots; } time_wheel_t; -int32_t tmrDebugFlag = 131; +int32_t tmrDebugFlag = 131; uint32_t tsMaxTmrCtrl = 512; static pthread_once_t tmrModuleInit = PTHREAD_ONCE_INIT; @@ -91,7 +121,7 @@ static tmr_ctrl_t* unusedTmrCtrl = NULL; static void* tmrQhandle; static int numOfTmrCtrl = 0; -int taosTmrThreads = 1; +int taosTmrThreads = 1; static uintptr_t nextTimerId = 0; static time_wheel_t wheels[] = { @@ -119,7 +149,7 @@ static void timerDecRef(tmr_obj_t* timer) { static void lockTimerList(timer_list_t* list) { int64_t tid = taosGetSelfPthreadId(); - int i = 0; + int i = 0; while (atomic_val_compare_exchange_64(&(list->lockedBy), 0, tid) != 0) { if (++i % 1000 == 0) { sched_yield(); @@ -276,11 +306,11 @@ static void addToExpired(tmr_obj_t* head) { const char* fmt = "%s adding expired timer[id=%" PRIuPTR ", fp=%p, param=%p] to queue."; while (head != NULL) { - uintptr_t id = head->id; + uintptr_t id = head->id; tmr_obj_t* next = head->next; tmrDebug(fmt, head->ctrl->label, id, head->fp, head->param); - SSchedMsg schedMsg; + SSchedMsg schedMsg; schedMsg.fp = NULL; schedMsg.tfp = processExpiredTimer; schedMsg.msg = NULL; @@ -491,6 +521,8 @@ static void taosTmrModuleInit(void) { return; } + memset(&timerMap, 0, sizeof(timerMap)); + for (uint32_t i = 0; i < tsMaxTmrCtrl - 1; ++i) { tmr_ctrl_t* ctrl = tmrCtrls + i; ctrl->next = ctrl + 1; @@ -570,7 +602,8 @@ void taosTmrCleanUp(void* handle) { unusedTmrCtrl = ctrl; pthread_mutex_unlock(&tmrCtrlMutex); - if (numOfTmrCtrl <=0) { + tmrDebug("time controller's tmr ctrl size: %d", numOfTmrCtrl); + if (numOfTmrCtrl <= 0) { taosUninitTimer(); taosCleanUpScheduler(tmrQhandle); @@ -585,7 +618,7 @@ void taosTmrCleanUp(void* handle) { for (size_t i = 0; i < timerMap.size; i++) { timer_list_t* list = timerMap.slots + i; - tmr_obj_t* t = list->timers; + tmr_obj_t* t = list->timers; while (t != NULL) { tmr_obj_t* next = t->mnext; free(t); @@ -595,6 +628,8 @@ void taosTmrCleanUp(void* handle) { free(timerMap.slots); free(tmrCtrls); - tmrDebug("timer module is cleaned up"); + tmrCtrls = NULL; + unusedTmrCtrl = NULL; + tmrModuleInit = PTHREAD_ONCE_INIT; // to support restart } }