fix: [TD-34074] Forbid virtual table in tq and stmt.
This commit is contained in:
parent
eff763dc91
commit
7cffc7aff2
|
@ -560,9 +560,12 @@ This document details the server error codes that may be encountered when using
|
|||
## virtual table
|
||||
|
||||
| Error Code | Description | Possible Error Scenarios or Reasons | Recommended Actions for Users |
|
||||
|------------|---------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------------------------------------------------|
|
||||
|------------|---------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------|
|
||||
| 0x80006200 | Virtual table scan internal error | virtual table scan operator internal error, generally does not occur | Check error logs, contact development for handling |
|
||||
| 0x80006201 | Virtual table scan invalid downstream operator type | The incorrect execution plan generated causes the downstream operator type of the virtual table scan operator to be incorrect. | Check error logs, contact development for handling |
|
||||
| 0x80006202 | Virtual table prim timestamp column should not has ref | The timestamp primary key column of a virtual table should not have a data source. If it does, this error will occur during subsequent queries on the virtual table. | Check error logs, contact development for handling |
|
||||
| 0x80006203 | Create virtual child table must use virtual super table | Create virtual child table using non-virtual super table | create virtual child table using virtual super table |
|
||||
| 0x80006204 | Virtual table not support decimal type | Create virtual table using decimal type | create virtual table without using decimal type |
|
||||
| 0x80006205 | Virtual table not support in STMT query and STMT insert | Use virtual table in stmt query and stmt insert | do not use virtual table in stmt query and insert |
|
||||
| 0x80006206 | Virtual table not support in Topic | Use virtual table in topic | do not use virtual table in topic |
|
||||
| 0x80006206 | Virtual super table query not support origin table from different databases | Virtual super table ‘s child table's origin table from different databases | make sure virtual super table's child table's origin table from same database |
|
||||
|
|
|
@ -579,10 +579,13 @@ description: TDengine 服务端的错误码列表和详细说明
|
|||
## virtual table
|
||||
|
||||
| 错误码 | 错误描述 | 可能的出错场景或者可能的原因 | 建议用户采取的措施 |
|
||||
|------------|---------------------------------------------------------|------------------------------------------------|----------------------------|
|
||||
|------------|---------------------------------------------------------|------------------------------------------------|-------------------------|
|
||||
| 0x80006200 | Virtual table scan 算子内部错误 | virtual table scan 算子内部逻辑错误,一般不会出现 | 具体查看client端的错误日志提示 |
|
||||
| 0x80006201 | Virtual table scan invalid downstream operator type | 由于生成的执行计划不对,导致 virtual table scan 算子的下游算子类型不正确 | 保留 explain 执行计划,联系开发处理 |
|
||||
| 0x80006202 | Virtual table prim timestamp column should not has ref | 虚拟表的时间戳主键列不应该有数据源,如果有,后续查询虚拟表的时候就会出现该错误 | 检查错误日志,联系开发处理 |
|
||||
| 0x80006203 | Create virtual child table must use virtual super table | 虚拟子表必须建在虚拟超级表下,否则就会出现该错误 | 创建虚拟子表的时候,USING 虚拟超级表 |
|
||||
| 0x80006204 | Virtual table not support decimal type | 虚拟表不支持 decimal 类型 | 创建虚拟表时不使用 decimal 类型的列/tag |
|
||||
| 0x80006205 | Virtual table not support in STMT query and STMT insert | 不支持在 stmt 写入和查询中使用虚拟表 | 不在 stmt 写入和查询中使用虚拟表 |
|
||||
| 0x80006206 | Virtual table not support in Topic | 不支持在订阅中使用虚拟表 | 不在订阅中使用虚拟表 |
|
||||
| 0x80006207 | Virtual super table query not support origin table from different databases | 虚拟超级表不支持子表的数据源来自不同的数据库 | 确保虚拟超级表的子表的数据源都来自同一个数据库 |
|
||||
|
||||
|
|
|
@ -263,6 +263,7 @@ typedef struct SDynQueryCtrlStbJoin {
|
|||
|
||||
typedef struct SDynQueryCtrlVtbScan {
|
||||
bool scanAllCols;
|
||||
char dbName[TSDB_DB_NAME_LEN];
|
||||
uint64_t suid;
|
||||
SVgroupsInfo* pVgroupList;
|
||||
} SDynQueryCtrlVtbScan;
|
||||
|
@ -666,6 +667,7 @@ typedef struct SStbJoinDynCtrlBasic {
|
|||
|
||||
typedef struct SVtbScanDynCtrlBasic {
|
||||
bool scanAllCols;
|
||||
char dbName[TSDB_DB_NAME_LEN];
|
||||
uint64_t suid;
|
||||
int32_t accountId;
|
||||
SEpSet mgmtEpSet;
|
||||
|
|
|
@ -1072,6 +1072,9 @@ int32_t taosGetErrSize();
|
|||
#define TSDB_CODE_VTABLE_PRIMTS_HAS_REF TAOS_DEF_ERROR_CODE(0, 0x6202)
|
||||
#define TSDB_CODE_VTABLE_NOT_VIRTUAL_SUPER_TABLE TAOS_DEF_ERROR_CODE(0, 0x6203)
|
||||
#define TSDB_CODE_VTABLE_NOT_SUPPORT_DATA_TYPE TAOS_DEF_ERROR_CODE(0, 0x6204)
|
||||
#define TSDB_CODE_VTABLE_NOT_SUPPORT_STMT TAOS_DEF_ERROR_CODE(0, 0x6205)
|
||||
#define TSDB_CODE_VTABLE_NOT_SUPPORT_TOPIC TAOS_DEF_ERROR_CODE(0, 0x6206)
|
||||
#define TSDB_CODE_VTABLE_NOT_SUPPORT_CROSS_DB TAOS_DEF_ERROR_CODE(0, 0x6207)
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -76,6 +76,7 @@ typedef struct SStbJoinDynCtrlInfo {
|
|||
|
||||
typedef struct SVtbScanDynCtrlInfo {
|
||||
bool scanAllCols;
|
||||
char* dbName;
|
||||
tsem_t ready;
|
||||
SEpSet epSet;
|
||||
SUseDbRsp* pRsp;
|
||||
|
|
|
@ -100,6 +100,9 @@ void freeUseDbOutput(void* pOutput) {
|
|||
}
|
||||
|
||||
static void destroyVtbScanDynCtrlInfo(SVtbScanDynCtrlInfo* pVtbScan) {
|
||||
if (pVtbScan->dbName) {
|
||||
taosMemoryFreeClear(pVtbScan->dbName);
|
||||
}
|
||||
if (pVtbScan->childTableList) {
|
||||
taosArrayDestroy(pVtbScan->childTableList);
|
||||
}
|
||||
|
@ -1136,13 +1139,15 @@ int32_t dynProcessUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
|
|||
pScanResInfo->vtbScan.pRsp = taosMemoryMalloc(sizeof(SUseDbRsp));
|
||||
QUERY_CHECK_NULL(pScanResInfo->vtbScan.pRsp, code, lino, _return, terrno);
|
||||
|
||||
QUERY_CHECK_CODE(tDeserializeSUseDbRsp(pMsg->pData, (int32_t)pMsg->len, pScanResInfo->vtbScan.pRsp), lino, _return);
|
||||
code = tDeserializeSUseDbRsp(pMsg->pData, (int32_t)pMsg->len, pScanResInfo->vtbScan.pRsp);
|
||||
QUERY_CHECK_CODE(code, lino, _return);
|
||||
|
||||
taosMemoryFreeClear(pMsg->pData);
|
||||
|
||||
QUERY_CHECK_CODE(tsem_post(&pScanResInfo->vtbScan.ready), lino, _return);
|
||||
code = tsem_post(&pScanResInfo->vtbScan.ready);
|
||||
QUERY_CHECK_CODE(code, lino, _return);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return code;
|
||||
_return:
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
return code;
|
||||
|
@ -1157,7 +1162,8 @@ static int32_t buildDbVgInfoMap(SOperatorInfo* pOperator, SReadHandle* pHandle,
|
|||
|
||||
pReq = taosMemoryMalloc(sizeof(SUseDbReq));
|
||||
QUERY_CHECK_NULL(pReq, code, lino, _return, terrno);
|
||||
QUERY_CHECK_CODE(tNameGetFullDbName(name, pReq->db), lino, _return);
|
||||
code = tNameGetFullDbName(name, pReq->db);
|
||||
QUERY_CHECK_CODE(code, lino, _return);
|
||||
int32_t contLen = tSerializeSUseDbReq(NULL, 0, pReq);
|
||||
buf1 = taosMemoryCalloc(1, contLen);
|
||||
QUERY_CHECK_NULL(buf1, code, lino, _return, terrno);
|
||||
|
@ -1177,11 +1183,14 @@ static int32_t buildDbVgInfoMap(SOperatorInfo* pOperator, SReadHandle* pHandle,
|
|||
pMsgSendInfo->fp = dynProcessUseDbRsp;
|
||||
pMsgSendInfo->requestId = pTaskInfo->id.queryId;
|
||||
|
||||
QUERY_CHECK_CODE(asyncSendMsgToServer(pHandle->pMsgCb->clientRpc, &pScanResInfo->vtbScan.epSet, NULL, pMsgSendInfo), lino, _return);
|
||||
code = asyncSendMsgToServer(pHandle->pMsgCb->clientRpc, &pScanResInfo->vtbScan.epSet, NULL, pMsgSendInfo);
|
||||
QUERY_CHECK_CODE(code, lino, _return);
|
||||
|
||||
QUERY_CHECK_CODE(tsem_wait(&pScanResInfo->vtbScan.ready), lino, _return);
|
||||
code = tsem_wait(&pScanResInfo->vtbScan.ready);
|
||||
QUERY_CHECK_CODE(code, lino, _return);
|
||||
|
||||
QUERY_CHECK_CODE(queryBuildUseDbOutput(output, pScanResInfo->vtbScan.pRsp), lino, _return);
|
||||
code = queryBuildUseDbOutput(output, pScanResInfo->vtbScan.pRsp);
|
||||
QUERY_CHECK_CODE(code, lino, _return);
|
||||
|
||||
_return:
|
||||
if (code) {
|
||||
|
@ -1250,12 +1259,13 @@ int32_t dynHashValueComp(void const* lp, void const* rp) {
|
|||
int32_t getVgId(SDBVgInfo* dbInfo, char* dbFName, int32_t* vgId, char *tbName) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
QUERY_CHECK_CODE(dynMakeVgArraySortBy(dbInfo, dynVgInfoComp), lino, _return);
|
||||
code = dynMakeVgArraySortBy(dbInfo, dynVgInfoComp);
|
||||
QUERY_CHECK_CODE(code, lino, _return);
|
||||
|
||||
int32_t vgNum = (int32_t)taosArrayGetSize(dbInfo->vgArray);
|
||||
if (vgNum <= 0) {
|
||||
qError("db vgroup cache invalid, db:%s, vgroup number:%d", dbFName, vgNum);
|
||||
QUERY_CHECK_CODE(TSDB_CODE_TSC_DB_NOT_SELECTED, lino, _return);
|
||||
QUERY_CHECK_CODE(code = TSDB_CODE_TSC_DB_NOT_SELECTED, lino, _return);
|
||||
}
|
||||
|
||||
SVgroupInfo* vgInfo = NULL;
|
||||
|
@ -1309,8 +1319,10 @@ int32_t getDbVgInfo(SOperatorInfo* pOperator, SName *name, SDBVgInfo **dbVgInfo)
|
|||
|
||||
if (find == NULL) {
|
||||
output = taosMemoryMalloc(sizeof(SUseDbOutput));
|
||||
QUERY_CHECK_CODE(buildDbVgInfoMap(pOperator, pHandle, name, pTaskInfo, output), line, _return);
|
||||
QUERY_CHECK_CODE(taosHashPut(pInfo->vtbScan.dbVgInfoMap, name->dbname, strlen(name->dbname), &output, POINTER_BYTES), line, _return);
|
||||
code = buildDbVgInfoMap(pOperator, pHandle, name, pTaskInfo, output);
|
||||
QUERY_CHECK_CODE(code, line, _return);
|
||||
code = taosHashPut(pInfo->vtbScan.dbVgInfoMap, name->dbname, strlen(name->dbname), &output, POINTER_BYTES);
|
||||
QUERY_CHECK_CODE(code, line, _return);
|
||||
} else {
|
||||
output = *find;
|
||||
}
|
||||
|
@ -1357,12 +1369,14 @@ int32_t vtbScan(SOperatorInfo* pOperator, SSDataBlock** pRes) {
|
|||
|
||||
while (true) {
|
||||
if (pVtbScan->curTableIdx == pVtbScan->lastTableIdx) {
|
||||
QUERY_CHECK_CODE(pOperator->pDownstream[0]->fpSet.getNextFn(pOperator->pDownstream[0], pRes), line, _return);
|
||||
code = pOperator->pDownstream[0]->fpSet.getNextFn(pOperator->pDownstream[0], pRes);
|
||||
QUERY_CHECK_CODE(code, line, _return);
|
||||
} else {
|
||||
uint64_t* id = taosArrayGet(pVtbScan->childTableList, pVtbScan->curTableIdx);
|
||||
QUERY_CHECK_NULL(id, code, line, _return, terrno);
|
||||
pHandle->api.metaReaderFn.initReader(&mr, pHandle->vnode, META_READER_LOCK, &pHandle->api.metaFn);
|
||||
QUERY_CHECK_CODE(pHandle->api.metaReaderFn.getTableEntryByUid(&mr, *id), line, _return);
|
||||
code = pHandle->api.metaReaderFn.getTableEntryByUid(&mr, *id);
|
||||
QUERY_CHECK_CODE(code, line, _return);
|
||||
|
||||
for (int32_t j = 0; j < mr.me.colRef.nCols; j++) {
|
||||
if (mr.me.colRef.pColRef[j].hasRef && colNeedScan(pOperator, mr.me.colRef.pColRef[j].id)) {
|
||||
|
@ -1370,15 +1384,22 @@ int32_t vtbScan(SOperatorInfo* pOperator, SSDataBlock** pRes) {
|
|||
char dbFname[TSDB_DB_FNAME_LEN] = {0};
|
||||
char orgTbFName[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
|
||||
if (strncmp(mr.me.colRef.pColRef[j].refDbName, pVtbScan->dbName, strlen(pVtbScan->dbName)) != 0) {
|
||||
QUERY_CHECK_CODE(code = TSDB_CODE_VTABLE_NOT_SUPPORT_CROSS_DB, line, _return);
|
||||
}
|
||||
toName(pInfo->vtbScan.acctId, mr.me.colRef.pColRef[j].refDbName, mr.me.colRef.pColRef[j].refTableName, &name);
|
||||
QUERY_CHECK_CODE(getDbVgInfo(pOperator, &name, &dbVgInfo), line, _return);
|
||||
QUERY_CHECK_CODE(tNameGetFullDbName(&name, dbFname), line, _return);
|
||||
QUERY_CHECK_CODE(tNameGetFullTableName(&name, orgTbFName), line, _return);
|
||||
code = getDbVgInfo(pOperator, &name, &dbVgInfo);
|
||||
QUERY_CHECK_CODE(code, line, _return);
|
||||
tNameGetFullDbName(&name, dbFname);
|
||||
QUERY_CHECK_CODE(code, line, _return);
|
||||
tNameGetFullTableName(&name, orgTbFName);
|
||||
QUERY_CHECK_CODE(code, line, _return);
|
||||
|
||||
void *pVal = taosHashGet(pVtbScan->orgTbVgColMap, orgTbFName, sizeof(orgTbFName));
|
||||
if (!pVal) {
|
||||
SOrgTbInfo map = {0};
|
||||
QUERY_CHECK_CODE(getVgId(dbVgInfo, dbFname, &map.vgId, name.tname), line, _return);
|
||||
code = getVgId(dbVgInfo, dbFname, &map.vgId, name.tname);
|
||||
QUERY_CHECK_CODE(code, line, _return);
|
||||
tstrncpy(map.tbName, orgTbFName, sizeof(map.tbName));
|
||||
map.colMap = taosArrayInit(10, sizeof(SColIdNameKV));
|
||||
QUERY_CHECK_NULL(map.colMap, code, line, _return, terrno);
|
||||
|
@ -1386,7 +1407,8 @@ int32_t vtbScan(SOperatorInfo* pOperator, SSDataBlock** pRes) {
|
|||
colIdNameKV.colId = mr.me.colRef.pColRef[j].id;
|
||||
tstrncpy(colIdNameKV.colName, mr.me.colRef.pColRef[j].refColName, sizeof(colIdNameKV.colName));
|
||||
QUERY_CHECK_NULL(taosArrayPush(map.colMap, &colIdNameKV), code, line, _return, terrno);
|
||||
QUERY_CHECK_CODE(taosHashPut(pVtbScan->orgTbVgColMap, orgTbFName, sizeof(orgTbFName), &map, sizeof(map)), line, _return);
|
||||
code = taosHashPut(pVtbScan->orgTbVgColMap, orgTbFName, sizeof(orgTbFName), &map, sizeof(map));
|
||||
QUERY_CHECK_CODE(code, line, _return);
|
||||
} else {
|
||||
SOrgTbInfo *tbInfo = (SOrgTbInfo *)pVal;
|
||||
SColIdNameKV colIdNameKV = {0};
|
||||
|
@ -1398,13 +1420,15 @@ int32_t vtbScan(SOperatorInfo* pOperator, SSDataBlock** pRes) {
|
|||
}
|
||||
|
||||
pVtbScan->vtbScanParam = NULL;
|
||||
QUERY_CHECK_CODE(buildVtbScanOperatorParam(pInfo, &pVtbScan->vtbScanParam, *id), line, _return);
|
||||
code = buildVtbScanOperatorParam(pInfo, &pVtbScan->vtbScanParam, *id);
|
||||
QUERY_CHECK_CODE(code, line, _return);
|
||||
|
||||
void* pIter = taosHashIterate(pVtbScan->orgTbVgColMap, NULL);
|
||||
while (pIter != NULL) {
|
||||
SOrgTbInfo* pMap = (SOrgTbInfo*)pIter;
|
||||
SOperatorParam* pExchangeParam = NULL;
|
||||
QUERY_CHECK_CODE(buildExchangeOperatorParamForVScan(&pExchangeParam, 0, pMap), line, _return);
|
||||
code = buildExchangeOperatorParamForVScan(&pExchangeParam, 0, pMap);
|
||||
QUERY_CHECK_CODE(code, line, _return);
|
||||
QUERY_CHECK_NULL(taosArrayPush(((SVTableScanOperatorParam*)pVtbScan->vtbScanParam->value)->pOpParamArray, &pExchangeParam), code, line, _return, terrno);
|
||||
pIter = taosHashIterate(pVtbScan->orgTbVgColMap, pIter);
|
||||
}
|
||||
|
@ -1412,7 +1436,8 @@ int32_t vtbScan(SOperatorInfo* pOperator, SSDataBlock** pRes) {
|
|||
|
||||
// reset downstream operator's status
|
||||
pOperator->pDownstream[0]->status = OP_NOT_OPENED;
|
||||
QUERY_CHECK_CODE(pOperator->pDownstream[0]->fpSet.getNextExtFn(pOperator->pDownstream[0], pVtbScan->vtbScanParam, pRes), line, _return);
|
||||
code = pOperator->pDownstream[0]->fpSet.getNextExtFn(pOperator->pDownstream[0], pVtbScan->vtbScanParam, pRes);
|
||||
QUERY_CHECK_CODE(code, line, _return);
|
||||
}
|
||||
|
||||
if (*pRes) {
|
||||
|
@ -1478,7 +1503,8 @@ static int32_t initVtbScanInfo(SOperatorInfo* pOperator, SDynQueryCtrlOperatorIn
|
|||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t line = 0;
|
||||
|
||||
QUERY_CHECK_CODE(tsem_init(&pInfo->vtbScan.ready, 0, 0), line, _return);
|
||||
code = tsem_init(&pInfo->vtbScan.ready, 0, 0);
|
||||
QUERY_CHECK_CODE(code, line, _return);
|
||||
|
||||
pInfo->vtbScan.scanAllCols = pPhyciNode->vtbScan.scanAllCols;
|
||||
pInfo->vtbScan.suid = pPhyciNode->vtbScan.suid;
|
||||
|
@ -1487,6 +1513,8 @@ static int32_t initVtbScanInfo(SOperatorInfo* pOperator, SDynQueryCtrlOperatorIn
|
|||
pInfo->vtbScan.readHandle = *pHandle;
|
||||
pInfo->vtbScan.curTableIdx = 0;
|
||||
pInfo->vtbScan.lastTableIdx = -1;
|
||||
pInfo->vtbScan.dbName = taosStrdup(pPhyciNode->vtbScan.dbName);
|
||||
QUERY_CHECK_NULL(pInfo->vtbScan.dbName, code, line, _return, terrno);
|
||||
|
||||
pInfo->vtbScan.readColList = taosArrayInit(LIST_LENGTH(pPhyciNode->vtbScan.pScanCols), sizeof(col_id_t));
|
||||
QUERY_CHECK_NULL(pInfo->vtbScan.readColList, code, line, _return, terrno);
|
||||
|
@ -1499,7 +1527,8 @@ static int32_t initVtbScanInfo(SOperatorInfo* pOperator, SDynQueryCtrlOperatorIn
|
|||
|
||||
pInfo->vtbScan.childTableList = taosArrayInit(10, sizeof(uint64_t));
|
||||
QUERY_CHECK_NULL(pInfo->vtbScan.childTableList, code, line, _return, terrno);
|
||||
QUERY_CHECK_CODE(pHandle->api.metaFn.getChildTableList(pHandle->vnode, pInfo->vtbScan.suid, pInfo->vtbScan.childTableList), line, _return);
|
||||
code = pHandle->api.metaFn.getChildTableList(pHandle->vnode, pInfo->vtbScan.suid, pInfo->vtbScan.childTableList);
|
||||
QUERY_CHECK_CODE(code, line, _return);
|
||||
|
||||
pInfo->vtbScan.dbVgInfoMap = taosHashInit(taosArrayGetSize(pInfo->vtbScan.childTableList), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
||||
QUERY_CHECK_NULL(pInfo->vtbScan.dbVgInfoMap, code, line, _return, terrno);
|
||||
|
@ -1518,6 +1547,7 @@ int32_t createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numO
|
|||
QRY_PARAM_CHECK(pOptrInfo);
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t line = 0;
|
||||
__optr_fn_t nextFp = NULL;
|
||||
SOperatorInfo* pOperator = NULL;
|
||||
SDynQueryCtrlOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SDynQueryCtrlOperatorInfo));
|
||||
|
@ -1554,7 +1584,8 @@ int32_t createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numO
|
|||
nextFp = seqStableJoin;
|
||||
break;
|
||||
case DYN_QTYPE_VTB_SCAN:
|
||||
QUERY_CHECK_CODE(initVtbScanInfo(pOperator, pInfo, pHandle, pPhyciNode, pTaskInfo), code, _error);
|
||||
code = initVtbScanInfo(pOperator, pInfo, pHandle, pPhyciNode, pTaskInfo);
|
||||
QUERY_CHECK_CODE(code, line, _error);
|
||||
nextFp = vtbScan;
|
||||
break;
|
||||
default:
|
||||
|
|
|
@ -1239,7 +1239,7 @@ int32_t addDynamicExchangeSource(SOperatorInfo* pOperator) {
|
|||
freeOperatorParam(pOperator->pOperatorGetParam, OP_GET_PARAM);
|
||||
pOperator->pOperatorGetParam = NULL;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
|
||||
|
|
|
@ -1237,11 +1237,13 @@ static int32_t createVTableScanInfoFromParam(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
pAPI->metaReaderFn.initReader(&orgTable, pInfo->base.readHandle.vnode, META_READER_LOCK, &pAPI->metaFn);
|
||||
QUERY_CHECK_CODE(pAPI->metaReaderFn.getTableEntryByName(&orgTable, strstr(pParam->pOrgTbInfo->tbName, ".") + 1), lino, _return);
|
||||
code = pAPI->metaReaderFn.getTableEntryByName(&orgTable, strstr(pParam->pOrgTbInfo->tbName, ".") + 1);
|
||||
QUERY_CHECK_CODE(code, lino, _return);
|
||||
switch (orgTable.me.type) {
|
||||
case TSDB_CHILD_TABLE:
|
||||
pAPI->metaReaderFn.initReader(&superTable, pInfo->base.readHandle.vnode, META_READER_LOCK, &pAPI->metaFn);
|
||||
QUERY_CHECK_CODE(pAPI->metaReaderFn.getTableEntryByUid(&superTable, orgTable.me.ctbEntry.suid), lino, _return);
|
||||
code = pAPI->metaReaderFn.getTableEntryByUid(&superTable, orgTable.me.ctbEntry.suid);
|
||||
QUERY_CHECK_CODE(code, lino, _return);
|
||||
schema = &superTable.me.stbEntry.schemaRow;
|
||||
break;
|
||||
case TSDB_NORMAL_TABLE:
|
||||
|
@ -1289,8 +1291,10 @@ static int32_t createVTableScanInfoFromParam(SOperatorInfo* pOperator) {
|
|||
blockDataDestroy(pInfo->pResBlock);
|
||||
pInfo->pResBlock = NULL;
|
||||
}
|
||||
QUERY_CHECK_CODE(createOneDataBlockWithColArray(pInfo->pOrgBlock, pBlockColArray, &pInfo->pResBlock), lino, _return);
|
||||
QUERY_CHECK_CODE(initQueryTableDataCondWithColArray(&pInfo->base.cond, &pInfo->base.orgCond, &pInfo->base.readHandle, pColArray), lino, _return);
|
||||
code = createOneDataBlockWithColArray(pInfo->pOrgBlock, pBlockColArray, &pInfo->pResBlock);
|
||||
QUERY_CHECK_CODE(code, lino, _return);
|
||||
code = initQueryTableDataCondWithColArray(&pInfo->base.cond, &pInfo->base.orgCond, &pInfo->base.readHandle, pColArray);
|
||||
QUERY_CHECK_CODE(code, lino, _return);
|
||||
pInfo->base.cond.twindows.skey = pParam->window.ekey + 1;
|
||||
pInfo->base.cond.suid = orgTable.me.type == TSDB_CHILD_TABLE ? superTable.me.uid : 0;
|
||||
pInfo->currentGroupId = 0;
|
||||
|
@ -1304,7 +1308,8 @@ static int32_t createVTableScanInfoFromParam(SOperatorInfo* pOperator) {
|
|||
uint64_t pUid = orgTable.me.uid;
|
||||
STableKeyInfo info = {.groupId = 0, .uid = pUid};
|
||||
int32_t tableIdx = 0;
|
||||
QUERY_CHECK_CODE(taosHashPut(pListInfo->map, &pUid, sizeof(uint64_t), &tableIdx, sizeof(int32_t)), lino, _return);
|
||||
code = taosHashPut(pListInfo->map, &pUid, sizeof(uint64_t), &tableIdx, sizeof(int32_t));
|
||||
QUERY_CHECK_CODE(code, lino, _return);
|
||||
QUERY_CHECK_NULL(taosArrayPush(pListInfo->pTableList, &info), code, lino, _return, terrno);
|
||||
qDebug("add dynamic table scan uid:%" PRIu64 ", %s", info.uid, GET_TASKID(pTaskInfo));
|
||||
|
||||
|
@ -1470,12 +1475,14 @@ int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
|||
|
||||
SSDataBlock* result = NULL;
|
||||
while (true) {
|
||||
QUERY_CHECK_CODE(startNextGroupScan(pOperator, &result), lino, _end);
|
||||
code = startNextGroupScan(pOperator, &result);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
if (result || pOperator->status == OP_EXEC_DONE) {
|
||||
SSDataBlock* res = NULL;
|
||||
if (result) {
|
||||
QUERY_CHECK_CODE(createOneDataBlockWithTwoBlock(result, pInfo->pOrgBlock, &res), lino, _end);
|
||||
code = createOneDataBlockWithTwoBlock(result, pInfo->pOrgBlock, &res);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
pInfo->pResBlock = res;
|
||||
blockDataDestroy(result);
|
||||
}
|
||||
|
|
|
@ -773,6 +773,7 @@ static int32_t logicDynQueryCtrlCopy(const SDynQueryCtrlLogicNode* pSrc, SDynQue
|
|||
COPY_OBJECT_FIELD(stbJoin.srcScan, sizeof(pDst->stbJoin.srcScan));
|
||||
COPY_SCALAR_FIELD(vtbScan.scanAllCols);
|
||||
COPY_SCALAR_FIELD(vtbScan.suid);
|
||||
COPY_CHAR_ARRAY_FIELD(vtbScan.dbName);
|
||||
CLONE_OBJECT_FIELD(vtbScan.pVgroupList, vgroupsInfoClone);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -4386,6 +4386,7 @@ enum {
|
|||
PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_SRC_SCAN1,
|
||||
PHY_DYN_QUERY_CTRL_CODE_VTB_SCAN_SCAN_ALL_COLS,
|
||||
PHY_DYN_QUERY_CTRL_CODE_VTB_SCAN_SUID,
|
||||
PHY_DYN_QUERY_CTRL_CODE_VTB_SCAN_DBNAME,
|
||||
PHY_DYN_QUERY_CTRL_CODE_VTB_SCAN_ACCOUNT_ID,
|
||||
PHY_DYN_QUERY_CTRL_CODE_VTB_SCAN_EP_SET,
|
||||
PHY_DYN_QUERY_CTRL_CODE_VTB_SCAN_SCAN_COLS,
|
||||
|
@ -4427,6 +4428,9 @@ static int32_t physiDynQueryCtrlNodeToMsg(const void* pObj, STlvEncoder* pEncode
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeU64(pEncoder, PHY_DYN_QUERY_CTRL_CODE_VTB_SCAN_SUID, pNode->vtbScan.suid);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeCStr(pEncoder, PHY_DYN_QUERY_CTRL_CODE_VTB_SCAN_DBNAME, pNode->vtbScan.dbName);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeI32(pEncoder, PHY_DYN_QUERY_CTRL_CODE_VTB_SCAN_ACCOUNT_ID, pNode->vtbScan.accountId);
|
||||
}
|
||||
|
@ -4485,6 +4489,9 @@ static int32_t msgToPhysiDynQueryCtrlNode(STlvDecoder* pDecoder, void* pObj) {
|
|||
case PHY_DYN_QUERY_CTRL_CODE_VTB_SCAN_SUID:
|
||||
code = tlvDecodeU64(pTlv, &pNode->vtbScan.suid);
|
||||
break;
|
||||
case PHY_DYN_QUERY_CTRL_CODE_VTB_SCAN_DBNAME:
|
||||
code = tlvDecodeCStr(pTlv, pNode->vtbScan.dbName, sizeof(pNode->vtbScan.dbName));
|
||||
break;
|
||||
case PHY_DYN_QUERY_CTRL_CODE_VTB_SCAN_ACCOUNT_ID:
|
||||
code = tlvDecodeI32(pTlv, &pNode->vtbScan.accountId);
|
||||
break;
|
||||
|
|
|
@ -1206,6 +1206,10 @@ static int32_t getTableMeta(SInsertParseContext* pCxt, SName* pTbName, STableMet
|
|||
*pMissCache = true;
|
||||
} else if (bUsingTable && TSDB_SUPER_TABLE != (*pTableMeta)->tableType) {
|
||||
code = buildInvalidOperationMsg(&pCxt->msg, "create table only from super table is allowed");
|
||||
} else if (((*pTableMeta)->virtualStb) ||
|
||||
TSDB_VIRTUAL_CHILD_TABLE == (*pTableMeta)->tableType ||
|
||||
TSDB_VIRTUAL_NORMAL_TABLE == (*pTableMeta)->tableType) {
|
||||
code = TSDB_CODE_VTABLE_NOT_SUPPORT_STMT;
|
||||
}
|
||||
}
|
||||
return code;
|
||||
|
@ -1341,6 +1345,8 @@ static int32_t getUsingTableSchema(SInsertParseContext* pCxt, SVnodeModifyOpStmt
|
|||
code = getTableMeta(pCxt, &pStmt->usingTableName, &pStableMeta, &pCxt->missCache, bUsingTable);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = taosHashPut(pStmt->pSuperTableHashObj, tbFName, strlen(tbFName), &pStableMeta, POINTER_BYTES);
|
||||
} else {
|
||||
taosMemoryFreeClear(pStableMeta);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5479,6 +5479,15 @@ int32_t translateTable(STranslateContext* pCxt, SNode** pTable, bool inJoin) {
|
|||
code = TSDB_CODE_TSC_INVALID_OPERATION;
|
||||
break;
|
||||
}
|
||||
|
||||
if (pCxt->pParseCxt->isStmtBind) {
|
||||
code = TSDB_CODE_VTABLE_NOT_SUPPORT_STMT;
|
||||
break;
|
||||
}
|
||||
if (pCxt->pParseCxt->topicQuery) {
|
||||
code = TSDB_CODE_VTABLE_NOT_SUPPORT_TOPIC;
|
||||
break;
|
||||
}
|
||||
PAR_ERR_RET(translateVirtualTable(pCxt, pTable, &name));
|
||||
SVirtualTableNode *pVirtualTable = (SVirtualTableNode*)*pTable;
|
||||
pVirtualTable->table.singleTable = true;
|
||||
|
@ -8566,7 +8575,7 @@ static int32_t translateInsertTable(STranslateContext* pCxt, SNode** pTable) {
|
|||
int32_t code = translateFrom(pCxt, pTable);
|
||||
if (TSDB_CODE_SUCCESS == code && TSDB_CHILD_TABLE != ((SRealTableNode*)*pTable)->pMeta->tableType &&
|
||||
TSDB_NORMAL_TABLE != ((SRealTableNode*)*pTable)->pMeta->tableType) {
|
||||
code = buildInvalidOperationMsg(&pCxt->msgBuf, "insert data into super table is not supported");
|
||||
code = buildInvalidOperationMsg(&pCxt->msgBuf, "insert data into super table or virtual table is not supported");
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -11734,6 +11743,14 @@ static int32_t buildQueryForTableTopic(STranslateContext* pCxt, SCreateTopicStmt
|
|||
return code;
|
||||
}
|
||||
|
||||
static bool isVirtualTable(int8_t tableType) {
|
||||
if (tableType == TSDB_VIRTUAL_CHILD_TABLE || tableType == TSDB_VIRTUAL_NORMAL_TABLE) {
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t checkCreateTopic(STranslateContext* pCxt, SCreateTopicStmt* pStmt) {
|
||||
if (NULL == pStmt->pQuery && NULL == pStmt->pWhere) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -12010,16 +12027,6 @@ static bool crossTableWithUdaf(SSelectStmt* pSelect) {
|
|||
!hasTbnameFunction(pSelect->pPartitionByList);
|
||||
}
|
||||
|
||||
|
||||
static bool isVirtualTable(int8_t tableType) {
|
||||
if (tableType == TSDB_VIRTUAL_CHILD_TABLE || tableType == TSDB_VIRTUAL_NORMAL_TABLE) {
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static int32_t checkCreateStream(STranslateContext* pCxt, SCreateStreamStmt* pStmt) {
|
||||
if (NULL == pStmt->pQuery) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
|
|
@ -1023,6 +1023,7 @@ static int32_t createVirtualSuperTableLogicNode(SLogicPlanContext* pCxt, SSelect
|
|||
pDynCtrl->qType = DYN_QTYPE_VTB_SCAN;
|
||||
pDynCtrl->vtbScan.scanAllCols = pVtableScan->scanAllCols;
|
||||
pDynCtrl->vtbScan.suid = pVtableScan->stableId;
|
||||
tstrncpy(pDynCtrl->vtbScan.dbName, pVtableScan->tableName.dbname, TSDB_DB_NAME_LEN);
|
||||
PLAN_ERR_JRET(nodesListMakeStrictAppend(&pDynCtrl->node.pChildren, (SNode*)pVtableScan));
|
||||
PLAN_ERR_JRET(nodesCloneList(pVtableScan->node.pTargets, &pDynCtrl->node.pTargets));
|
||||
TSWAP(pVtableScan->pVgroupList, pDynCtrl->vtbScan.pVgroupList);
|
||||
|
|
|
@ -7861,6 +7861,9 @@ static int32_t findDepTableScanNode(SColumnNode* pCol, SVirtualScanLogicNode *pV
|
|||
FOREACH(pScanCol, pScanNode->pScanCols) {
|
||||
if (QUERY_NODE_COLUMN == nodeType(pScanCol)) {
|
||||
SColumnNode *pScanColNode = (SColumnNode *)pScanCol;
|
||||
if (pScanColNode->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
|
||||
continue;
|
||||
}
|
||||
if (pScanColNode->hasDep && pCol->hasRef) {
|
||||
if (strcmp(pScanColNode->dbName, pCol->refDbName) == 0 &&
|
||||
strcmp(pScanColNode->tableAlias, pCol->refTableName) == 0 &&
|
||||
|
|
|
@ -1919,6 +1919,7 @@ static int32_t updateDynQueryCtrlVtbScanInfo(SPhysiPlanContext* pCxt, SNodeList*
|
|||
pDynCtrl->vtbScan.suid = pLogicNode->vtbScan.suid;
|
||||
pDynCtrl->vtbScan.mgmtEpSet = pCxt->pPlanCxt->mgmtEpSet;
|
||||
pDynCtrl->vtbScan.accountId = pCxt->pPlanCxt->acctId;
|
||||
tstrncpy(pDynCtrl->vtbScan.dbName, pLogicNode->vtbScan.dbName, TSDB_DB_NAME_LEN);
|
||||
|
||||
return code;
|
||||
_return:
|
||||
|
|
|
@ -910,6 +910,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VTABLE_SCAN_INVALID_DOWNSTREAM, "Virtual table scan
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_VTABLE_PRIMTS_HAS_REF, "Virtual table prim timestamp column should not has ref column")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_VTABLE_NOT_VIRTUAL_SUPER_TABLE, "Create virtual child table must use virtual super table")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_VTABLE_NOT_SUPPORT_DATA_TYPE, "Virtual table not support decimal type")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_VTABLE_NOT_SUPPORT_STMT, "Virtual table not support in STMT query and STMT insert")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_VTABLE_NOT_SUPPORT_TOPIC, "Virtual table not support in topic")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_VTABLE_NOT_SUPPORT_CROSS_DB, "Virtual super table query not support origin table from different databases")
|
||||
#ifdef TAOS_ERROR_C
|
||||
};
|
||||
#endif
|
||||
|
|
|
@ -154,6 +154,129 @@ class TDTestCase:
|
|||
conn.close()
|
||||
raise err
|
||||
|
||||
def test_stmt_insert_vtb_error(self,conn):
|
||||
# type: (TaosConnection) -> None
|
||||
|
||||
dbname = "pytest_taos_stmt_vtb_error"
|
||||
try:
|
||||
conn.execute("drop database if exists %s" % dbname)
|
||||
conn.execute("create database if not exists %s" % dbname)
|
||||
conn.select_db(dbname)
|
||||
|
||||
conn.execute(
|
||||
"create table if not exists log(ts timestamp, bo bool, nil tinyint, ti tinyint, si smallint, ii int,\
|
||||
bi bigint, tu tinyint unsigned, su smallint unsigned, iu int unsigned, bu bigint unsigned, \
|
||||
ff float, dd double, bb binary(65059), nn nchar(100), tt timestamp)",
|
||||
)
|
||||
|
||||
conn.execute(
|
||||
"create vtable if not exists log_v(ts timestamp, bo bool from pytest_taos_stmt_vtb_error.log.bo, "
|
||||
"nil tinyint, ti tinyint, si smallint, ii int,\
|
||||
bi bigint, tu tinyint unsigned, su smallint unsigned, iu int unsigned, bu bigint unsigned, \
|
||||
ff float, dd double, bb binary(65059), nn nchar(100), tt timestamp)",
|
||||
)
|
||||
conn.load_table_info("log_v")
|
||||
|
||||
|
||||
stmt = conn.statement("insert into log_v values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
|
||||
params = new_bind_params(16)
|
||||
params[0].timestamp(1626861392589, PrecisionEnum.Milliseconds)
|
||||
params[1].bool(True)
|
||||
params[2].tinyint(None)
|
||||
params[3].tinyint(2)
|
||||
params[4].smallint(3)
|
||||
params[5].int(4)
|
||||
params[6].bigint(5)
|
||||
params[7].tinyint_unsigned(6)
|
||||
params[8].smallint_unsigned(7)
|
||||
params[9].int_unsigned(8)
|
||||
params[10].bigint_unsigned(9)
|
||||
params[11].float(10.1)
|
||||
params[12].double(10.11)
|
||||
binaryStr = '123456789'
|
||||
for i in range(1301):
|
||||
binaryStr += "1234567890abcdefghij1234567890abcdefghij12345hello"
|
||||
params[13].binary(binaryStr)
|
||||
params[14].nchar("stmt")
|
||||
params[15].timestamp(1626861392589, PrecisionEnum.Milliseconds)
|
||||
|
||||
stmt.bind_param(params)
|
||||
stmt.execute()
|
||||
|
||||
assert stmt.affected_rows == 1
|
||||
stmt.close()
|
||||
|
||||
querystmt=conn.statement("select ?, bo, nil, ti, si, ii,bi, tu, su, iu, bu, ff, dd, bb, nn, tt from log")
|
||||
queryparam=new_bind_params(1)
|
||||
print(type(queryparam))
|
||||
queryparam[0].binary("ts")
|
||||
querystmt.bind_param(queryparam)
|
||||
querystmt.execute()
|
||||
result=querystmt.use_result()
|
||||
|
||||
row=result.fetch_all()
|
||||
print(row)
|
||||
|
||||
assert row[0][1] == True
|
||||
assert row[0][2] == None
|
||||
for i in range(3, 10):
|
||||
assert row[0][i] == i - 1
|
||||
#float == may not work as expected
|
||||
# assert row[0][11] == c_float(10.1)
|
||||
assert row[0][12] == 10.11
|
||||
assert row[0][13][65054:] == "hello"
|
||||
assert row[0][14] == "stmt"
|
||||
|
||||
conn.execute("drop database if exists %s" % dbname)
|
||||
conn.close()
|
||||
|
||||
except Exception as err:
|
||||
conn.execute("drop database if exists %s" % dbname)
|
||||
conn.close()
|
||||
raise err
|
||||
|
||||
def test_stmt_insert_vstb_error(self,conn):
|
||||
|
||||
dbname = "pytest_taos_stmt_vstb_error"
|
||||
try:
|
||||
conn.execute("drop database if exists %s" % dbname)
|
||||
conn.execute("create database if not exists %s" % dbname)
|
||||
conn.execute("alter database %s keep 36500" % dbname)
|
||||
conn.select_db(dbname)
|
||||
|
||||
conn.execute("create stable STB_v(ts timestamp, n int) tags(b int) virtual 1")
|
||||
|
||||
stmt = conn.statement("insert into ? using STB_v tags(?) values(?, ?)")
|
||||
params = new_bind_params(1)
|
||||
params[0].int(4);
|
||||
stmt.set_tbname_tags("ct", params);
|
||||
|
||||
multi_params = new_multi_binds(2);
|
||||
multi_params[0].timestamp([9223372036854775808])
|
||||
multi_params[1].int([123])
|
||||
stmt.bind_param_batch(multi_params)
|
||||
|
||||
stmt.execute()
|
||||
result = stmt.use_result()
|
||||
|
||||
result.close()
|
||||
stmt.close()
|
||||
|
||||
stmt = conn.statement("select * from STB")
|
||||
stmt.execute()
|
||||
result = stmt.use_result()
|
||||
print(result.affected_rows)
|
||||
row = result.next()
|
||||
print(row)
|
||||
|
||||
result.close()
|
||||
stmt.close()
|
||||
conn.close()
|
||||
|
||||
except Exception as err:
|
||||
conn.close()
|
||||
raise err
|
||||
|
||||
def test_stmt_insert_error_null_timestamp(self,conn):
|
||||
|
||||
dbname = "pytest_taos_stmt_error_null_ts"
|
||||
|
@ -270,6 +393,24 @@ class TDTestCase:
|
|||
tdLog.info('=========stmt error occured for bind part column(NULL Timestamp) ==============')
|
||||
else:
|
||||
tdLog.exit("expect error(%s) not occured - 2" % str(error))
|
||||
|
||||
try:
|
||||
self.test_stmt_insert_vtb_error(self.conn())
|
||||
except Exception as error :
|
||||
|
||||
if str(error)=='[0x6205]: Virtual table not support in STMT query and STMT insert':
|
||||
tdLog.info('=========stmt error occured for bind part column ==============')
|
||||
else:
|
||||
tdLog.exit("expect error(%s) not occured" % str(error))
|
||||
|
||||
try:
|
||||
self.test_stmt_insert_vstb_error(self.conn())
|
||||
except Exception as error :
|
||||
|
||||
if str(error)=='[0x6205]: Virtual table not support in STMT query and STMT insert':
|
||||
tdLog.info('=========stmt error occured for bind part column ==============')
|
||||
else:
|
||||
tdLog.exit("expect error(%s) not occured" % str(error))
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success("%s successfully executed" % __file__)
|
||||
|
|
Loading…
Reference in New Issue