fix(query): handle the optimized order by in tablescan operator.
This commit is contained in:
parent
56f3c9bf61
commit
9b40ec72d6
|
@ -99,6 +99,15 @@ typedef struct SColumnInfoData {
|
||||||
};
|
};
|
||||||
} SColumnInfoData;
|
} SColumnInfoData;
|
||||||
|
|
||||||
|
typedef struct SQueryTableDataCond {
|
||||||
|
STimeWindow twindow;
|
||||||
|
int32_t order; // desc|asc order to iterate the data block
|
||||||
|
int32_t numOfCols;
|
||||||
|
SColumnInfo *colList;
|
||||||
|
bool loadExternalRows; // load external rows or not
|
||||||
|
int32_t type; // data block load type:
|
||||||
|
} SQueryTableDataCond;
|
||||||
|
|
||||||
void* blockDataDestroy(SSDataBlock* pBlock);
|
void* blockDataDestroy(SSDataBlock* pBlock);
|
||||||
int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock);
|
int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock);
|
||||||
void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock);
|
void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock);
|
||||||
|
@ -229,7 +238,6 @@ typedef struct SResSchame {
|
||||||
char name[TSDB_COL_NAME_LEN];
|
char name[TSDB_COL_NAME_LEN];
|
||||||
} SResSchema;
|
} SResSchema;
|
||||||
|
|
||||||
// TODO move away to executor.h
|
|
||||||
typedef struct SExprBasicInfo {
|
typedef struct SExprBasicInfo {
|
||||||
SResSchema resSchema;
|
SResSchema resSchema;
|
||||||
int16_t numOfParams; // argument value of each function
|
int16_t numOfParams; // argument value of each function
|
||||||
|
|
|
@ -202,7 +202,7 @@ typedef struct SqlFunctionCtx {
|
||||||
SPoint1 end;
|
SPoint1 end;
|
||||||
SFuncExecFuncs fpSet;
|
SFuncExecFuncs fpSet;
|
||||||
SScalarFuncExecFuncs sfp;
|
SScalarFuncExecFuncs sfp;
|
||||||
SExprInfo *pExpr;
|
struct SExprInfo *pExpr;
|
||||||
struct SDiskbasedBuf *pBuf;
|
struct SDiskbasedBuf *pBuf;
|
||||||
struct SSDataBlock *pSrcBlock;
|
struct SSDataBlock *pSrcBlock;
|
||||||
int32_t curBufPage;
|
int32_t curBufPage;
|
||||||
|
|
|
@ -77,16 +77,15 @@ char *metaTbCursorNext(SMTbCursor *pTbCur);
|
||||||
|
|
||||||
// tsdb
|
// tsdb
|
||||||
typedef struct STsdb STsdb;
|
typedef struct STsdb STsdb;
|
||||||
typedef struct STsdbQueryCond STsdbQueryCond;
|
|
||||||
typedef void *tsdbReaderT;
|
typedef void *tsdbReaderT;
|
||||||
|
|
||||||
#define BLOCK_LOAD_OFFSET_SEQ_ORDER 1
|
#define BLOCK_LOAD_OFFSET_SEQ_ORDER 1
|
||||||
#define BLOCK_LOAD_TABLE_SEQ_ORDER 2
|
#define BLOCK_LOAD_TABLE_SEQ_ORDER 2
|
||||||
#define BLOCK_LOAD_TABLE_RR_ORDER 3
|
#define BLOCK_LOAD_TABLE_RR_ORDER 3
|
||||||
|
|
||||||
tsdbReaderT *tsdbQueryTables(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfoGroup, uint64_t qId,
|
tsdbReaderT *tsdbQueryTables(STsdb *tsdb, SQueryTableDataCond *pCond, STableGroupInfo *tableInfoGroup, uint64_t qId,
|
||||||
uint64_t taskId);
|
uint64_t taskId);
|
||||||
tsdbReaderT tsdbQueryCacheLast(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId,
|
tsdbReaderT tsdbQueryCacheLast(STsdb *tsdb, SQueryTableDataCond *pCond, STableGroupInfo *groupList, uint64_t qId,
|
||||||
void *pMemRef);
|
void *pMemRef);
|
||||||
int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT *pReader, STableBlockDistInfo *pTableBlockInfo);
|
int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT *pReader, STableBlockDistInfo *pTableBlockInfo);
|
||||||
bool isTsdbCacheLastRow(tsdbReaderT *pReader);
|
bool isTsdbCacheLastRow(tsdbReaderT *pReader);
|
||||||
|
@ -98,6 +97,7 @@ bool tsdbNextDataBlock(tsdbReaderT pTsdbReadHandle);
|
||||||
void tsdbRetrieveDataBlockInfo(tsdbReaderT *pTsdbReadHandle, SDataBlockInfo *pBlockInfo);
|
void tsdbRetrieveDataBlockInfo(tsdbReaderT *pTsdbReadHandle, SDataBlockInfo *pBlockInfo);
|
||||||
int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT *pTsdbReadHandle, SColumnDataAgg **pBlockStatis);
|
int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT *pTsdbReadHandle, SColumnDataAgg **pBlockStatis);
|
||||||
SArray *tsdbRetrieveDataBlock(tsdbReaderT *pTsdbReadHandle, SArray *pColumnIdList);
|
SArray *tsdbRetrieveDataBlock(tsdbReaderT *pTsdbReadHandle, SArray *pColumnIdList);
|
||||||
|
void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond* pCond);
|
||||||
void tsdbDestroyTableGroup(STableGroupInfo *pGroupList);
|
void tsdbDestroyTableGroup(STableGroupInfo *pGroupList);
|
||||||
int32_t tsdbGetOneTableGroup(void *pMeta, uint64_t uid, TSKEY startKey, STableGroupInfo *pGroupInfo);
|
int32_t tsdbGetOneTableGroup(void *pMeta, uint64_t uid, TSKEY startKey, STableGroupInfo *pGroupInfo);
|
||||||
int32_t tsdbGetTableGroupFromIdList(STsdb *tsdb, SArray *pTableIdList, STableGroupInfo *pGroupInfo);
|
int32_t tsdbGetTableGroupFromIdList(STsdb *tsdb, SArray *pTableIdList, STableGroupInfo *pGroupInfo);
|
||||||
|
@ -157,15 +157,6 @@ struct SVnodeCfg {
|
||||||
int8_t hashMethod;
|
int8_t hashMethod;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct STsdbQueryCond {
|
|
||||||
STimeWindow twindow;
|
|
||||||
int32_t order; // desc|asc order to iterate the data block
|
|
||||||
int32_t numOfCols;
|
|
||||||
SColumnInfo *colList;
|
|
||||||
bool loadExternalRows; // load external rows or not
|
|
||||||
int32_t type; // data block load type:
|
|
||||||
};
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
TSKEY lastKey;
|
TSKEY lastKey;
|
||||||
uint64_t uid;
|
uint64_t uid;
|
||||||
|
|
|
@ -254,7 +254,7 @@ static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, S
|
||||||
|
|
||||||
assert(info.lastKey >= pTsdbReadHandle->window.skey && info.lastKey <= pTsdbReadHandle->window.ekey);
|
assert(info.lastKey >= pTsdbReadHandle->window.skey && info.lastKey <= pTsdbReadHandle->window.ekey);
|
||||||
} else {
|
} else {
|
||||||
assert(info.lastKey >= pTsdbReadHandle->window.ekey && info.lastKey <= pTsdbReadHandle->window.skey);
|
info.lastKey = pTsdbReadHandle->window.skey;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayPush(pTableCheckInfo, &info);
|
taosArrayPush(pTableCheckInfo, &info);
|
||||||
|
@ -317,7 +317,7 @@ static int64_t getEarliestValidTimestamp(STsdb* pTsdb) {
|
||||||
return now - (tsTickPerDay[pCfg->precision] * pCfg->keep2) + 1; // needs to add one tick
|
return now - (tsTickPerDay[pCfg->precision] * pCfg->keep2) + 1; // needs to add one tick
|
||||||
}
|
}
|
||||||
|
|
||||||
static void setQueryTimewindow(STsdbReadHandle* pTsdbReadHandle, STsdbQueryCond* pCond) {
|
static void setQueryTimewindow(STsdbReadHandle* pTsdbReadHandle, SQueryTableDataCond* pCond) {
|
||||||
pTsdbReadHandle->window = pCond->twindow;
|
pTsdbReadHandle->window = pCond->twindow;
|
||||||
|
|
||||||
bool updateTs = false;
|
bool updateTs = false;
|
||||||
|
@ -343,7 +343,7 @@ static void setQueryTimewindow(STsdbReadHandle* pTsdbReadHandle, STsdbQueryCond*
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, STsdbQueryCond* pCond, uint64_t qId, uint64_t taskId) {
|
static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, SQueryTableDataCond* pCond, uint64_t qId, uint64_t taskId) {
|
||||||
STsdbReadHandle* pReadHandle = taosMemoryCalloc(1, sizeof(STsdbReadHandle));
|
STsdbReadHandle* pReadHandle = taosMemoryCalloc(1, sizeof(STsdbReadHandle));
|
||||||
if (pReadHandle == NULL) {
|
if (pReadHandle == NULL) {
|
||||||
goto _end;
|
goto _end;
|
||||||
|
@ -422,7 +422,7 @@ _end:
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbReaderT* tsdbQueryTables(STsdb* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, uint64_t qId,
|
tsdbReaderT* tsdbQueryTables(STsdb* tsdb, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId,
|
||||||
uint64_t taskId) {
|
uint64_t taskId) {
|
||||||
STsdbReadHandle* pTsdbReadHandle = tsdbQueryTablesImpl(tsdb, pCond, qId, taskId);
|
STsdbReadHandle* pTsdbReadHandle = tsdbQueryTablesImpl(tsdb, pCond, qId, taskId);
|
||||||
if (pTsdbReadHandle == NULL) {
|
if (pTsdbReadHandle == NULL) {
|
||||||
|
@ -448,7 +448,7 @@ tsdbReaderT* tsdbQueryTables(STsdb* tsdb, STsdbQueryCond* pCond, STableGroupInfo
|
||||||
return (tsdbReaderT)pTsdbReadHandle;
|
return (tsdbReaderT)pTsdbReadHandle;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tsdbResetQueryHandle(tsdbReaderT queryHandle, STsdbQueryCond* pCond) {
|
void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond* pCond) {
|
||||||
STsdbReadHandle* pTsdbReadHandle = queryHandle;
|
STsdbReadHandle* pTsdbReadHandle = queryHandle;
|
||||||
|
|
||||||
if (emptyQueryTimewindow(pTsdbReadHandle)) {
|
if (emptyQueryTimewindow(pTsdbReadHandle)) {
|
||||||
|
@ -460,9 +460,9 @@ void tsdbResetQueryHandle(tsdbReaderT queryHandle, STsdbQueryCond* pCond) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
pTsdbReadHandle->order = pCond->order;
|
pTsdbReadHandle->order = pCond->order;
|
||||||
pTsdbReadHandle->window = pCond->twindow;
|
pTsdbReadHandle->window = pCond->twindow;
|
||||||
pTsdbReadHandle->type = TSDB_QUERY_TYPE_ALL;
|
pTsdbReadHandle->type = TSDB_QUERY_TYPE_ALL;
|
||||||
pTsdbReadHandle->cur.fid = -1;
|
pTsdbReadHandle->cur.fid = -1;
|
||||||
pTsdbReadHandle->cur.win = TSWINDOW_INITIALIZER;
|
pTsdbReadHandle->cur.win = TSWINDOW_INITIALIZER;
|
||||||
pTsdbReadHandle->checkFiles = true;
|
pTsdbReadHandle->checkFiles = true;
|
||||||
|
@ -485,7 +485,7 @@ void tsdbResetQueryHandle(tsdbReaderT queryHandle, STsdbQueryCond* pCond) {
|
||||||
resetCheckInfo(pTsdbReadHandle);
|
resetCheckInfo(pTsdbReadHandle);
|
||||||
}
|
}
|
||||||
|
|
||||||
void tsdbResetQueryHandleForNewTable(tsdbReaderT queryHandle, STsdbQueryCond* pCond, STableGroupInfo* groupList) {
|
void tsdbResetQueryHandleForNewTable(tsdbReaderT queryHandle, SQueryTableDataCond* pCond, STableGroupInfo* groupList) {
|
||||||
STsdbReadHandle* pTsdbReadHandle = queryHandle;
|
STsdbReadHandle* pTsdbReadHandle = queryHandle;
|
||||||
|
|
||||||
pTsdbReadHandle->order = pCond->order;
|
pTsdbReadHandle->order = pCond->order;
|
||||||
|
@ -526,7 +526,7 @@ void tsdbResetQueryHandleForNewTable(tsdbReaderT queryHandle, STsdbQueryCond* pC
|
||||||
// pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next);
|
// pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next);
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbReaderT tsdbQueryLastRow(STsdb* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, uint64_t qId,
|
tsdbReaderT tsdbQueryLastRow(STsdb* tsdb, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId,
|
||||||
uint64_t taskId) {
|
uint64_t taskId) {
|
||||||
pCond->twindow = updateLastrowForEachGroup(groupList);
|
pCond->twindow = updateLastrowForEachGroup(groupList);
|
||||||
|
|
||||||
|
@ -555,7 +555,7 @@ tsdbReaderT tsdbQueryLastRow(STsdb* tsdb, STsdbQueryCond* pCond, STableGroupInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
tsdbReaderT tsdbQueryCacheLast(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, STsdbMemTable* pMemRef) {
|
tsdbReaderT tsdbQueryCacheLast(STsdb *tsdb, SQueryTableDataCond *pCond, STableGroupInfo *groupList, uint64_t qId, STsdbMemTable* pMemRef) {
|
||||||
STsdbReadHandle *pTsdbReadHandle = (STsdbReadHandle*) tsdbQueryTables(tsdb, pCond, groupList, qId, pMemRef);
|
STsdbReadHandle *pTsdbReadHandle = (STsdbReadHandle*) tsdbQueryTables(tsdb, pCond, groupList, qId, pMemRef);
|
||||||
if (pTsdbReadHandle == NULL) {
|
if (pTsdbReadHandle == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -618,7 +618,7 @@ static STableGroupInfo* trimTableGroup(STimeWindow* window, STableGroupInfo* pGr
|
||||||
return pNew;
|
return pNew;
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbReaderT tsdbQueryRowsInExternalWindow(STsdb* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, uint64_t qId,
|
tsdbReaderT tsdbQueryRowsInExternalWindow(STsdb* tsdb, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId,
|
||||||
uint64_t taskId) {
|
uint64_t taskId) {
|
||||||
STableGroupInfo* pNew = trimTableGroup(&pCond->twindow, groupList);
|
STableGroupInfo* pNew = trimTableGroup(&pCond->twindow, groupList);
|
||||||
|
|
||||||
|
@ -1185,10 +1185,11 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock*
|
||||||
tsdbDebug("%p no data in mem, %s", pTsdbReadHandle, pTsdbReadHandle->idStr);
|
tsdbDebug("%p no data in mem, %s", pTsdbReadHandle, pTsdbReadHandle->idStr);
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((ASCENDING_TRAVERSE(pTsdbReadHandle->order) && (key != TSKEY_INITIAL_VAL && key <= binfo.window.ekey)) ||
|
bool ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
|
||||||
(!ASCENDING_TRAVERSE(pTsdbReadHandle->order) && (key != TSKEY_INITIAL_VAL && key >= binfo.window.skey))) {
|
|
||||||
if ((ASCENDING_TRAVERSE(pTsdbReadHandle->order) && (key != TSKEY_INITIAL_VAL && key < binfo.window.skey)) ||
|
if ((ascScan && (key != TSKEY_INITIAL_VAL && key <= binfo.window.ekey)) || (!ascScan && (key != TSKEY_INITIAL_VAL && key >= binfo.window.skey))) {
|
||||||
(!ASCENDING_TRAVERSE(pTsdbReadHandle->order) && (key != TSKEY_INITIAL_VAL && key > binfo.window.ekey))) {
|
|
||||||
|
if ((ascScan && (key != TSKEY_INITIAL_VAL && key < binfo.window.skey)) || (!ascScan && (key != TSKEY_INITIAL_VAL && key > binfo.window.ekey))) {
|
||||||
// do not load file block into buffer
|
// do not load file block into buffer
|
||||||
int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
|
int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
|
||||||
|
|
||||||
|
@ -1225,8 +1226,7 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock*
|
||||||
assert(pTsdbReadHandle->outputCapacity >= binfo.rows);
|
assert(pTsdbReadHandle->outputCapacity >= binfo.rows);
|
||||||
int32_t endPos = getEndPosInDataBlock(pTsdbReadHandle, &binfo);
|
int32_t endPos = getEndPosInDataBlock(pTsdbReadHandle, &binfo);
|
||||||
|
|
||||||
if ((cur->pos == 0 && endPos == binfo.rows - 1 && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
|
if ((cur->pos == 0 && endPos == binfo.rows - 1 && ascScan) || (cur->pos == (binfo.rows - 1) && endPos == 0 && (!ascScan))) {
|
||||||
(cur->pos == (binfo.rows - 1) && endPos == 0 && (!ASCENDING_TRAVERSE(pTsdbReadHandle->order)))) {
|
|
||||||
pTsdbReadHandle->realNumOfRows = binfo.rows;
|
pTsdbReadHandle->realNumOfRows = binfo.rows;
|
||||||
|
|
||||||
cur->rows = binfo.rows;
|
cur->rows = binfo.rows;
|
||||||
|
@ -1234,7 +1234,7 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock*
|
||||||
cur->mixBlock = false;
|
cur->mixBlock = false;
|
||||||
cur->blockCompleted = true;
|
cur->blockCompleted = true;
|
||||||
|
|
||||||
if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
|
if (ascScan) {
|
||||||
cur->lastKey = binfo.window.ekey + 1;
|
cur->lastKey = binfo.window.ekey + 1;
|
||||||
cur->pos = binfo.rows;
|
cur->pos = binfo.rows;
|
||||||
} else {
|
} else {
|
||||||
|
@ -1382,8 +1382,6 @@ static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) {
|
||||||
|
|
||||||
static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t numOfRows,
|
static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t numOfRows,
|
||||||
int32_t start, int32_t end) {
|
int32_t start, int32_t end) {
|
||||||
int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
|
|
||||||
|
|
||||||
SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
|
SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
|
||||||
TSKEY* tsArray = pCols->cols[0].pData;
|
TSKEY* tsArray = pCols->cols[0].pData;
|
||||||
|
|
||||||
|
@ -1394,6 +1392,11 @@ static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t
|
||||||
return numOfRows;
|
return numOfRows;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
|
||||||
|
int32_t trueStart = ascScan ? start : end;
|
||||||
|
int32_t trueEnd = ascScan ? end : start;
|
||||||
|
int32_t step = ascScan ? 1 : -1;
|
||||||
|
|
||||||
int32_t requiredNumOfCols = (int32_t)taosArrayGetSize(pTsdbReadHandle->pColumns);
|
int32_t requiredNumOfCols = (int32_t)taosArrayGetSize(pTsdbReadHandle->pColumns);
|
||||||
|
|
||||||
// data in buffer has greater timestamp, copy data in file block
|
// data in buffer has greater timestamp, copy data in file block
|
||||||
|
@ -1411,7 +1414,7 @@ static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t
|
||||||
if (!IS_VAR_DATA_TYPE(pColInfo->info.type)) { // todo opt performance
|
if (!IS_VAR_DATA_TYPE(pColInfo->info.type)) { // todo opt performance
|
||||||
// memmove(pData, (char*)src->pData + bytes * start, bytes * num);
|
// memmove(pData, (char*)src->pData + bytes * start, bytes * num);
|
||||||
int32_t rowIndex = numOfRows;
|
int32_t rowIndex = numOfRows;
|
||||||
for (int32_t k = start; k <= end; ++k, ++rowIndex) {
|
for (int32_t k = trueStart; ((ascScan && k <= trueEnd) || (!ascScan && k >= trueEnd)); k += step, ++rowIndex) {
|
||||||
SCellVal sVal = {0};
|
SCellVal sVal = {0};
|
||||||
if (tdGetColDataOfRow(&sVal, src, k, pCols->bitmapMode) < 0) {
|
if (tdGetColDataOfRow(&sVal, src, k, pCols->bitmapMode) < 0) {
|
||||||
TASSERT(0);
|
TASSERT(0);
|
||||||
|
@ -1427,7 +1430,7 @@ static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t
|
||||||
int32_t rowIndex = numOfRows;
|
int32_t rowIndex = numOfRows;
|
||||||
|
|
||||||
// todo refactor, only copy one-by-one
|
// todo refactor, only copy one-by-one
|
||||||
for (int32_t k = start; k < num + start; ++k, ++rowIndex) {
|
for (int32_t k = trueStart; ((ascScan && k <= trueEnd) || (!ascScan && k >= trueEnd)); k += step, ++rowIndex) {
|
||||||
SCellVal sVal = {0};
|
SCellVal sVal = {0};
|
||||||
if (tdGetColDataOfRow(&sVal, src, k, pCols->bitmapMode) < 0) {
|
if (tdGetColDataOfRow(&sVal, src, k, pCols->bitmapMode) < 0) {
|
||||||
TASSERT(0);
|
TASSERT(0);
|
||||||
|
@ -1444,26 +1447,19 @@ static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t
|
||||||
j++;
|
j++;
|
||||||
i++;
|
i++;
|
||||||
} else { // pColInfo->info.colId < src->colId, it is a NULL data
|
} else { // pColInfo->info.colId < src->colId, it is a NULL data
|
||||||
int32_t rowIndex = numOfRows;
|
colDataAppendNNULL(pColInfo, numOfRows, num);
|
||||||
for (int32_t k = start; k < num + start; ++k, ++rowIndex) { // TODO opt performance
|
|
||||||
colDataAppend(pColInfo, rowIndex, NULL, true);
|
|
||||||
}
|
|
||||||
i++;
|
i++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
while (i < requiredNumOfCols) { // the remain columns are all null data
|
while (i < requiredNumOfCols) { // the remain columns are all null data
|
||||||
SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
|
SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
|
||||||
int32_t rowIndex = numOfRows;
|
colDataAppendNNULL(pColInfo, numOfRows, num);
|
||||||
|
|
||||||
for (int32_t k = start; k < num + start; ++k, ++rowIndex) {
|
|
||||||
colDataAppend(pColInfo, rowIndex, NULL, true); // TODO add a fast version to set a number of consecutive NULL value.
|
|
||||||
}
|
|
||||||
i++;
|
i++;
|
||||||
}
|
}
|
||||||
|
|
||||||
pTsdbReadHandle->cur.win.ekey = tsArray[end];
|
pTsdbReadHandle->cur.win.ekey = tsArray[trueEnd];
|
||||||
pTsdbReadHandle->cur.lastKey = tsArray[end] + step;
|
pTsdbReadHandle->cur.lastKey = tsArray[trueEnd] + step;
|
||||||
|
|
||||||
return numOfRows + num;
|
return numOfRows + num;
|
||||||
}
|
}
|
||||||
|
@ -2966,7 +2962,7 @@ bool tsdbNextDataBlock(tsdbReaderT pHandle) {
|
||||||
// }
|
// }
|
||||||
//
|
//
|
||||||
// // load the previous row
|
// // load the previous row
|
||||||
// STsdbQueryCond cond = {.numOfCols = numOfCols, .loadExternalRows = false, .type = BLOCK_LOAD_OFFSET_SEQ_ORDER};
|
// SQueryTableDataCond cond = {.numOfCols = numOfCols, .loadExternalRows = false, .type = BLOCK_LOAD_OFFSET_SEQ_ORDER};
|
||||||
// if (type == TSDB_PREV_ROW) {
|
// if (type == TSDB_PREV_ROW) {
|
||||||
// cond.order = TSDB_ORDER_DESC;
|
// cond.order = TSDB_ORDER_DESC;
|
||||||
// cond.twindow = (STimeWindow){pTsdbReadHandle->window.skey, INT64_MIN};
|
// cond.twindow = (STimeWindow){pTsdbReadHandle->window.skey, INT64_MIN};
|
||||||
|
@ -3330,21 +3326,7 @@ SArray* tsdbRetrieveDataBlock(tsdbReaderT* pTsdbReadHandle, SArray* pIdList) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo refactor
|
|
||||||
int32_t numOfRows = doCopyRowsFromFileBlock(pHandle, pHandle->outputCapacity, 0, 0, pBlock->numOfRows - 1);
|
int32_t numOfRows = doCopyRowsFromFileBlock(pHandle, pHandle->outputCapacity, 0, 0, pBlock->numOfRows - 1);
|
||||||
|
|
||||||
// if the buffer is not full in case of descending order query, move the data in the front of the buffer
|
|
||||||
if (!ASCENDING_TRAVERSE(pHandle->order) && numOfRows < pHandle->outputCapacity) {
|
|
||||||
int32_t emptySize = pHandle->outputCapacity - numOfRows;
|
|
||||||
int32_t reqNumOfCols = (int32_t)taosArrayGetSize(pHandle->pColumns);
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < reqNumOfCols; ++i) {
|
|
||||||
SColumnInfoData* pColInfo = taosArrayGet(pHandle->pColumns, i);
|
|
||||||
memmove((char*)pColInfo->pData, (char*)pColInfo->pData + emptySize * pColInfo->info.bytes,
|
|
||||||
numOfRows * pColInfo->info.bytes);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return pHandle->pColumns;
|
return pHandle->pColumns;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -319,27 +319,32 @@ typedef struct SColMatchInfo {
|
||||||
bool output;
|
bool output;
|
||||||
} SColMatchInfo;
|
} SColMatchInfo;
|
||||||
|
|
||||||
|
typedef struct SScanInfo {
|
||||||
|
int32_t numOfAsc;
|
||||||
|
int32_t numOfDesc;
|
||||||
|
} SScanInfo;
|
||||||
|
|
||||||
typedef struct STableScanInfo {
|
typedef struct STableScanInfo {
|
||||||
void* dataReader;
|
void* dataReader;
|
||||||
|
|
||||||
int32_t numOfBlocks; // extract basic running information.
|
int32_t numOfBlocks; // extract basic running information.
|
||||||
int32_t numOfSkipped;
|
int32_t numOfSkipped;
|
||||||
int32_t numOfBlockStatis;
|
int32_t numOfBlockStatis;
|
||||||
int64_t numOfRows;
|
int64_t numOfRows;
|
||||||
int32_t order; // scan order
|
int64_t elapsedTime;
|
||||||
int32_t times; // repeat counts
|
int32_t prevGroupId; // previous table group id
|
||||||
|
SScanInfo scanInfo;
|
||||||
int32_t current;
|
int32_t current;
|
||||||
int32_t reverseTimes; // 0 by default
|
SNode* pFilterNode; // filter operator info
|
||||||
SNode* pFilterNode; // filter operator info
|
SqlFunctionCtx* pCtx; // next operator query context
|
||||||
SqlFunctionCtx* pCtx; // next operator query context
|
|
||||||
SResultRowInfo* pResultRowInfo;
|
SResultRowInfo* pResultRowInfo;
|
||||||
int32_t* rowCellInfoOffset;
|
int32_t* rowCellInfoOffset;
|
||||||
SExprInfo* pExpr;
|
SExprInfo* pExpr;
|
||||||
SSDataBlock* pResBlock;
|
SSDataBlock* pResBlock;
|
||||||
SArray* pColMatchInfo;
|
SArray* pColMatchInfo;
|
||||||
int32_t numOfOutput;
|
int32_t numOfOutput;
|
||||||
int64_t elapsedTime;
|
|
||||||
int32_t prevGroupId; // previous table group id
|
|
||||||
|
|
||||||
|
SQueryTableDataCond cond;
|
||||||
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
|
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
|
||||||
int32_t dataBlockLoadFlag;
|
int32_t dataBlockLoadFlag;
|
||||||
double sampleRatio; // data block sample ratio, 1 by default
|
double sampleRatio; // data block sample ratio, 1 by default
|
||||||
|
@ -627,9 +632,9 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
|
||||||
|
|
||||||
SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
|
||||||
|
|
||||||
SOperatorInfo* createTableScanOperatorInfo(void* pReaderHandle, int32_t order, int32_t numOfCols, int32_t dataLoadFlag, int32_t repeatTime,
|
SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, SQueryTableDataCond* pCond, int32_t numOfOutput, int32_t dataLoadFlag, const uint8_t* scanInfo,
|
||||||
int32_t reverseTime, SArray* pColMatchInfo, SSDataBlock* pResBlock, SNode* pCondition,
|
SArray* pColMatchInfo, SSDataBlock* pResBlock, SNode* pCondition, SInterval* pInterval, double sampleRatio, SExecTaskInfo* pTaskInfo);
|
||||||
SInterval* pInterval, double ratio, SExecTaskInfo* pTaskInfo);
|
|
||||||
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo,
|
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo,
|
||||||
int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
|
int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
|
||||||
|
|
||||||
|
|
|
@ -191,7 +191,7 @@ static SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutpu
|
||||||
static int32_t setTimestampListJoinInfo(STaskRuntimeEnv* pRuntimeEnv, SVariant* pTag, STableQueryInfo* pTableQueryInfo);
|
static int32_t setTimestampListJoinInfo(STaskRuntimeEnv* pRuntimeEnv, SVariant* pTag, STableQueryInfo* pTableQueryInfo);
|
||||||
static void releaseQueryBuf(size_t numOfTables);
|
static void releaseQueryBuf(size_t numOfTables);
|
||||||
static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
|
static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
|
||||||
// static STsdbQueryCond createTsdbQueryCond(STaskAttr* pQueryAttr, STimeWindow* win);
|
// static SQueryTableDataCond createTsdbQueryCond(STaskAttr* pQueryAttr, STimeWindow* win);
|
||||||
static STableIdInfo createTableIdInfo(STableQueryInfo* pTableQueryInfo);
|
static STableIdInfo createTableIdInfo(STableQueryInfo* pTableQueryInfo);
|
||||||
|
|
||||||
static int32_t getNumOfScanTimes(STaskAttr* pQueryAttr);
|
static int32_t getNumOfScanTimes(STaskAttr* pQueryAttr);
|
||||||
|
@ -3587,8 +3587,8 @@ static void doTableQueryInfoTimeWindowCheck(SExecTaskInfo* pTaskInfo, STableQuer
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
// STsdbQueryCond createTsdbQueryCond(STaskAttr* pQueryAttr, STimeWindow* win) {
|
// SQueryTableDataCond createTsdbQueryCond(STaskAttr* pQueryAttr, STimeWindow* win) {
|
||||||
// STsdbQueryCond cond = {
|
// SQueryTableDataCond cond = {
|
||||||
// .colList = pQueryAttr->tableCols,
|
// .colList = pQueryAttr->tableCols,
|
||||||
// .order = pQueryAttr->order.order,
|
// .order = pQueryAttr->order.order,
|
||||||
// .numOfCols = pQueryAttr->numOfCols,
|
// .numOfCols = pQueryAttr->numOfCols,
|
||||||
|
@ -4677,7 +4677,7 @@ _error:
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t getTableScanOrder(STableScanInfo* pTableScanInfo) { return pTableScanInfo->order; }
|
//static int32_t getTableScanOrder(STableScanInfo* pTableScanInfo) { return pTableScanInfo->order; }
|
||||||
|
|
||||||
// this is a blocking operator
|
// this is a blocking operator
|
||||||
static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
|
static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
|
||||||
|
@ -5657,8 +5657,7 @@ static STableQueryInfo* initTableQueryInfo(const STableGroupInfo* pTableGroupInf
|
||||||
|
|
||||||
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||||
SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo,
|
SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo,
|
||||||
int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo,
|
int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) {
|
||||||
const STableGroupInfo* pTableGroupInfo) {
|
|
||||||
SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo));
|
SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo));
|
||||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
if (pInfo == NULL || pOperator == NULL) {
|
if (pInfo == NULL || pOperator == NULL) {
|
||||||
|
@ -6315,6 +6314,19 @@ static SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOu
|
||||||
static SArray* createSortInfo(SNodeList* pNodeList, SNodeList* pNodeListTarget);
|
static SArray* createSortInfo(SNodeList* pNodeList, SNodeList* pNodeListTarget);
|
||||||
static SArray* createIndexMap(SNodeList* pNodeList);
|
static SArray* createIndexMap(SNodeList* pNodeList);
|
||||||
static SArray* extractPartitionColInfo(SNodeList* pNodeList);
|
static SArray* extractPartitionColInfo(SNodeList* pNodeList);
|
||||||
|
static int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode);
|
||||||
|
|
||||||
|
static SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode) {
|
||||||
|
SInterval interval = {
|
||||||
|
.interval = pTableScanNode->interval,
|
||||||
|
.sliding = pTableScanNode->sliding,
|
||||||
|
.intervalUnit = pTableScanNode->intervalUnit,
|
||||||
|
.slidingUnit = pTableScanNode->slidingUnit,
|
||||||
|
.offset = pTableScanNode->offset,
|
||||||
|
};
|
||||||
|
|
||||||
|
return interval;
|
||||||
|
}
|
||||||
|
|
||||||
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
|
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
|
||||||
uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) {
|
uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) {
|
||||||
|
@ -6325,7 +6337,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode;
|
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode;
|
||||||
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
|
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
|
||||||
|
|
||||||
int32_t numOfCols = 0;
|
int32_t numOfCols = 0;
|
||||||
tsdbReaderT pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId);
|
tsdbReaderT pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId);
|
||||||
if (pDataReader == NULL && terrno != 0) {
|
if (pDataReader == NULL && terrno != 0) {
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -6335,16 +6347,14 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols);
|
extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols);
|
||||||
SSDataBlock* pResBlock = createResDataBlock(pScanPhyNode->node.pOutputDataBlockDesc);
|
SSDataBlock* pResBlock = createResDataBlock(pScanPhyNode->node.pOutputDataBlockDesc);
|
||||||
|
|
||||||
SInterval interval = {
|
SQueryTableDataCond cond = {0};
|
||||||
.interval = pTableScanNode->interval,
|
int32_t code = initQueryTableDataCond(&cond, pTableScanNode);
|
||||||
.sliding = pTableScanNode->sliding,
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
.intervalUnit = pTableScanNode->intervalUnit,
|
return NULL;
|
||||||
.slidingUnit = pTableScanNode->slidingUnit,
|
}
|
||||||
.offset = pTableScanNode->offset,
|
|
||||||
};
|
|
||||||
|
|
||||||
return createTableScanOperatorInfo(pDataReader, pTableScanNode->scanSeq[0] > 0 ? TSDB_ORDER_ASC : TSDB_ORDER_DESC,
|
SInterval interval = extractIntervalInfo(pTableScanNode);
|
||||||
numOfCols, pTableScanNode->dataRequired, pTableScanNode->scanSeq[0], pTableScanNode->scanSeq[1], pColList,
|
return createTableScanOperatorInfo(pDataReader, &cond, numOfCols, pTableScanNode->dataRequired, pTableScanNode->scanSeq, pColList,
|
||||||
pResBlock, pScanPhyNode->node.pConditions, &interval, pTableScanNode->ratio, pTaskInfo);
|
pResBlock, pScanPhyNode->node.pConditions, &interval, pTableScanNode->ratio, pTaskInfo);
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
|
||||||
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode;
|
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode;
|
||||||
|
@ -6365,10 +6375,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
return pOperator;
|
return pOperator;
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
|
||||||
SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;
|
SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;
|
||||||
SSDataBlock* pResBlock = createResDataBlock(pSysScanPhyNode->scan.node.pOutputDataBlockDesc);
|
SScanPhysiNode* pScanNode = &pSysScanPhyNode->scan;
|
||||||
|
|
||||||
struct SScanPhysiNode* pScanNode = &pSysScanPhyNode->scan;
|
SSDataBlock* pResBlock = createResDataBlock(pScanNode->node.pOutputDataBlockDesc);
|
||||||
SArray* colList = extractScanColumnId(pScanNode->pScanCols);
|
SArray* colList = extractScanColumnId(pScanNode->pScanCols);
|
||||||
|
|
||||||
SOperatorInfo* pOperator = createSysTableScanOperatorInfo(
|
SOperatorInfo* pOperator = createSysTableScanOperatorInfo(
|
||||||
pHandle->meta, pResBlock, &pScanNode->tableName, pScanNode->node.pConditions, pSysScanPhyNode->mgmtEpSet,
|
pHandle->meta, pResBlock, &pScanNode->tableName, pScanNode->node.pConditions, pSysScanPhyNode->mgmtEpSet,
|
||||||
|
@ -6489,38 +6499,47 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
return pOptr;
|
return pOptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
static tsdbReaderT createDataReaderImpl(STableScanPhysiNode* pTableScanNode, STableGroupInfo* pGroupInfo,
|
|
||||||
void* readHandle, uint64_t queryId, uint64_t taskId) {
|
|
||||||
STsdbQueryCond cond = {.loadExternalRows = false};
|
|
||||||
|
|
||||||
cond.order = pTableScanNode->scanSeq[0] > 0 ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
|
static int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode) {
|
||||||
cond.numOfCols = LIST_LENGTH(pTableScanNode->scan.pScanCols);
|
pCond->loadExternalRows = false;
|
||||||
cond.colList = taosMemoryCalloc(cond.numOfCols, sizeof(SColumnInfo));
|
|
||||||
if (cond.colList == NULL) {
|
pCond->order = pTableScanNode->scanSeq[0] > 0 ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
|
||||||
|
pCond->numOfCols = LIST_LENGTH(pTableScanNode->scan.pScanCols);
|
||||||
|
pCond->colList = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnInfo));
|
||||||
|
if (pCond->colList == NULL) {
|
||||||
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||||
return NULL;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
cond.twindow = pTableScanNode->scanRange;
|
pCond->twindow = pTableScanNode->scanRange;
|
||||||
cond.type = BLOCK_LOAD_OFFSET_SEQ_ORDER;
|
|
||||||
// cond.type = pTableScanNode->scanFlag;
|
#if 1
|
||||||
|
//todo work around a problem, remove it later
|
||||||
|
if ((pCond->order == TSDB_ORDER_ASC && pCond->twindow.skey > pCond->twindow.ekey) ||
|
||||||
|
(pCond->order == TSDB_ORDER_DESC && pCond->twindow.skey < pCond->twindow.ekey)) {
|
||||||
|
TSWAP(pCond->twindow.skey, pCond->twindow.ekey, int64_t);
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
pCond->type = BLOCK_LOAD_OFFSET_SEQ_ORDER;
|
||||||
|
// pCond->type = pTableScanNode->scanFlag;
|
||||||
|
|
||||||
int32_t j = 0;
|
int32_t j = 0;
|
||||||
for (int32_t i = 0; i < cond.numOfCols; ++i) {
|
for (int32_t i = 0; i < pCond->numOfCols; ++i) {
|
||||||
STargetNode* pNode = (STargetNode*)nodesListGetNode(pTableScanNode->scan.pScanCols, i);
|
STargetNode* pNode = (STargetNode*)nodesListGetNode(pTableScanNode->scan.pScanCols, i);
|
||||||
SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;
|
SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;
|
||||||
if (pColNode->colType == COLUMN_TYPE_TAG) {
|
if (pColNode->colType == COLUMN_TYPE_TAG) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
cond.colList[j].type = pColNode->node.resType.type;
|
pCond->colList[j].type = pColNode->node.resType.type;
|
||||||
cond.colList[j].bytes = pColNode->node.resType.bytes;
|
pCond->colList[j].bytes = pColNode->node.resType.bytes;
|
||||||
cond.colList[j].colId = pColNode->colId;
|
pCond->colList[j].colId = pColNode->colId;
|
||||||
j += 1;
|
j += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
cond.numOfCols = j;
|
pCond->numOfCols = j;
|
||||||
return tsdbQueryTables(readHandle, &cond, pGroupInfo, queryId, taskId);
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
SArray* extractScanColumnId(SNodeList* pNodeList) {
|
SArray* extractScanColumnId(SNodeList* pNodeList) {
|
||||||
|
@ -6738,7 +6757,13 @@ tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle*
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
return createDataReaderImpl(pTableScanNode, pTableGroupInfo, pHandle->reader, queryId, taskId);
|
SQueryTableDataCond cond = {0};
|
||||||
|
code = initQueryTableDataCond(&cond, pTableScanNode);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
|
|
||||||
|
return tsdbQueryTables(pHandle->reader, &cond, pTableGroupInfo, queryId, taskId);
|
||||||
|
|
||||||
_error:
|
_error:
|
||||||
terrno = code;
|
terrno = code;
|
||||||
|
|
|
@ -255,17 +255,15 @@ int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo,
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void setupEnvForReverseScan(STableScanInfo* pTableScanInfo, SqlFunctionCtx* pCtx, int32_t numOfOutput) {
|
static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunctionCtx* pCtx, int32_t numOfOutput) {
|
||||||
// reverse order time range
|
|
||||||
SET_REVERSE_SCAN_FLAG(pTableScanInfo);
|
SET_REVERSE_SCAN_FLAG(pTableScanInfo);
|
||||||
|
|
||||||
switchCtxOrder(pCtx, numOfOutput);
|
switchCtxOrder(pCtx, numOfOutput);
|
||||||
SWITCH_ORDER(pTableScanInfo->order);
|
// setupQueryRangeForReverseScan(pTableScanInfo);
|
||||||
setupQueryRangeForReverseScan(pTableScanInfo);
|
|
||||||
|
|
||||||
pTableScanInfo->times = 1;
|
STimeWindow* pTWindow = &pTableScanInfo->cond.twindow;
|
||||||
pTableScanInfo->current = 0;
|
TSWAP(pTWindow->skey, pTWindow->ekey, int64_t);
|
||||||
pTableScanInfo->reverseTimes = 0;
|
pTableScanInfo->cond.order = TSDB_ORDER_DESC;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator, bool* newgroup) {
|
static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator, bool* newgroup) {
|
||||||
|
@ -311,63 +309,68 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator, bool* newgroup) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
SResultRowInfo* pResultRowInfo = pTableScanInfo->pResultRowInfo;
|
// SResultRowInfo* pResultRowInfo = pTableScanInfo->pResultRowInfo;
|
||||||
*newgroup = false;
|
*newgroup = false;
|
||||||
|
|
||||||
while (pTableScanInfo->current < pTableScanInfo->times) {
|
while (pTableScanInfo->current < pTableScanInfo->scanInfo.numOfAsc) {
|
||||||
SSDataBlock* p = doTableScanImpl(pOperator, newgroup);
|
SSDataBlock* p = doTableScanImpl(pOperator, newgroup);
|
||||||
if (p != NULL) {
|
if (p != NULL) {
|
||||||
return p;
|
return p;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (++pTableScanInfo->current >= pTableScanInfo->times) {
|
pTableScanInfo->current += 1;
|
||||||
if (pTableScanInfo->reverseTimes <= 0 /* || isTsdbCacheLastRow(pTableScanInfo->pTsdbReadHandle)*/) {
|
|
||||||
return NULL;
|
if (pTableScanInfo->current < pTableScanInfo->scanInfo.numOfAsc) {
|
||||||
} else {
|
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
|
||||||
break;
|
pTableScanInfo->scanFlag = REPEAT_SCAN;
|
||||||
|
|
||||||
|
STimeWindow* pWin = &pTableScanInfo->cond.twindow;
|
||||||
|
qDebug("%s start to repeat ascending order scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64,
|
||||||
|
GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey);
|
||||||
|
|
||||||
|
// do prepare for the next round table scan operation
|
||||||
|
tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t total = pTableScanInfo->scanInfo.numOfAsc + pTableScanInfo->scanInfo.numOfDesc;
|
||||||
|
if (pTableScanInfo->current < total) {
|
||||||
|
if (pTableScanInfo->cond.order == TSDB_ORDER_ASC) {
|
||||||
|
prepareForDescendingScan(pTableScanInfo, pTableScanInfo->pCtx, pTableScanInfo->numOfOutput);
|
||||||
|
tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond);
|
||||||
|
}
|
||||||
|
|
||||||
|
STimeWindow* pWin = &pTableScanInfo->cond.twindow;
|
||||||
|
qDebug("%s start to descending order scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64,
|
||||||
|
GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey);
|
||||||
|
|
||||||
|
while (pTableScanInfo->current < total) {
|
||||||
|
SSDataBlock* p = doTableScanImpl(pOperator, newgroup);
|
||||||
|
if (p != NULL) {
|
||||||
|
return p;
|
||||||
|
}
|
||||||
|
|
||||||
|
pTableScanInfo->current += 1;
|
||||||
|
|
||||||
|
if (pTableScanInfo->current < pTableScanInfo->scanInfo.numOfAsc) {
|
||||||
|
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
|
||||||
|
pTableScanInfo->scanFlag = REPEAT_SCAN;
|
||||||
|
|
||||||
|
qDebug("%s start to repeat descending order scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64,
|
||||||
|
GET_TASKID(pTaskInfo), pTaskInfo->window.skey, pTaskInfo->window.ekey);
|
||||||
|
|
||||||
|
// do prepare for the next round table scan operation
|
||||||
|
tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// do prepare for the next round table scan operation
|
|
||||||
// STsdbQueryCond cond = createTsdbQueryCond(pQueryAttr, &pQueryAttr->window);
|
|
||||||
// tsdbResetQueryHandle(pTableScanInfo->pTsdbReadHandle, &cond);
|
|
||||||
|
|
||||||
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
|
|
||||||
pTableScanInfo->scanFlag = REPEAT_SCAN;
|
|
||||||
|
|
||||||
// if (pResultRowInfo->size > 0) {
|
|
||||||
// pResultRowInfo->curPos = 0;
|
|
||||||
// }
|
|
||||||
|
|
||||||
qDebug("%s start to repeat scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64,
|
|
||||||
GET_TASKID(pTaskInfo), pTaskInfo->window.skey, pTaskInfo->window.ekey);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* p = NULL;
|
setTaskStatus(pTaskInfo, TASK_COMPLETED);
|
||||||
// todo refactor
|
return NULL;
|
||||||
if (pTableScanInfo->reverseTimes > 0) {
|
|
||||||
setupEnvForReverseScan(pTableScanInfo, pTableScanInfo->pCtx, pTableScanInfo->numOfOutput);
|
|
||||||
// STsdbQueryCond cond = createTsdbQueryCond(pQueryAttr, &pQueryAttr->window);
|
|
||||||
// tsdbResetQueryHandle(pTableScanInfo->pTsdbReadHandle, &cond);
|
|
||||||
|
|
||||||
qDebug("%s start to reverse scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64,
|
|
||||||
GET_TASKID(pTaskInfo), pTaskInfo->window.skey, pTaskInfo->window.ekey);
|
|
||||||
|
|
||||||
if (pResultRowInfo->size > 0) {
|
|
||||||
// pResultRowInfo->curPos = pResultRowInfo->size - 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
p = doTableScanImpl(pOperator, newgroup);
|
|
||||||
}
|
|
||||||
|
|
||||||
return p;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, int32_t order, int32_t numOfOutput, int32_t dataLoadFlag,
|
SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, SQueryTableDataCond* pCond, int32_t numOfOutput, int32_t dataLoadFlag, const uint8_t* scanInfo,
|
||||||
int32_t repeatTime, int32_t reverseTime, SArray* pColMatchInfo, SSDataBlock* pResBlock,
|
SArray* pColMatchInfo, SSDataBlock* pResBlock, SNode* pCondition, SInterval* pInterval, double sampleRatio, SExecTaskInfo* pTaskInfo) {
|
||||||
SNode* pCondition, SInterval* pInterval, double sampleRatio, SExecTaskInfo* pTaskInfo) {
|
|
||||||
assert(repeatTime > 0);
|
|
||||||
|
|
||||||
STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
|
STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
|
||||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
if (pInfo == NULL || pOperator == NULL) {
|
if (pInfo == NULL || pOperator == NULL) {
|
||||||
|
@ -378,18 +381,19 @@ SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, int32_t order, int
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pInfo->cond = *pCond;
|
||||||
|
pInfo->scanInfo = (SScanInfo) {.numOfAsc = scanInfo[0], .numOfDesc = scanInfo[1]};
|
||||||
|
|
||||||
pInfo->interval = *pInterval;
|
pInfo->interval = *pInterval;
|
||||||
pInfo->sampleRatio = sampleRatio;
|
pInfo->sampleRatio = sampleRatio;
|
||||||
pInfo->dataBlockLoadFlag= dataLoadFlag;
|
pInfo->dataBlockLoadFlag= dataLoadFlag;
|
||||||
pInfo->pResBlock = pResBlock;
|
pInfo->pResBlock = pResBlock;
|
||||||
pInfo->pFilterNode = pCondition;
|
pInfo->pFilterNode = pCondition;
|
||||||
pInfo->dataReader = pDataReader;
|
pInfo->dataReader = pDataReader;
|
||||||
pInfo->times = repeatTime;
|
|
||||||
pInfo->reverseTimes = reverseTime;
|
|
||||||
pInfo->order = order;
|
|
||||||
pInfo->current = 0;
|
pInfo->current = 0;
|
||||||
pInfo->scanFlag = MAIN_SCAN;
|
pInfo->scanFlag = MAIN_SCAN;
|
||||||
pInfo->pColMatchInfo = pColMatchInfo;
|
pInfo->pColMatchInfo = pColMatchInfo;
|
||||||
|
|
||||||
pOperator->name = "TableScanOperator";
|
pOperator->name = "TableScanOperator";
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
|
||||||
pOperator->blockingOptr = false;
|
pOperator->blockingOptr = false;
|
||||||
|
@ -410,19 +414,17 @@ SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, int32_t order, int
|
||||||
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle) {
|
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle) {
|
||||||
STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
|
STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
|
||||||
|
|
||||||
pInfo->dataReader = pTsdbReadHandle;
|
pInfo->dataReader = pTsdbReadHandle;
|
||||||
pInfo->times = 1;
|
pInfo->current = 0;
|
||||||
pInfo->reverseTimes = 0;
|
|
||||||
pInfo->current = 0;
|
|
||||||
pInfo->prevGroupId = -1;
|
pInfo->prevGroupId = -1;
|
||||||
|
|
||||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
pOperator->name = "TableSeqScanOperator";
|
pOperator->name = "TableSeqScanOperator";
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN;
|
||||||
pOperator->blockingOptr = false;
|
pOperator->blockingOptr = false;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->getNextFn = doTableScanImpl;
|
pOperator->getNextFn = doTableScanImpl;
|
||||||
|
|
||||||
return pOperator;
|
return pOperator;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue