fix mem leak
This commit is contained in:
parent
04948080bc
commit
8cef033f92
|
@ -765,17 +765,12 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
SSubmitReq2* pReq = NULL;
|
SSubmitReq2* pReq = NULL;
|
||||||
SArray* tagArray = NULL;
|
SArray* tagArray = NULL;
|
||||||
SArray* createTbArray = NULL;
|
|
||||||
SArray* pVals = NULL;
|
SArray* pVals = NULL;
|
||||||
|
|
||||||
if (!(tagArray = taosArrayInit(1, sizeof(STagVal)))) {
|
if (!(tagArray = taosArrayInit(1, sizeof(STagVal)))) {
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!(createTbArray = taosArrayInit(blockSz, POINTER_BYTES))) {
|
|
||||||
goto _end;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!(pReq = taosMemoryCalloc(1, sizeof(SSubmitReq2)))) {
|
if (!(pReq = taosMemoryCalloc(1, sizeof(SSubmitReq2)))) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _end;
|
goto _end;
|
||||||
|
@ -824,91 +819,77 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
|
||||||
tqDebug("failed to put delete req into write-queue since %s", terrstr());
|
tqDebug("failed to put delete req into write-queue since %s", terrstr());
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// create table req
|
|
||||||
if (createTb) {
|
|
||||||
for (int32_t i = 0; i < blockSz; ++i) {
|
|
||||||
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
|
|
||||||
SVCreateTbReq* pCreateTbReq = NULL;
|
|
||||||
if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
|
|
||||||
taosArrayPush(createTbArray, &pCreateTbReq);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!(pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateStbReq)))) {
|
|
||||||
goto _end;
|
|
||||||
};
|
|
||||||
|
|
||||||
// don't move to the end of loop as to destroy in the end of func when error occur
|
|
||||||
taosArrayPush(createTbArray, &pCreateTbReq);
|
|
||||||
|
|
||||||
// set const
|
|
||||||
pCreateTbReq->flags = 0;
|
|
||||||
pCreateTbReq->type = TSDB_CHILD_TABLE;
|
|
||||||
pCreateTbReq->ctb.suid = suid;
|
|
||||||
|
|
||||||
// set super table name
|
|
||||||
SName name = {0};
|
|
||||||
tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
|
||||||
pCreateTbReq->ctb.stbName = strdup((char*)tNameGetTableName(&name)); // strdup(stbFullName);
|
|
||||||
|
|
||||||
// set tag content
|
|
||||||
taosArrayClear(tagArray);
|
|
||||||
STagVal tagVal = {
|
|
||||||
.cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1,
|
|
||||||
.type = TSDB_DATA_TYPE_UBIGINT,
|
|
||||||
.i64 = (int64_t)pDataBlock->info.id.groupId,
|
|
||||||
};
|
|
||||||
taosArrayPush(tagArray, &tagVal);
|
|
||||||
pCreateTbReq->ctb.tagNum = taosArrayGetSize(tagArray);
|
|
||||||
|
|
||||||
STag* pTag = NULL;
|
|
||||||
tTagNew(tagArray, 1, false, &pTag);
|
|
||||||
if (pTag == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _end;
|
|
||||||
}
|
|
||||||
pCreateTbReq->ctb.pTag = (uint8_t*)pTag;
|
|
||||||
|
|
||||||
// set tag name
|
|
||||||
SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN);
|
|
||||||
char tagNameStr[TSDB_COL_NAME_LEN] = {0};
|
|
||||||
strcpy(tagNameStr, "group_id");
|
|
||||||
taosArrayPush(tagName, tagNameStr);
|
|
||||||
pCreateTbReq->ctb.tagName = tagName;
|
|
||||||
|
|
||||||
// set table name
|
|
||||||
if (pDataBlock->info.parTbName[0]) {
|
|
||||||
pCreateTbReq->name = strdup(pDataBlock->info.parTbName);
|
|
||||||
} else {
|
|
||||||
pCreateTbReq->name = buildCtbNameByGroupId(stbFullName, pDataBlock->info.id.groupId);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// SSubmitTbData req
|
|
||||||
int32_t rows = pDataBlock->info.rows;
|
|
||||||
|
|
||||||
SSubmitTbData* pTbData = (SSubmitTbData*)taosMemoryCalloc(1, sizeof(SSubmitTbData));
|
SSubmitTbData* pTbData = (SSubmitTbData*)taosMemoryCalloc(1, sizeof(SSubmitTbData));
|
||||||
if (!pTbData) {
|
if (!pTbData) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
// create table req
|
||||||
|
if (createTb) {
|
||||||
|
SVCreateTbReq* pCreateTbReq = NULL;
|
||||||
|
|
||||||
if (!(pTbData->aRowP = taosArrayInit(rows, sizeof(SRow*)))) {
|
if (!(pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateStbReq)))) {
|
||||||
taosMemoryFree(pTbData);
|
goto _end;
|
||||||
goto _end;
|
};
|
||||||
|
|
||||||
|
// set const
|
||||||
|
pCreateTbReq->flags = 0;
|
||||||
|
pCreateTbReq->type = TSDB_CHILD_TABLE;
|
||||||
|
pCreateTbReq->ctb.suid = suid;
|
||||||
|
|
||||||
|
// set super table name
|
||||||
|
SName name = {0};
|
||||||
|
tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||||
|
pCreateTbReq->ctb.stbName = strdup((char*)tNameGetTableName(&name)); // strdup(stbFullName);
|
||||||
|
|
||||||
|
// set tag content
|
||||||
|
taosArrayClear(tagArray);
|
||||||
|
STagVal tagVal = {
|
||||||
|
.cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1,
|
||||||
|
.type = TSDB_DATA_TYPE_UBIGINT,
|
||||||
|
.i64 = (int64_t)pDataBlock->info.id.groupId,
|
||||||
|
};
|
||||||
|
taosArrayPush(tagArray, &tagVal);
|
||||||
|
pCreateTbReq->ctb.tagNum = taosArrayGetSize(tagArray);
|
||||||
|
|
||||||
|
STag* pTag = NULL;
|
||||||
|
tTagNew(tagArray, 1, false, &pTag);
|
||||||
|
if (pTag == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _end;
|
||||||
|
}
|
||||||
|
pCreateTbReq->ctb.pTag = (uint8_t*)pTag;
|
||||||
|
|
||||||
|
// set tag name
|
||||||
|
SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN);
|
||||||
|
char tagNameStr[TSDB_COL_NAME_LEN] = {0};
|
||||||
|
strcpy(tagNameStr, "group_id");
|
||||||
|
taosArrayPush(tagName, tagNameStr);
|
||||||
|
pCreateTbReq->ctb.tagName = tagName;
|
||||||
|
|
||||||
|
// set table name
|
||||||
|
if (pDataBlock->info.parTbName[0]) {
|
||||||
|
pCreateTbReq->name = strdup(pDataBlock->info.parTbName);
|
||||||
|
} else {
|
||||||
|
pCreateTbReq->name = buildCtbNameByGroupId(stbFullName, pDataBlock->info.id.groupId);
|
||||||
|
}
|
||||||
|
pTbData->pCreateTbReq = pCreateTbReq;
|
||||||
|
pTbData->flags = SUBMIT_REQ_AUTO_CREATE_TABLE;
|
||||||
}
|
}
|
||||||
|
|
||||||
pTbData->suid = suid;
|
pTbData->suid = suid;
|
||||||
pTbData->uid = 0; // uid is assigned by vnode
|
pTbData->uid = 0; // uid is assigned by vnode
|
||||||
pTbData->sver = pTSchema->version;
|
pTbData->sver = pTSchema->version;
|
||||||
|
|
||||||
tqDebug("tq sink, convert block1 %d, rows: %d", i, rows);
|
int32_t rows = pDataBlock->info.rows;
|
||||||
|
|
||||||
if (createTb) {
|
if (!(pTbData->aRowP = taosArrayInit(rows, sizeof(SRow*)))) {
|
||||||
pTbData->pCreateTbReq = taosArrayGetP(createTbArray, i);
|
taosMemoryFreeClear(pTbData);
|
||||||
if (pTbData->pCreateTbReq) pTbData->flags = SUBMIT_REQ_AUTO_CREATE_TABLE;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tqDebug("tq sink, convert block1 %d, rows: %d", i, rows);
|
||||||
|
|
||||||
if (!pVals && !(pVals = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal)))) {
|
if (!pVals && !(pVals = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal)))) {
|
||||||
taosArrayDestroy(pTbData->aRowP);
|
taosArrayDestroy(pTbData->aRowP);
|
||||||
taosMemoryFree(pTbData);
|
taosMemoryFree(pTbData);
|
||||||
|
@ -979,7 +960,6 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
_end:
|
_end:
|
||||||
taosArrayDestroy(createTbArray);
|
|
||||||
taosArrayDestroy(tagArray);
|
taosArrayDestroy(tagArray);
|
||||||
taosArrayDestroy(pVals);
|
taosArrayDestroy(pVals);
|
||||||
// TODO: change
|
// TODO: change
|
||||||
|
|
Loading…
Reference in New Issue