diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index ecb022426b..b870c406ec 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -126,6 +126,8 @@ STqReadHandle *tqInitSubmitMsgScanner(SMeta *pMeta); void tqReadHandleSetColIdList(STqReadHandle *pReadHandle, SArray *pColIdList); int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList); int tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList); +int tqReadHandleRemoveTbUidList(STqReadHandle* pHandle, const SArray* tbUidList); + int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver); bool tqNextDataBlock(STqReadHandle *pHandle); int32_t tqRetrieveDataBlock(SArray **ppCols, STqReadHandle *pHandle, uint64_t *pGroupId, uint64_t *pUid, diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 8fbd1e24e1..6ab2b0d917 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -235,3 +235,14 @@ int tqReadHandleAddTbUidList(STqReadHandle* pHandle, const SArray* tbUidList) { return 0; } + +int tqReadHandleRemoveTbUidList(STqReadHandle* pHandle, const SArray* tbUidList) { + ASSERT(pHandle->tbIdHash != NULL); + + for(int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) { + int64_t* pKey = (int64_t*) taosArrayGet(tbUidList, i); + taosHashRemove(pHandle->tbIdHash, pKey, sizeof(int64_t)); + } + + return 0; +} diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index fa840e1cd6..6d308d7221 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -125,6 +125,33 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) { return pTaskInfo; } +static SArray* filterQualifiedChildTables(const SStreamBlockScanInfo* pScanInfo, const SArray* tableIdList) { + SArray* qa = taosArrayInit(4, sizeof(tb_uid_t)); + + // let's discard the tables those are not created according to the queried super table. + SMetaReader mr = {0}; + metaReaderInit(&mr, pScanInfo->readHandle.meta, 0); + for (int32_t i = 0; i < taosArrayGetSize(tableIdList); ++i) { + int64_t* id = (int64_t*)taosArrayGet(tableIdList, i); + + int32_t code = metaGetTableEntryByUid(&mr, *id); + if (code != TSDB_CODE_SUCCESS) { + qError("failed to get table meta, uid:%" PRIu64 " code:%s", *id, tstrerror(terrno)); + continue; + } + + ASSERT(mr.me.type == TSDB_CHILD_TABLE); + if (mr.me.ctbEntry.suid != pScanInfo->tableUid) { + continue; + } + + taosArrayPush(qa, id); + } + + metaReaderClear(&mr); + return qa; +} + int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; @@ -134,41 +161,24 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo pInfo = pInfo->pDownstream[0]; } + int32_t code = 0; SStreamBlockScanInfo* pScanInfo = pInfo->info; - if (isAdd) { - SArray* qa = taosArrayInit(4, sizeof(tb_uid_t)); - - SMetaReader mr = {0}; - metaReaderInit(&mr, pScanInfo->readHandle.meta, 0); - for (int32_t i = 0; i < taosArrayGetSize(tableIdList); ++i) { - int64_t* id = (int64_t*)taosArrayGet(tableIdList, i); - - int32_t code = metaGetTableEntryByUid(&mr, *id); - if (code != TSDB_CODE_SUCCESS) { - qError("failed to get table meta, uid:%" PRIu64 " code:%s", *id, tstrerror(terrno)); - continue; - } - - ASSERT(mr.me.type == TSDB_CHILD_TABLE); - if (mr.me.ctbEntry.suid != pScanInfo->tableUid) { - continue; - } - - taosArrayPush(qa, id); - } - - metaReaderClear(&mr); + if (isAdd) { // add new table id + SArray* qa = filterQualifiedChildTables(pScanInfo, tableIdList); qDebug(" %d qualified child tables added into stream scanner", (int32_t)taosArrayGetSize(qa)); - int32_t code = tqReadHandleAddTbUidList(pScanInfo->streamBlockReader, qa); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - } else { - assert(0); + code = tqReadHandleAddTbUidList(pScanInfo->streamBlockReader, qa); + taosArrayDestroy(qa); + + } else { // remove the table id in current list + SArray* qa = filterQualifiedChildTables(pScanInfo, tableIdList); + + qDebug(" %d remove child tables from the stream scanner", (int32_t)taosArrayGetSize(tableIdList)); + code = tqReadHandleAddTbUidList(pScanInfo->streamBlockReader, tableIdList); + taosArrayDestroy(qa); } - return TSDB_CODE_SUCCESS; + return code; } int32_t qGetQueriedTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion, int32_t* tversion) {