add tag filter

This commit is contained in:
yihaoDeng 2022-04-11 22:10:58 +08:00
parent 01af381036
commit 4c54e3b303
1 changed files with 582 additions and 565 deletions

View File

@ -13,18 +13,20 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "vnodeInt.h"
#include "tdatablock.h"
#include "os.h" #include "os.h"
#include "talgo.h" #include "talgo.h"
#include "tcompare.h" #include "tcompare.h"
#include "tdatablock.h"
#include "tdataformat.h" #include "tdataformat.h"
#include "texception.h" #include "texception.h"
#include "vnodeInt.h"
#include "filter.h"
#include "scalar.h"
#include "taosdef.h" #include "taosdef.h"
#include "tlosertree.h" #include "tlosertree.h"
#include "vnodeInt.h"
#include "tmsg.h" #include "tmsg.h"
#include "vnodeInt.h"
#define EXTRA_BYTES 2 #define EXTRA_BYTES 2
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC) #define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
@ -149,6 +151,8 @@ typedef struct STableGroupSupporter {
SSchema* pTagSchema; SSchema* pTagSchema;
} STableGroupSupporter; } STableGroupSupporter;
int32_t tsdbQueryTableList(void* pMeta, SArray* pRes, void* filterInfo);
static STimeWindow updateLastrowForEachGroup(STableGroupInfo* groupList); static STimeWindow updateLastrowForEachGroup(STableGroupInfo* groupList);
static int32_t checkForCachedLastRow(STsdbReadHandle* pTsdbReadHandle, STableGroupInfo* groupList); static int32_t checkForCachedLastRow(STsdbReadHandle* pTsdbReadHandle, STableGroupInfo* groupList);
static int32_t checkForCachedLast(STsdbReadHandle* pTsdbReadHandle); static int32_t checkForCachedLast(STsdbReadHandle* pTsdbReadHandle);
@ -157,7 +161,8 @@ static int32_t checkForCachedLast(STsdbReadHandle* pTsdbReadHandle);
static void changeQueryHandleForInterpQuery(tsdbReaderT pHandle); static void changeQueryHandleForInterpQuery(tsdbReaderT pHandle);
static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, SBlock* pBlock); static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, SBlock* pBlock);
static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order); static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
static int32_t tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win, STsdbReadHandle* pTsdbReadHandle); static int32_t tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win,
STsdbReadHandle* pTsdbReadHandle);
static int32_t tsdbCheckInfoCompar(const void* key1, const void* key2); static int32_t tsdbCheckInfoCompar(const void* key1, const void* key2);
// static int32_t doGetExternalRow(STsdbReadHandle* pTsdbReadHandle, int16_t type, void* pMemRef); // static int32_t doGetExternalRow(STsdbReadHandle* pTsdbReadHandle, int16_t type, void* pMemRef);
// static void* doFreeColumnInfoData(SArray* pColumnInfoData); // static void* doFreeColumnInfoData(SArray* pColumnInfoData);
@ -208,7 +213,9 @@ int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT* pHandle) {
int64_t rows = 0; int64_t rows = 0;
STsdbMemTable* pMemTable = NULL; // pTsdbReadHandle->pMemTable; STsdbMemTable* pMemTable = NULL; // pTsdbReadHandle->pMemTable;
if (pMemTable == NULL) { return rows; } if (pMemTable == NULL) {
return rows;
}
// STableData* pMem = NULL; // STableData* pMem = NULL;
// STableData* pIMem = NULL; // STableData* pIMem = NULL;
@ -264,7 +271,8 @@ static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, S
} }
taosArrayPush(pTableCheckInfo, &info); taosArrayPush(pTableCheckInfo, &info);
tsdbDebug("%p check table uid:%"PRId64" from lastKey:%"PRId64" %s", pTsdbReadHandle, info.tableId, info.lastKey, pTsdbReadHandle->idStr); tsdbDebug("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReadHandle, info.tableId,
info.lastKey, pTsdbReadHandle->idStr);
} }
} }
@ -427,7 +435,8 @@ static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, STsdbQueryCond* pCond,
return NULL; return NULL;
} }
tsdbReaderT* tsdbQueryTables(STsdb* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, uint64_t qId, uint64_t taskId) { tsdbReaderT* tsdbQueryTables(STsdb* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, uint64_t qId,
uint64_t taskId) {
STsdbReadHandle* pTsdbReadHandle = tsdbQueryTablesImpl(tsdb, pCond, qId, taskId); STsdbReadHandle* pTsdbReadHandle = tsdbQueryTablesImpl(tsdb, pCond, qId, taskId);
if (pTsdbReadHandle == NULL) { if (pTsdbReadHandle == NULL) {
return NULL; return NULL;
@ -445,8 +454,9 @@ tsdbReaderT* tsdbQueryTables(STsdb* tsdb, STsdbQueryCond* pCond, STableGroupInfo
return NULL; return NULL;
} }
tsdbDebug("%p total numOfTable:%" PRIzu " in this query, group %"PRIzu" %s", pTsdbReadHandle, taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo), tsdbDebug("%p total numOfTable:%" PRIzu " in this query, group %" PRIzu " %s", pTsdbReadHandle,
taosArrayGetSize(groupList->pGroupList), pTsdbReadHandle->idStr); taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo), taosArrayGetSize(groupList->pGroupList),
pTsdbReadHandle->idStr);
return (tsdbReaderT)pTsdbReadHandle; return (tsdbReaderT)pTsdbReadHandle;
} }
@ -518,7 +528,8 @@ void tsdbResetQueryHandleForNewTable(tsdbReaderT queryHandle, STsdbQueryCond *pC
// pTsdbReadHandle->pTableCheckInfo = destroyTableCheckInfo(pTsdbReadHandle->pTableCheckInfo); // pTsdbReadHandle->pTableCheckInfo = destroyTableCheckInfo(pTsdbReadHandle->pTableCheckInfo);
pTsdbReadHandle->pTableCheckInfo = NULL;//createCheckInfoFromTableGroup(pTsdbReadHandle, groupList, pMeta, &pTable); pTsdbReadHandle->pTableCheckInfo = NULL; // createCheckInfoFromTableGroup(pTsdbReadHandle, groupList, pMeta,
// &pTable);
if (pTsdbReadHandle->pTableCheckInfo == NULL) { if (pTsdbReadHandle->pTableCheckInfo == NULL) {
// tsdbCleanupReadHandle(pTsdbReadHandle); // tsdbCleanupReadHandle(pTsdbReadHandle);
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
@ -528,7 +539,8 @@ void tsdbResetQueryHandleForNewTable(tsdbReaderT queryHandle, STsdbQueryCond *pC
// pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next); // pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next);
} }
tsdbReaderT tsdbQueryLastRow(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, uint64_t taskId) { tsdbReaderT tsdbQueryLastRow(STsdb* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, uint64_t qId,
uint64_t taskId) {
pCond->twindow = updateLastrowForEachGroup(groupList); pCond->twindow = updateLastrowForEachGroup(groupList);
// no qualified table // no qualified table
@ -619,7 +631,8 @@ static STableGroupInfo* trimTableGroup(STimeWindow* window, STableGroupInfo* pGr
return pNew; return pNew;
} }
tsdbReaderT tsdbQueryRowsInExternalWindow(STsdb *tsdb, STsdbQueryCond* pCond, STableGroupInfo *groupList, uint64_t qId, uint64_t taskId) { tsdbReaderT tsdbQueryRowsInExternalWindow(STsdb* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, uint64_t qId,
uint64_t taskId) {
STableGroupInfo* pNew = trimTableGroup(&pCond->twindow, groupList); STableGroupInfo* pNew = trimTableGroup(&pCond->twindow, groupList);
if (pNew->numOfTables == 0) { if (pNew->numOfTables == 0) {
@ -688,7 +701,8 @@ static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pChe
TSKEY key = TD_ROW_KEY(row); // first timestamp in buffer TSKEY key = TD_ROW_KEY(row); // first timestamp in buffer
tsdbDebug("%p uid:%" PRId64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64 tsdbDebug("%p uid:%" PRId64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
"-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%" PRId64 ", %s", "-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%" PRId64 ", %s",
pHandle, pCheckInfo->tableId, key, order, (*pMem)->keyMin, (*pMem)->keyMax, pCheckInfo->lastKey, (*pMem)->nrows, pHandle->idStr); pHandle, pCheckInfo->tableId, key, order, (*pMem)->keyMin, (*pMem)->keyMax, pCheckInfo->lastKey,
(*pMem)->nrows, pHandle->idStr);
if (ASCENDING_TRAVERSE(order)) { if (ASCENDING_TRAVERSE(order)) {
assert(pCheckInfo->lastKey <= key); assert(pCheckInfo->lastKey <= key);
@ -708,7 +722,8 @@ static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pChe
TSKEY key = TD_ROW_KEY(row); // first timestamp in buffer TSKEY key = TD_ROW_KEY(row); // first timestamp in buffer
tsdbDebug("%p uid:%" PRId64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64 tsdbDebug("%p uid:%" PRId64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
"-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%" PRId64 ", %s", "-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%" PRId64 ", %s",
pHandle, pCheckInfo->tableId, key, order, (*pIMem)->keyMin, (*pIMem)->keyMax, pCheckInfo->lastKey, (*pIMem)->nrows, pHandle->idStr); pHandle, pCheckInfo->tableId, key, order, (*pIMem)->keyMin, (*pIMem)->keyMax, pCheckInfo->lastKey,
(*pIMem)->nrows, pHandle->idStr);
if (ASCENDING_TRAVERSE(order)) { if (ASCENDING_TRAVERSE(order)) {
assert(pCheckInfo->lastKey <= key); assert(pCheckInfo->lastKey <= key);
@ -761,30 +776,20 @@ static TSKEY extractFirstTraverseKey(STableCheckInfo* pCheckInfo, int32_t order,
TSKEY r2 = TD_ROW_KEY(rimem); TSKEY r2 = TD_ROW_KEY(rimem);
if (r1 == r2) { if (r1 == r2) {
#if 0
if (update == TD_ROW_DISCARD_UPDATE) { if (update == TD_ROW_DISCARD_UPDATE) {
pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM; pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
tSkipListIterNext(pCheckInfo->iter); tSkipListIterNext(pCheckInfo->iter);
} } else if (update == TD_ROW_OVERWRITE_UPDATE) {
else if(update == TD_ROW_OVERWRITE_UPDATE) {
pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM; pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
tSkipListIterNext(pCheckInfo->iiter); tSkipListIterNext(pCheckInfo->iiter);
} else { } else {
pCheckInfo->chosen = CHECKINFO_CHOSEN_BOTH; pCheckInfo->chosen = CHECKINFO_CHOSEN_BOTH;
} }
#endif
if (TD_SUPPORT_UPDATE(update)) {
pCheckInfo->chosen = CHECKINFO_CHOSEN_BOTH;
} else {
pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
tSkipListIterNext(pCheckInfo->iter);
}
return r1; return r1;
} else if (r1 < r2 && ASCENDING_TRAVERSE(order)) { } else if (r1 < r2 && ASCENDING_TRAVERSE(order)) {
pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM; pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
return r1; return r1;
} } else {
else {
pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM; pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
return r2; return r2;
} }
@ -1030,9 +1035,11 @@ static int32_t loadBlockInfo(STsdbReadHandle * pTsdbReadHandle, int32_t index, i
TSKEY s = TSKEY_INITIAL_VAL, e = TSKEY_INITIAL_VAL; TSKEY s = TSKEY_INITIAL_VAL, e = TSKEY_INITIAL_VAL;
if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) { if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
assert(pCheckInfo->lastKey <= pTsdbReadHandle->window.ekey && pTsdbReadHandle->window.skey <= pTsdbReadHandle->window.ekey); assert(pCheckInfo->lastKey <= pTsdbReadHandle->window.ekey &&
pTsdbReadHandle->window.skey <= pTsdbReadHandle->window.ekey);
} else { } else {
assert(pCheckInfo->lastKey >= pTsdbReadHandle->window.ekey && pTsdbReadHandle->window.skey >= pTsdbReadHandle->window.ekey); assert(pCheckInfo->lastKey >= pTsdbReadHandle->window.ekey &&
pTsdbReadHandle->window.skey >= pTsdbReadHandle->window.ekey);
} }
s = TMIN(pCheckInfo->lastKey, pTsdbReadHandle->window.ekey); s = TMIN(pCheckInfo->lastKey, pTsdbReadHandle->window.ekey);
@ -1093,7 +1100,8 @@ static int32_t getFileCompInfo(STsdbReadHandle* pTsdbReadHandle, int32_t* numOfB
return code; return code;
} }
static int32_t doLoadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo, int32_t slotIndex) { static int32_t doLoadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo,
int32_t slotIndex) {
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
STSchema* pSchema = metaGetTbTSchema(pTsdbReadHandle->pTsdb->pMeta, pCheckInfo->tableId, 0); STSchema* pSchema = metaGetTbTSchema(pTsdbReadHandle->pTsdb->pMeta, pCheckInfo->tableId, 0);
@ -1120,7 +1128,8 @@ static int32_t doLoadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBl
int16_t* colIds = pTsdbReadHandle->defaultLoadColumn->pData; int16_t* colIds = pTsdbReadHandle->defaultLoadColumn->pData;
int32_t ret = tsdbLoadBlockDataCols(&(pTsdbReadHandle->rhelper), pBlock, pCheckInfo->pCompInfo, colIds, (int)(QH_GET_NUM_OF_COLS(pTsdbReadHandle)), true); int32_t ret = tsdbLoadBlockDataCols(&(pTsdbReadHandle->rhelper), pBlock, pCheckInfo->pCompInfo, colIds,
(int)(QH_GET_NUM_OF_COLS(pTsdbReadHandle)), true);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
int32_t c = terrno; int32_t c = terrno;
assert(c != TSDB_CODE_SUCCESS); assert(c != TSDB_CODE_SUCCESS);
@ -1149,8 +1158,10 @@ static int32_t doLoadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBl
int64_t elapsedTime = (taosGetTimestampUs() - st); int64_t elapsedTime = (taosGetTimestampUs() - st);
pTsdbReadHandle->cost.blockLoadTime += elapsedTime; pTsdbReadHandle->cost.blockLoadTime += elapsedTime;
tsdbDebug("%p load file block into buffer, index:%d, brange:%"PRId64"-%"PRId64", rows:%d, elapsed time:%"PRId64 " us, %s", tsdbDebug("%p load file block into buffer, index:%d, brange:%" PRId64 "-%" PRId64 ", rows:%d, elapsed time:%" PRId64
pTsdbReadHandle, slotIndex, pBlock->keyFirst, pBlock->keyLast, pBlock->numOfRows, elapsedTime, pTsdbReadHandle->idStr); " us, %s",
pTsdbReadHandle, slotIndex, pBlock->keyFirst, pBlock->keyLast, pBlock->numOfRows, elapsedTime,
pTsdbReadHandle->idStr);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_error: _error:
@ -1162,10 +1173,12 @@ _error:
} }
static int32_t getEndPosInDataBlock(STsdbReadHandle* pTsdbReadHandle, SDataBlockInfo* pBlockInfo); static int32_t getEndPosInDataBlock(STsdbReadHandle* pTsdbReadHandle, SDataBlockInfo* pBlockInfo);
static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t numOfRows, int32_t start, int32_t end); static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t numOfRows,
int32_t start, int32_t end);
static void moveDataToFront(STsdbReadHandle* pTsdbReadHandle, int32_t numOfRows, int32_t numOfCols); static void moveDataToFront(STsdbReadHandle* pTsdbReadHandle, int32_t numOfRows, int32_t numOfCols);
static void doCheckGeneratedBlockRange(STsdbReadHandle* pTsdbReadHandle); static void doCheckGeneratedBlockRange(STsdbReadHandle* pTsdbReadHandle);
static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, SDataBlockInfo* pBlockInfo, int32_t endPos); static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo,
SDataBlockInfo* pBlockInfo, int32_t endPos);
static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo) { static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo) {
SQueryFilePos* cur = &pTsdbReadHandle->cur; SQueryFilePos* cur = &pTsdbReadHandle->cur;
@ -1187,15 +1200,15 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock*
if ((ASCENDING_TRAVERSE(pTsdbReadHandle->order) && (key != TSKEY_INITIAL_VAL && key <= binfo.window.ekey)) || if ((ASCENDING_TRAVERSE(pTsdbReadHandle->order) && (key != TSKEY_INITIAL_VAL && key <= binfo.window.ekey)) ||
(!ASCENDING_TRAVERSE(pTsdbReadHandle->order) && (key != TSKEY_INITIAL_VAL && key >= binfo.window.skey))) { (!ASCENDING_TRAVERSE(pTsdbReadHandle->order) && (key != TSKEY_INITIAL_VAL && key >= binfo.window.skey))) {
if ((ASCENDING_TRAVERSE(pTsdbReadHandle->order) && (key != TSKEY_INITIAL_VAL && key < binfo.window.skey)) || if ((ASCENDING_TRAVERSE(pTsdbReadHandle->order) && (key != TSKEY_INITIAL_VAL && key < binfo.window.skey)) ||
(!ASCENDING_TRAVERSE(pTsdbReadHandle->order) && (key != TSKEY_INITIAL_VAL && key > binfo.window.ekey))) { (!ASCENDING_TRAVERSE(pTsdbReadHandle->order) && (key != TSKEY_INITIAL_VAL && key > binfo.window.ekey))) {
// do not load file block into buffer // do not load file block into buffer
int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1; int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
TSKEY maxKey = ASCENDING_TRAVERSE(pTsdbReadHandle->order)? (binfo.window.skey - step):(binfo.window.ekey - step); TSKEY maxKey =
cur->rows = tsdbReadRowsFromCache(pCheckInfo, maxKey, pTsdbReadHandle->outputCapacity, &cur->win, pTsdbReadHandle); ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? (binfo.window.skey - step) : (binfo.window.ekey - step);
cur->rows =
tsdbReadRowsFromCache(pCheckInfo, maxKey, pTsdbReadHandle->outputCapacity, &cur->win, pTsdbReadHandle);
pTsdbReadHandle->realNumOfRows = cur->rows; pTsdbReadHandle->realNumOfRows = cur->rows;
// update the last key value // update the last key value
@ -1209,7 +1222,6 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock*
return code; return code;
} }
// return error, add test cases // return error, add test cases
if ((code = doLoadFileDataBlock(pTsdbReadHandle, pBlock, pCheckInfo, cur->slot)) != TSDB_CODE_SUCCESS) { if ((code = doLoadFileDataBlock(pTsdbReadHandle, pBlock, pCheckInfo, cur->slot)) != TSDB_CODE_SUCCESS) {
return code; return code;
@ -1252,16 +1264,18 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock*
tsdbDebug("%p whole file block qualified, brange:%" PRId64 "-%" PRId64 ", rows:%d, lastKey:%" PRId64 ", %s", tsdbDebug("%p whole file block qualified, brange:%" PRId64 "-%" PRId64 ", rows:%d, lastKey:%" PRId64 ", %s",
pTsdbReadHandle, cur->win.skey, cur->win.ekey, cur->rows, cur->lastKey, pTsdbReadHandle->idStr); pTsdbReadHandle, cur->win.skey, cur->win.ekey, cur->rows, cur->lastKey, pTsdbReadHandle->idStr);
} else { } else {
tsdbDebug("%p create data block from remain file block, brange:%"PRId64"-%"PRId64", rows:%d, total:%d, lastKey:%"PRId64", %s", tsdbDebug("%p create data block from remain file block, brange:%" PRId64 "-%" PRId64
pTsdbReadHandle, cur->win.skey, cur->win.ekey, cur->rows, binfo.rows, cur->lastKey, pTsdbReadHandle->idStr); ", rows:%d, total:%d, lastKey:%" PRId64 ", %s",
pTsdbReadHandle, cur->win.skey, cur->win.ekey, cur->rows, binfo.rows, cur->lastKey,
pTsdbReadHandle->idStr);
} }
} }
return code; return code;
} }
static int32_t loadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo, bool* exists) { static int32_t loadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo,
bool* exists) {
SQueryFilePos* cur = &pTsdbReadHandle->cur; SQueryFilePos* cur = &pTsdbReadHandle->cur;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
bool asc = ASCENDING_TRAVERSE(pTsdbReadHandle->order); bool asc = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
@ -1299,7 +1313,8 @@ static int32_t loadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBloc
SDataCols* pTsCol = pTsdbReadHandle->rhelper.pDCols[0]; SDataCols* pTsCol = pTsdbReadHandle->rhelper.pDCols[0];
if (pCheckInfo->lastKey < pBlock->keyLast) { if (pCheckInfo->lastKey < pBlock->keyLast) {
cur->pos = binarySearchForKey(pTsCol->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pTsdbReadHandle->order); cur->pos =
binarySearchForKey(pTsCol->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pTsdbReadHandle->order);
} else { } else {
cur->pos = pBlock->numOfRows - 1; cur->pos = pBlock->numOfRows - 1;
} }
@ -1378,7 +1393,8 @@ static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) {
return midPos; return midPos;
} }
static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t numOfRows, int32_t start, int32_t end) { static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t numOfRows,
int32_t start, int32_t end) {
int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1; int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0]; SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
@ -1604,13 +1620,14 @@ static void moveDataToFront(STsdbReadHandle* pTsdbReadHandle, int32_t numOfRows,
int32_t emptySize = pTsdbReadHandle->outputCapacity - numOfRows; int32_t emptySize = pTsdbReadHandle->outputCapacity - numOfRows;
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i); SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
memmove((char*)pColInfo->pData, (char*)pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes); memmove((char*)pColInfo->pData, (char*)pColInfo->pData + emptySize * pColInfo->info.bytes,
numOfRows * pColInfo->info.bytes);
} }
} }
} }
static void getQualifiedRowsPos(STsdbReadHandle* pTsdbReadHandle, int32_t startPos, int32_t endPos, int32_t numOfExisted, static void getQualifiedRowsPos(STsdbReadHandle* pTsdbReadHandle, int32_t startPos, int32_t endPos,
int32_t* start, int32_t* end) { int32_t numOfExisted, int32_t* start, int32_t* end) {
*start = -1; *start = -1;
if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) { if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
@ -1635,7 +1652,8 @@ static void getQualifiedRowsPos(STsdbReadHandle* pTsdbReadHandle, int32_t startP
} }
} }
static void updateInfoAfterMerge(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, int32_t numOfRows, int32_t endPos) { static void updateInfoAfterMerge(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, int32_t numOfRows,
int32_t endPos) {
SQueryFilePos* cur = &pTsdbReadHandle->cur; SQueryFilePos* cur = &pTsdbReadHandle->cur;
pCheckInfo->lastKey = cur->lastKey; pCheckInfo->lastKey = cur->lastKey;
@ -1655,7 +1673,8 @@ static void doCheckGeneratedBlockRange(STsdbReadHandle* pTsdbReadHandle) {
} }
SColumnInfoData* pColInfoData = taosArrayGet(pTsdbReadHandle->pColumns, 0); SColumnInfoData* pColInfoData = taosArrayGet(pTsdbReadHandle->pColumns, 0);
assert(cur->win.skey == ((TSKEY*)pColInfoData->pData)[0] && cur->win.ekey == ((TSKEY*)pColInfoData->pData)[cur->rows-1]); assert(cur->win.skey == ((TSKEY*)pColInfoData->pData)[0] &&
cur->win.ekey == ((TSKEY*)pColInfoData->pData)[cur->rows - 1]);
} else { } else {
cur->win = pTsdbReadHandle->window; cur->win = pTsdbReadHandle->window;
@ -1664,7 +1683,8 @@ static void doCheckGeneratedBlockRange(STsdbReadHandle* pTsdbReadHandle) {
} }
} }
static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, SDataBlockInfo* pBlockInfo, int32_t endPos) { static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo,
SDataBlockInfo* pBlockInfo, int32_t endPos) {
SQueryFilePos* cur = &pTsdbReadHandle->cur; SQueryFilePos* cur = &pTsdbReadHandle->cur;
SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0]; SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
@ -1700,7 +1720,8 @@ static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STa
doCheckGeneratedBlockRange(pTsdbReadHandle); doCheckGeneratedBlockRange(pTsdbReadHandle);
tsdbDebug("%p uid:%" PRIu64 ", data block created, mixblock:%d, brange:%" PRIu64 "-%" PRIu64 " rows:%d, %s", tsdbDebug("%p uid:%" PRIu64 ", data block created, mixblock:%d, brange:%" PRIu64 "-%" PRIu64 " rows:%d, %s",
pTsdbReadHandle, pCheckInfo->tableId, cur->mixBlock, cur->win.skey, cur->win.ekey, cur->rows, pTsdbReadHandle->idStr); pTsdbReadHandle, pCheckInfo->tableId, cur->mixBlock, cur->win.skey, cur->win.ekey, cur->rows,
pTsdbReadHandle->idStr);
} }
int32_t getEndPosInDataBlock(STsdbReadHandle* pTsdbReadHandle, SDataBlockInfo* pBlockInfo) { int32_t getEndPosInDataBlock(STsdbReadHandle* pTsdbReadHandle, SDataBlockInfo* pBlockInfo) {
@ -1740,7 +1761,8 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
cur->pos >= 0 && cur->pos < pBlock->numOfRows); cur->pos >= 0 && cur->pos < pBlock->numOfRows);
TSKEY* tsArray = pCols->cols[0].pData; TSKEY* tsArray = pCols->cols[0].pData;
assert(pCols->numOfRows == pBlock->numOfRows && tsArray[0] == pBlock->keyFirst && tsArray[pBlock->numOfRows-1] == pBlock->keyLast); assert(pCols->numOfRows == pBlock->numOfRows && tsArray[0] == pBlock->keyFirst &&
tsArray[pBlock->numOfRows - 1] == pBlock->keyLast);
// for search the endPos, so the order needs to reverse // for search the endPos, so the order needs to reverse
int32_t order = (pTsdbReadHandle->order == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC; int32_t order = (pTsdbReadHandle->order == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
@ -1751,10 +1773,11 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
STable* pTable = NULL; STable* pTable = NULL;
int32_t endPos = getEndPosInDataBlock(pTsdbReadHandle, &blockInfo); int32_t endPos = getEndPosInDataBlock(pTsdbReadHandle, &blockInfo);
tsdbDebug("%p uid:%" PRIu64" start merge data block, file block range:%"PRIu64"-%"PRIu64" rows:%d, start:%d," tsdbDebug("%p uid:%" PRIu64 " start merge data block, file block range:%" PRIu64 "-%" PRIu64
" rows:%d, start:%d,"
"end:%d, %s", "end:%d, %s",
pTsdbReadHandle, pCheckInfo->tableId, blockInfo.window.skey, blockInfo.window.ekey, pTsdbReadHandle, pCheckInfo->tableId, blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows,
blockInfo.rows, cur->pos, endPos, pTsdbReadHandle->idStr); cur->pos, endPos, pTsdbReadHandle->idStr);
// compared with the data from in-memory buffer, to generate the correct timestamp array list // compared with the data from in-memory buffer, to generate the correct timestamp array list
int32_t numOfRows = 0; int32_t numOfRows = 0;
@ -1786,8 +1809,10 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
break; break;
} }
if (((pos > endPos || tsArray[pos] > pTsdbReadHandle->window.ekey) && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) || if (((pos > endPos || tsArray[pos] > pTsdbReadHandle->window.ekey) &&
((pos < endPos || tsArray[pos] < pTsdbReadHandle->window.ekey) && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) { ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
((pos < endPos || tsArray[pos] < pTsdbReadHandle->window.ekey) &&
!ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
break; break;
} }
@ -1802,7 +1827,8 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
rv2 = TD_ROW_SVER(row2); rv2 = TD_ROW_SVER(row2);
} }
mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, row1, row2, numOfCols, pCheckInfo->tableId, pSchema1, pSchema2, true); mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, row1, row2, numOfCols,
pCheckInfo->tableId, pSchema1, pSchema2, true);
numOfRows += 1; numOfRows += 1;
if (cur->win.skey == TSKEY_INITIAL_VAL) { if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = key; cur->win.skey = key;
@ -1828,7 +1854,8 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
} }
bool forceSetNull = pCfg->update != TD_ROW_PARTIAL_UPDATE; bool forceSetNull = pCfg->update != TD_ROW_PARTIAL_UPDATE;
mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, row1, row2, numOfCols, pCheckInfo->tableId, pSchema1, pSchema2, forceSetNull); mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, row1, row2, numOfCols,
pCheckInfo->tableId, pSchema1, pSchema2, forceSetNull);
numOfRows += 1; numOfRows += 1;
if (cur->win.skey == TSKEY_INITIAL_VAL) { if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = key; cur->win.skey = key;
@ -1912,7 +1939,8 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
doCheckGeneratedBlockRange(pTsdbReadHandle); doCheckGeneratedBlockRange(pTsdbReadHandle);
tsdbDebug("%p uid:%" PRIu64 ", data block created, mixblock:%d, brange:%" PRIu64 "-%" PRIu64 " rows:%d, %s", tsdbDebug("%p uid:%" PRIu64 ", data block created, mixblock:%d, brange:%" PRIu64 "-%" PRIu64 " rows:%d, %s",
pTsdbReadHandle, pCheckInfo->tableId, cur->mixBlock, cur->win.skey, cur->win.ekey, cur->rows, pTsdbReadHandle->idStr); pTsdbReadHandle, pCheckInfo->tableId, cur->mixBlock, cur->win.skey, cur->win.ekey, cur->rows,
pTsdbReadHandle->idStr);
} }
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) { int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
@ -2219,7 +2247,8 @@ static int32_t getFirstFileDataBlock(STsdbReadHandle* pTsdbReadHandle, bool* exi
} }
// todo return error code to query engine // todo return error code to query engine
if ((code = createDataBlocksInfo(pTsdbReadHandle, numOfBlocks, &pTsdbReadHandle->numOfBlocks)) != TSDB_CODE_SUCCESS) { if ((code = createDataBlocksInfo(pTsdbReadHandle, numOfBlocks, &pTsdbReadHandle->numOfBlocks)) !=
TSDB_CODE_SUCCESS) {
break; break;
} }
@ -2304,7 +2333,8 @@ int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDistInfo*
tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, pTsdbReadHandle->pFileGroup->fid, &win.skey, &win.ekey); tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, pTsdbReadHandle->pFileGroup->fid, &win.skey, &win.ekey);
// current file are not overlapped with query time window, ignore remain files // current file are not overlapped with query time window, ignore remain files
if ((ascTraverse && win.skey > pTsdbReadHandle->window.ekey) || (!ascTraverse && win.ekey < pTsdbReadHandle->window.ekey)) { if ((ascTraverse && win.skey > pTsdbReadHandle->window.ekey) ||
(!ascTraverse && win.ekey < pTsdbReadHandle->window.ekey)) {
tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb)); tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pTsdbReadHandle, tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pTsdbReadHandle,
pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey, pTsdbReadHandle->idStr); pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey, pTsdbReadHandle->idStr);
@ -2479,9 +2509,10 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
} }
TSKEY key = TD_ROW_KEY(row); TSKEY key = TD_ROW_KEY(row);
if ((key > maxKey && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) || (key < maxKey && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) { if ((key > maxKey && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
tsdbDebug("%p key:%"PRIu64" beyond qrange:%"PRId64" - %"PRId64", no more data in buffer", pTsdbReadHandle, key, pTsdbReadHandle->window.skey, (key < maxKey && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
pTsdbReadHandle->window.ekey); tsdbDebug("%p key:%" PRIu64 " beyond qrange:%" PRId64 " - %" PRId64 ", no more data in buffer", pTsdbReadHandle,
key, pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey);
break; break;
} }
@ -2495,7 +2526,8 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
pSchema = metaGetTbTSchema(pTsdbReadHandle->pTsdb->pMeta, pCheckInfo->tableId, 0); pSchema = metaGetTbTSchema(pTsdbReadHandle->pTsdb->pMeta, pCheckInfo->tableId, 0);
rv = TD_ROW_SVER(row); rv = TD_ROW_SVER(row);
} }
mergeTwoRowFromMem(pTsdbReadHandle, maxRowsToRead, numOfRows, row, NULL, numOfCols, pCheckInfo->tableId, pSchema, NULL, true); mergeTwoRowFromMem(pTsdbReadHandle, maxRowsToRead, numOfRows, row, NULL, numOfCols, pCheckInfo->tableId, pSchema,
NULL, true);
if (++numOfRows >= maxRowsToRead) { if (++numOfRows >= maxRowsToRead) {
moveToNextRowInMem(pCheckInfo); moveToNextRowInMem(pCheckInfo);
@ -2512,13 +2544,14 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i); SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
memmove((char*)pColInfo->pData, (char*)pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes); memmove((char*)pColInfo->pData, (char*)pColInfo->pData + emptySize * pColInfo->info.bytes,
numOfRows * pColInfo->info.bytes);
} }
} }
int64_t elapsedTime = taosGetTimestampUs() - st; int64_t elapsedTime = taosGetTimestampUs() - st;
tsdbDebug("%p build data block from cache completed, elapsed time:%"PRId64" us, numOfRows:%d, numOfCols:%d, %s", pTsdbReadHandle, tsdbDebug("%p build data block from cache completed, elapsed time:%" PRId64 " us, numOfRows:%d, numOfCols:%d, %s",
elapsedTime, numOfRows, numOfCols, pTsdbReadHandle->idStr); pTsdbReadHandle, elapsedTime, numOfRows, numOfCols, pTsdbReadHandle->idStr);
return numOfRows; return numOfRows;
} }
@ -2589,7 +2622,8 @@ static bool loadBlockOfActiveTable(STsdbReadHandle* pTsdbReadHandle) {
} }
// current result is empty // current result is empty
if (pTsdbReadHandle->currentLoadExternalRows && pTsdbReadHandle->window.skey == pTsdbReadHandle->window.ekey && pTsdbReadHandle->cur.rows == 0) { if (pTsdbReadHandle->currentLoadExternalRows && pTsdbReadHandle->window.skey == pTsdbReadHandle->window.ekey &&
pTsdbReadHandle->cur.rows == 0) {
// STsdbMemTable* pMemRef = pTsdbReadHandle->pMemTable; // STsdbMemTable* pMemRef = pTsdbReadHandle->pMemTable;
// doGetExternalRow(pTsdbReadHandle, TSDB_PREV_ROW, pMemRef); // doGetExternalRow(pTsdbReadHandle, TSDB_PREV_ROW, pMemRef);
@ -2626,7 +2660,8 @@ static bool loadCachedLastRow(STsdbReadHandle* pTsdbReadHandle) {
// if (ret != TSDB_CODE_SUCCESS) { // if (ret != TSDB_CODE_SUCCESS) {
// return false; // return false;
// } // }
mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, 0, pRow, NULL, numOfCols, pCheckInfo->tableId, NULL, NULL, true); mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, 0, pRow, NULL, numOfCols, pCheckInfo->tableId,
NULL, NULL, true);
taosMemoryFreeClear(pRow); taosMemoryFreeClear(pRow);
// update the last key value // update the last key value
@ -2644,8 +2679,6 @@ static bool loadCachedLastRow(STsdbReadHandle* pTsdbReadHandle) {
return false; return false;
} }
// static bool loadCachedLast(STsdbReadHandle* pTsdbReadHandle) { // static bool loadCachedLast(STsdbReadHandle* pTsdbReadHandle) {
// // the last row is cached in buffer, return it directly. // // the last row is cached in buffer, return it directly.
// // here note that the pTsdbReadHandle->window must be the TS_INITIALIZER // // here note that the pTsdbReadHandle->window must be the TS_INITIALIZER
@ -2666,8 +2699,8 @@ static bool loadCachedLastRow(STsdbReadHandle* pTsdbReadHandle) {
// int32_t numOfCols = pTable->maxColNum; // int32_t numOfCols = pTable->maxColNum;
// //
// if (pTable->lastCols == NULL || pTable->maxColNum <= 0) { // if (pTable->lastCols == NULL || pTable->maxColNum <= 0) {
// tsdbWarn("no last cached for table %s, uid:%" PRIu64 ",tid:%d", pTable->name->data, pTable->uid, pTable->tableId); // tsdbWarn("no last cached for table %s, uid:%" PRIu64 ",tid:%d", pTable->name->data, pTable->uid,
// continue; // pTable->tableId); continue;
// } // }
// //
// int32_t i = 0, j = 0; // int32_t i = 0, j = 0;
@ -2835,7 +2868,8 @@ bool tsdbNextDataBlock(tsdbReaderT pHandle) {
} }
if (emptyQueryTimewindow(pTsdbReadHandle)) { if (emptyQueryTimewindow(pTsdbReadHandle)) {
tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pTsdbReadHandle, pTsdbReadHandle->idStr); tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pTsdbReadHandle,
pTsdbReadHandle->idStr);
return false; return false;
} }
@ -3104,7 +3138,6 @@ int32_t checkForCachedLast(STsdbReadHandle* pTsdbReadHandle) {
return code; return code;
} }
STimeWindow updateLastrowForEachGroup(STableGroupInfo* groupList) { STimeWindow updateLastrowForEachGroup(STableGroupInfo* groupList) {
STimeWindow window = {INT64_MAX, INT64_MIN}; STimeWindow window = {INT64_MAX, INT64_MIN};
@ -3185,10 +3218,11 @@ void tsdbRetrieveDataBlockInfo(tsdbReaderT* pTsdbReadHandle, SDataBlockInfo* pDa
uid = pCheckInfo->tableId; uid = pCheckInfo->tableId;
} }
tsdbDebug("data block generated, uid:%"PRIu64" numOfRows:%d, tsrange:%"PRId64" - %"PRId64" %s", uid, cur->rows, cur->win.skey, tsdbDebug("data block generated, uid:%" PRIu64 " numOfRows:%d, tsrange:%" PRId64 " - %" PRId64 " %s", uid, cur->rows,
cur->win.ekey, pHandle->idStr); cur->win.skey, cur->win.ekey, pHandle->idStr);
// pDataBlockInfo->uid = uid; // block Id may be over write by assigning uid fro this data block. Do NOT assign the table uid // pDataBlockInfo->uid = uid; // block Id may be over write by assigning uid fro this data block. Do NOT assign
// the table uid
pDataBlockInfo->rows = cur->rows; pDataBlockInfo->rows = cur->rows;
pDataBlockInfo->window = cur->win; pDataBlockInfo->window = cur->win;
pDataBlockInfo->numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pHandle)); pDataBlockInfo->numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pHandle));
@ -3296,7 +3330,8 @@ SArray* tsdbRetrieveDataBlock(tsdbReaderT* pTsdbReadHandle, SArray* pIdList) {
for (int32_t i = 0; i < reqNumOfCols; ++i) { for (int32_t i = 0; i < reqNumOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pHandle->pColumns, i); SColumnInfoData* pColInfo = taosArrayGet(pHandle->pColumns, i);
memmove((char*)pColInfo->pData, (char*)pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes); memmove((char*)pColInfo->pData, (char*)pColInfo->pData + emptySize * pColInfo->info.bytes,
numOfRows * pColInfo->info.bytes);
} }
} }
@ -3449,7 +3484,8 @@ void createTableGroupImpl(SArray* pGroups, SArray* pTableList, size_t numOfTable
taosArrayPush(pGroups, &g); taosArrayPush(pGroups, &g);
} }
SArray* createTableGroup(SArray* pTableList, SSchemaWrapper* pTagSchema, SColIndex* pCols, int32_t numOfOrderCols, TSKEY skey) { SArray* createTableGroup(SArray* pTableList, SSchemaWrapper* pTagSchema, SColIndex* pCols, int32_t numOfOrderCols,
TSKEY skey) {
assert(pTableList != NULL); assert(pTableList != NULL);
SArray* pTableGroup = taosArrayInit(1, POINTER_BYTES); SArray* pTableGroup = taosArrayInit(1, POINTER_BYTES);
@ -3566,10 +3602,11 @@ SArray* createTableGroup(SArray* pTableList, SSchemaWrapper* pTagSchema, SColInd
// return true; // return true;
//} //}
//static void getTableListfromSkipList(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, SExprTraverseSupp *param); // static void getTableListfromSkipList(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, SExprTraverseSupp
// *param);
// static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr) { // static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr) {
// // query according to the expression tree // // // query according to the expression tree
// SExprTraverseSupp supp = { // SExprTraverseSupp supp = {
// .nodeFilterFn = (__result_filter_fn_t)tableFilterFp, // .nodeFilterFn = (__result_filter_fn_t)tableFilterFp,
// .setupInfoFn = filterPrepare, // .setupInfoFn = filterPrepare,
@ -3592,7 +3629,8 @@ int32_t tsdbQuerySTableByTagCond(void* pMeta, uint64_t uid, TSKEY skey, const ch
} }
if (pTbCfg->type != META_SUPER_TABLE) { if (pTbCfg->type != META_SUPER_TABLE) {
tsdbError("%p query normal tag not allowed, uid:%" PRIu64 ", TID:0x%"PRIx64" QID:0x%"PRIx64, pMeta, uid, taskId, reqId); tsdbError("%p query normal tag not allowed, uid:%" PRIu64 ", TID:0x%" PRIx64 " QID:0x%" PRIx64, pMeta, uid, taskId,
reqId);
terrno = TSDB_CODE_OPS_NOT_SUPPORT; // basically, this error is caused by invalid sql issued by client terrno = TSDB_CODE_OPS_NOT_SUPPORT; // basically, this error is caused by invalid sql issued by client
goto _error; goto _error;
} }
@ -3611,63 +3649,41 @@ int32_t tsdbQuerySTableByTagCond(void* pMeta, uint64_t uid, TSKEY skey, const ch
pGroupInfo->numOfTables = (uint32_t)taosArrayGetSize(res); pGroupInfo->numOfTables = (uint32_t)taosArrayGetSize(res);
pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, skey); pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, skey);
tsdbDebug("%p no table name/tag condition, all tables qualified, numOfTables:%u, group:%zu, TID:0x%"PRIx64" QID:0x%"PRIx64, pMeta, tsdbDebug("%p no table name/tag condition, all tables qualified, numOfTables:%u, group:%zu, TID:0x%" PRIx64
pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList), taskId, reqId); " QID:0x%" PRIx64,
pMeta, pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList), taskId, reqId);
taosArrayDestroy(res); taosArrayDestroy(res);
return ret; return ret;
} }
int32_t ret = TSDB_CODE_SUCCESS; int32_t ret = TSDB_CODE_SUCCESS;
// tExprNode* expr = NULL;
// void* filterInfo = taosMemoryCalloc(1, sizeof(SFilterInfo));
// TRY(TSDB_MAX_TAG_CONDITIONS) { ret = filterInitFromNode((SNode*)pTagCond, *filterInfo, 0);
// expr = exprTreeFromTableName(tbnameCond); if (ret != TSDB_CODE_SUCCESS) {
// if (expr == NULL) { terrno = ret;
// expr = exprTreeFromBinary(pTagCond, len); return ret;
// } else { }
// CLEANUP_PUSH_VOID_PTR_PTR(true, tExprTreeDestroy, expr, NULL); ret = tsdbQueryTableList(pMeta, res, filterInfo);
// tExprNode* tagExpr = exprTreeFromBinary(pTagCond, len); pGroupInfo->numOfTables = (uint32_t)taosArrayGetSize(res);
// if (tagExpr != NULL) { pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, skey);
// CLEANUP_PUSH_VOID_PTR_PTR(true, tExprTreeDestroy, tagExpr, NULL);
// tExprNode* tbnameExpr = expr; tsdbDebug("%p stable tid:%d, uid:%" PRIu64 " query, numOfTables:%u, belong to %" PRIzu " groups", tsdb,
// expr = taosMemoryCalloc(1, sizeof(tExprNode)); pTable->tableId, pTable->uid, pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList));
// if (expr == NULL) {
// THROW( TSDB_CODE_TDB_OUT_OF_MEMORY ); taosArrayDestroy(res);
// } return ret;
// expr->nodeType = TSQL_NODE_EXPR;
// expr->_node.optr = (uint8_t)tagNameRelType;
// expr->_node.pLeft = tagExpr;
// expr->_node.pRight = tbnameExpr;
// }
// }
// CLEANUP_EXECUTE();
//
// } CATCH( code ) {
// CLEANUP_EXECUTE();
// terrno = code;
// tsdbUnlockRepoMeta(tsdb); // unlock tsdb in any cases
//
// goto _error;
// // TODO: more error handling
// } END_TRY
//
// doQueryTableList(pTable, res, expr);
// pGroupInfo->numOfTables = (uint32_t)taosArrayGetSize(res);
// pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, skey);
//
// tsdbDebug("%p stable tid:%d, uid:%"PRIu64" query, numOfTables:%u, belong to %" PRIzu " groups", tsdb, pTable->tableId,
// pTable->uid, pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList));
//
// taosArrayDestroy(res);
//
// if (tsdbUnlockRepoMeta(tsdb) < 0) goto _error;
// return ret;
_error: _error:
return terrno; return terrno;
} }
int32_t tsdbQueryTableList(void* pMeta, SArray* pRes, void* filterInfo) {
// impl later
return TSDB_CODE_SUCCESS;
}
int32_t tsdbGetOneTableGroup(void* pMeta, uint64_t uid, TSKEY startKey, STableGroupInfo* pGroupInfo) { int32_t tsdbGetOneTableGroup(void* pMeta, uint64_t uid, TSKEY startKey, STableGroupInfo* pGroupInfo) {
STbCfg* pTbCfg = metaGetTbInfoByUid(pMeta, uid); STbCfg* pTbCfg = metaGetTbInfoByUid(pMeta, uid);
if (pTbCfg == NULL) { if (pTbCfg == NULL) {
@ -3765,7 +3781,6 @@ static void* destroyTableCheckInfo(SArray* pTableCheckInfo) {
return NULL; return NULL;
} }
void tsdbCleanupReadHandle(tsdbReaderT queryHandle) { void tsdbCleanupReadHandle(tsdbReaderT queryHandle) {
STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)queryHandle; STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)queryHandle;
if (pTsdbReadHandle == NULL) { if (pTsdbReadHandle == NULL) {
@ -3798,8 +3813,10 @@ void tsdbCleanupReadHandle(tsdbReaderT queryHandle) {
SIOCostSummary* pCost = &pTsdbReadHandle->cost; SIOCostSummary* pCost = &pTsdbReadHandle->cost;
tsdbDebug("%p :io-cost summary: head-file read cnt:%"PRIu64", head-file time:%"PRIu64" us, statis-info:%"PRId64" us, datablock:%" PRId64" us, check data:%"PRId64" us, %s", tsdbDebug("%p :io-cost summary: head-file read cnt:%" PRIu64 ", head-file time:%" PRIu64 " us, statis-info:%" PRId64
pTsdbReadHandle, pCost->headFileLoad, pCost->headFileLoadTime, pCost->statisInfoLoadTime, pCost->blockLoadTime, pCost->checkForNextTime, pTsdbReadHandle->idStr); " us, datablock:%" PRId64 " us, check data:%" PRId64 " us, %s",
pTsdbReadHandle, pCost->headFileLoad, pCost->headFileLoadTime, pCost->statisInfoLoadTime,
pCost->blockLoadTime, pCost->checkForNextTime, pTsdbReadHandle->idStr);
taosMemoryFreeClear(pTsdbReadHandle); taosMemoryFreeClear(pTsdbReadHandle);
} }
@ -4053,37 +4070,37 @@ static void queryIndexlessColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, S
} }
// Apply the filter expression to each node in the skiplist to acquire the qualified nodes in skip list // Apply the filter expression to each node in the skiplist to acquire the qualified nodes in skip list
void getTableListfromSkipList(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, SExprTraverseSupp *param) { //void getTableListfromSkipList(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, SExprTraverseSupp *param) {
if (pExpr == NULL) { // if (pExpr == NULL) {
return; // return;
} // }
//
tExprNode *pLeft = pExpr->_node.pLeft; // tExprNode *pLeft = pExpr->_node.pLeft;
tExprNode *pRight = pExpr->_node.pRight; // tExprNode *pRight = pExpr->_node.pRight;
//
// column project // // column project
if (pLeft->nodeType != TSQL_NODE_EXPR && pRight->nodeType != TSQL_NODE_EXPR) { // if (pLeft->nodeType != TSQL_NODE_EXPR && pRight->nodeType != TSQL_NODE_EXPR) {
assert(pLeft->nodeType == TSQL_NODE_COL && (pRight->nodeType == TSQL_NODE_VALUE || pRight->nodeType == TSQL_NODE_DUMMY)); // assert(pLeft->nodeType == TSQL_NODE_COL && (pRight->nodeType == TSQL_NODE_VALUE || pRight->nodeType == TSQL_NODE_DUMMY));
//
param->setupInfoFn(pExpr, param->pExtInfo); // param->setupInfoFn(pExpr, param->pExtInfo);
//
tQueryInfo *pQueryInfo = pExpr->_node.info; // tQueryInfo *pQueryInfo = pExpr->_node.info;
if (pQueryInfo->indexed && (pQueryInfo->optr != TSDB_RELATION_LIKE // if (pQueryInfo->indexed && (pQueryInfo->optr != TSDB_RELATION_LIKE
&& pQueryInfo->optr != TSDB_RELATION_MATCH && pQueryInfo->optr != TSDB_RELATION_NMATCH // && pQueryInfo->optr != TSDB_RELATION_MATCH && pQueryInfo->optr != TSDB_RELATION_NMATCH
&& pQueryInfo->optr != TSDB_RELATION_IN)) { // && pQueryInfo->optr != TSDB_RELATION_IN)) {
queryIndexedColumn(pSkipList, pQueryInfo, result); // queryIndexedColumn(pSkipList, pQueryInfo, result);
} else { // } else {
queryIndexlessColumn(pSkipList, pQueryInfo, result, param->nodeFilterFn); // queryIndexlessColumn(pSkipList, pQueryInfo, result, param->nodeFilterFn);
} // }
//
return; // return;
} // }
//
// The value of hasPK is always 0. // // The value of hasPK is always 0.
uint8_t weight = pLeft->_node.hasPK + pRight->_node.hasPK; // uint8_t weight = pLeft->_node.hasPK + pRight->_node.hasPK;
assert(weight == 0 && pSkipList != NULL && taosArrayGetSize(result) == 0); // assert(weight == 0 && pSkipList != NULL && taosArrayGetSize(result) == 0);
//
//apply the hierarchical filter expression to every node in skiplist to find the qualified nodes // //apply the hierarchical filter expression to every node in skiplist to find the qualified nodes
applyFilterToSkipListNode(pSkipList, pExpr, result, param); // applyFilterToSkipListNode(pSkipList, pExpr, result, param);
} //}
#endif #endif