Merge pull request #27915 from taosdata/fix/TD-32138
fix(stream):fix mem leak and adj malloc res
This commit is contained in:
commit
eb5ee966a3
|
@ -2620,12 +2620,16 @@ int32_t dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf
|
||||||
}
|
}
|
||||||
len += snprintf(dumpBuf + len, size - len, "%s |end\n", flag);
|
len += snprintf(dumpBuf + len, size - len, "%s |end\n", flag);
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
*pDataBuf = dumpBuf;
|
*pDataBuf = dumpBuf;
|
||||||
dumpBuf = NULL;
|
dumpBuf = NULL;
|
||||||
_exit:
|
} else {
|
||||||
|
uError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||||
if (dumpBuf) {
|
if (dumpBuf) {
|
||||||
taosMemoryFree(dumpBuf);
|
taosMemoryFree(dumpBuf);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2833,6 +2833,7 @@ void printDataBlock(SSDataBlock* pBlock, const char* flag, const char* taskIdStr
|
||||||
qDebug("%s===stream===%s: Block is Empty. block type %d", taskIdStr, flag, pBlock->info.type);
|
qDebug("%s===stream===%s: Block is Empty. block type %d", taskIdStr, flag, pBlock->info.type);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
if (qDebugFlag & DEBUG_DEBUG) {
|
||||||
char* pBuf = NULL;
|
char* pBuf = NULL;
|
||||||
int32_t code = dumpBlockData(pBlock, flag, &pBuf, taskIdStr);
|
int32_t code = dumpBlockData(pBlock, flag, &pBuf, taskIdStr);
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
|
@ -2840,6 +2841,7 @@ void printDataBlock(SSDataBlock* pBlock, const char* flag, const char* taskIdStr
|
||||||
taosMemoryFree(pBuf);
|
taosMemoryFree(pBuf);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr, const char* taskIdStr) {
|
void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr, const char* taskIdStr) {
|
||||||
if (!pBlock) {
|
if (!pBlock) {
|
||||||
|
|
|
@ -314,6 +314,8 @@ static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunc
|
||||||
}
|
}
|
||||||
} else if (type == TSDB_DATA_TYPE_VARCHAR || type == TSDB_DATA_TYPE_GEOMETRY) {
|
} else if (type == TSDB_DATA_TYPE_VARCHAR || type == TSDB_DATA_TYPE_GEOMETRY) {
|
||||||
char* tmp = taosMemoryMalloc(pFuncParam->param.nLen + VARSTR_HEADER_SIZE);
|
char* tmp = taosMemoryMalloc(pFuncParam->param.nLen + VARSTR_HEADER_SIZE);
|
||||||
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
|
|
||||||
STR_WITH_SIZE_TO_VARSTR(tmp, pFuncParam->param.pz, pFuncParam->param.nLen);
|
STR_WITH_SIZE_TO_VARSTR(tmp, pFuncParam->param.pz, pFuncParam->param.nLen);
|
||||||
for (int32_t i = 0; i < numOfRows; ++i) {
|
for (int32_t i = 0; i < numOfRows; ++i) {
|
||||||
code = colDataSetVal(pColInfo, i, tmp, false);
|
code = colDataSetVal(pColInfo, i, tmp, false);
|
||||||
|
|
|
@ -81,6 +81,9 @@ bool inSessionWindow(SSessionKey* pKey, TSKEY ts, int64_t gap) {
|
||||||
|
|
||||||
SStreamStateCur* createSessionStateCursor(SStreamFileState* pFileState) {
|
SStreamStateCur* createSessionStateCursor(SStreamFileState* pFileState) {
|
||||||
SStreamStateCur* pCur = createStreamStateCursor();
|
SStreamStateCur* pCur = createStreamStateCursor();
|
||||||
|
if (pCur == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
pCur->pStreamFileState = pFileState;
|
pCur->pStreamFileState = pFileState;
|
||||||
return pCur;
|
return pCur;
|
||||||
}
|
}
|
||||||
|
@ -533,6 +536,9 @@ static SStreamStateCur* seekKeyCurrentPrev_buff(SStreamFileState* pFileState, co
|
||||||
|
|
||||||
if (index >= 0) {
|
if (index >= 0) {
|
||||||
pCur = createSessionStateCursor(pFileState);
|
pCur = createSessionStateCursor(pFileState);
|
||||||
|
if (pCur == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
pCur->buffIndex = index;
|
pCur->buffIndex = index;
|
||||||
if (pIndex) {
|
if (pIndex) {
|
||||||
*pIndex = index;
|
*pIndex = index;
|
||||||
|
@ -634,6 +640,9 @@ SStreamStateCur* countWinStateSeekKeyPrev(SStreamFileState* pFileState, const SS
|
||||||
pBuffCur->buffIndex = 0;
|
pBuffCur->buffIndex = 0;
|
||||||
} else if (taosArrayGetSize(pWinStates) > 0) {
|
} else if (taosArrayGetSize(pWinStates) > 0) {
|
||||||
pBuffCur = createSessionStateCursor(pFileState);
|
pBuffCur = createSessionStateCursor(pFileState);
|
||||||
|
if (pBuffCur == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
pBuffCur->buffIndex = 0;
|
pBuffCur->buffIndex = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -530,6 +530,9 @@ void streamStateCopyBackend(SStreamState* src, SStreamState* dst) {
|
||||||
}
|
}
|
||||||
SStreamStateCur* createStreamStateCursor() {
|
SStreamStateCur* createStreamStateCursor() {
|
||||||
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
|
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
|
||||||
|
if (pCur == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
pCur->buffIndex = -1;
|
pCur->buffIndex = -1;
|
||||||
return pCur;
|
return pCur;
|
||||||
}
|
}
|
||||||
|
|
|
@ -631,7 +631,11 @@ int32_t updateInfoDeserialize(void* buf, int32_t bufLen, SUpdateInfo* pInfo) {
|
||||||
if (tDecodeI8(&decoder, &pInfo->pkColType) < 0) return -1;
|
if (tDecodeI8(&decoder, &pInfo->pkColType) < 0) return -1;
|
||||||
|
|
||||||
pInfo->pKeyBuff = taosMemoryCalloc(1, sizeof(TSKEY) + sizeof(int64_t) + pInfo->pkColLen);
|
pInfo->pKeyBuff = taosMemoryCalloc(1, sizeof(TSKEY) + sizeof(int64_t) + pInfo->pkColLen);
|
||||||
|
QUERY_CHECK_NULL(pInfo->pKeyBuff, code, lino, _error, terrno);
|
||||||
|
|
||||||
pInfo->pValueBuff = taosMemoryCalloc(1, sizeof(TSKEY) + pInfo->pkColLen);
|
pInfo->pValueBuff = taosMemoryCalloc(1, sizeof(TSKEY) + pInfo->pkColLen);
|
||||||
|
QUERY_CHECK_NULL(pInfo->pValueBuff, code, lino, _error, terrno);
|
||||||
|
|
||||||
if (pInfo->pkColLen != 0) {
|
if (pInfo->pkColLen != 0) {
|
||||||
pInfo->comparePkRowFn = compareKeyTsAndPk;
|
pInfo->comparePkRowFn = compareKeyTsAndPk;
|
||||||
pInfo->comparePkCol = getKeyComparFunc(pInfo->pkColType, TSDB_ORDER_ASC);
|
pInfo->comparePkCol = getKeyComparFunc(pInfo->pkColType, TSDB_ORDER_ASC);
|
||||||
|
|
|
@ -96,6 +96,10 @@ int32_t intervalFileGetFn(SStreamFileState* pFileState, void* pKey, void* data,
|
||||||
|
|
||||||
void* intervalCreateStateKey(SRowBuffPos* pPos, int64_t num) {
|
void* intervalCreateStateKey(SRowBuffPos* pPos, int64_t num) {
|
||||||
SStateKey* pStateKey = taosMemoryCalloc(1, sizeof(SStateKey));
|
SStateKey* pStateKey = taosMemoryCalloc(1, sizeof(SStateKey));
|
||||||
|
if (pStateKey == NULL) {
|
||||||
|
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
SWinKey* pWinKey = pPos->pKey;
|
SWinKey* pWinKey = pPos->pKey;
|
||||||
pStateKey->key = *pWinKey;
|
pStateKey->key = *pWinKey;
|
||||||
pStateKey->opNum = num;
|
pStateKey->opNum = num;
|
||||||
|
@ -112,6 +116,10 @@ int32_t sessionFileGetFn(SStreamFileState* pFileState, void* pKey, void* data, i
|
||||||
|
|
||||||
void* sessionCreateStateKey(SRowBuffPos* pPos, int64_t num) {
|
void* sessionCreateStateKey(SRowBuffPos* pPos, int64_t num) {
|
||||||
SStateSessionKey* pStateKey = taosMemoryCalloc(1, sizeof(SStateSessionKey));
|
SStateSessionKey* pStateKey = taosMemoryCalloc(1, sizeof(SStateSessionKey));
|
||||||
|
if (pStateKey == NULL) {
|
||||||
|
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
SSessionKey* pWinKey = pPos->pKey;
|
SSessionKey* pWinKey = pPos->pKey;
|
||||||
pStateKey->key = *pWinKey;
|
pStateKey->key = *pWinKey;
|
||||||
pStateKey->opNum = num;
|
pStateKey->opNum = num;
|
||||||
|
@ -120,11 +128,16 @@ void* sessionCreateStateKey(SRowBuffPos* pPos, int64_t num) {
|
||||||
|
|
||||||
static void streamFileStateDecode(TSKEY* pKey, void* pBuff, int32_t len) { pBuff = taosDecodeFixedI64(pBuff, pKey); }
|
static void streamFileStateDecode(TSKEY* pKey, void* pBuff, int32_t len) { pBuff = taosDecodeFixedI64(pBuff, pKey); }
|
||||||
|
|
||||||
static void streamFileStateEncode(TSKEY* pKey, void** pVal, int32_t* pLen) {
|
static int32_t streamFileStateEncode(TSKEY* pKey, void** pVal, int32_t* pLen) {
|
||||||
*pLen = sizeof(TSKEY);
|
*pLen = sizeof(TSKEY);
|
||||||
(*pVal) = taosMemoryCalloc(1, *pLen);
|
(*pVal) = taosMemoryCalloc(1, *pLen);
|
||||||
|
if ((*pVal) == NULL) {
|
||||||
|
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
void* buff = *pVal;
|
void* buff = *pVal;
|
||||||
int32_t tmp = taosEncodeFixedI64(&buff, *pKey);
|
int32_t tmp = taosEncodeFixedI64(&buff, *pKey);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize, GetTsFun fp,
|
int32_t streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize, GetTsFun fp,
|
||||||
|
@ -177,6 +190,7 @@ int32_t streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize,
|
||||||
pFileState->stateFunctionGetFn = getSessionRowBuff;
|
pFileState->stateFunctionGetFn = getSessionRowBuff;
|
||||||
}
|
}
|
||||||
QUERY_CHECK_NULL(pFileState->rowStateBuff, code, lino, _error, terrno);
|
QUERY_CHECK_NULL(pFileState->rowStateBuff, code, lino, _error, terrno);
|
||||||
|
QUERY_CHECK_NULL(pFileState->cfName, code, lino, _error, terrno);
|
||||||
|
|
||||||
pFileState->keyLen = keySize;
|
pFileState->keyLen = keySize;
|
||||||
pFileState->rowSize = rowSize;
|
pFileState->rowSize = rowSize;
|
||||||
|
@ -480,12 +494,11 @@ SRowBuffPos* getNewRowPos(SStreamFileState* pFileState) {
|
||||||
|
|
||||||
if (pFileState->curRowCount < pFileState->maxRowCount) {
|
if (pFileState->curRowCount < pFileState->maxRowCount) {
|
||||||
pBuff = taosMemoryCalloc(1, pFileState->rowSize);
|
pBuff = taosMemoryCalloc(1, pFileState->rowSize);
|
||||||
if (pBuff) {
|
QUERY_CHECK_NULL(pBuff, code, lino, _error, terrno);
|
||||||
pPos->pRowBuff = pBuff;
|
pPos->pRowBuff = pBuff;
|
||||||
pFileState->curRowCount++;
|
pFileState->curRowCount++;
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
code = clearRowBuff(pFileState);
|
code = clearRowBuff(pFileState);
|
||||||
QUERY_CHECK_CODE(code, lino, _error);
|
QUERY_CHECK_CODE(code, lino, _error);
|
||||||
|
@ -712,6 +725,8 @@ void flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, boo
|
||||||
}
|
}
|
||||||
|
|
||||||
void* pSKey = pFileState->stateBuffCreateStateKeyFn(pPos, ((SStreamState*)pFileState->pFileStore)->number);
|
void* pSKey = pFileState->stateBuffCreateStateKeyFn(pPos, ((SStreamState*)pFileState->pFileStore)->number);
|
||||||
|
QUERY_CHECK_NULL(pSKey, code, lino, _end, terrno);
|
||||||
|
|
||||||
code = streamStatePutBatchOptimize(pFileState->pFileStore, idx, batch, pSKey, pPos->pRowBuff, pFileState->rowSize,
|
code = streamStatePutBatchOptimize(pFileState->pFileStore, idx, batch, pSKey, pPos->pRowBuff, pFileState->rowSize,
|
||||||
0, buf);
|
0, buf);
|
||||||
taosMemoryFreeClear(pSKey);
|
taosMemoryFreeClear(pSKey);
|
||||||
|
@ -738,7 +753,9 @@ void flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, boo
|
||||||
if (flushState) {
|
if (flushState) {
|
||||||
void* valBuf = NULL;
|
void* valBuf = NULL;
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
streamFileStateEncode(&pFileState->flushMark, &valBuf, &len);
|
code = streamFileStateEncode(&pFileState->flushMark, &valBuf, &len);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
qDebug("===stream===flushMark write:%" PRId64, pFileState->flushMark);
|
qDebug("===stream===flushMark write:%" PRId64, pFileState->flushMark);
|
||||||
code = streamStatePutBatch(pFileState->pFileStore, "default", batch, STREAM_STATE_INFO_NAME, valBuf, len, 0);
|
code = streamStatePutBatch(pFileState->pFileStore, "default", batch, STREAM_STATE_INFO_NAME, valBuf, len, 0);
|
||||||
taosMemoryFree(valBuf);
|
taosMemoryFree(valBuf);
|
||||||
|
|
|
@ -177,6 +177,8 @@ int32_t tBloomFilterDecode(SDecoder* pDecoder, SBloomFilter** ppBF) {
|
||||||
QUERY_CHECK_CODE(code, lino, _error);
|
QUERY_CHECK_CODE(code, lino, _error);
|
||||||
}
|
}
|
||||||
pBF->buffer = taosMemoryCalloc(pBF->numUnits, sizeof(uint64_t));
|
pBF->buffer = taosMemoryCalloc(pBF->numUnits, sizeof(uint64_t));
|
||||||
|
QUERY_CHECK_NULL(pBF->buffer, code, lino, _error, terrno);
|
||||||
|
|
||||||
for (int32_t i = 0; i < pBF->numUnits; i++) {
|
for (int32_t i = 0; i < pBF->numUnits; i++) {
|
||||||
uint64_t* pUnits = (uint64_t*)pBF->buffer;
|
uint64_t* pUnits = (uint64_t*)pBF->buffer;
|
||||||
if (tDecodeU64(pDecoder, pUnits + i) < 0) {
|
if (tDecodeU64(pDecoder, pUnits + i) < 0) {
|
||||||
|
|
Loading…
Reference in New Issue