refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2024-03-21 10:44:27 +08:00
parent 100bc2de4b
commit 0c578ab6c0
5 changed files with 79 additions and 39 deletions

View File

@ -199,6 +199,7 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, int
int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows, int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows,
const SDataBlockInfo* pBlockInfo); const SDataBlockInfo* pBlockInfo);
int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex); int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex);
int32_t blockDataUpdatePkRange(SSDataBlock* pDataBlock, int32_t pkColumnIndex, bool asc);
int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows); int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows);

View File

@ -490,9 +490,9 @@ int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex)
return 0; return 0;
} }
if (pDataBlock->info.rows > 0) { // if (pDataBlock->info.rows > 0) {
// ASSERT(pDataBlock->info.dataLoad == 1); // ASSERT(pDataBlock->info.dataLoad == 1);
} // }
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
if (numOfCols <= 0) { if (numOfCols <= 0) {
@ -515,6 +515,51 @@ int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex)
return 0; return 0;
} }
int32_t blockDataUpdatePkRange(SSDataBlock* pDataBlock, int32_t pkColumnIndex, bool asc) {
if (pDataBlock == NULL || pDataBlock->info.rows <= 0 || pDataBlock->info.dataLoad == 0 || pkColumnIndex == -1) {
return 0;
}
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
if (numOfCols <= 0) {
return -1;
}
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, pkColumnIndex);
if (!IS_NUMERIC_TYPE(pColInfoData->info.type) && (pColInfoData->info.type != TSDB_DATA_TYPE_VARCHAR)) {
return 0;
}
void* skey = colDataGetData(pColInfoData, 0);
void* ekey = colDataGetData(pColInfoData, (pDataBlock->info.rows - 1));
if (asc) {
if (IS_NUMERIC_TYPE(pColInfoData->info.type)) {
pDataBlock->info.pks[0].val = *(int64_t*) skey;
pDataBlock->info.pks[1].val = *(int64_t*) ekey;
} else { // todo refactor
memcpy(pDataBlock->info.pks[0].pData, varDataVal(skey), varDataLen(skey));
pDataBlock->info.pks[0].nData = varDataLen(skey);
memcpy(pDataBlock->info.pks[1].pData, varDataVal(ekey), varDataLen(ekey));
pDataBlock->info.pks[1].nData = varDataLen(ekey);
}
} else {
if (IS_NUMERIC_TYPE(pColInfoData->info.type)) {
pDataBlock->info.pks[0].val = *(int64_t*) ekey;
pDataBlock->info.pks[1].val = *(int64_t*) skey;
} else { // todo refactor
memcpy(pDataBlock->info.pks[0].pData, varDataVal(ekey), varDataLen(ekey));
pDataBlock->info.pks[0].nData = varDataLen(ekey);
memcpy(pDataBlock->info.pks[1].pData, varDataVal(skey), varDataLen(skey));
pDataBlock->info.pks[1].nData = varDataLen(skey);
}
}
return 0;
}
int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc) { int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc) {
int32_t capacity = pDest->info.capacity; int32_t capacity = pDest->info.capacity;

View File

@ -110,6 +110,10 @@ static int32_t pkComp2(STsdbReader* pReader, STsdbRowKey* p1, STsdbRowKey* p2) {
static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pCols, const int32_t* pSlotIdList, static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pCols, const int32_t* pSlotIdList,
int32_t numOfCols) { int32_t numOfCols) {
pSupInfo->pk.pk = 0;
pSupInfo->numOfPks = 0;
pSupInfo->pk.slotId = -1;
pSupInfo->smaValid = true; pSupInfo->smaValid = true;
pSupInfo->numOfCols = numOfCols; pSupInfo->numOfCols = numOfCols;
pSupInfo->colId = taosMemoryMalloc(numOfCols * (sizeof(int16_t) * 2 + POINTER_BYTES)); pSupInfo->colId = taosMemoryMalloc(numOfCols * (sizeof(int16_t) * 2 + POINTER_BYTES));
@ -131,7 +135,8 @@ static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pC
} }
if (pCols[i].pk) { if (pCols[i].pk) {
pSupInfo->pkSlotId = pCols[i].slotId; pSupInfo->pk = pCols[i];
pSupInfo->numOfPks += 1;
} }
} }
@ -201,7 +206,7 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, TFileSetArray* pFileSetA
pLReader->order = pReader->info.order; pLReader->order = pReader->info.order;
pLReader->window = pReader->info.window; pLReader->window = pReader->info.window;
pLReader->verRange = pReader->info.verRange; pLReader->verRange = pReader->info.verRange;
pLReader->numOfPks = pReader->numOfPks; pLReader->numOfPks = pReader->suppInfo.numOfPks;
pLReader->uid = 0; pLReader->uid = 0;
tMergeTreeClose(&pLReader->mergeTree); tMergeTreeClose(&pLReader->mergeTree);
@ -434,8 +439,6 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void
pReader->type = pCond->type; pReader->type = pCond->type;
pReader->info.window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows); pReader->info.window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);
pReader->blockInfoBuf.numPerBucket = 1000; // 1000 tables per bucket pReader->blockInfoBuf.numPerBucket = 1000; // 1000 tables per bucket
pReader->numOfPks = -1;
pReader->pkChecked = false;
code = initResBlockInfo(&pReader->resBlockInfo, capacity, pResBlock, pCond); code = initResBlockInfo(&pReader->resBlockInfo, capacity, pResBlock, pCond);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -453,6 +456,10 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void
pSup->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID; pSup->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;
setColumnIdSlotList(pSup, pCond->colList, pCond->pSlotList, pCond->numOfCols); setColumnIdSlotList(pSup, pCond->colList, pCond->pSlotList, pCond->numOfCols);
if (pSup->numOfPks > 0) {
pReader->pkComparFn = getComparFunc(pSup->pk.type, 0);
}
code = tBlockDataCreate(&pReader->status.fileBlockData); code = tBlockDataCreate(&pReader->status.fileBlockData);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
terrno = code; terrno = code;
@ -1523,12 +1530,6 @@ static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SSttBlockReader* pSttB
TSDBROW* pNextRow = tMergeTreeGetRow(&pSttBlockReader->mergeTree); TSDBROW* pNextRow = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
tsdbRowGetKey(pNextRow, &nextKey); tsdbRowGetKey(pNextRow, &nextKey);
if (!pReader->pkChecked) {
pReader->pkComparFn = getComparFunc(pSttKey->key.pks[0].type, 0);
pReader->pkChecked = true;
pReader->numOfPks = pSttKey->key.numOfPKs;
}
if (pkCompEx(pReader->pkComparFn, pSttKey, &nextKey) != 0) { if (pkCompEx(pReader->pkComparFn, pSttKey, &nextKey) != 0) {
code = doAppendRowFromFileBlock(pReader->resBlockInfo.pResBlock, pReader, fRow->pBlockData, fRow->iRow); code = doAppendRowFromFileBlock(pReader->resBlockInfo.pResBlock, pReader, fRow->pBlockData, fRow->iRow);
if (code) { if (code) {
@ -1782,7 +1783,7 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader*
} }
// key == tsLast. ts is equal and the primary key exists // key == tsLast. ts is equal and the primary key exists
if (pReader->numOfPks > 0) { if (pReader->suppInfo.numOfPks > 0) {
int32_t res = pkComp1(pReader, pSttKey, &fRow); int32_t res = pkComp1(pReader, pSttKey, &fRow);
if (res < 0) { if (res < 0) {
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader); return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
@ -2277,7 +2278,7 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
} }
if (copied) { if (copied) {
if (pReader->numOfPks == 0) { if (pReader->suppInfo.numOfPks == 0) {
pBlockScanInfo->lastProcKey.key.ts = key; pBlockScanInfo->lastProcKey.key.ts = key;
} else { // todo use deep copy instead of shallow copy } else { // todo use deep copy instead of shallow copy
int32_t step = ASCENDING_TRAVERSE(pReader->info.order)? 1 : -1; int32_t step = ASCENDING_TRAVERSE(pReader->info.order)? 1 : -1;
@ -2438,6 +2439,7 @@ void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlockScanInf
pResBlock->info.version = pReader->info.verRange.maxVer; pResBlock->info.version = pReader->info.verRange.maxVer;
blockDataUpdateTsWindow(pResBlock, pReader->suppInfo.slotId[0]); blockDataUpdateTsWindow(pResBlock, pReader->suppInfo.slotId[0]);
blockDataUpdatePkRange(pResBlock, pReader->suppInfo.pk.slotId, ASCENDING_TRAVERSE(pReader->info.order));
setComposedBlockFlag(pReader, true); setComposedBlockFlag(pReader, true);
// todo update the pk range for current return data block // todo update the pk range for current return data block
@ -2810,20 +2812,30 @@ static void buildCleanBlockFromDataFiles(STsdbReader* pReader, STableBlockScanIn
pInfo->version = pReader->info.verRange.maxVer; pInfo->version = pReader->info.verRange.maxVer;
pInfo->window = (STimeWindow){.skey = pBlockInfo->firstKey, .ekey = pBlockInfo->lastKey}; pInfo->window = (STimeWindow){.skey = pBlockInfo->firstKey, .ekey = pBlockInfo->lastKey};
if (pReader->suppInfo.pk.pk) {
if (IS_NUMERIC_TYPE(pReader->suppInfo.pk.type)) {
pInfo->pks[0].val = pBlockInfo->firstPk.val;
pInfo->pks[1].val = pBlockInfo->lastPk.val;
} else {
memcpy(pInfo->pks[0].pData, pBlockInfo->firstPk.pData, pBlockInfo->firstPKLen);
memcpy(pInfo->pks[1].pData, pBlockInfo->lastPk.pData, pBlockInfo->lastPKLen);
pInfo->pks[0].nData = pBlockInfo->firstPKLen;
pInfo->pks[1].nData = pBlockInfo->lastPKLen;
}
}
setComposedBlockFlag(pReader, false); setComposedBlockFlag(pReader, false);
setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->lastKey, pReader->info.order); setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->lastKey, pReader->info.order);
// update the last key for the corresponding table // update the last key for the corresponding table
SRowKey* pKey = &pScanInfo->lastProcKey.key; SRowKey* pKey = &pScanInfo->lastProcKey.key;
pKey->ts = asc ? pInfo->window.ekey : pInfo->window.skey; pKey->ts = asc ? pInfo->window.ekey : pInfo->window.skey;
pKey->numOfPKs = pReader->numOfPks; pKey->numOfPKs = pReader->suppInfo.numOfPks;
// todo opt allocation, and handle varchar primary key // todo opt allocation, and handle varchar primary key
pKey->pks[0].val = asc ? pBlockInfo->lastPk.val : pBlockInfo->firstPk.val; pKey->pks[0].val = asc ? pBlockInfo->lastPk.val : pBlockInfo->firstPk.val;
pInfo->pks[0].val = pBlockInfo->firstPk.val;
pInfo->pks[1].val = pBlockInfo->lastPk.val;
tsdbDebug("%p uid:%" PRIu64 tsdbDebug("%p uid:%" PRIu64
" clean file block retrieved from file, global index:%d, " " clean file block retrieved from file, global index:%d, "
"table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s", "table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s",
@ -3482,14 +3494,6 @@ TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* p
int32_t order = pReader->info.order; int32_t order = pReader->info.order;
TSDBROW* pRow = tsdbTbDataIterGet(pIter->iter); TSDBROW* pRow = tsdbTbDataIterGet(pIter->iter);
if (!pReader->pkChecked) {
STsdbRowKey k;
tsdbRowGetKey(pRow, &k);
pReader->pkComparFn = getComparFunc(k.key.pks[0].type, 0);
pReader->pkChecked = true;
}
TSDBKEY key = TSDBROW_KEY(pRow); TSDBKEY key = TSDBROW_KEY(pRow);
if (outOfTimeWindow(key.ts, &pReader->info.window)) { if (outOfTimeWindow(key.ts, &pReader->info.window)) {
pIter->hasVal = false; pIter->hasVal = false;

View File

@ -372,12 +372,6 @@ static void recordToBlockInfo(SFileDataBlockInfo* pBlockInfo, SBrinRecord* recor
pBlockInfo->count = record->count; pBlockInfo->count = record->count;
SRowKey* pFirstKey = &record->firstKey.key; SRowKey* pFirstKey = &record->firstKey.key;
if (!pReader->pkChecked) {
pReader->pkChecked = true;
pReader->numOfPks = pFirstKey->numOfPKs;
pReader->pkComparFn = getComparFunc(pFirstKey->pks[0].type, 0);
}
if (pFirstKey->numOfPKs > 0) { if (pFirstKey->numOfPKs > 0) {
if (IS_NUMERIC_TYPE(pFirstKey->pks[0].type)) { if (IS_NUMERIC_TYPE(pFirstKey->pks[0].type)) {
pBlockInfo->firstPk.val = pFirstKey->pks[0].val; pBlockInfo->firstPk.val = pFirstKey->pks[0].val;
@ -404,8 +398,6 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3
pBlockIter->numOfBlocks = numOfBlocks; pBlockIter->numOfBlocks = numOfBlocks;
taosArrayClear(pBlockIter->blockList); taosArrayClear(pBlockIter->blockList);
pBlockIter->pTableMap = pReader->status.pTableMap;
// access data blocks according to the offset of each block in asc/desc order. // access data blocks according to the offset of each block in asc/desc order.
int32_t numOfTables = taosArrayGetSize(pTableList); int32_t numOfTables = taosArrayGetSize(pTableList);

View File

@ -157,9 +157,10 @@ typedef struct SBlockLoadSuppInfo {
SColumnDataAgg tsColAgg; SColumnDataAgg tsColAgg;
int16_t* colId; int16_t* colId;
int16_t* slotId; int16_t* slotId;
int32_t numOfCols;
char** buildBuf; // build string tmp buffer, todo remove it later after all string format being updated. char** buildBuf; // build string tmp buffer, todo remove it later after all string format being updated.
int16_t pkSlotId; int32_t numOfCols;
int32_t numOfPks;
SColumnInfo pk;
bool smaValid; // the sma on all queried columns are activated bool smaValid; // the sma on all queried columns are activated
} SBlockLoadSuppInfo; } SBlockLoadSuppInfo;
@ -218,7 +219,6 @@ typedef struct SDataBlockIter {
SArray* blockList; // SArray<SFileDataBlockInfo> SArray* blockList; // SArray<SFileDataBlockInfo>
int32_t order; int32_t order;
SDataBlk block; // current SDataBlk data SDataBlk block; // current SDataBlk data
SSHashObj* pTableMap;
} SDataBlockIter; } SDataBlockIter;
typedef struct SFileBlockDumpInfo { typedef struct SFileBlockDumpInfo {
@ -281,8 +281,6 @@ struct STsdbReader {
TsdReaderNotifyCbFn notifyFn; TsdReaderNotifyCbFn notifyFn;
void* notifyParam; void* notifyParam;
__compar_fn_t pkComparFn; __compar_fn_t pkComparFn;
int32_t numOfPks;
bool pkChecked;
}; };
typedef struct SBrinRecordIter { typedef struct SBrinRecordIter {