add queue perf msg

This commit is contained in:
Yihao Deng 2024-01-31 07:10:01 +00:00
parent 91c9abfcba
commit 89bcc8f058
3 changed files with 30 additions and 5 deletions

View File

@ -159,7 +159,7 @@ static void uvStartSendResp(SSvrMsg* msg);
static void uvNotifyLinkBrokenToApp(SSvrConn* conn); static void uvNotifyLinkBrokenToApp(SSvrConn* conn);
static FORCE_INLINE void destroySmsg(SSvrMsg* smsg); static FORCE_INLINE void destroySmsg(SSvrMsg* smsg);
static FORCE_INLINE SSvrConn* createConn(void* hThrd); static FORCE_INLINE SSvrConn* createConn(void* hThrd);
static FORCE_INLINE void destroyConn(SSvrConn* conn, bool clear /*clear handle or not*/); static FORCE_INLINE void destroyConn(SSvrConn* conn, bool clear /*clear handle or not*/);
static FORCE_INLINE void destroyConnRegArg(SSvrConn* conn); static FORCE_INLINE void destroyConnRegArg(SSvrConn* conn);
@ -1382,7 +1382,7 @@ void uvHandleUpdate(SSvrMsg* msg, SWorkThrd* thrd) {
tFreeSUpdateIpWhiteReq(req); tFreeSUpdateIpWhiteReq(req);
taosMemoryFree(req); taosMemoryFree(req);
} else { } else {
tInfo("ip-white-list disable on trans"); tDebug("ip-white-list disable on trans");
thrd->enableIpWhiteList = 0; thrd->enableIpWhiteList = 0;
} }
taosMemoryFree(msg); taosMemoryFree(msg);

View File

@ -159,6 +159,7 @@ void taosFreeQitem(void *pItem) {
int32_t taosWriteQitem(STaosQueue *queue, void *pItem) { int32_t taosWriteQitem(STaosQueue *queue, void *pItem) {
int32_t code = 0; int32_t code = 0;
STaosQnode *pNode = (STaosQnode *)(((char *)pItem) - sizeof(STaosQnode)); STaosQnode *pNode = (STaosQnode *)(((char *)pItem) - sizeof(STaosQnode));
pNode->timestamp = taosGetTimestampUs();
pNode->next = NULL; pNode->next = NULL;
taosThreadMutexLock(&queue->mutex); taosThreadMutexLock(&queue->mutex);
@ -464,6 +465,7 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, SQueueInfo *
qinfo->ahandle = queue->ahandle; qinfo->ahandle = queue->ahandle;
qinfo->fp = queue->itemsFp; qinfo->fp = queue->itemsFp;
qinfo->queue = queue; qinfo->queue = queue;
qinfo->timestamp = queue->head->timestamp;
queue->head = NULL; queue->head = NULL;
queue->tail = NULL; queue->tail = NULL;
@ -489,8 +491,8 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, SQueueInfo *
int32_t taosQallItemSize(STaosQall *qall) { return qall->numOfItems; } int32_t taosQallItemSize(STaosQall *qall) { return qall->numOfItems; }
int64_t taosQallMemSize(STaosQall *qall) { return qall->memOfItems; } int64_t taosQallMemSize(STaosQall *qall) { return qall->memOfItems; }
int64_t taosQallUnAccessedItemSize(STaosQall *qall) {return qall->unAccessedNumOfItems;} int64_t taosQallUnAccessedItemSize(STaosQall *qall) { return qall->unAccessedNumOfItems; }
int64_t taosQallUnAccessedMemSize(STaosQall *qall) {return qall->unAccessMemOfItems;} int64_t taosQallUnAccessedMemSize(STaosQall *qall) { return qall->unAccessMemOfItems; }
void taosResetQitems(STaosQall *qall) { qall->current = qall->start; } void taosResetQitems(STaosQall *qall) { qall->current = qall->start; }
int32_t taosGetQueueNumber(STaosQset *qset) { return qset->numOfQueues; } int32_t taosGetQueueNumber(STaosQset *qset) { return qset->numOfQueues; }

View File

@ -15,10 +15,12 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "tworker.h" #include "tworker.h"
#include "tgeosctx.h"
#include "taoserror.h" #include "taoserror.h"
#include "tgeosctx.h"
#include "tlog.h" #include "tlog.h"
#define QUEUE_THRESHOLD 1000 * 1000
typedef void *(*ThreadFp)(void *param); typedef void *(*ThreadFp)(void *param);
int32_t tQWorkerInit(SQWorkerPool *pool) { int32_t tQWorkerInit(SQWorkerPool *pool) {
@ -84,6 +86,13 @@ static void *tQWorkerThreadFp(SQueueWorker *worker) {
break; break;
} }
if (qinfo.timestamp != 0) {
int64_t cost = taosGetTimestampUs() - qinfo.timestamp;
if (cost > QUEUE_THRESHOLD) {
uWarn("worker:%s,message has been queued for too long, cost: %" PRId64 "s", pool->name, cost);
}
}
if (qinfo.fp != NULL) { if (qinfo.fp != NULL) {
qinfo.workerId = worker->id; qinfo.workerId = worker->id;
qinfo.threadNum = pool->num; qinfo.threadNum = pool->num;
@ -198,6 +207,13 @@ static void *tAutoQWorkerThreadFp(SQueueWorker *worker) {
break; break;
} }
if (qinfo.timestamp != 0) {
int64_t cost = taosGetTimestampUs() - qinfo.timestamp;
if (cost > QUEUE_THRESHOLD) {
uWarn("worker:%s,message has been queued for too long, cost: %" PRId64 "s", pool->name, cost);
}
}
if (qinfo.fp != NULL) { if (qinfo.fp != NULL) {
qinfo.workerId = worker->id; qinfo.workerId = worker->id;
qinfo.threadNum = taosArrayGetSize(pool->workers); qinfo.threadNum = taosArrayGetSize(pool->workers);
@ -338,6 +354,13 @@ static void *tWWorkerThreadFp(SWWorker *worker) {
break; break;
} }
if (qinfo.timestamp != 0) {
int64_t cost = taosGetTimestampUs() - qinfo.timestamp;
if (cost > QUEUE_THRESHOLD) {
uWarn("worker:%s,message has been queued for too long, cost: %" PRId64 "s", pool->name, cost);
}
}
if (qinfo.fp != NULL) { if (qinfo.fp != NULL) {
qinfo.workerId = worker->id; qinfo.workerId = worker->id;
qinfo.threadNum = pool->num; qinfo.threadNum = pool->num;