enh: insert optimize
This commit is contained in:
parent
b7bc828172
commit
758b4b93a4
|
@ -1454,6 +1454,7 @@ static void resetEnvPreTable(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt
|
||||||
tdDestroySVCreateTbReq(&pStmt->createTblReq);
|
tdDestroySVCreateTbReq(&pStmt->createTblReq);
|
||||||
pCxt->missCache = false;
|
pCxt->missCache = false;
|
||||||
pCxt->usingDuplicateTable = false;
|
pCxt->usingDuplicateTable = false;
|
||||||
|
pStmt->pBoundCols = NULL;
|
||||||
pStmt->usingTableProcessing = false;
|
pStmt->usingTableProcessing = false;
|
||||||
pStmt->fileProcessing = false;
|
pStmt->fileProcessing = false;
|
||||||
}
|
}
|
||||||
|
@ -1569,7 +1570,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt, SVnodeModifOpStmt* pSt
|
||||||
|
|
||||||
static void destroySubTableHashElem(void* p) { taosMemoryFree(*(STableMeta**)p); }
|
static void destroySubTableHashElem(void* p) { taosMemoryFree(*(STableMeta**)p); }
|
||||||
|
|
||||||
static int32_t createVnodeModifOpStmt(SParseContext* pCxt, SNode** pOutput) {
|
static int32_t createVnodeModifOpStmt(SParseContext* pCxt, bool reentry, SNode** pOutput) {
|
||||||
SVnodeModifOpStmt* pStmt = (SVnodeModifOpStmt*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT);
|
SVnodeModifOpStmt* pStmt = (SVnodeModifOpStmt*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT);
|
||||||
if (NULL == pStmt) {
|
if (NULL == pStmt) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -1582,13 +1583,16 @@ static int32_t createVnodeModifOpStmt(SParseContext* pCxt, SNode** pOutput) {
|
||||||
pStmt->freeHashFunc = insDestroyBlockHashmap;
|
pStmt->freeHashFunc = insDestroyBlockHashmap;
|
||||||
pStmt->freeArrayFunc = insDestroyBlockArrayList;
|
pStmt->freeArrayFunc = insDestroyBlockArrayList;
|
||||||
|
|
||||||
pStmt->pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
|
if (!reentry) {
|
||||||
pStmt->pTableBlockHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
|
pStmt->pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
|
||||||
|
pStmt->pTableBlockHashObj =
|
||||||
|
taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
|
||||||
|
}
|
||||||
pStmt->pSubTableHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);
|
pStmt->pSubTableHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);
|
||||||
pStmt->pTableNameHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);
|
pStmt->pTableNameHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);
|
||||||
pStmt->pDbFNameHashObj = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);
|
pStmt->pDbFNameHashObj = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);
|
||||||
if (NULL == pStmt->pVgroupsHashObj || NULL == pStmt->pTableBlockHashObj || NULL == pStmt->pSubTableHashObj ||
|
if ((!reentry && (NULL == pStmt->pVgroupsHashObj || NULL == pStmt->pTableBlockHashObj)) ||
|
||||||
NULL == pStmt->pTableNameHashObj || NULL == pStmt->pDbFNameHashObj) {
|
NULL == pStmt->pSubTableHashObj || NULL == pStmt->pTableNameHashObj || NULL == pStmt->pDbFNameHashObj) {
|
||||||
nodesDestroyNode((SNode*)pStmt);
|
nodesDestroyNode((SNode*)pStmt);
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
@ -1609,7 +1613,7 @@ static int32_t createInsertQuery(SParseContext* pCxt, SQuery** pOutput) {
|
||||||
pQuery->haveResultSet = false;
|
pQuery->haveResultSet = false;
|
||||||
pQuery->msgType = TDMT_VND_SUBMIT;
|
pQuery->msgType = TDMT_VND_SUBMIT;
|
||||||
|
|
||||||
int32_t code = createVnodeModifOpStmt(pCxt, &pQuery->pRoot);
|
int32_t code = createVnodeModifOpStmt(pCxt, false, &pQuery->pRoot);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
*pOutput = pQuery;
|
*pOutput = pQuery;
|
||||||
} else {
|
} else {
|
||||||
|
@ -1679,6 +1683,10 @@ static void destoryTablesReq(void* p) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void clearCatalogReq(SCatalogReq* pCatalogReq) {
|
static void clearCatalogReq(SCatalogReq* pCatalogReq) {
|
||||||
|
if (NULL == pCatalogReq) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
taosArrayDestroyEx(pCatalogReq->pTableMeta, destoryTablesReq);
|
taosArrayDestroyEx(pCatalogReq->pTableMeta, destoryTablesReq);
|
||||||
pCatalogReq->pTableMeta = NULL;
|
pCatalogReq->pTableMeta = NULL;
|
||||||
taosArrayDestroyEx(pCatalogReq->pTableHash, destoryTablesReq);
|
taosArrayDestroyEx(pCatalogReq->pTableHash, destoryTablesReq);
|
||||||
|
@ -1691,7 +1699,19 @@ static int32_t setVnodeModifOpStmt(SParseContext* pCxt, SCatalogReq* pCatalogReq
|
||||||
SVnodeModifOpStmt* pStmt) {
|
SVnodeModifOpStmt* pStmt) {
|
||||||
clearCatalogReq(pCatalogReq);
|
clearCatalogReq(pCatalogReq);
|
||||||
|
|
||||||
if (pCxt->pStmtCb) {
|
if (pStmt->usingTableProcessing) {
|
||||||
|
return getTableSchemaFromMetaData(pMetaData, pStmt, true);
|
||||||
|
}
|
||||||
|
return getTableSchemaFromMetaData(pMetaData, pStmt, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t resetVnodeModifOpStmt(SParseContext* pCxt, SQuery* pQuery) {
|
||||||
|
nodesDestroyNode(pQuery->pRoot);
|
||||||
|
|
||||||
|
int32_t code = createVnodeModifOpStmt(pCxt, true, &pQuery->pRoot);
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
SVnodeModifOpStmt* pStmt = (SVnodeModifOpStmt*)pQuery->pRoot;
|
||||||
|
|
||||||
(*pCxt->pStmtCb->getExecInfoFn)(pCxt->pStmtCb->pStmt, &pStmt->pVgroupsHashObj, &pStmt->pTableBlockHashObj);
|
(*pCxt->pStmtCb->getExecInfoFn)(pCxt->pStmtCb->pStmt, &pStmt->pVgroupsHashObj, &pStmt->pTableBlockHashObj);
|
||||||
if (NULL == pStmt->pVgroupsHashObj) {
|
if (NULL == pStmt->pVgroupsHashObj) {
|
||||||
pStmt->pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
|
pStmt->pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
|
||||||
|
@ -1700,14 +1720,12 @@ static int32_t setVnodeModifOpStmt(SParseContext* pCxt, SCatalogReq* pCatalogReq
|
||||||
pStmt->pTableBlockHashObj =
|
pStmt->pTableBlockHashObj =
|
||||||
taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||||
}
|
}
|
||||||
|
if (NULL == pStmt->pVgroupsHashObj || NULL == pStmt->pTableBlockHashObj) {
|
||||||
TSDB_QUERY_SET_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_STMT_INSERT);
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pStmt->usingTableProcessing) {
|
return code;
|
||||||
return getTableSchemaFromMetaData(pMetaData, pStmt, true);
|
|
||||||
}
|
|
||||||
return getTableSchemaFromMetaData(pMetaData, pStmt, false);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t initInsertQuery(SParseContext* pCxt, SCatalogReq* pCatalogReq, const SMetaData* pMetaData,
|
static int32_t initInsertQuery(SParseContext* pCxt, SCatalogReq* pCatalogReq, const SMetaData* pMetaData,
|
||||||
|
@ -1716,8 +1734,14 @@ static int32_t initInsertQuery(SParseContext* pCxt, SCatalogReq* pCatalogReq, co
|
||||||
return createInsertQuery(pCxt, pQuery);
|
return createInsertQuery(pCxt, pQuery);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!((SVnodeModifOpStmt*)(*pQuery)->pRoot)->fileProcessing) {
|
if (NULL != pCxt->pStmtCb) {
|
||||||
return setVnodeModifOpStmt(pCxt, pCatalogReq, pMetaData, (SVnodeModifOpStmt*)(*pQuery)->pRoot);
|
return resetVnodeModifOpStmt(pCxt, *pQuery);
|
||||||
|
}
|
||||||
|
|
||||||
|
SVnodeModifOpStmt* pStmt = (SVnodeModifOpStmt*)(*pQuery)->pRoot;
|
||||||
|
|
||||||
|
if (!pStmt->fileProcessing) {
|
||||||
|
return setVnodeModifOpStmt(pCxt, pCatalogReq, pMetaData, pStmt);
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
Loading…
Reference in New Issue