[td-225] fix bugs in group by query
This commit is contained in:
parent
56891d464e
commit
ed09566e0d
|
@ -3609,20 +3609,7 @@ bool needPrimaryTimestampCol(SQuery *pQuery, SDataBlockInfo *pDataBlockInfo) {
|
|||
return loadPrimaryTS;
|
||||
}
|
||||
|
||||
static int32_t getNumOfSubset(SQInfo *pQInfo) {
|
||||
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
|
||||
|
||||
int32_t totalSubset = 0;
|
||||
if (pQInfo->runtimeEnv.groupbyNormalCol || (QUERY_IS_INTERVAL_QUERY(pQuery))) {
|
||||
totalSubset = numOfClosedTimeWindow(&pQInfo->runtimeEnv.windowResInfo);
|
||||
} else {
|
||||
totalSubset = GET_NUM_OF_TABLEGROUP(pQInfo);
|
||||
}
|
||||
|
||||
return totalSubset;
|
||||
}
|
||||
|
||||
static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResult *result, int32_t orderType) {
|
||||
static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResInfo *pResultInfo, int32_t orderType) {
|
||||
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
||||
SQuery * pQuery = pRuntimeEnv->pQuery;
|
||||
|
||||
|
@ -3631,17 +3618,18 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResult *result, int32_t orde
|
|||
int32_t step = -1;
|
||||
|
||||
qDebug("QInfo:%p start to copy data from windowResInfo to query buf", pQInfo);
|
||||
int32_t totalSubset = getNumOfSubset(pQInfo);
|
||||
int32_t totalSet = numOfClosedTimeWindow(pResultInfo);
|
||||
SWindowResult* result = pResultInfo->pResult;
|
||||
|
||||
if (orderType == TSDB_ORDER_ASC) {
|
||||
startIdx = pQInfo->groupIndex;
|
||||
step = 1;
|
||||
} else { // desc order copy all data
|
||||
startIdx = totalSubset - pQInfo->groupIndex - 1;
|
||||
startIdx = totalSet - pQInfo->groupIndex - 1;
|
||||
step = -1;
|
||||
}
|
||||
|
||||
for (int32_t i = startIdx; (i < totalSubset) && (i >= 0); i += step) {
|
||||
for (int32_t i = startIdx; (i < totalSet) && (i >= 0); i += step) {
|
||||
if (result[i].numOfRows == 0) {
|
||||
pQInfo->offset = 0;
|
||||
pQInfo->groupIndex += 1;
|
||||
|
@ -3696,11 +3684,11 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResult *result, int32_t orde
|
|||
* @param pQInfo
|
||||
* @param result
|
||||
*/
|
||||
void copyFromWindowResToSData(SQInfo *pQInfo, SWindowResult *result) {
|
||||
void copyFromWindowResToSData(SQInfo *pQInfo, SWindowResInfo *pResultInfo) {
|
||||
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
|
||||
|
||||
int32_t orderType = (pQuery->pGroupbyExpr != NULL) ? pQuery->pGroupbyExpr->orderType : TSDB_ORDER_ASC;
|
||||
int32_t numOfResult = doCopyToSData(pQInfo, result, orderType);
|
||||
int32_t numOfResult = doCopyToSData(pQInfo, pResultInfo, orderType);
|
||||
|
||||
pQuery->rec.rows += numOfResult;
|
||||
|
||||
|
@ -4523,7 +4511,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
|
|||
pQInfo->groupIndex = 0;
|
||||
|
||||
ensureOutputBufferSimple(pRuntimeEnv, pWindowResInfo->size);
|
||||
copyFromWindowResToSData(pQInfo, pWindowResInfo->pResult);
|
||||
copyFromWindowResToSData(pQInfo, pWindowResInfo);
|
||||
|
||||
pQInfo->groupIndex = currentGroupIndex; //restore the group index
|
||||
assert(pQuery->rec.rows == pWindowResInfo->size);
|
||||
|
@ -4538,7 +4526,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
|
|||
* we need to return it to client in the first place.
|
||||
*/
|
||||
if (pQInfo->groupIndex > 0) {
|
||||
copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
|
||||
copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo);
|
||||
pQuery->rec.total += pQuery->rec.rows;
|
||||
|
||||
if (pQuery->rec.rows > 0) {
|
||||
|
@ -4739,7 +4727,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
|
|||
displayInterResult(pQuery->sdata, pRuntimeEnv, pQuery->sdata[0]->num);
|
||||
#endif
|
||||
} else {
|
||||
copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
|
||||
copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo);
|
||||
}
|
||||
|
||||
qDebug("QInfo:%p current:%"PRId64", total:%"PRId64"", pQInfo, pQuery->rec.rows, pQuery->rec.total);
|
||||
|
@ -4790,7 +4778,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
|
|||
#endif
|
||||
}
|
||||
} else { // not a interval query
|
||||
copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
|
||||
copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo);
|
||||
}
|
||||
|
||||
// handle the limitation of output buffer
|
||||
|
@ -4945,7 +4933,7 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
|
|||
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
|
||||
pQInfo->groupIndex = 0; // always start from 0
|
||||
pQuery->rec.rows = 0;
|
||||
copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
|
||||
copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo);
|
||||
|
||||
clearFirstNTimeWindow(pRuntimeEnv, pQInfo->groupIndex);
|
||||
}
|
||||
|
@ -4974,7 +4962,7 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
|
|||
if (pRuntimeEnv->groupbyNormalCol) { // todo refactor with merge interval time result
|
||||
pQInfo->groupIndex = 0;
|
||||
pQuery->rec.rows = 0;
|
||||
copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
|
||||
copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo);
|
||||
clearFirstNTimeWindow(pRuntimeEnv, pQInfo->groupIndex);
|
||||
}
|
||||
|
||||
|
@ -5006,7 +4994,7 @@ static void tableQueryImpl(SQInfo *pQInfo) {
|
|||
pQInfo->groupIndex = 0; // always start from 0
|
||||
|
||||
if (pRuntimeEnv->windowResInfo.size > 0) {
|
||||
copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
|
||||
copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo);
|
||||
clearFirstNTimeWindow(pRuntimeEnv, pQInfo->groupIndex);
|
||||
|
||||
if (pQuery->rec.rows > 0) {
|
||||
|
|
Loading…
Reference in New Issue