This commit is contained in:
Hongze Cheng 2022-06-02 05:06:11 +00:00
parent 554c66d78c
commit 89d53fb60a
1 changed files with 190 additions and 11 deletions

View File

@ -242,7 +242,7 @@ static void tTupleTSRowNew(SArray *pArray, STSchema *pTSchema, STSRow2 *pRow) {
} }
} }
static void tMapTSRowNew(SArray *pArray, STSchema *pTSchema, STSRow2 *pRow) { static void tMapTSRowNew(SArray *pArray, STSchema *pTSchema, STSRow2 *pRow, uint8_t flags) {
int32_t nColVal = taosArrayGetSize(pArray); int32_t nColVal = taosArrayGetSize(pArray);
STColumn *pTColumn; STColumn *pTColumn;
SColVal *pColVal; SColVal *pColVal;
@ -250,6 +250,7 @@ static void tMapTSRowNew(SArray *pArray, STSchema *pTSchema, STSRow2 *pRow) {
ASSERT(nColVal > 0); ASSERT(nColVal > 0);
pRow->sver = pTSchema->version; pRow->sver = pTSchema->version;
pRow->flags = 0;
// ts // ts
pTColumn = &pTSchema->columns[0]; pTColumn = &pTSchema->columns[0];
@ -265,7 +266,6 @@ static void tMapTSRowNew(SArray *pArray, STSchema *pTSchema, STSRow2 *pRow) {
uint32_t nv = 0; uint32_t nv = 0;
uint8_t *pv = NULL; uint8_t *pv = NULL;
uint8_t *pidx = NULL; uint8_t *pidx = NULL;
uint8_t flags = 0;
int16_t nCol = 0; int16_t nCol = 0;
for (int32_t iColumn = 1; iColumn < pTSchema->numOfCols; iColumn++) { for (int32_t iColumn = 1; iColumn < pTSchema->numOfCols; iColumn++) {
pTColumn = &pTSchema->columns[iColumn]; pTColumn = &pTSchema->columns[iColumn];
@ -296,29 +296,208 @@ static void tMapTSRowNew(SArray *pArray, STSchema *pTSchema, STSRow2 *pRow) {
} }
_set_none: _set_none:
flags |= TSROW_HAS_NONE; pRow->flags |= TSROW_HAS_NONE;
continue; continue;
_set_null: _set_null:
flags != TSROW_HAS_NULL; pRow->flags |= TSROW_HAS_NULL;
pidx[nCol++] = nv; if (pidx) pidx[nCol++] = nv;
// nv += tPutColVal(pv ? pv + nv : pv, pColVal, pTColumn->type, 0); nv += tPutI16v(pv ? pv + nv : pv, -pTColumn->colId);
continue; continue;
_set_value: _set_value:
flags != TSROW_HAS_VAL; pRow->flags != TSROW_HAS_VAL;
pidx[nCol++] = nv; if (pidx) pidx[nCol++] = nv;
// nv += tPutColVal(pv ? pv + nv : pv, pColVal, pTColumn->type, 0); nv += tPutI16v(pv ? pv + nv : pv, pTColumn->colId);
nv += tPutValue(pv ? pv + nv : pv, &pColVal->value, pTColumn->type);
continue; continue;
} }
if (nv <= UINT8_MAX) { if (nv <= UINT8_MAX) {
pRow->flags |= TSROW_KV_SMALL;
// small // small
} else if (nv <= UINT16_MAX) { } else if (nv <= UINT16_MAX) {
pRow->flags |= TSROW_KV_MID;
// mid // mid
} else { } else {
pRow->flags |= TSROW_KV_BIG;
// large // large
} }
ASSERT(flags == 0 || pRow->flags == flags);
}
static void tTSRowNewImpl(SArray *pArray, STSchema *pTSchema, STSRow2 *pRowT, STSRow2 *pRowK) {
int32_t nColVal = taosArrayGetSize(pArray);
STColumn *pTColumn;
SColVal *pColVal;
uint8_t tflags = 0;
uint8_t kflags = 0;
ASSERT(nColVal > 0);
// prepare
uint8_t *pb = NULL;
uint8_t *pf = NULL;
uint8_t *ptv = NULL;
uint32_t ntv = 0;
int8_t isBit1 = 0;
STSKVRow kvRow = {0};
STSKVRow *pTSKVRow = &kvRow;
uint8_t *pidx = NULL;
uint8_t *pkv = NULL;
uint32_t nkv = 0;
uint32_t maxIdx = 0;
if (pRowT) {
tflags = pRowT->flags;
pRowT->flags = 0;
pRowT->sver = pTSchema->version;
pRowT->nData = 0;
if (tflags) { // build
} else { // try
}
}
if (pRowK) {
kflags = pRowK->flags;
pRowK->flags = 0;
pRowK->sver = pTSchema->version;
pRowK->nData = 0;
if (kflags) { // build
} else { // try
}
}
// ts
pTColumn = &pTSchema->columns[0];
pColVal = (SColVal *)taosArrayGet(pArray, 0);
ASSERT(pTColumn->colId == 0 && pColVal->cid == 0);
ASSERT(pTColumn->type == TSDB_DATA_TYPE_TIMESTAMP);
if (pRowT) {
pRowT->ts = pColVal->value.ts;
}
if (pRowK) {
pRowK->ts = pColVal->value.ts;
}
// other
int32_t iColVal = 1;
for (int32_t iColumn = 1; iColumn < pTSchema->numOfCols; iColumn++) {
pTColumn = &pTSchema->columns[iColumn];
if (iColVal < nColVal) {
pColVal = (SColVal *)taosArrayGet(pArray, iColVal);
} else {
pColVal = NULL;
}
if (pColVal) {
if (pColVal->cid == pTColumn->colId) {
iColVal++;
if (pColVal->isNone) {
goto _set_none;
} else if (pColVal->isNull) {
goto _set_null;
} else {
goto _set_value;
}
} else if (pColVal->cid > pTColumn->colId) {
goto _set_none;
} else {
ASSERT(0);
}
} else {
goto _set_none;
}
_set_none:
if (pRowT) {
pRowT->flags |= TSROW_HAS_NONE;
// TODO
}
if (pRowK) {
pRowK->flags |= TSROW_HAS_NONE;
}
continue;
_set_null:
if (pRowT) {
pRowT->flags |= TSROW_HAS_NULL;
// TODO
}
if (pRowK) {
pRowK->flags |= TSROW_HAS_NULL;
if (kflags) {
if (kflags & TSROW_KV_SMALL) {
((uint8_t *)pidx)[pTSKVRow->nCols] = nkv;
} else if (kflags & TSROW_KV_MID) {
((uint16_t *)pidx)[pTSKVRow->nCols] = nkv;
} else {
((uint32_t *)pidx)[pTSKVRow->nCols] = nkv;
}
}
maxIdx = nkv;
pTSKVRow->nCols++;
nkv += tPutI16v(pkv ? pkv + nkv : pkv, -pTColumn->colId);
}
continue;
_set_value:
if (pRowT) {
pRowT->flags |= TSROW_HAS_VAL;
// TODO
}
if (pRowK) {
pRowK->flags |= TSROW_HAS_VAL;
if (kflags) {
if (kflags & TSROW_KV_SMALL) {
((uint8_t *)pidx)[pTSKVRow->nCols] = nkv;
} else if (kflags & TSROW_KV_MID) {
((uint16_t *)pidx)[pTSKVRow->nCols] = nkv;
} else {
((uint32_t *)pidx)[pTSKVRow->nCols] = nkv;
}
}
maxIdx = nkv;
pTSKVRow->nCols++;
nkv += tPutI16v(pkv ? pkv + nkv : pkv, pTColumn->colId);
nkv += tPutValue(pkv ? pkv + nkv : pkv, &pColVal->value, pTColumn->type);
}
continue;
}
// finalize (todo)
if (pRowT) {
}
if (pRowK) {
if (pTSKVRow->nCols == 0) {
pRowK->nData = 0;
pRowK->flags |= TSROW_KV_SMALL;
} else {
if (maxIdx <= UINT8_MAX) {
pRowK->flags |= TSROW_KV_SMALL;
pRowK->nData = sizeof(STSKVRow) + sizeof(uint8_t) * pTSKVRow->nCols;
} else if (maxIdx <= UINT16_MAX) {
pRowK->flags |= TSROW_KV_MID;
pRowK->nData = sizeof(STSKVRow) + sizeof(uint16_t) * pTSKVRow->nCols;
} else {
pRowK->flags |= TSROW_KV_BIG;
pRowK->nData = sizeof(STSKVRow) + sizeof(uint32_t) * pTSKVRow->nCols;
}
}
ASSERT(kflags == 0 || pRowK->flags == kflags);
}
} }
// try-decide-build // try-decide-build
@ -329,13 +508,13 @@ int32_t tTSRowNew(SArray *pArray, STSchema *pTSchema, STSRow2 **ppRow) {
// try // try
tTupleTSRowNew(pArray, pTSchema, &rowT); tTupleTSRowNew(pArray, pTSchema, &rowT);
tMapTSRowNew(pArray, pTSchema, &rowM); tMapTSRowNew(pArray, pTSchema, &rowM, 0);
// decide & build // decide & build
if (rowT.nData <= rowM.nData) { if (rowT.nData <= rowM.nData) {
tTupleTSRowNew(pArray, pTSchema, &rowT); tTupleTSRowNew(pArray, pTSchema, &rowT);
} else { } else {
tMapTSRowNew(pArray, pTSchema, &rowM); tMapTSRowNew(pArray, pTSchema, &rowM, rowM.flags);
} }
return code; return code;