diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h index eaa8ac5cc4..1011b90ce8 100644 --- a/include/common/tdataformat.h +++ b/include/common/tdataformat.h @@ -64,18 +64,22 @@ int32_t tPutValue(uint8_t *p, SValue *pValue, int8_t type); int32_t tGetValue(uint8_t *p, SValue *pValue, int8_t type); int tValueCmprFn(const SValue *pValue1, const SValue *pValue2, int8_t type); -// STSRow2 +// SColVal #define COL_VAL_NONE(CID, TYPE) ((SColVal){.cid = (CID), .type = (TYPE), .isNone = 1}) #define COL_VAL_NULL(CID, TYPE) ((SColVal){.cid = (CID), .type = (TYPE), .isNull = 1}) #define COL_VAL_VALUE(CID, TYPE, V) ((SColVal){.cid = (CID), .type = (TYPE), .value = (V)}) +// STSRow2 +#define TSROW_LEN(PROW, V) tGetI32v((uint8_t *)(PROW)->data, (V) ? &(V) : NULL) +#define TSROW_SVER(PROW, V) tGetI32v((PROW)->data + TSROW_LEN(PROW, NULL), (V) ? &(V) : NULL) + int32_t tTSRowNew(STSRowBuilder *pBuilder, SArray *pArray, STSchema *pTSchema, STSRow2 **ppRow); int32_t tTSRowClone(const STSRow2 *pRow, STSRow2 **ppRow); void tTSRowFree(STSRow2 *pRow); void tTSRowGet(STSRow2 *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal); int32_t tTSRowToArray(STSRow2 *pRow, STSchema *pTSchema, SArray **ppArray); int32_t tPutTSRow(uint8_t *p, STSRow2 *pRow); -int32_t tGetTSRow(uint8_t *p, STSRow2 *pRow); +int32_t tGetTSRow(uint8_t *p, STSRow2 **ppRow); // STSRowBuilder #define tsRowBuilderInit() ((STSRowBuilder){0}) @@ -97,7 +101,7 @@ int32_t tEncodeTag(SEncoder *pEncoder, const STag *pTag); int32_t tDecodeTag(SDecoder *pDecoder, STag **ppTag); int32_t tTagToValArray(const STag *pTag, SArray **ppArray); void debugPrintSTag(STag *pTag, const char *tag, int32_t ln); // TODO: remove -int32_t parseJsontoTagData(const char* json, SArray* pTagVals, STag** ppTag, void* pMsgBuf); +int32_t parseJsontoTagData(const char *json, SArray *pTagVals, STag **ppTag, void *pMsgBuf); // STRUCT ================= struct STColumn { @@ -123,16 +127,16 @@ struct STSchema { #define TSROW_KV_SMALL ((uint8_t)0x10U) #define TSROW_KV_MID ((uint8_t)0x20U) #define TSROW_KV_BIG ((uint8_t)0x40U) +#pragma pack(push, 1) struct STSRow2 { - TSKEY ts; - uint8_t flags; - int32_t sver; - uint32_t nData; - uint8_t *pData; + TSKEY ts; + uint8_t flags; + uint8_t data[]; }; +#pragma pack(pop) struct STSRowBuilder { - STSRow2 tsRow; + // STSRow2 tsRow; int32_t szBuf; uint8_t *pBuf; }; @@ -226,50 +230,6 @@ struct STag { memcpy(varDataVal(x), (str), (_size)); \ } while (0); -// ----------------- TSDB COLUMN DEFINITION - -#define colType(col) ((col)->type) -#define colFlags(col) ((col)->flags) -#define colColId(col) ((col)->colId) -#define colBytes(col) ((col)->bytes) -#define colOffset(col) ((col)->offset) - -#define colSetType(col, t) (colType(col) = (t)) -#define colSetFlags(col, f) (colFlags(col) = (f)) -#define colSetColId(col, id) (colColId(col) = (id)) -#define colSetBytes(col, b) (colBytes(col) = (b)) -#define colSetOffset(col, o) (colOffset(col) = (o)) - -// ----------------- TSDB SCHEMA DEFINITION - -#define schemaNCols(s) ((s)->numOfCols) -#define schemaVersion(s) ((s)->version) -#define schemaTLen(s) ((s)->tlen) -#define schemaFLen(s) ((s)->flen) -#define schemaVLen(s) ((s)->vlen) -#define schemaColAt(s, i) ((s)->columns + i) -#define tdFreeSchema(s) taosMemoryFreeClear((s)) - -STSchema *tdDupSchema(const STSchema *pSchema); -int32_t tdEncodeSchema(void **buf, STSchema *pSchema); -void *tdDecodeSchema(void *buf, STSchema **pRSchema); - -static FORCE_INLINE int32_t comparColId(const void *key1, const void *key2) { - if (*(int16_t *)key1 > ((STColumn *)key2)->colId) { - return 1; - } else if (*(int16_t *)key1 < ((STColumn *)key2)->colId) { - return -1; - } else { - return 0; - } -} - -static FORCE_INLINE STColumn *tdGetColOfID(STSchema *pSchema, int16_t colId) { - void *ptr = bsearch(&colId, (void *)pSchema->columns, schemaNCols(pSchema), sizeof(STColumn), comparColId); - if (ptr == NULL) return NULL; - return (STColumn *)ptr; -} - // ----------------- SCHEMA BUILDER DEFINITION typedef struct { int32_t tCols; @@ -299,141 +259,6 @@ void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, schema_ver_t version) int32_t tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int8_t flags, col_id_t colId, col_bytes_t bytes); STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder); -// ----------------- Semantic timestamp key definition -// typedef uint64_t TKEY; -#define TKEY TSKEY - -#define TKEY_INVALID UINT64_MAX -#define TKEY_NULL TKEY_INVALID -#define TKEY_NEGATIVE_FLAG (((TKEY)1) << 63) -#define TKEY_VALUE_FILTER (~(TKEY_NEGATIVE_FLAG)) - -#define TKEY_IS_NEGATIVE(tkey) (((tkey)&TKEY_NEGATIVE_FLAG) != 0) -#define TKEY_IS_DELETED(tkey) (false) - -#define tdGetTKEY(key) (key) -#define tdGetKey(tskey) (tskey) - -#define MIN_TS_KEY ((TSKEY)0x8000000000000001) -#define MAX_TS_KEY ((TSKEY)0x7fffffffffffffff) - -#define TD_TO_TKEY(key) tdGetTKEY(((key) < MIN_TS_KEY) ? MIN_TS_KEY : (((key) > MAX_TS_KEY) ? MAX_TS_KEY : key)) - -static FORCE_INLINE TKEY keyToTkey(TSKEY key) { - TSKEY lkey = key; - if (key > MAX_TS_KEY) { - lkey = MAX_TS_KEY; - } else if (key < MIN_TS_KEY) { - lkey = MIN_TS_KEY; - } - - return tdGetTKEY(lkey); -} - -static FORCE_INLINE int32_t tkeyComparFn(const void *tkey1, const void *tkey2) { - TSKEY key1 = tdGetKey(*(TKEY *)tkey1); - TSKEY key2 = tdGetKey(*(TKEY *)tkey2); - - if (key1 < key2) { - return -1; - } else if (key1 > key2) { - return 1; - } else { - return 0; - } -} - -// ----------------- Data column structure -// SDataCol arrangement: data => bitmap => dataOffset -typedef struct SDataCol { - int8_t type; // column type - uint8_t bitmap : 1; // 0: no bitmap if all rows are NORM, 1: has bitmap if has NULL/NORM rows - uint8_t reserve : 7; - int16_t colId; // column ID - int32_t bytes; // column data bytes defined - int32_t offset; // data offset in a SDataRow (including the header size) - int32_t spaceSize; // Total space size for this column - int32_t len; // column data length - VarDataOffsetT *dataOff; // For binary and nchar data, the offset in the data column - void *pData; // Actual data pointer - void *pBitmap; // Bitmap pointer - TSKEY ts; // only used in last NULL column -} SDataCol; - -#define isAllRowsNull(pCol) ((pCol)->len == 0) -#define isAllRowsNone(pCol) ((pCol)->len == 0) -static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; } - -int32_t tdAllocMemForCol(SDataCol *pCol, int32_t maxPoints); - -void dataColInit(SDataCol *pDataCol, STColumn *pCol, int32_t maxPoints); -int32_t dataColAppendVal(SDataCol *pCol, const void *value, int32_t numOfRows, int32_t maxPoints); -void *dataColSetOffset(SDataCol *pCol, int32_t nEle); - -bool isNEleNull(SDataCol *pCol, int32_t nEle); - -typedef struct { - col_id_t maxCols; // max number of columns - col_id_t numOfCols; // Total number of cols - int32_t maxPoints; // max number of points - int32_t numOfRows; - int32_t bitmapMode : 1; // default is 0(2 bits), otherwise 1(1 bit) - int32_t sversion : 31; // TODO: set sversion(not used yet) - SDataCol *cols; -} SDataCols; - -static FORCE_INLINE bool tdDataColsIsBitmapI(SDataCols *pCols) { return pCols->bitmapMode != TSDB_BITMODE_DEFAULT; } -static FORCE_INLINE void tdDataColsSetBitmapI(SDataCols *pCols) { pCols->bitmapMode = TSDB_BITMODE_ONE_BIT; } -static FORCE_INLINE bool tdIsBitmapModeI(int8_t bitmapMode) { return bitmapMode != TSDB_BITMODE_DEFAULT; } - -#define keyCol(pCols) (&((pCols)->cols[0])) // Key column -#define dataColsTKeyAt(pCols, idx) ((TKEY *)(keyCol(pCols)->pData))[(idx)] // the idx row of column-wised data -#define dataColsKeyAt(pCols, idx) tdGetKey(dataColsTKeyAt(pCols, idx)) -static FORCE_INLINE TKEY dataColsTKeyFirst(SDataCols *pCols) { - if (pCols->numOfRows) { - return dataColsTKeyAt(pCols, 0); - } else { - return TKEY_INVALID; - } -} - -static FORCE_INLINE TSKEY dataColsKeyAtRow(SDataCols *pCols, int32_t row) { - assert(row < pCols->numOfRows); - return dataColsKeyAt(pCols, row); -} - -static FORCE_INLINE TSKEY dataColsKeyFirst(SDataCols *pCols) { - if (pCols->numOfRows) { - return dataColsKeyAt(pCols, 0); - } else { - return TSDB_DATA_TIMESTAMP_NULL; - } -} - -static FORCE_INLINE TKEY dataColsTKeyLast(SDataCols *pCols) { - if (pCols->numOfRows) { - return dataColsTKeyAt(pCols, pCols->numOfRows - 1); - } else { - return TKEY_INVALID; - } -} - -static FORCE_INLINE TSKEY dataColsKeyLast(SDataCols *pCols) { - if (pCols->numOfRows) { - return dataColsKeyAt(pCols, pCols->numOfRows - 1); - } else { - return TSDB_DATA_TIMESTAMP_NULL; - } -} - -SDataCols *tdNewDataCols(int32_t maxCols, int32_t maxRows); -void tdResetDataCols(SDataCols *pCols); -int32_t tdInitDataCols(SDataCols *pCols, STSchema *pSchema); -SDataCols *tdDupDataCols(SDataCols *pCols, bool keepData); -SDataCols *tdFreeDataCols(SDataCols *pCols); -int32_t tdMergeDataCols(SDataCols *target, SDataCols *source, int32_t rowsToMerge, int32_t *pOffset, bool update, - TDRowVerT maxVer); - #endif #ifdef __cplusplus diff --git a/include/common/trow.h b/include/common/trow.h index 086a6ce6fb..807a4c0f0a 100644 --- a/include/common/trow.h +++ b/include/common/trow.h @@ -168,7 +168,7 @@ typedef struct { // N.B. If without STSchema, getExtendedRowSize() is used to get the rowMaxBytes and // (int32_t)ceil((double)nCols/TD_VTYPE_PARTS) should be added if TD_SUPPORT_BITMAP defined. -#define TD_ROW_MAX_BYTES_FROM_SCHEMA(s) (schemaTLen(s) + TD_ROW_HEAD_LEN) +#define TD_ROW_MAX_BYTES_FROM_SCHEMA(s) ((s)->tlen + TD_ROW_HEAD_LEN) #define TD_ROW_SET_INFO(r, i) (TD_ROW_INFO(r) = (i)) #define TD_ROW_SET_TYPE(r, t) (TD_ROW_TYPE(r) = (t)) @@ -223,9 +223,10 @@ int32_t tdSetBitmapValTypeN(void *pBitmap, int16_t nEle, TDR static FORCE_INLINE int32_t tdGetBitmapValType(const void *pBitmap, int16_t colIdx, TDRowValT *pValType, int8_t bitmapMode); bool tdIsBitmapBlkNorm(const void *pBitmap, int32_t numOfBits, int8_t bitmapMode); -int32_t tdAppendValToDataCol(SDataCol *pCol, TDRowValT valType, const void *val, int32_t numOfRows, int32_t maxPoints, - int8_t bitmapMode, bool isMerge); -int32_t tdAppendSTSRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCols, bool isMerge); +// int32_t tdAppendValToDataCol(SDataCol *pCol, TDRowValT valType, const void *val, int32_t numOfRows, int32_t +// maxPoints, +// int8_t bitmapMode, bool isMerge); +// int32_t tdAppendSTSRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCols, bool isMerge); int32_t tdGetBitmapValTypeII(const void *pBitmap, int16_t colIdx, TDRowValT *pValType); int32_t tdSetBitmapValTypeI(void *pBitmap, int16_t colIdx, TDRowValT valType); @@ -318,12 +319,9 @@ bool tdSTSRowGetVal(STSRowIter *pIter, col_id_t colId, col_type_t colType, SC bool tdGetTpRowDataOfCol(STSRowIter *pIter, col_type_t colType, int32_t offset, SCellVal *pVal); bool tdGetKvRowValOfColEx(STSRowIter *pIter, col_id_t colId, col_type_t colType, col_id_t *nIdx, SCellVal *pVal); bool tdSTSRowIterNext(STSRowIter *pIter, col_id_t colId, col_type_t colType, SCellVal *pVal); -STSRow *mergeTwoRows(void *buffer, STSRow *row1, STSRow *row2, STSchema *pSchema1, STSchema *pSchema2); -int32_t tdGetColDataOfRow(SCellVal *pVal, SDataCol *pCol, int32_t row, int8_t bitmapMode); bool tdSTpRowGetVal(STSRow *pRow, col_id_t colId, col_type_t colType, int32_t flen, uint32_t offset, col_id_t colIdx, SCellVal *pVal); bool tdSKvRowGetVal(STSRow *pRow, col_id_t colId, col_id_t colIdx, SCellVal *pVal); -int32_t dataColGetNEleLen(SDataCol *pDataCol, int32_t rows, int8_t bitmapMode); void tdSCellValPrint(SCellVal *pVal, int8_t colType); void tdSRowPrint(STSRow *row, STSchema *pSchema, const char *tag); diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index ec7be79934..8eeab77a15 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -175,7 +175,8 @@ static void setBitMap(uint8_t *pb, uint8_t v, int32_t idx, uint8_t flags) { } while (0) int32_t tTSRowNew(STSRowBuilder *pBuilder, SArray *pArray, STSchema *pTSchema, STSRow2 **ppRow) { - int32_t code = 0; + int32_t code = 0; +#if 0 STColumn *pTColumn; SColVal *pColVal; int32_t nColVal = taosArrayGetSize(pArray); @@ -462,30 +463,22 @@ int32_t tTSRowNew(STSRowBuilder *pBuilder, SArray *pArray, STSchema *pTSchema, S } } +#endif _exit: return code; } int32_t tTSRowClone(const STSRow2 *pRow, STSRow2 **ppRow) { int32_t code = 0; + int32_t rLen; - (*ppRow) = (STSRow2 *)taosMemoryMalloc(sizeof(**ppRow)); + TSROW_LEN(pRow, rLen); + (*ppRow) = (STSRow2 *)taosMemoryMalloc(rLen); if (*ppRow == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } - **ppRow = *pRow; - (*ppRow)->pData = NULL; - - if (pRow->nData) { - (*ppRow)->pData = taosMemoryMalloc(pRow->nData); - if ((*ppRow)->pData == NULL) { - taosMemoryFree(*ppRow); - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - memcpy((*ppRow)->pData, pRow->pData, pRow->nData); - } + memcpy(*ppRow, pRow, rLen); _exit: return code; @@ -493,12 +486,12 @@ _exit: void tTSRowFree(STSRow2 *pRow) { if (pRow) { - if (pRow->pData) taosMemoryFree(pRow->pData); taosMemoryFree(pRow); } } void tTSRowGet(STSRow2 *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal) { +#if 0 uint8_t isTuple = ((pRow->flags & 0xf0) == 0) ? 1 : 0; STColumn *pTColumn = &pTSchema->columns[iCol]; uint8_t flags = pRow->flags & (uint8_t)0xf; @@ -643,10 +636,12 @@ _return_null: _return_value: *pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, value); return; +#endif } int32_t tTSRowToArray(STSRow2 *pRow, STSchema *pTSchema, SArray **ppArray) { int32_t code = 0; +#if 0 SColVal cv; (*ppArray) = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal)); @@ -660,52 +655,27 @@ int32_t tTSRowToArray(STSRow2 *pRow, STSchema *pTSchema, SArray **ppArray) { taosArrayPush(*ppArray, &cv); } +#endif _exit: return code; } int32_t tPutTSRow(uint8_t *p, STSRow2 *pRow) { - int32_t n = 0; + int32_t n; - n += tPutI64(p ? p + n : p, pRow->ts); - n += tPutI8(p ? p + n : p, pRow->flags); - n += tPutI32v(p ? p + n : p, pRow->sver); - - ASSERT(pRow->flags & 0xf); - - switch (pRow->flags & 0xf) { - case TSROW_HAS_NONE: - case TSROW_HAS_NULL: - ASSERT(pRow->nData == 0); - ASSERT(pRow->pData == NULL); - break; - default: - ASSERT(pRow->nData && pRow->pData); - n += tPutBinary(p ? p + n : p, pRow->pData, pRow->nData); - break; + TSROW_LEN(pRow, n); + if (p) { + memcpy(p, pRow, n); } return n; } -int32_t tGetTSRow(uint8_t *p, STSRow2 *pRow) { - int32_t n = 0; +int32_t tGetTSRow(uint8_t *p, STSRow2 **ppRow) { + int32_t n; - n += tGetI64(p + n, &pRow->ts); - n += tGetI8(p + n, &pRow->flags); - n += tGetI32v(p + n, &pRow->sver); - - ASSERT(pRow->flags); - switch (pRow->flags & 0xf) { - case TSROW_HAS_NONE: - case TSROW_HAS_NULL: - pRow->nData = 0; - pRow->pData = NULL; - break; - default: - n += tGetBinary(p + n, &pRow->pData, &pRow->nData); - break; - } + *ppRow = (STSRow2 *)p; + TSROW_LEN(*ppRow, n); return n; } @@ -904,15 +874,13 @@ static int32_t tGetTagVal(uint8_t *p, STagVal *pTagVal, int8_t isJson) { return n; } -bool tTagIsJson(const void *pTag){ - return (((const STag *)pTag)->flags & TD_TAG_JSON); -} +bool tTagIsJson(const void *pTag) { return (((const STag *)pTag)->flags & TD_TAG_JSON); } -bool tTagIsJsonNull(void *data){ - STag *pTag = (STag*)data; - int8_t isJson = tTagIsJson(pTag); - if(!isJson) return false; - return ((STag*)data)->nTag == 0; +bool tTagIsJsonNull(void *data) { + STag *pTag = (STag *)data; + int8_t isJson = tTagIsJson(pTag); + if (!isJson) return false; + return ((STag *)data)->nTag == 0; } int32_t tTagNew(SArray *pArray, int32_t version, int8_t isJson, STag **ppTag) { @@ -1097,112 +1065,6 @@ _err: } #if 1 // =================================================================================================================== -static void dataColSetNEleNull(SDataCol *pCol, int nEle); -int tdAllocMemForCol(SDataCol *pCol, int maxPoints) { - int spaceNeeded = pCol->bytes * maxPoints; - if (IS_VAR_DATA_TYPE(pCol->type)) { - spaceNeeded += sizeof(VarDataOffsetT) * maxPoints; - } -#ifdef TD_SUPPORT_BITMAP - int32_t nBitmapBytes = (int32_t)TD_BITMAP_BYTES(maxPoints); - spaceNeeded += (int)nBitmapBytes; - // TODO: Currently, the compression of bitmap parts is affiliated to the column data parts, thus allocate 1 more - // TYPE_BYTES as to comprise complete TYPE_BYTES. Otherwise, invalid read/write would be triggered. - // spaceNeeded += TYPE_BYTES[pCol->type]; // the bitmap part is append as a single part since 2022.04.03, thus - // remove the additional space -#endif - - if (pCol->spaceSize < spaceNeeded) { - void *ptr = taosMemoryRealloc(pCol->pData, spaceNeeded); - if (ptr == NULL) { - uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)spaceNeeded, strerror(errno)); - return -1; - } else { - pCol->pData = ptr; - pCol->spaceSize = spaceNeeded; - } - } -#ifdef TD_SUPPORT_BITMAP - - if (IS_VAR_DATA_TYPE(pCol->type)) { - pCol->pBitmap = POINTER_SHIFT(pCol->pData, pCol->bytes * maxPoints); - pCol->dataOff = POINTER_SHIFT(pCol->pBitmap, nBitmapBytes); - } else { - pCol->pBitmap = POINTER_SHIFT(pCol->pData, pCol->bytes * maxPoints); - } -#else - if (IS_VAR_DATA_TYPE(pCol->type)) { - pCol->dataOff = POINTER_SHIFT(pCol->pData, pCol->bytes * maxPoints); - } -#endif - return 0; -} - -/** - * Duplicate the schema and return a new object - */ -STSchema *tdDupSchema(const STSchema *pSchema) { - int tlen = sizeof(STSchema) + sizeof(STColumn) * schemaNCols(pSchema); - STSchema *tSchema = (STSchema *)taosMemoryMalloc(tlen); - if (tSchema == NULL) return NULL; - - memcpy((void *)tSchema, (void *)pSchema, tlen); - - return tSchema; -} - -/** - * Encode a schema to dst, and return the next pointer - */ -int tdEncodeSchema(void **buf, STSchema *pSchema) { - int tlen = 0; - tlen += taosEncodeFixedI32(buf, schemaVersion(pSchema)); - tlen += taosEncodeFixedI32(buf, schemaNCols(pSchema)); - - for (int i = 0; i < schemaNCols(pSchema); i++) { - STColumn *pCol = schemaColAt(pSchema, i); - tlen += taosEncodeFixedI8(buf, colType(pCol)); - tlen += taosEncodeFixedI8(buf, colFlags(pCol)); - tlen += taosEncodeFixedI16(buf, colColId(pCol)); - tlen += taosEncodeFixedI16(buf, colBytes(pCol)); - } - - return tlen; -} - -/** - * Decode a schema from a binary. - */ -void *tdDecodeSchema(void *buf, STSchema **pRSchema) { - int version = 0; - int numOfCols = 0; - STSchemaBuilder schemaBuilder; - - buf = taosDecodeFixedI32(buf, &version); - buf = taosDecodeFixedI32(buf, &numOfCols); - - if (tdInitTSchemaBuilder(&schemaBuilder, version) < 0) return NULL; - - for (int i = 0; i < numOfCols; i++) { - col_type_t type = 0; - int8_t flags = 0; - col_id_t colId = 0; - col_bytes_t bytes = 0; - buf = taosDecodeFixedI8(buf, &type); - buf = taosDecodeFixedI8(buf, &flags); - buf = taosDecodeFixedI16(buf, &colId); - buf = taosDecodeFixedI32(buf, &bytes); - if (tdAddColToSchema(&schemaBuilder, type, flags, colId, bytes) < 0) { - tdDestroyTSchemaBuilder(&schemaBuilder); - return NULL; - } - } - - *pRSchema = tdGetSchemaFromBuilder(&schemaBuilder); - tdDestroyTSchemaBuilder(&schemaBuilder); - return buf; -} - int tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, schema_ver_t version) { if (pBuilder == NULL) return -1; @@ -1239,22 +1101,22 @@ int32_t tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int8_t flags, c } STColumn *pCol = &(pBuilder->columns[pBuilder->nCols]); - colSetType(pCol, type); - colSetColId(pCol, colId); - colSetFlags(pCol, flags); + pCol->type = type; + pCol->colId = colId; + pCol->flags = flags; if (pBuilder->nCols == 0) { - colSetOffset(pCol, 0); + pCol->offset = 0; } else { STColumn *pTCol = &(pBuilder->columns[pBuilder->nCols - 1]); - colSetOffset(pCol, pTCol->offset + TYPE_BYTES[pTCol->type]); + pCol->offset = pTCol->offset + TYPE_BYTES[pTCol->type]; } if (IS_VAR_DATA_TYPE(type)) { - colSetBytes(pCol, bytes); + pCol->bytes = bytes; pBuilder->tlen += (TYPE_BYTES[type] + bytes); pBuilder->vlen += bytes - sizeof(VarDataLenT); } else { - colSetBytes(pCol, TYPE_BYTES[type]); + pCol->bytes = TYPE_BYTES[type]; pBuilder->tlen += TYPE_BYTES[type]; pBuilder->vlen += TYPE_BYTES[type]; } @@ -1275,151 +1137,19 @@ STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder) { STSchema *pSchema = (STSchema *)taosMemoryMalloc(tlen); if (pSchema == NULL) return NULL; - schemaVersion(pSchema) = pBuilder->version; - schemaNCols(pSchema) = pBuilder->nCols; - schemaTLen(pSchema) = pBuilder->tlen; - schemaFLen(pSchema) = pBuilder->flen; - schemaVLen(pSchema) = pBuilder->vlen; + pSchema->version = pBuilder->version; + pSchema->numOfCols = pBuilder->nCols; + pSchema->tlen = pBuilder->tlen; + pSchema->flen = pBuilder->flen; + pSchema->vlen = pBuilder->vlen; #ifdef TD_SUPPORT_BITMAP - schemaTLen(pSchema) += (int)TD_BITMAP_BYTES(schemaNCols(pSchema)); + pSchema->tlen += (int)TD_BITMAP_BYTES(pSchema->numOfCols); #endif - memcpy(schemaColAt(pSchema, 0), pBuilder->columns, sizeof(STColumn) * pBuilder->nCols); + memcpy(&pSchema->columns[0], pBuilder->columns, sizeof(STColumn) * pBuilder->nCols); return pSchema; } -void dataColInit(SDataCol *pDataCol, STColumn *pCol, int maxPoints) { - pDataCol->type = colType(pCol); - pDataCol->colId = colColId(pCol); - pDataCol->bytes = colBytes(pCol); - pDataCol->offset = colOffset(pCol) + 0; // TD_DATA_ROW_HEAD_SIZE; - - pDataCol->len = 0; -} - -static FORCE_INLINE const void *tdGetColDataOfRowUnsafe(SDataCol *pCol, int row) { - if (IS_VAR_DATA_TYPE(pCol->type)) { - return POINTER_SHIFT(pCol->pData, pCol->dataOff[row]); - } else { - return POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * row); - } -} - -bool isNEleNull(SDataCol *pCol, int nEle) { - if (isAllRowsNull(pCol)) return true; - for (int i = 0; i < nEle; ++i) { - if (!isNull(tdGetColDataOfRowUnsafe(pCol, i), pCol->type)) return false; - } - return true; -} - -void *dataColSetOffset(SDataCol *pCol, int nEle) { - ASSERT(((pCol->type == TSDB_DATA_TYPE_BINARY) || (pCol->type == TSDB_DATA_TYPE_NCHAR))); - - void *tptr = pCol->pData; - // char *tptr = (char *)(pCol->pData); - - VarDataOffsetT offset = 0; - for (int i = 0; i < nEle; ++i) { - pCol->dataOff[i] = offset; - offset += varDataTLen(tptr); - tptr = POINTER_SHIFT(tptr, varDataTLen(tptr)); - } - return POINTER_SHIFT(tptr, varDataTLen(tptr)); -} - -SDataCols *tdNewDataCols(int maxCols, int maxRows) { - SDataCols *pCols = (SDataCols *)taosMemoryCalloc(1, sizeof(SDataCols)); - if (pCols == NULL) { - uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)sizeof(SDataCols), strerror(errno)); - return NULL; - } - - pCols->maxPoints = maxRows; - pCols->maxCols = maxCols; - pCols->numOfRows = 0; - pCols->numOfCols = 0; - pCols->bitmapMode = TSDB_BITMODE_DEFAULT; - - if (maxCols > 0) { - pCols->cols = (SDataCol *)taosMemoryCalloc(maxCols, sizeof(SDataCol)); - if (pCols->cols == NULL) { - uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)sizeof(SDataCol) * maxCols, - strerror(errno)); - tdFreeDataCols(pCols); - return NULL; - } -#if 0 // no need as calloc used - int i; - for (i = 0; i < maxCols; i++) { - pCols->cols[i].spaceSize = 0; - pCols->cols[i].len = 0; - pCols->cols[i].pData = NULL; - pCols->cols[i].dataOff = NULL; - } -#endif - } - - return pCols; -} - -int tdInitDataCols(SDataCols *pCols, STSchema *pSchema) { - int i; - int oldMaxCols = pCols->maxCols; - if (schemaNCols(pSchema) > oldMaxCols) { - pCols->maxCols = schemaNCols(pSchema); - void *ptr = (SDataCol *)taosMemoryRealloc(pCols->cols, sizeof(SDataCol) * pCols->maxCols); - if (ptr == NULL) return -1; - pCols->cols = ptr; - for (i = oldMaxCols; i < pCols->maxCols; ++i) { - pCols->cols[i].pData = NULL; - pCols->cols[i].dataOff = NULL; - pCols->cols[i].pBitmap = NULL; - pCols->cols[i].spaceSize = 0; - } - } -#if 0 - tdResetDataCols(pCols); // redundant loop to reset len/blen to 0, already reset in following dataColInit(...) -#endif - - pCols->numOfRows = 0; - pCols->bitmapMode = TSDB_BITMODE_DEFAULT; - pCols->numOfCols = schemaNCols(pSchema); - - for (i = 0; i < schemaNCols(pSchema); ++i) { - dataColInit(pCols->cols + i, schemaColAt(pSchema, i), pCols->maxPoints); - } - - return 0; -} - -SDataCols *tdFreeDataCols(SDataCols *pCols) { - int i; - if (pCols) { - if (pCols->cols) { - int maxCols = pCols->maxCols; - for (i = 0; i < maxCols; ++i) { - SDataCol *pCol = &pCols->cols[i]; - taosMemoryFreeClear(pCol->pData); - } - taosMemoryFree(pCols->cols); - pCols->cols = NULL; - } - taosMemoryFree(pCols); - } - return NULL; -} - -void tdResetDataCols(SDataCols *pCols) { - if (pCols != NULL) { - pCols->numOfRows = 0; - pCols->bitmapMode = 0; - for (int i = 0; i < pCols->maxCols; ++i) { - dataColReset(pCols->cols + i); - } - } -} - #endif \ No newline at end of file diff --git a/source/common/src/trow.c b/source/common/src/trow.c index 052b6ffe58..f64250bce6 100644 --- a/source/common/src/trow.c +++ b/source/common/src/trow.c @@ -32,28 +32,10 @@ const uint8_t tdVTypeByte[2][3] = {{ }; // declaration -static uint8_t tdGetBitmapByte(uint8_t byte); -static int32_t tdCompareColId(const void *arg1, const void *arg2); +static uint8_t tdGetBitmapByte(uint8_t byte); +static int32_t tdCompareColId(const void *arg1, const void *arg2); static FORCE_INLINE int32_t compareKvRowColId(const void *key1, const void *key2); -// static void dataColSetNEleNull(SDataCol *pCol, int nEle); - -/** - * @brief src2 data has more priority than src1 - * - * @param target - * @param src1 - * @param iter1 - * @param limit1 - * @param src2 - * @param iter2 - * @param limit2 - * @param tRows - * @param update - */ -static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2, - int limit2, int tRows, bool update); - // implementation /** * @brief Compress bitmap bytes comprised of 2-bits to counterpart of 1-bit. @@ -287,33 +269,6 @@ void tdMergeBitmap(uint8_t *srcBitmap, int32_t nBits, uint8_t *dstBitmap) { } } -static FORCE_INLINE void dataColSetNullAt(SDataCol *pCol, int index, bool setBitmap, int8_t bitmapMode) { - if (IS_VAR_DATA_TYPE(pCol->type)) { - pCol->dataOff[index] = pCol->len; - char *ptr = POINTER_SHIFT(pCol->pData, pCol->len); - setVardataNull(ptr, pCol->type); - pCol->len += varDataTLen(ptr); - } else { - setNull(POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * index), pCol->type, pCol->bytes); - pCol->len += TYPE_BYTES[pCol->type]; - } - if (setBitmap) { - tdSetBitmapValType(pCol->pBitmap, index, TD_VTYPE_NONE, bitmapMode); - } -} - -// static void dataColSetNEleNull(SDataCol *pCol, int nEle) { -// if (IS_VAR_DATA_TYPE(pCol->type)) { -// pCol->len = 0; -// for (int i = 0; i < nEle; i++) { -// dataColSetNullAt(pCol, i); -// } -// } else { -// setNullN(pCol->pData, pCol->type, pCol->bytes, nEle); -// pCol->len = TYPE_BYTES[pCol->type] * nEle; -// } -// } - /** * @brief Set bitmap area by byte preferentially and then by bit. * @@ -362,56 +317,6 @@ bool tdIsBitmapBlkNorm(const void *pBitmap, int32_t numOfBits, int8_t bitmapMode return true; } -static FORCE_INLINE void dataColSetNoneAt(SDataCol *pCol, int index, bool setBitmap, int8_t bitmapMode) { - if (IS_VAR_DATA_TYPE(pCol->type)) { - pCol->dataOff[index] = pCol->len; - char *ptr = POINTER_SHIFT(pCol->pData, pCol->len); - setVardataNull(ptr, pCol->type); - pCol->len += varDataTLen(ptr); - } else { - setNull(POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * index), pCol->type, pCol->bytes); - pCol->len += TYPE_BYTES[pCol->type]; - } - if (setBitmap) { - tdSetBitmapValType(pCol->pBitmap, index, TD_VTYPE_NONE, bitmapMode); - } -} - -static void dataColSetNEleNone(SDataCol *pCol, int nEle, int8_t bitmapMode) { - if (IS_VAR_DATA_TYPE(pCol->type)) { - pCol->len = 0; - for (int i = 0; i < nEle; ++i) { - dataColSetNoneAt(pCol, i, false, bitmapMode); - } - } else { - setNullN(pCol->pData, pCol->type, pCol->bytes, nEle); - pCol->len = TYPE_BYTES[pCol->type] * nEle; - } -#ifdef TD_SUPPORT_BITMAP - tdSetBitmapValTypeN(pCol->pBitmap, nEle, TD_VTYPE_NONE, bitmapMode); -#endif -} - -#if 0 -void trbSetRowInfo(SRowBuilder *pRB, bool del, uint16_t sver) { - // TODO -} - -void trbSetRowVersion(SRowBuilder *pRB, uint64_t ver) { - // TODO -} - -void trbSetRowTS(SRowBuilder *pRB, TSKEY ts) { - // TODO -} - -int trbWriteCol(SRowBuilder *pRB, void *pData, col_id_t cid) { - // TODO - return 0; -} - -#endif - STSRow *tdRowDup(STSRow *row) { STSRow *trow = taosMemoryMalloc(TD_ROW_LEN(row)); if (trow == NULL) return NULL; @@ -420,511 +325,6 @@ STSRow *tdRowDup(STSRow *row) { return trow; } -/** - * @brief - * - * @param pCol - * @param valType - * @param val - * @param numOfRows - * @param maxPoints - * @param bitmapMode default is 0(2 bits), otherwise 1(1 bit) - * @param isMerge merge to current row - * @return int - */ -int tdAppendValToDataCol(SDataCol *pCol, TDRowValT valType, const void *val, int numOfRows, int maxPoints, - int8_t bitmapMode, bool isMerge) { - TASSERT(pCol != NULL); - - // Assume that the columns not specified during insert/upsert mean None. - if (isAllRowsNone(pCol)) { - if (tdValIsNone(valType)) { - // all None value yet, just return - return 0; - } - - if (tdAllocMemForCol(pCol, maxPoints) < 0) return -1; - if (numOfRows > 0) { - // Find the first not None value, fill all previous values as None - dataColSetNEleNone(pCol, numOfRows, bitmapMode); - } - } - const void *value = val; - if (!tdValTypeIsNorm(valType) || !val) { - // TODO: - // 1. back compatibility and easy to debug with codes of 2.0 to save NULL values. - // 2. later on, considering further optimization, don't save Null/None for VarType. - value = getNullValue(pCol->type); - } - if (!isMerge) { - if (IS_VAR_DATA_TYPE(pCol->type)) { - // set offset - pCol->dataOff[numOfRows] = pCol->len; - // Copy data - memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, varDataTLen(value)); - // Update the length - pCol->len += varDataTLen(value); - } else { - ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfRows); - memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, pCol->bytes); - pCol->len += pCol->bytes; - } - } else if (!tdValTypeIsNone(valType)) { - if (IS_VAR_DATA_TYPE(pCol->type)) { - // keep the last offset - // discard the last var data - int32_t lastVarLen = varDataTLen(POINTER_SHIFT(pCol->pData, pCol->dataOff[numOfRows])); - pCol->len -= lastVarLen; - // Copy data - memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, varDataTLen(value)); - // Update the length - pCol->len += varDataTLen(value); - } else { - ASSERT(pCol->len - TYPE_BYTES[pCol->type] == TYPE_BYTES[pCol->type] * numOfRows); - memcpy(POINTER_SHIFT(pCol->pData, pCol->len - TYPE_BYTES[pCol->type]), value, pCol->bytes); - } - } - -#ifdef TD_SUPPORT_BITMAP - if (!isMerge || !tdValTypeIsNone(valType)) { - tdSetBitmapValType(pCol->pBitmap, numOfRows, valType, bitmapMode); - } -#endif - return 0; -} - -// internal -static int32_t tdAppendTpRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCols, bool isMerge) { -#if 0 - ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < TD_ROW_KEY(pRow)); -#endif - - // Multi-Version rows with the same key and different versions supported - ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) <= TD_ROW_KEY(pRow)); - - int rcol = 1; - int dcol = 1; - void *pBitmap = tdGetBitmapAddrTp(pRow, pSchema->flen); - - SDataCol *pDataCol = &(pCols->cols[0]); - ASSERT(pDataCol->colId == PRIMARYKEY_TIMESTAMP_COL_ID); - tdAppendValToDataCol(pDataCol, TD_VTYPE_NORM, &pRow->ts, pCols->numOfRows, pCols->maxPoints, pCols->bitmapMode, - isMerge); - - while (dcol < pCols->numOfCols) { - pDataCol = &(pCols->cols[dcol]); - if (rcol >= schemaNCols(pSchema)) { - tdAppendValToDataCol(pDataCol, TD_VTYPE_NULL, NULL, pCols->numOfRows, pCols->maxPoints, pCols->bitmapMode, - isMerge); - ++dcol; - continue; - } - - STColumn *pRowCol = schemaColAt(pSchema, rcol); - SCellVal sVal = {0}; - if (pRowCol->colId == pDataCol->colId) { - if (tdGetTpRowValOfCol(&sVal, pRow, pBitmap, pRowCol->type, pRowCol->offset - sizeof(TSKEY), rcol - 1) < 0) { - return terrno; - } - tdAppendValToDataCol(pDataCol, sVal.valType, sVal.val, pCols->numOfRows, pCols->maxPoints, pCols->bitmapMode, - isMerge); - ++dcol; - ++rcol; - } else if (pRowCol->colId < pDataCol->colId) { - ++rcol; - } else { - tdAppendValToDataCol(pDataCol, TD_VTYPE_NULL, NULL, pCols->numOfRows, pCols->maxPoints, pCols->bitmapMode, - isMerge); - ++dcol; - } - } -#if 0 - ++pCols->numOfRows; -#endif - - return TSDB_CODE_SUCCESS; -} -// internal -static int32_t tdAppendKvRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCols, bool isMerge) { - ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < TD_ROW_KEY(pRow)); - - int rcol = 0; - int dcol = 1; - int tRowCols = tdRowGetNCols(pRow) - 1; // the primary TS key not included in kvRowColIdx part - int tSchemaCols = schemaNCols(pSchema) - 1; - void *pBitmap = tdGetBitmapAddrKv(pRow, tdRowGetNCols(pRow)); - - SDataCol *pDataCol = &(pCols->cols[0]); - ASSERT(pDataCol->colId == PRIMARYKEY_TIMESTAMP_COL_ID); - tdAppendValToDataCol(pDataCol, TD_VTYPE_NORM, &pRow->ts, pCols->numOfRows, pCols->maxPoints, pCols->bitmapMode, - isMerge); - - while (dcol < pCols->numOfCols) { - pDataCol = &(pCols->cols[dcol]); - if (rcol >= tRowCols || rcol >= tSchemaCols) { - tdAppendValToDataCol(pDataCol, TD_VTYPE_NULL, NULL, pCols->numOfRows, pCols->maxPoints, pCols->bitmapMode, - isMerge); - ++dcol; - continue; - } - - SKvRowIdx *pIdx = tdKvRowColIdxAt(pRow, rcol); - int16_t colIdx = -1; - if (pIdx) { - colIdx = POINTER_DISTANCE(pIdx, TD_ROW_COL_IDX(pRow)) / sizeof(SKvRowIdx); - } - TASSERT(colIdx >= 0); - SCellVal sVal = {0}; - if (pIdx->colId == pDataCol->colId) { - if (tdGetKvRowValOfCol(&sVal, pRow, pBitmap, pIdx->offset, colIdx) < 0) { - return terrno; - } - tdAppendValToDataCol(pDataCol, sVal.valType, sVal.val, pCols->numOfRows, pCols->maxPoints, pCols->bitmapMode, - isMerge); - ++dcol; - ++rcol; - } else if (pIdx->colId < pDataCol->colId) { - ++rcol; - } else { - tdAppendValToDataCol(pDataCol, TD_VTYPE_NULL, NULL, pCols->numOfRows, pCols->maxPoints, pCols->bitmapMode, - isMerge); - ++dcol; - } - } -#if 0 - ++pCols->numOfRows; -#endif - - return TSDB_CODE_SUCCESS; -} - -/** - * @brief exposed - * - * @param pRow - * @param pSchema - * @param pCols - */ -int32_t tdAppendSTSRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCols, bool isMerge) { -#ifdef TD_DEBUG_PRINT_TSDB_LOAD_DCOLS - printf("%s:%d ts: %" PRIi64 " sver:%d maxCols:%" PRIi16 " nCols:%" PRIi16 ", nRows:%d\n", __func__, __LINE__, - TD_ROW_KEY(pRow), TD_ROW_SVER(pRow), pCols->maxCols, pCols->numOfCols, pCols->numOfRows); -#endif - if (TD_IS_TP_ROW(pRow)) { - return tdAppendTpRowToDataCol(pRow, pSchema, pCols, isMerge); - } else if (TD_IS_KV_ROW(pRow)) { - return tdAppendKvRowToDataCol(pRow, pSchema, pCols, isMerge); - } else { - ASSERT(0); - } - return TSDB_CODE_SUCCESS; -} - -/** - * @brief source data has more priority than target - * - * @param target - * @param source - * @param rowsToMerge - * @param pOffset - * @param update - * @param maxVer - * @return int - */ -int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *pOffset, bool update, - TDRowVerT maxVer) { - ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfRows); - ASSERT(target->numOfCols == source->numOfCols); - int offset = 0; - - if (pOffset == NULL) { - pOffset = &offset; - } - - SDataCols *pTarget = NULL; - - if ((target->numOfRows == 0) || (dataColsKeyLast(target) < dataColsKeyAtRow(source, *pOffset))) { // No overlap - ASSERT(target->numOfRows + rowsToMerge <= target->maxPoints); - // TODO: filter the maxVer - TSKEY lastKey = TSKEY_INITIAL_VAL; - for (int i = 0; i < rowsToMerge; ++i) { - bool merge = false; - for (int j = 0; j < source->numOfCols; j++) { - if (source->cols[j].len > 0 || target->cols[j].len > 0) { - SCellVal sVal = {0}; - if (tdGetColDataOfRow(&sVal, source->cols + j, i + (*pOffset), source->bitmapMode) < 0) { - TASSERT(0); - } - - if (j == 0) { - if (lastKey == *(TSKEY *)sVal.val) { - if (!update) { - break; - } - merge = true; - } else if (lastKey != TSKEY_INITIAL_VAL) { - ++target->numOfRows; - } - - lastKey = *(TSKEY *)sVal.val; - } - if (i == 0) { - (target->cols + j)->bitmap = (source->cols + j)->bitmap; - } - - tdAppendValToDataCol(target->cols + j, sVal.valType, sVal.val, target->numOfRows, target->maxPoints, - target->bitmapMode, merge); - } - } - } - if (lastKey != TSKEY_INITIAL_VAL) { - ++target->numOfRows; - } - (*pOffset) += rowsToMerge; - } else { - pTarget = tdDupDataCols(target, true); - if (pTarget == NULL) goto _err; - - int iter1 = 0; - tdMergeTwoDataCols(target, pTarget, &iter1, pTarget->numOfRows, source, pOffset, source->numOfRows, - pTarget->numOfRows + rowsToMerge, update); - } - - tdFreeDataCols(pTarget); - return 0; - -_err: - tdFreeDataCols(pTarget); - return -1; -} - -static void tdAppendValToDataCols(SDataCols *target, SDataCols *src, int iter, bool isMerge) { - for (int i = 0; i < src->numOfCols; ++i) { - ASSERT(target->cols[i].type == src->cols[i].type); - if (src->cols[i].len > 0 || target->cols[i].len > 0) { - SCellVal sVal = {0}; - if (tdGetColDataOfRow(&sVal, src->cols + i, iter, src->bitmapMode) < 0) { - TASSERT(0); - } - if (isMerge) { - if (!tdValTypeIsNone(sVal.valType)) { - tdAppendValToDataCol(&(target->cols[i]), sVal.valType, sVal.val, target->numOfRows, target->maxPoints, - target->bitmapMode, isMerge); - } else { - // Keep the origin value for None - } - } else { - tdAppendValToDataCol(&(target->cols[i]), sVal.valType, sVal.val, target->numOfRows, target->maxPoints, - target->bitmapMode, isMerge); - } - } - } -} -/** - * @brief src2 data has more priority than src1 - * - * @param target - * @param src1 - * @param iter1 - * @param limit1 - * @param src2 - * @param iter2 - * @param limit2 - * @param tRows - * @param update - */ -static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2, - int limit2, int tRows, bool update) { - tdResetDataCols(target); - target->bitmapMode = src1->bitmapMode; - ASSERT(limit1 <= src1->numOfRows && limit2 <= src2->numOfRows); - int32_t nRows = 0; - - // TODO: filter the maxVer - // TODO: handle the delete function - TSKEY lastKey = TSKEY_INITIAL_VAL; - while (nRows < tRows) { - if (*iter1 >= limit1 && *iter2 >= limit2) break; - - TSKEY key1 = (*iter1 >= limit1) ? INT64_MAX : dataColsKeyAt(src1, *iter1); - // TKEY tkey1 = (*iter1 >= limit1) ? TKEY_NULL : dataColsTKeyAt(src1, *iter1); - TSKEY key2 = (*iter2 >= limit2) ? INT64_MAX : dataColsKeyAt(src2, *iter2); - // TKEY tkey2 = (*iter2 >= limit2) ? TKEY_NULL : dataColsTKeyAt(src2, *iter2); - - // ASSERT(tkey1 == TKEY_NULL || (!TKEY_IS_DELETED(tkey1))); - - if (key1 <= key2) { - // select key1 if not delete - if (update && (lastKey == key1)) { - tdAppendValToDataCols(target, src1, *iter1, true); - } else if (lastKey != key1) { - if (lastKey != TSKEY_INITIAL_VAL) { - ++target->numOfRows; - } - tdAppendValToDataCols(target, src1, *iter1, false); - } - ++nRows; - ++(*iter1); - lastKey = key1; - } else { - // use key2 if not deleted - // TODO: handle the delete function - if (update && (lastKey == key2)) { - tdAppendValToDataCols(target, src2, *iter2, true); - } else if (lastKey != key2) { - if (lastKey != TSKEY_INITIAL_VAL) { - ++target->numOfRows; - } - tdAppendValToDataCols(target, src2, *iter2, false); - } - - ++nRows; - ++(*iter2); - lastKey = key2; - } - - ASSERT(target->numOfRows <= target->maxPoints - 1); - } - if (lastKey != TSKEY_INITIAL_VAL) { - ++target->numOfRows; - } -} - -STSRow *mergeTwoRows(void *buffer, STSRow *row1, STSRow *row2, STSchema *pSchema1, STSchema *pSchema2) { -#if 0 - ASSERT(TD_ROW_KEY(row1) == TD_ROW_KEY(row2)); - ASSERT(schemaVersion(pSchema1) == TD_ROW_SVER(row1)); - ASSERT(schemaVersion(pSchema2) == TD_ROW_SVER(row2)); - ASSERT(schemaVersion(pSchema1) >= schemaVersion(pSchema2)); -#endif - -#if 0 - SArray *stashRow = taosArrayInit(pSchema1->numOfCols, sizeof(SColInfo)); - if (stashRow == NULL) { - return NULL; - } - - STSRow pRow = buffer; - STpRow dataRow = memRowDataBody(pRow); - memRowSetType(pRow, SMEM_ROW_DATA); - dataRowSetVersion(dataRow, schemaVersion(pSchema1)); // use latest schema version - dataRowSetLen(dataRow, (TDRowLenT)(TD_DATA_ROW_HEAD_SIZE + pSchema1->flen)); - - TDRowLenT dataLen = 0, kvLen = TD_MEM_ROW_KV_HEAD_SIZE; - - int32_t i = 0; // row1 - int32_t j = 0; // row2 - int32_t nCols1 = schemaNCols(pSchema1); - int32_t nCols2 = schemaNCols(pSchema2); - SColInfo colInfo = {0}; - int32_t kvIdx1 = 0, kvIdx2 = 0; - - while (i < nCols1) { - STColumn *pCol = schemaColAt(pSchema1, i); - void * val1 = tdGetMemRowDataOfColEx(row1, pCol->colId, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset, &kvIdx1); - // if val1 != NULL, use val1; - if (val1 != NULL && !isNull(val1, pCol->type)) { - tdAppendColVal(dataRow, val1, pCol->type, pCol->offset); - kvLen += tdGetColAppendLen(SMEM_ROW_KV, val1, pCol->type); - setSColInfo(&colInfo, pCol->colId, pCol->type, val1); - taosArrayPush(stashRow, &colInfo); - ++i; // next col - continue; - } - - void *val2 = NULL; - while (j < nCols2) { - STColumn *tCol = schemaColAt(pSchema2, j); - if (tCol->colId < pCol->colId) { - ++j; - continue; - } - if (tCol->colId == pCol->colId) { - val2 = tdGetMemRowDataOfColEx(row2, tCol->colId, tCol->type, TD_DATA_ROW_HEAD_SIZE + tCol->offset, &kvIdx2); - } else if (tCol->colId > pCol->colId) { - // set NULL - } - break; - } // end of while(jtype); - } - tdAppendColVal(dataRow, val2, pCol->type, pCol->offset); - if (!isNull(val2, pCol->type)) { - kvLen += tdGetColAppendLen(SMEM_ROW_KV, val2, pCol->type); - setSColInfo(&colInfo, pCol->colId, pCol->type, val2); - taosArrayPush(stashRow, &colInfo); - } - - ++i; // next col - } - - dataLen = TD_ROW_LEN(pRow); - - if (kvLen < dataLen) { - // scan stashRow and generate SKVRow - memset(buffer, 0, sizeof(dataLen)); - STSRow tRow = buffer; - memRowSetType(tRow, SMEM_ROW_KV); - SKVRow kvRow = (SKVRow)memRowKvBody(tRow); - int16_t nKvNCols = (int16_t) taosArrayGetSize(stashRow); - kvRowSetLen(kvRow, (TDRowLenT)(TD_KV_ROW_HEAD_SIZE + sizeof(SColIdx) * nKvNCols)); - kvRowSetNCols(kvRow, nKvNCols); - memRowSetKvVersion(tRow, pSchema1->version); - - int32_t toffset = 0; - int16_t k; - for (k = 0; k < nKvNCols; ++k) { - SColInfo *pColInfo = taosArrayGet(stashRow, k); - tdAppendKvColVal(kvRow, pColInfo->colVal, true, pColInfo->colId, pColInfo->colType, toffset); - toffset += sizeof(SColIdx); - } - ASSERT(kvLen == TD_ROW_LEN(tRow)); - } - taosArrayDestroy(stashRow); - return buffer; -#endif - return NULL; -} - -SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) { - SDataCols *pRet = tdNewDataCols(pDataCols->maxCols, pDataCols->maxPoints); - if (pRet == NULL) return NULL; - - pRet->numOfCols = pDataCols->numOfCols; - pRet->bitmapMode = pDataCols->bitmapMode; - pRet->sversion = pDataCols->sversion; - if (keepData) pRet->numOfRows = pDataCols->numOfRows; - - for (int i = 0; i < pDataCols->numOfCols; ++i) { - pRet->cols[i].type = pDataCols->cols[i].type; - pRet->cols[i].bitmap = pDataCols->cols[i].bitmap; - pRet->cols[i].colId = pDataCols->cols[i].colId; - pRet->cols[i].bytes = pDataCols->cols[i].bytes; - pRet->cols[i].offset = pDataCols->cols[i].offset; - - if (keepData) { - if (pDataCols->cols[i].len > 0) { - if (tdAllocMemForCol(&pRet->cols[i], pRet->maxPoints) < 0) { - tdFreeDataCols(pRet); - return NULL; - } - pRet->cols[i].len = pDataCols->cols[i].len; - memcpy(pRet->cols[i].pData, pDataCols->cols[i].pData, pDataCols->cols[i].len); - if (IS_VAR_DATA_TYPE(pRet->cols[i].type)) { - int dataOffSize = sizeof(VarDataOffsetT) * pDataCols->maxPoints; - memcpy(pRet->cols[i].dataOff, pDataCols->cols[i].dataOff, dataOffSize); - } - if (!TD_COL_ROWS_NORM(pRet->cols + i)) { - memcpy(pRet->cols[i].pBitmap, pDataCols->cols[i].pBitmap, TD_BITMAP_BYTES(pDataCols->numOfRows)); - } - } - } - } - - return pRet; -} - void tdSRowPrint(STSRow *row, STSchema *pSchema, const char *tag) { STSRowIter iter = {0}; tdSTSRowIterInit(&iter, pSchema); @@ -1020,32 +420,6 @@ void tdSCellValPrint(SCellVal *pVal, int8_t colType) { } } -int32_t dataColGetNEleLen(SDataCol *pDataCol, int32_t rows, int8_t bitmapMode) { - ASSERT(rows > 0); - int32_t result = 0; - - if (IS_VAR_DATA_TYPE(pDataCol->type)) { - result += pDataCol->dataOff[rows - 1]; - SCellVal val = {0}; - if (tdGetColDataOfRow(&val, pDataCol, rows - 1, bitmapMode) < 0) { - TASSERT(0); - } - - // Currently, count the varDataTLen in of Null/None cols considering back compatibility test for 2.4 - result += varDataTLen(val.val); - // TODO: later on, don't save Null/None for VarDataT for 3.0 - // if (tdValTypeIsNorm(val.valType)) { - // result += varDataTLen(val.val); - // } - } else { - result += TYPE_BYTES[pDataCol->type] * rows; - } - - ASSERT(pDataCol->len == result); - - return result; -} - bool tdSKvRowGetVal(STSRow *pRow, col_id_t colId, col_id_t colIdx, SCellVal *pVal) { if (colId == PRIMARYKEY_TIMESTAMP_COL_ID) { tdRowSetVal(pVal, TD_VTYPE_NORM, TD_ROW_KEY_ADDR(pRow)); @@ -1082,40 +456,6 @@ bool tdSTpRowGetVal(STSRow *pRow, col_id_t colId, col_type_t colType, int32_t fl return true; } -int32_t tdGetColDataOfRow(SCellVal *pVal, SDataCol *pCol, int32_t row, int8_t bitmapMode) { - if (isAllRowsNone(pCol)) { - pVal->valType = TD_VTYPE_NONE; -#ifdef TD_SUPPORT_READ2 - pVal->val = (void *)getNullValue(pCol->type); -#else - pVal->val = NULL; -#endif - return TSDB_CODE_SUCCESS; - } - - if (TD_COL_ROWS_NORM(pCol)) { - pVal->valType = TD_VTYPE_NORM; - } else if (tdGetBitmapValType(pCol->pBitmap, row, &(pVal->valType), bitmapMode) < 0) { - return terrno; - } - - if (tdValTypeIsNorm(pVal->valType)) { - if (IS_VAR_DATA_TYPE(pCol->type)) { - pVal->val = POINTER_SHIFT(pCol->pData, pCol->dataOff[row]); - } else { - pVal->val = POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * row); - } - } else { - pVal->valType = TD_VTYPE_NULL; -#ifdef TD_SUPPORT_READ2 - pVal->val = (void *)getNullValue(pCol->type); -#else - pVal->val = NULL; -#endif - } - return TSDB_CODE_SUCCESS; -} - bool tdSTSRowIterNext(STSRowIter *pIter, col_id_t colId, col_type_t colType, SCellVal *pVal) { if (colId == PRIMARYKEY_TIMESTAMP_COL_ID) { pVal->val = &pIter->pRow->ts; diff --git a/source/common/test/dataformatTest.cpp b/source/common/test/dataformatTest.cpp index a52bb6b516..d16e35ff07 100644 --- a/source/common/test/dataformatTest.cpp +++ b/source/common/test/dataformatTest.cpp @@ -285,8 +285,8 @@ int32_t debugPrintSColVal(SColVal *cv, int8_t type) { } void debugPrintTSRow(STSRow2 *row, STSchema *pTSchema, const char *tags, int32_t ln) { - printf("%s:%d %s:v%d:%d ", tags, ln, (row->flags & 0xf0) ? "KV" : "TP", row->sver, row->nData); - for (int16_t i = 0; i < schemaNCols(pTSchema); ++i) { + // printf("%s:%d %s:v%d:%d ", tags, ln, (row->flags & 0xf0) ? "KV" : "TP", row->sver, row->nData); + for (int16_t i = 0; i < pTSchema->numOfCols; ++i) { SColVal cv = {0}; tTSRowGet(row, pTSchema, i, &cv); debugPrintSColVal(&cv, pTSchema->columns[i].type); @@ -393,7 +393,7 @@ static int32_t checkSColVal(const char *rawVal, SColVal *cv, int8_t type) { } static void checkTSRow(const char **data, STSRow2 *row, STSchema *pTSchema) { - for (int16_t i = 0; i < schemaNCols(pTSchema); ++i) { + for (int16_t i = 0; i < pTSchema->numOfCols; ++i) { SColVal cv = {0}; tTSRowGet(row, pTSchema, i, &cv); checkSColVal(data[i], &cv, pTSchema->columns[i].type); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 792f322f79..773484a9b3 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -420,8 +420,8 @@ static bool setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SEx } bool inSlidingWindow(SInterval* pInterval, STimeWindow* pWin, SDataBlockInfo* pBlockInfo) { - if (pInterval->interval != pInterval->sliding && (pWin->ekey < pBlockInfo->calWin.skey || - pWin->skey > pBlockInfo->calWin.ekey) ) { + if (pInterval->interval != pInterval->sliding && + (pWin->ekey < pBlockInfo->calWin.skey || pWin->skey > pBlockInfo->calWin.ekey)) { return false; } return true; @@ -813,7 +813,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul STimeWindow win = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->order); int32_t ret = TSDB_CODE_SUCCESS; - if ((!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) && inSlidingWindow(&pInfo->interval, &win, &pBlock->info)) { + if ((!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) && + inSlidingWindow(&pInfo->interval, &win, &pBlock->info)) { ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); if (ret != TSDB_CODE_SUCCESS || pResult == NULL) { @@ -846,7 +847,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul doWindowBorderInterpolation(pInfo, pBlock, pResult, &win, startPos, forwardRows, pSup); } - if ((!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) && inSlidingWindow(&pInfo->interval, &win, &pBlock->info)) { + if ((!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) && + inSlidingWindow(&pInfo->interval, &win, &pBlock->info)) { updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true); doApplyFunctions(pTaskInfo, pSup->pCtx, &win, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols, pBlock->info.rows, numOfOutput, pInfo->order); @@ -928,7 +930,7 @@ int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo tsCols = (int64_t*)pColDataInfo->pData; if (tsCols != NULL) { - blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex); + blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex); } } @@ -1290,15 +1292,16 @@ static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval* SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); pGpDatas = (uint64_t*)pGpCol->pData; } - int32_t step = 0; - int32_t startPos = 0; + int32_t step = 0; + int32_t startPos = 0; SResultRowInfo dumyInfo; dumyInfo.cur.pageId = -1; STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[0], pInterval, TSDB_ORDER_ASC); while (1) { - step = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); + step = + getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); uint64_t winGpId = pGpDatas ? pGpDatas[startPos] : pBlock->info.groupId; - bool res = doClearWindow(pAggSup, pSup1, (char*)&win.skey, sizeof(TKEY), winGpId, numOfOutput); + bool res = doClearWindow(pAggSup, pSup1, (char*)&win.skey, sizeof(TSKEY), winGpId, numOfOutput); if (pUpWins && res) { SWinRes winRes = {.ts = win.skey, .groupId = winGpId}; taosArrayPush(pUpWins, &winRes); diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 949aad768f..0dbf476837 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -1144,9 +1144,9 @@ static void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBl static int32_t findRowIndex(int32_t start, int32_t num, SColumnInfoData* pCol, const char* tval) { // the data is loaded, not only the block SMA value - for(int32_t i = start; i < num + start; ++i) { + for (int32_t i = start; i < num + start; ++i) { char* p = colDataGetData(pCol, i); - if (memcpy((void*)tval, p, pCol->info.bytes) == 0) { + if (memcpy((void*)tval, p, pCol->info.bytes) == 0) { return i; } } @@ -1154,7 +1154,6 @@ static int32_t findRowIndex(int32_t start, int32_t num, SColumnInfoData* pCol, c ASSERT(0); } - int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { int32_t numOfElems = 0; @@ -1938,7 +1937,7 @@ int32_t stddevFunctionMerge(SqlFunctionCtx* pCtx) { SStddevRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); - for(int32_t i = pInput->startRowIndex; i < pInput->startRowIndex + pInput->numOfRows; ++i) { + for (int32_t i = pInput->startRowIndex; i < pInput->startRowIndex + pInput->numOfRows; ++i) { char* data = colDataGetData(pCol, i); SStddevRes* pInputInfo = (SStddevRes*)varDataVal(data); stddevTransferInfo(pInputInfo, pInfo); @@ -3512,8 +3511,7 @@ void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS setBufPageDirty(pPage, true); releaseBufPage(pCtx->pBuf, pPage); #ifdef BUF_PAGE_DEBUG - qDebug("page_saveTuple pos:%p,pageId:%d, offset:%d\n", pPos, pPos->pageId, - pPos->offset); + qDebug("page_saveTuple pos:%p,pageId:%d, offset:%d\n", pPos, pPos->pageId, pPos->offset); #endif } @@ -3814,7 +3812,7 @@ bool elapsedFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo SElapsedInfo* pInfo = GET_ROWCELL_INTERBUF(pResultInfo); pInfo->result = 0; - pInfo->min = MAX_TS_KEY; + pInfo->min = TSKEY_MAX; pInfo->max = 0; if (pCtx->numOfParams > 1) { @@ -3841,7 +3839,7 @@ int32_t elapsedFunction(SqlFunctionCtx* pCtx) { } if (pInput->colDataAggIsSet) { - if (pInfo->min == MAX_TS_KEY) { + if (pInfo->min == TSKEY_MAX) { pInfo->min = GET_INT64_VAL(&pAgg->min); pInfo->max = GET_INT64_VAL(&pAgg->max); } else {