fix bugs found in regression test.

This commit is contained in:
hjxilinx 2020-02-20 02:09:24 +08:00
parent 8928dea534
commit e9dc2db741
5 changed files with 307 additions and 414 deletions

View File

@ -2817,11 +2817,14 @@ int tscBuildMetricMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
offset = pMsg - (char *)pMetaMsg;
pElem->tableCond = htonl(offset);
uint32_t len = strlen(pTagCond->tbnameCond.cond);
uint32_t len = 0;
if (pTagCond->tbnameCond.cond != NULL) {
len = strlen(pTagCond->tbnameCond.cond);
memcpy(pMsg, pTagCond->tbnameCond.cond, len);
}
pElem->tableCondLen = htonl(len);
memcpy(pMsg, pTagCond->tbnameCond.cond, len);
pMsg += len;
}

View File

@ -153,7 +153,6 @@ void doSkipResults(SQueryRuntimeEnv* pRuntimeEnv);
void doFinalizeResult(SQueryRuntimeEnv* pRuntimeEnv);
int64_t getNumOfResult(SQueryRuntimeEnv* pRuntimeEnv);
void forwardIntervalQueryRange(STableQuerySupportObj* pSupporter, SQueryRuntimeEnv* pRuntimeEnv);
void forwardQueryStartPosition(SQueryRuntimeEnv* pRuntimeEnv);
bool normalizedFirstQueryRange(bool dataInDisk, bool dataInCache, STableQuerySupportObj* pSupporter,
@ -171,12 +170,10 @@ void enableFunctForMasterScan(SQueryRuntimeEnv* pRuntimeEnv, int32_t order);
int32_t mergeMetersResultToOneGroups(STableQuerySupportObj* pSupporter);
void copyFromWindowResToSData(SQInfo* pQInfo, SWindowResult* result);
SBlockInfo getBlockBasicInfo(SQueryRuntimeEnv* pRuntimeEnv, void* pBlock, int32_t blockType);
SCacheBlock* getCacheDataBlock(SMeterObj* pMeterObj, SQueryRuntimeEnv* pRuntimeEnv, int32_t slot);
SBlockInfo getBlockInfo(SQueryRuntimeEnv *pRuntimeEnv);
SBlockInfo getBlockBasicInfo(SQueryRuntimeEnv *pRuntimeEnv, void* pBlock, int32_t type);
// void queryOnBlock(STableQuerySupportObj* pSupporter, int32_t blockStatus, SBlockInfo* pBlockBasicInfo,
// SMeterDataInfo* pDataHeadInfoEx, SField* pFields,
// __block_search_fn_t searchFn);
SCacheBlock* getCacheDataBlock(SMeterObj* pMeterObj, SQueryRuntimeEnv* pRuntimeEnv, int32_t slot);
void stableApplyFunctionsOnBlock(STableQuerySupportObj* pSupporter, SMeterDataInfo* pMeterDataInfo,
SBlockInfo* pBlockInfo, SField* pFields, __block_search_fn_t searchFn);
@ -190,14 +187,13 @@ int32_t createDataBlocksInfoEx(SMeterDataInfo** pMeterDataInfo, int32_t numOfMet
int32_t* nAllocBlocksInfoSize, int64_t addr);
void freeMeterBlockInfoEx(SMeterDataBlockInfoEx* pDataBlockInfoEx, int32_t len);
void setExecutionContext(STableQuerySupportObj* pSupporter, SWindowResult* outputRes, int32_t meterIdx,
int32_t groupIdx, SMeterQueryInfo* sqinfo);
int32_t setIntervalQueryExecutionContext(STableQuerySupportObj* pSupporter, int32_t meterIdx, SMeterQueryInfo* sqinfo);
void setExecutionContext(STableQuerySupportObj* pSupporter, SMeterQueryInfo* pMeterQueryInfo, int32_t meterIdx,
int32_t groupIdx, TSKEY nextKey);
int32_t setAdditionalInfo(STableQuerySupportObj *pSupporter, int32_t meterIdx, SMeterQueryInfo *pMeterQueryInfo);
void doGetAlignedIntervalQueryRangeImpl(SQuery* pQuery, int64_t pKey, int64_t keyFirst, int64_t keyLast,
int64_t* actualSkey, int64_t* actualEkey, int64_t* skey, int64_t* ekey);
int64_t getQueryStartPositionInCache(SQueryRuntimeEnv* pRuntimeEnv, int32_t* slot, int32_t* pos, bool ignoreQueryRange);
int64_t getNextAccessedKeyInData(SQuery* pQuery, int64_t* pPrimaryCol, SBlockInfo* pBlockInfo, int32_t blockStatus);
int32_t getDataBlocksForMeters(STableQuerySupportObj* pSupporter, SQuery* pQuery, int32_t numOfMeters,
const char* filePath, SMeterDataInfo** pMeterDataInfo, uint32_t* numOfBlocks);

View File

@ -169,19 +169,16 @@ typedef struct SQueryRuntimeEnv {
STSBuf* pTSBuf;
STSCursor cur;
SQueryCostSummary summary;
STimeWindow intervalWindow; // the complete time window, not affected by the actual data distribution
bool stableQuery; // is super table query or not
SQueryDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
/*
* Temporarily hold the in-memory cache block info during scan cache blocks
* Here we do not use the cacheblock info from pMeterObj, simple because it may change anytime
* during the query by the subumit/insert handling threads.
* Here we do not use the cache block info from pMeterObj, simple because it may change anytime
* during the query by the submit/insert handling threads.
* So we keep a copy of the support structure as well as the cache block data itself.
*/
SCacheBlock cacheBlock;
SQueryDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
bool stableQuery; // is super table query or not
} SQueryRuntimeEnv;
/* intermediate pos during multimeter query involves interval */
@ -222,13 +219,11 @@ typedef struct STableQuerySupportObj {
* rows may be generated by a specific subgroup. When query on all subgroups is executed,
* the result is copy to output buffer. This attribution is not used during single meter query processing.
*/
// SWindowResult* pResult;
SQueryRuntimeEnv runtimeEnv;
int64_t rawSKey;
int64_t rawEKey;
int32_t subgroupIdx;
int32_t offset; /* offset in group result set of subgroup */
tSidSet* pSidSet;
/*

View File

@ -70,11 +70,11 @@ static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *
static void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx);
static int32_t flushFromResultBuf(STableQuerySupportObj *pSupporter, const SQuery *pQuery,
const SQueryRuntimeEnv *pRuntimeEnv);
static void validateTimestampForSupplementResult(SQueryRuntimeEnv *pRuntimeEnv, int64_t numOfIncrementRes);
static void getBasicCacheInfoSnapshot(SQuery *pQuery, SCacheInfo *pCacheInfo, int32_t vid);
static TSKEY getQueryPositionForCacheInvalid(SQueryRuntimeEnv *pRuntimeEnv, __block_search_fn_t searchFn);
static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId);
static void getNextTimeWindow(SQuery *pQuery, STimeWindow *pTimeWindow);
// static void validateTimestampForSupplementResult(SQueryRuntimeEnv *pRuntimeEnv, int64_t numOfIncrementRes);
static void getBasicCacheInfoSnapshot(SQuery *pQuery, SCacheInfo *pCacheInfo, int32_t vid);
static TSKEY getQueryPositionForCacheInvalid(SQueryRuntimeEnv *pRuntimeEnv, __block_search_fn_t searchFn);
static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId);
static void getNextTimeWindow(SQuery *pQuery, STimeWindow *pTimeWindow);
static int32_t getGroupResultId(int32_t groupIndex) {
int32_t base = 200000;
@ -984,7 +984,6 @@ static int32_t loadDataBlockIntoMem(SCompBlock *pBlock, SField **pField, SQueryR
return ret;
}
// todo ignore the blockType, pass the pQuery into this function
SBlockInfo getBlockBasicInfo(SQueryRuntimeEnv *pRuntimeEnv, void *pBlock, int32_t blockType) {
SBlockInfo blockInfo = {0};
if (IS_FILE_BLOCK(blockType)) {
@ -1030,7 +1029,7 @@ static bool queryPausedInCurrentBlock(SQuery *pQuery, SBlockInfo *pBlockInfo, in
setQueryStatus(pQuery, QUERY_COMPLETED);
return true;
}
// output buffer is full, pause current query
if (Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)) {
assert((QUERY_IS_ASC_QUERY(pQuery) && forwardStep + pQuery->pos <= pBlockInfo->size) ||
@ -1038,11 +1037,11 @@ static bool queryPausedInCurrentBlock(SQuery *pQuery, SBlockInfo *pBlockInfo, in
return true;
}
if (Q_STATUS_EQUAL(pQuery->over, QUERY_COMPLETED)) {
return true;
}
// query completed
if ((pQuery->ekey <= pBlockInfo->keyLast && QUERY_IS_ASC_QUERY(pQuery)) ||
(pQuery->ekey >= pBlockInfo->keyFirst && !QUERY_IS_ASC_QUERY(pQuery))) {
@ -1247,11 +1246,10 @@ static void *getGenericDataBlock(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim
}
}
static SBlockInfo getBlockInfo(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery * pQuery = pRuntimeEnv->pQuery;
SMeterObj *pMeterObj = pRuntimeEnv->pMeterObj;
SBlockInfo getBlockInfo(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
void *pBlock = getGenericDataBlock(pMeterObj, pRuntimeEnv, pQuery->slot);
void *pBlock = getGenericDataBlock(pRuntimeEnv->pMeterObj, pRuntimeEnv, pQuery->slot);
assert(pBlock != NULL);
int32_t blockType = IS_DISK_DATA_BLOCK(pQuery) ? BLK_FILE_BLOCK : BLK_CACHE_BLOCK;
@ -1444,16 +1442,15 @@ static char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sa
return dataBlock;
}
static bool isWindowResClosed(SWindowResInfo *pWindowResInfo, int32_t slot) {
assert(pWindowResInfo != NULL && slot >= 0 && slot < pWindowResInfo->size);
return (pWindowResInfo->pResult[slot].status.closed == true);
}
static SWindowResult* getWindowResult(SWindowResInfo *pWindowResInfo, int32_t slot) {
static SWindowResult *getWindowResult(SWindowResInfo *pWindowResInfo, int32_t slot) {
assert(pWindowResInfo != NULL && slot >= 0 && slot < pWindowResInfo->size);
return &pWindowResInfo->pResult[slot];
}
static bool isWindowResClosed(SWindowResInfo *pWindowResInfo, int32_t slot) {
return (getWindowResult(pWindowResInfo, slot)->status.closed == true);
}
static int32_t curTimeWindow(SWindowResInfo *pWindowResInfo) {
assert(pWindowResInfo->curIndex >= 0 && pWindowResInfo->curIndex < pWindowResInfo->size);
return pWindowResInfo->curIndex;
@ -1491,7 +1488,7 @@ static SWindowResult *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWin
taosAddToHashTable(pWindowResInfo->hashList, pData, bytes, (char *)&pWindowResInfo->curIndex, sizeof(int32_t));
}
return &pWindowResInfo->pResult[pWindowResInfo->curIndex];
return getWindowResult(pWindowResInfo, pWindowResInfo->curIndex);
}
// get the correct time window according to the handled timestamp
@ -1503,7 +1500,7 @@ static STimeWindow getActiveTimeWindow(SWindowResInfo *pWindowResInfo, int64_t t
w.ekey = w.skey + pQuery->intervalTime - 1;
} else {
int32_t slot = curTimeWindow(pWindowResInfo);
w = pWindowResInfo->pResult[slot].window;
w = getWindowResult(pWindowResInfo, slot)->window;
}
if (w.skey > ts || w.ekey < ts) {
@ -1522,19 +1519,16 @@ static STimeWindow getActiveTimeWindow(SWindowResInfo *pWindowResInfo, int64_t t
w.ekey = w.skey + pQuery->intervalTime - 1;
}
// query border check
/*
* query border check, skey should not be bounded by the query time range, since the value skey will
* be used as the time window index value. So we only change ekey of time window accordingly.
*/
if (w.ekey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) {
w.ekey = pQuery->ekey;
}
if (w.skey < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery)) {
w.skey = pQuery->ekey;
}
assert(ts >= w.skey && ts <= w.ekey && w.skey != 0);
if (w.skey == 1542597000000) {
int32_t k = 1;
}
return w;
}
@ -1568,9 +1562,6 @@ static int32_t addNewWindowResultBuf(SWindowResult *pWindowRes, SQueryDiskbasedR
return -1;
}
if (pageId == 153 && pData->numOfElems >= 138) {
int32_t k = 1;
}
// set the number of rows in current disk page
if (pWindowRes->pos.pageId == -1) { // not allocated yet, allocate new buffer
pWindowRes->pos.pageId = pageId;
@ -1724,10 +1715,6 @@ static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SBlockInfo *pBlockInfo,
}
}
}
if (pQuery->lastKey == 1542597000001) {
int32_t k = 1;
}
assert(num >= 0);
return num;
@ -1752,7 +1739,8 @@ static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStat
}
}
static void doRowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStatus *pStatus, STimeWindow *pWin, int32_t offset) {
static void doRowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStatus *pStatus, STimeWindow *pWin,
int32_t offset) {
SQuery * pQuery = pRuntimeEnv->pQuery;
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
@ -1854,9 +1842,8 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t
}
}
TSKEY ts = QUERY_IS_ASC_QUERY(pQuery) ? pRuntimeEnv->intervalWindow.skey : pRuntimeEnv->intervalWindow.ekey;
setExecParams(pQuery, &pCtx[k], ts, dataBlock, (char *)primaryKeyCol, forwardStep, functionId, tpField, hasNull,
pRuntimeEnv->blockStatus, &sasArray[k], pRuntimeEnv->scanFlag);
setExecParams(pQuery, &pCtx[k], pQuery->skey, dataBlock, (char *)primaryKeyCol, forwardStep, functionId, tpField,
hasNull, pRuntimeEnv->blockStatus, &sasArray[k], pRuntimeEnv->scanFlag);
}
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
@ -1865,19 +1852,23 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t
TSKEY ts = primaryKeyCol[offset];
STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery);
if (win.skey >= 1542597000000 && pRuntimeEnv->pMeterObj->sid == 9 && IS_MASTER_SCAN(pRuntimeEnv)) {
int32_t k = 1;
}
if (win.skey >= 1542597000000 && pRuntimeEnv->pMeterObj->sid == 9 && !IS_MASTER_SCAN(pRuntimeEnv)) {
int32_t k = 1;
}
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pRuntimeEnv->pMeterObj->sid, &win) != TSDB_CODE_SUCCESS) {
return 0;
}
TSKEY ekey = QUERY_IS_ASC_QUERY(pQuery) ? win.ekey : win.skey;
TSKEY ekey = -1;
if (QUERY_IS_ASC_QUERY(pQuery)) {
ekey = win.ekey;
if (ekey > pQuery->ekey) {
ekey = pQuery->ekey;
}
} else {
ekey = win.skey;
if (ekey < pQuery->ekey) {
ekey = pQuery->ekey;
}
}
forwardStep = getNumOfRowsInTimeWindow(pQuery, pBlockInfo, primaryKeyCol, pQuery->pos, ekey, searchFn, true);
SWindowStatus *pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo));
@ -1892,18 +1883,26 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t
if (startPos < 0) {
break;
}
if (nextWin.skey >= 1542597000000 && pRuntimeEnv->pMeterObj->sid == 9 && IS_MASTER_SCAN(pRuntimeEnv)) {
int32_t k = 1;
}
// null data, failed to allocate more memory buffer
int32_t sid = pRuntimeEnv->pMeterObj->sid;
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, sid, &nextWin) != TSDB_CODE_SUCCESS) {
break;
}
ekey = QUERY_IS_ASC_QUERY(pQuery) ? nextWin.ekey : nextWin.skey;
ekey = -1;
if (QUERY_IS_ASC_QUERY(pQuery)) {
ekey = nextWin.ekey;
if (ekey > pQuery->ekey) {
ekey = pQuery->ekey;
}
} else {
ekey = nextWin.skey;
if (ekey < pQuery->ekey) {
ekey = pQuery->ekey;
}
}
forwardStep = getNumOfRowsInTimeWindow(pQuery, pBlockInfo, primaryKeyCol, startPos, ekey, searchFn, true);
pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo));
@ -1934,8 +1933,6 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t
num = getNumOfResult(pRuntimeEnv) - prevNumOfRes;
}
validateTimestampForSupplementResult(pRuntimeEnv, num);
tfree(sasArray);
return (int32_t)num;
}
@ -2137,8 +2134,7 @@ int32_t numOfClosedTimeWindow(SWindowResInfo *pWindowResInfo) {
}
void closeTimeWindow(SWindowResInfo *pWindowResInfo, int32_t slot) {
assert(slot >= 0 && slot < pWindowResInfo->size);
pWindowResInfo->pResult[slot].status.closed = true;
getWindowResult(pWindowResInfo, slot)->status.closed = true;
}
void closeAllTimeWindow(SWindowResInfo *pWindowResInfo) {
@ -2283,7 +2279,8 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
bool hasNull = hasNullVal(pQuery, k, pBlockInfo, pFields, isDiskFileBlock);
char *dataBlock = getDataBlocks(pRuntimeEnv, &sasArray[k], k, *forwardStep);
TSKEY ts = QUERY_IS_ASC_QUERY(pQuery) ? pRuntimeEnv->intervalWindow.skey : pRuntimeEnv->intervalWindow.ekey;
TSKEY ts = pQuery->skey; // QUERY_IS_ASC_QUERY(pQuery) ? pRuntimeEnv->intervalWindow.skey :
// pRuntimeEnv->intervalWindow.ekey;
setExecParams(pQuery, &pCtx[k], ts, dataBlock, (char *)primaryKeyCol, (*forwardStep), functionId, pFields, hasNull,
pRuntimeEnv->blockStatus, &sasArray[k], pRuntimeEnv->scanFlag);
}
@ -2408,7 +2405,7 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
* requires checking buffer during loop
*/
if ((pQuery->checkBufferInLoop == 1) && (++numOfRes) >= pQuery->pointsOffset) {
pQuery->lastKey = primaryKeyCol[pQuery->pos + j * step] + step;
pQuery->lastKey = lastKey + step;
*forwardStep = j + 1;
break;
}
@ -2473,9 +2470,9 @@ static void validateQueryRangeAndData(SQueryRuntimeEnv *pRuntimeEnv, const TSKEY
!QUERY_IS_ASC_QUERY(pQuery)));
}
static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pBlockInfo, int64_t *pPrimaryColumn,
SField *pFields, __block_search_fn_t searchFn, int32_t *numOfRes,
SWindowResInfo *pWindowResInfo) {
static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pBlockInfo,
int64_t *pPrimaryColumn, SField *pFields, __block_search_fn_t searchFn,
int32_t *numOfRes, SWindowResInfo *pWindowResInfo) {
SQuery *pQuery = pRuntimeEnv->pQuery;
validateQueryRangeAndData(pRuntimeEnv, pPrimaryColumn, pBlockInfo);
@ -2500,12 +2497,13 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SBlockI
TSKEY lastKey = (QUERY_IS_ASC_QUERY(pQuery)) ? pBlockInfo->keyLast : pBlockInfo->keyFirst;
doCheckQueryCompleted(pRuntimeEnv, lastKey, pWindowResInfo);
// interval query with limit applied
if (pQuery->intervalTime > 0 && pQuery->limit.limit > 0 && pQuery->limit.limit <= numOfClosedTimeWindow(pWindowResInfo)) {
if (pQuery->intervalTime > 0 && pQuery->limit.limit > 0 &&
pQuery->limit.limit <= numOfClosedTimeWindow(pWindowResInfo)) {
setQueryStatus(pQuery, QUERY_COMPLETED);
}
assert(*numOfRes >= 0);
// check if buffer is large enough for accommodating all qualified points
@ -3417,7 +3415,6 @@ static bool doGetQueryPos(TSKEY key, STableQuerySupportObj *pSupporter, SPointIn
if (isPointInterpoQuery(pQuery)) { /* no qualified data in this query range */
return getNeighborPoints(pSupporter, pMeterObj, pPointInterpSupporter);
} else {
// getAlignedIntervalQueryRange(pRuntimeEnv, key, pQuery->skey, pQuery->ekey);
return true;
}
} else { // key > pQuery->ekey, abort for normal query, continue for interp query
@ -3449,7 +3446,6 @@ static bool doSetDataInfo(STableQuerySupportObj *pSupporter, SPointInterpoSuppor
return getNeighborPoints(pSupporter, pMeterObj, pPointInterpSupporter);
} else {
// getAlignedIntervalQueryRange(pRuntimeEnv, nextKey, pQuery->skey, pQuery->ekey);
return true;
}
}
@ -3554,69 +3550,6 @@ static int64_t getGreaterEqualTimestamp(SQueryRuntimeEnv *pRuntimeEnv) {
return key;
}
static void getActualRange(STableQuerySupportObj *pSupporter, STimeWindow *pTimeWindow) {
SQueryRuntimeEnv * pRuntimeEnv = &pSupporter->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
SMeterObj * pMeterObj = pRuntimeEnv->pMeterObj;
__block_search_fn_t searchFn = vnodeSearchKeyFunc[pMeterObj->searchAlgorithm];
int32_t order = pQuery->order.order;
SWAP(pQuery->skey, pQuery->ekey, TSKEY);
pQuery->lastKey = pQuery->skey;
if (QUERY_IS_ASC_QUERY(pQuery)) { // do the desc check first for asc query
pQuery->order.order ^= 1u;
TSKEY t = getQueryStartPositionInCache(pRuntimeEnv, &pQuery->slot, &pQuery->pos, false);
if (t > 0) {
pTimeWindow->ekey = t;
} else if (getQualifiedDataBlock(pMeterObj, pRuntimeEnv, QUERY_RANGE_LESS_EQUAL, searchFn)) {
pTimeWindow->ekey = getTimestampInDiskBlock(pRuntimeEnv, pQuery->pos);
}
pQuery->order.order = order;
SWAP(pQuery->skey, pQuery->ekey, TSKEY);
pQuery->lastKey = pQuery->skey;
if (getQualifiedDataBlock(pMeterObj, pRuntimeEnv, QUERY_RANGE_GREATER_EQUAL, searchFn)) {
pTimeWindow->skey = getTimestampInDiskBlock(pRuntimeEnv, pQuery->pos);
} else { // set no data in file
pQuery->fileId = -1;
pTimeWindow->skey = getQueryStartPositionInCache(pRuntimeEnv, &pQuery->slot, &pQuery->pos, false);
}
pQuery->skey = pTimeWindow->skey;
pQuery->ekey = pTimeWindow->ekey;
} else {
pQuery->order.order ^= 1u;
if (getQualifiedDataBlock(pMeterObj, pRuntimeEnv, QUERY_RANGE_GREATER_EQUAL, searchFn)) {
pTimeWindow->skey = getTimestampInDiskBlock(pRuntimeEnv, pQuery->pos);
} else { // set no data in file
pTimeWindow->skey = getQueryStartPositionInCache(pRuntimeEnv, &pQuery->slot, &pQuery->pos, false);
}
// reverse check for maxValue in query range
SWAP(pQuery->skey, pQuery->ekey, TSKEY);
pQuery->order.order ^= 1u;
// set no data in file
pQuery->lastKey = pQuery->skey;
TSKEY t = getQueryStartPositionInCache(pRuntimeEnv, &pQuery->slot, &pQuery->pos, false);
if (t > 0) {
pTimeWindow->ekey = t;
} else if (getQualifiedDataBlock(pMeterObj, pRuntimeEnv, QUERY_RANGE_LESS_EQUAL, searchFn)) {
pTimeWindow->ekey = getTimestampInDiskBlock(pRuntimeEnv, pQuery->pos);
}
pQuery->ekey = pTimeWindow->skey;
pQuery->skey = pTimeWindow->ekey;
}
pQuery->order.order = order;
}
/**
* determine the first query range, according to raw query range [skey, ekey] and group-by interval.
* the time interval for aggregating is not enforced to check its validation, the minimum interval is not less than
@ -5213,14 +5146,12 @@ static void doHandleDataBlockImpl(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pbl
TSKEY * primaryKeys = (TSKEY *)pRuntimeEnv->primaryColBuffer->data;
int64_t start = taosGetTimestampUs();
*pblockInfo = getBlockInfo(pRuntimeEnv);
if (IS_DISK_DATA_BLOCK(pQuery)) {
SCompBlock *pBlock = getDiskDataBlock(pQuery, pQuery->slot);
*pblockInfo = getBlockBasicInfo(pRuntimeEnv, pBlock, BLK_FILE_BLOCK);
if (blockLoadStatus == DISK_DATA_LOADED) {
*forwardStep = tableApplyFunctionsOnBlock(pRuntimeEnv, pblockInfo, primaryKeys, pQuery->pFields[pQuery->slot],
searchFn, numOfRes, &pRuntimeEnv->windowResInfo);
searchFn, numOfRes, &pRuntimeEnv->windowResInfo);
} else {
*forwardStep = pblockInfo->size;
}
@ -5228,12 +5159,8 @@ static void doHandleDataBlockImpl(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pbl
pSummary->fileTimeUs += (taosGetTimestampUs() - start);
} else {
assert(vnodeIsDatablockLoaded(pRuntimeEnv, pRuntimeEnv->pMeterObj, -1, true) == DISK_BLOCK_NO_NEED_TO_LOAD);
SCacheBlock *pBlock = getCacheDataBlock(pRuntimeEnv->pMeterObj, pRuntimeEnv, pQuery->slot);
*pblockInfo = getBlockBasicInfo(pRuntimeEnv, pBlock, BLK_CACHE_BLOCK);
*forwardStep = tableApplyFunctionsOnBlock(pRuntimeEnv, pblockInfo, primaryKeys, NULL, searchFn, numOfRes,
&pRuntimeEnv->windowResInfo);
&pRuntimeEnv->windowResInfo);
pSummary->cacheTimeUs += (taosGetTimestampUs() - start);
}
@ -5302,7 +5229,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
if (!Q_STATUS_EQUAL(pQuery->over, QUERY_COMPLETED) && Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)) {
if (nextPos >= blockInfo.size || nextPos < 0) {
moveToNextBlock(pRuntimeEnv, step, searchFn, !LOAD_DATA);
// slot/pos/fileId is updated in moveToNextBlock function
savePointPosition(&pRuntimeEnv->nextPos, pQuery->fileId, pQuery->slot, pQuery->pos);
} else {
@ -5404,7 +5331,7 @@ void vnodeSetTagValueInParam(tSidSet *pSidSet, SQueryRuntimeEnv *pRuntimeEnv, SM
}
}
static void doMerge(SQueryRuntimeEnv *pRuntimeEnv, int64_t timestamp, tFilePage *inputSrc, int32_t inputIdx,
static void doMerge(SQueryRuntimeEnv *pRuntimeEnv, int64_t timestamp, SWindowResult* pWindowRes, /*int32_t inputIdx,*/
bool mergeFlag) {
SQuery * pQuery = pRuntimeEnv->pQuery;
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
@ -5421,9 +5348,10 @@ static void doMerge(SQueryRuntimeEnv *pRuntimeEnv, int64_t timestamp, tFilePage
pCtx[i].hasNull = true;
pCtx[i].nStartQueryTimestamp = timestamp;
pCtx[i].aInputElemBuf = ((char *)inputSrc->data) +
((int32_t)pRuntimeEnv->offset[i] * pRuntimeEnv->numOfRowsPerPage) +
pCtx[i].outputBytes * inputIdx;
pCtx[i].aInputElemBuf = getPosInResultPage(pRuntimeEnv, i, pWindowRes);
// pCtx[i].aInputElemBuf = ((char *)inputSrc->data) +
// ((int32_t)pRuntimeEnv->offset[i] * pRuntimeEnv->numOfRowsPerPage) +
// pCtx[i].outputBytes * inputIdx;
// in case of tag column, the tag information should be extracted from input buffer
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TAG) {
@ -5578,7 +5506,7 @@ int64_t getCurrentTimestamp(SCompSupporter *pSupportor, int32_t meterIdx) {
return *(int64_t *)(pPage->data + TSDB_KEYSIZE * pPos->rowIdx);
}
int32_t meterResultComparator(const void *pLeft, const void *pRight, void *param) {
int32_t tableResultComparFn(const void *pLeft, const void *pRight, void *param) {
int32_t left = *(int32_t *)pLeft;
int32_t right = *(int32_t *)pRight;
@ -5598,6 +5526,7 @@ int32_t meterResultComparator(const void *pLeft, const void *pRight, void *param
return -1;
}
//!!!!!
tFilePage *pPageLeft = getMeterDataPage(pResultBuf, supporter->pMeterDataInfo[left]->pMeterQInfo, leftPos.pageIdx);
int64_t leftTimestamp = *(int64_t *)(pPageLeft->data + TSDB_KEYSIZE * leftPos.rowIdx);
@ -5695,6 +5624,32 @@ void copyResToQueryResultBuf(STableQuerySupportObj *pSupporter, SQuery *pQuery)
pSupporter->offset += 1;
}
int64_t getNumOfResultWindowRes(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult* pWindowRes) {
SQuery *pQuery = pRuntimeEnv->pQuery;
// bool hasMainFunction = hasMainOutput(pQuery);
int64_t maxOutput = 0;
for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) {
int32_t functionId = pQuery->pSelectExpr[j].pBase.functionId;
/*
* ts, tag, tagprj function can not decide the output number of current query
* the number of output result is decided by main output
*/
if (/*hasMainFunction &&*/
(functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TAGPRJ)) {
continue;
}
SResultInfo *pResultInfo = &pWindowRes->resultInfo[j];
if (pResultInfo != NULL && maxOutput < pResultInfo->numOfRes) {
maxOutput = pResultInfo->numOfRes;
}
}
return maxOutput;
}
int32_t doMergeMetersResultsToGroupRes(STableQuerySupportObj *pSupporter, SQuery *pQuery, SQueryRuntimeEnv *pRuntimeEnv,
SMeterDataInfo *pMeterDataInfo, int32_t start, int32_t end) {
// calculate the maximum required space
@ -5708,6 +5663,7 @@ int32_t doMergeMetersResultsToGroupRes(STableQuerySupportObj *pSupporter, SQuery
Position * posList = calloc(1, sizeof(Position) * (end - start));
SMeterDataInfo **pTableList = malloc(POINTER_BYTES * (end - start));
//todo opt for the case of one table per group
int32_t numOfMeters = 0;
for (int32_t i = start; i < end; ++i) {
int32_t sid = pMeterDataInfo[i].pMeterQInfo->sid;
@ -5717,7 +5673,9 @@ int32_t doMergeMetersResultsToGroupRes(STableQuerySupportObj *pSupporter, SQuery
pTableList[numOfMeters] = &pMeterDataInfo[i];
// set the merge start position: page:0, index:0
posList[numOfMeters].pageIdx = 0;
posList[numOfMeters++].rowIdx = 0;
posList[numOfMeters].rowIdx = 0;
numOfMeters += 1;
}
}
@ -5731,7 +5689,7 @@ int32_t doMergeMetersResultsToGroupRes(STableQuerySupportObj *pSupporter, SQuery
SCompSupporter cs = {pTableList, posList, pSupporter};
SLoserTreeInfo *pTree = NULL;
tLoserTreeCreate(&pTree, numOfMeters, &cs, meterResultComparator);
tLoserTreeCreate(&pTree, numOfMeters, &cs, tableResultComparFn);
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
resetMergeResultBuf(pQuery, pCtx);
@ -5740,66 +5698,93 @@ int32_t doMergeMetersResultsToGroupRes(STableQuerySupportObj *pSupporter, SQuery
int64_t startt = taosGetTimestampMs();
while (1) {
int32_t pos = pTree->pNode[0].index;
Position * position = &cs.pPosition[pos];
SQueryDiskbasedResultBuf *pResultBuf = cs.pSupporter->runtimeEnv.pResultBuf;
tFilePage * pPage = getMeterDataPage(pResultBuf, pTableList[pos]->pMeterQInfo, position->pageIdx);
int64_t ts = getCurrentTimestamp(&cs, pos);
assert(ts > 0);
while (1) {//todo add iterator
int32_t pos = pTree->pNode[0].index;
Position * position = &cs.pPosition[pos];
if (ts == lastTimestamp) { // merge with the last one
doMerge(pRuntimeEnv, ts, pPage, position->rowIdx, true);
} else {
// copy data to disk buffer
if (buffer[0]->numOfElems == pQuery->pointsToRead) {
if (flushFromResultBuf(pSupporter, pQuery, pRuntimeEnv) != TSDB_CODE_SUCCESS) {
return -1;
}
// SQueryDiskbasedResultBuf *pResultBuf = cs.pSupporter->runtimeEnv.pResultBuf;
// tFilePage *pPage = getMeterDataPage(pResultBuf, pTableList[pos]->pMeterQInfo, position->pageIdx);
SWindowResInfo* pWindowResInfo = &pTableList[pos]->pMeterQInfo->windowResInfo;
SWindowResult* pWindowRes = getWindowResult(pWindowResInfo, position->rowIdx);
char* b = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes);
TSKEY ts = GET_INT64_VAL(b);
// int64_t ts = getCurrentTimestamp(&cs, pos);
assert(ts > 0 && ts == pWindowRes->window.skey);
resetMergeResultBuf(pQuery, pCtx);
}
pPage = getMeterDataPage(pResultBuf, pTableList[pos]->pMeterQInfo, position->pageIdx);
if (pPage->numOfElems <= 0) { // current source data page is empty
// do nothing
} else {
doMerge(pRuntimeEnv, ts, pPage, position->rowIdx, false);
buffer[0]->numOfElems += 1;
}
}
lastTimestamp = ts;
if (cs.pPosition[pos].rowIdx >= pPage->numOfElems - 1) {
cs.pPosition[pos].rowIdx = 0;
cs.pPosition[pos].pageIdx += 1; // try next page
// check if current page is empty or not. if it is empty, ignore it and try next
SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, cs.pMeterDataInfo[pos]->pMeterQInfo->sid);
if (cs.pPosition[pos].pageIdx <= list.size - 1) {
tFilePage *newPage = getMeterDataPage(pResultBuf, pTableList[pos]->pMeterQInfo, position->pageIdx);
// if current source data page is null, it must be the last page of source output page
if (newPage->numOfElems <= 0) {
cs.pPosition[pos].pageIdx += 1;
assert(cs.pPosition[pos].pageIdx >= list.size - 1);
}
}
// the following code must be executed if current source pages are exhausted
if (cs.pPosition[pos].pageIdx >= list.size) {
cs.pPosition[pos].pageIdx = -1;
int64_t num = getNumOfResultWindowRes(pRuntimeEnv, pWindowRes);
if (num <= 0) {
cs.pPosition[pos].rowIdx += 1;
if (cs.pPosition[pos].rowIdx >= pWindowResInfo->size) {
cs.pPosition[pos].rowIdx = -1;
// all input sources are exhausted
if (--numOfMeters == 0) {
break;
}
}
} else {
cs.pPosition[pos].rowIdx += 1;
if (ts == lastTimestamp) { // merge with the last one
doMerge(pRuntimeEnv, ts, pWindowRes, /*position->rowIdx,*/ true);
} else {
// copy data to disk buffer
if (buffer[0]->numOfElems == pQuery->pointsToRead) {
if (flushFromResultBuf(pSupporter, pQuery, pRuntimeEnv) != TSDB_CODE_SUCCESS) {
return -1;
}
resetMergeResultBuf(pQuery, pCtx);
}
// pPage = getMeterDataPage(pResultBuf, pTableList[pos]->pMeterQInfo, position->pageIdx);
// if (pPage->numOfElems <= 0) { // current source data page is empty
// do nothing
// } else {
doMerge(pRuntimeEnv, ts, pWindowRes, /*position->rowIdx,*/ false);
buffer[0]->numOfElems += 1;
// }
}
lastTimestamp = ts;
if (cs.pPosition[pos].rowIdx >= pWindowResInfo->size) {
cs.pPosition[pos].rowIdx = -1;
// all input sources are exhausted
if (--numOfMeters == 0) {
break;
}
// if (cs.pPosition[pos].rowIdx >= pPage->numOfElems - 1) {
// cs.pPosition[pos].rowIdx = 0;
// cs.pPosition[pos].pageIdx += 1; // try next page
//
// // check if current page is empty or not. if it is empty, ignore it and try next
// SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, cs.pMeterDataInfo[pos]->pMeterQInfo->sid);
// if (cs.pPosition[pos].pageIdx <= list.size - 1) {
// tFilePage *newPage = getMeterDataPage(pResultBuf, pTableList[pos]->pMeterQInfo, position->pageIdx);
//
// // if current source data page is null, it must be the last page of source output page
// if (newPage->numOfElems <= 0) {
// cs.pPosition[pos].pageIdx += 1;
// assert(cs.pPosition[pos].pageIdx >= list.size - 1);
// }
// }
//
// // the following code must be executed if current source pages are exhausted
// if (cs.pPosition[pos].pageIdx >= list.size) {
// cs.pPosition[pos].pageIdx = -1;
// cs.pPosition[pos].rowIdx = -1;
//
// // all input sources are exhausted
// if (--numOfMeters == 0) {
// break;
// }
// }
} else {
cs.pPosition[pos].rowIdx += 1;
}
}
tLoserTreeAdjust(pTree, pos + pTree->numOfEntries);
@ -5901,7 +5886,7 @@ void disableFunctForSuppleScan(SQueryRuntimeEnv *pRuntimeEnv, int32_t order) {
continue;
}
SWindowResult *buf = &pWindowResInfo->pResult[i];
SWindowResult *buf = getWindowResult(pWindowResInfo, i);
// open/close the specified query for each group result
for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) {
@ -6165,14 +6150,11 @@ static void queryStatusSave(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatus *pStatus
SWAP(pQuery->skey, pQuery->ekey, TSKEY);
pQuery->lastKey = pQuery->skey;
pRuntimeEnv->startPos = pRuntimeEnv->endPos;
SWAP(pRuntimeEnv->intervalWindow.skey, pRuntimeEnv->intervalWindow.ekey, TSKEY);
}
static void queryStatusRestore(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatus *pStatus) {
SQuery *pQuery = pRuntimeEnv->pQuery;
SWAP(pQuery->skey, pQuery->ekey, TSKEY);
SWAP(pRuntimeEnv->intervalWindow.skey, pRuntimeEnv->intervalWindow.ekey, TSKEY);
pQuery->lastKey = pStatus->lastKey;
pQuery->skey = pStatus->skey;
@ -6249,7 +6231,7 @@ void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) {
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
for (int32_t i = 0; i < pWindowResInfo->size; ++i) {
SWindowResult *pResult = &pWindowResInfo->pResult[i];
SWindowResult *pResult = getWindowResult(pWindowResInfo, i);
if (!pResult->status.closed) {
continue;
}
@ -6587,25 +6569,24 @@ void destroyMeterQueryInfo(SMeterQueryInfo *pMeterQueryInfo, int32_t numOfCols)
free(pMeterQueryInfo);
}
void changeMeterQueryInfoForSuppleQuery(SQuery* pQuery, SMeterQueryInfo *pMeterQueryInfo,
TSKEY skey, TSKEY ekey) {
void changeMeterQueryInfoForSuppleQuery(SQuery *pQuery, SMeterQueryInfo *pMeterQueryInfo, TSKEY skey, TSKEY ekey) {
if (pMeterQueryInfo == NULL) {
return;
}
//order has change already!
// order has change already!
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
if (!QUERY_IS_ASC_QUERY(pQuery)) {
assert(pMeterQueryInfo->ekey >= pMeterQueryInfo->lastKey + step);
} else {
assert(pMeterQueryInfo->ekey <= pMeterQueryInfo->lastKey + step);
}
pMeterQueryInfo->ekey = pMeterQueryInfo->lastKey + step;
SWAP(pMeterQueryInfo->skey, pMeterQueryInfo->ekey, TSKEY);
pMeterQueryInfo->lastKey = pMeterQueryInfo->skey;
// pMeterQueryInfo->queryRangeSet = 0;
pMeterQueryInfo->cur.order = pMeterQueryInfo->cur.order ^ 1;
pMeterQueryInfo->cur.vnodeIndex = -1;
@ -7009,23 +6990,24 @@ int32_t createDataBlocksInfoEx(SMeterDataInfo **pMeterDataInfo, int32_t numOfMet
* @param pRuntimeEnv
* @param pDataBlockInfoEx
*/
void setExecutionContext(STableQuerySupportObj *pSupporter, SWindowResult *outputRes, int32_t meterIdx,
int32_t groupIdx, SMeterQueryInfo *pMeterQueryInfo) {
void setExecutionContext(STableQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryInfo, int32_t meterIdx,
int32_t groupIdx, TSKEY nextKey) {
SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv;
int32_t GROUP_RES_ID = 1;
SWindowResInfo * pWindowResInfo = &pRuntimeEnv->windowResInfo;
int32_t GROUPRESULTID = 1;
SWindowResult *pWindowRes =
doSetTimeWindowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, (char *)&groupIdx, sizeof(groupIdx));
SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&groupIdx, sizeof(groupIdx));
if (pWindowRes == NULL) {
return;
}
// not assign result buffer yet, add new result buffer
// all group belong to one result set, and each group result has different group id so set the id to be one
/*
* not assign result buffer yet, add new result buffer
* all group belong to one result set, and each group result has different group id so set the id to be one
*/
if (pWindowRes->pos.pageId == -1) {
int32_t ret =
addNewWindowResultBuf(pWindowRes, pRuntimeEnv->pResultBuf, GROUP_RES_ID, pRuntimeEnv->numOfRowsPerPage);
if (ret != 0) {
if (addNewWindowResultBuf(pWindowRes, pRuntimeEnv->pResultBuf, GROUPRESULTID, pRuntimeEnv->numOfRowsPerPage) !=
TSDB_CODE_SUCCESS) {
return;
}
}
@ -7033,18 +7015,8 @@ void setExecutionContext(STableQuerySupportObj *pSupporter, SWindowResult *outpu
setWindowResOutputBuf(pRuntimeEnv, pWindowRes);
initCtxOutputBuf(pRuntimeEnv);
vnodeSetTagValueInParam(pSupporter->pSidSet, pRuntimeEnv, pSupporter->pMeterSidExtInfo[meterIdx]);
// set the right cursor position for ts buffer
if (pSupporter->runtimeEnv.pTSBuf != NULL) {
if (pMeterQueryInfo->cur.vnodeIndex == -1) {
pMeterQueryInfo->tag = pRuntimeEnv->pCtx[0].tag.i64Key;
tsBufGetElemStartPos(pSupporter->runtimeEnv.pTSBuf, 0, pMeterQueryInfo->tag);
} else {
tsBufSetCursor(pSupporter->runtimeEnv.pTSBuf, &pMeterQueryInfo->cur);
}
}
pMeterQueryInfo->lastKey = nextKey;
setAdditionalInfo(pSupporter, meterIdx, pMeterQueryInfo);
}
static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult) {
@ -7072,101 +7044,43 @@ static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *
}
}
static char *getOutputResPos(SQueryRuntimeEnv *pRuntimeEnv, tFilePage *pData, int32_t row, int32_t col) {
// the output for each record should be less than the DEFAULT_INTERN_BUF_SIZE
assert(pRuntimeEnv->pCtx[col].outputBytes <= DEFAULT_INTERN_BUF_SIZE);
return (char *)pData->data + pRuntimeEnv->offset[col] * pRuntimeEnv->numOfRowsPerPage +
pRuntimeEnv->pCtx[col].outputBytes * row;
}
void validateTimestampForSupplementResult(SQueryRuntimeEnv *pRuntimeEnv, int64_t numOfIncrementRes) {
SQuery * pQuery = pRuntimeEnv->pQuery;
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
if (pRuntimeEnv->scanFlag == SUPPLEMENTARY_SCAN && numOfIncrementRes > 0) {
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId;
if (functionId == TSDB_FUNC_TS) {
assert(*(TSKEY *)pCtx[i].aOutputBuf == pCtx[i].nStartQueryTimestamp);
}
}
}
}
int32_t setOutputBufferForIntervalQuery(SQueryRuntimeEnv *pRuntimeEnv, SMeterQueryInfo *pMeterQueryInfo) {
SQueryDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
SWindowResInfo * pWindowResInfo = &pMeterQueryInfo->windowResInfo;
STimeWindow win = getActiveTimeWindow(pWindowResInfo, pMeterQueryInfo->lastKey, pRuntimeEnv->pQuery);
SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&win.skey, TSDB_KEYSIZE);
if (pWindowRes == NULL) {
return -1;
}
// not allocated yet, allocate new buffer
if (pWindowRes->pos.pageId == -1) {
int32_t ret = addNewWindowResultBuf(pWindowRes, pResultBuf, pMeterQueryInfo->sid, pRuntimeEnv->numOfRowsPerPage);
if (ret != 0) {
return -1;
}
}
pWindowRes->window = win;
for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfOutputCols; ++i) {
pRuntimeEnv->pCtx[i].resultInfo = &pWindowRes->resultInfo[i];
pRuntimeEnv->pCtx[i].aOutputBuf = getPosInResultPage(pRuntimeEnv, i, pWindowRes);
}
return TSDB_CODE_SUCCESS;
}
int32_t setIntervalQueryExecutionContext(STableQuerySupportObj *pSupporter, int32_t meterIdx,
SMeterQueryInfo *pMeterQueryInfo) {
int32_t setAdditionalInfo(STableQuerySupportObj *pSupporter, int32_t meterIdx, SMeterQueryInfo *pMeterQueryInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv;
assert(pMeterQueryInfo->lastKey > 0);
// not enough disk space or memory buffer for intermediate results
if (setOutputBufferForIntervalQuery(pRuntimeEnv, pMeterQueryInfo) != TSDB_CODE_SUCCESS) {
return -1;
}
initCtxOutputBuf(pRuntimeEnv);
vnodeSetTagValueInParam(pSupporter->pSidSet, pRuntimeEnv, pSupporter->pMeterSidExtInfo[meterIdx]);
// both the master and supplement scan needs to set the correct ts comp start position
if (pSupporter->runtimeEnv.pTSBuf != NULL) {
if (pRuntimeEnv->pTSBuf != NULL) {
if (pMeterQueryInfo->cur.vnodeIndex == -1) {
pMeterQueryInfo->tag = pRuntimeEnv->pCtx[0].tag.i64Key;
tsBufGetElemStartPos(pSupporter->runtimeEnv.pTSBuf, 0, pMeterQueryInfo->tag);
tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, 0, pMeterQueryInfo->tag);
// keep the cursor info of current meter
pMeterQueryInfo->cur = pSupporter->runtimeEnv.pTSBuf->cur;
pMeterQueryInfo->cur = pRuntimeEnv->pTSBuf->cur;
} else {
tsBufSetCursor(pSupporter->runtimeEnv.pTSBuf, &pMeterQueryInfo->cur);
tsBufSetCursor(pRuntimeEnv->pTSBuf, &pMeterQueryInfo->cur);
}
}
return 0;
}
int64_t getNextAccessedKeyInData(SQuery *pQuery, int64_t *pPrimaryCol, SBlockInfo *pBlockInfo, int32_t blockStatus) {
assert(pQuery->pos >= 0 && pQuery->pos <= pBlockInfo->size - 1);
TSKEY key = -1;
if (IS_DATA_BLOCK_LOADED(blockStatus)) {
key = pPrimaryCol[pQuery->pos];
} else { // while the data block is not loaded, the position must be the first or last position
assert(pQuery->pos == pBlockInfo->size - 1 || pQuery->pos == 0);
key = QUERY_IS_ASC_QUERY(pQuery) ? pBlockInfo->keyFirst : pBlockInfo->keyLast;
}
assert((key >= pQuery->skey && QUERY_IS_ASC_QUERY(pQuery)) || (key <= pQuery->skey && !QUERY_IS_ASC_QUERY(pQuery)));
return key;
}
// int64_t getNextAccessedKeyInData(SQuery *pQuery, int64_t *pPrimaryCol, SBlockInfo *pBlockInfo, int32_t blockStatus) {
// assert(pQuery->pos >= 0 && pQuery->pos <= pBlockInfo->size - 1);
//
// TSKEY key = -1;
// if (IS_DATA_BLOCK_LOADED(blockStatus)) {
// key = pPrimaryCol[pQuery->pos];
// } else { // while the data block is not loaded, the position must be the first or last position
// assert(pQuery->pos == pBlockInfo->size - 1 || pQuery->pos == 0);
// key = QUERY_IS_ASC_QUERY(pQuery) ? pBlockInfo->keyFirst : pBlockInfo->keyLast;
// }
//
// assert((key >= pQuery->skey && QUERY_IS_ASC_QUERY(pQuery)) || (key <= pQuery->skey && !QUERY_IS_ASC_QUERY(pQuery)));
// return key;
//}
/*
* There are two cases to handle:
@ -7227,14 +7141,6 @@ void setIntervalQueryRange(SMeterQueryInfo *pMeterQueryInfo, STableQuerySupportO
}
}
win = getActiveTimeWindow(pWindowResInfo, pQuery->skey, pQuery);
SWindowResult *pWindowRes =
doSetTimeWindowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&pQuery->skey, TSDB_KEYSIZE);
if (pWindowRes == NULL) {
return;
}
pWindowRes->window = win;
pMeterQueryInfo->queryRangeSet = 1;
pMeterQueryInfo->lastKey = pQuery->skey;
pMeterQueryInfo->skey = pQuery->skey;
@ -7297,7 +7203,7 @@ int32_t LoadDatablockOnDemand(SCompBlock *pBlock, SField **pFields, uint8_t *blk
if (((pQuery->lastKey <= pBlock->keyFirst && pQuery->ekey >= pBlock->keyLast && QUERY_IS_ASC_QUERY(pQuery)) ||
(pQuery->ekey <= pBlock->keyFirst && pQuery->lastKey >= pBlock->keyLast && !QUERY_IS_ASC_QUERY(pQuery))) &&
onDemand) {
int32_t req = 0;
uint32_t req = 0;
if (pQuery->numOfFilterCols > 0) {
req = BLK_DATA_ALL_NEEDED;
} else {
@ -7369,9 +7275,8 @@ int32_t LoadDatablockOnDemand(SCompBlock *pBlock, SField **pFields, uint8_t *blk
/* find first qualified record position in this block */
if (loadTS) {
/* find first qualified record position in this block */
pQuery->pos =
searchFn(pRuntimeEnv->primaryColBuffer->data, pBlock->numOfPoints, pQuery->lastKey, pQuery->order.order);
pQuery->pos = searchFn((char *)primaryKeys, pBlock->numOfPoints, pQuery->lastKey, pQuery->order.order);
/* boundary timestamp check */
assert(pBlock->keyFirst == primaryKeys[0] && pBlock->keyLast == primaryKeys[pBlock->numOfPoints - 1]);
}
@ -7394,20 +7299,6 @@ bool onDemandLoadDatablock(SQuery *pQuery, int16_t queryRangeSet) {
return (pQuery->intervalTime == 0) || ((queryRangeSet == 1) && (pQuery->intervalTime > 0));
}
static void validateResultBuf(STableQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQueryInfo) {
SQueryRuntimeEnv * pRuntimeEnv = &pSupporter->runtimeEnv;
SQuery * pQuery = pSupporter->runtimeEnv.pQuery;
SQueryDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
SIDList list = getDataBufPagesIdList(pResultBuf, pMeterQueryInfo->sid);
int32_t id = getLastPageId(&list);
tFilePage *newOutput = getResultBufferPageById(pResultBuf, id);
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
assert(pRuntimeEnv->pCtx[i].aOutputBuf - newOutput->data < DEFAULT_INTERN_BUF_SIZE);
}
}
static int32_t getNumOfSubset(STableQuerySupportObj *pSupporter) {
SQuery *pQuery = pSupporter->runtimeEnv.pQuery;
@ -7506,8 +7397,23 @@ void copyFromWindowResToSData(SQInfo *pQInfo, SWindowResult *result) {
assert(pQuery->pointsRead <= pQuery->pointsToRead);
}
static void updateWindowResNumOfRes(SQueryRuntimeEnv *pRuntimeEnv, SMeterDataInfo *pMeterDataInfo) {
SQuery *pQuery = pRuntimeEnv->pQuery;
// update the number of result for each, only update the number of rows for the corresponding window result.
if (pQuery->intervalTime == 0) {
int32_t g = pMeterDataInfo->groupIdx;
assert(pRuntimeEnv->windowResInfo.size > 0);
SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, (char *)&g, sizeof(g));
if (pWindowRes->numOfRows == 0) {
pWindowRes->numOfRows = getNumOfResult(pRuntimeEnv);
}
}
}
void stableApplyFunctionsOnBlock(STableQuerySupportObj *pSupporter, SMeterDataInfo *pMeterDataInfo,
SBlockInfo *pBlockInfo, SField *pFields, __block_search_fn_t searchFn) {
SBlockInfo *pBlockInfo, SField *pFields, __block_search_fn_t searchFn) {
SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
SMeterQueryInfo * pMeterQueryInfo = pMeterDataInfo->pMeterQInfo;
@ -7525,17 +7431,10 @@ void stableApplyFunctionsOnBlock(STableQuerySupportObj *pSupporter, SMeterDataIn
numOfRes = blockwiseApplyAllFunctions(pRuntimeEnv, forwardStep, pFields, pBlockInfo, pWindowResInfo, searchFn);
}
// update the number of result for each
if (pQuery->intervalTime == 0) { // todo refactor
SWindowResInfo *p1 = &pRuntimeEnv->windowResInfo;
for (int32_t i = 0; i < p1->size; ++i) {
SWindowResult* pResult = getWindowResult(p1, i);
if (isWindowResClosed(p1, i) && pResult->numOfRows == 0) {
pResult->numOfRows = getNumOfResult(pRuntimeEnv);
}
}
}
assert(numOfRes >= 0);
// todo merge refactor
updateWindowResNumOfRes(pRuntimeEnv, pMeterDataInfo);
updatelastkey(pQuery, pMeterQueryInfo);
}

View File

@ -84,7 +84,7 @@ static void enableExecutionForNextTable(SQueryRuntimeEnv *pRuntimeEnv) {
}
}
static void queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMeterInfo) {
static void queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMeterDataInfo) {
SQuery * pQuery = &pQInfo->query;
STableQuerySupportObj *pSupporter = pQInfo->pTableQuerySupporter;
SQueryRuntimeEnv * pRuntimeEnv = &pQInfo->pTableQuerySupporter->runtimeEnv;
@ -119,18 +119,18 @@ static void queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMeterInfo) {
pQInfo->pObj = pMeterObj;
pRuntimeEnv->pMeterObj = pMeterObj;
if (pMeterInfo[k].pMeterQInfo == NULL) {
pMeterInfo[k].pMeterQInfo =
if (pMeterDataInfo[k].pMeterQInfo == NULL) {
pMeterDataInfo[k].pMeterQInfo =
createMeterQueryInfo(pSupporter, pMeterObj->sid, pSupporter->rawSKey, pSupporter->rawEKey);
}
if (pMeterInfo[k].pMeterObj == NULL) { // no data in disk for this meter, set its pointer
setMeterDataInfo(&pMeterInfo[k], pMeterObj, k, groupIdx);
if (pMeterDataInfo[k].pMeterObj == NULL) { // no data in disk for this meter, set its pointer
setMeterDataInfo(&pMeterDataInfo[k], pMeterObj, k, groupIdx);
}
assert(pMeterInfo[k].meterOrderIdx == k && pMeterObj == pMeterInfo[k].pMeterObj);
assert(pMeterDataInfo[k].meterOrderIdx == k && pMeterObj == pMeterDataInfo[k].pMeterObj);
SMeterQueryInfo *pMeterQueryInfo = pMeterInfo[k].pMeterQInfo;
SMeterQueryInfo *pMeterQueryInfo = pMeterDataInfo[k].pMeterQInfo;
restoreIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo);
/*
@ -208,11 +208,10 @@ static void queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMeterInfo) {
}
if (pQuery->intervalTime == 0) {
setExecutionContext(pSupporter, pRuntimeEnv->windowResInfo.pResult, k, pMeterInfo[k].groupIdx,
pMeterQueryInfo);
setExecutionContext(pSupporter, pMeterQueryInfo, k, pMeterDataInfo[k].groupIdx, key);
} else {
setIntervalQueryRange(pMeterQueryInfo, pSupporter, key);
int32_t ret = setIntervalQueryExecutionContext(pSupporter, k, pMeterQueryInfo);
int32_t ret = setAdditionalInfo(pSupporter, k, pMeterQueryInfo);
if (ret != TSDB_CODE_SUCCESS) {
pQInfo->killed = 1;
return;
@ -224,14 +223,14 @@ static void queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMeterInfo) {
// only record the key on last block
SET_CACHE_BLOCK_FLAG(pRuntimeEnv->blockStatus);
SBlockInfo binfo = getBlockBasicInfo(pRuntimeEnv, pBlock, BLK_CACHE_BLOCK);
SBlockInfo binfo = getBlockInfo(pRuntimeEnv);
dTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", fileId:%d, slot:%d, pos:%d, bstatus:%d",
GET_QINFO_ADDR(pQuery), binfo.keyFirst, binfo.keyLast, pQuery->fileId, pQuery->slot, pQuery->pos,
pRuntimeEnv->blockStatus);
totalBlocks++;
stableApplyFunctionsOnBlock(pSupporter, &pMeterInfo[k], &binfo, NULL, searchFn);
stableApplyFunctionsOnBlock(pSupporter, &pMeterDataInfo[k], &binfo, NULL, searchFn);
if (ALL_CACHE_BLOCKS_CHECKED(pQuery)) {
break;
@ -400,8 +399,8 @@ static void queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMeterDataInfo
SCompBlock *pBlock = pInfoEx->pBlock.compBlock;
bool ondemandLoad = onDemandLoadDatablock(pQuery, pMeterQueryInfo->queryRangeSet);
int32_t ret = LoadDatablockOnDemand(pBlock, &pInfoEx->pBlock.fields, &pRuntimeEnv->blockStatus, pRuntimeEnv,
fileIdx, pInfoEx->blockIndex, searchFn, ondemandLoad);
ret = LoadDatablockOnDemand(pBlock, &pInfoEx->pBlock.fields, &pRuntimeEnv->blockStatus, pRuntimeEnv, fileIdx,
pInfoEx->blockIndex, searchFn, ondemandLoad);
if (ret != DISK_DATA_LOADED) {
pSummary->skippedFileBlocks++;
continue;
@ -425,21 +424,22 @@ static void queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMeterDataInfo
// if data block is not loaded, it must be the intermediate blocks
assert((pBlock->keyFirst >= pQuery->lastKey && pBlock->keyLast <= pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
(pBlock->keyFirst >= pQuery->ekey && pBlock->keyLast <= pQuery->lastKey && !QUERY_IS_ASC_QUERY(pQuery)));
nextKey = QUERY_IS_ASC_QUERY(pQuery)? pBlock->keyFirst:pBlock->keyLast;
}
if (pQuery->intervalTime == 0) {
setExecutionContext(pSupporter, pRuntimeEnv->windowResInfo.pResult, pOneMeterDataInfo->meterOrderIdx,
pOneMeterDataInfo->groupIdx, pMeterQueryInfo);
} else /* if (pQuery->intervalTime > 0)*/ { // interval query
setExecutionContext(pSupporter, pMeterQueryInfo, pOneMeterDataInfo->meterOrderIdx, pOneMeterDataInfo->groupIdx,
nextKey);
} else { // interval query
setIntervalQueryRange(pMeterQueryInfo, pSupporter, nextKey);
ret = setIntervalQueryExecutionContext(pSupporter, pOneMeterDataInfo->meterOrderIdx, pMeterQueryInfo);
ret = setAdditionalInfo(pSupporter, pOneMeterDataInfo->meterOrderIdx, pMeterQueryInfo);
if (ret != TSDB_CODE_SUCCESS) {
tfree(pReqMeterDataInfo); // error code has been set
pQInfo->killed = 1;
return;
}
}
stableApplyFunctionsOnBlock(pSupporter, pOneMeterDataInfo, &binfo, pInfoEx->pBlock.fields, searchFn);
}
@ -849,8 +849,8 @@ static void doOrderedScan(SQInfo *pQInfo) {
}
static void setupMeterQueryInfoForSupplementQuery(STableQuerySupportObj *pSupporter) {
SQuery* pQuery = pSupporter->runtimeEnv.pQuery;
SQuery *pQuery = pSupporter->runtimeEnv.pQuery;
for (int32_t i = 0; i < pSupporter->numOfMeters; ++i) {
SMeterQueryInfo *pMeterQueryInfo = pSupporter->pMeterDataInfo[i].pMeterQInfo;
changeMeterQueryInfoForSuppleQuery(pQuery, pMeterQueryInfo, pSupporter->rawSKey, pSupporter->rawEKey);
@ -1101,7 +1101,7 @@ static void vnodeSingleMeterIntervalMainLooper(STableQuerySupportObj *pSupporter
doFinalizeResult(pRuntimeEnv);
// int64_t maxOutput = getNumOfResult(pRuntimeEnv);
// int64_t maxOutput = getNumOfResult(pRuntimeEnv);
// here we can ignore the records in case of no interpolation
// todo handle offset, in case of top/bottom interval query
@ -1109,8 +1109,8 @@ static void vnodeSingleMeterIntervalMainLooper(STableQuerySupportObj *pSupporter
pQuery->interpoType == TSDB_INTERPO_NONE) {
// maxOutput <= 0, means current query does not generate any results
int32_t numOfClosed = numOfClosedTimeWindow(&pRuntimeEnv->windowResInfo);
int32_t c = MIN(numOfClosed, pQuery->limit.offset);
int32_t c = MIN(numOfClosed, pQuery->limit.offset);
clearFirstNTimeWindow(pRuntimeEnv, c);
pQuery->limit.offset -= c;
} else {
@ -1127,16 +1127,16 @@ static void vnodeSingleMeterIntervalMainLooper(STableQuerySupportObj *pSupporter
break;
}
// /*
// * the scan limitation mechanism is upon here,
// * 1. since there is only one(k) record is generated in one scan operation
// * 2. remain space is not sufficient for next query output, abort
// */
// if ((pQuery->pointsRead % pQuery->pointsToRead == 0 && pQuery->pointsRead != 0) ||
// ((pQuery->pointsRead + maxOutput) > pQuery->pointsToRead)) {
// setQueryStatus(pQuery, QUERY_RESBUF_FULL);
// break;
// }
// /*
// * the scan limitation mechanism is upon here,
// * 1. since there is only one(k) record is generated in one scan operation
// * 2. remain space is not sufficient for next query output, abort
// */
// if ((pQuery->pointsRead % pQuery->pointsToRead == 0 && pQuery->pointsRead != 0) ||
// ((pQuery->pointsRead + maxOutput) > pQuery->pointsToRead)) {
// setQueryStatus(pQuery, QUERY_RESBUF_FULL);
// break;
// }
}
}
@ -1158,7 +1158,7 @@ static void vnodeSingleTableIntervalProcessor(SQInfo *pQInfo) {
pSupporter->subgroupIdx = 0; // always start from 0
pQuery->pointsRead = 0;
copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
clearFirstNTimeWindow(pRuntimeEnv, pSupporter->subgroupIdx);
}
@ -1221,11 +1221,11 @@ void vnodeSingleTableQuery(SSchedMsg *pMsg) {
assert(pQInfo->refCount >= 1);
SQuery * pQuery = &pQInfo->query;
SMeterObj *pMeterObj = pQInfo->pObj;
STableQuerySupportObj* pSupporter = pQInfo->pTableQuerySupporter;
SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv;
SQuery * pQuery = &pQInfo->query;
SMeterObj * pMeterObj = pQInfo->pObj;
STableQuerySupportObj *pSupporter = pQInfo->pTableQuerySupporter;
SQueryRuntimeEnv * pRuntimeEnv = &pSupporter->runtimeEnv;
assert(pRuntimeEnv->pMeterObj == pMeterObj);
dTrace("vid:%d sid:%d id:%s, query thread is created, numOfQueries:%d, QInfo:%p", pMeterObj->vnode, pMeterObj->sid,
@ -1265,13 +1265,13 @@ void vnodeSingleTableQuery(SSchedMsg *pMsg) {
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || pQuery->intervalTime > 0) {
pQuery->pointsRead = 0;
pSupporter->subgroupIdx = 0; // always start from 0
if (pRuntimeEnv->windowResInfo.size > 0) {
copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
pQInfo->pointsRead += pQuery->pointsRead;
clearFirstNTimeWindow(pRuntimeEnv, pSupporter->subgroupIdx);
if (pQuery->pointsRead > 0) {
dTrace("QInfo:%p vid:%d sid:%d id:%s, %d points returned %d from group results, totalRead:%d totalReturn:%d",
pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead, pQInfo->pointsRead,