Merge pull request #17078 from taosdata/enh/load_by_column
enh: load by col opt
This commit is contained in:
commit
1b3721b42e
|
@ -32,6 +32,12 @@ extern "C" {
|
|||
#define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TSD ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0)
|
||||
// clang-format on
|
||||
|
||||
#define TSDB_CHECK_CODE(CODE, LINO, LABEL) \
|
||||
if (CODE) { \
|
||||
LINO = __LINE__; \
|
||||
goto LABEL; \
|
||||
}
|
||||
|
||||
typedef struct TSDBROW TSDBROW;
|
||||
typedef struct TABLEID TABLEID;
|
||||
typedef struct TSDBKEY TSDBKEY;
|
||||
|
@ -150,7 +156,7 @@ int32_t tCmprBlockL(void const *lhs, void const *rhs);
|
|||
|
||||
int32_t tBlockDataCreate(SBlockData *pBlockData);
|
||||
void tBlockDataDestroy(SBlockData *pBlockData, int8_t deepClear);
|
||||
int32_t tBlockDataInit(SBlockData *pBlockData, int64_t suid, int64_t uid, STSchema *pTSchema);
|
||||
int32_t tBlockDataInit(SBlockData *pBlockData, TABLEID *pId, STSchema *pTSchema, int16_t *aCid, int32_t nCid);
|
||||
int32_t tBlockDataInitEx(SBlockData *pBlockData, SBlockData *pBlockDataFrom);
|
||||
void tBlockDataReset(SBlockData *pBlockData);
|
||||
int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid);
|
||||
|
@ -272,6 +278,7 @@ int32_t tsdbReadSttBlk(SDataFReader *pReader, int32_t iStt, SArray *aSttBlk);
|
|||
int32_t tsdbReadBlockSma(SDataFReader *pReader, SDataBlk *pBlock, SArray *aColumnDataAgg);
|
||||
int32_t tsdbReadDataBlock(SDataFReader *pReader, SDataBlk *pBlock, SBlockData *pBlockData);
|
||||
int32_t tsdbReadSttBlock(SDataFReader *pReader, int32_t iStt, SSttBlk *pSttBlk, SBlockData *pBlockData);
|
||||
int32_t tsdbReadSttBlockEx(SDataFReader *pReader, int32_t iStt, SSttBlk *pSttBlk, SBlockData *pBlockData);
|
||||
// SDelFWriter
|
||||
int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb);
|
||||
int32_t tsdbDelFWriterClose(SDelFWriter **ppWriter, int8_t sync);
|
||||
|
@ -633,6 +640,9 @@ typedef struct SSttBlockLoadInfo {
|
|||
int32_t currentLoadBlockIndex;
|
||||
int32_t loadBlocks;
|
||||
double elapsedTime;
|
||||
STSchema *pSchema;
|
||||
int16_t *colIds;
|
||||
int32_t numOfCols;
|
||||
} SSttBlockLoadInfo;
|
||||
|
||||
typedef struct SMergeTree {
|
||||
|
@ -652,13 +662,14 @@ typedef struct {
|
|||
} SSkmInfo;
|
||||
|
||||
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
|
||||
STimeWindow *pTimeWindow, SVersionRange *pVerRange, void *pLoadInfo, const char *idStr);
|
||||
STimeWindow *pTimeWindow, SVersionRange *pVerRange, void *pBlockLoadInfo, STSchema *pSchema,
|
||||
int16_t *pCols, int32_t numOfCols, const char *idStr);
|
||||
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter);
|
||||
bool tMergeTreeNext(SMergeTree *pMTree);
|
||||
TSDBROW tMergeTreeGetRow(SMergeTree *pMTree);
|
||||
void tMergeTreeClose(SMergeTree *pMTree);
|
||||
|
||||
SSttBlockLoadInfo *tCreateLastBlockLoadInfo();
|
||||
SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols);
|
||||
void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo);
|
||||
void getLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, int64_t *blocks, double *el);
|
||||
void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo);
|
||||
|
|
|
@ -457,7 +457,7 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) {
|
|||
|
||||
tMergeTreeOpen(&state->mergeTree, 1, state->pDataFReader, state->suid, state->uid,
|
||||
&(STimeWindow){.skey = TSKEY_MIN, .ekey = TSKEY_MAX},
|
||||
&(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, NULL, NULL);
|
||||
&(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, NULL, NULL, NULL, 0, NULL);
|
||||
bool hasVal = tMergeTreeNext(&state->mergeTree);
|
||||
if (!hasVal) {
|
||||
state->state = SFSLASTNEXTROW_FILESET;
|
||||
|
@ -612,7 +612,8 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
|
|||
tMapDataGetItemByIdx(&state->blockMap, state->iBlock, &block, tGetDataBlk);
|
||||
/* code = tsdbReadBlockData(state->pDataFReader, &state->blockIdx, &block, &state->blockData, NULL, NULL); */
|
||||
tBlockDataReset(state->pBlockData);
|
||||
code = tBlockDataInit(state->pBlockData, state->suid, state->uid, state->pTSchema);
|
||||
TABLEID tid = {.suid = state->suid, .uid = state->uid};
|
||||
code = tBlockDataInit(state->pBlockData, &tid, state->pTSchema, NULL, 0);
|
||||
if (code) goto _err;
|
||||
|
||||
code = tsdbReadDataBlock(state->pDataFReader, &block, state->pBlockData);
|
||||
|
|
|
@ -437,7 +437,7 @@ static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) {
|
|||
|
||||
pIter->iSttBlk = 0;
|
||||
SSttBlk *pSttBlk = (SSttBlk *)taosArrayGet(pIter->aSttBlk, 0);
|
||||
code = tsdbReadSttBlock(pCommitter->dReader.pReader, iStt, pSttBlk, &pIter->bData);
|
||||
code = tsdbReadSttBlockEx(pCommitter->dReader.pReader, iStt, pSttBlk, &pIter->bData);
|
||||
if (code) goto _err;
|
||||
|
||||
pIter->iRow = 0;
|
||||
|
@ -1049,7 +1049,7 @@ static int32_t tsdbNextCommitRow(SCommitter *pCommitter) {
|
|||
if (pIter->iSttBlk < taosArrayGetSize(pIter->aSttBlk)) {
|
||||
SSttBlk *pSttBlk = (SSttBlk *)taosArrayGet(pIter->aSttBlk, pIter->iSttBlk);
|
||||
|
||||
code = tsdbReadSttBlock(pCommitter->dReader.pReader, pIter->iStt, pSttBlk, &pIter->bData);
|
||||
code = tsdbReadSttBlockEx(pCommitter->dReader.pReader, pIter->iStt, pSttBlk, &pIter->bData);
|
||||
if (code) goto _exit;
|
||||
|
||||
pIter->iRow = 0;
|
||||
|
@ -1305,7 +1305,8 @@ static int32_t tsdbInitLastBlockIfNeed(SCommitter *pCommitter, TABLEID id) {
|
|||
if (!pBDatal->suid && !pBDatal->uid) {
|
||||
ASSERT(pCommitter->skmTable.suid == id.suid);
|
||||
ASSERT(pCommitter->skmTable.uid == id.uid);
|
||||
code = tBlockDataInit(pBDatal, id.suid, id.suid ? 0 : id.uid, pCommitter->skmTable.pTSchema);
|
||||
TABLEID tid = {.suid = id.suid, .uid = id.suid ? 0 : id.uid};
|
||||
code = tBlockDataInit(pBDatal, &tid, pCommitter->skmTable.pTSchema, NULL, 0);
|
||||
if (code) goto _exit;
|
||||
}
|
||||
|
||||
|
@ -1428,9 +1429,9 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
|
|||
// impl
|
||||
code = tsdbUpdateTableSchema(pCommitter->pTsdb->pVnode->pMeta, id.suid, id.uid, &pCommitter->skmTable);
|
||||
if (code) goto _err;
|
||||
code = tBlockDataInit(&pCommitter->dReader.bData, id.suid, id.uid, pCommitter->skmTable.pTSchema);
|
||||
code = tBlockDataInit(&pCommitter->dReader.bData, &id, pCommitter->skmTable.pTSchema, NULL, 0);
|
||||
if (code) goto _err;
|
||||
code = tBlockDataInit(&pCommitter->dWriter.bData, id.suid, id.uid, pCommitter->skmTable.pTSchema);
|
||||
code = tBlockDataInit(&pCommitter->dWriter.bData, &id, pCommitter->skmTable.pTSchema, NULL, 0);
|
||||
if (code) goto _err;
|
||||
|
||||
/* merge with data in .data file */
|
||||
|
|
|
@ -28,11 +28,10 @@ struct SLDataIter {
|
|||
uint64_t uid;
|
||||
STimeWindow timeWindow;
|
||||
SVersionRange verRange;
|
||||
|
||||
SSttBlockLoadInfo* pBlockLoadInfo;
|
||||
};
|
||||
|
||||
SSttBlockLoadInfo* tCreateLastBlockLoadInfo() {
|
||||
SSttBlockLoadInfo* tCreateLastBlockLoadInfo(STSchema* pSchema, int16_t* colList, int32_t numOfCols) {
|
||||
SSttBlockLoadInfo* pLoadInfo = taosMemoryCalloc(TSDB_DEFAULT_STT_FILE, sizeof(SSttBlockLoadInfo));
|
||||
if (pLoadInfo == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -55,6 +54,9 @@ SSttBlockLoadInfo* tCreateLastBlockLoadInfo() {
|
|||
}
|
||||
|
||||
pLoadInfo[i].aSttBlk = taosArrayInit(4, sizeof(SSttBlk));
|
||||
pLoadInfo[i].pSchema = pSchema;
|
||||
pLoadInfo[i].colIds = colList;
|
||||
pLoadInfo[i].numOfCols = numOfCols;
|
||||
}
|
||||
|
||||
return pLoadInfo;
|
||||
|
@ -111,7 +113,19 @@ static SBlockData* loadLastBlock(SLDataIter *pIter, const char* idStr) {
|
|||
pInfo->currentLoadBlockIndex ^= 1;
|
||||
if (pIter->pSttBlk != NULL) { // current block not loaded yet
|
||||
int64_t st = taosGetTimestampUs();
|
||||
code = tsdbReadSttBlock(pIter->pReader, pIter->iStt, pIter->pSttBlk, &pInfo->blockData[pInfo->currentLoadBlockIndex]);
|
||||
|
||||
SBlockData* pBlock = &pInfo->blockData[pInfo->currentLoadBlockIndex];
|
||||
|
||||
TABLEID id = {0};
|
||||
if (pIter->pSttBlk->suid != 0) {
|
||||
id.suid = pIter->pSttBlk->suid;
|
||||
} else {
|
||||
id.uid = pIter->uid;
|
||||
}
|
||||
|
||||
tBlockDataInit(pBlock, &id, pInfo->pSchema, pInfo->colIds, pInfo->numOfCols);
|
||||
code = tsdbReadSttBlock(pIter->pReader, pIter->iStt, pIter->pSttBlk, pBlock);
|
||||
|
||||
double el = (taosGetTimestampUs() - st)/ 1000.0;
|
||||
pInfo->elapsedTime += el;
|
||||
pInfo->loadBlocks += 1;
|
||||
|
@ -460,7 +474,8 @@ static FORCE_INLINE int32_t tLDataIterCmprFn(const void *p1, const void *p2) {
|
|||
}
|
||||
|
||||
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
|
||||
STimeWindow *pTimeWindow, SVersionRange *pVerRange, void* pBlockLoadInfo, const char* idStr) {
|
||||
STimeWindow *pTimeWindow, SVersionRange *pVerRange, void* pBlockLoadInfo, STSchema* pSchema,
|
||||
int16_t* pCols, int32_t numOfCols, const char* idStr) {
|
||||
pMTree->backward = backward;
|
||||
pMTree->pIter = NULL;
|
||||
pMTree->pIterList = taosArrayInit(4, POINTER_BYTES);
|
||||
|
@ -475,9 +490,10 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead
|
|||
|
||||
SSttBlockLoadInfo* pLoadInfo = NULL;
|
||||
if (pBlockLoadInfo == NULL) {
|
||||
ASSERT(0);
|
||||
if (pMTree->pLoadInfo == NULL) {
|
||||
pMTree->destroyLoadInfo = true;
|
||||
pMTree->pLoadInfo = tCreateLastBlockLoadInfo();
|
||||
pMTree->pLoadInfo = tCreateLastBlockLoadInfo(pSchema, pCols, numOfCols);
|
||||
}
|
||||
|
||||
pLoadInfo = pMTree->pLoadInfo;
|
||||
|
|
|
@ -79,6 +79,7 @@ typedef struct SBlockLoadSuppInfo {
|
|||
SColumnDataAgg tsColAgg;
|
||||
SColumnDataAgg** plist;
|
||||
int16_t* colIds; // column ids for loading file block data
|
||||
int32_t numOfCols;
|
||||
char** buildBuf; // build string tmp buffer, todo remove it later after all string format being updated.
|
||||
} SBlockLoadSuppInfo;
|
||||
|
||||
|
@ -203,6 +204,7 @@ static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) {
|
|||
|
||||
size_t numOfCols = blockDataGetNumOfCols(pBlock);
|
||||
|
||||
pSupInfo->numOfCols = numOfCols;
|
||||
pSupInfo->colIds = taosMemoryMalloc(numOfCols * sizeof(int16_t));
|
||||
pSupInfo->buildBuf = taosMemoryCalloc(numOfCols, POINTER_BYTES);
|
||||
if (pSupInfo->buildBuf == NULL || pSupInfo->colIds == NULL) {
|
||||
|
@ -352,7 +354,8 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, STsdb
|
|||
tMergeTreeClose(&pLReader->mergeTree);
|
||||
|
||||
if (pLReader->pInfo == NULL) {
|
||||
pLReader->pInfo = tCreateLastBlockLoadInfo();
|
||||
// here we ignore the first column, which is always be the primary timestamp column
|
||||
pLReader->pInfo = tCreateLastBlockLoadInfo(pReader->pSchema, &pReader->suppInfo.colIds[1], pReader->suppInfo.numOfCols - 1);
|
||||
if (pLReader->pInfo == NULL) {
|
||||
tsdbDebug("init fileset iterator failed, code:%s %s", tstrerror(terrno), pReader->idStr);
|
||||
return terrno;
|
||||
|
@ -1741,7 +1744,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
|||
tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
|
||||
}
|
||||
|
||||
int64_t key = hasDataInFileBlock(pBlockData, pDumpInfo)? pBlockData->aTSKEY[pDumpInfo->rowIndex]:INT64_MIN;
|
||||
int64_t key = hasDataInFileBlock(pBlockData, pDumpInfo) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN;
|
||||
|
||||
TSDBKEY k = TSDBROW_KEY(pRow);
|
||||
TSDBKEY ik = TSDBROW_KEY(piRow);
|
||||
|
@ -1995,7 +1998,8 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan
|
|||
|
||||
int32_t code =
|
||||
tMergeTreeOpen(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC), pReader->pFileReader,
|
||||
pReader->suid, pScanInfo->uid, &w, &pLBlockReader->verRange, pLBlockReader->pInfo, pReader->idStr);
|
||||
pReader->suid, pScanInfo->uid, &w, &pLBlockReader->verRange, pLBlockReader->pInfo,
|
||||
pReader->pSchema, pReader->suppInfo.colIds, pReader->suppInfo.numOfCols, pReader->idStr);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return false;
|
||||
}
|
||||
|
@ -2009,12 +2013,12 @@ static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) {
|
|||
}
|
||||
|
||||
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) { return pLastBlockReader->mergeTree.pIter != NULL; }
|
||||
bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) {
|
||||
if (pBlockData->nRow > 0) {
|
||||
ASSERT(pBlockData->nRow == pDumpInfo->totalRows);
|
||||
bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) {
|
||||
if (pBlockData->nRow > 0) {
|
||||
ASSERT(pBlockData->nRow == pDumpInfo->totalRows);
|
||||
}
|
||||
|
||||
return pBlockData->nRow > 0 && (!pDumpInfo->allDumped);
|
||||
return pBlockData->nRow > 0 && (!pDumpInfo->allDumped);
|
||||
}
|
||||
|
||||
int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
|
||||
|
@ -2452,7 +2456,8 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
|||
code = buildComposedDataBlock(pReader);
|
||||
} else if (fileBlockShouldLoad(pReader, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader)) {
|
||||
tBlockDataReset(&pStatus->fileBlockData);
|
||||
code = tBlockDataInit(&pStatus->fileBlockData, pReader->suid, pScanInfo->uid, pReader->pSchema);
|
||||
TABLEID tid = {.suid = pReader->suid, .uid = pScanInfo->uid};
|
||||
code = tBlockDataInit(&pStatus->fileBlockData, &tid, pReader->pSchema, NULL, 0);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
@ -2932,7 +2937,8 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn
|
|||
|
||||
// 3. load the neighbor block, and set it to be the currently accessed file data block
|
||||
tBlockDataReset(&pStatus->fileBlockData);
|
||||
int32_t code = tBlockDataInit(&pStatus->fileBlockData, pReader->suid, pFBlock->uid, pReader->pSchema);
|
||||
TABLEID tid = {.suid = pReader->suid, .uid = pFBlock->uid};
|
||||
int32_t code = tBlockDataInit(&pStatus->fileBlockData, &tid, pReader->pSchema, NULL, 0);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
@ -3404,10 +3410,14 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
|
|||
// we need only one row
|
||||
pPrevReader->capacity = 1;
|
||||
pPrevReader->status.pTableMap = pReader->status.pTableMap;
|
||||
pPrevReader->pSchema = pReader->pSchema;
|
||||
pPrevReader->pMemSchema = pReader->pMemSchema;
|
||||
pPrevReader->pReadSnap = pReader->pReadSnap;
|
||||
|
||||
pNextReader->capacity = 1;
|
||||
pNextReader->status.pTableMap = pReader->status.pTableMap;
|
||||
pNextReader->pSchema = pReader->pSchema;
|
||||
pNextReader->pMemSchema = pReader->pMemSchema;
|
||||
pNextReader->pReadSnap = pReader->pReadSnap;
|
||||
|
||||
code = doOpenReaderImpl(pPrevReader);
|
||||
|
@ -3441,11 +3451,19 @@ void tsdbReaderClose(STsdbReader* pReader) {
|
|||
|
||||
{
|
||||
if (pReader->innerReader[0] != NULL) {
|
||||
pReader->innerReader[0]->status.pTableMap = NULL;
|
||||
pReader->innerReader[0]->pReadSnap = NULL;
|
||||
STsdbReader* p = pReader->innerReader[0];
|
||||
|
||||
pReader->innerReader[1]->status.pTableMap = NULL;
|
||||
pReader->innerReader[1]->pReadSnap = NULL;
|
||||
p->status.pTableMap = NULL;
|
||||
p->pReadSnap = NULL;
|
||||
p->pSchema = NULL;
|
||||
p->pMemSchema = NULL;
|
||||
|
||||
p = pReader->innerReader[1];
|
||||
|
||||
p->status.pTableMap = NULL;
|
||||
p->pReadSnap = NULL;
|
||||
p->pSchema = NULL;
|
||||
p->pMemSchema = NULL;
|
||||
|
||||
tsdbReaderClose(pReader->innerReader[0]);
|
||||
tsdbReaderClose(pReader->innerReader[1]);
|
||||
|
@ -3684,7 +3702,8 @@ static SArray* doRetrieveDataBlock(STsdbReader* pReader) {
|
|||
STableBlockScanInfo* pBlockScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
|
||||
|
||||
tBlockDataReset(&pStatus->fileBlockData);
|
||||
int32_t code = tBlockDataInit(&pStatus->fileBlockData, pReader->suid, pBlockScanInfo->uid, pReader->pSchema);
|
||||
TABLEID tid = {.suid = pReader->suid, .uid = pBlockScanInfo->uid};
|
||||
int32_t code = tBlockDataInit(&pStatus->fileBlockData, &tid, pReader->pSchema, NULL, 0);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = code;
|
||||
return NULL;
|
||||
|
|
|
@ -926,12 +926,13 @@ _err:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo, SBlockData *pBlockData) {
|
||||
static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo, SBlockData *pBlockData,
|
||||
int32_t iStt) {
|
||||
int32_t code = 0;
|
||||
|
||||
tBlockDataClear(pBlockData);
|
||||
|
||||
STsdbFD *pFD = pReader->pDataFD;
|
||||
STsdbFD *pFD = (iStt < 0) ? pReader->pDataFD : pReader->aSttFD[iStt];
|
||||
|
||||
// uid + version + tskey
|
||||
code = tRealloc(&pReader->aBuf[0], pBlkInfo->szKey);
|
||||
|
@ -1070,9 +1071,12 @@ _err:
|
|||
int32_t tsdbReadDataBlock(SDataFReader *pReader, SDataBlk *pDataBlk, SBlockData *pBlockData) {
|
||||
int32_t code = 0;
|
||||
|
||||
code = tsdbReadBlockDataImpl(pReader, &pDataBlk->aSubBlock[0], pBlockData);
|
||||
code = tsdbReadBlockDataImpl(pReader, &pDataBlk->aSubBlock[0], pBlockData, -1);
|
||||
if (code) goto _err;
|
||||
|
||||
ASSERT(pDataBlk->nSubBlock == 1);
|
||||
|
||||
#if 0
|
||||
if (pDataBlk->nSubBlock > 1) {
|
||||
SBlockData bData1;
|
||||
SBlockData bData2;
|
||||
|
@ -1113,6 +1117,7 @@ int32_t tsdbReadDataBlock(SDataFReader *pReader, SDataBlk *pDataBlk, SBlockData
|
|||
tBlockDataDestroy(&bData1, 1);
|
||||
tBlockDataDestroy(&bData2, 1);
|
||||
}
|
||||
#endif
|
||||
|
||||
return code;
|
||||
|
||||
|
@ -1123,23 +1128,38 @@ _err:
|
|||
|
||||
int32_t tsdbReadSttBlock(SDataFReader *pReader, int32_t iStt, SSttBlk *pSttBlk, SBlockData *pBlockData) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
code = tsdbReadBlockDataImpl(pReader, &pSttBlk->bInfo, pBlockData, iStt);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbReadSttBlockEx(SDataFReader *pReader, int32_t iStt, SSttBlk *pSttBlk, SBlockData *pBlockData) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
// alloc
|
||||
code = tRealloc(&pReader->aBuf[0], pSttBlk->bInfo.szBlock);
|
||||
if (code) goto _err;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
// read
|
||||
code = tsdbReadFile(pReader->aSttFD[iStt], pSttBlk->bInfo.offset, pReader->aBuf[0], pSttBlk->bInfo.szBlock);
|
||||
if (code) goto _err;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
// decmpr
|
||||
code = tDecmprBlockData(pReader->aBuf[0], pSttBlk->bInfo.szBlock, pBlockData, &pReader->aBuf[1]);
|
||||
if (code) goto _err;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d tsdb read stt block failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -140,7 +140,7 @@ static int32_t tsdbSnapReadOpenFile(STsdbSnapReader* pReader) {
|
|||
if (pSttBlk->minVer > pReader->ever) continue;
|
||||
if (pSttBlk->maxVer < pReader->sver) continue;
|
||||
|
||||
code = tsdbReadSttBlock(pReader->pDataFReader, iStt, pSttBlk, &pIter->bData);
|
||||
code = tsdbReadSttBlockEx(pReader->pDataFReader, iStt, pSttBlk, &pIter->bData);
|
||||
if (code) goto _err;
|
||||
|
||||
for (pIter->iRow = 0; pIter->iRow < pIter->bData.nRow; pIter->iRow++) {
|
||||
|
@ -223,7 +223,7 @@ static int32_t tsdbSnapNextRow(STsdbSnapReader* pReader) {
|
|||
|
||||
if (pSttBlk->minVer > pReader->ever || pSttBlk->maxVer < pReader->sver) continue;
|
||||
|
||||
code = tsdbReadSttBlock(pReader->pDataFReader, pIter->iStt, pSttBlk, &pIter->bData);
|
||||
code = tsdbReadSttBlockEx(pReader->pDataFReader, pIter->iStt, pSttBlk, &pIter->bData);
|
||||
if (code) goto _err;
|
||||
|
||||
pIter->iRow = -1;
|
||||
|
@ -319,7 +319,7 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
|
|||
code = tsdbUpdateTableSchema(pTsdb->pVnode->pMeta, id.suid, id.uid, &pReader->skmTable);
|
||||
if (code) goto _err;
|
||||
|
||||
code = tBlockDataInit(pBlockData, id.suid, id.uid, pReader->skmTable.pTSchema);
|
||||
code = tBlockDataInit(pBlockData, &id, pReader->skmTable.pTSchema, NULL, 0);
|
||||
if (code) goto _err;
|
||||
|
||||
while (pRowInfo->suid == id.suid && pRowInfo->uid == id.uid) {
|
||||
|
@ -715,7 +715,7 @@ static int32_t tsdbSnapWriteTableDataStart(STsdbSnapWriter* pWriter, TABLEID* pI
|
|||
if (code) goto _err;
|
||||
|
||||
tMapDataReset(&pWriter->dWriter.mDataBlk);
|
||||
code = tBlockDataInit(&pWriter->dWriter.bData, pId->suid, pId->uid, pWriter->skmTable.pTSchema);
|
||||
code = tBlockDataInit(&pWriter->dWriter.bData, pId, pWriter->skmTable.pTSchema, NULL, 0);
|
||||
if (code) goto _err;
|
||||
|
||||
return code;
|
||||
|
@ -1000,7 +1000,8 @@ static int32_t tsdbSnapWriteToSttFile(STsdbSnapWriter* pWriter, int32_t iRow) {
|
|||
code = tsdbUpdateTableSchema(pWriter->pTsdb->pVnode->pMeta, pWriter->id.suid, pWriter->id.uid, &pWriter->skmTable);
|
||||
if (code) goto _err;
|
||||
|
||||
code = tBlockDataInit(pBData, pWriter->id.suid, pWriter->id.suid ? 0 : pWriter->id.uid, pWriter->skmTable.pTSchema);
|
||||
TABLEID tid = {.suid = pWriter->id.suid, .uid = pWriter->id.suid ? 0 : pWriter->id.uid};
|
||||
code = tBlockDataInit(pBData, &tid, pWriter->skmTable.pTSchema, NULL, 0);
|
||||
if (code) goto _err;
|
||||
}
|
||||
|
||||
|
|
|
@ -948,24 +948,47 @@ void tBlockDataDestroy(SBlockData *pBlockData, int8_t deepClear) {
|
|||
pBlockData->aColData = NULL;
|
||||
}
|
||||
|
||||
int32_t tBlockDataInit(SBlockData *pBlockData, int64_t suid, int64_t uid, STSchema *pTSchema) {
|
||||
int32_t tBlockDataInit(SBlockData *pBlockData, TABLEID *pId, STSchema *pTSchema, int16_t *aCid, int32_t nCid) {
|
||||
int32_t code = 0;
|
||||
|
||||
ASSERT(suid || uid);
|
||||
ASSERT(pId->suid || pId->uid);
|
||||
|
||||
pBlockData->suid = suid;
|
||||
pBlockData->uid = uid;
|
||||
pBlockData->suid = pId->suid;
|
||||
pBlockData->uid = pId->uid;
|
||||
pBlockData->nRow = 0;
|
||||
|
||||
taosArrayClear(pBlockData->aIdx);
|
||||
for (int32_t iColumn = 1; iColumn < pTSchema->numOfCols; iColumn++) {
|
||||
if (aCid) {
|
||||
int32_t iColumn = 1;
|
||||
STColumn *pTColumn = &pTSchema->columns[iColumn];
|
||||
for (int32_t iCid = 0; iCid < nCid; iCid++) {
|
||||
while (pTColumn && pTColumn->colId < aCid[iCid]) {
|
||||
iColumn++;
|
||||
pTColumn = (iColumn < pTSchema->numOfCols) ? &pTSchema->columns[iColumn] : NULL;
|
||||
}
|
||||
|
||||
SColData *pColData;
|
||||
code = tBlockDataAddColData(pBlockData, iColumn - 1, &pColData);
|
||||
if (code) goto _exit;
|
||||
if (pTColumn == NULL) {
|
||||
break;
|
||||
} else if (pTColumn->colId == aCid[iCid]) {
|
||||
SColData *pColData;
|
||||
code = tBlockDataAddColData(pBlockData, taosArrayGetSize(pBlockData->aIdx), &pColData);
|
||||
if (code) goto _exit;
|
||||
tColDataInit(pColData, pTColumn->colId, pTColumn->type, (pTColumn->flags & COL_SMA_ON) ? 1 : 0);
|
||||
|
||||
tColDataInit(pColData, pTColumn->colId, pTColumn->type, (pTColumn->flags & COL_SMA_ON) ? 1 : 0);
|
||||
iColumn++;
|
||||
pTColumn = (iColumn < pTSchema->numOfCols) ? &pTSchema->columns[iColumn] : NULL;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for (int32_t iColumn = 1; iColumn < pTSchema->numOfCols; iColumn++) {
|
||||
STColumn *pTColumn = &pTSchema->columns[iColumn];
|
||||
|
||||
SColData *pColData;
|
||||
code = tBlockDataAddColData(pBlockData, iColumn - 1, &pColData);
|
||||
if (code) goto _exit;
|
||||
|
||||
tColDataInit(pColData, pTColumn->colId, pTColumn->type, (pTColumn->flags & COL_SMA_ON) ? 1 : 0);
|
||||
}
|
||||
}
|
||||
|
||||
_exit:
|
||||
|
|
Loading…
Reference in New Issue