From 4348781d3c11399ea339465834a9249efbb604fd Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 29 Feb 2024 14:50:24 +0800 Subject: [PATCH] refact more code --- include/common/tdataformat.h | 18 +- source/common/src/tdataformat.c | 875 +++++++++++++++----------------- 2 files changed, 420 insertions(+), 473 deletions(-) diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h index b288272b0d..c54ece771d 100644 --- a/include/common/tdataformat.h +++ b/include/common/tdataformat.h @@ -1,4 +1,5 @@ /* + * Copyright (c) 2019 TAOS Data, Inc. * * This program is free software: you can use, redistribute, and/or modify @@ -47,8 +48,6 @@ typedef struct SValueColumn SValueColumn; #define HAS_NULL ((uint8_t)0x2) #define HAS_VALUE ((uint8_t)0x4) -#define HAS_MULTI_KEY ((uint8_t)0x8) - // bitmap ================================ const static uint8_t BIT1_MAP[8] = {0b11111110, 0b11111101, 0b11111011, 0b11110111, 0b11101111, 0b11011111, 0b10111111, 0b01111111}; @@ -208,24 +207,27 @@ struct STSchema { STColumn columns[]; }; -/* TODO: here may change +/* * 1. Tuple format: - * SRow + [bit map +] fix-length data + [var-length data +] [(type + offset) * numPrimaryKeyCols + fixedLen + - * numOfCols + numPrimaryKeyCols] + * SRow + [(type, offset) * numOfPKs +] [bit map +] fix-length data + [var-length data] * * 2. K-V format: - * SRow + numColsNotNone + u8/u16/u32 * numColsNotNone + ([-]cid [+ data]) * numColsNotNone + [(type + index) * - * numPrimaryKeyCols + numPrimaryKeyCols】 + * SRow + [(type, offset) * numOfPKs +] offset array + ([-]cid [+ data]) * numColsNotNone */ struct SRow { uint8_t flag; - uint8_t rsv; + uint8_t numOfPKs; uint16_t sver; uint32_t len; TSKEY ts; uint8_t data[]; }; +typedef struct { + int8_t type; + uint32_t offset; +} SPrimaryKeyIndex; + struct SValue { int8_t type; union { diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index 099067d234..005cb89d57 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -42,350 +42,365 @@ typedef struct { } SKVIdx; #pragma pack(pop) -#define ROW_SET_BITMAP(PB, FLAG, IDX, VAL) \ - do { \ - if (PB) { \ - switch (FLAG) { \ - case (HAS_NULL | HAS_NONE): \ - SET_BIT1(PB, IDX, VAL); \ - break; \ - case (HAS_VALUE | HAS_NONE): \ - SET_BIT1(PB, IDX, (VAL) ? (VAL)-1 : 0); \ - break; \ - case (HAS_VALUE | HAS_NULL): \ - SET_BIT1(PB, IDX, (VAL)-1); \ - break; \ - case (HAS_VALUE | HAS_NULL | HAS_NONE): \ - SET_BIT2(PB, IDX, VAL); \ - break; \ - default: \ - ASSERT(0); \ - break; \ - } \ - } \ +#define ROW_SET_BITMAP(PB, FLAG, IDX, VAL) \ + do { \ + switch (FLAG) { \ + case (HAS_NULL | HAS_NONE): \ + SET_BIT1(PB, IDX, VAL); \ + break; \ + case (HAS_VALUE | HAS_NONE): \ + SET_BIT1(PB, IDX, (VAL) ? (VAL)-1 : 0); \ + break; \ + case (HAS_VALUE | HAS_NULL): \ + SET_BIT1(PB, IDX, (VAL)-1); \ + break; \ + case (HAS_VALUE | HAS_NULL | HAS_NONE): \ + SET_BIT2(PB, IDX, VAL); \ + break; \ + default: \ + break; \ + } \ } while (0) -int32_t tRowBuild(SArray *aColVal, const STSchema *pTSchema, SRow **ppRow) { - int32_t code = 0; +static int32_t tPutPrimaryKeyIndex(uint8_t *p, const SPrimaryKeyIndex *index) { + int32_t n = 0; + n += tPutI8(p ? p + n : p, index->type); + n += tPutU32v(p ? p + n : p, index->offset); + return n; +} - ASSERT(TARRAY_SIZE(aColVal) > 0); - ASSERT(((SColVal *)aColVal->pData)[0].cid == PRIMARYKEY_TIMESTAMP_COL_ID); - ASSERT(((SColVal *)aColVal->pData)[0].value.type == TSDB_DATA_TYPE_TIMESTAMP); +static int32_t tGetPrimaryKeyIndex(uint8_t *p, SPrimaryKeyIndex *index) { + int32_t n = 0; + n += tGetI8(p + n, &index->type); + n += tGetU32v(p + n, &index->offset); + return n; +} - // scan --------------- - SRow *pRow = NULL; - SColVal *colVals = (SColVal *)TARRAY_DATA(aColVal); - uint8_t flag = 0; - int32_t iColVal = 1; - const int32_t nColVal = TARRAY_SIZE(aColVal); - SColVal *pColVal = (iColVal < nColVal) ? &colVals[iColVal] : NULL; - int32_t iTColumn = 1; - const STColumn *pTColumn = pTSchema->columns + iTColumn; - int32_t ntp = 0; - int32_t nkv = 0; - int32_t maxIdx = 0; - int32_t nIdx = 0; - uint8_t numPrimaryKeyCols = 0; - while (pTColumn) { - if (pColVal) { - if (pColVal->cid == pTColumn->colId) { - if (COL_VAL_IS_VALUE(pColVal)) { // VALUE - flag |= HAS_VALUE; - maxIdx = nkv; - if (IS_VAR_DATA_TYPE(pTColumn->type)) { - ntp = ntp + tPutU32v(NULL, pColVal->value.nData) + pColVal->value.nData; - nkv = nkv + tPutI16v(NULL, pTColumn->colId) + tPutU32v(NULL, pColVal->value.nData) + pColVal->value.nData; +typedef struct { + int32_t numOfNone; + int32_t numOfNull; + int32_t numOfValue; + int32_t numOfPKs; + int8_t flag; + + // tuple + int8_t tupleFlag; + SPrimaryKeyIndex tupleIndices[TD_MAX_PK_COLS]; + int32_t tuplePKSize; // primary key size + int32_t tupleBitmapSize; // bitmap size + int32_t tupleFixedSize; // fixed part size + int32_t tupleVarSize; // var part size + int32_t tupleRowSize; + + // key-value + int8_t kvFlag; + SPrimaryKeyIndex kvIndices[TD_MAX_PK_COLS]; + int32_t kvMaxOffset; + int32_t kvPKSize; // primary key size + int32_t kvIndexSize; // offset array size + int32_t kvPayloadSize; // payload size + int32_t kvRowSize; +} SRowBuildScanInfo; + +static FORCE_INLINE void tRowBuildScanAddNone(SRowBuildScanInfo *sinfo) { sinfo->numOfNone++; } + +static FORCE_INLINE void tRowBuildScanAddNull(SRowBuildScanInfo *sinfo, const STColumn *pTColumn) { + sinfo->numOfNull++; + sinfo->kvMaxOffset = sinfo->kvPayloadSize; + sinfo->kvPayloadSize += tPutI16v(NULL, -pTColumn->colId); +} + +static FORCE_INLINE void tRowBuildScanAddValue(SRowBuildScanInfo *sinfo, SColVal *colVal, const STColumn *pTColumn) { + bool isPK = ((pTColumn->flags & COL_IS_KEY) != 0); + + if (isPK) { + ASSERTS(sinfo->numOfPKs < TD_MAX_PK_COLS, "too many primary keys"); + sinfo->tupleIndices[sinfo->numOfPKs].type = colVal->value.type; + sinfo->tupleIndices[sinfo->numOfPKs].offset = + IS_VAR_DATA_TYPE(pTColumn->type) ? sinfo->tupleVarSize + sinfo->tupleFixedSize : pTColumn->offset; + sinfo->kvIndices[sinfo->numOfPKs].type = colVal->value.type; + sinfo->kvIndices[sinfo->numOfPKs].offset = sinfo->kvPayloadSize; + sinfo->numOfPKs++; + } + + sinfo->kvMaxOffset = sinfo->kvPayloadSize; + if (IS_VAR_DATA_TYPE(colVal->value.type)) { + sinfo->tupleVarSize += tPutU32v(NULL, colVal->value.nData) // size + + colVal->value.nData; // value + + sinfo->kvPayloadSize += tPutI16v(NULL, colVal->cid) // colId + + tPutU32v(NULL, colVal->value.nData) // size + + colVal->value.nData; // value + } else { + sinfo->kvPayloadSize += tPutI16v(NULL, colVal->cid) // colId + + tDataTypes[colVal->value.type].bytes; // value + } + sinfo->numOfValue++; +} + +static int32_t tRowBuildScan(SArray *colVals, const STSchema *schema, SRowBuildScanInfo *sinfo) { + int32_t colValIndex = 1; + int32_t numOfColVals = TARRAY_SIZE(colVals); + SColVal *colValArray = (SColVal *)TARRAY_DATA(colVals); + + ASSERT(numOfColVals > 0); + ASSERT(colValArray[0].cid == PRIMARYKEY_TIMESTAMP_COL_ID); + ASSERT(colValArray[0].value.type == TSDB_DATA_TYPE_TIMESTAMP); + + *sinfo = (SRowBuildScanInfo){ + .tupleFixedSize = schema->flen, + }; + + // loop scan + for (int32_t i = 1; i < schema->numOfCols; i++) { + bool isPK = ((schema->columns[i].flags & COL_IS_KEY) != 0); + + for (;;) { + if (colValIndex >= numOfColVals) { + ASSERTS(!isPK, "Primary key should not be NONE or NULL"); + tRowBuildScanAddNone(sinfo); + break; + } + + if (colValArray[colValIndex].cid == schema->columns[i].colId) { + ASSERT(colValArray[colValIndex].value.type == schema->columns[i].type); + + if (COL_VAL_IS_VALUE(&colValArray[colValIndex])) { + tRowBuildScanAddValue(sinfo, &colValArray[colValIndex], schema->columns + i); + } else if (COL_VAL_IS_NULL(&colValArray[colValIndex])) { + ASSERTS(!isPK, "Primary key should not be NULL or NONE"); + tRowBuildScanAddNull(sinfo, schema->columns + i); + } else if (COL_VAL_IS_NONE(&colValArray[colValIndex])) { + ASSERTS(!isPK, "Primary key should not be NULL or NONE"); + tRowBuildScanAddNone(sinfo); + } + + colValIndex++; + break; + } else if (colValArray[colValIndex].cid > schema->columns[i].colId) { + ASSERTS(!isPK, "Primary key should not be NONE or NULL"); + tRowBuildScanAddNone(sinfo); + break; + } else { // skip useless value + colValIndex++; + } + } + } + + if (sinfo->numOfNone) { + sinfo->flag |= HAS_NONE; + } + if (sinfo->numOfNull) { + sinfo->flag |= HAS_NULL; + } + if (sinfo->numOfValue) { + sinfo->flag |= HAS_VALUE; + } + + // Tuple + sinfo->tupleFlag = sinfo->flag; + if (sinfo->flag == HAS_NONE || sinfo->flag == HAS_NULL || sinfo->flag == HAS_VALUE) { + sinfo->tupleBitmapSize = 0; + } else if (sinfo->flag == (HAS_NONE | HAS_NULL | HAS_VALUE)) { + sinfo->tupleBitmapSize = BIT2_SIZE(schema->numOfCols - 1); + } else { + sinfo->tupleBitmapSize = BIT1_SIZE(schema->numOfCols - 1); + } + for (int32_t i = 0; i < sinfo->numOfPKs; i++) { + sinfo->tupleIndices[i].offset += sinfo->tupleBitmapSize; + sinfo->tuplePKSize += tPutPrimaryKeyIndex(NULL, sinfo->tupleIndices + i); + } + sinfo->tupleRowSize = sizeof(SRow) // SRow + + sinfo->tuplePKSize // primary keys + + sinfo->tupleBitmapSize // bitmap + + sinfo->tupleFixedSize // fixed part + + sinfo->tupleVarSize; // var part + + // Key-Value + if (sinfo->kvMaxOffset <= UINT8_MAX) { + sinfo->kvFlag = (KV_FLG_LIT | sinfo->flag); + sinfo->kvIndexSize = sizeof(SKVIdx) + (sinfo->numOfNull + sinfo->numOfValue) * sizeof(uint8_t); + } else if (sinfo->kvMaxOffset <= UINT16_MAX) { + sinfo->kvFlag = (KV_FLG_MID | sinfo->flag); + sinfo->kvIndexSize = sizeof(SKVIdx) + (sinfo->numOfNull + sinfo->numOfValue) * sizeof(uint16_t); + } else { + sinfo->kvFlag = (KV_FLG_BIG | sinfo->flag); + sinfo->kvIndexSize = sizeof(SKVIdx) + (sinfo->numOfNull + sinfo->numOfValue) * sizeof(uint32_t); + } + for (int32_t i = 0; i < sinfo->numOfPKs; i++) { + sinfo->tupleIndices[i].offset += sinfo->kvIndexSize; + sinfo->kvPKSize += tPutPrimaryKeyIndex(NULL, sinfo->kvIndices + i); + } + sinfo->kvRowSize = sizeof(SRow) // SRow + + sinfo->kvPKSize // primary keys + + sinfo->kvIndexSize // index array + + sinfo->kvPayloadSize; // payload + + return 0; +} + +static int32_t tRowBuildTupleRow(SArray *aColVal, const SRowBuildScanInfo *sinfo, const STSchema *schema, + SRow **ppRow) { + SColVal *colValArray = (SColVal *)TARRAY_DATA(aColVal); + + *ppRow = (SRow *)taosMemoryCalloc(1, sinfo->tupleRowSize); + if (*ppRow == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + (*ppRow)->flag = sinfo->tupleFlag; + (*ppRow)->numOfPKs = sinfo->numOfPKs; + (*ppRow)->sver = schema->version; + (*ppRow)->len = sinfo->tupleRowSize; + (*ppRow)->ts = colValArray[0].value.val; + + if (sinfo->tupleFlag == HAS_NONE || sinfo->tupleFlag == HAS_NULL) { + return 0; + } + + uint8_t *primaryKeys = (*ppRow)->data; + uint8_t *bitmap = primaryKeys + sinfo->tuplePKSize; + uint8_t *fixed = bitmap + sinfo->tupleBitmapSize; + uint8_t *varlen = fixed + schema->flen; + + // primary keys + for (int32_t i = 0; i < sinfo->numOfPKs; i++) { + primaryKeys += tPutPrimaryKeyIndex(primaryKeys, sinfo->tupleIndices + i); + } + ASSERT(primaryKeys == bitmap); + + // bitmap + fixed + varlen + int32_t numOfColVals = TARRAY_SIZE(aColVal); + int32_t colValIndex = 1; + for (int32_t i = 1; i < schema->numOfCols; i++) { + for (;;) { + if (colValIndex >= numOfColVals) { // NONE + ROW_SET_BITMAP(bitmap, sinfo->tupleFlag, i - 1, BIT_FLG_NONE); + break; + } + + if (colValArray[colValIndex].cid == schema->columns[i].colId) { + if (COL_VAL_IS_VALUE(&colValArray[colValIndex])) { // value + ROW_SET_BITMAP(bitmap, sinfo->tupleFlag, i - 1, BIT_FLG_VALUE); + + if (IS_VAR_DATA_TYPE(schema->columns[i].type)) { + *(int32_t *)(fixed + schema->columns[i].offset) = varlen - fixed - sinfo->tupleFixedSize; + varlen += tPutU32v(varlen, colValArray[colValIndex].value.nData); + if (colValArray[colValIndex].value.nData) { + memcpy(varlen, colValArray[colValIndex].value.pData, colValArray[colValIndex].value.nData); + varlen += colValArray[colValIndex].value.nData; + } } else { - nkv = nkv + tPutI16v(NULL, pTColumn->colId) + pTColumn->bytes; - } - - if (pTColumn->flags & COL_IS_KEY) { - flag |= HAS_MULTI_KEY; - numPrimaryKeyCols++; - ntp += (tPutI8(NULL, pTColumn->type) // type - + tPutI32v(NULL, pTColumn->offset) // offset - ); - nkv += (tPutI8(NULL, pTColumn->type) // type - + tPutI32v(NULL, nIdx) // index - ); - } - - nIdx++; - } else if (COL_VAL_IS_NONE(pColVal)) { // NONE - ASSERT((pTColumn->flags & COL_IS_KEY) == 0); - flag |= HAS_NONE; - } else if (COL_VAL_IS_NULL(pColVal)) { // NULL - ASSERT((pTColumn->flags & COL_IS_KEY) == 0); - flag |= HAS_NULL; - maxIdx = nkv; - nkv += tPutI16v(NULL, -pTColumn->colId); - nIdx++; - } else { - if (ASSERTS(0, "invalid input")) { - code = TSDB_CODE_INVALID_PARA; - goto _exit; + memcpy(fixed + schema->columns[i].offset, &colValArray[colValIndex].value.val, + tDataTypes[schema->columns[i].type].bytes); } + } else if (COL_VAL_IS_NULL(&colValArray[colValIndex])) { // NULL + ROW_SET_BITMAP(bitmap, sinfo->tupleFlag, i - 1, BIT_FLG_NULL); + } else if (COL_VAL_IS_NONE(&colValArray[colValIndex])) { // NONE + ROW_SET_BITMAP(bitmap, sinfo->tupleFlag, i - 1, BIT_FLG_NONE); } - pTColumn = (++iTColumn < pTSchema->numOfCols) ? pTSchema->columns + iTColumn : NULL; - pColVal = (++iColVal < nColVal) ? &colVals[iColVal] : NULL; - } else if (pColVal->cid > pTColumn->colId) { // NONE - ASSERT((pTColumn->flags & COL_IS_KEY) == 0); - flag |= HAS_NONE; - pTColumn = (++iTColumn < pTSchema->numOfCols) ? pTSchema->columns + iTColumn : NULL; + colValIndex++; + break; + } else if (colValArray[colValIndex].cid > schema->columns[i].colId) { // NONE + ROW_SET_BITMAP(bitmap, sinfo->tupleFlag, i - 1, BIT_FLG_NONE); + break; } else { - pColVal = (++iColVal < nColVal) ? &colVals[iColVal] : NULL; + colValIndex++; } - } else { // NONE - ASSERT((pTColumn->flags & COL_IS_KEY) == 0); - flag |= HAS_NONE; - pTColumn = (++iTColumn < pTSchema->numOfCols) ? pTSchema->columns + iTColumn : NULL; } } - // compare --------------- - // Tuple Row Format - switch (flag & (HAS_VALUE | HAS_NULL | HAS_NONE)) { - case HAS_NONE: - case HAS_NULL: - ntp = sizeof(SRow); - break; - case HAS_VALUE: - ntp = sizeof(SRow) + pTSchema->flen + ntp; - break; - case (HAS_NULL | HAS_NONE): - ntp = sizeof(SRow) + BIT1_SIZE(pTSchema->numOfCols - 1); - break; - case (HAS_VALUE | HAS_NONE): - case (HAS_VALUE | HAS_NULL): - ntp = sizeof(SRow) + BIT1_SIZE(pTSchema->numOfCols - 1) + pTSchema->flen + ntp; - break; - case (HAS_VALUE | HAS_NULL | HAS_NONE): - ntp = sizeof(SRow) + BIT2_SIZE(pTSchema->numOfCols - 1) + pTSchema->flen + ntp; - break; - default: - if (ASSERTS(0, "impossible")) { - code = TSDB_CODE_INVALID_PARA; - goto _exit; + return 0; +} + +static FORCE_INLINE void tRowBuildKVRowSetIndex(uint8_t flag, SKVIdx *indices, uint32_t offset) { + if (flag & KV_FLG_LIT) { + ((uint8_t *)indices->idx)[indices->nCol] = (uint8_t)offset; + } else if (flag & KV_FLG_MID) { + ((uint16_t *)indices->idx)[indices->nCol] = (uint16_t)offset; + } else { + ((uint32_t *)indices->idx)[indices->nCol] = (uint32_t)offset; + } + indices->nCol++; +} + +static int32_t tRowBuildKVRow(SArray *aColVal, const SRowBuildScanInfo *sinfo, const STSchema *schema, SRow **ppRow) { + SColVal *colValArray = (SColVal *)TARRAY_DATA(aColVal); + + *ppRow = (SRow *)taosMemoryCalloc(1, sinfo->kvRowSize); + if (*ppRow == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + (*ppRow)->flag = sinfo->kvFlag; + (*ppRow)->numOfPKs = sinfo->numOfPKs; + (*ppRow)->sver = schema->version; + (*ppRow)->len = sinfo->kvRowSize; + (*ppRow)->ts = colValArray[0].value.val; + + ASSERT(sinfo->flag != HAS_NONE && sinfo->flag != HAS_NULL && sinfo->flag != HAS_VALUE); + + uint8_t *primaryKeys = (*ppRow)->data; + SKVIdx *indices = (SKVIdx *)(primaryKeys + sinfo->kvPKSize); + uint8_t *payload = primaryKeys + sinfo->kvPKSize + sinfo->kvIndexSize; + uint32_t payloadSize = 0; + + // primary keys + for (int32_t i = 0; i < sinfo->numOfPKs; i++) { + primaryKeys += tPutPrimaryKeyIndex(primaryKeys, sinfo->kvIndices + i); + } + ASSERT(primaryKeys == (uint8_t *)indices); + + int32_t numOfColVals = TARRAY_SIZE(aColVal); + int32_t colValIndex = 1; + for (int32_t i = 1; i < schema->numOfCols; i++) { + for (;;) { + if (colValIndex >= numOfColVals) { // NONE + break; } - } - if (flag & HAS_MULTI_KEY) { - ntp += (tPutI8(NULL, numPrimaryKeyCols) // numPrimaryKeyCols - + tPutU32v(NULL, pTSchema->numOfCols) // numOfCols - + tPutU32v(NULL, pTSchema->flen) // fixedLen - ); - } - // Key-Value Row Format - if (maxIdx <= UINT8_MAX) { - nkv = sizeof(SRow) + sizeof(SKVIdx) + nIdx + nkv; - flag |= KV_FLG_LIT; - } else if (maxIdx <= UINT16_MAX) { - nkv = sizeof(SRow) + sizeof(SKVIdx) + (nIdx << 1) + nkv; - flag |= KV_FLG_MID; - } else { - nkv = sizeof(SRow) + sizeof(SKVIdx) + (nIdx << 2) + nkv; - flag |= KV_FLG_BIG; - } - if (flag & HAS_MULTI_KEY) { - nkv += tPutI8(NULL, numPrimaryKeyCols); - } - int32_t rowLength; - if (nkv < ntp) { - rowLength = nkv; - } else { - rowLength = ntp; - flag &= ((uint8_t)0x0f); - } - // alloc -------------- - pRow = taosMemoryMalloc(rowLength); - if (NULL == pRow) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - - // build -------------- - pColVal = &colVals[0]; - - pRow->flag = flag; - pRow->rsv = 0; - pRow->sver = pTSchema->version; - pRow->len = rowLength; - memcpy(&pRow->ts, &pColVal->value.val, sizeof(TSKEY)); - - if (flag == HAS_NONE || flag == HAS_NULL) { - goto _exit; - } - - iColVal = 1; - pColVal = (iColVal < nColVal) ? &colVals[iColVal] : NULL; - iTColumn = 1; - pTColumn = pTSchema->columns + iTColumn; - uint8_t *pEnd = ((uint8_t *)pRow) + rowLength; - - if (pRow->flag & HAS_MULTI_KEY) { - pEnd -= tPutI8(pEnd, numPrimaryKeyCols); // numPrimaryKeyCols - pEnd -= tPutU32v(pEnd, pTSchema->numOfCols); // numOfCols - pEnd -= tPutU32v(NULL, pTSchema->flen); // fixedLen - } - if (flag >> 4) { // KV - SKVIdx *pIdx = (SKVIdx *)pRow->data; - int32_t iIdx = 0; - int32_t nv = 0; - uint8_t *pv = NULL; - if (flag & KV_FLG_LIT) { - pv = pIdx->idx + nIdx; - } else if (flag & KV_FLG_MID) { - pv = pIdx->idx + (nIdx << 1); - } else { - pv = pIdx->idx + (nIdx << 2); - } - pIdx->nCol = nIdx; - - while (pTColumn) { - if (pColVal) { - if (pColVal->cid == pTColumn->colId) { - if (COL_VAL_IS_VALUE(pColVal)) { - if (flag & KV_FLG_LIT) { - ((uint8_t *)pIdx->idx)[iIdx] = (uint8_t)nv; - } else if (flag & KV_FLG_MID) { - ((uint16_t *)pIdx->idx)[iIdx] = (uint16_t)nv; - } else { - ((uint32_t *)pIdx->idx)[iIdx] = (uint32_t)nv; - } - - nv += tPutI16v(pv + nv, pTColumn->colId); - if (IS_VAR_DATA_TYPE(pTColumn->type)) { - nv += tPutU32v(pv + nv, pColVal->value.nData); - memcpy(pv + nv, pColVal->value.pData, pColVal->value.nData); - nv += pColVal->value.nData; - } else { - memcpy(pv + nv, &pColVal->value.val, pTColumn->bytes); - nv += pTColumn->bytes; - } - - if (pRow->flag & HAS_MULTI_KEY) { - pEnd -= tPutI8(pEnd, pTColumn->type); - pEnd -= tPutI32v(pEnd, iIdx); - } - - iIdx++; - } else if (COL_VAL_IS_NULL(pColVal)) { - if (flag & KV_FLG_LIT) { - ((uint8_t *)pIdx->idx)[iIdx] = (uint8_t)nv; - } else if (flag & KV_FLG_MID) { - ((uint16_t *)pIdx->idx)[iIdx] = (uint16_t)nv; - } else { - ((uint32_t *)pIdx->idx)[iIdx] = (uint32_t)nv; - } - nv += tPutI16v(pv + nv, -pTColumn->colId); - iIdx++; + if (colValArray[colValIndex].cid == schema->columns[i].colId) { + if (COL_VAL_IS_VALUE(&colValArray[colValIndex])) { // value + tRowBuildKVRowSetIndex(sinfo->kvFlag, indices, payloadSize); + if (IS_VAR_DATA_TYPE(schema->columns[i].type)) { + payloadSize += tPutI16v(payload + payloadSize, colValArray[colValIndex].cid); + payloadSize += tPutU32v(payload + payloadSize, colValArray[colValIndex].value.nData); + memcpy(payload + payloadSize, colValArray[colValIndex].value.pData, colValArray[colValIndex].value.nData); + payloadSize += colValArray[colValIndex].value.nData; + } else { + payloadSize += tPutI16v(payload + payloadSize, colValArray[colValIndex].cid); + memcpy(payload + payloadSize, &colValArray[colValIndex].value.val, + tDataTypes[schema->columns[i].type].bytes); + payloadSize += tDataTypes[schema->columns[i].type].bytes; } - - pTColumn = (++iTColumn < pTSchema->numOfCols) ? pTSchema->columns + iTColumn : NULL; - pColVal = (++iColVal < nColVal) ? &colVals[iColVal] : NULL; - } else if (pColVal->cid > pTColumn->colId) { // NONE - pTColumn = (++iTColumn < pTSchema->numOfCols) ? pTSchema->columns + iTColumn : NULL; - } else { - pColVal = (++iColVal < nColVal) ? &colVals[iColVal] : NULL; + } else if (COL_VAL_IS_NULL(&colValArray[colValIndex])) { // NULL + tRowBuildKVRowSetIndex(sinfo->kvFlag, indices, payloadSize); + payloadSize += tPutI16v(payload + payloadSize, -schema->columns[i].colId); } - } else { // NONE - pTColumn = (++iTColumn < pTSchema->numOfCols) ? pTSchema->columns + iTColumn : NULL; - } - } - } else { // TUPLE - uint8_t *pb = NULL; - uint8_t *pf = NULL; - uint8_t *pv = NULL; - int32_t nv = 0; - switch (flag & (HAS_VALUE | HAS_NULL | HAS_NONE)) { - case (HAS_NULL | HAS_NONE): - pb = pRow->data; + colValIndex++; break; - case HAS_VALUE: - pf = pRow->data; - pv = pf + pTSchema->flen; + } else if (colValArray[colValIndex].cid > schema->columns[i].colId) { // NONE break; - case (HAS_VALUE | HAS_NONE): - case (HAS_VALUE | HAS_NULL): - pb = pRow->data; - pf = pb + BIT1_SIZE(pTSchema->numOfCols - 1); - pv = pf + pTSchema->flen; - break; - case (HAS_VALUE | HAS_NULL | HAS_NONE): - pb = pRow->data; - pf = pb + BIT2_SIZE(pTSchema->numOfCols - 1); - pv = pf + pTSchema->flen; - break; - default: - if (ASSERTS(0, "impossible")) { - code = TSDB_CODE_INVALID_PARA; - goto _exit; - } - } - - if (pb) { - if (flag == (HAS_VALUE | HAS_NULL | HAS_NONE)) { - memset(pb, 0, BIT2_SIZE(pTSchema->numOfCols - 1)); } else { - memset(pb, 0, BIT1_SIZE(pTSchema->numOfCols - 1)); - } - } - - // build impl - while (pTColumn) { - if (pColVal) { - if (pColVal->cid == pTColumn->colId) { - if (COL_VAL_IS_VALUE(pColVal)) { // VALUE - ROW_SET_BITMAP(pb, flag, iTColumn - 1, BIT_FLG_VALUE); - - if (IS_VAR_DATA_TYPE(pTColumn->type)) { - *(int32_t *)(pf + pTColumn->offset) = nv; - nv += tPutU32v(pv + nv, pColVal->value.nData); - if (pColVal->value.nData) { - memcpy(pv + nv, pColVal->value.pData, pColVal->value.nData); - nv += pColVal->value.nData; - } - } else { - memcpy(pf + pTColumn->offset, &pColVal->value.val, TYPE_BYTES[pTColumn->type]); - } - - if (pTColumn->flags & COL_IS_KEY) { - pEnd -= tPutI8(pEnd, pTColumn->type); - pEnd -= tPutI32v(pEnd, pTColumn->offset); - } - } else if (COL_VAL_IS_NONE(pColVal)) { // NONE - ROW_SET_BITMAP(pb, flag, iTColumn - 1, BIT_FLG_NONE); - if (pf) memset(pf + pTColumn->offset, 0, TYPE_BYTES[pTColumn->type]); - } else { // NULL - ROW_SET_BITMAP(pb, flag, iTColumn - 1, BIT_FLG_NULL); - if (pf) memset(pf + pTColumn->offset, 0, TYPE_BYTES[pTColumn->type]); - } - - pTColumn = (++iTColumn < pTSchema->numOfCols) ? pTSchema->columns + iTColumn : NULL; - pColVal = (++iColVal < nColVal) ? &colVals[iColVal] : NULL; - } else if (pColVal->cid > pTColumn->colId) { // NONE - ROW_SET_BITMAP(pb, flag, iTColumn - 1, BIT_FLG_NONE); - if (pf) memset(pf + pTColumn->offset, 0, TYPE_BYTES[pTColumn->type]); - pTColumn = (++iTColumn < pTSchema->numOfCols) ? pTSchema->columns + iTColumn : NULL; - } else { - pColVal = (++iColVal < nColVal) ? &colVals[iColVal] : NULL; - } - } else { // NONE - ROW_SET_BITMAP(pb, flag, iTColumn - 1, BIT_FLG_NONE); - if (pf) memset(pf + pTColumn->offset, 0, TYPE_BYTES[pTColumn->type]); - pTColumn = (++iTColumn < pTSchema->numOfCols) ? pTSchema->columns + iTColumn : NULL; + colValIndex++; } } } -_exit: - if (code) { - *ppRow = NULL; - tRowDestroy(pRow); + return 0; +} + +int32_t tRowBuild(SArray *aColVal, const STSchema *pTSchema, SRow **ppRow) { + int32_t code; + SRowBuildScanInfo sinfo; + + code = tRowBuildScan(aColVal, pTSchema, &sinfo); + if (code) return code; + + if (sinfo.tupleRowSize <= sinfo.kvRowSize) { + code = tRowBuildTupleRow(aColVal, &sinfo, pTSchema, ppRow); } else { - *ppRow = pRow; + code = tRowBuildKVRow(aColVal, &sinfo, pTSchema, ppRow); } return code; } @@ -414,9 +429,16 @@ int32_t tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal) return 0; } + SPrimaryKeyIndex indices[TD_MAX_PK_COLS]; + uint8_t *data = pRow->data; + for (int32_t i = 0; i < pRow->numOfPKs; i++) { + data += tGetPrimaryKeyIndex(data, &indices[i]); + } + if (pRow->flag >> 4) { // KV Row - SKVIdx *pIdx = (SKVIdx *)pRow->data; + SKVIdx *pIdx = (SKVIdx *)data; uint8_t *pv = NULL; + if (pRow->flag & KV_FLG_LIT) { pv = pIdx->idx + pIdx->nCol; } else if (pRow->flag & KV_FLG_MID) { @@ -470,74 +492,45 @@ int32_t tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal) *pColVal = COL_VAL_NONE(pTColumn->colId, pTColumn->type); } else { // Tuple Row - if ((pRow->flag & (HAS_VALUE | HAS_NULL | HAS_NONE)) == HAS_VALUE) { - pColVal->cid = pTColumn->colId; - pColVal->value.type = pTColumn->type; - pColVal->flag = CV_FLAG_VALUE; - if (IS_VAR_DATA_TYPE(pTColumn->type)) { - uint8_t *pData = pRow->data + pTSchema->flen + *(int32_t *)(pRow->data + pTColumn->offset); - pData += tGetU32v(pData, &pColVal->value.nData); - if (pColVal->value.nData) { - pColVal->value.pData = pData; - } else { - pColVal->value.pData = NULL; - } - } else { - memcpy(&pColVal->value.val, pRow->data + pTColumn->offset, TYPE_BYTES[pTColumn->type]); - } + uint8_t *bitmap = data; + uint8_t *fixed; + uint8_t *varlen; + uint8_t bit; + + if (pRow->flag == HAS_VALUE) { + fixed = bitmap; + bit = BIT_FLG_VALUE; + } else if (pRow->flag == (HAS_NONE | HAS_NULL | HAS_VALUE)) { + fixed = BIT2_SIZE(pTSchema->numOfCols - 1) + bitmap; + bit = GET_BIT2(bitmap, iCol - 1); } else { - uint8_t *pf; - uint8_t *pv; - uint8_t bv = BIT_FLG_VALUE; + fixed = BIT1_SIZE(pTSchema->numOfCols - 1) + bitmap; + bit = GET_BIT1(bitmap, iCol - 1); - switch (pRow->flag & (HAS_VALUE | HAS_NULL | HAS_NONE)) { - case (HAS_NULL | HAS_NONE): - bv = GET_BIT1(pRow->data, iCol - 1); - break; - case (HAS_VALUE | HAS_NONE): - bv = GET_BIT1(pRow->data, iCol - 1); - if (bv) bv++; - pf = pRow->data + BIT1_SIZE(pTSchema->numOfCols - 1); - pv = pf + pTSchema->flen; - break; - case (HAS_VALUE | HAS_NULL): - bv = GET_BIT1(pRow->data, iCol - 1); - bv++; - pf = pRow->data + BIT1_SIZE(pTSchema->numOfCols - 1); - pv = pf + pTSchema->flen; - break; - case (HAS_VALUE | HAS_NULL | HAS_NONE): - bv = GET_BIT2(pRow->data, iCol - 1); - pf = pRow->data + BIT2_SIZE(pTSchema->numOfCols - 1); - pv = pf + pTSchema->flen; - break; - default: - ASSERTS(0, "invalid row format"); - return TSDB_CODE_INVALID_DATA_FMT; + if (pRow->flag == (HAS_NONE | HAS_VALUE)) { + if (bit) bit++; + } else if (pRow->flag == (HAS_NULL | HAS_VALUE)) { + bit++; } + } + varlen = fixed + pTSchema->flen; - if (bv == BIT_FLG_NONE) { - *pColVal = COL_VAL_NONE(pTColumn->colId, pTColumn->type); - return 0; - } else if (bv == BIT_FLG_NULL) { - *pColVal = COL_VAL_NULL(pTColumn->colId, pTColumn->type); - return 0; - } + if (bit == BIT_FLG_NONE) { + *pColVal = COL_VAL_NONE(pTColumn->colId, pTColumn->type); + return 0; + } else if (bit == BIT_FLG_NULL) { + *pColVal = COL_VAL_NULL(pTColumn->colId, pTColumn->type); + return 0; + } - pColVal->cid = pTColumn->colId; - pColVal->value.type = pTColumn->type; - pColVal->flag = CV_FLAG_VALUE; - if (IS_VAR_DATA_TYPE(pTColumn->type)) { - uint8_t *pData = pv + *(int32_t *)(pf + pTColumn->offset); - pData += tGetU32v(pData, &pColVal->value.nData); - if (pColVal->value.nData) { - pColVal->value.pData = pData; - } else { - pColVal->value.pData = NULL; - } - } else { - memcpy(&pColVal->value.val, pf + pTColumn->offset, TYPE_BYTES[pTColumn->type]); - } + pColVal->cid = pTColumn->colId; + pColVal->value.type = pTColumn->type; + pColVal->flag = CV_FLAG_VALUE; + if (IS_VAR_DATA_TYPE(pTColumn->type)) { + pColVal->value.pData = varlen + *(int32_t *)(fixed + pTColumn->offset); + pColVal->value.pData += tGetU32v(pColVal->value.pData, &pColVal->value.nData); + } else { + memcpy(&pColVal->value.val, fixed + pTColumn->offset, TYPE_BYTES[pTColumn->type]); } } @@ -699,9 +692,16 @@ int32_t tRowIterOpen(SRow *pRow, STSchema *pTSchema, SRowIter **ppIter) { (pRow->flag & (HAS_VALUE | HAS_NULL | HAS_NONE)) == HAS_NULL) goto _exit; + uint8_t *data = pRow->data; + + SPrimaryKeyIndex index; + for (int32_t i = 0; i < pRow->numOfPKs; i++) { + data += tGetPrimaryKeyIndex(data, &index); + } + if (pRow->flag >> 4) { pIter->iCol = 0; - pIter->pIdx = (SKVIdx *)pRow->data; + pIter->pIdx = (SKVIdx *)data; if (pRow->flag & KV_FLG_LIT) { pIter->pv = pIter->pIdx->idx + pIter->pIdx->nCol; } else if (pRow->flag & KV_FLG_MID) { @@ -712,21 +712,21 @@ int32_t tRowIterOpen(SRow *pRow, STSchema *pTSchema, SRowIter **ppIter) { } else { switch (pRow->flag & (HAS_VALUE | HAS_NULL | HAS_NONE)) { case (HAS_NULL | HAS_NONE): - pIter->pb = pRow->data; + pIter->pb = data; break; case HAS_VALUE: - pIter->pf = pRow->data; + pIter->pf = data; pIter->pv = pIter->pf + pTSchema->flen; break; case (HAS_VALUE | HAS_NONE): case (HAS_VALUE | HAS_NULL): - pIter->pb = pRow->data; - pIter->pf = pRow->data + BIT1_SIZE(pTSchema->numOfCols - 1); + pIter->pb = data; + pIter->pf = data + BIT1_SIZE(pTSchema->numOfCols - 1); pIter->pv = pIter->pf + pTSchema->flen; break; case (HAS_VALUE | HAS_NULL | HAS_NONE): - pIter->pb = pRow->data; - pIter->pf = pRow->data + BIT2_SIZE(pTSchema->numOfCols - 1); + pIter->pb = data; + pIter->pf = data + BIT2_SIZE(pTSchema->numOfCols - 1); pIter->pv = pIter->pf + pTSchema->flen; break; default: @@ -940,7 +940,7 @@ static int32_t tRowTupleUpsertColData(SRow *pRow, STSchema *pTSchema, SColData * uint8_t tflag = pRow->flag & (HAS_VALUE | HAS_NULL | HAS_NONE); switch (tflag) { case HAS_VALUE: - pf = pRow->data; + pf = pRow->data; // TODO: fix here pv = pf + pTSchema->flen; break; case (HAS_NULL | HAS_NONE): @@ -1152,88 +1152,33 @@ int32_t tRowUpsertColData(SRow *pRow, STSchema *pTSchema, SColData *aColData, in } } -void tRowGetKey(SRow *pRow, SRowKey *key) { - key->ts = pRow->ts; - if ((pRow->flag & HAS_MULTI_KEY) == 0) { - key->numOfPKs = 0; - } else { - uint8_t *pEnd = ((uint8_t *)pRow) + pRow->len; +void tRowGetKey(SRow *row, SRowKey *key) { + key->ts = row->ts; + key->numOfPKs = row->numOfPKs; - pEnd -= tGetU8(pEnd, &key->numOfPKs); + if (key->numOfPKs == 0) { + return; + } - if (pRow->flag >> 4) { // Key-Value format - SKVIdx *pKVIdx = (SKVIdx *)pRow->data; - uint8_t *pv; + ASSERT(row->numOfPKs <= TD_MAX_PK_COLS); - if (pRow->flag & KV_FLG_LIT) { - pv = pKVIdx->idx + pKVIdx->nCol; - } else if (pRow->flag & KV_FLG_MID) { - pv = pKVIdx->idx + (pKVIdx->nCol << 1); - } else { - pv = pKVIdx->idx + (pKVIdx->nCol << 2); - } + SPrimaryKeyIndex indices[TD_MAX_PK_COLS]; - for (uint8_t iKey = 0; iKey < key->numOfPKs; iKey++) { - int32_t index; - uint8_t *pData; - SValue *pValue = &key->pks[iKey]; + uint8_t *data = row->data; - pEnd -= tGetI8(pEnd, &pValue->type); - pEnd -= tGetI32v(pEnd, &index); + for (int32_t i = 0; i < row->numOfPKs; i++) { + data += tGetPrimaryKeyIndex(data, &indices[i]); + } - if (pRow->flag & KV_FLG_LIT) { - pData = pv + ((uint8_t *)pKVIdx->idx)[index]; - } else if (pRow->flag & KV_FLG_MID) { - pData = pv + ((uint16_t *)pKVIdx->idx)[index]; - } else { - pData = pv + ((uint32_t *)pKVIdx->idx)[index]; - } + // primary keys + for (int32_t i = 0; i < row->numOfPKs; i++) { + key->pks[i].type = indices[i].type; - pData += tGetI16v(pData, NULL); - if (IS_VAR_DATA_TYPE(pValue->type)) { - pData += tGetU32v(pData, &pValue->nData); - pValue->pData = pData; - } else { - memcpy(&pValue->val, pData, TYPE_BYTES[pValue->type]); - } - } - } else { // Tuple format - uint8_t *pf; - uint8_t *pv; - uint32_t fixedLen; - uint32_t numOfCols; - - pEnd -= tGetU32v(pEnd, &numOfCols); - pEnd -= tGetU32v(pEnd, &fixedLen); - - switch (pRow->flag & (HAS_VALUE | HAS_NULL | HAS_NONE)) { - case (HAS_VALUE | HAS_NONE): - case (HAS_VALUE | HAS_NULL): - pf = pRow->data + BIT1_SIZE(numOfCols - 1); - pv = pf + fixedLen; - break; - case (HAS_VALUE | HAS_NULL | HAS_NONE): - pf = pRow->data + BIT2_SIZE(numOfCols - 1); - pv = pf + fixedLen; - break; - default: - ASSERTS(0, "invalid row format"); - } - - for (uint8_t iKey = 0; iKey < key->numOfPKs; iKey++) { - int32_t offset; - SValue *pValue = &key->pks[iKey]; - - pEnd -= tGetI8(pEnd, &pValue->type); - pEnd -= tGetI32v(pEnd, &offset); - - if (IS_VAR_DATA_TYPE(key->pks[iKey].type)) { - pValue->pData = pv + *(int32_t *)(pf + offset); - pValue->pData += tGetU32v(pValue->pData, &pValue->nData); - } else { - memcpy(&key->pks[iKey].val, pf + offset, TYPE_BYTES[key->pks[iKey].type]); - } - } + if (IS_VAR_DATA_TYPE(indices[i].type)) { + key->pks[i].pData = data + indices[i].offset; + key->pks[i].pData += tGetU32v(key->pks[i].pData, &key->pks[i].nData); + } else { + memcpy(&key->pks[i].val, data + indices[i].offset, tDataTypes[indices[i].type].bytes); } } }