finish more code

This commit is contained in:
Hongze Cheng 2022-12-02 14:24:23 +08:00
parent 5b83fcc1fc
commit 2cfdf7bf62
4 changed files with 40 additions and 27 deletions

View File

@ -89,12 +89,12 @@ void tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal);
void tRowDestroy(SRow *pRow); void tRowDestroy(SRow *pRow);
void tRowSort(SArray *aRowP); void tRowSort(SArray *aRowP);
int32_t tRowMerge(SArray *aRowP, STSchema *pTSchema, int8_t flag); int32_t tRowMerge(SArray *aRowP, STSchema *pTSchema, int8_t flag);
int32_t tRowAppendToColData(SRow *pRow, STSchema *pTSchema, SColData *aColData, int32_t nColData);
// SRowIter ================================ // SRowIter ================================
int32_t tRowIterOpen(SRow *pRow, STSchema *pTSchema, SRowIter **ppIter); int32_t tRowIterOpen(SRow *pRow, STSchema *pTSchema, SRowIter **ppIter);
void tRowIterClose(SRowIter **ppIter); void tRowIterClose(SRowIter **ppIter);
SColVal *tRowIterNext(SRowIter *pIter); SColVal *tRowIterNext(SRowIter *pIter);
int32_t tRowAppendToColData(SRow *pRow, STSchema *pTSchema, SColData *aColData, int32_t nColData);
// STag ================================ // STag ================================
int32_t tTagNew(SArray *pArray, int32_t version, int8_t isJson, STag **ppTag); int32_t tTagNew(SArray *pArray, int32_t version, int8_t isJson, STag **ppTag);

View File

@ -117,7 +117,8 @@ static FORCE_INLINE int64_t tsdbLogicToFileSize(int64_t lSize, int32_t szPage) {
void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal); void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal);
int32_t tsdbRowCmprFn(const void *p1, const void *p2); int32_t tsdbRowCmprFn(const void *p1, const void *p2);
// STSDBRowIter // STSDBRowIter
void tsdbRowIterInit(STSDBRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema); int32_t tsdbRowIterOpen(STSDBRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema);
void tsdbRowClose(STSDBRowIter *pIter);
SColVal *tsdbRowIterNext(STSDBRowIter *pIter); SColVal *tsdbRowIterNext(STSDBRowIter *pIter);
// SRowMerger // SRowMerger
int32_t tsdbRowMergerInit2(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRow, STSchema *pTSchema); int32_t tsdbRowMergerInit2(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRow, STSchema *pTSchema);
@ -571,10 +572,14 @@ struct SDFileSet {
}; };
struct STSDBRowIter { struct STSDBRowIter {
TSDBROW *pRow; TSDBROW *pRow;
STSchema *pTSchema; union {
SColVal colVal; SRowIter *pIter;
int32_t i; struct {
int32_t iColData;
SColVal cv;
};
};
}; };
struct SRowMerger { struct SRowMerger {
STSchema *pTSchema; STSchema *pTSchema;

View File

@ -596,7 +596,7 @@ int32_t tDiskDataAddRow(SDiskDataBuilder *pBuilder, TSDBROW *pRow, STSchema *pTS
if (pBuilder->bi.maxKey < kRow.ts) pBuilder->bi.maxKey = kRow.ts; if (pBuilder->bi.maxKey < kRow.ts) pBuilder->bi.maxKey = kRow.ts;
STSDBRowIter iter = {0}; STSDBRowIter iter = {0};
tsdbRowIterInit(&iter, pRow, pTSchema); tsdbRowIterOpen(&iter, pRow, pTSchema);
SColVal *pColVal = tsdbRowIterNext(&iter); SColVal *pColVal = tsdbRowIterNext(&iter);
for (int32_t iBuilder = 0; iBuilder < pBuilder->nBuilder; iBuilder++) { for (int32_t iBuilder = 0; iBuilder < pBuilder->nBuilder; iBuilder++) {

View File

@ -594,42 +594,50 @@ int32_t tsdbRowCmprFn(const void *p1, const void *p2) {
} }
// STSDBRowIter ====================================================== // STSDBRowIter ======================================================
void tsdbRowIterInit(STSDBRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema) { int32_t tsdbRowIterOpen(STSDBRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema) {
int32_t code = 0;
pIter->pRow = pRow; pIter->pRow = pRow;
if (pRow->type == TSDBROW_ROW_FMT) { if (pRow->type == TSDBROW_ROW_FMT) {
ASSERT(pTSchema); code = tRowIterOpen(pRow->pTSRow, pTSchema, &pIter->pIter);
pIter->pTSchema = pTSchema; if (code) goto _exit;
pIter->i = 1;
} else if (pRow->type == TSDBROW_COL_FMT) { } else if (pRow->type == TSDBROW_COL_FMT) {
pIter->pTSchema = NULL; pIter->iColData = 0;
pIter->i = 0;
} else { } else {
ASSERT(0); ASSERT(0);
} }
_exit:
return code;
}
void tsdbRowClose(STSDBRowIter *pIter) {
if (pIter->pRow->type == TSDBROW_ROW_FMT) {
tRowIterClose(&pIter->pIter);
}
} }
SColVal *tsdbRowIterNext(STSDBRowIter *pIter) { SColVal *tsdbRowIterNext(STSDBRowIter *pIter) {
if (pIter->pRow->type == TSDBROW_ROW_FMT) { if (pIter->pRow->type == TSDBROW_ROW_FMT) {
if (pIter->i < pIter->pTSchema->numOfCols) { return tRowIterNext(pIter->pIter);
tRowGet(pIter->pRow->pTSRow, pIter->pTSchema, pIter->i, &pIter->colVal);
pIter->i++;
return &pIter->colVal;
}
} else if (pIter->pRow->type == TSDBROW_COL_FMT) { } else if (pIter->pRow->type == TSDBROW_COL_FMT) {
if (pIter->i < pIter->pRow->pBlockData->nColData) { if (pIter->iColData == 0) {
SColData *pColData = tBlockDataGetColDataByIdx(pIter->pRow->pBlockData, pIter->i); pIter->cv = COL_VAL_VALUE(PRIMARYKEY_TIMESTAMP_COL_ID, TSDB_DATA_TYPE_TIMESTAMP,
(SValue){.val = pIter->pRow->pBlockData->aTSKEY[pIter->pRow->iRow]});
++pIter->iColData;
return &pIter->cv;
}
tColDataGetValue(pColData, pIter->pRow->iRow, &pIter->colVal); if (pIter->iColData < pIter->pRow->pBlockData->nColData) {
pIter->i++; tColDataGetValue(&pIter->pRow->pBlockData->aColData[pIter->iColData], pIter->pRow->iRow, &pIter->cv);
++pIter->iColData;
return &pIter->colVal; return &pIter->cv;
} else {
return NULL;
} }
} else { } else {
ASSERT(0); ASSERT(0);
} }
return NULL;
} }
// SRowMerger ====================================================== // SRowMerger ======================================================