diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index bf34d3e2df..0f881beb66 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -61,7 +61,8 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog int32_t numOfBatchs = taosArrayGetSize(batchUseRsp.pArray); for (int32_t i = 0; i < numOfBatchs; ++i) { SUseDbRsp *rsp = taosArrayGet(batchUseRsp.pArray, i); - tscDebug("hb db rsp, db:%s, vgVersion:%d, stateTs:%" PRId64 ", uid:%" PRIx64, rsp->db, rsp->vgVersion, rsp->stateTs, rsp->uid); + tscDebug("hb db rsp, db:%s, vgVersion:%d, stateTs:%" PRId64 ", uid:%" PRIx64, rsp->db, rsp->vgVersion, rsp->stateTs, + rsp->uid); if (rsp->vgVersion < 0) { code = catalogRemoveDB(pCatalog, rsp->db, rsp->uid); @@ -293,6 +294,7 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) { taosThreadMutexUnlock(&appInfo.mutex); tscError("cluster not exist, key:%s", key); taosMemoryFree(pMsg->pData); + taosMemoryFree(pMsg->pEpSet); tFreeClientHbBatchRsp(&pRsp); return -1; } @@ -322,6 +324,7 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) { tFreeClientHbBatchRsp(&pRsp); taosMemoryFree(pMsg->pData); + taosMemoryFree(pMsg->pEpSet); return code; } diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 5e76e6bd83..ac54749ae1 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -96,7 +96,7 @@ typedef void* queue[2]; //#define TRANS_RETRY_COUNT_LIMIT 100 // retry count limit //#define TRANS_RETRY_INTERVAL 15 // retry interval (ms) -#define TRANS_CONN_TIMEOUT 3 // connect timeout (s) +#define TRANS_CONN_TIMEOUT 3000 // connect timeout (ms) #define TRANS_READ_TIMEOUT 3000 // read timeout (ms) #define TRANS_PACKET_LIMIT 1024 * 1024 * 512 diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index fd42c14101..4fb00b1a6d 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -25,7 +25,8 @@ typedef struct SCliConn { uv_connect_t connReq; uv_stream_t* stream; queue wreqQueue; - uv_timer_t* timer; + + uv_timer_t* timer; // read timer, forbidden void* hostThrd; @@ -79,6 +80,7 @@ typedef struct SCliThrd { uint64_t nextTimeout; // next timeout void* pTransInst; // + void (*destroyAhandleFp)(void* ahandle); SHashObj* fqdn2ipCache; SCvtAddr cvtAddr; @@ -102,6 +104,8 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port); static void addConnToPool(void* pool, SCliConn* conn); static void doCloseIdleConn(void* param); +// register conn timer +static void cliConnTimeout(uv_timer_t* handle); // register timer for read static void cliReadTimeoutCb(uv_timer_t* handle); // register timer in each thread to clear expire conn @@ -155,6 +159,7 @@ static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq, static FORCE_INLINE void destroyUserdata(STransMsg* userdata); static FORCE_INLINE void destroyCmsg(void* cmsg); +static FORCE_INLINE void destroyCmsgAndAhandle(void* cmsg); static FORCE_INLINE int cliRBChoseIdx(STrans* pTransInst); static FORCE_INLINE void transDestroyConnCtx(STransConnCtx* ctx); @@ -258,16 +263,15 @@ static void* cliWorkThread(void* arg); static void cliReleaseUnfinishedMsg(SCliConn* conn) { SCliThrd* pThrd = conn->hostThrd; - STrans* pTransInst = pThrd->pTransInst; for (int i = 0; i < transQueueSize(&conn->cliMsgs); i++) { SCliMsg* msg = transQueueGet(&conn->cliMsgs, i); if (msg != NULL && msg->ctx != NULL && msg->ctx->ahandle != (void*)0x9527) { if (conn->ctx.freeFunc != NULL && msg->ctx->ahandle != NULL) { conn->ctx.freeFunc(msg->ctx->ahandle); - } else if (msg->ctx->ahandle != NULL && pTransInst->destroyFp != NULL) { + } else if (msg->ctx->ahandle != NULL && pThrd->destroyAhandleFp != NULL) { tDebug("%s conn %p destroy unfinished ahandle %p", CONN_GET_INST_LABEL(conn), conn, msg->ctx->ahandle); - pTransInst->destroyFp(msg->ctx->ahandle); + pThrd->destroyAhandleFp(msg->ctx->ahandle); } } destroyCmsg(msg); @@ -407,9 +411,16 @@ void cliHandleExceptImpl(SCliConn* pConn, int32_t code) { bool once = false; do { SCliMsg* pMsg = transQueuePop(&pConn->cliMsgs); + if (pMsg == NULL && once) { break; } + + if (pMsg != NULL && REQUEST_NO_RESP(&pMsg->msg)) { + destroyCmsg(pMsg); + break; + } + STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL; STransMsg transMsg = {0}; @@ -439,6 +450,7 @@ void cliHandleExceptImpl(SCliConn* pConn, int32_t code) { continue; } } + if (pMsg == NULL || (pMsg && pMsg->type != Release)) { if (cliAppCb(pConn, &transMsg, pMsg) != 0) { return; @@ -454,6 +466,19 @@ void cliHandleExcept(SCliConn* conn) { cliHandleExceptImpl(conn, -1); } +void cliConnTimeout(uv_timer_t* handle) { + SCliConn* conn = handle->data; + SCliThrd* pThrd = conn->hostThrd; + + tTrace("%s conn %p conn timeout, ref:%d", CONN_GET_INST_LABEL(conn), conn, T_REF_VAL_GET(conn)); + + uv_timer_stop(handle); + handle->data = NULL; + taosArrayPush(pThrd->timerList, &conn->timer); + + conn->timer = NULL; + cliHandleExceptImpl(conn, -1); +} void cliReadTimeoutCb(uv_timer_t* handle) { // set up timeout cb SCliConn* conn = handle->data; @@ -545,7 +570,7 @@ static void addConnToPool(void* pool, SCliConn* conn) { if (conn->list->size >= 50) { STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg)); arg->param1 = conn; - arg->param2 = NULL; + arg->param2 = thrd; STrans* pTransInst = thrd->pTransInst; conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime)); @@ -630,8 +655,16 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) { uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream)); conn->stream->data = conn; - conn->connReq.data = conn; + uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL; + if (timer == NULL) { + timer = taosMemoryCalloc(1, sizeof(uv_timer_t)); + tDebug("no available timer, create a timer %p", timer); + uv_timer_init(pThrd->loop, timer); + } + timer->data = conn; + conn->timer = timer; + conn->connReq.data = conn; transReqQueueInit(&conn->wreqQueue); transQueueInit(&conn->cliMsgs, NULL); @@ -661,8 +694,8 @@ static void cliDestroyConn(SCliConn* conn, bool clear) { } if (conn->timer != NULL) { uv_timer_stop(conn->timer); - taosArrayPush(pThrd->timerList, &conn->timer); conn->timer->data = NULL; + taosArrayPush(pThrd->timerList, &conn->timer); conn->timer = NULL; } @@ -811,6 +844,15 @@ _RETURN: void cliConnCb(uv_connect_t* req, int status) { // impl later SCliConn* pConn = req->data; + SCliThrd* pThrd = pConn->hostThrd; + + if (pConn->timer != NULL) { + uv_timer_stop(pConn->timer); + pConn->timer->data = NULL; + taosArrayPush(pThrd->timerList, &pConn->timer); + pConn->timer = NULL; + } + if (status != 0) { tError("%s conn %p failed to connect server:%s", CONN_GET_INST_LABEL(pConn), pConn, uv_strerror(status)); cliHandleExcept(pConn); @@ -989,31 +1031,26 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { conn->ip = strdup(EPSET_GET_INUSE_IP(&pCtx->epSet)); conn->port = EPSET_GET_INUSE_PORT(&pCtx->epSet); - int ret = transSetConnOption((uv_tcp_t*)conn->stream); - if (ret) { - tError("%s conn %p failed to set conn option, errmsg %s", transLabel(pTransInst), conn, uv_err_name(ret)); - } - int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT); - if (fd == -1) { - tTrace("%s conn %p failed to create socket", transLabel(pTransInst), conn); - cliHandleExcept(conn); - return; - } - uv_tcp_open((uv_tcp_t*)conn->stream, fd); - struct sockaddr_in addr; addr.sin_family = AF_INET; - addr.sin_addr.s_addr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, conn->ip); addr.sin_port = (uint16_t)htons((uint16_t)conn->port); tTrace("%s conn %p try to connect to %s:%d", pTransInst->label, conn, conn->ip, conn->port); - ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb); + + int ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb); if (ret != 0) { tTrace("%s conn %p failed to connect to %s:%d, reason:%s", pTransInst->label, conn, conn->ip, conn->port, uv_err_name(ret)); + + uv_timer_stop(conn->timer); + conn->timer->data = NULL; + taosArrayPush(pThrd->timerList, &conn->timer); + conn->timer = NULL; + cliHandleExcept(conn); return; } + uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0); } STraceId* trace = &pMsg->msg.info.traceId; tGTrace("%s conn %p ready", pTransInst->label, conn); @@ -1136,6 +1173,8 @@ static void* cliWorkThread(void* arg) { pThrd->pid = taosGetSelfPthreadId(); setThreadName("trans-cli-work"); uv_run(pThrd->loop, UV_RUN_DEFAULT); + + tDebug("thread quit-thread:%08" PRId64, pThrd->pid); return NULL; } @@ -1177,6 +1216,25 @@ static FORCE_INLINE void destroyCmsg(void* arg) { taosMemoryFree(pMsg); } +static FORCE_INLINE void destroyCmsgAndAhandle(void* param) { + if (param == NULL) return; + + STaskArg* arg = param; + SCliMsg* pMsg = arg->param1; + SCliThrd* pThrd = arg->param2; + + tDebug("destroy Ahandle A"); + if (pThrd != NULL && pThrd->destroyAhandleFp != NULL) { + tDebug("destroy Ahandle B"); + pThrd->destroyAhandleFp(pMsg->ctx->ahandle); + } + tDebug("destroy Ahandle C"); + + transDestroyConnCtx(pMsg->ctx); + destroyUserdata(&pMsg->msg); + taosMemoryFree(pMsg); +} + static SCliThrd* createThrdObj(void* trans) { STrans* pTransInst = trans; @@ -1195,7 +1253,7 @@ static SCliThrd* createThrdObj(void* trans) { pThrd->prepare->data = pThrd; // uv_prepare_start(pThrd->prepare, cliPrepareCb); - int32_t timerSize = 512; + int32_t timerSize = 64; pThrd->timerList = taosArrayInit(timerSize, sizeof(void*)); for (int i = 0; i < timerSize; i++) { uv_timer_t* timer = taosMemoryCalloc(1, sizeof(uv_timer_t)); @@ -1211,6 +1269,7 @@ static SCliThrd* createThrdObj(void* trans) { pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime); pThrd->pTransInst = trans; + pThrd->destroyAhandleFp = pTransInst->destroyFp; pThrd->fqdn2ipCache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); pThrd->quit = false; return pThrd; @@ -1226,9 +1285,10 @@ static void destroyThrdObj(SCliThrd* pThrd) { TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SCliMsg, destroyCmsg); transAsyncPoolDestroy(pThrd->asyncPool); - transDQDestroy(pThrd->delayQueue, destroyCmsg); + transDQDestroy(pThrd->delayQueue, destroyCmsgAndAhandle); transDQDestroy(pThrd->timeoutQueue, NULL); + tDebug("thread destroy %" PRId64, pThrd->pid); for (int i = 0; i < taosArrayGetSize(pThrd->timerList); i++) { uv_timer_t* timer = taosArrayGetP(pThrd->timerList, i); taosMemoryFree(timer); @@ -1254,7 +1314,18 @@ void cliSendQuit(SCliThrd* thrd) { } void cliWalkCb(uv_handle_t* handle, void* arg) { if (!uv_is_closing(handle)) { - uv_read_stop((uv_stream_t*)handle); + if (uv_handle_get_type(handle) == UV_TIMER) { + // SCliConn* pConn = handle->data; + // if (pConn != NULL && pConn->timer != NULL) { + // SCliThrd* pThrd = pConn->hostThrd; + // uv_timer_stop((uv_timer_t*)handle); + // handle->data = NULL; + // taosArrayPush(pThrd->timerList, &pConn->timer); + // pConn->timer = NULL; + // } + } else { + uv_read_stop((uv_stream_t*)handle); + } uv_close(handle, cliDestroy); } } diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 18b812f314..7710abcaa1 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -497,7 +497,7 @@ void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)) { SDelayTask* task = container_of(minNode, SDelayTask, node); STaskArg* arg = task->arg; - if (freeFunc) freeFunc(arg->param1); + if (freeFunc) freeFunc(arg); taosMemoryFree(arg); taosMemoryFree(task); diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index f7949048ca..a2ce5ac08c 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -435,7 +435,7 @@ static inline int32_t taosBuildLogHead(char *buffer, const char *flags) { taosGetTimeOfDay(&timeSecs); time_t curTime = timeSecs.tv_sec; - ptm = taosLocalTimeNolock(&Tm, &curTime, taosGetDaylight()); + ptm = taosLocalTime(&curTime, &Tm); return sprintf(buffer, "%02d/%02d %02d:%02d:%02d.%06d %08" PRId64 " %s", ptm->tm_mon + 1, ptm->tm_mday, ptm->tm_hour, ptm->tm_min, ptm->tm_sec, (int32_t)timeSecs.tv_usec, taosGetSelfPthreadId(), flags);