commit
863f2362d9
|
@ -1951,36 +1951,36 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bo
|
|||
|
||||
// todo handle the case the the order irrelevant query type mixed up with order critical query type
|
||||
// descending order query for last_row query
|
||||
if (isFirstLastRowQuery(pQuery) && !QUERY_IS_ASC_QUERY(pQuery)) {
|
||||
if (isFirstLastRowQuery(pQuery)) {
|
||||
qDebug("QInfo:%p scan order changed for last_row query, old:%d, new:%d", GET_QINFO_ADDR(pQuery),
|
||||
pQuery->order.order, TSDB_ORDER_ASC);
|
||||
|
||||
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
|
||||
pQuery->order.order = TSDB_ORDER_ASC;
|
||||
assert (pQuery->window.skey <= pQuery->window.ekey);
|
||||
if (pQuery->window.skey > pQuery->window.ekey) {
|
||||
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) && pQuery->order.order == TSDB_ORDER_DESC) {
|
||||
pQuery->order.order = TSDB_ORDER_ASC;
|
||||
if (pQuery->window.skey > pQuery->window.ekey) {
|
||||
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
|
||||
}
|
||||
|
||||
doExchangeTimeWindow(pQInfo, &pQuery->window);
|
||||
return;
|
||||
}
|
||||
|
||||
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) && !QUERY_IS_ASC_QUERY(pQuery)) {
|
||||
pQuery->order.order = TSDB_ORDER_ASC;
|
||||
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
|
||||
assert (pQuery->window.skey <= pQuery->window.ekey);
|
||||
|
||||
doExchangeTimeWindow(pQInfo, &pQuery->window);
|
||||
return;
|
||||
}
|
||||
|
||||
if (isPointInterpoQuery(pQuery) && (pQuery->intervalTime == 0) && !QUERY_IS_ASC_QUERY(pQuery)) {
|
||||
qDebug(msg, GET_QINFO_ADDR(pQuery), "interp", pQuery->order.order, TSDB_ORDER_ASC, pQuery->window.skey,
|
||||
pQuery->window.ekey, pQuery->window.ekey, pQuery->window.skey);
|
||||
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
|
||||
if (isPointInterpoQuery(pQuery) && pQuery->intervalTime == 0) {
|
||||
if (!QUERY_IS_ASC_QUERY(pQuery)) {
|
||||
qDebug(msg, GET_QINFO_ADDR(pQuery), "interp", pQuery->order.order, TSDB_ORDER_ASC, pQuery->window.skey,
|
||||
pQuery->window.ekey, pQuery->window.ekey, pQuery->window.skey);
|
||||
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
|
||||
}
|
||||
|
||||
pQuery->order.order = TSDB_ORDER_ASC;
|
||||
|
||||
assert (pQuery->window.skey <= pQuery->window.ekey);
|
||||
doExchangeTimeWindow(pQInfo, &pQuery->window);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -2365,16 +2365,16 @@ static void ensureOutputBuffer(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pB
|
|||
memset(tmp + sizeof(tFilePage) + bytes * pRec->rows, 0, (size_t)((newSize - pRec->rows) * bytes));
|
||||
pQuery->sdata[i] = (tFilePage *)tmp;
|
||||
}
|
||||
|
||||
|
||||
// set the pCtx output buffer position
|
||||
pRuntimeEnv->pCtx[i].aOutputBuf = pQuery->sdata[i]->data + pRec->rows * bytes;
|
||||
|
||||
|
||||
int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
|
||||
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) {
|
||||
pRuntimeEnv->pCtx[i].ptsOutputBuf = pRuntimeEnv->pCtx[0].aOutputBuf;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
qDebug("QInfo:%p realloc output buffer, new size: %d rows, old:%" PRId64 ", remain:%" PRId64, GET_QINFO_ADDR(pRuntimeEnv),
|
||||
newSize, pRec->capacity, newSize - pRec->rows);
|
||||
|
||||
|
@ -2920,11 +2920,11 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
|
|||
STableQueryInfo *item = taosArrayGetP(pGroup, i);
|
||||
|
||||
SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, TSDB_TABLEID(item->pTable)->tid);
|
||||
pageList = list;
|
||||
tid = TSDB_TABLEID(item->pTable)->tid;
|
||||
|
||||
if (taosArrayGetSize(list) > 0 && item->windowResInfo.size > 0) {
|
||||
pTableList[numOfTables++] = item;
|
||||
tid = TSDB_TABLEID(item->pTable)->tid;
|
||||
pageList = list;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3357,7 +3357,7 @@ void skipResults(SQueryRuntimeEnv *pRuntimeEnv) {
|
|||
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
||||
int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
|
||||
int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes;
|
||||
|
||||
|
||||
memmove(pQuery->sdata[i]->data, (char*)pQuery->sdata[i]->data + bytes * numOfSkip, (size_t)(pQuery->rec.rows * bytes));
|
||||
pRuntimeEnv->pCtx[i].aOutputBuf = ((char*) pQuery->sdata[i]->data) + pQuery->rec.rows * bytes;
|
||||
|
||||
|
@ -4354,32 +4354,6 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) {
|
|||
return true;
|
||||
}
|
||||
|
||||
static void freeTableQueryInfo(STableGroupInfo* pTableGroupInfo) {
|
||||
if (pTableGroupInfo->pGroupList == NULL) {
|
||||
assert(pTableGroupInfo->numOfTables == 0);
|
||||
} else {
|
||||
size_t numOfGroups = taosArrayGetSize(pTableGroupInfo->pGroupList);
|
||||
for (int32_t i = 0; i < numOfGroups; ++i) {
|
||||
SArray *p = taosArrayGetP(pTableGroupInfo->pGroupList, i);
|
||||
|
||||
size_t num = taosArrayGetSize(p);
|
||||
for(int32_t j = 0; j < num; ++j) {
|
||||
STableQueryInfo* item = taosArrayGetP(p, j);
|
||||
destroyTableQueryInfo(item);
|
||||
}
|
||||
|
||||
taosArrayDestroy(p);
|
||||
}
|
||||
|
||||
taosArrayDestroy(pTableGroupInfo->pGroupList);
|
||||
pTableGroupInfo->pGroupList = NULL;
|
||||
pTableGroupInfo->numOfTables = 0;
|
||||
}
|
||||
|
||||
taosHashCleanup(pTableGroupInfo->map);
|
||||
pTableGroupInfo->map = NULL;
|
||||
}
|
||||
|
||||
static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) {
|
||||
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
||||
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
|
||||
|
@ -4415,22 +4389,20 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery)
|
|||
terrno = TSDB_CODE_SUCCESS;
|
||||
if (isFirstLastRowQuery(pQuery)) {
|
||||
pRuntimeEnv->pQueryHandle = tsdbQueryLastRow(tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo);
|
||||
if (pRuntimeEnv->pQueryHandle == NULL) { // no data in current stable, clear all
|
||||
freeTableQueryInfo(&pQInfo->tableqinfoGroupInfo);
|
||||
} else { // update the query time window
|
||||
pQuery->window = cond.twindow;
|
||||
|
||||
size_t numOfGroups = GET_NUM_OF_TABLEGROUP(pQInfo);
|
||||
for (int32_t i = 0; i < numOfGroups; ++i) {
|
||||
SArray *group = GET_TABLEGROUP(pQInfo, i);
|
||||
// update the query time window
|
||||
pQuery->window = cond.twindow;
|
||||
|
||||
size_t t = taosArrayGetSize(group);
|
||||
for (int32_t j = 0; j < t; ++j) {
|
||||
STableQueryInfo *pCheckInfo = taosArrayGetP(group, j);
|
||||
size_t numOfGroups = GET_NUM_OF_TABLEGROUP(pQInfo);
|
||||
for(int32_t i = 0; i < numOfGroups; ++i) {
|
||||
SArray *group = GET_TABLEGROUP(pQInfo, i);
|
||||
|
||||
pCheckInfo->win = pQuery->window;
|
||||
pCheckInfo->lastKey = pCheckInfo->win.skey;
|
||||
}
|
||||
size_t t = taosArrayGetSize(group);
|
||||
for (int32_t j = 0; j < t; ++j) {
|
||||
STableQueryInfo *pCheckInfo = taosArrayGetP(group, j);
|
||||
|
||||
pCheckInfo->win = pQuery->window;
|
||||
pCheckInfo->lastKey = pCheckInfo->win.skey;
|
||||
}
|
||||
}
|
||||
} else if (isPointInterpoQuery(pQuery)) {
|
||||
|
@ -4484,12 +4456,6 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
|
|||
return code;
|
||||
}
|
||||
|
||||
if (pQInfo->tableqinfoGroupInfo.numOfTables == 0) {
|
||||
qDebug("QInfo:%p no table qualified for tag filter, abort query", pQInfo);
|
||||
setQueryStatus(pQuery, QUERY_COMPLETED);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
pQInfo->tsdb = tsdb;
|
||||
pQInfo->vgId = vgId;
|
||||
|
||||
|
@ -4612,7 +4578,7 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
|
|||
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
||||
SQuery* pQuery = pRuntimeEnv->pQuery;
|
||||
SQueryCostInfo* summary = &pRuntimeEnv->summary;
|
||||
|
||||
|
||||
int64_t st = taosGetTimestampMs();
|
||||
|
||||
TsdbQueryHandleT pQueryHandle = IS_MASTER_SCAN(pRuntimeEnv)? pRuntimeEnv->pQueryHandle : pRuntimeEnv->pSecQueryHandle;
|
||||
|
@ -4622,7 +4588,7 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
|
|||
|
||||
while (tsdbNextDataBlock(pQueryHandle)) {
|
||||
summary->totalBlocks += 1;
|
||||
|
||||
|
||||
if (IS_QUERY_KILLED(pQInfo)) {
|
||||
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
||||
}
|
||||
|
@ -4659,7 +4625,7 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
|
|||
|
||||
summary->totalRows += blockInfo.rows;
|
||||
stableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, pDataBlock, binarySearchForKey);
|
||||
|
||||
|
||||
qDebug("QInfo:%p check data block completed, uid:%"PRId64", tid:%d, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, "
|
||||
"lastKey:%" PRId64,
|
||||
pQInfo, blockInfo.uid, blockInfo.tid, blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows,
|
||||
|
@ -6383,10 +6349,25 @@ static void freeQInfo(SQInfo *pQInfo) {
|
|||
taosTFree(pQuery);
|
||||
}
|
||||
|
||||
freeTableQueryInfo(&pQInfo->tableqinfoGroupInfo);
|
||||
// todo refactor, extract method to destroytableDataInfo
|
||||
if (pQInfo->tableqinfoGroupInfo.pGroupList != NULL) {
|
||||
int32_t numOfGroups = (int32_t)(GET_NUM_OF_TABLEGROUP(pQInfo));
|
||||
for (int32_t i = 0; i < numOfGroups; ++i) {
|
||||
SArray *p = GET_TABLEGROUP(pQInfo, i);
|
||||
|
||||
size_t num = taosArrayGetSize(p);
|
||||
for(int32_t j = 0; j < num; ++j) {
|
||||
STableQueryInfo* item = taosArrayGetP(p, j);
|
||||
destroyTableQueryInfo(item);
|
||||
}
|
||||
|
||||
taosArrayDestroy(p);
|
||||
}
|
||||
}
|
||||
|
||||
taosTFree(pQInfo->pBuf);
|
||||
|
||||
taosArrayDestroy(pQInfo->tableqinfoGroupInfo.pGroupList);
|
||||
taosHashCleanup(pQInfo->tableqinfoGroupInfo.map);
|
||||
tsdbDestroyTableGroup(&pQInfo->tableGroupInfo);
|
||||
taosArrayDestroy(pQInfo->arrTableIdInfo);
|
||||
|
||||
|
|
Loading…
Reference in New Issue