diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 362d38b505..b324ca5f91 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -159,7 +159,7 @@ static void uvStartSendResp(SSvrMsg* msg); static void uvNotifyLinkBrokenToApp(SSvrConn* conn); -static FORCE_INLINE void destroySmsg(SSvrMsg* smsg); +static FORCE_INLINE void destroySmsg(SSvrMsg* smsg); static FORCE_INLINE SSvrConn* createConn(void* hThrd); static FORCE_INLINE void destroyConn(SSvrConn* conn, bool clear /*clear handle or not*/); static FORCE_INLINE void destroyConnRegArg(SSvrConn* conn); @@ -1382,7 +1382,7 @@ void uvHandleUpdate(SSvrMsg* msg, SWorkThrd* thrd) { tFreeSUpdateIpWhiteReq(req); taosMemoryFree(req); } else { - tInfo("ip-white-list disable on trans"); + tDebug("ip-white-list disable on trans"); thrd->enableIpWhiteList = 0; } taosMemoryFree(msg); diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c index 1dfdd637b6..2cc13be6ba 100644 --- a/source/util/src/tqueue.c +++ b/source/util/src/tqueue.c @@ -159,6 +159,7 @@ void taosFreeQitem(void *pItem) { int32_t taosWriteQitem(STaosQueue *queue, void *pItem) { int32_t code = 0; STaosQnode *pNode = (STaosQnode *)(((char *)pItem) - sizeof(STaosQnode)); + pNode->timestamp = taosGetTimestampUs(); pNode->next = NULL; taosThreadMutexLock(&queue->mutex); @@ -464,6 +465,7 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, SQueueInfo * qinfo->ahandle = queue->ahandle; qinfo->fp = queue->itemsFp; qinfo->queue = queue; + qinfo->timestamp = queue->head->timestamp; queue->head = NULL; queue->tail = NULL; @@ -489,8 +491,8 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, SQueueInfo * int32_t taosQallItemSize(STaosQall *qall) { return qall->numOfItems; } int64_t taosQallMemSize(STaosQall *qall) { return qall->memOfItems; } -int64_t taosQallUnAccessedItemSize(STaosQall *qall) {return qall->unAccessedNumOfItems;} -int64_t taosQallUnAccessedMemSize(STaosQall *qall) {return qall->unAccessMemOfItems;} +int64_t taosQallUnAccessedItemSize(STaosQall *qall) { return qall->unAccessedNumOfItems; } +int64_t taosQallUnAccessedMemSize(STaosQall *qall) { return qall->unAccessMemOfItems; } void taosResetQitems(STaosQall *qall) { qall->current = qall->start; } int32_t taosGetQueueNumber(STaosQset *qset) { return qset->numOfQueues; } diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index 28f7fd4783..c4b3271c65 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -15,10 +15,12 @@ #define _DEFAULT_SOURCE #include "tworker.h" -#include "tgeosctx.h" #include "taoserror.h" +#include "tgeosctx.h" #include "tlog.h" +#define QUEUE_THRESHOLD 1000 * 1000 + typedef void *(*ThreadFp)(void *param); int32_t tQWorkerInit(SQWorkerPool *pool) { @@ -84,6 +86,13 @@ static void *tQWorkerThreadFp(SQueueWorker *worker) { break; } + if (qinfo.timestamp != 0) { + int64_t cost = taosGetTimestampUs() - qinfo.timestamp; + if (cost > QUEUE_THRESHOLD) { + uWarn("worker:%s,message has been queued for too long, cost: %" PRId64 "s", pool->name, cost); + } + } + if (qinfo.fp != NULL) { qinfo.workerId = worker->id; qinfo.threadNum = pool->num; @@ -198,6 +207,13 @@ static void *tAutoQWorkerThreadFp(SQueueWorker *worker) { break; } + if (qinfo.timestamp != 0) { + int64_t cost = taosGetTimestampUs() - qinfo.timestamp; + if (cost > QUEUE_THRESHOLD) { + uWarn("worker:%s,message has been queued for too long, cost: %" PRId64 "s", pool->name, cost); + } + } + if (qinfo.fp != NULL) { qinfo.workerId = worker->id; qinfo.threadNum = taosArrayGetSize(pool->workers); @@ -338,6 +354,13 @@ static void *tWWorkerThreadFp(SWWorker *worker) { break; } + if (qinfo.timestamp != 0) { + int64_t cost = taosGetTimestampUs() - qinfo.timestamp; + if (cost > QUEUE_THRESHOLD) { + uWarn("worker:%s,message has been queued for too long, cost: %" PRId64 "s", pool->name, cost); + } + } + if (qinfo.fp != NULL) { qinfo.workerId = worker->id; qinfo.threadNum = pool->num;