[TD-225] refactor codes.
This commit is contained in:
parent
0c38e1d9d3
commit
8103620575
|
@ -84,14 +84,13 @@ typedef struct SResultRec {
|
||||||
} SResultRec;
|
} SResultRec;
|
||||||
|
|
||||||
typedef struct SWindowResInfo {
|
typedef struct SWindowResInfo {
|
||||||
SResultRow** pResult; // result list
|
SResultRow** pResult; // result list
|
||||||
int16_t type:8; // data type for hash key
|
int16_t type:8; // data type for hash key
|
||||||
int32_t size:24; // number of result set
|
int32_t size:24; // number of result set
|
||||||
int32_t threshold; // threshold to halt query and return the generated results.
|
int32_t capacity; // max capacity
|
||||||
int32_t capacity; // max capacity
|
int32_t curIndex; // current start active index
|
||||||
int32_t curIndex; // current start active index
|
int64_t startTime; // start time of the first time window for sliding query
|
||||||
int64_t startTime; // start time of the first time window for sliding query
|
int64_t prevSKey; // previous (not completed) sliding window start key
|
||||||
int64_t prevSKey; // previous (not completed) sliding window start key
|
|
||||||
} SWindowResInfo;
|
} SWindowResInfo;
|
||||||
|
|
||||||
typedef struct SColumnFilterElem {
|
typedef struct SColumnFilterElem {
|
||||||
|
|
|
@ -30,7 +30,7 @@ void clearResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* pRow, int16_t typ
|
||||||
void copyResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* dst, const SResultRow* src, int16_t type);
|
void copyResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* dst, const SResultRow* src, int16_t type);
|
||||||
SResultRowCellInfo* getResultCell(SQueryRuntimeEnv* pRuntimeEnv, const SResultRow* pRow, int32_t index);
|
SResultRowCellInfo* getResultCell(SQueryRuntimeEnv* pRuntimeEnv, const SResultRow* pRow, int32_t index);
|
||||||
|
|
||||||
int32_t initWindowResInfo(SWindowResInfo* pWindowResInfo, int32_t size, int32_t threshold, int16_t type);
|
int32_t initWindowResInfo(SWindowResInfo* pWindowResInfo, int32_t size, int16_t type);
|
||||||
|
|
||||||
void cleanupTimeWindowInfo(SWindowResInfo* pWindowResInfo);
|
void cleanupTimeWindowInfo(SWindowResInfo* pWindowResInfo);
|
||||||
void resetTimeWindowInfo(SQueryRuntimeEnv* pRuntimeEnv, SWindowResInfo* pWindowResInfo);
|
void resetTimeWindowInfo(SQueryRuntimeEnv* pRuntimeEnv, SWindowResInfo* pWindowResInfo);
|
||||||
|
|
|
@ -761,7 +761,7 @@ static int32_t doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKe
|
||||||
pWindowResInfo->prevSKey = pWindowResInfo->pResult[pWindowResInfo->curIndex]->win.skey;
|
pWindowResInfo->prevSKey = pWindowResInfo->pResult[pWindowResInfo->curIndex]->win.skey;
|
||||||
|
|
||||||
// the number of completed slots are larger than the threshold, return current generated results to client.
|
// the number of completed slots are larger than the threshold, return current generated results to client.
|
||||||
if (numOfClosed > pWindowResInfo->threshold) {
|
if (numOfClosed > pQuery->rec.threshold) {
|
||||||
qDebug("QInfo:%p total result window:%d closed:%d, reached the output threshold %d, return",
|
qDebug("QInfo:%p total result window:%d closed:%d, reached the output threshold %d, return",
|
||||||
GET_QINFO_ADDR(pRuntimeEnv), pWindowResInfo->size, numOfClosed, pQuery->rec.threshold);
|
GET_QINFO_ADDR(pRuntimeEnv), pWindowResInfo->size, numOfClosed, pQuery->rec.threshold);
|
||||||
|
|
||||||
|
@ -3587,17 +3587,9 @@ static void updateTableQueryInfoForReverseScan(SQuery *pQuery, STableQueryInfo *
|
||||||
|
|
||||||
// order has changed already
|
// order has changed already
|
||||||
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
||||||
|
|
||||||
// TODO validate the assertion
|
|
||||||
// if (!QUERY_IS_ASC_QUERY(pQuery)) {
|
|
||||||
// assert(pTableQueryInfo->win.ekey >= pTableQueryInfo->lastKey + step);
|
|
||||||
// } else {
|
|
||||||
// assert(pTableQueryInfo->win.ekey <= pTableQueryInfo->lastKey + step);
|
|
||||||
// }
|
|
||||||
|
|
||||||
if (pTableQueryInfo->lastKey == pTableQueryInfo->win.skey) {
|
if (pTableQueryInfo->lastKey == pTableQueryInfo->win.skey) {
|
||||||
// do nothing, no results
|
// do nothing, no results
|
||||||
} else {// even win.skey != lastKey, the results may not generated.
|
} else {// NOTE: even win.skey != lastKey, the results may not generated.
|
||||||
pTableQueryInfo->win.ekey = pTableQueryInfo->lastKey + step;
|
pTableQueryInfo->win.ekey = pTableQueryInfo->lastKey + step;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4102,9 +4094,8 @@ static STableQueryInfo *createTableQueryInfo(SQueryRuntimeEnv *pRuntimeEnv, void
|
||||||
|
|
||||||
// set more initial size of interval/groupby query
|
// set more initial size of interval/groupby query
|
||||||
if (QUERY_IS_INTERVAL_QUERY(pQuery) || pRuntimeEnv->groupbyNormalCol) {
|
if (QUERY_IS_INTERVAL_QUERY(pQuery) || pRuntimeEnv->groupbyNormalCol) {
|
||||||
int32_t initialSize = 16;
|
int32_t initialSize = 128;
|
||||||
int32_t initialThreshold = 100;
|
int32_t code = initWindowResInfo(&pTableQueryInfo->windowResInfo, initialSize, TSDB_DATA_TYPE_INT);
|
||||||
int32_t code = initWindowResInfo(&pTableQueryInfo->windowResInfo, initialSize, initialThreshold, TSDB_DATA_TYPE_INT);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -4989,20 +4980,13 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
|
||||||
|
|
||||||
if (!QUERY_IS_INTERVAL_QUERY(pQuery)) {
|
if (!QUERY_IS_INTERVAL_QUERY(pQuery)) {
|
||||||
int16_t type = TSDB_DATA_TYPE_NULL;
|
int16_t type = TSDB_DATA_TYPE_NULL;
|
||||||
int32_t threshold = 0;
|
|
||||||
|
|
||||||
if (pRuntimeEnv->groupbyNormalCol) { // group by columns not tags;
|
if (pRuntimeEnv->groupbyNormalCol) { // group by columns not tags;
|
||||||
type = getGroupbyColumnType(pQuery, pQuery->pGroupbyExpr);
|
type = getGroupbyColumnType(pQuery, pQuery->pGroupbyExpr);
|
||||||
threshold = 4000;
|
|
||||||
} else {
|
} else {
|
||||||
type = TSDB_DATA_TYPE_INT; // group id
|
type = TSDB_DATA_TYPE_INT; // group id
|
||||||
threshold = (int32_t)(GET_NUM_OF_TABLEGROUP(pQInfo));
|
|
||||||
if (threshold < 8) {
|
|
||||||
threshold = 8;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
code = initWindowResInfo(&pRuntimeEnv->windowResInfo, 8, threshold, type);
|
code = initWindowResInfo(&pRuntimeEnv->windowResInfo, 8, type);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -5022,7 +5006,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
|
||||||
type = TSDB_DATA_TYPE_TIMESTAMP;
|
type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = initWindowResInfo(&pRuntimeEnv->windowResInfo, numOfResultRows, 1024, type);
|
code = initWindowResInfo(&pRuntimeEnv->windowResInfo, numOfResultRows, type);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,9 +43,8 @@ int32_t getOutputInterResultBufSize(SQuery* pQuery) {
|
||||||
return size;
|
return size;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, int32_t size, int32_t threshold, int16_t type) {
|
int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, int32_t size, int16_t type) {
|
||||||
pWindowResInfo->capacity = size;
|
pWindowResInfo->capacity = size;
|
||||||
pWindowResInfo->threshold = threshold;
|
|
||||||
|
|
||||||
pWindowResInfo->type = type;
|
pWindowResInfo->type = type;
|
||||||
pWindowResInfo->curIndex = -1;
|
pWindowResInfo->curIndex = -1;
|
||||||
|
|
Loading…
Reference in New Issue