[td-225]fix memory leak.
This commit is contained in:
parent
0f22b3d41b
commit
e9db267bc5
|
@ -7137,25 +7137,29 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
pCmd->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY),
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
false, HASH_NO_LOCK);
|
|
||||||
|
|
||||||
SArray* tableNameList = taosArrayInit(4, sizeof(SName));
|
SArray* tableNameList = NULL;
|
||||||
|
SArray* pVgroupList = NULL;
|
||||||
|
SArray* plist = NULL;
|
||||||
|
|
||||||
|
pCmd->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||||
|
|
||||||
|
tableNameList = taosArrayInit(4, sizeof(SName));
|
||||||
int32_t size = taosArrayGetSize(pInfo->list);
|
int32_t size = taosArrayGetSize(pInfo->list);
|
||||||
for (int32_t i = 0; i < size; ++i) {
|
for (int32_t i = 0; i < size; ++i) {
|
||||||
SSqlNode* pSqlNode = taosArrayGetP(pInfo->list, i);
|
SSqlNode* pSqlNode = taosArrayGetP(pInfo->list, i);
|
||||||
|
|
||||||
// load the table meta in the from clause
|
// load the table meta in the from clause
|
||||||
if (pSqlNode->from->type == SQL_NODE_FROM_TABLELIST) {
|
if (pSqlNode->from->type == SQL_NODE_FROM_TABLELIST) {
|
||||||
int32_t code = getTableNameFromSqlNode(pSqlNode, tableNameList, tscGetErrorMsgPayload(pCmd), pSql);
|
code = getTableNameFromSqlNode(pSqlNode, tableNameList, tscGetErrorMsgPayload(pCmd), pSql);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
goto _end;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
int32_t code = getTableNameFromSubquery(pSqlNode, tableNameList, tscGetErrorMsgPayload(pCmd), pSql);
|
code = getTableNameFromSubquery(pSqlNode, tableNameList, tscGetErrorMsgPayload(pCmd), pSql);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
goto _end;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -7164,12 +7168,15 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) {
|
||||||
char name[TSDB_TABLE_FNAME_LEN] = {0};
|
char name[TSDB_TABLE_FNAME_LEN] = {0};
|
||||||
|
|
||||||
int32_t numOfTables = taosArrayGetSize(tableNameList);
|
int32_t numOfTables = taosArrayGetSize(tableNameList);
|
||||||
STableMeta* pTableMeta = calloc(1, maxSize);
|
|
||||||
|
|
||||||
SArray* plist = taosArrayInit(4, POINTER_BYTES);
|
char buf[80 * 1024] = {0};
|
||||||
SArray* pVgroupList = taosArrayInit(4, POINTER_BYTES);
|
assert(maxSize < 80 * 1024);
|
||||||
|
STableMeta* pTableMeta = (STableMeta*)buf;
|
||||||
|
|
||||||
for(int32_t i = 0; i < numOfTables; ++i) {
|
plist = taosArrayInit(4, POINTER_BYTES);
|
||||||
|
pVgroupList = taosArrayInit(4, POINTER_BYTES);
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < numOfTables; ++i) {
|
||||||
SName* pname = taosArrayGet(tableNameList, i);
|
SName* pname = taosArrayGet(tableNameList, i);
|
||||||
tNameExtractFullName(pname, name);
|
tNameExtractFullName(pname, name);
|
||||||
|
|
||||||
|
@ -7179,8 +7186,12 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) {
|
||||||
|
|
||||||
if (pTableMeta->id.uid > 0) {
|
if (pTableMeta->id.uid > 0) {
|
||||||
if (pTableMeta->tableType == TSDB_CHILD_TABLE) {
|
if (pTableMeta->tableType == TSDB_CHILD_TABLE) {
|
||||||
int32_t code = tscCreateTableMetaFromCChildMeta(pTableMeta, name);
|
code = tscCreateTableMetaFromCChildMeta(pTableMeta, name);
|
||||||
if (code != TSDB_CODE_SUCCESS) { // add to retrieve list
|
|
||||||
|
// create the child table meta from super table failed, try load it from mnode
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
char* t = strdup(name);
|
||||||
|
taosArrayPush(plist, &t);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
} else if (pTableMeta->tableType == TSDB_SUPER_TABLE) {
|
} else if (pTableMeta->tableType == TSDB_SUPER_TABLE) {
|
||||||
|
@ -7191,27 +7202,37 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
STableMeta* pMeta = tscTableMetaDup(pTableMeta);
|
STableMeta* pMeta = tscTableMetaDup(pTableMeta);
|
||||||
STableMetaVgroupInfo p = {.pTableMeta = pMeta,};
|
|
||||||
|
STableMetaVgroupInfo p = { .pTableMeta = pMeta };
|
||||||
|
|
||||||
const char* px = tNameGetTableName(pname);
|
const char* px = tNameGetTableName(pname);
|
||||||
taosHashPut(pCmd->pTableMetaMap, px, strlen(px), &p, sizeof(STableMetaVgroupInfo));
|
taosHashPut(pCmd->pTableMetaMap, px, strlen(px), &p, sizeof(STableMetaVgroupInfo));
|
||||||
} else {// add to the retrieve table meta array list.
|
} else { // add to the retrieve table meta array list.
|
||||||
char* t = strdup(name);
|
char* t = strdup(name);
|
||||||
taosArrayPush(plist, &t);
|
taosArrayPush(plist, &t);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
tfree(pTableMeta);
|
|
||||||
|
|
||||||
// load the table meta for a given table name list
|
// load the table meta for a given table name list
|
||||||
if (taosArrayGetSize(plist) > 0) {
|
if (taosArrayGetSize(plist) > 0 || taosArrayGetSize(pVgroupList) > 0) {
|
||||||
int32_t code = getMultiTableMetaFromMnode(pSql, plist, pVgroupList);
|
code = getMultiTableMetaFromMnode(pSql, plist, pVgroupList);
|
||||||
taosArrayDestroyEx(plist, freeElem);
|
|
||||||
|
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
_end:
|
||||||
|
|
||||||
|
if (plist != NULL) {
|
||||||
|
taosArrayDestroyEx(plist, freeElem);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pVgroupList != NULL) {
|
||||||
|
taosArrayDestroy(pVgroupList);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tableNameList != NULL) {
|
||||||
|
taosArrayDestroy(tableNameList);
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doLoadAllTableMeta(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode, int32_t numOfTables) {
|
static int32_t doLoadAllTableMeta(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode, int32_t numOfTables) {
|
||||||
|
|
|
@ -1009,6 +1009,7 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue
|
||||||
}
|
}
|
||||||
|
|
||||||
pSourceOperator = createJoinOperator(p, px->numOfTables, schema, num);
|
pSourceOperator = createJoinOperator(p, px->numOfTables, schema, num);
|
||||||
|
tfree(p);
|
||||||
}
|
}
|
||||||
|
|
||||||
SExprInfo* exprInfo = NULL;
|
SExprInfo* exprInfo = NULL;
|
||||||
|
@ -1027,6 +1028,7 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue
|
||||||
px->pQInfo = createQInfoFromQueryNode(px, exprInfo, &tableGroupInfo, pSourceOperator, NULL, NULL, MASTER_SCAN);
|
px->pQInfo = createQInfoFromQueryNode(px, exprInfo, &tableGroupInfo, pSourceOperator, NULL, NULL, MASTER_SCAN);
|
||||||
tfree(pColumnInfo);
|
tfree(pColumnInfo);
|
||||||
tfree(schema);
|
tfree(schema);
|
||||||
|
tfree(exprInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t qId = 0;
|
uint64_t qId = 0;
|
||||||
|
@ -3228,8 +3230,15 @@ static void tscSubqueryRetrieveCallback(void* param, TAOS_RES* tres, int code) {
|
||||||
SSqlObj* pParentSql = ps->pParentSql;
|
SSqlObj* pParentSql = ps->pParentSql;
|
||||||
SSqlObj* pSql = tres;
|
SSqlObj* pSql = tres;
|
||||||
|
|
||||||
if (!subAndCheckDone(pSql, pParentSql, ps->subqueryIndex)) {
|
int32_t index = ps->subqueryIndex;
|
||||||
tscDebug("0x%"PRIx64" sub:0x%"PRIx64" orderOfSub:%d completed, not all subquery finished", pParentSql->self, pSql->self, ps->subqueryIndex);
|
bool ret = subAndCheckDone(pSql, pParentSql, index);
|
||||||
|
|
||||||
|
// TODO refactor
|
||||||
|
tfree(ps);
|
||||||
|
pSql->param = NULL;
|
||||||
|
|
||||||
|
if (!ret) {
|
||||||
|
tscDebug("0x%"PRIx64" sub:0x%"PRIx64" orderOfSub:%d completed, not all subquery finished", pParentSql->self, pSql->self, index);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3241,28 +3250,9 @@ static void tscSubqueryRetrieveCallback(void* param, TAOS_RES* tres, int code) {
|
||||||
schedMsg.thandle = (void *)1;
|
schedMsg.thandle = (void *)1;
|
||||||
schedMsg.msg = 0;
|
schedMsg.msg = 0;
|
||||||
taosScheduleTask(tscQhandle, &schedMsg);
|
taosScheduleTask(tscQhandle, &schedMsg);
|
||||||
|
|
||||||
// merge all subquery result
|
|
||||||
// SSqlCmd* pCmd = &pSql->cmd;
|
|
||||||
// SSqlRes* pRes = &pSql->res;
|
|
||||||
|
|
||||||
// add it to the message queue
|
|
||||||
|
|
||||||
// SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
|
|
||||||
// /*TAOS_ROW* pRow = */taos_fetch_row(pSql);
|
|
||||||
// if (pSql->res.numOfRows > 0) {
|
|
||||||
// handleDownstreamOperator(pRes, pQueryInfo, &pParentSql->res);
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// code = pParentSql->res.code;
|
|
||||||
// pParentSql->res.qId = -1;
|
|
||||||
// if (pParentSql->res.code == TSDB_CODE_SUCCESS) {
|
|
||||||
// (*pParentSql->fp)(pParentSql->param, pParentSql, pParentSql->res.numOfRows);
|
|
||||||
// } else {
|
|
||||||
// tscAsyncResultOnError(pParentSql);
|
|
||||||
// }
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// todo handle the failure
|
||||||
static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) {
|
static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) {
|
||||||
taos_fetch_rows_a(tres, tscSubqueryRetrieveCallback, param);
|
taos_fetch_rows_a(tres, tscSubqueryRetrieveCallback, param);
|
||||||
}
|
}
|
||||||
|
@ -3280,6 +3270,7 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
|
||||||
|
|
||||||
if (taosArrayGetSize(pQueryInfo->pUpstream) > 0) { // nest query. do execute it firstly
|
if (taosArrayGetSize(pQueryInfo->pUpstream) > 0) { // nest query. do execute it firstly
|
||||||
pSql->subState.numOfSub = taosArrayGetSize(pQueryInfo->pUpstream);
|
pSql->subState.numOfSub = taosArrayGetSize(pQueryInfo->pUpstream);
|
||||||
|
|
||||||
pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES);
|
pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES);
|
||||||
pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(int8_t));
|
pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(int8_t));
|
||||||
|
|
||||||
|
@ -3289,6 +3280,7 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
|
||||||
pSql->cmd.active = pSub;
|
pSql->cmd.active = pSub;
|
||||||
pSql->cmd.command = TSDB_SQL_SELECT;
|
pSql->cmd.command = TSDB_SQL_SELECT;
|
||||||
|
|
||||||
|
// TODO handle memory failure
|
||||||
SSqlObj* pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj));
|
SSqlObj* pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj));
|
||||||
if (pNew == NULL) {
|
if (pNew == NULL) {
|
||||||
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
|
@ -3321,6 +3313,7 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
|
||||||
// create sub query to handle the sub query.
|
// create sub query to handle the sub query.
|
||||||
executeQuery(pNew, pSub);
|
executeQuery(pNew, pSub);
|
||||||
}
|
}
|
||||||
|
|
||||||
// merge sub query result and generate final results
|
// merge sub query result and generate final results
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue