Merge pull request #25507 from taosdata/fix/3_liaohj

fix(tsdb): overlap check take pk into consideration.
This commit is contained in:
Haojun Liao 2024-04-26 09:06:38 +08:00 committed by GitHub
commit feb176149e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 56 additions and 22 deletions

View File

@ -64,13 +64,15 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p
col_id_t colId = -1; col_id_t colId = -1;
SArray* funcTypeBlockArray = taosArrayInit(pReader->numOfCols, sizeof(int32_t)); SArray* funcTypeBlockArray = taosArrayInit(pReader->numOfCols, sizeof(int32_t));
for (int32_t i = 0; i < pReader->numOfCols; ++i) { for (int32_t i = 0; i < pReader->numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotIds[i]); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotIds[i]);
int32_t funcType = FUNCTION_TYPE_CACHE_LAST; int32_t funcType = FUNCTION_TYPE_CACHE_LAST;
if (pReader->pFuncTypeList != NULL && taosArrayGetSize(pReader->pFuncTypeList) > i) { if (pReader->pFuncTypeList != NULL && taosArrayGetSize(pReader->pFuncTypeList) > i) {
funcType = *(int32_t*)taosArrayGet(pReader->pFuncTypeList, i); funcType = *(int32_t*)taosArrayGet(pReader->pFuncTypeList, i);
taosArrayInsert(funcTypeBlockArray, dstSlotIds[i], taosArrayGet(pReader->pFuncTypeList, i));
} }
taosArrayInsert(funcTypeBlockArray, dstSlotIds[i], taosArrayGet(pReader->pFuncTypeList, i));
if (slotIds[i] == -1) { if (slotIds[i] == -1) {
if (FUNCTION_TYPE_CACHE_LAST_ROW == funcType) { if (FUNCTION_TYPE_CACHE_LAST_ROW == funcType) {

View File

@ -1325,9 +1325,9 @@ static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* p
(pVerRange->maxVer < pBlock->maxVer && pVerRange->maxVer >= pBlock->minVer); (pVerRange->maxVer < pBlock->maxVer && pVerRange->maxVer >= pBlock->minVer);
} }
static bool getNeighborBlockOfSameTable(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pBlockInfo, static bool getNeighborBlockOfTable(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pBlockInfo,
STableBlockScanInfo* pScanInfo, int32_t* nextIndex, int32_t order, STableBlockScanInfo* pScanInfo, int32_t* nextIndex, int32_t order,
SBrinRecord* pRecord) { SBrinRecord* pRecord) {
bool asc = ASCENDING_TRAVERSE(order); bool asc = ASCENDING_TRAVERSE(order);
int32_t step = asc ? 1 : -1; int32_t step = asc ? 1 : -1;
@ -1391,12 +1391,40 @@ static int32_t setFileBlockActiveInBlockIter(STsdbReader* pReader, SDataBlockIte
} }
// todo: this attribute could be acquired during extractin the global ordered block list. // todo: this attribute could be acquired during extractin the global ordered block list.
static bool overlapWithNeighborBlock2(SFileDataBlockInfo* pBlock, SBrinRecord* pRec, int32_t order) { static bool overlapWithNeighborBlock2(SFileDataBlockInfo* pBlock, SBrinRecord* pRec, int32_t order, int32_t pkType, int32_t numOfPk) {
// it is the last block in current file, no chance to overlap with neighbor blocks. // it is the last block in current file, no chance to overlap with neighbor blocks.
if (ASCENDING_TRAVERSE(order)) { if (ASCENDING_TRAVERSE(order)) {
return pBlock->lastKey == pRec->firstKey.key.ts; if (pBlock->lastKey == pRec->firstKey.key.ts) {
if (numOfPk > 0) {
SValue v1 = {.type = pkType};
if (IS_VAR_DATA_TYPE(pkType)) {
v1.pData = (uint8_t*)varDataVal(pBlock->lastPk.pData), v1.nData = varDataLen(pBlock->lastPk.pData);
} else {
v1.val = pBlock->lastPk.val;
}
return (tValueCompare(&v1, &pRec->firstKey.key.pks[0]) == 0);
} else { // no pk
return true;
}
} else {
return false;
}
} else { } else {
return pBlock->firstKey == pRec->lastKey.key.ts; if (pBlock->firstKey == pRec->lastKey.key.ts) {
if (numOfPk > 0) {
SValue v1 = {.type = pkType};
if (IS_VAR_DATA_TYPE(pkType)) {
v1.pData = (uint8_t*)varDataVal(pBlock->firstPk.pData), v1.nData = varDataLen(pBlock->firstPk.pData);
} else {
v1.val = pBlock->firstPk.val;
}
return (tValueCompare(&v1, &pRec->lastKey.key.pks[0]) == 0);
} else { // no pk
return true;
}
} else {
return false;
}
} }
} }
@ -1430,15 +1458,18 @@ static bool keyOverlapFileBlock(TSDBKEY key, SFileDataBlockInfo* pBlock, SVersio
static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo* pBlockInfo, static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo* pBlockInfo,
STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, STsdbReader* pReader) { STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, STsdbReader* pReader) {
SBrinRecord rec = {0}; SBrinRecord rec = {0};
int32_t neighborIndex = 0; int32_t neighborIndex = 0;
int32_t order = pReader->info.order;
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
bool hasNeighbor = getNeighborBlockOfSameTable(&pReader->status.blockIter, pBlockInfo, pScanInfo, &neighborIndex, bool hasNeighbor =
pReader->info.order, &rec); getNeighborBlockOfTable(&pReader->status.blockIter, pBlockInfo, pScanInfo, &neighborIndex, order, &rec);
// overlap with neighbor // overlap with neighbor
if (hasNeighbor) { if (hasNeighbor) {
pInfo->overlapWithNeighborBlock = overlapWithNeighborBlock2(pBlockInfo, &rec, pReader->info.order); pInfo->overlapWithNeighborBlock =
overlapWithNeighborBlock2(pBlockInfo, &rec, order, pSupInfo->pk.type, pSupInfo->numOfPks);
} }
SBrinRecord pRecord; SBrinRecord pRecord;
@ -1446,7 +1477,7 @@ static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo*
// has duplicated ts of different version in this block // has duplicated ts of different version in this block
pInfo->hasDupTs = (pBlockInfo->numRow > pBlockInfo->count) || (pBlockInfo->count <= 0); pInfo->hasDupTs = (pBlockInfo->numRow > pBlockInfo->count) || (pBlockInfo->count <= 0);
pInfo->overlapWithDelInfo = overlapWithDelSkyline(pScanInfo, &pRecord, pReader->info.order); pInfo->overlapWithDelInfo = overlapWithDelSkyline(pScanInfo, &pRecord, order);
// todo handle the primary key overlap case // todo handle the primary key overlap case
ASSERT(pScanInfo->sttKeyInfo.status != STT_FILE_READER_UNINIT); ASSERT(pScanInfo->sttKeyInfo.status != STT_FILE_READER_UNINIT);
@ -2380,28 +2411,29 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pBlockScanInfo, static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pBlockScanInfo,
STsdbReader* pReader, bool* loadNeighbor) { STsdbReader* pReader, bool* loadNeighbor) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t step = ASCENDING_TRAVERSE(pReader->info.order) ? 1 : -1; int32_t order = pReader->info.order;
int32_t nextIndex = -1; SDataBlockIter* pIter = &pReader->status.blockIter;
int32_t step = ASCENDING_TRAVERSE(order) ? 1 : -1;
int32_t nextIndex = -1;
SBrinRecord rec = {0};
*loadNeighbor = false; *loadNeighbor = false;
bool hasNeighbor = getNeighborBlockOfTable(pIter, pBlockInfo, pBlockScanInfo, &nextIndex, order, &rec);
SBrinRecord rec = {0};
bool hasNeighbor = getNeighborBlockOfSameTable(&pReader->status.blockIter, pBlockInfo, pBlockScanInfo, &nextIndex,
pReader->info.order, &rec);
if (!hasNeighbor) { // do nothing if (!hasNeighbor) { // do nothing
return code; return code;
} }
if (overlapWithNeighborBlock2(pBlockInfo, &rec, pReader->info.order)) { // load next block // load next block
if (overlapWithNeighborBlock2(pBlockInfo, &rec, order, pReader->suppInfo.pk.type, pReader->suppInfo.numOfPks)) {
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
SDataBlockIter* pBlockIter = &pStatus->blockIter; SDataBlockIter* pBlockIter = &pStatus->blockIter;
// 1. find the next neighbor block in the scan block list // 1. find the next neighbor block in the scan block list
STableDataBlockIdx* tableDataBlockIdx = taosArrayGet(pBlockScanInfo->pBlockIdxList, nextIndex); STableDataBlockIdx* tableDataBlockIdx = taosArrayGet(pBlockScanInfo->pBlockIdxList, nextIndex);
int32_t neighborIndex = tableDataBlockIdx->globalIndex;
// 2. remove it from the scan block list // 2. remove it from the scan block list
int32_t neighborIndex = tableDataBlockIdx->globalIndex;
setFileBlockActiveInBlockIter(pReader, pBlockIter, neighborIndex, step); setFileBlockActiveInBlockIter(pReader, pBlockIter, neighborIndex, step);
// 3. load the neighbor block, and set it to be the currently accessed file data block // 3. load the neighbor block, and set it to be the currently accessed file data block