[td-225] Support mem/imem records whose timestamps overlap with data in a file block.

This commit is contained in:
Haojun Liao 2020-05-18 22:46:46 +08:00
parent af407cb50f
commit 3f8c72184f
1 changed file with 324 additions and 159 deletions

View File

@ -51,6 +51,7 @@ typedef struct SQueryFilePos {
int64_t lastKey; int64_t lastKey;
int32_t rows; int32_t rows;
bool mixBlock; bool mixBlock;
bool blockCompleted;
STimeWindow win; STimeWindow win;
} SQueryFilePos; } SQueryFilePos;
@ -187,7 +188,7 @@ TsdbQueryHandleT* tsdbQueryTables(TsdbRepoT* tsdb, STsdbQueryCond* pCond, STable
* For ascending timestamp order query, query starts from data files. In contrast, buffer will be checked in the first place * For ascending timestamp order query, query starts from data files. In contrast, buffer will be checked in the first place
* in case of descending timestamp order query. * in case of descending timestamp order query.
*/ */
pQueryHandle->checkFiles = ASCENDING_ORDER_TRAVERSE(pQueryHandle->order); pQueryHandle->checkFiles = true;//ASCENDING_ORDER_TRAVERSE(pQueryHandle->order);
pQueryHandle->activeIndex = 0; pQueryHandle->activeIndex = 0;
// allocate buffer in order to load data blocks from file // allocate buffer in order to load data blocks from file
@ -229,7 +230,7 @@ TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TsdbRepoT *tsdb, STsdbQueryCond*
return pQueryHandle; return pQueryHandle;
} }
static bool initSkipListIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCheckInfo) { static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCheckInfo) {
STable* pTable = pCheckInfo->pTableObj; STable* pTable = pCheckInfo->pTableObj;
assert(pTable != NULL); assert(pTable != NULL);
@ -254,7 +255,7 @@ static bool initSkipListIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh
if (pTable->imem) { if (pTable->imem) {
pCheckInfo->iiter = tSkipListCreateIterFromVal(pTable->imem->pData, (const char*) &pCheckInfo->lastKey, pCheckInfo->iiter = tSkipListCreateIterFromVal(pTable->imem->pData, (const char*) &pCheckInfo->lastKey,
TSDB_DATA_TYPE_TIMESTAMP, order); TSDB_DATA_TYPE_TIMESTAMP, order);
} }
// both iterators are NULL, no data in buffer right now // both iterators are NULL, no data in buffer right now
@ -539,10 +540,51 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
mergeDataInDataBlock(pQueryHandle, pCheckInfo, pBlock, sa); mergeDataInDataBlock(pQueryHandle, pCheckInfo, pBlock, sa);
} else { // the whole block is loaded in to buffer } else { // the whole block is loaded in to buffer
pQueryHandle->realNumOfRows = pBlock->numOfPoints; SDataBlockInfo binfo = getTrueDataBlockInfo(pCheckInfo, pBlock);
cur->pos = 0; /*bool hasData = */ initTableMemIterator(pQueryHandle, pCheckInfo);
TSKEY k1 = TSKEY_INITIAL_VAL, k2 = TSKEY_INITIAL_VAL;
if (pCheckInfo->iter != NULL) {
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
SDataRow row = SL_GET_NODE_DATA(node);
k1 = dataRowKey(row);
if (k1 == binfo.window.skey) {
if (tSkipListIterNext(pCheckInfo->iter)) {
node = tSkipListIterGet(pCheckInfo->iter);
row = SL_GET_NODE_DATA(node);
k1 = dataRowKey(row);
} else {
k1 = TSKEY_INITIAL_VAL;
}
}
}
if (pCheckInfo->iiter != NULL) {
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter);
SDataRow row = SL_GET_NODE_DATA(node);
k2 = dataRowKey(row);
if (k2 == binfo.window.skey) {
if (tSkipListIterNext(pCheckInfo->iiter)) {
node = tSkipListIterGet(pCheckInfo->iiter);
row = SL_GET_NODE_DATA(node);
k2 = dataRowKey(row);
} else {
k2 = TSKEY_INITIAL_VAL;
}
}
}
if ((k1 != TSKEY_INITIAL_VAL && k1 < binfo.window.ekey) || (k2 != TSKEY_INITIAL_VAL && k2 < binfo.window.ekey)) {
doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo);
mergeDataInDataBlock(pQueryHandle, pCheckInfo, pBlock, sa);
} else {
pQueryHandle->realNumOfRows = binfo.rows;
cur->pos = 0;
}
} }
} else { } else { //desc order
// query ended in current block // query ended in current block
if (pQueryHandle->window.ekey > pBlock->keyFirst) { if (pQueryHandle->window.ekey > pBlock->keyFirst) {
if (!doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo)) { if (!doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo)) {
@ -557,13 +599,53 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
cur->pos = pBlock->numOfPoints - 1; cur->pos = pBlock->numOfPoints - 1;
} }
mergeDataInDataBlock(pQueryHandle, pCheckInfo, pBlock, sa); mergeDataInDataBlock(pQueryHandle, pCheckInfo, pBlock, sa);
} else { } else {
pQueryHandle->realNumOfRows = pBlock->numOfPoints; SDataBlockInfo binfo = getTrueDataBlockInfo(pCheckInfo, pBlock);
cur->pos = pBlock->numOfPoints - 1; /*bool hasData = */ initTableMemIterator(pQueryHandle, pCheckInfo);
TSKEY k1 = TSKEY_INITIAL_VAL, k2 = TSKEY_INITIAL_VAL;
if (pCheckInfo->iter != NULL) {
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
SDataRow row = SL_GET_NODE_DATA(node);
k1 = dataRowKey(row);
if (k1 == binfo.window.skey) {
if (tSkipListIterNext(pCheckInfo->iter)) {
node = tSkipListIterGet(pCheckInfo->iter);
row = SL_GET_NODE_DATA(node);
k1 = dataRowKey(row);
} else {
k1 = TSKEY_INITIAL_VAL;
}
}
}
if (pCheckInfo->iiter != NULL) {
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter);
SDataRow row = SL_GET_NODE_DATA(node);
k2 = dataRowKey(row);
if (k2 == binfo.window.skey) {
if (tSkipListIterNext(pCheckInfo->iiter)) {
node = tSkipListIterGet(pCheckInfo->iiter);
row = SL_GET_NODE_DATA(node);
k2 = dataRowKey(row);
} else {
k2 = TSKEY_INITIAL_VAL;
}
}
}
cur->pos = binfo.rows - 1;
if ((k1 != TSKEY_INITIAL_VAL && k1 < binfo.window.ekey) || (k2 != TSKEY_INITIAL_VAL && k2 < binfo.window.ekey)) {
doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo);
mergeDataInDataBlock(pQueryHandle, pCheckInfo, pBlock, sa);
}
}
// pQueryHandle->realNumOfRows = pBlock->numOfPoints;
// cur->pos = pBlock->numOfPoints - 1;
} }
}
taosArrayDestroy(sa); taosArrayDestroy(sa);
return pQueryHandle->realNumOfRows > 0; return pQueryHandle->realNumOfRows > 0;
@ -632,17 +714,14 @@ static int vnodeBinarySearchKey(char* pValue, int num, TSKEY key, int order) {
} }
static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, SDataBlockInfo* pBlockInfo, int32_t capacity, static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, SDataBlockInfo* pBlockInfo, int32_t capacity,
int32_t numOfRows, int32_t* pos, int32_t endPos) { int32_t numOfRows, int32_t start, int32_t end) {
char* pData = NULL; char* pData = NULL;
int32_t step = ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)? 1 : -1;
SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0]; SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0];
TSKEY* tsArray = pCols->cols[0].pData; TSKEY* tsArray = pCols->cols[0].pData;
int32_t numOfCols = pCols->numOfCols; int32_t num = end - start + 1;
int32_t n = (*pos); // todo: the output buffer limitation and the query time window?
while(n < pBlockInfo->rows && n <= endPos && ((n - (*pos) + numOfRows) < capacity)) { n++;}
int32_t num = n - (*pos);
int32_t reqiredNumOfCols = taosArrayGetSize(pQueryHandle->pColumns); int32_t reqiredNumOfCols = taosArrayGetSize(pQueryHandle->pColumns);
//data in buffer has greater timestamp, copy data in file block //data in buffer has greater timestamp, copy data in file block
@ -653,21 +732,21 @@ static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, SDataBlockI
if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) { if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) {
pData = pColInfo->pData + numOfRows * pColInfo->info.bytes; pData = pColInfo->pData + numOfRows * pColInfo->info.bytes;
} else { } else {
pData = pColInfo->pData + (capacity - numOfRows - 1) * pColInfo->info.bytes; pData = pColInfo->pData + (capacity - numOfRows - num) * pColInfo->info.bytes;
} }
for (int32_t j = 0; j < numOfCols; ++j) { // todo opt performance for (int32_t j = 0; j < pCols->numOfCols; ++j) { // todo opt performance
SDataCol* src = &pCols->cols[j]; SDataCol* src = &pCols->cols[j];
if (pColInfo->info.colId == src->colId) { if (pColInfo->info.colId == src->colId) {
if (pColInfo->info.type != TSDB_DATA_TYPE_BINARY && pColInfo->info.type != TSDB_DATA_TYPE_NCHAR) { if (pColInfo->info.type != TSDB_DATA_TYPE_BINARY && pColInfo->info.type != TSDB_DATA_TYPE_NCHAR) {
memmove(pData, src->pData + bytes * (*pos), bytes * num); memmove(pData, src->pData + bytes * start, bytes * num);
} else { // handle the var-string } else { // handle the var-string
char* dst = pData; char* dst = pData;
// todo refactor, only copy one-by-one // todo refactor, only copy one-by-one
for (int32_t k = (*pos); k < num + (*pos); ++k) { for (int32_t k = start; k < num + start; ++k) {
char* p = tdGetColDataOfRow(src, k); char* p = tdGetColDataOfRow(src, k);
memcpy(dst, p, varDataTLen(p)); memcpy(dst, p, varDataTLen(p));
dst += bytes; dst += bytes;
@ -679,13 +758,10 @@ static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, SDataBlockI
} }
} }
*pos += num; pQueryHandle->cur.win.ekey = tsArray[end];
numOfRows += num; pQueryHandle->cur.lastKey = tsArray[end] + step;
pQueryHandle->cur.win.ekey = tsArray[(*pos) - 1]; return numOfRows + num;
pQueryHandle->cur.lastKey = pQueryHandle->cur.win.ekey + 1; // todo ???
return numOfRows;
} }
static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, int32_t capacity, static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, int32_t capacity,
@ -729,103 +805,212 @@ static void mergeDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInfo
SQueryFilePos* cur = &pQueryHandle->cur; SQueryFilePos* cur = &pQueryHandle->cur;
SDataBlockInfo blockInfo = getTrueDataBlockInfo(pCheckInfo, pBlock); SDataBlockInfo blockInfo = getTrueDataBlockInfo(pCheckInfo, pBlock);
initSkipListIterator(pQueryHandle, pCheckInfo); initTableMemIterator(pQueryHandle, pCheckInfo);
SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0]; SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0];
int32_t endPos = cur->pos; int32_t endPos = cur->pos;
if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order) && pQueryHandle->window.ekey > blockInfo.window.ekey) { if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order) && pQueryHandle->window.ekey > blockInfo.window.ekey) {
endPos = blockInfo.rows - 1; endPos = blockInfo.rows - 1;
cur->mixBlock = (cur->pos != 0);
} else if (!ASCENDING_ORDER_TRAVERSE(pQueryHandle->order) && pQueryHandle->window.ekey < blockInfo.window.skey) { } else if (!ASCENDING_ORDER_TRAVERSE(pQueryHandle->order) && pQueryHandle->window.ekey < blockInfo.window.skey) {
endPos = 0; endPos = 0;
cur->mixBlock = (cur->pos != blockInfo.rows - 1);
} else { } else {
int32_t order = (pQueryHandle->order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC; int32_t order = (pQueryHandle->order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC;
endPos = vnodeBinarySearchKey(pCols->cols[0].pData, pCols->numOfPoints, pQueryHandle->window.ekey, order); endPos = vnodeBinarySearchKey(pCols->cols[0].pData, pCols->numOfPoints, pQueryHandle->window.ekey, order);
cur->mixBlock = true;
// if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) {
// if (endPos < cur->pos) {
// pQueryHandle->realNumOfRows = 0;
// return;
// } else {
// pQueryHandle->realNumOfRows = endPos - cur->pos + 1;
// }
//
// pCheckInfo->lastKey = ((int64_t*)(pCols->cols[0].pData))[endPos] + 1;
// } else {
// if (endPos > cur->pos) {
// pQueryHandle->realNumOfRows = 0;
// return;
// } else {
// pQueryHandle->realNumOfRows = cur->pos - endPos + 1;
// }
// }
} }
// compared with the data from in-memory buffer, to generate the correct timestamp array list // compared with the data from in-memory buffer, to generate the correct timestamp array list
int32_t pos = MIN(cur->pos, endPos); int32_t pos = cur->pos;
assert(pCols->cols[0].type == TSDB_DATA_TYPE_TIMESTAMP && pCols->cols[0].colId == 0); assert(pCols->cols[0].type == TSDB_DATA_TYPE_TIMESTAMP && pCols->cols[0].colId == 0);
TSKEY* tsArray = pCols->cols[0].pData; TSKEY* tsArray = pCols->cols[0].pData;
int32_t numOfRows = 0; int32_t numOfRows = 0;
pQueryHandle->cur.win = TSWINDOW_INITIALIZER; pQueryHandle->cur.win = TSWINDOW_INITIALIZER;
int32_t step = ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)? 1:-1;
// no data in buffer, load data from file directly // no data in buffer, load data from file directly
if (pCheckInfo->iiter == NULL && pCheckInfo->iter == NULL) { if (pCheckInfo->iiter == NULL && pCheckInfo->iter == NULL) {
cur->win.skey = tsArray[pos]; int32_t start = cur->pos;
copyDataFromFileBlock(pQueryHandle, &blockInfo, pQueryHandle->outputCapacity, numOfRows, &pos, endPos); int32_t end = endPos;
if (!ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) {
end = cur->pos;
start = endPos;
}
cur->win.skey = tsArray[start];
cur->win.ekey = tsArray[end];
// todo opt in case of no data in buffer
numOfRows = copyDataFromFileBlock(pQueryHandle, &blockInfo, pQueryHandle->outputCapacity, numOfRows, start, end);
// if the buffer is not full in case of descending order query, move the data in the front of the buffer
if (!ASCENDING_ORDER_TRAVERSE(pQueryHandle->order) && numOfRows < pQueryHandle->outputCapacity) {
int32_t emptySize = pQueryHandle->outputCapacity - numOfRows;
int32_t reqNumOfCols = taosArrayGetSize(pQueryHandle->pColumns);
for(int32_t i = 0; i < reqNumOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
memmove(pColInfo->pData, pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes);
}
}
pCheckInfo->lastKey = cur->lastKey;
pQueryHandle->realNumOfRows = numOfRows;
cur->rows = numOfRows;
return; return;
} else if (pCheckInfo->iter != NULL && pCheckInfo->iiter == NULL) { } else if (pCheckInfo->iter != NULL && pCheckInfo->iiter == NULL) {
// } else if (pCheckInfo->iter == NULL && pCheckInfo->iiter != NULL) { // } else if (pCheckInfo->iter == NULL && pCheckInfo->iiter != NULL) {
// } else { // iter and iiter are all not NULL, three-way merge data block // } else { // iter and iiter are all not NULL, three-way merge data block
STSchema* pSchema = tsdbGetTableSchema(tsdbGetMeta(pQueryHandle->pTsdb), pCheckInfo->pTableObj); STSchema* pSchema = tsdbGetTableSchema(tsdbGetMeta(pQueryHandle->pTsdb), pCheckInfo->pTableObj);
SSkipListNode* node = NULL;
while (1) {
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter); do {
node = tSkipListIterGet(pCheckInfo->iter);
if (node == NULL) { if (node == NULL) {
if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = tsArray[pos];
}
numOfRows = copyDataFromFileBlock(pQueryHandle, &blockInfo, pQueryHandle->outputCapacity, numOfRows, &pos, endPos);
break; break;
} }
SDataRow row = SL_GET_NODE_DATA(node); SDataRow row = SL_GET_NODE_DATA(node);
TSKEY key = dataRowKey(row); TSKEY key = dataRowKey(row);
if ((key > pQueryHandle->window.ekey && ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) ||
if (key < tsArray[pos]) { (key < pQueryHandle->window.ekey && !ASCENDING_ORDER_TRAVERSE(pQueryHandle->order))) {
break;
}
if (((tsArray[pos] > pQueryHandle->window.ekey || pos > endPos) && ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) ||
((tsArray[pos] < pQueryHandle->window.ekey || pos < endPos) && !ASCENDING_ORDER_TRAVERSE(pQueryHandle->order))) {
break;
}
if ((key < tsArray[pos] && ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) ||
(key > tsArray[pos] && !ASCENDING_ORDER_TRAVERSE(pQueryHandle->order))) {
copyOneRowFromMem(pQueryHandle, pCheckInfo, pQueryHandle->outputCapacity, numOfRows, row, pSchema); copyOneRowFromMem(pQueryHandle, pCheckInfo, pQueryHandle->outputCapacity, numOfRows, row, pSchema);
numOfRows += 1; numOfRows += 1;
cur->mixBlock = true;
if (cur->win.skey == TSKEY_INITIAL_VAL) { if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = key; cur->win.skey = key;
} }
cur->win.ekey = key; cur->win.ekey = key;
cur->lastKey = key + step;
cur->mixBlock = true;
tSkipListIterNext(pCheckInfo->iter); tSkipListIterNext(pCheckInfo->iter);
} else if (key == tsArray[pos]) { // data in buffer has the same timestamp of data in file block, ignore it
if (numOfRows >= pQueryHandle->outputCapacity) {
break;
}
} else if (key == tsArray[pos]) { //data in buffer has the same timestamp of data in file block, ignore it
tSkipListIterNext(pCheckInfo->iter); tSkipListIterNext(pCheckInfo->iter);
} else if (key > tsArray[pos]) { } else if ((key > tsArray[pos] && ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) ||
(key < tsArray[pos] && !ASCENDING_ORDER_TRAVERSE(pQueryHandle->order))) {
if (cur->win.skey == TSKEY_INITIAL_VAL) { if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = tsArray[pos]; cur->win.skey = tsArray[pos];
} }
int32_t order = (pQueryHandle->order == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
int32_t end = vnodeBinarySearchKey(pCols->cols[0].pData, pCols->numOfPoints, key, order);
int32_t start = -1;
if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) {
int32_t remain = end - pos + 1;
if (remain + numOfRows > pQueryHandle->outputCapacity) {
end = (pQueryHandle->outputCapacity - numOfRows) + pos - 1;
}
start = pos;
} else {
int32_t remain = (pos - end) + 1;
if (remain + numOfRows > pQueryHandle->outputCapacity) {
end = pos + 1 - (pQueryHandle->outputCapacity - numOfRows);
}
start = end;
end = pos;
}
numOfRows =
copyDataFromFileBlock(pQueryHandle, &blockInfo, pQueryHandle->outputCapacity, numOfRows, start, end);
pos += (end - start + 1) * step;
}
} while (numOfRows < pQueryHandle->outputCapacity);
if (numOfRows < pQueryHandle->outputCapacity) {
if (node == NULL ||
((dataRowKey(SL_GET_NODE_DATA(node)) > pQueryHandle->window.ekey) && ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) ||
((dataRowKey(SL_GET_NODE_DATA(node)) < pQueryHandle->window.ekey) && !ASCENDING_ORDER_TRAVERSE(pQueryHandle->order))) {
// no data in cache or data in cache is greater than the ekey of time window, load data from file block
if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = tsArray[pos];
}
int32_t start = -1;
int32_t end = -1;
// all remain data are qualified, but check the remain capacity in the first place.
if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) {
int32_t remain = endPos - pos + 1;
if (remain + numOfRows > pQueryHandle->outputCapacity) {
endPos = (pQueryHandle->outputCapacity - numOfRows) + pos - 1;
}
start = pos;
end = endPos;
} else {
int32_t remain = pos + 1;
if (remain + numOfRows > pQueryHandle->outputCapacity) {
endPos = pos + 1 - (pQueryHandle->outputCapacity - numOfRows);
}
start = endPos;
end = pos;
}
numOfRows =
copyDataFromFileBlock(pQueryHandle, &blockInfo, pQueryHandle->outputCapacity, numOfRows, start, end);
pos += (end - start + 1) * step;
} else {
numOfRows = copyDataFromFileBlock(pQueryHandle, &blockInfo, pQueryHandle->outputCapacity, numOfRows, &pos, endPos); while(numOfRows < pQueryHandle->outputCapacity && node != NULL &&
(((dataRowKey(SL_GET_NODE_DATA(node)) <= pQueryHandle->window.ekey) && ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) ||
if (numOfRows >= pQueryHandle->outputCapacity || ((dataRowKey(SL_GET_NODE_DATA(node)) >= pQueryHandle->window.ekey) && !ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)))) {
pQueryHandle->cur.lastKey >= blockInfo.window.ekey || SDataRow row = SL_GET_NODE_DATA(node);
pQueryHandle->cur.lastKey > pQueryHandle->window.ekey) { TSKEY key = dataRowKey(row);
break;
copyOneRowFromMem(pQueryHandle, pCheckInfo, pQueryHandle->outputCapacity, numOfRows, row, pSchema);
numOfRows += 1;
if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = key;
}
cur->win.ekey = key;
cur->lastKey = key + step;
cur->mixBlock = true;
tSkipListIterNext(pCheckInfo->iter);
node = tSkipListIterGet(pCheckInfo->iter);
} }
} }
} }
} }
cur->blockCompleted = ((pos >= endPos && ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) ||
(pos <= endPos && !ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)));
if (!ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) {
SWAP(cur->win.skey, cur->win.ekey, TSKEY);
// if the buffer is not full in case of descending order query, move the data in the front of the buffer
if (numOfRows < pQueryHandle->outputCapacity) {
int32_t emptySize = pQueryHandle->outputCapacity - numOfRows;
int32_t reqiredNumOfCols = taosArrayGetSize(pQueryHandle->pColumns);
for(int32_t i = 0; i < reqiredNumOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
memmove(pColInfo->pData, pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes);
}
}
}
pCheckInfo->lastKey = cur->lastKey; pCheckInfo->lastKey = cur->lastKey;
pQueryHandle->realNumOfRows = numOfRows; pQueryHandle->realNumOfRows = numOfRows;
cur->rows = numOfRows; cur->rows = numOfRows;
@ -1103,7 +1288,7 @@ static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) {
STableCheckInfo* pCheckInfo = pBlockInfo->pTableCheckInfo; STableCheckInfo* pCheckInfo = pBlockInfo->pTableCheckInfo;
// current block is done, try next // current block is done, try next
if (!cur->mixBlock || cur->pos >= pBlockInfo->pBlock.compBlock->numOfPoints) { if (!cur->mixBlock || cur->blockCompleted) {
if ((cur->slot == pQueryHandle->numOfBlocks - 1 && ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) || if ((cur->slot == pQueryHandle->numOfBlocks - 1 && ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) ||
(cur->slot == 0 && !ASCENDING_ORDER_TRAVERSE(pQueryHandle->order))) { (cur->slot == 0 && !ASCENDING_ORDER_TRAVERSE(pQueryHandle->order))) {
// all data blocks in current file has been checked already, try next file if exists // all data blocks in current file has been checked already, try next file if exists
@ -1111,14 +1296,17 @@ static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) {
} else { // next block of the same file } else { // next block of the same file
int32_t step = ASCENDING_ORDER_TRAVERSE(pQueryHandle->order) ? 1 : -1; int32_t step = ASCENDING_ORDER_TRAVERSE(pQueryHandle->order) ? 1 : -1;
cur->slot += step; cur->slot += step;
cur->mixBlock = false;
cur->blockCompleted = false;
STableBlockInfo* pNext = &pQueryHandle->pDataBlockInfo[cur->slot]; STableBlockInfo* pNext = &pQueryHandle->pDataBlockInfo[cur->slot];
return loadFileDataBlock(pQueryHandle, pNext->pBlock.compBlock, pNext->pTableCheckInfo); return loadFileDataBlock(pQueryHandle, pNext->pBlock.compBlock, pNext->pTableCheckInfo);
} }
} else { } else {
SArray* sa = getDefaultLoadColumns(pQueryHandle, true); SArray* sa = getDefaultLoadColumns(pQueryHandle, true);
mergeDataInDataBlock(pQueryHandle, pCheckInfo, pBlockInfo->pBlock.compBlock, sa); mergeDataInDataBlock(pQueryHandle, pCheckInfo, pBlockInfo->pBlock.compBlock, sa);
return pQueryHandle->pColumns; return true;
} }
} }
} }
@ -1145,28 +1333,44 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pqHandle) {
size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo); size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
assert(numOfTables > 0); assert(numOfTables > 0);
if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) { if (pQueryHandle->checkFiles) {
if (pQueryHandle->checkFiles) { if (getDataBlocksInFiles(pQueryHandle)) {
if (getDataBlocksInFiles(pQueryHandle)) { return true;
return true;
}
pQueryHandle->activeIndex = 0;
pQueryHandle->checkFiles = false;
} }
return doHasDataInBuffer(pQueryHandle); pQueryHandle->activeIndex = 0;
} else { // starts from the buffer in case of descending timestamp order check data blocks pQueryHandle->checkFiles = false;
if (!pQueryHandle->checkFiles) {
if (doHasDataInBuffer(pQueryHandle)) {
return true;
}
pQueryHandle->checkFiles = true;
}
return getDataBlocksInFiles(pQueryHandle);
} }
if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) {
return doHasDataInBuffer(pQueryHandle);
} else {
// assert(0);
return false;
}
// if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) {
// if (pQueryHandle->checkFiles) {
// if (getDataBlocksInFiles(pQueryHandle)) {
// return true;
// }
//
// pQueryHandle->activeIndex = 0;
// pQueryHandle->checkFiles = false;
// }
//
// return doHasDataInBuffer(pQueryHandle);
// } else { // starts from the buffer in case of descending timestamp order check data blocks
// if (!pQueryHandle->checkFiles) {
// if (doHasDataInBuffer(pQueryHandle)) {
// return true;
// }
//
// pQueryHandle->checkFiles = true;
// }
//
// return getDataBlocksInFiles(pQueryHandle);
// }
} }
void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle) { void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle) {
@ -1321,51 +1525,8 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(TsdbQueryHandleT* pQueryHandle) {
STableCheckInfo* pCheckInfo = pBlockInfo->pTableCheckInfo; STableCheckInfo* pCheckInfo = pBlockInfo->pTableCheckInfo;
pTable = pCheckInfo->pTableObj; pTable = pCheckInfo->pTableObj;
SDataBlockInfo binfo = getTrueDataBlockInfo(pCheckInfo, pBlockInfo->pBlock.compBlock);
/*bool hasData = */initSkipListIterator(pHandle, pCheckInfo);
TSKEY k1 = TSKEY_INITIAL_VAL, k2 = TSKEY_INITIAL_VAL;
if (pCheckInfo->iter != NULL) {
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
SDataRow row = SL_GET_NODE_DATA(node);
k1 = dataRowKey(row);
if (k1 == binfo.window.skey) { if (pHandle->cur.mixBlock) {
if (tSkipListIterNext(pCheckInfo->iter)) {
node = tSkipListIterGet(pCheckInfo->iter);
row = SL_GET_NODE_DATA(node);
k1 = dataRowKey(row);
} else {
k1 = TSKEY_INITIAL_VAL;
}
}
}
if (pCheckInfo->iiter != NULL) {
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter);
SDataRow row = SL_GET_NODE_DATA(node);
k2 = dataRowKey(row);
if (k2 == binfo.window.skey) {
if (tSkipListIterNext(pCheckInfo->iiter)) {
node = tSkipListIterGet(pCheckInfo->iiter);
row = SL_GET_NODE_DATA(node);
k2 = dataRowKey(row);
} else {
k2 = TSKEY_INITIAL_VAL;
}
}
}
assert(0);
if ((k1 != TSKEY_INITIAL_VAL && k1 < binfo.window.ekey) || (k2 != TSKEY_INITIAL_VAL && k2 < binfo.window.ekey)) {
doLoadFileDataBlock(pHandle, pBlockInfo->pBlock.compBlock, pCheckInfo);
SArray* sa = getDefaultLoadColumns(pHandle, true);
mergeDataInDataBlock(pHandle, pCheckInfo, pBlockInfo->pBlock.compBlock, sa);
taosArrayDestroy(sa);
SDataBlockInfo blockInfo = { SDataBlockInfo blockInfo = {
.uid = pTable->tableId.uid, .uid = pTable->tableId.uid,
.tid = pTable->tableId.tid, .tid = pTable->tableId.tid,
@ -1375,26 +1536,30 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(TsdbQueryHandleT* pQueryHandle) {
return blockInfo; return blockInfo;
} else { } else {
SDataBlockInfo binfo = getTrueDataBlockInfo(pCheckInfo, pBlockInfo->pBlock.compBlock);
return binfo;
}
// else {
/* /*
* no data in mem or imem, or data in mem|imem with greater timestamp, no need to load data in buffer * no data in mem or imem, or data in mem/imem with greater timestamp, no need to load data in buffer
* return the file block info directly * return the file block info directly
*/ */
if (!pHandle->cur.mixBlock && pHandle->cur.rows == pBlockInfo->pBlock.compBlock->numOfPoints) { // if (!pHandle->cur.mixBlock || pHandle->cur.rows == pBlockInfo->pBlock.compBlock->numOfPoints) {
pBlockInfo->pTableCheckInfo->lastKey = pBlockInfo->pBlock.compBlock->keyLast + step; // pBlockInfo->pTableCheckInfo->lastKey = pBlockInfo->pBlock.compBlock->keyLast + step;
assert(pHandle->outputCapacity >= pBlockInfo->pBlock.compBlock->numOfPoints); // assert(pHandle->outputCapacity >= pBlockInfo->pBlock.compBlock->numOfPoints);
//
return binfo; // return binfo;
} else { // } else {
SDataBlockInfo blockInfo = { // SDataBlockInfo blockInfo = {
.uid = pTable->tableId.uid, // .uid = pTable->tableId.uid,
.tid = pTable->tableId.tid, // .tid = pTable->tableId.tid,
.rows = pHandle->cur.rows, // .rows = pHandle->cur.rows,
.window = pHandle->cur.win, // .window = pHandle->cur.win,
}; // };
//
return blockInfo; // return blockInfo;
} // }
} // }
} else { } else {
STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex); STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
pTable = pCheckInfo->pTableObj; pTable = pCheckInfo->pTableObj;