Merge pull request #14914 from taosdata/feature/3.0_debug_wxy
fix: client module memory leak problem
This commit is contained in:
commit
84740bc786
|
@ -77,6 +77,7 @@ typedef struct SScanLogicNode {
|
||||||
SArray* pSmaIndexes;
|
SArray* pSmaIndexes;
|
||||||
SNodeList* pGroupTags;
|
SNodeList* pGroupTags;
|
||||||
bool groupSort;
|
bool groupSort;
|
||||||
|
int8_t cacheLastMode;
|
||||||
} SScanLogicNode;
|
} SScanLogicNode;
|
||||||
|
|
||||||
typedef struct SJoinLogicNode {
|
typedef struct SJoinLogicNode {
|
||||||
|
|
|
@ -152,6 +152,7 @@ typedef struct SRealTableNode {
|
||||||
char qualDbName[TSDB_DB_NAME_LEN]; // SHOW qualDbName.TABLES
|
char qualDbName[TSDB_DB_NAME_LEN]; // SHOW qualDbName.TABLES
|
||||||
double ratio;
|
double ratio;
|
||||||
SArray* pSmaIndexes;
|
SArray* pSmaIndexes;
|
||||||
|
int8_t cacheLastMode;
|
||||||
} SRealTableNode;
|
} SRealTableNode;
|
||||||
|
|
||||||
typedef struct STempTableNode {
|
typedef struct STempTableNode {
|
||||||
|
|
|
@ -934,7 +934,7 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultM
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
pRequest->body.execMode = pQuery->execMode;
|
pRequest->body.execMode = pQuery->execMode;
|
||||||
|
|
||||||
switch (pQuery->execMode) {
|
switch (pQuery->execMode) {
|
||||||
case QUERY_EXEC_MODE_LOCAL:
|
case QUERY_EXEC_MODE_LOCAL:
|
||||||
asyncExecLocalCmd(pRequest, pQuery);
|
asyncExecLocalCmd(pRequest, pQuery);
|
||||||
|
@ -1006,10 +1006,6 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultM
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
// if (!keepQuery) {
|
|
||||||
// qDestroyQuery(pQuery);
|
|
||||||
// }
|
|
||||||
|
|
||||||
if (NULL != pRequest && TSDB_CODE_SUCCESS != code) {
|
if (NULL != pRequest && TSDB_CODE_SUCCESS != code) {
|
||||||
pRequest->code = terrno;
|
pRequest->code = terrno;
|
||||||
}
|
}
|
||||||
|
@ -1479,7 +1475,7 @@ void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertU
|
||||||
tsem_wait(&pParam->sem);
|
tsem_wait(&pParam->sem);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pResultInfo->numOfRows == 0 || pRequest->code != TSDB_CODE_SUCCESS) {
|
if (pResultInfo->numOfRows == 0 || pRequest->code != TSDB_CODE_SUCCESS) {
|
||||||
return NULL;
|
return NULL;
|
||||||
} else {
|
} else {
|
||||||
if (setupOneRowPtr) {
|
if (setupOneRowPtr) {
|
||||||
|
|
|
@ -688,8 +688,10 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
|
||||||
tscDebug("0x%" PRIx64 " analysis semantics completed, start async query, reqId:0x%" PRIx64, pRequest->self,
|
tscDebug("0x%" PRIx64 " analysis semantics completed, start async query, reqId:0x%" PRIx64, pRequest->self,
|
||||||
pRequest->requestId);
|
pRequest->requestId);
|
||||||
launchAsyncQuery(pRequest, pQuery, pResultMeta);
|
launchAsyncQuery(pRequest, pQuery, pResultMeta);
|
||||||
|
qDestroyQuery(pQuery);
|
||||||
} else {
|
} else {
|
||||||
destorySqlParseWrapper(pWrapper);
|
destorySqlParseWrapper(pWrapper);
|
||||||
|
qDestroyQuery(pQuery);
|
||||||
if (NEED_CLIENT_HANDLE_ERROR(code)) {
|
if (NEED_CLIENT_HANDLE_ERROR(code)) {
|
||||||
tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" PRIx64,
|
tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" PRIx64,
|
||||||
pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId);
|
pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId);
|
||||||
|
@ -867,7 +869,7 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
|
||||||
} else {
|
} else {
|
||||||
pResultInfo->numOfRows = 0;
|
pResultInfo->numOfRows = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
|
pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
|
@ -140,6 +140,9 @@ static int32_t collectMetaKeyFromRealTableImpl(SCollectMetaKeyCxt* pCxt, SRealTa
|
||||||
if (TSDB_CODE_SUCCESS == code && (0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_DNODE_VARIABLES))) {
|
if (TSDB_CODE_SUCCESS == code && (0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_DNODE_VARIABLES))) {
|
||||||
code = reserveDnodeRequiredInCache(pCxt->pMetaCache);
|
code = reserveDnodeRequiredInCache(pCxt->pMetaCache);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = reserveDbCfgInCache(pCxt->pParseCxt->acctId, pRealTable->table.dbName, pCxt->pMetaCache);
|
||||||
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1772,6 +1772,19 @@ static int32_t setTableIndex(STranslateContext* pCxt, SName* pName, SRealTableNo
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t setTableCacheLastMode(STranslateContext* pCxt, SName* pName, SRealTableNode* pRealTable) {
|
||||||
|
if (TSDB_SYSTEM_TABLE == pRealTable->pMeta->tableType) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
SDbCfgInfo dbCfg = {0};
|
||||||
|
int32_t code = getDBCfg(pCxt, pRealTable->table.dbName, &dbCfg);
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
pRealTable->cacheLastMode = dbCfg.cacheLast;
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t translateTable(STranslateContext* pCxt, SNode* pTable) {
|
static int32_t translateTable(STranslateContext* pCxt, SNode* pTable) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
switch (nodeType(pTable)) {
|
switch (nodeType(pTable)) {
|
||||||
|
@ -1791,6 +1804,9 @@ static int32_t translateTable(STranslateContext* pCxt, SNode* pTable) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = setTableIndex(pCxt, &name, pRealTable);
|
code = setTableIndex(pCxt, &name, pRealTable);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = setTableCacheLastMode(pCxt, &name, pRealTable);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
pRealTable->table.precision = pRealTable->pMeta->tableInfo.precision;
|
pRealTable->table.precision = pRealTable->pMeta->tableInfo.precision;
|
||||||
pRealTable->table.singleTable = isSingleTable(pRealTable);
|
pRealTable->table.singleTable = isSingleTable(pRealTable);
|
||||||
|
@ -5413,11 +5429,12 @@ static int32_t rewriteCreateTable(STranslateContext* pCxt, SQuery* pQuery) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap, SCreateSubTableClause* pStmt,
|
static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap, SCreateSubTableClause* pStmt,
|
||||||
const STag* pTag, uint64_t suid, const char* sTableNmae, SVgroupInfo* pVgInfo, SArray* tagName) {
|
const STag* pTag, uint64_t suid, const char* sTableNmae, SVgroupInfo* pVgInfo,
|
||||||
// char dbFName[TSDB_DB_FNAME_LEN] = {0};
|
SArray* tagName) {
|
||||||
// SName name = {.type = TSDB_DB_NAME_T, .acctId = acctId};
|
// char dbFName[TSDB_DB_FNAME_LEN] = {0};
|
||||||
// strcpy(name.dbname, pStmt->dbName);
|
// SName name = {.type = TSDB_DB_NAME_T, .acctId = acctId};
|
||||||
// tNameGetFullDbName(&name, dbFName);
|
// strcpy(name.dbname, pStmt->dbName);
|
||||||
|
// tNameGetFullDbName(&name, dbFName);
|
||||||
|
|
||||||
struct SVCreateTbReq req = {0};
|
struct SVCreateTbReq req = {0};
|
||||||
req.type = TD_CHILD_TABLE;
|
req.type = TD_CHILD_TABLE;
|
||||||
|
@ -5524,7 +5541,7 @@ static int32_t buildNormalTagVal(STranslateContext* pCxt, SSchema* pTagSchema, S
|
||||||
if (pVal->node.resType.type != TSDB_DATA_TYPE_NULL) {
|
if (pVal->node.resType.type != TSDB_DATA_TYPE_NULL) {
|
||||||
void* nodeVal = nodesGetValueFromNode(pVal);
|
void* nodeVal = nodesGetValueFromNode(pVal);
|
||||||
STagVal val = {.cid = pTagSchema->colId, .type = pTagSchema->type};
|
STagVal val = {.cid = pTagSchema->colId, .type = pTagSchema->type};
|
||||||
// strcpy(val.colName, pTagSchema->name);
|
// strcpy(val.colName, pTagSchema->name);
|
||||||
if (IS_VAR_DATA_TYPE(pTagSchema->type)) {
|
if (IS_VAR_DATA_TYPE(pTagSchema->type)) {
|
||||||
val.pData = varDataVal(nodeVal);
|
val.pData = varDataVal(nodeVal);
|
||||||
val.nData = varDataLen(nodeVal);
|
val.nData = varDataLen(nodeVal);
|
||||||
|
@ -5621,7 +5638,7 @@ static int32_t buildKVRowForAllTags(STranslateContext* pCxt, SCreateSubTableClau
|
||||||
} else if (pVal->node.resType.type != TSDB_DATA_TYPE_NULL && !pVal->isNull) {
|
} else if (pVal->node.resType.type != TSDB_DATA_TYPE_NULL && !pVal->isNull) {
|
||||||
char* tmpVal = nodesGetValueFromNode(pVal);
|
char* tmpVal = nodesGetValueFromNode(pVal);
|
||||||
STagVal val = {.cid = pTagSchema->colId, .type = pTagSchema->type};
|
STagVal val = {.cid = pTagSchema->colId, .type = pTagSchema->type};
|
||||||
// strcpy(val.colName, pTagSchema->name);
|
// strcpy(val.colName, pTagSchema->name);
|
||||||
if (IS_VAR_DATA_TYPE(pTagSchema->type)) {
|
if (IS_VAR_DATA_TYPE(pTagSchema->type)) {
|
||||||
val.pData = varDataVal(tmpVal);
|
val.pData = varDataVal(tmpVal);
|
||||||
val.nData = varDataLen(tmpVal);
|
val.nData = varDataLen(tmpVal);
|
||||||
|
@ -5664,7 +5681,7 @@ static int32_t rewriteCreateSubTable(STranslateContext* pCxt, SCreateSubTableCla
|
||||||
code = getTableMeta(pCxt, pStmt->useDbName, pStmt->useTableName, &pSuperTableMeta);
|
code = getTableMeta(pCxt, pStmt->useDbName, pStmt->useTableName, &pSuperTableMeta);
|
||||||
}
|
}
|
||||||
|
|
||||||
STag* pTag = NULL;
|
STag* pTag = NULL;
|
||||||
SArray* tagName = taosArrayInit(8, TSDB_COL_NAME_LEN);
|
SArray* tagName = taosArrayInit(8, TSDB_COL_NAME_LEN);
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
@ -5680,7 +5697,8 @@ static int32_t rewriteCreateSubTable(STranslateContext* pCxt, SCreateSubTableCla
|
||||||
code = getTableHashVgroup(pCxt, pStmt->dbName, pStmt->tableName, &info);
|
code = getTableHashVgroup(pCxt, pStmt->dbName, pStmt->tableName, &info);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
addCreateTbReqIntoVgroup(pCxt->pParseCxt->acctId, pVgroupHashmap, pStmt, pTag, pSuperTableMeta->uid, pStmt->useTableName, &info, tagName);
|
addCreateTbReqIntoVgroup(pCxt->pParseCxt->acctId, pVgroupHashmap, pStmt, pTag, pSuperTableMeta->uid,
|
||||||
|
pStmt->useTableName, &info, tagName);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayDestroy(tagName);
|
taosArrayDestroy(tagName);
|
||||||
|
|
|
@ -234,6 +234,8 @@ void generateDnodes(MockCatalogService* mcs) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void generateDatabases(MockCatalogService* mcs) {
|
void generateDatabases(MockCatalogService* mcs) {
|
||||||
|
mcs->createDatabase(TSDB_INFORMATION_SCHEMA_DB);
|
||||||
|
mcs->createDatabase(TSDB_PERFORMANCE_SCHEMA_DB);
|
||||||
mcs->createDatabase("test");
|
mcs->createDatabase("test");
|
||||||
mcs->createDatabase("rollup_db", true);
|
mcs->createDatabase("rollup_db", true);
|
||||||
}
|
}
|
||||||
|
|
|
@ -244,6 +244,7 @@ static int32_t makeScanLogicNode(SLogicPlanContext* pCxt, SRealTableNode* pRealT
|
||||||
pScan->showRewrite = pCxt->pPlanCxt->showRewrite;
|
pScan->showRewrite = pCxt->pPlanCxt->showRewrite;
|
||||||
pScan->ratio = pRealTable->ratio;
|
pScan->ratio = pRealTable->ratio;
|
||||||
pScan->dataRequired = FUNC_DATA_REQUIRED_DATA_LOAD;
|
pScan->dataRequired = FUNC_DATA_REQUIRED_DATA_LOAD;
|
||||||
|
pScan->cacheLastMode = pRealTable->cacheLastMode;
|
||||||
|
|
||||||
*pLogicNode = (SLogicNode*)pScan;
|
*pLogicNode = (SLogicNode*)pScan;
|
||||||
|
|
||||||
|
|
|
@ -124,9 +124,11 @@ static bool scanPathOptMayBeOptimized(SLogicNode* pNode) {
|
||||||
QUERY_NODE_LOGIC_PLAN_PARTITION != nodeType(pNode->pParent))) {
|
QUERY_NODE_LOGIC_PLAN_PARTITION != nodeType(pNode->pParent))) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
if ((QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode->pParent) && WINDOW_TYPE_INTERVAL == ((SWindowLogicNode*)pNode->pParent)->winType) ||
|
if ((QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode->pParent) &&
|
||||||
|
WINDOW_TYPE_INTERVAL == ((SWindowLogicNode*)pNode->pParent)->winType) ||
|
||||||
(QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode->pParent) && pNode->pParent->pParent &&
|
(QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode->pParent) && pNode->pParent->pParent &&
|
||||||
QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode->pParent->pParent) && WINDOW_TYPE_INTERVAL == ((SWindowLogicNode*)pNode->pParent)->winType)) {
|
QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode->pParent->pParent) &&
|
||||||
|
WINDOW_TYPE_INTERVAL == ((SWindowLogicNode*)pNode->pParent)->winType)) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
if (QUERY_NODE_LOGIC_PLAN_AGG == nodeType(pNode->pParent)) {
|
if (QUERY_NODE_LOGIC_PLAN_AGG == nodeType(pNode->pParent)) {
|
||||||
|
@ -1983,7 +1985,8 @@ static bool lastRowScanOptMayBeOptimized(SLogicNode* pNode) {
|
||||||
if (QUERY_NODE_LOGIC_PLAN_AGG != nodeType(pNode) || !(((SAggLogicNode*)pNode)->hasLastRow) ||
|
if (QUERY_NODE_LOGIC_PLAN_AGG != nodeType(pNode) || !(((SAggLogicNode*)pNode)->hasLastRow) ||
|
||||||
NULL != ((SAggLogicNode*)pNode)->pGroupKeys || 1 != LIST_LENGTH(pNode->pChildren) ||
|
NULL != ((SAggLogicNode*)pNode)->pGroupKeys || 1 != LIST_LENGTH(pNode->pChildren) ||
|
||||||
QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(nodesListGetNode(pNode->pChildren, 0)) ||
|
QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(nodesListGetNode(pNode->pChildren, 0)) ||
|
||||||
NULL != ((SScanLogicNode*)nodesListGetNode(pNode->pChildren, 0))->node.pConditions) {
|
NULL != ((SScanLogicNode*)nodesListGetNode(pNode->pChildren, 0))->node.pConditions ||
|
||||||
|
0 == ((SScanLogicNode*)nodesListGetNode(pNode->pChildren, 0))->cacheLastMode) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2010,7 +2013,10 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic
|
||||||
SFunctionNode* pFunc = (SFunctionNode*)pNode;
|
SFunctionNode* pFunc = (SFunctionNode*)pNode;
|
||||||
int32_t len = snprintf(pFunc->functionName, sizeof(pFunc->functionName), "_cache_last_row");
|
int32_t len = snprintf(pFunc->functionName, sizeof(pFunc->functionName), "_cache_last_row");
|
||||||
pFunc->functionName[len] = '\0';
|
pFunc->functionName[len] = '\0';
|
||||||
fmGetFuncInfo(pFunc, NULL, 0);
|
int32_t code = fmGetFuncInfo(pFunc, NULL, 0);
|
||||||
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
pAgg->hasLastRow = false;
|
pAgg->hasLastRow = false;
|
||||||
|
|
||||||
|
|
|
@ -517,6 +517,8 @@ static int32_t createLastRowScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSu
|
||||||
pScan->groupSort = pScanLogicNode->groupSort;
|
pScan->groupSort = pScanLogicNode->groupSort;
|
||||||
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
|
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
|
||||||
|
|
||||||
|
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
|
||||||
|
|
||||||
return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode);
|
return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -106,6 +106,8 @@ TEST_F(PlanBasicTest, lastRowFunc) {
|
||||||
|
|
||||||
run("SELECT LAST_ROW(c1, c2) FROM t1");
|
run("SELECT LAST_ROW(c1, c2) FROM t1");
|
||||||
|
|
||||||
|
run("SELECT LAST_ROW(c1), c2 FROM t1");
|
||||||
|
|
||||||
run("SELECT LAST_ROW(c1) FROM st1");
|
run("SELECT LAST_ROW(c1) FROM st1");
|
||||||
|
|
||||||
run("SELECT LAST_ROW(c1) FROM st1 PARTITION BY TBNAME");
|
run("SELECT LAST_ROW(c1) FROM st1 PARTITION BY TBNAME");
|
||||||
|
|
Loading…
Reference in New Issue