diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h index 22d15b0ad2..073d796717 100644 --- a/include/common/tdataformat.h +++ b/include/common/tdataformat.h @@ -46,7 +46,7 @@ void tTSchemaDestroy(STSchema *pTSchema); #define COL_VAL_NULL(CID) ((SColVal){.cid = (CID), .isNull = 1}) #define COL_VAL_VALUE(CID, V) ((SColVal){.cid = (CID), .value = (V)}) -int32_t tTSRowNew(SArray *pArray, STSchema *pTSchema, STSRow2 **ppRow); +int32_t tTSRowNew(STSRowBuilder *pBuilder, SArray *pArray, STSchema *pTSchema, STSRow2 **ppRow); int32_t tTSRowClone(const STSRow2 *pRow, STSRow2 **ppRow); void tTSRowFree(STSRow2 *pRow); void tTSRowGet(STSRow2 *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal); @@ -55,6 +55,13 @@ int32_t tPutTSRow(uint8_t *p, STSRow2 *pRow); int32_t tGetTSRow(uint8_t *p, STSRow2 *pRow); // STSRowBuilder +#define tsRowBuilderInit() ((STSRowBuilder){0}) +#define tsRowBuilderClear(B) \ + do { \ + if ((B)->pBuf) { \ + taosMemoryFree((B)->pBuf); \ + } \ + } while (0) // STag int32_t tTagNew(SArray *pArray, int32_t version, int8_t isJson, STag **ppTag); @@ -64,8 +71,8 @@ char *tTagValToData(const STagVal *pTagVal, bool isJson); int32_t tEncodeTag(SEncoder *pEncoder, const STag *pTag); int32_t tDecodeTag(SDecoder *pDecoder, STag **ppTag); int32_t tTagToValArray(const STag *pTag, SArray **ppArray); -void debugPrintSTag(STag *pTag, const char *tag, int32_t ln); // TODO: remove -void debugCheckTags(STag *pTag); // TODO: remove +void debugPrintSTag(STag *pTag, const char *tag, int32_t ln); // TODO: remove +void debugCheckTags(STag *pTag); // TODO: remove // STRUCT ================= struct STColumn { @@ -100,17 +107,9 @@ struct STSRow2 { }; struct STSRowBuilder { - STSchema *pTSchema; - int32_t szBitMap1; - int32_t szBitMap2; - int32_t szKVBuf; - uint8_t *pKVBuf; - int32_t szTPBuf; - uint8_t *pTPBuf; - int32_t iCol; - int32_t vlenKV; - int32_t vlenTP; - STSRow2 row; + STSRow2 tsRow; + int32_t szBuf; + uint8_t *pBuf; }; struct SValue { diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index a08a20aadd..e976486223 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -142,191 +142,6 @@ static FORCE_INLINE int32_t tGetValue(uint8_t *p, SValue *pValue, int8_t type) { } // STSRow2 ======================================================================== -static void tTupleTSRowNew(SArray *pArray, STSchema *pTSchema, STSRow2 *pRow) { - int32_t nColVal = taosArrayGetSize(pArray); - STColumn *pTColumn; - SColVal *pColVal; - - ASSERT(nColVal > 0); - - pRow->sver = pTSchema->version; - - // ts - pTColumn = &pTSchema->columns[0]; - pColVal = (SColVal *)taosArrayGet(pArray, 0); - - ASSERT(pTColumn->colId == 0 && pColVal->cid == 0); - ASSERT(pTColumn->type == TSDB_DATA_TYPE_TIMESTAMP); - - pRow->ts = pColVal->value.ts; - - // other fields - int32_t iColVal = 1; - int32_t bidx; - uint32_t nv = 0; - uint8_t *pb = NULL; - uint8_t *pf = NULL; - uint8_t *pv = NULL; - uint8_t flags = 0; - for (int32_t iColumn = 1; iColumn < pTSchema->numOfCols; iColumn++) { - bidx = iColumn - 1; - pTColumn = &pTSchema->columns[iColumn]; - - if (iColVal < nColVal) { - pColVal = (SColVal *)taosArrayGet(pArray, iColVal); - } else { - pColVal = NULL; - } - - if (pColVal) { - if (pColVal->cid == pTColumn->colId) { - iColVal++; - if (pColVal->isNone) { - goto _set_none; - } else if (pColVal->isNull) { - goto _set_null; - } else { - goto _set_value; - } - } else if (pColVal->cid > pTColumn->colId) { - goto _set_none; - } else { - ASSERT(0); - } - } else { - goto _set_none; - } - - _set_none: - flags |= TSROW_HAS_NONE; - // SET_BIT2(pb, bidx, 0); (todo) - continue; - - _set_null: - flags != TSROW_HAS_NULL; - // SET_BIT2(pb, bidx, 1); (todo) - continue; - - _set_value: - flags != TSROW_HAS_VAL; - // SET_BIT2(pb, bidx, 2); (todo) - if (IS_VAR_DATA_TYPE(pTColumn->type)) { - // nv += tPutColVal(pv ? pv + nv : pv, pColVal, pTColumn->type, 1); - } else { - // tPutColVal(pf ? pf + pTColumn->offset : pf, pColVal, pTColumn->type, 1); - } - continue; - } - - ASSERT(flags); - switch (flags & 0xf) { - case TSROW_HAS_NONE: - case TSROW_HAS_NULL: - pRow->nData = 0; - break; - case TSROW_HAS_VAL: - pRow->nData = pTSchema->flen + nv; - break; - case TSROW_HAS_NULL | TSROW_HAS_NONE: - pRow->nData = BIT1_SIZE(pTSchema->numOfCols - 1); - break; - case TSROW_HAS_VAL | TSROW_HAS_NONE: - case TSROW_HAS_VAL | TSROW_HAS_NULL: - pRow->nData = BIT1_SIZE(pTSchema->numOfCols - 1) + pTSchema->flen + nv; - break; - case TSROW_HAS_VAL | TSROW_HAS_NULL | TSROW_HAS_NONE: - pRow->nData = BIT2_SIZE(pTSchema->numOfCols - 1) + pTSchema->flen + nv; - break; - default: - break; - } -} - -static void tMapTSRowNew(SArray *pArray, STSchema *pTSchema, STSRow2 *pRow, uint8_t flags) { - int32_t nColVal = taosArrayGetSize(pArray); - STColumn *pTColumn; - SColVal *pColVal; - - ASSERT(nColVal > 0); - - pRow->sver = pTSchema->version; - pRow->flags = 0; - - // ts - pTColumn = &pTSchema->columns[0]; - pColVal = (SColVal *)taosArrayGet(pArray, 0); - - ASSERT(pTColumn->colId == 0 && pColVal->cid == 0); - ASSERT(pTColumn->type == TSDB_DATA_TYPE_TIMESTAMP); - - pRow->ts = pColVal->value.ts; - - // other fields - int32_t iColVal = 1; - uint32_t nv = 0; - uint8_t *pv = NULL; - uint8_t *pidx = NULL; - int16_t nCol = 0; - for (int32_t iColumn = 1; iColumn < pTSchema->numOfCols; iColumn++) { - pTColumn = &pTSchema->columns[iColumn]; - - if (iColVal < nColVal) { - pColVal = (SColVal *)taosArrayGet(pArray, iColVal); - } else { - pColVal = NULL; - } - - if (pColVal) { - if (pColVal->cid == pTColumn->colId) { - iColVal++; - if (pColVal->isNone) { - goto _set_none; - } else if (pColVal->isNull) { - goto _set_null; - } else { - goto _set_value; - } - } else if (pColVal->cid > pTColumn->colId) { - goto _set_none; - } else { - ASSERT(0); - } - } else { - goto _set_none; - } - - _set_none: - pRow->flags |= TSROW_HAS_NONE; - continue; - - _set_null: - pRow->flags |= TSROW_HAS_NULL; - if (pidx) pidx[nCol++] = nv; - nv += tPutI16v(pv ? pv + nv : pv, -pTColumn->colId); - continue; - - _set_value: - pRow->flags != TSROW_HAS_VAL; - if (pidx) pidx[nCol++] = nv; - nv += tPutI16v(pv ? pv + nv : pv, pTColumn->colId); - nv += tPutValue(pv ? pv + nv : pv, &pColVal->value, pTColumn->type); - continue; - } - - if (nv <= UINT8_MAX) { - pRow->flags |= TSROW_KV_SMALL; - // small - } else if (nv <= UINT16_MAX) { - pRow->flags |= TSROW_KV_MID; - // mid - } else { - pRow->flags |= TSROW_KV_BIG; - // large - } - - ASSERT(flags == 0 || pRow->flags == flags); -} - static void tTSRowNewImpl(SArray *pArray, STSchema *pTSchema, STSRow2 *pRowT, STSRow2 *pRowK) { int32_t nColVal = taosArrayGetSize(pArray); STColumn *pTColumn; @@ -476,7 +291,7 @@ static void tTSRowNewImpl(SArray *pArray, STSchema *pTSchema, STSRow2 *pRowT, ST continue; } - // finalize (todo) + // finalize if (pRowT) { } if (pRowK) { @@ -501,7 +316,7 @@ static void tTSRowNewImpl(SArray *pArray, STSchema *pTSchema, STSRow2 *pRowT, ST } // try-decide-build -int32_t tTSRowNew(SArray *pArray, STSchema *pTSchema, STSRow2 **ppRow) { +int32_t tTSRowNew(STSRowBuilder *pBuilder, SArray *pArray, STSchema *pTSchema, STSRow2 **ppRow) { int32_t code = 0; STSRow2 rowT = {0}; STSRow2 rowK = {0}; @@ -510,29 +325,65 @@ int32_t tTSRowNew(SArray *pArray, STSchema *pTSchema, STSRow2 **ppRow) { // try tTSRowNewImpl(pArray, pTSchema, &rowT, &rowK); + ASSERT(rowT.flags && rowK.flags); + + // alloc nData = TMIN(rowT.nData, rowK.nData); - *ppRow = (STSRow2 *)taosMemoryMalloc(sizeof(STSRow2)); - if (*ppRow == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - (*ppRow)->nData = 0; - (*ppRow)->pData = NULL; - if (nData) { - (*ppRow)->pData = taosMemoryMalloc(nData); - if ((*ppRow)->pData == NULL) { + if (pBuilder) { + // create from a builder + if (nData == 0) { + ASSERT((rowT.flags & 0xf) == TSROW_HAS_NONE || (rowT.flags & 0xf) == TSROW_HAS_NULL); + + pBuilder->tsRow.nData = 0; + pBuilder->tsRow.pData = NULL; + } else { + if (pBuilder->szBuf < nData) { + uint8_t *p = taosMemoryRealloc(pBuilder->pBuf, nData); + if (p == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + pBuilder->pBuf = p; + pBuilder->szBuf = nData; + } + + pBuilder->tsRow.nData = 0; + pBuilder->tsRow.pData = pBuilder->pBuf; + } + + *ppRow = &pBuilder->tsRow; + } else { + // create a new one + *ppRow = (STSRow2 *)taosMemoryMalloc(sizeof(STSRow2)); + if (*ppRow == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } + (*ppRow)->nData = 0; + (*ppRow)->pData = NULL; + if (nData) { + (*ppRow)->pData = taosMemoryMalloc(nData); + if ((*ppRow)->pData == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + } } + // decide and build if (rowT.nData <= rowK.nData) { (*ppRow)->flags = rowT.flags; tTSRowNewImpl(pArray, pTSchema, *ppRow, NULL); + + ASSERT((*ppRow)->flags == rowT.flags); + ASSERT((*ppRow)->nData == rowT.nData); } else { (*ppRow)->flags = rowK.flags; tTSRowNewImpl(pArray, pTSchema, NULL, *ppRow); + + ASSERT((*ppRow)->flags == rowK.flags); + ASSERT((*ppRow)->nData == rowK.nData); } _exit: