diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 73d1e08f37..531a89b3f7 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -451,6 +451,7 @@ typedef struct SStreamScanInfo { SExprInfo* pPseudoExpr; int32_t numOfPseudoExpr; SExprSupp tbnameCalSup; + SExprSupp* pPartTbnameSup; SExprSupp tagCalSup; int32_t primaryTsIndex; // primary time stamp slot id int32_t primaryKeyIndex; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 87b8cd366c..670771adcc 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -1255,7 +1255,7 @@ static void destroyStreamPartitionOperatorInfo(void* param) { taosMemoryFreeClear(param); } -void initParDownStream(SOperatorInfo* downstream, SPartitionBySupporter* pParSup, SExprSupp* pExpr) { +void initParDownStream(SOperatorInfo* downstream, SPartitionBySupporter* pParSup, SExprSupp* pExpr, SExprSupp* pTbnameExpr) { SStorageAPI* pAPI = &downstream->pTaskInfo->storageAPI; if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { @@ -1265,6 +1265,7 @@ void initParDownStream(SOperatorInfo* downstream, SPartitionBySupporter* pParSup SStreamScanInfo* pScanInfo = downstream->info; pScanInfo->partitionSup = *pParSup; pScanInfo->pPartScalarSup = pExpr; + pScanInfo->pPartTbnameSup = pTbnameExpr; if (!pScanInfo->pUpdateInfo) { pScanInfo->pUpdateInfo = pAPI->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, 0, pScanInfo->igCheckUpdate); } @@ -1408,7 +1409,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr destroyStreamPartitionOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); setOperatorStreamStateFn(pOperator, streamOpReleaseState, streamOpReloadState); - initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup); + initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup, &pInfo->tbnameCalSup); code = appendDownstream(pOperator, &downstream, 1); return pOperator; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 8c04095943..7062b0dd27 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1783,28 +1783,57 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS return TSDB_CODE_SUCCESS; } -static void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) { - SExprSupp* pTbNameCalSup = &pInfo->tbnameCalSup; +static void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t rowId) { blockDataCleanup(pInfo->pCreateTbRes); if (pInfo->tbnameCalSup.numOfExprs == 0 && pInfo->tagCalSup.numOfExprs == 0) { pBlock->info.parTbName[0] = 0; } else { appendCreateTableRow(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, &pInfo->tbnameCalSup, &pInfo->tagCalSup, - pBlock->info.id.groupId, pBlock, 0, pInfo->pCreateTbRes, &pInfo->stateStore); + pBlock->info.id.groupId, pBlock, rowId, pInfo->pCreateTbRes, &pInfo->stateStore); } } -static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) { - blockDataCleanup(pDestBlock); - int32_t rows = pSrcBlock->info.rows; - if (rows == 0) { - return TSDB_CODE_SUCCESS; - } - int32_t code = blockDataEnsureCapacity(pDestBlock, rows); - if (code != TSDB_CODE_SUCCESS) { - return code; - } +static int32_t generatePartitionDelResBlock(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) { + SColumnInfoData* pSrcStartTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX); + SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX); + SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX); + uint64_t* srcUidData = (uint64_t*)pSrcUidCol->pData; + SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX); + uint64_t* srcGp = (uint64_t*)pSrcGpCol->pData; + SColumnInfoData* pSrcPkCol = taosArrayGet(pSrcBlock->pDataBlock, PRIMARY_KEY_COLUMN_INDEX); + ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP); + TSKEY* srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData; + TSKEY* srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData; + int64_t ver = pSrcBlock->info.version - 1; + for (int32_t delI = 0; delI < pSrcBlock->info.rows; delI++) { + uint64_t groupId = 0; + uint64_t srcUid = srcUidData[delI]; + char tbname[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN] = {0}; + SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, srcUid, srcStartTsCol[delI], srcEndTsCol[delI], ver); + blockDataEnsureCapacity(pDestBlock, pDestBlock->info.rows + pPreRes->info.rows); + for (int32_t preJ = 0; preJ < pPreRes->info.rows; preJ++) { + groupId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pPreRes, preJ); + if (pInfo->pPartTbnameSup) { + void* parTbname = NULL; + int32_t code = pInfo->stateStore.streamStateGetParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, groupId, &parTbname); + if (code != TSDB_CODE_SUCCESS) { + calBlockTbName(pInfo, pPreRes, preJ); + memcpy(varDataVal(tbname), pPreRes->info.parTbName, strlen(pPreRes->info.parTbName)); + } else { + memcpy(varDataVal(tbname), parTbname, TSDB_TABLE_NAME_LEN); + } + varDataSetLen(tbname, strlen(varDataVal(tbname))); + pInfo->stateStore.streamStateFreeVal(parTbname); + } + appendDataToSpecialBlock(pDestBlock, srcStartTsCol + delI, srcEndTsCol + delI, srcUidData + delI, &groupId, + tbname[0] == 0 ? NULL : tbname); + } + } + return TSDB_CODE_SUCCESS; +} + +static int32_t generateDeleteResultBlockImpl(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) { SColumnInfoData* pSrcStartTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX); SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX); @@ -1830,12 +1859,11 @@ static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pS } if (pInfo->tbnameCalSup.pExprInfo) { void* parTbname = NULL; - code = pInfo->stateStore.streamStateGetParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, groupId, &parTbname); + int32_t code = pInfo->stateStore.streamStateGetParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, groupId, &parTbname); if (code != TSDB_CODE_SUCCESS) { - // todo(liuyao) 这里可能需要修改,需要考虑复合主键时,pPreRes包含多行数据。 SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, srcUid, srcStartTsCol[i], srcStartTsCol[i], ver); printDataBlock(pPreRes, "pre res", GET_TASKID(pInfo->pStreamScanOp->pTaskInfo)); - calBlockTbName(pInfo, pPreRes); + calBlockTbName(pInfo, pPreRes, 0); memcpy(varDataVal(tbname), pPreRes->info.parTbName, strlen(pPreRes->info.parTbName)); } else { memcpy(varDataVal(tbname), parTbname, TSDB_TABLE_NAME_LEN); @@ -1849,6 +1877,22 @@ static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pS return TSDB_CODE_SUCCESS; } +static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) { + blockDataCleanup(pDestBlock); + int32_t rows = pSrcBlock->info.rows; + if (rows == 0) { + return TSDB_CODE_SUCCESS; + } + int32_t code = blockDataEnsureCapacity(pDestBlock, rows); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + if (pInfo->partitionSup.needCalc) { + return generatePartitionDelResBlock(pInfo, pSrcBlock, pDestBlock); + } + return generateDeleteResultBlockImpl(pInfo, pSrcBlock, pDestBlock); +} + static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock, EStreamType type) { int32_t code = TSDB_CODE_SUCCESS; if (isIntervalWindow(pInfo)) { @@ -2120,7 +2164,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock return 0; } - calBlockTbName(pInfo, pInfo->pRes); + calBlockTbName(pInfo, pInfo->pRes, 0); return 0; } @@ -2380,7 +2424,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { pInfo->pRecoverRes = doTableScan(pInfo->pTableScanOp); if (pInfo->pRecoverRes != NULL) { - calBlockTbName(pInfo, pInfo->pRecoverRes); + calBlockTbName(pInfo, pInfo->pRecoverRes, 0); if (!pInfo->igCheckUpdate && pInfo->pUpdateInfo) { TSKEY maxTs = pAPI->stateStore.updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); @@ -2469,11 +2513,11 @@ FETCH_NEXT_BLOCK: if (!isStreamWindow(pInfo)) { generateDeleteResultBlock(pInfo, pDelBlock, pInfo->pDeleteDataRes); - pInfo->pDeleteDataRes->info.type = STREAM_DELETE_RESULT; - printSpecDataBlock(pDelBlock, getStreamOpName(pOperator->operatorType), "delete result", GET_TASKID(pTaskInfo)); + pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA; blockDataDestroy(pDelBlock); if (pInfo->pDeleteDataRes->info.rows > 0) { + printSpecDataBlock(pInfo->pDeleteDataRes, getStreamOpName(pOperator->operatorType), "delete result", GET_TASKID(pTaskInfo)); return pInfo->pDeleteDataRes; } else { goto FETCH_NEXT_BLOCK; @@ -2485,12 +2529,12 @@ FETCH_NEXT_BLOCK: prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes); pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA; - printSpecDataBlock(pDelBlock, getStreamOpName(pOperator->operatorType), "delete result", GET_TASKID(pTaskInfo)); if (pInfo->tqReader) { blockDataDestroy(pDelBlock); } if (pInfo->pDeleteDataRes->info.rows > 0) { pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; + printSpecDataBlock(pInfo->pDeleteDataRes, getStreamOpName(pOperator->operatorType), "delete result", GET_TASKID(pTaskInfo)); return pInfo->pDeleteDataRes; } else { goto FETCH_NEXT_BLOCK; @@ -2542,7 +2586,7 @@ FETCH_NEXT_BLOCK: checkUpdateData(pInfo, true, pSDB, false); } printSpecDataBlock(pSDB, getStreamOpName(pOperator->operatorType), "update", GET_TASKID(pTaskInfo)); - calBlockTbName(pInfo, pSDB); + calBlockTbName(pInfo, pSDB, 0); return pSDB; } blockDataCleanup(pInfo->pUpdateDataRes); @@ -2940,6 +2984,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys } } + pInfo->pPartTbnameSup = NULL; if (pTableScanNode->pSubtable != NULL) { SExprInfo* pSubTableExpr = taosMemoryCalloc(1, sizeof(SExprInfo)); if (pSubTableExpr == NULL) { diff --git a/tests/script/tsim/stream/deleteScalar.sim b/tests/script/tsim/stream/deleteScalar.sim new file mode 100644 index 0000000000..c489d1d0d8 --- /dev/null +++ b/tests/script/tsim/stream/deleteScalar.sim @@ -0,0 +1,175 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sleep 1000 +sql connect + +sql drop database if exists test; +sql create database test vgroups 1; +sql use test; +sql create table t1(ts timestamp, a int, b int , c int, d double); +sql create stream streams0 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select ts, a, b from t1 partition by a; + +sleep 1000 + +sql insert into t1 values(1648791213000,0,2,3,1.0); +sql insert into t1 values(1648791213001,1,2,3,1.0); +sql insert into t1 values(1648791213002,2,2,3,1.0); + +sql insert into t1 values(1648791213003,0,2,3,1.0); +sql insert into t1 values(1648791213004,1,2,3,1.0); +sql insert into t1 values(1648791213005,2,2,3,1.0); + +$loop_count = 0 + +loop0: +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select * from streamt; + +print $data00 $data01 $data02 +print $data10 $data11 $data12 +print $data20 $data21 $data22 +print $data30 $data31 $data32 +print $data40 $data41 $data42 +print $data50 $data51 $data52 +print $data60 $data61 $data62 +print $data70 $data71 $data72 + +if $rows != 6 then + print ======rows=$rows + goto loop0 +endi + +print delete from t1 where ts <= 1648791213002; +sql delete from t1 where ts <= 1648791213002; + +$loop_count = 0 + +loop1: +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select * from streamt order by 1; + +print $data00 $data01 $data02 +print $data10 $data11 $data12 +print $data20 $data21 $data22 +print $data30 $data31 $data32 + +if $rows != 3 then + print ======rows=$rows + goto loop1 +endi + +if $data01 != 0 then + print ======data01=$data01 + goto loop1 +endi + +if $data11 != 1 then + print ======data11=$data11 + goto loop1 +endi + +if $data21 != 2 then + print ======data21=$data21 + goto loop1 +endi + +print ======================step 2 + +sql drop database if exists test1; +sql create database test1 vgroups 1; +sql use test1; +sql create table t1(ts timestamp, a int, b int , c int, d double); +sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt1 subtable(concat("aaa-", cast( a as varchar(10) ))) as select ts, a, b from t1 partition by a; + +sleep 1000 + +sql insert into t1 values(1648791213000,0,2,3,1.0); +sql insert into t1 values(1648791213001,1,2,3,1.0); +sql insert into t1 values(1648791213002,2,2,3,1.0); + +sql insert into t1 values(1648791213003,0,2,3,1.0); +sql insert into t1 values(1648791213004,1,2,3,1.0); +sql insert into t1 values(1648791213005,2,2,3,1.0); + +$loop_count = 0 + +loop2: +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select * from streamt1; + +print $data00 $data01 $data02 +print $data10 $data11 $data12 +print $data20 $data21 $data22 +print $data30 $data31 $data32 +print $data40 $data41 $data42 +print $data50 $data51 $data52 +print $data60 $data61 $data62 +print $data70 $data71 $data72 + +if $rows != 6 then + print ======rows=$rows + goto loop2 +endi + +print delete from t1 where ts <= 1648791213002; +sql delete from t1 where ts <= 1648791213002; + +$loop_count = 0 + +loop3: +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select * from streamt1 order by 1; + +print $data00 $data01 $data02 +print $data10 $data11 $data12 +print $data20 $data21 $data22 +print $data30 $data31 $data32 + +if $rows != 3 then + print ======rows=$rows + goto loop3 +endi + +if $data01 != 0 then + print ======data01=$data01 + goto loop3 +endi + +if $data11 != 1 then + print ======data11=$data11 + goto loop3 +endi + +if $data21 != 2 then + print ======data21=$data21 + goto loop3 +endi + +system sh/stop_dnodes.sh + +#goto looptest \ No newline at end of file diff --git a/tests/script/tsim/stream/streamPrimaryKey3.sim b/tests/script/tsim/stream/streamPrimaryKey3.sim index 39d55bda6c..319fa59da6 100644 --- a/tests/script/tsim/stream/streamPrimaryKey3.sim +++ b/tests/script/tsim/stream/streamPrimaryKey3.sim @@ -70,5 +70,36 @@ if $data12 != 4 then goto loop0 endi +sql insert into t1 values(1648791210001,3,1,1,2.0); + +print sql delete from t1 where ts = 1648791210000; +sql delete from t1 where ts = 1648791210000; + +$loop_count = 0 + +loop1: + +sleep 200 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 1 select * from streamt1 order by 1,2 +sql select * from streamt1 order by 1,2; + +print $data00 $data01 $data02 +print $data10 $data11 $data12 +print $data20 $data21 $data22 + +if $rows != 1 then + print =====rows=$rows + goto loop1 +endi + +if $data01 != 3 then + return -1 +endi system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file