fix(stream): memory leak
This commit is contained in:
parent
6f8d4d6955
commit
a57a305035
|
@ -67,7 +67,11 @@ void streamSchedByTimer(void* param, void* tmrId) {
|
||||||
|
|
||||||
atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE);
|
atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE);
|
||||||
|
|
||||||
streamTaskInput(pTask, (SStreamQueueItem*)trigger);
|
if (streamTaskInput(pTask, (SStreamQueueItem*)trigger) < 0) {
|
||||||
|
taosFreeQitem(trigger);
|
||||||
|
taosTmrReset(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->timer);
|
||||||
|
return;
|
||||||
|
}
|
||||||
streamSchedExec(pTask);
|
streamSchedExec(pTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -28,6 +28,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
|
||||||
sprintf(streamPath, "%s/%s", path, "stream");
|
sprintf(streamPath, "%s/%s", path, "stream");
|
||||||
pMeta->path = strdup(streamPath);
|
pMeta->path = strdup(streamPath);
|
||||||
if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db) < 0) {
|
if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db) < 0) {
|
||||||
|
taosMemoryFree(streamPath);
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -58,7 +59,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
|
||||||
return pMeta;
|
return pMeta;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
if (pMeta->path) taosMemoryFree(pMeta->path);
|
taosMemoryFree(pMeta->path);
|
||||||
if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks);
|
if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks);
|
||||||
if (pMeta->pTaskDb) tdbTbClose(pMeta->pTaskDb);
|
if (pMeta->pTaskDb) tdbTbClose(pMeta->pTaskDb);
|
||||||
if (pMeta->pCheckpointDb) tdbTbClose(pMeta->pCheckpointDb);
|
if (pMeta->pCheckpointDb) tdbTbClose(pMeta->pCheckpointDb);
|
||||||
|
@ -250,6 +251,8 @@ int32_t streamLoadTasks(SStreamMeta* pMeta) {
|
||||||
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
|
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
|
||||||
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
|
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL) {
|
||||||
|
tdbFree(pKey);
|
||||||
|
tdbFree(pVal);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
|
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
|
||||||
|
@ -257,10 +260,14 @@ int32_t streamLoadTasks(SStreamMeta* pMeta) {
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
|
|
||||||
if (pMeta->expandFunc(pMeta->ahandle, pTask) < 0) {
|
if (pMeta->expandFunc(pMeta->ahandle, pTask) < 0) {
|
||||||
|
tdbFree(pKey);
|
||||||
|
tdbFree(pVal);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosHashPut(pMeta->pTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) {
|
if (taosHashPut(pMeta->pTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) {
|
||||||
|
tdbFree(pKey);
|
||||||
|
tdbFree(pVal);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -63,7 +63,7 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int
|
||||||
sprintf(statePath, "%s/%d", path, pTask->taskId);
|
sprintf(statePath, "%s/%d", path, pTask->taskId);
|
||||||
} else {
|
} else {
|
||||||
memset(statePath, 0, 300);
|
memset(statePath, 0, 300);
|
||||||
strncpy(statePath, path, 300);
|
tstrncpy(statePath, path, 300);
|
||||||
}
|
}
|
||||||
if (tdbOpen(statePath, szPage, pages, &pState->db) < 0) {
|
if (tdbOpen(statePath, szPage, pages, &pState->db) < 0) {
|
||||||
goto _err;
|
goto _err;
|
||||||
|
|
Loading…
Reference in New Issue