diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index 4c13f76e24..a35863bb5a 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -106,11 +106,10 @@ int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* void qDestroyBoundColInfo(void* pInfo); -void* smlInitHandle(SQuery* pQuery); -void smlDestroyHandle(void* pHandle); -int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols, bool format, STableMeta* pTableMeta, +SQuery* smlInitHandle(); +int32_t smlBindData(SQuery* handle, SArray* tags, SArray* colsSchema, SArray* cols, bool format, STableMeta* pTableMeta, char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl, char* msgBuf, int16_t msgBufLen); -int32_t smlBuildOutput(void* handle, SHashObj* pVgHash); +int32_t smlBuildOutput(SQuery* handle, SHashObj* pVgHash); int32_t rewriteToVnodeModifyOpStmt(SQuery* pQuery, SArray* pBufArray); SArray* serializeVgroupsCreateTableBatch(SHashObj* pVgroupHashmap); diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 9f7dd13d39..163e704424 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -170,7 +170,6 @@ typedef struct { SHashObj *childTables; SHashObj *superTables; SHashObj *pVgHash; - void *exec; STscObj *taos; SCatalog *pCatalog; @@ -1488,7 +1487,6 @@ static void smlDestroyCols(SArray *cols) { static void smlDestroyInfo(SSmlHandle *info) { if (!info) return; qDestroyQuery(info->pQuery); - smlDestroyHandle(info->exec); // destroy info->childTables void **p1 = (void **)taosHashIterate(info->childTables, NULL); @@ -1526,19 +1524,7 @@ static SSmlHandle *smlBuildSmlInfo(STscObj *pTscObj, SRequestObj *request, SMLPr } info->id = smlGenId(); - info->pQuery = (SQuery *)nodesMakeNode(QUERY_NODE_QUERY); - if (NULL == info->pQuery) { - uError("SML:0x%" PRIx64 " create info->pQuery error", info->id); - goto cleanup; - } - info->pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE; - info->pQuery->haveResultSet = false; - info->pQuery->msgType = TDMT_VND_SUBMIT; - info->pQuery->pRoot = (SNode *)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT); - if (NULL == info->pQuery->pRoot) { - uError("SML:0x%" PRIx64 " create info->pQuery->pRoot error", info->id); - goto cleanup; - } + info->pQuery = smlInitHandle(); if (pTscObj) { info->taos = pTscObj; @@ -1561,10 +1547,8 @@ static SSmlHandle *smlBuildSmlInfo(STscObj *pTscObj, SRequestObj *request, SMLPr info->pRequest = request; info->msgBuf.buf = info->pRequest->msgBuf; info->msgBuf.len = ERROR_MSG_BUF_DEFAULT_SIZE; - info->pRequest->stmtType = info->pQuery->pRoot->type; } - info->exec = smlInitHandle(info->pQuery); info->childTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); info->superTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); info->pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); @@ -1577,7 +1561,7 @@ static SSmlHandle *smlBuildSmlInfo(STscObj *pTscObj, SRequestObj *request, SMLPr goto cleanup; } } - if (NULL == info->exec || NULL == info->childTables || NULL == info->superTables || NULL == info->pVgHash || + if (NULL == info->pQuery || NULL == info->childTables || NULL == info->superTables || NULL == info->pVgHash || NULL == info->dumplicateKey) { uError("SML:0x%" PRIx64 " create info failed", info->id); goto cleanup; @@ -2337,7 +2321,7 @@ static int32_t smlInsertData(SSmlHandle *info) { (*pMeta)->tableMeta->vgId = vg.vgId; (*pMeta)->tableMeta->uid = tableData->uid; // one table merge data block together according uid - code = smlBindData(info->exec, tableData->tags, (*pMeta)->cols, tableData->cols, info->dataFormat, + code = smlBindData(info->pQuery, tableData->tags, (*pMeta)->cols, tableData->cols, info->dataFormat, (*pMeta)->tableMeta, tableData->childTableName, tableData->sTableName, tableData->sTableNameLen, info->ttl, info->msgBuf.buf, info->msgBuf.len); if (code != TSDB_CODE_SUCCESS) { @@ -2347,7 +2331,7 @@ static int32_t smlInsertData(SSmlHandle *info) { oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, oneTable); } - code = smlBuildOutput(info->exec, info->pVgHash); + code = smlBuildOutput(info->pQuery, info->pVgHash); if (code != TSDB_CODE_SUCCESS) { uError("SML:0x%" PRIx64 " smlBuildOutput failed", info->id); return code; diff --git a/source/libs/parser/src/parInsertSml.c b/source/libs/parser/src/parInsertSml.c index 9ab2e46da8..8abb31b890 100644 --- a/source/libs/parser/src/parInsertSml.c +++ b/source/libs/parser/src/parInsertSml.c @@ -45,12 +45,6 @@ int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* return TSDB_CODE_SUCCESS; } -typedef struct SmlExecHandle { - SHashObj* pBlockHash; - SQuery* pQuery; -} SSmlExecHandle; - - static int32_t smlBoundColumnData(SArray* cols, SBoundColInfo* pBoundInfo, SSchema* pSchema, bool isTag) { bool* pUseCols = taosMemoryCalloc(pBoundInfo->numOfCols, sizeof(bool)); if (NULL == pUseCols) { @@ -164,11 +158,10 @@ end: return code; } -int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols, bool format, STableMeta* pTableMeta, +int32_t smlBindData(SQuery* query, SArray* tags, SArray* colsSchema, SArray* cols, bool format, STableMeta* pTableMeta, char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl, char* msgBuf, int16_t msgBufLen) { SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen}; - SSmlExecHandle* smlHandle = (SSmlExecHandle*)handle; SSchema* pTagsSchema = getTableTagSchema(pTableMeta); SBoundColInfo bindTags = {0}; SVCreateTbReq *pCreateTblReq = NULL; @@ -200,7 +193,7 @@ int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols memcpy(pCreateTblReq->ctb.stbName, sTableName, sTableNameLen); STableDataCxt* pTableCxt = NULL; - ret = insGetTableDataCxt(smlHandle->pBlockHash, &pTableMeta->uid, sizeof(pTableMeta->uid), + ret = insGetTableDataCxt(((SVnodeModifOpStmt *)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid), pTableMeta, &pCreateTblReq, &pTableCxt, false); if (ret != TSDB_CODE_SUCCESS) { buildInvalidOperationMsg(&pBuf, "insGetTableDataCxt error"); @@ -247,12 +240,12 @@ int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols } if (kv->type == TSDB_DATA_TYPE_NCHAR){ int32_t len = 0; - char* pUcs4 = taosMemoryCalloc(1, pSchema->bytes - VARSTR_HEADER_SIZE); + char* pUcs4 = taosMemoryCalloc(1, pColSchema->bytes - VARSTR_HEADER_SIZE); if (NULL == pUcs4) { ret = TSDB_CODE_OUT_OF_MEMORY; goto end; } - if (!taosMbsToUcs4(kv->value, kv->length, (TdUcs4*)pUcs4, pSchema->bytes - VARSTR_HEADER_SIZE, &len)) { + if (!taosMbsToUcs4(kv->value, kv->length, (TdUcs4*)pUcs4, pColSchema->bytes - VARSTR_HEADER_SIZE, &len)) { if (errno == E2BIG) { buildInvalidOperationMsg(&pBuf, "value too long"); ret = TSDB_CODE_PAR_VALUE_TOO_LONG; @@ -271,6 +264,14 @@ int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols } pVal->flag = CV_FLAG_VALUE; } + + SRow** pRow = taosArrayReserve(pTableCxt->pData->aRowP, 1); + ret = tRowBuild(pTableCxt->pValues, pTableCxt->pSchema, pRow); + if (TSDB_CODE_SUCCESS != ret) { + buildInvalidOperationMsg(&pBuf, "tRowBuild error"); + goto end; + } + insCheckTableDataOrder(pTableCxt, TD_ROW_KEY(*pRow)); } end: @@ -280,23 +281,42 @@ end: return ret; } -void* smlInitHandle(SQuery* pQuery) { - SSmlExecHandle* handle = taosMemoryCalloc(1, sizeof(SSmlExecHandle)); - if (!handle) return NULL; - handle->pBlockHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); - handle->pQuery = pQuery; +SQuery* smlInitHandle() { + SQuery *pQuery = (SQuery *)nodesMakeNode(QUERY_NODE_QUERY); + if (NULL == pQuery) { + uError("create pQuery error"); + return NULL; + } + pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE; + pQuery->haveResultSet = false; + pQuery->msgType = TDMT_VND_SUBMIT; + SVnodeModifOpStmt *stmt = (SVnodeModifOpStmt*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT); + if (NULL == stmt) { + uError("create SVnodeModifOpStmt error"); + qDestroyQuery(pQuery); + return NULL; + } + stmt->pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); + stmt->pTableBlockHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); + stmt->freeHashFunc = insDestroyTableDataCxtHashMap; + stmt->freeArrayFunc = insDestroyVgroupDataCxtList; - return handle; + pQuery->pRoot = (SNode *)stmt; + return pQuery; } -void smlDestroyHandle(void* pHandle) { - if (!pHandle) return; - SSmlExecHandle* handle = (SSmlExecHandle*)pHandle; - insDestroyBlockHashmap(handle->pBlockHash); - taosMemoryFree(handle); -} - -int32_t smlBuildOutput(void* handle, SHashObj* pVgHash) { - SSmlExecHandle* smlHandle = (SSmlExecHandle*)handle; - return qBuildStmtOutput(smlHandle->pQuery, pVgHash, smlHandle->pBlockHash); +int32_t smlBuildOutput(SQuery * handle, SHashObj* pVgHash) { + SVnodeModifOpStmt *pStmt = (SVnodeModifOpStmt*)(handle)->pRoot; + // merge according to vgId + int32_t code = insMergeTableDataCxt(pStmt->pTableBlockHashObj, &pStmt->pVgDataBlocks); + if (code != TSDB_CODE_SUCCESS) { + uError("insMergeTableDataCxt failed"); + return code; + } + code = insBuildVgDataBlocks(pVgHash, pStmt->pVgDataBlocks, &pStmt->pDataBlocks); + if (code != TSDB_CODE_SUCCESS) { + uError("insBuildVgDataBlocks failed"); + return code; + } + return code; } diff --git a/source/libs/parser/src/parInsertUtil.c b/source/libs/parser/src/parInsertUtil.c index 57bac6ced5..c589526e0c 100644 --- a/source/libs/parser/src/parInsertUtil.c +++ b/source/libs/parser/src/parInsertUtil.c @@ -1083,8 +1083,9 @@ static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreat int32_t insGetTableDataCxt(SHashObj* pHash, void* id, int32_t idLen, STableMeta* pTableMeta, SVCreateTbReq** pCreateTbReq, STableDataCxt** pTableCxt, bool colMode) { - *pTableCxt = taosHashGet(pHash, id, idLen); - if (NULL != *pTableCxt) { + STableDataCxt** tmp = (STableDataCxt**)taosHashGet(pHash, id, idLen); + if (NULL != tmp) { + *pTableCxt = *tmp; return TSDB_CODE_SUCCESS; } int32_t code = createTableDataCxt(pTableMeta, pCreateTbReq, pTableCxt, colMode);