diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 8d93cbb001..fefc1d6b79 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1595,6 +1595,39 @@ static int32_t getPreSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, return code; } +void appendOneRowToSpecialBlockImpl(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, TSKEY* pCalStartTs, + TSKEY* pCalEndTs, uint64_t* pUid, uint64_t* pGp, void* pTbName, void* pPkData) { + SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); + SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); + SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX); + SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); + SColumnInfoData* pCalStartCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX); + SColumnInfoData* pCalEndCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX); + SColumnInfoData* pTableCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX); + colDataSetVal(pStartTsCol, pBlock->info.rows, (const char*)pStartTs, false); + colDataSetVal(pEndTsCol, pBlock->info.rows, (const char*)pEndTs, false); + colDataSetVal(pUidCol, pBlock->info.rows, (const char*)pUid, false); + colDataSetVal(pGpCol, pBlock->info.rows, (const char*)pGp, false); + colDataSetVal(pCalStartCol, pBlock->info.rows, (const char*)pCalStartTs, false); + colDataSetVal(pCalEndCol, pBlock->info.rows, (const char*)pCalEndTs, false); + colDataSetVal(pTableCol, pBlock->info.rows, (const char*)pTbName, pTbName == NULL); + if (taosArrayGetSize(pBlock->pDataBlock) > PRIMARY_KEY_COLUMN_INDEX) { + SColumnInfoData* pPkCol = taosArrayGet(pBlock->pDataBlock, PRIMARY_KEY_COLUMN_INDEX); + colDataSetVal(pPkCol, pBlock->info.rows, (const char*)pPkData, pPkData == NULL); + } + pBlock->info.rows++; +} + +void appendPkToSpecialBlock(SSDataBlock* pBlock, TSKEY* pTsArray, SColumnInfoData* pPkCol, int32_t rowId, + uint64_t* pUid, uint64_t* pGp, void* pTbName) { + void* pVal = NULL; + if (pPkCol) { + pVal = colDataGetData(pPkCol, rowId); + } + appendOneRowToSpecialBlockImpl(pBlock, pTsArray + rowId, pTsArray + rowId, pTsArray + rowId, pTsArray + rowId, pUid, + pGp, pTbName, pVal); +} + static void getPreVersionDataBlock(uint64_t uid, TSKEY startTs, TSKEY endTs, int64_t version, char* taskIdStr, SStreamScanInfo* pInfo, SSDataBlock* pBlock) { SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, uid, startTs, endTs, version); @@ -1606,9 +1639,13 @@ static void getPreVersionDataBlock(uint64_t uid, TSKEY startTs, TSKEY endTs, int } SColumnInfoData* pTsCol = (SColumnInfoData*)taosArrayGet(pPreRes->pDataBlock, pInfo->primaryTsIndex); + SColumnInfoData* pPkCol = NULL; + if (hasPrimaryKey(pInfo)) { + pPkCol = (SColumnInfoData*)taosArrayGet(pPreRes->pDataBlock, pInfo->primaryKeyIndex); + } for (int32_t i = 0; i < pPreRes->info.rows; i++) { uint64_t groupId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pPreRes, i); - appendDataToSpecialBlock(pBlock, ((TSKEY*)pTsCol->pData) + i, ((TSKEY*)pTsCol->pData) + i, &uid, &groupId, NULL); + appendPkToSpecialBlock(pBlock, (TSKEY*)pTsCol->pData, pPkCol, i, &uid, &groupId, NULL); } printDataBlock(pBlock, "new delete", taskIdStr); } @@ -1624,7 +1661,9 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr TSKEY* endData = (TSKEY*)pEndTsCol->pData; SColumnInfoData* pUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX); uint64_t* uidCol = (uint64_t*)pUidCol->pData; + SColumnInfoData* pGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX); SColumnInfoData* pSrcPkCol = NULL; + uint64_t* pSrcGp = (uint64_t*)pGpCol->pData; if (taosArrayGetSize(pSrcBlock->pDataBlock) > PRIMARY_KEY_COLUMN_INDEX) { pSrcPkCol = taosArrayGet(pSrcBlock->pDataBlock, PRIMARY_KEY_COLUMN_INDEX); } @@ -1635,6 +1674,7 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr startData = (TSKEY*)pStartTsCol->pData; endData = (TSKEY*)pEndTsCol->pData; uidCol = (uint64_t*)pUidCol->pData; + pSrcGp = (uint64_t*)pGpCol->pData; } blockDataCleanup(pDestBlock); int32_t code = blockDataEnsureCapacity(pDestBlock, pSrcBlock->info.rows); @@ -1649,11 +1689,14 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr SColumnInfoData* pDestCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX); SColumnInfoData* pDestCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX); for (int32_t i = 0; i < pSrcBlock->info.rows; i++) { - void* pVal = NULL; - if (hasPrimaryKey(pInfo)) { - pVal = colDataGetData(pSrcPkCol, i); + uint64_t groupId = pSrcGp[i]; + if (groupId == 0) { + void* pVal = NULL; + if (hasPrimaryKey(pInfo)) { + pVal = colDataGetData(pSrcPkCol, i); + } + groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], ver, pVal); } - uint64_t groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], ver, pVal); // gap must be 0. SSessionKey startWin = {0}; getCurSessionWindow(pInfo->windowSup.pStreamAggSup, startData[i], startData[i], groupId, &startWin); @@ -1695,6 +1738,8 @@ static int32_t generateCountScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcB TSKEY* endData = (TSKEY*)pEndTsCol->pData; SColumnInfoData* pUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX); uint64_t* uidCol = (uint64_t*)pUidCol->pData; + SColumnInfoData* pGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX); + uint64_t* pSrcGp = (uint64_t*)pGpCol->pData; SColumnInfoData* pSrcPkCol = NULL; if (taosArrayGetSize(pSrcBlock->pDataBlock) > PRIMARY_KEY_COLUMN_INDEX ) { pSrcPkCol = taosArrayGet(pSrcBlock->pDataBlock, PRIMARY_KEY_COLUMN_INDEX); @@ -1706,6 +1751,7 @@ static int32_t generateCountScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcB startData = (TSKEY*)pStartTsCol->pData; endData = (TSKEY*)pEndTsCol->pData; uidCol = (uint64_t*)pUidCol->pData; + pSrcGp = (uint64_t*)pGpCol->pData; } int32_t code = blockDataEnsureCapacity(pDestBlock, pSrcBlock->info.rows); @@ -1720,11 +1766,14 @@ static int32_t generateCountScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcB SColumnInfoData* pDestCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX); SColumnInfoData* pDestCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX); for (int32_t i = 0; i < pSrcBlock->info.rows; i++) { - void* pVal = NULL; - if (hasPrimaryKey(pInfo)) { - pVal = colDataGetData(pSrcPkCol, i); + uint64_t groupId = pSrcGp[i]; + if (groupId == 0) { + void* pVal = NULL; + if (hasPrimaryKey(pInfo)) { + pVal = colDataGetData(pSrcPkCol, i); + } + groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], ver, pVal); } - uint64_t groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], ver, pVal); SSessionKey startWin = {.win.skey = startData[i], .win.ekey = endData[i], .groupId = groupId}; SSessionKey range = {0}; getCountWinRange(pInfo->windowSup.pStreamAggSup, &startWin, mode, &range); @@ -1939,39 +1988,6 @@ static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, return code; } -void appendOneRowToSpecialBlockImpl(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, TSKEY* pCalStartTs, - TSKEY* pCalEndTs, uint64_t* pUid, uint64_t* pGp, void* pTbName, void* pPkData) { - SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); - SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); - SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX); - SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); - SColumnInfoData* pCalStartCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX); - SColumnInfoData* pCalEndCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX); - SColumnInfoData* pTableCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX); - colDataSetVal(pStartTsCol, pBlock->info.rows, (const char*)pStartTs, false); - colDataSetVal(pEndTsCol, pBlock->info.rows, (const char*)pEndTs, false); - colDataSetVal(pUidCol, pBlock->info.rows, (const char*)pUid, false); - colDataSetVal(pGpCol, pBlock->info.rows, (const char*)pGp, false); - colDataSetVal(pCalStartCol, pBlock->info.rows, (const char*)pCalStartTs, false); - colDataSetVal(pCalEndCol, pBlock->info.rows, (const char*)pCalEndTs, false); - colDataSetVal(pTableCol, pBlock->info.rows, (const char*)pTbName, pTbName == NULL); - if (taosArrayGetSize(pBlock->pDataBlock) > PRIMARY_KEY_COLUMN_INDEX) { - SColumnInfoData* pPkCol = taosArrayGet(pBlock->pDataBlock, PRIMARY_KEY_COLUMN_INDEX); - colDataSetVal(pPkCol, pBlock->info.rows, (const char*)pPkData, pPkData == NULL); - } - pBlock->info.rows++; -} - -void appendPkToSpecialBlock(SSDataBlock* pBlock, TSKEY* pTsArray, SColumnInfoData* pPkCol, int32_t rowId, - uint64_t* pUid, uint64_t* pGp, void* pTbName) { - void* pVal = NULL; - if (pPkCol) { - pVal = colDataGetData(pPkCol, rowId); - } - appendOneRowToSpecialBlockImpl(pBlock, pTsArray + rowId, pTsArray + rowId, pTsArray + rowId, pTsArray + rowId, pUid, - pGp, pTbName, pVal); -} - void appendDataToSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid, uint64_t* pGp, void* pTbName) { appendOneRowToSpecialBlockImpl(pBlock, pStartTs, pEndTs, pStartTs, pEndTs, pUid, pGp, pTbName, NULL); diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 48ee9137f6..89926b1041 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -1262,6 +1262,7 @@ ,,y,script,./test.sh -f tsim/stream/streamPrimaryKey0.sim ,,y,script,./test.sh -f tsim/stream/streamPrimaryKey1.sim ,,y,script,./test.sh -f tsim/stream/streamPrimaryKey2.sim +,,y,script,./test.sh -f tsim/stream/streamPrimaryKey3.sim ,,y,script,./test.sh -f tsim/stream/triggerInterval0.sim ,,y,script,./test.sh -f tsim/stream/triggerSession0.sim ,,y,script,./test.sh -f tsim/stream/udTableAndCol0.sim diff --git a/tests/script/tsim/stream/streamPrimaryKey3.sim b/tests/script/tsim/stream/streamPrimaryKey3.sim index 16b05d02ae..73038b6732 100644 --- a/tests/script/tsim/stream/streamPrimaryKey3.sim +++ b/tests/script/tsim/stream/streamPrimaryKey3.sim @@ -238,7 +238,57 @@ sql insert into t1 values(1648791210000,1,3,3,1.0); $loop_count = 0 -loop2: +loop6: + +sleep 200 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 1 select * from streamt3 order by 1,2 +sql select * from streamt3 order by 1,2; + +print $data00 $data01 $data02 +print $data10 $data11 $data12 +print $data20 $data21 $data22 +print $data30 $data31 $data32 + +if $rows != 2 then + print =====rows=$rows + goto loop6 +endi + +if $data01 != 3 then + print =====data01=$data01 + goto loop6 +endi + +if $data02 != 1 then + print =====data02=$data02 + goto loop6 +endi + +if $data11 != 4 then + print =====data11=$data11 + goto loop6 +endi + +if $data12 != 1 then + print =====data12=$data12 + goto loop6 +endi + +sql insert into t1 values(1648791210000,3,5,3,1.0); + +sql insert into t1 values(1648791210001,1,3,3,1.0); +sql insert into t1 values(1648791210001,2,4,3,1.0); +sql insert into t1 values(1648791210001,3,5,3,1.0); + +$loop_count = 0 + +loop7: sleep 200 @@ -254,29 +304,65 @@ print $data00 $data01 $data02 print $data10 $data11 $data12 print $data20 $data21 $data22 -if $rows != 2 then +if $rows != 3 then print =====rows=$rows - goto loop2 + goto loop7 endi -if $data01 != 3 then - print =====data01=$data01 - goto loop2 +if $data02 != 2 then + print =====data02=$data02 + goto loop7 +endi + +if $data12 != 2 then + print =====data12=$data12 + goto loop7 +endi + +if $data22 != 2 then + print =====data22=$data22 + goto loop7 +endi + +print delete from t1 where ts = 1648791210000; +sql delete from t1 where ts = 1648791210000; + +$loop_count = 0 + +loop8: + +sleep 200 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 1 select * from streamt3 order by 1,2 +sql select * from streamt3 order by 1,2; + +print $data00 $data01 $data02 +print $data10 $data11 $data12 +print $data20 $data21 $data22 + +if $rows != 3 then + print =====rows=$rows + goto loop8 endi if $data02 != 1 then print =====data02=$data02 - goto loop2 -endi - -if $data11 != 4 then - print =====data11=$data11 - goto loop2 + goto loop8 endi if $data12 != 1 then print =====data12=$data12 - goto loop2 + goto loop8 +endi + +if $data22 != 1 then + print =====data22=$data22 + goto loop8 endi system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file