This commit is contained in:
54liuyao 2024-04-01 16:20:45 +08:00
parent 1405b30677
commit a65ce818de
3 changed files with 158 additions and 55 deletions

View File

@ -1595,6 +1595,39 @@ static int32_t getPreSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs,
return code;
}
void appendOneRowToSpecialBlockImpl(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, TSKEY* pCalStartTs,
TSKEY* pCalEndTs, uint64_t* pUid, uint64_t* pGp, void* pTbName, void* pPkData) {
SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
SColumnInfoData* pCalStartCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
SColumnInfoData* pCalEndCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
SColumnInfoData* pTableCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
colDataSetVal(pStartTsCol, pBlock->info.rows, (const char*)pStartTs, false);
colDataSetVal(pEndTsCol, pBlock->info.rows, (const char*)pEndTs, false);
colDataSetVal(pUidCol, pBlock->info.rows, (const char*)pUid, false);
colDataSetVal(pGpCol, pBlock->info.rows, (const char*)pGp, false);
colDataSetVal(pCalStartCol, pBlock->info.rows, (const char*)pCalStartTs, false);
colDataSetVal(pCalEndCol, pBlock->info.rows, (const char*)pCalEndTs, false);
colDataSetVal(pTableCol, pBlock->info.rows, (const char*)pTbName, pTbName == NULL);
if (taosArrayGetSize(pBlock->pDataBlock) > PRIMARY_KEY_COLUMN_INDEX) {
SColumnInfoData* pPkCol = taosArrayGet(pBlock->pDataBlock, PRIMARY_KEY_COLUMN_INDEX);
colDataSetVal(pPkCol, pBlock->info.rows, (const char*)pPkData, pPkData == NULL);
}
pBlock->info.rows++;
}
void appendPkToSpecialBlock(SSDataBlock* pBlock, TSKEY* pTsArray, SColumnInfoData* pPkCol, int32_t rowId,
uint64_t* pUid, uint64_t* pGp, void* pTbName) {
void* pVal = NULL;
if (pPkCol) {
pVal = colDataGetData(pPkCol, rowId);
}
appendOneRowToSpecialBlockImpl(pBlock, pTsArray + rowId, pTsArray + rowId, pTsArray + rowId, pTsArray + rowId, pUid,
pGp, pTbName, pVal);
}
static void getPreVersionDataBlock(uint64_t uid, TSKEY startTs, TSKEY endTs, int64_t version, char* taskIdStr,
SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, uid, startTs, endTs, version);
@ -1606,9 +1639,13 @@ static void getPreVersionDataBlock(uint64_t uid, TSKEY startTs, TSKEY endTs, int
}
SColumnInfoData* pTsCol = (SColumnInfoData*)taosArrayGet(pPreRes->pDataBlock, pInfo->primaryTsIndex);
SColumnInfoData* pPkCol = NULL;
if (hasPrimaryKey(pInfo)) {
pPkCol = (SColumnInfoData*)taosArrayGet(pPreRes->pDataBlock, pInfo->primaryKeyIndex);
}
for (int32_t i = 0; i < pPreRes->info.rows; i++) {
uint64_t groupId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pPreRes, i);
appendDataToSpecialBlock(pBlock, ((TSKEY*)pTsCol->pData) + i, ((TSKEY*)pTsCol->pData) + i, &uid, &groupId, NULL);
appendPkToSpecialBlock(pBlock, (TSKEY*)pTsCol->pData, pPkCol, i, &uid, &groupId, NULL);
}
printDataBlock(pBlock, "new delete", taskIdStr);
}
@ -1624,7 +1661,9 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr
TSKEY* endData = (TSKEY*)pEndTsCol->pData;
SColumnInfoData* pUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
uint64_t* uidCol = (uint64_t*)pUidCol->pData;
SColumnInfoData* pGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX);
SColumnInfoData* pSrcPkCol = NULL;
uint64_t* pSrcGp = (uint64_t*)pGpCol->pData;
if (taosArrayGetSize(pSrcBlock->pDataBlock) > PRIMARY_KEY_COLUMN_INDEX) {
pSrcPkCol = taosArrayGet(pSrcBlock->pDataBlock, PRIMARY_KEY_COLUMN_INDEX);
}
@ -1635,6 +1674,7 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr
startData = (TSKEY*)pStartTsCol->pData;
endData = (TSKEY*)pEndTsCol->pData;
uidCol = (uint64_t*)pUidCol->pData;
pSrcGp = (uint64_t*)pGpCol->pData;
}
blockDataCleanup(pDestBlock);
int32_t code = blockDataEnsureCapacity(pDestBlock, pSrcBlock->info.rows);
@ -1649,11 +1689,14 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr
SColumnInfoData* pDestCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
SColumnInfoData* pDestCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
uint64_t groupId = pSrcGp[i];
if (groupId == 0) {
void* pVal = NULL;
if (hasPrimaryKey(pInfo)) {
pVal = colDataGetData(pSrcPkCol, i);
}
uint64_t groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], ver, pVal);
groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], ver, pVal);
}
// gap must be 0.
SSessionKey startWin = {0};
getCurSessionWindow(pInfo->windowSup.pStreamAggSup, startData[i], startData[i], groupId, &startWin);
@ -1695,6 +1738,8 @@ static int32_t generateCountScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcB
TSKEY* endData = (TSKEY*)pEndTsCol->pData;
SColumnInfoData* pUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
uint64_t* uidCol = (uint64_t*)pUidCol->pData;
SColumnInfoData* pGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX);
uint64_t* pSrcGp = (uint64_t*)pGpCol->pData;
SColumnInfoData* pSrcPkCol = NULL;
if (taosArrayGetSize(pSrcBlock->pDataBlock) > PRIMARY_KEY_COLUMN_INDEX ) {
pSrcPkCol = taosArrayGet(pSrcBlock->pDataBlock, PRIMARY_KEY_COLUMN_INDEX);
@ -1706,6 +1751,7 @@ static int32_t generateCountScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcB
startData = (TSKEY*)pStartTsCol->pData;
endData = (TSKEY*)pEndTsCol->pData;
uidCol = (uint64_t*)pUidCol->pData;
pSrcGp = (uint64_t*)pGpCol->pData;
}
int32_t code = blockDataEnsureCapacity(pDestBlock, pSrcBlock->info.rows);
@ -1720,11 +1766,14 @@ static int32_t generateCountScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcB
SColumnInfoData* pDestCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
SColumnInfoData* pDestCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
uint64_t groupId = pSrcGp[i];
if (groupId == 0) {
void* pVal = NULL;
if (hasPrimaryKey(pInfo)) {
pVal = colDataGetData(pSrcPkCol, i);
}
uint64_t groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], ver, pVal);
groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], ver, pVal);
}
SSessionKey startWin = {.win.skey = startData[i], .win.ekey = endData[i], .groupId = groupId};
SSessionKey range = {0};
getCountWinRange(pInfo->windowSup.pStreamAggSup, &startWin, mode, &range);
@ -1939,39 +1988,6 @@ static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock,
return code;
}
void appendOneRowToSpecialBlockImpl(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, TSKEY* pCalStartTs,
TSKEY* pCalEndTs, uint64_t* pUid, uint64_t* pGp, void* pTbName, void* pPkData) {
SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
SColumnInfoData* pCalStartCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
SColumnInfoData* pCalEndCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
SColumnInfoData* pTableCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
colDataSetVal(pStartTsCol, pBlock->info.rows, (const char*)pStartTs, false);
colDataSetVal(pEndTsCol, pBlock->info.rows, (const char*)pEndTs, false);
colDataSetVal(pUidCol, pBlock->info.rows, (const char*)pUid, false);
colDataSetVal(pGpCol, pBlock->info.rows, (const char*)pGp, false);
colDataSetVal(pCalStartCol, pBlock->info.rows, (const char*)pCalStartTs, false);
colDataSetVal(pCalEndCol, pBlock->info.rows, (const char*)pCalEndTs, false);
colDataSetVal(pTableCol, pBlock->info.rows, (const char*)pTbName, pTbName == NULL);
if (taosArrayGetSize(pBlock->pDataBlock) > PRIMARY_KEY_COLUMN_INDEX) {
SColumnInfoData* pPkCol = taosArrayGet(pBlock->pDataBlock, PRIMARY_KEY_COLUMN_INDEX);
colDataSetVal(pPkCol, pBlock->info.rows, (const char*)pPkData, pPkData == NULL);
}
pBlock->info.rows++;
}
void appendPkToSpecialBlock(SSDataBlock* pBlock, TSKEY* pTsArray, SColumnInfoData* pPkCol, int32_t rowId,
uint64_t* pUid, uint64_t* pGp, void* pTbName) {
void* pVal = NULL;
if (pPkCol) {
pVal = colDataGetData(pPkCol, rowId);
}
appendOneRowToSpecialBlockImpl(pBlock, pTsArray + rowId, pTsArray + rowId, pTsArray + rowId, pTsArray + rowId, pUid,
pGp, pTbName, pVal);
}
void appendDataToSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid, uint64_t* pGp,
void* pTbName) {
appendOneRowToSpecialBlockImpl(pBlock, pStartTs, pEndTs, pStartTs, pEndTs, pUid, pGp, pTbName, NULL);

View File

@ -1262,6 +1262,7 @@
,,y,script,./test.sh -f tsim/stream/streamPrimaryKey0.sim
,,y,script,./test.sh -f tsim/stream/streamPrimaryKey1.sim
,,y,script,./test.sh -f tsim/stream/streamPrimaryKey2.sim
,,y,script,./test.sh -f tsim/stream/streamPrimaryKey3.sim
,,y,script,./test.sh -f tsim/stream/triggerInterval0.sim
,,y,script,./test.sh -f tsim/stream/triggerSession0.sim
,,y,script,./test.sh -f tsim/stream/udTableAndCol0.sim

View File

@ -238,7 +238,57 @@ sql insert into t1 values(1648791210000,1,3,3,1.0);
$loop_count = 0
loop2:
loop6:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
print 1 select * from streamt3 order by 1,2
sql select * from streamt3 order by 1,2;
print $data00 $data01 $data02
print $data10 $data11 $data12
print $data20 $data21 $data22
print $data30 $data31 $data32
if $rows != 2 then
print =====rows=$rows
goto loop6
endi
if $data01 != 3 then
print =====data01=$data01
goto loop6
endi
if $data02 != 1 then
print =====data02=$data02
goto loop6
endi
if $data11 != 4 then
print =====data11=$data11
goto loop6
endi
if $data12 != 1 then
print =====data12=$data12
goto loop6
endi
sql insert into t1 values(1648791210000,3,5,3,1.0);
sql insert into t1 values(1648791210001,1,3,3,1.0);
sql insert into t1 values(1648791210001,2,4,3,1.0);
sql insert into t1 values(1648791210001,3,5,3,1.0);
$loop_count = 0
loop7:
sleep 200
@ -254,29 +304,65 @@ print $data00 $data01 $data02
print $data10 $data11 $data12
print $data20 $data21 $data22
if $rows != 2 then
if $rows != 3 then
print =====rows=$rows
goto loop2
goto loop7
endi
if $data01 != 3 then
print =====data01=$data01
goto loop2
if $data02 != 2 then
print =====data02=$data02
goto loop7
endi
if $data12 != 2 then
print =====data12=$data12
goto loop7
endi
if $data22 != 2 then
print =====data22=$data22
goto loop7
endi
print delete from t1 where ts = 1648791210000;
sql delete from t1 where ts = 1648791210000;
$loop_count = 0
loop8:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
print 1 select * from streamt3 order by 1,2
sql select * from streamt3 order by 1,2;
print $data00 $data01 $data02
print $data10 $data11 $data12
print $data20 $data21 $data22
if $rows != 3 then
print =====rows=$rows
goto loop8
endi
if $data02 != 1 then
print =====data02=$data02
goto loop2
endi
if $data11 != 4 then
print =====data11=$data11
goto loop2
goto loop8
endi
if $data12 != 1 then
print =====data12=$data12
goto loop2
goto loop8
endi
if $data22 != 1 then
print =====data22=$data22
goto loop8
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT