From 15cfd98449914304d8491eb40f9f6d5eea04a014 Mon Sep 17 00:00:00 2001 From: tangfangzhi Date: Tue, 15 Mar 2022 12:44:38 +0800 Subject: [PATCH 1/9] extend 3.0 ci to 4 machines --- Jenkinsfile2 | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Jenkinsfile2 b/Jenkinsfile2 index ad62fa7044..bea254d046 100644 --- a/Jenkinsfile2 +++ b/Jenkinsfile2 @@ -35,6 +35,7 @@ def abort_previous(){ def pre_test(){ sh'hostname' sh ''' + date sudo rmtaos || echo "taosd has not installed" ''' sh ''' @@ -98,7 +99,7 @@ pipeline { } stages { stage('pre_build'){ - agent{label 'slave3_0'} + agent{label " slave3_0 || slave15 || slave16 || slave17 "} options { skipDefaultCheckout() } when { changeRequest() From 607a7ac025b432a3c2722f7cb075555ef4e4b31b Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 15 Mar 2022 19:54:06 +0800 Subject: [PATCH 2/9] handle except and update UT --- include/libs/transport/trpc.h | 12 +- source/libs/transport/inc/transComm.h | 7 +- source/libs/transport/inc/transportInt.h | 1 + source/libs/transport/src/trans.c | 1 + source/libs/transport/src/transCli.c | 32 +++-- source/libs/transport/src/transSrv.c | 4 +- source/libs/transport/test/transUT.cc | 157 +++++++++++++++++++---- 7 files changed, 167 insertions(+), 47 deletions(-) diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 8dfd736df6..231bc4af45 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -29,7 +29,6 @@ extern "C" { extern int tsRpcHeadSize; - typedef struct SRpcConnInfo { uint32_t clientIp; uint16_t clientPort; @@ -46,7 +45,6 @@ typedef struct SRpcMsg { void * ahandle; // app handle set by client } SRpcMsg; - typedef struct SRpcInit { uint16_t localPort; // local port char * label; // for debug purpose @@ -71,9 +69,11 @@ typedef struct SRpcInit { // call back to keep conn or not bool (*pfp)(void *parent, tmsg_t msgType); - // to support Send messages multiple times on a link - // - void* (*mfp)(void *parent, tmsg_t msgType); + // to support Send messages multiple times on a link + void *(*mfp)(void *parent, tmsg_t msgType); + + // call back to handle except when query/fetch in progress + void (*efp)(void *parent, tmsg_t msgType); void *parent; } SRpcInit; @@ -94,7 +94,7 @@ int rpcReportProgress(void *pConn, char *pCont, int contLen); void rpcCancelRequest(int64_t rid); // just release client conn to rpc instance, no close sock -void rpcReleaseHandle(void *handle); +void rpcReleaseHandle(void *handle); void rpcRefHandle(void *handle, int8_t type); void rpcUnrefHandle(void *handle, int8_t type); diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 99f890d3a0..76347cdba2 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -125,9 +125,8 @@ typedef SRpcInfo STrans; typedef SRpcConnInfo STransHandleInfo; typedef struct { - SEpSet epSet; // ip list provided by app - void* ahandle; // handle provided by app - // struct SRpcConn* pConn; // pConn allocated + SEpSet epSet; // ip list provided by app + void* ahandle; // handle provided by app tmsg_t msgType; // message type uint8_t* pCont; // content provided by app int32_t contLen; // content length @@ -135,7 +134,7 @@ typedef struct { // int16_t numOfTry; // number of try for different servers // int8_t oldInUse; // server EP inUse passed by app // int8_t redirect; // flag to indicate redirect - int8_t connType; // connection type + int8_t connType; // connection type cli/srv int64_t rid; // refId returned by taosAddRef STransMsg* pRsp; // for synchronous API diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index 3924a5cf1a..e760fe1de9 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -65,6 +65,7 @@ typedef struct { int (*afp)(void* parent, char* user, char* spi, char* encrypt, char* secret, char* ckey); bool (*pfp)(void* parent, tmsg_t msgType); void* (*mfp)(void* parent, tmsg_t msgType); + void (*efp)(void* parent, tmsg_t msgType); int32_t refCount; void* parent; diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 58809ee3be..213782c5bd 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -36,6 +36,7 @@ void* rpcOpen(const SRpcInit* pInit) { pRpc->afp = pInit->afp; pRpc->pfp = pInit->pfp; pRpc->mfp = pInit->mfp; + pRpc->efp = pInit->efp; if (pInit->connType == TAOS_CONN_SERVER) { pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 2f6ff3763f..5c50ca2bc3 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -34,6 +34,10 @@ typedef struct SCliConn { // spi configure char spi; char secured; + + char* ip; + uint32_t port; + // debug and log info struct sockaddr_in addr; struct sockaddr_in locaddr; @@ -79,7 +83,7 @@ typedef struct SConnList { static void* createConnPool(int size); static void* destroyConnPool(void* pool); static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port); -static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn); +static void addConnToPool(void* pool, SCliConn* conn); // register timer in each thread to clear expire conn static void cliTimeoutCb(uv_timer_t* handle); @@ -188,6 +192,12 @@ void cliHandleResp(SCliConn* conn) { conn->secured = pHead->secured; + if (pCtx == NULL && CONN_NO_PERSIST_BY_APP(conn)) { + tTrace("except, server continue send while cli ignore it"); + // transUnrefCliHandle(conn); + return; + } + if (pCtx == NULL || pCtx->pSem == NULL) { tTrace("%s cli conn %p handle resp", pTransInst->label, conn); (pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); @@ -197,14 +207,13 @@ void cliHandleResp(SCliConn* conn) { tsem_post(pCtx->pSem); } - uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb); - if (CONN_NO_PERSIST_BY_APP(conn)) { - addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn); + addConnToPool(pThrd->pool, conn); } destroyCmsg(conn->data); conn->data = NULL; + uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb); // start thread's timer of conn pool if not active if (!uv_is_active((uv_handle_t*)&pThrd->timer) && pTransInst->idleTime > 0) { // uv_timer_start((uv_timer_t*)&pThrd->timer, cliTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); @@ -317,11 +326,11 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) { QUEUE_INIT(&conn->conn); return conn; } -static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) { +static void addConnToPool(void* pool, SCliConn* conn) { char key[128] = {0}; - tstrncpy(key, ip, strlen(ip)); - tstrncpy(key + strlen(key), (char*)(&port), sizeof(port)); + tstrncpy(key, conn->ip, strlen(conn->ip)); + tstrncpy(key + strlen(key), (char*)(&conn->port), sizeof(conn->port)); tTrace("cli conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap); STrans* pTransInst = ((SCliThrdObj*)conn->hostThrd)->pTransInst; @@ -395,7 +404,7 @@ static void cliDestroyConn(SCliConn* conn, bool clear) { } static void cliDestroy(uv_handle_t* handle) { SCliConn* conn = handle->data; - + free(conn->ip); free(conn->stream); tTrace("%s cli conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn); free(conn); @@ -524,11 +533,16 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { SCliConn* conn = cliGetConn(pMsg, pThrd); if (conn != NULL) { conn->data = pMsg; + conn->hThrdIdx = pCtx->hThrdIdx; + transDestroyBuffer(&conn->readBuf); cliSend(conn); } else { conn = cliCreateConn(pThrd); conn->data = pMsg; + conn->hThrdIdx = pCtx->hThrdIdx; + conn->ip = strdup(pMsg->ctx->ip); + conn->port = pMsg->ctx->port; int ret = transSetConnOption((uv_tcp_t*)conn->stream); if (ret) { @@ -540,8 +554,6 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { tTrace("%s cli conn %p try to connect to %s:%d", pTransInst->label, conn, pMsg->ctx->ip, pMsg->ctx->port); uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb); } - - conn->hThrdIdx = pCtx->hThrdIdx; } static void cliAsyncCb(uv_async_t* handle) { SAsyncItem* item = handle->data; diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index cb3bbaefec..a3cc6b6181 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -289,11 +289,13 @@ void uvOnSendCb(uv_write_t* req, int status) { if (conn->srvMsgs != NULL) { assert(taosArrayGetSize(conn->srvMsgs) >= 1); SSrvMsg* msg = taosArrayGetP(conn->srvMsgs, 0); + tTrace("server conn %p sending msg size: %d", conn, (int)taosArrayGetSize(conn->srvMsgs)); taosArrayRemove(conn->srvMsgs, 0); destroySmsg(msg); // send second data, just use for push if (taosArrayGetSize(conn->srvMsgs) > 0) { + tTrace("resent server conn %p sending msg size: %d", conn, (int)taosArrayGetSize(conn->srvMsgs)); msg = (SSrvMsg*)taosArrayGetP(conn->srvMsgs, 0); uvStartSendRespInternal(msg); } @@ -733,7 +735,7 @@ void destroyWorkThrd(SWorkThrdObj* pThrd) { } void sendQuitToWorkThrd(SWorkThrdObj* pThrd) { SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg)); - tDebug("send quit msg to work thread"); + tDebug("server send quit msg to work thread"); transSendAsync(pThrd->asyncPool, &srvMsg->q); } diff --git a/source/libs/transport/test/transUT.cc b/source/libs/transport/test/transUT.cc index fa20327003..b0fae2f8b4 100644 --- a/source/libs/transport/test/transUT.cc +++ b/source/libs/transport/test/transUT.cc @@ -29,7 +29,29 @@ const char *ckey = "ckey"; class Server; int port = 7000; // server process + +static bool cliPersistHandle(void *parent, tmsg_t msgType) { + // client persist handle + return msgType == 2 || msgType == 4; +} + +typedef struct CbArgs { + tmsg_t msgType; +} CbArgs; + +static void *ConstructArgForSpecificMsgType(void *parent, tmsg_t msgType) { + if (msgType == 1 || msgType == 2) { + CbArgs *args = (CbArgs *)calloc(1, sizeof(CbArgs)); + args->msgType = msgType; + return args; + } + return NULL; +} +// server except +static void NotifyAppLinkBroken(void *parent, tmsg_t msgType) {} typedef void (*CB)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet); + +static void processContinueSend(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet); static void processReq(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet); // client process; static void processResp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet); @@ -61,17 +83,17 @@ class Client { rpcInit_.cfp = cb; this->transCli = rpcOpen(&rpcInit_); } - void setPersistFP(bool (*pfp)(void *parent, tmsg_t msgType)) { + void SetPersistFP(bool (*pfp)(void *parent, tmsg_t msgType)) { rpcClose(this->transCli); rpcInit_.pfp = pfp; this->transCli = rpcOpen(&rpcInit_); } - void setConstructFP(void *(*mfp)(void *parent, tmsg_t msgType)) { + void SetConstructFP(void *(*mfp)(void *parent, tmsg_t msgType)) { rpcClose(this->transCli); rpcInit_.mfp = mfp; this->transCli = rpcOpen(&rpcInit_); } - void setPAndMFp(bool (*pfp)(void *parent, tmsg_t msgType), void *(*mfp)(void *parent, tmsg_t msgType)) { + void SetPAndMFp(bool (*pfp)(void *parent, tmsg_t msgType), void *(*mfp)(void *parent, tmsg_t msgType)) { rpcClose(this->transCli); rpcInit_.pfp = pfp; @@ -88,6 +110,7 @@ class Client { SemWait(); *resp = this->resp; } + void SendWithHandle(SRpcMsg *req, SRpcMsg *resp) {} void SemWait() { tsem_wait(&this->sem); } void SemPost() { tsem_post(&this->sem); } void Reset() {} @@ -105,19 +128,20 @@ class Client { class Server { public: Server() { - memset(&rpcInit, 0, sizeof(rpcInit)); - rpcInit.localPort = port; - rpcInit.label = (char *)label; - rpcInit.numOfThreads = 5; - rpcInit.cfp = processReq; - rpcInit.user = (char *)user; - rpcInit.secret = (char *)secret; - rpcInit.ckey = (char *)ckey; - rpcInit.spi = 1; - rpcInit.connType = TAOS_CONN_SERVER; + memset(&rpcInit_, 0, sizeof(rpcInit_)); + rpcInit_.localPort = port; + rpcInit_.label = (char *)label; + rpcInit_.numOfThreads = 5; + rpcInit_.cfp = processReq; + rpcInit_.efp = NULL; + rpcInit_.user = (char *)user; + rpcInit_.secret = (char *)secret; + rpcInit_.ckey = (char *)ckey; + rpcInit_.spi = 1; + rpcInit_.connType = TAOS_CONN_SERVER; } void Start() { - this->transSrv = rpcOpen(&this->rpcInit); + this->transSrv = rpcOpen(&this->rpcInit_); taosMsleep(1000); } void Stop() { @@ -125,6 +149,16 @@ class Server { rpcClose(this->transSrv); this->transSrv = NULL; } + void SetExceptFp(void (*efp)(void *parent, tmsg_t msgType)) { + this->Stop(); + rpcInit_.efp = efp; + this->Start(); + } + void SetSrvContinueSend(void (*cfp)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet)) { + this->Stop(); + rpcInit_.cfp = cfp; + this->Start(); + } void Restart() { this->Stop(); this->Start(); @@ -135,7 +169,7 @@ class Server { } private: - SRpcInit rpcInit; + SRpcInit rpcInit_; void * transSrv; }; static void processReq(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { @@ -146,6 +180,20 @@ static void processReq(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { rpcMsg.code = 0; rpcSendResponse(&rpcMsg); } + +static void processContinueSend(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { + for (int i = 0; i < 9; i++) { + rpcRefHandle(pMsg->handle, TAOS_CONN_SERVER); + } + for (int i = 0; i < 10; i++) { + SRpcMsg rpcMsg = {0}; + rpcMsg.pCont = rpcMallocCont(100); + rpcMsg.contLen = 100; + rpcMsg.handle = pMsg->handle; + rpcMsg.code = 0; + rpcSendResponse(&rpcMsg); + } +} // client process; static void processResp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { Client *client = (Client *)parent; @@ -170,7 +218,7 @@ static void initEnv() { tsAsyncLog = 0; std::string path = "/tmp/transport"; - taosRemoveDir(path.c_str()); + // taosRemoveDir(path.c_str()); taosMkDir(path.c_str()); tstrncpy(tsLogDir, path.c_str(), PATH_MAX); @@ -178,6 +226,7 @@ static void initEnv() { printf("failed to init log file\n"); } } + class TransObj { public: TransObj() { @@ -188,22 +237,38 @@ class TransObj { srv->Start(); } - void RestartCli(CB cb) { cli->Restart(cb); } - void StopSrv() { srv->Stop(); } + void RestartCli(CB cb) { + // + cli->Restart(cb); + } + void StopSrv() { + // + srv->Stop(); + } void SetCliPersistFp(bool (*pfp)(void *parent, tmsg_t msgType)) { // do nothing - cli->setPersistFP(pfp); + cli->SetPersistFP(pfp); } void SetCliMFp(void *(*mfp)(void *parent, tmsg_t msgType)) { // do nothing - cli->setConstructFP(mfp); + cli->SetConstructFP(mfp); } - void SetMAndPFp(bool (*pfp)(void *parent, tmsg_t msgType), void *(*mfp)(void *parent, tmsg_t msgType)) { + void SetCliMAndPFp(bool (*pfp)(void *parent, tmsg_t msgType), void *(*mfp)(void *parent, tmsg_t msgType)) { // do nothing - cli->setPAndMFp(pfp, mfp); + cli->SetPAndMFp(pfp, mfp); + } + // call when link broken, and notify query or fetch stop + void SetSrvExceptFp(void (*efp)(void *parent, tmsg_t msgType)) { + //////// + srv->SetExceptFp(efp); + } + void SetSrvContinueSend(void (*cfp)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet)) { + /////// + srv->SetSrvContinueSend(cfp); } void RestartSrv() { srv->Restart(); } void cliSendAndRecv(SRpcMsg *req, SRpcMsg *resp) { cli->SendAndRecv(req, resp); } + ~TransObj() { delete cli; delete srv; @@ -256,13 +321,50 @@ TEST_F(TransEnv, 02StopServer) { tr->cliSendAndRecv(&req, &resp); assert(resp.code != 0); } -TEST_F(TransEnv, clientUserDefined) {} +TEST_F(TransEnv, clientUserDefined) { + tr->RestartSrv(); + for (int i = 0; i < 10; i++) { + SRpcMsg req = {0}, resp = {0}; + req.msgType = 0; + req.pCont = rpcMallocCont(10); + req.contLen = 10; + tr->cliSendAndRecv(&req, &resp); + assert(resp.code == 0); + } + + ////////////////// +} TEST_F(TransEnv, cliPersistHandle) { - // impl late + tr->SetCliPersistFp(cliPersistHandle); + SRpcMsg resp = {0}; + for (int i = 0; i < 10; i++) { + SRpcMsg req = {.handle = resp.handle}; + req.msgType = 1; + req.pCont = rpcMallocCont(10); + req.contLen = 10; + tr->cliSendAndRecv(&req, &resp); + if (i == 5) { + std::cout << "stop server" << std::endl; + tr->StopSrv(); + } + if (i >= 6) { + EXPECT_TRUE(resp.code != 0); + } + } + + ////////////////// } -TEST_F(TransEnv, srvPersistHandle) { - // impl later +TEST_F(TransEnv, srvContinueSend) { + tr->SetSrvContinueSend(processContinueSend); + for (int i = 0; i < 10; i++) { + SRpcMsg req = {0}, resp = {0}; + req.msgType = 1; + req.pCont = rpcMallocCont(10); + req.contLen = 10; + tr->cliSendAndRecv(&req, &resp); + } + taosMsleep(2000); } TEST_F(TransEnv, srvPersisHandleExcept) { @@ -282,3 +384,6 @@ TEST_F(TransEnv, multiSrvPersisHandleExcept) { TEST_F(TransEnv, queryExcept) { // query and conn is broken } +TEST_F(TransEnv, noResp) { + // no resp +} From 23bef711fca8a687246a42296c91467c1a14df58 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Tue, 15 Mar 2022 21:35:04 +0800 Subject: [PATCH 3/9] Feature/sangshuduo/td 13063 3.0 windows (#10720) * [TD-13063]: 3.0 on Windows * add pthread in contrib * fix linux compile * fix osSemaphore * add gnu regex for Windows * fix compile error for Windows * support arm platform * port more OS files * fix for Windows compile * port more files * fix macOS on x86_64 * port osFile * port osSemaphone.h * port osSocket.c * port tconfig.c * port ttimer.c * add couple files --- Jenkinsfile2 | 3 +- cmake/cmake.define | 11 ++- cmake/cmake.platform | 57 +++++++++---- include/os/os.h | 10 +++ include/os/osEok.h | 93 ++++++++++++++++++++++ include/os/osFile.h | 19 ++++- include/os/osSemaphore.h | 12 +-- include/os/osSocket.h | 35 +++++++- include/os/osSysinfo.h | 3 +- source/common/src/tdatablock.c | 9 ++- source/common/src/ttszip.c | 4 + source/libs/catalog/src/catalog.c | 8 +- source/libs/index/src/index.c | 1 + source/libs/index/src/index_cache.c | 3 + source/libs/index/src/index_fst.c | 6 ++ source/libs/parser/src/parInsert.c | 2 + source/libs/planner/src/planLogicCreater.c | 1 + source/libs/scalar/src/filter.c | 4 +- source/libs/scalar/src/scalar.c | 3 + source/libs/scheduler/src/schFlowCtrl.c | 4 +- source/libs/sync/src/syncRaftEntry.c | 16 ++-- source/libs/sync/src/syncVoteMgr.c | 22 ++--- source/libs/tfs/src/tfs.c | 4 +- source/libs/transport/src/transCli.c | 3 + source/libs/transport/src/transComm.c | 6 +- source/libs/transport/src/transSrv.c | 4 + source/libs/wal/src/walRead.c | 2 +- source/os/src/osFile.c | 17 ++-- source/os/src/osSocket.c | 12 ++- source/util/src/tconfig.c | 3 +- source/util/src/ttimer.c | 2 +- 31 files changed, 297 insertions(+), 82 deletions(-) create mode 100644 include/os/osEok.h diff --git a/Jenkinsfile2 b/Jenkinsfile2 index ad62fa7044..bf55e016b6 100644 --- a/Jenkinsfile2 +++ b/Jenkinsfile2 @@ -60,8 +60,9 @@ def pre_test(){ sh ''' cd ${WKC} git checkout 3.0 + [ -d contrib/bdb ] && cd contrib/bdb && git clean -fxd && cd ../.. ''' - } + } else{ sh ''' cd ${WKC} diff --git a/cmake/cmake.define b/cmake/cmake.define index b97c10a4b7..9c2d5dc04c 100644 --- a/cmake/cmake.define +++ b/cmake/cmake.define @@ -36,7 +36,14 @@ IF (TD_WINDOWS) ENDIF () ELSE () - SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -fPIC -gdwarf-2 -msse4.2 -mfma -g3") - SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -fPIC -gdwarf-2 -msse4.2 -mfma -g3") + SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -fPIC -gdwarf-2 -g3") + SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -fPIC -gdwarf-2 -g3") + +MESSAGE("System processor ID: ${CMAKE_SYSTEM_PROCESSOR}") +IF (${CMAKE_SYSTEM_PROCESSOR} MATCHES "arm64") + ADD_DEFINITIONS("-D_TD_ARM_") +ELSE () + ADD_DEFINITIONS("-msse4.2 -mfma") +ENDIF () ENDIF () diff --git a/cmake/cmake.platform b/cmake/cmake.platform index 7ef259ba54..0312f92a5b 100644 --- a/cmake/cmake.platform +++ b/cmake/cmake.platform @@ -7,31 +7,53 @@ SET(TD_LINUX FALSE) SET(TD_WINDOWS FALSE) SET(TD_DARWIN FALSE) -IF (${CMAKE_SYSTEM_NAME} MATCHES "Linux") +MESSAGE("Compiler ID: ${CMAKE_CXX_COMPILER_ID}") +if(CMAKE_COMPILER_IS_GNUCXX MATCHES 1) + set(CXX_COMPILER_IS_GNU TRUE) +else() + set(CXX_COMPILER_IS_GNU FALSE) +endif() - SET(TD_LINUX TRUE) - SET(OSTYPE "Linux") - ADD_DEFINITIONS("-DLINUX") +MESSAGE("Current system name is ${CMAKE_SYSTEM_NAME}.") - IF (${CMAKE_SIZEOF_VOID_P} MATCHES 8) - SET(TD_LINUX_64 TRUE) +IF (${CMAKE_SYSTEM_NAME} MATCHES "Linux" OR ${CMAKE_SYSTEM_NAME} MATCHES "Darwin") + + IF (${CXX_COMPILER_IS_GNU}) + SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -std=gnu99") ELSE () - SET(TD_LINUX_32 TRUE) + ADD_DEFINITIONS("-Wno-tautological-constant-out-of-range-compare -Wno-pointer-sign -Wno-unknown-warning-option") + set(CMAKE_SHARED_LIBRARY_CREATE_C_FLAGS "${CMAKE_SHARED_LIBRARY_CREATE_C_FLAGS} -undefined dynamic_lookup") + set(CMAKE_SHARED_LIBRARY_CREATE_CXX_FLAGS "${CMAKE_SHARED_LIBRARY_CREATE_CXX_FLAGS} -undefined dynamic_lookup") ENDIF () -ELSEIF (${CMAKE_SYSTEM_NAME} MATCHES "Darwin") + IF (${CMAKE_SYSTEM_NAME} MATCHES "Linux") - SET(TD_DARWIN TRUE) - SET(OSTYPE "macOS") - ADD_DEFINITIONS("-DDARWIN") - IF (${CMAKE_SYSTEM_PROCESSOR} MATCHES "arm64") - MESSAGE("Current system arch is arm64") - SET(TD_DARWIN_64 TRUE) - ADD_DEFINITIONS("-D_TD_DARWIN_64") + SET(TD_LINUX TRUE) + SET(OSTYPE "Linux") + ADD_DEFINITIONS("-DLINUX") + + IF (${CMAKE_SIZEOF_VOID_P} MATCHES 8) + SET(TD_LINUX_64 TRUE) + ELSE () + SET(TD_LINUX_32 TRUE) + ENDIF () + + ELSEIF (${CMAKE_SYSTEM_NAME} MATCHES "Darwin") + + SET(TD_DARWIN TRUE) + SET(OSTYPE "macOS") + ADD_DEFINITIONS("-DDARWIN -Wno-tautological-pointer-compare") + + MESSAGE("Current system processor is ${CMAKE_SYSTEM_PROCESSOR}.") + IF (${CMAKE_SYSTEM_PROCESSOR} MATCHES "arm64" OR ${CMAKE_SYSTEM_PROCESSOR} MATCHES "x86_64") + MESSAGE("Current system arch is arm64") + SET(TD_DARWIN_64 TRUE) + ADD_DEFINITIONS("-D_TD_DARWIN_64") + ENDIF () + + ADD_DEFINITIONS("-DHAVE_UNISTD_H") ENDIF () - ADD_DEFINITIONS("-DHAVE_UNISTD_H") - ELSEIF (${CMAKE_SYSTEM_NAME} MATCHES "Windows") SET(TD_WINDOWS TRUE) @@ -45,6 +67,7 @@ ELSEIF (${CMAKE_SYSTEM_NAME} MATCHES "Windows") SET(TD_WINDOWS_32 TRUE) ADD_DEFINITIONS("-D_TD_WINDOWS_32") ENDIF () + ENDIF() MESSAGE("C Compiler ID: ${CMAKE_C_COMPILER_ID}") diff --git a/include/os/os.h b/include/os/os.h index fd272b15a3..12f4e733c2 100644 --- a/include/os/os.h +++ b/include/os/os.h @@ -22,6 +22,8 @@ extern "C" { #include #include +#include +#include #if !defined(WINDOWS) #include @@ -34,7 +36,12 @@ extern "C" { #include #include #include + +#if defined(DARWIN) +#else #include +#include +#endif #endif @@ -54,9 +61,12 @@ extern "C" { #include #include #include +#include #include +#include #include #include +#include #include diff --git a/include/os/osEok.h b/include/os/osEok.h new file mode 100644 index 0000000000..3ca476f840 --- /dev/null +++ b/include/os/osEok.h @@ -0,0 +1,93 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_OS_EOK_H +#define TDENGINE_OS_EOK_H + +#include + +#ifdef __cplusplus +extern "C" { +#endif + +#ifdef __APPLE__ + +enum EPOLL_EVENTS + { + EPOLLIN = 0x001, +#define EPOLLIN EPOLLIN + EPOLLPRI = 0x002, +#define EPOLLPRI EPOLLPRI + EPOLLOUT = 0x004, +#define EPOLLOUT EPOLLOUT + EPOLLRDNORM = 0x040, +#define EPOLLRDNORM EPOLLRDNORM + EPOLLRDBAND = 0x080, +#define EPOLLRDBAND EPOLLRDBAND + EPOLLWRNORM = 0x100, +#define EPOLLWRNORM EPOLLWRNORM + EPOLLWRBAND = 0x200, +#define EPOLLWRBAND EPOLLWRBAND + EPOLLMSG = 0x400, +#define EPOLLMSG EPOLLMSG + EPOLLERR = 0x008, +#define EPOLLERR EPOLLERR + EPOLLHUP = 0x010, +#define EPOLLHUP EPOLLHUP + EPOLLRDHUP = 0x2000, +#define EPOLLRDHUP EPOLLRDHUP + EPOLLEXCLUSIVE = 1u << 28, +#define EPOLLEXCLUSIVE EPOLLEXCLUSIVE + EPOLLWAKEUP = 1u << 29, +#define EPOLLWAKEUP EPOLLWAKEUP + EPOLLONESHOT = 1u << 30, +#define EPOLLONESHOT EPOLLONESHOT + EPOLLET = 1u << 31 +#define EPOLLET EPOLLET + }; + +/* Valid opcodes ( "op" parameter ) to issue to epoll_ctl(). */ +#define EPOLL_CTL_ADD 1 /* Add a file descriptor to the interface. */ +#define EPOLL_CTL_DEL 2 /* Remove a file descriptor from the interface. */ +#define EPOLL_CTL_MOD 3 /* Change file descriptor epoll_event structure. */ + + +typedef union epoll_data +{ + void *ptr; + int fd; + uint32_t u32; + uint64_t u64; +} epoll_data_t; + +struct epoll_event +{ + uint32_t events; /* Epoll events */ + epoll_data_t data; /* User data variable */ +}; + +int epoll_create(int size); +int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); +int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout); +int epoll_close(int epfd); + +#endif // __APPLE__ + +#ifdef __cplusplus +} +#endif + +#endif // _eok_h_fd274616_996c_400e_9023_ae70be881fa3_ + diff --git a/include/os/osFile.h b/include/os/osFile.h index b0e97e7b54..58a9df2504 100644 --- a/include/os/osFile.h +++ b/include/os/osFile.h @@ -49,6 +49,15 @@ int64_t taosRead(FileFd fd, void *buf, int64_t count); #define PATH_MAX 256 #endif +typedef int32_t FileFd; + +typedef struct TdFile { + pthread_rwlock_t rwlock; + int refId; + FileFd fd; + FILE *fp; +} * TdFilePtr, TdFile; + typedef struct TdFile *TdFilePtr; #define TD_FILE_CTEATE 0x0001 @@ -101,8 +110,16 @@ int64_t taosCopyFile(const char *from, const char *to); int32_t taosRemoveFile(const char *path); void taosGetTmpfilePath(const char *inputTmpDir, const char *fileNamePrefix, char *dstPath); - + +#if defined(_TD_DARWIN_64) +typedef int32_t SocketFd; + +int64_t taosSendFile(SocketFd fdDst, FileFd pFileSrc, int64_t *offset, int64_t size); +int64_t taosFSendFile(FILE *pFileOut, FILE *pFileIn, int64_t *offset, int64_t size); +#else +int64_t taosSendFile(SocketFd fdDst, TdFilePtr pFileSrc, int64_t *offset, int64_t size); int64_t taosFSendFile(TdFilePtr pFileOut, TdFilePtr pFileIn, int64_t *offset, int64_t size); +#endif void *taosMmapReadOnlyFile(TdFilePtr pFile, int64_t length); bool taosValidFile(TdFilePtr pFile); diff --git a/include/os/osSemaphore.h b/include/os/osSemaphore.h index 9904170d61..4bac81754d 100644 --- a/include/os/osSemaphore.h +++ b/include/os/osSemaphore.h @@ -38,12 +38,12 @@ extern "C" { #endif #if defined (_TD_DARWIN_64) - #define pthread_rwlock_t pthread_mutex_t - #define pthread_rwlock_init(lock, NULL) pthread_mutex_init(lock, NULL) - #define pthread_rwlock_destroy(lock) pthread_mutex_destroy(lock) - #define pthread_rwlock_wrlock(lock) pthread_mutex_lock(lock) - #define pthread_rwlock_rdlock(lock) pthread_mutex_lock(lock) - #define pthread_rwlock_unlock(lock) pthread_mutex_unlock(lock) +// #define pthread_rwlock_t pthread_mutex_t +// #define pthread_rwlock_init(lock, NULL) pthread_mutex_init(lock, NULL) +// #define pthread_rwlock_destroy(lock) pthread_mutex_destroy(lock) +// #define pthread_rwlock_wrlock(lock) pthread_mutex_lock(lock) +// #define pthread_rwlock_rdlock(lock) pthread_mutex_lock(lock) +// #define pthread_rwlock_unlock(lock) pthread_mutex_unlock(lock) #define pthread_spinlock_t pthread_mutex_t #define pthread_spin_init(lock, NULL) pthread_mutex_init(lock, NULL) diff --git a/include/os/osSocket.h b/include/os/osSocket.h index 3faed855ba..520d3af331 100644 --- a/include/os/osSocket.h +++ b/include/os/osSocket.h @@ -33,8 +33,15 @@ #include #include #else - #include - #include + #include + #include + + #if defined(_TD_DARWIN_64) + #include + #else + #include + #include + #endif #endif #ifdef __cplusplus @@ -49,7 +56,29 @@ extern "C" { #endif #if defined(_TD_DARWIN_64) - #define htobe64 htonll +// #define htobe64 htonll + +# include + +# define htobe16(x) OSSwapHostToBigInt16(x) +# define htole16(x) OSSwapHostToLittleInt16(x) +# define be16toh(x) OSSwapBigToHostInt16(x) +# define le16toh(x) OSSwapLittleToHostInt16(x) + +# define htobe32(x) OSSwapHostToBigInt32(x) +# define htole32(x) OSSwapHostToLittleInt32(x) +# define be32toh(x) OSSwapBigToHostInt32(x) +# define le32toh(x) OSSwapLittleToHostInt32(x) + +# define htobe64(x) OSSwapHostToBigInt64(x) +# define htole64(x) OSSwapHostToLittleInt64(x) +# define be64toh(x) OSSwapBigToHostInt64(x) +# define le64toh(x) OSSwapLittleToHostInt64(x) + +# define __BYTE_ORDER BYTE_ORDER +# define __BIG_ENDIAN BIG_ENDIAN +# define __LITTLE_ENDIAN LITTLE_ENDIAN +# define __PDP_ENDIAN PDP_ENDIAN #endif #define TAOS_EPOLL_WAIT_TIME 500 diff --git a/include/os/osSysinfo.h b/include/os/osSysinfo.h index d7cf05a594..7777466973 100644 --- a/include/os/osSysinfo.h +++ b/include/os/osSysinfo.h @@ -16,6 +16,7 @@ #ifndef _TD_OS_SYSINFO_H_ #define _TD_OS_SYSINFO_H_ +#include #include "os.h" #ifdef __cplusplus @@ -52,7 +53,7 @@ int32_t taosGetSystemUUID(char *uid, int32_t uidlen); char *taosGetCmdlineByPID(int32_t pid); void taosSetCoreDump(bool enable); -#if defined(WINDOWS) +#if !defined(LINUX) #define _UTSNAME_LENGTH 65 #define _UTSNAME_MACHINE_LENGTH _UTSNAME_LENGTH diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 7e4f1d9025..0d1f762db6 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -864,7 +864,7 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirs qsort(pColInfoData->pData, pDataBlock->info.rows, pColInfoData->info.bytes, fn); int64_t p1 = taosGetTimestampUs(); - printf("sort:%ld, rows:%d\n", p1 - p0, pDataBlock->info.rows); + printf("sort:%" PRId64 ", rows:%d\n", p1 - p0, pDataBlock->info.rows); return TSDB_CODE_SUCCESS; } else { // var data type @@ -912,7 +912,7 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirs copyBackToBlock(pDataBlock, pCols); int64_t p4 = taosGetTimestampUs(); - printf("sort:%ld, create:%ld, assign:%ld, copyback:%ld, rows:%d\n", p1-p0, p2 - p1, p3 - p2, p4-p3, rows); + printf("sort:%" PRId64 ", create:%" PRId64 ", assign:%" PRId64 ", copyback:%" PRId64 ", rows:%d\n", p1-p0, p2 - p1, p3 - p2, p4-p3, rows); destroyTupleIndex(index); return TSDB_CODE_SUCCESS; @@ -1017,7 +1017,7 @@ int32_t dataBlockCompar_rv(const void* p1, const void* p2, const void* param) { } int32_t varColSort(SColumnInfoData* pColumnInfoData, SBlockOrderInfo* pOrder) { - + return 0; } int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst) { @@ -1055,8 +1055,9 @@ int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullF copyBackToBlock(pDataBlock, pCols); int64_t p4 = taosGetTimestampUs(); - printf("sort:%ld, create:%ld, assign:%ld, copyback:%ld, rows:%d\n", p1 - p0, p2 - p1, p3 - p2, p4 - p3, rows); + printf("sort:%" PRId64 ", create:%" PRId64", assign:%" PRId64 ", copyback:%" PRId64 ", rows:%d\n", p1 - p0, p2 - p1, p3 - p2, p4 - p3, rows); // destroyTupleIndex(index); + return 0; } void blockDataClearup(SSDataBlock* pDataBlock) { diff --git a/source/common/src/ttszip.c b/source/common/src/ttszip.c index 464c29d287..5f0a353226 100644 --- a/source/common/src/ttszip.c +++ b/source/common/src/ttszip.c @@ -845,7 +845,11 @@ int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf) { int64_t offset = getDataStartOffset(); int32_t size = (int32_t)pSrcBuf->fileSize - (int32_t)offset; +#if defined(_TD_DARWIN_64) + int64_t written = taosFSendFile(pDestBuf->pFile->fp, pSrcBuf->pFile->fp, &offset, size); +#else int64_t written = taosFSendFile(pDestBuf->pFile, pSrcBuf->pFile, &offset, size); +#endif if (written == -1 || written != size) { return -1; diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index e1ccb03c66..80a0b3554c 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -1062,7 +1062,7 @@ int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type) { int32_t ctgMetaRentAdd(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size) { - int16_t widx = abs(id % mgmt->slotNum); + int16_t widx = abs((int)(id % mgmt->slotNum)); SCtgRentSlot *slot = &mgmt->slots[widx]; int32_t code = 0; @@ -1092,11 +1092,11 @@ _return: } int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size, __compar_fn_t sortCompare, __compar_fn_t searchCompare) { - int16_t widx = abs(id % mgmt->slotNum); + int16_t widx = abs((int)(id % mgmt->slotNum)); SCtgRentSlot *slot = &mgmt->slots[widx]; int32_t code = 0; - + CTG_LOCK(CTG_WRITE, &slot->lock); if (NULL == slot->meta) { qError("empty meta slot, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type); @@ -1133,7 +1133,7 @@ _return: } int32_t ctgMetaRentRemove(SCtgRentMgmt *mgmt, int64_t id, __compar_fn_t sortCompare, __compar_fn_t searchCompare) { - int16_t widx = abs(id % mgmt->slotNum); + int16_t widx = abs((int)(id % mgmt->slotNum)); SCtgRentSlot *slot = &mgmt->slots[widx]; int32_t code = 0; diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index 744f6ca70b..d25fed6816 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -240,6 +240,7 @@ int indexRebuild(SIndex* index, SIndexOpts* opts){ #ifdef USE_INVERTED_INDEX #endif + return 0; } SIndexOpts* indexOptsCreate() { diff --git a/source/libs/index/src/index_cache.c b/source/libs/index/src/index_cache.c index b40ded9e9a..34f009dd7e 100644 --- a/source/libs/index/src/index_cache.c +++ b/source/libs/index/src/index_cache.c @@ -190,7 +190,10 @@ int indexCacheSchedToMerge(IndexCache* pCache) { schedMsg.msg = NULL; taosScheduleTask(indexQhandle, &schedMsg); + + return 0; } + static void indexCacheMakeRoomForWrite(IndexCache* cache) { while (true) { if (cache->occupiedMem * MEM_ESTIMATE_RADIO < MEM_THRESHOLD) { diff --git a/source/libs/index/src/index_fst.c b/source/libs/index/src/index_fst.c index a6cabbd439..18cde151d2 100644 --- a/source/libs/index/src/index_fst.c +++ b/source/libs/index/src/index_fst.c @@ -571,6 +571,8 @@ uint64_t fstStateFindInput(FstState* s, FstNode* node, uint8_t b, bool* null) { } fstSliceDestroy(&t); } + + return 0; } // fst node function @@ -1027,6 +1029,8 @@ Fst* fstCreate(FstSlice* slice) { FST_CREAT_FAILED: free(fst->meta); free(fst); + + return NULL; } void fstDestroy(Fst* fst) { if (fst) { @@ -1286,6 +1290,8 @@ bool streamWithStateSeekMin(StreamWithState* sws, FstBoundWithData* min) { } return false; } + + return false; } StreamWithStateResult* streamWithStateNextWith(StreamWithState* sws, StreamCallback callback) { diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index 43cc308483..28f680cfbb 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -763,6 +763,8 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pTagsSchema, // todo construct payload tfree(row); + + return 0; } // pSql -> stb_name [(tag1_name, ...)] TAGS (tag1_value, ...) diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 3fae580de9..cd9857c7f6 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -425,6 +425,7 @@ static SLogicNode* createQueryLogicNode(SLogicPlanContext* pCxt, SNode* pStmt) { default: break; } + return NULL; // to avoid compiler error } int32_t createLogicPlan(SPlanContext* pCxt, SLogicNode** pLogicNode) { diff --git a/source/libs/scalar/src/filter.c b/source/libs/scalar/src/filter.c index a7aea5a7e5..33ec8ab6ef 100644 --- a/source/libs/scalar/src/filter.c +++ b/source/libs/scalar/src/filter.c @@ -792,7 +792,7 @@ int32_t filterDetachCnfGroups(SArray* group, SArray* left, SArray* right) { } SFilterGroup *gp = NULL; - while (gp = (SFilterGroup *)taosArrayPop(right)) { + while ((gp = (SFilterGroup *)taosArrayPop(right)) != NULL) { taosArrayPush(group, gp); } @@ -801,7 +801,7 @@ int32_t filterDetachCnfGroups(SArray* group, SArray* left, SArray* right) { if (taosArrayGetSize(right) <= 0) { SFilterGroup *gp = NULL; - while (gp = (SFilterGroup *)taosArrayPop(left)) { + while ((gp = (SFilterGroup *)taosArrayPop(left)) != NULL) { taosArrayPush(group, gp); } diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index b8cdda9ed1..8e7eaa0f8c 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -239,6 +239,9 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t break; } + + default: + break; } if (param->num > *rowNum) { diff --git a/source/libs/scheduler/src/schFlowCtrl.c b/source/libs/scheduler/src/schFlowCtrl.c index 9fba6523b6..993521da87 100644 --- a/source/libs/scheduler/src/schFlowCtrl.c +++ b/source/libs/scheduler/src/schFlowCtrl.c @@ -282,8 +282,10 @@ int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask) { SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } - SCH_ERR_RET(schLaunchTasksInFlowCtrlListImpl(pJob, ctrl)); + int32_t code = schLaunchTasksInFlowCtrlListImpl(pJob, ctrl);; + SCH_ERR_RET(code); + return code; // to avoid compiler error } diff --git a/source/libs/sync/src/syncRaftEntry.c b/source/libs/sync/src/syncRaftEntry.c index f29b3022d8..7ececf285a 100644 --- a/source/libs/sync/src/syncRaftEntry.c +++ b/source/libs/sync/src/syncRaftEntry.c @@ -74,12 +74,12 @@ cJSON* syncEntry2Json(const SSyncRaftEntry* pEntry) { cJSON_AddNumberToObject(pRoot, "bytes", pEntry->bytes); cJSON_AddNumberToObject(pRoot, "msgType", pEntry->msgType); cJSON_AddNumberToObject(pRoot, "originalRpcType", pEntry->originalRpcType); - snprintf(u64buf, sizeof(u64buf), "%lu", pEntry->seqNum); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pEntry->seqNum); cJSON_AddStringToObject(pRoot, "seqNum", u64buf); cJSON_AddNumberToObject(pRoot, "isWeak", pEntry->isWeak); - snprintf(u64buf, sizeof(u64buf), "%lu", pEntry->term); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pEntry->term); cJSON_AddStringToObject(pRoot, "term", u64buf); - snprintf(u64buf, sizeof(u64buf), "%lu", pEntry->index); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pEntry->index); cJSON_AddStringToObject(pRoot, "index", u64buf); cJSON_AddNumberToObject(pRoot, "dataLen", pEntry->dataLen); @@ -107,26 +107,26 @@ char* syncEntry2Str(const SSyncRaftEntry* pEntry) { // for debug ---------------------- void syncEntryPrint(const SSyncRaftEntry* pObj) { char* serialized = syncEntry2Str(pObj); - printf("syncEntryPrint | len:%lu | %s \n", strlen(serialized), serialized); + printf("syncEntryPrint | len:%zu | %s \n", strlen(serialized), serialized); fflush(NULL); free(serialized); } void syncEntryPrint2(char* s, const SSyncRaftEntry* pObj) { char* serialized = syncEntry2Str(pObj); - printf("syncEntryPrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized); + printf("syncEntryPrint2 | len:%zu | %s | %s \n", strlen(serialized), s, serialized); fflush(NULL); free(serialized); } void syncEntryLog(const SSyncRaftEntry* pObj) { char* serialized = syncEntry2Str(pObj); - sTrace("syncEntryLog | len:%lu | %s", strlen(serialized), serialized); + sTrace("syncEntryLog | len:%zu | %s", strlen(serialized), serialized); free(serialized); } void syncEntryLog2(char* s, const SSyncRaftEntry* pObj) { char* serialized = syncEntry2Str(pObj); - sTrace("syncEntryLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized); + sTrace("syncEntryLog2 | len:%zu | %s | %s", strlen(serialized), s, serialized); free(serialized); -} \ No newline at end of file +} diff --git a/source/libs/sync/src/syncVoteMgr.c b/source/libs/sync/src/syncVoteMgr.c index 5c8e70979c..6cb6a28805 100644 --- a/source/libs/sync/src/syncVoteMgr.c +++ b/source/libs/sync/src/syncVoteMgr.c @@ -97,7 +97,7 @@ cJSON *voteGranted2Json(SVotesGranted *pVotesGranted) { cJSON_AddItemToObject(pRoot, "isGranted", pIsGranted); cJSON_AddNumberToObject(pRoot, "votes", pVotesGranted->votes); - snprintf(u64buf, sizeof(u64buf), "%lu", pVotesGranted->term); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pVotesGranted->term); cJSON_AddStringToObject(pRoot, "term", u64buf); cJSON_AddNumberToObject(pRoot, "quorum", pVotesGranted->quorum); cJSON_AddNumberToObject(pRoot, "toLeader", pVotesGranted->toLeader); @@ -122,27 +122,27 @@ char *voteGranted2Str(SVotesGranted *pVotesGranted) { // for debug ------------------- void voteGrantedPrint(SVotesGranted *pObj) { char *serialized = voteGranted2Str(pObj); - printf("voteGrantedPrint | len:%lu | %s \n", strlen(serialized), serialized); + printf("voteGrantedPrint | len:%zu | %s \n", strlen(serialized), serialized); fflush(NULL); free(serialized); } void voteGrantedPrint2(char *s, SVotesGranted *pObj) { char *serialized = voteGranted2Str(pObj); - printf("voteGrantedPrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized); + printf("voteGrantedPrint2 | len:%zu | %s | %s \n", strlen(serialized), s, serialized); fflush(NULL); free(serialized); } void voteGrantedLog(SVotesGranted *pObj) { char *serialized = voteGranted2Str(pObj); - sTrace("voteGrantedLog | len:%lu | %s", strlen(serialized), serialized); + sTrace("voteGrantedLog | len:%zu | %s", strlen(serialized), serialized); free(serialized); } void voteGrantedLog2(char *s, SVotesGranted *pObj) { char *serialized = voteGranted2Str(pObj); - sTrace("voteGrantedLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized); + sTrace("voteGrantedLog2 | len:%zu | %s | %s", strlen(serialized), s, serialized); free(serialized); } @@ -222,7 +222,7 @@ cJSON *votesRespond2Json(SVotesRespond *pVotesRespond) { cJSON_AddItemToObject(pRoot, "isRespond", pIsRespond); cJSON_AddNumberToObject(pRoot, "respondNum", respondNum); - snprintf(u64buf, sizeof(u64buf), "%lu", pVotesRespond->term); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pVotesRespond->term); cJSON_AddStringToObject(pRoot, "term", u64buf); snprintf(u64buf, sizeof(u64buf), "%p", pVotesRespond->pSyncNode); cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf); @@ -242,26 +242,26 @@ char *votesRespond2Str(SVotesRespond *pVotesRespond) { // for debug ------------------- void votesRespondPrint(SVotesRespond *pObj) { char *serialized = votesRespond2Str(pObj); - printf("votesRespondPrint | len:%lu | %s \n", strlen(serialized), serialized); + printf("votesRespondPrint | len:%zu | %s \n", strlen(serialized), serialized); fflush(NULL); free(serialized); } void votesRespondPrint2(char *s, SVotesRespond *pObj) { char *serialized = votesRespond2Str(pObj); - printf("votesRespondPrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized); + printf("votesRespondPrint2 | len:%zu | %s | %s \n", strlen(serialized), s, serialized); fflush(NULL); free(serialized); } void votesRespondLog(SVotesRespond *pObj) { char *serialized = votesRespond2Str(pObj); - sTrace("votesRespondLog | len:%lu | %s", strlen(serialized), serialized); + sTrace("votesRespondLog | len:%zu | %s", strlen(serialized), serialized); free(serialized); } void votesRespondLog2(char *s, SVotesRespond *pObj) { char *serialized = votesRespond2Str(pObj); - sTrace("votesRespondLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized); + sTrace("votesRespondLog2 | len:%zu | %s | %s", strlen(serialized), s, serialized); free(serialized); -} \ No newline at end of file +} diff --git a/source/libs/tfs/src/tfs.c b/source/libs/tfs/src/tfs.c index c46989dc5d..01e9808aa6 100644 --- a/source/libs/tfs/src/tfs.c +++ b/source/libs/tfs/src/tfs.c @@ -558,4 +558,6 @@ int32_t tfsGetMonitorInfo(STfs *pTfs, SMonDiskInfo *pInfo) { } } tfsUnLock(pTfs); -} \ No newline at end of file + + return 0; +} diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 2f6ff3763f..5be1b7ea90 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -291,6 +291,7 @@ void* destroyConnPool(void* pool) { connList = taosHashIterate((SHashObj*)pool, connList); } taosHashCleanup(pool); + return NULL; } static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) { @@ -576,6 +577,8 @@ static void* cliWorkThread(void* arg) { SCliThrdObj* pThrd = (SCliThrdObj*)arg; setThreadName("trans-cli-work"); uv_run(pThrd->loop, UV_RUN_DEFAULT); + + return NULL; } void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 367cb33fc9..7123593a33 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -155,14 +155,16 @@ bool transReadComplete(SConnBuffer* connBuf) { } return false; } -int transPackMsg(STransMsgHead* msgHead, bool sercured, bool auth) {} +int transPackMsg(STransMsgHead* msgHead, bool sercured, bool auth) {return 0;} -int transUnpackMsg(STransMsgHead* msgHead) {} +int transUnpackMsg(STransMsgHead* msgHead) {return 0;} int transDestroyBuffer(SConnBuffer* buf) { if (buf->cap > 0) { tfree(buf->buf); } transClearBuffer(buf); + + return 0; } int transSetConnOption(uv_tcp_t* stream) { diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index cb3bbaefec..d26c5ec0f4 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -538,6 +538,8 @@ void* acceptThread(void* arg) { setThreadName("trans-accept"); SServerObj* srv = (SServerObj*)arg; uv_run(srv->loop, UV_RUN_DEFAULT); + + return NULL; } static bool addHandleToWorkloop(void* arg) { SWorkThrdObj* pThrd = arg; @@ -593,6 +595,8 @@ void* workerThread(void* arg) { setThreadName("trans-worker"); SWorkThrdObj* pThrd = (SWorkThrdObj*)arg; uv_run(pThrd->loop, UV_RUN_DEFAULT); + + return NULL; } static SSrvConn* createConn(void* hThrd) { diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 9e1ffeae7f..8d3acef9e7 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -169,7 +169,7 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) { } if (pRead->pHead->head.version != ver) { - wError("unexpected wal log version: %ld, read request version:%ld", pRead->pHead->head.version, ver); + wError("unexpected wal log version: %" PRId64 ", read request version:%" PRId64 "", pRead->pHead->head.version, ver); pRead->curVersion = -1; terrno = TSDB_CODE_WAL_FILE_CORRUPTED; return -1; diff --git a/source/os/src/osFile.c b/source/os/src/osFile.c index 6472202bf9..bcbd95e160 100644 --- a/source/os/src/osFile.c +++ b/source/os/src/osFile.c @@ -14,6 +14,7 @@ */ #define ALLOW_FORBID_FUNC #include "os.h" +#include "osSemaphore.h" #if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) #include @@ -35,26 +36,18 @@ extern int openU(const char *, int, ...); /* MsvcLibX UTF-8 version of open */ #else #include #include -#include + +#if !defined(_TD_DARWIN_64) + #include +#endif #include #include #define LINUX_FILE_NO_TEXT_OPTION 0 #define O_TEXT LINUX_FILE_NO_TEXT_OPTION #endif -typedef int32_t FileFd; - #define FILE_WITH_LOCK 1 -typedef struct TdFile { -#if FILE_WITH_LOCK - pthread_rwlock_t rwlock; -#endif - int refId; - FileFd fd; - FILE *fp; -} * TdFilePtr, TdFile; - void taosGetTmpfilePath(const char *inputTmpDir, const char *fileNamePrefix, char *dstPath) { #if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) const char *tdengineTmpFileNamePrefix = "tdengine-"; diff --git a/source/os/src/osSocket.c b/source/os/src/osSocket.c index 698ceded16..e3b1015d88 100644 --- a/source/os/src/osSocket.c +++ b/source/os/src/osSocket.c @@ -17,7 +17,7 @@ #define ALLOW_FORBID_FUNC #include "os.h" -#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) +#if defined(WINDOWS) #include #include #include @@ -37,8 +37,14 @@ #include #include #include -#include #include + +#if defined(DARWIN) + #include + #include "osEok.h" +#else + #include +#endif #endif typedef int32_t SocketFd; @@ -210,7 +216,7 @@ int32_t taosShutDownSocketServerRDWR(TdSocketServerPtr pSocketServer) { #endif } -#if (defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)) +#if (defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)) #if defined(_TD_GO_DLL_) uint64_t htonll(uint64_t val) { return (((uint64_t)htonl(val)) << 32) + htonl(val >> 32); } #endif diff --git a/source/util/src/tconfig.c b/source/util/src/tconfig.c index d4bcc27f60..404c9c8f71 100644 --- a/source/util/src/tconfig.c +++ b/source/util/src/tconfig.c @@ -570,6 +570,7 @@ void cfgDumpCfg(SConfig *pCfg, bool tsc, bool dump) { case CFG_DTYPE_LOCALE: case CFG_DTYPE_CHARSET: case CFG_DTYPE_TIMEZONE: + case CFG_DTYPE_NONE: if (dump) { printf("%s %s %s", src, name, pItem->str); printf("\n"); @@ -655,4 +656,4 @@ int32_t cfgLoadFromCfgFile(SConfig *pConfig, const char *filepath) { int32_t cfgLoadFromApollUrl(SConfig *pConfig, const char *url) { uInfo("load from apoll url %s", url); return 0; -} \ No newline at end of file +} diff --git a/source/util/src/ttimer.c b/source/util/src/ttimer.c index 7bdcf3cc64..46bca6e6cb 100644 --- a/source/util/src/ttimer.c +++ b/source/util/src/ttimer.c @@ -628,7 +628,7 @@ void taosTmrCleanUp(void* handle) { tmrCtrls = NULL; unusedTmrCtrl = NULL; -#if !defined(WINDOWS) +#if defined(LINUX) tmrModuleInit = PTHREAD_ONCE_INIT; // to support restart #endif } From 4f330fab1fe48f8eb925a2f57e6b9f7d71de6422 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 15 Mar 2022 23:47:37 +0800 Subject: [PATCH 4/9] handle except --- include/libs/transport/trpc.h | 2 +- source/libs/transport/inc/transComm.h | 3 ++ source/libs/transport/src/trans.c | 13 ++++-- source/libs/transport/src/transCli.c | 63 +++++++++++++++++++++++---- source/libs/transport/src/transSrv.c | 7 ++- source/libs/transport/test/transUT.cc | 53 +++++++++++++++++++++- 6 files changed, 126 insertions(+), 15 deletions(-) diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 231bc4af45..b5b8d6ab66 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -94,7 +94,7 @@ int rpcReportProgress(void *pConn, char *pCont, int contLen); void rpcCancelRequest(int64_t rid); // just release client conn to rpc instance, no close sock -void rpcReleaseHandle(void *handle); +void rpcReleaseHandle(void *handle, int8_t type); void rpcRefHandle(void *handle, int8_t type); void rpcUnrefHandle(void *handle, int8_t type); diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 76347cdba2..8ea65b193d 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -252,6 +252,9 @@ void transUnrefSrvHandle(void* handle); void transRefCliHandle(void* handle); void transUnrefCliHandle(void* handle); +void transReleaseCliHandle(void* handle); +void transReleaseSrvHandle(void* handle); + void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg); void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg, STransMsg* pRsp); void transSendResponse(const STransMsg* pMsg); diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 213782c5bd..2cab03f133 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -22,6 +22,11 @@ void* (*taosInitHandle[])(uint32_t ip, uint32_t port, char* label, int numOfThre void (*taosCloseHandle[])(void* arg) = {transCloseServer, transCloseClient}; +void (*taosRefHandle[])(void* handle) = {transRefSrvHandle, transRefCliHandle}; +void (*taosUnRefHandle[])(void* handle) = {transUnrefSrvHandle, transUnrefCliHandle}; + +void (*transReleaseHandle[])(void* handle) = {transReleaseSrvHandle, transReleaseCliHandle}; + void* rpcOpen(const SRpcInit* pInit) { SRpcInfo* pRpc = calloc(1, sizeof(SRpcInfo)); if (pRpc == NULL) { @@ -127,9 +132,6 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) { void rpcSendResponse(const SRpcMsg* pMsg) { transSendResponse(pMsg); } int rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { return transGetConnInfo((void*)thandle, pInfo); } -void (*taosRefHandle[])(void* handle) = {transRefSrvHandle, transRefCliHandle}; -void (*taosUnRefHandle[])(void* handle) = {transUnrefSrvHandle, transUnrefCliHandle}; - void rpcRefHandle(void* handle, int8_t type) { assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT); (*taosRefHandle[type])(handle); @@ -140,6 +142,11 @@ void rpcUnrefHandle(void* handle, int8_t type) { (*taosUnRefHandle[type])(handle); } +void rpcReleaseHandle(void* handle, int8_t type) { + assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT); + (*transReleaseHandle[type])(handle); +} + int32_t rpcInit() { // impl later return 0; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 5c50ca2bc3..931f58097e 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -17,6 +17,11 @@ #include "transComm.h" +// Normal(default): send/recv msg +// Quit: quit rpc inst +// Release: release handle to rpc inst +typedef enum { Normal, Quit, Release } SCliMsgType; + typedef struct SCliConn { T_REF_DECLARE() uv_connect_t connReq; @@ -49,6 +54,7 @@ typedef struct SCliMsg { STransMsg msg; queue q; uint64_t st; + SCliMsgType type; } SCliMsg; typedef struct SCliThrdObj { @@ -108,6 +114,8 @@ static void cliHandleExcept(SCliConn* conn); // handle req from app static void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd); +static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd); + static void cliSendQuit(SCliThrdObj* thrd); static void destroyUserdata(STransMsg* userdata); @@ -121,8 +129,8 @@ static void destroyThrdObj(SCliThrdObj* pThrd); #define CONN_HOST_THREAD_INDEX(conn) (conn ? ((SCliConn*)conn)->hThrdIdx : -1) #define CONN_PERSIST_TIME(para) (para * 1000 * 10) - -#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrdObj*)conn->hostThrd)->pTransInst))->label) +#define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL) +#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrdObj*)(conn)->hostThrd)->pTransInst))->label) #define CONN_HANDLE_THREAD_QUIT(conn, thrd) \ do { \ if (thrd->quit) { \ @@ -344,6 +352,8 @@ static void addConnToPool(void* pool, SCliConn* conn) { static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { SCliConn* conn = handle->data; SConnBuffer* pBuf = &conn->readBuf; + // avoid conn + QUEUE_REMOVE(&conn->conn); transAllocBuffer(pBuf, buf); } static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { @@ -506,6 +516,21 @@ static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) { pThrd->quit = true; uv_stop(pThrd->loop); } +static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) { + SCliConn* conn = pMsg->msg.handle; + tDebug("%s cli conn %p release to inst", CONN_GET_INST_LABEL(conn), conn); + + destroyCmsg(pMsg); + conn->data = NULL; + + transDestroyBuffer(&conn->readBuf); + if (conn->persist && T_REF_VAL_GET(conn) >= 2) { + transUnrefCliHandle(conn); + addConnToPool(pThrd->pool, conn); + } else { + transUnrefCliHandle(conn); + } +} SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) { SCliConn* conn = NULL; @@ -517,7 +542,9 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) { } else { STransConnCtx* pCtx = pMsg->ctx; conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port); - if (conn != NULL) tTrace("%s cli conn %p get from conn pool", CONN_GET_INST_LABEL(conn), conn); + if (conn != NULL) { + tTrace("%s cli conn %p get from conn pool", CONN_GET_INST_LABEL(conn), conn); + } } return conn; } @@ -572,10 +599,13 @@ static void cliAsyncCb(uv_async_t* handle) { QUEUE_REMOVE(h); SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); - if (pMsg->ctx == NULL) { - cliHandleQuit(pMsg, pThrd); - } else { + + if (pMsg->type == Normal) { cliHandleReq(pMsg, pThrd); + } else if (pMsg->type == Quit) { + cliHandleQuit(pMsg, pThrd); + } else if (pMsg->type == Release) { + cliHandleRelease(pMsg, pThrd); } count++; } @@ -671,8 +701,10 @@ static void transDestroyConnCtx(STransConnCtx* ctx) { void cliSendQuit(SCliThrdObj* thrd) { // cli can stop gracefully SCliMsg* msg = calloc(1, sizeof(SCliMsg)); + msg->type = Quit; transSendAsync(thrd->asyncPool, &msg->q); } + int cliRBChoseIdx(STrans* pTransInst) { int64_t index = pTransInst->index; if (pTransInst->index++ >= pTransInst->numOfThreads) { @@ -702,10 +734,25 @@ void transUnrefCliHandle(void* handle) { return; } int ref = T_REF_DEC((SCliConn*)handle); + tDebug("%s cli conn %p ref %d", CONN_GET_INST_LABEL((SCliConn*)handle), handle, ref); if (ref == 0) { cliDestroyConn((SCliConn*)handle, true); } } +void transReleaseCliHandle(void* handle) { + SCliThrdObj* thrd = CONN_GET_HOST_THREAD(handle); + if (thrd == NULL) { + return; + } + + STransMsg tmsg = {.handle = handle}; + + SCliMsg* cmsg = calloc(1, sizeof(SCliMsg)); + cmsg->type = Release; + cmsg->msg = tmsg; + + transSendAsync(thrd->asyncPool, &cmsg->q); +} void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg) { STrans* pTransInst = (STrans*)shandle; @@ -728,7 +775,7 @@ void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* p assert(pTransInst->connType == TAOS_CONN_CLIENT); // atomic or not - SCliMsg* cliMsg = malloc(sizeof(SCliMsg)); + SCliMsg* cliMsg = calloc(1, sizeof(SCliMsg)); cliMsg->ctx = pCtx; cliMsg->msg = *pMsg; cliMsg->st = taosGetTimestampUs(); @@ -753,7 +800,7 @@ void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pReq pCtx->pRsp = pRsp; tsem_init(pCtx->pSem, 0, 0); - SCliMsg* cliMsg = malloc(sizeof(SCliMsg)); + SCliMsg* cliMsg = calloc(1, sizeof(SCliMsg)); cliMsg->ctx = pCtx; cliMsg->msg = *pReq; cliMsg->st = taosGetTimestampUs(); diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index a3cc6b6181..960e064b8f 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -641,7 +641,7 @@ static void uvDestroyConn(uv_handle_t* handle) { uv_timer_stop(&conn->pTimer); QUEUE_REMOVE(&conn->queue); free(conn->pTcp); - free(conn); + // free(conn); if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) { uv_loop_close(thrd->loop); @@ -786,6 +786,11 @@ void transUnrefSrvHandle(void* handle) { } // unref srv handle } + +void transReleaseSrvHandle(void* handle) { + // do nothing currently + // +} void transSendResponse(const STransMsg* pMsg) { if (pMsg->handle == NULL) { return; diff --git a/source/libs/transport/test/transUT.cc b/source/libs/transport/test/transUT.cc index b0fae2f8b4..46f6424b0b 100644 --- a/source/libs/transport/test/transUT.cc +++ b/source/libs/transport/test/transUT.cc @@ -110,6 +110,14 @@ class Client { SemWait(); *resp = this->resp; } + void SendAndRecvNoHandle(SRpcMsg *req, SRpcMsg *resp) { + if (req->handle != NULL) { + rpcReleaseHandle(req->handle, TAOS_CONN_CLIENT); + req->handle = NULL; + } + SendAndRecv(req, resp); + } + void SendWithHandle(SRpcMsg *req, SRpcMsg *resp) {} void SemWait() { tsem_wait(&this->sem); } void SemPost() { tsem_post(&this->sem); } @@ -268,6 +276,7 @@ class TransObj { } void RestartSrv() { srv->Restart(); } void cliSendAndRecv(SRpcMsg *req, SRpcMsg *resp) { cli->SendAndRecv(req, resp); } + void cliSendAndRecvNoHandle(SRpcMsg *req, SRpcMsg *resp) { cli->SendAndRecvNoHandle(req, resp); } ~TransObj() { delete cli; @@ -352,7 +361,47 @@ TEST_F(TransEnv, cliPersistHandle) { EXPECT_TRUE(resp.code != 0); } } + ////////////////// +} +TEST_F(TransEnv, cliReleaseHandle) { + tr->SetCliPersistFp(cliPersistHandle); + + SRpcMsg resp = {0}; + for (int i = 0; i < 10; i++) { + SRpcMsg req = {.handle = resp.handle}; + req.msgType = 1; + req.pCont = rpcMallocCont(10); + req.contLen = 10; + tr->cliSendAndRecvNoHandle(&req, &resp); + // if (i == 5) { + // std::cout << "stop server" << std::endl; + // tr->StopSrv(); + //} + // if (i >= 6) { + EXPECT_TRUE(resp.code == 0); + //} + } + ////////////////// +} +TEST_F(TransEnv, cliReleaseHandleExcept) { + tr->SetCliPersistFp(cliPersistHandle); + + SRpcMsg resp = {0}; + for (int i = 0; i < 10; i++) { + SRpcMsg req = {.handle = resp.handle}; + req.msgType = 1; + req.pCont = rpcMallocCont(10); + req.contLen = 10; + tr->cliSendAndRecvNoHandle(&req, &resp); + if (i == 5) { + std::cout << "stop server" << std::endl; + tr->StopSrv(); + } + if (i >= 6) { + EXPECT_TRUE(resp.code != 0); + } + } ////////////////// } TEST_F(TransEnv, srvContinueSend) { @@ -367,11 +416,11 @@ TEST_F(TransEnv, srvContinueSend) { taosMsleep(2000); } -TEST_F(TransEnv, srvPersisHandleExcept) { +TEST_F(TransEnv, srvPersistHandleExcept) { // conn breken // } -TEST_F(TransEnv, cliPersisHandleExcept) { +TEST_F(TransEnv, cliPersistHandleExcept) { // conn breken } From 6e9bc0cbd9c6d4bd88a572ca877b2ad8286f20bd Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 16 Mar 2022 09:08:42 +0800 Subject: [PATCH 5/9] handle except --- source/libs/transport/src/transCli.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 931f58097e..3696e86d22 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -525,6 +525,7 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) { transDestroyBuffer(&conn->readBuf); if (conn->persist && T_REF_VAL_GET(conn) >= 2) { + conn->persist = false; transUnrefCliHandle(conn); addConnToPool(pThrd->pool, conn); } else { @@ -746,8 +747,7 @@ void transReleaseCliHandle(void* handle) { } STransMsg tmsg = {.handle = handle}; - - SCliMsg* cmsg = calloc(1, sizeof(SCliMsg)); + SCliMsg* cmsg = calloc(1, sizeof(SCliMsg)); cmsg->type = Release; cmsg->msg = tmsg; From 3c6c518470772adbf44695d826e86347e9eff533 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Wed, 16 Mar 2022 11:37:31 +0800 Subject: [PATCH 6/9] [TD-13063]: 3.0 on windows (#10765) * [TD-13063]: 3.0 on Windows * add pthread in contrib * fix linux compile * fix osSemaphore * add gnu regex for Windows * fix compile error for Windows * support arm platform * port more OS files * fix for Windows compile * port more files * fix macOS on x86_64 * port osFile * port osSemaphone.h * port osSocket.c * port tconfig.c * port ttimer.c * add couple files * merge with 3.0 --- cmake/cmake.platform | 2 +- source/client/src/clientMsgHandler.c | 6 +-- source/dnode/mnode/impl/src/mndConsumer.c | 12 ++--- source/dnode/mnode/impl/src/mndSubscribe.c | 14 ++--- source/dnode/mnode/sdb/src/sdbFile.c | 2 +- source/libs/executor/src/dataSinkMgt.c | 1 + source/libs/executor/src/tlinearhash.c | 4 +- source/libs/sync/src/syncAppendEntriesReply.c | 2 +- source/libs/sync/src/syncEnv.c | 6 +-- source/libs/sync/src/syncMain.c | 28 +++++----- source/libs/sync/src/syncMessage.c | 54 +++++++++---------- source/libs/sync/src/syncRaftLog.c | 10 ++-- source/libs/sync/src/syncRequestVoteReply.c | 2 +- source/libs/sync/src/syncUtil.c | 4 +- 14 files changed, 76 insertions(+), 71 deletions(-) diff --git a/cmake/cmake.platform b/cmake/cmake.platform index 0312f92a5b..dfb79c0fae 100644 --- a/cmake/cmake.platform +++ b/cmake/cmake.platform @@ -42,7 +42,7 @@ IF (${CMAKE_SYSTEM_NAME} MATCHES "Linux" OR ${CMAKE_SYSTEM_NAME} MATCHES "Darwin SET(TD_DARWIN TRUE) SET(OSTYPE "macOS") - ADD_DEFINITIONS("-DDARWIN -Wno-tautological-pointer-compare") + ADD_DEFINITIONS("-DDARWIN -Wno-tautological-pointer-compare -Wno-return-type") MESSAGE("Current system processor is ${CMAKE_SYSTEM_PROCESSOR}.") IF (${CMAKE_SYSTEM_PROCESSOR} MATCHES "arm64" OR ${CMAKE_SYSTEM_PROCESSOR} MATCHES "x86_64") diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index 9534c11646..010f4e6c12 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -33,7 +33,7 @@ int32_t genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code) { setErrno(pRequest, code); free(pMsg->pData); - sem_post(&pRequest->body.rspSem); + tsem_post(&pRequest->body.rspSem); return code; } @@ -42,7 +42,7 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) { if (code != TSDB_CODE_SUCCESS) { free(pMsg->pData); setErrno(pRequest, code); - sem_post(&pRequest->body.rspSem); + tsem_post(&pRequest->body.rspSem); return code; } @@ -78,7 +78,7 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) { pTscObj->pAppInfo->numOfConns); free(pMsg->pData); - sem_post(&pRequest->body.rspSem); + tsem_post(&pRequest->body.rspSem); return 0; } diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 0f4b538cde..4a9d23aa15 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -96,12 +96,12 @@ SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) { CM_ENCODE_OVER: tfree(buf); if (terrno != 0) { - mError("consumer:%ld, failed to encode to raw:%p since %s", pConsumer->consumerId, pRaw, terrstr()); + mError("consumer:%" PRId64 ", failed to encode to raw:%p since %s", pConsumer->consumerId, pRaw, terrstr()); sdbFreeRaw(pRaw); return NULL; } - mTrace("consumer:%ld, encode to raw:%p, row:%p", pConsumer->consumerId, pRaw, pConsumer); + mTrace("consumer:%" PRId64 ", encode to raw:%p, row:%p", pConsumer->consumerId, pRaw, pConsumer); return pRaw; } @@ -140,7 +140,7 @@ SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) { CM_DECODE_OVER: tfree(buf); if (terrno != TSDB_CODE_SUCCESS) { - mError("consumer:%ld, failed to decode from raw:%p since %s", pConsumer->consumerId, pRaw, terrstr()); + mError("consumer:%" PRId64 ", failed to decode from raw:%p since %s", pConsumer->consumerId, pRaw, terrstr()); tfree(pRow); return NULL; } @@ -149,17 +149,17 @@ CM_DECODE_OVER: } static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer) { - mTrace("consumer:%ld, perform insert action", pConsumer->consumerId); + mTrace("consumer:%" PRId64 ", perform insert action", pConsumer->consumerId); return 0; } static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) { - mTrace("consumer:%ld, perform delete action", pConsumer->consumerId); + mTrace("consumer:%" PRId64 ", perform delete action", pConsumer->consumerId); return 0; } static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer) { - mTrace("consumer:%ld, perform update action", pOldConsumer->consumerId); + mTrace("consumer:%" PRId64 ", perform update action", pOldConsumer->consumerId); // TODO handle update /*taosWLockLatch(&pOldConsumer->lock);*/ diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index fa4fed59d0..1dcd07829d 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -448,7 +448,7 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) { for (int32_t i = 0; i < taosArrayGetSize(pRebSub->lostConsumers); i++) { int64_t lostConsumerId = *(int64_t *)taosArrayGet(pRebSub->lostConsumers, i); - mInfo("mq remove lost consumer %ld", lostConsumerId); + mInfo("mq remove lost consumer %" PRId64 "", lostConsumerId); for (int32_t j = 0; j < taosArrayGetSize(pSub->consumers); j++) { SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, j); @@ -479,7 +479,7 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) { else vgThisConsumerAfterRb = vgEachConsumer; - mInfo("mq consumer:%ld, connectted vgroup number change from %d to %d", pSubConsumer->consumerId, + mInfo("mq consumer:%" PRId64 ", connectted vgroup number change from %d to %d", pSubConsumer->consumerId, vgThisConsumerBeforeRb, vgThisConsumerAfterRb); while (taosArrayGetSize(pSubConsumer->vgInfo) > vgThisConsumerAfterRb) { @@ -503,7 +503,7 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) { atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__IDLE); } - mInfo("mq consumer:%ld, status change from %d to %d", pRebConsumer->consumerId, status, pRebConsumer->status); + mInfo("mq consumer:%" PRId64 ", status change from %d to %d", pRebConsumer->consumerId, status, pRebConsumer->status); SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pRebConsumer); sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY); @@ -537,13 +537,13 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) { mndSplitSubscribeKey(pSub->key, topic, cgroup); SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); - mInfo("mq set conn: assign vgroup %d of topic %s to consumer %ld", pConsumerEp->vgId, topic, + mInfo("mq set conn: assign vgroup %d of topic %s to consumer %" PRId64 "", pConsumerEp->vgId, topic, pConsumerEp->consumerId); mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp); mndReleaseTopic(pMnode, pTopic); } else { - mInfo("mq rebalance: assign vgroup %d, from consumer %ld to consumer %ld", pConsumerEp->vgId, + mInfo("mq rebalance: assign vgroup %d, from consumer %" PRId64 " to consumer %" PRId64 "", pConsumerEp->vgId, pConsumerEp->oldConsumerId, pConsumerEp->consumerId); mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp); @@ -1099,7 +1099,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, cgroup, newTopicName); bool createSub = false; if (pSub == NULL) { - mDebug("create new subscription by consumer %ld, group: %s, topic %s", consumerId, cgroup, newTopicName); + mDebug("create new subscription by consumer %" PRId64 ", group: %s, topic %s", consumerId, cgroup, newTopicName); pSub = mndCreateSubscription(pMnode, pTopic, cgroup); createSub = true; @@ -1118,7 +1118,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { pConsumerEp->consumerId = consumerId; taosArrayPush(mqSubConsumer.vgInfo, pConsumerEp); if (pConsumerEp->oldConsumerId == -1) { - mInfo("mq set conn: assign vgroup %d of topic %s to consumer %ld", pConsumerEp->vgId, newTopicName, + mInfo("mq set conn: assign vgroup %d of topic %s to consumer %" PRId64 "", pConsumerEp->vgId, newTopicName, pConsumerEp->consumerId); mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp); } else { diff --git a/source/dnode/mnode/sdb/src/sdbFile.c b/source/dnode/mnode/sdb/src/sdbFile.c index a3fe4dfb14..2849da5c2e 100644 --- a/source/dnode/mnode/sdb/src/sdbFile.c +++ b/source/dnode/mnode/sdb/src/sdbFile.c @@ -196,7 +196,7 @@ int32_t sdbReadFile(SSdb *pSdb) { } int32_t totalLen = sizeof(SSdbRaw) + pRaw->dataLen + sizeof(int32_t); - if (!taosCheckChecksumWhole((const uint8_t *)pRaw, totalLen) != 0) { + if ((!taosCheckChecksumWhole((const uint8_t *)pRaw, totalLen)) != 0) { code = TSDB_CODE_CHECKSUM_ERROR; mError("failed to read file:%s since %s", file, tstrerror(code)); break; diff --git a/source/libs/executor/src/dataSinkMgt.c b/source/libs/executor/src/dataSinkMgt.c index 343b3a3c95..4e8583eb2a 100644 --- a/source/libs/executor/src/dataSinkMgt.c +++ b/source/libs/executor/src/dataSinkMgt.c @@ -23,6 +23,7 @@ static SDataSinkManager gDataSinkManager = {0}; int32_t dsDataSinkMgtInit(SDataSinkMgtCfg *cfg) { gDataSinkManager.cfg = *cfg; pthread_mutex_init(&gDataSinkManager.mutex, NULL); + return 0; // to avoid compiler eror } int32_t dsCreateDataSinker(const SDataSinkNode *pDataSink, DataSinkHandle* pHandle) { diff --git a/source/libs/executor/src/tlinearhash.c b/source/libs/executor/src/tlinearhash.c index 9f3d694c42..28319469cc 100644 --- a/source/libs/executor/src/tlinearhash.c +++ b/source/libs/executor/src/tlinearhash.c @@ -413,7 +413,7 @@ int32_t tHashRemove(SLHashObj* pHashObj, const void *key, size_t keyLen) { void tHashPrint(const SLHashObj* pHashObj, int32_t type) { printf("==================== linear hash ====================\n"); - printf("total bucket:%d, size:%ld, ratio:%.2f\n", pHashObj->numOfBuckets, pHashObj->size, LHASH_CAP_RATIO); + printf("total bucket:%d, size:%" PRId64 ", ratio:%.2f\n", pHashObj->numOfBuckets, pHashObj->size, LHASH_CAP_RATIO); dBufSetPrintInfo(pHashObj->pBuf); @@ -425,4 +425,4 @@ void tHashPrint(const SLHashObj* pHashObj, int32_t type) { } else { dBufPrintStatis(pHashObj->pBuf); } -} \ No newline at end of file +} diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 61eb4884e2..a75e85601f 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -39,7 +39,7 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p syncAppendEntriesReplyLog2("==syncNodeOnAppendEntriesReplyCb==", pMsg); if (pMsg->term < ths->pRaftStore->currentTerm) { - sTrace("DropStaleResponse, receive term:%lu, current term:%lu", pMsg->term, ths->pRaftStore->currentTerm); + sTrace("DropStaleResponse, receive term:%" PRIu64 ", current term:%" PRIu64 "", pMsg->term, ths->pRaftStore->currentTerm); return ret; } diff --git a/source/libs/sync/src/syncEnv.c b/source/libs/sync/src/syncEnv.c index dd7161800d..4b650ec087 100644 --- a/source/libs/sync/src/syncEnv.c +++ b/source/libs/sync/src/syncEnv.c @@ -55,7 +55,7 @@ static void syncEnvTick(void *param, void *tmrId) { if (atomic_load_64(&pSyncEnv->envTickTimerLogicClockUser) <= atomic_load_64(&pSyncEnv->envTickTimerLogicClock)) { ++(pSyncEnv->envTickTimerCounter); sTrace( - "syncEnvTick do ... envTickTimerLogicClockUser:%lu, envTickTimerLogicClock:%lu, envTickTimerCounter:%lu, " + "syncEnvTick do ... envTickTimerLogicClockUser:%" PRIu64 ", envTickTimerLogicClock:%" PRIu64 ", envTickTimerCounter:%" PRIu64 ", " "envTickTimerMS:%d, tmrId:%p", pSyncEnv->envTickTimerLogicClockUser, pSyncEnv->envTickTimerLogicClock, pSyncEnv->envTickTimerCounter, pSyncEnv->envTickTimerMS, tmrId); @@ -64,7 +64,7 @@ static void syncEnvTick(void *param, void *tmrId) { taosTmrReset(syncEnvTick, pSyncEnv->envTickTimerMS, pSyncEnv, pSyncEnv->pTimerManager, &pSyncEnv->pEnvTickTimer); } else { sTrace( - "syncEnvTick pass ... envTickTimerLogicClockUser:%lu, envTickTimerLogicClock:%lu, envTickTimerCounter:%lu, " + "syncEnvTick pass ... envTickTimerLogicClockUser:%" PRIu64 ", envTickTimerLogicClock:%" PRIu64 ", envTickTimerCounter:%" PRIu64 ", " "envTickTimerMS:%d, tmrId:%p", pSyncEnv->envTickTimerLogicClockUser, pSyncEnv->envTickTimerLogicClock, pSyncEnv->envTickTimerCounter, pSyncEnv->envTickTimerMS, tmrId); @@ -74,7 +74,7 @@ static void syncEnvTick(void *param, void *tmrId) { static SSyncEnv *doSyncEnvStart() { SSyncEnv *pSyncEnv = (SSyncEnv *)malloc(sizeof(SSyncEnv)); assert(pSyncEnv != NULL); - memset(pSyncEnv, 0, sizeof(pSyncEnv)); + memset(pSyncEnv, 0, sizeof(SSyncEnv)); pSyncEnv->envTickTimerCounter = 0; pSyncEnv->envTickTimerMS = ENV_TICK_TIMER_MS; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index dd2c142104..b919e014a0 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -420,46 +420,46 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) { // tla+ log vars cJSON_AddItemToObject(pRoot, "pLogStore", logStore2Json(pSyncNode->pLogStore)); - snprintf(u64buf, sizeof(u64buf), "%ld", pSyncNode->commitIndex); + snprintf(u64buf, sizeof(u64buf), "%" PRId64 "", pSyncNode->commitIndex); cJSON_AddStringToObject(pRoot, "commitIndex", u64buf); // ping timer snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pPingTimer); cJSON_AddStringToObject(pRoot, "pPingTimer", u64buf); cJSON_AddNumberToObject(pRoot, "pingTimerMS", pSyncNode->pingTimerMS); - snprintf(u64buf, sizeof(u64buf), "%lu", pSyncNode->pingTimerLogicClock); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->pingTimerLogicClock); cJSON_AddStringToObject(pRoot, "pingTimerLogicClock", u64buf); - snprintf(u64buf, sizeof(u64buf), "%lu", pSyncNode->pingTimerLogicClockUser); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->pingTimerLogicClockUser); cJSON_AddStringToObject(pRoot, "pingTimerLogicClockUser", u64buf); snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpPingTimerCB); cJSON_AddStringToObject(pRoot, "FpPingTimerCB", u64buf); - snprintf(u64buf, sizeof(u64buf), "%lu", pSyncNode->pingTimerCounter); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->pingTimerCounter); cJSON_AddStringToObject(pRoot, "pingTimerCounter", u64buf); // elect timer snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pElectTimer); cJSON_AddStringToObject(pRoot, "pElectTimer", u64buf); cJSON_AddNumberToObject(pRoot, "electTimerMS", pSyncNode->electTimerMS); - snprintf(u64buf, sizeof(u64buf), "%lu", pSyncNode->electTimerLogicClock); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->electTimerLogicClock); cJSON_AddStringToObject(pRoot, "electTimerLogicClock", u64buf); - snprintf(u64buf, sizeof(u64buf), "%lu", pSyncNode->electTimerLogicClockUser); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->electTimerLogicClockUser); cJSON_AddStringToObject(pRoot, "electTimerLogicClockUser", u64buf); snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpElectTimerCB); cJSON_AddStringToObject(pRoot, "FpElectTimerCB", u64buf); - snprintf(u64buf, sizeof(u64buf), "%lu", pSyncNode->electTimerCounter); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->electTimerCounter); cJSON_AddStringToObject(pRoot, "electTimerCounter", u64buf); // heartbeat timer snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pHeartbeatTimer); cJSON_AddStringToObject(pRoot, "pHeartbeatTimer", u64buf); cJSON_AddNumberToObject(pRoot, "heartbeatTimerMS", pSyncNode->heartbeatTimerMS); - snprintf(u64buf, sizeof(u64buf), "%lu", pSyncNode->heartbeatTimerLogicClock); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->heartbeatTimerLogicClock); cJSON_AddStringToObject(pRoot, "heartbeatTimerLogicClock", u64buf); - snprintf(u64buf, sizeof(u64buf), "%lu", pSyncNode->heartbeatTimerLogicClockUser); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->heartbeatTimerLogicClockUser); cJSON_AddStringToObject(pRoot, "heartbeatTimerLogicClockUser", u64buf); snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpHeartbeatTimerCB); cJSON_AddStringToObject(pRoot, "FpHeartbeatTimerCB", u64buf); - snprintf(u64buf, sizeof(u64buf), "%lu", pSyncNode->heartbeatTimerCounter); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pSyncNode->heartbeatTimerCounter); cJSON_AddStringToObject(pRoot, "heartbeatTimerCounter", u64buf); // callback @@ -634,7 +634,7 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) { taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager, &pSyncNode->pPingTimer); } else { - sTrace("==syncNodeEqPingTimer== pingTimerLogicClock:%lu, pingTimerLogicClockUser:%lu", + sTrace("==syncNodeEqPingTimer== pingTimerLogicClock:%" PRIu64 ", pingTimerLogicClockUser:%" PRIu64 "", pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser); } } @@ -655,7 +655,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager, &pSyncNode->pPingTimer); } else { - sTrace("==syncNodeEqElectTimer== electTimerLogicClock:%lu, electTimerLogicClockUser:%lu", + sTrace("==syncNodeEqElectTimer== electTimerLogicClock:%" PRIu64 ", electTimerLogicClockUser:%" PRIu64 "", pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser); } } @@ -676,7 +676,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { taosTmrReset(syncNodeEqHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager, &pSyncNode->pHeartbeatTimer); } else { - sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%lu, heartbeatTimerLogicClockUser:%lu", + sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRIu64 ", heartbeatTimerLogicClockUser:%" PRIu64 "", pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser); } } @@ -713,4 +713,4 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg } return ret; -} \ No newline at end of file +} diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 509ede274b..e28f780831 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -218,7 +218,7 @@ cJSON* syncTimeout2Json(const SyncTimeout* pMsg) { cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); cJSON_AddNumberToObject(pRoot, "timeoutType", pMsg->timeoutType); - snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->logicClock); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->logicClock); cJSON_AddStringToObject(pRoot, "logicClock", u64buf); cJSON_AddNumberToObject(pRoot, "timerMS", pMsg->timerMS); snprintf(u64buf, sizeof(u64buf), "%p", pMsg->data); @@ -239,7 +239,7 @@ char* syncTimeout2Str(const SyncTimeout* pMsg) { // for debug ---------------------- void syncTimeoutPrint(const SyncTimeout* pMsg) { char* serialized = syncTimeout2Str(pMsg); - printf("syncTimeoutPrint | len:%lu | %s \n", strlen(serialized), serialized); + printf("syncTimeoutPrint | len:%zu | %s \n", strlen(serialized), serialized); fflush(NULL); free(serialized); } @@ -349,7 +349,7 @@ cJSON* syncPing2Json(const SyncPing* pMsg) { cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); cJSON* pSrcId = cJSON_CreateObject(); - snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->srcId.addr); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->srcId.addr); cJSON_AddStringToObject(pSrcId, "addr", u64buf); { uint64_t u64 = pMsg->srcId.addr; @@ -364,7 +364,7 @@ cJSON* syncPing2Json(const SyncPing* pMsg) { cJSON_AddItemToObject(pRoot, "srcId", pSrcId); cJSON* pDestId = cJSON_CreateObject(); - snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->destId.addr); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->destId.addr); cJSON_AddStringToObject(pDestId, "addr", u64buf); { uint64_t u64 = pMsg->destId.addr; @@ -512,7 +512,7 @@ cJSON* syncPingReply2Json(const SyncPingReply* pMsg) { cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); cJSON* pSrcId = cJSON_CreateObject(); - snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->srcId.addr); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->srcId.addr); cJSON_AddStringToObject(pSrcId, "addr", u64buf); { uint64_t u64 = pMsg->srcId.addr; @@ -527,7 +527,7 @@ cJSON* syncPingReply2Json(const SyncPingReply* pMsg) { cJSON_AddItemToObject(pRoot, "srcId", pSrcId); cJSON* pDestId = cJSON_CreateObject(); - snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->destId.addr); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->destId.addr); cJSON_AddStringToObject(pDestId, "addr", u64buf); { uint64_t u64 = pMsg->destId.addr; @@ -565,27 +565,27 @@ char* syncPingReply2Str(const SyncPingReply* pMsg) { // for debug ---------------------- void syncPingReplyPrint(const SyncPingReply* pMsg) { char* serialized = syncPingReply2Str(pMsg); - printf("syncPingReplyPrint | len:%lu | %s \n", strlen(serialized), serialized); + printf("syncPingReplyPrint | len:%zu | %s \n", strlen(serialized), serialized); fflush(NULL); free(serialized); } void syncPingReplyPrint2(char* s, const SyncPingReply* pMsg) { char* serialized = syncPingReply2Str(pMsg); - printf("syncPingReplyPrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized); + printf("syncPingReplyPrint2 | len:%zu | %s | %s \n", strlen(serialized), s, serialized); fflush(NULL); free(serialized); } void syncPingReplyLog(const SyncPingReply* pMsg) { char* serialized = syncPingReply2Str(pMsg); - sTrace("syncPingReplyLog | len:%lu | %s", strlen(serialized), serialized); + sTrace("syncPingReplyLog | len:%zu | %s", strlen(serialized), serialized); free(serialized); } void syncPingReplyLog2(char* s, const SyncPingReply* pMsg) { char* serialized = syncPingReply2Str(pMsg); - sTrace("syncPingReplyLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized); + sTrace("syncPingReplyLog2 | len:%zu | %s | %s", strlen(serialized), s, serialized); free(serialized); } @@ -670,7 +670,7 @@ cJSON* syncClientRequest2Json(const SyncClientRequest* pMsg) { cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); cJSON_AddNumberToObject(pRoot, "originalRpcType", pMsg->originalRpcType); - snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->seqNum); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->seqNum); cJSON_AddStringToObject(pRoot, "seqNum", u64buf); cJSON_AddNumberToObject(pRoot, "isWeak", pMsg->isWeak); cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen); @@ -792,7 +792,7 @@ cJSON* syncRequestVote2Json(const SyncRequestVote* pMsg) { cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); cJSON* pSrcId = cJSON_CreateObject(); - snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->srcId.addr); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->srcId.addr); cJSON_AddStringToObject(pSrcId, "addr", u64buf); { uint64_t u64 = pMsg->srcId.addr; @@ -820,11 +820,11 @@ cJSON* syncRequestVote2Json(const SyncRequestVote* pMsg) { cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); cJSON_AddItemToObject(pRoot, "destId", pDestId); - snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->term); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->term); cJSON_AddStringToObject(pRoot, "term", u64buf); - snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->lastLogIndex); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->lastLogIndex); cJSON_AddStringToObject(pRoot, "lastLogIndex", u64buf); - snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->lastLogTerm); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->lastLogTerm); cJSON_AddStringToObject(pRoot, "lastLogTerm", u64buf); cJSON* pJson = cJSON_CreateObject(); @@ -936,7 +936,7 @@ cJSON* syncRequestVoteReply2Json(const SyncRequestVoteReply* pMsg) { cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); cJSON* pSrcId = cJSON_CreateObject(); - snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->srcId.addr); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->srcId.addr); cJSON_AddStringToObject(pSrcId, "addr", u64buf); { uint64_t u64 = pMsg->srcId.addr; @@ -964,7 +964,7 @@ cJSON* syncRequestVoteReply2Json(const SyncRequestVoteReply* pMsg) { cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); cJSON_AddItemToObject(pRoot, "destId", pDestId); - snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->term); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->term); cJSON_AddStringToObject(pRoot, "term", u64buf); cJSON_AddNumberToObject(pRoot, "vote_granted", pMsg->voteGranted); @@ -1079,7 +1079,7 @@ cJSON* syncAppendEntries2Json(const SyncAppendEntries* pMsg) { cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); cJSON* pSrcId = cJSON_CreateObject(); - snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->srcId.addr); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->srcId.addr); cJSON_AddStringToObject(pSrcId, "addr", u64buf); { uint64_t u64 = pMsg->srcId.addr; @@ -1094,7 +1094,7 @@ cJSON* syncAppendEntries2Json(const SyncAppendEntries* pMsg) { cJSON_AddItemToObject(pRoot, "srcId", pSrcId); cJSON* pDestId = cJSON_CreateObject(); - snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->destId.addr); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->destId.addr); cJSON_AddStringToObject(pDestId, "addr", u64buf); { uint64_t u64 = pMsg->destId.addr; @@ -1108,16 +1108,16 @@ cJSON* syncAppendEntries2Json(const SyncAppendEntries* pMsg) { cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); cJSON_AddItemToObject(pRoot, "destId", pDestId); - snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->term); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->term); cJSON_AddStringToObject(pRoot, "term", u64buf); - snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->prevLogIndex); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->prevLogIndex); cJSON_AddStringToObject(pRoot, "pre_log_index", u64buf); - snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->prevLogTerm); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->prevLogTerm); cJSON_AddStringToObject(pRoot, "pre_log_term", u64buf); - snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->commitIndex); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->commitIndex); cJSON_AddStringToObject(pRoot, "commit_index", u64buf); cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen); @@ -1238,7 +1238,7 @@ cJSON* syncAppendEntriesReply2Json(const SyncAppendEntriesReply* pMsg) { cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); cJSON* pSrcId = cJSON_CreateObject(); - snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->srcId.addr); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->srcId.addr); cJSON_AddStringToObject(pSrcId, "addr", u64buf); { uint64_t u64 = pMsg->srcId.addr; @@ -1253,7 +1253,7 @@ cJSON* syncAppendEntriesReply2Json(const SyncAppendEntriesReply* pMsg) { cJSON_AddItemToObject(pRoot, "srcId", pSrcId); cJSON* pDestId = cJSON_CreateObject(); - snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->destId.addr); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->destId.addr); cJSON_AddStringToObject(pDestId, "addr", u64buf); { uint64_t u64 = pMsg->destId.addr; @@ -1267,10 +1267,10 @@ cJSON* syncAppendEntriesReply2Json(const SyncAppendEntriesReply* pMsg) { cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); cJSON_AddItemToObject(pRoot, "destId", pDestId); - snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->term); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->term); cJSON_AddStringToObject(pRoot, "term", u64buf); cJSON_AddNumberToObject(pRoot, "success", pMsg->success); - snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->matchIndex); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pMsg->matchIndex); cJSON_AddStringToObject(pRoot, "matchIndex", u64buf); cJSON* pJson = cJSON_CreateObject(); diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 6ebeba1991..3ae2fc4721 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -34,6 +34,7 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) { pLogStore->getLastTerm = logStoreLastTerm; pLogStore->updateCommitIndex = logStoreUpdateCommitIndex; pLogStore->getCommitIndex = logStoreGetCommitIndex; + return pLogStore; // to avoid compiler error } void logStoreDestory(SSyncLogStore* pLogStore) { @@ -58,6 +59,7 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { walFsync(pWal, true); free(serialized); + return code; // to avoid compiler error } SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { @@ -79,6 +81,7 @@ int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; walRollback(pWal, fromIndex); + return 0; // to avoid compiler error } SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore) { @@ -102,6 +105,7 @@ int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; walCommit(pWal, index); + return 0; // to avoid compiler error } SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore) { @@ -130,9 +134,9 @@ cJSON* logStore2Json(SSyncLogStore* pLogStore) { cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf); snprintf(u64buf, sizeof(u64buf), "%p", pData->pWal); cJSON_AddStringToObject(pRoot, "pWal", u64buf); - snprintf(u64buf, sizeof(u64buf), "%ld", logStoreLastIndex(pLogStore)); + snprintf(u64buf, sizeof(u64buf), "%" PRId64 "", logStoreLastIndex(pLogStore)); cJSON_AddStringToObject(pRoot, "LastIndex", u64buf); - snprintf(u64buf, sizeof(u64buf), "%lu", logStoreLastTerm(pLogStore)); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", logStoreLastTerm(pLogStore)); cJSON_AddStringToObject(pRoot, "LastTerm", u64buf); cJSON* pEntries = cJSON_CreateArray(); @@ -181,4 +185,4 @@ void logStoreLog2(char* s, SSyncLogStore* pLogStore) { char* serialized = logStore2Str(pLogStore); sTrace("logStorePrint | len:%lu | %s | %s", strlen(serialized), s, serialized); free(serialized); -} \ No newline at end of file +} diff --git a/source/libs/sync/src/syncRequestVoteReply.c b/source/libs/sync/src/syncRequestVoteReply.c index 7cdeace166..60a2c008fe 100644 --- a/source/libs/sync/src/syncRequestVoteReply.c +++ b/source/libs/sync/src/syncRequestVoteReply.c @@ -41,7 +41,7 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) syncRequestVoteReplyLog2("==syncNodeOnRequestVoteReplyCb==", pMsg); if (pMsg->term < ths->pRaftStore->currentTerm) { - sTrace("DropStaleResponse, receive term:%lu, current term:%lu", pMsg->term, ths->pRaftStore->currentTerm); + sTrace("DropStaleResponse, receive term:%" PRIu64 ", current term:%" PRIu64 "", pMsg->term, ths->pRaftStore->currentTerm); return ret; } diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index ba8a76c190..90889a7af0 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -119,7 +119,7 @@ cJSON* syncUtilRaftId2Json(const SRaftId* p) { char u64buf[128]; cJSON* pRoot = cJSON_CreateObject(); - snprintf(u64buf, sizeof(u64buf), "%lu", p->addr); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", p->addr); cJSON_AddStringToObject(pRoot, "addr", u64buf); char host[128]; uint16_t port; @@ -196,4 +196,4 @@ SyncIndex syncUtilMinIndex(SyncIndex a, SyncIndex b) { SyncIndex syncUtilMaxIndex(SyncIndex a, SyncIndex b) { SyncIndex r = a > b ? a : b; return r; -} \ No newline at end of file +} From f43efb9fea446835bdd5613fa9cc2f5a867d583a Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 16 Mar 2022 13:30:03 +0800 Subject: [PATCH 7/9] handle except and update UT --- include/libs/transport/trpc.h | 2 +- source/libs/transport/inc/transportInt.h | 2 +- source/libs/transport/src/transSrv.c | 24 +++++++++--- source/libs/transport/test/transUT.cc | 47 ++++++++++++++++++------ 4 files changed, 55 insertions(+), 20 deletions(-) diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index b5b8d6ab66..5795cdd919 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -73,7 +73,7 @@ typedef struct SRpcInit { void *(*mfp)(void *parent, tmsg_t msgType); // call back to handle except when query/fetch in progress - void (*efp)(void *parent, tmsg_t msgType); + bool (*efp)(void *parent, tmsg_t msgType); void *parent; } SRpcInit; diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index e760fe1de9..e739380467 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -65,7 +65,7 @@ typedef struct { int (*afp)(void* parent, char* user, char* spi, char* encrypt, char* secret, char* ckey); bool (*pfp)(void* parent, tmsg_t msgType); void* (*mfp)(void* parent, tmsg_t msgType); - void (*efp)(void* parent, tmsg_t msgType); + bool (*efp)(void* parent, tmsg_t msgType); int32_t refCount; void* parent; diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 960e064b8f..59bc85e91d 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -106,6 +106,8 @@ static void uvStartSendRespInternal(SSrvMsg* smsg); static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb); static void uvStartSendResp(SSrvMsg* msg); +static void uvNotifyLinkBrokenToApp(SSrvConn* conn); + static void destroySmsg(SSrvMsg* smsg); // check whether already read complete packet static SSrvConn* createConn(void* hThrd); @@ -233,7 +235,7 @@ static void uvHandleReq(SSrvConn* pConn) { ntohs(pConn->locaddr.sin_port), transMsg.contLen); STrans* pTransInst = (STrans*)p->shandle; - (*((STrans*)p->shandle)->cfp)(pTransInst->parent, &transMsg, NULL); + (*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); // uv_timer_start(&pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0); // auth // validate msg type @@ -261,13 +263,12 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { tError("server conn %p read error: %s", conn, uv_err_name(nread)); if (nread < 0) { conn->broken = true; - transUnrefSrvHandle(conn); + uvNotifyLinkBrokenToApp(conn); - // if (conn->ref > 1) { - // conn->ref++; // ref > 1 signed that write is in progress + // STrans* pTransInst = conn->pTransInst; + // if (pTransInst->efp != NULL && (pTransInst->efp)(NULL, conn->inType)) { //} - // tError("server conn %p read error: %s", conn, uv_err_name(nread)); - // destroyConn(conn, true); + transUnrefSrvHandle(conn); } } void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { @@ -373,6 +374,17 @@ static void uvStartSendResp(SSrvMsg* smsg) { uvStartSendRespInternal(smsg); return; } + +static void uvNotifyLinkBrokenToApp(SSrvConn* conn) { + STrans* pTransInst = conn->pTransInst; + if (pTransInst->efp != NULL && (*pTransInst->efp)(NULL, conn->inType) && T_REF_VAL_GET(conn) >= 2) { + STransMsg transMsg = {0}; + transMsg.msgType = conn->inType; + transMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; + // transRefSrvHandle(conn); + (*pTransInst->cfp)(pTransInst->parent, &transMsg, 0); + } +} static void destroySmsg(SSrvMsg* smsg) { if (smsg == NULL) { return; diff --git a/source/libs/transport/test/transUT.cc b/source/libs/transport/test/transUT.cc index 46f6424b0b..b3fbade050 100644 --- a/source/libs/transport/test/transUT.cc +++ b/source/libs/transport/test/transUT.cc @@ -15,6 +15,7 @@ #include #include #include +#include "rpcLog.h" #include "tdatablock.h" #include "tglobal.h" #include "tlog.h" @@ -48,7 +49,9 @@ static void *ConstructArgForSpecificMsgType(void *parent, tmsg_t msgType) { return NULL; } // server except -static void NotifyAppLinkBroken(void *parent, tmsg_t msgType) {} +static bool handleExcept(void *parent, tmsg_t msgType) { + return msgType == TDMT_VND_QUERY || msgType == TDMT_VND_FETCH_RSP || msgType == TDMT_VND_RES_READY_RSP; +} typedef void (*CB)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet); static void processContinueSend(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet); @@ -83,6 +86,10 @@ class Client { rpcInit_.cfp = cb; this->transCli = rpcOpen(&rpcInit_); } + void Stop() { + rpcClose(this->transCli); + this->transCli = NULL; + } void SetPersistFP(bool (*pfp)(void *parent, tmsg_t msgType)) { rpcClose(this->transCli); rpcInit_.pfp = pfp; @@ -157,7 +164,7 @@ class Server { rpcClose(this->transSrv); this->transSrv = NULL; } - void SetExceptFp(void (*efp)(void *parent, tmsg_t msgType)) { + void SetExceptFp(bool (*efp)(void *parent, tmsg_t msgType)) { this->Stop(); rpcInit_.efp = efp; this->Start(); @@ -207,6 +214,7 @@ static void processResp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { Client *client = (Client *)parent; client->SetResp(pMsg); client->SemPost(); + tDebug("received resp"); } static void initEnv() { @@ -266,7 +274,7 @@ class TransObj { cli->SetPAndMFp(pfp, mfp); } // call when link broken, and notify query or fetch stop - void SetSrvExceptFp(void (*efp)(void *parent, tmsg_t msgType)) { + void SetSrvExceptFp(bool (*efp)(void *parent, tmsg_t msgType)) { //////// srv->SetExceptFp(efp); } @@ -275,6 +283,10 @@ class TransObj { srv->SetSrvContinueSend(cfp); } void RestartSrv() { srv->Restart(); } + void cliStop() { + /////// + cli->Stop(); + } void cliSendAndRecv(SRpcMsg *req, SRpcMsg *resp) { cli->SendAndRecv(req, resp); } void cliSendAndRecvNoHandle(SRpcMsg *req, SRpcMsg *resp) { cli->SendAndRecvNoHandle(req, resp); } @@ -417,20 +429,31 @@ TEST_F(TransEnv, srvContinueSend) { } TEST_F(TransEnv, srvPersistHandleExcept) { - // conn breken + tr->SetSrvContinueSend(processContinueSend); + tr->SetCliPersistFp(cliPersistHandle); + SRpcMsg resp = {0}; + for (int i = 0; i < 5; i++) { + SRpcMsg req = {.handle = resp.handle}; + req.msgType = 1; + req.pCont = rpcMallocCont(10); + req.contLen = 10; + tr->cliSendAndRecv(&req, &resp); + if (i > 2) { + tr->cliStop(); + break; + } + } + taosMsleep(2000); + // conn broken // } -TEST_F(TransEnv, cliPersistHandleExcept) { - // conn breken -} -TEST_F(TransEnv, multiCliPersisHandleExcept) { - // conn breken -} -TEST_F(TransEnv, multiSrvPersisHandleExcept) { - // conn breken +TEST_F(TransEnv, multiCliPersistHandleExcept) { + // conn broken } TEST_F(TransEnv, queryExcept) { + tr->SetSrvExceptFp(handleExcept); + // query and conn is broken } TEST_F(TransEnv, noResp) { From 8286d6efcffc13a835f185a607a4b814dd2e7957 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Wed, 16 Mar 2022 14:00:41 +0800 Subject: [PATCH 8/9] migrate TS-1278 from 2.x --- source/common/src/trow.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/source/common/src/trow.c b/source/common/src/trow.c index 861b4dc093..db4bc49425 100644 --- a/source/common/src/trow.c +++ b/source/common/src/trow.c @@ -353,10 +353,10 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i for (int i = 0; i < src2->numOfCols; i++) { SCellVal sVal = {0}; ASSERT(target->cols[i].type == src2->cols[i].type); - if (src2->cols[i].len > 0 && !isNull(src2->cols[i].pData, src2->cols[i].type)) { - if (tdGetColDataOfRow(&sVal, src1->cols + i, *iter1) < 0) { - TASSERT(0); - } + if (tdGetColDataOfRow(&sVal, src2->cols + i, *iter2) < 0) { + TASSERT(0); + } + if (src2->cols[i].len > 0 && !tdValTypeIsNull(sVal.valType)) { tdAppendValToDataCol(&(target->cols[i]), sVal.valType, sVal.val, target->numOfRows, target->maxPoints); } else if (!forceSetNull && key1 == key2 && src1->cols[i].len > 0) { if (tdGetColDataOfRow(&sVal, src1->cols + i, *iter1) < 0) { From 4046173c48d05f3a6f849ef5443dacb6a7cbc7b9 Mon Sep 17 00:00:00 2001 From: plum-lihui Date: Wed, 16 Mar 2022 14:17:03 +0800 Subject: [PATCH 9/9] [add interval case] --- tests/script/tsim/query/interval.sim | 207 +++++++++++++++++++++++++++ 1 file changed, 207 insertions(+) create mode 100644 tests/script/tsim/query/interval.sim diff --git a/tests/script/tsim/query/interval.sim b/tests/script/tsim/query/interval.sim new file mode 100644 index 0000000000..35e7c938d8 --- /dev/null +++ b/tests/script/tsim/query/interval.sim @@ -0,0 +1,207 @@ +system sh/stop_dnodes.sh + +system sh/deploy.sh -n dnode1 -i 1 +system sh/cfg.sh -n dnode1 -c wal -v 1 +system sh/exec.sh -n dnode1 -s start +sleep 2000 +sql connect + +$dbPrefix = m_in_db +$tbPrefix = m_in_tb +$mtPrefix = m_in_mt +$tbNum = 10 +$rowNum = 20 +$totalNum = 200 + +print =============== step1 +$i = 0 +$db = $dbPrefix . $i +$mt = $mtPrefix . $i + +sql drop database $db -x step1 +step1: +sql create database $db +sql use $db +sql create table $mt (ts timestamp, tbcol int) TAGS(tgcol int) + +print ====== start create child tables and insert data +$i = 0 +while $i < $tbNum + $tb = $tbPrefix . $i + sql create table $tb using $mt tags( $i ) + + $x = 0 + while $x < $rowNum + $cc = $x * 60000 + $ms = 1601481600000 + $cc + + sql insert into $tb values ($ms , $x ) + $x = $x + 1 + endw + + $i = $i + 1 +endw + +print =============== step2 +$i = 1 +$tb = $tbPrefix . $i + +sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $tb interval(1m) +print ===> $rows +if $rows < $rowNum then + return -1 +endi +if $data01 != 1 then + return -1 +endi +if $data05 != 1 then + return -1 +endi + +print =============== step3 +$cc = 4 * 60000 +$ms = 1601481600000 + $cc +sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $tb where ts <= $ms interval(1m) +print ===> $rows +if $rows > 10 then + return -1 +endi +if $rows < 3 then + return -1 +endi +if $data01 != 1 then + return -1 +endi +if $data05 != 1 then + return -1 +endi + +print =============== step4 +$cc = 40 * 60000 +$ms = 1601481600000 + $cc + +$cc = 1 * 60000 +$ms2 = 1601481600000 - $cc + +sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $tb where ts <= $ms and ts > $ms2 interval(1m) +print ===> $rows +if $rows < 18 then + return -1 +endi +if $rows > 22 then + return -1 +endi +if $data01 != 1 then + return -1 +endi +if $data05 != 1 then + return -1 +endi + +print =============== step5 +$cc = 40 * 60000 +$ms = 1601481600000 + $cc + +$cc = 1 * 60000 +$ms2 = 1601481600000 - $cc + +sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $tb where ts <= $ms and ts > $ms2 interval(1m) fill(value,0) +print ===> $rows +if $rows < 30 then + return -1 +endi +if $rows > 50 then + return -1 +endi +if $data21 != 1 then + return -1 +endi +if $data25 != 1 then + return -1 +endi + +print =============== step6 +sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $mt interval(1m) +print ===> $rows +if $rows < 18 then + return -1 +endi +if $rows > 22 then + return -1 +endi +if $data11 > 15 then + return -1 +endi +if $data11 < 5 then + return -1 +endi + +print =============== step7 +$cc = 4 * 60000 +$ms = 1601481600000 + $cc +sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $mt where ts <= $ms interval(1m) +print ===> $rows +if $rows < 3 then + return -1 +endi +if $rows > 7 then + return -1 +endi +if $data11 > 15 then + return -1 +endi +if $data11 < 5 then + return -1 +endi + +print =============== step8 +$cc = 40 * 60000 +$ms1 = 1601481600000 + $cc + +$cc = 1 * 60000 +$ms2 = 1601481600000 - $cc + +sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $mt where ts <= $ms1 and ts > $ms2 interval(1m) +print ===> $rows +if $rows < 18 then + return -1 +endi +if $rows > 22 then + return -1 +endi +if $data11 > 15 then + return -1 +endi +if $data11 < 5 then + return -1 +endi + +print =============== step9 +$cc = 40 * 60000 +$ms1 = 1601481600000 + $cc + +$cc = 1 * 60000 +$ms2 = 1601481600000 - $cc + +sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $mt where ts <= $ms1 and ts > $ms2 interval(1m) fill(value, 0) +if $rows < 30 then + return -1 +endi +if $rows > 50 then + return -1 +endi +if $data11 > 15 then + return -1 +endi +if $data11 < 5 then + return -1 +endi + +print =============== clear +sql drop database $db +sql show databases +if $rows != 0 then + return -1 +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file