From 4fdc98d3fb4987fc87e67cb34fff943ca70589ef Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 28 Oct 2022 22:15:06 +0800 Subject: [PATCH 01/13] opt http module --- source/libs/transport/src/thttp.c | 92 ++++++++++++++++++++++++++-- source/libs/transport/src/transCli.c | 30 +-------- 2 files changed, 91 insertions(+), 31 deletions(-) diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 92989a45f5..6715290acf 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -20,11 +20,80 @@ #include "thttp.h" #include "taoserror.h" #include "tlog.h" +#include "transComm.h" // clang-format on #define HTTP_RECV_BUF_SIZE 1024 +typedef struct SHttpModule { + uv_loop_t* loop; + SAsyncPool* asyncPool; + TdThread thread; +} SHttpModule; + +typedef struct SHttpMsg { + queue q; + char* server; + int32_t port; + char* cont; + int32_t len; + EHttpCompFlag flag; + +} SHttpMsg; + +static TdThreadOnce transHttpInit = PTHREAD_ONCE_INIT; +static SHttpModule* http = NULL; + +static void* httpThread(void* arg) { + SHttpModule* http = (SHttpModule*)arg; + setThreadName("http-cli-send-thread"); + uv_run(http->loop, UV_RUN_DEFAULT); + return NULL; +} + +static void httpAsyncCb(uv_async_t* handle) { + SAsyncItem* item = handle->data; + SHttpModule* http = item->pThrd; + + SHttpMsg* msg = NULL; + + queue wq; + taosThreadMutexLock(&item->mtx); + QUEUE_MOVE(&item->qmsg, &wq); + taosThreadMutexUnlock(&item->mtx); + + int count = 0; + while (!QUEUE_IS_EMPTY(&wq)) { + queue* h = QUEUE_HEAD(&wq); + QUEUE_REMOVE(h); + msg = QUEUE_DATA(h, SHttpMsg, q); + } +} + +static void transHttpEnvInit() { + http = taosMemoryMalloc(sizeof(SHttpModule)); + + http->loop = taosMemoryMalloc(sizeof(uv_loop_t)); + uv_loop_init(http->loop); + + http->asyncPool = transAsyncPoolCreate(http->loop, 1, http, httpAsyncCb); + + int err = taosThreadCreate(&http->thread, NULL, httpThread, (void*)http); + if (err != 0) { + taosMemoryFree(http->loop); + taosMemoryFree(http); + http = NULL; + } +} +static void transHttpEnvDestroy() { + if (http == NULL) return; + + transAsyncPoolDestroy(http->asyncPool); + taosMemoryFree(http->loop); + taosMemoryFree(http); +} + typedef struct SHttpClient { uv_connect_t conn; uv_tcp_t tcp; @@ -127,6 +196,8 @@ _OVER: } static FORCE_INLINE void destroyHttpClient(SHttpClient* cli) { + taosMemoryFree(cli->wbuf[0].base); + taosMemoryFree(cli->wbuf[1].base); taosMemoryFree(cli->wbuf); taosMemoryFree(cli->rbuf); taosMemoryFree(cli->addr); @@ -230,12 +301,13 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32 } terrno = 0; - char header[2048] = {0}; - int32_t headLen = taosBuildHttpHeader(server, contLen, header, sizeof(header), flag); + int32_t len = 2048; + char* header = taosMemoryCalloc(1, len); + int32_t headLen = taosBuildHttpHeader(server, contLen, header, len, flag); uv_buf_t* wb = taosMemoryCalloc(2, sizeof(uv_buf_t)); - wb[0] = uv_buf_init((char*)header, headLen); // stack var - wb[1] = uv_buf_init((char*)pCont, contLen); // heap var + wb[0] = uv_buf_init((char*)header, strlen(header)); // heap var + wb[1] = uv_buf_init((char*)pCont, contLen); // heap var SHttpClient* cli = taosMemoryCalloc(1, sizeof(SHttpClient)); cli->conn.data = cli; @@ -281,3 +353,15 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32 taosMemoryFree(loop); return terrno; } +int32_t taosSendHttpReportImpl(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) { + SHttpMsg* msg = taosMemoryMalloc(sizeof(SHttpMsg)); + + msg->server = strdup(server); + msg->port = port; + msg->cont = taosMemoryMalloc(contLen); + memcpy(msg->cont, pCont, contLen); + msg->flag = flag; + + transAsyncSend(http->asyncPool, &(msg->q)); + return 0; +} diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 28660934f8..97915e1ded 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -187,18 +187,8 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) { snprintf(key, sizeof(key), "%s:%d", ip, (int)port); \ } while (0) -#define CONN_HOST_THREAD_IDX1(idx, exh, refId, pThrd) \ - do { \ - if (exh == NULL) { \ - idx = -1; \ - } else { \ - ASYNC_CHECK_HANDLE((exh), refId); \ - pThrd = (SCliThrd*)(exh)->pThrd; \ - } \ - } while (0) -#define CONN_PERSIST_TIME(para) ((para) <= 90000 ? 90000 : (para)) -#define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL) -#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label) +#define CONN_PERSIST_TIME(para) ((para) <= 90000 ? 90000 : (para)) +#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label) #define CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle) \ do { \ @@ -217,6 +207,7 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) { tDebug("msg found, %" PRIu64 "", ahandle); \ } \ } while (0) + #define CONN_GET_NEXT_SENDMSG(conn) \ do { \ int i = 0; \ @@ -231,21 +222,6 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) { } \ } while (0) -#define CONN_HANDLE_THREAD_QUIT(thrd) \ - do { \ - if (thrd->quit) { \ - return; \ - } \ - } while (0) - -#define CONN_HANDLE_BROKEN(conn) \ - do { \ - if (conn->broken) { \ - cliHandleExcept(conn); \ - return; \ - } \ - } while (0) - #define CONN_SET_PERSIST_BY_APP(conn) \ do { \ if (conn->status == ConnNormal) { \ From 9322928a570354a54f82f184df3fdae591140d90 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 28 Oct 2022 22:41:25 +0800 Subject: [PATCH 02/13] opt http module --- source/libs/transport/src/thttp.c | 65 ++++++++++++++++++++++++++++++- 1 file changed, 64 insertions(+), 1 deletion(-) diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 6715290acf..7dffb8d192 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -45,6 +45,8 @@ typedef struct SHttpMsg { static TdThreadOnce transHttpInit = PTHREAD_ONCE_INIT; static SHttpModule* http = NULL; +static void httpHandleReq(SHttpMsg* msg); + static void* httpThread(void* arg) { SHttpModule* http = (SHttpModule*)arg; setThreadName("http-cli-send-thread"); @@ -52,6 +54,13 @@ static void* httpThread(void* arg) { return NULL; } +static void httpDestroyMsg(SHttpMsg* msg) { + if (msg == NULL) return; + + taosMemoryFree(msg->server); + taosMemoryFree(msg->cont); + taosMemoryFree(msg); +} static void httpAsyncCb(uv_async_t* handle) { SAsyncItem* item = handle->data; SHttpModule* http = item->pThrd; @@ -68,6 +77,7 @@ static void httpAsyncCb(uv_async_t* handle) { queue* h = QUEUE_HEAD(&wq); QUEUE_REMOVE(h); msg = QUEUE_DATA(h, SHttpMsg, q); + httpHandleReq(msg); } } @@ -355,7 +365,6 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32 } int32_t taosSendHttpReportImpl(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) { SHttpMsg* msg = taosMemoryMalloc(sizeof(SHttpMsg)); - msg->server = strdup(server); msg->port = port; msg->cont = taosMemoryMalloc(contLen); @@ -365,3 +374,57 @@ int32_t taosSendHttpReportImpl(const char* server, uint16_t port, char* pCont, i transAsyncSend(http->asyncPool, &(msg->q)); return 0; } +static void httpHandleReq(SHttpMsg* msg) { + struct sockaddr_in dest = {0}; + if (taosBuildDstAddr(msg->server, msg->port, &dest) < 0) { + httpDestroyMsg(msg); + return; + } + if (msg->flag == HTTP_GZIP) { + int32_t dstLen = taosCompressHttpRport(msg->cont, msg->len); + if (dstLen > 0) { + msg->len = dstLen; + } else { + msg->flag = HTTP_FLAT; + } + } + + terrno = 0; + + int32_t len = 2048; + char* header = taosMemoryCalloc(1, len); + int32_t headLen = taosBuildHttpHeader(msg->server, msg->len, header, len, msg->flag); + + uv_buf_t* wb = taosMemoryCalloc(2, sizeof(uv_buf_t)); + wb[0] = uv_buf_init((char*)header, strlen(header)); // heap var + wb[1] = uv_buf_init((char*)msg->cont, msg->len); // heap var + + SHttpClient* cli = taosMemoryCalloc(1, sizeof(SHttpClient)); + cli->conn.data = cli; + cli->tcp.data = cli; + cli->req.data = cli; + cli->wbuf = wb; + cli->rbuf = taosMemoryCalloc(1, HTTP_RECV_BUF_SIZE); + cli->addr = msg->server; + cli->port = msg->port; + + taosMemoryFree(msg); + + uv_tcp_init(http->loop, &cli->tcp); + // set up timeout to avoid stuck; + int32_t fd = taosCreateSocketWithTimeout(5); + + int ret = uv_tcp_open((uv_tcp_t*)&cli->tcp, fd); + if (ret != 0) { + uError("http-report failed to open socket, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr, cli->port); + destroyHttpClient(cli); + return; + } + + ret = uv_tcp_connect(&cli->conn, &cli->tcp, (const struct sockaddr*)&dest, clientConnCb); + if (ret != 0) { + uError("http-report failed to connect to http-server, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr, + cli->port); + destroyHttpClient(cli); + } +} From c5fba9ccbd6a38e26b653b6bf651834fdcce67de Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 28 Oct 2022 23:17:22 +0800 Subject: [PATCH 03/13] opt http module --- source/libs/transport/src/thttp.c | 74 +++---------------------------- 1 file changed, 7 insertions(+), 67 deletions(-) diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 7dffb8d192..0c4492e1f0 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -45,7 +45,9 @@ typedef struct SHttpMsg { static TdThreadOnce transHttpInit = PTHREAD_ONCE_INIT; static SHttpModule* http = NULL; -static void httpHandleReq(SHttpMsg* msg); +static void httpHandleReq(SHttpMsg* msg); +static int32_t taosSendHttpReportImpl(const char* server, uint16_t port, char* pCont, int32_t contLen, + EHttpCompFlag flag); static void* httpThread(void* arg) { SHttpModule* http = (SHttpModule*)arg; @@ -297,73 +299,11 @@ static FORCE_INLINE int32_t taosBuildDstAddr(const char* server, uint16_t port, return 0; } int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) { - struct sockaddr_in dest = {0}; - if (taosBuildDstAddr(server, port, &dest) < 0) { - return -1; - } - if (flag == HTTP_GZIP) { - int32_t dstLen = taosCompressHttpRport(pCont, contLen); - if (dstLen > 0) { - contLen = dstLen; - } else { - flag = HTTP_FLAT; - } - } - terrno = 0; - - int32_t len = 2048; - char* header = taosMemoryCalloc(1, len); - int32_t headLen = taosBuildHttpHeader(server, contLen, header, len, flag); - - uv_buf_t* wb = taosMemoryCalloc(2, sizeof(uv_buf_t)); - wb[0] = uv_buf_init((char*)header, strlen(header)); // heap var - wb[1] = uv_buf_init((char*)pCont, contLen); // heap var - - SHttpClient* cli = taosMemoryCalloc(1, sizeof(SHttpClient)); - cli->conn.data = cli; - cli->tcp.data = cli; - cli->req.data = cli; - cli->wbuf = wb; - cli->rbuf = taosMemoryCalloc(1, HTTP_RECV_BUF_SIZE); - cli->addr = tstrdup(server); - cli->port = port; - - uv_loop_t* loop = taosMemoryMalloc(sizeof(uv_loop_t)); - int err = uv_loop_init(loop); - if (err != 0) { - uError("http-report failed to init uv_loop, reason: %s", uv_strerror(err)); - taosMemoryFree(loop); - terrno = TAOS_SYSTEM_ERROR(err); - destroyHttpClient(cli); - return terrno; - } - uv_tcp_init(loop, &cli->tcp); - // set up timeout to avoid stuck; - int32_t fd = taosCreateSocketWithTimeout(5); - - int ret = uv_tcp_open((uv_tcp_t*)&cli->tcp, fd); - if (ret != 0) { - uError("http-report failed to open socket, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr, cli->port); - destroyHttpClient(cli); - uv_stop(loop); - terrno = TAOS_SYSTEM_ERROR(ret); - } else { - ret = uv_tcp_connect(&cli->conn, &cli->tcp, (const struct sockaddr*)&dest, clientConnCb); - if (ret != 0) { - uError("http-report failed to connect to http-server, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr, - cli->port); - destroyHttpClient(cli); - uv_stop(loop); - terrno = TAOS_SYSTEM_ERROR(ret); - } - } - - uv_run(loop, UV_RUN_DEFAULT); - uv_loop_close(loop); - taosMemoryFree(loop); - return terrno; + taosThreadOnce(&transHttpInit, transHttpEnvInit); + return taosSendHttpReportImpl(server, port, pCont, contLen, flag); } -int32_t taosSendHttpReportImpl(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) { +static int32_t taosSendHttpReportImpl(const char* server, uint16_t port, char* pCont, int32_t contLen, + EHttpCompFlag flag) { SHttpMsg* msg = taosMemoryMalloc(sizeof(SHttpMsg)); msg->server = strdup(server); msg->port = port; From 8831f386e68fe76d1bcf3dbbd527fb0163c00e37 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 31 Oct 2022 11:57:58 +0800 Subject: [PATCH 04/13] opt http module --- source/libs/transport/src/thttp.c | 28 ++++++++++++---------------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 0c4492e1f0..fc61fb5122 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -107,13 +107,14 @@ static void transHttpEnvDestroy() { } typedef struct SHttpClient { - uv_connect_t conn; - uv_tcp_t tcp; - uv_write_t req; - uv_buf_t* wbuf; - char* rbuf; - char* addr; - uint16_t port; + uv_connect_t conn; + uv_tcp_t tcp; + uv_write_t req; + uv_buf_t* wbuf; + char* rbuf; + char* addr; + uint16_t port; + struct sockaddr_in dest; } SHttpClient; static int32_t taosBuildHttpHeader(const char* server, int32_t contLen, char* pHead, int32_t headLen, @@ -240,8 +241,7 @@ static FORCE_INLINE void clientRecvCb(uv_stream_t* handle, ssize_t nread, const static void clientSentCb(uv_write_t* req, int32_t status) { SHttpClient* cli = req->data; if (status != 0) { - terrno = TAOS_SYSTEM_ERROR(status); - uError("http-report failed to send data %s", uv_strerror(status)); + uError("http-report failed to send data, reason: %s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port); if (!uv_is_closing((uv_handle_t*)&cli->tcp)) { uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); } else { @@ -253,7 +253,6 @@ static void clientSentCb(uv_write_t* req, int32_t status) { } status = uv_read_start((uv_stream_t*)&cli->tcp, clientAllocBuffCb, clientRecvCb); if (status != 0) { - terrno = TAOS_SYSTEM_ERROR(status); uError("http-report failed to recv data,reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port); if (!uv_is_closing((uv_handle_t*)&cli->tcp)) { uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); @@ -265,7 +264,6 @@ static void clientSentCb(uv_write_t* req, int32_t status) { static void clientConnCb(uv_connect_t* req, int32_t status) { SHttpClient* cli = req->data; if (status != 0) { - terrno = TAOS_SYSTEM_ERROR(status); uError("http-report failed to conn to server, reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port); if (!uv_is_closing((uv_handle_t*)&cli->tcp)) { uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); @@ -276,7 +274,6 @@ static void clientConnCb(uv_connect_t* req, int32_t status) { } status = uv_write(&cli->req, (uv_stream_t*)&cli->tcp, cli->wbuf, 2, clientSentCb); if (0 != status) { - terrno = TAOS_SYSTEM_ERROR(status); uError("http-report failed to send data,reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port); if (!uv_is_closing((uv_handle_t*)&cli->tcp)) { uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); @@ -289,7 +286,6 @@ static void clientConnCb(uv_connect_t* req, int32_t status) { static FORCE_INLINE int32_t taosBuildDstAddr(const char* server, uint16_t port, struct sockaddr_in* dest) { uint32_t ip = taosGetIpv4FromFqdn(server); if (ip == 0xffffffff) { - terrno = TAOS_SYSTEM_ERROR(errno); uError("http-report failed to get http server:%s since %s", server, errno == 0 ? "invalid http server" : terrstr()); return -1; } @@ -309,6 +305,7 @@ static int32_t taosSendHttpReportImpl(const char* server, uint16_t port, char* p msg->port = port; msg->cont = taosMemoryMalloc(contLen); memcpy(msg->cont, pCont, contLen); + msg->len = contLen; msg->flag = flag; transAsyncSend(http->asyncPool, &(msg->q)); @@ -329,8 +326,6 @@ static void httpHandleReq(SHttpMsg* msg) { } } - terrno = 0; - int32_t len = 2048; char* header = taosMemoryCalloc(1, len); int32_t headLen = taosBuildHttpHeader(msg->server, msg->len, header, len, msg->flag); @@ -347,6 +342,7 @@ static void httpHandleReq(SHttpMsg* msg) { cli->rbuf = taosMemoryCalloc(1, HTTP_RECV_BUF_SIZE); cli->addr = msg->server; cli->port = msg->port; + cli->dest = dest; taosMemoryFree(msg); @@ -361,7 +357,7 @@ static void httpHandleReq(SHttpMsg* msg) { return; } - ret = uv_tcp_connect(&cli->conn, &cli->tcp, (const struct sockaddr*)&dest, clientConnCb); + ret = uv_tcp_connect(&cli->conn, &cli->tcp, (const struct sockaddr*)&cli->dest, clientConnCb); if (ret != 0) { uError("http-report failed to connect to http-server, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr, cli->port); From 6b744129948c2ccd356548b5848083d92df853b8 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 31 Oct 2022 16:30:42 +0800 Subject: [PATCH 05/13] refactor(sync): print fatal log when commit error --- source/libs/sync/src/syncMain.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 02e5c643a4..82ae8d7e51 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -3380,6 +3380,7 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde // ASSERT(pEntry != NULL); if (code != 0 || pEntry == NULL) { syncNodeErrorLog(ths, "get log entry error"); + sFatal("vgId:%d, get log entry %" PRId64 " error when commit since %s", ths->vgId, i, terrstr()); continue; } } From aa2b54c57c11fcf37cccb269b370f5e43041e0ee Mon Sep 17 00:00:00 2001 From: slzhou Date: Mon, 31 Oct 2022 16:45:39 +0800 Subject: [PATCH 06/13] enhance: add configuration for udfd LD_LIBRARY_PATH to taos.cfg --- include/common/tglobal.h | 1 + source/common/src/tglobal.c | 6 ++++-- source/libs/function/src/tudf.c | 21 ++++++++++++++++++++- 3 files changed, 25 insertions(+), 3 deletions(-) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index cb4426f8a9..7df6124152 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -119,6 +119,7 @@ extern SDiskCfg tsDiskCfg[]; // udf extern bool tsStartUdfd; extern char tsUdfdResFuncs[]; +extern char tsUdfdLdLibPath[]; // schemaless extern char tsSmlChildTableName[]; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index fbb9e04a25..660730f0dc 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -163,7 +163,8 @@ int32_t tsTtlUnit = 86400; int32_t tsTtlPushInterval = 86400; int32_t tsGrantHBInterval = 60; int32_t tsUptimeInterval = 300; // seconds -char tsUdfdResFuncs[1024] = ""; // udfd resident funcs that teardown when udfd exits +char tsUdfdResFuncs[512] = ""; // udfd resident funcs that teardown when udfd exits +char tsUdfdLdLibPath[512] = ""; #ifndef _STORAGE int32_t taosSetTfsCfg(SConfig *pCfg) { @@ -424,6 +425,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddBool(pCfg, "udf", tsStartUdfd, 0) != 0) return -1; if (cfgAddString(pCfg, "udfdResFuncs", tsUdfdResFuncs, 0) != 0) return -1; + if (cfgAddString(pCfg, "udfdLdLibPath", tsUdfdLdLibPath, 0) != 0) return -1; GRANT_CFG_ADD; return 0; } @@ -722,7 +724,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsStartUdfd = cfgGetItem(pCfg, "udf")->bval; tstrncpy(tsUdfdResFuncs, cfgGetItem(pCfg, "udfdResFuncs")->str, sizeof(tsUdfdResFuncs)); - + tstrncpy(tsUdfdLdLibPath, cfgGetItem(pCfg, "udfdLdLibPath")->str, sizeof(tsUdfdLdLibPath)); if (tsQueryBufferSize >= 0) { tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL; } diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index ea59e92e98..eff3c6cc7b 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -117,10 +117,29 @@ static int32_t udfSpawnUdfd(SUdfdData *pData) { char dnodeIdEnvItem[32] = {0}; char thrdPoolSizeEnvItem[32] = {0}; snprintf(dnodeIdEnvItem, 32, "%s=%d", "DNODE_ID", pData->dnodeId); + float numCpuCores = 4; taosGetCpuCores(&numCpuCores); snprintf(thrdPoolSizeEnvItem, 32, "%s=%d", "UV_THREADPOOL_SIZE", (int)numCpuCores * 2); - char *envUdfd[] = {dnodeIdEnvItem, thrdPoolSizeEnvItem, NULL}; + + char pathTaosdLdLib[512] = {0}; + size_t taosdLdLibPathLen = sizeof(pathTaosdLdLib); + uv_os_getenv("LD_LIBRARY_PATH", pathTaosdLdLib, &taosdLdLibPathLen); + + char udfdPathLdLib[1024] = {0}; + size_t udfdLdLibPathLen = strlen(tsUdfdLdLibPath); + strncpy(udfdPathLdLib, tsUdfdLdLibPath, udfdLdLibPathLen); + udfdPathLdLib[udfdLdLibPathLen] = ':'; + strncpy(udfdPathLdLib + udfdLdLibPathLen + 1, pathTaosdLdLib, sizeof(udfdPathLdLib) - udfdLdLibPathLen); + if (udfdLdLibPathLen + taosdLdLibPathLen < 1024) { + fnInfo("udfd LD_LIBRARY_PATH: %s", udfdPathLdLib); + } else { + fnError("can not set correct udfd LD_LIBRARY_PATH"); + } + char ldLibPathEnvItem[1024 + 32] = {0}; + snprintf(ldLibPathEnvItem, 1024 + 32, "%s=%s", "LD_LIBRARY_PATH", ldLibPathEnvItem); + + char *envUdfd[] = {dnodeIdEnvItem, thrdPoolSizeEnvItem, ldLibPathEnvItem, NULL}; options.env = envUdfd; int err = uv_spawn(&pData->loop, &pData->process, &options); From d246514da28d742614cbffdf45623b2e017bb650 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 31 Oct 2022 18:16:58 +0800 Subject: [PATCH 07/13] refactor(sync): check msgcb, putToQueueFp NULL --- source/dnode/mnode/impl/src/mndSync.c | 37 ++++++++++++++++++++++++-- source/dnode/vnode/src/vnd/vnodeSync.c | 16 +++++++++-- source/libs/sync/src/syncMain.c | 8 +++--- source/libs/sync/src/syncTimeout.c | 2 +- 4 files changed, 54 insertions(+), 9 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index df4b526775..6853ff5ccc 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -17,11 +17,44 @@ #include "mndSync.h" #include "mndTrans.h" -static int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { +static int32_t mndSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { + if (pMsg == NULL || pMsg->pCont == NULL) { + return -1; + } + SMsgHead *pHead = pMsg->pCont; pHead->contLen = htonl(pHead->contLen); pHead->vgId = htonl(pHead->vgId); + if (msgcb == NULL || msgcb->putToQueueFp == NULL) { + rpcFreeCont(pMsg->pCont); + pMsg->pCont = NULL; + return -1; + } + + int32_t code = tmsgPutToQueue(msgcb, SYNC_CTRL_QUEUE, pMsg); + if (code != 0) { + rpcFreeCont(pMsg->pCont); + pMsg->pCont = NULL; + } + return code; +} + +static int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { + if (pMsg == NULL || pMsg->pCont == NULL) { + return -1; + } + + SMsgHead *pHead = pMsg->pCont; + pHead->contLen = htonl(pHead->contLen); + pHead->vgId = htonl(pHead->vgId); + + if (msgcb == NULL || msgcb->putToQueueFp == NULL) { + rpcFreeCont(pMsg->pCont); + pMsg->pCont = NULL; + return -1; + } + int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); if (code != 0) { rpcFreeCont(pMsg->pCont); @@ -212,7 +245,7 @@ int32_t mndInitSync(SMnode *pMnode) { .msgcb = NULL, .FpSendMsg = mndSyncSendMsg, .FpEqMsg = mndSyncEqMsg, - .FpEqCtrlMsg = NULL, + .FpEqCtrlMsg = mndSyncEqCtrlMsg, }; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 7acf5b4003..4af89dfa38 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -365,7 +365,13 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { } static int32_t vnodeSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { - if (msgcb == NULL) { + if (pMsg == NULL || pMsg->pCont == NULL) { + return -1; + } + + if (msgcb == NULL || msgcb->putToQueueFp == NULL) { + rpcFreeCont(pMsg->pCont); + pMsg->pCont = NULL; return -1; } @@ -378,7 +384,13 @@ static int32_t vnodeSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { } static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { - if (msgcb == NULL) { + if (pMsg == NULL || pMsg->pCont == NULL) { + return -1; + } + + if (msgcb == NULL || msgcb->putToQueueFp == NULL) { + rpcFreeCont(pMsg->pCont); + pMsg->pCont = NULL; return -1; } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 82ae8d7e51..b4d939e980 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -508,7 +508,7 @@ int32_t syncEndSnapshot(int64_t rid) { SSyncLogStoreData* pData = pSyncNode->pLogStore->data; code = walEndSnapshot(pData->pWal); if (code != 0) { - sError("vgId:%d, wal snapshot end error since:%s", pSyncNode->vgId, terrstr(terrno)); + sError("vgId:%d, wal snapshot end error since:%s", pSyncNode->vgId, terrstr()); taosReleaseRef(tsNodeRefId, pSyncNode->rid); return -1; @@ -2793,7 +2793,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { pSyncNode->vgId, pSyncNode); SRpcMsg rpcMsg; syncTimeout2RpcMsg(pSyncMsg, &rpcMsg); - if (pSyncNode->FpEqMsg != NULL) { + if (pSyncNode->FpEqMsg != NULL && pSyncNode->msgcb != NULL && pSyncNode->msgcb->putToQueueFp != NULL) { int32_t code = pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg); if (code != 0) { sError("vgId:%d, sync enqueue elect msg error, code:%d", pSyncNode->vgId, code); @@ -3379,8 +3379,8 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde // ASSERT(code == 0); // ASSERT(pEntry != NULL); if (code != 0 || pEntry == NULL) { - syncNodeErrorLog(ths, "get log entry error"); - sFatal("vgId:%d, get log entry %" PRId64 " error when commit since %s", ths->vgId, i, terrstr()); + syncNodeErrorLog(ths, "get log entry error"); + sFatal("vgId:%d, get log entry %" PRId64 " error when commit since %s", ths->vgId, i, terrstr()); continue; } } diff --git a/source/libs/sync/src/syncTimeout.c b/source/libs/sync/src/syncTimeout.c index 5ff73a6406..1c08217099 100644 --- a/source/libs/sync/src/syncTimeout.c +++ b/source/libs/sync/src/syncTimeout.c @@ -76,7 +76,7 @@ int32_t syncNodeTimerRoutine(SSyncNode* ths) { SSyncLogStoreData* pData = ths->pLogStore->data; int32_t code = walEndSnapshot(pData->pWal); if (code != 0) { - sError("vgId:%d, wal snapshot end error since:%s", ths->vgId, terrstr(terrno)); + sError("vgId:%d, timer wal snapshot end error since:%s", ths->vgId, terrstr()); return -1; } else { do { From 06c06d290f8878b30a34e9061d82dcbdf3ee3918 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 31 Oct 2022 18:45:10 +0800 Subject: [PATCH 08/13] opt http module --- source/libs/transport/inc/transComm.h | 1 + source/libs/transport/src/thttp.c | 280 +++++++++++++++++--------- source/libs/transport/src/trans.c | 2 + 3 files changed, 183 insertions(+), 100 deletions(-) diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index b83f84e3f2..2354f0f959 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -428,6 +428,7 @@ void transDestoryExHandle(void* handle); int32_t transGetRefMgt(); int32_t transGetInstMgt(); +void transHttpEnvDestroy(); #ifdef __cplusplus } #endif diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index fc61fb5122..9526ad472a 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -39,73 +39,11 @@ typedef struct SHttpMsg { char* cont; int32_t len; EHttpCompFlag flag; + int8_t quit; + SHttpModule* http; } SHttpMsg; -static TdThreadOnce transHttpInit = PTHREAD_ONCE_INIT; -static SHttpModule* http = NULL; - -static void httpHandleReq(SHttpMsg* msg); -static int32_t taosSendHttpReportImpl(const char* server, uint16_t port, char* pCont, int32_t contLen, - EHttpCompFlag flag); - -static void* httpThread(void* arg) { - SHttpModule* http = (SHttpModule*)arg; - setThreadName("http-cli-send-thread"); - uv_run(http->loop, UV_RUN_DEFAULT); - return NULL; -} - -static void httpDestroyMsg(SHttpMsg* msg) { - if (msg == NULL) return; - - taosMemoryFree(msg->server); - taosMemoryFree(msg->cont); - taosMemoryFree(msg); -} -static void httpAsyncCb(uv_async_t* handle) { - SAsyncItem* item = handle->data; - SHttpModule* http = item->pThrd; - - SHttpMsg* msg = NULL; - - queue wq; - taosThreadMutexLock(&item->mtx); - QUEUE_MOVE(&item->qmsg, &wq); - taosThreadMutexUnlock(&item->mtx); - - int count = 0; - while (!QUEUE_IS_EMPTY(&wq)) { - queue* h = QUEUE_HEAD(&wq); - QUEUE_REMOVE(h); - msg = QUEUE_DATA(h, SHttpMsg, q); - httpHandleReq(msg); - } -} - -static void transHttpEnvInit() { - http = taosMemoryMalloc(sizeof(SHttpModule)); - - http->loop = taosMemoryMalloc(sizeof(uv_loop_t)); - uv_loop_init(http->loop); - - http->asyncPool = transAsyncPoolCreate(http->loop, 1, http, httpAsyncCb); - - int err = taosThreadCreate(&http->thread, NULL, httpThread, (void*)http); - if (err != 0) { - taosMemoryFree(http->loop); - taosMemoryFree(http); - http = NULL; - } -} -static void transHttpEnvDestroy() { - if (http == NULL) return; - - transAsyncPoolDestroy(http->asyncPool); - taosMemoryFree(http->loop); - taosMemoryFree(http); -} - typedef struct SHttpClient { uv_connect_t conn; uv_tcp_t tcp; @@ -117,6 +55,17 @@ typedef struct SHttpClient { struct sockaddr_in dest; } SHttpClient; +static TdThreadOnce transHttpInit = PTHREAD_ONCE_INIT; +static SHttpModule* thttp = NULL; +static void transHttpEnvInit(); + +static void httpHandleReq(SHttpMsg* msg); +static void httpHandleQuit(SHttpMsg* msg); +static int32_t httpSendQuit(); + +static int32_t taosSendHttpReportImpl(const char* server, uint16_t port, char* pCont, int32_t contLen, + EHttpCompFlag flag); + static int32_t taosBuildHttpHeader(const char* server, int32_t contLen, char* pHead, int32_t headLen, EHttpCompFlag flag) { if (flag == HTTP_FLAT) { @@ -135,6 +84,7 @@ static int32_t taosBuildHttpHeader(const char* server, int32_t contLen, char* pH "Content-Length: %d\n\n", server, contLen); } else { + terrno = TSDB_CODE_INVALID_CFG; return -1; } } @@ -208,6 +158,57 @@ _OVER: return code; } +static FORCE_INLINE int32_t taosBuildDstAddr(const char* server, uint16_t port, struct sockaddr_in* dest) { + uint32_t ip = taosGetIpv4FromFqdn(server); + if (ip == 0xffffffff) { + tError("http-report failed to get http server:%s since %s", server, errno == 0 ? "invalid http server" : terrstr()); + return -1; + } + char buf[128] = {0}; + tinet_ntoa(buf, ip); + uv_ip4_addr(buf, port, dest); + return 0; +} + +static void* httpThread(void* arg) { + SHttpModule* http = (SHttpModule*)arg; + setThreadName("http-cli-send-thread"); + uv_run(http->loop, UV_RUN_DEFAULT); + return NULL; +} + +static void httpDestroyMsg(SHttpMsg* msg) { + if (msg == NULL) return; + + taosMemoryFree(msg->server); + taosMemoryFree(msg->cont); + taosMemoryFree(msg); +} +static void httpAsyncCb(uv_async_t* handle) { + SAsyncItem* item = handle->data; + SHttpModule* http = item->pThrd; + + SHttpMsg *msg = NULL, *quitMsg = NULL; + + queue wq; + taosThreadMutexLock(&item->mtx); + QUEUE_MOVE(&item->qmsg, &wq); + taosThreadMutexUnlock(&item->mtx); + + int count = 0; + while (!QUEUE_IS_EMPTY(&wq)) { + queue* h = QUEUE_HEAD(&wq); + QUEUE_REMOVE(h); + msg = QUEUE_DATA(h, SHttpMsg, q); + if (msg->quit) { + quitMsg = msg; + } else { + httpHandleReq(msg); + } + } + if (quitMsg) httpHandleQuit(quitMsg); +} + static FORCE_INLINE void destroyHttpClient(SHttpClient* cli) { taosMemoryFree(cli->wbuf[0].base); taosMemoryFree(cli->wbuf[1].base); @@ -216,88 +217,82 @@ static FORCE_INLINE void destroyHttpClient(SHttpClient* cli) { taosMemoryFree(cli->addr); taosMemoryFree(cli); } + static FORCE_INLINE void clientCloseCb(uv_handle_t* handle) { SHttpClient* cli = handle->data; destroyHttpClient(cli); } + static FORCE_INLINE void clientAllocBuffCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { SHttpClient* cli = handle->data; buf->base = cli->rbuf; buf->len = HTTP_RECV_BUF_SIZE; } + static FORCE_INLINE void clientRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { SHttpClient* cli = handle->data; if (nread < 0) { - uError("http-report recv error:%s", uv_err_name(nread)); + tError("http-report recv error:%s", uv_err_name(nread)); } else { - uTrace("http-report succ to recv %d bytes", (int32_t)nread); + tTrace("http-report succ to recv %d bytes", (int32_t)nread); } if (!uv_is_closing((uv_handle_t*)&cli->tcp)) { uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); - } else { - destroyHttpClient(cli); } } static void clientSentCb(uv_write_t* req, int32_t status) { SHttpClient* cli = req->data; if (status != 0) { - uError("http-report failed to send data, reason: %s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port); + tError("http-report failed to send data, reason: %s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port); if (!uv_is_closing((uv_handle_t*)&cli->tcp)) { uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); - } else { - destroyHttpClient(cli); } return; } else { - uTrace("http-report succ to send data"); + tTrace("http-report succ to send data"); } status = uv_read_start((uv_stream_t*)&cli->tcp, clientAllocBuffCb, clientRecvCb); if (status != 0) { - uError("http-report failed to recv data,reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port); + tError("http-report failed to recv data,reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port); if (!uv_is_closing((uv_handle_t*)&cli->tcp)) { uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); - } else { - destroyHttpClient(cli); } } } static void clientConnCb(uv_connect_t* req, int32_t status) { SHttpClient* cli = req->data; if (status != 0) { - uError("http-report failed to conn to server, reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port); + tError("http-report failed to conn to server, reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port); if (!uv_is_closing((uv_handle_t*)&cli->tcp)) { uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); - } else { - destroyHttpClient(cli); } return; } status = uv_write(&cli->req, (uv_stream_t*)&cli->tcp, cli->wbuf, 2, clientSentCb); if (0 != status) { - uError("http-report failed to send data,reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port); + tError("http-report failed to send data,reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port); if (!uv_is_closing((uv_handle_t*)&cli->tcp)) { uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); - } else { - destroyHttpClient(cli); } } } -static FORCE_INLINE int32_t taosBuildDstAddr(const char* server, uint16_t port, struct sockaddr_in* dest) { - uint32_t ip = taosGetIpv4FromFqdn(server); - if (ip == 0xffffffff) { - uError("http-report failed to get http server:%s since %s", server, errno == 0 ? "invalid http server" : terrstr()); +int32_t httpSendQuit() { + SHttpMsg* msg = taosMemoryCalloc(1, sizeof(SHttpMsg)); + msg->quit = 1; + + SHttpModule* load = atomic_load_ptr(&thttp); + if (load == NULL) { + httpDestroyMsg(msg); + tError("http-report already released"); return -1; + } else { + msg->http = load; } - char buf[128] = {0}; - tinet_ntoa(buf, ip); - uv_ip4_addr(buf, port, dest); + transAsyncSend(load->asyncPool, &(msg->q)); return 0; } -int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) { - taosThreadOnce(&transHttpInit, transHttpEnvInit); - return taosSendHttpReportImpl(server, port, pCont, contLen, flag); -} + static int32_t taosSendHttpReportImpl(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) { SHttpMsg* msg = taosMemoryMalloc(sizeof(SHttpMsg)); @@ -307,15 +302,49 @@ static int32_t taosSendHttpReportImpl(const char* server, uint16_t port, char* p memcpy(msg->cont, pCont, contLen); msg->len = contLen; msg->flag = flag; + msg->quit = 0; + + SHttpModule* load = atomic_load_ptr(&thttp); + if (load == NULL) { + httpDestroyMsg(msg); + tError("http-report already released"); + return -1; + } else { + msg->http = load; + transAsyncSend(load->asyncPool, &(msg->q)); + } - transAsyncSend(http->asyncPool, &(msg->q)); return 0; } + +static void httpDestroyClientCb(uv_handle_t* handle) { + SHttpClient* http = handle->data; + destroyHttpClient(http); +} +static void httpWalkCb(uv_handle_t* handle, void* arg) { + // impl later + if (!uv_is_closing(handle)) { + uv_handle_type type = uv_handle_get_type(handle); + if (uv_handle_get_type(handle) == UV_TCP) { + uv_close(handle, httpDestroyClientCb); + } else { + uv_close(handle, NULL); + } + } + return; +} +static void httpHandleQuit(SHttpMsg* msg) { + SHttpModule* http = msg->http; + taosMemoryFree(msg); + + uv_walk(http->loop, httpWalkCb, NULL); +} static void httpHandleReq(SHttpMsg* msg) { + SHttpModule* http = msg->http; + struct sockaddr_in dest = {0}; - if (taosBuildDstAddr(msg->server, msg->port, &dest) < 0) { - httpDestroyMsg(msg); - return; + if (taosBuildDstAddr(msg->server, msg->port + 1, &dest) < 0) { + goto END; } if (msg->flag == HTTP_GZIP) { int32_t dstLen = taosCompressHttpRport(msg->cont, msg->len); @@ -324,11 +353,18 @@ static void httpHandleReq(SHttpMsg* msg) { } else { msg->flag = HTTP_FLAT; } + if (dstLen < 0) { + goto END; + } } int32_t len = 2048; char* header = taosMemoryCalloc(1, len); int32_t headLen = taosBuildHttpHeader(msg->server, msg->len, header, len, msg->flag); + if (headLen < 0) { + taosMemoryFree(header); + goto END; + } uv_buf_t* wb = taosMemoryCalloc(2, sizeof(uv_buf_t)); wb[0] = uv_buf_init((char*)header, strlen(header)); // heap var @@ -347,20 +383,64 @@ static void httpHandleReq(SHttpMsg* msg) { taosMemoryFree(msg); uv_tcp_init(http->loop, &cli->tcp); + // set up timeout to avoid stuck; int32_t fd = taosCreateSocketWithTimeout(5); - - int ret = uv_tcp_open((uv_tcp_t*)&cli->tcp, fd); + int ret = uv_tcp_open((uv_tcp_t*)&cli->tcp, fd); if (ret != 0) { - uError("http-report failed to open socket, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr, cli->port); + tError("http-report failed to open socket, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr, cli->port); destroyHttpClient(cli); return; } ret = uv_tcp_connect(&cli->conn, &cli->tcp, (const struct sockaddr*)&cli->dest, clientConnCb); if (ret != 0) { - uError("http-report failed to connect to http-server, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr, + tError("http-report failed to connect to http-server, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr, cli->port); destroyHttpClient(cli); } + return; + +END: + tError("http-report failed to report, reason: %s, addr: %s:%d", terrstr(), msg->server, msg->port); + httpDestroyMsg(msg); +} + +int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) { + taosThreadOnce(&transHttpInit, transHttpEnvInit); + return taosSendHttpReportImpl(server, port, pCont, contLen, flag); +} + +static void transHttpEnvInit() { + SHttpModule* http = taosMemoryMalloc(sizeof(SHttpModule)); + + http->loop = taosMemoryMalloc(sizeof(uv_loop_t)); + uv_loop_init(http->loop); + + http->asyncPool = transAsyncPoolCreate(http->loop, 1, http, httpAsyncCb); + + int err = taosThreadCreate(&http->thread, NULL, httpThread, (void*)http); + if (err != 0) { + taosMemoryFree(http->loop); + taosMemoryFree(http); + http = NULL; + } + atomic_store_ptr(&thttp, http); +} + +void transHttpEnvDestroy() { + SHttpModule* load = atomic_load_ptr(&thttp); + if (load == NULL) { + return; + } + httpSendQuit(); + taosThreadJoin(load->thread, NULL); + + TRANS_DESTROY_ASYNC_POOL_MSG(load->asyncPool, SHttpMsg, httpDestroyMsg); + transAsyncPoolDestroy(load->asyncPool); + uv_loop_close(load->loop); + taosMemoryFree(load->loop); + taosMemoryFree(load); + + atomic_store_ptr(&thttp, NULL); } diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 756a8ff2cf..d3db4879d1 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -172,6 +172,8 @@ int32_t rpcInit() { } void rpcCleanup(void) { transCleanup(); + transHttpEnvDestroy(); + return; } From d21d88becc44ec8789b8acda81fb6af1ff3c5c60 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 31 Oct 2022 18:50:19 +0800 Subject: [PATCH 09/13] opt http module --- source/libs/transport/src/thttp.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 9526ad472a..00854b5ee5 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -343,7 +343,7 @@ static void httpHandleReq(SHttpMsg* msg) { SHttpModule* http = msg->http; struct sockaddr_in dest = {0}; - if (taosBuildDstAddr(msg->server, msg->port + 1, &dest) < 0) { + if (taosBuildDstAddr(msg->server, msg->port, &dest) < 0) { goto END; } if (msg->flag == HTTP_GZIP) { From 392564cc7dd44b3ccce1d806ca84acec0a4e4ba9 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 31 Oct 2022 23:40:43 +0800 Subject: [PATCH 10/13] enh: refact syncEnv code --- include/libs/sync/sync.h | 6 +- include/libs/sync/syncTools.h | 4 - include/util/tref.h | 3 +- source/dnode/mnode/impl/src/mndMain.c | 9 +- source/dnode/vnode/src/vnd/vnodeSync.c | 4 +- source/libs/sync/inc/syncEnv.h | 16 +- source/libs/sync/inc/syncInt.h | 8 +- source/libs/sync/src/syncEnv.c | 184 +++++----- source/libs/sync/src/syncMain.c | 320 ++++++------------ source/libs/sync/test/syncElectTest.cpp | 2 +- source/libs/sync/test/syncEncodeTest.cpp | 2 +- source/libs/sync/test/syncEnqTest.cpp | 2 +- source/libs/sync/test/syncEnvTest.cpp | 6 +- source/libs/sync/test/syncIOSendMsgTest.cpp | 2 +- source/libs/sync/test/syncInitTest.cpp | 4 +- source/libs/sync/test/syncPingSelfTest.cpp | 2 +- source/libs/sync/test/syncPingTimerTest.cpp | 2 +- source/libs/sync/test/syncPingTimerTest2.cpp | 2 +- source/libs/sync/test/syncSnapshotTest.cpp | 2 +- .../libs/sync/test/syncVotesGrantedTest.cpp | 2 +- .../libs/sync/test/syncVotesRespondTest.cpp | 2 +- source/libs/sync/test/syncWriteTest.cpp | 2 +- source/util/src/tref.c | 2 +- 23 files changed, 224 insertions(+), 364 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 95ee2ca2bc..1a94dcf426 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -200,10 +200,10 @@ typedef struct SSyncInfo { int32_t syncInit(); void syncCleanUp(); +bool syncIsInit(); int64_t syncOpen(SSyncInfo* pSyncInfo); void syncStart(int64_t rid); void syncStop(int64_t rid); -int32_t syncSetStandby(int64_t rid); ESyncState syncGetMyRole(int64_t rid); bool syncIsReady(int64_t rid); const char* syncGetMyRoleStr(int64_t rid); @@ -216,7 +216,6 @@ void syncGetEpSet(int64_t rid, SEpSet* pEpSet); void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet); int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak); // int32_t syncProposeBatch(int64_t rid, SRpcMsg** pMsgPArr, bool* pIsWeakArr, int32_t arrSize); -bool syncEnvIsStart(); const char* syncStr(ESyncState state); bool syncIsRestoreFinish(int64_t rid); int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapshot); @@ -234,6 +233,9 @@ int32_t syncEndSnapshot(int64_t rid); int32_t syncStepDown(int64_t rid, SyncTerm newTerm); +SSyncNode* syncNodeAcquire(int64_t rid); +void syncNodeRelease(SSyncNode* pNode); + #ifdef __cplusplus } #endif diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h index a1cff2b738..9cb8a9d564 100644 --- a/include/libs/sync/syncTools.h +++ b/include/libs/sync/syncTools.h @@ -28,10 +28,6 @@ typedef struct SRaftId { SyncGroupId vgId; } SRaftId; -// ------------------ control ------------------- -SSyncNode* syncNodeAcquire(int64_t rid); -void syncNodeRelease(SSyncNode* pNode); - int32_t syncGetRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg); int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcHandleInfo* pInfo); void syncSetMsgCb(int64_t rid, const SMsgCb* msgcb); diff --git a/include/util/tref.h b/include/util/tref.h index c2cc54cb07..c4b2ec8fa7 100644 --- a/include/util/tref.h +++ b/include/util/tref.h @@ -25,7 +25,8 @@ extern "C" { // open a reference set, max is the mod used by hash, fp is the pointer to free resource function // return rsetId which will be used by other APIs. On error, -1 is returned, and terrno is set appropriately -int32_t taosOpenRef(int32_t max, void (*fp)(void *)); +typedef void (*RefFp)(void *); +int32_t taosOpenRef(int32_t max, RefFp fp); // close the reference set, refId is the return value by taosOpenRef // return 0 if success. On error, -1 is returned, and terrno is set appropriately diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 98a24286f6..53e7b1cd26 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -481,7 +481,7 @@ int32_t mndProcessSyncCtrlMsg(SRpcMsg *pMsg) { mInfo("vgId:%d, process sync ctrl msg", 1); - if (!syncEnvIsStart()) { + if (!syncIsInit()) { mError("failed to process sync msg:%p type:%s since syncEnv stop", pMsg, TMSG_INFO(pMsg->msgType)); terrno = TSDB_CODE_SYN_INTERNAL_ERROR; return -1; @@ -518,7 +518,7 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; int32_t code = 0; - if (!syncEnvIsStart()) { + if (!syncIsInit()) { mError("failed to process sync msg:%p type:%s since syncEnv stop", pMsg, TMSG_INFO(pMsg->msgType)); terrno = TSDB_CODE_SYN_INTERNAL_ERROR; return -1; @@ -581,11 +581,6 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { code = syncNodeOnSnapshotReply(pSyncNode, pSyncMsg); syncSnapshotRspDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_SYNC_SET_MNODE_STANDBY) { - code = syncSetStandby(pMgmt->sync); - SRpcMsg rsp = {.code = code, .info = pMsg->info}; - tmsgSendRsp(&rsp); - } else if (pMsg->msgType == TDMT_SYNC_LOCAL_CMD) { SyncLocalCmd *pSyncMsg = syncLocalCmdFromRpcMsg2(pMsg); code = syncNodeOnLocalCmd(pSyncNode, pSyncMsg); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index bf665fd6db..339be16c91 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -234,7 +234,7 @@ int32_t vnodeProcessSyncCtrlMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { int32_t code = 0; const STraceId *trace = &pMsg->info.traceId; - if (!syncEnvIsStart()) { + if (!syncIsInit()) { vGError("vgId:%d, msg:%p failed to process since sync env not start", pVnode->config.vgId, pMsg); terrno = TSDB_CODE_APP_ERROR; return -1; @@ -277,7 +277,7 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { int32_t code = 0; const STraceId *trace = &pMsg->info.traceId; - if (!syncEnvIsStart()) { + if (!syncIsInit()) { vGError("vgId:%d, msg:%p failed to process since sync env not start", pVnode->config.vgId, pMsg); terrno = TSDB_CODE_APP_ERROR; return -1; diff --git a/source/libs/sync/inc/syncEnv.h b/source/libs/sync/inc/syncEnv.h index 06da0eb3df..068ccaf029 100644 --- a/source/libs/sync/inc/syncEnv.h +++ b/source/libs/sync/inc/syncEnv.h @@ -20,13 +20,7 @@ extern "C" { #endif -#include -#include -#include #include "syncInt.h" -#include "taosdef.h" -#include "trpc.h" -#include "ttimer.h" #define TIMER_MAX_MS 0x7FFFFFFF #define ENV_TICK_TIMER_MS 1000 @@ -57,12 +51,12 @@ typedef struct SSyncEnv { } SSyncEnv; -extern SSyncEnv* gSyncEnv; +SSyncEnv* syncEnv(); -int32_t syncEnvStart(); -int32_t syncEnvStop(); -int32_t syncEnvStartTimer(); -int32_t syncEnvStopTimer(); +int64_t syncNodeAdd(SSyncNode* pNode); +void syncNodeRemove(SSyncNode* pNode); +SSyncNode* syncNodeAcquire(int64_t rid); +void syncNodeRelease(SSyncNode* pNode); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index f4949e1016..843dee1342 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -24,6 +24,8 @@ extern "C" { #include "syncTools.h" #include "tlog.h" #include "ttimer.h" +#include "taosdef.h" +#include "ttimer.h" // clang-format off #define sFatal(...) do { if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("SYN FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0) @@ -255,9 +257,6 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, Sync SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode); char* syncNodePeerState2Str(const SSyncNode* pSyncNode); -SSyncNode* syncNodeAcquire(int64_t rid); -void syncNodeRelease(SSyncNode* pNode); - // raft state change -------------- void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term); void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term); @@ -302,9 +301,6 @@ bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta); int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta); -void syncStartNormal(int64_t rid); -void syncStartStandBy(int64_t rid); - bool syncNodeCanChange(SSyncNode* pSyncNode); bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pNewCfg); diff --git a/source/libs/sync/src/syncEnv.c b/source/libs/sync/src/syncEnv.c index fb0bfb8bef..4d4cb8ab69 100644 --- a/source/libs/sync/src/syncEnv.c +++ b/source/libs/sync/src/syncEnv.c @@ -13,118 +13,114 @@ * along with this program. If not, see . */ +#define _DEFAULT_SOURCE #include "syncEnv.h" -// #include +#include "tref.h" -SSyncEnv *gSyncEnv = NULL; +static SSyncEnv gSyncEnv = {0}; +static int32_t gNodeRefId = -1; +bool gRaftDetailLog = false; +static void syncEnvTick(void *param, void *tmrId); -// local function ----------------- -static SSyncEnv *doSyncEnvStart(); -static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv); -static int32_t doSyncEnvStartTimer(SSyncEnv *pSyncEnv); -static int32_t doSyncEnvStopTimer(SSyncEnv *pSyncEnv); -static void syncEnvTick(void *param, void *tmrId); -// -------------------------------- +SSyncEnv *syncEnv() { return &gSyncEnv; } -bool syncEnvIsStart() { - if (gSyncEnv == NULL) { - return false; - } +bool syncIsInit() { return atomic_load_8(&gSyncEnv.isStart); } - return atomic_load_8(&(gSyncEnv->isStart)); -} +int32_t syncInit() { + if (syncIsInit()) return 0; -int32_t syncEnvStart() { - int32_t ret = 0; uint32_t seed = (uint32_t)(taosGetTimestampNs() & 0x00000000FFFFFFFF); taosSeedRand(seed); - // gSyncEnv = doSyncEnvStart(gSyncEnv); - gSyncEnv = doSyncEnvStart(); - ASSERT(gSyncEnv != NULL); - sTrace("sync env start ok"); - return ret; -} -int32_t syncEnvStop() { - int32_t ret = doSyncEnvStop(gSyncEnv); - return ret; -} - -int32_t syncEnvStartTimer() { - int32_t ret = doSyncEnvStartTimer(gSyncEnv); - return ret; -} - -int32_t syncEnvStopTimer() { - int32_t ret = doSyncEnvStopTimer(gSyncEnv); - return ret; -} - -// local function ----------------- -static void syncEnvTick(void *param, void *tmrId) { - SSyncEnv *pSyncEnv = (SSyncEnv *)param; - if (atomic_load_64(&pSyncEnv->envTickTimerLogicClockUser) <= atomic_load_64(&pSyncEnv->envTickTimerLogicClock)) { - ++(pSyncEnv->envTickTimerCounter); - sTrace("syncEnvTick do ... envTickTimerLogicClockUser:%" PRIu64 ", envTickTimerLogicClock:%" PRIu64 - ", envTickTimerCounter:%" PRIu64 - ", " - "envTickTimerMS:%d, tmrId:%p", - pSyncEnv->envTickTimerLogicClockUser, pSyncEnv->envTickTimerLogicClock, pSyncEnv->envTickTimerCounter, - pSyncEnv->envTickTimerMS, tmrId); - - // do something, tick ... - taosTmrReset(syncEnvTick, pSyncEnv->envTickTimerMS, pSyncEnv, pSyncEnv->pTimerManager, &pSyncEnv->pEnvTickTimer); - } else { - sTrace("syncEnvTick pass ... envTickTimerLogicClockUser:%" PRIu64 ", envTickTimerLogicClock:%" PRIu64 - ", envTickTimerCounter:%" PRIu64 - ", " - "envTickTimerMS:%d, tmrId:%p", - pSyncEnv->envTickTimerLogicClockUser, pSyncEnv->envTickTimerLogicClock, pSyncEnv->envTickTimerCounter, - pSyncEnv->envTickTimerMS, tmrId); - } -} - -static SSyncEnv *doSyncEnvStart() { - SSyncEnv *pSyncEnv = (SSyncEnv *)taosMemoryMalloc(sizeof(SSyncEnv)); - ASSERT(pSyncEnv != NULL); - memset(pSyncEnv, 0, sizeof(SSyncEnv)); - - pSyncEnv->envTickTimerCounter = 0; - pSyncEnv->envTickTimerMS = ENV_TICK_TIMER_MS; - pSyncEnv->FpEnvTickTimer = syncEnvTick; - atomic_store_64(&pSyncEnv->envTickTimerLogicClock, 0); - atomic_store_64(&pSyncEnv->envTickTimerLogicClockUser, 0); + memset(&gSyncEnv, 0, sizeof(SSyncEnv)); + gSyncEnv.envTickTimerCounter = 0; + gSyncEnv.envTickTimerMS = ENV_TICK_TIMER_MS; + gSyncEnv.FpEnvTickTimer = syncEnvTick; + atomic_store_64(&gSyncEnv.envTickTimerLogicClock, 0); + atomic_store_64(&gSyncEnv.envTickTimerLogicClockUser, 0); // start tmr thread - pSyncEnv->pTimerManager = taosTmrInit(1000, 50, 10000, "SYNC-ENV"); + gSyncEnv.pTimerManager = taosTmrInit(1000, 50, 10000, "SYNC-ENV"); + atomic_store_8(&gSyncEnv.isStart, 1); - atomic_store_8(&(pSyncEnv->isStart), 1); - return pSyncEnv; -} - -static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv) { - ASSERT(pSyncEnv == gSyncEnv); - if (pSyncEnv != NULL) { - atomic_store_8(&(pSyncEnv->isStart), 0); - taosTmrCleanUp(pSyncEnv->pTimerManager); - taosMemoryFree(pSyncEnv); + gNodeRefId = taosOpenRef(200, (RefFp)syncNodeClose); + if (gNodeRefId < 0) { + sError("failed to init node ref"); + syncCleanUp(); + return -1; } - gSyncEnv = NULL; + + sDebug("sync rsetId:%d is open", gNodeRefId); return 0; } -static int32_t doSyncEnvStartTimer(SSyncEnv *pSyncEnv) { - int32_t ret = 0; - taosTmrReset(pSyncEnv->FpEnvTickTimer, pSyncEnv->envTickTimerMS, pSyncEnv, pSyncEnv->pTimerManager, - &pSyncEnv->pEnvTickTimer); - atomic_store_64(&pSyncEnv->envTickTimerLogicClock, pSyncEnv->envTickTimerLogicClockUser); - return ret; +void syncCleanUp() { + atomic_store_8(&gSyncEnv.isStart, 0); + taosTmrCleanUp(gSyncEnv.pTimerManager); + memset(&gSyncEnv, 0, sizeof(SSyncEnv)); + + if (gNodeRefId != -1) { + sDebug("sync rsetId:%d is closed", gNodeRefId); + taosCloseRef(gNodeRefId); + gNodeRefId = -1; + } } -static int32_t doSyncEnvStopTimer(SSyncEnv *pSyncEnv) { +int64_t syncNodeAdd(SSyncNode *pNode) { + pNode->rid = taosAddRef(gNodeRefId, pNode); + if (pNode->rid < 0) return -1; + + sDebug("vgId:%d, sync rid:%" PRId64 " is added to rsetId:%d", pNode->vgId, pNode->rid, gNodeRefId); + return 0; +} + +void syncNodeRemove(SSyncNode *pNode) { + taosRemoveRef(gNodeRefId, pNode->rid); + sDebug("vgId:%d, sync rid:%" PRId64 " is removed from rsetId:%d", pNode->vgId, pNode->rid, gNodeRefId); +} + +SSyncNode *syncNodeAcquire(int64_t rid) { + SSyncNode *pNode = taosAcquireRef(gNodeRefId, rid); + if (pNode == NULL) { + sTrace("failed to acquire node from refId:%" PRId64, rid); + } + + return pNode; +} + +void syncNodeRelease(SSyncNode *pNode) { taosReleaseRef(gNodeRefId, pNode->rid); } + +#if 0 +void syncEnvStartTimer() { + taosTmrReset(gSyncEnv.FpEnvTickTimer, gSyncEnv.envTickTimerMS, &gSyncEnv, gSyncEnv.pTimerManager, + &gSyncEnv.pEnvTickTimer); + atomic_store_64(&gSyncEnv.envTickTimerLogicClock, gSyncEnv.envTickTimerLogicClockUser); +} + +void syncEnvStopTimer() { int32_t ret = 0; - atomic_add_fetch_64(&pSyncEnv->envTickTimerLogicClockUser, 1); - taosTmrStop(pSyncEnv->pEnvTickTimer); - pSyncEnv->pEnvTickTimer = NULL; + atomic_add_fetch_64(&gSyncEnv.envTickTimerLogicClockUser, 1); + taosTmrStop(gSyncEnv.pEnvTickTimer); + gSyncEnv.pEnvTickTimer = NULL; return ret; } +#endif + +static void syncEnvTick(void *param, void *tmrId) { + SSyncEnv *pSyncEnv = param; + if (atomic_load_64(&gSyncEnv.envTickTimerLogicClockUser) <= atomic_load_64(&gSyncEnv.envTickTimerLogicClock)) { + gSyncEnv.envTickTimerCounter++; + sTrace("syncEnvTick do ... envTickTimerLogicClockUser:%" PRIu64 ", envTickTimerLogicClock:%" PRIu64 + ", envTickTimerCounter:%" PRIu64 ", envTickTimerMS:%d, tmrId:%p", + gSyncEnv.envTickTimerLogicClockUser, gSyncEnv.envTickTimerLogicClock, gSyncEnv.envTickTimerCounter, + gSyncEnv.envTickTimerMS, tmrId); + + // do something, tick ... + taosTmrReset(syncEnvTick, gSyncEnv.envTickTimerMS, pSyncEnv, gSyncEnv.pTimerManager, &gSyncEnv.pEnvTickTimer); + } else { + sTrace("syncEnvTick pass ... envTickTimerLogicClockUser:%" PRIu64 ", envTickTimerLogicClock:%" PRIu64 + ", envTickTimerCounter:%" PRIu64 ", envTickTimerMS:%d, tmrId:%p", + gSyncEnv.envTickTimerLogicClockUser, gSyncEnv.envTickTimerLogicClock, gSyncEnv.envTickTimerCounter, + gSyncEnv.envTickTimerMS, tmrId); + } +} diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 7142e8fb22..9ff8917b0f 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -33,11 +33,6 @@ #include "syncTimeout.h" #include "syncUtil.h" #include "syncVoteMgr.h" -#include "tref.h" - -bool gRaftDetailLog = false; - -static int32_t tsNodeRefId = -1; // ------ local funciton --------- // enqueue message ---- @@ -52,138 +47,36 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId); int32_t syncNodeOnPing(SSyncNode* ths, SyncPing* pMsg); int32_t syncNodeOnPingReply(SSyncNode* ths, SyncPingReply* pMsg); -// --------------------------------- -static void syncNodeFreeCb(void* param) { - syncNodeClose(param); - param = NULL; -} - -int32_t syncInit() { - int32_t ret = 0; - - if (!syncEnvIsStart()) { - tsNodeRefId = taosOpenRef(200, syncNodeFreeCb); - if (tsNodeRefId < 0) { - sError("failed to init node ref"); - syncCleanUp(); - ret = -1; - } else { - sDebug("sync rsetId:%d is open", tsNodeRefId); - ret = syncEnvStart(); - } - } - - return ret; -} - -void syncCleanUp() { - int32_t ret = syncEnvStop(); - ASSERT(ret == 0); - - if (tsNodeRefId != -1) { - sDebug("sync rsetId:%d is closed", tsNodeRefId); - taosCloseRef(tsNodeRefId); - tsNodeRefId = -1; - } -} - int64_t syncOpen(SSyncInfo* pSyncInfo) { - SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo); - if (pSyncNode == NULL) { + SSyncNode* pNode = syncNodeOpen(pSyncInfo); + if (pNode == NULL) { sError("vgId:%d, failed to open sync node", pSyncInfo->vgId); return -1; } - pSyncNode->rid = taosAddRef(tsNodeRefId, pSyncNode); - if (pSyncNode->rid < 0) { - syncNodeClose(pSyncNode); - pSyncNode = NULL; + pNode->rid = syncNodeAdd(pNode); + if (pNode->rid < 0) { + syncNodeClose(pNode); return -1; } - sDebug("vgId:%d, sync rid:%" PRId64 " is added to rsetId:%d", pSyncInfo->vgId, pSyncNode->rid, tsNodeRefId); - return pSyncNode->rid; + return pNode->rid; } void syncStart(int64_t rid) { - SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid); - if (pSyncNode == NULL) { - return; + SSyncNode* pNode = syncNodeAcquire(rid); + if (pNode != NULL) { + syncNodeStart(pNode); + syncNodeRelease(pNode); } - - if (pSyncNode->pRaftCfg->isStandBy) { - syncNodeStartStandBy(pSyncNode); - } else { - syncNodeStart(pSyncNode); - } - - taosReleaseRef(tsNodeRefId, pSyncNode->rid); -} - -void syncStartNormal(int64_t rid) { - SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid); - if (pSyncNode == NULL) { - return; - } - syncNodeStart(pSyncNode); - - taosReleaseRef(tsNodeRefId, pSyncNode->rid); -} - -void syncStartStandBy(int64_t rid) { - SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid); - if (pSyncNode == NULL) { - return; - } - syncNodeStartStandBy(pSyncNode); - - taosReleaseRef(tsNodeRefId, pSyncNode->rid); } void syncStop(int64_t rid) { - SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid); - if (pSyncNode == NULL) return; - int32_t vgId = pSyncNode->vgId; - taosReleaseRef(tsNodeRefId, pSyncNode->rid); - - taosRemoveRef(tsNodeRefId, rid); - sDebug("vgId:%d, sync rid:%" PRId64 " is removed from rsetId:%d", vgId, rid, tsNodeRefId); -} - -int32_t syncSetStandby(int64_t rid) { - SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid); - if (pSyncNode == NULL) { - terrno = TSDB_CODE_SYN_INTERNAL_ERROR; - sError("failed to set standby since accquire ref error, rid:%" PRId64, rid); - return -1; + SSyncNode* pNode = syncNodeAcquire(rid); + if (pNode != NULL) { + syncNodeRelease(pNode); + syncNodeRemove(pNode); } - - if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) { - if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { - terrno = TSDB_CODE_SYN_IS_LEADER; - } else { - terrno = TSDB_CODE_SYN_STANDBY_NOT_READY; - } - sError("failed to set standby since it is not follower, state:%s rid:%" PRId64, syncStr(pSyncNode->state), rid); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); - return -1; - } - - // state change - pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER; - syncNodeStopHeartbeatTimer(pSyncNode); - - // reset elect timer, long enough - int32_t electMS = TIMER_MAX_MS; - int32_t ret = syncNodeRestartElectTimer(pSyncNode, electMS); - ASSERT(ret == 0); - - pSyncNode->pRaftCfg->isStandBy = 1; - raftCfgPersist(pSyncNode->pRaftCfg); - - taosReleaseRef(tsNodeRefId, pSyncNode->rid); - sInfo("vgId:%d, set to standby", pSyncNode->vgId); - return 0; } bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pNewCfg) { @@ -204,7 +97,7 @@ bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pNewCfg) { } int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { terrno = TSDB_CODE_SYN_INTERNAL_ERROR; return -1; @@ -213,7 +106,7 @@ int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg int32_t ret = 0; if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) { - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR; sError("invalid new config. vgId:%d", pSyncNode->vgId); return -1; @@ -227,12 +120,12 @@ int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg snprintf(pRpcMsg->pCont, pRpcMsg->contLen, "%s", newconfig); taosMemoryFree(newconfig); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return ret; } int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { terrno = TSDB_CODE_SYN_INTERNAL_ERROR; return -1; @@ -240,7 +133,7 @@ int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) { ASSERT(rid == pSyncNode->rid); if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) { - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR; sError("invalid new config. vgId:%d", pSyncNode->vgId); return -1; @@ -259,7 +152,7 @@ int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) { taosMemoryFree(newconfig); ret = syncNodePropose(pSyncNode, &rpcMsg, false); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return ret; #else syncNodeUpdateNewConfigIndex(pSyncNode, pNewCfg); @@ -275,13 +168,13 @@ int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) { syncNodeReplicate(pSyncNode); } - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return 0; #endif } int32_t syncLeaderTransfer(int64_t rid) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { terrno = TSDB_CODE_SYN_INTERNAL_ERROR; return -1; @@ -289,12 +182,12 @@ int32_t syncLeaderTransfer(int64_t rid) { ASSERT(rid == pSyncNode->rid); int32_t ret = syncNodeLeaderTransfer(pSyncNode); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return ret; } int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { terrno = TSDB_CODE_SYN_INTERNAL_ERROR; return -1; @@ -302,7 +195,7 @@ int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader) { ASSERT(rid == pSyncNode->rid); int32_t ret = syncNodeLeaderTransferTo(pSyncNode, newLeader); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return ret; } @@ -358,7 +251,7 @@ char* syncNodePeerState2Str(const SSyncNode* pSyncNode) { } int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { terrno = TSDB_CODE_SYN_INTERNAL_ERROR; return -1; @@ -382,7 +275,7 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) { logNum, isEmpty); syncNodeEventLog(pSyncNode, logBuf); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return 0; } @@ -411,7 +304,7 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) { syncNodeEventLog(pSyncNode, logBuf); } while (0); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return 0; } } @@ -424,7 +317,7 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) { lastApplyIndex, pSyncNode->minMatchIndex); syncNodeEventLog(pSyncNode, logBuf); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return 0; } @@ -433,7 +326,7 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) { snprintf(logBuf, sizeof(logBuf), "new-snapshot-index:%" PRId64 " candidate, do not delete wal", lastApplyIndex); syncNodeEventLog(pSyncNode, logBuf); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return 0; } else { @@ -442,7 +335,7 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) { lastApplyIndex); syncNodeEventLog(pSyncNode, logBuf); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return 0; } @@ -491,12 +384,12 @@ _DEL_WAL: } } while (0); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return code; } int32_t syncEndSnapshot(int64_t rid) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { terrno = TSDB_CODE_SYN_INTERNAL_ERROR; return -1; @@ -510,7 +403,7 @@ int32_t syncEndSnapshot(int64_t rid) { if (code != 0) { sError("vgId:%d, wal snapshot end error since:%s", pSyncNode->vgId, terrstr(terrno)); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return -1; } else { do { @@ -524,12 +417,12 @@ int32_t syncEndSnapshot(int64_t rid) { } } - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return code; } int32_t syncStepDown(int64_t rid, SyncTerm newTerm) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { terrno = TSDB_CODE_SYN_INTERNAL_ERROR; return -1; @@ -538,7 +431,7 @@ int32_t syncStepDown(int64_t rid, SyncTerm newTerm) { syncNodeStepDown(pSyncNode, newTerm); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return 0; } @@ -583,19 +476,19 @@ int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) { } bool syncCanLeaderTransfer(int64_t rid) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { return false; } ASSERT(rid == pSyncNode->rid); if (pSyncNode->replicaNum == 1) { - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return false; } if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) { - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return true; } @@ -610,7 +503,7 @@ bool syncCanLeaderTransfer(int64_t rid) { } } - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return matchOK; } @@ -620,25 +513,25 @@ int32_t syncForwardToPeer(int64_t rid, SRpcMsg* pMsg, bool isWeak) { } ESyncState syncGetMyRole(int64_t rid) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { return TAOS_SYNC_STATE_ERROR; } ASSERT(rid == pSyncNode->rid); ESyncState state = pSyncNode->state; - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return state; } bool syncIsReady(int64_t rid) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { return false; } ASSERT(rid == pSyncNode->rid); bool b = (pSyncNode->state == TAOS_SYNC_STATE_LEADER) && pSyncNode->restoreFinish; - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); // if false, set error code if (false == b) { @@ -652,14 +545,14 @@ bool syncIsReady(int64_t rid) { } bool syncIsRestoreFinish(int64_t rid) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { return false; } ASSERT(rid == pSyncNode->rid); bool b = pSyncNode->restoreFinish; - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return b; } @@ -668,7 +561,7 @@ int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapsho return -1; } - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { return -1; } @@ -680,7 +573,7 @@ int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapsho if (pEntry != NULL) { syncEntryDestory(pEntry); } - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return -1; } ASSERT(pEntry != NULL); @@ -691,12 +584,12 @@ int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapsho pSnapshot->lastConfigIndex = syncNodeGetSnapshotConfigIndex(pSyncNode, index); syncEntryDestory(pEntry); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return 0; } int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { return -1; } @@ -705,12 +598,12 @@ int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta) { sTrace("vgId:%d, get snapshot meta, lastConfigIndex:%" PRId64, pSyncNode->vgId, pSyncNode->pRaftCfg->lastConfigIndex); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return 0; } int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { return -1; } @@ -729,7 +622,7 @@ int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct sTrace("vgId:%d, get snapshot meta by index:%" PRId64 " lcindex:%" PRId64, pSyncNode->vgId, snapshotIndex, sMeta->lastConfigIndex); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return 0; } @@ -755,67 +648,67 @@ const char* syncGetMyRoleStr(int64_t rid) { } bool syncRestoreFinish(int64_t rid) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { return false; } ASSERT(rid == pSyncNode->rid); bool restoreFinish = pSyncNode->restoreFinish; - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return restoreFinish; } SyncTerm syncGetMyTerm(int64_t rid) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { return TAOS_SYNC_STATE_ERROR; } ASSERT(rid == pSyncNode->rid); SyncTerm term = pSyncNode->pRaftStore->currentTerm; - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return term; } SyncIndex syncGetLastIndex(int64_t rid) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { return SYNC_INDEX_INVALID; } ASSERT(rid == pSyncNode->rid); SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return lastIndex; } SyncIndex syncGetCommitIndex(int64_t rid) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { return SYNC_INDEX_INVALID; } ASSERT(rid == pSyncNode->rid); SyncIndex cmtIndex = pSyncNode->commitIndex; - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return cmtIndex; } SyncGroupId syncGetVgId(int64_t rid) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { return TAOS_SYNC_STATE_ERROR; } ASSERT(rid == pSyncNode->rid); SyncGroupId vgId = pSyncNode->vgId; - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return vgId; } void syncGetEpSet(int64_t rid, SEpSet* pEpSet) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { memset(pEpSet, 0, sizeof(*pEpSet)); return; @@ -831,11 +724,11 @@ void syncGetEpSet(int64_t rid, SEpSet* pEpSet) { pEpSet->inUse = pSyncNode->pRaftCfg->cfg.myIndex; sInfo("vgId:%d, sync get epset in-use:%d", pSyncNode->vgId, pEpSet->inUse); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); } void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { memset(pEpSet, 0, sizeof(*pEpSet)); return; @@ -854,11 +747,11 @@ void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) { } sInfo("vgId:%d, sync get retry epset in-use:%d", pSyncNode->vgId, pEpSet->inUse); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); } int32_t syncGetRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { return TAOS_SYNC_STATE_ERROR; } @@ -870,12 +763,12 @@ int32_t syncGetRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg) { memcpy(msg, &(stub.rpcMsg), sizeof(SRpcMsg)); } - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return ret; } int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcHandleInfo* pInfo) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { return TAOS_SYNC_STATE_ERROR; } @@ -888,12 +781,12 @@ int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcHandleInfo* pInfo) } sTrace("vgId:%d, get seq:%" PRIu64 " rpc handle:%p", pSyncNode->vgId, index, pInfo->handle); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return ret; } void syncSetMsgCb(int64_t rid, const SMsgCb* msgcb) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { sTrace("syncSetQ get pSyncNode is NULL, rid:%" PRId64, rid); return; @@ -901,24 +794,24 @@ void syncSetMsgCb(int64_t rid, const SMsgCb* msgcb) { ASSERT(rid == pSyncNode->rid); pSyncNode->msgcb = msgcb; - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); } char* sync2SimpleStr(int64_t rid) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { sTrace("syncSetRpc get pSyncNode is NULL, rid:%" PRId64, rid); return NULL; } ASSERT(rid == pSyncNode->rid); char* s = syncNode2SimpleStr(pSyncNode); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return s; } void setPingTimerMS(int64_t rid, int32_t pingTimerMS) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { return; } @@ -926,22 +819,22 @@ void setPingTimerMS(int64_t rid, int32_t pingTimerMS) { pSyncNode->pingBaseLine = pingTimerMS; pSyncNode->pingTimerMS = pingTimerMS; - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); } void setElectTimerMS(int64_t rid, int32_t electTimerMS) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { return; } ASSERT(rid == pSyncNode->rid); pSyncNode->electBaseLine = electTimerMS; - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); } void setHeartbeatTimerMS(int64_t rid, int32_t hbTimerMS) { - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { return; } @@ -949,20 +842,20 @@ void setHeartbeatTimerMS(int64_t rid, int32_t hbTimerMS) { pSyncNode->hbBaseLine = hbTimerMS; pSyncNode->heartbeatTimerMS = hbTimerMS; - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); } int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) { - SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid); + SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { - taosReleaseRef(tsNodeRefId, rid); + syncNodeRelease(pSyncNode); terrno = TSDB_CODE_SYN_INTERNAL_ERROR; return -1; } ASSERT(rid == pSyncNode->rid); int32_t ret = syncNodePropose(pSyncNode, pMsg, isWeak); - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + syncNodeRelease(pSyncNode); return ret; } @@ -1084,7 +977,7 @@ int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId de int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) { int32_t ret = 0; - if (syncEnvIsStart()) { + if (syncIsInit()) { SSyncHbTimerData* pData = taosMemoryMalloc(sizeof(SSyncHbTimerData)); pData->pSyncNode = pSyncNode; pData->pTimer = pSyncTimer; @@ -1092,7 +985,7 @@ int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) { pData->logicClock = pSyncTimer->logicClock; pSyncTimer->pData = pData; - taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS, pData, gSyncEnv->pTimerManager, &pSyncTimer->pTimer); + taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS, pData, syncEnv()->pTimerManager, &pSyncTimer->pTimer); } else { sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId); } @@ -1556,8 +1449,8 @@ int32_t syncNodePingAll(SSyncNode* pSyncNode) { // timer control -------------- int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) { int32_t ret = 0; - if (syncEnvIsStart()) { - taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager, + if (syncIsInit()) { + taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, pSyncNode, syncEnv()->pTimerManager, &pSyncNode->pPingTimer); atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser); } else { @@ -1576,7 +1469,7 @@ int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) { int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) { int32_t ret = 0; - if (syncEnvIsStart()) { + if (syncIsInit()) { pSyncNode->electTimerMS = ms; SElectTimer* pElectTimer = taosMemoryMalloc(sizeof(SElectTimer)); @@ -1584,7 +1477,7 @@ int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) { pElectTimer->pSyncNode = pSyncNode; pElectTimer->pData = NULL; - taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pElectTimer, gSyncEnv->pTimerManager, + taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pElectTimer, syncEnv()->pTimerManager, &pSyncNode->pElectTimer); } else { @@ -1632,8 +1525,8 @@ int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode) { static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) { int32_t ret = 0; - if (syncEnvIsStart()) { - taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager, + if (syncIsInit()) { + taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, pSyncNode, syncEnv()->pTimerManager, &pSyncNode->pHeartbeatTimer); atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser); } else { @@ -2325,17 +2218,6 @@ _END: return; } -SSyncNode* syncNodeAcquire(int64_t rid) { - SSyncNode* pNode = taosAcquireRef(tsNodeRefId, rid); - if (pNode == NULL) { - sTrace("failed to acquire node from refId:%" PRId64, rid); - } - - return pNode; -} - -void syncNodeRelease(SSyncNode* pNode) { taosReleaseRef(tsNodeRefId, pNode->rid); } - // raft state change -------------- void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) { if (term > pSyncNode->pRaftStore->currentTerm) { @@ -2786,8 +2668,8 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) { } syncTimeoutDestroy(pSyncMsg); - if (syncEnvIsStart()) { - taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager, + if (syncIsInit()) { + taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, syncEnv()->pTimerManager, &pSyncNode->pPingTimer); } else { sError("sync env is stop, syncNodeEqPingTimer"); @@ -2832,9 +2714,9 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { #if 0 // reset timer ms - if (syncEnvIsStart() && pSyncNode->electBaseLine > 0) { + if (syncIsInit() && pSyncNode->electBaseLine > 0) { pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine); - taosTmrReset(syncNodeEqElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager, + taosTmrReset(syncNodeEqElectTimer, pSyncNode->electTimerMS, pSyncNode, syncEnv()->pTimerManager, &pSyncNode->pElectTimer); } else { sError("sync env is stop, syncNodeEqElectTimer"); @@ -2869,8 +2751,8 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { } syncTimeoutDestroy(pSyncMsg); - if (syncEnvIsStart()) { - taosTmrReset(syncNodeEqHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager, + if (syncIsInit()) { + taosTmrReset(syncNodeEqHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, syncEnv()->pTimerManager, &pSyncNode->pHeartbeatTimer); } else { sError("sync env is stop, syncNodeEqHeartbeatTimer"); @@ -2930,8 +2812,8 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) { syncHeartbeatDestroy(pSyncMsg); - if (syncEnvIsStart()) { - taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS, pData, gSyncEnv->pTimerManager, + if (syncIsInit()) { + taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS, pData, syncEnv()->pTimerManager, &pSyncTimer->pTimer); } else { sError("sync env is stop, syncNodeEqHeartbeatTimer"); diff --git a/source/libs/sync/test/syncElectTest.cpp b/source/libs/sync/test/syncElectTest.cpp index 862f7bd0ba..d09879a699 100644 --- a/source/libs/sync/test/syncElectTest.cpp +++ b/source/libs/sync/test/syncElectTest.cpp @@ -98,7 +98,7 @@ int main(int argc, char** argv) { init(); int32_t ret = syncIOStart((char*)"127.0.0.1", gPorts[myIndex]); assert(ret == 0); - ret = syncEnvStart(); + ret = syncInit(); assert(ret == 0); char walPath[128]; diff --git a/source/libs/sync/test/syncEncodeTest.cpp b/source/libs/sync/test/syncEncodeTest.cpp index 9f1a81e7ed..fdb5cf7ac8 100644 --- a/source/libs/sync/test/syncEncodeTest.cpp +++ b/source/libs/sync/test/syncEncodeTest.cpp @@ -152,7 +152,7 @@ int main(int argc, char **argv) { int32_t ret = syncIOStart((char *)"127.0.0.1", ports[myIndex]); assert(ret == 0); - ret = syncEnvStart(); + ret = syncInit(); assert(ret == 0); taosRemoveDir("./wal_test"); diff --git a/source/libs/sync/test/syncEnqTest.cpp b/source/libs/sync/test/syncEnqTest.cpp index 8461bfe9b7..191e245b1e 100644 --- a/source/libs/sync/test/syncEnqTest.cpp +++ b/source/libs/sync/test/syncEnqTest.cpp @@ -81,7 +81,7 @@ int main(int argc, char** argv) { int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); assert(ret == 0); - ret = syncEnvStart(); + ret = syncInit(); assert(ret == 0); SSyncNode* pSyncNode = syncInitTest(); diff --git a/source/libs/sync/test/syncEnvTest.cpp b/source/libs/sync/test/syncEnvTest.cpp index a7a819e046..404ab32d58 100644 --- a/source/libs/sync/test/syncEnvTest.cpp +++ b/source/libs/sync/test/syncEnvTest.cpp @@ -22,7 +22,7 @@ int main() { logTest(); - ret = syncEnvStart(); + ret = syncInit(); assert(ret == 0); for (int i = 0; i < 5; ++i) { @@ -37,8 +37,6 @@ int main() { taosMsleep(5000); } - ret = syncEnvStop(); - assert(ret == 0); - + syncCleanUp(); return 0; } diff --git a/source/libs/sync/test/syncIOSendMsgTest.cpp b/source/libs/sync/test/syncIOSendMsgTest.cpp index 630d96054b..4a457136c2 100644 --- a/source/libs/sync/test/syncIOSendMsgTest.cpp +++ b/source/libs/sync/test/syncIOSendMsgTest.cpp @@ -82,7 +82,7 @@ int main(int argc, char** argv) { int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); assert(ret == 0); - ret = syncEnvStart(); + ret = syncInit(); assert(ret == 0); SSyncNode* pSyncNode = syncInitTest(); diff --git a/source/libs/sync/test/syncInitTest.cpp b/source/libs/sync/test/syncInitTest.cpp index d0843151f4..d654ad06fe 100644 --- a/source/libs/sync/test/syncInitTest.cpp +++ b/source/libs/sync/test/syncInitTest.cpp @@ -81,7 +81,7 @@ int main(int argc, char** argv) { int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); assert(ret == 0); - ret = syncEnvStart(); + ret = syncInit(); assert(ret == 0); SSyncNode* pSyncNode = syncInitTest(); @@ -91,7 +91,7 @@ int main(int argc, char** argv) { initRaftId(pSyncNode); syncNodeClose(pSyncNode); - syncEnvStop(); + syncCleanUp(); // syncIOStop(); // taosCloseLog(); diff --git a/source/libs/sync/test/syncPingSelfTest.cpp b/source/libs/sync/test/syncPingSelfTest.cpp index 99287bf7b0..f44cbb04d5 100644 --- a/source/libs/sync/test/syncPingSelfTest.cpp +++ b/source/libs/sync/test/syncPingSelfTest.cpp @@ -81,7 +81,7 @@ int main(int argc, char** argv) { int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); assert(ret == 0); - ret = syncEnvStart(); + ret = syncInit(); assert(ret == 0); SSyncNode* pSyncNode = syncInitTest(); diff --git a/source/libs/sync/test/syncPingTimerTest.cpp b/source/libs/sync/test/syncPingTimerTest.cpp index cd9440e3e2..fd6342aa84 100644 --- a/source/libs/sync/test/syncPingTimerTest.cpp +++ b/source/libs/sync/test/syncPingTimerTest.cpp @@ -81,7 +81,7 @@ int main(int argc, char** argv) { int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); assert(ret == 0); - ret = syncEnvStart(); + ret = syncInit(); assert(ret == 0); SSyncNode* pSyncNode = syncInitTest(); diff --git a/source/libs/sync/test/syncPingTimerTest2.cpp b/source/libs/sync/test/syncPingTimerTest2.cpp index fa09d04368..295003dff3 100644 --- a/source/libs/sync/test/syncPingTimerTest2.cpp +++ b/source/libs/sync/test/syncPingTimerTest2.cpp @@ -81,7 +81,7 @@ int main(int argc, char** argv) { int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); assert(ret == 0); - ret = syncEnvStart(); + ret = syncInit(); assert(ret == 0); SSyncNode* pSyncNode = syncInitTest(); diff --git a/source/libs/sync/test/syncSnapshotTest.cpp b/source/libs/sync/test/syncSnapshotTest.cpp index 3dc8518072..50771ac476 100644 --- a/source/libs/sync/test/syncSnapshotTest.cpp +++ b/source/libs/sync/test/syncSnapshotTest.cpp @@ -179,7 +179,7 @@ int main(int argc, char **argv) { int32_t ret = syncIOStart((char *)"127.0.0.1", ports[myIndex]); assert(ret == 0); - ret = syncEnvStart(); + ret = syncInit(); assert(ret == 0); // taosRemoveDir(pWalDir); diff --git a/source/libs/sync/test/syncVotesGrantedTest.cpp b/source/libs/sync/test/syncVotesGrantedTest.cpp index d4885d0316..e2e8748697 100644 --- a/source/libs/sync/test/syncVotesGrantedTest.cpp +++ b/source/libs/sync/test/syncVotesGrantedTest.cpp @@ -82,7 +82,7 @@ int main(int argc, char** argv) { int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); assert(ret == 0); - ret = syncEnvStart(); + ret = syncInit(); assert(ret == 0); SSyncNode* pSyncNode = syncInitTest(); diff --git a/source/libs/sync/test/syncVotesRespondTest.cpp b/source/libs/sync/test/syncVotesRespondTest.cpp index 77262dfc65..881a5331b1 100644 --- a/source/libs/sync/test/syncVotesRespondTest.cpp +++ b/source/libs/sync/test/syncVotesRespondTest.cpp @@ -82,7 +82,7 @@ int main(int argc, char** argv) { int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); assert(ret == 0); - ret = syncEnvStart(); + ret = syncInit(); assert(ret == 0); SSyncNode* pSyncNode = syncInitTest(); diff --git a/source/libs/sync/test/syncWriteTest.cpp b/source/libs/sync/test/syncWriteTest.cpp index b185f52f75..fee98ddd52 100644 --- a/source/libs/sync/test/syncWriteTest.cpp +++ b/source/libs/sync/test/syncWriteTest.cpp @@ -154,7 +154,7 @@ int main(int argc, char **argv) { int32_t ret = syncIOStart((char *)"127.0.0.1", ports[myIndex]); assert(ret == 0); - ret = syncEnvStart(); + ret = syncInit(); assert(ret == 0); taosRemoveDir("./wal_test"); diff --git a/source/util/src/tref.c b/source/util/src/tref.c index c984ef3f34..aa741b909a 100644 --- a/source/util/src/tref.c +++ b/source/util/src/tref.c @@ -57,7 +57,7 @@ static void taosIncRsetCount(SRefSet *pSet); static void taosDecRsetCount(SRefSet *pSet); static int32_t taosDecRefCount(int32_t rsetId, int64_t rid, int32_t remove); -int32_t taosOpenRef(int32_t max, void (*fp)(void *)) { +int32_t taosOpenRef(int32_t max, RefFp fp) { SRefNode **nodeList; SRefSet *pSet; int64_t *lockedBy; From 91abd170e3911236108b9e31f2dafdc6ed1de416 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 31 Oct 2022 23:45:17 +0800 Subject: [PATCH 11/13] enh: refact syncEnv code --- source/libs/sync/src/syncEnv.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/sync/src/syncEnv.c b/source/libs/sync/src/syncEnv.c index 4d4cb8ab69..5b8acfa5ef 100644 --- a/source/libs/sync/src/syncEnv.c +++ b/source/libs/sync/src/syncEnv.c @@ -71,7 +71,7 @@ int64_t syncNodeAdd(SSyncNode *pNode) { if (pNode->rid < 0) return -1; sDebug("vgId:%d, sync rid:%" PRId64 " is added to rsetId:%d", pNode->vgId, pNode->rid, gNodeRefId); - return 0; + return pNode->rid; } void syncNodeRemove(SSyncNode *pNode) { From 4356fbc3c5888b126f69a4890c728265d00db31e Mon Sep 17 00:00:00 2001 From: slzhou Date: Tue, 1 Nov 2022 08:58:52 +0800 Subject: [PATCH 12/13] fix: error of setting ld_library_path --- source/libs/function/src/tudf.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index eff3c6cc7b..459ee583a4 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -137,7 +137,7 @@ static int32_t udfSpawnUdfd(SUdfdData *pData) { fnError("can not set correct udfd LD_LIBRARY_PATH"); } char ldLibPathEnvItem[1024 + 32] = {0}; - snprintf(ldLibPathEnvItem, 1024 + 32, "%s=%s", "LD_LIBRARY_PATH", ldLibPathEnvItem); + snprintf(ldLibPathEnvItem, 1024 + 32, "%s=%s", "LD_LIBRARY_PATH", udfdPathLdLib); char *envUdfd[] = {dnodeIdEnvItem, thrdPoolSizeEnvItem, ldLibPathEnvItem, NULL}; options.env = envUdfd; From fbb1fa53030c536c7a61228f8d4b4ec2bd65f7ac Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 1 Nov 2022 09:02:48 +0800 Subject: [PATCH 13/13] enh: refact syncEnv code --- source/libs/sync/inc/syncEnv.h | 2 +- source/libs/sync/src/syncEnv.c | 5 +---- source/libs/sync/src/syncMain.c | 2 +- 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/source/libs/sync/inc/syncEnv.h b/source/libs/sync/inc/syncEnv.h index 068ccaf029..55ce1470ce 100644 --- a/source/libs/sync/inc/syncEnv.h +++ b/source/libs/sync/inc/syncEnv.h @@ -54,7 +54,7 @@ typedef struct SSyncEnv { SSyncEnv* syncEnv(); int64_t syncNodeAdd(SSyncNode* pNode); -void syncNodeRemove(SSyncNode* pNode); +void syncNodeRemove(int64_t rid); SSyncNode* syncNodeAcquire(int64_t rid); void syncNodeRelease(SSyncNode* pNode); diff --git a/source/libs/sync/src/syncEnv.c b/source/libs/sync/src/syncEnv.c index 5b8acfa5ef..c55cd4fdac 100644 --- a/source/libs/sync/src/syncEnv.c +++ b/source/libs/sync/src/syncEnv.c @@ -74,10 +74,7 @@ int64_t syncNodeAdd(SSyncNode *pNode) { return pNode->rid; } -void syncNodeRemove(SSyncNode *pNode) { - taosRemoveRef(gNodeRefId, pNode->rid); - sDebug("vgId:%d, sync rid:%" PRId64 " is removed from rsetId:%d", pNode->vgId, pNode->rid, gNodeRefId); -} +void syncNodeRemove(int64_t rid) { taosRemoveRef(gNodeRefId, rid); } SSyncNode *syncNodeAcquire(int64_t rid) { SSyncNode *pNode = taosAcquireRef(gNodeRefId, rid); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 42018ed750..89499a7c7d 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -76,7 +76,7 @@ void syncStop(int64_t rid) { SSyncNode* pNode = syncNodeAcquire(rid); if (pNode != NULL) { syncNodeRelease(pNode); - syncNodeRemove(pNode); + syncNodeRemove(rid); } }