diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h index bf2ddc0920..af035d0d1f 100644 --- a/include/common/tdataformat.h +++ b/include/common/tdataformat.h @@ -45,10 +45,12 @@ void tTSchemaDestroy(STSchema *pTSchema); // STSRow2 int32_t tPutTSRow(uint8_t *p, STSRow2 *pRow); int32_t tGetTSRow(uint8_t *p, STSRow2 *pRow); +int32_t tTSRowDup(const STSRow2 *pRow, STSRow2 **ppRow); +void tTSRowFree(STSRow2 *pRow); int32_t tTSRowGet(const STSRow2 *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal); // STSRowBuilder -int32_t tTSRowBuilderInit(STSRowBuilder *pBuilder, int32_t sver, SSchema *pSchema, int32_t nCols); +int32_t tTSRowBuilderInit(STSRowBuilder *pBuilder, int32_t sver, int32_t nCols, SSchema *pSchema); void tTSRowBuilderClear(STSRowBuilder *pBuilder); void tTSRowBuilderReset(STSRowBuilder *pBuilder); int32_t tTSRowBuilderPut(STSRowBuilder *pBuilder, int32_t cid, uint8_t *pData, uint32_t nData); diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index fbc2c2306d..3f179ac58a 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -32,6 +32,8 @@ typedef struct { #pragma pack(pop) #define TSROW_IS_KV_ROW(r) ((r)->flags & TSROW_KV_ROW) +#define BIT1_SIZE(n) (((n)-1) / 8 + 1) +#define BIT2_SIZE(n) (((n)-1) / 4 + 1) #define SET_BIT1(p, i, v) ((p)[(i) / 8] = (p)[(i) / 8] & (~(((uint8_t)1) << ((i) % 8))) | ((v) << ((i) % 8))) #define SET_BIT2(p, i, v) ((p)[(i) / 4] = (p)[(i) / 4] & (~(((uint8_t)3) << ((i) % 4))) | ((v) << ((i) % 4))) #define GET_BIT1(p, i) (((p)[(i) / 8] >> ((i) % 8)) & ((uint8_t)1)) @@ -80,6 +82,31 @@ int32_t tGetTSRow(uint8_t *p, STSRow2 *pRow) { return n; } +int32_t tTSRowDup(const STSRow2 *pRow, STSRow2 **ppRow) { + (*ppRow) = taosMemoryMalloc(sizeof(*pRow) + pRow->nData); + if (*ppRow == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + (*ppRow)->ts = pRow->ts; + (*ppRow)->flags = pRow->flags; + (*ppRow)->sver = pRow->sver; + (*ppRow)->nData = pRow->nData; + if (pRow->nData) { + (*ppRow)->pData = (uint8_t *)(&(*ppRow)[1]); + memcpy((*ppRow)->pData, pRow->pData, pRow->nData); + } else { + (*ppRow)->pData = NULL; + } + + return 0; +} + +void tTSRowFree(STSRow2 *pRow) { + if (pRow) taosMemoryFree(pRow); +} + static FORCE_INLINE int kvRowCmprFn(const void *p1, const void *p2) { col_id_t cid = *(col_id_t *)p1; SKVIdx *pKVIdx = (SKVIdx *)p2; @@ -229,11 +256,11 @@ void tTSchemaDestroy(STSchema *pTSchema) { } // STSRowBuilder -int32_t tTSRowBuilderInit(STSRowBuilder *pBuilder, int32_t sver, SSchema *pSchema, int32_t nCols) { +int32_t tTSRowBuilderInit(STSRowBuilder *pBuilder, int32_t sver, int32_t nCols, SSchema *pSchema) { if (tTSchemaCreate(sver, pSchema, nCols, &pBuilder->pTSchema) < 0) return -1; - pBuilder->szBitMap1 = (nCols - 2) / 8 + 1; - pBuilder->szBitMap2 = (nCols - 2) / 4 + 1; + pBuilder->szBitMap1 = BIT1_SIZE(nCols - 1); + pBuilder->szBitMap2 = BIT2_SIZE(nCols - 1); pBuilder->szKVBuf = sizeof(STSKVRow) + sizeof(SKVIdx) * (nCols - 1) + pBuilder->pTSchema->flen + pBuilder->pTSchema->vlen; pBuilder->szTPBuf = pBuilder->szBitMap2 + pBuilder->pTSchema->flen + pBuilder->pTSchema->vlen; @@ -286,16 +313,16 @@ int32_t tTSRowBuilderPut(STSRowBuilder *pBuilder, int32_t cid, uint8_t *pData, u int32_t iCol; STSKVRow *pTSKVRow = (STSKVRow *)pBuilder->pKVBuf; - // use interp search (todo) - if (pTColumn->colId > cid) { + // use interp search + if (pTColumn->colId < cid) { // right search for (iCol = pBuilder->iCol + 1; iCol < pBuilder->pTSchema->numOfCols; iCol++) { pTColumn = &pBuilder->pTSchema->columns[iCol]; - if (pTColumn->colId == cid) break; + if (pTColumn->colId >= cid) break; } - } else if (pTColumn->colId < cid) { + } else if (pTColumn->colId > cid) { // left search for (iCol = pBuilder->iCol - 1; iCol >= 0; iCol--) { pTColumn = &pBuilder->pTSchema->columns[iCol]; - if (pTColumn->colId == cid) break; + if (pTColumn->colId <= cid) break; } } @@ -391,7 +418,7 @@ static void setBitMap(uint8_t *p, STSchema *pTSchema, uint8_t flags) { case TSROW_HAS_VAL | TSROW_HAS_NULL | TSROW_HAS_NONE: if (pTColumn->flags & COL_SET_NULL) { SET_BIT2(p, bidx, (uint8_t)1); - } else if (pTColumn->flags & COL_SET_NULL) { + } else if (pTColumn->flags & COL_SET_VAL) { SET_BIT2(p, bidx, (uint8_t)2); } else { SET_BIT2(p, bidx, (uint8_t)0); @@ -412,14 +439,15 @@ int32_t tTSRowBuilderGetRow(STSRowBuilder *pBuilder, const STSRow2 **ppRow) { int32_t nDataTP, nDataKV; uint32_t flags; STSKVRow *pTSKVRow = (STSKVRow *)pBuilder->pKVBuf; + int32_t nCols = pBuilder->pTSchema->numOfCols; // error not set ts if (!COL_IS_SET(pBuilder->pTSchema->columns->flags)) { return -1; } - ASSERT(pTSKVRow->nCols < pBuilder->pTSchema->numOfCols); - if (pTSKVRow->nCols < pBuilder->pTSchema->numOfCols - 1) { + ASSERT(pTSKVRow->nCols < nCols); + if (pTSKVRow->nCols < nCols - 1) { pBuilder->row.flags |= TSROW_HAS_NONE; } @@ -453,13 +481,15 @@ int32_t tTSRowBuilderGetRow(STSRowBuilder *pBuilder, const STSRow2 **ppRow) { if (nDataKV < nDataTP) { // generate KV row + ASSERT(pBuilder->row.flags & 0xf != TSROW_HAS_VAL); + pBuilder->row.flags |= TSROW_KV_ROW; pBuilder->row.nData = nDataKV; pBuilder->row.pData = pBuilder->pKVBuf; qsort(pTSKVRow->idx, pTSKVRow->nCols, sizeof(SKVIdx), tSKVIdxCmprFn); - if (pTSKVRow->nCols < pBuilder->pTSchema->numOfCols - 1) { - memmove(&pTSKVRow->idx[pTSKVRow->nCols], &pTSKVRow->idx[pBuilder->pTSchema->numOfCols - 1], pBuilder->vlenKV); + if (pTSKVRow->nCols < nCols - 1) { + memmove(&pTSKVRow->idx[pTSKVRow->nCols], &pTSKVRow->idx[nCols - 1], pBuilder->vlenKV); } } else { // generate TUPLE row @@ -472,14 +502,13 @@ int32_t tTSRowBuilderGetRow(STSRowBuilder *pBuilder, const STSRow2 **ppRow) { if (flags == TSROW_HAS_VAL) { pBuilder->row.pData = pBuilder->pTPBuf + pBuilder->szBitMap2; } else { - if (flags == TSROW_HAS_VAL) { - p = pBuilder->pTPBuf; + if (flags == TSROW_HAS_VAL | TSROW_HAS_NULL | TSROW_HAS_NONE) { + pBuilder->row.pData = pBuilder->pTPBuf; } else { - p = pBuilder->pTPBuf + pBuilder->szBitMap2 - pBuilder->szBitMap1; + pBuilder->row.pData = pBuilder->pTPBuf + pBuilder->szBitMap2 - pBuilder->szBitMap1; } - setBitMap(p, pBuilder->pTSchema, flags); - pBuilder->row.pData = p; + setBitMap(pBuilder->row.pData, pBuilder->pTSchema, flags); } }