refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2024-03-21 17:47:40 +08:00
parent 486a83671c
commit 8bdd12c3a8
4 changed files with 111 additions and 35 deletions

View File

@ -117,11 +117,69 @@ static int32_t pkComp2(__compar_fn_t comparFn, SRowKey* p1, SRowKey* p2) {
return comparFn(&p1->pks[0].val, &p2->pks[0].val);
}
static void tColRowGetKeyDeepCopy(SBlockData* pBlock, int32_t irow, int32_t slotId, SRowKey* pKey) {
pKey->ts = pBlock->aTSKEY[irow];
if (slotId == -1) {
pKey->numOfPKs = 0;
return;
}
pKey->numOfPKs = 1;
SColData* pColData = &pBlock->aColData[slotId];
SColVal cv;
tColDataGetValue(pColData, irow, &cv);
if (IS_NUMERIC_TYPE(cv.value.type)) {
pKey->pks[0].val = cv.value.val;
} else {
pKey->pks[0].nData = cv.value.nData;
memcpy(pKey->pks[0].pData, cv.value.pData, cv.value.nData);
}
}
// for test purpose, todo remove it
static int32_t tGetPrimaryKeyIndex(uint8_t *p, SPrimaryKeyIndex *index) {
int32_t n = 0;
n += tGetI8(p + n, &index->type);
n += tGetU32v(p + n, &index->offset);
return n;
}
static void tRowGetKeyDeepCopy(SRow* pRow, SRowKey* pKey) {
pKey->ts = pKey->ts;
pKey->numOfPKs = pRow->numOfPKs;
if (pKey->numOfPKs == 0) {
return;
}
SPrimaryKeyIndex indices[TD_MAX_PK_COLS];
uint8_t *data = pRow->data;
for (int32_t i = 0; i < pRow->numOfPKs; i++) {
data += tGetPrimaryKeyIndex(data, &indices[i]);
}
// primary keys
for (int32_t i = 0; i < pRow->numOfPKs; i++) {
pKey->pks[i].type = indices[i].type;
if (IS_VAR_DATA_TYPE(indices[i].type)) {
tGetU32v(pKey->pks[i].pData, &pKey->pks[i].nData);
pKey->pks[i].pData = memcpy(pKey->pks[i].pData, data + indices[i].offset, pKey->pks[i].nData);
pKey->pks[i].pData += pKey->pks[i].nData;
} else {
pKey->pks[i].val = *(int64_t*) data + indices[i].offset;
}
}
}
static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pCols, const int32_t* pSlotIdList,
int32_t numOfCols) {
pSupInfo->pk.pk = 0;
pSupInfo->numOfPks = 0;
pSupInfo->pk.slotId = -1;
pSupInfo->pkSrcSlot = -1;
pSupInfo->pkDstSlot = -1;
pSupInfo->smaValid = true;
pSupInfo->numOfCols = numOfCols;
@ -145,7 +203,8 @@ static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pC
if (pCols[i].pk) {
pSupInfo->pk = pCols[i];
pSupInfo->pk.slotId = pSlotIdList[i];
pSupInfo->pkSrcSlot = i;
pSupInfo->pkDstSlot = pSlotIdList[i];
pSupInfo->numOfPks += 1;
}
}
@ -712,6 +771,24 @@ static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, int64_t maxKey, int
// pDumpInfo->lastKey.key.ts = maxKey + step;
}
static void updateLastKeyInfo(SRowKey* pKey, SFileDataBlockInfo* pBlockInfo, SDataBlockInfo* pInfo, int32_t numOfPks,
bool asc) {
pKey->ts = asc ? pInfo->window.ekey : pInfo->window.skey;
pKey->numOfPKs = numOfPks;
if (pKey->numOfPKs <= 0) {
return;
}
if (IS_NUMERIC_TYPE(pKey->pks[0].type)) {
pKey->pks[0].val = asc ? pBlockInfo->lastPk.val : pBlockInfo->firstPk.val;
} else {
uint8_t* p = asc ? pBlockInfo->lastPk.pData : pBlockInfo->firstPk.pData;
pKey->pks[0].nData = asc ? pBlockInfo->lastPKLen : pBlockInfo->firstPKLen;
memcpy(pKey->pks[0].pData, p, pKey->pks[0].nData);
}
}
static int32_t doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_t colIndex, SColVal* pColVal,
SBlockLoadSuppInfo* pSup) {
if (IS_VAR_DATA_TYPE(pColVal->value.type)) {
@ -1116,7 +1193,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, SRowKey* pLastPro
pResBlock->info.rows = dumpedRows;
pDumpInfo->rowIndex += step * dumpedRows;
tColRowGetKey(pBlockData, pDumpInfo->rowIndex - step, pLastProcKey);
tColRowGetKeyDeepCopy(pBlockData, pDumpInfo->rowIndex - step, pSupInfo->pkSrcSlot, pLastProcKey);
// check if current block are all handled
if (pDumpInfo->rowIndex >= 0 && pDumpInfo->rowIndex < pRecord->numRow) {
@ -2277,12 +2354,12 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
}
if (copied) {
if (pReader->suppInfo.numOfPks == 0) {
pBlockScanInfo->lastProcKey.ts = key;
} else { // todo use deep copy instead of shallow copy
int32_t step = ASCENDING_TRAVERSE(pReader->info.order)? 1 : -1;
tColRowGetKey(pBlockData, pDumpInfo->rowIndex - step, &pBlockScanInfo->lastProcKey);
}
// if (pReader->suppInfo.numOfPks == 0) {
// pBlockScanInfo->lastProcKey.ts = key;
// } else { // todo use deep copy instead of shallow copy
// int32_t step = ASCENDING_TRAVERSE(pReader->info.order)? 1 : -1;
// tColRowGetKey(pBlockData, pDumpInfo->rowIndex - step, &pBlockScanInfo->lastProcKey);
// }
return TSDB_CODE_SUCCESS;
} else {
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
@ -2328,7 +2405,7 @@ int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanIn
}
if (copied) {
tRowKeyAssign(&pBlockScanInfo->lastProcKey, &sttKey);
// tRowKeyAssign(&pBlockScanInfo->lastProcKey, &sttKey);
return TSDB_CODE_SUCCESS;
} else {
code = tsdbRowMergerAdd(pMerger, &fRow, NULL);
@ -2436,7 +2513,7 @@ void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlockScanInf
pResBlock->info.version = pReader->info.verRange.maxVer;
blockDataUpdateTsWindow(pResBlock, pReader->suppInfo.slotId[0]);
blockDataUpdatePkRange(pResBlock, pReader->suppInfo.pk.slotId, ASCENDING_TRAVERSE(pReader->info.order));
blockDataUpdatePkRange(pResBlock, pReader->suppInfo.pkDstSlot, ASCENDING_TRAVERSE(pReader->info.order));
setComposedBlockFlag(pReader, true);
// todo update the pk range for current return data block
@ -2822,16 +2899,10 @@ static void buildCleanBlockFromDataFiles(STsdbReader* pReader, STableBlockScanIn
}
}
// update the last key for the corresponding table
setComposedBlockFlag(pReader, false);
setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->lastKey, pReader->info.order);
// update the last key for the corresponding table
SRowKey* pKey = &pScanInfo->lastProcKey;
pKey->ts = asc ? pInfo->window.ekey : pInfo->window.skey;
pKey->numOfPKs = pReader->suppInfo.numOfPks;
// todo opt allocation, and handle varchar primary key
pKey->pks[0].val = asc ? pBlockInfo->lastPk.val : pBlockInfo->firstPk.val;
updateLastKeyInfo(&pScanInfo->lastProcKey, pBlockInfo, pInfo, pReader->suppInfo.numOfPks, asc);
tsdbDebug("%p uid:%" PRIu64
" clean file block retrieved from file, global index:%d, "
@ -3928,11 +3999,6 @@ int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pT
pBlock->info.dataLoad = 1;
pBlock->info.rows += 1;
// todo no version
TSDBROW row = {.pTSRow = pTSRow, .type = TSDBROW_ROW_FMT};
tRowGetKeyEx(&row, &pScanInfo->lastProcKey);
// pScanInfo->lastProcKey = pTSRow->ts;
return TSDB_CODE_SUCCESS;
}
@ -4008,13 +4074,14 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e
return code;
}
tRowGetKey(row.pTSRow, &pBlockScanInfo->lastProcKey);
tRowGetKeyDeepCopy(row.pTSRow, &pBlockScanInfo->lastProcKey);
} else {
code = doAppendRowFromFileBlock(pBlock, pReader, row.pBlockData, row.iRow);
if (code) {
return code;
}
tColRowGetKey(row.pBlockData, row.iRow, &pBlockScanInfo->lastProcKey);
tColRowGetKeyDeepCopy(row.pBlockData, row.iRow, pReader->suppInfo.pkSrcSlot, &pBlockScanInfo->lastProcKey);
}
// no data in buffer, return immediately

View File

@ -160,6 +160,8 @@ typedef struct SBlockLoadSuppInfo {
int32_t numOfCols;
int32_t numOfPks;
SColumnInfo pk;
int32_t pkSrcSlot;
int32_t pkDstSlot;
bool smaValid; // the sma on all queried columns are activated
} SBlockLoadSuppInfo;

View File

@ -640,15 +640,7 @@ int32_t tRowKeyAssign(SRowKey *pDst, SRowKey* pSrc) {
if (IS_NUMERIC_TYPE(pVal->type)) {
continue;
}
uint8_t *p = taosMemoryMalloc(pVal->nData);
if (p == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
memcpy(p, pVal->pData, pVal->nData);
pVal->pData = p;
memcpy(pVal->pData, pVal->pData, pVal->nData);
}
}

View File

@ -1197,6 +1197,21 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
initResultSizeInfo(&pOperator->resultInfo, 4096);
pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
{ // todo :refactor:
for(int32_t i = 0; i < taosArrayGetSize(pInfo->base.matchInfo.pList); ++i) {
SColMatchItem* pItem = taosArrayGet(pInfo->base.matchInfo.pList, i);
if (pItem->isPk) {
SColumnInfoData* pInfoData = taosArrayGet(pInfo->pResBlock->pDataBlock, pItem->dstSlotId);
pInfo->pResBlock->info.pks[0].type = pInfoData->info.type;
pInfo->pResBlock->info.pks[1].type = pInfoData->info.type;
if (IS_VAR_DATA_TYPE(pItem->dataType.type)) {
pInfo->pResBlock->info.pks[0].pData = taosMemoryCalloc(1, pInfoData->info.bytes);
pInfo->pResBlock->info.pks[1].pData = taosMemoryCalloc(1, pInfoData->info.bytes);
}
}
}
}
code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
goto _error;