diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index fa6e4c3ae8..290c90fde9 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -199,6 +199,7 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, int int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows, const SDataBlockInfo* pBlockInfo); int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex); +int32_t blockDataUpdatePkRange(SSDataBlock* pDataBlock, int32_t pkColumnIndex, bool asc); int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows); diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 79d0357c4f..d4c054af9d 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1037,6 +1037,7 @@ typedef struct { uint8_t scale; int32_t bytes; int8_t type; + uint8_t pk; } SColumnInfo; typedef struct STimeWindow { diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index f0ecf2365c..5e334fb1ff 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -490,9 +490,9 @@ int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex) return 0; } - if (pDataBlock->info.rows > 0) { +// if (pDataBlock->info.rows > 0) { // ASSERT(pDataBlock->info.dataLoad == 1); - } +// } size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); if (numOfCols <= 0) { @@ -515,6 +515,51 @@ int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex) return 0; } +int32_t blockDataUpdatePkRange(SSDataBlock* pDataBlock, int32_t pkColumnIndex, bool asc) { + if (pDataBlock == NULL || pDataBlock->info.rows <= 0 || pDataBlock->info.dataLoad == 0 || pkColumnIndex == -1) { + return 0; + } + + size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); + if (numOfCols <= 0) { + return -1; + } + + SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, pkColumnIndex); + if (!IS_NUMERIC_TYPE(pColInfoData->info.type) && (pColInfoData->info.type != TSDB_DATA_TYPE_VARCHAR)) { + return 0; + } + + void* skey = colDataGetData(pColInfoData, 0); + void* ekey = colDataGetData(pColInfoData, (pDataBlock->info.rows - 1)); + + if (asc) { + if (IS_NUMERIC_TYPE(pColInfoData->info.type)) { + pDataBlock->info.pks[0].val = *(int64_t*) skey; + pDataBlock->info.pks[1].val = *(int64_t*) ekey; + } else { // todo refactor + memcpy(pDataBlock->info.pks[0].pData, varDataVal(skey), varDataLen(skey)); + pDataBlock->info.pks[0].nData = varDataLen(skey); + + memcpy(pDataBlock->info.pks[1].pData, varDataVal(ekey), varDataLen(ekey)); + pDataBlock->info.pks[1].nData = varDataLen(ekey); + } + } else { + if (IS_NUMERIC_TYPE(pColInfoData->info.type)) { + pDataBlock->info.pks[0].val = *(int64_t*) ekey; + pDataBlock->info.pks[1].val = *(int64_t*) skey; + } else { // todo refactor + memcpy(pDataBlock->info.pks[0].pData, varDataVal(ekey), varDataLen(ekey)); + pDataBlock->info.pks[0].nData = varDataLen(ekey); + + memcpy(pDataBlock->info.pks[1].pData, varDataVal(skey), varDataLen(skey)); + pDataBlock->info.pks[1].nData = varDataLen(skey); + } + } + + return 0; +} + int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc) { int32_t capacity = pDest->info.capacity; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 8b20b9e0f1..654e765ef7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -110,6 +110,10 @@ static int32_t pkComp2(STsdbReader* pReader, STsdbRowKey* p1, STsdbRowKey* p2) { static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pCols, const int32_t* pSlotIdList, int32_t numOfCols) { + pSupInfo->pk.pk = 0; + pSupInfo->numOfPks = 0; + pSupInfo->pk.slotId = -1; + pSupInfo->smaValid = true; pSupInfo->numOfCols = numOfCols; pSupInfo->colId = taosMemoryMalloc(numOfCols * (sizeof(int16_t) * 2 + POINTER_BYTES)); @@ -129,6 +133,11 @@ static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pC } else { pSupInfo->buildBuf[i] = NULL; } + + if (pCols[i].pk) { + pSupInfo->pk = pCols[i]; + pSupInfo->numOfPks += 1; + } } return TSDB_CODE_SUCCESS; @@ -197,7 +206,7 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, TFileSetArray* pFileSetA pLReader->order = pReader->info.order; pLReader->window = pReader->info.window; pLReader->verRange = pReader->info.verRange; - pLReader->numOfPks = pReader->numOfPks; + pLReader->numOfPks = pReader->suppInfo.numOfPks; pLReader->uid = 0; tMergeTreeClose(&pLReader->mergeTree); @@ -430,8 +439,6 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void pReader->type = pCond->type; pReader->info.window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows); pReader->blockInfoBuf.numPerBucket = 1000; // 1000 tables per bucket - pReader->numOfPks = -1; - pReader->pkChecked = false; code = initResBlockInfo(&pReader->resBlockInfo, capacity, pResBlock, pCond); if (code != TSDB_CODE_SUCCESS) { @@ -449,6 +456,10 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void pSup->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID; setColumnIdSlotList(pSup, pCond->colList, pCond->pSlotList, pCond->numOfCols); + if (pSup->numOfPks > 0) { + pReader->pkComparFn = getComparFunc(pSup->pk.type, 0); + } + code = tBlockDataCreate(&pReader->status.fileBlockData); if (code != TSDB_CODE_SUCCESS) { terrno = code; @@ -1519,12 +1530,6 @@ static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SSttBlockReader* pSttB TSDBROW* pNextRow = tMergeTreeGetRow(&pSttBlockReader->mergeTree); tsdbRowGetKey(pNextRow, &nextKey); - if (!pReader->pkChecked) { - pReader->pkComparFn = getComparFunc(pSttKey->key.pks[0].type, 0); - pReader->pkChecked = true; - pReader->numOfPks = pSttKey->key.numOfPKs; - } - if (pkCompEx(pReader->pkComparFn, pSttKey, &nextKey) != 0) { code = doAppendRowFromFileBlock(pReader->resBlockInfo.pResBlock, pReader, fRow->pBlockData, fRow->iRow); if (code) { @@ -1778,7 +1783,7 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader* } // key == tsLast. ts is equal and the primary key exists - if (pReader->numOfPks > 0) { + if (pReader->suppInfo.numOfPks > 0) { int32_t res = pkComp1(pReader, pSttKey, &fRow); if (res < 0) { return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader); @@ -2273,7 +2278,7 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc } if (copied) { - if (pReader->numOfPks == 0) { + if (pReader->suppInfo.numOfPks == 0) { pBlockScanInfo->lastProcKey.key.ts = key; } else { // todo use deep copy instead of shallow copy int32_t step = ASCENDING_TRAVERSE(pReader->info.order)? 1 : -1; @@ -2434,6 +2439,7 @@ void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlockScanInf pResBlock->info.version = pReader->info.verRange.maxVer; blockDataUpdateTsWindow(pResBlock, pReader->suppInfo.slotId[0]); + blockDataUpdatePkRange(pResBlock, pReader->suppInfo.pk.slotId, ASCENDING_TRAVERSE(pReader->info.order)); setComposedBlockFlag(pReader, true); // todo update the pk range for current return data block @@ -2806,19 +2812,29 @@ static void buildCleanBlockFromDataFiles(STsdbReader* pReader, STableBlockScanIn pInfo->version = pReader->info.verRange.maxVer; pInfo->window = (STimeWindow){.skey = pBlockInfo->firstKey, .ekey = pBlockInfo->lastKey}; + if (pReader->suppInfo.pk.pk) { + if (IS_NUMERIC_TYPE(pReader->suppInfo.pk.type)) { + pInfo->pks[0].val = pBlockInfo->firstPk.val; + pInfo->pks[1].val = pBlockInfo->lastPk.val; + } else { + memcpy(pInfo->pks[0].pData, pBlockInfo->firstPk.pData, pBlockInfo->firstPKLen); + memcpy(pInfo->pks[1].pData, pBlockInfo->lastPk.pData, pBlockInfo->lastPKLen); + + pInfo->pks[0].nData = pBlockInfo->firstPKLen; + pInfo->pks[1].nData = pBlockInfo->lastPKLen; + } + } + setComposedBlockFlag(pReader, false); setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->lastKey, pReader->info.order); // update the last key for the corresponding table SRowKey* pKey = &pScanInfo->lastProcKey.key; pKey->ts = asc ? pInfo->window.ekey : pInfo->window.skey; - pKey->numOfPKs = pReader->numOfPks; + pKey->numOfPKs = pReader->suppInfo.numOfPks; // todo opt allocation, and handle varchar primary key - pKey->pks[0].val = asc ? pBlockInfo->lastPrimaryKey.val : pBlockInfo->firstPrimaryKey.val; - - pInfo->pks[0].val = pBlockInfo->firstPrimaryKey.val; - pInfo->pks[1].val = pBlockInfo->lastPrimaryKey.val; + pKey->pks[0].val = asc ? pBlockInfo->lastPk.val : pBlockInfo->firstPk.val; tsdbDebug("%p uid:%" PRIu64 " clean file block retrieved from file, global index:%d, " @@ -3478,14 +3494,6 @@ TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* p int32_t order = pReader->info.order; TSDBROW* pRow = tsdbTbDataIterGet(pIter->iter); - if (!pReader->pkChecked) { - STsdbRowKey k; - tsdbRowGetKey(pRow, &k); - - pReader->pkComparFn = getComparFunc(k.key.pks[0].type, 0); - pReader->pkChecked = true; - } - TSDBKEY key = TSDBROW_KEY(pRow); if (outOfTimeWindow(key.ts, &pReader->info.window)) { pIter->hasVal = false; diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c index e99ea3467a..4afacea145 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c @@ -372,27 +372,21 @@ static void recordToBlockInfo(SFileDataBlockInfo* pBlockInfo, SBrinRecord* recor pBlockInfo->count = record->count; SRowKey* pFirstKey = &record->firstKey.key; - if (!pReader->pkChecked) { - pReader->pkChecked = true; - pReader->numOfPks = pFirstKey->numOfPKs; - pReader->pkComparFn = getComparFunc(pFirstKey->pks[0].type, 0); - } - if (pFirstKey->numOfPKs > 0) { if (IS_NUMERIC_TYPE(pFirstKey->pks[0].type)) { - pBlockInfo->firstPrimaryKey.val = pFirstKey->pks[0].val; - pBlockInfo->lastPrimaryKey.val = record->lastKey.key.pks[0].val; + pBlockInfo->firstPk.val = pFirstKey->pks[0].val; + pBlockInfo->lastPk.val = record->lastKey.key.pks[0].val; pBlockInfo->firstPKLen = 0; pBlockInfo->lastPKLen = 0; } else { // todo handle memory alloc error, opt memory alloc perf pBlockInfo->firstPKLen = pFirstKey->pks[0].nData; - pBlockInfo->firstPrimaryKey.pData = taosMemoryCalloc(1, pBlockInfo->firstPKLen); - memcpy(pBlockInfo->firstPrimaryKey.pData, pFirstKey->pks[0].pData, pBlockInfo->firstPKLen); + pBlockInfo->firstPk.pData = taosMemoryCalloc(1, pBlockInfo->firstPKLen); + memcpy(pBlockInfo->firstPk.pData, pFirstKey->pks[0].pData, pBlockInfo->firstPKLen); pBlockInfo->lastPKLen = record->lastKey.key.pks[0].nData; - pBlockInfo->lastPrimaryKey.pData = taosMemoryCalloc(1, pBlockInfo->lastPKLen); - memcpy(pBlockInfo->lastPrimaryKey.pData, record->lastKey.key.pks[0].pData, pBlockInfo->lastPKLen); + pBlockInfo->lastPk.pData = taosMemoryCalloc(1, pBlockInfo->lastPKLen); + memcpy(pBlockInfo->lastPk.pData, record->lastKey.key.pks[0].pData, pBlockInfo->lastPKLen); } } } @@ -404,8 +398,6 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3 pBlockIter->numOfBlocks = numOfBlocks; taosArrayClear(pBlockIter->blockList); - pBlockIter->pTableMap = pReader->status.pTableMap; - // access data blocks according to the offset of each block in asc/desc order. int32_t numOfTables = taosArrayGetSize(pTableList); diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h index 2dbae87c56..738b508206 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h @@ -157,8 +157,10 @@ typedef struct SBlockLoadSuppInfo { SColumnDataAgg tsColAgg; int16_t* colId; int16_t* slotId; - int32_t numOfCols; char** buildBuf; // build string tmp buffer, todo remove it later after all string format being updated. + int32_t numOfCols; + int32_t numOfPks; + SColumnInfo pk; bool smaValid; // the sma on all queried columns are activated } SBlockLoadSuppInfo; @@ -189,13 +191,13 @@ typedef struct SFileDataBlockInfo { union { int64_t val; uint8_t* pData; - } firstPrimaryKey; + } firstPk; int64_t lastKey; union { int64_t val; uint8_t* pData; - } lastPrimaryKey; + } lastPk; int32_t firstPKLen; int32_t lastPKLen; @@ -217,7 +219,6 @@ typedef struct SDataBlockIter { SArray* blockList; // SArray int32_t order; SDataBlk block; // current SDataBlk data - SSHashObj* pTableMap; } SDataBlockIter; typedef struct SFileBlockDumpInfo { @@ -280,8 +281,6 @@ struct STsdbReader { TsdReaderNotifyCbFn notifyFn; void* notifyParam; __compar_fn_t pkComparFn; - int32_t numOfPks; - bool pkChecked; }; typedef struct SBrinRecordIter { diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 5e11cd1115..beaa0684ad 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -370,7 +370,8 @@ static EDealRes getColumn(SNode** pNode, void* pContext) { pSColumnNode->slotId = pData->index++; SColumnInfo cInfo = {.colId = pSColumnNode->colId, .type = pSColumnNode->node.resType.type, - .bytes = pSColumnNode->node.resType.bytes}; + .bytes = pSColumnNode->node.resType.bytes, + .pk = pSColumnNode->isPk}; #if TAG_FILTER_DEBUG qDebug("tagfilter build column info, slotId:%d, colId:%d, type:%d", pSColumnNode->slotId, cInfo.colId, cInfo.type); #endif @@ -1763,6 +1764,7 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi pCond->colList[j].type = pColNode->node.resType.type; pCond->colList[j].bytes = pColNode->node.resType.bytes; pCond->colList[j].colId = pColNode->colId; + pCond->colList[j].pk = pColNode->isPk; pCond->pSlotList[j] = pNode->slotId; j += 1; diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index 0c421bf354..1f41a0c7b3 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -2276,6 +2276,7 @@ static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pC pCond->colList->colId = 1; pCond->colList->type = TSDB_DATA_TYPE_TIMESTAMP; pCond->colList->bytes = sizeof(TSKEY); + pCond->colList->pk = 0; pCond->pSlotList[0] = 0; diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index b4b45ced46..c1793bfb90 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -277,7 +277,12 @@ static EScanType getScanType(SLogicPlanContext* pCxt, SNodeList* pScanPseudoCols return SCAN_TYPE_TABLE; } -static SNode* createFirstCol(uint64_t tableId, const SSchema* pSchema) { + +static bool hasPkInTable(const STableMeta* pTableMeta) { + return pTableMeta->tableInfo.numOfColumns>=2 && pTableMeta->schema[1].flags & COL_IS_KEY; +} + +static SNode* createFirstCol(uint64_t tableId, const SSchema* pSchema, const STableMeta* pMeta) { SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN); if (NULL == pCol) { return NULL; @@ -287,11 +292,13 @@ static SNode* createFirstCol(uint64_t tableId, const SSchema* pSchema) { pCol->tableId = tableId; pCol->colId = pSchema->colId; pCol->colType = COLUMN_TYPE_COLUMN; + pCol->isPk = pSchema->flags & COL_IS_KEY; + pCol->tableHasPk = hasPkInTable(pMeta); strcpy(pCol->colName, pSchema->name); return (SNode*)pCol; } -static int32_t addPrimaryKeyCol(uint64_t tableId, const SSchema* pSchema, SNodeList** pCols) { +static int32_t addPrimaryKeyCol(uint64_t tableId, const SSchema* pSchema, SNodeList** pCols, const STableMeta* pMeta) { bool found = false; SNode* pCol = NULL; FOREACH(pCol, *pCols) { @@ -302,12 +309,12 @@ static int32_t addPrimaryKeyCol(uint64_t tableId, const SSchema* pSchema, SNodeL } if (!found) { - return nodesListMakeStrictAppend(pCols, createFirstCol(tableId, pSchema)); + return nodesListMakeStrictAppend(pCols, createFirstCol(tableId, pSchema, pMeta)); } return TSDB_CODE_SUCCESS; } -static int32_t addPkCol(uint64_t tableId, const SSchema* pSchema, SNodeList** pCols) { +static int32_t addPkCol(uint64_t tableId, const SSchema* pSchema, SNodeList** pCols, const STableMeta* pMeta) { bool found = false; SNode* pCol = NULL; FOREACH(pCol, *pCols) { @@ -318,30 +325,26 @@ static int32_t addPkCol(uint64_t tableId, const SSchema* pSchema, SNodeList** pC } if (!found) { - return nodesListMakeStrictAppend(pCols, createFirstCol(tableId, pSchema)); + return nodesListMakeStrictAppend(pCols, createFirstCol(tableId, pSchema, pMeta)); } return TSDB_CODE_SUCCESS; } -static bool hasPkInTable(const STableMeta* pTableMeta) { - return pTableMeta->tableInfo.numOfColumns>=2 && pTableMeta->schema[1].flags & COL_IS_KEY; -} - -static int32_t addSystableFirstCol(uint64_t tableId, const SSchema* pSchema, SNodeList** pCols) { +static int32_t addSystableFirstCol(uint64_t tableId, const SSchema* pSchema, SNodeList** pCols, const STableMeta* pMeta) { if (LIST_LENGTH(*pCols) > 0) { return TSDB_CODE_SUCCESS; } - return nodesListMakeStrictAppend(pCols, createFirstCol(tableId, pSchema)); + return nodesListMakeStrictAppend(pCols, createFirstCol(tableId, pSchema, pMeta)); } static int32_t addDefaultScanCol(const STableMeta* pMeta, SNodeList** pCols) { if (TSDB_SYSTEM_TABLE == pMeta->tableType) { - return addSystableFirstCol(pMeta->uid, pMeta->schema, pCols); + return addSystableFirstCol(pMeta->uid, pMeta->schema, pCols, pMeta); } if (hasPkInTable(pMeta)) { - addPkCol(pMeta->uid, pMeta->schema + 1, pCols); + addPkCol(pMeta->uid, pMeta->schema + 1, pCols, pMeta); } - return addPrimaryKeyCol(pMeta->uid, pMeta->schema, pCols); + return addPrimaryKeyCol(pMeta->uid, pMeta->schema, pCols, pMeta); } static int32_t makeScanLogicNode(SLogicPlanContext* pCxt, SRealTableNode* pRealTable, bool hasRepeatScanFuncs,