Merge pull request #14787 from taosdata/feat/row_refact
refact: row format
This commit is contained in:
commit
a67c8560a0
|
@ -64,18 +64,22 @@ int32_t tPutValue(uint8_t *p, SValue *pValue, int8_t type);
|
||||||
int32_t tGetValue(uint8_t *p, SValue *pValue, int8_t type);
|
int32_t tGetValue(uint8_t *p, SValue *pValue, int8_t type);
|
||||||
int tValueCmprFn(const SValue *pValue1, const SValue *pValue2, int8_t type);
|
int tValueCmprFn(const SValue *pValue1, const SValue *pValue2, int8_t type);
|
||||||
|
|
||||||
// STSRow2
|
// SColVal
|
||||||
#define COL_VAL_NONE(CID, TYPE) ((SColVal){.cid = (CID), .type = (TYPE), .isNone = 1})
|
#define COL_VAL_NONE(CID, TYPE) ((SColVal){.cid = (CID), .type = (TYPE), .isNone = 1})
|
||||||
#define COL_VAL_NULL(CID, TYPE) ((SColVal){.cid = (CID), .type = (TYPE), .isNull = 1})
|
#define COL_VAL_NULL(CID, TYPE) ((SColVal){.cid = (CID), .type = (TYPE), .isNull = 1})
|
||||||
#define COL_VAL_VALUE(CID, TYPE, V) ((SColVal){.cid = (CID), .type = (TYPE), .value = (V)})
|
#define COL_VAL_VALUE(CID, TYPE, V) ((SColVal){.cid = (CID), .type = (TYPE), .value = (V)})
|
||||||
|
|
||||||
|
// STSRow2
|
||||||
|
#define TSROW_LEN(PROW, V) tGetI32v((uint8_t *)(PROW)->data, (V) ? &(V) : NULL)
|
||||||
|
#define TSROW_SVER(PROW, V) tGetI32v((PROW)->data + TSROW_LEN(PROW, NULL), (V) ? &(V) : NULL)
|
||||||
|
|
||||||
int32_t tTSRowNew(STSRowBuilder *pBuilder, SArray *pArray, STSchema *pTSchema, STSRow2 **ppRow);
|
int32_t tTSRowNew(STSRowBuilder *pBuilder, SArray *pArray, STSchema *pTSchema, STSRow2 **ppRow);
|
||||||
int32_t tTSRowClone(const STSRow2 *pRow, STSRow2 **ppRow);
|
int32_t tTSRowClone(const STSRow2 *pRow, STSRow2 **ppRow);
|
||||||
void tTSRowFree(STSRow2 *pRow);
|
void tTSRowFree(STSRow2 *pRow);
|
||||||
void tTSRowGet(STSRow2 *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal);
|
void tTSRowGet(STSRow2 *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal);
|
||||||
int32_t tTSRowToArray(STSRow2 *pRow, STSchema *pTSchema, SArray **ppArray);
|
int32_t tTSRowToArray(STSRow2 *pRow, STSchema *pTSchema, SArray **ppArray);
|
||||||
int32_t tPutTSRow(uint8_t *p, STSRow2 *pRow);
|
int32_t tPutTSRow(uint8_t *p, STSRow2 *pRow);
|
||||||
int32_t tGetTSRow(uint8_t *p, STSRow2 *pRow);
|
int32_t tGetTSRow(uint8_t *p, STSRow2 **ppRow);
|
||||||
|
|
||||||
// STSRowBuilder
|
// STSRowBuilder
|
||||||
#define tsRowBuilderInit() ((STSRowBuilder){0})
|
#define tsRowBuilderInit() ((STSRowBuilder){0})
|
||||||
|
@ -123,16 +127,16 @@ struct STSchema {
|
||||||
#define TSROW_KV_SMALL ((uint8_t)0x10U)
|
#define TSROW_KV_SMALL ((uint8_t)0x10U)
|
||||||
#define TSROW_KV_MID ((uint8_t)0x20U)
|
#define TSROW_KV_MID ((uint8_t)0x20U)
|
||||||
#define TSROW_KV_BIG ((uint8_t)0x40U)
|
#define TSROW_KV_BIG ((uint8_t)0x40U)
|
||||||
|
#pragma pack(push, 1)
|
||||||
struct STSRow2 {
|
struct STSRow2 {
|
||||||
TSKEY ts;
|
TSKEY ts;
|
||||||
uint8_t flags;
|
uint8_t flags;
|
||||||
int32_t sver;
|
uint8_t data[];
|
||||||
uint32_t nData;
|
|
||||||
uint8_t *pData;
|
|
||||||
};
|
};
|
||||||
|
#pragma pack(pop)
|
||||||
|
|
||||||
struct STSRowBuilder {
|
struct STSRowBuilder {
|
||||||
STSRow2 tsRow;
|
// STSRow2 tsRow;
|
||||||
int32_t szBuf;
|
int32_t szBuf;
|
||||||
uint8_t *pBuf;
|
uint8_t *pBuf;
|
||||||
};
|
};
|
||||||
|
@ -226,50 +230,6 @@ struct STag {
|
||||||
memcpy(varDataVal(x), (str), (_size)); \
|
memcpy(varDataVal(x), (str), (_size)); \
|
||||||
} while (0);
|
} while (0);
|
||||||
|
|
||||||
// ----------------- TSDB COLUMN DEFINITION
|
|
||||||
|
|
||||||
#define colType(col) ((col)->type)
|
|
||||||
#define colFlags(col) ((col)->flags)
|
|
||||||
#define colColId(col) ((col)->colId)
|
|
||||||
#define colBytes(col) ((col)->bytes)
|
|
||||||
#define colOffset(col) ((col)->offset)
|
|
||||||
|
|
||||||
#define colSetType(col, t) (colType(col) = (t))
|
|
||||||
#define colSetFlags(col, f) (colFlags(col) = (f))
|
|
||||||
#define colSetColId(col, id) (colColId(col) = (id))
|
|
||||||
#define colSetBytes(col, b) (colBytes(col) = (b))
|
|
||||||
#define colSetOffset(col, o) (colOffset(col) = (o))
|
|
||||||
|
|
||||||
// ----------------- TSDB SCHEMA DEFINITION
|
|
||||||
|
|
||||||
#define schemaNCols(s) ((s)->numOfCols)
|
|
||||||
#define schemaVersion(s) ((s)->version)
|
|
||||||
#define schemaTLen(s) ((s)->tlen)
|
|
||||||
#define schemaFLen(s) ((s)->flen)
|
|
||||||
#define schemaVLen(s) ((s)->vlen)
|
|
||||||
#define schemaColAt(s, i) ((s)->columns + i)
|
|
||||||
#define tdFreeSchema(s) taosMemoryFreeClear((s))
|
|
||||||
|
|
||||||
STSchema *tdDupSchema(const STSchema *pSchema);
|
|
||||||
int32_t tdEncodeSchema(void **buf, STSchema *pSchema);
|
|
||||||
void *tdDecodeSchema(void *buf, STSchema **pRSchema);
|
|
||||||
|
|
||||||
static FORCE_INLINE int32_t comparColId(const void *key1, const void *key2) {
|
|
||||||
if (*(int16_t *)key1 > ((STColumn *)key2)->colId) {
|
|
||||||
return 1;
|
|
||||||
} else if (*(int16_t *)key1 < ((STColumn *)key2)->colId) {
|
|
||||||
return -1;
|
|
||||||
} else {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE STColumn *tdGetColOfID(STSchema *pSchema, int16_t colId) {
|
|
||||||
void *ptr = bsearch(&colId, (void *)pSchema->columns, schemaNCols(pSchema), sizeof(STColumn), comparColId);
|
|
||||||
if (ptr == NULL) return NULL;
|
|
||||||
return (STColumn *)ptr;
|
|
||||||
}
|
|
||||||
|
|
||||||
// ----------------- SCHEMA BUILDER DEFINITION
|
// ----------------- SCHEMA BUILDER DEFINITION
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t tCols;
|
int32_t tCols;
|
||||||
|
@ -299,141 +259,6 @@ void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, schema_ver_t version)
|
||||||
int32_t tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int8_t flags, col_id_t colId, col_bytes_t bytes);
|
int32_t tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int8_t flags, col_id_t colId, col_bytes_t bytes);
|
||||||
STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder);
|
STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder);
|
||||||
|
|
||||||
// ----------------- Semantic timestamp key definition
|
|
||||||
// typedef uint64_t TKEY;
|
|
||||||
#define TKEY TSKEY
|
|
||||||
|
|
||||||
#define TKEY_INVALID UINT64_MAX
|
|
||||||
#define TKEY_NULL TKEY_INVALID
|
|
||||||
#define TKEY_NEGATIVE_FLAG (((TKEY)1) << 63)
|
|
||||||
#define TKEY_VALUE_FILTER (~(TKEY_NEGATIVE_FLAG))
|
|
||||||
|
|
||||||
#define TKEY_IS_NEGATIVE(tkey) (((tkey)&TKEY_NEGATIVE_FLAG) != 0)
|
|
||||||
#define TKEY_IS_DELETED(tkey) (false)
|
|
||||||
|
|
||||||
#define tdGetTKEY(key) (key)
|
|
||||||
#define tdGetKey(tskey) (tskey)
|
|
||||||
|
|
||||||
#define MIN_TS_KEY ((TSKEY)0x8000000000000001)
|
|
||||||
#define MAX_TS_KEY ((TSKEY)0x7fffffffffffffff)
|
|
||||||
|
|
||||||
#define TD_TO_TKEY(key) tdGetTKEY(((key) < MIN_TS_KEY) ? MIN_TS_KEY : (((key) > MAX_TS_KEY) ? MAX_TS_KEY : key))
|
|
||||||
|
|
||||||
static FORCE_INLINE TKEY keyToTkey(TSKEY key) {
|
|
||||||
TSKEY lkey = key;
|
|
||||||
if (key > MAX_TS_KEY) {
|
|
||||||
lkey = MAX_TS_KEY;
|
|
||||||
} else if (key < MIN_TS_KEY) {
|
|
||||||
lkey = MIN_TS_KEY;
|
|
||||||
}
|
|
||||||
|
|
||||||
return tdGetTKEY(lkey);
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tkeyComparFn(const void *tkey1, const void *tkey2) {
|
|
||||||
TSKEY key1 = tdGetKey(*(TKEY *)tkey1);
|
|
||||||
TSKEY key2 = tdGetKey(*(TKEY *)tkey2);
|
|
||||||
|
|
||||||
if (key1 < key2) {
|
|
||||||
return -1;
|
|
||||||
} else if (key1 > key2) {
|
|
||||||
return 1;
|
|
||||||
} else {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ----------------- Data column structure
|
|
||||||
// SDataCol arrangement: data => bitmap => dataOffset
|
|
||||||
typedef struct SDataCol {
|
|
||||||
int8_t type; // column type
|
|
||||||
uint8_t bitmap : 1; // 0: no bitmap if all rows are NORM, 1: has bitmap if has NULL/NORM rows
|
|
||||||
uint8_t reserve : 7;
|
|
||||||
int16_t colId; // column ID
|
|
||||||
int32_t bytes; // column data bytes defined
|
|
||||||
int32_t offset; // data offset in a SDataRow (including the header size)
|
|
||||||
int32_t spaceSize; // Total space size for this column
|
|
||||||
int32_t len; // column data length
|
|
||||||
VarDataOffsetT *dataOff; // For binary and nchar data, the offset in the data column
|
|
||||||
void *pData; // Actual data pointer
|
|
||||||
void *pBitmap; // Bitmap pointer
|
|
||||||
TSKEY ts; // only used in last NULL column
|
|
||||||
} SDataCol;
|
|
||||||
|
|
||||||
#define isAllRowsNull(pCol) ((pCol)->len == 0)
|
|
||||||
#define isAllRowsNone(pCol) ((pCol)->len == 0)
|
|
||||||
static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; }
|
|
||||||
|
|
||||||
int32_t tdAllocMemForCol(SDataCol *pCol, int32_t maxPoints);
|
|
||||||
|
|
||||||
void dataColInit(SDataCol *pDataCol, STColumn *pCol, int32_t maxPoints);
|
|
||||||
int32_t dataColAppendVal(SDataCol *pCol, const void *value, int32_t numOfRows, int32_t maxPoints);
|
|
||||||
void *dataColSetOffset(SDataCol *pCol, int32_t nEle);
|
|
||||||
|
|
||||||
bool isNEleNull(SDataCol *pCol, int32_t nEle);
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
col_id_t maxCols; // max number of columns
|
|
||||||
col_id_t numOfCols; // Total number of cols
|
|
||||||
int32_t maxPoints; // max number of points
|
|
||||||
int32_t numOfRows;
|
|
||||||
int32_t bitmapMode : 1; // default is 0(2 bits), otherwise 1(1 bit)
|
|
||||||
int32_t sversion : 31; // TODO: set sversion(not used yet)
|
|
||||||
SDataCol *cols;
|
|
||||||
} SDataCols;
|
|
||||||
|
|
||||||
static FORCE_INLINE bool tdDataColsIsBitmapI(SDataCols *pCols) { return pCols->bitmapMode != TSDB_BITMODE_DEFAULT; }
|
|
||||||
static FORCE_INLINE void tdDataColsSetBitmapI(SDataCols *pCols) { pCols->bitmapMode = TSDB_BITMODE_ONE_BIT; }
|
|
||||||
static FORCE_INLINE bool tdIsBitmapModeI(int8_t bitmapMode) { return bitmapMode != TSDB_BITMODE_DEFAULT; }
|
|
||||||
|
|
||||||
#define keyCol(pCols) (&((pCols)->cols[0])) // Key column
|
|
||||||
#define dataColsTKeyAt(pCols, idx) ((TKEY *)(keyCol(pCols)->pData))[(idx)] // the idx row of column-wised data
|
|
||||||
#define dataColsKeyAt(pCols, idx) tdGetKey(dataColsTKeyAt(pCols, idx))
|
|
||||||
static FORCE_INLINE TKEY dataColsTKeyFirst(SDataCols *pCols) {
|
|
||||||
if (pCols->numOfRows) {
|
|
||||||
return dataColsTKeyAt(pCols, 0);
|
|
||||||
} else {
|
|
||||||
return TKEY_INVALID;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE TSKEY dataColsKeyAtRow(SDataCols *pCols, int32_t row) {
|
|
||||||
assert(row < pCols->numOfRows);
|
|
||||||
return dataColsKeyAt(pCols, row);
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE TSKEY dataColsKeyFirst(SDataCols *pCols) {
|
|
||||||
if (pCols->numOfRows) {
|
|
||||||
return dataColsKeyAt(pCols, 0);
|
|
||||||
} else {
|
|
||||||
return TSDB_DATA_TIMESTAMP_NULL;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE TKEY dataColsTKeyLast(SDataCols *pCols) {
|
|
||||||
if (pCols->numOfRows) {
|
|
||||||
return dataColsTKeyAt(pCols, pCols->numOfRows - 1);
|
|
||||||
} else {
|
|
||||||
return TKEY_INVALID;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE TSKEY dataColsKeyLast(SDataCols *pCols) {
|
|
||||||
if (pCols->numOfRows) {
|
|
||||||
return dataColsKeyAt(pCols, pCols->numOfRows - 1);
|
|
||||||
} else {
|
|
||||||
return TSDB_DATA_TIMESTAMP_NULL;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
SDataCols *tdNewDataCols(int32_t maxCols, int32_t maxRows);
|
|
||||||
void tdResetDataCols(SDataCols *pCols);
|
|
||||||
int32_t tdInitDataCols(SDataCols *pCols, STSchema *pSchema);
|
|
||||||
SDataCols *tdDupDataCols(SDataCols *pCols, bool keepData);
|
|
||||||
SDataCols *tdFreeDataCols(SDataCols *pCols);
|
|
||||||
int32_t tdMergeDataCols(SDataCols *target, SDataCols *source, int32_t rowsToMerge, int32_t *pOffset, bool update,
|
|
||||||
TDRowVerT maxVer);
|
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
|
|
@ -168,7 +168,7 @@ typedef struct {
|
||||||
|
|
||||||
// N.B. If without STSchema, getExtendedRowSize() is used to get the rowMaxBytes and
|
// N.B. If without STSchema, getExtendedRowSize() is used to get the rowMaxBytes and
|
||||||
// (int32_t)ceil((double)nCols/TD_VTYPE_PARTS) should be added if TD_SUPPORT_BITMAP defined.
|
// (int32_t)ceil((double)nCols/TD_VTYPE_PARTS) should be added if TD_SUPPORT_BITMAP defined.
|
||||||
#define TD_ROW_MAX_BYTES_FROM_SCHEMA(s) (schemaTLen(s) + TD_ROW_HEAD_LEN)
|
#define TD_ROW_MAX_BYTES_FROM_SCHEMA(s) ((s)->tlen + TD_ROW_HEAD_LEN)
|
||||||
|
|
||||||
#define TD_ROW_SET_INFO(r, i) (TD_ROW_INFO(r) = (i))
|
#define TD_ROW_SET_INFO(r, i) (TD_ROW_INFO(r) = (i))
|
||||||
#define TD_ROW_SET_TYPE(r, t) (TD_ROW_TYPE(r) = (t))
|
#define TD_ROW_SET_TYPE(r, t) (TD_ROW_TYPE(r) = (t))
|
||||||
|
@ -223,9 +223,10 @@ int32_t tdSetBitmapValTypeN(void *pBitmap, int16_t nEle, TDR
|
||||||
static FORCE_INLINE int32_t tdGetBitmapValType(const void *pBitmap, int16_t colIdx, TDRowValT *pValType,
|
static FORCE_INLINE int32_t tdGetBitmapValType(const void *pBitmap, int16_t colIdx, TDRowValT *pValType,
|
||||||
int8_t bitmapMode);
|
int8_t bitmapMode);
|
||||||
bool tdIsBitmapBlkNorm(const void *pBitmap, int32_t numOfBits, int8_t bitmapMode);
|
bool tdIsBitmapBlkNorm(const void *pBitmap, int32_t numOfBits, int8_t bitmapMode);
|
||||||
int32_t tdAppendValToDataCol(SDataCol *pCol, TDRowValT valType, const void *val, int32_t numOfRows, int32_t maxPoints,
|
// int32_t tdAppendValToDataCol(SDataCol *pCol, TDRowValT valType, const void *val, int32_t numOfRows, int32_t
|
||||||
int8_t bitmapMode, bool isMerge);
|
// maxPoints,
|
||||||
int32_t tdAppendSTSRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCols, bool isMerge);
|
// int8_t bitmapMode, bool isMerge);
|
||||||
|
// int32_t tdAppendSTSRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCols, bool isMerge);
|
||||||
|
|
||||||
int32_t tdGetBitmapValTypeII(const void *pBitmap, int16_t colIdx, TDRowValT *pValType);
|
int32_t tdGetBitmapValTypeII(const void *pBitmap, int16_t colIdx, TDRowValT *pValType);
|
||||||
int32_t tdSetBitmapValTypeI(void *pBitmap, int16_t colIdx, TDRowValT valType);
|
int32_t tdSetBitmapValTypeI(void *pBitmap, int16_t colIdx, TDRowValT valType);
|
||||||
|
@ -318,12 +319,9 @@ bool tdSTSRowGetVal(STSRowIter *pIter, col_id_t colId, col_type_t colType, SC
|
||||||
bool tdGetTpRowDataOfCol(STSRowIter *pIter, col_type_t colType, int32_t offset, SCellVal *pVal);
|
bool tdGetTpRowDataOfCol(STSRowIter *pIter, col_type_t colType, int32_t offset, SCellVal *pVal);
|
||||||
bool tdGetKvRowValOfColEx(STSRowIter *pIter, col_id_t colId, col_type_t colType, col_id_t *nIdx, SCellVal *pVal);
|
bool tdGetKvRowValOfColEx(STSRowIter *pIter, col_id_t colId, col_type_t colType, col_id_t *nIdx, SCellVal *pVal);
|
||||||
bool tdSTSRowIterNext(STSRowIter *pIter, col_id_t colId, col_type_t colType, SCellVal *pVal);
|
bool tdSTSRowIterNext(STSRowIter *pIter, col_id_t colId, col_type_t colType, SCellVal *pVal);
|
||||||
STSRow *mergeTwoRows(void *buffer, STSRow *row1, STSRow *row2, STSchema *pSchema1, STSchema *pSchema2);
|
|
||||||
int32_t tdGetColDataOfRow(SCellVal *pVal, SDataCol *pCol, int32_t row, int8_t bitmapMode);
|
|
||||||
bool tdSTpRowGetVal(STSRow *pRow, col_id_t colId, col_type_t colType, int32_t flen, uint32_t offset, col_id_t colIdx,
|
bool tdSTpRowGetVal(STSRow *pRow, col_id_t colId, col_type_t colType, int32_t flen, uint32_t offset, col_id_t colIdx,
|
||||||
SCellVal *pVal);
|
SCellVal *pVal);
|
||||||
bool tdSKvRowGetVal(STSRow *pRow, col_id_t colId, col_id_t colIdx, SCellVal *pVal);
|
bool tdSKvRowGetVal(STSRow *pRow, col_id_t colId, col_id_t colIdx, SCellVal *pVal);
|
||||||
int32_t dataColGetNEleLen(SDataCol *pDataCol, int32_t rows, int8_t bitmapMode);
|
|
||||||
void tdSCellValPrint(SCellVal *pVal, int8_t colType);
|
void tdSCellValPrint(SCellVal *pVal, int8_t colType);
|
||||||
void tdSRowPrint(STSRow *row, STSchema *pSchema, const char *tag);
|
void tdSRowPrint(STSRow *row, STSchema *pSchema, const char *tag);
|
||||||
|
|
||||||
|
|
|
@ -176,6 +176,7 @@ static void setBitMap(uint8_t *pb, uint8_t v, int32_t idx, uint8_t flags) {
|
||||||
|
|
||||||
int32_t tTSRowNew(STSRowBuilder *pBuilder, SArray *pArray, STSchema *pTSchema, STSRow2 **ppRow) {
|
int32_t tTSRowNew(STSRowBuilder *pBuilder, SArray *pArray, STSchema *pTSchema, STSRow2 **ppRow) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
#if 0
|
||||||
STColumn *pTColumn;
|
STColumn *pTColumn;
|
||||||
SColVal *pColVal;
|
SColVal *pColVal;
|
||||||
int32_t nColVal = taosArrayGetSize(pArray);
|
int32_t nColVal = taosArrayGetSize(pArray);
|
||||||
|
@ -462,30 +463,22 @@ int32_t tTSRowNew(STSRowBuilder *pBuilder, SArray *pArray, STSchema *pTSchema, S
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#endif
|
||||||
_exit:
|
_exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tTSRowClone(const STSRow2 *pRow, STSRow2 **ppRow) {
|
int32_t tTSRowClone(const STSRow2 *pRow, STSRow2 **ppRow) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
int32_t rLen;
|
||||||
|
|
||||||
(*ppRow) = (STSRow2 *)taosMemoryMalloc(sizeof(**ppRow));
|
TSROW_LEN(pRow, rLen);
|
||||||
|
(*ppRow) = (STSRow2 *)taosMemoryMalloc(rLen);
|
||||||
if (*ppRow == NULL) {
|
if (*ppRow == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
**ppRow = *pRow;
|
memcpy(*ppRow, pRow, rLen);
|
||||||
(*ppRow)->pData = NULL;
|
|
||||||
|
|
||||||
if (pRow->nData) {
|
|
||||||
(*ppRow)->pData = taosMemoryMalloc(pRow->nData);
|
|
||||||
if ((*ppRow)->pData == NULL) {
|
|
||||||
taosMemoryFree(*ppRow);
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
memcpy((*ppRow)->pData, pRow->pData, pRow->nData);
|
|
||||||
}
|
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
return code;
|
return code;
|
||||||
|
@ -493,12 +486,12 @@ _exit:
|
||||||
|
|
||||||
void tTSRowFree(STSRow2 *pRow) {
|
void tTSRowFree(STSRow2 *pRow) {
|
||||||
if (pRow) {
|
if (pRow) {
|
||||||
if (pRow->pData) taosMemoryFree(pRow->pData);
|
|
||||||
taosMemoryFree(pRow);
|
taosMemoryFree(pRow);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void tTSRowGet(STSRow2 *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal) {
|
void tTSRowGet(STSRow2 *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal) {
|
||||||
|
#if 0
|
||||||
uint8_t isTuple = ((pRow->flags & 0xf0) == 0) ? 1 : 0;
|
uint8_t isTuple = ((pRow->flags & 0xf0) == 0) ? 1 : 0;
|
||||||
STColumn *pTColumn = &pTSchema->columns[iCol];
|
STColumn *pTColumn = &pTSchema->columns[iCol];
|
||||||
uint8_t flags = pRow->flags & (uint8_t)0xf;
|
uint8_t flags = pRow->flags & (uint8_t)0xf;
|
||||||
|
@ -643,10 +636,12 @@ _return_null:
|
||||||
_return_value:
|
_return_value:
|
||||||
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, value);
|
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, value);
|
||||||
return;
|
return;
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tTSRowToArray(STSRow2 *pRow, STSchema *pTSchema, SArray **ppArray) {
|
int32_t tTSRowToArray(STSRow2 *pRow, STSchema *pTSchema, SArray **ppArray) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
#if 0
|
||||||
SColVal cv;
|
SColVal cv;
|
||||||
|
|
||||||
(*ppArray) = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal));
|
(*ppArray) = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal));
|
||||||
|
@ -660,52 +655,27 @@ int32_t tTSRowToArray(STSRow2 *pRow, STSchema *pTSchema, SArray **ppArray) {
|
||||||
taosArrayPush(*ppArray, &cv);
|
taosArrayPush(*ppArray, &cv);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#endif
|
||||||
_exit:
|
_exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tPutTSRow(uint8_t *p, STSRow2 *pRow) {
|
int32_t tPutTSRow(uint8_t *p, STSRow2 *pRow) {
|
||||||
int32_t n = 0;
|
int32_t n;
|
||||||
|
|
||||||
n += tPutI64(p ? p + n : p, pRow->ts);
|
TSROW_LEN(pRow, n);
|
||||||
n += tPutI8(p ? p + n : p, pRow->flags);
|
if (p) {
|
||||||
n += tPutI32v(p ? p + n : p, pRow->sver);
|
memcpy(p, pRow, n);
|
||||||
|
|
||||||
ASSERT(pRow->flags & 0xf);
|
|
||||||
|
|
||||||
switch (pRow->flags & 0xf) {
|
|
||||||
case TSROW_HAS_NONE:
|
|
||||||
case TSROW_HAS_NULL:
|
|
||||||
ASSERT(pRow->nData == 0);
|
|
||||||
ASSERT(pRow->pData == NULL);
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
ASSERT(pRow->nData && pRow->pData);
|
|
||||||
n += tPutBinary(p ? p + n : p, pRow->pData, pRow->nData);
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return n;
|
return n;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tGetTSRow(uint8_t *p, STSRow2 *pRow) {
|
int32_t tGetTSRow(uint8_t *p, STSRow2 **ppRow) {
|
||||||
int32_t n = 0;
|
int32_t n;
|
||||||
|
|
||||||
n += tGetI64(p + n, &pRow->ts);
|
*ppRow = (STSRow2 *)p;
|
||||||
n += tGetI8(p + n, &pRow->flags);
|
TSROW_LEN(*ppRow, n);
|
||||||
n += tGetI32v(p + n, &pRow->sver);
|
|
||||||
|
|
||||||
ASSERT(pRow->flags);
|
|
||||||
switch (pRow->flags & 0xf) {
|
|
||||||
case TSROW_HAS_NONE:
|
|
||||||
case TSROW_HAS_NULL:
|
|
||||||
pRow->nData = 0;
|
|
||||||
pRow->pData = NULL;
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
n += tGetBinary(p + n, &pRow->pData, &pRow->nData);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
return n;
|
return n;
|
||||||
}
|
}
|
||||||
|
@ -904,9 +874,7 @@ static int32_t tGetTagVal(uint8_t *p, STagVal *pTagVal, int8_t isJson) {
|
||||||
return n;
|
return n;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool tTagIsJson(const void *pTag){
|
bool tTagIsJson(const void *pTag) { return (((const STag *)pTag)->flags & TD_TAG_JSON); }
|
||||||
return (((const STag *)pTag)->flags & TD_TAG_JSON);
|
|
||||||
}
|
|
||||||
|
|
||||||
bool tTagIsJsonNull(void *data) {
|
bool tTagIsJsonNull(void *data) {
|
||||||
STag *pTag = (STag *)data;
|
STag *pTag = (STag *)data;
|
||||||
|
@ -1097,112 +1065,6 @@ _err:
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 1 // ===================================================================================================================
|
#if 1 // ===================================================================================================================
|
||||||
static void dataColSetNEleNull(SDataCol *pCol, int nEle);
|
|
||||||
int tdAllocMemForCol(SDataCol *pCol, int maxPoints) {
|
|
||||||
int spaceNeeded = pCol->bytes * maxPoints;
|
|
||||||
if (IS_VAR_DATA_TYPE(pCol->type)) {
|
|
||||||
spaceNeeded += sizeof(VarDataOffsetT) * maxPoints;
|
|
||||||
}
|
|
||||||
#ifdef TD_SUPPORT_BITMAP
|
|
||||||
int32_t nBitmapBytes = (int32_t)TD_BITMAP_BYTES(maxPoints);
|
|
||||||
spaceNeeded += (int)nBitmapBytes;
|
|
||||||
// TODO: Currently, the compression of bitmap parts is affiliated to the column data parts, thus allocate 1 more
|
|
||||||
// TYPE_BYTES as to comprise complete TYPE_BYTES. Otherwise, invalid read/write would be triggered.
|
|
||||||
// spaceNeeded += TYPE_BYTES[pCol->type]; // the bitmap part is append as a single part since 2022.04.03, thus
|
|
||||||
// remove the additional space
|
|
||||||
#endif
|
|
||||||
|
|
||||||
if (pCol->spaceSize < spaceNeeded) {
|
|
||||||
void *ptr = taosMemoryRealloc(pCol->pData, spaceNeeded);
|
|
||||||
if (ptr == NULL) {
|
|
||||||
uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)spaceNeeded, strerror(errno));
|
|
||||||
return -1;
|
|
||||||
} else {
|
|
||||||
pCol->pData = ptr;
|
|
||||||
pCol->spaceSize = spaceNeeded;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
#ifdef TD_SUPPORT_BITMAP
|
|
||||||
|
|
||||||
if (IS_VAR_DATA_TYPE(pCol->type)) {
|
|
||||||
pCol->pBitmap = POINTER_SHIFT(pCol->pData, pCol->bytes * maxPoints);
|
|
||||||
pCol->dataOff = POINTER_SHIFT(pCol->pBitmap, nBitmapBytes);
|
|
||||||
} else {
|
|
||||||
pCol->pBitmap = POINTER_SHIFT(pCol->pData, pCol->bytes * maxPoints);
|
|
||||||
}
|
|
||||||
#else
|
|
||||||
if (IS_VAR_DATA_TYPE(pCol->type)) {
|
|
||||||
pCol->dataOff = POINTER_SHIFT(pCol->pData, pCol->bytes * maxPoints);
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Duplicate the schema and return a new object
|
|
||||||
*/
|
|
||||||
STSchema *tdDupSchema(const STSchema *pSchema) {
|
|
||||||
int tlen = sizeof(STSchema) + sizeof(STColumn) * schemaNCols(pSchema);
|
|
||||||
STSchema *tSchema = (STSchema *)taosMemoryMalloc(tlen);
|
|
||||||
if (tSchema == NULL) return NULL;
|
|
||||||
|
|
||||||
memcpy((void *)tSchema, (void *)pSchema, tlen);
|
|
||||||
|
|
||||||
return tSchema;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Encode a schema to dst, and return the next pointer
|
|
||||||
*/
|
|
||||||
int tdEncodeSchema(void **buf, STSchema *pSchema) {
|
|
||||||
int tlen = 0;
|
|
||||||
tlen += taosEncodeFixedI32(buf, schemaVersion(pSchema));
|
|
||||||
tlen += taosEncodeFixedI32(buf, schemaNCols(pSchema));
|
|
||||||
|
|
||||||
for (int i = 0; i < schemaNCols(pSchema); i++) {
|
|
||||||
STColumn *pCol = schemaColAt(pSchema, i);
|
|
||||||
tlen += taosEncodeFixedI8(buf, colType(pCol));
|
|
||||||
tlen += taosEncodeFixedI8(buf, colFlags(pCol));
|
|
||||||
tlen += taosEncodeFixedI16(buf, colColId(pCol));
|
|
||||||
tlen += taosEncodeFixedI16(buf, colBytes(pCol));
|
|
||||||
}
|
|
||||||
|
|
||||||
return tlen;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Decode a schema from a binary.
|
|
||||||
*/
|
|
||||||
void *tdDecodeSchema(void *buf, STSchema **pRSchema) {
|
|
||||||
int version = 0;
|
|
||||||
int numOfCols = 0;
|
|
||||||
STSchemaBuilder schemaBuilder;
|
|
||||||
|
|
||||||
buf = taosDecodeFixedI32(buf, &version);
|
|
||||||
buf = taosDecodeFixedI32(buf, &numOfCols);
|
|
||||||
|
|
||||||
if (tdInitTSchemaBuilder(&schemaBuilder, version) < 0) return NULL;
|
|
||||||
|
|
||||||
for (int i = 0; i < numOfCols; i++) {
|
|
||||||
col_type_t type = 0;
|
|
||||||
int8_t flags = 0;
|
|
||||||
col_id_t colId = 0;
|
|
||||||
col_bytes_t bytes = 0;
|
|
||||||
buf = taosDecodeFixedI8(buf, &type);
|
|
||||||
buf = taosDecodeFixedI8(buf, &flags);
|
|
||||||
buf = taosDecodeFixedI16(buf, &colId);
|
|
||||||
buf = taosDecodeFixedI32(buf, &bytes);
|
|
||||||
if (tdAddColToSchema(&schemaBuilder, type, flags, colId, bytes) < 0) {
|
|
||||||
tdDestroyTSchemaBuilder(&schemaBuilder);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
*pRSchema = tdGetSchemaFromBuilder(&schemaBuilder);
|
|
||||||
tdDestroyTSchemaBuilder(&schemaBuilder);
|
|
||||||
return buf;
|
|
||||||
}
|
|
||||||
|
|
||||||
int tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, schema_ver_t version) {
|
int tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, schema_ver_t version) {
|
||||||
if (pBuilder == NULL) return -1;
|
if (pBuilder == NULL) return -1;
|
||||||
|
|
||||||
|
@ -1239,22 +1101,22 @@ int32_t tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int8_t flags, c
|
||||||
}
|
}
|
||||||
|
|
||||||
STColumn *pCol = &(pBuilder->columns[pBuilder->nCols]);
|
STColumn *pCol = &(pBuilder->columns[pBuilder->nCols]);
|
||||||
colSetType(pCol, type);
|
pCol->type = type;
|
||||||
colSetColId(pCol, colId);
|
pCol->colId = colId;
|
||||||
colSetFlags(pCol, flags);
|
pCol->flags = flags;
|
||||||
if (pBuilder->nCols == 0) {
|
if (pBuilder->nCols == 0) {
|
||||||
colSetOffset(pCol, 0);
|
pCol->offset = 0;
|
||||||
} else {
|
} else {
|
||||||
STColumn *pTCol = &(pBuilder->columns[pBuilder->nCols - 1]);
|
STColumn *pTCol = &(pBuilder->columns[pBuilder->nCols - 1]);
|
||||||
colSetOffset(pCol, pTCol->offset + TYPE_BYTES[pTCol->type]);
|
pCol->offset = pTCol->offset + TYPE_BYTES[pTCol->type];
|
||||||
}
|
}
|
||||||
|
|
||||||
if (IS_VAR_DATA_TYPE(type)) {
|
if (IS_VAR_DATA_TYPE(type)) {
|
||||||
colSetBytes(pCol, bytes);
|
pCol->bytes = bytes;
|
||||||
pBuilder->tlen += (TYPE_BYTES[type] + bytes);
|
pBuilder->tlen += (TYPE_BYTES[type] + bytes);
|
||||||
pBuilder->vlen += bytes - sizeof(VarDataLenT);
|
pBuilder->vlen += bytes - sizeof(VarDataLenT);
|
||||||
} else {
|
} else {
|
||||||
colSetBytes(pCol, TYPE_BYTES[type]);
|
pCol->bytes = TYPE_BYTES[type];
|
||||||
pBuilder->tlen += TYPE_BYTES[type];
|
pBuilder->tlen += TYPE_BYTES[type];
|
||||||
pBuilder->vlen += TYPE_BYTES[type];
|
pBuilder->vlen += TYPE_BYTES[type];
|
||||||
}
|
}
|
||||||
|
@ -1275,151 +1137,19 @@ STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder) {
|
||||||
STSchema *pSchema = (STSchema *)taosMemoryMalloc(tlen);
|
STSchema *pSchema = (STSchema *)taosMemoryMalloc(tlen);
|
||||||
if (pSchema == NULL) return NULL;
|
if (pSchema == NULL) return NULL;
|
||||||
|
|
||||||
schemaVersion(pSchema) = pBuilder->version;
|
pSchema->version = pBuilder->version;
|
||||||
schemaNCols(pSchema) = pBuilder->nCols;
|
pSchema->numOfCols = pBuilder->nCols;
|
||||||
schemaTLen(pSchema) = pBuilder->tlen;
|
pSchema->tlen = pBuilder->tlen;
|
||||||
schemaFLen(pSchema) = pBuilder->flen;
|
pSchema->flen = pBuilder->flen;
|
||||||
schemaVLen(pSchema) = pBuilder->vlen;
|
pSchema->vlen = pBuilder->vlen;
|
||||||
|
|
||||||
#ifdef TD_SUPPORT_BITMAP
|
#ifdef TD_SUPPORT_BITMAP
|
||||||
schemaTLen(pSchema) += (int)TD_BITMAP_BYTES(schemaNCols(pSchema));
|
pSchema->tlen += (int)TD_BITMAP_BYTES(pSchema->numOfCols);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
memcpy(schemaColAt(pSchema, 0), pBuilder->columns, sizeof(STColumn) * pBuilder->nCols);
|
memcpy(&pSchema->columns[0], pBuilder->columns, sizeof(STColumn) * pBuilder->nCols);
|
||||||
|
|
||||||
return pSchema;
|
return pSchema;
|
||||||
}
|
}
|
||||||
|
|
||||||
void dataColInit(SDataCol *pDataCol, STColumn *pCol, int maxPoints) {
|
|
||||||
pDataCol->type = colType(pCol);
|
|
||||||
pDataCol->colId = colColId(pCol);
|
|
||||||
pDataCol->bytes = colBytes(pCol);
|
|
||||||
pDataCol->offset = colOffset(pCol) + 0; // TD_DATA_ROW_HEAD_SIZE;
|
|
||||||
|
|
||||||
pDataCol->len = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE const void *tdGetColDataOfRowUnsafe(SDataCol *pCol, int row) {
|
|
||||||
if (IS_VAR_DATA_TYPE(pCol->type)) {
|
|
||||||
return POINTER_SHIFT(pCol->pData, pCol->dataOff[row]);
|
|
||||||
} else {
|
|
||||||
return POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * row);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
bool isNEleNull(SDataCol *pCol, int nEle) {
|
|
||||||
if (isAllRowsNull(pCol)) return true;
|
|
||||||
for (int i = 0; i < nEle; ++i) {
|
|
||||||
if (!isNull(tdGetColDataOfRowUnsafe(pCol, i), pCol->type)) return false;
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
void *dataColSetOffset(SDataCol *pCol, int nEle) {
|
|
||||||
ASSERT(((pCol->type == TSDB_DATA_TYPE_BINARY) || (pCol->type == TSDB_DATA_TYPE_NCHAR)));
|
|
||||||
|
|
||||||
void *tptr = pCol->pData;
|
|
||||||
// char *tptr = (char *)(pCol->pData);
|
|
||||||
|
|
||||||
VarDataOffsetT offset = 0;
|
|
||||||
for (int i = 0; i < nEle; ++i) {
|
|
||||||
pCol->dataOff[i] = offset;
|
|
||||||
offset += varDataTLen(tptr);
|
|
||||||
tptr = POINTER_SHIFT(tptr, varDataTLen(tptr));
|
|
||||||
}
|
|
||||||
return POINTER_SHIFT(tptr, varDataTLen(tptr));
|
|
||||||
}
|
|
||||||
|
|
||||||
SDataCols *tdNewDataCols(int maxCols, int maxRows) {
|
|
||||||
SDataCols *pCols = (SDataCols *)taosMemoryCalloc(1, sizeof(SDataCols));
|
|
||||||
if (pCols == NULL) {
|
|
||||||
uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)sizeof(SDataCols), strerror(errno));
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
pCols->maxPoints = maxRows;
|
|
||||||
pCols->maxCols = maxCols;
|
|
||||||
pCols->numOfRows = 0;
|
|
||||||
pCols->numOfCols = 0;
|
|
||||||
pCols->bitmapMode = TSDB_BITMODE_DEFAULT;
|
|
||||||
|
|
||||||
if (maxCols > 0) {
|
|
||||||
pCols->cols = (SDataCol *)taosMemoryCalloc(maxCols, sizeof(SDataCol));
|
|
||||||
if (pCols->cols == NULL) {
|
|
||||||
uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)sizeof(SDataCol) * maxCols,
|
|
||||||
strerror(errno));
|
|
||||||
tdFreeDataCols(pCols);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
#if 0 // no need as calloc used
|
|
||||||
int i;
|
|
||||||
for (i = 0; i < maxCols; i++) {
|
|
||||||
pCols->cols[i].spaceSize = 0;
|
|
||||||
pCols->cols[i].len = 0;
|
|
||||||
pCols->cols[i].pData = NULL;
|
|
||||||
pCols->cols[i].dataOff = NULL;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
}
|
|
||||||
|
|
||||||
return pCols;
|
|
||||||
}
|
|
||||||
|
|
||||||
int tdInitDataCols(SDataCols *pCols, STSchema *pSchema) {
|
|
||||||
int i;
|
|
||||||
int oldMaxCols = pCols->maxCols;
|
|
||||||
if (schemaNCols(pSchema) > oldMaxCols) {
|
|
||||||
pCols->maxCols = schemaNCols(pSchema);
|
|
||||||
void *ptr = (SDataCol *)taosMemoryRealloc(pCols->cols, sizeof(SDataCol) * pCols->maxCols);
|
|
||||||
if (ptr == NULL) return -1;
|
|
||||||
pCols->cols = ptr;
|
|
||||||
for (i = oldMaxCols; i < pCols->maxCols; ++i) {
|
|
||||||
pCols->cols[i].pData = NULL;
|
|
||||||
pCols->cols[i].dataOff = NULL;
|
|
||||||
pCols->cols[i].pBitmap = NULL;
|
|
||||||
pCols->cols[i].spaceSize = 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
#if 0
|
|
||||||
tdResetDataCols(pCols); // redundant loop to reset len/blen to 0, already reset in following dataColInit(...)
|
|
||||||
#endif
|
|
||||||
|
|
||||||
pCols->numOfRows = 0;
|
|
||||||
pCols->bitmapMode = TSDB_BITMODE_DEFAULT;
|
|
||||||
pCols->numOfCols = schemaNCols(pSchema);
|
|
||||||
|
|
||||||
for (i = 0; i < schemaNCols(pSchema); ++i) {
|
|
||||||
dataColInit(pCols->cols + i, schemaColAt(pSchema, i), pCols->maxPoints);
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
SDataCols *tdFreeDataCols(SDataCols *pCols) {
|
|
||||||
int i;
|
|
||||||
if (pCols) {
|
|
||||||
if (pCols->cols) {
|
|
||||||
int maxCols = pCols->maxCols;
|
|
||||||
for (i = 0; i < maxCols; ++i) {
|
|
||||||
SDataCol *pCol = &pCols->cols[i];
|
|
||||||
taosMemoryFreeClear(pCol->pData);
|
|
||||||
}
|
|
||||||
taosMemoryFree(pCols->cols);
|
|
||||||
pCols->cols = NULL;
|
|
||||||
}
|
|
||||||
taosMemoryFree(pCols);
|
|
||||||
}
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
void tdResetDataCols(SDataCols *pCols) {
|
|
||||||
if (pCols != NULL) {
|
|
||||||
pCols->numOfRows = 0;
|
|
||||||
pCols->bitmapMode = 0;
|
|
||||||
for (int i = 0; i < pCols->maxCols; ++i) {
|
|
||||||
dataColReset(pCols->cols + i);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#endif
|
#endif
|
|
@ -36,24 +36,6 @@ static uint8_t tdGetBitmapByte(uint8_t byte);
|
||||||
static int32_t tdCompareColId(const void *arg1, const void *arg2);
|
static int32_t tdCompareColId(const void *arg1, const void *arg2);
|
||||||
static FORCE_INLINE int32_t compareKvRowColId(const void *key1, const void *key2);
|
static FORCE_INLINE int32_t compareKvRowColId(const void *key1, const void *key2);
|
||||||
|
|
||||||
// static void dataColSetNEleNull(SDataCol *pCol, int nEle);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief src2 data has more priority than src1
|
|
||||||
*
|
|
||||||
* @param target
|
|
||||||
* @param src1
|
|
||||||
* @param iter1
|
|
||||||
* @param limit1
|
|
||||||
* @param src2
|
|
||||||
* @param iter2
|
|
||||||
* @param limit2
|
|
||||||
* @param tRows
|
|
||||||
* @param update
|
|
||||||
*/
|
|
||||||
static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2,
|
|
||||||
int limit2, int tRows, bool update);
|
|
||||||
|
|
||||||
// implementation
|
// implementation
|
||||||
/**
|
/**
|
||||||
* @brief Compress bitmap bytes comprised of 2-bits to counterpart of 1-bit.
|
* @brief Compress bitmap bytes comprised of 2-bits to counterpart of 1-bit.
|
||||||
|
@ -287,33 +269,6 @@ void tdMergeBitmap(uint8_t *srcBitmap, int32_t nBits, uint8_t *dstBitmap) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE void dataColSetNullAt(SDataCol *pCol, int index, bool setBitmap, int8_t bitmapMode) {
|
|
||||||
if (IS_VAR_DATA_TYPE(pCol->type)) {
|
|
||||||
pCol->dataOff[index] = pCol->len;
|
|
||||||
char *ptr = POINTER_SHIFT(pCol->pData, pCol->len);
|
|
||||||
setVardataNull(ptr, pCol->type);
|
|
||||||
pCol->len += varDataTLen(ptr);
|
|
||||||
} else {
|
|
||||||
setNull(POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * index), pCol->type, pCol->bytes);
|
|
||||||
pCol->len += TYPE_BYTES[pCol->type];
|
|
||||||
}
|
|
||||||
if (setBitmap) {
|
|
||||||
tdSetBitmapValType(pCol->pBitmap, index, TD_VTYPE_NONE, bitmapMode);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// static void dataColSetNEleNull(SDataCol *pCol, int nEle) {
|
|
||||||
// if (IS_VAR_DATA_TYPE(pCol->type)) {
|
|
||||||
// pCol->len = 0;
|
|
||||||
// for (int i = 0; i < nEle; i++) {
|
|
||||||
// dataColSetNullAt(pCol, i);
|
|
||||||
// }
|
|
||||||
// } else {
|
|
||||||
// setNullN(pCol->pData, pCol->type, pCol->bytes, nEle);
|
|
||||||
// pCol->len = TYPE_BYTES[pCol->type] * nEle;
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Set bitmap area by byte preferentially and then by bit.
|
* @brief Set bitmap area by byte preferentially and then by bit.
|
||||||
*
|
*
|
||||||
|
@ -362,56 +317,6 @@ bool tdIsBitmapBlkNorm(const void *pBitmap, int32_t numOfBits, int8_t bitmapMode
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE void dataColSetNoneAt(SDataCol *pCol, int index, bool setBitmap, int8_t bitmapMode) {
|
|
||||||
if (IS_VAR_DATA_TYPE(pCol->type)) {
|
|
||||||
pCol->dataOff[index] = pCol->len;
|
|
||||||
char *ptr = POINTER_SHIFT(pCol->pData, pCol->len);
|
|
||||||
setVardataNull(ptr, pCol->type);
|
|
||||||
pCol->len += varDataTLen(ptr);
|
|
||||||
} else {
|
|
||||||
setNull(POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * index), pCol->type, pCol->bytes);
|
|
||||||
pCol->len += TYPE_BYTES[pCol->type];
|
|
||||||
}
|
|
||||||
if (setBitmap) {
|
|
||||||
tdSetBitmapValType(pCol->pBitmap, index, TD_VTYPE_NONE, bitmapMode);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void dataColSetNEleNone(SDataCol *pCol, int nEle, int8_t bitmapMode) {
|
|
||||||
if (IS_VAR_DATA_TYPE(pCol->type)) {
|
|
||||||
pCol->len = 0;
|
|
||||||
for (int i = 0; i < nEle; ++i) {
|
|
||||||
dataColSetNoneAt(pCol, i, false, bitmapMode);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
setNullN(pCol->pData, pCol->type, pCol->bytes, nEle);
|
|
||||||
pCol->len = TYPE_BYTES[pCol->type] * nEle;
|
|
||||||
}
|
|
||||||
#ifdef TD_SUPPORT_BITMAP
|
|
||||||
tdSetBitmapValTypeN(pCol->pBitmap, nEle, TD_VTYPE_NONE, bitmapMode);
|
|
||||||
#endif
|
|
||||||
}
|
|
||||||
|
|
||||||
#if 0
|
|
||||||
void trbSetRowInfo(SRowBuilder *pRB, bool del, uint16_t sver) {
|
|
||||||
// TODO
|
|
||||||
}
|
|
||||||
|
|
||||||
void trbSetRowVersion(SRowBuilder *pRB, uint64_t ver) {
|
|
||||||
// TODO
|
|
||||||
}
|
|
||||||
|
|
||||||
void trbSetRowTS(SRowBuilder *pRB, TSKEY ts) {
|
|
||||||
// TODO
|
|
||||||
}
|
|
||||||
|
|
||||||
int trbWriteCol(SRowBuilder *pRB, void *pData, col_id_t cid) {
|
|
||||||
// TODO
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
#endif
|
|
||||||
|
|
||||||
STSRow *tdRowDup(STSRow *row) {
|
STSRow *tdRowDup(STSRow *row) {
|
||||||
STSRow *trow = taosMemoryMalloc(TD_ROW_LEN(row));
|
STSRow *trow = taosMemoryMalloc(TD_ROW_LEN(row));
|
||||||
if (trow == NULL) return NULL;
|
if (trow == NULL) return NULL;
|
||||||
|
@ -420,511 +325,6 @@ STSRow *tdRowDup(STSRow *row) {
|
||||||
return trow;
|
return trow;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief
|
|
||||||
*
|
|
||||||
* @param pCol
|
|
||||||
* @param valType
|
|
||||||
* @param val
|
|
||||||
* @param numOfRows
|
|
||||||
* @param maxPoints
|
|
||||||
* @param bitmapMode default is 0(2 bits), otherwise 1(1 bit)
|
|
||||||
* @param isMerge merge to current row
|
|
||||||
* @return int
|
|
||||||
*/
|
|
||||||
int tdAppendValToDataCol(SDataCol *pCol, TDRowValT valType, const void *val, int numOfRows, int maxPoints,
|
|
||||||
int8_t bitmapMode, bool isMerge) {
|
|
||||||
TASSERT(pCol != NULL);
|
|
||||||
|
|
||||||
// Assume that the columns not specified during insert/upsert mean None.
|
|
||||||
if (isAllRowsNone(pCol)) {
|
|
||||||
if (tdValIsNone(valType)) {
|
|
||||||
// all None value yet, just return
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tdAllocMemForCol(pCol, maxPoints) < 0) return -1;
|
|
||||||
if (numOfRows > 0) {
|
|
||||||
// Find the first not None value, fill all previous values as None
|
|
||||||
dataColSetNEleNone(pCol, numOfRows, bitmapMode);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
const void *value = val;
|
|
||||||
if (!tdValTypeIsNorm(valType) || !val) {
|
|
||||||
// TODO:
|
|
||||||
// 1. back compatibility and easy to debug with codes of 2.0 to save NULL values.
|
|
||||||
// 2. later on, considering further optimization, don't save Null/None for VarType.
|
|
||||||
value = getNullValue(pCol->type);
|
|
||||||
}
|
|
||||||
if (!isMerge) {
|
|
||||||
if (IS_VAR_DATA_TYPE(pCol->type)) {
|
|
||||||
// set offset
|
|
||||||
pCol->dataOff[numOfRows] = pCol->len;
|
|
||||||
// Copy data
|
|
||||||
memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, varDataTLen(value));
|
|
||||||
// Update the length
|
|
||||||
pCol->len += varDataTLen(value);
|
|
||||||
} else {
|
|
||||||
ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfRows);
|
|
||||||
memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, pCol->bytes);
|
|
||||||
pCol->len += pCol->bytes;
|
|
||||||
}
|
|
||||||
} else if (!tdValTypeIsNone(valType)) {
|
|
||||||
if (IS_VAR_DATA_TYPE(pCol->type)) {
|
|
||||||
// keep the last offset
|
|
||||||
// discard the last var data
|
|
||||||
int32_t lastVarLen = varDataTLen(POINTER_SHIFT(pCol->pData, pCol->dataOff[numOfRows]));
|
|
||||||
pCol->len -= lastVarLen;
|
|
||||||
// Copy data
|
|
||||||
memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, varDataTLen(value));
|
|
||||||
// Update the length
|
|
||||||
pCol->len += varDataTLen(value);
|
|
||||||
} else {
|
|
||||||
ASSERT(pCol->len - TYPE_BYTES[pCol->type] == TYPE_BYTES[pCol->type] * numOfRows);
|
|
||||||
memcpy(POINTER_SHIFT(pCol->pData, pCol->len - TYPE_BYTES[pCol->type]), value, pCol->bytes);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#ifdef TD_SUPPORT_BITMAP
|
|
||||||
if (!isMerge || !tdValTypeIsNone(valType)) {
|
|
||||||
tdSetBitmapValType(pCol->pBitmap, numOfRows, valType, bitmapMode);
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
// internal
|
|
||||||
static int32_t tdAppendTpRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCols, bool isMerge) {
|
|
||||||
#if 0
|
|
||||||
ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < TD_ROW_KEY(pRow));
|
|
||||||
#endif
|
|
||||||
|
|
||||||
// Multi-Version rows with the same key and different versions supported
|
|
||||||
ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) <= TD_ROW_KEY(pRow));
|
|
||||||
|
|
||||||
int rcol = 1;
|
|
||||||
int dcol = 1;
|
|
||||||
void *pBitmap = tdGetBitmapAddrTp(pRow, pSchema->flen);
|
|
||||||
|
|
||||||
SDataCol *pDataCol = &(pCols->cols[0]);
|
|
||||||
ASSERT(pDataCol->colId == PRIMARYKEY_TIMESTAMP_COL_ID);
|
|
||||||
tdAppendValToDataCol(pDataCol, TD_VTYPE_NORM, &pRow->ts, pCols->numOfRows, pCols->maxPoints, pCols->bitmapMode,
|
|
||||||
isMerge);
|
|
||||||
|
|
||||||
while (dcol < pCols->numOfCols) {
|
|
||||||
pDataCol = &(pCols->cols[dcol]);
|
|
||||||
if (rcol >= schemaNCols(pSchema)) {
|
|
||||||
tdAppendValToDataCol(pDataCol, TD_VTYPE_NULL, NULL, pCols->numOfRows, pCols->maxPoints, pCols->bitmapMode,
|
|
||||||
isMerge);
|
|
||||||
++dcol;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
STColumn *pRowCol = schemaColAt(pSchema, rcol);
|
|
||||||
SCellVal sVal = {0};
|
|
||||||
if (pRowCol->colId == pDataCol->colId) {
|
|
||||||
if (tdGetTpRowValOfCol(&sVal, pRow, pBitmap, pRowCol->type, pRowCol->offset - sizeof(TSKEY), rcol - 1) < 0) {
|
|
||||||
return terrno;
|
|
||||||
}
|
|
||||||
tdAppendValToDataCol(pDataCol, sVal.valType, sVal.val, pCols->numOfRows, pCols->maxPoints, pCols->bitmapMode,
|
|
||||||
isMerge);
|
|
||||||
++dcol;
|
|
||||||
++rcol;
|
|
||||||
} else if (pRowCol->colId < pDataCol->colId) {
|
|
||||||
++rcol;
|
|
||||||
} else {
|
|
||||||
tdAppendValToDataCol(pDataCol, TD_VTYPE_NULL, NULL, pCols->numOfRows, pCols->maxPoints, pCols->bitmapMode,
|
|
||||||
isMerge);
|
|
||||||
++dcol;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
#if 0
|
|
||||||
++pCols->numOfRows;
|
|
||||||
#endif
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
// internal
|
|
||||||
static int32_t tdAppendKvRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCols, bool isMerge) {
|
|
||||||
ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < TD_ROW_KEY(pRow));
|
|
||||||
|
|
||||||
int rcol = 0;
|
|
||||||
int dcol = 1;
|
|
||||||
int tRowCols = tdRowGetNCols(pRow) - 1; // the primary TS key not included in kvRowColIdx part
|
|
||||||
int tSchemaCols = schemaNCols(pSchema) - 1;
|
|
||||||
void *pBitmap = tdGetBitmapAddrKv(pRow, tdRowGetNCols(pRow));
|
|
||||||
|
|
||||||
SDataCol *pDataCol = &(pCols->cols[0]);
|
|
||||||
ASSERT(pDataCol->colId == PRIMARYKEY_TIMESTAMP_COL_ID);
|
|
||||||
tdAppendValToDataCol(pDataCol, TD_VTYPE_NORM, &pRow->ts, pCols->numOfRows, pCols->maxPoints, pCols->bitmapMode,
|
|
||||||
isMerge);
|
|
||||||
|
|
||||||
while (dcol < pCols->numOfCols) {
|
|
||||||
pDataCol = &(pCols->cols[dcol]);
|
|
||||||
if (rcol >= tRowCols || rcol >= tSchemaCols) {
|
|
||||||
tdAppendValToDataCol(pDataCol, TD_VTYPE_NULL, NULL, pCols->numOfRows, pCols->maxPoints, pCols->bitmapMode,
|
|
||||||
isMerge);
|
|
||||||
++dcol;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
SKvRowIdx *pIdx = tdKvRowColIdxAt(pRow, rcol);
|
|
||||||
int16_t colIdx = -1;
|
|
||||||
if (pIdx) {
|
|
||||||
colIdx = POINTER_DISTANCE(pIdx, TD_ROW_COL_IDX(pRow)) / sizeof(SKvRowIdx);
|
|
||||||
}
|
|
||||||
TASSERT(colIdx >= 0);
|
|
||||||
SCellVal sVal = {0};
|
|
||||||
if (pIdx->colId == pDataCol->colId) {
|
|
||||||
if (tdGetKvRowValOfCol(&sVal, pRow, pBitmap, pIdx->offset, colIdx) < 0) {
|
|
||||||
return terrno;
|
|
||||||
}
|
|
||||||
tdAppendValToDataCol(pDataCol, sVal.valType, sVal.val, pCols->numOfRows, pCols->maxPoints, pCols->bitmapMode,
|
|
||||||
isMerge);
|
|
||||||
++dcol;
|
|
||||||
++rcol;
|
|
||||||
} else if (pIdx->colId < pDataCol->colId) {
|
|
||||||
++rcol;
|
|
||||||
} else {
|
|
||||||
tdAppendValToDataCol(pDataCol, TD_VTYPE_NULL, NULL, pCols->numOfRows, pCols->maxPoints, pCols->bitmapMode,
|
|
||||||
isMerge);
|
|
||||||
++dcol;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
#if 0
|
|
||||||
++pCols->numOfRows;
|
|
||||||
#endif
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief exposed
|
|
||||||
*
|
|
||||||
* @param pRow
|
|
||||||
* @param pSchema
|
|
||||||
* @param pCols
|
|
||||||
*/
|
|
||||||
int32_t tdAppendSTSRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCols, bool isMerge) {
|
|
||||||
#ifdef TD_DEBUG_PRINT_TSDB_LOAD_DCOLS
|
|
||||||
printf("%s:%d ts: %" PRIi64 " sver:%d maxCols:%" PRIi16 " nCols:%" PRIi16 ", nRows:%d\n", __func__, __LINE__,
|
|
||||||
TD_ROW_KEY(pRow), TD_ROW_SVER(pRow), pCols->maxCols, pCols->numOfCols, pCols->numOfRows);
|
|
||||||
#endif
|
|
||||||
if (TD_IS_TP_ROW(pRow)) {
|
|
||||||
return tdAppendTpRowToDataCol(pRow, pSchema, pCols, isMerge);
|
|
||||||
} else if (TD_IS_KV_ROW(pRow)) {
|
|
||||||
return tdAppendKvRowToDataCol(pRow, pSchema, pCols, isMerge);
|
|
||||||
} else {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief source data has more priority than target
|
|
||||||
*
|
|
||||||
* @param target
|
|
||||||
* @param source
|
|
||||||
* @param rowsToMerge
|
|
||||||
* @param pOffset
|
|
||||||
* @param update
|
|
||||||
* @param maxVer
|
|
||||||
* @return int
|
|
||||||
*/
|
|
||||||
int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *pOffset, bool update,
|
|
||||||
TDRowVerT maxVer) {
|
|
||||||
ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfRows);
|
|
||||||
ASSERT(target->numOfCols == source->numOfCols);
|
|
||||||
int offset = 0;
|
|
||||||
|
|
||||||
if (pOffset == NULL) {
|
|
||||||
pOffset = &offset;
|
|
||||||
}
|
|
||||||
|
|
||||||
SDataCols *pTarget = NULL;
|
|
||||||
|
|
||||||
if ((target->numOfRows == 0) || (dataColsKeyLast(target) < dataColsKeyAtRow(source, *pOffset))) { // No overlap
|
|
||||||
ASSERT(target->numOfRows + rowsToMerge <= target->maxPoints);
|
|
||||||
// TODO: filter the maxVer
|
|
||||||
TSKEY lastKey = TSKEY_INITIAL_VAL;
|
|
||||||
for (int i = 0; i < rowsToMerge; ++i) {
|
|
||||||
bool merge = false;
|
|
||||||
for (int j = 0; j < source->numOfCols; j++) {
|
|
||||||
if (source->cols[j].len > 0 || target->cols[j].len > 0) {
|
|
||||||
SCellVal sVal = {0};
|
|
||||||
if (tdGetColDataOfRow(&sVal, source->cols + j, i + (*pOffset), source->bitmapMode) < 0) {
|
|
||||||
TASSERT(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (j == 0) {
|
|
||||||
if (lastKey == *(TSKEY *)sVal.val) {
|
|
||||||
if (!update) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
merge = true;
|
|
||||||
} else if (lastKey != TSKEY_INITIAL_VAL) {
|
|
||||||
++target->numOfRows;
|
|
||||||
}
|
|
||||||
|
|
||||||
lastKey = *(TSKEY *)sVal.val;
|
|
||||||
}
|
|
||||||
if (i == 0) {
|
|
||||||
(target->cols + j)->bitmap = (source->cols + j)->bitmap;
|
|
||||||
}
|
|
||||||
|
|
||||||
tdAppendValToDataCol(target->cols + j, sVal.valType, sVal.val, target->numOfRows, target->maxPoints,
|
|
||||||
target->bitmapMode, merge);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (lastKey != TSKEY_INITIAL_VAL) {
|
|
||||||
++target->numOfRows;
|
|
||||||
}
|
|
||||||
(*pOffset) += rowsToMerge;
|
|
||||||
} else {
|
|
||||||
pTarget = tdDupDataCols(target, true);
|
|
||||||
if (pTarget == NULL) goto _err;
|
|
||||||
|
|
||||||
int iter1 = 0;
|
|
||||||
tdMergeTwoDataCols(target, pTarget, &iter1, pTarget->numOfRows, source, pOffset, source->numOfRows,
|
|
||||||
pTarget->numOfRows + rowsToMerge, update);
|
|
||||||
}
|
|
||||||
|
|
||||||
tdFreeDataCols(pTarget);
|
|
||||||
return 0;
|
|
||||||
|
|
||||||
_err:
|
|
||||||
tdFreeDataCols(pTarget);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void tdAppendValToDataCols(SDataCols *target, SDataCols *src, int iter, bool isMerge) {
|
|
||||||
for (int i = 0; i < src->numOfCols; ++i) {
|
|
||||||
ASSERT(target->cols[i].type == src->cols[i].type);
|
|
||||||
if (src->cols[i].len > 0 || target->cols[i].len > 0) {
|
|
||||||
SCellVal sVal = {0};
|
|
||||||
if (tdGetColDataOfRow(&sVal, src->cols + i, iter, src->bitmapMode) < 0) {
|
|
||||||
TASSERT(0);
|
|
||||||
}
|
|
||||||
if (isMerge) {
|
|
||||||
if (!tdValTypeIsNone(sVal.valType)) {
|
|
||||||
tdAppendValToDataCol(&(target->cols[i]), sVal.valType, sVal.val, target->numOfRows, target->maxPoints,
|
|
||||||
target->bitmapMode, isMerge);
|
|
||||||
} else {
|
|
||||||
// Keep the origin value for None
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
tdAppendValToDataCol(&(target->cols[i]), sVal.valType, sVal.val, target->numOfRows, target->maxPoints,
|
|
||||||
target->bitmapMode, isMerge);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
/**
|
|
||||||
* @brief src2 data has more priority than src1
|
|
||||||
*
|
|
||||||
* @param target
|
|
||||||
* @param src1
|
|
||||||
* @param iter1
|
|
||||||
* @param limit1
|
|
||||||
* @param src2
|
|
||||||
* @param iter2
|
|
||||||
* @param limit2
|
|
||||||
* @param tRows
|
|
||||||
* @param update
|
|
||||||
*/
|
|
||||||
static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2,
|
|
||||||
int limit2, int tRows, bool update) {
|
|
||||||
tdResetDataCols(target);
|
|
||||||
target->bitmapMode = src1->bitmapMode;
|
|
||||||
ASSERT(limit1 <= src1->numOfRows && limit2 <= src2->numOfRows);
|
|
||||||
int32_t nRows = 0;
|
|
||||||
|
|
||||||
// TODO: filter the maxVer
|
|
||||||
// TODO: handle the delete function
|
|
||||||
TSKEY lastKey = TSKEY_INITIAL_VAL;
|
|
||||||
while (nRows < tRows) {
|
|
||||||
if (*iter1 >= limit1 && *iter2 >= limit2) break;
|
|
||||||
|
|
||||||
TSKEY key1 = (*iter1 >= limit1) ? INT64_MAX : dataColsKeyAt(src1, *iter1);
|
|
||||||
// TKEY tkey1 = (*iter1 >= limit1) ? TKEY_NULL : dataColsTKeyAt(src1, *iter1);
|
|
||||||
TSKEY key2 = (*iter2 >= limit2) ? INT64_MAX : dataColsKeyAt(src2, *iter2);
|
|
||||||
// TKEY tkey2 = (*iter2 >= limit2) ? TKEY_NULL : dataColsTKeyAt(src2, *iter2);
|
|
||||||
|
|
||||||
// ASSERT(tkey1 == TKEY_NULL || (!TKEY_IS_DELETED(tkey1)));
|
|
||||||
|
|
||||||
if (key1 <= key2) {
|
|
||||||
// select key1 if not delete
|
|
||||||
if (update && (lastKey == key1)) {
|
|
||||||
tdAppendValToDataCols(target, src1, *iter1, true);
|
|
||||||
} else if (lastKey != key1) {
|
|
||||||
if (lastKey != TSKEY_INITIAL_VAL) {
|
|
||||||
++target->numOfRows;
|
|
||||||
}
|
|
||||||
tdAppendValToDataCols(target, src1, *iter1, false);
|
|
||||||
}
|
|
||||||
++nRows;
|
|
||||||
++(*iter1);
|
|
||||||
lastKey = key1;
|
|
||||||
} else {
|
|
||||||
// use key2 if not deleted
|
|
||||||
// TODO: handle the delete function
|
|
||||||
if (update && (lastKey == key2)) {
|
|
||||||
tdAppendValToDataCols(target, src2, *iter2, true);
|
|
||||||
} else if (lastKey != key2) {
|
|
||||||
if (lastKey != TSKEY_INITIAL_VAL) {
|
|
||||||
++target->numOfRows;
|
|
||||||
}
|
|
||||||
tdAppendValToDataCols(target, src2, *iter2, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
++nRows;
|
|
||||||
++(*iter2);
|
|
||||||
lastKey = key2;
|
|
||||||
}
|
|
||||||
|
|
||||||
ASSERT(target->numOfRows <= target->maxPoints - 1);
|
|
||||||
}
|
|
||||||
if (lastKey != TSKEY_INITIAL_VAL) {
|
|
||||||
++target->numOfRows;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
STSRow *mergeTwoRows(void *buffer, STSRow *row1, STSRow *row2, STSchema *pSchema1, STSchema *pSchema2) {
|
|
||||||
#if 0
|
|
||||||
ASSERT(TD_ROW_KEY(row1) == TD_ROW_KEY(row2));
|
|
||||||
ASSERT(schemaVersion(pSchema1) == TD_ROW_SVER(row1));
|
|
||||||
ASSERT(schemaVersion(pSchema2) == TD_ROW_SVER(row2));
|
|
||||||
ASSERT(schemaVersion(pSchema1) >= schemaVersion(pSchema2));
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#if 0
|
|
||||||
SArray *stashRow = taosArrayInit(pSchema1->numOfCols, sizeof(SColInfo));
|
|
||||||
if (stashRow == NULL) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
STSRow pRow = buffer;
|
|
||||||
STpRow dataRow = memRowDataBody(pRow);
|
|
||||||
memRowSetType(pRow, SMEM_ROW_DATA);
|
|
||||||
dataRowSetVersion(dataRow, schemaVersion(pSchema1)); // use latest schema version
|
|
||||||
dataRowSetLen(dataRow, (TDRowLenT)(TD_DATA_ROW_HEAD_SIZE + pSchema1->flen));
|
|
||||||
|
|
||||||
TDRowLenT dataLen = 0, kvLen = TD_MEM_ROW_KV_HEAD_SIZE;
|
|
||||||
|
|
||||||
int32_t i = 0; // row1
|
|
||||||
int32_t j = 0; // row2
|
|
||||||
int32_t nCols1 = schemaNCols(pSchema1);
|
|
||||||
int32_t nCols2 = schemaNCols(pSchema2);
|
|
||||||
SColInfo colInfo = {0};
|
|
||||||
int32_t kvIdx1 = 0, kvIdx2 = 0;
|
|
||||||
|
|
||||||
while (i < nCols1) {
|
|
||||||
STColumn *pCol = schemaColAt(pSchema1, i);
|
|
||||||
void * val1 = tdGetMemRowDataOfColEx(row1, pCol->colId, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset, &kvIdx1);
|
|
||||||
// if val1 != NULL, use val1;
|
|
||||||
if (val1 != NULL && !isNull(val1, pCol->type)) {
|
|
||||||
tdAppendColVal(dataRow, val1, pCol->type, pCol->offset);
|
|
||||||
kvLen += tdGetColAppendLen(SMEM_ROW_KV, val1, pCol->type);
|
|
||||||
setSColInfo(&colInfo, pCol->colId, pCol->type, val1);
|
|
||||||
taosArrayPush(stashRow, &colInfo);
|
|
||||||
++i; // next col
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
void *val2 = NULL;
|
|
||||||
while (j < nCols2) {
|
|
||||||
STColumn *tCol = schemaColAt(pSchema2, j);
|
|
||||||
if (tCol->colId < pCol->colId) {
|
|
||||||
++j;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (tCol->colId == pCol->colId) {
|
|
||||||
val2 = tdGetMemRowDataOfColEx(row2, tCol->colId, tCol->type, TD_DATA_ROW_HEAD_SIZE + tCol->offset, &kvIdx2);
|
|
||||||
} else if (tCol->colId > pCol->colId) {
|
|
||||||
// set NULL
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
} // end of while(j<nCols2)
|
|
||||||
if (val2 == NULL) {
|
|
||||||
val2 = (void *)getNullValue(pCol->type);
|
|
||||||
}
|
|
||||||
tdAppendColVal(dataRow, val2, pCol->type, pCol->offset);
|
|
||||||
if (!isNull(val2, pCol->type)) {
|
|
||||||
kvLen += tdGetColAppendLen(SMEM_ROW_KV, val2, pCol->type);
|
|
||||||
setSColInfo(&colInfo, pCol->colId, pCol->type, val2);
|
|
||||||
taosArrayPush(stashRow, &colInfo);
|
|
||||||
}
|
|
||||||
|
|
||||||
++i; // next col
|
|
||||||
}
|
|
||||||
|
|
||||||
dataLen = TD_ROW_LEN(pRow);
|
|
||||||
|
|
||||||
if (kvLen < dataLen) {
|
|
||||||
// scan stashRow and generate SKVRow
|
|
||||||
memset(buffer, 0, sizeof(dataLen));
|
|
||||||
STSRow tRow = buffer;
|
|
||||||
memRowSetType(tRow, SMEM_ROW_KV);
|
|
||||||
SKVRow kvRow = (SKVRow)memRowKvBody(tRow);
|
|
||||||
int16_t nKvNCols = (int16_t) taosArrayGetSize(stashRow);
|
|
||||||
kvRowSetLen(kvRow, (TDRowLenT)(TD_KV_ROW_HEAD_SIZE + sizeof(SColIdx) * nKvNCols));
|
|
||||||
kvRowSetNCols(kvRow, nKvNCols);
|
|
||||||
memRowSetKvVersion(tRow, pSchema1->version);
|
|
||||||
|
|
||||||
int32_t toffset = 0;
|
|
||||||
int16_t k;
|
|
||||||
for (k = 0; k < nKvNCols; ++k) {
|
|
||||||
SColInfo *pColInfo = taosArrayGet(stashRow, k);
|
|
||||||
tdAppendKvColVal(kvRow, pColInfo->colVal, true, pColInfo->colId, pColInfo->colType, toffset);
|
|
||||||
toffset += sizeof(SColIdx);
|
|
||||||
}
|
|
||||||
ASSERT(kvLen == TD_ROW_LEN(tRow));
|
|
||||||
}
|
|
||||||
taosArrayDestroy(stashRow);
|
|
||||||
return buffer;
|
|
||||||
#endif
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) {
|
|
||||||
SDataCols *pRet = tdNewDataCols(pDataCols->maxCols, pDataCols->maxPoints);
|
|
||||||
if (pRet == NULL) return NULL;
|
|
||||||
|
|
||||||
pRet->numOfCols = pDataCols->numOfCols;
|
|
||||||
pRet->bitmapMode = pDataCols->bitmapMode;
|
|
||||||
pRet->sversion = pDataCols->sversion;
|
|
||||||
if (keepData) pRet->numOfRows = pDataCols->numOfRows;
|
|
||||||
|
|
||||||
for (int i = 0; i < pDataCols->numOfCols; ++i) {
|
|
||||||
pRet->cols[i].type = pDataCols->cols[i].type;
|
|
||||||
pRet->cols[i].bitmap = pDataCols->cols[i].bitmap;
|
|
||||||
pRet->cols[i].colId = pDataCols->cols[i].colId;
|
|
||||||
pRet->cols[i].bytes = pDataCols->cols[i].bytes;
|
|
||||||
pRet->cols[i].offset = pDataCols->cols[i].offset;
|
|
||||||
|
|
||||||
if (keepData) {
|
|
||||||
if (pDataCols->cols[i].len > 0) {
|
|
||||||
if (tdAllocMemForCol(&pRet->cols[i], pRet->maxPoints) < 0) {
|
|
||||||
tdFreeDataCols(pRet);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
pRet->cols[i].len = pDataCols->cols[i].len;
|
|
||||||
memcpy(pRet->cols[i].pData, pDataCols->cols[i].pData, pDataCols->cols[i].len);
|
|
||||||
if (IS_VAR_DATA_TYPE(pRet->cols[i].type)) {
|
|
||||||
int dataOffSize = sizeof(VarDataOffsetT) * pDataCols->maxPoints;
|
|
||||||
memcpy(pRet->cols[i].dataOff, pDataCols->cols[i].dataOff, dataOffSize);
|
|
||||||
}
|
|
||||||
if (!TD_COL_ROWS_NORM(pRet->cols + i)) {
|
|
||||||
memcpy(pRet->cols[i].pBitmap, pDataCols->cols[i].pBitmap, TD_BITMAP_BYTES(pDataCols->numOfRows));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return pRet;
|
|
||||||
}
|
|
||||||
|
|
||||||
void tdSRowPrint(STSRow *row, STSchema *pSchema, const char *tag) {
|
void tdSRowPrint(STSRow *row, STSchema *pSchema, const char *tag) {
|
||||||
STSRowIter iter = {0};
|
STSRowIter iter = {0};
|
||||||
tdSTSRowIterInit(&iter, pSchema);
|
tdSTSRowIterInit(&iter, pSchema);
|
||||||
|
@ -1020,32 +420,6 @@ void tdSCellValPrint(SCellVal *pVal, int8_t colType) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t dataColGetNEleLen(SDataCol *pDataCol, int32_t rows, int8_t bitmapMode) {
|
|
||||||
ASSERT(rows > 0);
|
|
||||||
int32_t result = 0;
|
|
||||||
|
|
||||||
if (IS_VAR_DATA_TYPE(pDataCol->type)) {
|
|
||||||
result += pDataCol->dataOff[rows - 1];
|
|
||||||
SCellVal val = {0};
|
|
||||||
if (tdGetColDataOfRow(&val, pDataCol, rows - 1, bitmapMode) < 0) {
|
|
||||||
TASSERT(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Currently, count the varDataTLen in of Null/None cols considering back compatibility test for 2.4
|
|
||||||
result += varDataTLen(val.val);
|
|
||||||
// TODO: later on, don't save Null/None for VarDataT for 3.0
|
|
||||||
// if (tdValTypeIsNorm(val.valType)) {
|
|
||||||
// result += varDataTLen(val.val);
|
|
||||||
// }
|
|
||||||
} else {
|
|
||||||
result += TYPE_BYTES[pDataCol->type] * rows;
|
|
||||||
}
|
|
||||||
|
|
||||||
ASSERT(pDataCol->len == result);
|
|
||||||
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool tdSKvRowGetVal(STSRow *pRow, col_id_t colId, col_id_t colIdx, SCellVal *pVal) {
|
bool tdSKvRowGetVal(STSRow *pRow, col_id_t colId, col_id_t colIdx, SCellVal *pVal) {
|
||||||
if (colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
|
if (colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
|
||||||
tdRowSetVal(pVal, TD_VTYPE_NORM, TD_ROW_KEY_ADDR(pRow));
|
tdRowSetVal(pVal, TD_VTYPE_NORM, TD_ROW_KEY_ADDR(pRow));
|
||||||
|
@ -1082,40 +456,6 @@ bool tdSTpRowGetVal(STSRow *pRow, col_id_t colId, col_type_t colType, int32_t fl
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tdGetColDataOfRow(SCellVal *pVal, SDataCol *pCol, int32_t row, int8_t bitmapMode) {
|
|
||||||
if (isAllRowsNone(pCol)) {
|
|
||||||
pVal->valType = TD_VTYPE_NONE;
|
|
||||||
#ifdef TD_SUPPORT_READ2
|
|
||||||
pVal->val = (void *)getNullValue(pCol->type);
|
|
||||||
#else
|
|
||||||
pVal->val = NULL;
|
|
||||||
#endif
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (TD_COL_ROWS_NORM(pCol)) {
|
|
||||||
pVal->valType = TD_VTYPE_NORM;
|
|
||||||
} else if (tdGetBitmapValType(pCol->pBitmap, row, &(pVal->valType), bitmapMode) < 0) {
|
|
||||||
return terrno;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tdValTypeIsNorm(pVal->valType)) {
|
|
||||||
if (IS_VAR_DATA_TYPE(pCol->type)) {
|
|
||||||
pVal->val = POINTER_SHIFT(pCol->pData, pCol->dataOff[row]);
|
|
||||||
} else {
|
|
||||||
pVal->val = POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * row);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
pVal->valType = TD_VTYPE_NULL;
|
|
||||||
#ifdef TD_SUPPORT_READ2
|
|
||||||
pVal->val = (void *)getNullValue(pCol->type);
|
|
||||||
#else
|
|
||||||
pVal->val = NULL;
|
|
||||||
#endif
|
|
||||||
}
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool tdSTSRowIterNext(STSRowIter *pIter, col_id_t colId, col_type_t colType, SCellVal *pVal) {
|
bool tdSTSRowIterNext(STSRowIter *pIter, col_id_t colId, col_type_t colType, SCellVal *pVal) {
|
||||||
if (colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
|
if (colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
|
||||||
pVal->val = &pIter->pRow->ts;
|
pVal->val = &pIter->pRow->ts;
|
||||||
|
|
|
@ -285,8 +285,8 @@ int32_t debugPrintSColVal(SColVal *cv, int8_t type) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void debugPrintTSRow(STSRow2 *row, STSchema *pTSchema, const char *tags, int32_t ln) {
|
void debugPrintTSRow(STSRow2 *row, STSchema *pTSchema, const char *tags, int32_t ln) {
|
||||||
printf("%s:%d %s:v%d:%d ", tags, ln, (row->flags & 0xf0) ? "KV" : "TP", row->sver, row->nData);
|
// printf("%s:%d %s:v%d:%d ", tags, ln, (row->flags & 0xf0) ? "KV" : "TP", row->sver, row->nData);
|
||||||
for (int16_t i = 0; i < schemaNCols(pTSchema); ++i) {
|
for (int16_t i = 0; i < pTSchema->numOfCols; ++i) {
|
||||||
SColVal cv = {0};
|
SColVal cv = {0};
|
||||||
tTSRowGet(row, pTSchema, i, &cv);
|
tTSRowGet(row, pTSchema, i, &cv);
|
||||||
debugPrintSColVal(&cv, pTSchema->columns[i].type);
|
debugPrintSColVal(&cv, pTSchema->columns[i].type);
|
||||||
|
@ -393,7 +393,7 @@ static int32_t checkSColVal(const char *rawVal, SColVal *cv, int8_t type) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void checkTSRow(const char **data, STSRow2 *row, STSchema *pTSchema) {
|
static void checkTSRow(const char **data, STSRow2 *row, STSchema *pTSchema) {
|
||||||
for (int16_t i = 0; i < schemaNCols(pTSchema); ++i) {
|
for (int16_t i = 0; i < pTSchema->numOfCols; ++i) {
|
||||||
SColVal cv = {0};
|
SColVal cv = {0};
|
||||||
tTSRowGet(row, pTSchema, i, &cv);
|
tTSRowGet(row, pTSchema, i, &cv);
|
||||||
checkSColVal(data[i], &cv, pTSchema->columns[i].type);
|
checkSColVal(data[i], &cv, pTSchema->columns[i].type);
|
||||||
|
|
|
@ -420,8 +420,8 @@ static bool setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SEx
|
||||||
}
|
}
|
||||||
|
|
||||||
bool inSlidingWindow(SInterval* pInterval, STimeWindow* pWin, SDataBlockInfo* pBlockInfo) {
|
bool inSlidingWindow(SInterval* pInterval, STimeWindow* pWin, SDataBlockInfo* pBlockInfo) {
|
||||||
if (pInterval->interval != pInterval->sliding && (pWin->ekey < pBlockInfo->calWin.skey ||
|
if (pInterval->interval != pInterval->sliding &&
|
||||||
pWin->skey > pBlockInfo->calWin.ekey) ) {
|
(pWin->ekey < pBlockInfo->calWin.skey || pWin->skey > pBlockInfo->calWin.ekey)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
|
@ -813,7 +813,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
|
||||||
|
|
||||||
STimeWindow win = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->order);
|
STimeWindow win = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->order);
|
||||||
int32_t ret = TSDB_CODE_SUCCESS;
|
int32_t ret = TSDB_CODE_SUCCESS;
|
||||||
if ((!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) && inSlidingWindow(&pInfo->interval, &win, &pBlock->info)) {
|
if ((!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) &&
|
||||||
|
inSlidingWindow(&pInfo->interval, &win, &pBlock->info)) {
|
||||||
ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx,
|
ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx,
|
||||||
numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
|
numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
|
||||||
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
|
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
|
||||||
|
@ -846,7 +847,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
|
||||||
doWindowBorderInterpolation(pInfo, pBlock, pResult, &win, startPos, forwardRows, pSup);
|
doWindowBorderInterpolation(pInfo, pBlock, pResult, &win, startPos, forwardRows, pSup);
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) && inSlidingWindow(&pInfo->interval, &win, &pBlock->info)) {
|
if ((!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) &&
|
||||||
|
inSlidingWindow(&pInfo->interval, &win, &pBlock->info)) {
|
||||||
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true);
|
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true);
|
||||||
doApplyFunctions(pTaskInfo, pSup->pCtx, &win, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols,
|
doApplyFunctions(pTaskInfo, pSup->pCtx, &win, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols,
|
||||||
pBlock->info.rows, numOfOutput, pInfo->order);
|
pBlock->info.rows, numOfOutput, pInfo->order);
|
||||||
|
@ -1296,9 +1298,10 @@ static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval*
|
||||||
dumyInfo.cur.pageId = -1;
|
dumyInfo.cur.pageId = -1;
|
||||||
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[0], pInterval, TSDB_ORDER_ASC);
|
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[0], pInterval, TSDB_ORDER_ASC);
|
||||||
while (1) {
|
while (1) {
|
||||||
step = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
|
step =
|
||||||
|
getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
|
||||||
uint64_t winGpId = pGpDatas ? pGpDatas[startPos] : pBlock->info.groupId;
|
uint64_t winGpId = pGpDatas ? pGpDatas[startPos] : pBlock->info.groupId;
|
||||||
bool res = doClearWindow(pAggSup, pSup1, (char*)&win.skey, sizeof(TKEY), winGpId, numOfOutput);
|
bool res = doClearWindow(pAggSup, pSup1, (char*)&win.skey, sizeof(TSKEY), winGpId, numOfOutput);
|
||||||
if (pUpWins && res) {
|
if (pUpWins && res) {
|
||||||
SWinRes winRes = {.ts = win.skey, .groupId = winGpId};
|
SWinRes winRes = {.ts = win.skey, .groupId = winGpId};
|
||||||
taosArrayPush(pUpWins, &winRes);
|
taosArrayPush(pUpWins, &winRes);
|
||||||
|
|
|
@ -1154,7 +1154,6 @@ static int32_t findRowIndex(int32_t start, int32_t num, SColumnInfoData* pCol, c
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
int32_t numOfElems = 0;
|
int32_t numOfElems = 0;
|
||||||
|
|
||||||
|
@ -3512,8 +3511,7 @@ void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS
|
||||||
setBufPageDirty(pPage, true);
|
setBufPageDirty(pPage, true);
|
||||||
releaseBufPage(pCtx->pBuf, pPage);
|
releaseBufPage(pCtx->pBuf, pPage);
|
||||||
#ifdef BUF_PAGE_DEBUG
|
#ifdef BUF_PAGE_DEBUG
|
||||||
qDebug("page_saveTuple pos:%p,pageId:%d, offset:%d\n", pPos, pPos->pageId,
|
qDebug("page_saveTuple pos:%p,pageId:%d, offset:%d\n", pPos, pPos->pageId, pPos->offset);
|
||||||
pPos->offset);
|
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3814,7 +3812,7 @@ bool elapsedFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo
|
||||||
|
|
||||||
SElapsedInfo* pInfo = GET_ROWCELL_INTERBUF(pResultInfo);
|
SElapsedInfo* pInfo = GET_ROWCELL_INTERBUF(pResultInfo);
|
||||||
pInfo->result = 0;
|
pInfo->result = 0;
|
||||||
pInfo->min = MAX_TS_KEY;
|
pInfo->min = TSKEY_MAX;
|
||||||
pInfo->max = 0;
|
pInfo->max = 0;
|
||||||
|
|
||||||
if (pCtx->numOfParams > 1) {
|
if (pCtx->numOfParams > 1) {
|
||||||
|
@ -3841,7 +3839,7 @@ int32_t elapsedFunction(SqlFunctionCtx* pCtx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pInput->colDataAggIsSet) {
|
if (pInput->colDataAggIsSet) {
|
||||||
if (pInfo->min == MAX_TS_KEY) {
|
if (pInfo->min == TSKEY_MAX) {
|
||||||
pInfo->min = GET_INT64_VAL(&pAgg->min);
|
pInfo->min = GET_INT64_VAL(&pAgg->min);
|
||||||
pInfo->max = GET_INT64_VAL(&pAgg->max);
|
pInfo->max = GET_INT64_VAL(&pAgg->max);
|
||||||
} else {
|
} else {
|
||||||
|
|
Loading…
Reference in New Issue