fix mem leak
This commit is contained in:
parent
9d2d9aaddd
commit
faa4bbcaca
|
@ -77,7 +77,7 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in
|
||||||
int64_t streamId, int32_t taskId);
|
int64_t streamId, int32_t taskId);
|
||||||
static int32_t mndProcessNodeCheck(SRpcMsg *pReq);
|
static int32_t mndProcessNodeCheck(SRpcMsg *pReq);
|
||||||
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg);
|
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg);
|
||||||
static void keepStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* pExecNode);
|
static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *pExecNode);
|
||||||
|
|
||||||
static SArray *doExtractNodeListFromStream(SMnode *pMnode);
|
static SArray *doExtractNodeListFromStream(SMnode *pMnode);
|
||||||
static SArray *mndTakeVgroupSnapshot(SMnode *pMnode);
|
static SArray *mndTakeVgroupSnapshot(SMnode *pMnode);
|
||||||
|
@ -129,7 +129,11 @@ int32_t mndInitStream(SMnode *pMnode) {
|
||||||
return sdbSetTable(pMnode->pSdb, table);
|
return sdbSetTable(pMnode->pSdb, table);
|
||||||
}
|
}
|
||||||
|
|
||||||
void mndCleanupStream(SMnode *pMnode) {}
|
void mndCleanupStream(SMnode *pMnode) {
|
||||||
|
taosArrayDestroy(execNodeList.pTaskList);
|
||||||
|
taosHashCleanup(execNodeList.pTaskMap);
|
||||||
|
taosThreadMutexDestroy(&execNodeList.lock);
|
||||||
|
}
|
||||||
|
|
||||||
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) {
|
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -1167,12 +1171,12 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
{// check if all tasks are in TASK_STATUS__NORMAL status
|
{ // check if all tasks are in TASK_STATUS__NORMAL status
|
||||||
bool ready = true;
|
bool ready = true;
|
||||||
|
|
||||||
taosThreadMutexLock(&execNodeList.lock);
|
taosThreadMutexLock(&execNodeList.lock);
|
||||||
for(int32_t i = 0; i < taosArrayGetSize(execNodeList.pTaskList); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(execNodeList.pTaskList); ++i) {
|
||||||
STaskStatusEntry* p = taosArrayGet(execNodeList.pTaskList, i);
|
STaskStatusEntry *p = taosArrayGet(execNodeList.pTaskList, i);
|
||||||
if (p->status != TASK_STATUS__NORMAL) {
|
if (p->status != TASK_STATUS__NORMAL) {
|
||||||
mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s not ready, create checkpoint msg not issued",
|
mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s not ready, create checkpoint msg not issued",
|
||||||
p->streamId, p->taskId, 0, streamGetTaskStatusStr(p->status));
|
p->streamId, p->taskId, 0, streamGetTaskStatusStr(p->status));
|
||||||
|
@ -1197,9 +1201,9 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
|
||||||
}
|
}
|
||||||
mDebug("start to trigger checkpoint, checkpointId: %" PRId64 "", checkpointId);
|
mDebug("start to trigger checkpoint, checkpointId: %" PRId64 "", checkpointId);
|
||||||
|
|
||||||
const char* pDb = mndGetStreamDB(pMnode);
|
const char *pDb = mndGetStreamDB(pMnode);
|
||||||
mndTransSetDbName(pTrans, pDb, "checkpoint");
|
mndTransSetDbName(pTrans, pDb, "checkpoint");
|
||||||
taosMemoryFree((void*)pDb);
|
taosMemoryFree((void *)pDb);
|
||||||
|
|
||||||
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
|
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
|
||||||
mError("failed to trigger checkpoint, checkpointId: %" PRId64 ", reason:%s", checkpointId,
|
mError("failed to trigger checkpoint, checkpointId: %" PRId64 ", reason:%s", checkpointId,
|
||||||
|
@ -2226,7 +2230,7 @@ static int32_t mndProcessNodeCheck(SRpcMsg *pReq) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void keepStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* pExecNode) {
|
static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *pExecNode) {
|
||||||
int32_t level = taosArrayGetSize(pStream->tasks);
|
int32_t level = taosArrayGetSize(pStream->tasks);
|
||||||
for (int32_t i = 0; i < level; i++) {
|
for (int32_t i = 0; i < level; i++) {
|
||||||
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
|
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
|
||||||
|
@ -2236,7 +2240,7 @@ static void keepStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* p
|
||||||
SStreamTask *pTask = taosArrayGetP(pLevel, j);
|
SStreamTask *pTask = taosArrayGetP(pLevel, j);
|
||||||
int64_t keys[2] = {pTask->id.streamId, pTask->id.taskId};
|
int64_t keys[2] = {pTask->id.streamId, pTask->id.taskId};
|
||||||
|
|
||||||
void* p = taosHashGet(pExecNode->pTaskMap, keys, sizeof(keys));
|
void *p = taosHashGet(pExecNode->pTaskMap, keys, sizeof(keys));
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
STaskStatusEntry entry = {
|
STaskStatusEntry entry = {
|
||||||
.streamId = pTask->id.streamId, .taskId = pTask->id.taskId, .status = TASK_STATUS__STOP};
|
.streamId = pTask->id.streamId, .taskId = pTask->id.taskId, .status = TASK_STATUS__STOP};
|
||||||
|
@ -2250,7 +2254,7 @@ static void keepStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* p
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo: this process should be executed by the write queue worker of the mnode
|
// todo: this process should be executed by the write queue worker of the mnode
|
||||||
int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
||||||
SMnode *pMnode = pReq->info.node;
|
SMnode *pMnode = pReq->info.node;
|
||||||
|
|
||||||
SStreamHbMsg req = {0};
|
SStreamHbMsg req = {0};
|
||||||
|
@ -2260,11 +2264,13 @@ static void keepStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* p
|
||||||
tDecoderInit(&decoder, pReq->pCont, pReq->contLen);
|
tDecoderInit(&decoder, pReq->pCont, pReq->contLen);
|
||||||
|
|
||||||
if (tDecodeStreamHbMsg(&decoder, &req) < 0) {
|
if (tDecodeStreamHbMsg(&decoder, &req) < 0) {
|
||||||
|
tDecoderClear(&decoder);
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
tDecoderClear(&decoder);
|
||||||
|
|
||||||
// int64_t now = taosGetTimestampSec();
|
// int64_t now = taosGetTimestampSec();
|
||||||
mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks);
|
mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks);
|
||||||
|
|
||||||
taosThreadMutexLock(&execNodeList.lock);
|
taosThreadMutexLock(&execNodeList.lock);
|
||||||
|
@ -2273,12 +2279,12 @@ static void keepStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* p
|
||||||
doExtractTasksFromStream(pMnode);
|
doExtractTasksFromStream(pMnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
for(int32_t i = 0; i < req.numOfTasks; ++i) {
|
for (int32_t i = 0; i < req.numOfTasks; ++i) {
|
||||||
STaskStatusEntry* p = taosArrayGet(req.pTaskStatus, i);
|
STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i);
|
||||||
int64_t k[2] = {p->streamId, p->taskId};
|
int64_t k[2] = {p->streamId, p->taskId};
|
||||||
int32_t index = *(int32_t*) taosHashGet(execNodeList.pTaskMap, &k, sizeof(k));
|
int32_t index = *(int32_t *)taosHashGet(execNodeList.pTaskMap, &k, sizeof(k));
|
||||||
|
|
||||||
STaskStatusEntry* pStatusEntry = taosArrayGet(execNodeList.pTaskList, index);
|
STaskStatusEntry *pStatusEntry = taosArrayGet(execNodeList.pTaskList, index);
|
||||||
pStatusEntry->status = p->status;
|
pStatusEntry->status = p->status;
|
||||||
if (p->status != TASK_STATUS__NORMAL) {
|
if (p->status != TASK_STATUS__NORMAL) {
|
||||||
mDebug("received s-task:0x%x no in ready stat:%s", p->taskId, streamGetTaskStatusStr(p->status));
|
mDebug("received s-task:0x%x no in ready stat:%s", p->taskId, streamGetTaskStatusStr(p->status));
|
||||||
|
@ -2286,9 +2292,11 @@ static void keepStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* p
|
||||||
}
|
}
|
||||||
taosThreadMutexUnlock(&execNodeList.lock);
|
taosThreadMutexUnlock(&execNodeList.lock);
|
||||||
|
|
||||||
|
taosArrayDestroy(req.pTaskStatus);
|
||||||
|
|
||||||
// bool nodeChanged = false;
|
// bool nodeChanged = false;
|
||||||
// SArray* pList = taosArrayInit(4, sizeof(int32_t));
|
// SArray* pList = taosArrayInit(4, sizeof(int32_t));
|
||||||
/*
|
/*
|
||||||
// record the timeout node
|
// record the timeout node
|
||||||
for(int32_t i = 0; i < taosArrayGetSize(execNodeList.pNodeEntryList); ++i) {
|
for(int32_t i = 0; i < taosArrayGetSize(execNodeList.pNodeEntryList); ++i) {
|
||||||
SNodeEntry* pEntry = taosArrayGet(execNodeList.pNodeEntryList, i);
|
SNodeEntry* pEntry = taosArrayGet(execNodeList.pNodeEntryList, i);
|
||||||
|
@ -2307,10 +2315,10 @@ static void keepStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* p
|
||||||
|
|
||||||
// check epset to identify whether the node has been transferred to other dnodes.
|
// check epset to identify whether the node has been transferred to other dnodes.
|
||||||
// node the epset is changed, which means the node transfer has occurred for this node.
|
// node the epset is changed, which means the node transfer has occurred for this node.
|
||||||
// if (!isEpsetEqual(&pEntry->epset, &req.epset)) {
|
// if (!isEpsetEqual(&pEntry->epset, &req.epset)) {
|
||||||
// nodeChanged = true;
|
// nodeChanged = true;
|
||||||
// break;
|
// break;
|
||||||
// }
|
// }
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo handle the node timeout case. Once the vnode is off-line, we should check the dnode status from mnode,
|
// todo handle the node timeout case. Once the vnode is off-line, we should check the dnode status from mnode,
|
||||||
|
@ -2334,16 +2342,16 @@ static void keepStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* p
|
||||||
|
|
||||||
// update the related upstream and downstream tasks, todo remove this, no need this function
|
// update the related upstream and downstream tasks, todo remove this, no need this function
|
||||||
taosWLockLatch(&pStream->lock);
|
taosWLockLatch(&pStream->lock);
|
||||||
// streamTaskUpdateEpInfo(pStream->tasks, req.vgId, &req.epset);
|
// streamTaskUpdateEpInfo(pStream->tasks, req.vgId, &req.epset);
|
||||||
// streamTaskUpdateEpInfo(pStream->pHTasksList, req.vgId, &req.epset);
|
// streamTaskUpdateEpInfo(pStream->pHTasksList, req.vgId, &req.epset);
|
||||||
taosWUnLockLatch(&pStream->lock);
|
taosWUnLockLatch(&pStream->lock);
|
||||||
|
|
||||||
// code = createStreamUpdateTrans(pMnode, pStream, nodeId, );
|
// code = createStreamUpdateTrans(pMnode, pStream, nodeId, );
|
||||||
// if (code != TSDB_CODE_SUCCESS) {
|
// if (code != TSDB_CODE_SUCCESS) {
|
||||||
// todo
|
// todo
|
||||||
//// }
|
//// }
|
||||||
// }
|
// }
|
||||||
}
|
}
|
||||||
*/
|
*/
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue