[td-225] refactor some codes.
This commit is contained in:
parent
267f2c037b
commit
586b5db8c1
|
@ -1225,9 +1225,9 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
|
||||||
} else { // other queries
|
} else { // other queries
|
||||||
// decide which group this rows belongs to according to current state value
|
// decide which group this rows belongs to according to current state value
|
||||||
if (groupbyStateValue) {
|
if (groupbyStateValue) {
|
||||||
char *stateVal = groupbyColumnData + bytes * offset;
|
char *val = groupbyColumnData + bytes * offset;
|
||||||
|
|
||||||
int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, stateVal, type, bytes);
|
int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, val, type, bytes);
|
||||||
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
|
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -1268,10 +1268,10 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pDataBlockInfo,
|
static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pDataBlockInfo,
|
||||||
SDataStatis *pStatis, __block_search_fn_t searchFn,
|
SDataStatis *pStatis, __block_search_fn_t searchFn, SArray *pDataBlock) {
|
||||||
SWindowResInfo *pWindowResInfo, SArray *pDataBlock) {
|
|
||||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||||
|
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
|
||||||
|
|
||||||
if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL || isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
|
if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL || isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
|
||||||
rowwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, pDataBlock);
|
rowwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, pDataBlock);
|
||||||
} else {
|
} else {
|
||||||
|
@ -2446,6 +2446,7 @@ int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order) {
|
||||||
|
|
||||||
static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
|
static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||||
|
|
||||||
qTrace("QInfo:%p query start, qrange:%" PRId64 "-%" PRId64 ", lastkey:%" PRId64 ", order:%d",
|
qTrace("QInfo:%p query start, qrange:%" PRId64 "-%" PRId64 ", lastkey:%" PRId64 ", order:%d",
|
||||||
GET_QINFO_ADDR(pRuntimeEnv), pQuery->window.skey, pQuery->window.ekey, pQuery->lastKey, pQuery->order.order);
|
GET_QINFO_ADDR(pRuntimeEnv), pQuery->window.skey, pQuery->window.ekey, pQuery->lastKey, pQuery->order.order);
|
||||||
|
|
||||||
|
@ -2478,7 +2479,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// in case of prj/diff query, ensure the output buffer is sufficient to accomodate the results of current block
|
// in case of prj/diff query, ensure the output buffer is sufficient to accommodate the results of current block
|
||||||
if (!isIntervalQuery(pQuery) && !isGroupbyNormalCol(pQuery->pGroupbyExpr) && !isFixedOutputQuery(pQuery)) {
|
if (!isIntervalQuery(pQuery) && !isGroupbyNormalCol(pQuery->pGroupbyExpr) && !isFixedOutputQuery(pQuery)) {
|
||||||
SResultRec *pRec = &pQuery->rec;
|
SResultRec *pRec = &pQuery->rec;
|
||||||
|
|
||||||
|
@ -2507,8 +2508,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
SArray * pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis);
|
SArray * pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis);
|
||||||
|
|
||||||
pQuery->pos = QUERY_IS_ASC_QUERY(pQuery) ? 0 : blockInfo.rows - 1;
|
pQuery->pos = QUERY_IS_ASC_QUERY(pQuery) ? 0 : blockInfo.rows - 1;
|
||||||
int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey,
|
int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, pDataBlock);
|
||||||
&pRuntimeEnv->windowResInfo, pDataBlock);
|
|
||||||
|
|
||||||
qTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", rows:%d, res:%d", GET_QINFO_ADDR(pRuntimeEnv),
|
qTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", rows:%d, res:%d", GET_QINFO_ADDR(pRuntimeEnv),
|
||||||
blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, numOfRes);
|
blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, numOfRes);
|
||||||
|
@ -2519,7 +2519,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// if the result buffer is not full, set the query completed flag
|
// if the result buffer is not full, set the query complete
|
||||||
if (!Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) {
|
if (!Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) {
|
||||||
setQueryStatus(pQuery, QUERY_COMPLETED);
|
setQueryStatus(pQuery, QUERY_COMPLETED);
|
||||||
}
|
}
|
||||||
|
@ -2530,7 +2530,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
|
|
||||||
closeAllTimeWindow(&pRuntimeEnv->windowResInfo);
|
closeAllTimeWindow(&pRuntimeEnv->windowResInfo);
|
||||||
removeRedundantWindow(&pRuntimeEnv->windowResInfo, pQuery->lastKey - step, step);
|
removeRedundantWindow(&pRuntimeEnv->windowResInfo, pQuery->lastKey - step, step);
|
||||||
pRuntimeEnv->windowResInfo.curIndex = pRuntimeEnv->windowResInfo.size - 1;
|
pRuntimeEnv->windowResInfo.curIndex = pRuntimeEnv->windowResInfo.size - 1; // point to the last time window
|
||||||
} else {
|
} else {
|
||||||
assert(Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL));
|
assert(Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL));
|
||||||
}
|
}
|
||||||
|
@ -3436,7 +3436,7 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
|
|
||||||
STsdbQueryCond cond = {
|
STsdbQueryCond cond = {
|
||||||
.twindow = qstatus.curWindow,
|
.twindow = qstatus.curWindow,
|
||||||
.order = pQuery->order.order,
|
.order = pQuery->order.order,
|
||||||
.colList = pQuery->colList,
|
.colList = pQuery->colList,
|
||||||
.numOfCols = pQuery->numOfCols,
|
.numOfCols = pQuery->numOfCols,
|
||||||
};
|
};
|
||||||
|
@ -3457,7 +3457,7 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!needReverseScan(pQuery)) {
|
if (pRuntimeEnv->stableQuery || !needReverseScan(pQuery)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4060,8 +4060,7 @@ static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBloc
|
||||||
pQuery->lastKey = keys[pQuery->pos];
|
pQuery->lastKey = keys[pQuery->pos];
|
||||||
pQuery->limit.offset = 0;
|
pQuery->limit.offset = 0;
|
||||||
|
|
||||||
int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey,
|
int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, pDataBlock);
|
||||||
&pRuntimeEnv->windowResInfo, pDataBlock);
|
|
||||||
|
|
||||||
qTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", rows:%d, res:%d", GET_QINFO_ADDR(pRuntimeEnv),
|
qTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", rows:%d, res:%d", GET_QINFO_ADDR(pRuntimeEnv),
|
||||||
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes);
|
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes);
|
||||||
|
@ -4166,8 +4165,7 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
pQuery->lastKey = ((TSKEY *)pColInfoData->pData)[startPos];
|
pQuery->lastKey = ((TSKEY *)pColInfoData->pData)[startPos];
|
||||||
pWindowResInfo->prevSKey = tw.skey;
|
pWindowResInfo->prevSKey = tw.skey;
|
||||||
|
|
||||||
int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, NULL, binarySearchForKey,
|
int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, NULL, binarySearchForKey, pDataBlock);
|
||||||
&pRuntimeEnv->windowResInfo, pDataBlock);
|
|
||||||
|
|
||||||
qTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", rows:%d, res:%d",
|
qTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", rows:%d, res:%d",
|
||||||
GET_QINFO_ADDR(pRuntimeEnv), blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, numOfRes);
|
GET_QINFO_ADDR(pRuntimeEnv), blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, numOfRes);
|
||||||
|
@ -4454,6 +4452,7 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) {
|
||||||
.numOfCols = pQuery->numOfCols,
|
.numOfCols = pQuery->numOfCols,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// todo refactor
|
||||||
SArray *g1 = taosArrayInit(1, POINTER_BYTES);
|
SArray *g1 = taosArrayInit(1, POINTER_BYTES);
|
||||||
SArray *tx = taosArrayInit(1, sizeof(STableId));
|
SArray *tx = taosArrayInit(1, sizeof(STableId));
|
||||||
|
|
||||||
|
@ -4624,10 +4623,6 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
|
||||||
SGroupItem *item = taosArrayGet(group, pQInfo->tableIndex);
|
SGroupItem *item = taosArrayGet(group, pQInfo->tableIndex);
|
||||||
|
|
||||||
STableQueryInfo *pInfo = item->info;
|
STableQueryInfo *pInfo = item->info;
|
||||||
if (pInfo->lastKey > 0) {
|
|
||||||
pQuery->window.skey = pInfo->lastKey;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!multiTableMultioutputHelper(pQInfo, pQInfo->tableIndex)) {
|
if (!multiTableMultioutputHelper(pQInfo, pQInfo->tableIndex)) {
|
||||||
pQInfo->tableIndex++;
|
pQInfo->tableIndex++;
|
||||||
continue;
|
continue;
|
||||||
|
@ -4635,7 +4630,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
|
||||||
|
|
||||||
// SPointInterpoSupporter pointInterpSupporter = {0};
|
// SPointInterpoSupporter pointInterpSupporter = {0};
|
||||||
|
|
||||||
// TODO handle the limit problem
|
// TODO handle the limit offset problem
|
||||||
if (pQuery->numOfFilterCols == 0 && pQuery->limit.offset > 0) {
|
if (pQuery->numOfFilterCols == 0 && pQuery->limit.offset > 0) {
|
||||||
// skipBlocks(pRuntimeEnv);
|
// skipBlocks(pRuntimeEnv);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue