Merge pull request #17396 from taosdata/fix/client_mem_leak
fix: client mem leak
This commit is contained in:
commit
7467a54a16
|
@ -709,6 +709,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
|
||||||
int64_t refId = *(int64_t*)param;
|
int64_t refId = *(int64_t*)param;
|
||||||
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
|
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
|
||||||
if (tmq == NULL) {
|
if (tmq == NULL) {
|
||||||
|
taosMemoryFree(param);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
int64_t consumerId = tmq->consumerId;
|
int64_t consumerId = tmq->consumerId;
|
||||||
|
@ -939,10 +940,9 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
|
|
||||||
*pRefId = pTmq->refId;
|
|
||||||
|
|
||||||
if (pTmq->hbBgEnable) {
|
if (pTmq->hbBgEnable) {
|
||||||
|
int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
|
||||||
|
*pRefId = pTmq->refId;
|
||||||
pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, 1000, pRefId, tmqMgmt.timer);
|
pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, 1000, pRefId, tmqMgmt.timer);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -115,7 +115,6 @@ typedef struct {
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SMqDataRsp dataRsp;
|
SMqDataRsp dataRsp;
|
||||||
SMqRspHead rspHead;
|
|
||||||
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
|
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
|
||||||
SRpcHandleInfo pInfo;
|
SRpcHandleInfo pInfo;
|
||||||
} STqPushEntry;
|
} STqPushEntry;
|
||||||
|
@ -183,6 +182,7 @@ int32_t tqOffsetCommitFile(STqOffsetStore* pStore);
|
||||||
|
|
||||||
// tqSink
|
// tqSink
|
||||||
void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
|
void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
|
||||||
|
void tqTableSink1(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
|
||||||
|
|
||||||
// tqOffset
|
// tqOffset
|
||||||
char* tqOffsetBuildFName(const char* path, int32_t ver);
|
char* tqOffsetBuildFName(const char* path, int32_t ver);
|
||||||
|
|
|
@ -192,7 +192,7 @@ int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
memcpy(buf, &pPushEntry->rspHead, sizeof(SMqRspHead));
|
memcpy(buf, &pPushEntry->dataRsp.head, sizeof(SMqRspHead));
|
||||||
|
|
||||||
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
|
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
|
||||||
|
|
||||||
|
@ -215,7 +215,7 @@ int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) {
|
||||||
tFormatOffset(buf1, 80, &pRsp->reqOffset);
|
tFormatOffset(buf1, 80, &pRsp->reqOffset);
|
||||||
tFormatOffset(buf2, 80, &pRsp->rspOffset);
|
tFormatOffset(buf2, 80, &pRsp->rspOffset);
|
||||||
tqDebug("vgId:%d, from consumer:%" PRId64 ", (epoch %d) push rsp, block num: %d, reqOffset:%s, rspOffset:%s",
|
tqDebug("vgId:%d, from consumer:%" PRId64 ", (epoch %d) push rsp, block num: %d, reqOffset:%s, rspOffset:%s",
|
||||||
TD_VID(pTq->pVnode), pPushEntry->rspHead.consumerId, pRsp->head.epoch, pRsp->blockNum, buf1, buf2);
|
TD_VID(pTq->pVnode), pRsp->head.consumerId, pRsp->head.epoch, pRsp->blockNum, buf1, buf2);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -560,9 +560,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
memcpy(pPushEntry->subKey, pHandle->subKey, TSDB_SUBSCRIBE_KEY_LEN);
|
memcpy(pPushEntry->subKey, pHandle->subKey, TSDB_SUBSCRIBE_KEY_LEN);
|
||||||
dataRsp.withTbName = 0;
|
dataRsp.withTbName = 0;
|
||||||
memcpy(&pPushEntry->dataRsp, &dataRsp, sizeof(SMqDataRsp));
|
memcpy(&pPushEntry->dataRsp, &dataRsp, sizeof(SMqDataRsp));
|
||||||
pPushEntry->rspHead.consumerId = consumerId;
|
pPushEntry->dataRsp.head.consumerId = consumerId;
|
||||||
pPushEntry->rspHead.epoch = reqEpoch;
|
pPushEntry->dataRsp.head.epoch = reqEpoch;
|
||||||
pPushEntry->rspHead.mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
|
pPushEntry->dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
|
||||||
taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey) + 1, &pPushEntry, sizeof(void*));
|
taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey) + 1, &pPushEntry, sizeof(void*));
|
||||||
tqDebug("tmq poll: consumer %ld, subkey %s, vg %d save handle to push mgr", consumerId, pHandle->subKey,
|
tqDebug("tmq poll: consumer %ld, subkey %s, vg %d save handle to push mgr", consumerId, pHandle->subKey,
|
||||||
TD_VID(pTq->pVnode));
|
TD_VID(pTq->pVnode));
|
||||||
|
@ -924,7 +924,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
|
||||||
pTask->smaSink.smaSink = smaHandleRes;
|
pTask->smaSink.smaSink = smaHandleRes;
|
||||||
} else if (pTask->outputType == TASK_OUTPUT__TABLE) {
|
} else if (pTask->outputType == TASK_OUTPUT__TABLE) {
|
||||||
pTask->tbSink.vnode = pTq->pVnode;
|
pTask->tbSink.vnode = pTq->pVnode;
|
||||||
pTask->tbSink.tbSinkFunc = tqTableSink;
|
pTask->tbSink.tbSinkFunc = tqTableSink1;
|
||||||
|
|
||||||
ASSERT(pTask->tbSink.pSchemaWrapper);
|
ASSERT(pTask->tbSink.pSchemaWrapper);
|
||||||
ASSERT(pTask->tbSink.pSchemaWrapper->pSchema);
|
ASSERT(pTask->tbSink.pSchemaWrapper->pSchema);
|
||||||
|
|
|
@ -284,6 +284,212 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void tqTableSink1(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
|
||||||
|
const SArray* pBlocks = (const SArray*)data;
|
||||||
|
SVnode* pVnode = (SVnode*)vnode;
|
||||||
|
int64_t suid = pTask->tbSink.stbUid;
|
||||||
|
char* stbFullName = pTask->tbSink.stbFullName;
|
||||||
|
STSchema* pTSchema = pTask->tbSink.pTSchema;
|
||||||
|
SSchemaWrapper* pSchemaWrapper = pTask->tbSink.pSchemaWrapper;
|
||||||
|
|
||||||
|
int32_t blockSz = taosArrayGetSize(pBlocks);
|
||||||
|
bool createTb = true;
|
||||||
|
|
||||||
|
SArray* tagArray = taosArrayInit(1, sizeof(STagVal));
|
||||||
|
if (!tagArray) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
tqDebug("vgId:%d, task %d write into table, block num: %d", TD_VID(pVnode), pTask->taskId, blockSz);
|
||||||
|
for (int32_t i = 0; i < blockSz; i++) {
|
||||||
|
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
|
||||||
|
if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
|
||||||
|
SBatchDeleteReq deleteReq = {0};
|
||||||
|
deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
|
||||||
|
deleteReq.suid = suid;
|
||||||
|
tqBuildDeleteReq(pVnode, stbFullName, pDataBlock, &deleteReq);
|
||||||
|
|
||||||
|
int32_t len;
|
||||||
|
int32_t code;
|
||||||
|
tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code);
|
||||||
|
if (code < 0) {
|
||||||
|
//
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
SEncoder encoder;
|
||||||
|
void* serializedDeleteReq = rpcMallocCont(len + sizeof(SMsgHead));
|
||||||
|
void* abuf = POINTER_SHIFT(serializedDeleteReq, sizeof(SMsgHead));
|
||||||
|
tEncoderInit(&encoder, abuf, len);
|
||||||
|
tEncodeSBatchDeleteReq(&encoder, &deleteReq);
|
||||||
|
tEncoderClear(&encoder);
|
||||||
|
taosArrayDestroy(deleteReq.deleteReqs);
|
||||||
|
|
||||||
|
((SMsgHead*)serializedDeleteReq)->vgId = pVnode->config.vgId;
|
||||||
|
|
||||||
|
SRpcMsg msg = {
|
||||||
|
.msgType = TDMT_VND_BATCH_DEL,
|
||||||
|
.pCont = serializedDeleteReq,
|
||||||
|
.contLen = len + sizeof(SMsgHead),
|
||||||
|
};
|
||||||
|
if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
|
||||||
|
rpcFreeCont(serializedDeleteReq);
|
||||||
|
tqDebug("failed to put delete req into write-queue since %s", terrstr());
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
SVCreateTbReq createTbReq = {0};
|
||||||
|
|
||||||
|
// set const
|
||||||
|
createTbReq.flags = 0;
|
||||||
|
createTbReq.type = TSDB_CHILD_TABLE;
|
||||||
|
createTbReq.ctb.suid = suid;
|
||||||
|
|
||||||
|
// set super table name
|
||||||
|
SName name = {0};
|
||||||
|
tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||||
|
createTbReq.ctb.stbName = strdup((char*)tNameGetTableName(&name)); // strdup(stbFullName);
|
||||||
|
|
||||||
|
// set tag content
|
||||||
|
taosArrayClear(tagArray);
|
||||||
|
STagVal tagVal = {
|
||||||
|
.cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1,
|
||||||
|
.type = TSDB_DATA_TYPE_UBIGINT,
|
||||||
|
.i64 = (int64_t)pDataBlock->info.groupId,
|
||||||
|
};
|
||||||
|
taosArrayPush(tagArray, &tagVal);
|
||||||
|
createTbReq.ctb.tagNum = taosArrayGetSize(tagArray);
|
||||||
|
|
||||||
|
STag* pTag = NULL;
|
||||||
|
tTagNew(tagArray, 1, false, &pTag);
|
||||||
|
if (pTag == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
taosArrayDestroy(tagArray);
|
||||||
|
tdDestroySVCreateTbReq(&createTbReq);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
createTbReq.ctb.pTag = (uint8_t*)pTag;
|
||||||
|
|
||||||
|
// set tag name
|
||||||
|
SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN);
|
||||||
|
char tagNameStr[TSDB_COL_NAME_LEN] = {0};
|
||||||
|
strcpy(tagNameStr, "group_id");
|
||||||
|
taosArrayPush(tagName, tagNameStr);
|
||||||
|
createTbReq.ctb.tagName = tagName;
|
||||||
|
|
||||||
|
// set table name
|
||||||
|
if (pDataBlock->info.parTbName[0]) {
|
||||||
|
createTbReq.name = strdup(pDataBlock->info.parTbName);
|
||||||
|
} else {
|
||||||
|
createTbReq.name = buildCtbNameByGroupId(stbFullName, pDataBlock->info.groupId);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t schemaLen;
|
||||||
|
int32_t code;
|
||||||
|
tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code);
|
||||||
|
if (code < 0) {
|
||||||
|
tdDestroySVCreateTbReq(&createTbReq);
|
||||||
|
taosArrayDestroy(tagArray);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// save schema str
|
||||||
|
void* schemaStr = taosMemoryMalloc(schemaLen);
|
||||||
|
if (schemaStr == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
tdDestroySVCreateTbReq(&createTbReq);
|
||||||
|
taosArrayDestroy(tagArray);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
SEncoder encoder = {0};
|
||||||
|
tEncoderInit(&encoder, schemaStr, schemaLen);
|
||||||
|
code = tEncodeSVCreateTbReq(&encoder, &createTbReq);
|
||||||
|
if (code < 0) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
tdDestroySVCreateTbReq(&createTbReq);
|
||||||
|
taosArrayDestroy(tagArray);
|
||||||
|
tEncoderClear(&encoder);
|
||||||
|
taosMemoryFree(schemaStr);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
tEncoderClear(&encoder);
|
||||||
|
tdDestroySVCreateTbReq(&createTbReq);
|
||||||
|
|
||||||
|
int32_t cap = sizeof(SSubmitReq);
|
||||||
|
|
||||||
|
int32_t rows = pDataBlock->info.rows;
|
||||||
|
int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema);
|
||||||
|
|
||||||
|
cap += sizeof(SSubmitBlk) + schemaLen + rows * maxLen;
|
||||||
|
|
||||||
|
SSubmitReq* ret = rpcMallocCont(cap);
|
||||||
|
ret->header.vgId = pVnode->config.vgId;
|
||||||
|
ret->length = sizeof(SSubmitReq);
|
||||||
|
ret->numOfBlocks = htonl(1);
|
||||||
|
|
||||||
|
SSubmitBlk* blkHead = POINTER_SHIFT(ret, sizeof(SSubmitReq));
|
||||||
|
|
||||||
|
blkHead->numOfRows = htonl(pDataBlock->info.rows);
|
||||||
|
blkHead->sversion = htonl(pTSchema->version);
|
||||||
|
blkHead->suid = htobe64(suid);
|
||||||
|
// uid is assigned by vnode
|
||||||
|
blkHead->uid = 0;
|
||||||
|
blkHead->schemaLen = 0;
|
||||||
|
|
||||||
|
tqDebug("tq sink, convert block %d, rows: %d", i, rows);
|
||||||
|
|
||||||
|
int32_t dataLen = 0;
|
||||||
|
void* blkSchema = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk));
|
||||||
|
STSRow* rowData = blkSchema;
|
||||||
|
if (createTb) {
|
||||||
|
memcpy(blkSchema, schemaStr, schemaLen);
|
||||||
|
blkHead->schemaLen = htonl(schemaLen);
|
||||||
|
rowData = POINTER_SHIFT(blkSchema, schemaLen);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosMemoryFree(schemaStr);
|
||||||
|
|
||||||
|
for (int32_t j = 0; j < rows; j++) {
|
||||||
|
SRowBuilder rb = {0};
|
||||||
|
tdSRowInit(&rb, pTSchema->version);
|
||||||
|
tdSRowSetTpInfo(&rb, pTSchema->numOfCols, pTSchema->flen);
|
||||||
|
tdSRowResetBuf(&rb, rowData);
|
||||||
|
|
||||||
|
for (int32_t k = 0; k < pTSchema->numOfCols; k++) {
|
||||||
|
const STColumn* pColumn = &pTSchema->columns[k];
|
||||||
|
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, k);
|
||||||
|
if (colDataIsNull_s(pColData, j)) {
|
||||||
|
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, pColumn->offset, k);
|
||||||
|
} else {
|
||||||
|
void* colData = colDataGetData(pColData, j);
|
||||||
|
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, colData, true, pColumn->offset, k);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
tdSRowEnd(&rb);
|
||||||
|
int32_t rowLen = TD_ROW_LEN(rowData);
|
||||||
|
rowData = POINTER_SHIFT(rowData, rowLen);
|
||||||
|
dataLen += rowLen;
|
||||||
|
}
|
||||||
|
blkHead->dataLen = htonl(dataLen);
|
||||||
|
|
||||||
|
ret->length += sizeof(SSubmitBlk) + schemaLen + dataLen;
|
||||||
|
ret->length = htonl(ret->length);
|
||||||
|
|
||||||
|
SRpcMsg msg = {
|
||||||
|
.msgType = TDMT_VND_SUBMIT,
|
||||||
|
.pCont = ret,
|
||||||
|
.contLen = ntohl(ret->length),
|
||||||
|
};
|
||||||
|
|
||||||
|
if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
|
||||||
|
rpcFreeCont(ret);
|
||||||
|
tqDebug("failed to put into write-queue since %s", terrstr());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
taosArrayDestroy(tagArray);
|
||||||
|
}
|
||||||
|
|
||||||
void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
|
void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
|
||||||
const SArray* pRes = (const SArray*)data;
|
const SArray* pRes = (const SArray*)data;
|
||||||
SVnode* pVnode = (SVnode*)vnode;
|
SVnode* pVnode = (SVnode*)vnode;
|
||||||
|
|
|
@ -52,7 +52,7 @@ class TDTestCase:
|
||||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||||
|
|
||||||
tmqCom.initConsumerTable()
|
tmqCom.initConsumerTable()
|
||||||
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
|
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1, wal_retention_size=-1,wal_retention_period=-1)
|
||||||
tdLog.info("create stb")
|
tdLog.info("create stb")
|
||||||
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
|
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
|
||||||
# tdLog.info("create ctb")
|
# tdLog.info("create ctb")
|
||||||
|
|
Loading…
Reference in New Issue