[td-1067] opt last_row queries
This commit is contained in:
parent
61622a73b1
commit
49f23d689d
|
@ -4623,7 +4623,8 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (isFirstLastRowQuery(pQuery)) {
|
if (isFirstLastRowQuery(pQuery)) {
|
||||||
pRuntimeEnv->pQueryHandle = tsdbQueryLastRow(pQInfo->tsdb, &cond, &gp, pQInfo);
|
assert(0); // last_row query switch to other routine to handle
|
||||||
|
// pRuntimeEnv->pQueryHandle = tsdbQueryLastRow(pQInfo->tsdb, &cond, &gp, pQInfo);
|
||||||
} else {
|
} else {
|
||||||
pRuntimeEnv->pQueryHandle = tsdbQueryRowsInExternalWindow(pQInfo->tsdb, &cond, &gp, pQInfo);
|
pRuntimeEnv->pQueryHandle = tsdbQueryRowsInExternalWindow(pQInfo->tsdb, &cond, &gp, pQInfo);
|
||||||
}
|
}
|
||||||
|
@ -4984,7 +4985,6 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
|
||||||
el = scanMultiTableDataBlocks(pQInfo);
|
el = scanMultiTableDataBlocks(pQInfo);
|
||||||
qDebug("QInfo:%p reversed scan completed, elapsed time: %" PRId64 "ms", pQInfo, el);
|
qDebug("QInfo:%p reversed scan completed, elapsed time: %" PRId64 "ms", pQInfo, el);
|
||||||
|
|
||||||
// doCloseAllTimeWindowAfterScan(pQInfo);
|
|
||||||
doRestoreContext(pQInfo);
|
doRestoreContext(pQInfo);
|
||||||
} else {
|
} else {
|
||||||
qDebug("QInfo:%p no need to do reversed scan, query completed", pQInfo);
|
qDebug("QInfo:%p no need to do reversed scan, query completed", pQInfo);
|
||||||
|
@ -5262,15 +5262,13 @@ static void stableQueryImpl(SQInfo *pQInfo) {
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
|
|
||||||
if (QUERY_IS_INTERVAL_QUERY(pQuery) ||
|
if (QUERY_IS_INTERVAL_QUERY(pQuery) ||
|
||||||
(isFixedOutputQuery(pRuntimeEnv) && (!isPointInterpoQuery(pQuery)) && !pRuntimeEnv->groupbyNormalCol &&
|
(isFixedOutputQuery(pRuntimeEnv) && (!isPointInterpoQuery(pQuery)) && !pRuntimeEnv->groupbyNormalCol)) {
|
||||||
!isFirstLastRowQuery(pQuery))) {
|
|
||||||
multiTableQueryProcess(pQInfo);
|
multiTableQueryProcess(pQInfo);
|
||||||
} else {
|
} else {
|
||||||
assert((pQuery->checkBuffer == 1 && pQuery->intervalTime == 0) || isPointInterpoQuery(pQuery) ||
|
assert((pQuery->checkBuffer == 1 && pQuery->intervalTime == 0) || isPointInterpoQuery(pQuery) ||
|
||||||
isFirstLastRowQuery(pQuery) || pRuntimeEnv->groupbyNormalCol);
|
isFirstLastRowQuery(pQuery) || pRuntimeEnv->groupbyNormalCol);
|
||||||
|
|
||||||
sequentialTableProcess(pQInfo);
|
sequentialTableProcess(pQInfo);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// record the total elapsed time
|
// record the total elapsed time
|
||||||
|
|
|
@ -28,6 +28,14 @@
|
||||||
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
|
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
|
||||||
#define QH_GET_NUM_OF_COLS(handle) ((size_t)(taosArrayGetSize((handle)->pColumns)))
|
#define QH_GET_NUM_OF_COLS(handle) ((size_t)(taosArrayGetSize((handle)->pColumns)))
|
||||||
|
|
||||||
|
#define GET_FILE_DATA_BLOCK_INFO(_checkInfo, _block) \
|
||||||
|
((SDataBlockInfo){.window = {.skey = (_block)->keyFirst, .ekey = (_block)->keyLast}, \
|
||||||
|
.numOfCols = (_block)->numOfCols, \
|
||||||
|
.rows = (_block)->numOfRows, \
|
||||||
|
.tid = (_checkInfo)->tableId.tid, \
|
||||||
|
.uid = (_checkInfo)->tableId.uid})
|
||||||
|
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
TSDB_QUERY_TYPE_ALL = 1,
|
TSDB_QUERY_TYPE_ALL = 1,
|
||||||
TSDB_QUERY_TYPE_LAST = 2,
|
TSDB_QUERY_TYPE_LAST = 2,
|
||||||
|
@ -119,7 +127,14 @@ typedef struct STsdbQueryHandle {
|
||||||
SIOCostSummary cost;
|
SIOCostSummary cost;
|
||||||
} STsdbQueryHandle;
|
} STsdbQueryHandle;
|
||||||
|
|
||||||
static void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle);
|
typedef struct STableGroupSupporter {
|
||||||
|
int32_t numOfCols;
|
||||||
|
SColIndex* pCols;
|
||||||
|
STSchema* pTagSchema;
|
||||||
|
} STableGroupSupporter;
|
||||||
|
|
||||||
|
static STimeWindow changeTableGroupByLastrow(STableGroupInfo *groupList);
|
||||||
|
|
||||||
static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle);
|
static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle);
|
||||||
static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock);
|
static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock);
|
||||||
static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
|
static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
|
||||||
|
@ -283,12 +298,10 @@ out_of_memory:
|
||||||
}
|
}
|
||||||
|
|
||||||
TsdbQueryHandleT tsdbQueryLastRow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, void* qinfo) {
|
TsdbQueryHandleT tsdbQueryLastRow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, void* qinfo) {
|
||||||
|
pCond->order = TSDB_ORDER_DESC;
|
||||||
|
pCond->twindow = changeTableGroupByLastrow(groupList);
|
||||||
|
|
||||||
STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qinfo);
|
STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qinfo);
|
||||||
if (pQueryHandle != NULL) {
|
|
||||||
pQueryHandle->type = TSDB_QUERY_TYPE_LAST;
|
|
||||||
pQueryHandle->order = TSDB_ORDER_DESC;
|
|
||||||
changeQueryHandleForLastrowQuery(pQueryHandle);
|
|
||||||
}
|
|
||||||
return pQueryHandle;
|
return pQueryHandle;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -672,14 +685,6 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
#define GET_FILE_DATA_BLOCK_INFO(_checkInfo, _block) \
|
|
||||||
((SDataBlockInfo){.window = {.skey = (_block)->keyFirst, .ekey = (_block)->keyLast}, \
|
|
||||||
.numOfCols = (_block)->numOfCols, \
|
|
||||||
.rows = (_block)->numOfRows, \
|
|
||||||
.tid = (_checkInfo)->tableId.tid, \
|
|
||||||
.uid = (_checkInfo)->tableId.uid})
|
|
||||||
|
|
||||||
|
|
||||||
static int32_t doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo, int32_t slotIndex) {
|
static int32_t doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo, int32_t slotIndex) {
|
||||||
STsdbRepo *pRepo = pQueryHandle->pTsdb;
|
STsdbRepo *pRepo = pQueryHandle->pTsdb;
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
|
@ -1660,6 +1665,128 @@ static bool doHasDataInBuffer(STsdbQueryHandle* pQueryHandle) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle) {
|
||||||
|
// filter the queried time stamp in the first place
|
||||||
|
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pHandle;
|
||||||
|
pQueryHandle->order = TSDB_ORDER_DESC;
|
||||||
|
|
||||||
|
assert(pQueryHandle->window.skey == pQueryHandle->window.ekey);
|
||||||
|
|
||||||
|
// starts from the buffer in case of descending timestamp order check data blocks
|
||||||
|
// todo consider the query time window, current last_row does not apply the query time window
|
||||||
|
size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
|
||||||
|
|
||||||
|
int32_t i = 0;
|
||||||
|
while(i < numOfTables) {
|
||||||
|
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
|
||||||
|
if (pQueryHandle->window.skey <= pCheckInfo->pTableObj->lastKey &&
|
||||||
|
pCheckInfo->pTableObj->lastKey != TSKEY_INITIAL_VAL) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
i++;
|
||||||
|
}
|
||||||
|
|
||||||
|
// there are no data in all the tables
|
||||||
|
if (i == numOfTables) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
STableCheckInfo info = *(STableCheckInfo*) taosArrayGet(pQueryHandle->pTableCheckInfo, i);
|
||||||
|
taosArrayClear(pQueryHandle->pTableCheckInfo);
|
||||||
|
|
||||||
|
info.lastKey = pQueryHandle->window.skey;
|
||||||
|
taosArrayPush(pQueryHandle->pTableCheckInfo, &info);
|
||||||
|
|
||||||
|
// update the query time window according to the chosen last timestamp
|
||||||
|
pQueryHandle->window = (STimeWindow) {info.lastKey, TSKEY_INITIAL_VAL};
|
||||||
|
}
|
||||||
|
|
||||||
|
static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win,
|
||||||
|
STsdbQueryHandle* pQueryHandle) {
|
||||||
|
int numOfRows = 0;
|
||||||
|
int32_t numOfCols = (int32_t)taosArrayGetSize(pQueryHandle->pColumns);
|
||||||
|
win->skey = TSKEY_INITIAL_VAL;
|
||||||
|
|
||||||
|
int64_t st = taosGetTimestampUs();
|
||||||
|
STable* pTable = pCheckInfo->pTableObj;
|
||||||
|
|
||||||
|
do {
|
||||||
|
SDataRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order);
|
||||||
|
if (row == NULL) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
TSKEY key = dataRowKey(row);
|
||||||
|
if ((key > maxKey && ASCENDING_TRAVERSE(pQueryHandle->order)) || (key < maxKey && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
|
||||||
|
tsdbDebug("%p key:%"PRIu64" beyond qrange:%"PRId64" - %"PRId64", no more data in buffer", pQueryHandle, key, pQueryHandle->window.skey,
|
||||||
|
pQueryHandle->window.ekey);
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (win->skey == INT64_MIN) {
|
||||||
|
win->skey = key;
|
||||||
|
}
|
||||||
|
|
||||||
|
win->ekey = key;
|
||||||
|
copyOneRowFromMem(pQueryHandle, maxRowsToRead, numOfRows, row, numOfCols, pTable);
|
||||||
|
|
||||||
|
if (++numOfRows >= maxRowsToRead) {
|
||||||
|
moveToNextRowInMem(pCheckInfo);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
} while(moveToNextRowInMem(pCheckInfo));
|
||||||
|
|
||||||
|
assert(numOfRows <= maxRowsToRead);
|
||||||
|
|
||||||
|
// if the buffer is not full in case of descending order query, move the data in the front of the buffer
|
||||||
|
if (!ASCENDING_TRAVERSE(pQueryHandle->order) && numOfRows < maxRowsToRead) {
|
||||||
|
int32_t emptySize = maxRowsToRead - numOfRows;
|
||||||
|
|
||||||
|
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||||
|
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
|
||||||
|
memmove((char*)pColInfo->pData, (char*)pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t elapsedTime = taosGetTimestampUs() - st;
|
||||||
|
tsdbDebug("%p build data block from cache completed, elapsed time:%"PRId64" us, numOfRows:%d, numOfCols:%d, %p", pQueryHandle,
|
||||||
|
elapsedTime, numOfRows, numOfCols, pQueryHandle->qinfo);
|
||||||
|
|
||||||
|
return numOfRows;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t getAllTableList(STable* pSuperTable, SArray* list) {
|
||||||
|
SSkipListIterator* iter = tSkipListCreateIter(pSuperTable->pIndex);
|
||||||
|
while (tSkipListIterNext(iter)) {
|
||||||
|
SSkipListNode* pNode = tSkipListIterGet(iter);
|
||||||
|
|
||||||
|
STable** pTable = (STable**) SL_GET_NODE_DATA((SSkipListNode*) pNode);
|
||||||
|
|
||||||
|
STableKeyInfo info = {.pTable = *pTable, .lastKey = TSKEY_INITIAL_VAL};
|
||||||
|
taosArrayPush(list, &info);
|
||||||
|
}
|
||||||
|
|
||||||
|
tSkipListDestroyIter(iter);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void destroyHelper(void* param) {
|
||||||
|
if (param == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
tQueryInfo* pInfo = (tQueryInfo*)param;
|
||||||
|
if (pInfo->optr != TSDB_RELATION_IN) {
|
||||||
|
taosTFree(pInfo->q);
|
||||||
|
}
|
||||||
|
|
||||||
|
free(param);
|
||||||
|
}
|
||||||
|
|
||||||
// handle data in cache situation
|
// handle data in cache situation
|
||||||
bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
|
bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
|
||||||
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pHandle;
|
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pHandle;
|
||||||
|
@ -1796,6 +1923,7 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pQueryHandle->checkFiles) {
|
if (pQueryHandle->checkFiles) {
|
||||||
|
// check if the query range overlaps with the file data block
|
||||||
bool exists = true;
|
bool exists = true;
|
||||||
|
|
||||||
int32_t code = getDataBlocksInFiles(pQueryHandle, &exists);
|
int32_t code = getDataBlocksInFiles(pQueryHandle, &exists);
|
||||||
|
@ -1824,150 +1952,48 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle) {
|
STimeWindow changeTableGroupByLastrow(STableGroupInfo *groupList) {
|
||||||
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pqHandle;
|
STimeWindow window = {INT64_MAX, INT64_MIN};
|
||||||
assert(!ASCENDING_TRAVERSE(pQueryHandle->order));
|
|
||||||
|
|
||||||
// starts from the buffer in case of descending timestamp order check data blocks
|
|
||||||
|
|
||||||
|
// NOTE: starts from the buffer in case of descending timestamp order check data blocks
|
||||||
// todo consider the query time window, current last_row does not apply the query time window
|
// todo consider the query time window, current last_row does not apply the query time window
|
||||||
size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
|
size_t numOfGroups = taosArrayGetSize(groupList->pGroupList);
|
||||||
|
for(int32_t j = 0; j < numOfGroups; ++j) {
|
||||||
|
SArray* pGroup = taosArrayGetP(groupList->pGroupList, j);
|
||||||
TSKEY key = TSKEY_INITIAL_VAL;
|
TSKEY key = TSKEY_INITIAL_VAL;
|
||||||
int32_t index = -1;
|
|
||||||
|
|
||||||
|
STableKeyInfo keyInfo = {0};
|
||||||
|
|
||||||
|
size_t numOfTables = taosArrayGetSize(pGroup);
|
||||||
for(int32_t i = 0; i < numOfTables; ++i) {
|
for(int32_t i = 0; i < numOfTables; ++i) {
|
||||||
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
|
STableKeyInfo* pKeyInfo = (STableKeyInfo*) taosArrayGet(pGroup, i);
|
||||||
if (pCheckInfo->pTableObj->lastKey > key) {
|
TSKEY lastKey = ((STable*)(pKeyInfo->pTable))->lastKey;
|
||||||
key = pCheckInfo->pTableObj->lastKey;
|
|
||||||
index = i;
|
if (key < lastKey) {
|
||||||
|
key = lastKey;
|
||||||
|
|
||||||
|
keyInfo.pTable = pKeyInfo->pTable;
|
||||||
|
keyInfo.lastKey = key;
|
||||||
|
pKeyInfo->lastKey = key;
|
||||||
|
|
||||||
|
if (key < window.skey) {
|
||||||
|
window.skey = key;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (key > window.ekey) {
|
||||||
|
window.ekey = key;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (index == -1) {
|
// more than one table in each group, only one table left for each group
|
||||||
// todo add failure test cases
|
if (numOfTables > 1) {
|
||||||
return;
|
taosArrayClear(pGroup);
|
||||||
}
|
taosArrayPush(pGroup, &keyInfo);
|
||||||
|
|
||||||
// erase all other elements in array list
|
|
||||||
size_t size = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
|
|
||||||
for (int32_t i = 0; i < size; ++i) {
|
|
||||||
if (i == index) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
STableCheckInfo* pTableCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
|
|
||||||
tSkipListDestroyIter(pTableCheckInfo->iter);
|
|
||||||
|
|
||||||
if (pTableCheckInfo->pDataCols != NULL) {
|
|
||||||
taosTFree(pTableCheckInfo->pDataCols->buf);
|
|
||||||
}
|
|
||||||
|
|
||||||
taosTFree(pTableCheckInfo->pDataCols);
|
|
||||||
taosTFree(pTableCheckInfo->pCompInfo);
|
|
||||||
}
|
|
||||||
|
|
||||||
STableCheckInfo info = *(STableCheckInfo*) taosArrayGet(pQueryHandle->pTableCheckInfo, index);
|
|
||||||
taosArrayClear(pQueryHandle->pTableCheckInfo);
|
|
||||||
|
|
||||||
info.lastKey = key;
|
|
||||||
taosArrayPush(pQueryHandle->pTableCheckInfo, &info);
|
|
||||||
|
|
||||||
// update the query time window according to the chosen last timestamp
|
|
||||||
pQueryHandle->window = (STimeWindow) {key, key};
|
|
||||||
}
|
|
||||||
|
|
||||||
static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle) {
|
|
||||||
// filter the queried time stamp in the first place
|
|
||||||
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pHandle;
|
|
||||||
pQueryHandle->order = TSDB_ORDER_DESC;
|
|
||||||
|
|
||||||
assert(pQueryHandle->window.skey == pQueryHandle->window.ekey);
|
|
||||||
|
|
||||||
// starts from the buffer in case of descending timestamp order check data blocks
|
|
||||||
// todo consider the query time window, current last_row does not apply the query time window
|
|
||||||
size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
|
|
||||||
|
|
||||||
int32_t i = 0;
|
|
||||||
while(i < numOfTables) {
|
|
||||||
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
|
|
||||||
if (pQueryHandle->window.skey <= pCheckInfo->pTableObj->lastKey &&
|
|
||||||
pCheckInfo->pTableObj->lastKey != TSKEY_INITIAL_VAL) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
i++;
|
|
||||||
}
|
|
||||||
|
|
||||||
// there are no data in all the tables
|
|
||||||
if (i == numOfTables) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
STableCheckInfo info = *(STableCheckInfo*) taosArrayGet(pQueryHandle->pTableCheckInfo, i);
|
|
||||||
taosArrayClear(pQueryHandle->pTableCheckInfo);
|
|
||||||
|
|
||||||
info.lastKey = pQueryHandle->window.skey;
|
|
||||||
taosArrayPush(pQueryHandle->pTableCheckInfo, &info);
|
|
||||||
|
|
||||||
// update the query time window according to the chosen last timestamp
|
|
||||||
pQueryHandle->window = (STimeWindow) {info.lastKey, TSKEY_INITIAL_VAL};
|
|
||||||
}
|
|
||||||
|
|
||||||
static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win,
|
|
||||||
STsdbQueryHandle* pQueryHandle) {
|
|
||||||
int numOfRows = 0;
|
|
||||||
int32_t numOfCols = (int32_t)taosArrayGetSize(pQueryHandle->pColumns);
|
|
||||||
win->skey = TSKEY_INITIAL_VAL;
|
|
||||||
|
|
||||||
int64_t st = taosGetTimestampUs();
|
|
||||||
STable* pTable = pCheckInfo->pTableObj;
|
|
||||||
|
|
||||||
do {
|
|
||||||
SDataRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order);
|
|
||||||
if (row == NULL) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
TSKEY key = dataRowKey(row);
|
|
||||||
if ((key > maxKey && ASCENDING_TRAVERSE(pQueryHandle->order)) || (key < maxKey && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
|
|
||||||
tsdbDebug("%p key:%"PRIu64" beyond qrange:%"PRId64" - %"PRId64", no more data in buffer", pQueryHandle, key, pQueryHandle->window.skey,
|
|
||||||
pQueryHandle->window.ekey);
|
|
||||||
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (win->skey == INT64_MIN) {
|
|
||||||
win->skey = key;
|
|
||||||
}
|
|
||||||
|
|
||||||
win->ekey = key;
|
|
||||||
copyOneRowFromMem(pQueryHandle, maxRowsToRead, numOfRows, row, numOfCols, pTable);
|
|
||||||
|
|
||||||
if (++numOfRows >= maxRowsToRead) {
|
|
||||||
moveToNextRowInMem(pCheckInfo);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
} while(moveToNextRowInMem(pCheckInfo));
|
|
||||||
|
|
||||||
assert(numOfRows <= maxRowsToRead);
|
|
||||||
|
|
||||||
// if the buffer is not full in case of descending order query, move the data in the front of the buffer
|
|
||||||
if (!ASCENDING_TRAVERSE(pQueryHandle->order) && numOfRows < maxRowsToRead) {
|
|
||||||
int32_t emptySize = maxRowsToRead - numOfRows;
|
|
||||||
|
|
||||||
for(int32_t i = 0; i < numOfCols; ++i) {
|
|
||||||
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
|
|
||||||
memmove((char*)pColInfo->pData, (char*)pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t elapsedTime = taosGetTimestampUs() - st;
|
return window;
|
||||||
tsdbDebug("%p build data block from cache completed, elapsed time:%"PRId64" us, numOfRows:%d, numOfCols:%d, %p", pQueryHandle,
|
|
||||||
elapsedTime, numOfRows, numOfCols, pQueryHandle->qinfo);
|
|
||||||
|
|
||||||
return numOfRows;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void tsdbRetrieveDataBlockInfo(TsdbQueryHandleT* pQueryHandle, SDataBlockInfo* pDataBlockInfo) {
|
void tsdbRetrieveDataBlockInfo(TsdbQueryHandleT* pQueryHandle, SDataBlockInfo* pDataBlockInfo) {
|
||||||
|
@ -2104,36 +2130,6 @@ SArray* tsdbRetrieveDataBlock(TsdbQueryHandleT* pQueryHandle, SArray* pIdList) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t getAllTableList(STable* pSuperTable, SArray* list) {
|
|
||||||
SSkipListIterator* iter = tSkipListCreateIter(pSuperTable->pIndex);
|
|
||||||
while (tSkipListIterNext(iter)) {
|
|
||||||
SSkipListNode* pNode = tSkipListIterGet(iter);
|
|
||||||
|
|
||||||
STable** pTable = (STable**) SL_GET_NODE_DATA((SSkipListNode*) pNode);
|
|
||||||
|
|
||||||
STableKeyInfo info = {.pTable = *pTable, .lastKey = TSKEY_INITIAL_VAL};
|
|
||||||
taosArrayPush(list, &info);
|
|
||||||
}
|
|
||||||
|
|
||||||
tSkipListDestroyIter(iter);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void destroyHelper(void* param) {
|
|
||||||
if (param == NULL) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
tQueryInfo* pInfo = (tQueryInfo*)param;
|
|
||||||
if (pInfo->optr != TSDB_RELATION_IN) {
|
|
||||||
taosTFree(pInfo->q);
|
|
||||||
}
|
|
||||||
|
|
||||||
// tVariantDestroy(&(pInfo->q));
|
|
||||||
free(param);
|
|
||||||
}
|
|
||||||
|
|
||||||
void filterPrepare(void* expr, void* param) {
|
void filterPrepare(void* expr, void* param) {
|
||||||
tExprNode* pExpr = (tExprNode*)expr;
|
tExprNode* pExpr = (tExprNode*)expr;
|
||||||
if (pExpr->_node.info != NULL) {
|
if (pExpr->_node.info != NULL) {
|
||||||
|
@ -2160,13 +2156,7 @@ void filterPrepare(void* expr, void* param) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct STableGroupSupporter {
|
static int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) {
|
||||||
int32_t numOfCols;
|
|
||||||
SColIndex* pCols;
|
|
||||||
STSchema* pTagSchema;
|
|
||||||
} STableGroupSupporter;
|
|
||||||
|
|
||||||
int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) {
|
|
||||||
STableGroupSupporter* pTableGroupSupp = (STableGroupSupporter*) param;
|
STableGroupSupporter* pTableGroupSupp = (STableGroupSupporter*) param;
|
||||||
STable* pTable1 = ((STableKeyInfo*) p1)->pTable;
|
STable* pTable1 = ((STableKeyInfo*) p1)->pTable;
|
||||||
STable* pTable2 = ((STableKeyInfo*) p2)->pTable;
|
STable* pTable2 = ((STableKeyInfo*) p2)->pTable;
|
||||||
|
@ -2219,8 +2209,19 @@ int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void createTableGroupImpl(SArray* pGroups, SArray* pTableList, size_t numOfTables, TSKEY skey, STableGroupSupporter* pSupp,
|
static int tsdbCheckInfoCompar(const void* key1, const void* key2) {
|
||||||
__ext_compar_fn_t compareFn) {
|
if (((STableCheckInfo*)key1)->tableId.tid < ((STableCheckInfo*)key2)->tableId.tid) {
|
||||||
|
return -1;
|
||||||
|
} else if (((STableCheckInfo*)key1)->tableId.tid > ((STableCheckInfo*)key2)->tableId.tid) {
|
||||||
|
return 1;
|
||||||
|
} else {
|
||||||
|
ASSERT(false);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void createTableGroupImpl(SArray* pGroups, SArray* pTableList, size_t numOfTables, TSKEY skey,
|
||||||
|
STableGroupSupporter* pSupp, __ext_compar_fn_t compareFn) {
|
||||||
STable* pTable = taosArrayGetP(pTableList, 0);
|
STable* pTable = taosArrayGetP(pTableList, 0);
|
||||||
|
|
||||||
SArray* g = taosArrayInit(16, sizeof(STableKeyInfo));
|
SArray* g = taosArrayInit(16, sizeof(STableKeyInfo));
|
||||||
|
@ -2293,7 +2294,7 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC
|
||||||
return pTableGroup;
|
return pTableGroup;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool indexedNodeFilterFp(const void* pNode, void* param) {
|
static bool indexedNodeFilterFp(const void* pNode, void* param) {
|
||||||
tQueryInfo* pInfo = (tQueryInfo*) param;
|
tQueryInfo* pInfo = (tQueryInfo*) param;
|
||||||
|
|
||||||
STable* pTable = *(STable**)(SL_GET_NODE_DATA((SSkipListNode*)pNode));
|
STable* pTable = *(STable**)(SL_GET_NODE_DATA((SSkipListNode*)pNode));
|
||||||
|
@ -2591,14 +2592,3 @@ void tsdbDestroyTableGroup(STableGroupInfo *pGroupList) {
|
||||||
|
|
||||||
taosArrayDestroy(pGroupList->pGroupList);
|
taosArrayDestroy(pGroupList->pGroupList);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tsdbCheckInfoCompar(const void* key1, const void* key2) {
|
|
||||||
if (((STableCheckInfo*)key1)->tableId.tid < ((STableCheckInfo*)key2)->tableId.tid) {
|
|
||||||
return -1;
|
|
||||||
} else if (((STableCheckInfo*)key1)->tableId.tid > ((STableCheckInfo*)key2)->tableId.tid) {
|
|
||||||
return 1;
|
|
||||||
} else {
|
|
||||||
ASSERT(false);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
Loading…
Reference in New Issue