refact more code

This commit is contained in:
Hongze Cheng 2024-02-29 14:50:24 +08:00
parent a8f95f53b7
commit 4348781d3c
2 changed files with 420 additions and 473 deletions

View File

@ -1,4 +1,5 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
@ -47,8 +48,6 @@ typedef struct SValueColumn SValueColumn;
#define HAS_NULL ((uint8_t)0x2)
#define HAS_VALUE ((uint8_t)0x4)
#define HAS_MULTI_KEY ((uint8_t)0x8)
// bitmap ================================
const static uint8_t BIT1_MAP[8] = {0b11111110, 0b11111101, 0b11111011, 0b11110111,
0b11101111, 0b11011111, 0b10111111, 0b01111111};
@ -208,24 +207,27 @@ struct STSchema {
STColumn columns[];
};
/* TODO: here may change
/*
* 1. Tuple format:
* SRow + [bit map +] fix-length data + [var-length data +] [(type + offset) * numPrimaryKeyCols + fixedLen +
* numOfCols + numPrimaryKeyCols]
* SRow + [(type, offset) * numOfPKs +] [bit map +] fix-length data + [var-length data]
*
* 2. K-V format:
* SRow + numColsNotNone + u8/u16/u32 * numColsNotNone + ([-]cid [+ data]) * numColsNotNone + [(type + index) *
* numPrimaryKeyCols + numPrimaryKeyCols
* SRow + [(type, offset) * numOfPKs +] offset array + ([-]cid [+ data]) * numColsNotNone
*/
struct SRow {
uint8_t flag;
uint8_t rsv;
uint8_t numOfPKs;
uint16_t sver;
uint32_t len;
TSKEY ts;
uint8_t data[];
};
typedef struct {
int8_t type;
uint32_t offset;
} SPrimaryKeyIndex;
struct SValue {
int8_t type;
union {

View File

@ -44,7 +44,6 @@ typedef struct {
#define ROW_SET_BITMAP(PB, FLAG, IDX, VAL) \
do { \
if (PB) { \
switch (FLAG) { \
case (HAS_NULL | HAS_NONE): \
SET_BIT1(PB, IDX, VAL); \
@ -59,333 +58,349 @@ typedef struct {
SET_BIT2(PB, IDX, VAL); \
break; \
default: \
ASSERT(0); \
break; \
} \
} \
} while (0)
static int32_t tPutPrimaryKeyIndex(uint8_t *p, const SPrimaryKeyIndex *index) {
int32_t n = 0;
n += tPutI8(p ? p + n : p, index->type);
n += tPutU32v(p ? p + n : p, index->offset);
return n;
}
static int32_t tGetPrimaryKeyIndex(uint8_t *p, SPrimaryKeyIndex *index) {
int32_t n = 0;
n += tGetI8(p + n, &index->type);
n += tGetU32v(p + n, &index->offset);
return n;
}
typedef struct {
int32_t numOfNone;
int32_t numOfNull;
int32_t numOfValue;
int32_t numOfPKs;
int8_t flag;
// tuple
int8_t tupleFlag;
SPrimaryKeyIndex tupleIndices[TD_MAX_PK_COLS];
int32_t tuplePKSize; // primary key size
int32_t tupleBitmapSize; // bitmap size
int32_t tupleFixedSize; // fixed part size
int32_t tupleVarSize; // var part size
int32_t tupleRowSize;
// key-value
int8_t kvFlag;
SPrimaryKeyIndex kvIndices[TD_MAX_PK_COLS];
int32_t kvMaxOffset;
int32_t kvPKSize; // primary key size
int32_t kvIndexSize; // offset array size
int32_t kvPayloadSize; // payload size
int32_t kvRowSize;
} SRowBuildScanInfo;
static FORCE_INLINE void tRowBuildScanAddNone(SRowBuildScanInfo *sinfo) { sinfo->numOfNone++; }
static FORCE_INLINE void tRowBuildScanAddNull(SRowBuildScanInfo *sinfo, const STColumn *pTColumn) {
sinfo->numOfNull++;
sinfo->kvMaxOffset = sinfo->kvPayloadSize;
sinfo->kvPayloadSize += tPutI16v(NULL, -pTColumn->colId);
}
static FORCE_INLINE void tRowBuildScanAddValue(SRowBuildScanInfo *sinfo, SColVal *colVal, const STColumn *pTColumn) {
bool isPK = ((pTColumn->flags & COL_IS_KEY) != 0);
if (isPK) {
ASSERTS(sinfo->numOfPKs < TD_MAX_PK_COLS, "too many primary keys");
sinfo->tupleIndices[sinfo->numOfPKs].type = colVal->value.type;
sinfo->tupleIndices[sinfo->numOfPKs].offset =
IS_VAR_DATA_TYPE(pTColumn->type) ? sinfo->tupleVarSize + sinfo->tupleFixedSize : pTColumn->offset;
sinfo->kvIndices[sinfo->numOfPKs].type = colVal->value.type;
sinfo->kvIndices[sinfo->numOfPKs].offset = sinfo->kvPayloadSize;
sinfo->numOfPKs++;
}
sinfo->kvMaxOffset = sinfo->kvPayloadSize;
if (IS_VAR_DATA_TYPE(colVal->value.type)) {
sinfo->tupleVarSize += tPutU32v(NULL, colVal->value.nData) // size
+ colVal->value.nData; // value
sinfo->kvPayloadSize += tPutI16v(NULL, colVal->cid) // colId
+ tPutU32v(NULL, colVal->value.nData) // size
+ colVal->value.nData; // value
} else {
sinfo->kvPayloadSize += tPutI16v(NULL, colVal->cid) // colId
+ tDataTypes[colVal->value.type].bytes; // value
}
sinfo->numOfValue++;
}
static int32_t tRowBuildScan(SArray *colVals, const STSchema *schema, SRowBuildScanInfo *sinfo) {
int32_t colValIndex = 1;
int32_t numOfColVals = TARRAY_SIZE(colVals);
SColVal *colValArray = (SColVal *)TARRAY_DATA(colVals);
ASSERT(numOfColVals > 0);
ASSERT(colValArray[0].cid == PRIMARYKEY_TIMESTAMP_COL_ID);
ASSERT(colValArray[0].value.type == TSDB_DATA_TYPE_TIMESTAMP);
*sinfo = (SRowBuildScanInfo){
.tupleFixedSize = schema->flen,
};
// loop scan
for (int32_t i = 1; i < schema->numOfCols; i++) {
bool isPK = ((schema->columns[i].flags & COL_IS_KEY) != 0);
for (;;) {
if (colValIndex >= numOfColVals) {
ASSERTS(!isPK, "Primary key should not be NONE or NULL");
tRowBuildScanAddNone(sinfo);
break;
}
if (colValArray[colValIndex].cid == schema->columns[i].colId) {
ASSERT(colValArray[colValIndex].value.type == schema->columns[i].type);
if (COL_VAL_IS_VALUE(&colValArray[colValIndex])) {
tRowBuildScanAddValue(sinfo, &colValArray[colValIndex], schema->columns + i);
} else if (COL_VAL_IS_NULL(&colValArray[colValIndex])) {
ASSERTS(!isPK, "Primary key should not be NULL or NONE");
tRowBuildScanAddNull(sinfo, schema->columns + i);
} else if (COL_VAL_IS_NONE(&colValArray[colValIndex])) {
ASSERTS(!isPK, "Primary key should not be NULL or NONE");
tRowBuildScanAddNone(sinfo);
}
colValIndex++;
break;
} else if (colValArray[colValIndex].cid > schema->columns[i].colId) {
ASSERTS(!isPK, "Primary key should not be NONE or NULL");
tRowBuildScanAddNone(sinfo);
break;
} else { // skip useless value
colValIndex++;
}
}
}
if (sinfo->numOfNone) {
sinfo->flag |= HAS_NONE;
}
if (sinfo->numOfNull) {
sinfo->flag |= HAS_NULL;
}
if (sinfo->numOfValue) {
sinfo->flag |= HAS_VALUE;
}
// Tuple
sinfo->tupleFlag = sinfo->flag;
if (sinfo->flag == HAS_NONE || sinfo->flag == HAS_NULL || sinfo->flag == HAS_VALUE) {
sinfo->tupleBitmapSize = 0;
} else if (sinfo->flag == (HAS_NONE | HAS_NULL | HAS_VALUE)) {
sinfo->tupleBitmapSize = BIT2_SIZE(schema->numOfCols - 1);
} else {
sinfo->tupleBitmapSize = BIT1_SIZE(schema->numOfCols - 1);
}
for (int32_t i = 0; i < sinfo->numOfPKs; i++) {
sinfo->tupleIndices[i].offset += sinfo->tupleBitmapSize;
sinfo->tuplePKSize += tPutPrimaryKeyIndex(NULL, sinfo->tupleIndices + i);
}
sinfo->tupleRowSize = sizeof(SRow) // SRow
+ sinfo->tuplePKSize // primary keys
+ sinfo->tupleBitmapSize // bitmap
+ sinfo->tupleFixedSize // fixed part
+ sinfo->tupleVarSize; // var part
// Key-Value
if (sinfo->kvMaxOffset <= UINT8_MAX) {
sinfo->kvFlag = (KV_FLG_LIT | sinfo->flag);
sinfo->kvIndexSize = sizeof(SKVIdx) + (sinfo->numOfNull + sinfo->numOfValue) * sizeof(uint8_t);
} else if (sinfo->kvMaxOffset <= UINT16_MAX) {
sinfo->kvFlag = (KV_FLG_MID | sinfo->flag);
sinfo->kvIndexSize = sizeof(SKVIdx) + (sinfo->numOfNull + sinfo->numOfValue) * sizeof(uint16_t);
} else {
sinfo->kvFlag = (KV_FLG_BIG | sinfo->flag);
sinfo->kvIndexSize = sizeof(SKVIdx) + (sinfo->numOfNull + sinfo->numOfValue) * sizeof(uint32_t);
}
for (int32_t i = 0; i < sinfo->numOfPKs; i++) {
sinfo->tupleIndices[i].offset += sinfo->kvIndexSize;
sinfo->kvPKSize += tPutPrimaryKeyIndex(NULL, sinfo->kvIndices + i);
}
sinfo->kvRowSize = sizeof(SRow) // SRow
+ sinfo->kvPKSize // primary keys
+ sinfo->kvIndexSize // index array
+ sinfo->kvPayloadSize; // payload
return 0;
}
static int32_t tRowBuildTupleRow(SArray *aColVal, const SRowBuildScanInfo *sinfo, const STSchema *schema,
SRow **ppRow) {
SColVal *colValArray = (SColVal *)TARRAY_DATA(aColVal);
*ppRow = (SRow *)taosMemoryCalloc(1, sinfo->tupleRowSize);
if (*ppRow == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
(*ppRow)->flag = sinfo->tupleFlag;
(*ppRow)->numOfPKs = sinfo->numOfPKs;
(*ppRow)->sver = schema->version;
(*ppRow)->len = sinfo->tupleRowSize;
(*ppRow)->ts = colValArray[0].value.val;
if (sinfo->tupleFlag == HAS_NONE || sinfo->tupleFlag == HAS_NULL) {
return 0;
}
uint8_t *primaryKeys = (*ppRow)->data;
uint8_t *bitmap = primaryKeys + sinfo->tuplePKSize;
uint8_t *fixed = bitmap + sinfo->tupleBitmapSize;
uint8_t *varlen = fixed + schema->flen;
// primary keys
for (int32_t i = 0; i < sinfo->numOfPKs; i++) {
primaryKeys += tPutPrimaryKeyIndex(primaryKeys, sinfo->tupleIndices + i);
}
ASSERT(primaryKeys == bitmap);
// bitmap + fixed + varlen
int32_t numOfColVals = TARRAY_SIZE(aColVal);
int32_t colValIndex = 1;
for (int32_t i = 1; i < schema->numOfCols; i++) {
for (;;) {
if (colValIndex >= numOfColVals) { // NONE
ROW_SET_BITMAP(bitmap, sinfo->tupleFlag, i - 1, BIT_FLG_NONE);
break;
}
if (colValArray[colValIndex].cid == schema->columns[i].colId) {
if (COL_VAL_IS_VALUE(&colValArray[colValIndex])) { // value
ROW_SET_BITMAP(bitmap, sinfo->tupleFlag, i - 1, BIT_FLG_VALUE);
if (IS_VAR_DATA_TYPE(schema->columns[i].type)) {
*(int32_t *)(fixed + schema->columns[i].offset) = varlen - fixed - sinfo->tupleFixedSize;
varlen += tPutU32v(varlen, colValArray[colValIndex].value.nData);
if (colValArray[colValIndex].value.nData) {
memcpy(varlen, colValArray[colValIndex].value.pData, colValArray[colValIndex].value.nData);
varlen += colValArray[colValIndex].value.nData;
}
} else {
memcpy(fixed + schema->columns[i].offset, &colValArray[colValIndex].value.val,
tDataTypes[schema->columns[i].type].bytes);
}
} else if (COL_VAL_IS_NULL(&colValArray[colValIndex])) { // NULL
ROW_SET_BITMAP(bitmap, sinfo->tupleFlag, i - 1, BIT_FLG_NULL);
} else if (COL_VAL_IS_NONE(&colValArray[colValIndex])) { // NONE
ROW_SET_BITMAP(bitmap, sinfo->tupleFlag, i - 1, BIT_FLG_NONE);
}
colValIndex++;
break;
} else if (colValArray[colValIndex].cid > schema->columns[i].colId) { // NONE
ROW_SET_BITMAP(bitmap, sinfo->tupleFlag, i - 1, BIT_FLG_NONE);
break;
} else {
colValIndex++;
}
}
}
return 0;
}
static FORCE_INLINE void tRowBuildKVRowSetIndex(uint8_t flag, SKVIdx *indices, uint32_t offset) {
if (flag & KV_FLG_LIT) {
((uint8_t *)indices->idx)[indices->nCol] = (uint8_t)offset;
} else if (flag & KV_FLG_MID) {
((uint16_t *)indices->idx)[indices->nCol] = (uint16_t)offset;
} else {
((uint32_t *)indices->idx)[indices->nCol] = (uint32_t)offset;
}
indices->nCol++;
}
static int32_t tRowBuildKVRow(SArray *aColVal, const SRowBuildScanInfo *sinfo, const STSchema *schema, SRow **ppRow) {
SColVal *colValArray = (SColVal *)TARRAY_DATA(aColVal);
*ppRow = (SRow *)taosMemoryCalloc(1, sinfo->kvRowSize);
if (*ppRow == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
(*ppRow)->flag = sinfo->kvFlag;
(*ppRow)->numOfPKs = sinfo->numOfPKs;
(*ppRow)->sver = schema->version;
(*ppRow)->len = sinfo->kvRowSize;
(*ppRow)->ts = colValArray[0].value.val;
ASSERT(sinfo->flag != HAS_NONE && sinfo->flag != HAS_NULL && sinfo->flag != HAS_VALUE);
uint8_t *primaryKeys = (*ppRow)->data;
SKVIdx *indices = (SKVIdx *)(primaryKeys + sinfo->kvPKSize);
uint8_t *payload = primaryKeys + sinfo->kvPKSize + sinfo->kvIndexSize;
uint32_t payloadSize = 0;
// primary keys
for (int32_t i = 0; i < sinfo->numOfPKs; i++) {
primaryKeys += tPutPrimaryKeyIndex(primaryKeys, sinfo->kvIndices + i);
}
ASSERT(primaryKeys == (uint8_t *)indices);
int32_t numOfColVals = TARRAY_SIZE(aColVal);
int32_t colValIndex = 1;
for (int32_t i = 1; i < schema->numOfCols; i++) {
for (;;) {
if (colValIndex >= numOfColVals) { // NONE
break;
}
if (colValArray[colValIndex].cid == schema->columns[i].colId) {
if (COL_VAL_IS_VALUE(&colValArray[colValIndex])) { // value
tRowBuildKVRowSetIndex(sinfo->kvFlag, indices, payloadSize);
if (IS_VAR_DATA_TYPE(schema->columns[i].type)) {
payloadSize += tPutI16v(payload + payloadSize, colValArray[colValIndex].cid);
payloadSize += tPutU32v(payload + payloadSize, colValArray[colValIndex].value.nData);
memcpy(payload + payloadSize, colValArray[colValIndex].value.pData, colValArray[colValIndex].value.nData);
payloadSize += colValArray[colValIndex].value.nData;
} else {
payloadSize += tPutI16v(payload + payloadSize, colValArray[colValIndex].cid);
memcpy(payload + payloadSize, &colValArray[colValIndex].value.val,
tDataTypes[schema->columns[i].type].bytes);
payloadSize += tDataTypes[schema->columns[i].type].bytes;
}
} else if (COL_VAL_IS_NULL(&colValArray[colValIndex])) { // NULL
tRowBuildKVRowSetIndex(sinfo->kvFlag, indices, payloadSize);
payloadSize += tPutI16v(payload + payloadSize, -schema->columns[i].colId);
}
colValIndex++;
break;
} else if (colValArray[colValIndex].cid > schema->columns[i].colId) { // NONE
break;
} else {
colValIndex++;
}
}
}
return 0;
}
int32_t tRowBuild(SArray *aColVal, const STSchema *pTSchema, SRow **ppRow) {
int32_t code = 0;
int32_t code;
SRowBuildScanInfo sinfo;
ASSERT(TARRAY_SIZE(aColVal) > 0);
ASSERT(((SColVal *)aColVal->pData)[0].cid == PRIMARYKEY_TIMESTAMP_COL_ID);
ASSERT(((SColVal *)aColVal->pData)[0].value.type == TSDB_DATA_TYPE_TIMESTAMP);
code = tRowBuildScan(aColVal, pTSchema, &sinfo);
if (code) return code;
// scan ---------------
SRow *pRow = NULL;
SColVal *colVals = (SColVal *)TARRAY_DATA(aColVal);
uint8_t flag = 0;
int32_t iColVal = 1;
const int32_t nColVal = TARRAY_SIZE(aColVal);
SColVal *pColVal = (iColVal < nColVal) ? &colVals[iColVal] : NULL;
int32_t iTColumn = 1;
const STColumn *pTColumn = pTSchema->columns + iTColumn;
int32_t ntp = 0;
int32_t nkv = 0;
int32_t maxIdx = 0;
int32_t nIdx = 0;
uint8_t numPrimaryKeyCols = 0;
while (pTColumn) {
if (pColVal) {
if (pColVal->cid == pTColumn->colId) {
if (COL_VAL_IS_VALUE(pColVal)) { // VALUE
flag |= HAS_VALUE;
maxIdx = nkv;
if (IS_VAR_DATA_TYPE(pTColumn->type)) {
ntp = ntp + tPutU32v(NULL, pColVal->value.nData) + pColVal->value.nData;
nkv = nkv + tPutI16v(NULL, pTColumn->colId) + tPutU32v(NULL, pColVal->value.nData) + pColVal->value.nData;
if (sinfo.tupleRowSize <= sinfo.kvRowSize) {
code = tRowBuildTupleRow(aColVal, &sinfo, pTSchema, ppRow);
} else {
nkv = nkv + tPutI16v(NULL, pTColumn->colId) + pTColumn->bytes;
}
if (pTColumn->flags & COL_IS_KEY) {
flag |= HAS_MULTI_KEY;
numPrimaryKeyCols++;
ntp += (tPutI8(NULL, pTColumn->type) // type
+ tPutI32v(NULL, pTColumn->offset) // offset
);
nkv += (tPutI8(NULL, pTColumn->type) // type
+ tPutI32v(NULL, nIdx) // index
);
}
nIdx++;
} else if (COL_VAL_IS_NONE(pColVal)) { // NONE
ASSERT((pTColumn->flags & COL_IS_KEY) == 0);
flag |= HAS_NONE;
} else if (COL_VAL_IS_NULL(pColVal)) { // NULL
ASSERT((pTColumn->flags & COL_IS_KEY) == 0);
flag |= HAS_NULL;
maxIdx = nkv;
nkv += tPutI16v(NULL, -pTColumn->colId);
nIdx++;
} else {
if (ASSERTS(0, "invalid input")) {
code = TSDB_CODE_INVALID_PARA;
goto _exit;
}
}
pTColumn = (++iTColumn < pTSchema->numOfCols) ? pTSchema->columns + iTColumn : NULL;
pColVal = (++iColVal < nColVal) ? &colVals[iColVal] : NULL;
} else if (pColVal->cid > pTColumn->colId) { // NONE
ASSERT((pTColumn->flags & COL_IS_KEY) == 0);
flag |= HAS_NONE;
pTColumn = (++iTColumn < pTSchema->numOfCols) ? pTSchema->columns + iTColumn : NULL;
} else {
pColVal = (++iColVal < nColVal) ? &colVals[iColVal] : NULL;
}
} else { // NONE
ASSERT((pTColumn->flags & COL_IS_KEY) == 0);
flag |= HAS_NONE;
pTColumn = (++iTColumn < pTSchema->numOfCols) ? pTSchema->columns + iTColumn : NULL;
}
}
// compare ---------------
// Tuple Row Format
switch (flag & (HAS_VALUE | HAS_NULL | HAS_NONE)) {
case HAS_NONE:
case HAS_NULL:
ntp = sizeof(SRow);
break;
case HAS_VALUE:
ntp = sizeof(SRow) + pTSchema->flen + ntp;
break;
case (HAS_NULL | HAS_NONE):
ntp = sizeof(SRow) + BIT1_SIZE(pTSchema->numOfCols - 1);
break;
case (HAS_VALUE | HAS_NONE):
case (HAS_VALUE | HAS_NULL):
ntp = sizeof(SRow) + BIT1_SIZE(pTSchema->numOfCols - 1) + pTSchema->flen + ntp;
break;
case (HAS_VALUE | HAS_NULL | HAS_NONE):
ntp = sizeof(SRow) + BIT2_SIZE(pTSchema->numOfCols - 1) + pTSchema->flen + ntp;
break;
default:
if (ASSERTS(0, "impossible")) {
code = TSDB_CODE_INVALID_PARA;
goto _exit;
}
}
if (flag & HAS_MULTI_KEY) {
ntp += (tPutI8(NULL, numPrimaryKeyCols) // numPrimaryKeyCols
+ tPutU32v(NULL, pTSchema->numOfCols) // numOfCols
+ tPutU32v(NULL, pTSchema->flen) // fixedLen
);
}
// Key-Value Row Format
if (maxIdx <= UINT8_MAX) {
nkv = sizeof(SRow) + sizeof(SKVIdx) + nIdx + nkv;
flag |= KV_FLG_LIT;
} else if (maxIdx <= UINT16_MAX) {
nkv = sizeof(SRow) + sizeof(SKVIdx) + (nIdx << 1) + nkv;
flag |= KV_FLG_MID;
} else {
nkv = sizeof(SRow) + sizeof(SKVIdx) + (nIdx << 2) + nkv;
flag |= KV_FLG_BIG;
}
if (flag & HAS_MULTI_KEY) {
nkv += tPutI8(NULL, numPrimaryKeyCols);
}
int32_t rowLength;
if (nkv < ntp) {
rowLength = nkv;
} else {
rowLength = ntp;
flag &= ((uint8_t)0x0f);
}
// alloc --------------
pRow = taosMemoryMalloc(rowLength);
if (NULL == pRow) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
// build --------------
pColVal = &colVals[0];
pRow->flag = flag;
pRow->rsv = 0;
pRow->sver = pTSchema->version;
pRow->len = rowLength;
memcpy(&pRow->ts, &pColVal->value.val, sizeof(TSKEY));
if (flag == HAS_NONE || flag == HAS_NULL) {
goto _exit;
}
iColVal = 1;
pColVal = (iColVal < nColVal) ? &colVals[iColVal] : NULL;
iTColumn = 1;
pTColumn = pTSchema->columns + iTColumn;
uint8_t *pEnd = ((uint8_t *)pRow) + rowLength;
if (pRow->flag & HAS_MULTI_KEY) {
pEnd -= tPutI8(pEnd, numPrimaryKeyCols); // numPrimaryKeyCols
pEnd -= tPutU32v(pEnd, pTSchema->numOfCols); // numOfCols
pEnd -= tPutU32v(NULL, pTSchema->flen); // fixedLen
}
if (flag >> 4) { // KV
SKVIdx *pIdx = (SKVIdx *)pRow->data;
int32_t iIdx = 0;
int32_t nv = 0;
uint8_t *pv = NULL;
if (flag & KV_FLG_LIT) {
pv = pIdx->idx + nIdx;
} else if (flag & KV_FLG_MID) {
pv = pIdx->idx + (nIdx << 1);
} else {
pv = pIdx->idx + (nIdx << 2);
}
pIdx->nCol = nIdx;
while (pTColumn) {
if (pColVal) {
if (pColVal->cid == pTColumn->colId) {
if (COL_VAL_IS_VALUE(pColVal)) {
if (flag & KV_FLG_LIT) {
((uint8_t *)pIdx->idx)[iIdx] = (uint8_t)nv;
} else if (flag & KV_FLG_MID) {
((uint16_t *)pIdx->idx)[iIdx] = (uint16_t)nv;
} else {
((uint32_t *)pIdx->idx)[iIdx] = (uint32_t)nv;
}
nv += tPutI16v(pv + nv, pTColumn->colId);
if (IS_VAR_DATA_TYPE(pTColumn->type)) {
nv += tPutU32v(pv + nv, pColVal->value.nData);
memcpy(pv + nv, pColVal->value.pData, pColVal->value.nData);
nv += pColVal->value.nData;
} else {
memcpy(pv + nv, &pColVal->value.val, pTColumn->bytes);
nv += pTColumn->bytes;
}
if (pRow->flag & HAS_MULTI_KEY) {
pEnd -= tPutI8(pEnd, pTColumn->type);
pEnd -= tPutI32v(pEnd, iIdx);
}
iIdx++;
} else if (COL_VAL_IS_NULL(pColVal)) {
if (flag & KV_FLG_LIT) {
((uint8_t *)pIdx->idx)[iIdx] = (uint8_t)nv;
} else if (flag & KV_FLG_MID) {
((uint16_t *)pIdx->idx)[iIdx] = (uint16_t)nv;
} else {
((uint32_t *)pIdx->idx)[iIdx] = (uint32_t)nv;
}
nv += tPutI16v(pv + nv, -pTColumn->colId);
iIdx++;
}
pTColumn = (++iTColumn < pTSchema->numOfCols) ? pTSchema->columns + iTColumn : NULL;
pColVal = (++iColVal < nColVal) ? &colVals[iColVal] : NULL;
} else if (pColVal->cid > pTColumn->colId) { // NONE
pTColumn = (++iTColumn < pTSchema->numOfCols) ? pTSchema->columns + iTColumn : NULL;
} else {
pColVal = (++iColVal < nColVal) ? &colVals[iColVal] : NULL;
}
} else { // NONE
pTColumn = (++iTColumn < pTSchema->numOfCols) ? pTSchema->columns + iTColumn : NULL;
}
}
} else { // TUPLE
uint8_t *pb = NULL;
uint8_t *pf = NULL;
uint8_t *pv = NULL;
int32_t nv = 0;
switch (flag & (HAS_VALUE | HAS_NULL | HAS_NONE)) {
case (HAS_NULL | HAS_NONE):
pb = pRow->data;
break;
case HAS_VALUE:
pf = pRow->data;
pv = pf + pTSchema->flen;
break;
case (HAS_VALUE | HAS_NONE):
case (HAS_VALUE | HAS_NULL):
pb = pRow->data;
pf = pb + BIT1_SIZE(pTSchema->numOfCols - 1);
pv = pf + pTSchema->flen;
break;
case (HAS_VALUE | HAS_NULL | HAS_NONE):
pb = pRow->data;
pf = pb + BIT2_SIZE(pTSchema->numOfCols - 1);
pv = pf + pTSchema->flen;
break;
default:
if (ASSERTS(0, "impossible")) {
code = TSDB_CODE_INVALID_PARA;
goto _exit;
}
}
if (pb) {
if (flag == (HAS_VALUE | HAS_NULL | HAS_NONE)) {
memset(pb, 0, BIT2_SIZE(pTSchema->numOfCols - 1));
} else {
memset(pb, 0, BIT1_SIZE(pTSchema->numOfCols - 1));
}
}
// build impl
while (pTColumn) {
if (pColVal) {
if (pColVal->cid == pTColumn->colId) {
if (COL_VAL_IS_VALUE(pColVal)) { // VALUE
ROW_SET_BITMAP(pb, flag, iTColumn - 1, BIT_FLG_VALUE);
if (IS_VAR_DATA_TYPE(pTColumn->type)) {
*(int32_t *)(pf + pTColumn->offset) = nv;
nv += tPutU32v(pv + nv, pColVal->value.nData);
if (pColVal->value.nData) {
memcpy(pv + nv, pColVal->value.pData, pColVal->value.nData);
nv += pColVal->value.nData;
}
} else {
memcpy(pf + pTColumn->offset, &pColVal->value.val, TYPE_BYTES[pTColumn->type]);
}
if (pTColumn->flags & COL_IS_KEY) {
pEnd -= tPutI8(pEnd, pTColumn->type);
pEnd -= tPutI32v(pEnd, pTColumn->offset);
}
} else if (COL_VAL_IS_NONE(pColVal)) { // NONE
ROW_SET_BITMAP(pb, flag, iTColumn - 1, BIT_FLG_NONE);
if (pf) memset(pf + pTColumn->offset, 0, TYPE_BYTES[pTColumn->type]);
} else { // NULL
ROW_SET_BITMAP(pb, flag, iTColumn - 1, BIT_FLG_NULL);
if (pf) memset(pf + pTColumn->offset, 0, TYPE_BYTES[pTColumn->type]);
}
pTColumn = (++iTColumn < pTSchema->numOfCols) ? pTSchema->columns + iTColumn : NULL;
pColVal = (++iColVal < nColVal) ? &colVals[iColVal] : NULL;
} else if (pColVal->cid > pTColumn->colId) { // NONE
ROW_SET_BITMAP(pb, flag, iTColumn - 1, BIT_FLG_NONE);
if (pf) memset(pf + pTColumn->offset, 0, TYPE_BYTES[pTColumn->type]);
pTColumn = (++iTColumn < pTSchema->numOfCols) ? pTSchema->columns + iTColumn : NULL;
} else {
pColVal = (++iColVal < nColVal) ? &colVals[iColVal] : NULL;
}
} else { // NONE
ROW_SET_BITMAP(pb, flag, iTColumn - 1, BIT_FLG_NONE);
if (pf) memset(pf + pTColumn->offset, 0, TYPE_BYTES[pTColumn->type]);
pTColumn = (++iTColumn < pTSchema->numOfCols) ? pTSchema->columns + iTColumn : NULL;
}
}
}
_exit:
if (code) {
*ppRow = NULL;
tRowDestroy(pRow);
} else {
*ppRow = pRow;
code = tRowBuildKVRow(aColVal, &sinfo, pTSchema, ppRow);
}
return code;
}
@ -414,9 +429,16 @@ int32_t tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal)
return 0;
}
SPrimaryKeyIndex indices[TD_MAX_PK_COLS];
uint8_t *data = pRow->data;
for (int32_t i = 0; i < pRow->numOfPKs; i++) {
data += tGetPrimaryKeyIndex(data, &indices[i]);
}
if (pRow->flag >> 4) { // KV Row
SKVIdx *pIdx = (SKVIdx *)pRow->data;
SKVIdx *pIdx = (SKVIdx *)data;
uint8_t *pv = NULL;
if (pRow->flag & KV_FLG_LIT) {
pv = pIdx->idx + pIdx->nCol;
} else if (pRow->flag & KV_FLG_MID) {
@ -470,56 +492,33 @@ int32_t tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal)
*pColVal = COL_VAL_NONE(pTColumn->colId, pTColumn->type);
} else { // Tuple Row
if ((pRow->flag & (HAS_VALUE | HAS_NULL | HAS_NONE)) == HAS_VALUE) {
pColVal->cid = pTColumn->colId;
pColVal->value.type = pTColumn->type;
pColVal->flag = CV_FLAG_VALUE;
if (IS_VAR_DATA_TYPE(pTColumn->type)) {
uint8_t *pData = pRow->data + pTSchema->flen + *(int32_t *)(pRow->data + pTColumn->offset);
pData += tGetU32v(pData, &pColVal->value.nData);
if (pColVal->value.nData) {
pColVal->value.pData = pData;
} else {
pColVal->value.pData = NULL;
}
} else {
memcpy(&pColVal->value.val, pRow->data + pTColumn->offset, TYPE_BYTES[pTColumn->type]);
}
} else {
uint8_t *pf;
uint8_t *pv;
uint8_t bv = BIT_FLG_VALUE;
uint8_t *bitmap = data;
uint8_t *fixed;
uint8_t *varlen;
uint8_t bit;
switch (pRow->flag & (HAS_VALUE | HAS_NULL | HAS_NONE)) {
case (HAS_NULL | HAS_NONE):
bv = GET_BIT1(pRow->data, iCol - 1);
break;
case (HAS_VALUE | HAS_NONE):
bv = GET_BIT1(pRow->data, iCol - 1);
if (bv) bv++;
pf = pRow->data + BIT1_SIZE(pTSchema->numOfCols - 1);
pv = pf + pTSchema->flen;
break;
case (HAS_VALUE | HAS_NULL):
bv = GET_BIT1(pRow->data, iCol - 1);
bv++;
pf = pRow->data + BIT1_SIZE(pTSchema->numOfCols - 1);
pv = pf + pTSchema->flen;
break;
case (HAS_VALUE | HAS_NULL | HAS_NONE):
bv = GET_BIT2(pRow->data, iCol - 1);
pf = pRow->data + BIT2_SIZE(pTSchema->numOfCols - 1);
pv = pf + pTSchema->flen;
break;
default:
ASSERTS(0, "invalid row format");
return TSDB_CODE_INVALID_DATA_FMT;
}
if (pRow->flag == HAS_VALUE) {
fixed = bitmap;
bit = BIT_FLG_VALUE;
} else if (pRow->flag == (HAS_NONE | HAS_NULL | HAS_VALUE)) {
fixed = BIT2_SIZE(pTSchema->numOfCols - 1) + bitmap;
bit = GET_BIT2(bitmap, iCol - 1);
} else {
fixed = BIT1_SIZE(pTSchema->numOfCols - 1) + bitmap;
bit = GET_BIT1(bitmap, iCol - 1);
if (bv == BIT_FLG_NONE) {
if (pRow->flag == (HAS_NONE | HAS_VALUE)) {
if (bit) bit++;
} else if (pRow->flag == (HAS_NULL | HAS_VALUE)) {
bit++;
}
}
varlen = fixed + pTSchema->flen;
if (bit == BIT_FLG_NONE) {
*pColVal = COL_VAL_NONE(pTColumn->colId, pTColumn->type);
return 0;
} else if (bv == BIT_FLG_NULL) {
} else if (bit == BIT_FLG_NULL) {
*pColVal = COL_VAL_NULL(pTColumn->colId, pTColumn->type);
return 0;
}
@ -528,16 +527,10 @@ int32_t tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal)
pColVal->value.type = pTColumn->type;
pColVal->flag = CV_FLAG_VALUE;
if (IS_VAR_DATA_TYPE(pTColumn->type)) {
uint8_t *pData = pv + *(int32_t *)(pf + pTColumn->offset);
pData += tGetU32v(pData, &pColVal->value.nData);
if (pColVal->value.nData) {
pColVal->value.pData = pData;
pColVal->value.pData = varlen + *(int32_t *)(fixed + pTColumn->offset);
pColVal->value.pData += tGetU32v(pColVal->value.pData, &pColVal->value.nData);
} else {
pColVal->value.pData = NULL;
}
} else {
memcpy(&pColVal->value.val, pf + pTColumn->offset, TYPE_BYTES[pTColumn->type]);
}
memcpy(&pColVal->value.val, fixed + pTColumn->offset, TYPE_BYTES[pTColumn->type]);
}
}
@ -699,9 +692,16 @@ int32_t tRowIterOpen(SRow *pRow, STSchema *pTSchema, SRowIter **ppIter) {
(pRow->flag & (HAS_VALUE | HAS_NULL | HAS_NONE)) == HAS_NULL)
goto _exit;
uint8_t *data = pRow->data;
SPrimaryKeyIndex index;
for (int32_t i = 0; i < pRow->numOfPKs; i++) {
data += tGetPrimaryKeyIndex(data, &index);
}
if (pRow->flag >> 4) {
pIter->iCol = 0;
pIter->pIdx = (SKVIdx *)pRow->data;
pIter->pIdx = (SKVIdx *)data;
if (pRow->flag & KV_FLG_LIT) {
pIter->pv = pIter->pIdx->idx + pIter->pIdx->nCol;
} else if (pRow->flag & KV_FLG_MID) {
@ -712,21 +712,21 @@ int32_t tRowIterOpen(SRow *pRow, STSchema *pTSchema, SRowIter **ppIter) {
} else {
switch (pRow->flag & (HAS_VALUE | HAS_NULL | HAS_NONE)) {
case (HAS_NULL | HAS_NONE):
pIter->pb = pRow->data;
pIter->pb = data;
break;
case HAS_VALUE:
pIter->pf = pRow->data;
pIter->pf = data;
pIter->pv = pIter->pf + pTSchema->flen;
break;
case (HAS_VALUE | HAS_NONE):
case (HAS_VALUE | HAS_NULL):
pIter->pb = pRow->data;
pIter->pf = pRow->data + BIT1_SIZE(pTSchema->numOfCols - 1);
pIter->pb = data;
pIter->pf = data + BIT1_SIZE(pTSchema->numOfCols - 1);
pIter->pv = pIter->pf + pTSchema->flen;
break;
case (HAS_VALUE | HAS_NULL | HAS_NONE):
pIter->pb = pRow->data;
pIter->pf = pRow->data + BIT2_SIZE(pTSchema->numOfCols - 1);
pIter->pb = data;
pIter->pf = data + BIT2_SIZE(pTSchema->numOfCols - 1);
pIter->pv = pIter->pf + pTSchema->flen;
break;
default:
@ -940,7 +940,7 @@ static int32_t tRowTupleUpsertColData(SRow *pRow, STSchema *pTSchema, SColData *
uint8_t tflag = pRow->flag & (HAS_VALUE | HAS_NULL | HAS_NONE);
switch (tflag) {
case HAS_VALUE:
pf = pRow->data;
pf = pRow->data; // TODO: fix here
pv = pf + pTSchema->flen;
break;
case (HAS_NULL | HAS_NONE):
@ -1152,88 +1152,33 @@ int32_t tRowUpsertColData(SRow *pRow, STSchema *pTSchema, SColData *aColData, in
}
}
void tRowGetKey(SRow *pRow, SRowKey *key) {
key->ts = pRow->ts;
if ((pRow->flag & HAS_MULTI_KEY) == 0) {
key->numOfPKs = 0;
void tRowGetKey(SRow *row, SRowKey *key) {
key->ts = row->ts;
key->numOfPKs = row->numOfPKs;
if (key->numOfPKs == 0) {
return;
}
ASSERT(row->numOfPKs <= TD_MAX_PK_COLS);
SPrimaryKeyIndex indices[TD_MAX_PK_COLS];
uint8_t *data = row->data;
for (int32_t i = 0; i < row->numOfPKs; i++) {
data += tGetPrimaryKeyIndex(data, &indices[i]);
}
// primary keys
for (int32_t i = 0; i < row->numOfPKs; i++) {
key->pks[i].type = indices[i].type;
if (IS_VAR_DATA_TYPE(indices[i].type)) {
key->pks[i].pData = data + indices[i].offset;
key->pks[i].pData += tGetU32v(key->pks[i].pData, &key->pks[i].nData);
} else {
uint8_t *pEnd = ((uint8_t *)pRow) + pRow->len;
pEnd -= tGetU8(pEnd, &key->numOfPKs);
if (pRow->flag >> 4) { // Key-Value format
SKVIdx *pKVIdx = (SKVIdx *)pRow->data;
uint8_t *pv;
if (pRow->flag & KV_FLG_LIT) {
pv = pKVIdx->idx + pKVIdx->nCol;
} else if (pRow->flag & KV_FLG_MID) {
pv = pKVIdx->idx + (pKVIdx->nCol << 1);
} else {
pv = pKVIdx->idx + (pKVIdx->nCol << 2);
}
for (uint8_t iKey = 0; iKey < key->numOfPKs; iKey++) {
int32_t index;
uint8_t *pData;
SValue *pValue = &key->pks[iKey];
pEnd -= tGetI8(pEnd, &pValue->type);
pEnd -= tGetI32v(pEnd, &index);
if (pRow->flag & KV_FLG_LIT) {
pData = pv + ((uint8_t *)pKVIdx->idx)[index];
} else if (pRow->flag & KV_FLG_MID) {
pData = pv + ((uint16_t *)pKVIdx->idx)[index];
} else {
pData = pv + ((uint32_t *)pKVIdx->idx)[index];
}
pData += tGetI16v(pData, NULL);
if (IS_VAR_DATA_TYPE(pValue->type)) {
pData += tGetU32v(pData, &pValue->nData);
pValue->pData = pData;
} else {
memcpy(&pValue->val, pData, TYPE_BYTES[pValue->type]);
}
}
} else { // Tuple format
uint8_t *pf;
uint8_t *pv;
uint32_t fixedLen;
uint32_t numOfCols;
pEnd -= tGetU32v(pEnd, &numOfCols);
pEnd -= tGetU32v(pEnd, &fixedLen);
switch (pRow->flag & (HAS_VALUE | HAS_NULL | HAS_NONE)) {
case (HAS_VALUE | HAS_NONE):
case (HAS_VALUE | HAS_NULL):
pf = pRow->data + BIT1_SIZE(numOfCols - 1);
pv = pf + fixedLen;
break;
case (HAS_VALUE | HAS_NULL | HAS_NONE):
pf = pRow->data + BIT2_SIZE(numOfCols - 1);
pv = pf + fixedLen;
break;
default:
ASSERTS(0, "invalid row format");
}
for (uint8_t iKey = 0; iKey < key->numOfPKs; iKey++) {
int32_t offset;
SValue *pValue = &key->pks[iKey];
pEnd -= tGetI8(pEnd, &pValue->type);
pEnd -= tGetI32v(pEnd, &offset);
if (IS_VAR_DATA_TYPE(key->pks[iKey].type)) {
pValue->pData = pv + *(int32_t *)(pf + offset);
pValue->pData += tGetU32v(pValue->pData, &pValue->nData);
} else {
memcpy(&key->pks[iKey].val, pf + offset, TYPE_BYTES[key->pks[iKey].type]);
}
}
memcpy(&key->pks[i].val, data + indices[i].offset, tDataTypes[indices[i].type].bytes);
}
}
}