feat: support update in mem/file
This commit is contained in:
parent
71b723e8dc
commit
7a8f8e11a5
|
@ -27,7 +27,7 @@ extern "C" {
|
|||
|
||||
// Imported since 3.0 and use bitmap to demonstrate None/Null/Norm, while use Null/Norm below 3.0 without of bitmap.
|
||||
#define TD_SUPPORT_BITMAP
|
||||
#define TD_SUPPORT_READ2
|
||||
#undef TD_SUPPORT_READ2
|
||||
#define TD_SUPPORT_BACK2 // suppport back compatibility of 2.0
|
||||
|
||||
#define TASSERT(x) ASSERT(x)
|
||||
|
@ -387,8 +387,6 @@ typedef struct SDataCol {
|
|||
TSKEY ts; // only used in last NULL column
|
||||
} SDataCol;
|
||||
|
||||
|
||||
|
||||
#define isAllRowsNull(pCol) ((pCol)->len == 0)
|
||||
#define isAllRowsNone(pCol) ((pCol)->len == 0)
|
||||
static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; }
|
||||
|
@ -482,7 +480,7 @@ void tdResetDataCols(SDataCols *pCols);
|
|||
int32_t tdInitDataCols(SDataCols *pCols, STSchema *pSchema);
|
||||
SDataCols *tdDupDataCols(SDataCols *pCols, bool keepData);
|
||||
SDataCols *tdFreeDataCols(SDataCols *pCols);
|
||||
int32_t tdMergeDataCols(SDataCols *target, SDataCols *source, int32_t rowsToMerge, int32_t *pOffset, bool forceSetNull, TDRowVerT maxVer);
|
||||
int32_t tdMergeDataCols(SDataCols *target, SDataCols *source, int32_t rowsToMerge, int32_t *pOffset, bool update, TDRowVerT maxVer);
|
||||
|
||||
// ----------------- K-V data row structure
|
||||
/* |<-------------------------------------- len -------------------------------------------->|
|
||||
|
|
|
@ -42,7 +42,7 @@ extern "C" {
|
|||
* @brief value type
|
||||
* - for data from client input and STSRow in memory, 3 types of value none/null/norm available
|
||||
*/
|
||||
#define TD_VTYPE_NORM 0x00U // normal val: not none, not null(no need assign value)
|
||||
#define TD_VTYPE_NORM 0x00U // normal val: not none, not null
|
||||
#define TD_VTYPE_NULL 0x01U // null val
|
||||
#define TD_VTYPE_NONE 0x02U // none or unknown/undefined
|
||||
#define TD_VTYPE_MAX 0x03U //
|
||||
|
@ -122,6 +122,8 @@ typedef struct {
|
|||
typedef struct {
|
||||
/// timestamp
|
||||
TSKEY ts;
|
||||
/// row version
|
||||
uint64_t ver;
|
||||
union {
|
||||
/// union field for encode and decode
|
||||
uint32_t info;
|
||||
|
@ -140,8 +142,6 @@ typedef struct {
|
|||
};
|
||||
/// row total length
|
||||
uint32_t len;
|
||||
/// row version
|
||||
uint64_t ver;
|
||||
/// the inline data, maybe a tuple or a k-v tuple
|
||||
char data[];
|
||||
} STSRow;
|
||||
|
@ -241,13 +241,13 @@ static FORCE_INLINE int32_t tdGetBitmapValType(const void *pBitmap, int16_t colI
|
|||
static FORCE_INLINE bool tdIsBitmapValTypeNorm(const void *pBitmap, int16_t idx, int8_t bitmapMode);
|
||||
bool tdIsBitmapBlkNorm(const void *pBitmap, int32_t numOfBits, int8_t bitmapMode);
|
||||
int32_t tdAppendValToDataCol(SDataCol *pCol, TDRowValT valType, const void *val, int32_t numOfRows, int32_t maxPoints,
|
||||
int8_t bitmapMode);
|
||||
int8_t bitmapMode, bool isMerge);
|
||||
static FORCE_INLINE int32_t tdAppendColValToTpRow(SRowBuilder *pBuilder, TDRowValT valType, const void *val,
|
||||
bool isCopyVarData, int8_t colType, int16_t colIdx, int32_t offset);
|
||||
static FORCE_INLINE int32_t tdAppendColValToKvRow(SRowBuilder *pBuilder, TDRowValT valType, const void *val,
|
||||
bool isCopyVarData, int8_t colType, int16_t colIdx, int32_t offset,
|
||||
col_id_t colId);
|
||||
int32_t tdAppendSTSRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCols);
|
||||
int32_t tdAppendSTSRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCols, bool isMerge);
|
||||
|
||||
/**
|
||||
* @brief
|
||||
|
@ -622,7 +622,6 @@ static FORCE_INLINE int32_t tdSRowSetTpInfo(SRowBuilder *pBuilder, int32_t nCols
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @brief To judge row type: STpRow/SKvRow
|
||||
*
|
||||
|
@ -719,6 +718,7 @@ static int32_t tdSRowResetBuf(SRowBuilder *pBuilder, void *pBuf) {
|
|||
terrno = TSDB_CODE_INVALID_PARA;
|
||||
return terrno;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -758,7 +758,6 @@ static int32_t tdSRowGetBuf(SRowBuilder *pBuilder, void *pBuf) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @brief 由调用方管理存储空间的分配及释放,一次输入多个参数
|
||||
*
|
||||
|
@ -1250,16 +1249,16 @@ static FORCE_INLINE int32_t tdGetColDataOfRow(SCellVal *pVal, SDataCol *pCol, in
|
|||
}
|
||||
|
||||
/**
|
||||
* @brief
|
||||
*
|
||||
* @param pRow
|
||||
* @param colId
|
||||
* @param colType
|
||||
* @param flen
|
||||
* @param offset
|
||||
* @brief
|
||||
*
|
||||
* @param pRow
|
||||
* @param colId
|
||||
* @param colType
|
||||
* @param flen
|
||||
* @param offset
|
||||
* @param colIdx start from 0
|
||||
* @param pVal
|
||||
* @return FORCE_INLINE
|
||||
* @param pVal
|
||||
* @return FORCE_INLINE
|
||||
*/
|
||||
static FORCE_INLINE bool tdSTpRowGetVal(STSRow *pRow, col_id_t colId, col_type_t colType, int32_t flen, uint32_t offset,
|
||||
col_id_t colIdx, SCellVal *pVal) {
|
||||
|
@ -1273,14 +1272,14 @@ static FORCE_INLINE bool tdSTpRowGetVal(STSRow *pRow, col_id_t colId, col_type_t
|
|||
}
|
||||
|
||||
/**
|
||||
* @brief
|
||||
*
|
||||
* @param pRow
|
||||
* @param colId
|
||||
* @param offset
|
||||
* @brief
|
||||
*
|
||||
* @param pRow
|
||||
* @param colId
|
||||
* @param offset
|
||||
* @param colIdx start from 0
|
||||
* @param pVal
|
||||
* @return FORCE_INLINE
|
||||
* @param pVal
|
||||
* @return FORCE_INLINE
|
||||
*/
|
||||
static FORCE_INLINE bool tdSKvRowGetVal(STSRow *pRow, col_id_t colId, uint32_t offset, col_id_t colIdx,
|
||||
SCellVal *pVal) {
|
||||
|
@ -1397,14 +1396,14 @@ static void tdSCellValPrint(SCellVal *pVal, int8_t colType) {
|
|||
}
|
||||
}
|
||||
|
||||
static void tdSRowPrint(STSRow *row, STSchema *pSchema, const char* tag) {
|
||||
static void tdSRowPrint(STSRow *row, STSchema *pSchema, const char *tag) {
|
||||
STSRowIter iter = {0};
|
||||
tdSTSRowIterInit(&iter, pSchema);
|
||||
tdSTSRowIterReset(&iter, row);
|
||||
printf("%s >>>", tag);
|
||||
for (int i = 0; i < pSchema->numOfCols; ++i) {
|
||||
STColumn *stCol = pSchema->columns + i;
|
||||
SCellVal sVal = { 255, NULL};
|
||||
SCellVal sVal = {255, NULL};
|
||||
if (!tdSTSRowIterNext(&iter, stCol->colId, stCol->type, &sVal)) {
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -29,6 +29,8 @@ extern "C" {
|
|||
#define TSKEY_MAX (INT64_MAX - 1)
|
||||
#define TSKEY_INITIAL_VAL TSKEY_MIN
|
||||
|
||||
#define TD_VER_MAX UINT64_MAX // TODO: use the real max version from query handle
|
||||
|
||||
// Bytes for each type.
|
||||
extern const int32_t TYPE_BYTES[15];
|
||||
|
||||
|
|
|
@ -33,6 +33,24 @@ const uint8_t tdVTypeByte[2][3] = {{
|
|||
// declaration
|
||||
static uint8_t tdGetBitmapByte(uint8_t byte);
|
||||
|
||||
// static void dataColSetNEleNull(SDataCol *pCol, int nEle);
|
||||
|
||||
/**
|
||||
* @brief src2 data has more priority than src1
|
||||
*
|
||||
* @param target
|
||||
* @param src1
|
||||
* @param iter1
|
||||
* @param limit1
|
||||
* @param src2
|
||||
* @param iter2
|
||||
* @param limit2
|
||||
* @param tRows
|
||||
* @param update
|
||||
*/
|
||||
static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2,
|
||||
int limit2, int tRows, bool update);
|
||||
|
||||
// implementation
|
||||
/**
|
||||
* @brief Compress bitmap bytes comprised of 2-bits to counterpart of 1-bit.
|
||||
|
@ -229,23 +247,23 @@ static uint8_t tdGetMergedBitmapByte(uint8_t byte) {
|
|||
void tdMergeBitmap(uint8_t *srcBitmap, int32_t nBits, uint8_t *dstBitmap) {
|
||||
int32_t i = 0, j = 0;
|
||||
int32_t nBytes = TD_BITMAP_BYTES(nBits);
|
||||
int32_t nStrictBytes = nBits / 4;
|
||||
int32_t nPartialBits = nBits - nStrictBytes * 4;
|
||||
int32_t nRoundBytes = nBits / 4;
|
||||
int32_t nRemainderBits = nBits - nRoundBytes * 4;
|
||||
|
||||
switch (nPartialBits) {
|
||||
switch (nRemainderBits) {
|
||||
case 0:
|
||||
// NOTHING TODO
|
||||
break;
|
||||
case 1: {
|
||||
void *lastByte = POINTER_SHIFT(srcBitmap, nStrictBytes);
|
||||
void *lastByte = POINTER_SHIFT(srcBitmap, nRoundBytes);
|
||||
*(uint8_t *)lastByte &= 0xC0;
|
||||
} break;
|
||||
case 2: {
|
||||
void *lastByte = POINTER_SHIFT(srcBitmap, nStrictBytes);
|
||||
void *lastByte = POINTER_SHIFT(srcBitmap, nRoundBytes);
|
||||
*(uint8_t *)lastByte &= 0xF0;
|
||||
} break;
|
||||
case 3: {
|
||||
void *lastByte = POINTER_SHIFT(srcBitmap, nStrictBytes);
|
||||
void *lastByte = POINTER_SHIFT(srcBitmap, nRoundBytes);
|
||||
*(uint8_t *)lastByte &= 0xFC;
|
||||
} break;
|
||||
default:
|
||||
|
@ -266,10 +284,6 @@ void tdMergeBitmap(uint8_t *srcBitmap, int32_t nBits, uint8_t *dstBitmap) {
|
|||
}
|
||||
}
|
||||
|
||||
// static void dataColSetNEleNull(SDataCol *pCol, int nEle);
|
||||
static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2,
|
||||
int limit2, int tRows, bool forceSetNull);
|
||||
|
||||
static FORCE_INLINE void dataColSetNullAt(SDataCol *pCol, int index, bool setBitmap, int8_t bitmapMode) {
|
||||
if (IS_VAR_DATA_TYPE(pCol->type)) {
|
||||
pCol->dataOff[index] = pCol->len;
|
||||
|
@ -410,11 +424,12 @@ STSRow *tdRowDup(STSRow *row) {
|
|||
* @param val
|
||||
* @param numOfRows
|
||||
* @param maxPoints
|
||||
* @param bitmapMode default is 0(2 bits), otherwise 1(1 bit)
|
||||
* @param bitmapMode default is 0(2 bits), otherwise 1(1 bit)
|
||||
* @param isMerge merge to current row
|
||||
* @return int
|
||||
*/
|
||||
int tdAppendValToDataCol(SDataCol *pCol, TDRowValT valType, const void *val, int numOfRows, int maxPoints,
|
||||
int8_t bitmapMode) {
|
||||
int8_t bitmapMode, bool isMerge) {
|
||||
TASSERT(pCol != NULL);
|
||||
|
||||
// Assume that the columns not specified during insert/upsert mean None.
|
||||
|
@ -436,18 +451,35 @@ int tdAppendValToDataCol(SDataCol *pCol, TDRowValT valType, const void *val, int
|
|||
// 2. later on, considering further optimization, don't save Null/None for VarType.
|
||||
val = getNullValue(pCol->type);
|
||||
}
|
||||
if (IS_VAR_DATA_TYPE(pCol->type)) {
|
||||
// set offset
|
||||
pCol->dataOff[numOfRows] = pCol->len;
|
||||
// Copy data
|
||||
memcpy(POINTER_SHIFT(pCol->pData, pCol->len), val, varDataTLen(val));
|
||||
// Update the length
|
||||
pCol->len += varDataTLen(val);
|
||||
if (!isMerge) {
|
||||
if (IS_VAR_DATA_TYPE(pCol->type)) {
|
||||
// set offset
|
||||
pCol->dataOff[numOfRows] = pCol->len;
|
||||
// Copy data
|
||||
memcpy(POINTER_SHIFT(pCol->pData, pCol->len), val, varDataTLen(val));
|
||||
// Update the length
|
||||
pCol->len += varDataTLen(val);
|
||||
} else {
|
||||
ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfRows);
|
||||
memcpy(POINTER_SHIFT(pCol->pData, pCol->len), val, pCol->bytes);
|
||||
pCol->len += pCol->bytes;
|
||||
}
|
||||
} else {
|
||||
ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfRows);
|
||||
memcpy(POINTER_SHIFT(pCol->pData, pCol->len), val, pCol->bytes);
|
||||
pCol->len += pCol->bytes;
|
||||
if (IS_VAR_DATA_TYPE(pCol->type)) {
|
||||
// keep the last offset
|
||||
// discard the last var data
|
||||
int32_t lastVarLen = varDataTLen(POINTER_SHIFT(pCol->pData, pCol->dataOff[numOfRows]));
|
||||
pCol->len -= lastVarLen;
|
||||
// Copy data
|
||||
memcpy(POINTER_SHIFT(pCol->pData, pCol->len), val, varDataTLen(val));
|
||||
// Update the length
|
||||
pCol->len += varDataTLen(val);
|
||||
} else {
|
||||
ASSERT(pCol->len - TYPE_BYTES[pCol->type] == TYPE_BYTES[pCol->type] * numOfRows);
|
||||
memcpy(POINTER_SHIFT(pCol->pData, pCol->len - TYPE_BYTES[pCol->type]), val, pCol->bytes);
|
||||
}
|
||||
}
|
||||
|
||||
#ifdef TD_SUPPORT_BITMAP
|
||||
tdSetBitmapValType(pCol->pBitmap, numOfRows, valType, bitmapMode);
|
||||
#endif
|
||||
|
@ -455,8 +487,13 @@ int tdAppendValToDataCol(SDataCol *pCol, TDRowValT valType, const void *val, int
|
|||
}
|
||||
|
||||
// internal
|
||||
static int32_t tdAppendTpRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCols) {
|
||||
static int32_t tdAppendTpRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCols, bool isMerge) {
|
||||
#if 0
|
||||
ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < TD_ROW_KEY(pRow));
|
||||
#endif
|
||||
|
||||
// Multi-Version rows with the same key and different versions supported
|
||||
ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) <= TD_ROW_KEY(pRow));
|
||||
|
||||
int rcol = 1;
|
||||
int dcol = 1;
|
||||
|
@ -464,12 +501,14 @@ static int32_t tdAppendTpRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols
|
|||
|
||||
SDataCol *pDataCol = &(pCols->cols[0]);
|
||||
ASSERT(pDataCol->colId == PRIMARYKEY_TIMESTAMP_COL_ID);
|
||||
tdAppendValToDataCol(pDataCol, TD_VTYPE_NORM, &pRow->ts, pCols->numOfRows, pCols->maxPoints, pCols->bitmapMode);
|
||||
tdAppendValToDataCol(pDataCol, TD_VTYPE_NORM, &pRow->ts, pCols->numOfRows, pCols->maxPoints, pCols->bitmapMode,
|
||||
isMerge);
|
||||
|
||||
while (dcol < pCols->numOfCols) {
|
||||
pDataCol = &(pCols->cols[dcol]);
|
||||
if (rcol >= schemaNCols(pSchema)) {
|
||||
tdAppendValToDataCol(pDataCol, TD_VTYPE_NULL, NULL, pCols->numOfRows, pCols->maxPoints, pCols->bitmapMode);
|
||||
tdAppendValToDataCol(pDataCol, TD_VTYPE_NULL, NULL, pCols->numOfRows, pCols->maxPoints, pCols->bitmapMode,
|
||||
isMerge);
|
||||
++dcol;
|
||||
continue;
|
||||
}
|
||||
|
@ -480,13 +519,15 @@ static int32_t tdAppendTpRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols
|
|||
if (tdGetTpRowValOfCol(&sVal, pRow, pBitmap, pRowCol->type, pRowCol->offset - sizeof(TSKEY), rcol - 1) < 0) {
|
||||
return terrno;
|
||||
}
|
||||
tdAppendValToDataCol(pDataCol, sVal.valType, sVal.val, pCols->numOfRows, pCols->maxPoints, pCols->bitmapMode);
|
||||
tdAppendValToDataCol(pDataCol, sVal.valType, sVal.val, pCols->numOfRows, pCols->maxPoints, pCols->bitmapMode,
|
||||
isMerge);
|
||||
++dcol;
|
||||
++rcol;
|
||||
} else if (pRowCol->colId < pDataCol->colId) {
|
||||
++rcol;
|
||||
} else {
|
||||
tdAppendValToDataCol(pDataCol, TD_VTYPE_NULL, NULL, pCols->numOfRows, pCols->maxPoints, pCols->bitmapMode);
|
||||
tdAppendValToDataCol(pDataCol, TD_VTYPE_NULL, NULL, pCols->numOfRows, pCols->maxPoints, pCols->bitmapMode,
|
||||
isMerge);
|
||||
++dcol;
|
||||
}
|
||||
}
|
||||
|
@ -495,7 +536,7 @@ static int32_t tdAppendTpRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
// internal
|
||||
static int32_t tdAppendKvRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCols) {
|
||||
static int32_t tdAppendKvRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCols, bool isMerge) {
|
||||
ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < TD_ROW_KEY(pRow));
|
||||
|
||||
int rcol = 0;
|
||||
|
@ -506,12 +547,14 @@ static int32_t tdAppendKvRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols
|
|||
|
||||
SDataCol *pDataCol = &(pCols->cols[0]);
|
||||
ASSERT(pDataCol->colId == PRIMARYKEY_TIMESTAMP_COL_ID);
|
||||
tdAppendValToDataCol(pDataCol, TD_VTYPE_NORM, &pRow->ts, pCols->numOfRows, pCols->maxPoints, pCols->bitmapMode);
|
||||
tdAppendValToDataCol(pDataCol, TD_VTYPE_NORM, &pRow->ts, pCols->numOfRows, pCols->maxPoints, pCols->bitmapMode,
|
||||
isMerge);
|
||||
|
||||
while (dcol < pCols->numOfCols) {
|
||||
pDataCol = &(pCols->cols[dcol]);
|
||||
if (rcol >= tRowCols || rcol >= tSchemaCols) {
|
||||
tdAppendValToDataCol(pDataCol, TD_VTYPE_NULL, NULL, pCols->numOfRows, pCols->maxPoints, pCols->bitmapMode);
|
||||
tdAppendValToDataCol(pDataCol, TD_VTYPE_NULL, NULL, pCols->numOfRows, pCols->maxPoints, pCols->bitmapMode,
|
||||
isMerge);
|
||||
++dcol;
|
||||
continue;
|
||||
}
|
||||
|
@ -527,13 +570,15 @@ static int32_t tdAppendKvRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols
|
|||
if (tdGetKvRowValOfCol(&sVal, pRow, pBitmap, pIdx->offset, colIdx) < 0) {
|
||||
return terrno;
|
||||
}
|
||||
tdAppendValToDataCol(pDataCol, sVal.valType, sVal.val, pCols->numOfRows, pCols->maxPoints, pCols->bitmapMode);
|
||||
tdAppendValToDataCol(pDataCol, sVal.valType, sVal.val, pCols->numOfRows, pCols->maxPoints, pCols->bitmapMode,
|
||||
isMerge);
|
||||
++dcol;
|
||||
++rcol;
|
||||
} else if (pIdx->colId < pDataCol->colId) {
|
||||
++rcol;
|
||||
} else {
|
||||
tdAppendValToDataCol(pDataCol, TD_VTYPE_NULL, NULL, pCols->numOfRows, pCols->maxPoints, pCols->bitmapMode);
|
||||
tdAppendValToDataCol(pDataCol, TD_VTYPE_NULL, NULL, pCols->numOfRows, pCols->maxPoints, pCols->bitmapMode,
|
||||
isMerge);
|
||||
++dcol;
|
||||
}
|
||||
}
|
||||
|
@ -548,20 +593,30 @@ static int32_t tdAppendKvRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols
|
|||
* @param pRow
|
||||
* @param pSchema
|
||||
* @param pCols
|
||||
* @param forceSetNull
|
||||
*/
|
||||
int32_t tdAppendSTSRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCols) {
|
||||
int32_t tdAppendSTSRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCols, bool isMerge) {
|
||||
if (TD_IS_TP_ROW(pRow)) {
|
||||
return tdAppendTpRowToDataCol(pRow, pSchema, pCols);
|
||||
return tdAppendTpRowToDataCol(pRow, pSchema, pCols, isMerge);
|
||||
} else if (TD_IS_KV_ROW(pRow)) {
|
||||
return tdAppendKvRowToDataCol(pRow, pSchema, pCols);
|
||||
return tdAppendKvRowToDataCol(pRow, pSchema, pCols, isMerge);
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *pOffset, bool forceSetNull,
|
||||
/**
|
||||
* @brief source data has more priority than target
|
||||
*
|
||||
* @param target
|
||||
* @param source
|
||||
* @param rowsToMerge
|
||||
* @param pOffset
|
||||
* @param update
|
||||
* @param maxVer
|
||||
* @return int
|
||||
*/
|
||||
int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *pOffset, bool update,
|
||||
TDRowVerT maxVer) {
|
||||
ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfRows);
|
||||
ASSERT(target->numOfCols == source->numOfCols);
|
||||
|
@ -576,17 +631,35 @@ int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *
|
|||
if ((target->numOfRows == 0) || (dataColsKeyLast(target) < dataColsKeyAtRow(source, *pOffset))) { // No overlap
|
||||
ASSERT(target->numOfRows + rowsToMerge <= target->maxPoints);
|
||||
// TODO: filter the maxVer
|
||||
for (int i = 0; i < rowsToMerge; i++) {
|
||||
TSKEY lastKey = TSKEY_INITIAL_VAL;
|
||||
for (int i = 0; i < rowsToMerge; ++i) {
|
||||
bool merge = false;
|
||||
for (int j = 0; j < source->numOfCols; j++) {
|
||||
if (source->cols[j].len > 0 || target->cols[j].len > 0) {
|
||||
SCellVal sVal = {0};
|
||||
if (tdGetColDataOfRow(&sVal, source->cols + j, i + (*pOffset), source->bitmapMode) < 0) {
|
||||
TASSERT(0);
|
||||
}
|
||||
|
||||
if (j == 0) {
|
||||
if (lastKey == *(TSKEY *)sVal.val) {
|
||||
if (!update) {
|
||||
break;
|
||||
}
|
||||
merge = true;
|
||||
} else if (lastKey != TSKEY_INITIAL_VAL) {
|
||||
++target->numOfRows;
|
||||
}
|
||||
|
||||
lastKey = *(TSKEY *)sVal.val;
|
||||
}
|
||||
|
||||
tdAppendValToDataCol(target->cols + j, sVal.valType, sVal.val, target->numOfRows, target->maxPoints,
|
||||
target->bitmapMode);
|
||||
target->bitmapMode, merge);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (rowsToMerge > 0) {
|
||||
++target->numOfRows;
|
||||
}
|
||||
(*pOffset) += rowsToMerge;
|
||||
|
@ -596,7 +669,7 @@ int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *
|
|||
|
||||
int iter1 = 0;
|
||||
tdMergeTwoDataCols(target, pTarget, &iter1, pTarget->numOfRows, source, pOffset, source->numOfRows,
|
||||
pTarget->numOfRows + rowsToMerge, forceSetNull);
|
||||
pTarget->numOfRows + rowsToMerge, update);
|
||||
}
|
||||
|
||||
tdFreeDataCols(pTarget);
|
||||
|
@ -606,68 +679,95 @@ _err:
|
|||
tdFreeDataCols(pTarget);
|
||||
return -1;
|
||||
}
|
||||
|
||||
// src2 data has more priority than src1
|
||||
static void tdAppendValToDataCols(SDataCols *target, SDataCols *src, int iter, bool isMerge) {
|
||||
for (int i = 0; i < src->numOfCols; ++i) {
|
||||
ASSERT(target->cols[i].type == src->cols[i].type);
|
||||
if (src->cols[i].len > 0 || target->cols[i].len > 0) {
|
||||
SCellVal sVal = {0};
|
||||
if (tdGetColDataOfRow(&sVal, src->cols + i, iter, src->bitmapMode) < 0) {
|
||||
TASSERT(0);
|
||||
}
|
||||
if (isMerge) {
|
||||
if (!tdValTypeIsNone(sVal.valType)) {
|
||||
tdAppendValToDataCol(&(target->cols[i]), sVal.valType, sVal.val, target->numOfRows, target->maxPoints,
|
||||
target->bitmapMode, isMerge);
|
||||
} else {
|
||||
// Keep the origi value for None
|
||||
}
|
||||
} else {
|
||||
tdAppendValToDataCol(&(target->cols[i]), sVal.valType, sVal.val, target->numOfRows, target->maxPoints,
|
||||
target->bitmapMode, isMerge);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
/**
|
||||
* @brief src2 data has more priority than src1
|
||||
*
|
||||
* @param target
|
||||
* @param src1
|
||||
* @param iter1
|
||||
* @param limit1
|
||||
* @param src2
|
||||
* @param iter2
|
||||
* @param limit2
|
||||
* @param tRows
|
||||
* @param update
|
||||
*/
|
||||
static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2,
|
||||
int limit2, int tRows, bool forceSetNull) {
|
||||
int limit2, int tRows, bool update) {
|
||||
tdResetDataCols(target);
|
||||
target->bitmapMode = src1->bitmapMode;
|
||||
ASSERT(limit1 <= src1->numOfRows && limit2 <= src2->numOfRows);
|
||||
int32_t nRows = 0;
|
||||
|
||||
while (target->numOfRows < tRows) {
|
||||
// TODO: filter the maxVer
|
||||
// TODO: handle the delete function
|
||||
TSKEY lastKey = TSKEY_INITIAL_VAL;
|
||||
while (nRows < tRows) {
|
||||
if (*iter1 >= limit1 && *iter2 >= limit2) break;
|
||||
|
||||
TSKEY key1 = (*iter1 >= limit1) ? INT64_MAX : dataColsKeyAt(src1, *iter1);
|
||||
TKEY tkey1 = (*iter1 >= limit1) ? TKEY_NULL : dataColsTKeyAt(src1, *iter1);
|
||||
// TKEY tkey1 = (*iter1 >= limit1) ? TKEY_NULL : dataColsTKeyAt(src1, *iter1);
|
||||
TSKEY key2 = (*iter2 >= limit2) ? INT64_MAX : dataColsKeyAt(src2, *iter2);
|
||||
// TKEY tkey2 = (*iter2 >= limit2) ? TKEY_NULL : dataColsTKeyAt(src2, *iter2);
|
||||
|
||||
ASSERT(tkey1 == TKEY_NULL || (!TKEY_IS_DELETED(tkey1)));
|
||||
// TODO: filter the maxVer
|
||||
if (key1 < key2) {
|
||||
for (int i = 0; i < src1->numOfCols; ++i) {
|
||||
ASSERT(target->cols[i].type == src1->cols[i].type);
|
||||
if (src1->cols[i].len > 0 || target->cols[i].len > 0) {
|
||||
SCellVal sVal = {0};
|
||||
if (tdGetColDataOfRow(&sVal, src1->cols + i, *iter1, src1->bitmapMode) < 0) {
|
||||
TASSERT(0);
|
||||
}
|
||||
tdAppendValToDataCol(&(target->cols[i]), sVal.valType, sVal.val, target->numOfRows, target->maxPoints,
|
||||
target->bitmapMode);
|
||||
}
|
||||
}
|
||||
// ASSERT(tkey1 == TKEY_NULL || (!TKEY_IS_DELETED(tkey1)));
|
||||
|
||||
++target->numOfRows;
|
||||
if (key1 <= key2) {
|
||||
// select key1 if not delete
|
||||
if (update && (lastKey == key1)) {
|
||||
tdAppendValToDataCols(target, src1, *iter1, true);
|
||||
} else if (lastKey != key1) {
|
||||
if (lastKey != TSKEY_INITIAL_VAL) {
|
||||
++target->numOfRows;
|
||||
}
|
||||
tdAppendValToDataCols(target, src1, *iter1, false);
|
||||
}
|
||||
++nRows;
|
||||
++(*iter1);
|
||||
} else if (key1 >= key2) {
|
||||
// TODO: filter the maxVer
|
||||
if ((key1 > key2) || ((key1 == key2) && !TKEY_IS_DELETED(key2))) {
|
||||
for (int i = 0; i < src2->numOfCols; ++i) {
|
||||
SCellVal sVal = {0};
|
||||
ASSERT(target->cols[i].type == src2->cols[i].type);
|
||||
if (tdGetColDataOfRow(&sVal, src2->cols + i, *iter2, src2->bitmapMode) < 0) {
|
||||
TASSERT(0);
|
||||
}
|
||||
if (src2->cols[i].len > 0 && !tdValTypeIsNull(sVal.valType)) {
|
||||
tdAppendValToDataCol(&(target->cols[i]), sVal.valType, sVal.val, target->numOfRows, target->maxPoints,
|
||||
target->bitmapMode);
|
||||
} else if (!forceSetNull && key1 == key2 && src1->cols[i].len > 0) {
|
||||
if (tdGetColDataOfRow(&sVal, src1->cols + i, *iter1, src1->bitmapMode) < 0) {
|
||||
TASSERT(0);
|
||||
}
|
||||
tdAppendValToDataCol(&(target->cols[i]), sVal.valType, sVal.val, target->numOfRows, target->maxPoints,
|
||||
target->bitmapMode);
|
||||
} else if (target->cols[i].len > 0) {
|
||||
dataColSetNullAt(&target->cols[i], target->numOfRows, true, target->bitmapMode);
|
||||
}
|
||||
lastKey = key1;
|
||||
} else {
|
||||
// use key2 if not deleted
|
||||
// TODO: handle the delete function
|
||||
if (update && (lastKey == key2)) {
|
||||
tdAppendValToDataCols(target, src2, *iter2, true);
|
||||
} else if (lastKey != key2) {
|
||||
if (lastKey != TSKEY_INITIAL_VAL) {
|
||||
++target->numOfRows;
|
||||
}
|
||||
++target->numOfRows;
|
||||
tdAppendValToDataCols(target, src2, *iter2, false);
|
||||
}
|
||||
|
||||
++nRows;
|
||||
++(*iter2);
|
||||
if (key1 == key2) ++(*iter1);
|
||||
lastKey = key2;
|
||||
}
|
||||
|
||||
ASSERT(target->numOfRows <= target->maxPoints);
|
||||
ASSERT(target->numOfRows <= target->maxPoints - 1);
|
||||
}
|
||||
if (nRows > 0) {
|
||||
++target->numOfRows;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -777,7 +877,7 @@ SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) {
|
|||
pRet->sversion = pDataCols->sversion;
|
||||
if (keepData) pRet->numOfRows = pDataCols->numOfRows;
|
||||
|
||||
for (int i = 0; i < pDataCols->numOfCols; i++) {
|
||||
for (int i = 0; i < pDataCols->numOfCols; ++i) {
|
||||
pRet->cols[i].type = pDataCols->cols[i].type;
|
||||
pRet->cols[i].bitmap = pDataCols->cols[i].bitmap;
|
||||
pRet->cols[i].colId = pDataCols->cols[i].colId;
|
||||
|
@ -797,8 +897,7 @@ SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) {
|
|||
memcpy(pRet->cols[i].dataOff, pDataCols->cols[i].dataOff, dataOffSize);
|
||||
}
|
||||
if (!TD_COL_ROWS_NORM(pRet->cols + i)) {
|
||||
int32_t nBitmapBytes = (int32_t)TD_BITMAP_BYTES(pDataCols->numOfRows);
|
||||
memcpy(pRet->cols[i].pBitmap, pDataCols->cols[i].pBitmap, nBitmapBytes);
|
||||
memcpy(pRet->cols[i].pBitmap, pDataCols->cols[i].pBitmap, TD_BITMAP_BYTES(pDataCols->numOfRows));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -557,7 +557,7 @@ static const void *nullValues[] = {
|
|||
};
|
||||
|
||||
const void *getNullValue(int32_t type) {
|
||||
assert(type >= TSDB_DATA_TYPE_BOOL && type <= TSDB_DATA_TYPE_UBIGINT);
|
||||
assert(type >= TSDB_DATA_TYPE_BOOL && type <= TSDB_DATA_TYPE_UBIGINT); // TODO: extend the types
|
||||
return nullValues[type - 1];
|
||||
}
|
||||
|
||||
|
|
|
@ -274,7 +274,8 @@ typedef enum {
|
|||
|
||||
typedef struct {
|
||||
uint8_t last : 1;
|
||||
uint8_t blkVer : 7;
|
||||
uint8_t hasDupKey : 1; // 0: no dup TS key, 1: has dup TS key(since supporting Multi-Version)
|
||||
uint8_t blkVer : 6;
|
||||
uint8_t numOfSubBlocks;
|
||||
col_id_t numOfCols; // not including timestamp column
|
||||
uint32_t len; // data block length
|
||||
|
|
|
@ -1360,7 +1360,7 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt
|
|||
TASSERT(0);
|
||||
}
|
||||
tdAppendValToDataCol(pTarget->cols + i, sVal.valType, sVal.val, pTarget->numOfRows, pTarget->maxPoints,
|
||||
pTarget->bitmapMode);
|
||||
pTarget->bitmapMode, false);
|
||||
}
|
||||
|
||||
++pTarget->numOfRows;
|
||||
|
@ -1371,7 +1371,7 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt
|
|||
ASSERT(pSchema != NULL);
|
||||
}
|
||||
|
||||
tdAppendSTSRowToDataCol(row, pSchema, pTarget);
|
||||
tdAppendSTSRowToDataCol(row, pSchema, pTarget, false);
|
||||
|
||||
tSkipListIterNext(pCommitIter->pIter);
|
||||
} else {
|
||||
|
@ -1409,7 +1409,7 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt
|
|||
}
|
||||
// TODO: tdAppendValToDataCol may fail
|
||||
tdAppendValToDataCol(pTarget->cols + i, sVal.valType, sVal.val, pTarget->numOfRows, pTarget->maxPoints,
|
||||
pTarget->bitmapMode);
|
||||
pTarget->bitmapMode, false);
|
||||
}
|
||||
|
||||
if (TD_SUPPORT_UPDATE(update)) {
|
||||
|
@ -1427,9 +1427,9 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt
|
|||
STSRow *nextRow = tsdbNextIterRow(pCommitIter->pIter);
|
||||
|
||||
if (key2 < TD_ROW_KEY(nextRow)) {
|
||||
tdAppendSTSRowToDataCol(row, pSchema, pTarget);
|
||||
tdAppendSTSRowToDataCol(row, pSchema, pTarget, false);
|
||||
} else {
|
||||
tdAppendSTSRowToDataCol(row, pSchema, pTarget);
|
||||
tdAppendSTSRowToDataCol(row, pSchema, pTarget, false);
|
||||
}
|
||||
// TODO: merge with Multi-Version
|
||||
} else {
|
||||
|
|
|
@ -256,9 +256,12 @@ static STbData *tsdbNewTbData(tb_uid_t uid) {
|
|||
pTbData->keyMin = TSKEY_MAX;
|
||||
pTbData->keyMax = TSKEY_MIN;
|
||||
pTbData->nrows = 0;
|
||||
|
||||
#if 0
|
||||
pTbData->pData = tSkipListCreate(5, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), tkeyComparFn, SL_DISCARD_DUP_KEY,
|
||||
tsdbGetTsTupleKey);
|
||||
#endif
|
||||
pTbData->pData = tSkipListCreate(5, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), tkeyComparFn, SL_ALLOW_DUP_KEY,
|
||||
tsdbGetTsTupleKey);
|
||||
if (pTbData->pData == NULL) {
|
||||
taosMemoryFree(pTbData);
|
||||
return NULL;
|
||||
|
@ -303,7 +306,7 @@ static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema *
|
|||
}
|
||||
}
|
||||
|
||||
tdAppendSTSRowToDataCol(row, *ppSchema, pCols);
|
||||
tdAppendSTSRowToDataCol(row, *ppSchema, pCols, false);
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
|
|
@ -163,7 +163,7 @@ static int32_t tsdbCheckInfoCompar(const void* key1, const void* key2);
|
|||
// static void* destroyTableCheckInfo(SArray* pTableCheckInfo);
|
||||
static bool tsdbGetExternalRow(tsdbReaderT pHandle);
|
||||
|
||||
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions);
|
||||
static STsdb* getTsdbByRetentions(SVnode* pVnode, STsdbReadHandle* pReadHandle, TSKEY winSKey, SRetention* retentions);
|
||||
|
||||
static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) {
|
||||
pBlockLoadInfo->slot = -1;
|
||||
|
@ -351,36 +351,43 @@ static void setQueryTimewindow(STsdbReadHandle* pTsdbReadHandle, SQueryTableData
|
|||
pTsdbReadHandle->window.ekey, pTsdbReadHandle->idStr);
|
||||
}
|
||||
}
|
||||
#if 0
|
||||
int nQUERY = 0;
|
||||
#if 1
|
||||
int nQUERY = 0;
|
||||
#endif
|
||||
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions) {
|
||||
static STsdb* getTsdbByRetentions(SVnode* pVnode, STsdbReadHandle* pReadHandle, TSKEY winSKey, SRetention* retentions) {
|
||||
if (vnodeIsRollup(pVnode)) {
|
||||
int level = 0;
|
||||
#if 1
|
||||
#if 0
|
||||
int64_t now = taosGetTimestamp(pVnode->config.tsdbCfg.precision);
|
||||
for (int i = 0; i < TSDB_RETENTION_MAX; ++i) {
|
||||
SRetention* pRetention = retentions + i;
|
||||
if (pRetention->keep <= 0 || (now - pRetention->keep) >= winSKey) {
|
||||
break;
|
||||
}
|
||||
++level;
|
||||
}
|
||||
#endif
|
||||
#if 0
|
||||
++nQUERY;
|
||||
if(nQUERY%3 == 0) {
|
||||
level = 2;
|
||||
} else if(nQUERY%2 == 0) {
|
||||
level = 1;
|
||||
} else {
|
||||
level = 0;
|
||||
#if 1
|
||||
switch ((nQUERY++) % 3) {
|
||||
case 0:
|
||||
level = 0;
|
||||
break;
|
||||
case 1:
|
||||
level = 1;
|
||||
break;
|
||||
default:
|
||||
level = 2;
|
||||
break;
|
||||
}
|
||||
#endif
|
||||
if (level == TSDB_RETENTION_L0) {
|
||||
tsdbDebug("%p rsma level %d is selected to query\n", pReadHandle, level);
|
||||
return VND_RSMA0(pVnode);
|
||||
} else if (level == TSDB_RETENTION_L1) {
|
||||
tsdbDebug("%p rsma level %d is selected to query\n", pReadHandle, level);
|
||||
return VND_RSMA1(pVnode);
|
||||
} else {
|
||||
tsdbDebug("%p rsma level %d is selected to query\n", pReadHandle, level);
|
||||
return VND_RSMA2(pVnode);
|
||||
}
|
||||
}
|
||||
|
@ -393,7 +400,7 @@ static STsdbReadHandle* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond*
|
|||
goto _end;
|
||||
}
|
||||
|
||||
STsdb* pTsdb = getTsdbByRetentions(pVnode, pCond->twindow.skey, pVnode->config.tsdbCfg.retentions);
|
||||
STsdb* pTsdb = getTsdbByRetentions(pVnode, pReadHandle, pCond->twindow.skey, pVnode->config.tsdbCfg.retentions);
|
||||
|
||||
pReadHandle->order = pCond->order;
|
||||
pReadHandle->pTsdb = pTsdb;
|
||||
|
@ -803,12 +810,15 @@ static void destroyTableMemIterator(STableCheckInfo* pCheckInfo) {
|
|||
tSkipListDestroyIter(pCheckInfo->iiter);
|
||||
}
|
||||
|
||||
static TSKEY extractFirstTraverseKey(STableCheckInfo* pCheckInfo, int32_t order, int32_t update) {
|
||||
static TSKEY extractFirstTraverseKey(STableCheckInfo* pCheckInfo, int32_t order, int32_t update, TDRowVerT maxVer) {
|
||||
STSRow *rmem = NULL, *rimem = NULL;
|
||||
if (pCheckInfo->iter) {
|
||||
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
|
||||
if (node != NULL) {
|
||||
rmem = (STSRow*)SL_GET_NODE_DATA(node);
|
||||
if (TD_ROW_KEY(rmem) > maxVer) {
|
||||
rmem = NULL;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -816,6 +826,9 @@ static TSKEY extractFirstTraverseKey(STableCheckInfo* pCheckInfo, int32_t order,
|
|||
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter);
|
||||
if (node != NULL) {
|
||||
rimem = (STSRow*)SL_GET_NODE_DATA(node);
|
||||
if (TD_ROW_KEY(rimem) > maxVer) {
|
||||
rimem = NULL;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -837,6 +850,7 @@ static TSKEY extractFirstTraverseKey(STableCheckInfo* pCheckInfo, int32_t order,
|
|||
TSKEY r2 = TD_ROW_KEY(rimem);
|
||||
|
||||
if (r1 == r2) {
|
||||
#if 0
|
||||
if (update == TD_ROW_DISCARD_UPDATE) {
|
||||
pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
|
||||
tSkipListIterNext(pCheckInfo->iter);
|
||||
|
@ -846,6 +860,13 @@ static TSKEY extractFirstTraverseKey(STableCheckInfo* pCheckInfo, int32_t order,
|
|||
} else {
|
||||
pCheckInfo->chosen = CHECKINFO_CHOSEN_BOTH;
|
||||
}
|
||||
#endif
|
||||
if (TD_SUPPORT_UPDATE(update)) {
|
||||
pCheckInfo->chosen = CHECKINFO_CHOSEN_BOTH;
|
||||
} else {
|
||||
pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
|
||||
tSkipListIterNext(pCheckInfo->iter);
|
||||
}
|
||||
return r1;
|
||||
} else if (r1 < r2 && ASCENDING_TRAVERSE(order)) {
|
||||
pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM;
|
||||
|
@ -856,12 +877,16 @@ static TSKEY extractFirstTraverseKey(STableCheckInfo* pCheckInfo, int32_t order,
|
|||
}
|
||||
}
|
||||
|
||||
static STSRow* getSRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order, int32_t update, STSRow** extraRow) {
|
||||
|
||||
static STSRow* getSRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order, int32_t update, STSRow** extraRow, TDRowVerT maxVer) {
|
||||
STSRow *rmem = NULL, *rimem = NULL;
|
||||
if (pCheckInfo->iter) {
|
||||
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
|
||||
if (node != NULL) {
|
||||
rmem = (STSRow*)SL_GET_NODE_DATA(node);
|
||||
if (TD_ROW_VER(rmem) > maxVer) {
|
||||
rmem = NULL;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -869,6 +894,9 @@ static STSRow* getSRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order, int
|
|||
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter);
|
||||
if (node != NULL) {
|
||||
rimem = (STSRow*)SL_GET_NODE_DATA(node);
|
||||
if (TD_ROW_VER(rimem) > maxVer) {
|
||||
rimem = NULL;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -890,6 +918,7 @@ static STSRow* getSRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order, int
|
|||
TSKEY r2 = TD_ROW_KEY(rimem);
|
||||
|
||||
if (r1 == r2) {
|
||||
#if 0
|
||||
if (update == TD_ROW_DISCARD_UPDATE) {
|
||||
tSkipListIterNext(pCheckInfo->iter);
|
||||
pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
|
||||
|
@ -903,6 +932,16 @@ static STSRow* getSRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order, int
|
|||
*extraRow = rimem;
|
||||
return rmem;
|
||||
}
|
||||
#endif
|
||||
if (TD_SUPPORT_UPDATE(update)) {
|
||||
pCheckInfo->chosen = CHECKINFO_CHOSEN_BOTH;
|
||||
*extraRow = rimem;
|
||||
return rmem;
|
||||
} else {
|
||||
tSkipListIterNext(pCheckInfo->iter);
|
||||
pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM;
|
||||
return rimem;
|
||||
}
|
||||
} else {
|
||||
if (ASCENDING_TRAVERSE(order)) {
|
||||
if (r1 < r2) {
|
||||
|
@ -973,7 +1012,7 @@ static bool hasMoreDataInCache(STsdbReadHandle* pHandle) {
|
|||
initTableMemIterator(pHandle, pCheckInfo);
|
||||
}
|
||||
|
||||
STSRow* row = getSRowInTableMem(pCheckInfo, pHandle->order, pCfg->update, NULL);
|
||||
STSRow* row = getSRowInTableMem(pCheckInfo, pHandle->order, pCfg->update, NULL, TD_VER_MAX);
|
||||
if (row == NULL) {
|
||||
return false;
|
||||
}
|
||||
|
@ -1250,7 +1289,7 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock*
|
|||
/*bool hasData = */ initTableMemIterator(pTsdbReadHandle, pCheckInfo);
|
||||
assert(cur->pos >= 0 && cur->pos <= binfo.rows);
|
||||
|
||||
key = extractFirstTraverseKey(pCheckInfo, pTsdbReadHandle->order, pCfg->update);
|
||||
key = extractFirstTraverseKey(pCheckInfo, pTsdbReadHandle->order, pCfg->update, TD_VER_MAX);
|
||||
|
||||
if (key != TSKEY_INITIAL_VAL) {
|
||||
tsdbDebug("%p key in mem:%" PRId64 ", %s", pTsdbReadHandle, key, pTsdbReadHandle->idStr);
|
||||
|
@ -1497,10 +1536,10 @@ static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t
|
|||
TASSERT(0);
|
||||
}
|
||||
|
||||
if (sVal.valType == TD_VTYPE_NULL) {
|
||||
colDataAppendNULL(pColInfo, rowIndex);
|
||||
} else {
|
||||
if (sVal.valType == TD_VTYPE_NORM) {
|
||||
colDataAppend(pColInfo, rowIndex, sVal.val, false);
|
||||
} else {
|
||||
colDataAppendNULL(pColInfo, rowIndex);
|
||||
}
|
||||
}
|
||||
} else { // handle the var-string
|
||||
|
@ -1513,10 +1552,10 @@ static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t
|
|||
TASSERT(0);
|
||||
}
|
||||
|
||||
if (sVal.valType == TD_VTYPE_NULL) {
|
||||
colDataAppendNULL(pColInfo, rowIndex);
|
||||
} else {
|
||||
if (sVal.valType == TD_VTYPE_NORM) {
|
||||
colDataAppend(pColInfo, rowIndex, sVal.val, false);
|
||||
} else {
|
||||
colDataAppendNULL(pColInfo, rowIndex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1541,11 +1580,26 @@ static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t
|
|||
return numOfRows + num;
|
||||
}
|
||||
|
||||
// TODO fix bug for reverse copy data problem
|
||||
// Note: row1 always has high priority
|
||||
static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t numOfRows, STSRow* row1,
|
||||
STSRow* row2, int32_t numOfCols, uint64_t uid, STSchema* pSchema1, STSchema* pSchema2,
|
||||
bool forceSetNull) {
|
||||
/**
|
||||
* @brief // TODO fix bug for reverse copy data problem
|
||||
* Note: row1 always has high priority
|
||||
*
|
||||
* @param pTsdbReadHandle
|
||||
* @param capacity
|
||||
* @param curRow
|
||||
* @param row1
|
||||
* @param row2
|
||||
* @param numOfCols
|
||||
* @param uid
|
||||
* @param pSchema1
|
||||
* @param pSchema2
|
||||
* @param update
|
||||
* @param lastRowKey
|
||||
* @return int32_t The quantity of rows appended
|
||||
*/
|
||||
static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t* curRow, STSRow* row1,
|
||||
STSRow* row2, int32_t numOfCols, uint64_t uid, STSchema* pSchema1, STSchema* pSchema2,
|
||||
bool update, TSKEY* lastRowKey) {
|
||||
#if 1
|
||||
STSchema* pSchema;
|
||||
STSRow* row;
|
||||
|
@ -1557,12 +1611,16 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit
|
|||
bool isChosenRowDataRow;
|
||||
int32_t chosen_itr;
|
||||
SCellVal sVal = {0};
|
||||
TSKEY rowKey = TSKEY_INITIAL_VAL;
|
||||
int32_t nResult = 0;
|
||||
|
||||
// the schema version info is embeded in STSRow
|
||||
int32_t numOfColsOfRow1 = 0;
|
||||
|
||||
if (pSchema1 == NULL) {
|
||||
pSchema1 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, TD_ROW_SVER(row1));
|
||||
// pSchema1 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, TD_ROW_SVER(row1));
|
||||
// TODO: use the real schemaVersion
|
||||
pSchema1 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, 0);
|
||||
}
|
||||
|
||||
#ifdef TD_DEBUG_PRINT_ROW
|
||||
|
@ -1579,7 +1637,9 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit
|
|||
if (row2) {
|
||||
isRow2DataRow = TD_IS_TP_ROW(row2);
|
||||
if (pSchema2 == NULL) {
|
||||
pSchema2 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, TD_ROW_SVER(row2));
|
||||
// pSchema2 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, TD_ROW_SVER(row2));
|
||||
// TODO: use the real schemaVersion
|
||||
pSchema2 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, 0);
|
||||
}
|
||||
if (isRow2DataRow) {
|
||||
numOfColsOfRow2 = schemaNCols(pSchema2);
|
||||
|
@ -1610,19 +1670,19 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit
|
|||
colIdOfRow2 = tdKvRowColIdAt(row2, k);
|
||||
}
|
||||
|
||||
if (colIdOfRow1 == colIdOfRow2) {
|
||||
if (colIdOfRow1 < colIdOfRow2) { // the most probability
|
||||
if (colIdOfRow1 < pColInfo->info.colId) {
|
||||
j++;
|
||||
k++;
|
||||
++j;
|
||||
continue;
|
||||
}
|
||||
row = row1;
|
||||
pSchema = pSchema1;
|
||||
isChosenRowDataRow = isRow1DataRow;
|
||||
chosen_itr = j;
|
||||
} else if (colIdOfRow1 < colIdOfRow2) {
|
||||
} else if (colIdOfRow1 == colIdOfRow2) {
|
||||
if (colIdOfRow1 < pColInfo->info.colId) {
|
||||
j++;
|
||||
++j;
|
||||
++k;
|
||||
continue;
|
||||
}
|
||||
row = row1;
|
||||
|
@ -1631,7 +1691,7 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit
|
|||
chosen_itr = j;
|
||||
} else {
|
||||
if (colIdOfRow2 < pColInfo->info.colId) {
|
||||
k++;
|
||||
++k;
|
||||
continue;
|
||||
}
|
||||
row = row2;
|
||||
|
@ -1639,16 +1699,35 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit
|
|||
chosen_itr = k;
|
||||
isChosenRowDataRow = isRow2DataRow;
|
||||
}
|
||||
|
||||
if (isChosenRowDataRow) {
|
||||
colId = pSchema->columns[chosen_itr].colId;
|
||||
offset = pSchema->columns[chosen_itr].offset;
|
||||
// TODO: use STSRowIter
|
||||
tdSTpRowGetVal(row, colId, pSchema->columns[chosen_itr].type, pSchema->flen, offset, chosen_itr - 1, &sVal);
|
||||
if (colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
|
||||
rowKey = *(TSKEY*)sVal.val;
|
||||
if (rowKey != *lastRowKey) {
|
||||
if (*lastRowKey != TSKEY_INITIAL_VAL) {
|
||||
++(*curRow);
|
||||
}
|
||||
++nResult;
|
||||
}
|
||||
*lastRowKey = rowKey;
|
||||
}
|
||||
} else {
|
||||
// TODO: use STSRowIter
|
||||
if (chosen_itr == 0) {
|
||||
colId = PRIMARYKEY_TIMESTAMP_COL_ID;
|
||||
tdSKvRowGetVal(row, PRIMARYKEY_TIMESTAMP_COL_ID, -1, -1, &sVal);
|
||||
rowKey = *(TSKEY*)sVal.val;
|
||||
if (rowKey != *lastRowKey) {
|
||||
if (*lastRowKey != TSKEY_INITIAL_VAL) {
|
||||
++(*curRow);
|
||||
}
|
||||
++nResult;
|
||||
}
|
||||
*lastRowKey = rowKey;
|
||||
} else {
|
||||
SKvRowIdx* pColIdx = tdKvRowColIdxAt(row, chosen_itr - 1);
|
||||
colId = pColIdx->colId;
|
||||
|
@ -1657,35 +1736,46 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit
|
|||
}
|
||||
}
|
||||
|
||||
ASSERT(rowKey != TSKEY_INITIAL_VAL);
|
||||
|
||||
if (colId == pColInfo->info.colId) {
|
||||
if (tdValTypeIsNorm(sVal.valType)) {
|
||||
colDataAppend(pColInfo, numOfRows, sVal.val, false);
|
||||
} else if (forceSetNull) {
|
||||
colDataAppend(pColInfo, numOfRows, NULL, true);
|
||||
colDataAppend(pColInfo, *curRow, sVal.val, false);
|
||||
} else if (tdValTypeIsNull(sVal.valType)) {
|
||||
colDataAppend(pColInfo, *curRow, NULL, true);
|
||||
} else if (tdValTypeIsNone(sVal.valType)) {
|
||||
// TODO: Set null if nothing append for this row
|
||||
if (*lastRowKey != rowKey) {
|
||||
colDataAppend(pColInfo, *curRow, NULL, true);
|
||||
}
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
i++;
|
||||
++i;
|
||||
|
||||
if (row == row1) {
|
||||
j++;
|
||||
++j;
|
||||
} else {
|
||||
k++;
|
||||
++k;
|
||||
}
|
||||
} else {
|
||||
if (forceSetNull) {
|
||||
colDataAppend(pColInfo, numOfRows, NULL, true);
|
||||
if (*lastRowKey != rowKey) {
|
||||
colDataAppend(pColInfo, *curRow, NULL, true);
|
||||
}
|
||||
i++;
|
||||
++i;
|
||||
}
|
||||
}
|
||||
|
||||
if (forceSetNull) {
|
||||
if (*lastRowKey != rowKey) {
|
||||
while (i < numOfCols) { // the remain columns are all null data
|
||||
SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
|
||||
colDataAppend(pColInfo, numOfRows, NULL, true);
|
||||
i++;
|
||||
colDataAppend(pColInfo, *curRow, NULL, true);
|
||||
++i;
|
||||
}
|
||||
}
|
||||
|
||||
return nResult;
|
||||
#endif
|
||||
}
|
||||
|
||||
|
@ -1859,6 +1949,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
|
|||
|
||||
// compared with the data from in-memory buffer, to generate the correct timestamp array list
|
||||
int32_t numOfRows = 0;
|
||||
int32_t curRow = 0;
|
||||
|
||||
int16_t rv1 = -1;
|
||||
int16_t rv2 = -1;
|
||||
|
@ -1874,9 +1965,11 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
|
|||
return;
|
||||
} else if (pCheckInfo->iter != NULL || pCheckInfo->iiter != NULL) {
|
||||
SSkipListNode* node = NULL;
|
||||
TSKEY lastRowKey = TSKEY_INITIAL_VAL;
|
||||
|
||||
do {
|
||||
STSRow* row2 = NULL;
|
||||
STSRow* row1 = getSRowInTableMem(pCheckInfo, pTsdbReadHandle->order, pCfg->update, &row2);
|
||||
STSRow* row1 = getSRowInTableMem(pCheckInfo, pTsdbReadHandle->order, pCfg->update, &row2, TD_VER_MAX);
|
||||
if (row1 == NULL) {
|
||||
break;
|
||||
}
|
||||
|
@ -1905,9 +1998,9 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
|
|||
rv2 = TD_ROW_SVER(row2);
|
||||
}
|
||||
|
||||
mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, row1, row2, numOfCols,
|
||||
pCheckInfo->tableId, pSchema1, pSchema2, true);
|
||||
numOfRows += 1;
|
||||
numOfRows += mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols,
|
||||
pCheckInfo->tableId, pSchema1, pSchema2, true, &lastRowKey);
|
||||
// numOfRows += 1;
|
||||
if (cur->win.skey == TSKEY_INITIAL_VAL) {
|
||||
cur->win.skey = key;
|
||||
}
|
||||
|
@ -1918,6 +2011,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
|
|||
|
||||
moveToNextRowInMem(pCheckInfo);
|
||||
} else if (key == tsArray[pos]) { // data in buffer has the same timestamp of data in file block, ignore it
|
||||
#if 0
|
||||
if (pCfg->update) {
|
||||
if (pCfg->update == TD_ROW_PARTIAL_UPDATE) {
|
||||
doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, pos, pos);
|
||||
|
@ -1933,7 +2027,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
|
|||
|
||||
bool forceSetNull = pCfg->update != TD_ROW_PARTIAL_UPDATE;
|
||||
mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, row1, row2, numOfCols,
|
||||
pCheckInfo->tableId, pSchema1, pSchema2, forceSetNull);
|
||||
pCheckInfo->tableId, pSchema1, pSchema2, forceSetNull, &lastRowKey);
|
||||
numOfRows += 1;
|
||||
if (cur->win.skey == TSKEY_INITIAL_VAL) {
|
||||
cur->win.skey = key;
|
||||
|
@ -1943,6 +2037,35 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
|
|||
cur->lastKey = key + step;
|
||||
cur->mixBlock = true;
|
||||
|
||||
moveToNextRowInMem(pCheckInfo);
|
||||
pos += step;
|
||||
} else {
|
||||
moveToNextRowInMem(pCheckInfo);
|
||||
}
|
||||
#endif
|
||||
if (TD_SUPPORT_UPDATE(pCfg->update)) {
|
||||
doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, curRow, pos, pos);
|
||||
|
||||
if (rv1 != TD_ROW_SVER(row1)) {
|
||||
// pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1));
|
||||
rv1 = TD_ROW_SVER(row1);
|
||||
}
|
||||
if (row2 && rv2 != TD_ROW_SVER(row2)) {
|
||||
// pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2));
|
||||
rv2 = TD_ROW_SVER(row2);
|
||||
}
|
||||
|
||||
numOfRows += mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols,
|
||||
pCheckInfo->tableId, pSchema1, pSchema2, true, &lastRowKey);
|
||||
// ++numOfRows;
|
||||
if (cur->win.skey == TSKEY_INITIAL_VAL) {
|
||||
cur->win.skey = key;
|
||||
}
|
||||
|
||||
cur->win.ekey = key;
|
||||
cur->lastKey = key + step;
|
||||
cur->mixBlock = true;
|
||||
|
||||
moveToNextRowInMem(pCheckInfo);
|
||||
pos += step;
|
||||
} else {
|
||||
|
@ -1958,11 +2081,18 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
|
|||
assert(end != -1);
|
||||
|
||||
if (tsArray[end] == key) { // the value of key in cache equals to the end timestamp value, ignore it
|
||||
#if 0
|
||||
if (pCfg->update == TD_ROW_DISCARD_UPDATE) {
|
||||
moveToNextRowInMem(pCheckInfo);
|
||||
} else {
|
||||
end -= step;
|
||||
}
|
||||
#endif
|
||||
if (!TD_SUPPORT_UPDATE(pCfg->update)) {
|
||||
moveToNextRowInMem(pCheckInfo);
|
||||
} else {
|
||||
end -= step;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t qstart = 0, qend = 0;
|
||||
|
@ -2572,6 +2702,7 @@ static UNUSED_FUNC void changeQueryHandleForInterpQuery(tsdbReaderT pHandle) {
|
|||
static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win,
|
||||
STsdbReadHandle* pTsdbReadHandle) {
|
||||
int numOfRows = 0;
|
||||
int curRows = 0;
|
||||
int32_t numOfCols = (int32_t)taosArrayGetSize(pTsdbReadHandle->pColumns);
|
||||
STsdbCfg* pCfg = REPO_CFG(pTsdbReadHandle->pTsdb);
|
||||
win->skey = TSKEY_INITIAL_VAL;
|
||||
|
@ -2579,9 +2710,11 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
|
|||
int64_t st = taosGetTimestampUs();
|
||||
int16_t rv = -1;
|
||||
STSchema* pSchema = NULL;
|
||||
TSKEY lastRowKey = TSKEY_INITIAL_VAL;
|
||||
|
||||
|
||||
do {
|
||||
STSRow* row = getSRowInTableMem(pCheckInfo, pTsdbReadHandle->order, pCfg->update, NULL);
|
||||
STSRow* row = getSRowInTableMem(pCheckInfo, pTsdbReadHandle->order, pCfg->update, NULL, TD_VER_MAX);
|
||||
if (row == NULL) {
|
||||
break;
|
||||
}
|
||||
|
@ -2604,16 +2737,18 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
|
|||
pSchema = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), pCheckInfo->tableId, 0);
|
||||
rv = TD_ROW_SVER(row);
|
||||
}
|
||||
mergeTwoRowFromMem(pTsdbReadHandle, maxRowsToRead, numOfRows, row, NULL, numOfCols, pCheckInfo->tableId, pSchema,
|
||||
NULL, true);
|
||||
numOfRows += mergeTwoRowFromMem(pTsdbReadHandle, maxRowsToRead, &curRows, row, NULL, numOfCols, pCheckInfo->tableId, pSchema,
|
||||
NULL, true, &lastRowKey);
|
||||
|
||||
if (++numOfRows >= maxRowsToRead) {
|
||||
if (numOfRows >= maxRowsToRead) {
|
||||
moveToNextRowInMem(pCheckInfo);
|
||||
break;
|
||||
}
|
||||
|
||||
} while (moveToNextRowInMem(pCheckInfo));
|
||||
|
||||
taosMemoryFreeClear(pSchema); // free the STSChema
|
||||
|
||||
assert(numOfRows <= maxRowsToRead);
|
||||
|
||||
// if the buffer is not full in case of descending order query, move the data in the front of the buffer
|
||||
|
@ -2731,6 +2866,8 @@ static bool loadCachedLastRow(STsdbReadHandle* pTsdbReadHandle) {
|
|||
STSRow* pRow = NULL;
|
||||
TSKEY key = TSKEY_INITIAL_VAL;
|
||||
int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
|
||||
TSKEY lastRowKey = TSKEY_INITIAL_VAL;
|
||||
int32_t curRow = 0;
|
||||
|
||||
if (++pTsdbReadHandle->activeIndex < numOfTables) {
|
||||
STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, pTsdbReadHandle->activeIndex);
|
||||
|
@ -2738,8 +2875,8 @@ static bool loadCachedLastRow(STsdbReadHandle* pTsdbReadHandle) {
|
|||
// if (ret != TSDB_CODE_SUCCESS) {
|
||||
// return false;
|
||||
// }
|
||||
mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, 0, pRow, NULL, numOfCols, pCheckInfo->tableId,
|
||||
NULL, NULL, true);
|
||||
mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, pRow, NULL, numOfCols, pCheckInfo->tableId,
|
||||
NULL, NULL, true, &lastRowKey);
|
||||
taosMemoryFreeClear(pRow);
|
||||
|
||||
// update the last key value
|
||||
|
|
|
@ -266,11 +266,22 @@ int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) {
|
|||
if (tsdbLoadBlockDataImpl(pReadh, iBlock, pReadh->pDCols[1]) < 0) return -1;
|
||||
// TODO: use the real maxVersion to replace the UINT64_MAX to support Multi-Version
|
||||
if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows, NULL,
|
||||
update != TD_ROW_PARTIAL_UPDATE, UINT64_MAX) < 0)
|
||||
TD_SUPPORT_UPDATE(update), TD_VER_MAX) < 0)
|
||||
return -1;
|
||||
}
|
||||
// if ((pBlock->numOfSubBlocks == 1) && (iBlock->hasDupKey)) { // TODO: use this line
|
||||
if (pBlock->numOfSubBlocks == 1) {
|
||||
tdResetDataCols(pReadh->pDCols[1]);
|
||||
pReadh->pDCols[1]->bitmapMode = pReadh->pDCols[0]->bitmapMode;
|
||||
if (tdMergeDataCols(pReadh->pDCols[1], pReadh->pDCols[0], pReadh->pDCols[0]->numOfRows, NULL,
|
||||
TD_SUPPORT_UPDATE(update), TD_VER_MAX) < 0) {
|
||||
return -1;
|
||||
}
|
||||
memcpy(pReadh->pDCols[0], pReadh->pDCols[1], sizeof(SDataCols));
|
||||
ASSERT(pReadh->pDCols[0]->bitmapMode != 0);
|
||||
}
|
||||
|
||||
ASSERT(pReadh->pDCols[0]->numOfRows == pBlock->numOfRows);
|
||||
ASSERT(pReadh->pDCols[0]->numOfRows <= pBlock->numOfRows);
|
||||
ASSERT(dataColsKeyFirst(pReadh->pDCols[0]) == pBlock->keyFirst);
|
||||
ASSERT(dataColsKeyLast(pReadh->pDCols[0]) == pBlock->keyLast);
|
||||
|
||||
|
@ -297,9 +308,20 @@ int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo,
|
|||
if (tsdbLoadBlockDataColsImpl(pReadh, iBlock, pReadh->pDCols[1], colIds, numOfColsIds) < 0) return -1;
|
||||
// TODO: use the real maxVersion to replace the UINT64_MAX to support Multi-Version
|
||||
if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows, NULL,
|
||||
update != TD_ROW_PARTIAL_UPDATE, UINT64_MAX) < 0)
|
||||
TD_SUPPORT_UPDATE(update), TD_VER_MAX) < 0)
|
||||
return -1;
|
||||
}
|
||||
// if ((pBlock->numOfSubBlocks == 1) && (iBlock->hasDupKey)) { // TODO: use this line
|
||||
if (pBlock->numOfSubBlocks == 1) {
|
||||
tdResetDataCols(pReadh->pDCols[1]);
|
||||
pReadh->pDCols[1]->bitmapMode = pReadh->pDCols[0]->bitmapMode;
|
||||
if (tdMergeDataCols(pReadh->pDCols[1], pReadh->pDCols[0], pReadh->pDCols[0]->numOfRows, NULL,
|
||||
TD_SUPPORT_UPDATE(update), TD_VER_MAX) < 0) {
|
||||
return -1;
|
||||
}
|
||||
memcpy(pReadh->pDCols[0], pReadh->pDCols[1], sizeof(SDataCols));
|
||||
ASSERT(pReadh->pDCols[0]->bitmapMode != 0);
|
||||
}
|
||||
|
||||
if (mergeBitmap && !tdDataColsIsBitmapI(pReadh->pDCols[0])) {
|
||||
for (int i = 0; i < numOfColsIds; ++i) {
|
||||
|
@ -312,7 +334,7 @@ int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo,
|
|||
}
|
||||
}
|
||||
|
||||
ASSERT(pReadh->pDCols[0]->numOfRows == pBlock->numOfRows);
|
||||
ASSERT(pReadh->pDCols[0]->numOfRows <= pBlock->numOfRows);
|
||||
ASSERT(dataColsKeyFirst(pReadh->pDCols[0]) == pBlock->keyFirst);
|
||||
ASSERT(dataColsKeyLast(pReadh->pDCols[0]) == pBlock->keyLast);
|
||||
|
||||
|
@ -623,15 +645,15 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat
|
|||
}
|
||||
|
||||
if (dcol != 0) {
|
||||
ccol++;
|
||||
++ccol;
|
||||
}
|
||||
dcol++;
|
||||
++dcol;
|
||||
} else if (tcolId < pDataCol->colId) {
|
||||
ccol++;
|
||||
++ccol;
|
||||
} else {
|
||||
// Set current column as NULL and forward
|
||||
dataColReset(pDataCol);
|
||||
dcol++;
|
||||
++dcol;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1635,17 +1635,20 @@ int32_t tsdbCreateTSma(STsdb *pTsdb, char *pMsg) {
|
|||
SSmaCfg vCreateSmaReq = {0};
|
||||
if (!tDeserializeSVCreateTSmaReq(pMsg, &vCreateSmaReq)) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
tsdbWarn("vgId:%d TDMT_VND_CREATE_SMA received but deserialize failed since %s", REPO_ID(pTsdb), terrstr(terrno));
|
||||
tsdbWarn("vgId:%d tsma create msg received but deserialize failed since %s", REPO_ID(pTsdb), terrstr(terrno));
|
||||
return -1;
|
||||
}
|
||||
tsdbDebug("vgId:%d TDMT_VND_CREATE_SMA msg received for %s:%" PRIi64, REPO_ID(pTsdb), vCreateSmaReq.tSma.indexName,
|
||||
vCreateSmaReq.tSma.indexUid);
|
||||
|
||||
tsdbDebug("vgId:%d tsma create msg %s:%" PRIi64 " for table %" PRIi64 " received", REPO_ID(pTsdb),
|
||||
vCreateSmaReq.tSma.indexName, vCreateSmaReq.tSma.indexUid, vCreateSmaReq.tSma.tableUid);
|
||||
|
||||
// record current timezone of server side
|
||||
vCreateSmaReq.tSma.timezoneInt = tsTimezone;
|
||||
|
||||
if (metaCreateTSma(REPO_META(pTsdb), &vCreateSmaReq) < 0) {
|
||||
// TODO: handle error
|
||||
tsdbWarn("vgId:%d tsma %s:%" PRIi64 " create failed for table %" PRIi64 " since %s", REPO_ID(pTsdb),
|
||||
vCreateSmaReq.tSma.indexName, vCreateSmaReq.tSma.indexUid, vCreateSmaReq.tSma.tableUid, terrstr(terrno));
|
||||
tdDestroyTSma(&vCreateSmaReq.tSma);
|
||||
return -1;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue