Merge pull request #2001 from taosdata/feature/query

Feature/query
This commit is contained in:
Shengliang Guan 2020-05-22 17:52:46 +08:00 committed by GitHub
commit c0a5eeeb70
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 322 additions and 326 deletions

View File

@ -638,7 +638,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
(*pMemBuffer) = (tExtMemBuffer **)malloc(POINTER_BYTES * 1); (*pMemBuffer) = (tExtMemBuffer **)malloc(POINTER_BYTES * pSql->numOfSubs);
if (*pMemBuffer == NULL) { if (*pMemBuffer == NULL) {
tscError("%p failed to allocate memory", pSql); tscError("%p failed to allocate memory", pSql);
pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY; pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;

View File

@ -571,7 +571,6 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->uid, dfltKey)); pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->uid, dfltKey));
pQueryMsg->numOfTables = htonl(1); // set the number of tables pQueryMsg->numOfTables = htonl(1); // set the number of tables
pMsg += sizeof(STableIdInfo); pMsg += sizeof(STableIdInfo);
} else { } else {
int32_t index = pTableMetaInfo->vgroupIndex; int32_t index = pTableMetaInfo->vgroupIndex;
@ -601,8 +600,8 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
} }
} }
tscTrace("%p vgId:%d, query on table:%s, uid:%" PRIu64, pSql, htonl(pQueryMsg->head.vgId), pTableMetaInfo->name, tscTrace("%p vgId:%d, query on table:%s, tid:%d, uid:%" PRIu64, pSql, htonl(pQueryMsg->head.vgId), pTableMetaInfo->name,
pTableMeta->uid); pTableMeta->sid, pTableMeta->uid);
return pMsg; return pMsg;
} }
@ -1869,6 +1868,7 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
} }
free(pTableMeta); free(pTableMeta);
tscTrace("%p recv table meta: %"PRId64 ", tid:%d, name:%s", pSql, pTableMeta->uid, pTableMeta->sid, pTableMetaInfo->name);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -6041,6 +6041,8 @@ int32_t qCreateQueryInfo(void *tsdb, int32_t vgId, SQueryTableMsg *pQueryMsg, qi
isSTableQuery = TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_MULTITABLE_QUERY); isSTableQuery = TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_MULTITABLE_QUERY);
STableIdInfo *id = taosArrayGet(pTableIdList, 0); STableIdInfo *id = taosArrayGet(pTableIdList, 0);
qTrace("qmsg:%p query table, uid:%"PRId64", tid:%d", pQueryMsg, id->uid, id->tid);
if ((code = tsdbGetOneTableGroup(tsdb, id->uid, &groupInfo)) != TSDB_CODE_SUCCESS) { if ((code = tsdbGetOneTableGroup(tsdb, id->uid, &groupInfo)) != TSDB_CODE_SUCCESS) {
goto _over; goto _over;
} }

View File

@ -26,7 +26,7 @@
#include "tsdbMain.h" #include "tsdbMain.h"
#define EXTRA_BYTES 2 #define EXTRA_BYTES 2
#define ASCENDING_ORDER_TRAVERSE(o) (o == TSDB_ORDER_ASC) #define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
#define QH_GET_NUM_OF_COLS(handle) ((size_t)(taosArrayGetSize((handle)->pColumns))) #define QH_GET_NUM_OF_COLS(handle) ((size_t)(taosArrayGetSize((handle)->pColumns)))
enum { enum {
@ -80,7 +80,7 @@ typedef struct STableCheckInfo {
SSkipListIterator* iter; // skip list iterator SSkipListIterator* iter; // skip list iterator
SSkipListIterator* iiter; // imem iterator SSkipListIterator* iiter; // imem iterator
bool hasObtainBuf; // if we should initialize the in-memory skip list iterator bool initBuf; // if we should initialize the in-memory skip list iterator
} STableCheckInfo; } STableCheckInfo;
typedef struct { typedef struct {
@ -188,7 +188,7 @@ TsdbQueryHandleT* tsdbQueryTables(TsdbRepoT* tsdb, STsdbQueryCond* pCond, STable
* For ascending timestamp order query, query starts from data files. In contrast, buffer will be checked in the first place * For ascending timestamp order query, query starts from data files. In contrast, buffer will be checked in the first place
* in case of descending timestamp order query. * in case of descending timestamp order query.
*/ */
pQueryHandle->checkFiles = true;//ASCENDING_ORDER_TRAVERSE(pQueryHandle->order); pQueryHandle->checkFiles = true;//ASCENDING_TRAVERSE(pQueryHandle->order);
pQueryHandle->activeIndex = 0; pQueryHandle->activeIndex = 0;
// allocate buffer in order to load data blocks from file // allocate buffer in order to load data blocks from file
@ -234,11 +234,11 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh
STable* pTable = pCheckInfo->pTableObj; STable* pTable = pCheckInfo->pTableObj;
assert(pTable != NULL); assert(pTable != NULL);
if (pCheckInfo->hasObtainBuf) { if (pCheckInfo->initBuf) {
return true; return true;
} }
pCheckInfo->hasObtainBuf = true; pCheckInfo->initBuf = true;
int32_t order = pHandle->order; int32_t order = pHandle->order;
// no data in buffer, abort // no data in buffer, abort
@ -335,8 +335,8 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) {
pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, pCheckInfo->lastKey, pHandle->order, pHandle->qinfo); pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, pCheckInfo->lastKey, pHandle->order, pHandle->qinfo);
// all data in mem are checked already. // all data in mem are checked already.
if ((pCheckInfo->lastKey > pHandle->window.ekey && ASCENDING_ORDER_TRAVERSE(pHandle->order)) || if ((pCheckInfo->lastKey > pHandle->window.ekey && ASCENDING_TRAVERSE(pHandle->order)) ||
(pCheckInfo->lastKey < pHandle->window.ekey && !ASCENDING_ORDER_TRAVERSE(pHandle->order))) { (pCheckInfo->lastKey < pHandle->window.ekey && !ASCENDING_TRAVERSE(pHandle->order))) {
return false; return false;
} }
@ -483,9 +483,11 @@ static SArray* getDefaultLoadColumns(STsdbQueryHandle* pQueryHandle, bool loadTS
return pLocalIdList; return pLocalIdList;
} }
static void mergeDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock, static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock,
SArray* sa); SArray* sa);
static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order); static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
static int tsdbReadRowsFromCache(SSkipListIterator* pIter, STable* pTable, TSKEY maxKey, int maxRowsToRead, TSKEY* skey, TSKEY* ekey,
STsdbQueryHandle* pQueryHandle);
static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo) { static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo) {
STsdbRepo *pRepo = pQueryHandle->pTsdb; STsdbRepo *pRepo = pQueryHandle->pTsdb;
@ -519,11 +521,95 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo
return blockLoaded; return blockLoaded;
} }
static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo){
SArray* sa = getDefaultLoadColumns(pQueryHandle, true);
SQueryFilePos* cur = &pQueryHandle->cur;
SDataBlockInfo binfo = getTrueDataBlockInfo(pCheckInfo, pBlock);
/*bool hasData = */ initTableMemIterator(pQueryHandle, pCheckInfo);
TSKEY k1 = TSKEY_INITIAL_VAL, k2 = TSKEY_INITIAL_VAL;
if (pCheckInfo->iter != NULL && tSkipListIterGet(pCheckInfo->iter) != NULL) {
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
SDataRow row = SL_GET_NODE_DATA(node);
k1 = dataRowKey(row);
if (k1 == binfo.window.skey) {
if (tSkipListIterNext(pCheckInfo->iter)) {
node = tSkipListIterGet(pCheckInfo->iter);
row = SL_GET_NODE_DATA(node);
k1 = dataRowKey(row);
} else {
k1 = TSKEY_INITIAL_VAL;
}
}
}
if (pCheckInfo->iiter != NULL && tSkipListIterGet(pCheckInfo->iiter) != NULL) {
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter);
SDataRow row = SL_GET_NODE_DATA(node);
k2 = dataRowKey(row);
if (k2 == binfo.window.skey) {
if (tSkipListIterNext(pCheckInfo->iiter)) {
node = tSkipListIterGet(pCheckInfo->iiter);
row = SL_GET_NODE_DATA(node);
k2 = dataRowKey(row);
} else {
k2 = TSKEY_INITIAL_VAL;
}
}
}
cur->pos = ASCENDING_TRAVERSE(pQueryHandle->order)? 0:(binfo.rows-1);
if ((ASCENDING_TRAVERSE(pQueryHandle->order) &&
((k1 != TSKEY_INITIAL_VAL && k1 <= binfo.window.ekey) || (k2 != TSKEY_INITIAL_VAL && k2 <= binfo.window.ekey))) ||
(!ASCENDING_TRAVERSE(pQueryHandle->order) &&
((k1 != TSKEY_INITIAL_VAL && k1 >= binfo.window.skey) || (k2 != TSKEY_INITIAL_VAL && k2 >= binfo.window.skey)))) {
if ((ASCENDING_TRAVERSE(pQueryHandle->order) &&
((k1 != TSKEY_INITIAL_VAL && k1 < binfo.window.skey) || (k2 != TSKEY_INITIAL_VAL && k2 < binfo.window.skey))) ||
(!ASCENDING_TRAVERSE(pQueryHandle->order) &&
(((k1 != TSKEY_INITIAL_VAL && k1 > binfo.window.skey) || (k2 != TSKEY_INITIAL_VAL && k2 > binfo.window.skey))))) {
// do not load file block into buffer
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order) ? 1 : -1;
cur->rows = tsdbReadRowsFromCache(pCheckInfo->iter, pCheckInfo->pTableObj, binfo.window.skey - step,
pQueryHandle->outputCapacity, &cur->win.skey, &cur->win.ekey, pQueryHandle);
pQueryHandle->realNumOfRows = cur->rows;
// update the last key value
pCheckInfo->lastKey = cur->win.ekey + step;
if (!ASCENDING_TRAVERSE(pQueryHandle->order)) {
SWAP(cur->win.skey, cur->win.ekey, TSKEY);
}
cur->mixBlock = true;
cur->blockCompleted = false;
return;
}
doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo);
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock, sa);
} else {
pQueryHandle->realNumOfRows = binfo.rows;
cur->rows = binfo.rows;
cur->win = binfo.window;
cur->mixBlock = false;
cur->blockCompleted = true;
cur->lastKey = binfo.window.ekey + (ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1);
}
}
static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo) { static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo) {
SArray* sa = getDefaultLoadColumns(pQueryHandle, true); SArray* sa = getDefaultLoadColumns(pQueryHandle, true);
SQueryFilePos* cur = &pQueryHandle->cur; SQueryFilePos* cur = &pQueryHandle->cur;
if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) { if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
// query ended in current block // query ended in current block
if (pQueryHandle->window.ekey < pBlock->keyLast || pCheckInfo->lastKey > pBlock->keyFirst) { if (pQueryHandle->window.ekey < pBlock->keyLast || pCheckInfo->lastKey > pBlock->keyFirst) {
if (!doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo)) { if (!doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo)) {
@ -540,62 +626,11 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
cur->pos = 0; cur->pos = 0;
} }
mergeDataInDataBlock(pQueryHandle, pCheckInfo, pBlock, sa); doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock, sa);
} else { // the whole block is loaded in to buffer } else { // the whole block is loaded in to buffer
SDataBlockInfo binfo = getTrueDataBlockInfo(pCheckInfo, pBlock); handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo);
/*bool hasData = */ initTableMemIterator(pQueryHandle, pCheckInfo);
TSKEY k1 = TSKEY_INITIAL_VAL, k2 = TSKEY_INITIAL_VAL;
if (pCheckInfo->iter != NULL && tSkipListIterGet(pCheckInfo->iter) != NULL) {
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
SDataRow row = SL_GET_NODE_DATA(node);
k1 = dataRowKey(row);
if (k1 == binfo.window.skey) {
if (tSkipListIterNext(pCheckInfo->iter)) {
node = tSkipListIterGet(pCheckInfo->iter);
row = SL_GET_NODE_DATA(node);
k1 = dataRowKey(row);
} else {
k1 = TSKEY_INITIAL_VAL;
}
}
}
if (pCheckInfo->iiter != NULL && tSkipListIterGet(pCheckInfo->iiter) != NULL) {
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter);
SDataRow row = SL_GET_NODE_DATA(node);
k2 = dataRowKey(row);
if (k2 == binfo.window.skey) {
if (tSkipListIterNext(pCheckInfo->iiter)) {
node = tSkipListIterGet(pCheckInfo->iiter);
row = SL_GET_NODE_DATA(node);
k2 = dataRowKey(row);
} else {
k2 = TSKEY_INITIAL_VAL;
}
}
}
cur->pos = 0;
if ((k1 != TSKEY_INITIAL_VAL && k1 < binfo.window.ekey) || (k2 != TSKEY_INITIAL_VAL && k2 < binfo.window.ekey)) {
doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo);
mergeDataInDataBlock(pQueryHandle, pCheckInfo, pBlock, sa);
} else {
pQueryHandle->realNumOfRows = binfo.rows;
cur->rows = binfo.rows;
cur->win = binfo.window;
cur->mixBlock = false;
cur->blockCompleted = true;
cur->lastKey = binfo.window.ekey + (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)? 1:-1);
}
} }
} else { //desc order } else { //desc order, query ended in current block
// query ended in current block
if (pQueryHandle->window.ekey > pBlock->keyFirst) { if (pQueryHandle->window.ekey > pBlock->keyFirst) {
if (!doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo)) { if (!doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo)) {
return false; return false;
@ -609,55 +644,11 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
cur->pos = pBlock->numOfPoints - 1; cur->pos = pBlock->numOfPoints - 1;
} }
mergeDataInDataBlock(pQueryHandle, pCheckInfo, pBlock, sa); doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock, sa);
} else { } else {
SDataBlockInfo binfo = getTrueDataBlockInfo(pCheckInfo, pBlock); handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo);
/*bool hasData = */ initTableMemIterator(pQueryHandle, pCheckInfo);
TSKEY k1 = TSKEY_INITIAL_VAL, k2 = TSKEY_INITIAL_VAL;
if (pCheckInfo->iter != NULL) {
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
SDataRow row = SL_GET_NODE_DATA(node);
k1 = dataRowKey(row);
if (k1 == binfo.window.skey) {
if (tSkipListIterNext(pCheckInfo->iter)) {
node = tSkipListIterGet(pCheckInfo->iter);
row = SL_GET_NODE_DATA(node);
k1 = dataRowKey(row);
} else {
k1 = TSKEY_INITIAL_VAL;
}
}
}
if (pCheckInfo->iiter != NULL) {
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter);
SDataRow row = SL_GET_NODE_DATA(node);
k2 = dataRowKey(row);
if (k2 == binfo.window.skey) {
if (tSkipListIterNext(pCheckInfo->iiter)) {
node = tSkipListIterGet(pCheckInfo->iiter);
row = SL_GET_NODE_DATA(node);
k2 = dataRowKey(row);
} else {
k2 = TSKEY_INITIAL_VAL;
}
}
}
cur->pos = binfo.rows - 1;
if ((k1 != TSKEY_INITIAL_VAL && k1 > binfo.window.ekey) || (k2 != TSKEY_INITIAL_VAL && k2 > binfo.window.ekey)) {
doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo);
mergeDataInDataBlock(pQueryHandle, pCheckInfo, pBlock, sa);
} else {
pQueryHandle->realNumOfRows = binfo.rows;
}
}
// pQueryHandle->realNumOfRows = pBlock->numOfPoints;
// cur->pos = pBlock->numOfPoints - 1;
} }
}
taosArrayDestroy(sa); taosArrayDestroy(sa);
return pQueryHandle->realNumOfRows > 0; return pQueryHandle->realNumOfRows > 0;
@ -727,20 +718,20 @@ static int vnodeBinarySearchKey(char* pValue, int num, TSKEY key, int order) {
static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, int32_t start, int32_t end) { static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, int32_t start, int32_t end) {
char* pData = NULL; char* pData = NULL;
int32_t step = ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)? 1 : -1; int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1 : -1;
SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0]; SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0];
TSKEY* tsArray = pCols->cols[0].pData; TSKEY* tsArray = pCols->cols[0].pData;
int32_t num = end - start + 1; int32_t num = end - start + 1;
int32_t reqiredNumOfCols = taosArrayGetSize(pQueryHandle->pColumns); int32_t requiredNumOfCols = taosArrayGetSize(pQueryHandle->pColumns);
//data in buffer has greater timestamp, copy data in file block //data in buffer has greater timestamp, copy data in file block
for (int32_t i = 0; i < reqiredNumOfCols; ++i) { for (int32_t i = 0; i < requiredNumOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
int32_t bytes = pColInfo->info.bytes; int32_t bytes = pColInfo->info.bytes;
if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) { if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
pData = pColInfo->pData + numOfRows * pColInfo->info.bytes; pData = pColInfo->pData + numOfRows * pColInfo->info.bytes;
} else { } else {
pData = pColInfo->pData + (capacity - numOfRows - num) * pColInfo->info.bytes; pData = pColInfo->pData + (capacity - numOfRows - num) * pColInfo->info.bytes;
@ -784,7 +775,7 @@ static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, STableCheckInfo* p
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) { if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
pData = pColInfo->pData + numOfRows * pColInfo->info.bytes; pData = pColInfo->pData + numOfRows * pColInfo->info.bytes;
} else { } else {
pData = pColInfo->pData + (capacity - numOfRows - 1) * pColInfo->info.bytes; pData = pColInfo->pData + (capacity - numOfRows - 1) * pColInfo->info.bytes;
@ -811,7 +802,7 @@ static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, STableCheckInfo* p
// only return the qualified data to client in terms of query time window, data rows in the same block but do not // only return the qualified data to client in terms of query time window, data rows in the same block but do not
// be included in the query time window will be discarded // be included in the query time window will be discarded
static void mergeDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock, static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock,
SArray* sa) { SArray* sa) {
SQueryFilePos* cur = &pQueryHandle->cur; SQueryFilePos* cur = &pQueryHandle->cur;
SDataBlockInfo blockInfo = getTrueDataBlockInfo(pCheckInfo, pBlock); SDataBlockInfo blockInfo = getTrueDataBlockInfo(pCheckInfo, pBlock);
@ -820,10 +811,10 @@ static void mergeDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInfo
SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0]; SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0];
int32_t endPos = cur->pos; int32_t endPos = cur->pos;
if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order) && pQueryHandle->window.ekey > blockInfo.window.ekey) { if (ASCENDING_TRAVERSE(pQueryHandle->order) && pQueryHandle->window.ekey > blockInfo.window.ekey) {
endPos = blockInfo.rows - 1; endPos = blockInfo.rows - 1;
cur->mixBlock = (cur->pos != 0); cur->mixBlock = (cur->pos != 0);
} else if (!ASCENDING_ORDER_TRAVERSE(pQueryHandle->order) && pQueryHandle->window.ekey < blockInfo.window.skey) { } else if (!ASCENDING_TRAVERSE(pQueryHandle->order) && pQueryHandle->window.ekey < blockInfo.window.skey) {
endPos = 0; endPos = 0;
cur->mixBlock = (cur->pos != blockInfo.rows - 1); cur->mixBlock = (cur->pos != blockInfo.rows - 1);
} else { } else {
@ -840,13 +831,13 @@ static void mergeDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInfo
int32_t numOfRows = 0; int32_t numOfRows = 0;
pQueryHandle->cur.win = TSWINDOW_INITIALIZER; pQueryHandle->cur.win = TSWINDOW_INITIALIZER;
int32_t step = ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)? 1:-1; int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1;
// no data in buffer, load data from file directly // no data in buffer, load data from file directly
if (pCheckInfo->iiter == NULL && pCheckInfo->iter == NULL) { if (pCheckInfo->iiter == NULL && pCheckInfo->iter == NULL) {
int32_t start = cur->pos; int32_t start = cur->pos;
int32_t end = endPos; int32_t end = endPos;
if (!ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) { if (!ASCENDING_TRAVERSE(pQueryHandle->order)) {
end = cur->pos; end = cur->pos;
start = endPos; start = endPos;
} }
@ -858,7 +849,7 @@ static void mergeDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInfo
numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, start, end); numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, start, end);
// if the buffer is not full in case of descending order query, move the data in the front of the buffer // if the buffer is not full in case of descending order query, move the data in the front of the buffer
if (!ASCENDING_ORDER_TRAVERSE(pQueryHandle->order) && numOfRows < pQueryHandle->outputCapacity) { if (!ASCENDING_TRAVERSE(pQueryHandle->order) && numOfRows < pQueryHandle->outputCapacity) {
int32_t emptySize = pQueryHandle->outputCapacity - numOfRows; int32_t emptySize = pQueryHandle->outputCapacity - numOfRows;
int32_t reqNumOfCols = taosArrayGetSize(pQueryHandle->pColumns); int32_t reqNumOfCols = taosArrayGetSize(pQueryHandle->pColumns);
@ -869,8 +860,8 @@ static void mergeDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInfo
} }
pos += (end - start + 1) * step; pos += (end - start + 1) * step;
cur->blockCompleted = (((pos >= endPos || cur->lastKey > pQueryHandle->window.ekey) && ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) || cur->blockCompleted = (((pos >= endPos || cur->lastKey > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
((pos <= endPos || cur->lastKey < pQueryHandle->window.ekey) && !ASCENDING_ORDER_TRAVERSE(pQueryHandle->order))); ((pos <= endPos || cur->lastKey < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order)));
pCheckInfo->lastKey = cur->lastKey; pCheckInfo->lastKey = cur->lastKey;
pQueryHandle->realNumOfRows = numOfRows; pQueryHandle->realNumOfRows = numOfRows;
@ -890,18 +881,18 @@ static void mergeDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInfo
SDataRow row = SL_GET_NODE_DATA(node); SDataRow row = SL_GET_NODE_DATA(node);
TSKEY key = dataRowKey(row); TSKEY key = dataRowKey(row);
if ((key > pQueryHandle->window.ekey && ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) || if ((key > pQueryHandle->window.ekey && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
(key < pQueryHandle->window.ekey && !ASCENDING_ORDER_TRAVERSE(pQueryHandle->order))) { (key < pQueryHandle->window.ekey && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
break; break;
} }
if (((tsArray[pos] > pQueryHandle->window.ekey || pos > endPos) && ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) || if (((tsArray[pos] > pQueryHandle->window.ekey || pos > endPos) && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
((tsArray[pos] < pQueryHandle->window.ekey || pos < endPos) && !ASCENDING_ORDER_TRAVERSE(pQueryHandle->order))) { ((tsArray[pos] < pQueryHandle->window.ekey || pos < endPos) && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
break; break;
} }
if ((key < tsArray[pos] && ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) || if ((key < tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
(key > tsArray[pos] && !ASCENDING_ORDER_TRAVERSE(pQueryHandle->order))) { (key > tsArray[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
copyOneRowFromMem(pQueryHandle, pCheckInfo, pQueryHandle->outputCapacity, numOfRows, row, pSchema); copyOneRowFromMem(pQueryHandle, pCheckInfo, pQueryHandle->outputCapacity, numOfRows, row, pSchema);
numOfRows += 1; numOfRows += 1;
if (cur->win.skey == TSKEY_INITIAL_VAL) { if (cur->win.skey == TSKEY_INITIAL_VAL) {
@ -915,20 +906,20 @@ static void mergeDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInfo
tSkipListIterNext(pCheckInfo->iter); tSkipListIterNext(pCheckInfo->iter);
} else if (key == tsArray[pos]) { // data in buffer has the same timestamp of data in file block, ignore it } else if (key == tsArray[pos]) { // data in buffer has the same timestamp of data in file block, ignore it
tSkipListIterNext(pCheckInfo->iter); tSkipListIterNext(pCheckInfo->iter);
} else if ((key > tsArray[pos] && ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) || } else if ((key > tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
(key < tsArray[pos] && !ASCENDING_ORDER_TRAVERSE(pQueryHandle->order))) { (key < tsArray[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
if (cur->win.skey == TSKEY_INITIAL_VAL) { if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = tsArray[pos]; cur->win.skey = tsArray[pos];
} }
int32_t order = (pQueryHandle->order == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC; int32_t order = ASCENDING_TRAVERSE(pQueryHandle->order) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
int32_t end = vnodeBinarySearchKey(pCols->cols[0].pData, pCols->numOfPoints, key, order); int32_t end = vnodeBinarySearchKey(pCols->cols[0].pData, pCols->numOfPoints, key, order);
if (tsArray[end] == key) { // the value of key in cache equals to the end timestamp value, ignore it if (tsArray[end] == key) { // the value of key in cache equals to the end timestamp value, ignore it
tSkipListIterNext(pCheckInfo->iter); tSkipListIterNext(pCheckInfo->iter);
} }
int32_t start = -1; int32_t start = -1;
if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) { if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
int32_t remain = end - pos + 1; int32_t remain = end - pos + 1;
if (remain + numOfRows > pQueryHandle->outputCapacity) { if (remain + numOfRows > pQueryHandle->outputCapacity) {
end = (pQueryHandle->outputCapacity - numOfRows) + pos - 1; end = (pQueryHandle->outputCapacity - numOfRows) + pos - 1;
@ -951,9 +942,13 @@ static void mergeDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInfo
} while (numOfRows < pQueryHandle->outputCapacity); } while (numOfRows < pQueryHandle->outputCapacity);
if (numOfRows < pQueryHandle->outputCapacity) { if (numOfRows < pQueryHandle->outputCapacity) {
/**
* if cache is empty, load remain file block data. In contrast, if there are remain data in cache, do NOT
* copy them all to result buffer, since it may be overlapped with file data block.
*/
if (node == NULL || if (node == NULL ||
((dataRowKey(SL_GET_NODE_DATA(node)) > pQueryHandle->window.ekey) && ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) || ((dataRowKey(SL_GET_NODE_DATA(node)) > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
((dataRowKey(SL_GET_NODE_DATA(node)) < pQueryHandle->window.ekey) && !ASCENDING_ORDER_TRAVERSE(pQueryHandle->order))) { ((dataRowKey(SL_GET_NODE_DATA(node)) < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
// no data in cache or data in cache is greater than the ekey of time window, load data from file block // no data in cache or data in cache is greater than the ekey of time window, load data from file block
if (cur->win.skey == TSKEY_INITIAL_VAL) { if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = tsArray[pos]; cur->win.skey = tsArray[pos];
@ -963,7 +958,7 @@ static void mergeDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInfo
int32_t end = -1; int32_t end = -1;
// all remain data are qualified, but check the remain capacity in the first place. // all remain data are qualified, but check the remain capacity in the first place.
if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) { if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
int32_t remain = endPos - pos + 1; int32_t remain = endPos - pos + 1;
if (remain + numOfRows > pQueryHandle->outputCapacity) { if (remain + numOfRows > pQueryHandle->outputCapacity) {
endPos = (pQueryHandle->outputCapacity - numOfRows) + pos - 1; endPos = (pQueryHandle->outputCapacity - numOfRows) + pos - 1;
@ -983,44 +978,22 @@ static void mergeDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInfo
numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, start, end); numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, start, end);
pos += (end - start + 1) * step; pos += (end - start + 1) * step;
} else {
while(numOfRows < pQueryHandle->outputCapacity && node != NULL &&
(((dataRowKey(SL_GET_NODE_DATA(node)) <= pQueryHandle->window.ekey) && ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) ||
((dataRowKey(SL_GET_NODE_DATA(node)) >= pQueryHandle->window.ekey) && !ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)))) {
SDataRow row = SL_GET_NODE_DATA(node);
TSKEY key = dataRowKey(row);
copyOneRowFromMem(pQueryHandle, pCheckInfo, pQueryHandle->outputCapacity, numOfRows, row, pSchema);
numOfRows += 1;
if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = key;
}
cur->win.ekey = key;
cur->lastKey = key + step;
cur->mixBlock = true;
tSkipListIterNext(pCheckInfo->iter);
node = tSkipListIterGet(pCheckInfo->iter);
}
} }
} }
} }
cur->blockCompleted = (((pos >= endPos || cur->lastKey > pQueryHandle->window.ekey) && ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) || cur->blockCompleted = (((pos >= endPos || cur->lastKey > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
((pos <= endPos || cur->lastKey < pQueryHandle->window.ekey) && !ASCENDING_ORDER_TRAVERSE(pQueryHandle->order))); ((pos <= endPos || cur->lastKey < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order)));
if (!ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) { if (!ASCENDING_TRAVERSE(pQueryHandle->order)) {
SWAP(cur->win.skey, cur->win.ekey, TSKEY); SWAP(cur->win.skey, cur->win.ekey, TSKEY);
// if the buffer is not full in case of descending order query, move the data in the front of the buffer // if the buffer is not full in case of descending order query, move the data in the front of the buffer
if (numOfRows < pQueryHandle->outputCapacity) { if (numOfRows < pQueryHandle->outputCapacity) {
int32_t emptySize = pQueryHandle->outputCapacity - numOfRows; int32_t emptySize = pQueryHandle->outputCapacity - numOfRows;
int32_t reqiredNumOfCols = taosArrayGetSize(pQueryHandle->pColumns); int32_t requiredNumOfCols = taosArrayGetSize(pQueryHandle->pColumns);
for(int32_t i = 0; i < reqiredNumOfCols; ++i) { for(int32_t i = 0; i < requiredNumOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
memmove(pColInfo->pData, pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes); memmove(pColInfo->pData, pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes);
} }
@ -1246,7 +1219,7 @@ static bool getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle) {
int32_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo); int32_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
while ((pQueryHandle->pFileGroup = tsdbGetFileGroupNext(&pQueryHandle->fileIter)) != NULL) { while ((pQueryHandle->pFileGroup = tsdbGetFileGroupNext(&pQueryHandle->fileIter)) != NULL) {
int32_t type = ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)? QUERY_RANGE_GREATER_EQUAL:QUERY_RANGE_LESS_EQUAL; int32_t type = ASCENDING_TRAVERSE(pQueryHandle->order)? QUERY_RANGE_GREATER_EQUAL:QUERY_RANGE_LESS_EQUAL;
if (getFileCompInfo(pQueryHandle, &numOfBlocks, type) != TSDB_CODE_SUCCESS) { if (getFileCompInfo(pQueryHandle, &numOfBlocks, type) != TSDB_CODE_SUCCESS) {
break; break;
} }
@ -1278,7 +1251,7 @@ static bool getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle) {
return false; return false;
} }
cur->slot = ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)? 0:pQueryHandle->numOfBlocks-1; cur->slot = ASCENDING_TRAVERSE(pQueryHandle->order)? 0:pQueryHandle->numOfBlocks-1;
cur->fid = pQueryHandle->pFileGroup->fileId; cur->fid = pQueryHandle->pFileGroup->fileId;
STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot]; STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot];
@ -1305,12 +1278,13 @@ static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) {
// current block is done, try next // current block is done, try next
if (!cur->mixBlock || cur->blockCompleted) { if (!cur->mixBlock || cur->blockCompleted) {
if ((cur->slot == pQueryHandle->numOfBlocks - 1 && ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) || if ((cur->slot == pQueryHandle->numOfBlocks - 1 && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
(cur->slot == 0 && !ASCENDING_ORDER_TRAVERSE(pQueryHandle->order))) { (cur->slot == 0 && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
// all data blocks in current file has been checked already, try next file if exists // all data blocks in current file has been checked already, try next file if exists
return getDataBlocksInFilesImpl(pQueryHandle); return getDataBlocksInFilesImpl(pQueryHandle);
} else { // next block of the same file } else {
int32_t step = ASCENDING_ORDER_TRAVERSE(pQueryHandle->order) ? 1 : -1; // next block of the same file
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order) ? 1 : -1;
cur->slot += step; cur->slot += step;
cur->mixBlock = false; cur->mixBlock = false;
@ -1320,9 +1294,8 @@ static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) {
return loadFileDataBlock(pQueryHandle, pNext->pBlock.compBlock, pNext->pTableCheckInfo); return loadFileDataBlock(pQueryHandle, pNext->pBlock.compBlock, pNext->pTableCheckInfo);
} }
} else { } else {
SArray* sa = getDefaultLoadColumns(pQueryHandle, true); handleDataMergeIfNeeded(pQueryHandle, pBlockInfo->pBlock.compBlock, pCheckInfo);
mergeDataInDataBlock(pQueryHandle, pCheckInfo, pBlockInfo->pBlock.compBlock, sa); return pQueryHandle->realNumOfRows > 0;
return true;
} }
} }
} }
@ -1365,7 +1338,7 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pqHandle) {
void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle) { void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle) {
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pqHandle; STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pqHandle;
assert(!ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)); assert(!ASCENDING_TRAVERSE(pQueryHandle->order));
// starts from the buffer in case of descending timestamp order check data blocks // starts from the buffer in case of descending timestamp order check data blocks
@ -1433,8 +1406,8 @@ static int tsdbReadRowsFromCache(SSkipListIterator* pIter, STable* pTable, TSKEY
SDataRow row = SL_GET_NODE_DATA(node); SDataRow row = SL_GET_NODE_DATA(node);
TSKEY key = dataRowKey(row); TSKEY key = dataRowKey(row);
if ((key > maxKey && ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) || if ((key > maxKey && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
(key < maxKey && !ASCENDING_ORDER_TRAVERSE(pQueryHandle->order))) { (key < maxKey && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
uTrace("%p key:%"PRIu64" beyond qrange:%"PRId64" - %"PRId64", no more data in buffer", pQueryHandle, key, pQueryHandle->window.skey, uTrace("%p key:%"PRIu64" beyond qrange:%"PRId64" - %"PRId64", no more data in buffer", pQueryHandle, key, pQueryHandle->window.skey,
pQueryHandle->window.ekey); pQueryHandle->window.ekey);
@ -1457,7 +1430,7 @@ static int tsdbReadRowsFromCache(SSkipListIterator* pIter, STable* pTable, TSKEY
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) { if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
pData = pColInfo->pData + numOfRows * pColInfo->info.bytes; pData = pColInfo->pData + numOfRows * pColInfo->info.bytes;
} else { } else {
pData = pColInfo->pData + (maxRowsToRead - numOfRows - 1) * pColInfo->info.bytes; pData = pColInfo->pData + (maxRowsToRead - numOfRows - 1) * pColInfo->info.bytes;
@ -1490,7 +1463,7 @@ static int tsdbReadRowsFromCache(SSkipListIterator* pIter, STable* pTable, TSKEY
assert(numOfRows <= maxRowsToRead); assert(numOfRows <= maxRowsToRead);
// if the buffer is not full in case of descending order query, move the data in the front of the buffer // if the buffer is not full in case of descending order query, move the data in the front of the buffer
if (!ASCENDING_ORDER_TRAVERSE(pQueryHandle->order) && numOfRows < maxRowsToRead) { if (!ASCENDING_TRAVERSE(pQueryHandle->order) && numOfRows < maxRowsToRead) {
int32_t emptySize = maxRowsToRead - numOfRows; int32_t emptySize = maxRowsToRead - numOfRows;
for(int32_t i = 0; i < numOfCols; ++i) { for(int32_t i = 0; i < numOfCols; ++i) {
@ -1506,17 +1479,14 @@ static int tsdbReadRowsFromCache(SSkipListIterator* pIter, STable* pTable, TSKEY
SDataBlockInfo tsdbRetrieveDataBlockInfo(TsdbQueryHandleT* pQueryHandle) { SDataBlockInfo tsdbRetrieveDataBlockInfo(TsdbQueryHandleT* pQueryHandle) {
STsdbQueryHandle* pHandle = (STsdbQueryHandle*)pQueryHandle; STsdbQueryHandle* pHandle = (STsdbQueryHandle*)pQueryHandle;
STable* pTable = NULL; int32_t step = ASCENDING_TRAVERSE(pHandle->order)? 1:-1;
int32_t rows = 0;
int32_t step = ASCENDING_ORDER_TRAVERSE(pHandle->order)? 1:-1;
// there are data in file // there are data in file
if (pHandle->cur.fid >= 0) { if (pHandle->cur.fid >= 0) {
STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[pHandle->cur.slot]; STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[pHandle->cur.slot];
STableCheckInfo* pCheckInfo = pBlockInfo->pTableCheckInfo; STableCheckInfo* pCheckInfo = pBlockInfo->pTableCheckInfo;
pTable = pCheckInfo->pTableObj; STable* pTable = pCheckInfo->pTableObj;
if (pHandle->cur.mixBlock) { if (pHandle->cur.mixBlock) {
SDataBlockInfo blockInfo = { SDataBlockInfo blockInfo = {
@ -1528,33 +1498,31 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(TsdbQueryHandleT* pQueryHandle) {
return blockInfo; return blockInfo;
} else { } else {
SDataBlockInfo binfo = getTrueDataBlockInfo(pCheckInfo, pBlockInfo->pBlock.compBlock); return getTrueDataBlockInfo(pCheckInfo, pBlockInfo->pBlock.compBlock);
return binfo;
} }
} else { } else {
STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex); STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
pTable = pCheckInfo->pTableObj;
STable* pTable = pCheckInfo->pTableObj;
if (pTable->mem != NULL) { if (pTable->mem != NULL) { // create mem table iterator if it is not created yet
// create mem table iterator if it is not created yet
assert(pCheckInfo->iter != NULL); assert(pCheckInfo->iter != NULL);
STimeWindow* win = &pHandle->cur.win; STimeWindow* win = &pHandle->cur.win;
rows = tsdbReadRowsFromCache(pCheckInfo->iter, pCheckInfo->pTableObj, pHandle->window.ekey, pHandle->cur.rows = tsdbReadRowsFromCache(pCheckInfo->iter, pCheckInfo->pTableObj, pHandle->window.ekey,
pHandle->outputCapacity, &win->skey, &win->ekey, pHandle); // todo refactor API pHandle->outputCapacity, &win->skey, &win->ekey, pHandle); // todo refactor API
// update the last key value // update the last key value
pCheckInfo->lastKey = win->ekey + step; pCheckInfo->lastKey = win->ekey + step;
} }
if (!ASCENDING_ORDER_TRAVERSE(pHandle->order)) { if (!ASCENDING_TRAVERSE(pHandle->order)) {
SWAP(pHandle->cur.win.skey, pHandle->cur.win.ekey, TSKEY); SWAP(pHandle->cur.win.skey, pHandle->cur.win.ekey, TSKEY);
} }
SDataBlockInfo blockInfo = { SDataBlockInfo blockInfo = {
.uid = pTable->tableId.uid, .uid = pTable->tableId.uid,
.tid = pTable->tableId.tid, .tid = pTable->tableId.tid,
.rows = rows, .rows = pHandle->cur.rows,
.window = pHandle->cur.win, .window = pHandle->cur.win,
}; };
@ -1601,7 +1569,7 @@ SArray* tsdbRetrieveDataBlock(TsdbQueryHandleT* pQueryHandle, SArray* pIdList) {
int32_t numOfRows = copyDataFromFileBlock(pHandle, pHandle->outputCapacity, 0, 0, pBlock->numOfPoints - 1); int32_t numOfRows = copyDataFromFileBlock(pHandle, pHandle->outputCapacity, 0, 0, pBlock->numOfPoints - 1);
// if the buffer is not full in case of descending order query, move the data in the front of the buffer // if the buffer is not full in case of descending order query, move the data in the front of the buffer
if (!ASCENDING_ORDER_TRAVERSE(pHandle->order) && numOfRows < pHandle->outputCapacity) { if (!ASCENDING_TRAVERSE(pHandle->order) && numOfRows < pHandle->outputCapacity) {
int32_t emptySize = pHandle->outputCapacity - numOfRows; int32_t emptySize = pHandle->outputCapacity - numOfRows;
int32_t reqNumOfCols = taosArrayGetSize(pHandle->pColumns); int32_t reqNumOfCols = taosArrayGetSize(pHandle->pColumns);

View File

@ -216,14 +216,14 @@ int WCSPatternMatch(const wchar_t *patterStr, const wchar_t *str, size_t size, c
return (str[j] == 0 || j >= size) ? TSDB_PATTERN_MATCH : TSDB_PATTERN_NOMATCH; return (str[j] == 0 || j >= size) ? TSDB_PATTERN_MATCH : TSDB_PATTERN_NOMATCH;
} }
static UNUSED_FUNC int32_t compareStrPatternComp(const void* pLeft, const void* pRight) { static int32_t compareStrPatternComp(const void* pLeft, const void* pRight) {
SPatternCompareInfo pInfo = {'%', '_'}; SPatternCompareInfo pInfo = {'%', '_'};
const char* pattern = pRight; char pattern[128] = {0};
const char* str = pLeft; memcpy(pattern, varDataVal(pRight), varDataLen(pRight));
assert(varDataLen(pRight) < 128);
int32_t ret = patternMatch(pattern, str, strlen(str), &pInfo);
int32_t ret = patternMatch(pattern, varDataVal(pLeft), varDataLen(pLeft), &pInfo);
return (ret == TSDB_PATTERN_MATCH) ? 0 : 1; return (ret == TSDB_PATTERN_MATCH) ? 0 : 1;
} }
@ -232,14 +232,14 @@ static int32_t compareFindStrInArray(const void* pLeft, const void* pRight) {
return taosArraySearchString(arr, pLeft) == NULL ? 0 : 1; return taosArraySearchString(arr, pLeft) == NULL ? 0 : 1;
} }
static UNUSED_FUNC int32_t compareWStrPatternComp(const void* pLeft, const void* pRight) { static int32_t compareWStrPatternComp(const void* pLeft, const void* pRight) {
SPatternCompareInfo pInfo = {'%', '_'}; SPatternCompareInfo pInfo = {'%', '_'};
const wchar_t* pattern = pRight; wchar_t pattern[128] = {0};
const wchar_t* str = pLeft; memcpy(pattern, varDataVal(pRight), varDataLen(pRight)/TSDB_NCHAR_SIZE);
assert(varDataLen(pRight) < 128);
int32_t ret = WCSPatternMatch(pattern, str, wcslen(str), &pInfo);
int32_t ret = WCSPatternMatch(pattern, varDataVal(pLeft), varDataLen(pLeft)/TSDB_NCHAR_SIZE, &pInfo);
return (ret == TSDB_PATTERN_MATCH) ? 0 : 1; return (ret == TSDB_PATTERN_MATCH) ? 0 : 1;
} }

View File

@ -32,10 +32,10 @@ sql connect
sql create database ir1db days 7 sql create database ir1db days 7
sql use ir1db sql use ir1db
sql create table tb(ts timestamp, i int) sql create table tb(ts timestamp, i bigint)
print ================= step1 print ================= step1
sql import into tb values(1520000010000, 10000) sql import into tb values(1520000010000, 1520000010000)
sql select * from tb; sql select * from tb;
print $rows print $rows
if $rows != 1 then if $rows != 1 then
@ -43,7 +43,7 @@ if $rows != 1 then
endi endi
print ================= step2 print ================= step2
sql insert into tb values(1520000008000, 8000) sql insert into tb values(1520000008000, 1520000008000)
print $rows print $rows
sql select * from tb; sql select * from tb;
if $rows != 2 then if $rows != 2 then
@ -51,7 +51,7 @@ if $rows != 2 then
endi endi
print ================= step3 print ================= step3
sql insert into tb values(1520000020000, 20000) sql insert into tb values(1520000020000, 1520000020000)
sql select * from tb; sql select * from tb;
print $rows print $rows
if $rows != 3 then if $rows != 3 then
@ -59,9 +59,9 @@ if $rows != 3 then
endi endi
print ================= step4 print ================= step4
sql import into tb values(1520000009000, 9000) sql import into tb values(1520000009000, 1520000009000)
sql import into tb values(1520000015000, 15000) sql import into tb values(1520000015000, 1520000015000)
sql import into tb values(1520000030000, 30000) sql import into tb values(1520000030000, 1520000030000)
sql select * from tb; sql select * from tb;
print $rows print $rows
if $rows != 6 then if $rows != 6 then
@ -69,10 +69,10 @@ if $rows != 6 then
endi endi
print ================= step5 print ================= step5
sql insert into tb values(1520000008000, 8000) sql insert into tb values(1520000008000, 1520000008000)
sql insert into tb values(1520000014000, 14000) sql insert into tb values(1520000014000, 1520000014000)
sql insert into tb values(1520000025000, 25000) sql insert into tb values(1520000025000, 1520000025000)
sql insert into tb values(1520000040000, 40000) sql insert into tb values(1520000040000, 1520000040000)
sql select * from tb; sql select * from tb;
print $rows print $rows
if $rows != 9 then if $rows != 9 then
@ -80,11 +80,11 @@ if $rows != 9 then
endi endi
print ================= step6 print ================= step6
sql import into tb values(1520000007000, 7000) sql import into tb values(1520000007000, 1520000007000)
sql import into tb values(1520000012000, 12000) sql import into tb values(1520000012000, 1520000012000)
sql import into tb values(1520000023000, 23000) sql import into tb values(1520000023000, 1520000023000)
sql import into tb values(1520000034000, 34000) sql import into tb values(1520000034000, 1520000034000)
sql import into tb values(1520000050000, 50000) sql import into tb values(1520000050000, 1520000050000)
sql select * from tb; sql select * from tb;
print $rows print $rows
if $rows != 14 then if $rows != 14 then
@ -104,11 +104,11 @@ if $rows != 14 then
endi endi
print ================= step7 print ================= step7
sql import into tb values(1520000007001, 7001) sql import into tb values(1520000007001, 1520000007001)
sql import into tb values(1520000012001, 12001) sql import into tb values(1520000012001, 1520000012001)
sql import into tb values(1520000023001, 23001) sql import into tb values(1520000023001, 1520000023001)
sql import into tb values(1520000034001, 34001) sql import into tb values(1520000034001, 1520000034001)
sql import into tb values(1520000050001, 50001) sql import into tb values(1520000050001, 1520000050001)
sql select * from tb; sql select * from tb;
print $rows print $rows
if $rows != 19 then if $rows != 19 then
@ -117,10 +117,10 @@ if $rows != 19 then
endi endi
print ================= step8 print ================= step8
sql insert into tb values(1520000008002, 8002) sql insert into tb values(1520000008002, 1520000008002)
sql insert into tb values(1520000014002, 14002) sql insert into tb values(1520000014002, 1520000014002)
sql insert into tb values(1520000025002, 25002) sql insert into tb values(1520000025002, 1520000025002)
sql insert into tb values(1520000060000, 60000) sql insert into tb values(1520000060000, 1520000060000)
sql select * from tb; sql select * from tb;
print $rows print $rows
if $rows != 23 then if $rows != 23 then
@ -142,18 +142,18 @@ print ================= step9
#sql import into tb values(now+14d, 50001) #sql import into tb values(now+14d, 50001)
#sql import into tb values(now+16d, 500051) #sql import into tb values(now+16d, 500051)
sql import into tb values(1517408000000, 7003) sql import into tb values(1517408000000, 1517408000000)
sql import into tb values(1518272000000, 34003) sql import into tb values(1518272000000, 1518272000000)
sql import into tb values(1519136000000, 34003) sql import into tb values(1519136000000, 1519136000000)
sql import into tb values(1519568000000, 34003) sql import into tb values(1519568000000, 1519568000000)
sql import into tb values(1519654400000, 50001) sql import into tb values(1519654400000, 1519654400000)
sql import into tb values(1519827200000, 50001) sql import into tb values(1519827200000, 1519827200000)
sql import into tb values(1520345600000, 50001) sql import into tb values(1520345600000, 1520345600000)
sql import into tb values(1520691200000, 50002) sql import into tb values(1520691200000, 1520691200000)
sql import into tb values(1520864000000, 50003) sql import into tb values(1520864000000, 1520864000000)
sql import into tb values(1521900800000, 50004) sql import into tb values(1521900800000, 1521900800000)
sql import into tb values(1523110400000, 50001) sql import into tb values(1523110400000, 1523110400000)
sql import into tb values(1521382400000, 500051) sql import into tb values(1521382400000, 1521382400000)
sql select * from tb; sql select * from tb;
print $rows print $rows
if $rows != 35 then if $rows != 35 then
@ -176,7 +176,7 @@ endi
print ================= step11 print ================= step11
#sql import into tb values(now-50d, 7003) (now-48d, 7003) (now-46d, 7003) (now-44d, 7003) (now-42d, 7003) #sql import into tb values(now-50d, 7003) (now-48d, 7003) (now-46d, 7003) (now-44d, 7003) (now-42d, 7003)
sql import into tb values(1515680000000, 7003) (1515852800000, 7003) (1516025600000, 7003) (1516198400000, 7003) (1516371200000, 7003) sql import into tb values(1515680000000, 1) (1515852800000, 2) (1516025600000, 3) (1516198400000, 4) (1516371200000, 5)
sql select * from tb; sql select * from tb;
if $rows != 40 then if $rows != 40 then
return -1 return -1
@ -184,8 +184,8 @@ endi
print ================= step12 print ================= step12
#1520000000000 #1520000000000
#sql import into tb values(now-19d, 7003) (now-18d, 7003) (now-17d, 7003) (now-16d, 7003) (now-15d, 7003) (now-14d, 7003) (now-13d, 7003) (now-12d, 7003) (now-11d, 7003) #sql import into tb values(now-19d, -19) (now-18d, -18) (now-17d, -17) (now-16d, -16) (now-15d, -15) (now-14d, -14) (now-13d, -13) (now-12d, -12) (now-11d, -11)
sql import into tb values(1518358400000, 7003) (1518444800000, 7003) (1518531200000, 7003) (1518617600000, 7003) (1518704000000, 7003) (1518790400000, 7003) (1518876800000, 7003) (1518963200000, 7003) (1519049600000, 7003) sql import into tb values(1518358400000, 6) (1518444800000, 7) (1518531200000, 8) (1518617600000, 9) (1518704000000, 10) (1518790400000, 11) (1518876800000, 12) (1518963200000, 13) (1519049600000, 14)
sql select * from tb; sql select * from tb;
print $rows print $rows
if $rows != 49 then if $rows != 49 then
@ -195,13 +195,13 @@ endi
print ================= step14 print ================= step14
#1520000000000 #1520000000000
#sql import into tb values(now-48d, 34003) #sql import into tb values(now-48d, -48)
#sql import into tb values(now-38d, 50001) #sql import into tb values(now-38d, -38)
#sql import into tb values(now-28d, 50001) #sql import into tb values(now-28d, -28)
sql import into tb values(1515852800001, 34003) sql import into tb values(1515852800001, -48)
sql import into tb values(1516716800000, 50001) sql import into tb values(1516716800000, -38)
sql import into tb values(1517580800000, 50001) sql import into tb values(1517580800000, -28)
sql select * from tb; sql select * from tb;
if $rows != 52 then if $rows != 52 then

View File

@ -65,7 +65,7 @@ sleep 2000
system sh/exec.sh -n dnode1 -s start system sh/exec.sh -n dnode1 -s start
print ================== server restart completed print ================== server restart completed
#run general/parser/limit1_tb.sim run general/parser/limit1_tb.sim
run general/parser/limit1_stb.sim run general/parser/limit1_stb.sim
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -21,7 +21,7 @@ $stb = $stbPrefix . $i
sql drop database $db -x step1 sql drop database $db -x step1
step1: step1:
sql create database $db tblocks 100 sql create database $db cache 16
print ====== create tables print ====== create tables
sql use $db sql use $db
sql create table $stb (ts timestamp, c1 int, c2 bigint, c3 float, c4 double, c5 smallint, c6 tinyint, c7 bool, c8 binary(10), c9 nchar(10)) tags(t1 int) sql create table $stb (ts timestamp, c1 int, c2 bigint, c3 float, c4 double, c5 smallint, c6 tinyint, c7 bool, c8 binary(10), c9 nchar(10)) tags(t1 int)

View File

@ -47,19 +47,21 @@ sql select * from $stb order by ts desc limit 5
if $rows != 5 then if $rows != 5 then
return -1 return -1
endi endi
sql select * from $stb order by ts desc limit 5 offset 1 sql select * from $stb order by ts desc limit 5 offset 1
if $rows != 5 then if $rows != 5 then
return -1 return -1
endi endi
if $data01 != 8 then if $data01 != 9 then
return -1 return -1
endi endi
if $data11 != 7 then if $data11 != 9 then
return -1 return -1
endi endi
if $data41 != 4 then if $data41 != 9 then
return -1 return -1
endi endi
sql select * from $stb order by ts asc limit 5 sql select * from $stb order by ts asc limit 5
if $rows != 5 then if $rows != 5 then
return -1 return -1
@ -67,19 +69,28 @@ endi
if $data00 != @18-09-17 09:00:00.000@ then if $data00 != @18-09-17 09:00:00.000@ then
return -1 return -1
endi endi
if $data40 != @18-09-17 09:00:00.000@ then
return -1
endi
if $data01 != 0 then if $data01 != 0 then
return -1 return -1
endi endi
if $data12 != 1 then
print data12 = $data12
if $data12 != NULL then
return -1 return -1
endi endi
if $data24 != 2.000000000 then
if $data24 != NULL then
return -1 return -1
endi endi
if $data35 != 3 then
if $data35 != 0 then
return -1 return -1
endi endi
if $data49 != nchar4 then if $data49 != nchar0 then
return -1 return -1
endi endi
@ -87,10 +98,18 @@ sql select * from $stb order by ts asc limit 5 offset 1
if $rows != 5 then if $rows != 5 then
return -1 return -1
endi endi
if $data01 != 1 then if $data01 != 0 then
return -1 return -1
endi endi
if $data41 != 5 then if $data41 != 0 then
return -1
endi
if $data40 != @18-09-17 09:00:00.000@ then
return -1
endi
if $data00 != @18-09-17 09:00:00.000@ then
return -1 return -1
endi endi
@ -98,6 +117,7 @@ sql select * from $stb limit 500 offset 1
if $rows != 99 then if $rows != 99 then
return -1 return -1
endi endi
if $data01 != 1 then if $data01 != 1 then
return -1 return -1
endi endi
@ -629,6 +649,7 @@ endi
if $data09 != 7 then if $data09 != 7 then
return -1 return -1
endi endi
print $data13
if $data13 != 0.000000000 then if $data13 != 0.000000000 then
return -1 return -1
endi endi
@ -656,6 +677,8 @@ endi
if $data35 != 0 then if $data35 != 0 then
return -1 return -1
endi endi
print $data36
if $data36 != 0.000000000 then if $data36 != 0.000000000 then
return -1 return -1
endi endi
@ -675,49 +698,49 @@ if $data59 != 2 then
return -1 return -1
endi endi
sql select max(c2), min(c2), avg(c2), count(c2), sum(c2), spread(c2), first(c2), last(c2) from $stb where ts >= $ts0 and ts <= $tsu and t1 > 3 and t1 < 6 interval(5m) group by t1 order by t1 desc limit 3 offset 1 #sql select max(c2), min(c2), avg(c2), count(c2), sum(c2), spread(c2), first(c2), last(c2) from $stb where ts >= $ts0 and ts <= $tsu and t1 > 3 and t1 < 6 interval(5m) group by t1 order by t1 desc limit 3 offset 1
if $rows != 3 then #if $rows != 3 then
return -1 # return -1
endi #endi
if $data00 != @18-09-17 09:20:00.000@ then #if $data00 != @18-09-17 09:20:00.000@ then
return -1 # return -1
endi #endi
if $data01 != 2 then #if $data01 != 2 then
return -1 # return -1
endi #endi
if $data02 != 2 then #if $data02 != 2 then
return -1 # return -1
endi #endi
if $data09 != 4 then #if $data09 != 4 then
return -1 # return -1
endi #endi
if $data13 != 3.000000000 then #if $data13 != 3.000000000 then
return -1 # return -1
endi #endi
if $data19 != 4 then #if $data19 != 4 then
return -1 # return -1
endi #endi
if $data20 != @18-09-17 09:40:00.000@ then #if $data20 != @18-09-17 09:40:00.000@ then
return -1 # return -1
endi #endi
if $data24 != 1 then #if $data24 != 1 then
return -1 # return -1
endi #endi
if $data25 != 4 then #if $data25 != 4 then
return -1 # return -1
endi #endi
if $data26 != 0.000000000 then #if $data26 != 0.000000000 then
return -1 # return -1
endi #endi
if $data27 != 4 then #if $data27 != 4 then
return -1 # return -1
endi #endi
if $data28 != 4 then #if $data28 != 4 then
return -1 # return -1
endi #endi
if $data29 != 4 then #if $data29 != 4 then
return -1 # return -1
endi #endi
sql select max(c2), min(c2), avg(c2), count(c2), spread(c2), first(c2), last(c2), count(ts) from $stb where ts >= $ts0 and ts <= $tsu and t1 > 3 and t1 < 6 interval(5m) group by t1 order by t1 desc limit 3 offset 1 sql select max(c2), min(c2), avg(c2), count(c2), spread(c2), first(c2), last(c2), count(ts) from $stb where ts >= $ts0 and ts <= $tsu and t1 > 3 and t1 < 6 interval(5m) group by t1 order by t1 desc limit 3 offset 1
if $rows != 6 then if $rows != 6 then

View File

@ -38,7 +38,6 @@ sleep 2000
run general/parser/lastrow.sim run general/parser/lastrow.sim
sleep 2000 sleep 2000
run general/parser/nchar.sim run general/parser/nchar.sim
sleep 2000 sleep 2000
run general/parser/null_char.sim run general/parser/null_char.sim
sleep 2000 sleep 2000
@ -46,7 +45,26 @@ run general/parser/single_row_in_tb.sim
sleep 2000 sleep 2000
run general/parser/select_from_cache_disk.sim run general/parser/select_from_cache_disk.sim
sleep 2000 sleep 2000
run general/parser/selectResNum.sim
sleep 2000
run general/parser/mixed_blocks.sim
sleep 2000
run general/parser/limit1.sim
sleep 2000
run general/parser/limit.sim run general/parser/limit.sim
sleep 2000
run general/parser/limit1_tblocks100.sim
sleep 2000
run general/parser/select_across_vnodes.sim
sleep 2000
run general/parser/limit2.sim
sleep 2000
run general/parser/tbnameIn.sim
sleep 2000
run general/parser/slimit.sim
sleep 2000
run general/parser/slimit1.sim
sleep 2000 sleep 2000
run general/parser/fill.sim run general/parser/fill.sim
@ -57,31 +75,15 @@ run general/parser/tags_dynamically_specifiy.sim
sleep 2000 sleep 2000
run general/parser/interp.sim run general/parser/interp.sim
sleep 2000 sleep 2000
run general/parser/limit1.sim
sleep 2000
run general/parser/limit1_tblocks100.sim
sleep 2000
run general/parser/limit2.sim
sleep 2000
run general/parser/mixed_blocks.sim
sleep 2000
run general/parser/selectResNum.sim
sleep 2000
run general/parser/select_across_vnodes.sim
sleep 2000
run general/parser/set_tag_vals.sim run general/parser/set_tag_vals.sim
sleep 2000
run general/parser/slimit.sim
sleep 2000
run general/parser/slimit1.sim
sleep 2000 sleep 2000
run general/parser/slimit_alter_tags.sim run general/parser/slimit_alter_tags.sim
sleep 2000 sleep 2000
run general/parser/stream_on_sys.sim run general/parser/stream_on_sys.sim
sleep 2000 sleep 2000
run general/parser/stream.sim run general/parser/stream.sim
sleep 2000
run general/parser/tbnameIn.sim
sleep 2000 sleep 2000
run general/parser/where.sim run general/parser/where.sim
sleep 2000 sleep 2000

View File

@ -107,6 +107,7 @@ echo "monitorDebugFlag 131" >> $TAOS_CFG
echo "udebugFlag 131" >> $TAOS_CFG echo "udebugFlag 131" >> $TAOS_CFG
echo "jnidebugFlag 131" >> $TAOS_CFG echo "jnidebugFlag 131" >> $TAOS_CFG
echo "sdebugFlag 135" >> $TAOS_CFG echo "sdebugFlag 135" >> $TAOS_CFG
echo "qdebugFlag 135" >> $TAOS_CFG
echo "monitor 0" >> $TAOS_CFG echo "monitor 0" >> $TAOS_CFG
echo "monitorInterval 1" >> $TAOS_CFG echo "monitorInterval 1" >> $TAOS_CFG
echo "http 0" >> $TAOS_CFG echo "http 0" >> $TAOS_CFG