Merge pull request #16840 from taosdata/enh/3.0_planner_optimize

enh: the delete physical plan increases the timestamp interval of the actual deleted data
This commit is contained in:
Liu Jicong 2022-09-15 10:56:08 +08:00 committed by GitHub
commit deb731c1c4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 105 additions and 25 deletions

View File

@ -151,6 +151,8 @@ typedef struct SVnodeModifyLogicNode {
SArray* pDataBlocks; SArray* pDataBlocks;
SVgDataBlocks* pVgDataBlocks; SVgDataBlocks* pVgDataBlocks;
SNode* pAffectedRows; // SColumnNode SNode* pAffectedRows; // SColumnNode
SNode* pStartTs; // SColumnNode
SNode* pEndTs; // SColumnNode
uint64_t tableId; uint64_t tableId;
uint64_t stableId; uint64_t stableId;
int8_t tableType; // table type int8_t tableType; // table type
@ -525,6 +527,8 @@ typedef struct SDataDeleterNode {
char tsColName[TSDB_COL_NAME_LEN]; char tsColName[TSDB_COL_NAME_LEN];
STimeWindow deleteTimeRange; STimeWindow deleteTimeRange;
SNode* pAffectedRows; SNode* pAffectedRows;
SNode* pStartTs;
SNode* pEndTs;
} SDataDeleterNode; } SDataDeleterNode;
typedef struct SSubplan { typedef struct SSubplan {

View File

@ -315,6 +315,8 @@ typedef struct SDeleteStmt {
SNode* pFromTable; // FROM clause SNode* pFromTable; // FROM clause
SNode* pWhere; // WHERE clause SNode* pWhere; // WHERE clause
SNode* pCountFunc; // count the number of rows affected SNode* pCountFunc; // count the number of rows affected
SNode* pFirstFunc; // the start timestamp when the data was actually deleted
SNode* pLastFunc; // the end timestamp when the data was actually deleted
SNode* pTagCond; // pWhere divided into pTagCond and timeRange SNode* pTagCond; // pWhere divided into pTagCond and timeRange
STimeWindow timeRange; STimeWindow timeRange;
uint8_t precision; uint8_t precision;

View File

@ -832,7 +832,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
tDecoderClear(pCoder); tDecoderClear(pCoder);
int32_t sz = taosArrayGetSize(pRes->uidList); int32_t sz = taosArrayGetSize(pRes->uidList);
if (sz == 0) { if (sz == 0 || pRes->affectedRows == 0) {
taosArrayDestroy(pRes->uidList); taosArrayDestroy(pRes->uidList);
return 0; return 0;
} }

View File

@ -79,20 +79,28 @@ static void toDataCacheEntry(SDataDeleterHandle* pHandle, const SInputData* pInp
pEntry->dataLen = sizeof(SDeleterRes); pEntry->dataLen = sizeof(SDeleterRes);
ASSERT(1 == pEntry->numOfRows); ASSERT(1 == pEntry->numOfRows);
ASSERT(1 == pEntry->numOfCols); ASSERT(3 == pEntry->numOfCols);
pBuf->useSize = sizeof(SDataCacheEntry); pBuf->useSize = sizeof(SDataCacheEntry);
SColumnInfoData* pColRes = (SColumnInfoData*)taosArrayGet(pInput->pData->pDataBlock, 0); SColumnInfoData* pColRes = (SColumnInfoData*)taosArrayGet(pInput->pData->pDataBlock, 0);
SColumnInfoData* pColSKey = (SColumnInfoData*)taosArrayGet(pInput->pData->pDataBlock, 1);
SColumnInfoData* pColEKey = (SColumnInfoData*)taosArrayGet(pInput->pData->pDataBlock, 2);
SDeleterRes* pRes = (SDeleterRes*)pEntry->data; SDeleterRes* pRes = (SDeleterRes*)pEntry->data;
pRes->suid = pHandle->pParam->suid; pRes->suid = pHandle->pParam->suid;
pRes->uidList = pHandle->pParam->pUidList; pRes->uidList = pHandle->pParam->pUidList;
pRes->skey = pHandle->pDeleter->deleteTimeRange.skey;
pRes->ekey = pHandle->pDeleter->deleteTimeRange.ekey;
strcpy(pRes->tableName, pHandle->pDeleter->tableFName); strcpy(pRes->tableName, pHandle->pDeleter->tableFName);
strcpy(pRes->tsColName, pHandle->pDeleter->tsColName); strcpy(pRes->tsColName, pHandle->pDeleter->tsColName);
pRes->affectedRows = *(int64_t*)pColRes->pData; pRes->affectedRows = *(int64_t*)pColRes->pData;
if (pRes->affectedRows) {
pRes->skey = *(int64_t*)pColSKey->pData;
pRes->ekey = *(int64_t*)pColEKey->pData;
ASSERT(pRes->skey <= pRes->ekey);
} else {
pRes->skey = pHandle->pDeleter->deleteTimeRange.skey;
pRes->ekey = pHandle->pDeleter->deleteTimeRange.ekey;
}
pBuf->useSize += pEntry->dataLen; pBuf->useSize += pEntry->dataLen;
@ -172,7 +180,8 @@ static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryE
SDataCacheEntry* pEntry = (SDataCacheEntry*)pDeleter->nextOutput.pData; SDataCacheEntry* pEntry = (SDataCacheEntry*)pDeleter->nextOutput.pData;
*pLen = pEntry->dataLen; *pLen = pEntry->dataLen;
*pQueryEnd = pDeleter->queryEnd; *pQueryEnd = pDeleter->queryEnd;
qDebug("got data len %" PRId64 ", row num %d in sink", *pLen, ((SDataCacheEntry*)(pDeleter->nextOutput.pData))->numOfRows); qDebug("got data len %" PRId64 ", row num %d in sink", *pLen,
((SDataCacheEntry*)(pDeleter->nextOutput.pData))->numOfRows);
} }
static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) { static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
@ -230,14 +239,15 @@ static int32_t getCacheSize(struct SDataSinkHandle* pHandle, uint64_t* size) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void *pParam) { int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle,
void* pParam) {
SDataDeleterHandle* deleter = taosMemoryCalloc(1, sizeof(SDataDeleterHandle)); SDataDeleterHandle* deleter = taosMemoryCalloc(1, sizeof(SDataDeleterHandle));
if (NULL == deleter) { if (NULL == deleter) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return TSDB_CODE_QRY_OUT_OF_MEMORY; return TSDB_CODE_QRY_OUT_OF_MEMORY;
} }
SDataDeleterNode* pDeleterNode = (SDataDeleterNode *)pDataSink; SDataDeleterNode* pDeleterNode = (SDataDeleterNode*)pDataSink;
deleter->sink.fPut = putDataBlock; deleter->sink.fPut = putDataBlock;
deleter->sink.fEndPut = endPut; deleter->sink.fEndPut = endPut;
deleter->sink.fGetLen = getDataLength; deleter->sink.fGetLen = getDataLength;

View File

@ -399,6 +399,8 @@ static int32_t logicVnodeModifCopy(const SVnodeModifyLogicNode* pSrc, SVnodeModi
COPY_SCALAR_FIELD(modifyType); COPY_SCALAR_FIELD(modifyType);
COPY_SCALAR_FIELD(msgType); COPY_SCALAR_FIELD(msgType);
CLONE_NODE_FIELD(pAffectedRows); CLONE_NODE_FIELD(pAffectedRows);
CLONE_NODE_FIELD(pStartTs);
CLONE_NODE_FIELD(pEndTs);
COPY_SCALAR_FIELD(tableId); COPY_SCALAR_FIELD(tableId);
COPY_SCALAR_FIELD(stableId); COPY_SCALAR_FIELD(stableId);
COPY_SCALAR_FIELD(tableType); COPY_SCALAR_FIELD(tableType);

View File

@ -2431,6 +2431,8 @@ static const char* jkDeletePhysiPlanTsColName = "TsColName";
static const char* jkDeletePhysiPlanDeleteTimeRangeStartKey = "DeleteTimeRangeStartKey"; static const char* jkDeletePhysiPlanDeleteTimeRangeStartKey = "DeleteTimeRangeStartKey";
static const char* jkDeletePhysiPlanDeleteTimeRangeEndKey = "DeleteTimeRangeEndKey"; static const char* jkDeletePhysiPlanDeleteTimeRangeEndKey = "DeleteTimeRangeEndKey";
static const char* jkDeletePhysiPlanAffectedRows = "AffectedRows"; static const char* jkDeletePhysiPlanAffectedRows = "AffectedRows";
static const char* jkDeletePhysiPlanStartTs = "StartTs";
static const char* jkDeletePhysiPlanEndTs = "EndTs";
static int32_t physiDeleteNodeToJson(const void* pObj, SJson* pJson) { static int32_t physiDeleteNodeToJson(const void* pObj, SJson* pJson) {
const SDataDeleterNode* pNode = (const SDataDeleterNode*)pObj; const SDataDeleterNode* pNode = (const SDataDeleterNode*)pObj;
@ -2457,6 +2459,12 @@ static int32_t physiDeleteNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkDeletePhysiPlanAffectedRows, nodeToJson, pNode->pAffectedRows); code = tjsonAddObject(pJson, jkDeletePhysiPlanAffectedRows, nodeToJson, pNode->pAffectedRows);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkDeletePhysiPlanStartTs, nodeToJson, pNode->pStartTs);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkDeletePhysiPlanEndTs, nodeToJson, pNode->pEndTs);
}
return code; return code;
} }
@ -2486,6 +2494,12 @@ static int32_t jsonToPhysiDeleteNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkDeletePhysiPlanAffectedRows, &pNode->pAffectedRows); code = jsonToNodeObject(pJson, jkDeletePhysiPlanAffectedRows, &pNode->pAffectedRows);
} }
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkDeletePhysiPlanStartTs, &pNode->pStartTs);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkDeletePhysiPlanEndTs, &pNode->pEndTs);
}
return code; return code;
} }

View File

@ -2665,7 +2665,9 @@ enum {
PHY_DELETER_CODE_TABLE_FNAME, PHY_DELETER_CODE_TABLE_FNAME,
PHY_DELETER_CODE_TS_COL_NAME, PHY_DELETER_CODE_TS_COL_NAME,
PHY_DELETER_CODE_DELETE_TIME_RANGE, PHY_DELETER_CODE_DELETE_TIME_RANGE,
PHY_DELETER_CODE_AFFECTED_ROWS PHY_DELETER_CODE_AFFECTED_ROWS,
PHY_DELETER_CODE_START_TS,
PHY_DELETER_CODE_END_TS
}; };
static int32_t physiDeleteNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { static int32_t physiDeleteNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
@ -2690,6 +2692,12 @@ static int32_t physiDeleteNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_DELETER_CODE_AFFECTED_ROWS, nodeToMsg, pNode->pAffectedRows); code = tlvEncodeObj(pEncoder, PHY_DELETER_CODE_AFFECTED_ROWS, nodeToMsg, pNode->pAffectedRows);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_DELETER_CODE_START_TS, nodeToMsg, pNode->pStartTs);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_DELETER_CODE_END_TS, nodeToMsg, pNode->pEndTs);
}
return code; return code;
} }
@ -2722,6 +2730,12 @@ static int32_t msgToPhysiDeleteNode(STlvDecoder* pDecoder, void* pObj) {
case PHY_DELETER_CODE_AFFECTED_ROWS: case PHY_DELETER_CODE_AFFECTED_ROWS:
code = msgToNodeFromTlv(pTlv, (void**)&pNode->pAffectedRows); code = msgToNodeFromTlv(pTlv, (void**)&pNode->pAffectedRows);
break; break;
case PHY_DELETER_CODE_START_TS:
code = msgToNodeFromTlv(pTlv, (void**)&pNode->pStartTs);
break;
case PHY_DELETER_CODE_END_TS:
code = msgToNodeFromTlv(pTlv, (void**)&pNode->pEndTs);
break;
default: default:
break; break;
} }

View File

@ -727,6 +727,8 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyNode(pStmt->pFromTable); nodesDestroyNode(pStmt->pFromTable);
nodesDestroyNode(pStmt->pWhere); nodesDestroyNode(pStmt->pWhere);
nodesDestroyNode(pStmt->pCountFunc); nodesDestroyNode(pStmt->pCountFunc);
nodesDestroyNode(pStmt->pFirstFunc);
nodesDestroyNode(pStmt->pLastFunc);
nodesDestroyNode(pStmt->pTagCond); nodesDestroyNode(pStmt->pTagCond);
break; break;
} }
@ -791,6 +793,8 @@ void nodesDestroyNode(SNode* pNode) {
destroyVgDataBlockArray(pLogicNode->pDataBlocks); destroyVgDataBlockArray(pLogicNode->pDataBlocks);
// pVgDataBlocks is weak reference // pVgDataBlocks is weak reference
nodesDestroyNode(pLogicNode->pAffectedRows); nodesDestroyNode(pLogicNode->pAffectedRows);
nodesDestroyNode(pLogicNode->pStartTs);
nodesDestroyNode(pLogicNode->pEndTs);
taosMemoryFreeClear(pLogicNode->pVgroupList); taosMemoryFreeClear(pLogicNode->pVgroupList);
nodesDestroyList(pLogicNode->pInsertCols); nodesDestroyList(pLogicNode->pInsertCols);
break; break;
@ -997,6 +1001,8 @@ void nodesDestroyNode(SNode* pNode) {
SDataDeleterNode* pSink = (SDataDeleterNode*)pNode; SDataDeleterNode* pSink = (SDataDeleterNode*)pNode;
destroyDataSinkNode((SDataSinkNode*)pSink); destroyDataSinkNode((SDataSinkNode*)pSink);
nodesDestroyNode(pSink->pAffectedRows); nodesDestroyNode(pSink->pAffectedRows);
nodesDestroyNode(pSink->pStartTs);
nodesDestroyNode(pSink->pEndTs);
break; break;
} }
case QUERY_NODE_PHYSICAL_SUBPLAN: { case QUERY_NODE_PHYSICAL_SUBPLAN: {

View File

@ -1787,10 +1787,10 @@ SNode* createRevokeStmt(SAstCreateContext* pCxt, int64_t privileges, SToken* pDb
return (SNode*)pStmt; return (SNode*)pStmt;
} }
SNode* createCountFuncForDelete(SAstCreateContext* pCxt) { SNode* createFuncForDelete(SAstCreateContext* pCxt, const char* pFuncName) {
SFunctionNode* pFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION); SFunctionNode* pFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
CHECK_OUT_OF_MEM(pFunc); CHECK_OUT_OF_MEM(pFunc);
strcpy(pFunc->functionName, "count"); strcpy(pFunc->functionName, pFuncName);
if (TSDB_CODE_SUCCESS != nodesListMakeStrictAppend(&pFunc->pParameterList, createPrimaryKeyCol(pCxt, NULL))) { if (TSDB_CODE_SUCCESS != nodesListMakeStrictAppend(&pFunc->pParameterList, createPrimaryKeyCol(pCxt, NULL))) {
nodesDestroyNode((SNode*)pFunc); nodesDestroyNode((SNode*)pFunc);
CHECK_OUT_OF_MEM(NULL); CHECK_OUT_OF_MEM(NULL);
@ -1804,8 +1804,10 @@ SNode* createDeleteStmt(SAstCreateContext* pCxt, SNode* pTable, SNode* pWhere) {
CHECK_OUT_OF_MEM(pStmt); CHECK_OUT_OF_MEM(pStmt);
pStmt->pFromTable = pTable; pStmt->pFromTable = pTable;
pStmt->pWhere = pWhere; pStmt->pWhere = pWhere;
pStmt->pCountFunc = createCountFuncForDelete(pCxt); pStmt->pCountFunc = createFuncForDelete(pCxt, "count");
if (NULL == pStmt->pCountFunc) { pStmt->pFirstFunc = createFuncForDelete(pCxt, "first");
pStmt->pLastFunc = createFuncForDelete(pCxt, "last");
if (NULL == pStmt->pCountFunc || NULL == pStmt->pFirstFunc || NULL == pStmt->pLastFunc) {
nodesDestroyNode((SNode*)pStmt); nodesDestroyNode((SNode*)pStmt);
CHECK_OUT_OF_MEM(NULL); CHECK_OUT_OF_MEM(NULL);
} }

View File

@ -3347,10 +3347,16 @@ static int32_t translateDelete(STranslateContext* pCxt, SDeleteStmt* pDelete) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = translateDeleteWhere(pCxt, pDelete); code = translateDeleteWhere(pCxt, pDelete);
} }
if (TSDB_CODE_SUCCESS == code) {
pCxt->currClause = SQL_CLAUSE_SELECT; pCxt->currClause = SQL_CLAUSE_SELECT;
if (TSDB_CODE_SUCCESS == code) {
code = translateExpr(pCxt, &pDelete->pCountFunc); code = translateExpr(pCxt, &pDelete->pCountFunc);
} }
if (TSDB_CODE_SUCCESS == code) {
code = translateExpr(pCxt, &pDelete->pFirstFunc);
}
if (TSDB_CODE_SUCCESS == code) {
code = translateExpr(pCxt, &pDelete->pLastFunc);
}
return code; return code;
} }

View File

@ -1372,9 +1372,21 @@ static int32_t createDeleteAggLogicNode(SLogicPlanContext* pCxt, SDeleteStmt* pD
} }
int32_t code = nodesListMakeStrictAppend(&pAgg->pAggFuncs, nodesCloneNode(pDelete->pCountFunc)); int32_t code = nodesListMakeStrictAppend(&pAgg->pAggFuncs, nodesCloneNode(pDelete->pCountFunc));
if (TSDB_CODE_SUCCESS == code) {
code = nodesListStrictAppend(pAgg->pAggFuncs, nodesCloneNode(pDelete->pFirstFunc));
}
if (TSDB_CODE_SUCCESS == code) {
code = nodesListStrictAppend(pAgg->pAggFuncs, nodesCloneNode(pDelete->pLastFunc));
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = rewriteExpr(pAgg->pAggFuncs, &pDelete->pCountFunc); code = rewriteExpr(pAgg->pAggFuncs, &pDelete->pCountFunc);
} }
if (TSDB_CODE_SUCCESS == code) {
code = rewriteExpr(pAgg->pAggFuncs, &pDelete->pFirstFunc);
}
if (TSDB_CODE_SUCCESS == code) {
code = rewriteExpr(pAgg->pAggFuncs, &pDelete->pLastFunc);
}
// set the output // set the output
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = createColumnByRewriteExprs(pAgg->pAggFuncs, &pAgg->node.pTargets); code = createColumnByRewriteExprs(pAgg->pAggFuncs, &pAgg->node.pTargets);
@ -1405,7 +1417,9 @@ static int32_t createVnodeModifLogicNodeByDelete(SLogicPlanContext* pCxt, SDelet
strcpy(pModify->tsColName, pRealTable->pMeta->schema->name); strcpy(pModify->tsColName, pRealTable->pMeta->schema->name);
pModify->deleteTimeRange = pDelete->timeRange; pModify->deleteTimeRange = pDelete->timeRange;
pModify->pAffectedRows = nodesCloneNode(pDelete->pCountFunc); pModify->pAffectedRows = nodesCloneNode(pDelete->pCountFunc);
if (NULL == pModify->pAffectedRows) { pModify->pStartTs = nodesCloneNode(pDelete->pFirstFunc);
pModify->pEndTs = nodesCloneNode(pDelete->pLastFunc);
if (NULL == pModify->pAffectedRows || NULL == pModify->pStartTs || NULL == pModify->pEndTs) {
nodesDestroyNode((SNode*)pModify); nodesDestroyNode((SNode*)pModify);
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }

View File

@ -1323,8 +1323,8 @@ static int32_t createSortPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren
static int32_t createPartitionPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, static int32_t createPartitionPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
SPartitionLogicNode* pPartLogicNode, SPhysiNode** pPhyNode) { SPartitionLogicNode* pPartLogicNode, SPhysiNode** pPhyNode) {
SPartitionPhysiNode* pPart = SPartitionPhysiNode* pPart = (SPartitionPhysiNode*)makePhysiNode(
(SPartitionPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pPartLogicNode, pCxt, (SLogicNode*)pPartLogicNode,
pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION : QUERY_NODE_PHYSICAL_PLAN_PARTITION); pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION : QUERY_NODE_PHYSICAL_PLAN_PARTITION);
if (NULL == pPart) { if (NULL == pPart) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
@ -1670,6 +1670,12 @@ static int32_t createDataDeleter(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode*
int32_t code = setNodeSlotId(pCxt, pRoot->pOutputDataBlockDesc->dataBlockId, -1, pModify->pAffectedRows, int32_t code = setNodeSlotId(pCxt, pRoot->pOutputDataBlockDesc->dataBlockId, -1, pModify->pAffectedRows,
&pDeleter->pAffectedRows); &pDeleter->pAffectedRows);
if (TSDB_CODE_SUCCESS == code) {
code = setNodeSlotId(pCxt, pRoot->pOutputDataBlockDesc->dataBlockId, -1, pModify->pStartTs, &pDeleter->pStartTs);
}
if (TSDB_CODE_SUCCESS == code) {
code = setNodeSlotId(pCxt, pRoot->pOutputDataBlockDesc->dataBlockId, -1, pModify->pEndTs, &pDeleter->pEndTs);
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
pDeleter->sink.pInputDataBlockDesc = (SDataBlockDescNode*)nodesCloneNode((SNode*)pRoot->pOutputDataBlockDesc); pDeleter->sink.pInputDataBlockDesc = (SDataBlockDescNode*)nodesCloneNode((SNode*)pRoot->pOutputDataBlockDesc);
if (NULL == pDeleter->sink.pInputDataBlockDesc) { if (NULL == pDeleter->sink.pInputDataBlockDesc) {