fix(tsdb): overlap check take pk into consideration.
This commit is contained in:
parent
eef5e6e409
commit
59a3e8ca40
|
@ -64,13 +64,15 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p
|
||||||
col_id_t colId = -1;
|
col_id_t colId = -1;
|
||||||
|
|
||||||
SArray* funcTypeBlockArray = taosArrayInit(pReader->numOfCols, sizeof(int32_t));
|
SArray* funcTypeBlockArray = taosArrayInit(pReader->numOfCols, sizeof(int32_t));
|
||||||
|
|
||||||
for (int32_t i = 0; i < pReader->numOfCols; ++i) {
|
for (int32_t i = 0; i < pReader->numOfCols; ++i) {
|
||||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotIds[i]);
|
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotIds[i]);
|
||||||
int32_t funcType = FUNCTION_TYPE_CACHE_LAST;
|
int32_t funcType = FUNCTION_TYPE_CACHE_LAST;
|
||||||
|
|
||||||
if (pReader->pFuncTypeList != NULL && taosArrayGetSize(pReader->pFuncTypeList) > i) {
|
if (pReader->pFuncTypeList != NULL && taosArrayGetSize(pReader->pFuncTypeList) > i) {
|
||||||
funcType = *(int32_t*)taosArrayGet(pReader->pFuncTypeList, i);
|
funcType = *(int32_t*)taosArrayGet(pReader->pFuncTypeList, i);
|
||||||
}
|
|
||||||
taosArrayInsert(funcTypeBlockArray, dstSlotIds[i], taosArrayGet(pReader->pFuncTypeList, i));
|
taosArrayInsert(funcTypeBlockArray, dstSlotIds[i], taosArrayGet(pReader->pFuncTypeList, i));
|
||||||
|
}
|
||||||
|
|
||||||
if (slotIds[i] == -1) {
|
if (slotIds[i] == -1) {
|
||||||
if (FUNCTION_TYPE_CACHE_LAST_ROW == funcType) {
|
if (FUNCTION_TYPE_CACHE_LAST_ROW == funcType) {
|
||||||
|
|
|
@ -1325,7 +1325,7 @@ static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* p
|
||||||
(pVerRange->maxVer < pBlock->maxVer && pVerRange->maxVer >= pBlock->minVer);
|
(pVerRange->maxVer < pBlock->maxVer && pVerRange->maxVer >= pBlock->minVer);
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool getNeighborBlockOfSameTable(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pBlockInfo,
|
static bool getNeighborBlockOfTable(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pBlockInfo,
|
||||||
STableBlockScanInfo* pScanInfo, int32_t* nextIndex, int32_t order,
|
STableBlockScanInfo* pScanInfo, int32_t* nextIndex, int32_t order,
|
||||||
SBrinRecord* pRecord) {
|
SBrinRecord* pRecord) {
|
||||||
bool asc = ASCENDING_TRAVERSE(order);
|
bool asc = ASCENDING_TRAVERSE(order);
|
||||||
|
@ -1391,12 +1391,40 @@ static int32_t setFileBlockActiveInBlockIter(STsdbReader* pReader, SDataBlockIte
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo: this attribute could be acquired during extractin the global ordered block list.
|
// todo: this attribute could be acquired during extractin the global ordered block list.
|
||||||
static bool overlapWithNeighborBlock2(SFileDataBlockInfo* pBlock, SBrinRecord* pRec, int32_t order) {
|
static bool overlapWithNeighborBlock2(SFileDataBlockInfo* pBlock, SBrinRecord* pRec, int32_t order, int32_t pkType, int32_t numOfPk) {
|
||||||
// it is the last block in current file, no chance to overlap with neighbor blocks.
|
// it is the last block in current file, no chance to overlap with neighbor blocks.
|
||||||
if (ASCENDING_TRAVERSE(order)) {
|
if (ASCENDING_TRAVERSE(order)) {
|
||||||
return pBlock->lastKey == pRec->firstKey.key.ts;
|
if (pBlock->lastKey == pRec->firstKey.key.ts) {
|
||||||
|
if (numOfPk > 0) {
|
||||||
|
SValue v1 = {.type = pkType};
|
||||||
|
if (IS_VAR_DATA_TYPE(pkType)) {
|
||||||
|
v1.pData = (uint8_t*)varDataVal(pBlock->lastPk.pData), v1.nData = varDataLen(pBlock->lastPk.pData);
|
||||||
} else {
|
} else {
|
||||||
return pBlock->firstKey == pRec->lastKey.key.ts;
|
v1.val = pBlock->lastPk.val;
|
||||||
|
}
|
||||||
|
return (tValueCompare(&v1, &pRec->firstKey.key.pks[0]) == 0);
|
||||||
|
} else { // no pk
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (pBlock->firstKey == pRec->lastKey.key.ts) {
|
||||||
|
if (numOfPk > 0) {
|
||||||
|
SValue v1 = {.type = pkType};
|
||||||
|
if (IS_VAR_DATA_TYPE(pkType)) {
|
||||||
|
v1.pData = (uint8_t*)varDataVal(pBlock->firstPk.pData), v1.nData = varDataLen(pBlock->firstPk.pData);
|
||||||
|
} else {
|
||||||
|
v1.val = pBlock->firstPk.val;
|
||||||
|
}
|
||||||
|
return (tValueCompare(&v1, &pRec->lastKey.key.pks[0]) == 0);
|
||||||
|
} else { // no pk
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1432,13 +1460,16 @@ static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo*
|
||||||
STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, STsdbReader* pReader) {
|
STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, STsdbReader* pReader) {
|
||||||
SBrinRecord rec = {0};
|
SBrinRecord rec = {0};
|
||||||
int32_t neighborIndex = 0;
|
int32_t neighborIndex = 0;
|
||||||
|
int32_t order = pReader->info.order;
|
||||||
|
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
|
||||||
|
|
||||||
bool hasNeighbor = getNeighborBlockOfSameTable(&pReader->status.blockIter, pBlockInfo, pScanInfo, &neighborIndex,
|
bool hasNeighbor =
|
||||||
pReader->info.order, &rec);
|
getNeighborBlockOfTable(&pReader->status.blockIter, pBlockInfo, pScanInfo, &neighborIndex, order, &rec);
|
||||||
|
|
||||||
// overlap with neighbor
|
// overlap with neighbor
|
||||||
if (hasNeighbor) {
|
if (hasNeighbor) {
|
||||||
pInfo->overlapWithNeighborBlock = overlapWithNeighborBlock2(pBlockInfo, &rec, pReader->info.order);
|
pInfo->overlapWithNeighborBlock =
|
||||||
|
overlapWithNeighborBlock2(pBlockInfo, &rec, order, pSupInfo->pk.type, pSupInfo->numOfPks);
|
||||||
}
|
}
|
||||||
|
|
||||||
SBrinRecord pRecord;
|
SBrinRecord pRecord;
|
||||||
|
@ -1446,7 +1477,7 @@ static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo*
|
||||||
|
|
||||||
// has duplicated ts of different version in this block
|
// has duplicated ts of different version in this block
|
||||||
pInfo->hasDupTs = (pBlockInfo->numRow > pBlockInfo->count) || (pBlockInfo->count <= 0);
|
pInfo->hasDupTs = (pBlockInfo->numRow > pBlockInfo->count) || (pBlockInfo->count <= 0);
|
||||||
pInfo->overlapWithDelInfo = overlapWithDelSkyline(pScanInfo, &pRecord, pReader->info.order);
|
pInfo->overlapWithDelInfo = overlapWithDelSkyline(pScanInfo, &pRecord, order);
|
||||||
|
|
||||||
// todo handle the primary key overlap case
|
// todo handle the primary key overlap case
|
||||||
ASSERT(pScanInfo->sttKeyInfo.status != STT_FILE_READER_UNINIT);
|
ASSERT(pScanInfo->sttKeyInfo.status != STT_FILE_READER_UNINIT);
|
||||||
|
@ -2381,27 +2412,28 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
|
||||||
static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pBlockScanInfo,
|
static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pBlockScanInfo,
|
||||||
STsdbReader* pReader, bool* loadNeighbor) {
|
STsdbReader* pReader, bool* loadNeighbor) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
int32_t step = ASCENDING_TRAVERSE(pReader->info.order) ? 1 : -1;
|
int32_t order = pReader->info.order;
|
||||||
|
SDataBlockIter* pIter = &pReader->status.blockIter;
|
||||||
|
int32_t step = ASCENDING_TRAVERSE(order) ? 1 : -1;
|
||||||
int32_t nextIndex = -1;
|
int32_t nextIndex = -1;
|
||||||
|
SBrinRecord rec = {0};
|
||||||
|
|
||||||
*loadNeighbor = false;
|
*loadNeighbor = false;
|
||||||
|
bool hasNeighbor = getNeighborBlockOfTable(pIter, pBlockInfo, pBlockScanInfo, &nextIndex, order, &rec);
|
||||||
SBrinRecord rec = {0};
|
|
||||||
bool hasNeighbor = getNeighborBlockOfSameTable(&pReader->status.blockIter, pBlockInfo, pBlockScanInfo, &nextIndex,
|
|
||||||
pReader->info.order, &rec);
|
|
||||||
if (!hasNeighbor) { // do nothing
|
if (!hasNeighbor) { // do nothing
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (overlapWithNeighborBlock2(pBlockInfo, &rec, pReader->info.order)) { // load next block
|
// load next block
|
||||||
|
if (overlapWithNeighborBlock2(pBlockInfo, &rec, order, pReader->suppInfo.pk.type, pReader->suppInfo.numOfPks)) {
|
||||||
SReaderStatus* pStatus = &pReader->status;
|
SReaderStatus* pStatus = &pReader->status;
|
||||||
SDataBlockIter* pBlockIter = &pStatus->blockIter;
|
SDataBlockIter* pBlockIter = &pStatus->blockIter;
|
||||||
|
|
||||||
// 1. find the next neighbor block in the scan block list
|
// 1. find the next neighbor block in the scan block list
|
||||||
STableDataBlockIdx* tableDataBlockIdx = taosArrayGet(pBlockScanInfo->pBlockIdxList, nextIndex);
|
STableDataBlockIdx* tableDataBlockIdx = taosArrayGet(pBlockScanInfo->pBlockIdxList, nextIndex);
|
||||||
int32_t neighborIndex = tableDataBlockIdx->globalIndex;
|
|
||||||
|
|
||||||
// 2. remove it from the scan block list
|
// 2. remove it from the scan block list
|
||||||
|
int32_t neighborIndex = tableDataBlockIdx->globalIndex;
|
||||||
setFileBlockActiveInBlockIter(pReader, pBlockIter, neighborIndex, step);
|
setFileBlockActiveInBlockIter(pReader, pBlockIter, neighborIndex, step);
|
||||||
|
|
||||||
// 3. load the neighbor block, and set it to be the currently accessed file data block
|
// 3. load the neighbor block, and set it to be the currently accessed file data block
|
||||||
|
|
Loading…
Reference in New Issue