[td-11818] fix bug in topic .
This commit is contained in:
parent
d1dce49f14
commit
699b37cc18
|
@ -138,7 +138,7 @@ bool isTsdbCacheLastRow(tsdbReadHandleT* pTsdbReadHandle);
|
||||||
* @param reqId
|
* @param reqId
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
int32_t tsdbQuerySTableByTagCond(STsdb* tsdb, uint64_t uid, TSKEY skey, const char* pTagCond, size_t len,
|
int32_t tsdbQuerySTableByTagCond(void* pMeta, uint64_t uid, TSKEY skey, const char* pTagCond, size_t len,
|
||||||
int16_t tagNameRelType, const char* tbnameCond, STableGroupInfo* pGroupInfo,
|
int16_t tagNameRelType, const char* tbnameCond, STableGroupInfo* pGroupInfo,
|
||||||
SColIndex* pColIndex, int32_t numOfCols, uint64_t reqId, uint64_t taskId);
|
SColIndex* pColIndex, int32_t numOfCols, uint64_t reqId, uint64_t taskId);
|
||||||
/**
|
/**
|
||||||
|
@ -205,7 +205,7 @@ void tsdbDestroyTableGroup(STableGroupInfo *pGroupList);
|
||||||
* @param pGroupInfo the generated result
|
* @param pGroupInfo the generated result
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
int32_t tsdbGetOneTableGroup(STsdb *tsdb, uint64_t uid, TSKEY startKey, STableGroupInfo *pGroupInfo);
|
int32_t tsdbGetOneTableGroup(void *pMeta, uint64_t uid, TSKEY startKey, STableGroupInfo *pGroupInfo);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
|
|
|
@ -3635,29 +3635,29 @@ SArray* createTableGroup(SArray* pTableList, SSchemaWrapper* pTagSchema, SColInd
|
||||||
// return TSDB_CODE_SUCCESS;
|
// return TSDB_CODE_SUCCESS;
|
||||||
//}
|
//}
|
||||||
|
|
||||||
int32_t tsdbQuerySTableByTagCond(STsdb* tsdb, uint64_t uid, TSKEY skey, const char* pTagCond, size_t len,
|
int32_t tsdbQuerySTableByTagCond(void* pMeta, uint64_t uid, TSKEY skey, const char* pTagCond, size_t len,
|
||||||
int16_t tagNameRelType, const char* tbnameCond, STableGroupInfo* pGroupInfo,
|
int16_t tagNameRelType, const char* tbnameCond, STableGroupInfo* pGroupInfo,
|
||||||
SColIndex* pColIndex, int32_t numOfCols, uint64_t reqId, uint64_t taskId) {
|
SColIndex* pColIndex, int32_t numOfCols, uint64_t reqId, uint64_t taskId) {
|
||||||
STbCfg* pTbCfg = metaGetTbInfoByUid(tsdb->pMeta, uid);
|
STbCfg* pTbCfg = metaGetTbInfoByUid(pMeta, uid);
|
||||||
if (pTbCfg == NULL) {
|
if (pTbCfg == NULL) {
|
||||||
tsdbError("%p failed to get stable, uid:%"PRIu64", TID:0x%"PRIx64" QID:0x%"PRIx64, tsdb, uid, taskId, reqId);
|
// tsdbError("%p failed to get stable, uid:%"PRIu64", TID:0x%"PRIx64" QID:0x%"PRIx64, tsdb, uid, taskId, reqId);
|
||||||
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
|
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTbCfg->type != META_SUPER_TABLE) {
|
if (pTbCfg->type != META_SUPER_TABLE) {
|
||||||
tsdbError("%p query normal tag not allowed, uid:%" PRIu64 ", TID:0x%"PRIx64" QID:0x%"PRIx64, tsdb, uid, taskId, reqId);
|
// tsdbError("%p query normal tag not allowed, uid:%" PRIu64 ", TID:0x%"PRIx64" QID:0x%"PRIx64, tsdb, uid, taskId, reqId);
|
||||||
terrno = TSDB_CODE_OPS_NOT_SUPPORT; //basically, this error is caused by invalid sql issued by client
|
terrno = TSDB_CODE_OPS_NOT_SUPPORT; //basically, this error is caused by invalid sql issued by client
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
//NOTE: not add ref count for super table
|
//NOTE: not add ref count for super table
|
||||||
SArray* res = taosArrayInit(8, sizeof(STableKeyInfo));
|
SArray* res = taosArrayInit(8, sizeof(STableKeyInfo));
|
||||||
SSchemaWrapper* pTagSchema = metaGetTableSchema(tsdb->pMeta, uid, 0, true);
|
SSchemaWrapper* pTagSchema = metaGetTableSchema(pMeta, uid, 0, true);
|
||||||
|
|
||||||
// no tags and tbname condition, all child tables of this stable are involved
|
// no tags and tbname condition, all child tables of this stable are involved
|
||||||
if (tbnameCond == NULL && (pTagCond == NULL || len == 0)) {
|
if (tbnameCond == NULL && (pTagCond == NULL || len == 0)) {
|
||||||
int32_t ret = getAllTableList(tsdb->pMeta, uid, res);
|
int32_t ret = getAllTableList(pMeta, uid, res);
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
@ -3665,8 +3665,8 @@ int32_t tsdbQuerySTableByTagCond(STsdb* tsdb, uint64_t uid, TSKEY skey, const ch
|
||||||
pGroupInfo->numOfTables = (uint32_t) taosArrayGetSize(res);
|
pGroupInfo->numOfTables = (uint32_t) taosArrayGetSize(res);
|
||||||
pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, skey);
|
pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, skey);
|
||||||
|
|
||||||
tsdbDebug("%p no table name/tag condition, all tables qualified, numOfTables:%u, group:%zu, TID:0x%"PRIx64" QID:0x%"PRIx64, tsdb,
|
// tsdbDebug("%p no table name/tag condition, all tables qualified, numOfTables:%u, group:%zu, TID:0x%"PRIx64" QID:0x%"PRIx64, tsdb,
|
||||||
pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList), taskId, reqId);
|
// pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList), taskId, reqId);
|
||||||
|
|
||||||
taosArrayDestroy(res);
|
taosArrayDestroy(res);
|
||||||
return ret;
|
return ret;
|
||||||
|
@ -3722,8 +3722,8 @@ int32_t tsdbQuerySTableByTagCond(STsdb* tsdb, uint64_t uid, TSKEY skey, const ch
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbGetOneTableGroup(STsdb* tsdb, uint64_t uid, TSKEY startKey, STableGroupInfo* pGroupInfo) {
|
int32_t tsdbGetOneTableGroup(void* pMeta, uint64_t uid, TSKEY startKey, STableGroupInfo* pGroupInfo) {
|
||||||
STbCfg* pTbCfg = metaGetTbInfoByUid(tsdb->pMeta, uid);
|
STbCfg* pTbCfg = metaGetTbInfoByUid(pMeta, uid);
|
||||||
if (pTbCfg == NULL) {
|
if (pTbCfg == NULL) {
|
||||||
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
|
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
|
||||||
goto _error;
|
goto _error;
|
||||||
|
|
|
@ -7774,7 +7774,8 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTask
|
||||||
} else if (pPhyNode->info.type == OP_StreamScan) {
|
} else if (pPhyNode->info.type == OP_StreamScan) {
|
||||||
SScanPhyNode* pScanPhyNode = (SScanPhyNode*)pPhyNode; // simple child table.
|
SScanPhyNode* pScanPhyNode = (SScanPhyNode*)pPhyNode; // simple child table.
|
||||||
STableGroupInfo groupInfo = {0};
|
STableGroupInfo groupInfo = {0};
|
||||||
int32_t code = doCreateTableGroup(readerHandle, pScanPhyNode->tableType, pScanPhyNode->uid, &groupInfo, queryId, taskId);
|
|
||||||
|
int32_t code = doCreateTableGroup(((STqReadHandle*)readerHandle)->pMeta, pScanPhyNode->tableType, pScanPhyNode->uid, &groupInfo, queryId, taskId);
|
||||||
|
|
||||||
SArray* pa = taosArrayGetP(groupInfo.pGroupList, 0);
|
SArray* pa = taosArrayGetP(groupInfo.pGroupList, 0);
|
||||||
ASSERT(taosArrayGetSize(groupInfo.pGroupList) == 1);
|
ASSERT(taosArrayGetSize(groupInfo.pGroupList) == 1);
|
||||||
|
@ -7828,12 +7829,12 @@ static tsdbReadHandleT createDataReadHandle(STableScanPhyNode* pTableScanNode, S
|
||||||
return tsdbQueryTables(readerHandle, &cond, pGroupInfo, queryId, taskId);
|
return tsdbQueryTables(readerHandle, &cond, pGroupInfo, queryId, taskId);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doCreateTableGroup(void* readerHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo, uint64_t queryId, uint64_t taskId) {
|
static int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo, uint64_t queryId, uint64_t taskId) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
if (tableType == TSDB_SUPER_TABLE) {
|
if (tableType == TSDB_SUPER_TABLE) {
|
||||||
code = tsdbQuerySTableByTagCond(readerHandle, tableUid, 0, NULL, 0, 0, NULL, pGroupInfo, NULL, 0, queryId, taskId);
|
code = tsdbQuerySTableByTagCond(metaHandle, tableUid, 0, NULL, 0, 0, NULL, pGroupInfo, NULL, 0, queryId, taskId);
|
||||||
} else { // Create one table group.
|
} else { // Create one table group.
|
||||||
code = tsdbGetOneTableGroup(readerHandle, tableUid, 0, pGroupInfo);
|
code = tsdbGetOneTableGroup(metaHandle, tableUid, 0, pGroupInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
|
Loading…
Reference in New Issue