enh(query): enable remove the candidate table ids in the stream scanner.
This commit is contained in:
parent
5e819fbdb3
commit
fa84d0585b
|
@ -126,6 +126,8 @@ STqReadHandle *tqInitSubmitMsgScanner(SMeta *pMeta);
|
|||
void tqReadHandleSetColIdList(STqReadHandle *pReadHandle, SArray *pColIdList);
|
||||
int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList);
|
||||
int tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList);
|
||||
int tqReadHandleRemoveTbUidList(STqReadHandle* pHandle, const SArray* tbUidList);
|
||||
|
||||
int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver);
|
||||
bool tqNextDataBlock(STqReadHandle *pHandle);
|
||||
int32_t tqRetrieveDataBlock(SArray **ppCols, STqReadHandle *pHandle, uint64_t *pGroupId, uint64_t *pUid,
|
||||
|
|
|
@ -235,3 +235,14 @@ int tqReadHandleAddTbUidList(STqReadHandle* pHandle, const SArray* tbUidList) {
|
|||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int tqReadHandleRemoveTbUidList(STqReadHandle* pHandle, const SArray* tbUidList) {
|
||||
ASSERT(pHandle->tbIdHash != NULL);
|
||||
|
||||
for(int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
|
||||
int64_t* pKey = (int64_t*) taosArrayGet(tbUidList, i);
|
||||
taosHashRemove(pHandle->tbIdHash, pKey, sizeof(int64_t));
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -125,6 +125,33 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) {
|
|||
return pTaskInfo;
|
||||
}
|
||||
|
||||
static SArray* filterQualifiedChildTables(const SStreamBlockScanInfo* pScanInfo, const SArray* tableIdList) {
|
||||
SArray* qa = taosArrayInit(4, sizeof(tb_uid_t));
|
||||
|
||||
// let's discard the tables those are not created according to the queried super table.
|
||||
SMetaReader mr = {0};
|
||||
metaReaderInit(&mr, pScanInfo->readHandle.meta, 0);
|
||||
for (int32_t i = 0; i < taosArrayGetSize(tableIdList); ++i) {
|
||||
int64_t* id = (int64_t*)taosArrayGet(tableIdList, i);
|
||||
|
||||
int32_t code = metaGetTableEntryByUid(&mr, *id);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("failed to get table meta, uid:%" PRIu64 " code:%s", *id, tstrerror(terrno));
|
||||
continue;
|
||||
}
|
||||
|
||||
ASSERT(mr.me.type == TSDB_CHILD_TABLE);
|
||||
if (mr.me.ctbEntry.suid != pScanInfo->tableUid) {
|
||||
continue;
|
||||
}
|
||||
|
||||
taosArrayPush(qa, id);
|
||||
}
|
||||
|
||||
metaReaderClear(&mr);
|
||||
return qa;
|
||||
}
|
||||
|
||||
int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd) {
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
|
||||
|
@ -134,41 +161,24 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
|
|||
pInfo = pInfo->pDownstream[0];
|
||||
}
|
||||
|
||||
int32_t code = 0;
|
||||
SStreamBlockScanInfo* pScanInfo = pInfo->info;
|
||||
if (isAdd) {
|
||||
SArray* qa = taosArrayInit(4, sizeof(tb_uid_t));
|
||||
|
||||
SMetaReader mr = {0};
|
||||
metaReaderInit(&mr, pScanInfo->readHandle.meta, 0);
|
||||
for (int32_t i = 0; i < taosArrayGetSize(tableIdList); ++i) {
|
||||
int64_t* id = (int64_t*)taosArrayGet(tableIdList, i);
|
||||
|
||||
int32_t code = metaGetTableEntryByUid(&mr, *id);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("failed to get table meta, uid:%" PRIu64 " code:%s", *id, tstrerror(terrno));
|
||||
continue;
|
||||
}
|
||||
|
||||
ASSERT(mr.me.type == TSDB_CHILD_TABLE);
|
||||
if (mr.me.ctbEntry.suid != pScanInfo->tableUid) {
|
||||
continue;
|
||||
}
|
||||
|
||||
taosArrayPush(qa, id);
|
||||
}
|
||||
|
||||
metaReaderClear(&mr);
|
||||
if (isAdd) { // add new table id
|
||||
SArray* qa = filterQualifiedChildTables(pScanInfo, tableIdList);
|
||||
|
||||
qDebug(" %d qualified child tables added into stream scanner", (int32_t)taosArrayGetSize(qa));
|
||||
int32_t code = tqReadHandleAddTbUidList(pScanInfo->streamBlockReader, qa);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
} else {
|
||||
assert(0);
|
||||
code = tqReadHandleAddTbUidList(pScanInfo->streamBlockReader, qa);
|
||||
taosArrayDestroy(qa);
|
||||
|
||||
} else { // remove the table id in current list
|
||||
SArray* qa = filterQualifiedChildTables(pScanInfo, tableIdList);
|
||||
|
||||
qDebug(" %d remove child tables from the stream scanner", (int32_t)taosArrayGetSize(tableIdList));
|
||||
code = tqReadHandleAddTbUidList(pScanInfo->streamBlockReader, tableIdList);
|
||||
taosArrayDestroy(qa);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t qGetQueriedTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion, int32_t* tversion) {
|
||||
|
|
Loading…
Reference in New Issue