From c21e15d9394d7c3bc23fbf56ba2abfcc8971d05b Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 17 Jan 2022 18:27:27 +0800 Subject: [PATCH] feature/qnode --- source/libs/qworker/inc/qworkerInt.h | 8 ++++---- source/libs/qworker/src/qworkerMsg.c | 22 ++++++++++++---------- 2 files changed, 16 insertions(+), 14 deletions(-) diff --git a/source/libs/qworker/inc/qworkerInt.h b/source/libs/qworker/inc/qworkerInt.h index b415dcf948..913057787e 100644 --- a/source/libs/qworker/inc/qworkerInt.h +++ b/source/libs/qworker/inc/qworkerInt.h @@ -193,15 +193,15 @@ typedef struct SQWorkerMgmt { #define QW_UNLOCK(type, _lock) do { \ if (QW_READ == (type)) { \ assert(atomic_load_32((_lock)) > 0); \ - qDebug("QW RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + QW_LOCK_DEBUG("QW RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ taosRUnLockLatch(_lock); \ - qDebug("QW RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + QW_LOCK_DEBUG("QW RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ assert(atomic_load_32((_lock)) >= 0); \ } else { \ assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \ - qDebug("QW WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + QW_LOCK_DEBUG("QW WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ taosWUnLockLatch(_lock); \ - qDebug("QW WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + QW_LOCK_DEBUG("QW WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ assert(atomic_load_32((_lock)) >= 0); \ } \ } while (0) diff --git a/source/libs/qworker/src/qworkerMsg.c b/source/libs/qworker/src/qworkerMsg.c index 81dfa48670..b9fd8e78b6 100644 --- a/source/libs/qworker/src/qworkerMsg.c +++ b/source/libs/qworker/src/qworkerMsg.c @@ -324,11 +324,11 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { SQWMsg qwMsg = {.node = node, .msg = msg->msg, .msgLen = msg->contentLen, .connection = pMsg}; - QW_SCH_TASK_DLOG("processQuery start"); + QW_SCH_TASK_DLOG("processQuery start, node:%p", node); QW_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg)); - QW_SCH_TASK_DLOG("processQuery end"); + QW_SCH_TASK_DLOG("processQuery end, node:%p", node); return TSDB_CODE_SUCCESS; } @@ -357,11 +357,11 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connection = pMsg}; - QW_SCH_TASK_DLOG("processCQuery start"); + QW_SCH_TASK_DLOG("processCQuery start, node:%p", node); QW_ERR_RET(qwProcessCQuery(QW_FPARAMS(), &qwMsg)); - QW_SCH_TASK_DLOG("processCQuery end"); + QW_SCH_TASK_DLOG("processCQuery end, node:%p", node); return TSDB_CODE_SUCCESS; } @@ -396,6 +396,8 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){ QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } + SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt; + msg->sId = be64toh(msg->sId); msg->queryId = be64toh(msg->queryId); msg->taskId = be64toh(msg->taskId); @@ -406,11 +408,11 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){ SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connection = pMsg}; - QW_SCH_TASK_DLOG("processReady start"); + QW_SCH_TASK_DLOG("processReady start, node:%p", node); QW_ERR_RET(qwProcessReady(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, &qwMsg)); - QW_SCH_TASK_DLOG("processReady end"); + QW_SCH_TASK_DLOG("processReady end, node:%p", node); return TSDB_CODE_SUCCESS; } @@ -462,11 +464,11 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connection = pMsg}; - QW_SCH_TASK_DLOG("processFetch start"); + QW_SCH_TASK_DLOG("processFetch start, node:%p", node); QW_ERR_RET(qwProcessFetch(QW_FPARAMS(), &qwMsg)); - QW_SCH_TASK_DLOG("processFetch end"); + QW_SCH_TASK_DLOG("processFetch end, node:%p", node); return TSDB_CODE_SUCCESS; } @@ -520,11 +522,11 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connection = pMsg}; - QW_SCH_TASK_DLOG("processDrop start"); + QW_SCH_TASK_DLOG("processDrop start, node:%p", node); QW_ERR_RET(qwProcessDrop(QW_FPARAMS(), &qwMsg)); - QW_SCH_TASK_DLOG("processDrop end"); + QW_SCH_TASK_DLOG("processDrop end, node:%p", node); return TSDB_CODE_SUCCESS; }