fix: insert stable error
This commit is contained in:
parent
0a889b0679
commit
f2e2aa7722
|
@ -792,6 +792,8 @@ static int32_t getTableMeta(SInsertParseContext* pCxt, SName* pTbName, bool isSt
|
|||
*pMissCache = true;
|
||||
} else if (isStb && TSDB_SUPER_TABLE != (*pTableMeta)->tableType) {
|
||||
code = buildInvalidOperationMsg(&pCxt->msg, "create table only from super table is allowed");
|
||||
} else if (!isStb && TSDB_SUPER_TABLE == (*pTableMeta)->tableType) {
|
||||
code = buildInvalidOperationMsg(&pCxt->msg, "insert data into super table is not supported");
|
||||
}
|
||||
}
|
||||
return code;
|
||||
|
@ -1571,16 +1573,16 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt, SVnodeModifOpStmt* pSt
|
|||
|
||||
static void destroySubTableHashElem(void* p) { taosMemoryFree(*(STableMeta**)p); }
|
||||
|
||||
static int32_t createVnodeModifOpStmt(SParseContext* pCxt, bool reentry, SNode** pOutput) {
|
||||
static int32_t createVnodeModifOpStmt(SInsertParseContext* pCxt, bool reentry, SNode** pOutput) {
|
||||
SVnodeModifOpStmt* pStmt = (SVnodeModifOpStmt*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT);
|
||||
if (NULL == pStmt) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
if (pCxt->pStmtCb) {
|
||||
if (pCxt->pComCxt->pStmtCb) {
|
||||
TSDB_QUERY_SET_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_STMT_INSERT);
|
||||
}
|
||||
pStmt->pSql = pCxt->pSql;
|
||||
pStmt->pSql = pCxt->pComCxt->pSql;
|
||||
pStmt->freeHashFunc = insDestroyBlockHashmap;
|
||||
pStmt->freeArrayFunc = insDestroyBlockArrayList;
|
||||
|
||||
|
@ -1604,7 +1606,7 @@ static int32_t createVnodeModifOpStmt(SParseContext* pCxt, bool reentry, SNode**
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t createInsertQuery(SParseContext* pCxt, SQuery** pOutput) {
|
||||
static int32_t createInsertQuery(SInsertParseContext* pCxt, SQuery** pOutput) {
|
||||
SQuery* pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
|
||||
if (NULL == pQuery) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -1667,11 +1669,15 @@ static int32_t getTableVgroupFromMetaData(const SArray* pTables, SVnodeModifOpSt
|
|||
sizeof(SVgroupInfo));
|
||||
}
|
||||
|
||||
static int32_t getTableSchemaFromMetaData(const SMetaData* pMetaData, SVnodeModifOpStmt* pStmt, bool isStb) {
|
||||
static int32_t getTableSchemaFromMetaData(SInsertParseContext* pCxt, const SMetaData* pMetaData,
|
||||
SVnodeModifOpStmt* pStmt, bool isStb) {
|
||||
int32_t code = checkAuthFromMetaData(pMetaData->pUser);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = getTableMetaFromMetaData(pMetaData->pTableMeta, &pStmt->pTableMeta);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code && !isStb && TSDB_SUPER_TABLE == pStmt->pTableMeta->tableType) {
|
||||
code = buildInvalidOperationMsg(&pCxt->msg, "insert data into super table is not supported");
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = getTableVgroupFromMetaData(pMetaData->pTableHash, pStmt, isStb);
|
||||
}
|
||||
|
@ -1696,24 +1702,25 @@ static void clearCatalogReq(SCatalogReq* pCatalogReq) {
|
|||
pCatalogReq->pUser = NULL;
|
||||
}
|
||||
|
||||
static int32_t setVnodeModifOpStmt(SParseContext* pCxt, SCatalogReq* pCatalogReq, const SMetaData* pMetaData,
|
||||
static int32_t setVnodeModifOpStmt(SInsertParseContext* pCxt, SCatalogReq* pCatalogReq, const SMetaData* pMetaData,
|
||||
SVnodeModifOpStmt* pStmt) {
|
||||
clearCatalogReq(pCatalogReq);
|
||||
|
||||
if (pStmt->usingTableProcessing) {
|
||||
return getTableSchemaFromMetaData(pMetaData, pStmt, true);
|
||||
return getTableSchemaFromMetaData(pCxt, pMetaData, pStmt, true);
|
||||
}
|
||||
return getTableSchemaFromMetaData(pMetaData, pStmt, false);
|
||||
return getTableSchemaFromMetaData(pCxt, pMetaData, pStmt, false);
|
||||
}
|
||||
|
||||
static int32_t resetVnodeModifOpStmt(SParseContext* pCxt, SQuery* pQuery) {
|
||||
static int32_t resetVnodeModifOpStmt(SInsertParseContext* pCxt, SQuery* pQuery) {
|
||||
nodesDestroyNode(pQuery->pRoot);
|
||||
|
||||
int32_t code = createVnodeModifOpStmt(pCxt, true, &pQuery->pRoot);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
SVnodeModifOpStmt* pStmt = (SVnodeModifOpStmt*)pQuery->pRoot;
|
||||
|
||||
(*pCxt->pStmtCb->getExecInfoFn)(pCxt->pStmtCb->pStmt, &pStmt->pVgroupsHashObj, &pStmt->pTableBlockHashObj);
|
||||
(*pCxt->pComCxt->pStmtCb->getExecInfoFn)(pCxt->pComCxt->pStmtCb->pStmt, &pStmt->pVgroupsHashObj,
|
||||
&pStmt->pTableBlockHashObj);
|
||||
if (NULL == pStmt->pVgroupsHashObj) {
|
||||
pStmt->pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
|
||||
}
|
||||
|
@ -1729,13 +1736,13 @@ static int32_t resetVnodeModifOpStmt(SParseContext* pCxt, SQuery* pQuery) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t initInsertQuery(SParseContext* pCxt, SCatalogReq* pCatalogReq, const SMetaData* pMetaData,
|
||||
static int32_t initInsertQuery(SInsertParseContext* pCxt, SCatalogReq* pCatalogReq, const SMetaData* pMetaData,
|
||||
SQuery** pQuery) {
|
||||
if (NULL == *pQuery) {
|
||||
return createInsertQuery(pCxt, pQuery);
|
||||
}
|
||||
|
||||
if (NULL != pCxt->pStmtCb) {
|
||||
if (NULL != pCxt->pComCxt->pStmtCb) {
|
||||
return resetVnodeModifOpStmt(pCxt, *pQuery);
|
||||
}
|
||||
|
||||
|
@ -1896,7 +1903,7 @@ int32_t parseInsertSql(SParseContext* pCxt, SQuery** pQuery, SCatalogReq* pCatal
|
|||
.usingDuplicateTable = false,
|
||||
};
|
||||
|
||||
int32_t code = initInsertQuery(pCxt, pCatalogReq, pMetaData, pQuery);
|
||||
int32_t code = initInsertQuery(&context, pCatalogReq, pMetaData, pQuery);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = parseInsertSqlImpl(&context, (SVnodeModifOpStmt*)(*pQuery)->pRoot);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue