diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h index e270e09471..2e8a214a9d 100644 --- a/include/common/tdataformat.h +++ b/include/common/tdataformat.h @@ -33,9 +33,6 @@ typedef struct STSRow2 STSRow2; typedef struct STSRowBuilder STSRowBuilder; typedef struct SKVIdx SKVIdx; -#define TD_TP_ROW 0x0U -#define TD_KV_ROW 0x1U - // STSchema // STSRow2 @@ -48,8 +45,8 @@ void tTSchemaDestroy(STSchema *pTSchema); // STSRowBuilder int32_t tTSRowBuilderInit(STSRowBuilder *pBuilder, int32_t sver, SSchema *pSchema, int32_t nCols); -int32_t tTSRowBuilderClear(STSRowBuilder *pBuilder); -int32_t tTSRowBuilderReset(STSRowBuilder *pBuilder); +void tTSRowBuilderClear(STSRowBuilder *pBuilder); +void tTSRowBuilderReset(STSRowBuilder *pBuilder); int32_t tTSRowBuilderPut(STSRowBuilder *pBuilder, int32_t cid, const uint8_t *pData, uint32_t nData); int32_t tTSRowBuilderGetRow(STSRowBuilder *pBuilder, const STSRow2 **ppRow); @@ -85,11 +82,13 @@ struct STSRow2 { struct STSRowBuilder { STColumn *pTColumn; STSchema *pTSchema; + int32_t szKVBuf; + uint8_t *pKVBuf; + int32_t szTPBuf; + uint8_t *pTPBuf; int32_t nCols; int32_t kvVLen; - uint8_t *pKV; int32_t tpVLen; - uint8_t *pTuple; STSRow2 row; }; diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index c39ad3821f..ad020fe7d9 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -19,6 +19,8 @@ #include "tdatablock.h" #include "tlog.h" +#define TD_KV_ROW 0x1U + struct SKVIdx { int32_t cid; int32_t offset; @@ -86,16 +88,46 @@ int32_t tTSchemaCreate(int32_t sver, SSchema *pSchema, int32_t ncols, STSchema * void tTSchemaDestroy(STSchema *pTSchema) { taosMemoryFree(pTSchema); } int32_t tTSRowBuilderInit(STSRowBuilder *pBuilder, int32_t sver, SSchema *pSchema, int32_t nCols) { - // TODO + int32_t kvBufLen; + int32_t tpBufLen; + uint8_t *p; + + if (tTSchemaCreate(sver, pSchema, nCols, &pBuilder->pTSchema) < 0) return -1; + + kvBufLen = sizeof(SKVIdx) * nCols + pBuilder->pTSchema->flen + pBuilder->pTSchema->vlen; + tpBufLen = pBuilder->pTSchema->flen + pBuilder->pTSchema->vlen; + + if (pBuilder->szKVBuf < kvBufLen) { + p = taosMemoryRealloc(pBuilder->pKVBuf, kvBufLen); + if (p == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + pBuilder->pKVBuf = p; + pBuilder->szKVBuf = kvBufLen; + } + + if (pBuilder->szTPBuf < tpBufLen) { + p = taosMemoryRealloc(pBuilder->pTPBuf, tpBufLen); + if (p == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + pBuilder->pTPBuf = p; + pBuilder->szTPBuf = tpBufLen; + } + + tTSRowBuilderReset(pBuilder); + return 0; } -int32_t tTSRowBuilderClear(STSRowBuilder *pBuilder) { - // TODO - return 0; +void tTSRowBuilderClear(STSRowBuilder *pBuilder) { + taosMemoryFree(pBuilder->pKVBuf); + taosMemoryFree(pBuilder->pTPBuf); } -int32_t tTSRowBuilderReset(STSRowBuilder *pBuilder) { +void tTSRowBuilderReset(STSRowBuilder *pBuilder) { for (int32_t iCol = pBuilder->pTSchema->numOfCols - 1; iCol >= 0; iCol--) { pBuilder->pTColumn = &pBuilder->pTSchema->columns[iCol]; @@ -106,19 +138,29 @@ int32_t tTSRowBuilderReset(STSRowBuilder *pBuilder) { pBuilder->kvVLen = 0; pBuilder->tpVLen = 0; pBuilder->row.flags = 0; - - return 0; } int32_t tTSRowBuilderPut(STSRowBuilder *pBuilder, int32_t cid, const uint8_t *pData, uint32_t nData) { + int32_t iCol; + uint8_t *p; + + // search column if (pBuilder->pTColumn->colId < cid) { - // right search + iCol = (pBuilder->pTColumn - pBuilder->pTSchema->columns) / sizeof(STColumn) + 1; + for (; iCol < pBuilder->pTSchema->numOfCols; iCol++) { + pBuilder->pTColumn = &pBuilder->pTSchema->columns[iCol]; + if (pBuilder->pTColumn->colId == cid) break; + } } else if (pBuilder->pTColumn->colId > cid) { - // left search + iCol = (pBuilder->pTColumn - pBuilder->pTSchema->columns) / sizeof(STColumn) - 1; + for (; iCol >= 0; iCol--) { + pBuilder->pTColumn = &pBuilder->pTSchema->columns[iCol]; + if (pBuilder->pTColumn->colId == cid) break; + } } - // check if val is set already - if (pBuilder->pTColumn->flags & COL_VAL_SET) { + // check + if (pBuilder->pTColumn->colId != cid || pBuilder->pTColumn->flags & COL_VAL_SET) { return -1; } @@ -128,7 +170,32 @@ int32_t tTSRowBuilderPut(STSRowBuilder *pBuilder, int32_t cid, const uint8_t *pD pBuilder->row.ts = *(TSKEY *)pData; } else { if (pData) { - // set val + // ASSERT(!IS_NULL(pData)); + + // set tuple data + p = pBuilder->pTPBuf + pBuilder->pTColumn->offset; + if (IS_VAR_DATA_TYPE(pBuilder->pTColumn->type)) { + *(int32_t *)p = pBuilder->tpVLen; + + // encode the variant-length data + p = pBuilder->pTPBuf + pBuilder->pTSchema->flen + pBuilder->tpVLen; + pBuilder->tpVLen += tPutBinary(p, pData, nData); + } else { + memcpy(p, pData, nData); + } + + // set kv data + p = pBuilder->pKVBuf + sizeof(SKVIdx) * pBuilder->nCols; + ((SKVIdx *)p)->cid = cid; + ((SKVIdx *)p)->offset = pBuilder->kvVLen; + + p = pBuilder->pKVBuf + sizeof(SKVIdx) * pBuilder->pTSchema->numOfCols + pBuilder->kvVLen; + if (IS_VAR_DATA_TYPE(pBuilder->pTColumn->type)) { + pBuilder->kvVLen += tPutBinary(p, pData, nData); + } else { + memcpy(p, pData, nData); + pBuilder->kvVLen += nData; + } } else { // set NULL val } @@ -149,18 +216,18 @@ int32_t tTSRowBuilderGetRow(STSRowBuilder *pBuilder, const STSRow2 **ppRow) { pBuilder->row.flags |= TD_KV_ROW; pBuilder->row.ncols = pBuilder->nCols; pBuilder->row.nData = pBuilder->nCols * sizeof(SKVIdx) + pBuilder->kvVLen; - pBuilder->row.pData = pBuilder->pKV; + pBuilder->row.pData = pBuilder->pKVBuf; if (pBuilder->nCols < pBuilder->pTSchema->numOfCols) { - memmove(pBuilder->pKV + sizeof(SKVIdx) * pBuilder->nCols, - pBuilder->pKV + sizeof(SKVIdx) * pBuilder->pTSchema->numOfCols, pBuilder->kvVLen); + memmove(pBuilder->pKVBuf + sizeof(SKVIdx) * pBuilder->nCols, + pBuilder->pKVBuf + sizeof(SKVIdx) * pBuilder->pTSchema->numOfCols, pBuilder->kvVLen); } } else { // encode as TD_TUPLE_ROW pBuilder->row.flags &= (~TD_KV_ROW); pBuilder->row.sver = pBuilder->pTSchema->version; pBuilder->row.nData = pBuilder->pTSchema->flen + pBuilder->tpVLen; - pBuilder->row.pData = pBuilder->pTuple; + pBuilder->row.pData = pBuilder->pTPBuf; if (pBuilder->nCols < pBuilder->pTSchema->numOfCols) { // set non-set cols as None