Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/row_refact

This commit is contained in:
Hongze Cheng 2022-06-10 03:47:38 +00:00
commit a4638486d9
18 changed files with 233 additions and 6 deletions

View File

@ -0,0 +1,43 @@
---
sidebar_label: 删除数据
description: "删除指定表或超级表中的数据记录"
title: "删除数据"
---
删除数据是 TDengine 提供的根据指定时间段删除指定表或超级表中数据记录的功能,方便用户清理由于设备故障等原因产生的异常数据。
注意:本功能只在企业版 2.6.0.0 及以后的版本中提供,如需此功能请点击下面的链接访问[企业版产品](https://www.taosdata.com/products#enterprise-edition-link)
**语法:**
```sql
DELETE FROM [ db_name. ] tb_name [WHERE condition];
```
**功能:** 删除指定表或超级表中的数据记录
**参数:**
- `db_name` 可选参数,指定要删除表所在的数据库名,不填写则在当前数据库中
- `tb_name` 必填参数,指定要删除数据的表名,可以是普通表、子表,也可以是超级表。
- `condition` 可选参数指定删除数据的过滤条件不指定过滤条件则为表中所有数据请慎重使用。特别说明这里的where 条件中只支持对第一列时间列的过滤,如果是超级表,支持对 tag 列过滤。
**特别说明:**
数据删除后不可恢复,请慎重使用。为了确保删除的数据确实是自己要删除的,建议可以先使用 `select` 语句加 `where` 后的删除条件查看要删除的数据内容,确认无误后再执行 `delete` 命令。
**示例:**
`meters` 是一个超级表,`groupid` 是 int 类型的 tag 列,现在要删除 `meters` 表中时间小于 2021-10-01 10:40:00.100 且 tag 列 `groupid` 值为 1 的所有数据sql 如下:
```sql
delete from meters where ts < '2021-10-01 10:40:00.100' and groupid=1 ;
```
执行后显示结果为:
```
Deleted 102000 row(s) from 1020 table(s) (0.421950s)
```
表示从 1020 个子表中共删除了 102000 行数据

View File

@ -0,0 +1,42 @@
---
sidebar_label: Delete Data
description: "Delete data from table or Stable"
title: Delete Data
---
TDengine provides the functionality of deleting data from a table or STable according to specified time range, it can be used to cleanup abnormal data generated due to device failure. Please be noted that this functionality is only available in Enterprise version, please refer to [TDengine Enterprise Edition](https://tdengine.com/products#enterprise-edition-link)
**Syntax:**
```sql
DELETE FROM [ db_name. ] tb_name [WHERE condition];
```
**Description:** Delete data from a table or STable
**Parameters:**
- `db_name`: Optional parameter, specifies the database in which the table exists; if not specified, the current database will be used.
- `tb_name`: Mandatory parameter, specifies the table name from which data will be deleted, it can be normal table, subtable or STable.
- `condition`: Optional parameter, specifies the data filter condition. If no condition is specified all data will be deleted, so please be cautions to delete data without any condition. The condition used here is only applicable to the first column, i.e. the timestamp column. If the table is a STable, the condition is also applicable to tag columns.
**More Explanations:**
The data can't be recovered once deleted, so please be cautious to use the functionality of deleting data. It's better to firstly make sure the data to be deleted using `select` then execute `delete`.
**Example:**
`meters` is a STable, in which `groupid` is a tag column of int type. Now we want to delete the data older than 2021-10-01 10:40:00.100 and `groupid` is 1. The SQL for this purpose is like below:
```sql
delete from meters where ts < '2021-10-01 10:40:00.100' and groupid=1 ;
```
The output is:
```
Deleted 102000 row(s) from 1020 table(s) (0.421950s)
```
It means totally 102,000 rows of data have been deleted from 1,020 sub tables.

View File

@ -1205,6 +1205,7 @@ typedef struct {
int8_t completed; // all results are returned to client
int8_t precision;
int8_t compressed;
int8_t streamBlockType;
int32_t compLen;
int32_t numOfRows;
int32_t numOfCols;
@ -2512,7 +2513,6 @@ int32_t tSerializeSTableIndexRsp(void* buf, int32_t bufLen, const STableIndexRsp
int32_t tDeserializeSTableIndexRsp(void* buf, int32_t bufLen, STableIndexRsp* pRsp);
void tFreeSTableIndexInfo(void *pInfo);
typedef struct {
int8_t mqMsgType;
int32_t code;
@ -2753,8 +2753,8 @@ typedef struct {
char* msg;
} SVDeleteReq;
int32_t tSerializeSVDeleteReq(void *buf, int32_t bufLen, SVDeleteReq *pReq);
int32_t tDeserializeSVDeleteReq(void *buf, int32_t bufLen, SVDeleteReq *pReq);
int32_t tSerializeSVDeleteReq(void* buf, int32_t bufLen, SVDeleteReq* pReq);
int32_t tDeserializeSVDeleteReq(void* buf, int32_t bufLen, SVDeleteReq* pReq);
typedef struct {
int64_t affectedRows;

View File

@ -131,6 +131,8 @@ typedef enum EFunctionType {
FUNCTION_TYPE_HISTOGRAM_MERGE,
FUNCTION_TYPE_HYPERLOGLOG_PARTIAL,
FUNCTION_TYPE_HYPERLOGLOG_MERGE,
FUNCTION_TYPE_ELAPSED_PARTIAL,
FUNCTION_TYPE_ELAPSED_MERGE,
// user defined funcion
FUNCTION_TYPE_UDF = 10000

View File

@ -35,6 +35,13 @@
extern bool tsStreamSchedV;
static int32_t mndAddTaskToTaskSet(SArray* pArray, SStreamTask* pTask) {
int32_t childId = taosArrayGetSize(pArray);
pTask->childId = childId;
taosArrayPush(pArray, &pTask);
return 0;
}
int32_t mndConvertRSmaTask(const char* ast, int64_t uid, int8_t triggerType, int64_t watermark, char** pStr,
int32_t* pLen, double filesFactor) {
SNode* pAst = NULL;
@ -195,7 +202,7 @@ int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* p
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
taosArrayPush(tasks, &pTask);
mndAddTaskToTaskSet(tasks, pTask);
pTask->nodeId = pVgroup->vgId;
pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);
@ -235,7 +242,7 @@ int32_t mndAddFixedSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStr
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
taosArrayPush(tasks, &pTask);
mndAddTaskToTaskSet(tasks, pTask);
pTask->nodeId = pStream->fixedSinkVgId;
#if 0
@ -378,7 +385,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
return -1;
}
sdbRelease(pSdb, pVgroup);
taosArrayPush(taskOneLevel, &pTask);
mndAddTaskToTaskSet(taskOneLevel, pTask);
}
} else {
// merge plan

View File

@ -116,7 +116,10 @@ int32_t getSpreadInfoSize();
bool getElapsedFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool elapsedFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
int32_t elapsedFunction(SqlFunctionCtx* pCtx);
int32_t elapsedFunctionMerge(SqlFunctionCtx* pCtx);
int32_t elapsedFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t elapsedPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t getElapsedInfoSize();
bool getHistogramFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool histogramFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);

View File

@ -444,6 +444,64 @@ static int32_t translateElapsed(SFunctionNode* pFunc, char* pErrBuf, int32_t len
return TSDB_CODE_SUCCESS;
}
static int32_t translateElapsedImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isPartial) {
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
if (isPartial) {
if (1 != numOfParams && 2 != numOfParams) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
}
uint8_t paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
if (TSDB_DATA_TYPE_TIMESTAMP != paraType) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
// param1
if (2 == numOfParams) {
SNode* pParamNode1 = nodesListGetNode(pFunc->pParameterList, 1);
if (QUERY_NODE_VALUE != nodeType(pParamNode1)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
SValueNode* pValue = (SValueNode*)pParamNode1;
pValue->notReserved = true;
paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type;
if (!IS_INTEGER_TYPE(paraType)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
if (pValue->datum.i == 0) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"ELAPSED function time unit parameter should be greater than db precision");
}
}
pFunc->node.resType = (SDataType){.bytes = getElapsedInfoSize() + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY};
} else {
if (1 != numOfParams) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
}
uint8_t paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
if (TSDB_DATA_TYPE_BINARY != paraType) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE};
}
return TSDB_CODE_SUCCESS;
}
static int32_t translateElapsedPartial(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return translateElapsedImpl(pFunc, pErrBuf, len, true);
}
static int32_t translateElapsedMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return translateElapsedImpl(pFunc, pErrBuf, len, false);
}
static int32_t translateLeastSQR(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
if (3 != numOfParams) {
@ -1468,6 +1526,30 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.getEnvFunc = getElapsedFuncEnv,
.initFunc = elapsedFunctionSetup,
.processFunc = elapsedFunction,
.finalizeFunc = elapsedFinalize,
.pPartialFunc = "_elapsed_partial",
.pMergeFunc = "_elapsed_merge"
},
{
.name = "_elapsed_partial",
.type = FUNCTION_TYPE_ELAPSED,
.classification = FUNC_MGT_AGG_FUNC,
.dataRequiredFunc = statisDataRequired,
.translateFunc = translateElapsedPartial,
.getEnvFunc = getElapsedFuncEnv,
.initFunc = elapsedFunctionSetup,
.processFunc = elapsedFunction,
.finalizeFunc = elapsedPartialFinalize
},
{
.name = "_elapsed_merge",
.type = FUNCTION_TYPE_ELAPSED,
.classification = FUNC_MGT_AGG_FUNC,
.dataRequiredFunc = statisDataRequired,
.translateFunc = translateElapsedMerge,
.getEnvFunc = getElapsedFuncEnv,
.initFunc = elapsedFunctionSetup,
.processFunc = elapsedFunctionMerge,
.finalizeFunc = elapsedFinalize
},
{

View File

@ -3101,6 +3101,10 @@ int32_t spreadPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
return pResInfo->numOfRes;
}
int32_t getElapsedInfoSize() {
return (int32_t)sizeof(SElapsedInfo);
}
bool getElapsedFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(SElapsedInfo);
return true;
@ -3202,6 +3206,30 @@ _elapsed_over:
return TSDB_CODE_SUCCESS;
}
int32_t elapsedFunctionMerge(SqlFunctionCtx *pCtx) {
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pCol = pInput->pData[0];
ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY);
SElapsedInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
int32_t start = pInput->startRowIndex;
char* data = colDataGetData(pCol, start);
SElapsedInfo* pInputInfo = (SElapsedInfo *)varDataVal(data);
pInfo->timeUnit = pInputInfo->timeUnit;
if (pInfo->min > pInputInfo->min) {
pInfo->min = pInputInfo->min;
}
if (pInfo->max < pInputInfo->max) {
pInfo->max = pInputInfo->max;
}
SET_VAL(GET_RES_INFO(pCtx), 1, 1);
return TSDB_CODE_SUCCESS;
}
int32_t elapsedFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SElapsedInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
double result = (double)pInfo->max - (double)pInfo->min;
@ -3210,6 +3238,24 @@ int32_t elapsedFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
return functionFinalize(pCtx, pBlock);
}
int32_t elapsedPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SElapsedInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
int32_t resultBytes = getElapsedInfoSize();
char *res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char));
memcpy(varDataVal(res), pInfo, resultBytes);
varDataSetLen(res, resultBytes);
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
colDataAppend(pCol, pBlock->info.rows, res, false);
taosMemoryFree(res);
return pResInfo->numOfRes;
}
int32_t getHistogramInfoSize() {
return (int32_t)sizeof(SHistoFuncInfo) + HISTOGRAM_MAX_BINS_NUM * sizeof(SHistoFuncBin);
}

View File

@ -53,6 +53,7 @@ int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock
SSDataBlock* pDataBlock = taosArrayGet(pArray, i);
blockCompressDecode(pDataBlock, htonl(pRetrieve->numOfCols), htonl(pRetrieve->numOfRows), pRetrieve->data);
// TODO: refactor
pDataBlock->info.type = pRetrieve->streamBlockType;
pDataBlock->info.childId = pReq->sourceChildId;
}
pData->blocks = pArray;

View File

@ -72,6 +72,7 @@ static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDis
pRetrieve->precision = TSDB_DEFAULT_PRECISION;
pRetrieve->compressed = 0;
pRetrieve->completed = 1;
pRetrieve->streamBlockType = pBlock->info.type;
pRetrieve->numOfRows = htonl(pBlock->info.rows);
pRetrieve->numOfCols = htonl(pBlock->info.numOfCols);