diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 6c410df0ce..9a5fa0871f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -117,11 +117,69 @@ static int32_t pkComp2(__compar_fn_t comparFn, SRowKey* p1, SRowKey* p2) { return comparFn(&p1->pks[0].val, &p2->pks[0].val); } +static void tColRowGetKeyDeepCopy(SBlockData* pBlock, int32_t irow, int32_t slotId, SRowKey* pKey) { + pKey->ts = pBlock->aTSKEY[irow]; + if (slotId == -1) { + pKey->numOfPKs = 0; + return; + } + + pKey->numOfPKs = 1; + + SColData* pColData = &pBlock->aColData[slotId]; + SColVal cv; + tColDataGetValue(pColData, irow, &cv); + + if (IS_NUMERIC_TYPE(cv.value.type)) { + pKey->pks[0].val = cv.value.val; + } else { + pKey->pks[0].nData = cv.value.nData; + memcpy(pKey->pks[0].pData, cv.value.pData, cv.value.nData); + } +} + +// for test purpose, todo remove it +static int32_t tGetPrimaryKeyIndex(uint8_t *p, SPrimaryKeyIndex *index) { + int32_t n = 0; + n += tGetI8(p + n, &index->type); + n += tGetU32v(p + n, &index->offset); + return n; +} + +static void tRowGetKeyDeepCopy(SRow* pRow, SRowKey* pKey) { + pKey->ts = pKey->ts; + pKey->numOfPKs = pRow->numOfPKs; + if (pKey->numOfPKs == 0) { + return; + } + + SPrimaryKeyIndex indices[TD_MAX_PK_COLS]; + + uint8_t *data = pRow->data; + for (int32_t i = 0; i < pRow->numOfPKs; i++) { + data += tGetPrimaryKeyIndex(data, &indices[i]); + } + + // primary keys + for (int32_t i = 0; i < pRow->numOfPKs; i++) { + pKey->pks[i].type = indices[i].type; + + if (IS_VAR_DATA_TYPE(indices[i].type)) { + tGetU32v(pKey->pks[i].pData, &pKey->pks[i].nData); + pKey->pks[i].pData = memcpy(pKey->pks[i].pData, data + indices[i].offset, pKey->pks[i].nData); + pKey->pks[i].pData += pKey->pks[i].nData; + } else { + pKey->pks[i].val = *(int64_t*) data + indices[i].offset; + } + } +} + static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pCols, const int32_t* pSlotIdList, int32_t numOfCols) { pSupInfo->pk.pk = 0; pSupInfo->numOfPks = 0; - pSupInfo->pk.slotId = -1; + pSupInfo->pkSrcSlot = -1; + pSupInfo->pkDstSlot = -1; pSupInfo->smaValid = true; pSupInfo->numOfCols = numOfCols; @@ -145,7 +203,8 @@ static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pC if (pCols[i].pk) { pSupInfo->pk = pCols[i]; - pSupInfo->pk.slotId = pSlotIdList[i]; + pSupInfo->pkSrcSlot = i; + pSupInfo->pkDstSlot = pSlotIdList[i]; pSupInfo->numOfPks += 1; } } @@ -712,6 +771,24 @@ static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, int64_t maxKey, int // pDumpInfo->lastKey.key.ts = maxKey + step; } +static void updateLastKeyInfo(SRowKey* pKey, SFileDataBlockInfo* pBlockInfo, SDataBlockInfo* pInfo, int32_t numOfPks, + bool asc) { + pKey->ts = asc ? pInfo->window.ekey : pInfo->window.skey; + + pKey->numOfPKs = numOfPks; + if (pKey->numOfPKs <= 0) { + return; + } + + if (IS_NUMERIC_TYPE(pKey->pks[0].type)) { + pKey->pks[0].val = asc ? pBlockInfo->lastPk.val : pBlockInfo->firstPk.val; + } else { + uint8_t* p = asc ? pBlockInfo->lastPk.pData : pBlockInfo->firstPk.pData; + pKey->pks[0].nData = asc ? pBlockInfo->lastPKLen : pBlockInfo->firstPKLen; + memcpy(pKey->pks[0].pData, p, pKey->pks[0].nData); + } +} + static int32_t doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_t colIndex, SColVal* pColVal, SBlockLoadSuppInfo* pSup) { if (IS_VAR_DATA_TYPE(pColVal->value.type)) { @@ -1116,7 +1193,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, SRowKey* pLastPro pResBlock->info.rows = dumpedRows; pDumpInfo->rowIndex += step * dumpedRows; - tColRowGetKey(pBlockData, pDumpInfo->rowIndex - step, pLastProcKey); + tColRowGetKeyDeepCopy(pBlockData, pDumpInfo->rowIndex - step, pSupInfo->pkSrcSlot, pLastProcKey); // check if current block are all handled if (pDumpInfo->rowIndex >= 0 && pDumpInfo->rowIndex < pRecord->numRow) { @@ -2277,12 +2354,12 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc } if (copied) { - if (pReader->suppInfo.numOfPks == 0) { - pBlockScanInfo->lastProcKey.ts = key; - } else { // todo use deep copy instead of shallow copy - int32_t step = ASCENDING_TRAVERSE(pReader->info.order)? 1 : -1; - tColRowGetKey(pBlockData, pDumpInfo->rowIndex - step, &pBlockScanInfo->lastProcKey); - } +// if (pReader->suppInfo.numOfPks == 0) { +// pBlockScanInfo->lastProcKey.ts = key; +// } else { // todo use deep copy instead of shallow copy +// int32_t step = ASCENDING_TRAVERSE(pReader->info.order)? 1 : -1; +// tColRowGetKey(pBlockData, pDumpInfo->rowIndex - step, &pBlockScanInfo->lastProcKey); +// } return TSDB_CODE_SUCCESS; } else { TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); @@ -2328,7 +2405,7 @@ int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanIn } if (copied) { - tRowKeyAssign(&pBlockScanInfo->lastProcKey, &sttKey); +// tRowKeyAssign(&pBlockScanInfo->lastProcKey, &sttKey); return TSDB_CODE_SUCCESS; } else { code = tsdbRowMergerAdd(pMerger, &fRow, NULL); @@ -2436,7 +2513,7 @@ void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlockScanInf pResBlock->info.version = pReader->info.verRange.maxVer; blockDataUpdateTsWindow(pResBlock, pReader->suppInfo.slotId[0]); - blockDataUpdatePkRange(pResBlock, pReader->suppInfo.pk.slotId, ASCENDING_TRAVERSE(pReader->info.order)); + blockDataUpdatePkRange(pResBlock, pReader->suppInfo.pkDstSlot, ASCENDING_TRAVERSE(pReader->info.order)); setComposedBlockFlag(pReader, true); // todo update the pk range for current return data block @@ -2822,16 +2899,10 @@ static void buildCleanBlockFromDataFiles(STsdbReader* pReader, STableBlockScanIn } } + // update the last key for the corresponding table setComposedBlockFlag(pReader, false); setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->lastKey, pReader->info.order); - - // update the last key for the corresponding table - SRowKey* pKey = &pScanInfo->lastProcKey; - pKey->ts = asc ? pInfo->window.ekey : pInfo->window.skey; - pKey->numOfPKs = pReader->suppInfo.numOfPks; - - // todo opt allocation, and handle varchar primary key - pKey->pks[0].val = asc ? pBlockInfo->lastPk.val : pBlockInfo->firstPk.val; + updateLastKeyInfo(&pScanInfo->lastProcKey, pBlockInfo, pInfo, pReader->suppInfo.numOfPks, asc); tsdbDebug("%p uid:%" PRIu64 " clean file block retrieved from file, global index:%d, " @@ -3928,11 +3999,6 @@ int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pT pBlock->info.dataLoad = 1; pBlock->info.rows += 1; - - // todo no version - TSDBROW row = {.pTSRow = pTSRow, .type = TSDBROW_ROW_FMT}; - tRowGetKeyEx(&row, &pScanInfo->lastProcKey); -// pScanInfo->lastProcKey = pTSRow->ts; return TSDB_CODE_SUCCESS; } @@ -4008,13 +4074,14 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e return code; } - tRowGetKey(row.pTSRow, &pBlockScanInfo->lastProcKey); + tRowGetKeyDeepCopy(row.pTSRow, &pBlockScanInfo->lastProcKey); } else { code = doAppendRowFromFileBlock(pBlock, pReader, row.pBlockData, row.iRow); if (code) { return code; } - tColRowGetKey(row.pBlockData, row.iRow, &pBlockScanInfo->lastProcKey); + + tColRowGetKeyDeepCopy(row.pBlockData, row.iRow, pReader->suppInfo.pkSrcSlot, &pBlockScanInfo->lastProcKey); } // no data in buffer, return immediately diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h index 70fe50fa96..56e6ca9ba7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h @@ -160,6 +160,8 @@ typedef struct SBlockLoadSuppInfo { int32_t numOfCols; int32_t numOfPks; SColumnInfo pk; + int32_t pkSrcSlot; + int32_t pkDstSlot; bool smaValid; // the sma on all queried columns are activated } SBlockLoadSuppInfo; diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 02a14d03d7..550bedf173 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -640,15 +640,7 @@ int32_t tRowKeyAssign(SRowKey *pDst, SRowKey* pSrc) { if (IS_NUMERIC_TYPE(pVal->type)) { continue; } - - uint8_t *p = taosMemoryMalloc(pVal->nData); - if (p == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return terrno; - } - - memcpy(p, pVal->pData, pVal->nData); - pVal->pData = p; + memcpy(pVal->pData, pVal->pData, pVal->nData); } } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 6894b4bc09..dbb0761046 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1197,6 +1197,21 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, initResultSizeInfo(&pOperator->resultInfo, 4096); pInfo->pResBlock = createDataBlockFromDescNode(pDescNode); + { // todo :refactor: + for(int32_t i = 0; i < taosArrayGetSize(pInfo->base.matchInfo.pList); ++i) { + SColMatchItem* pItem = taosArrayGet(pInfo->base.matchInfo.pList, i); + if (pItem->isPk) { + SColumnInfoData* pInfoData = taosArrayGet(pInfo->pResBlock->pDataBlock, pItem->dstSlotId); + pInfo->pResBlock->info.pks[0].type = pInfoData->info.type; + pInfo->pResBlock->info.pks[1].type = pInfoData->info.type; + + if (IS_VAR_DATA_TYPE(pItem->dataType.type)) { + pInfo->pResBlock->info.pks[0].pData = taosMemoryCalloc(1, pInfoData->info.bytes); + pInfo->pResBlock->info.pks[1].pData = taosMemoryCalloc(1, pInfoData->info.bytes); + } + } + } + } code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); if (code != TSDB_CODE_SUCCESS) { goto _error;