From fab0adde99fda3dbeb3a7eed1bdac2b8fa32825d Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 28 Sep 2022 21:44:37 +0800 Subject: [PATCH] fix memory error --- source/dnode/vnode/src/inc/tq.h | 2 +- source/dnode/vnode/src/tq/tq.c | 11 +++++++++-- source/dnode/vnode/src/tq/tqPush.c | 20 ++++++++++++++------ 3 files changed, 24 insertions(+), 9 deletions(-) diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 04b0813445..f96afe6fba 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -116,7 +116,7 @@ typedef struct { typedef struct { SMqDataRsp dataRsp; SMqRspHead rspHead; - STqHandle* pHandle; + char subKey[TSDB_SUBSCRIBE_KEY_LEN]; SRpcHandleInfo pInfo; } STqPushEntry; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 070abadee2..d0e7d2c766 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -554,8 +554,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { if (dataRsp.blockNum == 0) { STqPushEntry* pPushEntry = taosMemoryCalloc(1, sizeof(STqPushEntry)); if (pPushEntry != NULL) { - pPushEntry->pHandle = pHandle; pPushEntry->pInfo = pMsg->info; + memcpy(pPushEntry->subKey, pHandle->subKey, TSDB_SUBSCRIBE_KEY_LEN); dataRsp.withTbName = 0; memcpy(&pPushEntry->dataRsp, &dataRsp, sizeof(SMqDataRsp)); pPushEntry->rspHead.consumerId = consumerId; @@ -704,7 +704,14 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessVgDeleteReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg; - int32_t code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)); + taosWLockLatch(&pTq->pushLock); + int32_t code = taosHashRemove(pTq->pPushMgr, pReq->subKey, strlen(pReq->subKey)); + if (code != 0) { + tqDebug("vgId:%d, tq remove push handle %s", pTq->pVnode->config.vgId, pReq->subKey); + } + taosWUnLockLatch(&pTq->pushLock); + + code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)); if (code != 0) { tqError("cannot process tq delete req %s, since no such handle", pReq->subKey); } diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index e7d6c1eb47..4bd47f4e83 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -213,11 +213,12 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_ #endif int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) { - tqDebug("vgId:%d tq push msg ver %ld", pTq->pVnode->config.vgId, ver); + tqDebug("vgId:%d tq push msg ver %ld, type: %s", pTq->pVnode->config.vgId, ver, TMSG_INFO(msgType)); if (msgType == TDMT_VND_SUBMIT) { // lock push mgr to avoid potential msg lost taosWLockLatch(&pTq->pushLock); + tqDebug("vgId:%d push handle num %d", pTq->pVnode->config.vgId, taosHashGetSize(pTq->pPushMgr)); if (taosHashGetSize(pTq->pPushMgr) != 0) { SArray* cachedKeys = taosArrayInit(0, sizeof(void*)); SArray* cachedKeyLens = taosArrayInit(0, sizeof(size_t)); @@ -235,10 +236,17 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) while (1) { pIter = taosHashIterate(pTq->pPushMgr, pIter); if (pIter == NULL) break; - STqPushEntry* pPushEntry = *(STqPushEntry**)pIter; - STqExecHandle* pExec = &pPushEntry->pHandle->execHandle; + STqPushEntry* pPushEntry = *(STqPushEntry**)pIter; + + STqHandle* pHandle = taosHashGet(pTq->pHandle, pPushEntry->subKey, strlen(pPushEntry->subKey)); + if (pHandle == NULL) { + tqDebug("vgId:%d cannot find handle %s", pTq->pVnode->config.vgId, pPushEntry->subKey); + continue; + } + STqExecHandle* pExec = &pHandle->execHandle; qTaskInfo_t task = pExec->task; - SMqDataRsp* pRsp = &pPushEntry->dataRsp; + + SMqDataRsp* pRsp = &pPushEntry->dataRsp; // prepare scan mem data qStreamScanMemData(task, pReq); @@ -259,8 +267,8 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) pRsp->blockNum++; } - tqDebug("vgId:%d tq handle push, subkey: %s, block num: %d", pTq->pVnode->config.vgId, - pPushEntry->pHandle->subKey, pRsp->blockNum); + tqDebug("vgId:%d tq handle push, subkey: %s, block num: %d", pTq->pVnode->config.vgId, pPushEntry->subKey, + pRsp->blockNum); if (pRsp->blockNum > 0) { // set offset tqOffsetResetToLog(&pRsp->rspOffset, ver);