fix mem leak

This commit is contained in:
yihaoDeng 2023-10-07 17:48:24 +08:00
parent 8ebd171ee0
commit 97fdcbc60b
3 changed files with 74 additions and 62 deletions

View File

@ -105,6 +105,7 @@ int32_t streamStateSnapRead(SStreamStateReader* pReader, uint8_t** ppData) {
pHdr->size = len;
memcpy(pHdr->data, rowData, len);
tqDebug("vgId:%d, vnode stream-state snapshot read data success", TD_VID(pReader->pTq->pVnode));
taosMemoryFree(rowData);
return code;
_err:

View File

@ -459,7 +459,7 @@ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) {
} else {
stError("failed to start stream backend at %s, reason: %s, restart from default state dir:%s", chkp,
tstrerror(TAOS_SYSTEM_ERROR(errno)), state);
tstrerror(TAOS_SYSTEM_ERROR(errno)), state);
taosMkDir(state);
}
taosMemoryFree(chkp);
@ -813,6 +813,10 @@ int32_t chkpGetAllDbCfHandle(SStreamMeta* pMeta, rocksdb_column_family_handle_t*
}
int32_t nCf = taosArrayGetSize(pHandle);
if (nCf == 0) {
taosArrayDestroy(pHandle);
return nCf;
}
rocksdb_column_family_handle_t** ppCf = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*));
for (int i = 0; i < nCf; i++) {
@ -845,6 +849,9 @@ _ERROR:
return code;
}
int32_t chkpPreFlushDb(rocksdb_t* db, rocksdb_column_family_handle_t** cf, int32_t nCf) {
if (nCf == 0) {
return 0;
}
int code = 0;
char* err = NULL;
@ -910,7 +917,7 @@ int32_t streamBackendTriggerChkp(void* arg, char* dst) {
stError("stream backend:%p failed to do checkpoint at:%s", pHandle, dst);
} else {
stDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pHandle, dst,
taosGetTimestampMs() - st);
taosGetTimestampMs() - st);
}
} else {
stError("stream backend:%p failed to flush db at:%s", pHandle, dst);
@ -979,15 +986,15 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) {
stDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pHandle, pChkpIdDir, nCf);
code = chkpPreFlushDb(pHandle->db, ppCf, nCf);
if (code == 0) {
if (code == 0 && nCf != 0) {
code = chkpDoDbCheckpoint(pHandle->db, pChkpIdDir);
if (code != 0) {
stError("stream backend:%p failed to do checkpoint at:%s", pHandle, pChkpIdDir);
} else {
stDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pHandle, pChkpIdDir,
taosGetTimestampMs() - st);
taosGetTimestampMs() - st);
}
} else {
} else if (nCf != 0) {
stError("stream backend:%p failed to flush db at:%s", pHandle, pChkpIdDir);
}
// release all ref to cfWrapper;
@ -1711,7 +1718,7 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) {
char* status[] = {"close", "drop"};
stInfo("start to %s state %p on backendWrapper %p %s", status[remove == false ? 0 : 1], pState, wrapper,
wrapper->idstr);
wrapper->idstr);
wrapper->remove |= remove; // update by other pState
taosReleaseRef(streamBackendCfWrapperId, pState->pTdbState->backendCfWrapperId);
}
@ -1783,35 +1790,36 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe
((rocksdb_column_family_handle_t**)wrapper->pHandle)[idx]);
}
#define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \
do { \
code = 0; \
char buf[128] = {0}; \
char* err = NULL; \
int i = streamStateGetCfIdx(pState, funcname); \
if (i < 0) { \
stWarn("streamState failed to get cf name: %s", funcname); \
code = -1; \
break; \
} \
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; \
char toString[128] = {0}; \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pHandle)[ginitDict[i].idx]; \
rocksdb_t* db = wrapper->rocksdb; \
rocksdb_writeoptions_t* opts = wrapper->writeOpts; \
char* ttlV = NULL; \
int32_t ttlVLen = ginitDict[i].enValueFunc((char*)value, vLen, 0, &ttlV); \
rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)ttlV, (size_t)ttlVLen, &err); \
if (err != NULL) { \
stError("streamState str: %s failed to write to %s, err: %s", toString, funcname, err); \
taosMemoryFree(err); \
code = -1; \
} else { \
stTrace("streamState str:%s succ to write to %s, rowValLen:%d, ttlValLen:%d", toString, funcname, vLen, ttlVLen); \
} \
taosMemoryFree(ttlV); \
#define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \
do { \
code = 0; \
char buf[128] = {0}; \
char* err = NULL; \
int i = streamStateGetCfIdx(pState, funcname); \
if (i < 0) { \
stWarn("streamState failed to get cf name: %s", funcname); \
code = -1; \
break; \
} \
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; \
char toString[128] = {0}; \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pHandle)[ginitDict[i].idx]; \
rocksdb_t* db = wrapper->rocksdb; \
rocksdb_writeoptions_t* opts = wrapper->writeOpts; \
char* ttlV = NULL; \
int32_t ttlVLen = ginitDict[i].enValueFunc((char*)value, vLen, 0, &ttlV); \
rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)ttlV, (size_t)ttlVLen, &err); \
if (err != NULL) { \
stError("streamState str: %s failed to write to %s, err: %s", toString, funcname, err); \
taosMemoryFree(err); \
code = -1; \
} else { \
stTrace("streamState str:%s succ to write to %s, rowValLen:%d, ttlValLen:%d", toString, funcname, vLen, \
ttlVLen); \
} \
taosMemoryFree(ttlV); \
} while (0);
#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \
@ -1821,7 +1829,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe
char* err = NULL; \
int i = streamStateGetCfIdx(pState, funcname); \
if (i < 0) { \
stWarn("streamState failed to get cf name: %s", funcname); \
stWarn("streamState failed to get cf name: %s", funcname); \
code = -1; \
break; \
} \
@ -1836,9 +1844,9 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe
char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \
if (val == NULL || len == 0) { \
if (err == NULL) { \
stTrace("streamState str: %s failed to read from %s_%s, err: not exist", toString, wrapper->idstr, funcname); \
stTrace("streamState str: %s failed to read from %s_%s, err: not exist", toString, wrapper->idstr, funcname); \
} else { \
stError("streamState str: %s failed to read from %s_%s, err: %s", toString, wrapper->idstr, funcname, err); \
stError("streamState str: %s failed to read from %s_%s, err: %s", toString, wrapper->idstr, funcname, err); \
taosMemoryFreeClear(err); \
} \
code = -1; \
@ -1846,11 +1854,11 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe
char* p = NULL; \
int32_t tlen = ginitDict[i].deValueFunc(val, len, NULL, (char**)pVal); \
if (tlen <= 0) { \
stError("streamState str: %s failed to read from %s_%s, err: already ttl ", toString, wrapper->idstr, \
funcname); \
stError("streamState str: %s failed to read from %s_%s, err: already ttl ", toString, wrapper->idstr, \
funcname); \
code = -1; \
} else { \
stTrace("streamState str: %s succ to read from %s_%s, valLen:%d", toString, wrapper->idstr, funcname, tlen); \
stTrace("streamState str: %s succ to read from %s_%s, valLen:%d", toString, wrapper->idstr, funcname, tlen); \
} \
taosMemoryFree(val); \
if (vLen != NULL) *vLen = tlen; \
@ -1864,7 +1872,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe
char* err = NULL; \
int i = streamStateGetCfIdx(pState, funcname); \
if (i < 0) { \
stWarn("streamState failed to get cf name: %s_%s", pState->pTdbState->idstr, funcname); \
stWarn("streamState failed to get cf name: %s_%s", pState->pTdbState->idstr, funcname); \
code = -1; \
break; \
} \
@ -1877,11 +1885,11 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe
rocksdb_writeoptions_t* opts = wrapper->writeOpts; \
rocksdb_delete_cf(db, opts, pHandle, (const char*)buf, klen, &err); \
if (err != NULL) { \
stError("streamState str: %s failed to del from %s_%s, err: %s", toString, wrapper->idstr, funcname, err); \
stError("streamState str: %s failed to del from %s_%s, err: %s", toString, wrapper->idstr, funcname, err); \
taosMemoryFree(err); \
code = -1; \
} else { \
stTrace("streamState str: %s succ to del from %s_%s", toString, wrapper->idstr, funcname); \
stTrace("streamState str: %s succ to del from %s_%s", toString, wrapper->idstr, funcname); \
} \
} while (0);

View File

@ -149,7 +149,6 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->startInfo.pReadyTaskSet = taosHashInit(64, fp, false, HASH_NO_LOCK);
if (pMeta->startInfo.pReadyTaskSet == NULL) {
}
pMeta->pHbInfo = taosMemoryCalloc(1, sizeof(SMetaHbInfo));
@ -208,7 +207,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->numOfPausedTasks = 0;
pMeta->numOfStreamTasks = 0;
stInfo("vgId:%d open stream meta successfully, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, pMeta->chkpId,
stage);
stage);
return pMeta;
_err:
@ -248,7 +247,7 @@ int32_t streamMetaReopen(SStreamMeta* pMeta) {
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
stError("vgId:%d failed to rename file, from %s to %s, code:%s", pMeta->vgId, newPath, defaultPath,
tstrerror(terrno));
tstrerror(terrno));
taosMemoryFree(defaultPath);
taosMemoryFree(newPath);
@ -268,6 +267,8 @@ int32_t streamMetaReopen(SStreamMeta* pMeta) {
pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
streamBackendLoadCheckpointInfo(pMeta);
taosMemoryFree(defaultPath);
taosMemoryFree(newPath);
return 0;
}
@ -379,10 +380,10 @@ int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pTaskId) {
int64_t key[2] = {pTaskId->streamId, pTaskId->taskId};
int32_t code = tdbTbDelete(pMeta->pTaskDb, key, STREAM_TASK_KEY_LEN, pMeta->txn);
if (code != 0) {
stError("vgId:%d failed to remove task:0x%x from metastore, code:%s", pMeta->vgId, (int32_t) pTaskId->taskId,
tstrerror(terrno));
stError("vgId:%d failed to remove task:0x%x from metastore, code:%s", pMeta->vgId, (int32_t)pTaskId->taskId,
tstrerror(terrno));
} else {
stDebug("vgId:%d remove task:0x%x from metastore", pMeta->vgId, (int32_t) pTaskId->taskId);
stDebug("vgId:%d remove task:0x%x from metastore", pMeta->vgId, (int32_t)pTaskId->taskId);
}
return code;
@ -393,7 +394,7 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
*pAdded = false;
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (p == NULL) {
if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) {
tFreeStreamTask(pTask);
@ -434,7 +435,7 @@ int32_t streamMetaGetNumOfStreamTasks(SStreamMeta* pMeta) {
int32_t num = 0;
size_t size = taosArrayGetSize(pMeta->pTaskList);
for (int32_t i = 0; i < size; ++i) {
STaskId* pId = taosArrayGet(pMeta->pTaskList, i);
STaskId* pId = taosArrayGet(pMeta->pTaskList, i);
SStreamTask** p = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId));
if (p == NULL) {
continue;
@ -451,7 +452,7 @@ int32_t streamMetaGetNumOfStreamTasks(SStreamMeta* pMeta) {
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
taosRLockLatch(&pMeta->lock);
STaskId id = {.streamId = streamId, .taskId = taskId};
STaskId id = {.streamId = streamId, .taskId = taskId};
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask != NULL) {
if (!streamTaskShouldStop(&(*ppTask)->status)) {
@ -495,7 +496,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
// pre-delete operation
taosWLockLatch(&pMeta->lock);
STaskId id = {.streamId = streamId, .taskId = taskId};
STaskId id = {.streamId = streamId, .taskId = taskId};
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask) {
pTask = *ppTask;
@ -512,7 +513,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
taosWUnLockLatch(&pMeta->lock);
stDebug("s-task:0x%x set task status:%s and start to unregister it", taskId,
streamGetTaskStatusStr(TASK_STATUS__DROPPING));
streamGetTaskStatusStr(TASK_STATUS__DROPPING));
while (1) {
taosRLockLatch(&pMeta->lock);
@ -650,7 +651,7 @@ static void doClear(void* pKey, void* pVal, TBC* pCur, SArray* pRecycleList) {
}
int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
TBC* pCur = NULL;
TBC* pCur = NULL;
int32_t vgId = pMeta->vgId;
stInfo("vgId:%d load stream tasks from meta files", vgId);
@ -683,8 +684,10 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
doClear(pKey, pVal, pCur, pRecycleList);
tFreeStreamTask(pTask);
stError(
"vgId:%d stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild stream "
"manually", vgId, tsDataDir);
"vgId:%d stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild "
"stream "
"manually",
vgId, tsDataDir);
return -1;
}
tDecoderClear(&decoder);
@ -756,7 +759,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
ASSERT(pMeta->numOfStreamTasks <= numOfTasks && pMeta->numOfPausedTasks <= numOfTasks);
stDebug("vgId:%d load %d tasks into meta from disk completed, streamTask:%d, paused:%d", pMeta->vgId, numOfTasks,
pMeta->numOfStreamTasks, pMeta->numOfPausedTasks);
pMeta->numOfStreamTasks, pMeta->numOfPausedTasks);
taosArrayDestroy(pRecycleList);
return 0;
}
@ -803,7 +806,7 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
}
static bool waitForEnoughDuration(SMetaHbInfo* pInfo) {
if ((++pInfo->tickCounter) >= META_HB_SEND_IDLE_COUNTER) { // reset the counter
if ((++pInfo->tickCounter) >= META_HB_SEND_IDLE_COUNTER) { // reset the counter
pInfo->tickCounter = 0;
return true;
}
@ -915,7 +918,7 @@ void metaHbToMnode(void* param, void* tmrId) {
pMeta->pHbInfo->hbCount += 1;
stDebug("vgId:%d, build and send hb to mnode, numOfTasks:%d total:%d", pMeta->vgId, hbMsg.numOfTasks,
pMeta->pHbInfo->hbCount);
pMeta->pHbInfo->hbCount);
tmsgSendReq(&epset, &msg);
} else {
stDebug("vgId:%d no tasks and no mnd epset, not send stream hb to mnode", pMeta->vgId);
@ -952,7 +955,7 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) {
int32_t vgId = pMeta->vgId;
stDebug("vgId:%d notify all stream tasks that the vnode is closing. isLeader:%d startHb%" PRId64 ", totalHb:%d", vgId,
(pMeta->role == NODE_ROLE_LEADER), pMeta->pHbInfo->hbStart, pMeta->pHbInfo->hbCount);
(pMeta->role == NODE_ROLE_LEADER), pMeta->pHbInfo->hbStart, pMeta->pHbInfo->hbCount);
taosWLockLatch(&pMeta->lock);