more work

This commit is contained in:
Hongze Cheng 2022-06-20 11:28:45 +00:00
parent d17d6c5009
commit 5028420791
3 changed files with 88 additions and 8 deletions

View File

@ -109,6 +109,7 @@ int32_t tGetBlockIdx(uint8_t *p, void *ph);
#define tColDataInit() ((SColData){0}) #define tColDataInit() ((SColData){0})
void tColDataReset(SColData *pColData); void tColDataReset(SColData *pColData);
void tColDataClear(SColData *pColData); void tColDataClear(SColData *pColData);
int32_t tColDataAppendValue(SColData *pColData, SColVal *pColVal);
int32_t tColDataCmprFn(const void *p1, const void *p2); int32_t tColDataCmprFn(const void *p1, const void *p2);
// SBlockData // SBlockData
#define tBlockDataInit() ((SBlockData){0}) #define tBlockDataInit() ((SBlockData){0})
@ -352,6 +353,7 @@ struct SColData {
int32_t bytes; int32_t bytes;
uint8_t flags; uint8_t flags;
uint8_t *pBitMap; uint8_t *pBitMap;
int32_t *pOfst;
uint32_t nData; uint32_t nData;
uint8_t *pData; uint8_t *pData;
}; };
@ -362,7 +364,7 @@ struct SBlockData {
int64_t *aVersion; int64_t *aVersion;
TSKEY *aTSKEY; TSKEY *aTSKEY;
int32_t maxCol; int32_t maxCol;
int32_t nCol; int32_t nColData;
SColData *aColData; SColData *aColData;
}; };

View File

@ -922,7 +922,7 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
// other columns // other columns
offset = 0; offset = 0;
tMapDataClear(&pSubBlock->mBlockCol); tMapDataClear(&pSubBlock->mBlockCol);
for (int32_t iCol = 0; iCol < pBlockData->nCol; iCol++) { for (int32_t iCol = 0; iCol < pBlockData->nColData; iCol++) {
SColData *pColData = &pBlockData->aColData[iCol]; SColData *pColData = &pBlockData->aColData[iCol];
ASSERT(pColData->flags); ASSERT(pColData->flags);

View File

@ -506,7 +506,7 @@ void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *
SColData *pColData; SColData *pColData;
void *p; void *p;
p = taosbsearch(&(SColData){.cid = pTColumn->colId}, pRow->pBlockData->aColData, pRow->pBlockData->nCol, p = taosbsearch(&(SColData){.cid = pTColumn->colId}, pRow->pBlockData->aColData, pRow->pBlockData->nColData,
sizeof(SBlockCol), tColDataCmprFn, TD_EQ); sizeof(SBlockCol), tColDataCmprFn, TD_EQ);
if (p) { if (p) {
pColData = (SColData *)p; pColData = (SColData *)p;
@ -705,29 +705,107 @@ int32_t tGetKEYINFO(uint8_t *p, KEYINFO *pKeyInfo) {
// SBlockData ====================================================== // SBlockData ======================================================
static int32_t tsdbBlockDataAppendRow0(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema) { static int32_t tsdbBlockDataAppendRow0(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema) {
int32_t code = 0; int32_t code = 0;
int32_t nRow = pBlockData->nRow;
TSDBKEY key = tsdbRowKey(pRow);
int32_t iColumn;
int32_t nColumn;
int32_t iColData;
SColVal cv;
// aKey ASSERT(pTSchema);
pBlockData->nRow++;
// TSDBKEY (todo)
pBlockData->aVersion[nRow] = key.version;
pBlockData->aTSKEY[nRow] = key.ts;
// other cols // other cols
iColumn = 1;
nColumn = pTSchema->numOfCols;
iColData = 0;
while (iColumn < nColumn || iColData < pBlockData->nColData) {
STColumn *pTColumn = NULL;
SColData *pColData = NULL;
if (iColumn < nColumn) {
pTColumn = &pTSchema->columns[iColumn];
}
if (iColData < pBlockData->nColData) {
pColData = &pBlockData->aColData[iColData];
}
if (pTColumn && pColData) {
if (pTColumn->colId == pColData->cid) {
tsdbRowGetColVal(pRow, pTSchema, iColumn, &cv);
} else if (pTColumn->colId < pColData->cid) {
// add a new SColData, and append the column value cv to the SColData
} else {
// add a None to the column value
}
} else if (pTColumn) {
tsdbRowGetColVal(pRow, pTSchema, iColumn, &cv);
// add a new SColData, and append the column value cv to the SColData
} else {
iColData++;
}
}
return code; return code;
} }
static int32_t tsdbBlockDataAppendRow1(SBlockData *pBlockData, TSDBROW *pRow) { static int32_t tsdbBlockDataAppendRow1(SBlockData *pBlockData, TSDBROW *pRow) {
int32_t code = 0; int32_t code = 0;
int32_t nRow = pBlockData->nRow;
TSDBKEY key = tsdbRowKey(pRow);
int32_t iColData;
int32_t iColDataRow;
int32_t nColDataRow;
// aKey pBlockData->nRow++;
// aKey (TODO)
pBlockData->aVersion[nRow] = key.version;
pBlockData->aTSKEY[nRow] = key.ts;
// other cols // other cols
iColData = 0;
iColDataRow = 0;
nColDataRow = pRow->pBlockData->nColData;
while (iColData < pBlockData->nColData || iColDataRow < nColDataRow) {
SColData *pColData = NULL;
SColData *pColDataRow = NULL;
if (iColData < pBlockData->nColData) {
pColData = &pBlockData->aColData[iColData];
}
if (iColDataRow < nColDataRow) {
pColDataRow = &pRow->pBlockData->aColData[iColDataRow];
}
if (pColData && pColDataRow) {
if (pColData->cid == pColDataRow->cid) {
// TODO
} else if (pColData->cid < pColDataRow->cid) {
// TODO
} else {
// TODO
}
} else if (pColData) {
// TODO
} else {
// TODO
}
}
return code; return code;
} }
int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema) { int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema) {
int32_t code = 0; int32_t code = 0;
TSDBKEY key = tsdbRowKey(pRow);
if (pRow->type == 0) { if (pRow->type == 0) {
ASSERT(pTSchema);
code = tsdbBlockDataAppendRow0(pBlockData, pRow, pTSchema); code = tsdbBlockDataAppendRow0(pBlockData, pRow, pTSchema);
} else if (pRow->type == 1) { } else if (pRow->type == 1) {
code = tsdbBlockDataAppendRow1(pBlockData, pRow); code = tsdbBlockDataAppendRow1(pBlockData, pRow);
@ -738,13 +816,13 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS
void tBlockDataReset(SBlockData *pBlockData) { void tBlockDataReset(SBlockData *pBlockData) {
pBlockData->nRow = 0; pBlockData->nRow = 0;
pBlockData->nCol = 0; pBlockData->nColData = 0;
} }
void tBlockDataClear(SBlockData *pBlockData) { void tBlockDataClear(SBlockData *pBlockData) {
tsdbFree((uint8_t *)pBlockData->aVersion); tsdbFree((uint8_t *)pBlockData->aVersion);
tsdbFree((uint8_t *)pBlockData->aTSKEY); tsdbFree((uint8_t *)pBlockData->aTSKEY);
for (int32_t iCol = 0; iCol < pBlockData->nCol; iCol++) { for (int32_t iCol = 0; iCol < pBlockData->nColData; iCol++) {
tsdbFree(pBlockData->aColData[iCol].pBitMap); tsdbFree(pBlockData->aColData[iCol].pBitMap);
tsdbFree(pBlockData->aColData[iCol].pData); tsdbFree(pBlockData->aColData[iCol].pData);
} }