[td-225]
This commit is contained in:
parent
10af6ae350
commit
8795d19009
|
@ -734,10 +734,10 @@ static int32_t doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* p
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, int32_t start, int32_t end);
|
||||
static int32_t doCopyRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, int32_t start, int32_t end);
|
||||
static void moveDataToFront(STsdbQueryHandle* pQueryHandle, int32_t numOfRows, int32_t numOfCols);
|
||||
static void doCheckGeneratedBlockRange(STsdbQueryHandle* pQueryHandle);
|
||||
static void copyRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SDataBlockInfo* pBlockInfo, int32_t endPos);
|
||||
static void copyAllRemainRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SDataBlockInfo* pBlockInfo, int32_t endPos);
|
||||
|
||||
static int32_t handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo){
|
||||
SQueryFilePos* cur = &pQueryHandle->cur;
|
||||
|
@ -801,7 +801,7 @@ static int32_t handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBloc
|
|||
cur->lastKey = ASCENDING_TRAVERSE(pQueryHandle->order)? (binfo.window.ekey + 1): (binfo.window.skey -1);
|
||||
} else { // partially copy to dest buffer
|
||||
int32_t endPos = ASCENDING_TRAVERSE(pQueryHandle->order)? (binfo.rows - 1): 0;
|
||||
copyRowsFromFileBlock(pQueryHandle, pCheckInfo, &binfo, endPos);
|
||||
copyAllRemainRowsFromFileBlock(pQueryHandle, pCheckInfo, &binfo, endPos);
|
||||
cur->mixBlock = true;
|
||||
}
|
||||
|
||||
|
@ -929,7 +929,7 @@ static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) {
|
|||
return midPos;
|
||||
}
|
||||
|
||||
int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, int32_t start, int32_t end) {
|
||||
int32_t doCopyRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, int32_t start, int32_t end) {
|
||||
char* pData = NULL;
|
||||
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1 : -1;
|
||||
|
||||
|
@ -1154,7 +1154,7 @@ static void doCheckGeneratedBlockRange(STsdbQueryHandle* pQueryHandle) {
|
|||
}
|
||||
}
|
||||
|
||||
static void copyRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SDataBlockInfo* pBlockInfo, int32_t endPos) {
|
||||
static void copyAllRemainRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SDataBlockInfo* pBlockInfo, int32_t endPos) {
|
||||
SQueryFilePos* cur = &pQueryHandle->cur;
|
||||
|
||||
SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0];
|
||||
|
@ -1173,21 +1173,20 @@ static void copyRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, STableCheckInf
|
|||
SWAP(start, end, int32_t);
|
||||
}
|
||||
|
||||
int32_t numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, 0, start, end);
|
||||
assert(pQueryHandle->outputCapacity >= (end - start + 1));
|
||||
int32_t numOfRows = doCopyRowsFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, 0, start, end);
|
||||
|
||||
// the time window should always be ascending order: skey <= ekey
|
||||
cur->win = (STimeWindow) {.skey = tsArray[start], .ekey = tsArray[end]};
|
||||
cur->mixBlock = (start == 0 && end == pBlockInfo->rows - 1);
|
||||
cur->mixBlock = (start > 0 && end < pBlockInfo->rows - 1);
|
||||
cur->lastKey = tsArray[endPos] + step;
|
||||
|
||||
pos += endPos + step;
|
||||
|
||||
cur->blockCompleted =
|
||||
(((pos >= endPos || cur->lastKey > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
|
||||
((pos <= endPos || cur->lastKey < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order)));
|
||||
cur->blockCompleted = true;
|
||||
|
||||
// if the buffer is not full in case of descending order query, move the data in the front of the buffer
|
||||
moveDataToFront(pQueryHandle, numOfRows, numOfCols);
|
||||
|
||||
// The value of pos may be -1 or pBlockInfo->rows, and it is invalid in both cases.
|
||||
pos = endPos + step;
|
||||
updateInfoAfterMerge(pQueryHandle, pCheckInfo, numOfRows, pos);
|
||||
doCheckGeneratedBlockRange(pQueryHandle);
|
||||
|
||||
|
@ -1245,7 +1244,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
|
|||
|
||||
// no data in buffer, load data from file directly
|
||||
if (pCheckInfo->iiter == NULL && pCheckInfo->iter == NULL) {
|
||||
copyRowsFromFileBlock(pQueryHandle, pCheckInfo, &blockInfo, endPos);
|
||||
copyAllRemainRowsFromFileBlock(pQueryHandle, pCheckInfo, &blockInfo, endPos);
|
||||
return;
|
||||
} else if (pCheckInfo->iter != NULL || pCheckInfo->iiter != NULL) {
|
||||
SSkipListNode* node = NULL;
|
||||
|
@ -1297,7 +1296,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
|
|||
int32_t qstart = 0, qend = 0;
|
||||
getQualifiedRowsPos(pQueryHandle, pos, end, numOfRows, &qstart, &qend);
|
||||
|
||||
numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, qstart, qend);
|
||||
numOfRows = doCopyRowsFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, qstart, qend);
|
||||
pos += (qend - qstart + 1) * step;
|
||||
|
||||
cur->win.ekey = ASCENDING_TRAVERSE(pQueryHandle->order)? tsArray[qend]:tsArray[qstart];
|
||||
|
@ -1321,7 +1320,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
|
|||
int32_t start = -1, end = -1;
|
||||
getQualifiedRowsPos(pQueryHandle, pos, endPos, numOfRows, &start, &end);
|
||||
|
||||
numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, start, end);
|
||||
numOfRows = doCopyRowsFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, start, end);
|
||||
pos += (end - start + 1) * step;
|
||||
|
||||
cur->win.ekey = ASCENDING_TRAVERSE(pQueryHandle->order)? tsArray[end]:tsArray[start];
|
||||
|
@ -2192,7 +2191,7 @@ SArray* tsdbRetrieveDataBlock(TsdbQueryHandleT* pQueryHandle, SArray* pIdList) {
|
|||
}
|
||||
|
||||
// todo refactor
|
||||
int32_t numOfRows = copyDataFromFileBlock(pHandle, pHandle->outputCapacity, 0, 0, pBlock->numOfRows - 1);
|
||||
int32_t numOfRows = doCopyRowsFromFileBlock(pHandle, pHandle->outputCapacity, 0, 0, pBlock->numOfRows - 1);
|
||||
|
||||
// if the buffer is not full in case of descending order query, move the data in the front of the buffer
|
||||
if (!ASCENDING_TRAVERSE(pHandle->order) && numOfRows < pHandle->outputCapacity) {
|
||||
|
|
|
@ -532,7 +532,7 @@ void taosTrashCanEmpty(SCacheObj *pCacheObj, bool force) {
|
|||
|
||||
if (pCacheObj->numOfElemsInTrash == 0) {
|
||||
if (pCacheObj->pTrash != NULL) {
|
||||
uError("cache:%s, key:inconsistency data in cache, numOfElem in trash:%d", pCacheObj->name, pCacheObj->numOfElemsInTrash);
|
||||
uError("cache:%s, key:inconsistency data in cache, numOfElem in trashcan:%d", pCacheObj->name, pCacheObj->numOfElemsInTrash);
|
||||
}
|
||||
|
||||
pCacheObj->pTrash = NULL;
|
||||
|
@ -549,7 +549,7 @@ void taosTrashCanEmpty(SCacheObj *pCacheObj, bool force) {
|
|||
}
|
||||
|
||||
if (force || (T_REF_VAL_GET(pElem->pData) == 0)) {
|
||||
uError("cache:%s, key:%p, %p removed from trashcan. numOfElem in trash:%d", pCacheObj->name, pElem->pData->key, pElem->pData->data,
|
||||
uError("cache:%s, key:%p, %p removed from trashcan. numOfElem in trashcan:%d", pCacheObj->name, pElem->pData->key, pElem->pData->data,
|
||||
pCacheObj->numOfElemsInTrash - 1);
|
||||
|
||||
STrashElem *p = pElem;
|
||||
|
|
Loading…
Reference in New Issue