fix memory error
This commit is contained in:
parent
057edd92c3
commit
fab0adde99
|
@ -116,7 +116,7 @@ typedef struct {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SMqDataRsp dataRsp;
|
SMqDataRsp dataRsp;
|
||||||
SMqRspHead rspHead;
|
SMqRspHead rspHead;
|
||||||
STqHandle* pHandle;
|
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
|
||||||
SRpcHandleInfo pInfo;
|
SRpcHandleInfo pInfo;
|
||||||
} STqPushEntry;
|
} STqPushEntry;
|
||||||
|
|
||||||
|
|
|
@ -554,8 +554,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
if (dataRsp.blockNum == 0) {
|
if (dataRsp.blockNum == 0) {
|
||||||
STqPushEntry* pPushEntry = taosMemoryCalloc(1, sizeof(STqPushEntry));
|
STqPushEntry* pPushEntry = taosMemoryCalloc(1, sizeof(STqPushEntry));
|
||||||
if (pPushEntry != NULL) {
|
if (pPushEntry != NULL) {
|
||||||
pPushEntry->pHandle = pHandle;
|
|
||||||
pPushEntry->pInfo = pMsg->info;
|
pPushEntry->pInfo = pMsg->info;
|
||||||
|
memcpy(pPushEntry->subKey, pHandle->subKey, TSDB_SUBSCRIBE_KEY_LEN);
|
||||||
dataRsp.withTbName = 0;
|
dataRsp.withTbName = 0;
|
||||||
memcpy(&pPushEntry->dataRsp, &dataRsp, sizeof(SMqDataRsp));
|
memcpy(&pPushEntry->dataRsp, &dataRsp, sizeof(SMqDataRsp));
|
||||||
pPushEntry->rspHead.consumerId = consumerId;
|
pPushEntry->rspHead.consumerId = consumerId;
|
||||||
|
@ -704,7 +704,14 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
int32_t tqProcessVgDeleteReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
|
int32_t tqProcessVgDeleteReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
|
||||||
SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
|
SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
|
||||||
|
|
||||||
int32_t code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
|
taosWLockLatch(&pTq->pushLock);
|
||||||
|
int32_t code = taosHashRemove(pTq->pPushMgr, pReq->subKey, strlen(pReq->subKey));
|
||||||
|
if (code != 0) {
|
||||||
|
tqDebug("vgId:%d, tq remove push handle %s", pTq->pVnode->config.vgId, pReq->subKey);
|
||||||
|
}
|
||||||
|
taosWUnLockLatch(&pTq->pushLock);
|
||||||
|
|
||||||
|
code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
|
tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
|
||||||
}
|
}
|
||||||
|
|
|
@ -213,11 +213,12 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
|
int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
|
||||||
tqDebug("vgId:%d tq push msg ver %ld", pTq->pVnode->config.vgId, ver);
|
tqDebug("vgId:%d tq push msg ver %ld, type: %s", pTq->pVnode->config.vgId, ver, TMSG_INFO(msgType));
|
||||||
|
|
||||||
if (msgType == TDMT_VND_SUBMIT) {
|
if (msgType == TDMT_VND_SUBMIT) {
|
||||||
// lock push mgr to avoid potential msg lost
|
// lock push mgr to avoid potential msg lost
|
||||||
taosWLockLatch(&pTq->pushLock);
|
taosWLockLatch(&pTq->pushLock);
|
||||||
|
tqDebug("vgId:%d push handle num %d", pTq->pVnode->config.vgId, taosHashGetSize(pTq->pPushMgr));
|
||||||
if (taosHashGetSize(pTq->pPushMgr) != 0) {
|
if (taosHashGetSize(pTq->pPushMgr) != 0) {
|
||||||
SArray* cachedKeys = taosArrayInit(0, sizeof(void*));
|
SArray* cachedKeys = taosArrayInit(0, sizeof(void*));
|
||||||
SArray* cachedKeyLens = taosArrayInit(0, sizeof(size_t));
|
SArray* cachedKeyLens = taosArrayInit(0, sizeof(size_t));
|
||||||
|
@ -235,10 +236,17 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
|
||||||
while (1) {
|
while (1) {
|
||||||
pIter = taosHashIterate(pTq->pPushMgr, pIter);
|
pIter = taosHashIterate(pTq->pPushMgr, pIter);
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
STqPushEntry* pPushEntry = *(STqPushEntry**)pIter;
|
STqPushEntry* pPushEntry = *(STqPushEntry**)pIter;
|
||||||
STqExecHandle* pExec = &pPushEntry->pHandle->execHandle;
|
|
||||||
|
STqHandle* pHandle = taosHashGet(pTq->pHandle, pPushEntry->subKey, strlen(pPushEntry->subKey));
|
||||||
|
if (pHandle == NULL) {
|
||||||
|
tqDebug("vgId:%d cannot find handle %s", pTq->pVnode->config.vgId, pPushEntry->subKey);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
STqExecHandle* pExec = &pHandle->execHandle;
|
||||||
qTaskInfo_t task = pExec->task;
|
qTaskInfo_t task = pExec->task;
|
||||||
SMqDataRsp* pRsp = &pPushEntry->dataRsp;
|
|
||||||
|
SMqDataRsp* pRsp = &pPushEntry->dataRsp;
|
||||||
|
|
||||||
// prepare scan mem data
|
// prepare scan mem data
|
||||||
qStreamScanMemData(task, pReq);
|
qStreamScanMemData(task, pReq);
|
||||||
|
@ -259,8 +267,8 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
|
||||||
pRsp->blockNum++;
|
pRsp->blockNum++;
|
||||||
}
|
}
|
||||||
|
|
||||||
tqDebug("vgId:%d tq handle push, subkey: %s, block num: %d", pTq->pVnode->config.vgId,
|
tqDebug("vgId:%d tq handle push, subkey: %s, block num: %d", pTq->pVnode->config.vgId, pPushEntry->subKey,
|
||||||
pPushEntry->pHandle->subKey, pRsp->blockNum);
|
pRsp->blockNum);
|
||||||
if (pRsp->blockNum > 0) {
|
if (pRsp->blockNum > 0) {
|
||||||
// set offset
|
// set offset
|
||||||
tqOffsetResetToLog(&pRsp->rspOffset, ver);
|
tqOffsetResetToLog(&pRsp->rspOffset, ver);
|
||||||
|
|
Loading…
Reference in New Issue