fix: memory issues

This commit is contained in:
dapan1121 2024-06-07 09:25:17 +08:00
parent ddf15fa55c
commit 3f99e5e064
9 changed files with 6389 additions and 83 deletions

View File

@ -133,7 +133,8 @@ int32_t qCloneCurrentTbData(STableDataCxt* pDataBlock, SSubmitTbData** pData
int32_t qStmtBindParams(SQuery* pQuery, TAOS_MULTI_BIND* pParams, int32_t colIdx);
int32_t qStmtParseQuerySql(SParseContext* pCxt, SQuery* pQuery);
int32_t qBindStmtColsValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, STSchema** pTSchema);
int32_t qBindStmtStbColsValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, STSchema** pTSchema, SBindInfo* pBindInfos);
int32_t qBindStmtColsValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen);
int32_t qBindStmtSingleColValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, int32_t colIdx,
int32_t rowNum);
int32_t qBuildStmtColFields(void* pDataBlock, int32_t* fieldNum, TAOS_FIELD_E** fields);

View File

@ -227,23 +227,22 @@ typedef struct STableDataCxt {
} STableDataCxt;
typedef struct SStbInterlaceInfo {
void* pCatalog;
void* pQuery;
int32_t acctId;
char* dbname;
void* transport;
SEpSet mgmtEpSet;
void* pRequest;
uint64_t requestId;
int64_t requestSelf;
bool tbFromHash;
SHashObj* pVgroupHash;
SArray* pVgroupList;
SSHashObj* pTableHash;
int64_t tbRemainNum;
STableBufInfo tbBuf;
char firstName[TSDB_TABLE_NAME_LEN];
void* pCatalog;
void* pQuery;
int32_t acctId;
char* dbname;
void* transport;
SEpSet mgmtEpSet;
void* pRequest;
uint64_t requestId;
int64_t requestSelf;
bool tbFromHash;
SHashObj* pVgroupHash;
SArray* pVgroupList;
SSHashObj* pTableHash;
int64_t tbRemainNum;
STableBufInfo tbBuf;
char firstName[TSDB_TABLE_NAME_LEN];
STSchema *pTSchema;
STableDataCxt *pDataCtx;
void *boundTags;

6206
source/a Normal file

File diff suppressed because it is too large Load Diff

View File

@ -100,6 +100,7 @@ typedef struct SStmtSQLInfo {
SStmtQueryResInfo queryRes;
bool autoCreateTbl;
SHashObj *pVgHash;
SBindInfo *pBindInfo;
SStbInterlaceInfo siInfo;
} SStmtSQLInfo;

View File

@ -334,24 +334,33 @@ int32_t stmtParseSql(STscStmt* pStmt) {
} else if (pStmt->sql.pQuery->pPrepareRoot) {
pStmt->sql.type = STMT_TYPE_QUERY;
pStmt->sql.stbInterlaceMode = false;
return TSDB_CODE_SUCCESS;
}
if (pStmt->sql.stbInterlaceMode) {
STableDataCxt** pSrc = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
if (!pSrc) {
return TSDB_CODE_OUT_OF_MEMORY;
}
STableDataCxt** pSrc = (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
if (NULL == pSrc || NULL == *pSrc) {
return TSDB_CODE_OUT_OF_MEMORY;
}
STableDataCxt* pDataBlock = *pSrc;
STableDataCxt* pTableCtx = *pSrc;
if (pStmt->sql.stbInterlaceMode) {
int16_t lastIdx = -1;
for (int32_t i = 0; i < pDataBlock->boundColsInfo.numOfBound; ++i) {
if (pDataBlock->boundColsInfo.pColIndex[i] < lastIdx) {
for (int32_t i = 0; i < pTableCtx->boundColsInfo.numOfBound; ++i) {
if (pTableCtx->boundColsInfo.pColIndex[i] < lastIdx) {
pStmt->sql.stbInterlaceMode = false;
break;
}
lastIdx = pDataBlock->boundColsInfo.pColIndex[i];
lastIdx = pTableCtx->boundColsInfo.pColIndex[i];
}
}
if (NULL == pStmt->sql.pBindInfo) {
pStmt->sql.pBindInfo = taosMemoryMalloc(pTableCtx->boundColsInfo.numOfBound * sizeof(*pStmt->sql.pBindInfo));
if (NULL == pStmt->sql.pBindInfo) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
@ -393,8 +402,18 @@ void stmtResetQueueTableBuf(STableBufInfo* pTblBuf, SStmtQueue* pQueue) {
int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool deepClean) {
if (pStmt->sql.stbInterlaceMode) {
pStmt->sql.siInfo.pTableColsIdx = 0;
stmtResetQueueTableBuf(&pStmt->sql.siInfo.tbBuf, &pStmt->queue);
if (deepClean) {
taosHashCleanup(pStmt->exec.pBlockHash);
pStmt->exec.pBlockHash = NULL;
if (NULL != pStmt->exec.pCurrBlock) {
taosMemoryFreeClear(pStmt->exec.pCurrBlock->pData);
qDestroyStmtDataBlock(pStmt->exec.pCurrBlock);
}
} else {
pStmt->sql.siInfo.pTableColsIdx = 0;
stmtResetQueueTableBuf(&pStmt->sql.siInfo.tbBuf, &pStmt->queue);
}
} else {
if (STMT_TYPE_QUERY != pStmt->sql.type || deepClean) {
taos_free_result(pStmt->exec.pRequest);
@ -438,9 +457,20 @@ int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool deepClean) {
return TSDB_CODE_SUCCESS;
}
void stmtFreeTbBuf(void *buf) {
void* pBuf = *(void**)buf;
taosMemoryFree(pBuf);
}
void stmtFreeTbCols(void *buf) {
SArray* pCols = *(SArray**)buf;
taosArrayDestroy(pCols);
}
int32_t stmtCleanSQLInfo(STscStmt* pStmt) {
STMT_DLOG_E("start to free SQL info");
taosMemoryFree(pStmt->sql.pBindInfo);
taosMemoryFree(pStmt->sql.queryRes.fields);
taosMemoryFree(pStmt->sql.queryRes.userFields);
taosMemoryFree(pStmt->sql.sqlStr);
@ -465,6 +495,14 @@ int32_t stmtCleanSQLInfo(STscStmt* pStmt) {
STMT_ERR_RET(stmtCleanExecInfo(pStmt, false, true));
STMT_ERR_RET(stmtCleanBindInfo(pStmt));
taos_free_result(pStmt->sql.siInfo.pRequest);
taosHashCleanup(pStmt->sql.siInfo.pVgroupHash);
tSimpleHashCleanup(pStmt->sql.siInfo.pTableHash);
taosArrayDestroyEx(pStmt->sql.siInfo.tbBuf.pBufList, stmtFreeTbBuf);
taosMemoryFree(pStmt->sql.siInfo.pTSchema);
qDestroyStmtDataBlock(pStmt->sql.siInfo.pDataCtx);
taosArrayDestroyEx(pStmt->sql.siInfo.pTableCols, stmtFreeTbCols);
memset(&pStmt->sql, 0, sizeof(pStmt->sql));
STMT_DLOG_E("end to free SQL info");
@ -853,6 +891,32 @@ int stmtPrepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
return TSDB_CODE_SUCCESS;
}
int32_t stmtInitStbInterlaceTableInfo(STscStmt* pStmt) {
STableDataCxt** pSrc = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
if (!pSrc) {
return TSDB_CODE_OUT_OF_MEMORY;
}
STableDataCxt* pDst = NULL;
STMT_ERR_RET(qCloneStmtDataBlock(&pDst, *pSrc, true));
pStmt->sql.siInfo.pDataCtx = pDst;
SArray* pTblCols = NULL;
for (int32_t i = 0; i < STMT_TABLE_COLS_NUM; i++) {
pTblCols = taosArrayInit(20, POINTER_BYTES);
if (NULL == pTblCols) {
return TSDB_CODE_OUT_OF_MEMORY;
}
taosArrayPush(pStmt->sql.siInfo.pTableCols, &pTblCols);
}
pStmt->sql.siInfo.boundTags = pStmt->bInfo.boundTags;
return TSDB_CODE_SUCCESS;
}
int stmtSetTbName(TAOS_STMT* stmt, const char* tbName) {
STscStmt* pStmt = (STscStmt*)stmt;
@ -892,26 +956,7 @@ int stmtSetTbName(TAOS_STMT* stmt, const char* tbName) {
}
if (pStmt->sql.stbInterlaceMode && NULL == pStmt->sql.siInfo.pDataCtx) {
STableDataCxt** pSrc = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
if (!pSrc) {
return TSDB_CODE_OUT_OF_MEMORY;
}
STableDataCxt* pDst = NULL;
STMT_ERR_RET(qCloneStmtDataBlock(&pDst, *pSrc, true));
pStmt->sql.siInfo.pDataCtx = pDst;
SArray* pTblCols = NULL;
for (int32_t i = 0; i < STMT_TABLE_COLS_NUM; i++) {
pTblCols = taosArrayDup(pDst->pData->aCol, NULL);
if (NULL == pTblCols) {
return TSDB_CODE_OUT_OF_MEMORY;
}
taosArrayPush(pStmt->sql.siInfo.pTableCols, &pTblCols);
}
pStmt->sql.siInfo.boundTags = pStmt->bInfo.boundTags;
STMT_ERR_RET(stmtInitStbInterlaceTableInfo(pStmt));
}
int64_t startUs2 = taosGetTimestampUs();
@ -1052,7 +1097,7 @@ static FORCE_INLINE int32_t stmtGetTableColsFromCache(STscStmt* pStmt, SArray**
} else {
SArray* pTblCols = NULL;
for (int32_t i = 0; i < 100; i++) {
pTblCols = taosArrayDup(pStmt->sql.siInfo.pDataCtx->pData->aCol, NULL);
pTblCols = taosArrayInit(20, POINTER_BYTES);
if (NULL == pTblCols) {
return TSDB_CODE_OUT_OF_MEMORY;
}
@ -1131,6 +1176,10 @@ int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) {
return TSDB_CODE_SUCCESS;
}
if (pStmt->sql.stbInterlaceMode && NULL == pStmt->sql.siInfo.pDataCtx) {
STMT_ERR_RET(stmtInitStbInterlaceTableInfo(pStmt));
}
STableDataCxt** pDataBlock = NULL;
if (pStmt->exec.pCurrBlock) {
@ -1143,6 +1192,10 @@ int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) {
STMT_ERR_RET(TSDB_CODE_TSC_STMT_CACHE_ERROR);
}
pStmt->exec.pCurrBlock = *pDataBlock;
if (pStmt->sql.stbInterlaceMode) {
taosArrayDestroy(pStmt->exec.pCurrBlock->pData->aCol);
pStmt->exec.pCurrBlock->pData->aCol = NULL;
}
}
int64_t startUs2 = taosGetTimestampUs();
@ -1151,13 +1204,10 @@ int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) {
SStmtQNode* param = NULL;
if (pStmt->sql.stbInterlaceMode) {
STMT_ERR_RET(stmtAllocQNodeFromBuf(&pStmt->sql.siInfo.tbBuf, (void**)&param));
STMT_ERR_RET(stmtGetTableColsFromCache(pStmt, &param->tblData.aCol));
//STMT_ERR_RET(stmtGetTableColsFromCache(pStmt, &param->tblData.aCol));
//taosArrayClear(param->tblData.aCol);
int32_t colNum = taosArrayGetSize(param->tblData.aCol);
for (int32_t i = 0; i < colNum; ++i) {
SColData* pCol = (SColData*)taosArrayGet(param->tblData.aCol, i);
tColDataClear(pCol);
}
param->tblData.aCol = taosArrayInit(20, POINTER_BYTES);
strcpy(param->tblData.tbName, pStmt->bInfo.tbName);
}
@ -1168,7 +1218,13 @@ int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) {
SArray* pCols = pStmt->sql.stbInterlaceMode ? param->tblData.aCol : (*pDataBlock)->pData->aCol;
if (colIdx < 0) {
code = qBindStmtColsValue(*pDataBlock, pCols, bind, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen, &pStmt->sql.siInfo.pTSchema);
if (pStmt->sql.stbInterlaceMode) {
(*pDataBlock)->pData->flags = 0;
code = qBindStmtStbColsValue(*pDataBlock, pCols, bind, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen, &pStmt->sql.siInfo.pTSchema, pStmt->sql.pBindInfo);
} else {
code = qBindStmtColsValue(*pDataBlock, pCols, bind, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen);
}
if (code) {
tscError("qBindStmtColsValue failed, error:%s", tstrerror(code));
STMT_ERR_RET(code);

View File

@ -9715,7 +9715,6 @@ void tDestroySubmitTbData(SSubmitTbData *pTbData, int32_t flag) {
}
if (pTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
#if 0
int32_t nColData = TARRAY_SIZE(pTbData->aCol);
SColData *aColData = (SColData *)TARRAY_DATA(pTbData->aCol);
@ -9723,13 +9722,13 @@ void tDestroySubmitTbData(SSubmitTbData *pTbData, int32_t flag) {
tColDataDestroy(&aColData[i]);
}
taosArrayDestroy(pTbData->aCol);
#endif
} else {
int32_t nRow = TARRAY_SIZE(pTbData->aRowP);
SRow **rows = (SRow **)TARRAY_DATA(pTbData->aRowP);
for (int32_t i = 0; i < nRow; ++i) {
tRowDestroy(rows[i]);
rows[i] = NULL;
}
taosArrayDestroy(pTbData->aRowP);
}

View File

@ -276,7 +276,7 @@ int32_t convertStmtNcharCol(SMsgBuf* pMsgBuf, SSchema* pSchema, TAOS_MULTI_BIND*
return TSDB_CODE_SUCCESS;
}
int32_t qBindStmtColsValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, STSchema** pTSchema) {
int32_t qBindStmtStbColsValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, STSchema** pTSchema, SBindInfo* pBindInfos) {
STableDataCxt* pDataBlock = (STableDataCxt*)pBlock;
SSchema* pSchema = getTableColumnSchema(pDataBlock->pMeta);
SBoundColInfo* boundInfo = &pDataBlock->boundColsInfo;
@ -285,7 +285,6 @@ int32_t qBindStmtColsValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bind, c
TAOS_MULTI_BIND ncharBind = {0};
TAOS_MULTI_BIND* pBind = NULL;
int32_t code = 0;
SBindInfo bindInfos[100];
int16_t lastColId = -1;
bool colInOrder = true;
@ -322,9 +321,9 @@ int32_t qBindStmtColsValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bind, c
pBind = bind + c;
}
bindInfos[c].columnId = pColSchema->colId;
bindInfos[c].bind = pBind;
bindInfos[c].type = pColSchema->type;
pBindInfos[c].columnId = pColSchema->colId;
pBindInfos[c].bind = pBind;
pBindInfos[c].type = pColSchema->type;
//code = tColDataAddValueByBind(pCol, pBind, IS_VAR_DATA_TYPE(pColSchema->type) ? pColSchema->bytes - VARSTR_HEADER_SIZE: -1);
//if (code) {
@ -332,7 +331,7 @@ int32_t qBindStmtColsValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bind, c
//}
}
code = tRowBuildFromBind(bindInfos, boundInfo->numOfBound, colInOrder, *pTSchema, pCols);
code = tRowBuildFromBind(pBindInfos, boundInfo->numOfBound, colInOrder, *pTSchema, pCols);
qDebug("stmt all %d columns bind %d rows data", boundInfo->numOfBound, rowNum);
@ -344,6 +343,57 @@ _return:
return code;
}
int32_t qBindStmtColsValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen) {
STableDataCxt* pDataBlock = (STableDataCxt*)pBlock;
SSchema* pSchema = getTableColumnSchema(pDataBlock->pMeta);
SBoundColInfo* boundInfo = &pDataBlock->boundColsInfo;
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
int32_t rowNum = bind->num;
TAOS_MULTI_BIND ncharBind = {0};
TAOS_MULTI_BIND* pBind = NULL;
int32_t code = 0;
for (int c = 0; c < boundInfo->numOfBound; ++c) {
SSchema* pColSchema = &pSchema[boundInfo->pColIndex[c]];
SColData* pCol = taosArrayGet(pCols, c);
if (bind[c].num != rowNum) {
code = buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same");
goto _return;
}
if ((!(rowNum == 1 && bind[c].is_null && *bind[c].is_null)) && bind[c].buffer_type != pColSchema->type) { // for rowNum ==1 , connector may not set buffer_type
code = buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type");
goto _return;
}
if (TSDB_DATA_TYPE_NCHAR == pColSchema->type) {
code = convertStmtNcharCol(&pBuf, pColSchema, bind + c, &ncharBind);
if (code) {
goto _return;
}
pBind = &ncharBind;
} else {
pBind = bind + c;
}
code = tColDataAddValueByBind(pCol, pBind, IS_VAR_DATA_TYPE(pColSchema->type) ? pColSchema->bytes - VARSTR_HEADER_SIZE: -1);
if (code) {
goto _return;
}
}
qDebug("stmt all %d columns bind %d rows data", boundInfo->numOfBound, rowNum);
_return:
taosMemoryFree(ncharBind.buffer);
taosMemoryFree(ncharBind.length);
return code;
}
int32_t qBindStmtSingleColValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, int32_t colIdx,
int32_t rowNum) {
STableDataCxt* pDataBlock = (STableDataCxt*)pBlock;

View File

@ -577,12 +577,7 @@ int32_t insAppendStmtTableDataCxt(SHashObj* pAllVgHash, STableColsData* pTbData,
uint64_t uid;
int32_t vgId;
pTbCtx->pData->aCol = pTbData->aCol;
SColData* pCol = taosArrayGet(pTbCtx->pData->aCol, 0);
if (pCol->nVal <= 0) {
return TSDB_CODE_SUCCESS;
}
pTbCtx->pData->aRowP = pTbData->aCol;
code = insGetStmtTableVgUid(pAllVgHash, pBuildInfo, pTbData, &uid, &vgId);
if (TSDB_CODE_SUCCESS != code) {
@ -593,13 +588,12 @@ int32_t insAppendStmtTableDataCxt(SHashObj* pAllVgHash, STableColsData* pTbData,
pTbCtx->pMeta->uid = uid;
pTbCtx->pData->uid = uid;
if (pTbCtx->pData->pCreateTbReq) {
pTbCtx->pData->flags |= SUBMIT_REQ_AUTO_CREATE_TABLE;
if (!pTbCtx->ordered) {
code = tRowSort(pTbCtx->pData->aRowP);
}
if (code == TSDB_CODE_SUCCESS && (!pTbCtx->ordered || pTbCtx->duplicateTs)) {
code = tRowMerge(pTbCtx->pData->aRowP, pTbCtx->pSchema, 0);
}
taosArraySort(pTbCtx->pData->aCol, insColDataComp);
tColDataSortMerge(pTbCtx->pData->aCol);
if (TSDB_CODE_SUCCESS != code) {
return code;
@ -622,12 +616,12 @@ int32_t insAppendStmtTableDataCxt(SHashObj* pAllVgHash, STableColsData* pTbData,
code = fillVgroupDataCxt(pTbCtx, pVgCxt, false, false);
}
/*
if (taosArrayGetSize(pVgCxt->pData->aSubmitTbData) >= 1000) {
if (taosArrayGetSize(pVgCxt->pData->aSubmitTbData) >= 20000) {
code = qBuildStmtFinOutput1((SQuery*)pBuildInfo->pQuery, pAllVgHash, pBuildInfo->pVgroupList);
taosArrayClear(pVgCxt->pData->aSubmitTbData);
//taosArrayClear(pVgCxt->pData->aSubmitTbData);
tDestroySubmitReq(pVgCxt->pData, TSDB_MSG_FLG_ENCODE);
//insDestroyVgroupDataCxt(pVgCxt);
}
*/
return code;
}

View File

@ -1268,7 +1268,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
#if 1
#if 0
SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)};
code = schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, (uint32_t)msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL));
msg = NULL;