fix(stream):set correct primary key column index

This commit is contained in:
54liuyao 2024-09-19 18:30:31 +08:00
parent 911e70d53a
commit f71b14da13
4 changed files with 98 additions and 11 deletions

View File

@ -1555,7 +1555,7 @@ static void destroyStreamPartitionOperatorInfo(void* param) {
}
int32_t initParDownStream(SOperatorInfo* downstream, SPartitionBySupporter* pParSup, SExprSupp* pExpr,
SExprSupp* pTbnameExpr) {
SExprSupp* pTbnameExpr, SExprSupp* pResExprSupp, int32_t* pPkColIndex) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SStorageAPI* pAPI = &downstream->pTaskInfo->storageAPI;
@ -1568,6 +1568,11 @@ int32_t initParDownStream(SOperatorInfo* downstream, SPartitionBySupporter* pPar
pScanInfo->partitionSup = *pParSup;
pScanInfo->pPartScalarSup = pExpr;
pScanInfo->pPartTbnameSup = pTbnameExpr;
for (int32_t j = 0; j < pResExprSupp->numOfExprs; j++) {
if (pScanInfo->primaryKeyIndex == pResExprSupp->pExprInfo[j].base.pParam[0].pCol->slotId) {
*pPkColIndex = j;
}
}
if (!pScanInfo->pUpdateInfo) {
code = pAPI->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, 0, pScanInfo->igCheckUpdate,
pScanInfo->pkColType, pScanInfo->pkColLen, &pScanInfo->pUpdateInfo);
@ -1729,7 +1734,8 @@ int32_t createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPart
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
setOperatorStreamStateFn(pOperator, streamOpReleaseState, streamOpReloadState);
code = initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup, &pInfo->tbnameCalSup);
pInfo->basic.primaryPkIndex = -1;
code = initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup, &pInfo->tbnameCalSup, &pOperator->exprSupp, &pInfo->basic.primaryPkIndex);
QUERY_CHECK_CODE(code, lino, _error);
code = appendDownstream(pOperator, &downstream, 1);

View File

@ -4078,6 +4078,7 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode*
SDataType pkType = {0};
pInfo->primaryKeyIndex = -1;
pInfo->basic.primaryPkIndex = -1;
int32_t numOfOutput = taosArrayGetSize(pInfo->matchInfo.pList);
pColIds = taosArrayInit(numOfOutput, sizeof(int16_t));
QUERY_CHECK_NULL(pColIds, code, lino, _error, terrno);
@ -4095,6 +4096,7 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode*
}
if (id->isPk) {
pInfo->primaryKeyIndex = id->dstSlotId;
pInfo->basic.primaryPkIndex = id->dstSlotId;
pkType = id->dataType;
}
}

View File

@ -541,14 +541,21 @@ void reloadFromDownStream(SOperatorInfo* downstream, SStreamIntervalOperatorInfo
pInfo->pUpdateInfo = pScanInfo->pUpdateInfo;
}
int32_t initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SStreamIntervalOperatorInfo* pInfo) {
bool hasSrcPrimaryKeyCol(SSteamOpBasicInfo* pInfo) { return pInfo->primaryPkIndex != -1; }
int32_t initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SStreamIntervalOperatorInfo* pInfo, struct SSteamOpBasicInfo* pBasic) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SStateStore* pAPI = &downstream->pTaskInfo->storageAPI.stateStore;
SExecTaskInfo* pTaskInfo = downstream->pTaskInfo;
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION) {
SStreamPartitionOperatorInfo* pScanInfo = downstream->info;
pBasic->primaryPkIndex = pScanInfo->basic.primaryPkIndex;
}
if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
return initIntervalDownStream(downstream->pDownstream[0], type, pInfo);
return initIntervalDownStream(downstream->pDownstream[0], type, pInfo, pBasic);
}
SStreamScanInfo* pScanInfo = downstream->info;
@ -564,7 +571,9 @@ int32_t initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SStream
pScanInfo->twAggSup = pInfo->twAggSup;
pScanInfo->pState = pInfo->pState;
pInfo->pUpdateInfo = pScanInfo->pUpdateInfo;
pInfo->basic.primaryPkIndex = pScanInfo->primaryKeyIndex;
if (!hasSrcPrimaryKeyCol(pBasic)) {
pBasic->primaryPkIndex = pScanInfo->basic.primaryPkIndex;
}
_end:
if (code != TSDB_CODE_SUCCESS) {
@ -1013,8 +1022,6 @@ static int32_t getNextQualifiedFinalWindow(SInterval* pInterval, STimeWindow* pN
return startPos;
}
bool hasSrcPrimaryKeyCol(SSteamOpBasicInfo* pInfo) { return pInfo->primaryPkIndex != -1; }
static int32_t doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, uint64_t groupId,
SSHashObj* pUpdatedMap, SSHashObj* pDeletedMap) {
int32_t code = TSDB_CODE_SUCCESS;
@ -2036,7 +2043,7 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN
setOperatorStreamStateFn(pOperator, streamIntervalReleaseState, streamIntervalReloadState);
if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL ||
pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL) {
code = initIntervalDownStream(downstream, pPhyNode->type, pInfo);
code = initIntervalDownStream(downstream, pPhyNode->type, pInfo, &pInfo->basic);
QUERY_CHECK_CODE(code, lino, _error);
}
code = appendDownstream(pOperator, &downstream, 1);
@ -2140,6 +2147,7 @@ int32_t initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup,
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION) {
SStreamPartitionOperatorInfo* pScanInfo = downstream->info;
pScanInfo->tsColIndex = tsColIndex;
pBasic->primaryPkIndex = pScanInfo->basic.primaryPkIndex;
}
if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
@ -2157,7 +2165,9 @@ int32_t initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup,
}
pScanInfo->twAggSup = *pTwSup;
pAggSup->pUpdateInfo = pScanInfo->pUpdateInfo;
pBasic->primaryPkIndex = pScanInfo->primaryKeyIndex;
if (!hasSrcPrimaryKeyCol(pBasic)) {
pBasic->primaryPkIndex = pScanInfo->basic.primaryPkIndex;
}
_end:
if (code != TSDB_CODE_SUCCESS) {
@ -3218,7 +3228,7 @@ int32_t doStreamSessionEncodeOpState(void** buf, int32_t len, SOperatorInfo* pOp
size_t keyLen = 0;
int32_t iter = 0;
while ((pIte = tSimpleHashIterate(pInfo->streamAggSup.pResultRows, pIte, &iter)) != NULL) {
void* key = taosHashGetKey(pIte, &keyLen);
void* key = tSimpleHashGetKey(pIte, &keyLen);
tlen += encodeSSessionKey(buf, key);
tlen += encodeSResultWindowInfo(buf, pIte, pInfo->streamAggSup.resultRowSize);
}
@ -5377,7 +5387,7 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
taosMemoryFree(buff);
}
code = initIntervalDownStream(downstream, pPhyNode->type, pInfo);
code = initIntervalDownStream(downstream, pPhyNode->type, pInfo, &pInfo->basic);
QUERY_CHECK_CODE(code, lino, _error);
code = appendDownstream(pOperator, &downstream, 1);

View File

@ -242,4 +242,73 @@ if $data01 != 2 then
goto loop6
endi
print step3=============
sql create database test3 vgroups 4;
sql use test3;
sql create table st(ts timestamp, a int primary key, b int , c int, d double) tags(ta varchar(100),tb int,tc int);
sql create table t1 using st tags("aa", 1, 2);
sql create stream streams3_1 trigger at_once ignore expired 0 ignore update 0 into streamt3_1 as select _wstart, a, max(b), count(*), ta from st partition by ta, a interval(10s);
sql create stream streams3_2 trigger at_once ignore expired 0 ignore update 0 into streamt3_2 as select _wstart, a, max(b), count(*), ta from st partition by ta, a session(ts, 10s);
sql insert into t1 values(1648791210001,1,2,3,4.1);
sql insert into t1 values(1648791210002,2,2,3,1.1);
sql insert into t1 values(1648791220000,3,2,3,2.1);
sql insert into t1 values(1648791220001,4,2,3,3.1);
$loop_count = 0
loop7:
print 1 select * from streamt3_1;
sql select * from streamt3_1;
print $data00 $data01 $data02
print $data10 $data11 $data12
print $data20 $data21 $data22
print $data30 $data31 $data32
print $data40 $data41 $data42
print $data50 $data51 $data52
print $data60 $data61 $data62
print $data70 $data71 $data72
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
if $rows != 2 then
print =====rows=$rows
goto loop7
endi
loop8:
print 1 select * from streamt3_2;
sql select * from streamt3_2;
print $data00 $data01 $data02
print $data10 $data11 $data12
print $data20 $data21 $data22
print $data30 $data31 $data32
print $data40 $data41 $data42
print $data50 $data51 $data52
print $data60 $data61 $data62
print $data70 $data71 $data72
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
if $rows != 2 then
print =====rows=$rows
goto loop8
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT