Merge pull request #27973 from taosdata/fix/TD-32211
fix(stream):set correct primary key column index
This commit is contained in:
commit
85de3f76ac
|
@ -1555,7 +1555,7 @@ static void destroyStreamPartitionOperatorInfo(void* param) {
|
|||
}
|
||||
|
||||
int32_t initParDownStream(SOperatorInfo* downstream, SPartitionBySupporter* pParSup, SExprSupp* pExpr,
|
||||
SExprSupp* pTbnameExpr) {
|
||||
SExprSupp* pTbnameExpr, SExprSupp* pResExprSupp, int32_t* pPkColIndex) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
SStorageAPI* pAPI = &downstream->pTaskInfo->storageAPI;
|
||||
|
@ -1568,6 +1568,11 @@ int32_t initParDownStream(SOperatorInfo* downstream, SPartitionBySupporter* pPar
|
|||
pScanInfo->partitionSup = *pParSup;
|
||||
pScanInfo->pPartScalarSup = pExpr;
|
||||
pScanInfo->pPartTbnameSup = pTbnameExpr;
|
||||
for (int32_t j = 0; j < pResExprSupp->numOfExprs; j++) {
|
||||
if (pScanInfo->primaryKeyIndex == pResExprSupp->pExprInfo[j].base.pParam[0].pCol->slotId) {
|
||||
*pPkColIndex = j;
|
||||
}
|
||||
}
|
||||
if (!pScanInfo->pUpdateInfo) {
|
||||
code = pAPI->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, 0, pScanInfo->igCheckUpdate,
|
||||
pScanInfo->pkColType, pScanInfo->pkColLen, &pScanInfo->pUpdateInfo);
|
||||
|
@ -1729,7 +1734,8 @@ int32_t createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPart
|
|||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||
setOperatorStreamStateFn(pOperator, streamOpReleaseState, streamOpReloadState);
|
||||
|
||||
code = initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup, &pInfo->tbnameCalSup);
|
||||
pInfo->basic.primaryPkIndex = -1;
|
||||
code = initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup, &pInfo->tbnameCalSup, &pOperator->exprSupp, &pInfo->basic.primaryPkIndex);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
|
|
|
@ -4078,6 +4078,7 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode*
|
|||
|
||||
SDataType pkType = {0};
|
||||
pInfo->primaryKeyIndex = -1;
|
||||
pInfo->basic.primaryPkIndex = -1;
|
||||
int32_t numOfOutput = taosArrayGetSize(pInfo->matchInfo.pList);
|
||||
pColIds = taosArrayInit(numOfOutput, sizeof(int16_t));
|
||||
QUERY_CHECK_NULL(pColIds, code, lino, _error, terrno);
|
||||
|
@ -4095,6 +4096,7 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode*
|
|||
}
|
||||
if (id->isPk) {
|
||||
pInfo->primaryKeyIndex = id->dstSlotId;
|
||||
pInfo->basic.primaryPkIndex = id->dstSlotId;
|
||||
pkType = id->dataType;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -541,14 +541,21 @@ void reloadFromDownStream(SOperatorInfo* downstream, SStreamIntervalOperatorInfo
|
|||
pInfo->pUpdateInfo = pScanInfo->pUpdateInfo;
|
||||
}
|
||||
|
||||
int32_t initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SStreamIntervalOperatorInfo* pInfo) {
|
||||
bool hasSrcPrimaryKeyCol(SSteamOpBasicInfo* pInfo) { return pInfo->primaryPkIndex != -1; }
|
||||
|
||||
int32_t initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SStreamIntervalOperatorInfo* pInfo, struct SSteamOpBasicInfo* pBasic) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
SStateStore* pAPI = &downstream->pTaskInfo->storageAPI.stateStore;
|
||||
SExecTaskInfo* pTaskInfo = downstream->pTaskInfo;
|
||||
|
||||
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION) {
|
||||
SStreamPartitionOperatorInfo* pScanInfo = downstream->info;
|
||||
pBasic->primaryPkIndex = pScanInfo->basic.primaryPkIndex;
|
||||
}
|
||||
|
||||
if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||
return initIntervalDownStream(downstream->pDownstream[0], type, pInfo);
|
||||
return initIntervalDownStream(downstream->pDownstream[0], type, pInfo, pBasic);
|
||||
}
|
||||
|
||||
SStreamScanInfo* pScanInfo = downstream->info;
|
||||
|
@ -564,7 +571,9 @@ int32_t initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SStream
|
|||
pScanInfo->twAggSup = pInfo->twAggSup;
|
||||
pScanInfo->pState = pInfo->pState;
|
||||
pInfo->pUpdateInfo = pScanInfo->pUpdateInfo;
|
||||
pInfo->basic.primaryPkIndex = pScanInfo->primaryKeyIndex;
|
||||
if (!hasSrcPrimaryKeyCol(pBasic)) {
|
||||
pBasic->primaryPkIndex = pScanInfo->basic.primaryPkIndex;
|
||||
}
|
||||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -1013,8 +1022,6 @@ static int32_t getNextQualifiedFinalWindow(SInterval* pInterval, STimeWindow* pN
|
|||
return startPos;
|
||||
}
|
||||
|
||||
bool hasSrcPrimaryKeyCol(SSteamOpBasicInfo* pInfo) { return pInfo->primaryPkIndex != -1; }
|
||||
|
||||
static int32_t doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, uint64_t groupId,
|
||||
SSHashObj* pUpdatedMap, SSHashObj* pDeletedMap) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
@ -2036,7 +2043,8 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN
|
|||
setOperatorStreamStateFn(pOperator, streamIntervalReleaseState, streamIntervalReloadState);
|
||||
if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL ||
|
||||
pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL) {
|
||||
code = initIntervalDownStream(downstream, pPhyNode->type, pInfo);
|
||||
pInfo->basic.primaryPkIndex = -1;
|
||||
code = initIntervalDownStream(downstream, pPhyNode->type, pInfo, &pInfo->basic);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
}
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
|
@ -2140,6 +2148,7 @@ int32_t initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup,
|
|||
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION) {
|
||||
SStreamPartitionOperatorInfo* pScanInfo = downstream->info;
|
||||
pScanInfo->tsColIndex = tsColIndex;
|
||||
pBasic->primaryPkIndex = pScanInfo->basic.primaryPkIndex;
|
||||
}
|
||||
|
||||
if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||
|
@ -2157,7 +2166,9 @@ int32_t initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup,
|
|||
}
|
||||
pScanInfo->twAggSup = *pTwSup;
|
||||
pAggSup->pUpdateInfo = pScanInfo->pUpdateInfo;
|
||||
pBasic->primaryPkIndex = pScanInfo->primaryKeyIndex;
|
||||
if (!hasSrcPrimaryKeyCol(pBasic)) {
|
||||
pBasic->primaryPkIndex = pScanInfo->basic.primaryPkIndex;
|
||||
}
|
||||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -3218,7 +3229,7 @@ int32_t doStreamSessionEncodeOpState(void** buf, int32_t len, SOperatorInfo* pOp
|
|||
size_t keyLen = 0;
|
||||
int32_t iter = 0;
|
||||
while ((pIte = tSimpleHashIterate(pInfo->streamAggSup.pResultRows, pIte, &iter)) != NULL) {
|
||||
void* key = taosHashGetKey(pIte, &keyLen);
|
||||
void* key = tSimpleHashGetKey(pIte, &keyLen);
|
||||
tlen += encodeSSessionKey(buf, key);
|
||||
tlen += encodeSResultWindowInfo(buf, pIte, pInfo->streamAggSup.resultRowSize);
|
||||
}
|
||||
|
@ -3866,6 +3877,7 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode
|
|||
setOperatorStreamStateFn(pOperator, streamSessionReleaseState, streamSessionReloadState);
|
||||
|
||||
if (downstream) {
|
||||
pInfo->basic.primaryPkIndex = -1;
|
||||
code = initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex,
|
||||
&pInfo->twAggSup, &pInfo->basic);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
@ -5377,7 +5389,8 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
|||
taosMemoryFree(buff);
|
||||
}
|
||||
|
||||
code = initIntervalDownStream(downstream, pPhyNode->type, pInfo);
|
||||
pInfo->basic.primaryPkIndex = -1;
|
||||
code = initIntervalDownStream(downstream, pPhyNode->type, pInfo, &pInfo->basic);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
|
|
|
@ -242,4 +242,75 @@ if $data01 != 2 then
|
|||
goto loop6
|
||||
endi
|
||||
|
||||
print step3=============
|
||||
|
||||
sql create database test3 vgroups 4;
|
||||
sql use test3;
|
||||
sql create table st(ts timestamp, a int primary key, b int , c int, d double) tags(ta varchar(100),tb int,tc int);
|
||||
sql create table t1 using st tags("aa", 1, 2);
|
||||
|
||||
sql create stream streams3_1 trigger at_once ignore expired 0 ignore update 0 into streamt3_1 as select _wstart, a, max(b), count(*), ta from st partition by ta, a interval(10s);
|
||||
sql create stream streams3_2 trigger at_once ignore expired 0 ignore update 0 into streamt3_2 as select _wstart, a, max(b), count(*), ta from st partition by ta, a session(ts, 10s);
|
||||
|
||||
run tsim/stream/checkTaskStatus.sim
|
||||
|
||||
sql insert into t1 values(1648791210001,1,2,3,4.1);
|
||||
sql insert into t1 values(1648791210002,2,2,3,1.1);
|
||||
sql insert into t1 values(1648791220000,3,2,3,2.1);
|
||||
sql insert into t1 values(1648791220001,4,2,3,3.1);
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop7:
|
||||
|
||||
print 1 select * from streamt3_1;
|
||||
sql select * from streamt3_1;
|
||||
|
||||
print $data00 $data01 $data02
|
||||
print $data10 $data11 $data12
|
||||
print $data20 $data21 $data22
|
||||
print $data30 $data31 $data32
|
||||
print $data40 $data41 $data42
|
||||
print $data50 $data51 $data52
|
||||
print $data60 $data61 $data62
|
||||
print $data70 $data71 $data72
|
||||
|
||||
sleep 200
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 20 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $rows != 4 then
|
||||
print =====rows=$rows
|
||||
goto loop7
|
||||
endi
|
||||
|
||||
loop8:
|
||||
|
||||
print 1 select * from streamt3_2;
|
||||
sql select * from streamt3_2;
|
||||
|
||||
print $data00 $data01 $data02
|
||||
print $data10 $data11 $data12
|
||||
print $data20 $data21 $data22
|
||||
print $data30 $data31 $data32
|
||||
print $data40 $data41 $data42
|
||||
print $data50 $data51 $data52
|
||||
print $data60 $data61 $data62
|
||||
print $data70 $data71 $data72
|
||||
|
||||
sleep 200
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 20 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $rows != 4 then
|
||||
print =====rows=$rows
|
||||
goto loop8
|
||||
endi
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
Loading…
Reference in New Issue