[td-1067]
This commit is contained in:
parent
0fc1c8cdb5
commit
51290fd1aa
|
@ -193,7 +193,7 @@ typedef struct SQInfo {
|
||||||
int64_t owner; // if it is in execution
|
int64_t owner; // if it is in execution
|
||||||
void* tsdb;
|
void* tsdb;
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
STableGroupInfo tableGroupInfo; // table id list < only includes the STable list>
|
STableGroupInfo tableGroupInfo; // table <tid, last_key> list SArray<STableKeyInfo>
|
||||||
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
|
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
|
||||||
SQueryRuntimeEnv runtimeEnv;
|
SQueryRuntimeEnv runtimeEnv;
|
||||||
SArray* arrTableIdInfo;
|
SArray* arrTableIdInfo;
|
||||||
|
|
|
@ -124,6 +124,7 @@ static UNUSED_FUNC void* u_realloc(void* p, size_t __size) {
|
||||||
|
|
||||||
#define calloc u_calloc
|
#define calloc u_calloc
|
||||||
#define malloc u_malloc
|
#define malloc u_malloc
|
||||||
|
#define realloc u_realloc
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st)))
|
#define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st)))
|
||||||
|
@ -3146,7 +3147,8 @@ static void setupQueryRangeForReverseScan(SQInfo* pQInfo) {
|
||||||
STableQueryInfo *pCheckInfo = taosArrayGetP(group, j);
|
STableQueryInfo *pCheckInfo = taosArrayGetP(group, j);
|
||||||
updateTableQueryInfoForReverseScan(pQuery, pCheckInfo);
|
updateTableQueryInfoForReverseScan(pQuery, pCheckInfo);
|
||||||
|
|
||||||
// update the last key in tableKeyInfo list
|
// update the last key in tableKeyInfo list, the tableKeyInfo is used to build the tsdbQueryHandle and decide
|
||||||
|
// the start check timestamp of tsdbQueryHandle
|
||||||
STableKeyInfo *pTableKeyInfo = taosArrayGet(tableKeyGroup, j);
|
STableKeyInfo *pTableKeyInfo = taosArrayGet(tableKeyGroup, j);
|
||||||
pTableKeyInfo->lastKey = pCheckInfo->lastKey;
|
pTableKeyInfo->lastKey = pCheckInfo->lastKey;
|
||||||
|
|
||||||
|
@ -3607,12 +3609,6 @@ void destroyTableQueryInfo(STableQueryInfo *pTableQueryInfo) {
|
||||||
cleanupTimeWindowInfo(&pTableQueryInfo->windowResInfo);
|
cleanupTimeWindowInfo(&pTableQueryInfo->windowResInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
#define CHECK_QUERY_TIME_RANGE(_q, _tableInfo) \
|
|
||||||
do { \
|
|
||||||
assert((((_tableInfo)->lastKey >= (_tableInfo)->win.skey) && QUERY_IS_ASC_QUERY(_q)) || \
|
|
||||||
(((_tableInfo)->lastKey <= (_tableInfo)->win.skey) && !QUERY_IS_ASC_QUERY(_q))); \
|
|
||||||
} while (0)
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* set output buffer for different group
|
* set output buffer for different group
|
||||||
* @param pRuntimeEnv
|
* @param pRuntimeEnv
|
||||||
|
@ -4316,6 +4312,34 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery)
|
||||||
terrno = TSDB_CODE_SUCCESS;
|
terrno = TSDB_CODE_SUCCESS;
|
||||||
if (isFirstLastRowQuery(pQuery)) {
|
if (isFirstLastRowQuery(pQuery)) {
|
||||||
pRuntimeEnv->pQueryHandle = tsdbQueryLastRow(tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo);
|
pRuntimeEnv->pQueryHandle = tsdbQueryLastRow(tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo);
|
||||||
|
|
||||||
|
// update the query time window
|
||||||
|
pQuery->window = cond.twindow;
|
||||||
|
|
||||||
|
int32_t numOfGroups = (int32_t)(GET_NUM_OF_TABLEGROUP(pQInfo));
|
||||||
|
|
||||||
|
for(int32_t i = 0; i < numOfGroups; ++i) {
|
||||||
|
SArray *group = GET_TABLEGROUP(pQInfo, i);
|
||||||
|
SArray *tableKeyGroup = taosArrayGetP(pQInfo->tableGroupInfo.pGroupList, i);
|
||||||
|
|
||||||
|
size_t t = taosArrayGetSize(group);
|
||||||
|
for (int32_t j = 0; j < t; ++j) {
|
||||||
|
STableQueryInfo *pCheckInfo = taosArrayGetP(group, j);
|
||||||
|
updateTableQueryInfoForReverseScan(pQuery, pCheckInfo);
|
||||||
|
|
||||||
|
// update the last key in tableKeyInfo list, the tableKeyInfo is used to build the tsdbQueryHandle and decide
|
||||||
|
// the start check timestamp of tsdbQueryHandle
|
||||||
|
STableKeyInfo *pTableKeyInfo = taosArrayGet(tableKeyGroup, j);
|
||||||
|
pCheckInfo->win.skey = pTableKeyInfo->lastKey;
|
||||||
|
pCheckInfo->win.ekey = pTableKeyInfo->lastKey;
|
||||||
|
|
||||||
|
pCheckInfo->lastKey = pTableKeyInfo->lastKey;
|
||||||
|
|
||||||
|
assert(pCheckInfo->pTable == pTableKeyInfo->pTable);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
} else if (isPointInterpoQuery(pQuery)) {
|
} else if (isPointInterpoQuery(pQuery)) {
|
||||||
pRuntimeEnv->pQueryHandle = tsdbQueryRowsInExternalWindow(tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo);
|
pRuntimeEnv->pQueryHandle = tsdbQueryRowsInExternalWindow(tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo);
|
||||||
} else {
|
} else {
|
||||||
|
@ -4361,6 +4385,9 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
|
||||||
pRuntimeEnv->hasTagResults = hasTagValOutput(pQuery);
|
pRuntimeEnv->hasTagResults = hasTagValOutput(pQuery);
|
||||||
|
|
||||||
setScanLimitationByResultBuffer(pQuery);
|
setScanLimitationByResultBuffer(pQuery);
|
||||||
|
|
||||||
|
// NOTE: pTableCheckInfo need to update the query time range and the lastKey info
|
||||||
|
// TODO fixme
|
||||||
changeExecuteScanOrder(pQInfo, false);
|
changeExecuteScanOrder(pQInfo, false);
|
||||||
|
|
||||||
code = setupQueryHandle(tsdb, pQInfo, isSTableQuery);
|
code = setupQueryHandle(tsdb, pQInfo, isSTableQuery);
|
||||||
|
@ -4512,7 +4539,17 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pQuery->current = *pTableQueryInfo;
|
pQuery->current = *pTableQueryInfo;
|
||||||
CHECK_QUERY_TIME_RANGE(pQuery, *pTableQueryInfo);
|
if (QUERY_IS_ASC_QUERY(pQuery)) {
|
||||||
|
assert(
|
||||||
|
((*pTableQueryInfo)->win.skey <= (*pTableQueryInfo)->win.ekey) &&
|
||||||
|
((*pTableQueryInfo)->lastKey >= (*pTableQueryInfo)->win.skey) &&
|
||||||
|
((*pTableQueryInfo)->win.skey >= pQuery->window.skey && (*pTableQueryInfo)->win.ekey <= pQuery->window.ekey));
|
||||||
|
} else {
|
||||||
|
assert(
|
||||||
|
((*pTableQueryInfo)->win.skey >= (*pTableQueryInfo)->win.ekey) &&
|
||||||
|
((*pTableQueryInfo)->lastKey <= (*pTableQueryInfo)->win.skey) &&
|
||||||
|
((*pTableQueryInfo)->win.skey <= pQuery->window.skey && (*pTableQueryInfo)->win.ekey >= pQuery->window.ekey));
|
||||||
|
}
|
||||||
|
|
||||||
if (!pRuntimeEnv->groupbyNormalCol) {
|
if (!pRuntimeEnv->groupbyNormalCol) {
|
||||||
setEnvForEachBlock(pQInfo, *pTableQueryInfo, &blockInfo);
|
setEnvForEachBlock(pQInfo, *pTableQueryInfo, &blockInfo);
|
||||||
|
@ -4653,7 +4690,6 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
|
||||||
|
|
||||||
if (isFirstLastRowQuery(pQuery)) {
|
if (isFirstLastRowQuery(pQuery)) {
|
||||||
assert(0); // last_row query switch to other routine to handle
|
assert(0); // last_row query switch to other routine to handle
|
||||||
// pRuntimeEnv->pQueryHandle = tsdbQueryLastRow(pQInfo->tsdb, &cond, &gp, pQInfo);
|
|
||||||
} else {
|
} else {
|
||||||
pRuntimeEnv->pQueryHandle = tsdbQueryRowsInExternalWindow(pQInfo->tsdb, &cond, &gp, pQInfo);
|
pRuntimeEnv->pQueryHandle = tsdbQueryRowsInExternalWindow(pQInfo->tsdb, &cond, &gp, pQInfo);
|
||||||
}
|
}
|
||||||
|
@ -6069,6 +6105,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
|
||||||
if (p1 == NULL) {
|
if (p1 == NULL) {
|
||||||
goto _cleanup;
|
goto _cleanup;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayPush(pQInfo->tableqinfoGroupInfo.pGroupList, &p1);
|
taosArrayPush(pQInfo->tableqinfoGroupInfo.pGroupList, &p1);
|
||||||
|
|
||||||
for(int32_t j = 0; j < s; ++j) {
|
for(int32_t j = 0; j < s; ++j) {
|
||||||
|
@ -6076,13 +6113,10 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
|
||||||
|
|
||||||
STableId* id = TSDB_TABLEID(info->pTable);
|
STableId* id = TSDB_TABLEID(info->pTable);
|
||||||
STableIdInfo* pTableId = taosArraySearch(pTableIdList, id, compareTableIdInfo);
|
STableIdInfo* pTableId = taosArraySearch(pTableIdList, id, compareTableIdInfo);
|
||||||
if (pTableId != NULL ) {
|
|
||||||
window.skey = pTableId->key;
|
|
||||||
} else {
|
|
||||||
window.skey = pQueryMsg->window.skey;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
window.skey = (pTableId != NULL)? pTableId->key:pQueryMsg->window.skey;
|
||||||
void* buf = (char*)pQInfo->pBuf + index * sizeof(STableQueryInfo);
|
void* buf = (char*)pQInfo->pBuf + index * sizeof(STableQueryInfo);
|
||||||
|
|
||||||
STableQueryInfo* item = createTableQueryInfo(&pQInfo->runtimeEnv, info->pTable, window, buf);
|
STableQueryInfo* item = createTableQueryInfo(&pQInfo->runtimeEnv, info->pTable, window, buf);
|
||||||
if (item == NULL) {
|
if (item == NULL) {
|
||||||
goto _cleanup;
|
goto _cleanup;
|
||||||
|
|
Loading…
Reference in New Issue