|
|
|
@ -48,6 +48,7 @@
|
|
|
|
|
#define GET_QINFO_ADDR(x) ((void*)((char *)(x)-offsetof(SQInfo, runtimeEnv)))
|
|
|
|
|
|
|
|
|
|
#define GET_COL_DATA_POS(query, index, step) ((query)->pos + (index) * (step))
|
|
|
|
|
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC))
|
|
|
|
|
|
|
|
|
|
/* get the qinfo struct address from the query struct address */
|
|
|
|
|
#define GET_COLUMN_BYTES(query, colidx) \
|
|
|
|
@ -82,15 +83,24 @@ typedef enum {
|
|
|
|
|
QUERY_OVER = 0x8u,
|
|
|
|
|
} vnodeQueryStatus;
|
|
|
|
|
|
|
|
|
|
static void setQueryStatus(SQuery *pQuery, int8_t status);
|
|
|
|
|
bool isIntervalQuery(SQuery *pQuery) { return pQuery->intervalTime > 0; }
|
|
|
|
|
|
|
|
|
|
enum {
|
|
|
|
|
TS_JOIN_TS_EQUAL = 0,
|
|
|
|
|
TS_JOIN_TS_NOT_EQUALS = 1,
|
|
|
|
|
TS_JOIN_TAG_NOT_EQUALS = 2,
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
typedef struct {
|
|
|
|
|
int32_t status; // query status
|
|
|
|
|
TSKEY lastKey; // the lastKey value before query executed
|
|
|
|
|
STimeWindow w; // whole query time window
|
|
|
|
|
STimeWindow current; // current query window
|
|
|
|
|
int32_t windowIndex; // index of active time window result for interval query
|
|
|
|
|
STSCursor cur;
|
|
|
|
|
} SQueryStatusInfo;
|
|
|
|
|
|
|
|
|
|
static void setQueryStatus(SQuery *pQuery, int8_t status);
|
|
|
|
|
bool isIntervalQuery(SQuery *pQuery) { return pQuery->intervalTime > 0; }
|
|
|
|
|
|
|
|
|
|
static int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray* group);
|
|
|
|
|
static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult);
|
|
|
|
|
|
|
|
|
@ -2224,111 +2234,11 @@ char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SWi
|
|
|
|
|
pQuery->pSelectExpr[columnIndex].resBytes * realRowId;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t UNUSED_FUNC vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) {
|
|
|
|
|
if ((QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.skey > pQuery->window.ekey)) ||
|
|
|
|
|
(!QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.ekey > pQuery->window.skey))) {
|
|
|
|
|
qTrace("QInfo:%p no result in time range %" PRId64 "-%" PRId64 ", order %d", pQInfo, pQuery->window.skey,
|
|
|
|
|
pQuery->window.ekey, pQuery->order.order);
|
|
|
|
|
|
|
|
|
|
sem_post(&pQInfo->dataReady);
|
|
|
|
|
return TSDB_CODE_SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pQuery->status = 0;
|
|
|
|
|
pQuery->rec = (SResultRec){0};
|
|
|
|
|
|
|
|
|
|
changeExecuteScanOrder(pQuery, true);
|
|
|
|
|
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* since we employ the output control mechanism in main loop.
|
|
|
|
|
* so, disable it during data block scan procedure.
|
|
|
|
|
*/
|
|
|
|
|
setScanLimitationByResultBuffer(pQuery);
|
|
|
|
|
|
|
|
|
|
// save raw query range for applying to each subgroup
|
|
|
|
|
pQuery->lastKey = pQuery->window.skey;
|
|
|
|
|
|
|
|
|
|
// create runtime environment
|
|
|
|
|
// SColumnModel *pTagSchemaInfo = pQInfo->pSidSet->pColumnModel;
|
|
|
|
|
|
|
|
|
|
// get one queried meter
|
|
|
|
|
assert(0);
|
|
|
|
|
// SMeterObj *pMeter = getMeterObj(pQInfo->groupInfo, pQInfo->pSidSet->pTableIdList[0]->sid);
|
|
|
|
|
|
|
|
|
|
pRuntimeEnv->pTSBuf = param;
|
|
|
|
|
pRuntimeEnv->cur.vnodeIndex = -1;
|
|
|
|
|
|
|
|
|
|
// set the ts-comp file traverse order
|
|
|
|
|
if (param != NULL) {
|
|
|
|
|
int16_t order = (pQuery->order.order == pRuntimeEnv->pTSBuf->tsOrder) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
|
|
|
|
|
tsBufSetTraverseOrder(pRuntimeEnv->pTSBuf, order);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
assert(0);
|
|
|
|
|
// int32_t ret = setupQueryRuntimeEnv(pMeter, pQuery, &pQInfo->runtimeEnv, pTagSchemaInfo, TSDB_ORDER_ASC, true);
|
|
|
|
|
// if (ret != TSDB_CODE_SUCCESS) {
|
|
|
|
|
// return ret;
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
// createTableGroup(pQInfo->pSidSet);
|
|
|
|
|
|
|
|
|
|
int32_t size = getInitialPageNum(pQInfo);
|
|
|
|
|
int32_t ret = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, size, pQuery->rowSize);
|
|
|
|
|
if (ret != TSDB_CODE_SUCCESS) {
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (pQuery->intervalTime == 0) {
|
|
|
|
|
int16_t type = TSDB_DATA_TYPE_NULL;
|
|
|
|
|
|
|
|
|
|
if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // group by columns not tags;
|
|
|
|
|
type = getGroupbyColumnType(pQuery, pQuery->pGroupbyExpr);
|
|
|
|
|
} else {
|
|
|
|
|
type = TSDB_DATA_TYPE_INT; // group id
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
initWindowResInfo(&pRuntimeEnv->windowResInfo, pRuntimeEnv, 512, 4096, type);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pRuntimeEnv->numOfRowsPerPage = getNumOfRowsInResultPage(pQuery, true);
|
|
|
|
|
|
|
|
|
|
STsdbQueryCond cond = {
|
|
|
|
|
.twindow = (STimeWindow) {.skey = pQuery->window.skey, .ekey = pQuery->window.ekey},
|
|
|
|
|
.order = pQuery->order.order,
|
|
|
|
|
.colList = pQuery->colList,
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// for(int32_t i = 0; i < pQInfo->pSidSet->numOfTables; ++i) {
|
|
|
|
|
// SMeterObj *p1 = getMeterObj(pQInfo->groupInfo, pQInfo->pSidSet->pTableIdList[i]->sid);
|
|
|
|
|
// taosArrayPush(sa, &p1);
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
SArray *cols = taosArrayInit(pQuery->numOfCols, sizeof(pQuery->colList[0]));
|
|
|
|
|
for (int32_t i = 0; i < pQuery->numOfCols; ++i) {
|
|
|
|
|
taosArrayPush(cols, &pQuery->colList[i]);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pRuntimeEnv->pQueryHandle = tsdbQueryTables(NULL, &cond, &pQInfo->groupInfo, cols);
|
|
|
|
|
|
|
|
|
|
// metric query do not invoke interpolation, it will be done at the second-stage merge
|
|
|
|
|
if (!isPointInterpoQuery(pQuery)) {
|
|
|
|
|
pQuery->interpoType = TSDB_INTERPO_NONE;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
TSKEY revisedStime = taosGetIntervalStartTimestamp(pQuery->window.skey, pQuery->intervalTime, pQuery->slidingTimeUnit,
|
|
|
|
|
pQuery->precision);
|
|
|
|
|
taosInitInterpoInfo(&pRuntimeEnv->interpoInfo, pQuery->order.order, revisedStime, 0, 0);
|
|
|
|
|
pRuntimeEnv->stableQuery = true;
|
|
|
|
|
|
|
|
|
|
return TSDB_CODE_SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* decrease the refcount for each table involved in this query
|
|
|
|
|
* @param pQInfo
|
|
|
|
|
*/
|
|
|
|
|
void vnodeDecMeterRefcnt(SQInfo *pQInfo) {
|
|
|
|
|
UNUSED_FUNC void vnodeDecMeterRefcnt(SQInfo *pQInfo) {
|
|
|
|
|
if (pQInfo != NULL) {
|
|
|
|
|
// assert(taosHashGetSize(pQInfo->groupInfo) >= 1);
|
|
|
|
|
}
|
|
|
|
@ -2362,7 +2272,7 @@ void vnodeDecMeterRefcnt(SQInfo *pQInfo) {
|
|
|
|
|
#endif
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void setTimestampRange(SQueryRuntimeEnv *pRuntimeEnv, int64_t stime, int64_t etime) {
|
|
|
|
|
UNUSED_FUNC void setTimestampRange(SQueryRuntimeEnv *pRuntimeEnv, int64_t stime, int64_t etime) {
|
|
|
|
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
|
|
|
|
|
|
|
|
|
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
|
|
|
|
@ -2907,14 +2817,14 @@ int32_t mergeIntoGroupResult(SQInfo *pQInfo) {
|
|
|
|
|
|
|
|
|
|
int32_t numOfGroups = taosArrayGetSize(pQInfo->groupInfo.pGroupList);
|
|
|
|
|
|
|
|
|
|
while (pQInfo->subgroupIdx < numOfGroups) {
|
|
|
|
|
SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, pQInfo->subgroupIdx);
|
|
|
|
|
while (pQInfo->groupIndex < numOfGroups) {
|
|
|
|
|
SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, pQInfo->groupIndex);
|
|
|
|
|
ret = mergeIntoGroupResultImpl(pQInfo, group);
|
|
|
|
|
if (ret < 0) { // not enough disk space to save the data into disk
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pQInfo->subgroupIdx += 1;
|
|
|
|
|
pQInfo->groupIndex += 1;
|
|
|
|
|
|
|
|
|
|
// this group generates at least one result, return results
|
|
|
|
|
if (ret > 0) {
|
|
|
|
@ -2922,11 +2832,11 @@ int32_t mergeIntoGroupResult(SQInfo *pQInfo) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
assert(pQInfo->numOfGroupResultPages == 0);
|
|
|
|
|
qTrace("QInfo:%p no result in group %d, continue", pQInfo, pQInfo->subgroupIdx - 1);
|
|
|
|
|
qTrace("QInfo:%p no result in group %d, continue", pQInfo, pQInfo->groupIndex - 1);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
qTrace("QInfo:%p merge res data into group, index:%d, total group:%d, elapsed time:%lldms",
|
|
|
|
|
pQInfo, pQInfo->subgroupIdx - 1, numOfGroups, taosGetTimestampMs() - st);
|
|
|
|
|
pQInfo, pQInfo->groupIndex - 1, numOfGroups, taosGetTimestampMs() - st);
|
|
|
|
|
|
|
|
|
|
return TSDB_CODE_SUCCESS;
|
|
|
|
|
}
|
|
|
|
@ -2941,7 +2851,7 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// set current query completed
|
|
|
|
|
// if (pQInfo->numOfGroupResultPages == 0 && pQInfo->subgroupIdx == pQInfo->pSidSet->numOfSubSet) {
|
|
|
|
|
// if (pQInfo->numOfGroupResultPages == 0 && pQInfo->groupIndex == pQInfo->pSidSet->numOfSubSet) {
|
|
|
|
|
// pQInfo->tableIndex = pQInfo->pSidSet->numOfTables;
|
|
|
|
|
// return;
|
|
|
|
|
// }
|
|
|
|
@ -2950,7 +2860,7 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) {
|
|
|
|
|
SQueryRuntimeEnv * pRuntimeEnv = &pQInfo->runtimeEnv;
|
|
|
|
|
SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
|
|
|
|
|
|
|
|
|
|
int32_t id = getGroupResultId(pQInfo->subgroupIdx - 1);
|
|
|
|
|
int32_t id = getGroupResultId(pQInfo->groupIndex - 1);
|
|
|
|
|
SIDList list = getDataBufPagesIdList(pResultBuf, pQInfo->offset + id);
|
|
|
|
|
|
|
|
|
|
int32_t total = 0;
|
|
|
|
@ -3156,7 +3066,7 @@ int32_t flushFromResultBuf(SQInfo *pQInfo) {
|
|
|
|
|
r = capacity;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t id = getGroupResultId(pQInfo->subgroupIdx) + pQInfo->numOfGroupResultPages;
|
|
|
|
|
int32_t id = getGroupResultId(pQInfo->groupIndex) + pQInfo->numOfGroupResultPages;
|
|
|
|
|
tFilePage *buf = getNewDataBuf(pResultBuf, id, &pageId);
|
|
|
|
|
|
|
|
|
|
// pagewise copy to dest buffer
|
|
|
|
@ -3205,8 +3115,8 @@ static void doDisableFunctsForSupplementaryScan(SQuery *pQuery, SWindowResInfo *
|
|
|
|
|
for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) {
|
|
|
|
|
int32_t functId = pQuery->pSelectExpr[j].pBase.functionId;
|
|
|
|
|
|
|
|
|
|
if (((functId == TSDB_FUNC_FIRST || functId == TSDB_FUNC_FIRST_DST) && order == TSDB_ORDER_DESC) ||
|
|
|
|
|
((functId == TSDB_FUNC_LAST || functId == TSDB_FUNC_LAST_DST) && order == TSDB_ORDER_ASC)) {
|
|
|
|
|
if (((functId == TSDB_FUNC_FIRST || functId == TSDB_FUNC_FIRST_DST) && order == TSDB_ORDER_ASC) ||
|
|
|
|
|
((functId == TSDB_FUNC_LAST || functId == TSDB_FUNC_LAST_DST) && order == TSDB_ORDER_DESC)) {
|
|
|
|
|
buf->resultInfo[j].complete = false;
|
|
|
|
|
} else if (functId != TSDB_FUNC_TS && functId != TSDB_FUNC_TAG) {
|
|
|
|
|
buf->resultInfo[j].complete = true;
|
|
|
|
@ -3215,32 +3125,28 @@ static void doDisableFunctsForSupplementaryScan(SQuery *pQuery, SWindowResInfo *
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void disableFunctForTableSuppleScan(SQueryRuntimeEnv *pRuntimeEnv, int32_t order) {
|
|
|
|
|
void disableFuncInReverseScan(SQueryRuntimeEnv *pRuntimeEnv) {
|
|
|
|
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
|
|
|
|
|
|
|
|
|
int32_t order = pQuery->order.order;
|
|
|
|
|
|
|
|
|
|
// group by normal columns and interval query on normal table
|
|
|
|
|
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
|
|
|
|
|
pRuntimeEnv->pCtx[i].order = (pRuntimeEnv->pCtx[i].order) ^ 1u;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
|
|
|
|
|
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || isIntervalQuery(pQuery)) {
|
|
|
|
|
doDisableFunctsForSupplementaryScan(pQuery, pWindowResInfo, order);
|
|
|
|
|
} else { // for simple result of table query,
|
|
|
|
|
for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) {
|
|
|
|
|
int32_t functId = pQuery->pSelectExpr[j].pBase.functionId;
|
|
|
|
|
int32_t functId = pQuery->pSelectExpr[j].pBase.functionId;
|
|
|
|
|
|
|
|
|
|
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[j];
|
|
|
|
|
|
|
|
|
|
if (((functId == TSDB_FUNC_FIRST || functId == TSDB_FUNC_FIRST_DST) && order == TSDB_ORDER_DESC) ||
|
|
|
|
|
((functId == TSDB_FUNC_LAST || functId == TSDB_FUNC_LAST_DST) && order == TSDB_ORDER_ASC)) {
|
|
|
|
|
if (((functId == TSDB_FUNC_FIRST || functId == TSDB_FUNC_FIRST_DST) && order == TSDB_ORDER_ASC) ||
|
|
|
|
|
((functId == TSDB_FUNC_LAST || functId == TSDB_FUNC_LAST_DST) && order == TSDB_ORDER_DESC)) {
|
|
|
|
|
pCtx->resultInfo->complete = false;
|
|
|
|
|
} else if (functId != TSDB_FUNC_TS && functId != TSDB_FUNC_TAG) {
|
|
|
|
|
pCtx->resultInfo->complete = true;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pQuery->order.order = pQuery->order.order ^ 1u;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void disableFuncForReverseScan(SQInfo *pQInfo, int32_t order) {
|
|
|
|
@ -3266,14 +3172,11 @@ void disableFuncForReverseScan(SQInfo *pQInfo, int32_t order) {
|
|
|
|
|
pQuery->order.order = (pQuery->order.order) ^ 1u;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void enableFuncForForwardScan(SQueryRuntimeEnv *pRuntimeEnv, int32_t order) {
|
|
|
|
|
void switchCtxOrder(SQueryRuntimeEnv *pRuntimeEnv) {
|
|
|
|
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
|
|
|
|
|
|
|
|
|
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
|
|
|
|
|
pRuntimeEnv->pCtx[i].order = (pRuntimeEnv->pCtx[i].order) ^ 1u;
|
|
|
|
|
SWITCH_ORDER(pRuntimeEnv->pCtx[i].order);// = (pRuntimeEnv->pCtx[i].order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pQuery->order.order = (pQuery->order.order) ^ 1u;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTableQuery, SPosInfo *posInfo) {
|
|
|
|
@ -3387,70 +3290,6 @@ void doSkipResults(SQueryRuntimeEnv *pRuntimeEnv) {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
typedef struct SQueryStatus {
|
|
|
|
|
int8_t overStatus;
|
|
|
|
|
TSKEY lastKey;
|
|
|
|
|
STSCursor cur;
|
|
|
|
|
} SQueryStatus;
|
|
|
|
|
|
|
|
|
|
// todo refactor
|
|
|
|
|
static void queryStatusSave(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatus *pStatus) {
|
|
|
|
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
|
|
|
|
|
|
|
|
|
pStatus->overStatus = pQuery->status;
|
|
|
|
|
pStatus->lastKey = pQuery->lastKey;
|
|
|
|
|
|
|
|
|
|
pStatus->cur = tsBufGetCursor(pRuntimeEnv->pTSBuf); // save the cursor
|
|
|
|
|
|
|
|
|
|
if (pRuntimeEnv->pTSBuf) {
|
|
|
|
|
pRuntimeEnv->pTSBuf->cur.order ^= 1u;
|
|
|
|
|
tsBufNextPos(pRuntimeEnv->pTSBuf);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
|
|
|
|
|
|
|
|
|
|
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
|
|
|
|
|
pQuery->lastKey = pQuery->window.skey;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void queryStatusRestore(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatus *pStatus) {
|
|
|
|
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
|
|
|
|
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
|
|
|
|
|
|
|
|
|
|
pQuery->lastKey = pStatus->lastKey;
|
|
|
|
|
pQuery->status = pStatus->overStatus;
|
|
|
|
|
|
|
|
|
|
tsBufSetCursor(pRuntimeEnv->pTSBuf, &pStatus->cur);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void doSingleMeterSupplementScan(SQueryRuntimeEnv *pRuntimeEnv) {
|
|
|
|
|
SQuery * pQuery = pRuntimeEnv->pQuery;
|
|
|
|
|
SQueryStatus qStatus = {0};
|
|
|
|
|
|
|
|
|
|
if (!needReverseScan(pQuery)) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
qTrace("QInfo:%p start to supp scan", GET_QINFO_ADDR(pQuery));
|
|
|
|
|
SET_SUPPLEMENT_SCAN_FLAG(pRuntimeEnv);
|
|
|
|
|
|
|
|
|
|
// close necessary function execution during supplementary scan
|
|
|
|
|
disableFunctForTableSuppleScan(pRuntimeEnv, pQuery->order.order);
|
|
|
|
|
queryStatusSave(pRuntimeEnv, &qStatus);
|
|
|
|
|
|
|
|
|
|
STimeWindow w = {.skey = pQuery->window.skey, .ekey = pQuery->window.ekey};
|
|
|
|
|
|
|
|
|
|
// reverse scan from current position
|
|
|
|
|
TsdbPosT current = tsdbDataBlockTell(pRuntimeEnv->pQueryHandle);
|
|
|
|
|
tsdbResetQuery(pRuntimeEnv->pQueryHandle, &w, current, pQuery->order.order);
|
|
|
|
|
|
|
|
|
|
doScanAllDataBlocks(pRuntimeEnv);
|
|
|
|
|
|
|
|
|
|
queryStatusRestore(pRuntimeEnv, &qStatus);
|
|
|
|
|
enableFuncForForwardScan(pRuntimeEnv, pQuery->order.order);
|
|
|
|
|
SET_MASTER_SCAN_FLAG(pRuntimeEnv);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void setQueryStatus(SQuery *pQuery, int8_t status) {
|
|
|
|
|
if (status == QUERY_NOT_COMPLETED) {
|
|
|
|
|
pQuery->status = status;
|
|
|
|
@ -3506,82 +3345,140 @@ bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) {
|
|
|
|
|
return toContinue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static SQueryStatusInfo getQueryStatusInfo(SQueryRuntimeEnv* pRuntimeEnv) {
|
|
|
|
|
SQuery* pQuery = pRuntimeEnv->pQuery;
|
|
|
|
|
|
|
|
|
|
SQueryStatusInfo info = {
|
|
|
|
|
.status = pQuery->status,
|
|
|
|
|
.windowIndex = pRuntimeEnv->windowResInfo.curIndex,
|
|
|
|
|
.lastKey = pQuery->lastKey,
|
|
|
|
|
.w = pQuery->window,
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
return info;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void setEnvBeforeReverseScan(SQueryRuntimeEnv* pRuntimeEnv, SQueryStatusInfo* pStatus) {
|
|
|
|
|
SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv);
|
|
|
|
|
SQuery* pQuery = pRuntimeEnv->pQuery;
|
|
|
|
|
|
|
|
|
|
// the step should be placed before order changed
|
|
|
|
|
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
|
|
|
|
|
|
|
|
|
pStatus->cur = tsBufGetCursor(pRuntimeEnv->pTSBuf); // save the cursor
|
|
|
|
|
if (pRuntimeEnv->pTSBuf) {
|
|
|
|
|
SWITCH_ORDER(pRuntimeEnv->pTSBuf->cur.order);
|
|
|
|
|
tsBufNextPos(pRuntimeEnv->pTSBuf);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// reverse order time range
|
|
|
|
|
pQuery->window.skey = pQuery->lastKey - step;
|
|
|
|
|
pQuery->window.ekey = pStatus->lastKey; // the start timestamp of current query
|
|
|
|
|
|
|
|
|
|
SWITCH_ORDER(pQuery->order.order);
|
|
|
|
|
SET_SUPPLEMENT_SCAN_FLAG(pRuntimeEnv);
|
|
|
|
|
|
|
|
|
|
STsdbQueryCond cond = {
|
|
|
|
|
.twindow = pQuery->window,
|
|
|
|
|
.order = pQuery->order.order,
|
|
|
|
|
.colList = pQuery->colList,
|
|
|
|
|
.numOfCols = pQuery->numOfCols,
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// clean unused handle
|
|
|
|
|
if (pRuntimeEnv->pSecQueryHandle != NULL) {
|
|
|
|
|
tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->groupInfo);
|
|
|
|
|
|
|
|
|
|
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
|
|
|
|
|
switchCtxOrder(pRuntimeEnv);
|
|
|
|
|
disableFuncInReverseScan(pRuntimeEnv);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void clearEnvAfterReverseScan(SQueryRuntimeEnv* pRuntimeEnv, TSKEY lastKey, SQueryStatusInfo* pStatus) {
|
|
|
|
|
SQuery* pQuery = pRuntimeEnv->pQuery;
|
|
|
|
|
|
|
|
|
|
SWITCH_ORDER(pQuery->order.order);
|
|
|
|
|
switchCtxOrder(pRuntimeEnv);
|
|
|
|
|
|
|
|
|
|
tsBufSetCursor(pRuntimeEnv->pTSBuf, &pStatus->cur);
|
|
|
|
|
if (pRuntimeEnv->pTSBuf) {
|
|
|
|
|
pRuntimeEnv->pTSBuf->cur.order = pQuery->order.order;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SET_MASTER_SCAN_FLAG(pRuntimeEnv);
|
|
|
|
|
|
|
|
|
|
// update the pQuery->window.skey and pQuery->window.ekey to limit the scan scope of sliding query
|
|
|
|
|
// during reverse scan
|
|
|
|
|
pQuery->lastKey = lastKey;
|
|
|
|
|
pQuery->status = pStatus->status;
|
|
|
|
|
pQuery->window = pStatus->w;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
|
|
|
|
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
|
|
|
|
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
|
|
|
|
|
|
|
|
|
|
// store the start query position
|
|
|
|
|
SQInfo* pQInfo = (SQInfo*) GET_QINFO_ADDR(pRuntimeEnv);
|
|
|
|
|
|
|
|
|
|
int64_t skey = pQuery->lastKey;
|
|
|
|
|
int32_t status = pQuery->status;
|
|
|
|
|
int32_t activeSlot = pRuntimeEnv->windowResInfo.curIndex;
|
|
|
|
|
SQueryStatusInfo qstatus = getQueryStatusInfo(pRuntimeEnv);
|
|
|
|
|
|
|
|
|
|
SET_MASTER_SCAN_FLAG(pRuntimeEnv);
|
|
|
|
|
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
|
|
|
|
|
|
|
|
|
while (1) {
|
|
|
|
|
doScanAllDataBlocks(pRuntimeEnv);
|
|
|
|
|
|
|
|
|
|
if (pRuntimeEnv->scanFlag == MASTER_SCAN) {
|
|
|
|
|
qstatus.status = pQuery->status;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (!needScanDataBlocksAgain(pRuntimeEnv)) {
|
|
|
|
|
|
|
|
|
|
// restore the status
|
|
|
|
|
// restore the status code and jump out of loop
|
|
|
|
|
if (pRuntimeEnv->scanFlag == REPEAT_SCAN) {
|
|
|
|
|
pQuery->status = status;
|
|
|
|
|
pQuery->status = qstatus.status;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// set the correct start position, and load the corresponding block in buffer for next round scan all data blocks.
|
|
|
|
|
// /*int32_t ret =*/ tsdbDataBlockSeek(pRuntimeEnv->pQueryHandle, pos);
|
|
|
|
|
|
|
|
|
|
STsdbQueryCond cond = {
|
|
|
|
|
.twindow = {pQuery->window.skey, pQuery->lastKey},
|
|
|
|
|
.order = pQuery->order.order,
|
|
|
|
|
.colList = pQuery->colList,
|
|
|
|
|
.twindow = {.skey = qstatus.lastKey, .ekey = pQuery->lastKey - step},
|
|
|
|
|
.order = pQuery->order.order,
|
|
|
|
|
.colList = pQuery->colList,
|
|
|
|
|
.numOfCols = pQuery->numOfCols,
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
SArray *cols = taosArrayInit(pQuery->numOfCols, sizeof(pQuery->colList[0]));
|
|
|
|
|
for (int32_t i = 0; i < pQuery->numOfCols; ++i) {
|
|
|
|
|
taosArrayPush(cols, &pQuery->colList[i]);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (pRuntimeEnv->pSecQueryHandle != NULL) {
|
|
|
|
|
pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->groupInfo, cols);
|
|
|
|
|
tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
taosArrayDestroy(cols);
|
|
|
|
|
|
|
|
|
|
status = pQuery->status;
|
|
|
|
|
pRuntimeEnv->windowResInfo.curIndex = activeSlot;
|
|
|
|
|
pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->groupInfo);
|
|
|
|
|
pRuntimeEnv->windowResInfo.curIndex = qstatus.windowIndex;
|
|
|
|
|
|
|
|
|
|
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
|
|
|
|
|
pRuntimeEnv->scanFlag = REPEAT_SCAN;
|
|
|
|
|
|
|
|
|
|
// check if query is killed or not
|
|
|
|
|
if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) {
|
|
|
|
|
if (isQueryKilled(pQInfo)) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (!needReverseScan(pQuery)) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
TSKEY lastKey = pQuery->lastKey;
|
|
|
|
|
setEnvBeforeReverseScan(pRuntimeEnv, &qstatus);
|
|
|
|
|
|
|
|
|
|
// no need to set the end key
|
|
|
|
|
TSKEY lkey = pQuery->lastKey;
|
|
|
|
|
TSKEY ekey = pQuery->window.ekey;
|
|
|
|
|
|
|
|
|
|
pQuery->window.skey = skey;
|
|
|
|
|
pQuery->window.ekey = pQuery->lastKey - step;
|
|
|
|
|
/*tsdbpos_t current =*/ tsdbDataBlockTell(pRuntimeEnv->pQueryHandle);
|
|
|
|
|
|
|
|
|
|
doSingleMeterSupplementScan(pRuntimeEnv);
|
|
|
|
|
|
|
|
|
|
// update the pQuery->window.skey and pQuery->window.ekey to limit the scan scope of sliding query during reverse scan
|
|
|
|
|
pQuery->lastKey = lkey;
|
|
|
|
|
pQuery->window.ekey = ekey;
|
|
|
|
|
|
|
|
|
|
// STimeWindow win = {.skey = pQuery->window.skey, .ekey = pQuery->window.ekey};
|
|
|
|
|
// tsdbResetQuery(pRuntimeEnv->pQueryHandle, &win, current, pQuery->order.order);
|
|
|
|
|
// tsdbNextDataBlock(pRuntimeEnv->pQueryHandle);
|
|
|
|
|
// reverse scan from current position
|
|
|
|
|
qTrace("QInfo:%p start to reverse scan", GET_QINFO_ADDR(pRuntimeEnv));
|
|
|
|
|
doScanAllDataBlocks(pRuntimeEnv);
|
|
|
|
|
|
|
|
|
|
clearEnvAfterReverseScan(pRuntimeEnv, lastKey, &qstatus);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) {
|
|
|
|
@ -3875,17 +3772,17 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResult *result, int32_t orde
|
|
|
|
|
int32_t totalSubset = getNumOfSubset(pQInfo);
|
|
|
|
|
|
|
|
|
|
if (orderType == TSDB_ORDER_ASC) {
|
|
|
|
|
startIdx = pQInfo->subgroupIdx;
|
|
|
|
|
startIdx = pQInfo->groupIndex;
|
|
|
|
|
step = 1;
|
|
|
|
|
} else { // desc order copy all data
|
|
|
|
|
startIdx = totalSubset - pQInfo->subgroupIdx - 1;
|
|
|
|
|
startIdx = totalSubset - pQInfo->groupIndex - 1;
|
|
|
|
|
step = -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (int32_t i = startIdx; (i < totalSubset) && (i >= 0); i += step) {
|
|
|
|
|
if (result[i].numOfRows == 0) {
|
|
|
|
|
pQInfo->offset = 0;
|
|
|
|
|
pQInfo->subgroupIdx += 1;
|
|
|
|
|
pQInfo->groupIndex += 1;
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -3903,7 +3800,7 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResult *result, int32_t orde
|
|
|
|
|
pQInfo->offset += numOfRowsToCopy;
|
|
|
|
|
} else {
|
|
|
|
|
pQInfo->offset = 0;
|
|
|
|
|
pQInfo->subgroupIdx += 1;
|
|
|
|
|
pQInfo->groupIndex += 1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) {
|
|
|
|
@ -4174,18 +4071,13 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void* tsdb, bool isSTableQuery)
|
|
|
|
|
pQuery->lastKey = pQuery->window.skey;
|
|
|
|
|
|
|
|
|
|
STsdbQueryCond cond = {
|
|
|
|
|
.twindow = pQuery->window,
|
|
|
|
|
.order = pQuery->order.order,
|
|
|
|
|
.colList = pQuery->colList,
|
|
|
|
|
.twindow = pQuery->window,
|
|
|
|
|
.order = pQuery->order.order,
|
|
|
|
|
.colList = pQuery->colList,
|
|
|
|
|
.numOfCols = pQuery->numOfCols,
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
SArray *cols = taosArrayInit(pQuery->numOfCols, sizeof(pQuery->colList[0]));
|
|
|
|
|
for (int32_t i = 0; i < pQuery->numOfCols; ++i) {
|
|
|
|
|
taosArrayPush(cols, &pQuery->colList[i]);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->groupInfo, cols);
|
|
|
|
|
taosArrayDestroy(cols);
|
|
|
|
|
pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->groupInfo);
|
|
|
|
|
pQInfo->tsdb = tsdb;
|
|
|
|
|
|
|
|
|
|
pRuntimeEnv->pQuery = pQuery;
|
|
|
|
@ -4403,25 +4295,19 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) {
|
|
|
|
|
.twindow = {pInfo->pTableQInfo->lastKey, pInfo->pTableQInfo->win.ekey},
|
|
|
|
|
.order = pQuery->order.order,
|
|
|
|
|
.colList = pQuery->colList,
|
|
|
|
|
.numOfCols = pQuery->numOfCols,
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
SArray *cols = taosArrayInit(pQuery->numOfCols, sizeof(pQuery->colList[0]));
|
|
|
|
|
for (int32_t i = 0; i < pQuery->numOfCols; ++i) {
|
|
|
|
|
taosArrayPush(cols, &pQuery->colList[i]);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SArray* g1 = taosArrayInit(1, POINTER_BYTES);
|
|
|
|
|
SArray* tx = taosArrayInit(1, sizeof(SPair));
|
|
|
|
|
|
|
|
|
|
taosArrayPush(tx, p);
|
|
|
|
|
taosArrayPush(g1, &tx);
|
|
|
|
|
STableGroupInfo gp = {.numOfTables = 1, .pGroupList = g1};
|
|
|
|
|
|
|
|
|
|
SArray* tx = taosArrayInit(1, sizeof(SPair));
|
|
|
|
|
taosArrayPush(tx, p);
|
|
|
|
|
|
|
|
|
|
taosArrayPush(g1, &tx);
|
|
|
|
|
// include only current table
|
|
|
|
|
pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &gp, cols);
|
|
|
|
|
|
|
|
|
|
// vnodeUpdateQueryColumnIndex(pQuery, pRuntimeEnv->pMeterObj);
|
|
|
|
|
// vnodeUpdateFilterColumnIndex(pQuery);
|
|
|
|
|
pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &gp);
|
|
|
|
|
|
|
|
|
|
if (pRuntimeEnv->pTSBuf != NULL) {
|
|
|
|
|
if (pRuntimeEnv->cur.vnodeIndex == -1) {
|
|
|
|
@ -4501,14 +4387,14 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
|
|
|
|
|
assert(pQuery->limit.offset == 0 && pQuery->limit.limit != 0);
|
|
|
|
|
|
|
|
|
|
#if 0
|
|
|
|
|
while (pQInfo->subgroupIdx < numOfGroups) {
|
|
|
|
|
while (pQInfo->groupIndex < numOfGroups) {
|
|
|
|
|
|
|
|
|
|
SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, pQInfo->subgroupIdx);
|
|
|
|
|
SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, pQInfo->groupIndex);
|
|
|
|
|
size_t numOfTable = taosArrayGetSize(group);
|
|
|
|
|
|
|
|
|
|
if (isFirstLastRowQuery(pQuery)) {
|
|
|
|
|
qTrace("QInfo:%p last_row query on vid:%d, numOfGroups:%d, current group:%d", pQInfo, vid, pTableIdList->numOfSubSet,
|
|
|
|
|
pQInfo->subgroupIdx);
|
|
|
|
|
pQInfo->groupIndex);
|
|
|
|
|
|
|
|
|
|
TSKEY key = -1;
|
|
|
|
|
int32_t index = -1;
|
|
|
|
@ -4529,7 +4415,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
|
|
|
|
|
// assert(num >= 0);
|
|
|
|
|
} else {
|
|
|
|
|
qTrace("QInfo:%p interp query on vid:%d, numOfGroups:%d, current group:%d", pQInfo, vid, pTableIdList->numOfSubSet,
|
|
|
|
|
pQInfo->subgroupIdx);
|
|
|
|
|
pQInfo->groupIndex);
|
|
|
|
|
|
|
|
|
|
for (int32_t k = start; k <= end; ++k) {
|
|
|
|
|
if (isQueryKilled(pQInfo)) {
|
|
|
|
@ -4547,7 +4433,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pSupporter->subgroupIdx++;
|
|
|
|
|
pSupporter->groupIndex++;
|
|
|
|
|
|
|
|
|
|
// output buffer is full, return to client
|
|
|
|
|
if (pQuery->size >= pQuery->pointsToRead) {
|
|
|
|
@ -4564,7 +4450,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
|
|
|
|
|
* if the subgroup index is larger than 0, results generated by group by tbname,k is existed.
|
|
|
|
|
* we need to return it to client in the first place.
|
|
|
|
|
*/
|
|
|
|
|
if (pQInfo->subgroupIdx > 0) {
|
|
|
|
|
if (pQInfo->groupIndex > 0) {
|
|
|
|
|
copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
|
|
|
|
|
pQuery->rec.total += pQuery->rec.rows;
|
|
|
|
|
|
|
|
|
@ -4585,13 +4471,11 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
|
|
|
|
|
assert(taosArrayGetSize(group) == pQInfo->groupInfo.numOfTables && 1 == taosArrayGetSize(pQInfo->groupInfo.pGroupList));
|
|
|
|
|
|
|
|
|
|
while (pQInfo->tableIndex < pQInfo->groupInfo.numOfTables) {
|
|
|
|
|
int32_t k = pQInfo->tableIndex;
|
|
|
|
|
|
|
|
|
|
if (isQueryKilled(pQInfo)) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SPair *p = taosArrayGet(group, k);
|
|
|
|
|
SPair *p = taosArrayGet(group, pQInfo->tableIndex);
|
|
|
|
|
STableDataInfo* pInfo = p->sec;
|
|
|
|
|
|
|
|
|
|
TSKEY skey = pInfo->pTableQInfo->lastKey;
|
|
|
|
@ -4599,7 +4483,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
|
|
|
|
|
pQuery->window.skey = skey;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (!multiTableMultioutputHelper(pQInfo, k)) {
|
|
|
|
|
if (!multiTableMultioutputHelper(pQInfo, pQInfo->tableIndex)) {
|
|
|
|
|
pQInfo->tableIndex++;
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
@ -4696,7 +4580,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pQInfo->subgroupIdx = 0;
|
|
|
|
|
pQInfo->groupIndex = 0;
|
|
|
|
|
pQuery->rec.rows = 0;
|
|
|
|
|
copyFromWindowResToSData(pQInfo, pWindowResInfo->pResult);
|
|
|
|
|
}
|
|
|
|
@ -4773,7 +4657,7 @@ static void doRestoreContext(SQInfo* pQInfo) {
|
|
|
|
|
pRuntimeEnv->pTSBuf->cur.order = pRuntimeEnv->pTSBuf->cur.order ^ 1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
enableFuncForForwardScan(pRuntimeEnv, pQuery->order.order);
|
|
|
|
|
switchCtxOrder(pRuntimeEnv);
|
|
|
|
|
SET_MASTER_SCAN_FLAG(pRuntimeEnv);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -4806,9 +4690,9 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
|
|
|
|
|
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
|
|
|
|
SQuery * pQuery = pRuntimeEnv->pQuery;
|
|
|
|
|
|
|
|
|
|
if (pQInfo->subgroupIdx > 0) {
|
|
|
|
|
if (pQInfo->groupIndex > 0) {
|
|
|
|
|
/*
|
|
|
|
|
* if the subgroupIdx > 0, the query process must be completed yet, we only need to
|
|
|
|
|
* if the groupIndex > 0, the query process must be completed yet, we only need to
|
|
|
|
|
* copy the data into output buffer
|
|
|
|
|
*/
|
|
|
|
|
if (isIntervalQuery(pQuery)) {
|
|
|
|
@ -4870,7 +4754,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (isIntervalQuery(pQuery) || isSumAvgRateQuery(pQuery)) {
|
|
|
|
|
// assert(pSupporter->subgroupIdx == 0 && pSupporter->numOfGroupResultPages == 0);
|
|
|
|
|
// assert(pSupporter->groupIndex == 0 && pSupporter->numOfGroupResultPages == 0);
|
|
|
|
|
|
|
|
|
|
if (mergeIntoGroupResult(pQInfo) == TSDB_CODE_SUCCESS) {
|
|
|
|
|
copyResToQueryResultBuf(pQInfo, pQuery);
|
|
|
|
@ -5008,11 +4892,11 @@ static void tableIntervalProcess(SQInfo *pQInfo) {
|
|
|
|
|
tableIntervalProcessImpl(pRuntimeEnv);
|
|
|
|
|
|
|
|
|
|
if (isIntervalQuery(pQuery)) {
|
|
|
|
|
pQInfo->subgroupIdx = 0; // always start from 0
|
|
|
|
|
pQInfo->groupIndex = 0; // always start from 0
|
|
|
|
|
pQuery->rec.rows = 0;
|
|
|
|
|
copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
|
|
|
|
|
|
|
|
|
|
clearFirstNTimeWindow(pRuntimeEnv, pQInfo->subgroupIdx);
|
|
|
|
|
clearFirstNTimeWindow(pRuntimeEnv, pQInfo->groupIndex);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// the offset is handled at prepare stage if no interpolation involved
|
|
|
|
@ -5044,10 +4928,10 @@ static void tableIntervalProcess(SQInfo *pQInfo) {
|
|
|
|
|
|
|
|
|
|
// all data scanned, the group by normal column can return
|
|
|
|
|
if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // todo refactor with merge interval time result
|
|
|
|
|
pQInfo->subgroupIdx = 0;
|
|
|
|
|
pQInfo->groupIndex = 0;
|
|
|
|
|
pQuery->rec.rows = 0;
|
|
|
|
|
copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
|
|
|
|
|
clearFirstNTimeWindow(pRuntimeEnv, pQInfo->subgroupIdx);
|
|
|
|
|
clearFirstNTimeWindow(pRuntimeEnv, pQInfo->groupIndex);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pQInfo->pointsInterpo += numOfInterpo;
|
|
|
|
@ -5083,13 +4967,13 @@ static void tableQueryImpl(SQInfo* pQInfo) {
|
|
|
|
|
|
|
|
|
|
// todo limit the output for interval query?
|
|
|
|
|
pQuery->rec.rows = 0;
|
|
|
|
|
pQInfo->subgroupIdx = 0; // always start from 0
|
|
|
|
|
pQInfo->groupIndex = 0; // always start from 0
|
|
|
|
|
|
|
|
|
|
if (pRuntimeEnv->windowResInfo.size > 0) {
|
|
|
|
|
copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
|
|
|
|
|
pQuery->rec.rows += pQuery->rec.rows;
|
|
|
|
|
|
|
|
|
|
clearFirstNTimeWindow(pRuntimeEnv, pQInfo->subgroupIdx);
|
|
|
|
|
clearFirstNTimeWindow(pRuntimeEnv, pQInfo->groupIndex);
|
|
|
|
|
|
|
|
|
|
if (pQuery->rec.rows > 0) {
|
|
|
|
|
qTrace("QInfo:%p %d rows returned from group results, total:%d", pQInfo, pQuery->rec.rows, pQuery->rec.total);
|
|
|
|
@ -5895,13 +5779,6 @@ static void freeQInfo(SQInfo *pQInfo) {
|
|
|
|
|
sem_destroy(&(pQInfo->dataReady));
|
|
|
|
|
teardownQueryRuntimeEnv(&pQInfo->runtimeEnv);
|
|
|
|
|
|
|
|
|
|
// if (pQInfo->pTableDataInfo != NULL) {
|
|
|
|
|
// size_t num = taosHashGetSize(pQInfo->groupInfo);
|
|
|
|
|
// for (int32_t j = 0; j < 0; ++j) {
|
|
|
|
|
// destroyMeterQueryInfo(pQInfo->pTableDataInfo[j].pTableQInfo, pQuery->numOfOutputCols);
|
|
|
|
|
// }
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
for (int32_t i = 0; i < pQuery->numOfFilterCols; ++i) {
|
|
|
|
|
SSingleColumnFilterInfo *pColFilter = &pQuery->pFilterInfo[i];
|
|
|
|
|
if (pColFilter->numOfFilters > 0) {
|
|
|
|
@ -5933,6 +5810,12 @@ static void freeQInfo(SQInfo *pQInfo) {
|
|
|
|
|
tfree(pQuery->pGroupbyExpr);
|
|
|
|
|
tfree(pQuery);
|
|
|
|
|
|
|
|
|
|
int32_t numOfGroups = taosArrayGetSize(pQInfo->groupInfo.pGroupList);
|
|
|
|
|
for(int32_t i = 0; i < numOfGroups; ++i) {
|
|
|
|
|
SArray* p = taosArrayGetP(pQInfo->groupInfo.pGroupList, i);
|
|
|
|
|
taosArrayDestroy(p);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
taosArrayDestroy(pQInfo->groupInfo.pGroupList);
|
|
|
|
|
|
|
|
|
|
qTrace("QInfo:%p QInfo is freed", pQInfo);
|
|
|
|
@ -6036,7 +5919,7 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
bool isSTableQuery = false;
|
|
|
|
|
STableGroupInfo* groupInfo = calloc(1, sizeof(STableGroupInfo));
|
|
|
|
|
STableGroupInfo groupInfo = {0};
|
|
|
|
|
|
|
|
|
|
if ((pQueryMsg->queryType & TSDB_QUERY_TYPE_STABLE_QUERY) != 0) {
|
|
|
|
|
isSTableQuery = true;
|
|
|
|
@ -6044,8 +5927,8 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo)
|
|
|
|
|
STableId* id = taosArrayGet(pTableIdList, 0);
|
|
|
|
|
id->uid = -1; //todo fix me
|
|
|
|
|
|
|
|
|
|
/*int32_t ret =*/ tsdbQueryTags(tsdb, id->uid, tagCond, pQueryMsg->tagCondLen, groupInfo, pGroupColIndex, pQueryMsg->numOfGroupCols);
|
|
|
|
|
if (groupInfo->numOfTables == 0) { // no qualified tables no need to do query
|
|
|
|
|
/*int32_t ret =*/ tsdbQueryByTagsCond(tsdb, id->uid, tagCond, pQueryMsg->tagCondLen, &groupInfo, pGroupColIndex, pQueryMsg->numOfGroupCols);
|
|
|
|
|
if (groupInfo.numOfTables == 0) { // no qualified tables no need to do query
|
|
|
|
|
code = TSDB_CODE_SUCCESS;
|
|
|
|
|
goto _query_over;
|
|
|
|
|
}
|
|
|
|
@ -6053,12 +5936,12 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo)
|
|
|
|
|
assert(taosArrayGetSize(pTableIdList) == 1);
|
|
|
|
|
|
|
|
|
|
STableId* id = taosArrayGet(pTableIdList, 0);
|
|
|
|
|
if ((code = tsdbGetOneTableGroup(tsdb, id->uid, groupInfo)) != TSDB_CODE_SUCCESS) {
|
|
|
|
|
if ((code = tsdbGetOneTableGroup(tsdb, id->uid, &groupInfo)) != TSDB_CODE_SUCCESS) {
|
|
|
|
|
goto _query_over;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
(*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pExprs, groupInfo);
|
|
|
|
|
(*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pExprs, &groupInfo);
|
|
|
|
|
if ((*pQInfo) == NULL) {
|
|
|
|
|
code = TSDB_CODE_SERV_OUT_OF_MEMORY;
|
|
|
|
|
}
|
|
|
|
@ -6066,24 +5949,9 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo)
|
|
|
|
|
code = initQInfo(pQueryMsg, tsdb, *pQInfo, isSTableQuery);
|
|
|
|
|
|
|
|
|
|
_query_over:
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS) {
|
|
|
|
|
taosArrayDestroy(pTableIdList);
|
|
|
|
|
}
|
|
|
|
|
taosArrayDestroy(pTableIdList);
|
|
|
|
|
|
|
|
|
|
// if failed to add ref for all meters in this query, abort current query
|
|
|
|
|
// if (code != TSDB_CODE_SUCCESS) {
|
|
|
|
|
// vnodeDecQueryRefCount(pQueryMsg, pMeterObjList, incNumber);
|
|
|
|
|
// }
|
|
|
|
|
//
|
|
|
|
|
// tfree(pQueryMsg->pSqlFuncExprs);
|
|
|
|
|
// tfree(pMeterObjList);
|
|
|
|
|
// ret = vnodeSendQueryRspMsg(pObj, code, pObj->qhandle);
|
|
|
|
|
//
|
|
|
|
|
// tfree(pQueryMsg->pSidExtInfo);
|
|
|
|
|
// for(int32_t i = 0; i < pQueryMsg->numOfCols; ++i) {
|
|
|
|
|
// vnodeFreeColumnInfo(&pQueryMsg->colList[i]);
|
|
|
|
|
// }
|
|
|
|
|
//
|
|
|
|
|
// atomic_fetch_add_32(&vnodeSelectReqNum, 1);
|
|
|
|
|
return TSDB_CODE_SUCCESS;
|
|
|
|
|
}
|
|
|
|
|