[TD-225] refactor code.
This commit is contained in:
parent
ed933ba7e9
commit
f55b477a96
|
@ -305,6 +305,36 @@ typedef struct SQueryParam {
|
||||||
SSqlGroupbyExpr *pGroupbyExpr;
|
SSqlGroupbyExpr *pGroupbyExpr;
|
||||||
} SQueryParam;
|
} SQueryParam;
|
||||||
|
|
||||||
|
typedef struct SSDataBlock {
|
||||||
|
SDataStatis *pBlockStatis;
|
||||||
|
SArray *pDataBlock;
|
||||||
|
SDataBlockInfo info;
|
||||||
|
} SSDataBlock;
|
||||||
|
|
||||||
|
typedef struct STableScanInfo {
|
||||||
|
void *pQueryHandle;
|
||||||
|
int32_t numOfBlocks;
|
||||||
|
int32_t numOfSkipped;
|
||||||
|
int32_t numOfBlockStatis;
|
||||||
|
|
||||||
|
int64_t numOfRows;
|
||||||
|
int32_t order;
|
||||||
|
bool completed;
|
||||||
|
|
||||||
|
SSDataBlock block;
|
||||||
|
|
||||||
|
int64_t elapsedTime;
|
||||||
|
SSDataBlock* (*apply)(void* param);
|
||||||
|
} STableScanInfo;
|
||||||
|
|
||||||
|
typedef struct SAggOperatorInfo {
|
||||||
|
SResultRowInfo *pResultRowInfo;
|
||||||
|
STableQueryInfo *pTableQueryInfo;
|
||||||
|
STableScanInfo *pTableScanInfo;
|
||||||
|
SQueryRuntimeEnv *pRuntimeEnv;
|
||||||
|
SSDataBlock* (*apply)(void* param);
|
||||||
|
} SAggOperatorInfo;
|
||||||
|
|
||||||
void freeParam(SQueryParam *param);
|
void freeParam(SQueryParam *param);
|
||||||
int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param);
|
int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param);
|
||||||
int32_t createQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutput, SExprInfo **pExprInfo, SSqlFuncMsg **pExprMsg,
|
int32_t createQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutput, SExprInfo **pExprInfo, SSqlFuncMsg **pExprMsg,
|
||||||
|
|
|
@ -3886,45 +3886,43 @@ int32_t setParamValue(SQueryRuntimeEnv* pRuntimeEnv) {
|
||||||
*/
|
*/
|
||||||
void setIntervalQueryRange(SQInfo *pQInfo, TSKEY key) {
|
void setIntervalQueryRange(SQInfo *pQInfo, TSKEY key) {
|
||||||
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
||||||
SQuery * pQuery = pRuntimeEnv->pQuery;
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||||
STableQueryInfo *pTableQueryInfo = pQuery->current;
|
STableQueryInfo *pTableQueryInfo = pQuery->current;
|
||||||
|
SResultRowInfo *pWindowResInfo = &pTableQueryInfo->resInfo;
|
||||||
|
|
||||||
if (pTableQueryInfo->queryRangeSet) {
|
if (pWindowResInfo->prevSKey != TSKEY_INITIAL_VAL) {
|
||||||
pTableQueryInfo->lastKey = key;
|
return;
|
||||||
} else {
|
|
||||||
pTableQueryInfo->win.skey = key;
|
|
||||||
STimeWindow win = {.skey = key, .ekey = pQuery->window.ekey};
|
|
||||||
|
|
||||||
// for too small query range, no data in this interval.
|
|
||||||
if ((QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.ekey < pQuery->window.skey)) ||
|
|
||||||
(!QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.skey < pQuery->window.ekey))) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* In handling the both ascending and descending order super table query, we need to find the first qualified
|
|
||||||
* timestamp of this table, and then set the first qualified start timestamp.
|
|
||||||
* In ascending query, the key is the first qualified timestamp. However, in the descending order query, additional
|
|
||||||
* operations involve.
|
|
||||||
*/
|
|
||||||
STimeWindow w = TSWINDOW_INITIALIZER;
|
|
||||||
SResultRowInfo *pWindowResInfo = &pTableQueryInfo->resInfo;
|
|
||||||
|
|
||||||
TSKEY sk = MIN(win.skey, win.ekey);
|
|
||||||
TSKEY ek = MAX(win.skey, win.ekey);
|
|
||||||
getAlignQueryTimeWindow(pQuery, win.skey, sk, ek, &w);
|
|
||||||
|
|
||||||
if (pWindowResInfo->prevSKey == TSKEY_INITIAL_VAL) {
|
|
||||||
if (!QUERY_IS_ASC_QUERY(pQuery)) {
|
|
||||||
assert(win.ekey == pQuery->window.ekey);
|
|
||||||
}
|
|
||||||
|
|
||||||
pWindowResInfo->prevSKey = w.skey;
|
|
||||||
}
|
|
||||||
|
|
||||||
pTableQueryInfo->queryRangeSet = 1;
|
|
||||||
pTableQueryInfo->lastKey = pTableQueryInfo->win.skey;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pTableQueryInfo->win.skey = key;
|
||||||
|
STimeWindow win = {.skey = key, .ekey = pQuery->window.ekey};
|
||||||
|
|
||||||
|
// for too small query range, no data in this interval.
|
||||||
|
if ((QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.ekey < pQuery->window.skey)) ||
|
||||||
|
(!QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.skey < pQuery->window.ekey))) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* In handling the both ascending and descending order super table query, we need to find the first qualified
|
||||||
|
* timestamp of this table, and then set the first qualified start timestamp.
|
||||||
|
* In ascending query, the key is the first qualified timestamp. However, in the descending order query, additional
|
||||||
|
* operations involve.
|
||||||
|
*/
|
||||||
|
STimeWindow w = TSWINDOW_INITIALIZER;
|
||||||
|
|
||||||
|
TSKEY sk = MIN(win.skey, win.ekey);
|
||||||
|
TSKEY ek = MAX(win.skey, win.ekey);
|
||||||
|
getAlignQueryTimeWindow(pQuery, win.skey, sk, ek, &w);
|
||||||
|
|
||||||
|
if (pWindowResInfo->prevSKey == TSKEY_INITIAL_VAL) {
|
||||||
|
if (!QUERY_IS_ASC_QUERY(pQuery)) {
|
||||||
|
assert(win.ekey == pQuery->window.ekey);
|
||||||
|
}
|
||||||
|
pWindowResInfo->prevSKey = w.skey;
|
||||||
|
}
|
||||||
|
|
||||||
|
pTableQueryInfo->lastKey = pTableQueryInfo->win.skey;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool requireTimestamp(SQuery *pQuery) {
|
bool requireTimestamp(SQuery *pQuery) {
|
||||||
|
@ -4276,6 +4274,7 @@ static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBloc
|
||||||
qDebug("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d, lastKey:%"PRId64, GET_QINFO_ADDR(pRuntimeEnv),
|
qDebug("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d, lastKey:%"PRId64, GET_QINFO_ADDR(pRuntimeEnv),
|
||||||
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes, pQuery->current->lastKey);
|
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes, pQuery->current->lastKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void freeTableBlockDist(STableBlockDist *pTableBlockDist) {
|
static void freeTableBlockDist(STableBlockDist *pTableBlockDist) {
|
||||||
if (pTableBlockDist != NULL) {
|
if (pTableBlockDist != NULL) {
|
||||||
taosArrayDestroy(pTableBlockDist->dataBlockInfos);
|
taosArrayDestroy(pTableBlockDist->dataBlockInfos);
|
||||||
|
@ -4283,6 +4282,7 @@ static void freeTableBlockDist(STableBlockDist *pTableBlockDist) {
|
||||||
free(pTableBlockDist);
|
free(pTableBlockDist);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t getPercentileFromSortedArray(const SArray* pArray, double rate) {
|
static int32_t getPercentileFromSortedArray(const SArray* pArray, double rate) {
|
||||||
int32_t len = (int32_t)taosArrayGetSize(pArray);
|
int32_t len = (int32_t)taosArrayGetSize(pArray);
|
||||||
if (len <= 0) {
|
if (len <= 0) {
|
||||||
|
@ -4642,7 +4642,6 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts
|
||||||
pRuntimeEnv->stableQuery = isSTableQuery;
|
pRuntimeEnv->stableQuery = isSTableQuery;
|
||||||
pRuntimeEnv->prevGroupId = INT32_MIN;
|
pRuntimeEnv->prevGroupId = INT32_MIN;
|
||||||
pRuntimeEnv->groupbyColumn = isGroupbyColumn(pQuery->pGroupbyExpr);
|
pRuntimeEnv->groupbyColumn = isGroupbyColumn(pQuery->pGroupbyExpr);
|
||||||
pRuntimeEnv->stabledev = isStabledev(pQuery);
|
|
||||||
|
|
||||||
if (pTsBuf != NULL) {
|
if (pTsBuf != NULL) {
|
||||||
int16_t order = (pQuery->order.order == pRuntimeEnv->pTsBuf->tsOrder) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
|
int16_t order = (pQuery->order.order == pRuntimeEnv->pTsBuf->tsOrder) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
|
||||||
|
@ -5586,6 +5585,103 @@ static void doSecondaryArithmeticProcess(SQuery* pQuery) {
|
||||||
tfree(arithSup.data);
|
tfree(arithSup.data);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static SSDataBlock* doTableScan(void* param) {
|
||||||
|
STableScanInfo* pTableScanInfo = (STableScanInfo*) param;
|
||||||
|
|
||||||
|
SSDataBlock* pBlock = &pTableScanInfo->block;
|
||||||
|
while(tsdbNextDataBlock(pTableScanInfo->pQueryHandle)) {
|
||||||
|
pTableScanInfo->numOfBlocks += 1;
|
||||||
|
|
||||||
|
// todo check for query cancel
|
||||||
|
|
||||||
|
tsdbRetrieveDataBlockInfo(pTableScanInfo->pQueryHandle, &pBlock->info);
|
||||||
|
|
||||||
|
SDataStatis *pStatis = pBlock->pBlockStatis;
|
||||||
|
|
||||||
|
// this function never returns error?
|
||||||
|
tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->pQueryHandle, &pStatis);
|
||||||
|
pTableScanInfo->numOfBlockStatis += 1;
|
||||||
|
|
||||||
|
if (pBlock->pBlockStatis == NULL) { // data block statistics does not exist, load data block
|
||||||
|
pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pQueryHandle, NULL);
|
||||||
|
pTableScanInfo->numOfRows += pBlock->info.rows;
|
||||||
|
}
|
||||||
|
|
||||||
|
return pBlock;
|
||||||
|
// int32_t ret = loadDataBlockOnDemand(pRuntimeEnv, &pRuntimeEnv->resultRowInfo, pQueryHandle, &blockInfo, &pStatis, &pDataBlock, &status);
|
||||||
|
// if (ret != TSDB_CODE_SUCCESS) {
|
||||||
|
// break;
|
||||||
|
// }
|
||||||
|
|
||||||
|
// if (status == BLK_DATA_DISCARD) {
|
||||||
|
// pQuery->current->lastKey =
|
||||||
|
// QUERY_IS_ASC_QUERY(pQuery) ? blockInfo.window.ekey + step : blockInfo.window.skey + step;
|
||||||
|
// continue;
|
||||||
|
// }
|
||||||
|
}
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
static UNUSED_FUNC STableScanInfo* createTableScanInfo(void* pTsdbQueryHandle) {
|
||||||
|
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
|
||||||
|
pInfo->pQueryHandle = pTsdbQueryHandle;
|
||||||
|
pInfo->apply = doTableScan;
|
||||||
|
return pInfo;
|
||||||
|
}
|
||||||
|
|
||||||
|
// this is a blocking operator
|
||||||
|
static SSDataBlock* doAggOperator(void* param) {
|
||||||
|
SAggOperatorInfo* pInfo = (SAggOperatorInfo*) param;
|
||||||
|
|
||||||
|
// setup the output buffer
|
||||||
|
SSDataBlock* res = calloc(1, sizeof(SSDataBlock));
|
||||||
|
|
||||||
|
SQuery* pQuery = pInfo->pRuntimeEnv->pQuery;
|
||||||
|
res->info.numOfCols = pQuery->numOfOutput;
|
||||||
|
|
||||||
|
res->pDataBlock = taosArrayInit(pQuery->numOfOutput, sizeof(SColumnInfoData));
|
||||||
|
for(int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
||||||
|
|
||||||
|
SColumnInfoData idata = {0};
|
||||||
|
idata.info.type = pQuery->pExpr1[i].type;
|
||||||
|
idata.info.bytes = pQuery->pExpr1[i].bytes;
|
||||||
|
idata.info.colId = pQuery->pExpr1[i].base.resColId;
|
||||||
|
idata.pData = calloc(4096, idata.info.bytes);
|
||||||
|
taosArrayPush(res->pDataBlock, &idata);
|
||||||
|
|
||||||
|
pInfo->pRuntimeEnv->pCtx[i].pOutput = idata.pData;
|
||||||
|
}
|
||||||
|
|
||||||
|
while(1) {
|
||||||
|
SSDataBlock* pBlock = pInfo->pTableScanInfo->apply(pInfo->pTableScanInfo);
|
||||||
|
if (pBlock == NULL) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
blockwiseApplyFunctions(pInfo->pRuntimeEnv, pBlock->pBlockStatis, &pBlock->info, pInfo->pResultRowInfo, binarySearchForKey, pBlock->pDataBlock);
|
||||||
|
}
|
||||||
|
|
||||||
|
setQueryStatus(pQuery, QUERY_COMPLETED);
|
||||||
|
finalizeQueryResult(pInfo->pRuntimeEnv);
|
||||||
|
|
||||||
|
res->info.rows = getNumOfResult(pInfo->pRuntimeEnv);
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
|
static UNUSED_FUNC SAggOperatorInfo* createAggOperatorInfo(SResultRowInfo* pResultRowInfo, STableQueryInfo* pTableQueryInfo,
|
||||||
|
SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTableScanInfo) {
|
||||||
|
SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo));
|
||||||
|
|
||||||
|
pInfo->pResultRowInfo = pResultRowInfo;
|
||||||
|
pInfo->pTableQueryInfo = pTableQueryInfo;
|
||||||
|
pInfo->pTableScanInfo = pTableScanInfo;
|
||||||
|
pInfo->pRuntimeEnv = pRuntimeEnv;
|
||||||
|
pInfo->apply = doAggOperator;
|
||||||
|
|
||||||
|
return pInfo;
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* in each query, this function will be called only once, no retry for further result.
|
* in each query, this function will be called only once, no retry for further result.
|
||||||
*
|
*
|
||||||
|
@ -5600,11 +5696,15 @@ static void tableAggregationProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
scanOneTableDataBlocks(pRuntimeEnv, pTableInfo->lastKey);
|
STableScanInfo* pi = createTableScanInfo(pRuntimeEnv->pQueryHandle);
|
||||||
finalizeQueryResult(pRuntimeEnv);
|
SAggOperatorInfo* pAggInfo = createAggOperatorInfo(&pRuntimeEnv->resultRowInfo, pQuery->current, pRuntimeEnv, pi);
|
||||||
|
SSDataBlock* pResBlock = pAggInfo->apply(pAggInfo);
|
||||||
|
|
||||||
|
// scanOneTableDataBlocks(pRuntimeEnv, pTableInfo->lastKey);
|
||||||
|
|
||||||
// since the numOfRows must be identical for all sql functions that are allowed to be executed simutaneously.
|
// since the numOfRows must be identical for all sql functions that are allowed to be executed simutaneously.
|
||||||
pQuery->rec.rows = getNumOfResult(pRuntimeEnv);
|
// pQuery->rec.rows = getNumOfResult(pRuntimeEnv);
|
||||||
|
pQuery->rec.rows = pResBlock->info.rows;//getNumOfResult(pRuntimeEnv);
|
||||||
doSecondaryArithmeticProcess(pQuery);
|
doSecondaryArithmeticProcess(pQuery);
|
||||||
|
|
||||||
if (isQueryKilled(pQInfo)) {
|
if (isQueryKilled(pQInfo)) {
|
||||||
|
|
Loading…
Reference in New Issue