[td-11818] create streamscan operator when no tables created.

This commit is contained in:
Haojun Liao 2022-02-17 15:24:00 +08:00
parent ef3b41e2c7
commit 36aedfbb7c
2 changed files with 23 additions and 11 deletions

View File

@ -215,13 +215,15 @@ static FORCE_INLINE void tqReadHandleSetColIdList(STqReadHandle *pReadHandle, SA
static FORCE_INLINE int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList) { static FORCE_INLINE int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList) {
pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
if (pHandle->tbIdHash == NULL) { if (pHandle->tbIdHash == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
for (int i = 0; i < taosArrayGetSize(tbUidList); i++) { for (int i = 0; i < taosArrayGetSize(tbUidList); i++) {
int64_t *pKey = (int64_t *)taosArrayGet(tbUidList, i); int64_t *pKey = (int64_t *)taosArrayGet(tbUidList, i);
taosHashPut(pHandle->tbIdHash, pKey, sizeof(int64_t), NULL, 0); taosHashPut(pHandle->tbIdHash, pKey, sizeof(int64_t), NULL, 0);
// pHandle->tbUid = tbUid;
} }
return 0; return 0;
} }

View File

@ -5447,7 +5447,12 @@ SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SArray* pExp
// set the extract column id to streamHandle // set the extract column id to streamHandle
tqReadHandleSetColIdList((STqReadHandle* )streamReadHandle, pColList); tqReadHandleSetColIdList((STqReadHandle* )streamReadHandle, pColList);
tqReadHandleSetTbUidList(streamReadHandle, pTableIdList); int32_t code = tqReadHandleSetTbUidList(streamReadHandle, pTableIdList);
if (code != 0) {
tfree(pInfo);
tfree(pOperator);
return NULL;
}
pInfo->readerHandle = streamReadHandle; pInfo->readerHandle = streamReadHandle;
@ -5462,7 +5467,6 @@ SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SArray* pExp
return pOperator; return pOperator;
} }
void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInfo* pDownstream) { void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInfo* pDownstream) {
assert(pTableScanInfo != NULL && pDownstream != NULL); assert(pTableScanInfo != NULL && pDownstream != NULL);
@ -8186,16 +8190,22 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTask
STableGroupInfo groupInfo = {0}; STableGroupInfo groupInfo = {0};
int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, &groupInfo, queryId, taskId); int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, &groupInfo, queryId, taskId);
SArray* idList = NULL;
SArray* pa = taosArrayGetP(groupInfo.pGroupList, 0); if (groupInfo.numOfTables > 0) {
ASSERT(taosArrayGetSize(groupInfo.pGroupList) == 1); SArray* pa = taosArrayGetP(groupInfo.pGroupList, 0);
ASSERT(taosArrayGetSize(groupInfo.pGroupList) == 1);
// Transfer the Array of STableKeyInfo into uid list. // Transfer the Array of STableKeyInfo into uid list.
size_t numOfTables = taosArrayGetSize(pa); size_t numOfTables = taosArrayGetSize(pa);
SArray* idList = taosArrayInit(numOfTables, sizeof(uint64_t)); for (int32_t i = 0; i < numOfTables; ++i) {
for(int32_t i = 0; i < numOfTables; ++i) { STableKeyInfo* pkeyInfo = taosArrayGet(pa, i);
STableKeyInfo* pkeyInfo = taosArrayGet(pa, i); taosArrayPush(idList, &pkeyInfo->uid);
taosArrayPush(idList, &pkeyInfo->uid); }
idList = taosArrayInit(numOfTables, sizeof(uint64_t));
} else {
idList = taosArrayInit(4, sizeof(uint64_t));
} }
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pPhyNode->pTargets, idList, pTaskInfo); SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pPhyNode->pTargets, idList, pTaskInfo);