Merge pull request #15555 from taosdata/fix/TD-17968
fix: update schema to newest version to parsing rows
This commit is contained in:
commit
c2faef7377
|
@ -90,6 +90,9 @@ int32_t tsdbRowCmprFn(const void *p1, const void *p2);
|
||||||
void tRowIterInit(SRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema);
|
void tRowIterInit(SRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema);
|
||||||
SColVal *tRowIterNext(SRowIter *pIter);
|
SColVal *tRowIterNext(SRowIter *pIter);
|
||||||
// SRowMerger
|
// SRowMerger
|
||||||
|
int32_t tRowMergerInit2(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRow, STSchema *pTSchema);
|
||||||
|
int32_t tRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema);
|
||||||
|
|
||||||
int32_t tRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema);
|
int32_t tRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema);
|
||||||
void tRowMergerClear(SRowMerger *pMerger);
|
void tRowMergerClear(SRowMerger *pMerger);
|
||||||
int32_t tRowMerge(SRowMerger *pMerger, TSDBROW *pRow);
|
int32_t tRowMerge(SRowMerger *pMerger, TSDBROW *pRow);
|
||||||
|
|
|
@ -80,15 +80,15 @@ typedef struct SFilesetIter {
|
||||||
typedef struct SFileDataBlockInfo {
|
typedef struct SFileDataBlockInfo {
|
||||||
// index position in STableBlockScanInfo in order to check whether neighbor block overlaps with it
|
// index position in STableBlockScanInfo in order to check whether neighbor block overlaps with it
|
||||||
uint64_t uid;
|
uint64_t uid;
|
||||||
int32_t tbBlockIdx;
|
int32_t tbBlockIdx;
|
||||||
} SFileDataBlockInfo;
|
} SFileDataBlockInfo;
|
||||||
|
|
||||||
typedef struct SDataBlockIter {
|
typedef struct SDataBlockIter {
|
||||||
int32_t numOfBlocks;
|
int32_t numOfBlocks;
|
||||||
int32_t index;
|
int32_t index;
|
||||||
SArray* blockList; // SArray<SFileDataBlockInfo>
|
SArray* blockList; // SArray<SFileDataBlockInfo>
|
||||||
int32_t order;
|
int32_t order;
|
||||||
SBlock block; // current SBlock data
|
SBlock block; // current SBlock data
|
||||||
SHashObj* pTableMap;
|
SHashObj* pTableMap;
|
||||||
} SDataBlockIter;
|
} SDataBlockIter;
|
||||||
|
|
||||||
|
@ -124,8 +124,8 @@ struct STsdbReader {
|
||||||
SSDataBlock* pResBlock;
|
SSDataBlock* pResBlock;
|
||||||
int32_t capacity;
|
int32_t capacity;
|
||||||
SReaderStatus status;
|
SReaderStatus status;
|
||||||
char* idStr; // query info handle, for debug purpose
|
char* idStr; // query info handle, for debug purpose
|
||||||
int32_t type; // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows
|
int32_t type; // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows
|
||||||
SBlockLoadSuppInfo suppInfo;
|
SBlockLoadSuppInfo suppInfo;
|
||||||
STsdbReadSnap* pReadSnap;
|
STsdbReadSnap* pReadSnap;
|
||||||
SIOCostSummary cost;
|
SIOCostSummary cost;
|
||||||
|
@ -133,8 +133,8 @@ struct STsdbReader {
|
||||||
SDataFReader* pFileReader;
|
SDataFReader* pFileReader;
|
||||||
SVersionRange verRange;
|
SVersionRange verRange;
|
||||||
|
|
||||||
int32_t step;
|
int32_t step;
|
||||||
STsdbReader* innerReader[2];
|
STsdbReader* innerReader[2];
|
||||||
};
|
};
|
||||||
|
|
||||||
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
|
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
|
||||||
|
@ -143,7 +143,7 @@ static int buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, i
|
||||||
static TSDBROW* getValidRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader);
|
static TSDBROW* getValidRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader);
|
||||||
static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
|
static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
|
||||||
SRowMerger* pMerger);
|
SRowMerger* pMerger);
|
||||||
static int32_t doMergeRowsInBuf(SIterInfo* pIter, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
|
static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
|
||||||
STsdbReader* pReader);
|
STsdbReader* pReader);
|
||||||
static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow);
|
static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow);
|
||||||
static int32_t doAppendRowFromBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData, int32_t rowIndex);
|
static int32_t doAppendRowFromBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData, int32_t rowIndex);
|
||||||
|
@ -212,8 +212,8 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK
|
||||||
pTsdbReader->idStr);
|
pTsdbReader->idStr);
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbDebug("%p create %d tables scan-info, size:%.2f Kb, %s", pTsdbReader, numOfTables, (sizeof(STableBlockScanInfo)*numOfTables)/1024.0,
|
tsdbDebug("%p create %d tables scan-info, size:%.2f Kb, %s", pTsdbReader, numOfTables,
|
||||||
pTsdbReader->idStr);
|
(sizeof(STableBlockScanInfo) * numOfTables) / 1024.0, pTsdbReader->idStr);
|
||||||
|
|
||||||
return pTableMap;
|
return pTableMap;
|
||||||
}
|
}
|
||||||
|
@ -397,7 +397,8 @@ static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity)
|
||||||
return pResBlock;
|
return pResBlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, int32_t capacity, const char* idstr) {
|
static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, int32_t capacity,
|
||||||
|
const char* idstr) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int8_t level = 0;
|
int8_t level = 0;
|
||||||
STsdbReader* pReader = (STsdbReader*)taosMemoryCalloc(1, sizeof(*pReader));
|
STsdbReader* pReader = (STsdbReader*)taosMemoryCalloc(1, sizeof(*pReader));
|
||||||
|
@ -577,7 +578,7 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader,
|
||||||
|
|
||||||
int64_t et2 = taosGetTimestampUs();
|
int64_t et2 = taosGetTimestampUs();
|
||||||
tsdbDebug("load block index for %d tables completed, elapsed time:%.2f ms, set blockIdx:%.2f ms, size:%.2f Kb %s",
|
tsdbDebug("load block index for %d tables completed, elapsed time:%.2f ms, set blockIdx:%.2f ms, size:%.2f Kb %s",
|
||||||
(int32_t)num, (et1 - st)/1000.0, (et2-et1)/1000.0, num * sizeof(SBlockIdx)/1024.0, pReader->idStr);
|
(int32_t)num, (et1 - st) / 1000.0, (et2 - et1) / 1000.0, num * sizeof(SBlockIdx) / 1024.0, pReader->idStr);
|
||||||
|
|
||||||
pReader->cost.headFileLoadTime += (et1 - st) / 1000.0;
|
pReader->cost.headFileLoadTime += (et1 - st) / 1000.0;
|
||||||
|
|
||||||
|
@ -592,7 +593,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_
|
||||||
*numOfValidTables = 0;
|
*numOfValidTables = 0;
|
||||||
|
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
size_t size = 0;
|
size_t size = 0;
|
||||||
|
|
||||||
STableBlockScanInfo* px = NULL;
|
STableBlockScanInfo* px = NULL;
|
||||||
while (1) {
|
while (1) {
|
||||||
|
@ -642,9 +643,9 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
double el = (taosGetTimestampUs() - st)/1000.0;
|
double el = (taosGetTimestampUs() - st) / 1000.0;
|
||||||
tsdbDebug("load block of %d tables completed, blocks:%d in %d tables, size:%.2f Kb, elapsed time:%.2f ms %s",
|
tsdbDebug("load block of %d tables completed, blocks:%d in %d tables, size:%.2f Kb, elapsed time:%.2f ms %s",
|
||||||
numOfTables, *numOfBlocks, *numOfValidTables, size/1000.0, el, pReader->idStr);
|
numOfTables, *numOfBlocks, *numOfValidTables, size / 1000.0, el, pReader->idStr);
|
||||||
|
|
||||||
pReader->cost.numOfBlocks += (*numOfBlocks);
|
pReader->cost.numOfBlocks += (*numOfBlocks);
|
||||||
pReader->cost.headFileLoadTime += el;
|
pReader->cost.headFileLoadTime += el;
|
||||||
|
@ -680,9 +681,7 @@ static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) {
|
||||||
return pFBlockInfo;
|
return pFBlockInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SBlock* getCurrentBlock(SDataBlockIter* pBlockIter) {
|
static SBlock* getCurrentBlock(SDataBlockIter* pBlockIter) { return &pBlockIter->block; }
|
||||||
return &pBlockIter->block;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
|
static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
|
||||||
SReaderStatus* pStatus = &pReader->status;
|
SReaderStatus* pStatus = &pReader->status;
|
||||||
|
@ -690,7 +689,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
|
||||||
|
|
||||||
SBlockData* pBlockData = &pStatus->fileBlockData;
|
SBlockData* pBlockData = &pStatus->fileBlockData;
|
||||||
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
|
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
|
||||||
SBlock* pBlock = getCurrentBlock(pBlockIter);
|
SBlock* pBlock = getCurrentBlock(pBlockIter);
|
||||||
SSDataBlock* pResBlock = pReader->pResBlock;
|
SSDataBlock* pResBlock = pReader->pResBlock;
|
||||||
int32_t numOfOutputCols = blockDataGetNumOfCols(pResBlock);
|
int32_t numOfOutputCols = blockDataGetNumOfCols(pResBlock);
|
||||||
|
|
||||||
|
@ -772,17 +771,17 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
|
|
||||||
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
|
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
|
||||||
SBlock* pBlock = getCurrentBlock(pBlockIter);
|
SBlock* pBlock = getCurrentBlock(pBlockIter);
|
||||||
|
|
||||||
SSDataBlock* pResBlock = pReader->pResBlock;
|
SSDataBlock* pResBlock = pReader->pResBlock;
|
||||||
int32_t numOfCols = blockDataGetNumOfCols(pResBlock);
|
int32_t numOfCols = blockDataGetNumOfCols(pResBlock);
|
||||||
|
|
||||||
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
|
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
|
||||||
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
||||||
|
|
||||||
SBlockIdx blockIdx = {.suid = pReader->suid, .uid = pBlockScanInfo->uid};
|
SBlockIdx blockIdx = {.suid = pReader->suid, .uid = pBlockScanInfo->uid};
|
||||||
int32_t code = tsdbReadColData(pReader->pFileReader, &blockIdx, pBlock, pSupInfo->colIds, numOfCols,
|
int32_t code =
|
||||||
pBlockData, NULL, NULL);
|
tsdbReadColData(pReader->pFileReader, &blockIdx, pBlock, pSupInfo->colIds, numOfCols, pBlockData, NULL, NULL);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
@ -857,7 +856,7 @@ static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, v
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter) {
|
static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter) {
|
||||||
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
|
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
|
||||||
STableBlockScanInfo* pScanInfo = taosHashGet(pBlockIter->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
|
STableBlockScanInfo* pScanInfo = taosHashGet(pBlockIter->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
|
||||||
|
|
||||||
int32_t* mapDataIndex = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx);
|
int32_t* mapDataIndex = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx);
|
||||||
|
@ -882,7 +881,7 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
|
|
||||||
SBlockOrderSupporter sup = {0};
|
SBlockOrderSupporter sup = {0};
|
||||||
int32_t code = initBlockOrderSupporter(&sup, numOfTables);
|
int32_t code = initBlockOrderSupporter(&sup, numOfTables);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -937,8 +936,8 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t et = taosGetTimestampUs();
|
int64_t et = taosGetTimestampUs();
|
||||||
tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted, elapsed time:%.2f ms %s", pReader, cnt,
|
tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted, elapsed time:%.2f ms %s",
|
||||||
(et - st)/1000.0, pReader->idStr);
|
pReader, cnt, (et - st) / 1000.0, pReader->idStr);
|
||||||
|
|
||||||
pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
|
pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
|
||||||
cleanupBlockOrderSupporter(&sup);
|
cleanupBlockOrderSupporter(&sup);
|
||||||
|
@ -976,7 +975,8 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t et = taosGetTimestampUs();
|
int64_t et = taosGetTimestampUs();
|
||||||
tsdbDebug("%p %d data blocks access order completed, elapsed time:%.2f ms %s", pReader, cnt, (et-st)/1000.0, pReader->idStr);
|
tsdbDebug("%p %d data blocks access order completed, elapsed time:%.2f ms %s", pReader, cnt, (et - st) / 1000.0,
|
||||||
|
pReader->idStr);
|
||||||
cleanupBlockOrderSupporter(&sup);
|
cleanupBlockOrderSupporter(&sup);
|
||||||
taosMemoryFree(pTree);
|
taosMemoryFree(pTree);
|
||||||
|
|
||||||
|
@ -1024,7 +1024,7 @@ static SBlock* getNeighborBlockOfSameTable(SFileDataBlockInfo* pFBlockInfo, STab
|
||||||
int32_t step = asc ? 1 : -1;
|
int32_t step = asc ? 1 : -1;
|
||||||
*nextIndex = pFBlockInfo->tbBlockIdx + step;
|
*nextIndex = pFBlockInfo->tbBlockIdx + step;
|
||||||
|
|
||||||
SBlock *pBlock = taosMemoryCalloc(1, sizeof(SBlock));
|
SBlock* pBlock = taosMemoryCalloc(1, sizeof(SBlock));
|
||||||
int32_t* indexInMapdata = taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex);
|
int32_t* indexInMapdata = taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex);
|
||||||
|
|
||||||
tMapDataGetItemByIdx(&pTableBlockScanInfo->mapData, *indexInMapdata, pBlock, tGetBlock);
|
tMapDataGetItemByIdx(&pTableBlockScanInfo->mapData, *indexInMapdata, pBlock, tGetBlock);
|
||||||
|
@ -1196,10 +1196,10 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
setComposedBlockFlag(pReader, true);
|
setComposedBlockFlag(pReader, true);
|
||||||
|
|
||||||
double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
|
double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
|
||||||
tsdbDebug(
|
tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%d, brange: %" PRId64
|
||||||
"%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%d, brange: %" PRId64
|
" - %" PRId64 " %s",
|
||||||
" - %" PRId64 " %s",
|
pReader, elapsedTime, pBlock->info.rows, pBlock->info.window.skey, pBlock->info.window.ekey,
|
||||||
pReader, elapsedTime, pBlock->info.rows, pBlock->info.window.skey, pBlock->info.window.ekey, pReader->idStr);
|
pReader->idStr);
|
||||||
|
|
||||||
pReader->cost.buildmemBlock += elapsedTime;
|
pReader->cost.buildmemBlock += elapsedTime;
|
||||||
return code;
|
return code;
|
||||||
|
@ -1230,7 +1230,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
|
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
|
||||||
|
|
||||||
tRowMerge(&merge, pRow);
|
tRowMerge(&merge, pRow);
|
||||||
doMergeRowsInBuf(pIter, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
|
doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
|
||||||
|
|
||||||
tRowMergerGetRow(&merge, &pTSRow);
|
tRowMergerGetRow(&merge, &pTSRow);
|
||||||
}
|
}
|
||||||
|
@ -1246,7 +1246,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
updateSchema(pRow, pBlockScanInfo->uid, pReader);
|
updateSchema(pRow, pBlockScanInfo->uid, pReader);
|
||||||
|
|
||||||
tRowMergerInit(&merge, pRow, pReader->pSchema);
|
tRowMergerInit(&merge, pRow, pReader->pSchema);
|
||||||
doMergeRowsInBuf(pIter, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
|
doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
|
||||||
|
|
||||||
tRowMerge(&merge, &fRow);
|
tRowMerge(&merge, &fRow);
|
||||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
|
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
|
||||||
|
@ -1291,12 +1291,12 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
|
|
||||||
if (ik.ts == key) {
|
if (ik.ts == key) {
|
||||||
tRowMerge(&merge, piRow);
|
tRowMerge(&merge, piRow);
|
||||||
doMergeRowsInBuf(&pBlockScanInfo->iiter, key, pBlockScanInfo->delSkyline, &merge, pReader);
|
doMergeRowsInBuf(&pBlockScanInfo->iiter, uid, key, pBlockScanInfo->delSkyline, &merge, pReader);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (k.ts == key) {
|
if (k.ts == key) {
|
||||||
tRowMerge(&merge, pRow);
|
tRowMerge(&merge, pRow);
|
||||||
doMergeRowsInBuf(&pBlockScanInfo->iter, key, pBlockScanInfo->delSkyline, &merge, pReader);
|
doMergeRowsInBuf(&pBlockScanInfo->iter, uid, key, pBlockScanInfo->delSkyline, &merge, pReader);
|
||||||
}
|
}
|
||||||
|
|
||||||
tRowMergerGetRow(&merge, &pTSRow);
|
tRowMergerGetRow(&merge, &pTSRow);
|
||||||
|
@ -1336,11 +1336,11 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
updateSchema(pRow, uid, pReader);
|
updateSchema(pRow, uid, pReader);
|
||||||
|
|
||||||
tRowMergerInit(&merge, pRow, pReader->pSchema);
|
tRowMergerInit(&merge, pRow, pReader->pSchema);
|
||||||
doMergeRowsInBuf(&pBlockScanInfo->iter, key, pBlockScanInfo->delSkyline, &merge, pReader);
|
doMergeRowsInBuf(&pBlockScanInfo->iter, uid, key, pBlockScanInfo->delSkyline, &merge, pReader);
|
||||||
|
|
||||||
if (ik.ts == k.ts) {
|
if (ik.ts == k.ts) {
|
||||||
tRowMerge(&merge, piRow);
|
tRowMerge(&merge, piRow);
|
||||||
doMergeRowsInBuf(&pBlockScanInfo->iiter, key, pBlockScanInfo->delSkyline, &merge, pReader);
|
doMergeRowsInBuf(&pBlockScanInfo->iiter, uid, key, pBlockScanInfo->delSkyline, &merge, pReader);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (k.ts == key) {
|
if (k.ts == key) {
|
||||||
|
@ -1514,9 +1514,10 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
setComposedBlockFlag(pReader, true);
|
setComposedBlockFlag(pReader, true);
|
||||||
int64_t et = taosGetTimestampUs();
|
int64_t et = taosGetTimestampUs();
|
||||||
|
|
||||||
tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64 " rows:%d, elapsed time:%.2f ms %s", pReader,
|
tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64
|
||||||
pBlockScanInfo->uid, pResBlock->info.window.skey, pResBlock->info.window.ekey, pResBlock->info.rows,
|
" rows:%d, elapsed time:%.2f ms %s",
|
||||||
(et - st)/1000.0, pReader->idStr);
|
pReader, pBlockScanInfo->uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
|
||||||
|
pResBlock->info.rows, (et - st) / 1000.0, pReader->idStr);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -1694,7 +1695,7 @@ static TSDBKEY getCurrentKeyInBuf(SDataBlockIter* pBlockIter, STsdbReader* pRead
|
||||||
static int32_t moveToNextFile(STsdbReader* pReader, int32_t* numOfBlocks) {
|
static int32_t moveToNextFile(STsdbReader* pReader, int32_t* numOfBlocks) {
|
||||||
SReaderStatus* pStatus = &pReader->status;
|
SReaderStatus* pStatus = &pReader->status;
|
||||||
|
|
||||||
size_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
|
size_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
|
||||||
SArray* pIndexList = taosArrayInit(numOfTables, sizeof(SBlockIdx));
|
SArray* pIndexList = taosArrayInit(numOfTables, sizeof(SBlockIdx));
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
|
@ -2111,7 +2112,8 @@ TSDBROW* getValidRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pRea
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t doMergeRowsInBuf(SIterInfo* pIter, int64_t ts, SArray* pDelList, SRowMerger* pMerger, STsdbReader* pReader) {
|
int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
|
||||||
|
STsdbReader* pReader) {
|
||||||
while (1) {
|
while (1) {
|
||||||
pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
|
pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
|
||||||
if (!pIter->hasVal) {
|
if (!pIter->hasVal) {
|
||||||
|
@ -2130,7 +2132,19 @@ int32_t doMergeRowsInBuf(SIterInfo* pIter, int64_t ts, SArray* pDelList, SRowMer
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
tRowMerge(pMerger, pRow);
|
int32_t sversion = TSDBROW_SVERSION(pRow);
|
||||||
|
STSchema* pTSchema = NULL;
|
||||||
|
if (sversion != pReader->pSchema->version) {
|
||||||
|
metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pTSchema);
|
||||||
|
} else {
|
||||||
|
pTSchema = pReader->pSchema;
|
||||||
|
}
|
||||||
|
|
||||||
|
tRowMergerAdd(pMerger, pRow, pTSchema);
|
||||||
|
|
||||||
|
if (sversion != pReader->pSchema->version) {
|
||||||
|
taosMemoryFree(pTSchema);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -2227,7 +2241,7 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc
|
||||||
CHECK_FILEBLOCK_STATE st;
|
CHECK_FILEBLOCK_STATE st;
|
||||||
|
|
||||||
SFileDataBlockInfo* pFileBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
|
SFileDataBlockInfo* pFileBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
|
||||||
SBlock* pCurrentBlock = getCurrentBlock(&pReader->status.blockIter);
|
SBlock* pCurrentBlock = getCurrentBlock(&pReader->status.blockIter);
|
||||||
checkForNeighborFileBlock(pReader, pScanInfo, pCurrentBlock, pFileBlockInfo, pMerger, key, &st);
|
checkForNeighborFileBlock(pReader, pScanInfo, pCurrentBlock, pFileBlockInfo, pMerger, key, &st);
|
||||||
if (st == CHECK_FILEBLOCK_QUIT) {
|
if (st == CHECK_FILEBLOCK_QUIT) {
|
||||||
break;
|
break;
|
||||||
|
@ -2254,12 +2268,23 @@ void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDe
|
||||||
SRowMerger merge = {0};
|
SRowMerger merge = {0};
|
||||||
|
|
||||||
TSDBKEY k = TSDBROW_KEY(pRow);
|
TSDBKEY k = TSDBROW_KEY(pRow);
|
||||||
updateSchema(pRow, uid, pReader);
|
// updateSchema(pRow, uid, pReader);
|
||||||
|
int32_t sversion = TSDBROW_SVERSION(pRow);
|
||||||
|
STSchema* pTSchema = NULL;
|
||||||
|
if (sversion != pReader->pSchema->version) {
|
||||||
|
metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pTSchema);
|
||||||
|
} else {
|
||||||
|
pTSchema = pReader->pSchema;
|
||||||
|
}
|
||||||
|
|
||||||
tRowMergerInit(&merge, pRow, pReader->pSchema);
|
tRowMergerInit2(&merge, pReader->pSchema, pRow, pTSchema);
|
||||||
doMergeRowsInBuf(pIter, k.ts, pDelList, &merge, pReader);
|
doMergeRowsInBuf(pIter, uid, k.ts, pDelList, &merge, pReader);
|
||||||
tRowMergerGetRow(&merge, pTSRow);
|
tRowMergerGetRow(&merge, pTSRow);
|
||||||
tRowMergerClear(&merge);
|
tRowMergerClear(&merge);
|
||||||
|
|
||||||
|
if (sversion != pReader->pSchema->version) {
|
||||||
|
taosMemoryFree(pTSchema);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
|
void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
|
||||||
|
@ -2273,18 +2298,18 @@ void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlo
|
||||||
updateSchema(piRow, pBlockScanInfo->uid, pReader);
|
updateSchema(piRow, pBlockScanInfo->uid, pReader);
|
||||||
|
|
||||||
tRowMergerInit(&merge, piRow, pReader->pSchema);
|
tRowMergerInit(&merge, piRow, pReader->pSchema);
|
||||||
doMergeRowsInBuf(&pBlockScanInfo->iiter, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader);
|
doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader);
|
||||||
|
|
||||||
tRowMerge(&merge, pRow);
|
tRowMerge(&merge, pRow);
|
||||||
doMergeRowsInBuf(&pBlockScanInfo->iter, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
|
doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
|
||||||
} else {
|
} else {
|
||||||
updateSchema(pRow, pBlockScanInfo->uid, pReader);
|
updateSchema(pRow, pBlockScanInfo->uid, pReader);
|
||||||
|
|
||||||
tRowMergerInit(&merge, pRow, pReader->pSchema);
|
tRowMergerInit(&merge, pRow, pReader->pSchema);
|
||||||
doMergeRowsInBuf(&pBlockScanInfo->iter, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
|
doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
|
||||||
|
|
||||||
tRowMerge(&merge, piRow);
|
tRowMerge(&merge, piRow);
|
||||||
doMergeRowsInBuf(&pBlockScanInfo->iiter, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
|
doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
|
||||||
}
|
}
|
||||||
|
|
||||||
tRowMergerGetRow(&merge, pTSRow);
|
tRowMergerGetRow(&merge, pTSRow);
|
||||||
|
@ -2522,7 +2547,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
|
||||||
if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
|
if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
|
||||||
// update the SQueryTableDataCond to create inner reader
|
// update the SQueryTableDataCond to create inner reader
|
||||||
STimeWindow w = pCond->twindows;
|
STimeWindow w = pCond->twindows;
|
||||||
int32_t order = pCond->order;
|
int32_t order = pCond->order;
|
||||||
if (order == TSDB_ORDER_ASC) {
|
if (order == TSDB_ORDER_ASC) {
|
||||||
pCond->twindows.ekey = pCond->twindows.skey;
|
pCond->twindows.ekey = pCond->twindows.skey;
|
||||||
pCond->twindows.skey = INT64_MIN;
|
pCond->twindows.skey = INT64_MIN;
|
||||||
|
@ -2541,7 +2566,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
|
||||||
if (order == TSDB_ORDER_ASC) {
|
if (order == TSDB_ORDER_ASC) {
|
||||||
pCond->twindows.skey = w.ekey;
|
pCond->twindows.skey = w.ekey;
|
||||||
pCond->twindows.ekey = INT64_MAX;
|
pCond->twindows.ekey = INT64_MAX;
|
||||||
} else {
|
} else {
|
||||||
pCond->twindows.skey = INT64_MIN;
|
pCond->twindows.skey = INT64_MIN;
|
||||||
pCond->twindows.ekey = w.ekey;
|
pCond->twindows.ekey = w.ekey;
|
||||||
}
|
}
|
||||||
|
@ -2589,10 +2614,11 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
STsdbReader* pPrevReader = pReader->innerReader[0];
|
STsdbReader* pPrevReader = pReader->innerReader[0];
|
||||||
SDataBlockIter* pBlockIter = &pPrevReader->status.blockIter;
|
SDataBlockIter* pBlockIter = &pPrevReader->status.blockIter;
|
||||||
|
|
||||||
initFilesetIterator(&pPrevReader->status.fileIter, pPrevReader->pReadSnap->fs.aDFileSet, pPrevReader->order, pPrevReader->idStr);
|
initFilesetIterator(&pPrevReader->status.fileIter, pPrevReader->pReadSnap->fs.aDFileSet, pPrevReader->order,
|
||||||
|
pPrevReader->idStr);
|
||||||
resetDataBlockIterator(&pPrevReader->status.blockIter, pPrevReader->order, pReader->status.pTableMap);
|
resetDataBlockIterator(&pPrevReader->status.blockIter, pPrevReader->order, pReader->status.pTableMap);
|
||||||
|
|
||||||
// no data in files, let's try buffer in memory
|
// no data in files, let's try buffer in memory
|
||||||
|
@ -2647,12 +2673,14 @@ void tsdbReaderClose(STsdbReader* pReader) {
|
||||||
|
|
||||||
SIOCostSummary* pCost = &pReader->cost;
|
SIOCostSummary* pCost = &pReader->cost;
|
||||||
|
|
||||||
tsdbDebug("%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%"PRId64" SMA-time:%.2f ms, "
|
tsdbDebug("%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64
|
||||||
"fileBlocks:%"PRId64", fileBlocks-time:%.2f ms, build in-memory-block-time:%.2f ms, STableBlockScanInfo "
|
" SMA-time:%.2f ms, "
|
||||||
"size:%.2f Kb %s",
|
"fileBlocks:%" PRId64
|
||||||
|
", fileBlocks-time:%.2f ms, build in-memory-block-time:%.2f ms, STableBlockScanInfo "
|
||||||
|
"size:%.2f Kb %s",
|
||||||
pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaData, pCost->smaLoadTime,
|
pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaData, pCost->smaLoadTime,
|
||||||
pCost->numOfBlocks, pCost->blockLoadTime, pCost->buildmemBlock,
|
pCost->numOfBlocks, pCost->blockLoadTime, pCost->buildmemBlock,
|
||||||
numOfTables * sizeof(STableBlockScanInfo) /1000.0, pReader->idStr);
|
numOfTables * sizeof(STableBlockScanInfo) / 1000.0, pReader->idStr);
|
||||||
|
|
||||||
taosMemoryFree(pReader->idStr);
|
taosMemoryFree(pReader->idStr);
|
||||||
taosMemoryFree(pReader->pSchema);
|
taosMemoryFree(pReader->pSchema);
|
||||||
|
@ -2731,9 +2759,9 @@ static void setBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) {
|
||||||
|
|
||||||
void tsdbRetrieveDataBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) {
|
void tsdbRetrieveDataBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) {
|
||||||
if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
|
if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
|
||||||
if (pReader->step == EXTERNAL_ROWS_MAIN) {
|
if (pReader->step == EXTERNAL_ROWS_MAIN) {
|
||||||
setBlockInfo(pReader, pDataBlockInfo);
|
setBlockInfo(pReader, pDataBlockInfo);
|
||||||
} else if (pReader->step == EXTERNAL_ROWS_PREV) {
|
} else if (pReader->step == EXTERNAL_ROWS_PREV) {
|
||||||
setBlockInfo(pReader->innerReader[0], pDataBlockInfo);
|
setBlockInfo(pReader->innerReader[0], pDataBlockInfo);
|
||||||
} else {
|
} else {
|
||||||
setBlockInfo(pReader->innerReader[1], pDataBlockInfo);
|
setBlockInfo(pReader->innerReader[1], pDataBlockInfo);
|
||||||
|
@ -2747,7 +2775,7 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockS
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
*allHave = false;
|
*allHave = false;
|
||||||
|
|
||||||
if(pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
|
if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
|
||||||
*pBlockStatis = NULL;
|
*pBlockStatis = NULL;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -2758,7 +2786,7 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockS
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
|
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
|
||||||
|
|
||||||
SBlock* pBlock = getCurrentBlock(&pReader->status.blockIter);
|
SBlock* pBlock = getCurrentBlock(&pReader->status.blockIter);
|
||||||
int64_t stime = taosGetTimestampUs();
|
int64_t stime = taosGetTimestampUs();
|
||||||
|
@ -2863,7 +2891,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pReader->order = pCond->order;
|
pReader->order = pCond->order;
|
||||||
pReader->type = TIMEWINDOW_RANGE_CONTAINED;
|
pReader->type = TIMEWINDOW_RANGE_CONTAINED;
|
||||||
pReader->status.loadFromFile = true;
|
pReader->status.loadFromFile = true;
|
||||||
pReader->status.pTableIter = NULL;
|
pReader->status.pTableIter = NULL;
|
||||||
pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);
|
pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);
|
||||||
|
@ -2882,7 +2910,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
|
||||||
resetDataBlockIterator(&pReader->status.blockIter, pReader->order, pReader->status.pTableMap);
|
resetDataBlockIterator(&pReader->status.blockIter, pReader->order, pReader->status.pTableMap);
|
||||||
resetDataBlockScanInfo(pReader->status.pTableMap);
|
resetDataBlockScanInfo(pReader->status.pTableMap);
|
||||||
|
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SDataBlockIter* pBlockIter = &pReader->status.blockIter;
|
SDataBlockIter* pBlockIter = &pReader->status.blockIter;
|
||||||
|
|
||||||
// no data in files, let's try buffer in memory
|
// no data in files, let's try buffer in memory
|
||||||
|
@ -2891,8 +2919,8 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
|
||||||
} else {
|
} else {
|
||||||
code = initForFirstBlockInFile(pReader, pBlockIter);
|
code = initForFirstBlockInFile(pReader, pBlockIter);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
tsdbError("%p reset reader failed, numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s",
|
tsdbError("%p reset reader failed, numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s", pReader,
|
||||||
pReader, numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr);
|
numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#include "tdataformat.h"
|
||||||
#include "tsdb.h"
|
#include "tsdb.h"
|
||||||
|
|
||||||
// SMapData =======================================================================
|
// SMapData =======================================================================
|
||||||
|
@ -548,6 +549,103 @@ SColVal *tRowIterNext(SRowIter *pIter) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// SRowMerger ======================================================
|
// SRowMerger ======================================================
|
||||||
|
|
||||||
|
int32_t tRowMergerInit2(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRow, STSchema *pTSchema) {
|
||||||
|
int32_t code = 0;
|
||||||
|
TSDBKEY key = TSDBROW_KEY(pRow);
|
||||||
|
SColVal *pColVal = &(SColVal){0};
|
||||||
|
STColumn *pTColumn;
|
||||||
|
int32_t iCol, jCol = 0;
|
||||||
|
|
||||||
|
pMerger->pTSchema = pResTSchema;
|
||||||
|
pMerger->version = key.version;
|
||||||
|
|
||||||
|
pMerger->pArray = taosArrayInit(pResTSchema->numOfCols, sizeof(SColVal));
|
||||||
|
if (pMerger->pArray == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
// ts
|
||||||
|
pTColumn = &pTSchema->columns[jCol++];
|
||||||
|
|
||||||
|
ASSERT(pTColumn->type == TSDB_DATA_TYPE_TIMESTAMP);
|
||||||
|
|
||||||
|
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.ts = key.ts});
|
||||||
|
if (taosArrayPush(pMerger->pArray, pColVal) == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
// other
|
||||||
|
for (iCol = 1; jCol < pTSchema->numOfCols && iCol < pResTSchema->numOfCols; ++iCol) {
|
||||||
|
pTColumn = &pResTSchema->columns[iCol];
|
||||||
|
if (pTSchema->columns[jCol].colId < pTColumn->colId) {
|
||||||
|
++jCol;
|
||||||
|
--iCol;
|
||||||
|
continue;
|
||||||
|
} else if (pTSchema->columns[jCol].colId > pTColumn->colId) {
|
||||||
|
taosArrayPush(pMerger->pArray, &COL_VAL_NONE(pTColumn->colId, pTColumn->type));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
tsdbRowGetColVal(pRow, pTSchema, jCol++, pColVal);
|
||||||
|
if (taosArrayPush(pMerger->pArray, pColVal) == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (; iCol < pResTSchema->numOfCols; ++iCol) {
|
||||||
|
pTColumn = &pResTSchema->columns[iCol];
|
||||||
|
taosArrayPush(pMerger->pArray, &COL_VAL_NONE(pTColumn->colId, pTColumn->type));
|
||||||
|
}
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) {
|
||||||
|
int32_t code = 0;
|
||||||
|
TSDBKEY key = TSDBROW_KEY(pRow);
|
||||||
|
SColVal *pColVal = &(SColVal){0};
|
||||||
|
STColumn *pTColumn;
|
||||||
|
int32_t iCol, jCol = 1;
|
||||||
|
|
||||||
|
ASSERT(((SColVal *)pMerger->pArray->pData)->value.ts == key.ts);
|
||||||
|
|
||||||
|
for (iCol = 1; iCol < pMerger->pTSchema->numOfCols && jCol < pTSchema->numOfCols; ++iCol) {
|
||||||
|
pTColumn = &pMerger->pTSchema->columns[iCol];
|
||||||
|
if (pTSchema->columns[jCol].colId < pTColumn->colId) {
|
||||||
|
++jCol;
|
||||||
|
--iCol;
|
||||||
|
continue;
|
||||||
|
} else if (pTSchema->columns[jCol].colId > pTColumn->colId) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
tsdbRowGetColVal(pRow, pTSchema, jCol++, pColVal);
|
||||||
|
|
||||||
|
if (key.version > pMerger->version) {
|
||||||
|
if (!pColVal->isNone) {
|
||||||
|
taosArraySet(pMerger->pArray, iCol, pColVal);
|
||||||
|
}
|
||||||
|
} else if (key.version < pMerger->version) {
|
||||||
|
SColVal *tColVal = (SColVal *)taosArrayGet(pMerger->pArray, iCol);
|
||||||
|
if (tColVal->isNone && !pColVal->isNone) {
|
||||||
|
taosArraySet(pMerger->pArray, iCol, pColVal);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pMerger->version = key.version;
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) {
|
int32_t tRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
TSDBKEY key = TSDBROW_KEY(pRow);
|
TSDBKEY key = TSDBROW_KEY(pRow);
|
||||||
|
|
Loading…
Reference in New Issue