fix(stream): fix memory leak.

This commit is contained in:
Haojun Liao 2023-06-24 23:43:43 +08:00
parent a8de3694fa
commit 0322fdc1fc
2 changed files with 34 additions and 16 deletions

View File

@ -1185,8 +1185,7 @@ void blockDataFreeRes(SSDataBlock* pBlock) {
colDataDestroy(pColInfoData); colDataDestroy(pColInfoData);
} }
taosArrayDestroy(pBlock->pDataBlock); pBlock->pDataBlock = taosArrayDestroy(pBlock->pDataBlock);
pBlock->pDataBlock = NULL;
taosMemoryFreeClear(pBlock->pBlockAgg); taosMemoryFreeClear(pBlock->pBlockAgg);
memset(&pBlock->info, 0, sizeof(SDataBlockInfo)); memset(&pBlock->info, 0, sizeof(SDataBlockInfo));
} }

View File

@ -17,6 +17,8 @@
#include "streamInc.h" #include "streamInc.h"
#define MAX_BLOCK_NAME_NUM 1024 #define MAX_BLOCK_NAME_NUM 1024
#define DISPATCH_RETRY_INTERVAL_MS 300
#define MAX_CONTINUE_RETRY_COUNT 5
typedef struct SBlockName { typedef struct SBlockName {
uint32_t hashValue; uint32_t hashValue;
@ -324,7 +326,10 @@ int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, in
// serialize // serialize
int32_t tlen; int32_t tlen;
tEncodeSize(tEncodeStreamDispatchReq, pReq, tlen, code); tEncodeSize(tEncodeStreamDispatchReq, pReq, tlen, code);
if (code < 0) goto FAIL; if (code < 0) {
goto FAIL;
}
code = -1; code = -1;
buf = rpcMallocCont(sizeof(SMsgHead) + tlen); buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
if (buf == NULL) { if (buf == NULL) {
@ -346,13 +351,13 @@ int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, in
msg.msgType = pTask->msgInfo.msgType; msg.msgType = pTask->msgInfo.msgType;
qDebug("s-task:%s dispatch msg to taskId:0x%x vgId:%d data msg", pTask->id.idStr, pReq->taskId, vgId); qDebug("s-task:%s dispatch msg to taskId:0x%x vgId:%d data msg", pTask->id.idStr, pReq->taskId, vgId);
tmsgSendReq(pEpSet, &msg); return tmsgSendReq(pEpSet, &msg);
code = 0;
return 0;
FAIL: FAIL:
if (buf) rpcFreeCont(buf); if (buf) {
rpcFreeCont(buf);
}
return code; return code;
} }
@ -403,13 +408,16 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
for (j = 0; j < vgSz; j++) { for (j = 0; j < vgSz; j++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j); SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j);
ASSERT(pVgInfo->vgId > 0); ASSERT(pVgInfo->vgId > 0);
if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) { if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) {
if (streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]) < 0) { if (streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]) < 0) {
return -1; return -1;
} }
if (pReqs[j].blockNum == 0) { if (pReqs[j].blockNum == 0) {
atomic_add_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1); atomic_add_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
} }
pReqs[j].blockNum++; pReqs[j].blockNum++;
found = true; found = true;
break; break;
@ -510,7 +518,8 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", pTask->id.idStr, pTask->info.selfChildId, qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", pTask->id.idStr, pTask->info.selfChildId,
pReqs[i].blockNum, pVgInfo->vgId); pReqs[i].blockNum, pVgInfo->vgId);
if (doSendDispatchMsg(pTask, &pReqs[i], pVgInfo->vgId, &pVgInfo->epSet) < 0) { code = doSendDispatchMsg(pTask, &pReqs[i], pVgInfo->vgId, &pVgInfo->epSet);
if (code < 0) {
goto FAIL_SHUFFLE_DISPATCH; goto FAIL_SHUFFLE_DISPATCH;
} }
} }
@ -536,7 +545,9 @@ static void doRetryDispatchData(void* param, void* tmrId) {
int32_t code = streamDispatchAllBlocks(pTask, pTask->msgInfo.pData); int32_t code = streamDispatchAllBlocks(pTask, pTask->msgInfo.pData);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
streamRetryDispatchStreamBlock(pTask, 300); qDebug("s-task:%s reset the waitRspCnt to be 0 before launch retry dispatch", pTask->id.idStr);
atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0);
streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
} }
} }
@ -584,12 +595,20 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
} }
qDebug("s-task:%s failed to dispatch msg to downstream, code:%s, output status:%d, retry cnt:%d", pTask->id.idStr, qDebug("s-task:%s failed to dispatch msg to downstream, code:%s, output status:%d, retry cnt:%d", pTask->id.idStr,
tstrerror(code), pTask->outputStatus, retryCount); tstrerror(terrno), pTask->outputStatus, retryCount);
if (++retryCount > 5) { // add to timer to retry // todo deal with only partially success dispatch case
qDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, retry in %dms", pTask->id.idStr, atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0);
retryCount, tstrerror(code), 300); if (terrno == TSDB_CODE_APP_IS_STOPPING) { // in case of this error, do not retry anymore
streamRetryDispatchStreamBlock(pTask, 300); destroyStreamDataBlock(pTask->msgInfo.pData);
pTask->msgInfo.pData = NULL;
return code;
}
if (++retryCount > MAX_CONTINUE_RETRY_COUNT) { // add to timer to retry
qDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms", pTask->id.idStr,
retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS);
streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
break; break;
} }
} }