diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index e4ace6b83a..6c52dbaed0 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -1555,7 +1555,7 @@ static void destroyStreamPartitionOperatorInfo(void* param) { } int32_t initParDownStream(SOperatorInfo* downstream, SPartitionBySupporter* pParSup, SExprSupp* pExpr, - SExprSupp* pTbnameExpr) { + SExprSupp* pTbnameExpr, SExprSupp* pResExprSupp, int32_t* pPkColIndex) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SStorageAPI* pAPI = &downstream->pTaskInfo->storageAPI; @@ -1568,6 +1568,11 @@ int32_t initParDownStream(SOperatorInfo* downstream, SPartitionBySupporter* pPar pScanInfo->partitionSup = *pParSup; pScanInfo->pPartScalarSup = pExpr; pScanInfo->pPartTbnameSup = pTbnameExpr; + for (int32_t j = 0; j < pResExprSupp->numOfExprs; j++) { + if (pScanInfo->primaryKeyIndex == pResExprSupp->pExprInfo[j].base.pParam[0].pCol->slotId) { + *pPkColIndex = j; + } + } if (!pScanInfo->pUpdateInfo) { code = pAPI->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, 0, pScanInfo->igCheckUpdate, pScanInfo->pkColType, pScanInfo->pkColLen, &pScanInfo->pUpdateInfo); @@ -1729,7 +1734,8 @@ int32_t createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPart optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); setOperatorStreamStateFn(pOperator, streamOpReleaseState, streamOpReloadState); - code = initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup, &pInfo->tbnameCalSup); + pInfo->basic.primaryPkIndex = -1; + code = initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup, &pInfo->tbnameCalSup, &pOperator->exprSupp, &pInfo->basic.primaryPkIndex); QUERY_CHECK_CODE(code, lino, _error); code = appendDownstream(pOperator, &downstream, 1); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index b6b5c5484e..233e039f49 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -4078,6 +4078,7 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* SDataType pkType = {0}; pInfo->primaryKeyIndex = -1; + pInfo->basic.primaryPkIndex = -1; int32_t numOfOutput = taosArrayGetSize(pInfo->matchInfo.pList); pColIds = taosArrayInit(numOfOutput, sizeof(int16_t)); QUERY_CHECK_NULL(pColIds, code, lino, _error, terrno); @@ -4095,6 +4096,7 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* } if (id->isPk) { pInfo->primaryKeyIndex = id->dstSlotId; + pInfo->basic.primaryPkIndex = id->dstSlotId; pkType = id->dataType; } } diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index b9484decdc..f6d6ae55db 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -541,14 +541,21 @@ void reloadFromDownStream(SOperatorInfo* downstream, SStreamIntervalOperatorInfo pInfo->pUpdateInfo = pScanInfo->pUpdateInfo; } -int32_t initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SStreamIntervalOperatorInfo* pInfo) { +bool hasSrcPrimaryKeyCol(SSteamOpBasicInfo* pInfo) { return pInfo->primaryPkIndex != -1; } + +int32_t initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SStreamIntervalOperatorInfo* pInfo, struct SSteamOpBasicInfo* pBasic) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SStateStore* pAPI = &downstream->pTaskInfo->storageAPI.stateStore; SExecTaskInfo* pTaskInfo = downstream->pTaskInfo; + if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION) { + SStreamPartitionOperatorInfo* pScanInfo = downstream->info; + pBasic->primaryPkIndex = pScanInfo->basic.primaryPkIndex; + } + if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { - return initIntervalDownStream(downstream->pDownstream[0], type, pInfo); + return initIntervalDownStream(downstream->pDownstream[0], type, pInfo, pBasic); } SStreamScanInfo* pScanInfo = downstream->info; @@ -564,7 +571,9 @@ int32_t initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SStream pScanInfo->twAggSup = pInfo->twAggSup; pScanInfo->pState = pInfo->pState; pInfo->pUpdateInfo = pScanInfo->pUpdateInfo; - pInfo->basic.primaryPkIndex = pScanInfo->primaryKeyIndex; + if (!hasSrcPrimaryKeyCol(pBasic)) { + pBasic->primaryPkIndex = pScanInfo->basic.primaryPkIndex; + } _end: if (code != TSDB_CODE_SUCCESS) { @@ -1013,8 +1022,6 @@ static int32_t getNextQualifiedFinalWindow(SInterval* pInterval, STimeWindow* pN return startPos; } -bool hasSrcPrimaryKeyCol(SSteamOpBasicInfo* pInfo) { return pInfo->primaryPkIndex != -1; } - static int32_t doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, uint64_t groupId, SSHashObj* pUpdatedMap, SSHashObj* pDeletedMap) { int32_t code = TSDB_CODE_SUCCESS; @@ -2036,7 +2043,7 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN setOperatorStreamStateFn(pOperator, streamIntervalReleaseState, streamIntervalReloadState); if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL || pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL) { - code = initIntervalDownStream(downstream, pPhyNode->type, pInfo); + code = initIntervalDownStream(downstream, pPhyNode->type, pInfo, &pInfo->basic); QUERY_CHECK_CODE(code, lino, _error); } code = appendDownstream(pOperator, &downstream, 1); @@ -2140,6 +2147,7 @@ int32_t initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION) { SStreamPartitionOperatorInfo* pScanInfo = downstream->info; pScanInfo->tsColIndex = tsColIndex; + pBasic->primaryPkIndex = pScanInfo->basic.primaryPkIndex; } if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { @@ -2157,7 +2165,9 @@ int32_t initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, } pScanInfo->twAggSup = *pTwSup; pAggSup->pUpdateInfo = pScanInfo->pUpdateInfo; - pBasic->primaryPkIndex = pScanInfo->primaryKeyIndex; + if (!hasSrcPrimaryKeyCol(pBasic)) { + pBasic->primaryPkIndex = pScanInfo->basic.primaryPkIndex; + } _end: if (code != TSDB_CODE_SUCCESS) { @@ -3218,7 +3228,7 @@ int32_t doStreamSessionEncodeOpState(void** buf, int32_t len, SOperatorInfo* pOp size_t keyLen = 0; int32_t iter = 0; while ((pIte = tSimpleHashIterate(pInfo->streamAggSup.pResultRows, pIte, &iter)) != NULL) { - void* key = taosHashGetKey(pIte, &keyLen); + void* key = tSimpleHashGetKey(pIte, &keyLen); tlen += encodeSSessionKey(buf, key); tlen += encodeSResultWindowInfo(buf, pIte, pInfo->streamAggSup.resultRowSize); } @@ -5377,7 +5387,7 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* taosMemoryFree(buff); } - code = initIntervalDownStream(downstream, pPhyNode->type, pInfo); + code = initIntervalDownStream(downstream, pPhyNode->type, pInfo, &pInfo->basic); QUERY_CHECK_CODE(code, lino, _error); code = appendDownstream(pOperator, &downstream, 1); diff --git a/tests/script/tsim/stream/streamPrimaryKey0.sim b/tests/script/tsim/stream/streamPrimaryKey0.sim index 94f62c4719..dbc1787a14 100644 --- a/tests/script/tsim/stream/streamPrimaryKey0.sim +++ b/tests/script/tsim/stream/streamPrimaryKey0.sim @@ -242,4 +242,73 @@ if $data01 != 2 then goto loop6 endi +print step3============= + +sql create database test3 vgroups 4; +sql use test3; +sql create table st(ts timestamp, a int primary key, b int , c int, d double) tags(ta varchar(100),tb int,tc int); +sql create table t1 using st tags("aa", 1, 2); + +sql create stream streams3_1 trigger at_once ignore expired 0 ignore update 0 into streamt3_1 as select _wstart, a, max(b), count(*), ta from st partition by ta, a interval(10s); +sql create stream streams3_2 trigger at_once ignore expired 0 ignore update 0 into streamt3_2 as select _wstart, a, max(b), count(*), ta from st partition by ta, a session(ts, 10s); + +sql insert into t1 values(1648791210001,1,2,3,4.1); +sql insert into t1 values(1648791210002,2,2,3,1.1); +sql insert into t1 values(1648791220000,3,2,3,2.1); +sql insert into t1 values(1648791220001,4,2,3,3.1); + +$loop_count = 0 + +loop7: + +print 1 select * from streamt3_1; +sql select * from streamt3_1; + +print $data00 $data01 $data02 +print $data10 $data11 $data12 +print $data20 $data21 $data22 +print $data30 $data31 $data32 +print $data40 $data41 $data42 +print $data50 $data51 $data52 +print $data60 $data61 $data62 +print $data70 $data71 $data72 + +sleep 200 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +if $rows != 2 then + print =====rows=$rows + goto loop7 +endi + +loop8: + +print 1 select * from streamt3_2; +sql select * from streamt3_2; + +print $data00 $data01 $data02 +print $data10 $data11 $data12 +print $data20 $data21 $data22 +print $data30 $data31 $data32 +print $data40 $data41 $data42 +print $data50 $data51 $data52 +print $data60 $data61 $data62 +print $data70 $data71 $data72 + +sleep 200 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +if $rows != 2 then + print =====rows=$rows + goto loop8 +endi + system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file