[td-225] fix bugs in multi-vnode projection query.
This commit is contained in:
parent
e81204431e
commit
cfcbefba48
|
@ -145,7 +145,7 @@ static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows) {
|
|||
return;
|
||||
}
|
||||
|
||||
// local reducer has handle this situation during super table non-projection query.
|
||||
// local merge has handle this situation during super table non-projection query.
|
||||
if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE) {
|
||||
pRes->numOfTotalInCurrentClause += pRes->numOfRows;
|
||||
}
|
||||
|
@ -222,9 +222,30 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi
|
|||
tscResetForNextRetrieve(pRes);
|
||||
|
||||
// handle the sub queries of join query
|
||||
if (pCmd->command == TSDB_SQL_METRIC_JOIN_RETRIEVE) {
|
||||
if (pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) {
|
||||
tscFetchDatablockFromSubquery(pSql);
|
||||
} else {
|
||||
} else if (pRes->completed && pCmd->command == TSDB_SQL_FETCH) {
|
||||
if (hasMoreVnodesToTry(pSql)) { // sequentially retrieve data from remain vnodes.
|
||||
tscTryQueryNextVnode(pSql, tscAsyncQueryRowsForNextVnode);
|
||||
return;
|
||||
} else {
|
||||
/*
|
||||
* all available virtual node has been checked already, now we need to check
|
||||
* for the next subclause queries
|
||||
*/
|
||||
if (pCmd->clauseIndex < pCmd->numOfClause - 1) {
|
||||
tscTryQueryNextClause(pSql, tscAsyncQueryRowsForNextVnode);
|
||||
return;
|
||||
}
|
||||
|
||||
/*
|
||||
* 1. has reach the limitation
|
||||
* 2. no remain virtual nodes to be retrieved anymore
|
||||
*/
|
||||
(*pSql->fetchFp)(param, pSql, 0);
|
||||
}
|
||||
return;
|
||||
} else { // current query is not completed, continue retrieve from node
|
||||
if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE && pCmd->command < TSDB_SQL_LOCAL) {
|
||||
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
|
||||
}
|
||||
|
|
|
@ -1444,8 +1444,9 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
|
|||
}
|
||||
|
||||
pRes->row = 0;
|
||||
pRes->completed = (pRes->numOfRows == 0);
|
||||
|
||||
uint8_t code = pRes->code;
|
||||
int32_t code = pRes->code;
|
||||
if (pRes->code == TSDB_CODE_SUCCESS) {
|
||||
(*pSql->fp)(pSql->param, pSql, pRes->numOfRows);
|
||||
} else {
|
||||
|
|
|
@ -463,11 +463,11 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
// current data are exhausted, fetch more data
|
||||
if (pRes->row >= pRes->numOfRows && pRes->completed != true &&
|
||||
// current data set are exhausted, fetch more data from node
|
||||
if (pRes->row >= pRes->numOfRows && (pRes->completed != true || hasMoreVnodesToTry(pSql)) &&
|
||||
(pCmd->command == TSDB_SQL_RETRIEVE ||
|
||||
pCmd->command == TSDB_SQL_RETRIEVE_LOCALMERGE ||
|
||||
pCmd->command == TSDB_SQL_METRIC_JOIN_RETRIEVE ||
|
||||
pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE ||
|
||||
pCmd->command == TSDB_SQL_FETCH ||
|
||||
pCmd->command == TSDB_SQL_SHOW ||
|
||||
pCmd->command == TSDB_SQL_SELECT ||
|
||||
|
|
|
@ -1203,7 +1203,7 @@ int32_t tscHandleMasterJoinQuery(SSqlObj* pSql) {
|
|||
}
|
||||
}
|
||||
|
||||
pSql->cmd.command = (pSql->numOfSubs <= 0)? TSDB_SQL_RETRIEVE_EMPTY_RESULT:TSDB_SQL_METRIC_JOIN_RETRIEVE;
|
||||
pSql->cmd.command = (pSql->numOfSubs <= 0)? TSDB_SQL_RETRIEVE_EMPTY_RESULT:TSDB_SQL_TABLE_JOIN_RETRIEVE;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -1949,7 +1949,7 @@ void **doSetResultRowData(SSqlObj *pSql, bool finalResult) {
|
|||
|
||||
assert(pRes->row >= 0 && pRes->row <= pRes->numOfRows);
|
||||
|
||||
if(pCmd->command == TSDB_SQL_METRIC_JOIN_RETRIEVE) {
|
||||
if(pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) {
|
||||
if (pRes->completed) {
|
||||
tfree(pRes->tsrow);
|
||||
}
|
||||
|
|
|
@ -386,7 +386,7 @@ void tscPartiallyFreeSqlObj(SSqlObj* pSql) {
|
|||
|
||||
int32_t cmd = pCmd->command;
|
||||
if (cmd < TSDB_SQL_INSERT || cmd == TSDB_SQL_RETRIEVE_LOCALMERGE || cmd == TSDB_SQL_RETRIEVE_EMPTY_RESULT ||
|
||||
cmd == TSDB_SQL_METRIC_JOIN_RETRIEVE) {
|
||||
cmd == TSDB_SQL_TABLE_JOIN_RETRIEVE) {
|
||||
tscRemoveFromSqlList(pSql);
|
||||
}
|
||||
|
||||
|
@ -1644,9 +1644,11 @@ void doRemoveTableMetaInfo(SQueryInfo* pQueryInfo, int32_t index, bool removeFro
|
|||
void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache) {
|
||||
tscTrace("%p deref the table meta in cache, numOfTables:%d", address, pQueryInfo->numOfTables);
|
||||
|
||||
int32_t index = pQueryInfo->numOfTables;
|
||||
while (index >= 0) {
|
||||
doRemoveTableMetaInfo(pQueryInfo, --index, removeFromCache);
|
||||
for(int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
|
||||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i);
|
||||
|
||||
tscClearTableMetaInfo(pTableMetaInfo, removeFromCache);
|
||||
free(pTableMetaInfo);
|
||||
}
|
||||
|
||||
tfree(pQueryInfo->pTableMetaInfo);
|
||||
|
@ -1698,16 +1700,8 @@ void tscClearTableMetaInfo(STableMetaInfo* pTableMetaInfo, bool removeFromCache)
|
|||
taosCacheRelease(tscCacheHandle, (void**)&(pTableMetaInfo->pTableMeta), removeFromCache);
|
||||
tfree(pTableMetaInfo->vgroupList);
|
||||
|
||||
if (pTableMetaInfo->tagColList != NULL) {
|
||||
size_t numOfTags = taosArrayGetSize(pTableMetaInfo->tagColList);
|
||||
for(int32_t i = 0; i < numOfTags; ++i) { // todo do NOT use the allocated object
|
||||
SColumn* pCol = taosArrayGetP(pTableMetaInfo->tagColList, i);
|
||||
tfree(pCol);
|
||||
}
|
||||
|
||||
taosArrayDestroy(pTableMetaInfo->tagColList);
|
||||
pTableMetaInfo->tagColList = NULL;
|
||||
}
|
||||
tscColumnListDestroy(pTableMetaInfo->tagColList);
|
||||
pTableMetaInfo->tagColList = NULL;
|
||||
}
|
||||
|
||||
void tscResetForNextRetrieve(SSqlRes* pRes) {
|
||||
|
@ -1991,22 +1985,28 @@ char* tscGetErrorMsgPayload(SSqlCmd* pCmd) { return pCmd->payload; }
|
|||
|
||||
/**
|
||||
* If current vnode query does not return results anymore (pRes->numOfRows == 0), try the next vnode if exists,
|
||||
* in case of multi-vnode super table projection query and the result does not reach the limitation.
|
||||
* while multi-vnode super table projection query and the result does not reach the limitation.
|
||||
*/
|
||||
bool hasMoreVnodesToTry(SSqlObj* pSql) {
|
||||
// SSqlCmd* pCmd = &pSql->cmd;
|
||||
// SSqlRes* pRes = &pSql->res;
|
||||
|
||||
// SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
|
||||
|
||||
// STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
// if (!UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo) || (pTableMetaInfo->pMetricMeta == NULL)) {
|
||||
SSqlCmd* pCmd = &pSql->cmd;
|
||||
SSqlRes* pRes = &pSql->res;
|
||||
if (pCmd->command != TSDB_SQL_FETCH) {
|
||||
return false;
|
||||
// }
|
||||
}
|
||||
|
||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
|
||||
|
||||
// int32_t totalVnode = pTableMetaInfo->pMetricMeta->numOfVnodes;
|
||||
// return pRes->numOfRows == 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) &&
|
||||
// (!tscHasReachLimitation(pQueryInfo, pRes)) && (pTableMetaInfo->vgroupIndex < totalVnode - 1);
|
||||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
assert(pRes->completed);
|
||||
|
||||
// for normal table, do not try any more if result are exhausted
|
||||
if (!UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo) || (pTableMetaInfo->vgroupList == NULL)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
int32_t numOfVgroups = pTableMetaInfo->vgroupList->numOfVgroups;
|
||||
return tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) &&
|
||||
(!tscHasReachLimitation(pQueryInfo, pRes)) && (pTableMetaInfo->vgroupIndex < numOfVgroups - 1);
|
||||
}
|
||||
|
||||
void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) {
|
||||
|
@ -2022,12 +2022,11 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) {
|
|||
assert(pRes->numOfRows == 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && !tscHasReachLimitation(pQueryInfo, pRes));
|
||||
|
||||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
int32_t totalVnode = 0;
|
||||
// int32_t totalVnode = pTableMetaInfo->pMetricMeta->numOfVnodes;
|
||||
|
||||
while (++pTableMetaInfo->vgroupIndex < totalVnode) {
|
||||
|
||||
int32_t totalVgroups = pTableMetaInfo->vgroupList->numOfVgroups;
|
||||
while (++pTableMetaInfo->vgroupIndex < totalVgroups) {
|
||||
tscTrace("%p current vnode:%d exhausted, try next:%d. total vnode:%d. current numOfRes:%d", pSql,
|
||||
pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVnode, pRes->numOfTotalInCurrentClause);
|
||||
pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVgroups, pRes->numOfTotalInCurrentClause);
|
||||
|
||||
/*
|
||||
* update the limit and offset value for the query on the next vnode,
|
||||
|
@ -2043,10 +2042,10 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) {
|
|||
}
|
||||
|
||||
pQueryInfo->limit.offset = pRes->offset;
|
||||
|
||||
assert((pRes->offset >= 0 && pRes->numOfRows == 0) || (pRes->offset == 0 && pRes->numOfRows >= 0));
|
||||
tscTrace("%p new query to next vnode, vnode index:%d, limit:%" PRId64 ", offset:%" PRId64 ", glimit:%" PRId64, pSql,
|
||||
pTableMetaInfo->vgroupIndex, pQueryInfo->limit.limit, pQueryInfo->limit.offset, pQueryInfo->clauseLimit);
|
||||
|
||||
tscTrace("%p new query to next vgroup, index:%d, limit:%" PRId64 ", offset:%" PRId64 ", glimit:%" PRId64,
|
||||
pSql, pTableMetaInfo->vgroupIndex, pQueryInfo->limit.limit, pQueryInfo->limit.offset, pQueryInfo->clauseLimit);
|
||||
|
||||
/*
|
||||
* For project query with super table join, the numOfSub is equalled to the number of all subqueries.
|
||||
|
@ -2060,43 +2059,46 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) {
|
|||
tscResetForNextRetrieve(pRes);
|
||||
|
||||
// in case of async query, set the callback function
|
||||
void* fp1 = pSql->fp;
|
||||
// void* fp1 = pSql->fp;
|
||||
pSql->fp = fp;
|
||||
|
||||
if (fp1 != NULL) {
|
||||
assert(fp != NULL);
|
||||
// if (fp1 != NULL) {
|
||||
// assert(fp != NULL);
|
||||
// }
|
||||
|
||||
int32_t ret = tscProcessSql(pSql);
|
||||
if (ret == TSDB_CODE_SUCCESS) {
|
||||
return;
|
||||
} else {// todo check for failure
|
||||
}
|
||||
|
||||
int32_t ret = tscProcessSql(pSql); // todo check for failure
|
||||
|
||||
// in case of async query, return now
|
||||
if (fp != NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
pSql->res.code = ret;
|
||||
return;
|
||||
}
|
||||
|
||||
// retrieve data
|
||||
assert(pCmd->command == TSDB_SQL_SELECT);
|
||||
pCmd->command = TSDB_SQL_FETCH;
|
||||
|
||||
if ((ret = tscProcessSql(pSql)) != TSDB_CODE_SUCCESS) {
|
||||
pSql->res.code = ret;
|
||||
return;
|
||||
}
|
||||
|
||||
// if the result from current virtual node are empty, try next if exists. otherwise, return the results.
|
||||
if (pRes->numOfRows > 0) {
|
||||
break;
|
||||
}
|
||||
// if (fp != NULL) {
|
||||
// return;
|
||||
// }
|
||||
//
|
||||
// if (ret != TSDB_CODE_SUCCESS) {
|
||||
// pSql->res.code = ret;
|
||||
// return;
|
||||
// }
|
||||
//
|
||||
// // retrieve data
|
||||
// assert(pCmd->command == TSDB_SQL_SELECT);
|
||||
// pCmd->command = TSDB_SQL_FETCH;
|
||||
//
|
||||
// if ((ret = tscProcessSql(pSql)) != TSDB_CODE_SUCCESS) {
|
||||
// pSql->res.code = ret;
|
||||
// return;
|
||||
// }
|
||||
//
|
||||
// // if the result from current virtual node are empty, try next if exists. otherwise, return the results.
|
||||
// if (pRes->numOfRows > 0) {
|
||||
// break;
|
||||
// }
|
||||
}
|
||||
|
||||
if (pRes->numOfRows == 0) {
|
||||
tscTrace("%p all vnodes exhausted, prj query completed. total res:%d", pSql, totalVnode, pRes->numOfTotal);
|
||||
}
|
||||
// if (pRes->numOfRows == 0) {
|
||||
// tscTrace("%p all vnodes exhausted, prj query completed. total res:%d", pSql, totalVnode, pRes->numOfTotal);
|
||||
// }
|
||||
}
|
||||
|
||||
void tscTryQueryNextClause(SSqlObj* pSql, void (*queryFp)()) {
|
||||
|
|
|
@ -61,7 +61,7 @@ enum _sql_type {
|
|||
TSDB_SQL_LOCAL, // SQL below for client local
|
||||
TSDB_SQL_DESCRIBE_TABLE,
|
||||
TSDB_SQL_RETRIEVE_LOCALMERGE,
|
||||
TSDB_SQL_METRIC_JOIN_RETRIEVE,
|
||||
TSDB_SQL_TABLE_JOIN_RETRIEVE,
|
||||
|
||||
/*
|
||||
* build empty result instead of accessing dnode to fetch result
|
||||
|
|
Loading…
Reference in New Issue