Merge branch '3.0' of https://github.com/taosdata/TDengine into refact/tsdb_new_snapshot
This commit is contained in:
commit
79f71be7e0
|
@ -120,6 +120,92 @@ static SBlockData* loadBlockIfMissing(SLDataIter *pIter) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
// find the earliest block that contains the required records
|
||||
static FORCE_INLINE int32_t findEarliestIndex(int32_t index, uint64_t uid, const SSttBlk* pBlockList, int32_t num, int32_t backward) {
|
||||
int32_t i = index;
|
||||
int32_t step = backward? 1:-1;
|
||||
while (i >= 0 && i < num && uid >= pBlockList[i].minUid && uid <= pBlockList[i].maxUid) {
|
||||
i += step;
|
||||
}
|
||||
return i - step;
|
||||
}
|
||||
|
||||
static int32_t binarySearchForStartBlock(SSttBlk*pBlockList, int32_t num, uint64_t uid, int32_t backward) {
|
||||
int32_t midPos = -1;
|
||||
if (num <= 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t firstPos = 0;
|
||||
int32_t lastPos = num - 1;
|
||||
|
||||
// find the first position which is bigger than the key
|
||||
if ((uid > pBlockList[lastPos].maxUid) || (uid < pBlockList[firstPos].minUid)) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
while (1) {
|
||||
if (uid >= pBlockList[firstPos].minUid && uid <= pBlockList[firstPos].maxUid) {
|
||||
return findEarliestIndex(firstPos, uid, pBlockList, num, backward);
|
||||
}
|
||||
|
||||
if (uid > pBlockList[lastPos].maxUid || uid < pBlockList[firstPos].minUid) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t numOfRows = lastPos - firstPos + 1;
|
||||
midPos = (numOfRows >> 1u) + firstPos;
|
||||
|
||||
if (uid < pBlockList[midPos].minUid) {
|
||||
lastPos = midPos - 1;
|
||||
} else if (uid > pBlockList[midPos].maxUid) {
|
||||
firstPos = midPos + 1;
|
||||
} else {
|
||||
return findEarliestIndex(midPos, uid, pBlockList, num, backward);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static FORCE_INLINE int32_t findEarliestRow(int32_t index, uint64_t uid, const uint64_t* uidList, int32_t num, int32_t backward) {
|
||||
int32_t i = index;
|
||||
int32_t step = backward? 1:-1;
|
||||
while (i >= 0 && i < num && uid == uidList[i]) {
|
||||
i += step;
|
||||
}
|
||||
return i - step;
|
||||
}
|
||||
|
||||
static int32_t binarySearchForStartRowIndex(uint64_t* uidList, int32_t num, uint64_t uid, int32_t backward) {
|
||||
int32_t firstPos = 0;
|
||||
int32_t lastPos = num - 1;
|
||||
|
||||
// find the first position which is bigger than the key
|
||||
if ((uid > uidList[lastPos]) || (uid < uidList[firstPos])) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
while (1) {
|
||||
if (uid == uidList[firstPos]) {
|
||||
return findEarliestRow(firstPos, uid, uidList, num, backward);
|
||||
}
|
||||
|
||||
if (uid > uidList[lastPos] || uid < uidList[firstPos]) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t numOfRows = lastPos - firstPos + 1;
|
||||
int32_t midPos = (numOfRows >> 1u) + firstPos;
|
||||
|
||||
if (uid < uidList[midPos]) {
|
||||
lastPos = midPos - 1;
|
||||
} else if (uid > uidList[midPos]) {
|
||||
firstPos = midPos + 1;
|
||||
} else {
|
||||
return findEarliestRow(midPos, uid, uidList, num, backward);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t iStt, int8_t backward, uint64_t suid,
|
||||
uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange, SSttBlockLoadInfo* pBlockLoadInfo) {
|
||||
int32_t code = 0;
|
||||
|
@ -141,39 +227,25 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t
|
|||
code = tsdbReadSttBlk(pReader, iStt, pBlockLoadInfo->aSttBlk);
|
||||
if (code) {
|
||||
goto _exit;
|
||||
} else {
|
||||
size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk);
|
||||
SArray* pTmp = taosArrayInit(size, sizeof(SSttBlk));
|
||||
for(int32_t i = 0; i < size; ++i) {
|
||||
SSttBlk* p = taosArrayGet(pBlockLoadInfo->aSttBlk, i);
|
||||
if (p->suid == suid) {
|
||||
taosArrayPush(pTmp, p);
|
||||
}
|
||||
}
|
||||
|
||||
taosArrayDestroy(pBlockLoadInfo->aSttBlk);
|
||||
pBlockLoadInfo->aSttBlk = pTmp;
|
||||
}
|
||||
}
|
||||
|
||||
size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk);
|
||||
|
||||
// find the start block
|
||||
int32_t index = -1;
|
||||
if (!backward) { // asc
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
SSttBlk *p = taosArrayGet(pBlockLoadInfo->aSttBlk, i);
|
||||
if (p->suid != suid) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (p->minUid <= uid && p->maxUid >= uid) {
|
||||
index = i;
|
||||
break;
|
||||
}
|
||||
}
|
||||
} else { // desc
|
||||
for (int32_t i = size - 1; i >= 0; --i) {
|
||||
SSttBlk *p = taosArrayGet(pBlockLoadInfo->aSttBlk, i);
|
||||
if (p->suid != suid) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (p->minUid <= uid && p->maxUid >= uid) {
|
||||
index = i;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int32_t index = binarySearchForStartBlock(pBlockLoadInfo->aSttBlk->pData, size, uid, backward);
|
||||
(*pIter)->iSttBlk = index;
|
||||
if (index != -1) {
|
||||
(*pIter)->pSttBlk = taosArrayGet(pBlockLoadInfo->aSttBlk, (*pIter)->iSttBlk);
|
||||
|
@ -193,7 +265,7 @@ void tLDataIterNextBlock(SLDataIter *pIter) {
|
|||
pIter->iSttBlk += step;
|
||||
|
||||
int32_t index = -1;
|
||||
size_t size = taosArrayGetSize(pIter->pBlockLoadInfo->aSttBlk);
|
||||
size_t size = pIter->pBlockLoadInfo->aSttBlk->size;//taosArrayGetSize(pIter->pBlockLoadInfo->aSttBlk);
|
||||
for (int32_t i = pIter->iSttBlk; i < size && i >= 0; i += step) {
|
||||
SSttBlk *p = taosArrayGet(pIter->pBlockLoadInfo->aSttBlk, i);
|
||||
if ((!pIter->backward) && p->minUid > pIter->uid) {
|
||||
|
@ -232,9 +304,8 @@ void tLDataIterNextBlock(SLDataIter *pIter) {
|
|||
}
|
||||
}
|
||||
|
||||
if (index == -1) {
|
||||
pIter->pSttBlk = NULL;
|
||||
} else {
|
||||
pIter->pSttBlk = NULL;
|
||||
if (index != -1) {
|
||||
pIter->pSttBlk = (SSttBlk *)taosArrayGet(pIter->pBlockLoadInfo->aSttBlk, pIter->iSttBlk);
|
||||
}
|
||||
}
|
||||
|
@ -247,18 +318,27 @@ static void findNextValidRow(SLDataIter *pIter) {
|
|||
|
||||
SBlockData *pBlockData = loadBlockIfMissing(pIter);
|
||||
|
||||
// mostly we only need to find the start position for a given table
|
||||
if ((((i == 0) && (!pIter->backward)) || (i == pBlockData->nRow - 1 && pIter->backward)) && pBlockData->aUid != NULL) {
|
||||
i = binarySearchForStartRowIndex((uint64_t*)pBlockData->aUid, pBlockData->nRow, pIter->uid, pIter->backward);
|
||||
if (i == -1) {
|
||||
pIter->iRow = -1;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
for (; i < pBlockData->nRow && i >= 0; i += step) {
|
||||
if (pBlockData->aUid != NULL) {
|
||||
if (!pIter->backward) {
|
||||
if (pBlockData->aUid[i] < pIter->uid) {
|
||||
/*if (pBlockData->aUid[i] < pIter->uid) {
|
||||
continue;
|
||||
} else if (pBlockData->aUid[i] > pIter->uid) {
|
||||
} else */if (pBlockData->aUid[i] > pIter->uid) {
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
if (pBlockData->aUid[i] > pIter->uid) {
|
||||
/*if (pBlockData->aUid[i] > pIter->uid) {
|
||||
continue;
|
||||
} else if (pBlockData->aUid[i] < pIter->uid) {
|
||||
} else */if (pBlockData->aUid[i] < pIter->uid) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1238,6 +1238,38 @@ static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pB
|
|||
return false;
|
||||
}
|
||||
|
||||
static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo) {
|
||||
while (1) {
|
||||
bool hasVal = tMergeTreeNext(&pLastBlockReader->mergeTree);
|
||||
if (!hasVal) {
|
||||
return false;
|
||||
}
|
||||
|
||||
TSDBROW row = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
|
||||
TSDBKEY k = TSDBROW_KEY(&row);
|
||||
if (!hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->lastBlockDelIndex, &k, pLastBlockReader->order)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLastBlockReader,
|
||||
STableBlockScanInfo* pScanInfo, int64_t ts, STsdbReader* pReader) {
|
||||
bool hasVal = nextRowFromLastBlocks(pLastBlockReader, pScanInfo);
|
||||
if (hasVal) {
|
||||
int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
|
||||
if (next1 != ts) {
|
||||
doAppendRowFromFileBlock(pReader->pResBlock, pReader, fRow->pBlockData, fRow->iRow);
|
||||
return true;
|
||||
}
|
||||
} else {
|
||||
doAppendRowFromFileBlock(pReader->pResBlock, pReader, fRow->pBlockData, fRow->iRow);
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* pReader, uint64_t uid) {
|
||||
// always set the newest schema version in pReader->pSchema
|
||||
if (pReader->pSchema == NULL) {
|
||||
|
@ -1389,25 +1421,54 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
|
|||
|
||||
STSRow* pTSRow = NULL;
|
||||
SRowMerger merge = {0};
|
||||
TSDBROW fRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
|
||||
|
||||
TSDBROW fRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
|
||||
tRowMergerInit(&merge, &fRow, pReader->pSchema);
|
||||
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge);
|
||||
// only last block exists
|
||||
if ((!mergeBlockData) || (tsLastBlock != pBlockData->aTSKEY[pDumpInfo->rowIndex])) {
|
||||
if (tryCopyDistinctRowFromSttBlock(&fRow, pLastBlockReader, pBlockScanInfo, tsLastBlock, pReader)) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else {
|
||||
tRowMergerInit(&merge, &fRow, pReader->pSchema);
|
||||
|
||||
// merge with block data if ts == key
|
||||
if (mergeBlockData && (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex])) {
|
||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
|
||||
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
|
||||
tRowMerge(&merge, &fRow1);
|
||||
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge);
|
||||
|
||||
// merge with block data if ts == key
|
||||
if (mergeBlockData && (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex])) {
|
||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
|
||||
}
|
||||
|
||||
int32_t code = tRowMergerGetRow(&merge, &pTSRow);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
|
||||
|
||||
taosMemoryFree(pTSRow);
|
||||
tRowMergerClear(&merge);
|
||||
}
|
||||
} else { // not merge block data
|
||||
tRowMergerInit(&merge, &fRow, pReader->pSchema);
|
||||
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge);
|
||||
|
||||
// merge with block data if ts == key
|
||||
if (mergeBlockData && (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex])) {
|
||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
|
||||
}
|
||||
|
||||
int32_t code = tRowMergerGetRow(&merge, &pTSRow);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
|
||||
|
||||
taosMemoryFree(pTSRow);
|
||||
tRowMergerClear(&merge);
|
||||
}
|
||||
|
||||
int32_t code = tRowMergerGetRow(&merge, &pTSRow);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
|
||||
|
||||
taosMemoryFree(pTSRow);
|
||||
tRowMergerClear(&merge);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -1858,21 +1919,6 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDum
|
|||
|
||||
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); }
|
||||
|
||||
static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo) {
|
||||
while (1) {
|
||||
bool hasVal = tMergeTreeNext(&pLastBlockReader->mergeTree);
|
||||
if (!hasVal) {
|
||||
return false;
|
||||
}
|
||||
|
||||
TSDBROW row = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
|
||||
TSDBKEY k = TSDBROW_KEY(&row);
|
||||
if (!hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->lastBlockDelIndex, &k, pLastBlockReader->order)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
|
||||
// the last block reader has been initialized for this table.
|
||||
if (pLBlockReader->uid == pScanInfo->uid) {
|
||||
|
@ -1906,8 +1952,7 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan
|
|||
|
||||
static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) {
|
||||
TSDBROW row = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
|
||||
TSDBKEY key = TSDBROW_KEY(&row);
|
||||
return key.ts;
|
||||
return TSDBROW_TS(&row);
|
||||
}
|
||||
|
||||
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) { return pLastBlockReader->mergeTree.pIter != NULL; }
|
||||
|
@ -3080,11 +3125,16 @@ int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, S
|
|||
SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, i);
|
||||
SColData* pData = tBlockDataGetColDataByIdx(pBlockData, j);
|
||||
|
||||
if (pData->cid < pCol->info.colId) {
|
||||
j += 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (pData->cid == pCol->info.colId) {
|
||||
tColDataGetValue(pData, rowIndex, &cv);
|
||||
doCopyColVal(pCol, outputRowIndex, i, &cv, pSupInfo);
|
||||
j += 1;
|
||||
} else { // the specified column does not exist in file block, fill with null data
|
||||
} else if (pData->cid > pCol->info.colId) { // the specified column does not exist in file block, fill with null data
|
||||
colDataAppendNULL(pCol, outputRowIndex);
|
||||
}
|
||||
|
||||
|
@ -3206,11 +3256,18 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
|
|||
}
|
||||
}
|
||||
|
||||
// NOTE: the endVersion in pCond is the data version not schema version, so pCond->endVersion is not correct here.
|
||||
if (pCond->suid != 0) {
|
||||
pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, pCond->endVersion);
|
||||
pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, /*pCond->endVersion*/ -1);
|
||||
if (pReader->pSchema == NULL) {
|
||||
tsdbError("failed to get table schema, suid:%"PRIu64", ver:%"PRId64" , %s", pReader->suid, -1, pReader->idStr);
|
||||
}
|
||||
} else if (taosArrayGetSize(pTableList) > 0) {
|
||||
STableKeyInfo* pKey = taosArrayGet(pTableList, 0);
|
||||
pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, pCond->endVersion);
|
||||
pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, /*pCond->endVersion*/ -1);
|
||||
if (pReader->pSchema == NULL) {
|
||||
tsdbError("failed to get table schema, uid:%"PRIu64", ver:%"PRId64" , %s", pKey->uid, -1, pReader->idStr);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t numOfTables = taosArrayGetSize(pTableList);
|
||||
|
|
|
@ -333,7 +333,7 @@ python3 ./test.py -f 7-tmq/stbTagFilter-1ctb.py
|
|||
python3 ./test.py -f 7-tmq/dataFromTsdbNWal.py
|
||||
python3 ./test.py -f 7-tmq/dataFromTsdbNWal-multiCtb.py
|
||||
python3 ./test.py -f 7-tmq/tmq_taosx.py
|
||||
# python3 ./test.py -f 7-tmq/stbTagFilter-multiCtb.py
|
||||
python3 ./test.py -f 7-tmq/stbTagFilter-multiCtb.py
|
||||
|
||||
#------------querPolicy 2-----------
|
||||
|
||||
|
|
Loading…
Reference in New Issue