fix:memory leak

This commit is contained in:
wangmm0220 2023-09-22 09:31:58 +08:00
parent 70b5509217
commit 803b17a2db
4 changed files with 41 additions and 46 deletions

View File

@ -1411,7 +1411,7 @@ int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const ch
code = smlBuildOutput(pQuery, pVgHash); code = smlBuildOutput(pQuery, pVgHash);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
uError("smlBuildOutput failed"); uError("smlBuildOutput failed");
return code; goto end;
} }
launchQueryImpl(pRequest, pQuery, true, NULL); launchQueryImpl(pRequest, pQuery, true, NULL);
@ -1496,7 +1496,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
code = smlBuildOutput(pQuery, pVgHash); code = smlBuildOutput(pQuery, pVgHash);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
uError("smlBuildOutput failed"); uError("smlBuildOutput failed");
return code; goto end;
} }
launchQueryImpl(pRequest, pQuery, true, NULL); launchQueryImpl(pRequest, pQuery, true, NULL);

View File

@ -178,17 +178,14 @@ int32_t tqSnapWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
tDecoderInit(pDecoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); tDecoderInit(pDecoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
code = tDecodeSTqHandle(pDecoder, &handle); code = tDecodeSTqHandle(pDecoder, &handle);
if (code) goto _err; if (code) goto end;
taosWLockLatch(&pTq->lock); taosWLockLatch(&pTq->lock);
code = tqMetaSaveHandle(pTq, handle.subKey, &handle); code = tqMetaSaveHandle(pTq, handle.subKey, &handle);
taosWUnLockLatch(&pTq->lock); taosWUnLockLatch(&pTq->lock);
if (code < 0) goto _err;
tDecoderClear(pDecoder);
return code; end:
_err:
tDecoderClear(pDecoder); tDecoderClear(pDecoder);
tqError("vgId:%d, vnode snapshot tq write failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); tqDestroyTqHandle(&handle);
tqInfo("vgId:%d, vnode snapshot tq write result:%d", TD_VID(pTq->pVnode), code);
return code; return code;
} }

View File

@ -198,58 +198,51 @@ int32_t tqMetaRestoreCheckInfo(STQ* pTq) {
int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) { int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) {
int32_t code; int32_t code;
int32_t vlen; int32_t vlen;
void* buf = NULL;
SEncoder encoder;
tEncodeSize(tEncodeSTqHandle, pHandle, vlen, code); tEncodeSize(tEncodeSTqHandle, pHandle, vlen, code);
if (code < 0) { if (code < 0) {
return -1; goto end;
} }
tqDebug("tq save %s(%d) handle consumer:0x%" PRIx64 " epoch:%d vgId:%d", pHandle->subKey, tqDebug("tq save %s(%d) handle consumer:0x%" PRIx64 " epoch:%d vgId:%d", pHandle->subKey,
(int32_t)strlen(pHandle->subKey), pHandle->consumerId, pHandle->epoch, TD_VID(pTq->pVnode)); (int32_t)strlen(pHandle->subKey), pHandle->consumerId, pHandle->epoch, TD_VID(pTq->pVnode));
void* buf = taosMemoryCalloc(1, vlen); buf = taosMemoryCalloc(1, vlen);
if (buf == NULL) { if (buf == NULL) {
return -1; code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
} }
SEncoder encoder;
tEncoderInit(&encoder, buf, vlen); tEncoderInit(&encoder, buf, vlen);
if (tEncodeSTqHandle(&encoder, pHandle) < 0) { code = tEncodeSTqHandle(&encoder, pHandle);
tEncoderClear(&encoder); if (code < 0) {
taosMemoryFree(buf); goto end;
return -1;
} }
TXN* txn; TXN* txn = NULL;
code = tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
if (tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < if (code < 0) {
0) { goto end;
tEncoderClear(&encoder);
taosMemoryFree(buf);
return -1;
} }
if (tdbTbUpsert(pTq->pExecStore, key, (int)strlen(key), buf, vlen, txn) < 0) { code = tdbTbUpsert(pTq->pExecStore, key, (int)strlen(key), buf, vlen, txn);
tEncoderClear(&encoder); if (code < 0) {
taosMemoryFree(buf); goto end;
return -1;
} }
if (tdbCommit(pTq->pMetaDB, txn) < 0) { code = tdbCommit(pTq->pMetaDB, txn);
tEncoderClear(&encoder); if (code < 0) {
taosMemoryFree(buf); goto end;
return -1;
}
if (tdbPostCommit(pTq->pMetaDB, txn) < 0) {
tEncoderClear(&encoder);
taosMemoryFree(buf);
return -1;
} }
code = tdbPostCommit(pTq->pMetaDB, txn);
end:
tEncoderClear(&encoder); tEncoderClear(&encoder);
taosMemoryFree(buf); taosMemoryFree(buf);
return 0; return code;
} }
int32_t tqMetaDeleteHandle(STQ* pTq, const char* key) { int32_t tqMetaDeleteHandle(STQ* pTq, const char* key) {
@ -349,15 +342,18 @@ static int buildHandle(STQ* pTq, STqHandle* handle){
static int restoreHandle(STQ* pTq, void* pVal, int vLen, STqHandle* handle){ static int restoreHandle(STQ* pTq, void* pVal, int vLen, STqHandle* handle){
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
SDecoder decoder; SDecoder decoder;
int32_t code = 0;
tDecoderInit(&decoder, (uint8_t*)pVal, vLen); tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
tDecodeSTqHandle(&decoder, handle); code = tDecodeSTqHandle(&decoder, handle);
tDecoderClear(&decoder); if (code) goto end;
code = buildHandle(pTq, handle);
if(buildHandle(pTq, handle) < 0){ if (code) goto end;
return -1;
}
tqInfo("restoreHandle %s consumer 0x%" PRIx64 " vgId:%d", handle->subKey, handle->consumerId, vgId); tqInfo("restoreHandle %s consumer 0x%" PRIx64 " vgId:%d", handle->subKey, handle->consumerId, vgId);
return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle)); code = taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle));
end:
tDecoderClear(&decoder);
return code;
} }
int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle){ int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle){

View File

@ -238,6 +238,8 @@ int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId) {
return -1; return -1;
} }
} }
taosMemoryFree(defaultPath);
taosMemoryFree(newPath);
pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId); pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId);
while (pMeta->streamBackend == NULL) { while (pMeta->streamBackend == NULL) {