Merge branch '3.0' of github.com:taosdata/TDengine into szhou/cenc
This commit is contained in:
commit
75e8813386
|
@ -2,7 +2,7 @@
|
|||
# taos-tools
|
||||
ExternalProject_Add(taos-tools
|
||||
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
|
||||
GIT_TAG 3588b3d
|
||||
GIT_TAG e7270c9
|
||||
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
|
||||
BINARY_DIR ""
|
||||
#BUILD_IN_SOURCE TRUE
|
||||
|
|
|
@ -168,7 +168,7 @@ Query OK, 8 row(s) in set (0.001154s)
|
|||
|
||||
## 删除数据节点
|
||||
|
||||
先停止要删除的数据节点的 taosd 进程,然后启动 CLI 程序 taos,执行:
|
||||
启动 CLI 程序 taos,执行:
|
||||
|
||||
```sql
|
||||
DROP DNODE "fqdn:port";
|
||||
|
|
|
@ -104,7 +104,7 @@ SELECT location, groupid, current FROM d1001 LIMIT 2;
|
|||
|
||||
### 结果去重
|
||||
|
||||
`DISINTCT` 关键字可以对结果集中的一列或多列进行去重,去除的列既可以是标签列也可以是数据列。
|
||||
`DISTINCT` 关键字可以对结果集中的一列或多列进行去重,去除的列既可以是标签列也可以是数据列。
|
||||
|
||||
对标签列去重:
|
||||
|
||||
|
|
|
@ -79,7 +79,7 @@ password = "taosdata"
|
|||
|
||||
# 需要被监控的 taosAdapter
|
||||
[taosAdapter]
|
||||
address = ["127.0.0.1:6041","192.168.1.95:6041"]
|
||||
address = ["127.0.0.1:6041"]
|
||||
|
||||
[metrics]
|
||||
# 监控指标前缀
|
||||
|
@ -92,7 +92,7 @@ cluster = "production"
|
|||
database = "log"
|
||||
|
||||
# 指定需要监控的普通表
|
||||
tables = ["normal_table"]
|
||||
tables = []
|
||||
```
|
||||
|
||||
### 获取监控指标
|
||||
|
@ -141,4 +141,4 @@ taos_cluster_info_dnodes_total{cluster_id="5981392874047724755"} 1
|
|||
# HELP taos_cluster_info_first_ep
|
||||
# TYPE taos_cluster_info_first_ep gauge
|
||||
taos_cluster_info_first_ep{cluster_id="5981392874047724755",value="hlb:6030"} 1
|
||||
```
|
||||
```
|
||||
|
|
|
@ -36,8 +36,13 @@ typedef struct STSRow2 STSRow2;
|
|||
typedef struct STSRowBuilder STSRowBuilder;
|
||||
typedef struct STagVal STagVal;
|
||||
typedef struct STag STag;
|
||||
typedef struct SColData SColData;
|
||||
|
||||
// bitmap
|
||||
#define HAS_NONE ((uint8_t)0x1)
|
||||
#define HAS_NULL ((uint8_t)0x2)
|
||||
#define HAS_VALUE ((uint8_t)0x4)
|
||||
|
||||
// bitmap ================================
|
||||
const static uint8_t BIT2_MAP[4][4] = {{0b00000000, 0b00000001, 0b00000010, 0},
|
||||
{0b00000000, 0b00000100, 0b00001000, 2},
|
||||
{0b00000000, 0b00010000, 0b00100000, 4},
|
||||
|
@ -51,21 +56,21 @@ const static uint8_t BIT2_MAP[4][4] = {{0b00000000, 0b00000001, 0b00000010, 0},
|
|||
#define SET_BIT2(p, i, v) ((p)[(i) >> 2] = (p)[(i) >> 2] & N1(BIT2_MAP[(i)&3][3]) | BIT2_MAP[(i)&3][(v)])
|
||||
#define GET_BIT2(p, i) (((p)[(i) >> 2] >> BIT2_MAP[(i)&3][3]) & ((uint8_t)3))
|
||||
|
||||
// STSchema
|
||||
// STSchema ================================
|
||||
int32_t tTSchemaCreate(int32_t sver, SSchema *pSchema, int32_t nCols, STSchema **ppTSchema);
|
||||
void tTSchemaDestroy(STSchema *pTSchema);
|
||||
|
||||
// SValue
|
||||
// SValue ================================
|
||||
int32_t tPutValue(uint8_t *p, SValue *pValue, int8_t type);
|
||||
int32_t tGetValue(uint8_t *p, SValue *pValue, int8_t type);
|
||||
int tValueCmprFn(const SValue *pValue1, const SValue *pValue2, int8_t type);
|
||||
|
||||
// SColVal
|
||||
// SColVal ================================
|
||||
#define COL_VAL_NONE(CID, TYPE) ((SColVal){.cid = (CID), .type = (TYPE), .isNone = 1})
|
||||
#define COL_VAL_NULL(CID, TYPE) ((SColVal){.cid = (CID), .type = (TYPE), .isNull = 1})
|
||||
#define COL_VAL_VALUE(CID, TYPE, V) ((SColVal){.cid = (CID), .type = (TYPE), .value = (V)})
|
||||
|
||||
// STSRow2
|
||||
// STSRow2 ================================
|
||||
#define TSROW_LEN(PROW, V) tGetI32v((uint8_t *)(PROW)->data, (V) ? &(V) : NULL)
|
||||
#define TSROW_SVER(PROW, V) tGetI32v((PROW)->data + TSROW_LEN(PROW, NULL), (V) ? &(V) : NULL)
|
||||
|
||||
|
@ -77,7 +82,7 @@ int32_t tTSRowToArray(STSRow2 *pRow, STSchema *pTSchema, SArray **ppArray);
|
|||
int32_t tPutTSRow(uint8_t *p, STSRow2 *pRow);
|
||||
int32_t tGetTSRow(uint8_t *p, STSRow2 **ppRow);
|
||||
|
||||
// STSRowBuilder
|
||||
// STSRowBuilder ================================
|
||||
#define tsRowBuilderInit() ((STSRowBuilder){0})
|
||||
#define tsRowBuilderClear(B) \
|
||||
do { \
|
||||
|
@ -86,7 +91,7 @@ int32_t tGetTSRow(uint8_t *p, STSRow2 **ppRow);
|
|||
} \
|
||||
} while (0)
|
||||
|
||||
// STag
|
||||
// STag ================================
|
||||
int32_t tTagNew(SArray *pArray, int32_t version, int8_t isJson, STag **ppTag);
|
||||
void tTagFree(STag *pTag);
|
||||
bool tTagIsJson(const void *pTag);
|
||||
|
@ -100,7 +105,16 @@ void tTagSetCid(const STag *pTag, int16_t iTag, int16_t cid);
|
|||
void debugPrintSTag(STag *pTag, const char *tag, int32_t ln); // TODO: remove
|
||||
int32_t parseJsontoTagData(const char *json, SArray *pTagVals, STag **ppTag, void *pMsgBuf);
|
||||
|
||||
// STRUCT =================
|
||||
// SColData ================================
|
||||
void tColDataDestroy(void *ph);
|
||||
void tColDataInit(SColData *pColData, int16_t cid, int8_t type, int8_t smaOn);
|
||||
void tColDataClear(SColData *pColData);
|
||||
int32_t tColDataAppendValue(SColData *pColData, SColVal *pColVal);
|
||||
void tColDataGetValue(SColData *pColData, int32_t iVal, SColVal *pColVal);
|
||||
uint8_t tColDataGetBitValue(SColData *pColData, int32_t iVal);
|
||||
int32_t tColDataCopy(SColData *pColDataSrc, SColData *pColDataDest);
|
||||
|
||||
// STRUCT ================================
|
||||
struct STColumn {
|
||||
col_id_t colId;
|
||||
int8_t type;
|
||||
|
@ -166,6 +180,18 @@ struct SColVal {
|
|||
SValue value;
|
||||
};
|
||||
|
||||
struct SColData {
|
||||
int16_t cid;
|
||||
int8_t type;
|
||||
int8_t smaOn;
|
||||
int32_t nVal;
|
||||
uint8_t flag;
|
||||
uint8_t *pBitMap;
|
||||
int32_t *aOffset;
|
||||
int32_t nData;
|
||||
uint8_t *pData;
|
||||
};
|
||||
|
||||
#pragma pack(push, 1)
|
||||
struct STagVal {
|
||||
// char colName[TSDB_COL_NAME_LEN]; // only used for tmq_get_meta
|
||||
|
|
|
@ -151,6 +151,8 @@ typedef struct SVnodeModifyLogicNode {
|
|||
SArray* pDataBlocks;
|
||||
SVgDataBlocks* pVgDataBlocks;
|
||||
SNode* pAffectedRows; // SColumnNode
|
||||
SNode* pStartTs; // SColumnNode
|
||||
SNode* pEndTs; // SColumnNode
|
||||
uint64_t tableId;
|
||||
uint64_t stableId;
|
||||
int8_t tableType; // table type
|
||||
|
@ -525,6 +527,8 @@ typedef struct SDataDeleterNode {
|
|||
char tsColName[TSDB_COL_NAME_LEN];
|
||||
STimeWindow deleteTimeRange;
|
||||
SNode* pAffectedRows;
|
||||
SNode* pStartTs;
|
||||
SNode* pEndTs;
|
||||
} SDataDeleterNode;
|
||||
|
||||
typedef struct SSubplan {
|
||||
|
|
|
@ -315,6 +315,8 @@ typedef struct SDeleteStmt {
|
|||
SNode* pFromTable; // FROM clause
|
||||
SNode* pWhere; // WHERE clause
|
||||
SNode* pCountFunc; // count the number of rows affected
|
||||
SNode* pFirstFunc; // the start timestamp when the data was actually deleted
|
||||
SNode* pLastFunc; // the end timestamp when the data was actually deleted
|
||||
SNode* pTagCond; // pWhere divided into pTagCond and timeRange
|
||||
STimeWindow timeRange;
|
||||
uint8_t precision;
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "tdataformat.h"
|
||||
#include "tRealloc.h"
|
||||
#include "tcoding.h"
|
||||
#include "tdatablock.h"
|
||||
#include "tlog.h"
|
||||
|
@ -680,7 +681,7 @@ int32_t tGetTSRow(uint8_t *p, STSRow2 **ppRow) {
|
|||
return n;
|
||||
}
|
||||
|
||||
// STSchema
|
||||
// STSchema ========================================
|
||||
int32_t tTSchemaCreate(int32_t sver, SSchema *pSchema, int32_t ncols, STSchema **ppTSchema) {
|
||||
*ppTSchema = (STSchema *)taosMemoryMalloc(sizeof(STSchema) + sizeof(STColumn) * ncols);
|
||||
if (*ppTSchema == NULL) {
|
||||
|
@ -720,9 +721,7 @@ void tTSchemaDestroy(STSchema *pTSchema) {
|
|||
if (pTSchema) taosMemoryFree(pTSchema);
|
||||
}
|
||||
|
||||
// STSRowBuilder
|
||||
|
||||
// STag
|
||||
// STag ========================================
|
||||
static int tTagValCmprFn(const void *p1, const void *p2) {
|
||||
if (((STagVal *)p1)->cid < ((STagVal *)p2)->cid) {
|
||||
return -1;
|
||||
|
@ -1172,4 +1171,495 @@ STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder) {
|
|||
return pSchema;
|
||||
}
|
||||
|
||||
#endif
|
||||
#endif
|
||||
|
||||
// SColData ========================================
|
||||
void tColDataDestroy(void *ph) {
|
||||
SColData *pColData = (SColData *)ph;
|
||||
|
||||
tFree(pColData->pBitMap);
|
||||
tFree((uint8_t *)pColData->aOffset);
|
||||
tFree(pColData->pData);
|
||||
}
|
||||
|
||||
void tColDataInit(SColData *pColData, int16_t cid, int8_t type, int8_t smaOn) {
|
||||
pColData->cid = cid;
|
||||
pColData->type = type;
|
||||
pColData->smaOn = smaOn;
|
||||
tColDataClear(pColData);
|
||||
}
|
||||
|
||||
void tColDataClear(SColData *pColData) {
|
||||
pColData->nVal = 0;
|
||||
pColData->flag = 0;
|
||||
pColData->nData = 0;
|
||||
}
|
||||
|
||||
static FORCE_INLINE int32_t tColDataPutValue(SColData *pColData, SColVal *pColVal) {
|
||||
int32_t code = 0;
|
||||
|
||||
if (IS_VAR_DATA_TYPE(pColData->type)) {
|
||||
code = tRealloc((uint8_t **)(&pColData->aOffset), sizeof(int32_t) * (pColData->nVal + 1));
|
||||
if (code) goto _exit;
|
||||
pColData->aOffset[pColData->nVal] = pColData->nData;
|
||||
|
||||
if (pColVal->value.nData) {
|
||||
code = tRealloc(&pColData->pData, pColData->nData + pColVal->value.nData);
|
||||
if (code) goto _exit;
|
||||
memcpy(pColData->pData + pColData->nData, pColVal->value.pData, pColVal->value.nData);
|
||||
pColData->nData += pColVal->value.nData;
|
||||
}
|
||||
} else {
|
||||
ASSERT(pColData->nData == tDataTypes[pColData->type].bytes * pColData->nVal);
|
||||
code = tRealloc(&pColData->pData, pColData->nData + tDataTypes[pColData->type].bytes);
|
||||
if (code) goto _exit;
|
||||
pColData->nData += tPutValue(pColData->pData + pColData->nData, &pColVal->value, pColVal->type);
|
||||
}
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
static FORCE_INLINE int32_t tColDataAppendValue0(SColData *pColData, SColVal *pColVal) { // 0
|
||||
int32_t code = 0;
|
||||
|
||||
if (pColVal->isNone) {
|
||||
pColData->flag = HAS_NONE;
|
||||
} else if (pColVal->isNull) {
|
||||
pColData->flag = HAS_NULL;
|
||||
} else {
|
||||
pColData->flag = HAS_VALUE;
|
||||
code = tColDataPutValue(pColData, pColVal);
|
||||
if (code) goto _exit;
|
||||
}
|
||||
pColData->nVal++;
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
static FORCE_INLINE int32_t tColDataAppendValue1(SColData *pColData, SColVal *pColVal) { // HAS_NONE
|
||||
int32_t code = 0;
|
||||
|
||||
if (!pColVal->isNone) {
|
||||
int32_t nBit = BIT1_SIZE(pColData->nVal + 1);
|
||||
|
||||
code = tRealloc(&pColData->pBitMap, nBit);
|
||||
if (code) goto _exit;
|
||||
|
||||
memset(pColData->pBitMap, 0, nBit);
|
||||
SET_BIT1(pColData->pBitMap, pColData->nVal, 1);
|
||||
|
||||
if (pColVal->isNull) {
|
||||
pColData->flag |= HAS_NULL;
|
||||
} else {
|
||||
pColData->flag |= HAS_VALUE;
|
||||
|
||||
if (pColData->nVal) {
|
||||
if (IS_VAR_DATA_TYPE(pColData->type)) {
|
||||
int32_t nOffset = sizeof(int32_t) * pColData->nVal;
|
||||
code = tRealloc((uint8_t **)(&pColData->aOffset), nOffset);
|
||||
if (code) goto _exit;
|
||||
memset(pColData->aOffset, 0, nOffset);
|
||||
} else {
|
||||
pColData->nData = tDataTypes[pColData->type].bytes * pColData->nVal;
|
||||
code = tRealloc(&pColData->pData, pColData->nData);
|
||||
if (code) goto _exit;
|
||||
memset(pColData->pData, 0, pColData->nData);
|
||||
}
|
||||
}
|
||||
|
||||
code = tColDataPutValue(pColData, pColVal);
|
||||
if (code) goto _exit;
|
||||
}
|
||||
}
|
||||
pColData->nVal++;
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
static FORCE_INLINE int32_t tColDataAppendValue2(SColData *pColData, SColVal *pColVal) { // HAS_NULL
|
||||
int32_t code = 0;
|
||||
|
||||
if (!pColVal->isNull) {
|
||||
int32_t nBit = BIT1_SIZE(pColData->nVal + 1);
|
||||
code = tRealloc(&pColData->pBitMap, nBit);
|
||||
if (code) goto _exit;
|
||||
|
||||
if (pColVal->isNone) {
|
||||
pColData->flag |= HAS_NONE;
|
||||
|
||||
memset(pColData->pBitMap, 255, nBit);
|
||||
SET_BIT1(pColData->pBitMap, pColData->nVal, 0);
|
||||
} else {
|
||||
pColData->flag |= HAS_VALUE;
|
||||
|
||||
memset(pColData->pBitMap, 0, nBit);
|
||||
SET_BIT1(pColData->pBitMap, pColData->nVal, 1);
|
||||
|
||||
if (pColData->nVal) {
|
||||
if (IS_VAR_DATA_TYPE(pColData->type)) {
|
||||
int32_t nOffset = sizeof(int32_t) * pColData->nVal;
|
||||
code = tRealloc((uint8_t **)(&pColData->aOffset), nOffset);
|
||||
if (code) goto _exit;
|
||||
memset(pColData->aOffset, 0, nOffset);
|
||||
} else {
|
||||
pColData->nData = tDataTypes[pColData->type].bytes * pColData->nVal;
|
||||
code = tRealloc(&pColData->pData, pColData->nData);
|
||||
if (code) goto _exit;
|
||||
memset(pColData->pData, 0, pColData->nData);
|
||||
}
|
||||
}
|
||||
|
||||
code = tColDataPutValue(pColData, pColVal);
|
||||
if (code) goto _exit;
|
||||
}
|
||||
}
|
||||
pColData->nVal++;
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
static FORCE_INLINE int32_t tColDataAppendValue3(SColData *pColData, SColVal *pColVal) { // HAS_NULL|HAS_NONE
|
||||
int32_t code = 0;
|
||||
|
||||
if (pColVal->isNone) {
|
||||
code = tRealloc(&pColData->pBitMap, BIT1_SIZE(pColData->nVal + 1));
|
||||
if (code) goto _exit;
|
||||
|
||||
SET_BIT1(pColData->pBitMap, pColData->nVal, 0);
|
||||
} else if (pColVal->isNull) {
|
||||
code = tRealloc(&pColData->pBitMap, BIT1_SIZE(pColData->nVal + 1));
|
||||
if (code) goto _exit;
|
||||
|
||||
SET_BIT1(pColData->pBitMap, pColData->nVal, 1);
|
||||
} else {
|
||||
pColData->flag |= HAS_VALUE;
|
||||
|
||||
uint8_t *pBitMap = NULL;
|
||||
code = tRealloc(&pBitMap, BIT2_SIZE(pColData->nVal + 1));
|
||||
if (code) goto _exit;
|
||||
|
||||
for (int32_t iVal = 0; iVal < pColData->nVal; iVal++) {
|
||||
SET_BIT2(pBitMap, iVal, GET_BIT1(pColData->pBitMap, iVal));
|
||||
}
|
||||
SET_BIT2(pBitMap, pColData->nVal, 2);
|
||||
|
||||
tFree(pColData->pBitMap);
|
||||
pColData->pBitMap = pBitMap;
|
||||
|
||||
if (pColData->nVal) {
|
||||
if (IS_VAR_DATA_TYPE(pColData->type)) {
|
||||
int32_t nOffset = sizeof(int32_t) * pColData->nVal;
|
||||
code = tRealloc((uint8_t **)(&pColData->aOffset), nOffset);
|
||||
if (code) goto _exit;
|
||||
memset(pColData->aOffset, 0, nOffset);
|
||||
} else {
|
||||
pColData->nData = tDataTypes[pColData->type].bytes * pColData->nVal;
|
||||
code = tRealloc(&pColData->pData, pColData->nData);
|
||||
if (code) goto _exit;
|
||||
memset(pColData->pData, 0, pColData->nData);
|
||||
}
|
||||
}
|
||||
|
||||
code = tColDataPutValue(pColData, pColVal);
|
||||
if (code) goto _exit;
|
||||
}
|
||||
pColData->nVal++;
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
static FORCE_INLINE int32_t tColDataAppendValue4(SColData *pColData, SColVal *pColVal) { // HAS_VALUE
|
||||
int32_t code = 0;
|
||||
|
||||
if (pColVal->isNone || pColVal->isNull) {
|
||||
if (pColVal->isNone) {
|
||||
pColData->flag |= HAS_NONE;
|
||||
} else {
|
||||
pColData->flag |= HAS_NULL;
|
||||
}
|
||||
|
||||
int32_t nBit = BIT1_SIZE(pColData->nVal + 1);
|
||||
code = tRealloc(&pColData->pBitMap, nBit);
|
||||
if (code) goto _exit;
|
||||
|
||||
memset(pColData->pBitMap, 255, nBit);
|
||||
SET_BIT1(pColData->pBitMap, pColData->nVal, 0);
|
||||
|
||||
code = tColDataPutValue(pColData, pColVal);
|
||||
if (code) goto _exit;
|
||||
} else {
|
||||
code = tColDataPutValue(pColData, pColVal);
|
||||
if (code) goto _exit;
|
||||
}
|
||||
pColData->nVal++;
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
static FORCE_INLINE int32_t tColDataAppendValue5(SColData *pColData, SColVal *pColVal) { // HAS_VALUE|HAS_NONE
|
||||
int32_t code = 0;
|
||||
|
||||
if (pColVal->isNull) {
|
||||
pColData->flag |= HAS_NULL;
|
||||
|
||||
uint8_t *pBitMap = NULL;
|
||||
code = tRealloc(&pBitMap, BIT2_SIZE(pColData->nVal + 1));
|
||||
if (code) goto _exit;
|
||||
|
||||
for (int32_t iVal = 0; iVal < pColData->nVal; iVal++) {
|
||||
SET_BIT2(pBitMap, iVal, GET_BIT1(pColData->pBitMap, iVal) ? 2 : 0);
|
||||
}
|
||||
SET_BIT2(pBitMap, pColData->nVal, 1);
|
||||
|
||||
tFree(pColData->pBitMap);
|
||||
pColData->pBitMap = pBitMap;
|
||||
} else {
|
||||
code = tRealloc(&pColData->pBitMap, BIT1_SIZE(pColData->nVal + 1));
|
||||
if (code) goto _exit;
|
||||
|
||||
if (pColVal->isNone) {
|
||||
SET_BIT1(pColData->pBitMap, pColData->nVal, 0);
|
||||
} else {
|
||||
SET_BIT1(pColData->pBitMap, pColData->nVal, 1);
|
||||
}
|
||||
}
|
||||
code = tColDataPutValue(pColData, pColVal);
|
||||
if (code) goto _exit;
|
||||
|
||||
pColData->nVal++;
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
static FORCE_INLINE int32_t tColDataAppendValue6(SColData *pColData, SColVal *pColVal) { // HAS_VALUE|HAS_NULL
|
||||
int32_t code = 0;
|
||||
|
||||
if (pColVal->isNone) {
|
||||
pColData->flag |= HAS_NONE;
|
||||
|
||||
uint8_t *pBitMap = NULL;
|
||||
code = tRealloc(&pBitMap, BIT2_SIZE(pColData->nVal + 1));
|
||||
if (code) goto _exit;
|
||||
|
||||
for (int32_t iVal = 0; iVal < pColData->nVal; iVal++) {
|
||||
SET_BIT2(pBitMap, iVal, GET_BIT1(pColData->pBitMap, iVal) ? 2 : 1);
|
||||
}
|
||||
SET_BIT2(pBitMap, pColData->nVal, 0);
|
||||
|
||||
tFree(pColData->pBitMap);
|
||||
pColData->pBitMap = pBitMap;
|
||||
} else {
|
||||
code = tRealloc(&pColData->pBitMap, BIT1_SIZE(pColData->nVal + 1));
|
||||
if (code) goto _exit;
|
||||
|
||||
if (pColVal->isNull) {
|
||||
SET_BIT1(pColData->pBitMap, pColData->nVal, 0);
|
||||
} else {
|
||||
SET_BIT1(pColData->pBitMap, pColData->nVal, 1);
|
||||
}
|
||||
}
|
||||
code = tColDataPutValue(pColData, pColVal);
|
||||
if (code) goto _exit;
|
||||
|
||||
pColData->nVal++;
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
static FORCE_INLINE int32_t tColDataAppendValue7(SColData *pColData,
|
||||
SColVal *pColVal) { // HAS_VALUE|HAS_NULL|HAS_NONE
|
||||
int32_t code = 0;
|
||||
|
||||
code = tRealloc(&pColData->pBitMap, BIT2_SIZE(pColData->nVal + 1));
|
||||
if (code) goto _exit;
|
||||
|
||||
if (pColVal->isNone) {
|
||||
SET_BIT2(pColData->pBitMap, pColData->nVal, 0);
|
||||
} else if (pColVal->isNull) {
|
||||
SET_BIT2(pColData->pBitMap, pColData->nVal, 1);
|
||||
} else {
|
||||
SET_BIT2(pColData->pBitMap, pColData->nVal, 2);
|
||||
}
|
||||
code = tColDataPutValue(pColData, pColVal);
|
||||
if (code) goto _exit;
|
||||
|
||||
pColData->nVal++;
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
static int32_t (*tColDataAppendValueImpl[])(SColData *pColData, SColVal *pColVal) = {
|
||||
tColDataAppendValue0, // 0
|
||||
tColDataAppendValue1, // HAS_NONE
|
||||
tColDataAppendValue2, // HAS_NULL
|
||||
tColDataAppendValue3, // HAS_NULL|HAS_NONE
|
||||
tColDataAppendValue4, // HAS_VALUE
|
||||
tColDataAppendValue5, // HAS_VALUE|HAS_NONE
|
||||
tColDataAppendValue6, // HAS_VALUE|HAS_NULL
|
||||
tColDataAppendValue7 // HAS_VALUE|HAS_NULL|HAS_NONE
|
||||
};
|
||||
int32_t tColDataAppendValue(SColData *pColData, SColVal *pColVal) {
|
||||
ASSERT(pColData->cid == pColVal->cid && pColData->type == pColVal->type);
|
||||
return tColDataAppendValueImpl[pColData->flag](pColData, pColVal);
|
||||
}
|
||||
|
||||
static FORCE_INLINE void tColDataGetValue1(SColData *pColData, int32_t iVal, SColVal *pColVal) { // HAS_NONE
|
||||
*pColVal = COL_VAL_NONE(pColData->cid, pColData->type);
|
||||
}
|
||||
static FORCE_INLINE void tColDataGetValue2(SColData *pColData, int32_t iVal, SColVal *pColVal) { // HAS_NULL
|
||||
*pColVal = COL_VAL_NULL(pColData->cid, pColData->type);
|
||||
}
|
||||
static FORCE_INLINE void tColDataGetValue3(SColData *pColData, int32_t iVal, SColVal *pColVal) { // HAS_NULL|HAS_NONE
|
||||
switch (GET_BIT1(pColData->pBitMap, iVal)) {
|
||||
case 0:
|
||||
*pColVal = COL_VAL_NONE(pColData->cid, pColData->type);
|
||||
break;
|
||||
case 1:
|
||||
*pColVal = COL_VAL_NULL(pColData->cid, pColData->type);
|
||||
break;
|
||||
default:
|
||||
ASSERT(0);
|
||||
}
|
||||
}
|
||||
static FORCE_INLINE void tColDataGetValue4(SColData *pColData, int32_t iVal, SColVal *pColVal) { // HAS_VALUE
|
||||
SValue value;
|
||||
if (IS_VAR_DATA_TYPE(pColData->type)) {
|
||||
if (iVal + 1 < pColData->nVal) {
|
||||
value.nData = pColData->aOffset[iVal + 1] - pColData->aOffset[iVal];
|
||||
} else {
|
||||
value.nData = pColData->nData - pColData->aOffset[iVal];
|
||||
}
|
||||
value.pData = pColData->pData + pColData->aOffset[iVal];
|
||||
} else {
|
||||
tGetValue(pColData->pData + tDataTypes[pColData->type].bytes * iVal, &value, pColData->type);
|
||||
}
|
||||
*pColVal = COL_VAL_VALUE(pColData->cid, pColData->type, value);
|
||||
}
|
||||
static FORCE_INLINE void tColDataGetValue5(SColData *pColData, int32_t iVal,
|
||||
SColVal *pColVal) { // HAS_VALUE|HAS_NONE
|
||||
switch (GET_BIT1(pColData->pBitMap, iVal)) {
|
||||
case 0:
|
||||
*pColVal = COL_VAL_NONE(pColData->cid, pColData->type);
|
||||
break;
|
||||
case 1:
|
||||
tColDataGetValue4(pColData, iVal, pColVal);
|
||||
break;
|
||||
default:
|
||||
ASSERT(0);
|
||||
}
|
||||
}
|
||||
static FORCE_INLINE void tColDataGetValue6(SColData *pColData, int32_t iVal,
|
||||
SColVal *pColVal) { // HAS_VALUE|HAS_NULL
|
||||
switch (GET_BIT1(pColData->pBitMap, iVal)) {
|
||||
case 0:
|
||||
*pColVal = COL_VAL_NULL(pColData->cid, pColData->type);
|
||||
break;
|
||||
case 1:
|
||||
tColDataGetValue4(pColData, iVal, pColVal);
|
||||
break;
|
||||
default:
|
||||
ASSERT(0);
|
||||
}
|
||||
}
|
||||
static FORCE_INLINE void tColDataGetValue7(SColData *pColData, int32_t iVal,
|
||||
SColVal *pColVal) { // HAS_VALUE|HAS_NULL|HAS_NONE
|
||||
switch (GET_BIT2(pColData->pBitMap, iVal)) {
|
||||
case 0:
|
||||
*pColVal = COL_VAL_NONE(pColData->cid, pColData->type);
|
||||
break;
|
||||
case 1:
|
||||
*pColVal = COL_VAL_NULL(pColData->cid, pColData->type);
|
||||
break;
|
||||
case 2:
|
||||
tColDataGetValue4(pColData, iVal, pColVal);
|
||||
break;
|
||||
default:
|
||||
ASSERT(0);
|
||||
}
|
||||
}
|
||||
static void (*tColDataGetValueImpl[])(SColData *pColData, int32_t iVal, SColVal *pColVal) = {
|
||||
NULL, // 0
|
||||
tColDataGetValue1, // HAS_NONE
|
||||
tColDataGetValue2, // HAS_NULL
|
||||
tColDataGetValue3, // HAS_NULL | HAS_NONE
|
||||
tColDataGetValue4, // HAS_VALUE
|
||||
tColDataGetValue5, // HAS_VALUE | HAS_NONE
|
||||
tColDataGetValue6, // HAS_VALUE | HAS_NULL
|
||||
tColDataGetValue7 // HAS_VALUE | HAS_NULL | HAS_NONE
|
||||
};
|
||||
void tColDataGetValue(SColData *pColData, int32_t iVal, SColVal *pColVal) {
|
||||
ASSERT(iVal >= 0 && iVal < pColData->nVal && pColData->flag);
|
||||
tColDataGetValueImpl[pColData->flag](pColData, iVal, pColVal);
|
||||
}
|
||||
|
||||
uint8_t tColDataGetBitValue(SColData *pColData, int32_t iVal) {
|
||||
uint8_t v;
|
||||
switch (pColData->flag) {
|
||||
case HAS_NONE:
|
||||
v = 0;
|
||||
break;
|
||||
case HAS_NULL:
|
||||
v = 1;
|
||||
break;
|
||||
case (HAS_NULL | HAS_NONE):
|
||||
v = GET_BIT1(pColData->pBitMap, iVal);
|
||||
break;
|
||||
case HAS_VALUE:
|
||||
v = 2;
|
||||
break;
|
||||
case (HAS_VALUE | HAS_NONE):
|
||||
v = GET_BIT1(pColData->pBitMap, iVal);
|
||||
if (v) v = 2;
|
||||
break;
|
||||
case (HAS_VALUE | HAS_NULL):
|
||||
v = GET_BIT1(pColData->pBitMap, iVal) + 1;
|
||||
break;
|
||||
case (HAS_VALUE | HAS_NULL | HAS_NONE):
|
||||
v = GET_BIT2(pColData->pBitMap, iVal);
|
||||
break;
|
||||
default:
|
||||
ASSERT(0);
|
||||
break;
|
||||
}
|
||||
return v;
|
||||
}
|
||||
|
||||
int32_t tColDataCopy(SColData *pColDataSrc, SColData *pColDataDest) {
|
||||
int32_t code = 0;
|
||||
int32_t size;
|
||||
|
||||
ASSERT(pColDataSrc->nVal > 0);
|
||||
ASSERT(pColDataDest->cid = pColDataSrc->cid);
|
||||
ASSERT(pColDataDest->type = pColDataSrc->type);
|
||||
|
||||
pColDataDest->smaOn = pColDataSrc->smaOn;
|
||||
pColDataDest->nVal = pColDataSrc->nVal;
|
||||
pColDataDest->flag = pColDataSrc->flag;
|
||||
|
||||
// bitmap
|
||||
if (pColDataSrc->flag != HAS_NONE && pColDataSrc->flag != HAS_NULL && pColDataSrc->flag != HAS_VALUE) {
|
||||
size = BIT2_SIZE(pColDataSrc->nVal);
|
||||
code = tRealloc(&pColDataDest->pBitMap, size);
|
||||
if (code) goto _exit;
|
||||
memcpy(pColDataDest->pBitMap, pColDataSrc->pBitMap, size);
|
||||
}
|
||||
|
||||
// offset
|
||||
if (IS_VAR_DATA_TYPE(pColDataDest->type)) {
|
||||
size = sizeof(int32_t) * pColDataSrc->nVal;
|
||||
|
||||
code = tRealloc((uint8_t **)&pColDataDest->aOffset, size);
|
||||
if (code) goto _exit;
|
||||
|
||||
memcpy(pColDataDest->aOffset, pColDataSrc->aOffset, size);
|
||||
}
|
||||
|
||||
// value
|
||||
pColDataDest->nData = pColDataSrc->nData;
|
||||
code = tRealloc(&pColDataDest->pData, pColDataSrc->nData);
|
||||
if (code) goto _exit;
|
||||
memcpy(pColDataDest->pData, pColDataSrc->pData, pColDataDest->nData);
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
|
@ -44,7 +44,6 @@ typedef struct SMapData SMapData;
|
|||
typedef struct SBlockIdx SBlockIdx;
|
||||
typedef struct SDataBlk SDataBlk;
|
||||
typedef struct SSttBlk SSttBlk;
|
||||
typedef struct SColData SColData;
|
||||
typedef struct SDiskDataHdr SDiskDataHdr;
|
||||
typedef struct SBlockData SBlockData;
|
||||
typedef struct SDelFile SDelFile;
|
||||
|
@ -71,10 +70,6 @@ typedef struct SLDataIter SLDataIter;
|
|||
#define TSDB_MAX_SUBBLOCKS 8
|
||||
#define TSDB_FHDR_SIZE 512
|
||||
|
||||
#define HAS_NONE ((int8_t)0x1)
|
||||
#define HAS_NULL ((int8_t)0x2)
|
||||
#define HAS_VALUE ((int8_t)0x4)
|
||||
|
||||
#define VERSION_MIN 0
|
||||
#define VERSION_MAX INT64_MAX
|
||||
|
||||
|
@ -148,15 +143,6 @@ int32_t tPutBlockIdx(uint8_t *p, void *ph);
|
|||
int32_t tGetBlockIdx(uint8_t *p, void *ph);
|
||||
int32_t tCmprBlockIdx(void const *lhs, void const *rhs);
|
||||
int32_t tCmprBlockL(void const *lhs, void const *rhs);
|
||||
// SColdata
|
||||
void tColDataInit(SColData *pColData, int16_t cid, int8_t type, int8_t smaOn);
|
||||
void tColDataReset(SColData *pColData);
|
||||
void tColDataClear(void *ph);
|
||||
int32_t tColDataAppendValue(SColData *pColData, SColVal *pColVal);
|
||||
int32_t tColDataGetValue(SColData *pColData, int32_t iRow, SColVal *pColVal);
|
||||
int32_t tColDataCopy(SColData *pColDataSrc, SColData *pColDataDest);
|
||||
int32_t tPutColData(uint8_t *p, SColData *pColData);
|
||||
int32_t tGetColData(uint8_t *p, SColData *pColData);
|
||||
// SBlockData
|
||||
#define tBlockDataFirstRow(PBLOCKDATA) tsdbRowFromBlockData(PBLOCKDATA, 0)
|
||||
#define tBlockDataLastRow(PBLOCKDATA) tsdbRowFromBlockData(PBLOCKDATA, (PBLOCKDATA)->nRow - 1)
|
||||
|
@ -470,18 +456,6 @@ struct SSttBlk {
|
|||
SBlockInfo bInfo;
|
||||
};
|
||||
|
||||
struct SColData {
|
||||
int16_t cid;
|
||||
int8_t type;
|
||||
int8_t smaOn;
|
||||
int32_t nVal;
|
||||
uint8_t flag;
|
||||
uint8_t *pBitMap;
|
||||
int32_t *aOffset;
|
||||
int32_t nData;
|
||||
uint8_t *pData;
|
||||
};
|
||||
|
||||
// (SBlockData){.suid = 0, .uid = 0}: block data not initialized
|
||||
// (SBlockData){.suid = suid, .uid = uid}: block data for ONE child table int .data file
|
||||
// (SBlockData){.suid = suid, .uid = 0}: block data for N child tables int .last file
|
||||
|
|
|
@ -832,7 +832,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
|
|||
tDecoderClear(pCoder);
|
||||
|
||||
int32_t sz = taosArrayGetSize(pRes->uidList);
|
||||
if (sz == 0) {
|
||||
if (sz == 0 || pRes->affectedRows == 0) {
|
||||
taosArrayDestroy(pRes->uidList);
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -903,7 +903,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
|
|||
// null value exists, check one-by-one
|
||||
if (pData->flag != HAS_VALUE) {
|
||||
for (int32_t j = pDumpInfo->rowIndex; rowIndex < remain; j += step, rowIndex++) {
|
||||
uint8_t v = GET_BIT2(pData->pBitMap, j);
|
||||
uint8_t v = tColDataGetBitValue(pData, j);
|
||||
if (v == 0 || v == 1) {
|
||||
colDataSetNull_f(pColData->nullbitmap, rowIndex);
|
||||
}
|
||||
|
@ -1362,15 +1362,10 @@ static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo*
|
|||
pInfo->hasDupTs = (pBlock->nSubBlock == 1) ? pBlock->hasDup : true;
|
||||
pInfo->overlapWithDelInfo = overlapWithDelSkyline(pScanInfo, pBlock, pReader->order);
|
||||
|
||||
// todo here we need to each key in the last files to identify if it is really overlapped with last block
|
||||
// todo
|
||||
bool overlapWithlastBlock = false;
|
||||
#if 0
|
||||
if (taosArrayGetSize(pLastBlockReader->pSstBlk) > 0 && (pLastBlockReader->currentBlockIndex != -1)) {
|
||||
SSttBlk* pSstBlk = taosArrayGet(pLastBlockReader->pSstBlk, pLastBlockReader->currentBlockIndex);
|
||||
overlapWithlastBlock = !(pBlock->maxKey.ts < pSstBlk->minKey || pBlock->minKey.ts > pSstBlk->maxKey);
|
||||
if (hasDataInLastBlock(pLastBlockReader)) {
|
||||
int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
|
||||
pInfo->overlapWithLastBlock = !(pBlock->maxKey.ts < tsLast || pBlock->minKey.ts > tsLast);
|
||||
}
|
||||
#endif
|
||||
|
||||
pInfo->moreThanCapcity = pBlock->nRow > pReader->capacity;
|
||||
pInfo->partiallyRequired = dataBlockPartiallyRequired(&pReader->window, &pReader->verRange, pBlock);
|
||||
|
@ -1896,151 +1891,6 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
|||
return code;
|
||||
}
|
||||
|
||||
#if 0
|
||||
static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
|
||||
SRowMerger merge = {0};
|
||||
STSRow* pTSRow = NULL;
|
||||
|
||||
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
||||
SArray* pDelList = pBlockScanInfo->delSkyline;
|
||||
|
||||
TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pDelList, pReader);
|
||||
TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader);
|
||||
ASSERT(pRow != NULL && piRow != NULL);
|
||||
|
||||
int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
|
||||
bool freeTSRow = false;
|
||||
|
||||
uint64_t uid = pBlockScanInfo->uid;
|
||||
|
||||
TSDBKEY k = TSDBROW_KEY(pRow);
|
||||
TSDBKEY ik = TSDBROW_KEY(piRow);
|
||||
if (ASCENDING_TRAVERSE(pReader->order)) {
|
||||
// [1&2] key <= [k.ts && ik.ts]
|
||||
if (key <= k.ts && key <= ik.ts) {
|
||||
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
||||
tRowMergerInit(&merge, &fRow, pReader->pSchema);
|
||||
|
||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
|
||||
|
||||
if (ik.ts == key) {
|
||||
tRowMerge(&merge, piRow);
|
||||
doMergeRowsInBuf(&pBlockScanInfo->iiter, uid, key, pBlockScanInfo->delSkyline, &merge, pReader);
|
||||
}
|
||||
|
||||
if (k.ts == key) {
|
||||
tRowMerge(&merge, pRow);
|
||||
doMergeRowsInBuf(&pBlockScanInfo->iter, uid, key, pBlockScanInfo->delSkyline, &merge, pReader);
|
||||
}
|
||||
|
||||
tRowMergerGetRow(&merge, &pTSRow);
|
||||
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else { // key > ik.ts || key > k.ts
|
||||
ASSERT(key != ik.ts);
|
||||
|
||||
// [3] ik.ts < key <= k.ts
|
||||
// [4] ik.ts < k.ts <= key
|
||||
if (ik.ts < k.ts) {
|
||||
doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader, &freeTSRow);
|
||||
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);
|
||||
if (freeTSRow) {
|
||||
taosMemoryFree(pTSRow);
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// [5] k.ts < key <= ik.ts
|
||||
// [6] k.ts < ik.ts <= key
|
||||
if (k.ts < ik.ts) {
|
||||
doMergeMemTableMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, &pTSRow, pReader, &freeTSRow);
|
||||
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);
|
||||
if (freeTSRow) {
|
||||
taosMemoryFree(pTSRow);
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// [7] k.ts == ik.ts < key
|
||||
if (k.ts == ik.ts) {
|
||||
ASSERT(key > ik.ts && key > k.ts);
|
||||
|
||||
doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, &pTSRow);
|
||||
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);
|
||||
taosMemoryFree(pTSRow);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
} else { // descending order scan
|
||||
// [1/2] k.ts >= ik.ts && k.ts >= key
|
||||
if (k.ts >= ik.ts && k.ts >= key) {
|
||||
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
|
||||
|
||||
tRowMergerInit(&merge, pRow, pSchema);
|
||||
doMergeRowsInBuf(&pBlockScanInfo->iter, uid, key, pBlockScanInfo->delSkyline, &merge, pReader);
|
||||
|
||||
if (ik.ts == k.ts) {
|
||||
tRowMerge(&merge, piRow);
|
||||
doMergeRowsInBuf(&pBlockScanInfo->iiter, uid, key, pBlockScanInfo->delSkyline, &merge, pReader);
|
||||
}
|
||||
|
||||
if (k.ts == key) {
|
||||
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
||||
tRowMerge(&merge, &fRow);
|
||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
|
||||
}
|
||||
|
||||
tRowMergerGetRow(&merge, &pTSRow);
|
||||
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else {
|
||||
ASSERT(ik.ts != k.ts); // this case has been included in the previous if branch
|
||||
|
||||
// [3] ik.ts > k.ts >= Key
|
||||
// [4] ik.ts > key >= k.ts
|
||||
if (ik.ts > key) {
|
||||
doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader, &freeTSRow);
|
||||
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);
|
||||
if (freeTSRow) {
|
||||
taosMemoryFree(pTSRow);
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// [5] key > ik.ts > k.ts
|
||||
// [6] key > k.ts > ik.ts
|
||||
if (key > ik.ts) {
|
||||
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
||||
tRowMergerInit(&merge, &fRow, pReader->pSchema);
|
||||
|
||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
|
||||
tRowMergerGetRow(&merge, &pTSRow);
|
||||
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);
|
||||
taosMemoryFree(pTSRow);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
//[7] key = ik.ts > k.ts
|
||||
if (key == ik.ts) {
|
||||
doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader, &freeTSRow);
|
||||
|
||||
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
||||
tRowMerge(&merge, &fRow);
|
||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
|
||||
tRowMergerGetRow(&merge, &pTSRow);
|
||||
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);
|
||||
|
||||
taosMemoryFree(pTSRow);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
#endif
|
||||
|
||||
static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
|
||||
if (pBlockScanInfo->iterInit) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -2257,9 +2107,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
|
|||
SBlockData* pBlockData = &pReader->status.fileBlockData;
|
||||
int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
|
||||
|
||||
|
||||
while (1) {
|
||||
// todo check the validate of row in file block
|
||||
bool hasBlockData = false;
|
||||
{
|
||||
while (pBlockData->nRow > 0) { // find the first qualified row in data block
|
||||
|
@ -2620,7 +2468,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
|||
code = buildComposedDataBlock(pReader);
|
||||
} else if (bufferDataInFileBlockGap(pReader->order, keyInBuf, pBlock)) {
|
||||
// data in memory that are earlier than current file block
|
||||
// todo rows in buffer should be less than the file block in asc, greater than file block in desc
|
||||
// rows in buffer should be less than the file block in asc, greater than file block in desc
|
||||
int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? pBlock->minKey.ts : pBlock->maxKey.ts;
|
||||
code = buildDataBlockFromBuf(pReader, pScanInfo, endKey);
|
||||
} else {
|
||||
|
@ -4078,4 +3926,4 @@ void tsdbUntakeReadSnap(STsdb* pTsdb, STsdbReadSnap* pSnap) {
|
|||
}
|
||||
|
||||
tsdbTrace("vgId:%d, untake read snapshot", TD_VID(pTsdb->pVnode));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -909,248 +909,6 @@ int32_t tsdbBuildDeleteSkyline(SArray *aDelData, int32_t sidx, int32_t eidx, SAr
|
|||
return code;
|
||||
}
|
||||
|
||||
// SColData ========================================
|
||||
void tColDataInit(SColData *pColData, int16_t cid, int8_t type, int8_t smaOn) {
|
||||
pColData->cid = cid;
|
||||
pColData->type = type;
|
||||
pColData->smaOn = smaOn;
|
||||
tColDataReset(pColData);
|
||||
}
|
||||
|
||||
void tColDataReset(SColData *pColData) {
|
||||
pColData->nVal = 0;
|
||||
pColData->flag = 0;
|
||||
pColData->nData = 0;
|
||||
}
|
||||
|
||||
void tColDataClear(void *ph) {
|
||||
SColData *pColData = (SColData *)ph;
|
||||
|
||||
tFree(pColData->pBitMap);
|
||||
tFree((uint8_t *)pColData->aOffset);
|
||||
tFree(pColData->pData);
|
||||
}
|
||||
|
||||
int32_t tColDataAppendValue(SColData *pColData, SColVal *pColVal) {
|
||||
int32_t code = 0;
|
||||
int64_t size;
|
||||
SValue value = {0};
|
||||
SValue *pValue = &value;
|
||||
|
||||
ASSERT(pColVal->cid == pColData->cid);
|
||||
ASSERT(pColVal->type == pColData->type);
|
||||
|
||||
// realloc bitmap
|
||||
size = BIT2_SIZE(pColData->nVal + 1);
|
||||
code = tRealloc(&pColData->pBitMap, size);
|
||||
if (code) goto _exit;
|
||||
if ((pColData->nVal & 3) == 0) {
|
||||
pColData->pBitMap[pColData->nVal >> 2] = 0;
|
||||
}
|
||||
|
||||
// put value
|
||||
if (pColVal->isNone) {
|
||||
pColData->flag |= HAS_NONE;
|
||||
SET_BIT2(pColData->pBitMap, pColData->nVal, 0);
|
||||
} else if (pColVal->isNull) {
|
||||
pColData->flag |= HAS_NULL;
|
||||
SET_BIT2(pColData->pBitMap, pColData->nVal, 1);
|
||||
} else {
|
||||
pColData->flag |= HAS_VALUE;
|
||||
SET_BIT2(pColData->pBitMap, pColData->nVal, 2);
|
||||
pValue = &pColVal->value;
|
||||
}
|
||||
|
||||
if (IS_VAR_DATA_TYPE(pColData->type)) {
|
||||
// offset
|
||||
code = tRealloc((uint8_t **)&pColData->aOffset, sizeof(int32_t) * (pColData->nVal + 1));
|
||||
if (code) goto _exit;
|
||||
pColData->aOffset[pColData->nVal] = pColData->nData;
|
||||
|
||||
// value
|
||||
if ((!pColVal->isNone) && (!pColVal->isNull)) {
|
||||
code = tRealloc(&pColData->pData, pColData->nData + pColVal->value.nData);
|
||||
if (code) goto _exit;
|
||||
memcpy(pColData->pData + pColData->nData, pColVal->value.pData, pColVal->value.nData);
|
||||
pColData->nData += pColVal->value.nData;
|
||||
}
|
||||
} else {
|
||||
code = tRealloc(&pColData->pData, pColData->nData + tPutValue(NULL, pValue, pColVal->type));
|
||||
if (code) goto _exit;
|
||||
pColData->nData += tPutValue(pColData->pData + pColData->nData, pValue, pColVal->type);
|
||||
}
|
||||
|
||||
pColData->nVal++;
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tColDataCopy(SColData *pColDataSrc, SColData *pColDataDest) {
|
||||
int32_t code = 0;
|
||||
int32_t size;
|
||||
|
||||
ASSERT(pColDataSrc->nVal > 0);
|
||||
ASSERT(pColDataDest->cid = pColDataSrc->cid);
|
||||
ASSERT(pColDataDest->type = pColDataSrc->type);
|
||||
|
||||
pColDataDest->smaOn = pColDataSrc->smaOn;
|
||||
pColDataDest->nVal = pColDataSrc->nVal;
|
||||
pColDataDest->flag = pColDataSrc->flag;
|
||||
|
||||
// bitmap
|
||||
if (pColDataSrc->flag != HAS_NONE && pColDataSrc->flag != HAS_NULL && pColDataSrc->flag != HAS_VALUE) {
|
||||
size = BIT2_SIZE(pColDataSrc->nVal);
|
||||
code = tRealloc(&pColDataDest->pBitMap, size);
|
||||
if (code) goto _exit;
|
||||
memcpy(pColDataDest->pBitMap, pColDataSrc->pBitMap, size);
|
||||
}
|
||||
|
||||
// offset
|
||||
if (IS_VAR_DATA_TYPE(pColDataDest->type)) {
|
||||
size = sizeof(int32_t) * pColDataSrc->nVal;
|
||||
|
||||
code = tRealloc((uint8_t **)&pColDataDest->aOffset, size);
|
||||
if (code) goto _exit;
|
||||
|
||||
memcpy(pColDataDest->aOffset, pColDataSrc->aOffset, size);
|
||||
}
|
||||
|
||||
// value
|
||||
pColDataDest->nData = pColDataSrc->nData;
|
||||
code = tRealloc(&pColDataDest->pData, pColDataSrc->nData);
|
||||
if (code) goto _exit;
|
||||
memcpy(pColDataDest->pData, pColDataSrc->pData, pColDataDest->nData);
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tColDataGetValue(SColData *pColData, int32_t iVal, SColVal *pColVal) {
|
||||
int32_t code = 0;
|
||||
|
||||
ASSERT(iVal < pColData->nVal);
|
||||
ASSERT(pColData->flag);
|
||||
|
||||
if (pColData->flag == HAS_NONE) {
|
||||
*pColVal = COL_VAL_NONE(pColData->cid, pColData->type);
|
||||
goto _exit;
|
||||
} else if (pColData->flag == HAS_NULL) {
|
||||
*pColVal = COL_VAL_NULL(pColData->cid, pColData->type);
|
||||
goto _exit;
|
||||
} else if (pColData->flag != HAS_VALUE) {
|
||||
uint8_t v = GET_BIT2(pColData->pBitMap, iVal);
|
||||
if (v == 0) {
|
||||
*pColVal = COL_VAL_NONE(pColData->cid, pColData->type);
|
||||
goto _exit;
|
||||
} else if (v == 1) {
|
||||
*pColVal = COL_VAL_NULL(pColData->cid, pColData->type);
|
||||
goto _exit;
|
||||
}
|
||||
}
|
||||
|
||||
// get value
|
||||
SValue value;
|
||||
if (IS_VAR_DATA_TYPE(pColData->type)) {
|
||||
if (iVal + 1 < pColData->nVal) {
|
||||
value.nData = pColData->aOffset[iVal + 1] - pColData->aOffset[iVal];
|
||||
} else {
|
||||
value.nData = pColData->nData - pColData->aOffset[iVal];
|
||||
}
|
||||
|
||||
value.pData = pColData->pData + pColData->aOffset[iVal];
|
||||
} else {
|
||||
tGetValue(pColData->pData + tDataTypes[pColData->type].bytes * iVal, &value, pColData->type);
|
||||
}
|
||||
*pColVal = COL_VAL_VALUE(pColData->cid, pColData->type, value);
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tPutColData(uint8_t *p, SColData *pColData) {
|
||||
int32_t n = 0;
|
||||
|
||||
n += tPutI16v(p ? p + n : p, pColData->cid);
|
||||
n += tPutI8(p ? p + n : p, pColData->type);
|
||||
n += tPutI8(p ? p + n : p, pColData->smaOn);
|
||||
n += tPutI32v(p ? p + n : p, pColData->nVal);
|
||||
n += tPutU8(p ? p + n : p, pColData->flag);
|
||||
|
||||
if (pColData->flag == HAS_NONE || pColData->flag == HAS_NULL) goto _exit;
|
||||
if (pColData->flag != HAS_VALUE) {
|
||||
// bitmap
|
||||
|
||||
int32_t size = BIT2_SIZE(pColData->nVal);
|
||||
if (p) {
|
||||
memcpy(p + n, pColData->pBitMap, size);
|
||||
}
|
||||
n += size;
|
||||
}
|
||||
if (IS_VAR_DATA_TYPE(pColData->type)) {
|
||||
// offset
|
||||
|
||||
int32_t size = sizeof(int32_t) * pColData->nVal;
|
||||
if (p) {
|
||||
memcpy(p + n, pColData->aOffset, size);
|
||||
}
|
||||
n += size;
|
||||
}
|
||||
n += tPutI32v(p ? p + n : p, pColData->nData);
|
||||
if (p) {
|
||||
memcpy(p + n, pColData->pData, pColData->nData);
|
||||
}
|
||||
n += pColData->nData;
|
||||
|
||||
_exit:
|
||||
return n;
|
||||
}
|
||||
|
||||
int32_t tGetColData(uint8_t *p, SColData *pColData) {
|
||||
int32_t n = 0;
|
||||
|
||||
n += tGetI16v(p + n, &pColData->cid);
|
||||
n += tGetI8(p + n, &pColData->type);
|
||||
n += tGetI8(p + n, &pColData->smaOn);
|
||||
n += tGetI32v(p + n, &pColData->nVal);
|
||||
n += tGetU8(p + n, &pColData->flag);
|
||||
|
||||
if (pColData->flag == HAS_NONE || pColData->flag == HAS_NULL) goto _exit;
|
||||
if (pColData->flag != HAS_VALUE) {
|
||||
// bitmap
|
||||
|
||||
int32_t size = BIT2_SIZE(pColData->nVal);
|
||||
pColData->pBitMap = p + n;
|
||||
n += size;
|
||||
}
|
||||
if (IS_VAR_DATA_TYPE(pColData->type)) {
|
||||
// offset
|
||||
|
||||
int32_t size = sizeof(int32_t) * pColData->nVal;
|
||||
pColData->aOffset = (int32_t *)(p + n);
|
||||
n += size;
|
||||
}
|
||||
n += tGetI32v(p + n, &pColData->nData);
|
||||
pColData->pData = p + n;
|
||||
n += pColData->nData;
|
||||
|
||||
_exit:
|
||||
return n;
|
||||
}
|
||||
|
||||
static FORCE_INLINE int32_t tColDataCmprFn(const void *p1, const void *p2) {
|
||||
SColData *pColData1 = (SColData *)p1;
|
||||
SColData *pColData2 = (SColData *)p2;
|
||||
|
||||
if (pColData1->cid < pColData2->cid) {
|
||||
return -1;
|
||||
} else if (pColData1->cid > pColData2->cid) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
// SBlockData ======================================================
|
||||
int32_t tBlockDataCreate(SBlockData *pBlockData) {
|
||||
int32_t code = 0;
|
||||
|
@ -1182,7 +940,7 @@ void tBlockDataDestroy(SBlockData *pBlockData, int8_t deepClear) {
|
|||
tFree((uint8_t *)pBlockData->aVersion);
|
||||
tFree((uint8_t *)pBlockData->aTSKEY);
|
||||
taosArrayDestroy(pBlockData->aIdx);
|
||||
taosArrayDestroyEx(pBlockData->aColData, deepClear ? tColDataClear : NULL);
|
||||
taosArrayDestroyEx(pBlockData->aColData, deepClear ? tColDataDestroy : NULL);
|
||||
pBlockData->aUid = NULL;
|
||||
pBlockData->aVersion = NULL;
|
||||
pBlockData->aTSKEY = NULL;
|
||||
|
@ -1251,7 +1009,7 @@ void tBlockDataClear(SBlockData *pBlockData) {
|
|||
pBlockData->nRow = 0;
|
||||
for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aIdx); iColData++) {
|
||||
SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iColData);
|
||||
tColDataReset(pColData);
|
||||
tColDataClear(pColData);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1501,7 +1259,7 @@ void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColD
|
|||
while (lidx <= ridx) {
|
||||
int32_t midx = (lidx + ridx) / 2;
|
||||
SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, midx);
|
||||
int32_t c = tColDataCmprFn(pColData, &(SColData){.cid = cid});
|
||||
int32_t c = (pColData->cid == cid) ? 0 : ((pColData->cid > cid) ? 1 : -1);
|
||||
|
||||
if (c == 0) {
|
||||
*ppColData = pColData;
|
||||
|
@ -1986,47 +1744,16 @@ int32_t tsdbCmprColData(SColData *pColData, int8_t cmprAlg, SBlockCol *pBlockCol
|
|||
int32_t size = 0;
|
||||
// bitmap
|
||||
if (pColData->flag != HAS_VALUE) {
|
||||
uint8_t *pBitMap = pColData->pBitMap;
|
||||
int32_t szBitMap = BIT2_SIZE(pColData->nVal);
|
||||
|
||||
// BIT2 to BIT1
|
||||
if (pColData->flag != (HAS_VALUE | HAS_NULL | HAS_NONE)) {
|
||||
int32_t szBitMap;
|
||||
if (pColData->flag == (HAS_VALUE | HAS_NULL | HAS_NONE)) {
|
||||
szBitMap = BIT2_SIZE(pColData->nVal);
|
||||
} else {
|
||||
szBitMap = BIT1_SIZE(pColData->nVal);
|
||||
pBitMap = taosMemoryCalloc(1, szBitMap);
|
||||
if (pBitMap == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
for (int32_t iVal = 0; iVal < pColData->nVal; iVal++) {
|
||||
uint8_t v = GET_BIT2(pColData->pBitMap, iVal);
|
||||
switch (pColData->flag) {
|
||||
case (HAS_NULL | HAS_NONE):
|
||||
SET_BIT1(pBitMap, iVal, v);
|
||||
break;
|
||||
case (HAS_VALUE | HAS_NONE):
|
||||
if (v) {
|
||||
SET_BIT1(pBitMap, iVal, 1);
|
||||
} else {
|
||||
SET_BIT1(pBitMap, iVal, 0);
|
||||
}
|
||||
break;
|
||||
case (HAS_VALUE | HAS_NULL):
|
||||
SET_BIT1(pBitMap, iVal, v - 1);
|
||||
break;
|
||||
default:
|
||||
ASSERT(0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
code = tsdbCmprData(pBitMap, szBitMap, TSDB_DATA_TYPE_TINYINT, cmprAlg, ppOut, nOut + size, &pBlockCol->szBitmap,
|
||||
ppBuf);
|
||||
code = tsdbCmprData(pColData->pBitMap, szBitMap, TSDB_DATA_TYPE_TINYINT, cmprAlg, ppOut, nOut + size,
|
||||
&pBlockCol->szBitmap, ppBuf);
|
||||
if (code) goto _exit;
|
||||
|
||||
if (pColData->flag != (HAS_VALUE | HAS_NULL | HAS_NONE)) {
|
||||
taosMemoryFree(pBitMap);
|
||||
}
|
||||
}
|
||||
size += pBlockCol->szBitmap;
|
||||
|
||||
|
@ -2064,46 +1791,15 @@ int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, in
|
|||
uint8_t *p = pIn;
|
||||
// bitmap
|
||||
if (pBlockCol->szBitmap) {
|
||||
if (pBlockCol->flag != (HAS_VALUE | HAS_NULL | HAS_NONE)) {
|
||||
uint8_t *pBitMap = NULL;
|
||||
code = tsdbDecmprData(p, pBlockCol->szBitmap, TSDB_DATA_TYPE_TINYINT, cmprAlg, &pBitMap,
|
||||
BIT1_SIZE(pColData->nVal), ppBuf);
|
||||
if (code) goto _exit;
|
||||
|
||||
code = tRealloc(&pColData->pBitMap, BIT2_SIZE(pColData->nVal));
|
||||
if (code) {
|
||||
tFree(pBitMap);
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
// BIT1 to BIT2
|
||||
for (int32_t iVal = 0; iVal < nVal; iVal++) {
|
||||
uint8_t v = GET_BIT1(pBitMap, iVal);
|
||||
switch (pBlockCol->flag) {
|
||||
case (HAS_NULL | HAS_NONE):
|
||||
SET_BIT2(pColData->pBitMap, iVal, v);
|
||||
break;
|
||||
case (HAS_VALUE | HAS_NONE):
|
||||
if (v) {
|
||||
SET_BIT2(pColData->pBitMap, iVal, 2);
|
||||
} else {
|
||||
SET_BIT2(pColData->pBitMap, iVal, 0);
|
||||
}
|
||||
break;
|
||||
case (HAS_VALUE | HAS_NULL):
|
||||
SET_BIT2(pColData->pBitMap, iVal, v + 1);
|
||||
break;
|
||||
default:
|
||||
ASSERT(0);
|
||||
}
|
||||
}
|
||||
|
||||
tFree(pBitMap);
|
||||
int32_t szBitMap;
|
||||
if (pColData->flag == (HAS_VALUE | HAS_NULL | HAS_NONE)) {
|
||||
szBitMap = BIT2_SIZE(pColData->nVal);
|
||||
} else {
|
||||
code = tsdbDecmprData(p, pBlockCol->szBitmap, TSDB_DATA_TYPE_TINYINT, cmprAlg, &pColData->pBitMap,
|
||||
BIT2_SIZE(pColData->nVal), ppBuf);
|
||||
if (code) goto _exit;
|
||||
szBitMap = BIT1_SIZE(pColData->nVal);
|
||||
}
|
||||
|
||||
code = tsdbDecmprData(p, pBlockCol->szBitmap, TSDB_DATA_TYPE_TINYINT, cmprAlg, &pColData->pBitMap, szBitMap, ppBuf);
|
||||
if (code) goto _exit;
|
||||
}
|
||||
p += pBlockCol->szBitmap;
|
||||
|
||||
|
|
|
@ -79,25 +79,33 @@ static void toDataCacheEntry(SDataDeleterHandle* pHandle, const SInputData* pInp
|
|||
pEntry->dataLen = sizeof(SDeleterRes);
|
||||
|
||||
ASSERT(1 == pEntry->numOfRows);
|
||||
ASSERT(1 == pEntry->numOfCols);
|
||||
ASSERT(3 == pEntry->numOfCols);
|
||||
|
||||
pBuf->useSize = sizeof(SDataCacheEntry);
|
||||
|
||||
SColumnInfoData* pColRes = (SColumnInfoData*)taosArrayGet(pInput->pData->pDataBlock, 0);
|
||||
SColumnInfoData* pColSKey = (SColumnInfoData*)taosArrayGet(pInput->pData->pDataBlock, 1);
|
||||
SColumnInfoData* pColEKey = (SColumnInfoData*)taosArrayGet(pInput->pData->pDataBlock, 2);
|
||||
|
||||
SDeleterRes* pRes = (SDeleterRes*)pEntry->data;
|
||||
pRes->suid = pHandle->pParam->suid;
|
||||
pRes->uidList = pHandle->pParam->pUidList;
|
||||
pRes->skey = pHandle->pDeleter->deleteTimeRange.skey;
|
||||
pRes->ekey = pHandle->pDeleter->deleteTimeRange.ekey;
|
||||
strcpy(pRes->tableName, pHandle->pDeleter->tableFName);
|
||||
strcpy(pRes->tsColName, pHandle->pDeleter->tsColName);
|
||||
pRes->affectedRows = *(int64_t*)pColRes->pData;
|
||||
if (pRes->affectedRows) {
|
||||
pRes->skey = *(int64_t*)pColSKey->pData;
|
||||
pRes->ekey = *(int64_t*)pColEKey->pData;
|
||||
ASSERT(pRes->skey <= pRes->ekey);
|
||||
} else {
|
||||
pRes->skey = pHandle->pDeleter->deleteTimeRange.skey;
|
||||
pRes->ekey = pHandle->pDeleter->deleteTimeRange.ekey;
|
||||
}
|
||||
|
||||
pBuf->useSize += pEntry->dataLen;
|
||||
|
||||
atomic_add_fetch_64(&pHandle->cachedSize, pEntry->dataLen);
|
||||
atomic_add_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen);
|
||||
|
||||
atomic_add_fetch_64(&pHandle->cachedSize, pEntry->dataLen);
|
||||
atomic_add_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen);
|
||||
}
|
||||
|
||||
static bool allocBuf(SDataDeleterHandle* pDeleter, const SInputData* pInput, SDataDeleterBuf* pBuf) {
|
||||
|
@ -172,7 +180,8 @@ static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryE
|
|||
SDataCacheEntry* pEntry = (SDataCacheEntry*)pDeleter->nextOutput.pData;
|
||||
*pLen = pEntry->dataLen;
|
||||
*pQueryEnd = pDeleter->queryEnd;
|
||||
qDebug("got data len %" PRId64 ", row num %d in sink", *pLen, ((SDataCacheEntry*)(pDeleter->nextOutput.pData))->numOfRows);
|
||||
qDebug("got data len %" PRId64 ", row num %d in sink", *pLen,
|
||||
((SDataCacheEntry*)(pDeleter->nextOutput.pData))->numOfRows);
|
||||
}
|
||||
|
||||
static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
|
||||
|
@ -186,14 +195,14 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDeleter->nextOutput.pData);
|
||||
memcpy(pOutput->pData, pEntry->data, pEntry->dataLen);
|
||||
memcpy(pOutput->pData, pEntry->data, pEntry->dataLen);
|
||||
pDeleter->pParam->pUidList = NULL;
|
||||
pOutput->numOfRows = pEntry->numOfRows;
|
||||
pOutput->numOfCols = pEntry->numOfCols;
|
||||
pOutput->compressed = pEntry->compressed;
|
||||
|
||||
atomic_sub_fetch_64(&pDeleter->cachedSize, pEntry->dataLen);
|
||||
atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen);
|
||||
atomic_sub_fetch_64(&pDeleter->cachedSize, pEntry->dataLen);
|
||||
atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen);
|
||||
|
||||
taosMemoryFreeClear(pDeleter->nextOutput.pData); // todo persistent
|
||||
pOutput->bufStatus = updateStatus(pDeleter);
|
||||
|
@ -202,7 +211,7 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
|
|||
pOutput->useconds = pDeleter->useconds;
|
||||
pOutput->precision = pDeleter->pSchema->precision;
|
||||
taosThreadMutexUnlock(&pDeleter->mutex);
|
||||
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -211,7 +220,7 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
|
|||
atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pDeleter->cachedSize);
|
||||
taosMemoryFreeClear(pDeleter->nextOutput.pData);
|
||||
taosArrayDestroy(pDeleter->pParam->pUidList);
|
||||
taosMemoryFree(pDeleter->pParam);
|
||||
taosMemoryFree(pDeleter->pParam);
|
||||
while (!taosQueueEmpty(pDeleter->pDataBlocks)) {
|
||||
SDataDeleterBuf* pBuf = NULL;
|
||||
taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf);
|
||||
|
@ -230,14 +239,15 @@ static int32_t getCacheSize(struct SDataSinkHandle* pHandle, uint64_t* size) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void *pParam) {
|
||||
int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle,
|
||||
void* pParam) {
|
||||
SDataDeleterHandle* deleter = taosMemoryCalloc(1, sizeof(SDataDeleterHandle));
|
||||
if (NULL == deleter) {
|
||||
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
SDataDeleterNode* pDeleterNode = (SDataDeleterNode *)pDataSink;
|
||||
SDataDeleterNode* pDeleterNode = (SDataDeleterNode*)pDataSink;
|
||||
deleter->sink.fPut = putDataBlock;
|
||||
deleter->sink.fEndPut = endPut;
|
||||
deleter->sink.fGetLen = getDataLength;
|
||||
|
|
|
@ -399,6 +399,8 @@ static int32_t logicVnodeModifCopy(const SVnodeModifyLogicNode* pSrc, SVnodeModi
|
|||
COPY_SCALAR_FIELD(modifyType);
|
||||
COPY_SCALAR_FIELD(msgType);
|
||||
CLONE_NODE_FIELD(pAffectedRows);
|
||||
CLONE_NODE_FIELD(pStartTs);
|
||||
CLONE_NODE_FIELD(pEndTs);
|
||||
COPY_SCALAR_FIELD(tableId);
|
||||
COPY_SCALAR_FIELD(stableId);
|
||||
COPY_SCALAR_FIELD(tableType);
|
||||
|
|
|
@ -2431,6 +2431,8 @@ static const char* jkDeletePhysiPlanTsColName = "TsColName";
|
|||
static const char* jkDeletePhysiPlanDeleteTimeRangeStartKey = "DeleteTimeRangeStartKey";
|
||||
static const char* jkDeletePhysiPlanDeleteTimeRangeEndKey = "DeleteTimeRangeEndKey";
|
||||
static const char* jkDeletePhysiPlanAffectedRows = "AffectedRows";
|
||||
static const char* jkDeletePhysiPlanStartTs = "StartTs";
|
||||
static const char* jkDeletePhysiPlanEndTs = "EndTs";
|
||||
|
||||
static int32_t physiDeleteNodeToJson(const void* pObj, SJson* pJson) {
|
||||
const SDataDeleterNode* pNode = (const SDataDeleterNode*)pObj;
|
||||
|
@ -2457,6 +2459,12 @@ static int32_t physiDeleteNodeToJson(const void* pObj, SJson* pJson) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddObject(pJson, jkDeletePhysiPlanAffectedRows, nodeToJson, pNode->pAffectedRows);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddObject(pJson, jkDeletePhysiPlanStartTs, nodeToJson, pNode->pStartTs);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddObject(pJson, jkDeletePhysiPlanEndTs, nodeToJson, pNode->pEndTs);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -2486,6 +2494,12 @@ static int32_t jsonToPhysiDeleteNode(const SJson* pJson, void* pObj) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeObject(pJson, jkDeletePhysiPlanAffectedRows, &pNode->pAffectedRows);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeObject(pJson, jkDeletePhysiPlanStartTs, &pNode->pStartTs);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeObject(pJson, jkDeletePhysiPlanEndTs, &pNode->pEndTs);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -2665,7 +2665,9 @@ enum {
|
|||
PHY_DELETER_CODE_TABLE_FNAME,
|
||||
PHY_DELETER_CODE_TS_COL_NAME,
|
||||
PHY_DELETER_CODE_DELETE_TIME_RANGE,
|
||||
PHY_DELETER_CODE_AFFECTED_ROWS
|
||||
PHY_DELETER_CODE_AFFECTED_ROWS,
|
||||
PHY_DELETER_CODE_START_TS,
|
||||
PHY_DELETER_CODE_END_TS
|
||||
};
|
||||
|
||||
static int32_t physiDeleteNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
||||
|
@ -2690,6 +2692,12 @@ static int32_t physiDeleteNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeObj(pEncoder, PHY_DELETER_CODE_AFFECTED_ROWS, nodeToMsg, pNode->pAffectedRows);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeObj(pEncoder, PHY_DELETER_CODE_START_TS, nodeToMsg, pNode->pStartTs);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeObj(pEncoder, PHY_DELETER_CODE_END_TS, nodeToMsg, pNode->pEndTs);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -2722,6 +2730,12 @@ static int32_t msgToPhysiDeleteNode(STlvDecoder* pDecoder, void* pObj) {
|
|||
case PHY_DELETER_CODE_AFFECTED_ROWS:
|
||||
code = msgToNodeFromTlv(pTlv, (void**)&pNode->pAffectedRows);
|
||||
break;
|
||||
case PHY_DELETER_CODE_START_TS:
|
||||
code = msgToNodeFromTlv(pTlv, (void**)&pNode->pStartTs);
|
||||
break;
|
||||
case PHY_DELETER_CODE_END_TS:
|
||||
code = msgToNodeFromTlv(pTlv, (void**)&pNode->pEndTs);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -727,6 +727,8 @@ void nodesDestroyNode(SNode* pNode) {
|
|||
nodesDestroyNode(pStmt->pFromTable);
|
||||
nodesDestroyNode(pStmt->pWhere);
|
||||
nodesDestroyNode(pStmt->pCountFunc);
|
||||
nodesDestroyNode(pStmt->pFirstFunc);
|
||||
nodesDestroyNode(pStmt->pLastFunc);
|
||||
nodesDestroyNode(pStmt->pTagCond);
|
||||
break;
|
||||
}
|
||||
|
@ -791,6 +793,8 @@ void nodesDestroyNode(SNode* pNode) {
|
|||
destroyVgDataBlockArray(pLogicNode->pDataBlocks);
|
||||
// pVgDataBlocks is weak reference
|
||||
nodesDestroyNode(pLogicNode->pAffectedRows);
|
||||
nodesDestroyNode(pLogicNode->pStartTs);
|
||||
nodesDestroyNode(pLogicNode->pEndTs);
|
||||
taosMemoryFreeClear(pLogicNode->pVgroupList);
|
||||
nodesDestroyList(pLogicNode->pInsertCols);
|
||||
break;
|
||||
|
@ -997,6 +1001,8 @@ void nodesDestroyNode(SNode* pNode) {
|
|||
SDataDeleterNode* pSink = (SDataDeleterNode*)pNode;
|
||||
destroyDataSinkNode((SDataSinkNode*)pSink);
|
||||
nodesDestroyNode(pSink->pAffectedRows);
|
||||
nodesDestroyNode(pSink->pStartTs);
|
||||
nodesDestroyNode(pSink->pEndTs);
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_PHYSICAL_SUBPLAN: {
|
||||
|
|
|
@ -1787,10 +1787,10 @@ SNode* createRevokeStmt(SAstCreateContext* pCxt, int64_t privileges, SToken* pDb
|
|||
return (SNode*)pStmt;
|
||||
}
|
||||
|
||||
SNode* createCountFuncForDelete(SAstCreateContext* pCxt) {
|
||||
SNode* createFuncForDelete(SAstCreateContext* pCxt, const char* pFuncName) {
|
||||
SFunctionNode* pFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
|
||||
CHECK_OUT_OF_MEM(pFunc);
|
||||
strcpy(pFunc->functionName, "count");
|
||||
strcpy(pFunc->functionName, pFuncName);
|
||||
if (TSDB_CODE_SUCCESS != nodesListMakeStrictAppend(&pFunc->pParameterList, createPrimaryKeyCol(pCxt, NULL))) {
|
||||
nodesDestroyNode((SNode*)pFunc);
|
||||
CHECK_OUT_OF_MEM(NULL);
|
||||
|
@ -1804,8 +1804,10 @@ SNode* createDeleteStmt(SAstCreateContext* pCxt, SNode* pTable, SNode* pWhere) {
|
|||
CHECK_OUT_OF_MEM(pStmt);
|
||||
pStmt->pFromTable = pTable;
|
||||
pStmt->pWhere = pWhere;
|
||||
pStmt->pCountFunc = createCountFuncForDelete(pCxt);
|
||||
if (NULL == pStmt->pCountFunc) {
|
||||
pStmt->pCountFunc = createFuncForDelete(pCxt, "count");
|
||||
pStmt->pFirstFunc = createFuncForDelete(pCxt, "first");
|
||||
pStmt->pLastFunc = createFuncForDelete(pCxt, "last");
|
||||
if (NULL == pStmt->pCountFunc || NULL == pStmt->pFirstFunc || NULL == pStmt->pLastFunc) {
|
||||
nodesDestroyNode((SNode*)pStmt);
|
||||
CHECK_OUT_OF_MEM(NULL);
|
||||
}
|
||||
|
|
|
@ -3347,10 +3347,16 @@ static int32_t translateDelete(STranslateContext* pCxt, SDeleteStmt* pDelete) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = translateDeleteWhere(pCxt, pDelete);
|
||||
}
|
||||
pCxt->currClause = SQL_CLAUSE_SELECT;
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
pCxt->currClause = SQL_CLAUSE_SELECT;
|
||||
code = translateExpr(pCxt, &pDelete->pCountFunc);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = translateExpr(pCxt, &pDelete->pFirstFunc);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = translateExpr(pCxt, &pDelete->pLastFunc);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -1372,9 +1372,21 @@ static int32_t createDeleteAggLogicNode(SLogicPlanContext* pCxt, SDeleteStmt* pD
|
|||
}
|
||||
|
||||
int32_t code = nodesListMakeStrictAppend(&pAgg->pAggFuncs, nodesCloneNode(pDelete->pCountFunc));
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodesListStrictAppend(pAgg->pAggFuncs, nodesCloneNode(pDelete->pFirstFunc));
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodesListStrictAppend(pAgg->pAggFuncs, nodesCloneNode(pDelete->pLastFunc));
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = rewriteExpr(pAgg->pAggFuncs, &pDelete->pCountFunc);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = rewriteExpr(pAgg->pAggFuncs, &pDelete->pFirstFunc);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = rewriteExpr(pAgg->pAggFuncs, &pDelete->pLastFunc);
|
||||
}
|
||||
// set the output
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = createColumnByRewriteExprs(pAgg->pAggFuncs, &pAgg->node.pTargets);
|
||||
|
@ -1405,7 +1417,9 @@ static int32_t createVnodeModifLogicNodeByDelete(SLogicPlanContext* pCxt, SDelet
|
|||
strcpy(pModify->tsColName, pRealTable->pMeta->schema->name);
|
||||
pModify->deleteTimeRange = pDelete->timeRange;
|
||||
pModify->pAffectedRows = nodesCloneNode(pDelete->pCountFunc);
|
||||
if (NULL == pModify->pAffectedRows) {
|
||||
pModify->pStartTs = nodesCloneNode(pDelete->pFirstFunc);
|
||||
pModify->pEndTs = nodesCloneNode(pDelete->pLastFunc);
|
||||
if (NULL == pModify->pAffectedRows || NULL == pModify->pStartTs || NULL == pModify->pEndTs) {
|
||||
nodesDestroyNode((SNode*)pModify);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
|
|
@ -1323,9 +1323,9 @@ static int32_t createSortPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren
|
|||
|
||||
static int32_t createPartitionPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
|
||||
SPartitionLogicNode* pPartLogicNode, SPhysiNode** pPhyNode) {
|
||||
SPartitionPhysiNode* pPart =
|
||||
(SPartitionPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pPartLogicNode,
|
||||
pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION : QUERY_NODE_PHYSICAL_PLAN_PARTITION);
|
||||
SPartitionPhysiNode* pPart = (SPartitionPhysiNode*)makePhysiNode(
|
||||
pCxt, (SLogicNode*)pPartLogicNode,
|
||||
pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION : QUERY_NODE_PHYSICAL_PLAN_PARTITION);
|
||||
if (NULL == pPart) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
@ -1670,6 +1670,12 @@ static int32_t createDataDeleter(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode*
|
|||
|
||||
int32_t code = setNodeSlotId(pCxt, pRoot->pOutputDataBlockDesc->dataBlockId, -1, pModify->pAffectedRows,
|
||||
&pDeleter->pAffectedRows);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = setNodeSlotId(pCxt, pRoot->pOutputDataBlockDesc->dataBlockId, -1, pModify->pStartTs, &pDeleter->pStartTs);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = setNodeSlotId(pCxt, pRoot->pOutputDataBlockDesc->dataBlockId, -1, pModify->pEndTs, &pDeleter->pEndTs);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
pDeleter->sink.pInputDataBlockDesc = (SDataBlockDescNode*)nodesCloneNode((SNode*)pRoot->pOutputDataBlockDesc);
|
||||
if (NULL == pDeleter->sink.pInputDataBlockDesc) {
|
||||
|
|
Loading…
Reference in New Issue