fix:dispatch create sub table
This commit is contained in:
parent
605b5012ae
commit
88970c88a9
|
@ -498,7 +498,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
|
||||||
taosArrayPush(tagArray, &tagVal);
|
taosArrayPush(tagArray, &tagVal);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pCreateTbReq->ctb.tagNum = taosArrayGetSize(tagArray);
|
pCreateTbReq->ctb.tagNum = size;
|
||||||
|
|
||||||
STag* pTag = NULL;
|
STag* pTag = NULL;
|
||||||
tTagNew(tagArray, 1, false, &pTag);
|
tTagNew(tagArray, 1, false, &pTag);
|
||||||
|
|
|
@ -1013,7 +1013,7 @@ void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp*
|
||||||
|
|
||||||
void* pGpIdCol = taosArrayGet(pDestBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX);
|
void* pGpIdCol = taosArrayGet(pDestBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX);
|
||||||
colDataAppend(pGpIdCol, pDestBlock->info.rows, (const char*)&groupId, false);
|
colDataAppend(pGpIdCol, pDestBlock->info.rows, (const char*)&groupId, false);
|
||||||
|
pDestBlock->info.id.groupId = groupId;
|
||||||
pDestBlock->info.rows++;
|
pDestBlock->info.rows++;
|
||||||
blockDataDestroy(pTmpBlock);
|
blockDataDestroy(pTmpBlock);
|
||||||
}
|
}
|
||||||
|
@ -1030,7 +1030,7 @@ static SSDataBlock* buildStreamCreateTableResult(SOperatorInfo* pOperator) {
|
||||||
blockDataEnsureCapacity(pInfo->pCreateTbRes, taosHashGetSize(pInfo->pPartitions));
|
blockDataEnsureCapacity(pInfo->pCreateTbRes, taosHashGetSize(pInfo->pPartitions));
|
||||||
SSDataBlock* pSrc = pInfo->pInputDataBlock;
|
SSDataBlock* pSrc = pInfo->pInputDataBlock;
|
||||||
|
|
||||||
while (pInfo->pTbNameIte != NULL) {
|
if (pInfo->pTbNameIte != NULL) {
|
||||||
SPartitionDataInfo* pParInfo = (SPartitionDataInfo*)pInfo->pTbNameIte;
|
SPartitionDataInfo* pParInfo = (SPartitionDataInfo*)pInfo->pTbNameIte;
|
||||||
int32_t rowId = *(int32_t*)taosArrayGet(pParInfo->rowIds, 0);
|
int32_t rowId = *(int32_t*)taosArrayGet(pParInfo->rowIds, 0);
|
||||||
appendCreateTableRow(pOperator->pTaskInfo->streamInfo.pState, &pInfo->tbnameCalSup, &pInfo->tagCalSup,
|
appendCreateTableRow(pOperator->pTaskInfo->streamInfo.pState, &pInfo->tbnameCalSup, &pInfo->tagCalSup,
|
||||||
|
|
|
@ -4778,6 +4778,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pInfo->pUpdatedMap);
|
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pInfo->pUpdatedMap);
|
||||||
continue;
|
continue;
|
||||||
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
|
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
|
||||||
|
printDataBlock(pBlock, "single interval");
|
||||||
return pBlock;
|
return pBlock;
|
||||||
} else {
|
} else {
|
||||||
ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
|
ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
|
||||||
|
|
|
@ -261,6 +261,21 @@ if $data04 != NULL then
|
||||||
goto loop2
|
goto loop2
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
print ===== drop ...
|
||||||
|
|
||||||
|
sql drop stream if exists streams0;
|
||||||
|
sql drop stream if exists streams1;
|
||||||
|
sql drop stream if exists streams2;
|
||||||
|
sql drop stream if exists streams3;
|
||||||
|
sql drop database if exists test;
|
||||||
|
sql drop database if exists test1;
|
||||||
|
sql drop database if exists test2;
|
||||||
|
sql drop database if exists test3;
|
||||||
|
sql drop database if exists result;
|
||||||
|
sql drop database if exists result1;
|
||||||
|
sql drop database if exists result2;
|
||||||
|
sql drop database if exists result3;
|
||||||
|
|
||||||
print ===== step6
|
print ===== step6
|
||||||
|
|
||||||
sql create database result4 vgroups 1;
|
sql create database result4 vgroups 1;
|
||||||
|
|
Loading…
Reference in New Issue