diff --git a/include/common/tglobal.h b/include/common/tglobal.h index bd5e74387e..350cd785d9 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -103,6 +103,7 @@ extern bool tsKeepColumnName; // client extern int32_t tsMinSlidingTime; extern int32_t tsMinIntervalTime; +extern int32_t tsMaxMemUsedByInsert; // build info extern char version[]; diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index ee566d759a..c3caac00ad 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -303,6 +303,10 @@ int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, SRequestConnInfo* pConn, uint64_t re int32_t catalogClearCache(void); +SMetaData* catalogCloneMetaData(SMetaData* pData); + +void catalogFreeMetaData(SMetaData* pData); + /** * Destroy catalog and relase all resources */ diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index 5ee097bd92..e1acf0dd6a 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -385,7 +385,6 @@ typedef struct SCmdMsgInfo { SEpSet epSet; void* pMsg; int32_t msgLen; - void* pExtension; // todo remove it soon } SCmdMsgInfo; typedef enum EQueryExecMode { diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index b1a937910d..bcd2316baf 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -33,6 +33,13 @@ typedef struct SStmtCallback { int32_t (*getExecInfoFn)(TAOS_STMT*, SHashObj**, SHashObj**); } SStmtCallback; +typedef struct SParseCsvCxt { + TdFilePtr fp; // last parsed file + int32_t tableNo; // last parsed table + SName tableName; // last parsed table + const char* pLastSqlPos; // the location of the last parsed sql +} SParseCsvCxt; + typedef struct SParseContext { uint64_t requestId; int64_t requestRid; @@ -57,6 +64,8 @@ typedef struct SParseContext { SArray* pTableMetaPos; // sql table pos => catalog data pos SArray* pTableVgroupPos; // sql table pos => catalog data pos int64_t allocatorId; + bool needMultiParse; + SParseCsvCxt csvCxt; } SParseContext; int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery); @@ -67,6 +76,8 @@ int32_t qParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq int32_t qAnalyseSqlSemantic(SParseContext* pCxt, const struct SCatalogReq* pCatalogReq, const struct SMetaData* pMetaData, SQuery* pQuery); +void qDestroyParseContext(SParseContext* pCxt); + void qDestroyQuery(SQuery* pQueryNode); int32_t qExtractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema); diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 535a436c6c..f815adfeaa 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -380,16 +380,25 @@ void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey); // --- mq void hbMgrInitMqHbRspHandle(); +typedef struct SSqlCallbackWrapper { + SParseContext* pParseCtx; + SCatalogReq* pCatalogReq; + SMetaData* pResultMeta; + SRequestObj* pRequest; +} SSqlCallbackWrapper; + SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void** res); int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList); -void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta); -int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest); -int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList); -void doAsyncQuery(SRequestObj* pRequest, bool forceUpdateMeta); -int32_t removeMeta(STscObj* pTscObj, SArray* tbList); -int32_t handleAlterTbExecRes(void* res, struct SCatalog* pCatalog); -int32_t handleCreateTbExecRes(void* res, SCatalog* pCatalog); -bool qnodeRequired(SRequestObj* pRequest); +void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta, SSqlCallbackWrapper* pWrapper); +int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest); +int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList); +void doAsyncQuery(SRequestObj* pRequest, bool forceUpdateMeta); +int32_t removeMeta(STscObj* pTscObj, SArray* tbList); +int32_t handleAlterTbExecRes(void* res, struct SCatalog* pCatalog); +int32_t handleCreateTbExecRes(void* res, SCatalog* pCatalog); +bool qnodeRequired(SRequestObj* pRequest); +int32_t continueInsertFromCsv(SSqlCallbackWrapper* pWrapper, SRequestObj* pRequest); +void destorySqlCallbackWrapper(SSqlCallbackWrapper* pWrapper); #ifdef __cplusplus } diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 8ffc88ec28..58bfcc8a09 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -868,10 +868,11 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) { return code; } -//todo refacto the error code mgmt +// todo refacto the error code mgmt void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) { - SRequestObj* pRequest = (SRequestObj*)param; - STscObj* pTscObj = pRequest->pTscObj; + SSqlCallbackWrapper* pWrapper = param; + SRequestObj* pRequest = pWrapper->pRequest; + STscObj* pTscObj = pRequest->pTscObj; pRequest->code = code; if (pResult) { @@ -882,7 +883,7 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) { int32_t type = pRequest->type; if (TDMT_VND_SUBMIT == type || TDMT_VND_DELETE == type || TDMT_VND_CREATE_TABLE == type) { if (pResult) { - pRequest->body.resInfo.numOfRows = pResult->numOfRows; + pRequest->body.resInfo.numOfRows += pResult->numOfRows; // record the insert rows if (TDMT_VND_SUBMIT == type) { @@ -899,12 +900,13 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) { pRequest->requestId); if (code != TSDB_CODE_SUCCESS && NEED_CLIENT_HANDLE_ERROR(code) && pRequest->sqlstr != NULL) { - tscDebug("0x%" PRIx64 " client retry to handle the error, code:%s, tryCount:%d, reqId:0x%" PRIx64, - pRequest->self, tstrerror(code), pRequest->retry, pRequest->requestId); + tscDebug("0x%" PRIx64 " client retry to handle the error, code:%s, tryCount:%d, reqId:0x%" PRIx64, pRequest->self, + tstrerror(code), pRequest->retry, pRequest->requestId); pRequest->prevCode = code; schedulerFreeJob(&pRequest->body.queryJob, 0); qDestroyQuery(pRequest->pQuery); pRequest->pQuery = NULL; + destorySqlCallbackWrapper(pWrapper); doAsyncQuery(pRequest, true); return; } @@ -920,6 +922,15 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) { pRequest->code = code1; } + if (pRequest->code == TSDB_CODE_SUCCESS && NULL != pWrapper->pParseCtx && pWrapper->pParseCtx->needMultiParse) { + code = continueInsertFromCsv(pWrapper, pRequest); + if (TSDB_CODE_SUCCESS == code) { + return; + } + } + + destorySqlCallbackWrapper(pWrapper); + // return to client pRequest->body.queryFp(pRequest->body.param, pRequest, code); } @@ -1020,76 +1031,86 @@ SRequestObj* launchQuery(uint64_t connId, const char* sql, int sqlLen, bool vali return launchQueryImpl(pRequest, pQuery, false, NULL); } -void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta) { +static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta, + SSqlCallbackWrapper* pWrapper) { + pRequest->type = pQuery->msgType; + + SArray* pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad)); + SPlanContext cxt = {.queryId = pRequest->requestId, + .acctId = pRequest->pTscObj->acctId, + .mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp), + .pAstRoot = pQuery->pRoot, + .showRewrite = pQuery->showRewrite, + .pMsg = pRequest->msgBuf, + .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE, + .pUser = pRequest->pTscObj->user, + .sysInfo = pRequest->pTscObj->sysInfo, + .allocatorId = pRequest->allocatorRefId}; + SQueryPlan* pDag = NULL; + int32_t code = qCreateQueryPlan(&cxt, &pDag, pMnodeList); + if (code) { + tscError("0x%" PRIx64 " failed to create query plan, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code), + pRequest->requestId); + } else { + pRequest->body.subplanNum = pDag->numOfSubplans; + } + + pRequest->metric.planEnd = taosGetTimestampUs(); + + if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) { + SArray* pNodeList = NULL; + buildAsyncExecNodeList(pRequest, &pNodeList, pMnodeList, pResultMeta); + + SRequestConnInfo conn = {.pTrans = getAppInfo(pRequest)->pTransporter, + .requestId = pRequest->requestId, + .requestObjRefId = pRequest->self}; + SSchedulerReq req = { + .syncReq = false, + .localReq = (tsQueryPolicy == QUERY_POLICY_CLIENT), + .pConn = &conn, + .pNodeList = pNodeList, + .pDag = pDag, + .allocatorRefId = pRequest->allocatorRefId, + .sql = pRequest->sqlstr, + .startTs = pRequest->metric.start, + .execFp = schedulerExecCb, + .cbParam = pWrapper, + .chkKillFp = chkRequestKilled, + .chkKillParam = (void*)pRequest->self, + .pExecRes = NULL, + }; + code = schedulerExecJob(&req, &pRequest->body.queryJob); + taosArrayDestroy(pNodeList); + } else { + tscDebug("0x%" PRIx64 " plan not executed, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code), + pRequest->requestId); + destorySqlCallbackWrapper(pWrapper); + pRequest->body.queryFp(pRequest->body.param, pRequest, code); + } + + // todo not to be released here + taosArrayDestroy(pMnodeList); + + return code; +} + +void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta, SSqlCallbackWrapper* pWrapper) { int32_t code = 0; pRequest->body.execMode = pQuery->execMode; + if (QUERY_EXEC_MODE_SCHEDULE != pRequest->body.execMode) { + destorySqlCallbackWrapper(pWrapper); + } switch (pQuery->execMode) { case QUERY_EXEC_MODE_LOCAL: asyncExecLocalCmd(pRequest, pQuery); - return; + break; case QUERY_EXEC_MODE_RPC: code = asyncExecDdlQuery(pRequest, pQuery); break; case QUERY_EXEC_MODE_SCHEDULE: { - SArray* pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad)); - - pRequest->type = pQuery->msgType; - - SPlanContext cxt = {.queryId = pRequest->requestId, - .acctId = pRequest->pTscObj->acctId, - .mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp), - .pAstRoot = pQuery->pRoot, - .showRewrite = pQuery->showRewrite, - .pMsg = pRequest->msgBuf, - .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE, - .pUser = pRequest->pTscObj->user, - .sysInfo = pRequest->pTscObj->sysInfo, - .allocatorId = pRequest->allocatorRefId}; - - SAppInstInfo* pAppInfo = getAppInfo(pRequest); - SQueryPlan* pDag = NULL; - code = qCreateQueryPlan(&cxt, &pDag, pMnodeList); - if (code) { - tscError("0x%" PRIx64 " failed to create query plan, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code), - pRequest->requestId); - } else { - pRequest->body.subplanNum = pDag->numOfSubplans; - } - - pRequest->metric.planEnd = taosGetTimestampUs(); - if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) { - SArray* pNodeList = NULL; - buildAsyncExecNodeList(pRequest, &pNodeList, pMnodeList, pResultMeta); - - SRequestConnInfo conn = { - .pTrans = pAppInfo->pTransporter, .requestId = pRequest->requestId, .requestObjRefId = pRequest->self}; - SSchedulerReq req = { - .syncReq = false, - .localReq = (tsQueryPolicy == QUERY_POLICY_CLIENT), - .pConn = &conn, - .pNodeList = pNodeList, - .pDag = pDag, - .allocatorRefId = pRequest->allocatorRefId, - .sql = pRequest->sqlstr, - .startTs = pRequest->metric.start, - .execFp = schedulerExecCb, - .cbParam = pRequest, - .chkKillFp = chkRequestKilled, - .chkKillParam = (void*)pRequest->self, - .pExecRes = NULL, - }; - code = schedulerExecJob(&req, &pRequest->body.queryJob); - taosArrayDestroy(pNodeList); - } else { - tscDebug("0x%" PRIx64 " plan not executed, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code), - pRequest->requestId); - pRequest->body.queryFp(pRequest->body.param, pRequest, code); - } - - // todo not to be released here - taosArrayDestroy(pMnodeList); + code = asyncExecSchQuery(pRequest, pQuery, pResultMeta, pWrapper); break; } case QUERY_EXEC_MODE_EMPTY_RESULT: diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 5b255a28ea..6f8cef7c0d 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -667,43 +667,47 @@ const char *taos_get_server_info(TAOS *taos) { return pTscObj->sDetailVer; } -typedef struct SqlParseWrapper { - SParseContext *pCtx; - SCatalogReq catalogReq; - SRequestObj *pRequest; -} SqlParseWrapper; - static void destoryTablesReq(void *p) { STablesReq *pRes = (STablesReq *)p; taosArrayDestroy(pRes->pTables); } -static void destorySqlParseWrapper(SqlParseWrapper *pWrapper) { - taosArrayDestroy(pWrapper->catalogReq.pDbVgroup); - taosArrayDestroy(pWrapper->catalogReq.pDbCfg); - taosArrayDestroy(pWrapper->catalogReq.pDbInfo); - taosArrayDestroyEx(pWrapper->catalogReq.pTableMeta, destoryTablesReq); - taosArrayDestroyEx(pWrapper->catalogReq.pTableHash, destoryTablesReq); - taosArrayDestroy(pWrapper->catalogReq.pUdf); - taosArrayDestroy(pWrapper->catalogReq.pIndex); - taosArrayDestroy(pWrapper->catalogReq.pUser); - taosArrayDestroy(pWrapper->catalogReq.pTableIndex); - taosArrayDestroy(pWrapper->pCtx->pTableMetaPos); - taosArrayDestroy(pWrapper->pCtx->pTableVgroupPos); - taosMemoryFree(pWrapper->pCtx); +static void destoryCatalogReq(SCatalogReq *pCatalogReq) { + if (NULL == pCatalogReq) { + return; + } + taosArrayDestroy(pCatalogReq->pDbVgroup); + taosArrayDestroy(pCatalogReq->pDbCfg); + taosArrayDestroy(pCatalogReq->pDbInfo); + taosArrayDestroyEx(pCatalogReq->pTableMeta, destoryTablesReq); + taosArrayDestroyEx(pCatalogReq->pTableHash, destoryTablesReq); + taosArrayDestroy(pCatalogReq->pUdf); + taosArrayDestroy(pCatalogReq->pIndex); + taosArrayDestroy(pCatalogReq->pUser); + taosArrayDestroy(pCatalogReq->pTableIndex); + taosMemoryFree(pCatalogReq); +} + +void destorySqlCallbackWrapper(SSqlCallbackWrapper *pWrapper) { + if (NULL == pWrapper) { + return; + } + destoryCatalogReq(pWrapper->pCatalogReq); + qDestroyParseContext(pWrapper->pParseCtx); + catalogFreeMetaData(pWrapper->pResultMeta); taosMemoryFree(pWrapper); } void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) { - SqlParseWrapper *pWrapper = (SqlParseWrapper *)param; - SRequestObj *pRequest = pWrapper->pRequest; - SQuery *pQuery = pRequest->pQuery; + SSqlCallbackWrapper *pWrapper = (SSqlCallbackWrapper *)param; + SRequestObj *pRequest = pWrapper->pRequest; + SQuery *pQuery = pRequest->pQuery; pRequest->metric.ctgEnd = taosGetTimestampUs(); qDebug("0x%" PRIx64 " start to semantic analysis, reqId:0x%" PRIx64, pRequest->self, pRequest->requestId); if (code == TSDB_CODE_SUCCESS) { - code = qAnalyseSqlSemantic(pWrapper->pCtx, &pWrapper->catalogReq, pResultMeta, pQuery); + code = qAnalyseSqlSemantic(pWrapper->pParseCtx, pWrapper->pCatalogReq, pResultMeta, pQuery); pRequest->stableQuery = pQuery->stableQuery; if (pQuery->pRoot) { pRequest->stmtType = pQuery->pRoot->type; @@ -712,6 +716,13 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) { pRequest->metric.semanticEnd = taosGetTimestampUs(); + if (code == TSDB_CODE_SUCCESS && pWrapper->pParseCtx->needMultiParse) { + pWrapper->pResultMeta = catalogCloneMetaData(pResultMeta); + if (NULL == pWrapper->pResultMeta) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + } + if (code == TSDB_CODE_SUCCESS) { if (pQuery->haveResultSet) { setResSchemaInfo(&pRequest->body.resInfo, pQuery->pResSchema, pQuery->numOfResCols); @@ -722,15 +733,13 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) { TSWAP(pRequest->tableList, (pQuery)->pTableList); TSWAP(pRequest->targetTableList, (pQuery)->pTargetTableList); - destorySqlParseWrapper(pWrapper); - - double el = (pRequest->metric.semanticEnd - pRequest->metric.ctgEnd)/1000.0; + double el = (pRequest->metric.semanticEnd - pRequest->metric.ctgEnd) / 1000.0; tscDebug("0x%" PRIx64 " analysis semantics completed, start async query, elapsed time:%.2f ms, reqId:0x%" PRIx64, pRequest->self, el, pRequest->requestId); - launchAsyncQuery(pRequest, pQuery, pResultMeta); + launchAsyncQuery(pRequest, pQuery, pResultMeta, pWrapper); } else { - destorySqlParseWrapper(pWrapper); + destorySqlCallbackWrapper(pWrapper); qDestroyQuery(pRequest->pQuery); pRequest->pQuery = NULL; @@ -750,6 +759,16 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) { } } +int32_t continueInsertFromCsv(SSqlCallbackWrapper *pWrapper, SRequestObj *pRequest) { + qDestroyQuery(pRequest->pQuery); + pRequest->pQuery = (SQuery *)nodesMakeNode(QUERY_NODE_QUERY); + if (NULL == pRequest->pQuery) { + return TSDB_CODE_OUT_OF_MEMORY; + } + retrieveMetaCallback(pWrapper->pResultMeta, pWrapper, TSDB_CODE_SUCCESS); + return TSDB_CODE_SUCCESS; +} + void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param) { int64_t connId = *(int64_t *)taos; taosAsyncQueryImpl(connId, sql, fp, param, false); @@ -786,37 +805,48 @@ int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt) { } void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { - SParseContext *pCxt = NULL; - STscObj *pTscObj = pRequest->pTscObj; - int32_t code = 0; + STscObj *pTscObj = pRequest->pTscObj; + SSqlCallbackWrapper *pWrapper = NULL; + int32_t code = TSDB_CODE_SUCCESS; if (pRequest->retry++ > REQUEST_TOTAL_EXEC_TIMES) { code = pRequest->prevCode; - goto _error; } - code = createParseContext(pRequest, &pCxt); - if (code != TSDB_CODE_SUCCESS) { - goto _error; + if (TSDB_CODE_SUCCESS == code) { + pWrapper = taosMemoryCalloc(1, sizeof(SSqlCallbackWrapper)); + if (pWrapper == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + } else { + pWrapper->pRequest = pRequest; + } } - pCxt->mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); - code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCxt->pCatalog); - if (code != TSDB_CODE_SUCCESS) { - goto _error; + if (TSDB_CODE_SUCCESS == code) { + code = createParseContext(pRequest, &pWrapper->pParseCtx); } - pRequest->metric.syntaxStart = taosGetTimestampUs(); - - SCatalogReq catalogReq = {.forceUpdate = updateMetaForce, .qNodeRequired = qnodeRequired(pRequest)}; - code = qParseSqlSyntax(pCxt, &pRequest->pQuery, &catalogReq); - if (code != TSDB_CODE_SUCCESS) { - goto _error; + if (TSDB_CODE_SUCCESS == code) { + pWrapper->pParseCtx->mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); + code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pWrapper->pParseCtx->pCatalog); } - pRequest->metric.syntaxEnd = taosGetTimestampUs(); + if (TSDB_CODE_SUCCESS == code) { + pRequest->metric.syntaxStart = taosGetTimestampUs(); - if (!updateMetaForce) { + pWrapper->pCatalogReq = taosMemoryCalloc(1, sizeof(SCatalogReq)); + if (pWrapper->pCatalogReq == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + } else { + pWrapper->pCatalogReq->forceUpdate = updateMetaForce; + pWrapper->pCatalogReq->qNodeRequired = qnodeRequired(pRequest); + code = qParseSqlSyntax(pWrapper->pParseCtx, &pRequest->pQuery, pWrapper->pCatalogReq); + } + + pRequest->metric.syntaxEnd = taosGetTimestampUs(); + } + + if (TSDB_CODE_SUCCESS == code && !updateMetaForce) { SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary; if (NULL == pRequest->pQuery->pRoot) { atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1); @@ -825,38 +855,26 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { } } - SqlParseWrapper *pWrapper = taosMemoryCalloc(1, sizeof(SqlParseWrapper)); - if (pWrapper == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _error; + if (TSDB_CODE_SUCCESS == code) { + SRequestConnInfo conn = {.pTrans = pWrapper->pParseCtx->pTransporter, + .requestId = pWrapper->pParseCtx->requestId, + .requestObjRefId = pWrapper->pParseCtx->requestRid, + .mgmtEps = pWrapper->pParseCtx->mgmtEpSet}; + + pRequest->metric.ctgStart = taosGetTimestampUs(); + + code = catalogAsyncGetAllMeta(pWrapper->pParseCtx->pCatalog, &conn, pWrapper->pCatalogReq, retrieveMetaCallback, + pWrapper, &pRequest->body.queryJob); } - pWrapper->pCtx = pCxt; - pWrapper->pRequest = pRequest; - pWrapper->catalogReq = catalogReq; - - SRequestConnInfo conn = {.pTrans = pCxt->pTransporter, - .requestId = pCxt->requestId, - .requestObjRefId = pCxt->requestRid, - .mgmtEps = pCxt->mgmtEpSet}; - - pRequest->metric.ctgStart = taosGetTimestampUs(); - - code = catalogAsyncGetAllMeta(pCxt->pCatalog, &conn, &catalogReq, retrieveMetaCallback, pWrapper, - &pRequest->body.queryJob); - pCxt = NULL; - if (code == TSDB_CODE_SUCCESS) { - return; + if (TSDB_CODE_SUCCESS != code) { + tscError("0x%" PRIx64 " error happens, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code, tstrerror(code), + pRequest->requestId); + destorySqlCallbackWrapper(pWrapper); + terrno = code; + pRequest->code = code; + pRequest->body.queryFp(pRequest->body.param, pRequest, code); } - -_error: - tscError("0x%" PRIx64 " error happens, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code, tstrerror(code), - pRequest->requestId); - taosMemoryFree(pCxt); - - terrno = code; - pRequest->code = code; - pRequest->body.queryFp(pRequest->body.param, pRequest, code); } static void fetchCallback(void *pResult, void *param, int32_t code) { diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 195466061d..c5633e73d0 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -209,7 +209,7 @@ static inline bool smlCheckDuplicateKey(const char *key, int32_t keyLen, SHashOb } static int32_t smlBuildInvalidDataMsg(SSmlMsgBuf *pBuf, const char *msg1, const char *msg2) { - if(pBuf->buf){ + if (pBuf->buf) { memset(pBuf->buf, 0, pBuf->len); if (msg1) strncat(pBuf->buf, msg1, pBuf->len); int32_t left = pBuf->len - strlen(pBuf->buf); @@ -256,15 +256,15 @@ static int32_t smlFindNearestPowerOf2(int32_t length, uint8_t type) { while (result <= length) { result *= 2; } - if (type == TSDB_DATA_TYPE_BINARY && result > TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE){ + if (type == TSDB_DATA_TYPE_BINARY && result > TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) { result = TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE; - } else if (type == TSDB_DATA_TYPE_NCHAR && result > (TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE){ + } else if (type == TSDB_DATA_TYPE_NCHAR && result > (TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) { result = (TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE; } - if (type == TSDB_DATA_TYPE_NCHAR){ + if (type == TSDB_DATA_TYPE_NCHAR) { result = result * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE; - }else if (type == TSDB_DATA_TYPE_BINARY){ + } else if (type == TSDB_DATA_TYPE_BINARY) { result = result + VARSTR_HEADER_SIZE; } return result; @@ -274,7 +274,7 @@ static int32_t smlProcessSchemaAction(SSmlHandle *info, SSchema *schemaField, SH ESchemaAction *action, bool isTag) { int32_t code = TSDB_CODE_SUCCESS; for (int j = 0; j < taosArrayGetSize(cols); ++j) { - if(j == 0 && !isTag) continue; + if (j == 0 && !isTag) continue; SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, j); code = smlGenerateSchemaAction(schemaField, schemaHash, kv, isTag, action, info); if (code != TSDB_CODE_SUCCESS) { @@ -286,12 +286,12 @@ static int32_t smlProcessSchemaAction(SSmlHandle *info, SSchema *schemaField, SH static int32_t smlCheckMeta(SSchema *schema, int32_t length, SArray *cols, bool isTag) { SHashObj *hashTmp = taosHashInit(length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); - int32_t i = 0; - for ( ;i < length; i++) { + int32_t i = 0; + for (; i < length; i++) { taosHashPut(hashTmp, schema[i].name, strlen(schema[i].name), &i, SHORT_BYTES); } - if (isTag){ + if (isTag) { i = 0; } else { i = 1; @@ -306,7 +306,7 @@ static int32_t smlCheckMeta(SSchema *schema, int32_t length, SArray *cols, bool return 0; } -static int32_t getBytes(uint8_t type, int32_t length){ +static int32_t getBytes(uint8_t type, int32_t length) { if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { return smlFindNearestPowerOf2(length, type); } else { @@ -314,21 +314,22 @@ static int32_t getBytes(uint8_t type, int32_t length){ } } -static int32_t smlBuildFieldsList(SSmlHandle *info, SSchema *schemaField, SHashObj *schemaHash, SArray *cols, SArray* results, int32_t numOfCols, bool isTag) { +static int32_t smlBuildFieldsList(SSmlHandle *info, SSchema *schemaField, SHashObj *schemaHash, SArray *cols, + SArray *results, int32_t numOfCols, bool isTag) { for (int j = 0; j < taosArrayGetSize(cols); ++j) { - SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, j); + SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, j); ESchemaAction action = SCHEMA_ACTION_NULL; smlGenerateSchemaAction(schemaField, schemaHash, kv, isTag, &action, info); - if(action == SCHEMA_ACTION_ADD_COLUMN || action == SCHEMA_ACTION_ADD_TAG){ + if (action == SCHEMA_ACTION_ADD_COLUMN || action == SCHEMA_ACTION_ADD_TAG) { SField field = {0}; field.type = kv->type; field.bytes = getBytes(kv->type, kv->length); memcpy(field.name, kv->key, kv->keyLen); taosArrayPush(results, &field); - }else if(action == SCHEMA_ACTION_CHANGE_COLUMN_SIZE || action == SCHEMA_ACTION_CHANGE_TAG_SIZE){ + } else if (action == SCHEMA_ACTION_CHANGE_COLUMN_SIZE || action == SCHEMA_ACTION_CHANGE_TAG_SIZE) { uint16_t *index = (uint16_t *)taosHashGet(schemaHash, kv->key, kv->keyLen); - uint16_t newIndex = *index; - if(isTag) newIndex -= numOfCols; + uint16_t newIndex = *index; + if (isTag) newIndex -= numOfCols; SField *field = (SField *)taosArrayGet(results, newIndex); field->bytes = getBytes(kv->type, kv->length); } @@ -336,12 +337,11 @@ static int32_t smlBuildFieldsList(SSmlHandle *info, SSchema *schemaField, SHashO return TSDB_CODE_SUCCESS; } -//static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SSmlSTableMeta *sTableData, -// int32_t colVer, int32_t tagVer, int8_t source, uint64_t suid){ -static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray* pColumns, SArray* pTags, - STableMeta *pTableMeta, ESchemaAction action){ - - SRequestObj* pRequest = NULL; +// static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SSmlSTableMeta *sTableData, +// int32_t colVer, int32_t tagVer, int8_t source, uint64_t suid){ +static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns, SArray *pTags, STableMeta *pTableMeta, + ESchemaAction action) { + SRequestObj *pRequest = NULL; SMCreateStbReq pReq = {0}; int32_t code = TSDB_CODE_SUCCESS; SCmdMsgInfo pCmdMsg = {0}; @@ -363,24 +363,24 @@ static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray* pColumns, goto end; } - if (action == SCHEMA_ACTION_CREATE_STABLE){ + if (action == SCHEMA_ACTION_CREATE_STABLE) { pReq.colVer = 1; pReq.tagVer = 1; pReq.suid = 0; pReq.source = TD_REQ_FROM_APP; - } else if (action == SCHEMA_ACTION_ADD_TAG || action == SCHEMA_ACTION_CHANGE_TAG_SIZE){ + } else if (action == SCHEMA_ACTION_ADD_TAG || action == SCHEMA_ACTION_CHANGE_TAG_SIZE) { pReq.colVer = pTableMeta->sversion; pReq.tagVer = pTableMeta->tversion + 1; pReq.suid = pTableMeta->uid; pReq.source = TD_REQ_FROM_TAOX; - } else if (action == SCHEMA_ACTION_ADD_COLUMN || action == SCHEMA_ACTION_CHANGE_COLUMN_SIZE){ + } else if (action == SCHEMA_ACTION_ADD_COLUMN || action == SCHEMA_ACTION_CHANGE_COLUMN_SIZE) { pReq.colVer = pTableMeta->sversion + 1; pReq.tagVer = pTableMeta->tversion; pReq.suid = pTableMeta->uid; pReq.source = TD_REQ_FROM_TAOX; } - if (pReq.numOfTags == 0){ + if (pReq.numOfTags == 0) { pReq.numOfTags = 1; SField field = {0}; field.type = TSDB_DATA_TYPE_NCHAR; @@ -412,7 +412,7 @@ static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray* pColumns, launchQueryImpl(pRequest, &pQuery, true, NULL); - if(pRequest->code == TSDB_CODE_SUCCESS){ + if (pRequest->code == TSDB_CODE_SUCCESS) { catalogRemoveTableMeta(info->pCatalog, pName); } code = pRequest->code; @@ -425,11 +425,11 @@ end: } static int32_t smlModifyDBSchemas(SSmlHandle *info) { - int32_t code = 0; - SHashObj *hashTmp = NULL; - STableMeta *pTableMeta = NULL; + int32_t code = 0; + SHashObj *hashTmp = NULL; + STableMeta *pTableMeta = NULL; - SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}}; + SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}}; strcpy(pName.dbname, info->pRequest->pDb); SRequestConnInfo conn = {0}; @@ -451,8 +451,8 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) { code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta); if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_MND_STB_NOT_EXIST) { - SArray* pColumns = taosArrayInit(taosArrayGetSize(sTableData->cols), sizeof(SField)); - SArray* pTags = taosArrayInit(taosArrayGetSize(sTableData->tags), sizeof(SField)); + SArray *pColumns = taosArrayInit(taosArrayGetSize(sTableData->cols), sizeof(SField)); + SArray *pTags = taosArrayInit(taosArrayGetSize(sTableData->tags), sizeof(SField)); smlBuildFieldsList(info, NULL, NULL, sTableData->tags, pTags, 0, true); smlBuildFieldsList(info, NULL, NULL, sTableData->cols, pColumns, 0, false); @@ -463,8 +463,8 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) { } info->cost.numOfCreateSTables++; } else if (code == TSDB_CODE_SUCCESS) { - hashTmp = taosHashInit(pTableMeta->tableInfo.numOfTags, - taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + hashTmp = taosHashInit(pTableMeta->tableInfo.numOfTags, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, + HASH_NO_LOCK); for (uint16_t i = pTableMeta->tableInfo.numOfColumns; i < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; i++) { taosHashPut(hashTmp, pTableMeta->schema[i].name, strlen(pTableMeta->schema[i].name), &i, SHORT_BYTES); @@ -475,22 +475,25 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) { if (code != TSDB_CODE_SUCCESS) { goto end; } - if (action != SCHEMA_ACTION_NULL){ - SArray* pColumns = taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField)); - SArray* pTags = taosArrayInit(taosArrayGetSize(sTableData->tags) + pTableMeta->tableInfo.numOfTags, sizeof(SField)); + if (action != SCHEMA_ACTION_NULL) { + SArray *pColumns = + taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField)); + SArray *pTags = + taosArrayInit(taosArrayGetSize(sTableData->tags) + pTableMeta->tableInfo.numOfTags, sizeof(SField)); for (uint16_t i = 0; i < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; i++) { SField field = {0}; field.type = pTableMeta->schema[i].type; field.bytes = pTableMeta->schema[i].bytes; strcpy(field.name, pTableMeta->schema[i].name); - if(i < pTableMeta->tableInfo.numOfColumns){ + if (i < pTableMeta->tableInfo.numOfColumns) { taosArrayPush(pColumns, &field); - }else{ + } else { taosArrayPush(pTags, &field); } } - smlBuildFieldsList(info, pTableMeta->schema, hashTmp, sTableData->tags, pTags, pTableMeta->tableInfo.numOfColumns, true); + smlBuildFieldsList(info, pTableMeta->schema, hashTmp, sTableData->tags, pTags, + pTableMeta->tableInfo.numOfColumns, true); code = smlSendMetaMsg(info, &pName, pColumns, pTags, pTableMeta, action); if (code != TSDB_CODE_SUCCESS) { @@ -518,23 +521,26 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) { if (code != TSDB_CODE_SUCCESS) { goto end; } - if (action != SCHEMA_ACTION_NULL){ - SArray* pColumns = taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField)); - SArray* pTags = taosArrayInit(taosArrayGetSize(sTableData->tags) + pTableMeta->tableInfo.numOfTags, sizeof(SField)); + if (action != SCHEMA_ACTION_NULL) { + SArray *pColumns = + taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField)); + SArray *pTags = + taosArrayInit(taosArrayGetSize(sTableData->tags) + pTableMeta->tableInfo.numOfTags, sizeof(SField)); for (uint16_t i = 0; i < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; i++) { SField field = {0}; field.type = pTableMeta->schema[i].type; field.bytes = pTableMeta->schema[i].bytes; strcpy(field.name, pTableMeta->schema[i].name); - if(i < pTableMeta->tableInfo.numOfColumns){ + if (i < pTableMeta->tableInfo.numOfColumns) { taosArrayPush(pColumns, &field); - }else{ + } else { taosArrayPush(pTags, &field); } } - smlBuildFieldsList(info, pTableMeta->schema, hashTmp, sTableData->cols, pColumns, pTableMeta->tableInfo.numOfColumns, false); + smlBuildFieldsList(info, pTableMeta->schema, hashTmp, sTableData->cols, pColumns, + pTableMeta->tableInfo.numOfColumns, false); code = smlSendMetaMsg(info, &pName, pColumns, pTags, pTableMeta, action); if (code != TSDB_CODE_SUCCESS) { @@ -847,7 +853,7 @@ static int64_t smlParseOpenTsdbTime(SSmlHandle *info, const char *data, int32_t static int32_t smlParseTS(SSmlHandle *info, const char *data, int32_t len, SArray *cols) { int64_t ts = 0; if (info->protocol == TSDB_SML_LINE_PROTOCOL) { -// uError("SML:data:%s,len:%d", data, len); + // uError("SML:data:%s,len:%d", data, len); ts = smlParseInfluxTime(info, data, len); } else if (info->protocol == TSDB_SML_TELNET_PROTOCOL) { ts = smlParseOpenTsdbTime(info, data, len); @@ -877,7 +883,7 @@ static int32_t smlParseValue(SSmlKv *pVal, SSmlMsgBuf *msg) { if (smlIsBinary(pVal->value, pVal->length)) { pVal->type = TSDB_DATA_TYPE_BINARY; pVal->length -= BINARY_ADD_LEN; - if (pVal->length > TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE){ + if (pVal->length > TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) { return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN; } pVal->value += (BINARY_ADD_LEN - 1); @@ -887,7 +893,7 @@ static int32_t smlParseValue(SSmlKv *pVal, SSmlMsgBuf *msg) { if (smlIsNchar(pVal->value, pVal->length)) { pVal->type = TSDB_DATA_TYPE_NCHAR; pVal->length -= NCHAR_ADD_LEN; - if(pVal->length > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE){ + if (pVal->length > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) { return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN; } pVal->value += (NCHAR_ADD_LEN - 1); @@ -1063,7 +1069,7 @@ static int32_t smlParseTelnetTags(const char *data, SArray *cols, char *childTab continue; } - if(valueLen > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE){ + if (valueLen > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) { return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN; } @@ -1224,7 +1230,7 @@ static int32_t smlParseCols(const char *data, int32_t len, SArray *cols, char *c kv->value = value; kv->length = valueLen; if (isTag) { - if(valueLen > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE){ + if (valueLen > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) { return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN; } kv->type = TSDB_DATA_TYPE_NCHAR; @@ -1362,8 +1368,8 @@ static int32_t smlKvTimeArrayCompare(const void *key1, const void *key2) { static int32_t smlKvTimeHashCompare(const void *key1, const void *key2) { SHashObj *s1 = *(SHashObj **)key1; SHashObj *s2 = *(SHashObj **)key2; - SSmlKv *kv1 = *(SSmlKv **)taosHashGet(s1, TS, TS_LEN); - SSmlKv *kv2 = *(SSmlKv **)taosHashGet(s2, TS, TS_LEN); + SSmlKv *kv1 = *(SSmlKv **)taosHashGet(s1, TS, TS_LEN); + SSmlKv *kv2 = *(SSmlKv **)taosHashGet(s2, TS, TS_LEN); ASSERT(kv1->type == TSDB_DATA_TYPE_TIMESTAMP); ASSERT(kv2->type == TSDB_DATA_TYPE_TIMESTAMP); if (kv1->i < kv2->i) { @@ -1375,12 +1381,12 @@ static int32_t smlKvTimeHashCompare(const void *key1, const void *key2) { } } -static int32_t smlDealCols(SSmlTableInfo* oneTable, bool dataFormat, SArray *cols){ - if(dataFormat){ +static int32_t smlDealCols(SSmlTableInfo *oneTable, bool dataFormat, SArray *cols) { + if (dataFormat) { void *p = taosArraySearch(oneTable->cols, &cols, smlKvTimeArrayCompare, TD_GT); - if(p == NULL){ + if (p == NULL) { taosArrayPush(oneTable->cols, &cols); - }else{ + } else { taosArrayInsert(oneTable->cols, TARRAY_ELEM_IDX(oneTable->cols, p), &cols); } return TSDB_CODE_SUCCESS; @@ -1397,9 +1403,9 @@ static int32_t smlDealCols(SSmlTableInfo* oneTable, bool dataFormat, SArray *col } void *p = taosArraySearch(oneTable->cols, &kvHash, smlKvTimeHashCompare, TD_GT); - if(p == NULL){ + if (p == NULL) { taosArrayPush(oneTable->cols, &kvHash); - }else{ + } else { taosArrayInsert(oneTable->cols, TARRAY_ELEM_IDX(oneTable->cols, p), &kvHash); } return TSDB_CODE_SUCCESS; @@ -1488,15 +1494,15 @@ static void smlDestroyInfo(SSmlHandle *info) { taosMemoryFreeClear(info); } -static SSmlHandle* smlBuildSmlInfo(STscObj* pTscObj, SRequestObj* request, SMLProtocolType protocol, int8_t precision){ - int32_t code = TSDB_CODE_SUCCESS; - SSmlHandle* info = (SSmlHandle*)taosMemoryCalloc(1, sizeof(SSmlHandle)); +static SSmlHandle *smlBuildSmlInfo(STscObj *pTscObj, SRequestObj *request, SMLProtocolType protocol, int8_t precision) { + int32_t code = TSDB_CODE_SUCCESS; + SSmlHandle *info = (SSmlHandle *)taosMemoryCalloc(1, sizeof(SSmlHandle)); if (NULL == info) { return NULL; } info->id = smlGenId(); - info->pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY); + info->pQuery = (SQuery *)nodesMakeNode(QUERY_NODE_QUERY); if (NULL == info->pQuery) { uError("SML:0x%" PRIx64 " create info->pQuery error", info->id); goto cleanup; @@ -1511,8 +1517,8 @@ static SSmlHandle* smlBuildSmlInfo(STscObj* pTscObj, SRequestObj* request, SMLPr } ((SVnodeModifOpStmt *)(info->pQuery->pRoot))->payloadType = PAYLOAD_TYPE_KV; - if (pTscObj){ - info->taos = pTscObj; + if (pTscObj) { + info->taos = pTscObj; code = catalogGetHandle(info->taos->pAppInfo->clusterId, &info->pCatalog); if (code != TSDB_CODE_SUCCESS) { uError("SML:0x%" PRIx64 " get catalog error %d", info->id, code); @@ -1528,7 +1534,7 @@ static SSmlHandle* smlBuildSmlInfo(STscObj* pTscObj, SRequestObj* request, SMLPr info->dataFormat = true; } - if(request){ + if (request) { info->pRequest = request; info->msgBuf.buf = info->pRequest->msgBuf; info->msgBuf.len = ERROR_MSG_BUF_DEFAULT_SIZE; @@ -1827,10 +1833,11 @@ static int32_t smlConvertJSONString(SSmlKv *pVal, char *typeStr, cJSON *value) { } pVal->length = (int16_t)strlen(value->valuestring); - if (pVal->type == TSDB_DATA_TYPE_BINARY && pVal->length > TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE){ + if (pVal->type == TSDB_DATA_TYPE_BINARY && pVal->length > TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) { return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN; } - if (pVal->type == TSDB_DATA_TYPE_NCHAR && pVal->length > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE){ + if (pVal->type == TSDB_DATA_TYPE_NCHAR && + pVal->length > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) { return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN; } @@ -2058,7 +2065,7 @@ static int32_t smlParseInfluxLine(SSmlHandle *info, const char *sql) { SSmlLineInfo elements = {0}; uDebug("SML:0x%" PRIx64 " smlParseInfluxLine sql:%s, hello", info->id, sql); - int ret = smlParseInfluxString(sql, &elements, &info->msgBuf); + int ret = smlParseInfluxString(sql, &elements, &info->msgBuf); if (ret != TSDB_CODE_SUCCESS) { uError("SML:0x%" PRIx64 " smlParseInfluxLine failed", info->id); return ret; @@ -2314,7 +2321,8 @@ static int32_t smlInsertData(SSmlHandle *info) { (*pMeta)->tableMeta->uid = tableData->uid; // one table merge data block together according uid code = smlBindData(info->exec, tableData->tags, (*pMeta)->cols, tableData->cols, info->dataFormat, - (*pMeta)->tableMeta, tableData->childTableName, tableData->sTableName, tableData->sTableNameLen, info->msgBuf.buf, info->msgBuf.len); + (*pMeta)->tableMeta, tableData->childTableName, tableData->sTableName, tableData->sTableNameLen, + info->msgBuf.buf, info->msgBuf.len); if (code != TSDB_CODE_SUCCESS) { uError("SML:0x%" PRIx64 " smlBindData failed", info->id); return code; @@ -2336,7 +2344,12 @@ static int32_t smlInsertData(SSmlHandle *info) { SAppClusterSummary *pActivity = &info->taos->pAppInfo->summary; atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1); - launchAsyncQuery(info->pRequest, info->pQuery, NULL); + SSqlCallbackWrapper *pWrapper = (SSqlCallbackWrapper *)taosMemoryCalloc(1, sizeof(SSqlCallbackWrapper)); + if (pWrapper == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pWrapper->pRequest = info->pRequest; + launchAsyncQuery(info->pRequest, info->pQuery, NULL, pWrapper); return TSDB_CODE_SUCCESS; } @@ -2417,41 +2430,41 @@ static int smlProcess(SSmlHandle *info, char *lines[], int numLines) { } static int32_t isSchemalessDb(STscObj *taos, SRequestObj *request) { -// SCatalog *catalog = NULL; -// int32_t code = catalogGetHandle(((STscObj *)taos)->pAppInfo->clusterId, &catalog); -// if (code != TSDB_CODE_SUCCESS) { -// uError("SML get catalog error %d", code); -// return code; -// } -// -// SName name; -// tNameSetDbName(&name, taos->acctId, taos->db, strlen(taos->db)); -// char dbFname[TSDB_DB_FNAME_LEN] = {0}; -// tNameGetFullDbName(&name, dbFname); -// SDbCfgInfo pInfo = {0}; -// -// SRequestConnInfo conn = {0}; -// conn.pTrans = taos->pAppInfo->pTransporter; -// conn.requestId = request->requestId; -// conn.requestObjRefId = request->self; -// conn.mgmtEps = getEpSet_s(&taos->pAppInfo->mgmtEp); -// -// code = catalogGetDBCfg(catalog, &conn, dbFname, &pInfo); -// if (code != TSDB_CODE_SUCCESS) { -// return code; -// } -// taosArrayDestroy(pInfo.pRetensions); -// -// if (!pInfo.schemaless) { -// return TSDB_CODE_SML_INVALID_DB_CONF; -// } + // SCatalog *catalog = NULL; + // int32_t code = catalogGetHandle(((STscObj *)taos)->pAppInfo->clusterId, &catalog); + // if (code != TSDB_CODE_SUCCESS) { + // uError("SML get catalog error %d", code); + // return code; + // } + // + // SName name; + // tNameSetDbName(&name, taos->acctId, taos->db, strlen(taos->db)); + // char dbFname[TSDB_DB_FNAME_LEN] = {0}; + // tNameGetFullDbName(&name, dbFname); + // SDbCfgInfo pInfo = {0}; + // + // SRequestConnInfo conn = {0}; + // conn.pTrans = taos->pAppInfo->pTransporter; + // conn.requestId = request->requestId; + // conn.requestObjRefId = request->self; + // conn.mgmtEps = getEpSet_s(&taos->pAppInfo->mgmtEp); + // + // code = catalogGetDBCfg(catalog, &conn, dbFname, &pInfo); + // if (code != TSDB_CODE_SUCCESS) { + // return code; + // } + // taosArrayDestroy(pInfo.pRetensions); + // + // if (!pInfo.schemaless) { + // return TSDB_CODE_SML_INVALID_DB_CONF; + // } return TSDB_CODE_SUCCESS; } static void smlInsertCallback(void *param, void *res, int32_t code) { SRequestObj *pRequest = (SRequestObj *)res; SSmlHandle *info = (SSmlHandle *)param; - int32_t rows = taos_affected_rows(pRequest); + int32_t rows = taos_affected_rows(pRequest); uDebug("SML:0x%" PRIx64 " result. code:%d, msg:%s", info->id, pRequest->code, pRequest->msgBuf); Params *pParam = info->params; @@ -2461,7 +2474,7 @@ static void smlInsertCallback(void *param, void *res, int32_t code) { if (code != TSDB_CODE_SUCCESS) { pParam->request->code = code; pParam->request->body.resInfo.numOfRows += rows; - }else{ + } else { pParam->request->body.resInfo.numOfRows += info->affectedRows; } if (pParam->cnt == pParam->total) { @@ -2497,20 +2510,20 @@ static void smlInsertCallback(void *param, void *res, int32_t code) { * */ -TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision) { +TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision) { if (NULL == taos) { terrno = TSDB_CODE_TSC_DISCONNECTED; return NULL; } - SRequestObj* request = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT); - if(!request){ + SRequestObj *request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT); + if (!request) { uError("SML:taos_schemaless_insert error request is null"); return NULL; } - int batchs = 0; - STscObj* pTscObj = request->pTscObj; + int batchs = 0; + STscObj *pTscObj = request->pTscObj; pTscObj->schemalessType = 1; SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf}; @@ -2526,7 +2539,7 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr goto end; } - if(isSchemalessDb(pTscObj, request) != TSDB_CODE_SUCCESS){ + if (isSchemalessDb(pTscObj, request) != TSDB_CODE_SUCCESS) { request->code = TSDB_CODE_SML_INVALID_DB_CONF; smlBuildInvalidDataMsg(&msg, "Cannot write data to a non schemaless database", NULL); goto end; @@ -2551,9 +2564,9 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr goto end; } - if(protocol == TSDB_SML_JSON_PROTOCOL){ + if (protocol == TSDB_SML_JSON_PROTOCOL) { numLines = 1; - }else if(numLines <= 0){ + } else if (numLines <= 0) { request->code = TSDB_CODE_SML_INVALID_DATA; smlBuildInvalidDataMsg(&msg, "line num is invalid", NULL); goto end; @@ -2562,14 +2575,14 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr batchs = ceil(((double)numLines) / LINE_BATCH); params.total = batchs; for (int i = 0; i < batchs; ++i) { - SRequestObj* req = (SRequestObj*)createRequest(pTscObj->id, TSDB_SQL_INSERT); - if(!req){ + SRequestObj *req = (SRequestObj *)createRequest(pTscObj->id, TSDB_SQL_INSERT); + if (!req) { request->code = TSDB_CODE_OUT_OF_MEMORY; uError("SML:taos_schemaless_insert error request is null"); goto end; } - SSmlHandle* info = smlBuildSmlInfo(pTscObj, req, (SMLProtocolType)protocol, precision); - if(!info){ + SSmlHandle *info = smlBuildSmlInfo(pTscObj, req, (SMLProtocolType)protocol, precision); + if (!info) { request->code = TSDB_CODE_OUT_OF_MEMORY; uError("SML:taos_schemaless_insert error SSmlHandle is null"); goto end; @@ -2599,8 +2612,8 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr end: taosThreadSpinDestroy(¶ms.lock); tsem_destroy(¶ms.sem); -// ((STscObj *)taos)->schemalessType = 0; + // ((STscObj *)taos)->schemalessType = 0; pTscObj->schemalessType = 1; uDebug("resultend:%s", request->msgBuf); - return (TAOS_RES*)request; + return (TAOS_RES *)request; } diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index ce9a9a7b50..88df54a9ea 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -85,7 +85,8 @@ uint16_t tsTelemPort = 80; char tsSmlTagName[TSDB_COL_NAME_LEN] = "_tag_null"; char tsSmlChildTableName[TSDB_TABLE_NAME_LEN] = ""; // user defined child table name can be specified in tag value. // If set to empty system will generate table name using MD5 hash. -bool tsSmlDataFormat = false; // true means that the name and order of cols in each line are the same(only for influx protocol) +// true means that the name and order of cols in each line are the same(only for influx protocol) +bool tsSmlDataFormat = false; // query int32_t tsQueryPolicy = 1; @@ -125,6 +126,9 @@ int32_t tsMaxNumOfDistinctResults = 1000 * 10000; // 1 database precision unit for interval time range, changed accordingly int32_t tsMinIntervalTime = 1; +// maximum memory allowed to be allocated for a single csv load (in MB) +int32_t tsMaxMemUsedByInsert = 1024; + // the maximum allowed query buffer size during query processing for each data node. // -1 no limit (default) // 0 no query allowed, queries are disabled @@ -296,6 +300,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { if (cfgAddString(pCfg, "smlChildTableName", "", 1) != 0) return -1; if (cfgAddString(pCfg, "smlTagName", tsSmlTagName, 1) != 0) return -1; if (cfgAddBool(pCfg, "smlDataFormat", tsSmlDataFormat, 1) != 0) return -1; + if (cfgAddInt32(pCfg, "maxMemUsedByInsert", tsMaxMemUsedByInsert, 1, INT32_MAX, true) != 0) return -1; tsNumOfTaskQueueThreads = tsNumOfCores / 2; tsNumOfTaskQueueThreads = TMAX(tsNumOfTaskQueueThreads, 4); @@ -374,8 +379,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { tsNumOfVnodeStreamThreads = TMAX(tsNumOfVnodeStreamThreads, 4); if (cfgAddInt32(pCfg, "numOfVnodeStreamThreads", tsNumOfVnodeStreamThreads, 4, 1024, 0) != 0) return -1; -// tsNumOfVnodeFetchThreads = 1; -// if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 1, 1, 0) != 0) return -1; + // tsNumOfVnodeFetchThreads = 1; + // if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 1, 1, 0) != 0) return -1; tsNumOfVnodeWriteThreads = tsNumOfCores; tsNumOfVnodeWriteThreads = TMAX(tsNumOfVnodeWriteThreads, 1); @@ -497,15 +502,15 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) { pItem->stype = stype; } -/* - pItem = cfgGetItem(tsCfg, "numOfVnodeFetchThreads"); - if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { - tsNumOfVnodeFetchThreads = numOfCores / 4; - tsNumOfVnodeFetchThreads = TMAX(tsNumOfVnodeFetchThreads, 4); - pItem->i32 = tsNumOfVnodeFetchThreads; - pItem->stype = stype; - } -*/ + /* + pItem = cfgGetItem(tsCfg, "numOfVnodeFetchThreads"); + if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { + tsNumOfVnodeFetchThreads = numOfCores / 4; + tsNumOfVnodeFetchThreads = TMAX(tsNumOfVnodeFetchThreads, 4); + pItem->i32 = tsNumOfVnodeFetchThreads; + pItem->stype = stype; + } + */ pItem = cfgGetItem(tsCfg, "numOfVnodeWriteThreads"); if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { @@ -648,6 +653,8 @@ static int32_t taosSetClientCfg(SConfig *pCfg) { tstrncpy(tsSmlTagName, cfgGetItem(pCfg, "smlTagName")->str, TSDB_COL_NAME_LEN); tsSmlDataFormat = cfgGetItem(pCfg, "smlDataFormat")->bval; + tsMaxMemUsedByInsert = cfgGetItem(pCfg, "maxMemUsedByInsert")->i32; + tsShellActivityTimer = cfgGetItem(pCfg, "shellActivityTimer")->i32; tsCompressMsgSize = cfgGetItem(pCfg, "compressMsgSize")->i32; tsCompressColData = cfgGetItem(pCfg, "compressColData")->i32; @@ -705,7 +712,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsNumOfMnodeReadThreads = cfgGetItem(pCfg, "numOfMnodeReadThreads")->i32; tsNumOfVnodeQueryThreads = cfgGetItem(pCfg, "numOfVnodeQueryThreads")->i32; tsNumOfVnodeStreamThreads = cfgGetItem(pCfg, "numOfVnodeStreamThreads")->i32; -// tsNumOfVnodeFetchThreads = cfgGetItem(pCfg, "numOfVnodeFetchThreads")->i32; + // tsNumOfVnodeFetchThreads = cfgGetItem(pCfg, "numOfVnodeFetchThreads")->i32; tsNumOfVnodeWriteThreads = cfgGetItem(pCfg, "numOfVnodeWriteThreads")->i32; tsNumOfVnodeSyncThreads = cfgGetItem(pCfg, "numOfVnodeSyncThreads")->i32; tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32; @@ -877,6 +884,8 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) { tsMaxShellConns = cfgGetItem(pCfg, "maxShellConns")->i32; } else if (strcasecmp("maxNumOfDistinctRes", name) == 0) { tsMaxNumOfDistinctResults = cfgGetItem(pCfg, "maxNumOfDistinctRes")->i32; + } else if (strcasecmp("maxMemUsedByInsert", name) == 0) { + tsMaxMemUsedByInsert = cfgGetItem(pCfg, "maxMemUsedByInsert")->i32; } break; } @@ -955,10 +964,10 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) { tsNumOfMnodeReadThreads = cfgGetItem(pCfg, "numOfMnodeReadThreads")->i32; } else if (strcasecmp("numOfVnodeQueryThreads", name) == 0) { tsNumOfVnodeQueryThreads = cfgGetItem(pCfg, "numOfVnodeQueryThreads")->i32; -/* - } else if (strcasecmp("numOfVnodeFetchThreads", name) == 0) { - tsNumOfVnodeFetchThreads = cfgGetItem(pCfg, "numOfVnodeFetchThreads")->i32; -*/ + /* + } else if (strcasecmp("numOfVnodeFetchThreads", name) == 0) { + tsNumOfVnodeFetchThreads = cfgGetItem(pCfg, "numOfVnodeFetchThreads")->i32; + */ } else if (strcasecmp("numOfVnodeWriteThreads", name) == 0) { tsNumOfVnodeWriteThreads = cfgGetItem(pCfg, "numOfVnodeWriteThreads")->i32; } else if (strcasecmp("numOfVnodeSyncThreads", name) == 0) { diff --git a/source/libs/catalog/src/ctgUtil.c b/source/libs/catalog/src/ctgUtil.c index 97b174de1c..0c0ca9649a 100644 --- a/source/libs/catalog/src/ctgUtil.c +++ b/source/libs/catalog/src/ctgUtil.c @@ -1193,4 +1193,232 @@ SName* ctgGetFetchName(SArray* pNames, SCtgFetch* pFetch) { return (SName*)taosArrayGet(pReq->pTables, pFetch->tbIdx); } +static void* ctgCloneDbVgroup(void* pSrc) { + return taosArrayDup((const SArray*)pSrc); +} +static void ctgFreeDbVgroup(void* p) { + taosArrayDestroy((SArray*)((SMetaRes*)p)->pRes); +} + +static void* ctgCloneDbCfgInfo(void* pSrc) { + SDbCfgInfo* pDst = taosMemoryMalloc(sizeof(SDbCfgInfo)); + if (NULL == pDst) { + return NULL; + } + memcpy(pDst, pSrc, sizeof(SDbCfgInfo)); + return pDst; +} + +static void ctgFreeDbCfgInfo(void* p) { + taosMemoryFree(((SMetaRes*)p)->pRes); +} + +static void* ctgCloneDbInfo(void* pSrc) { + SDbInfo* pDst = taosMemoryMalloc(sizeof(SDbInfo)); + if (NULL == pDst) { + return NULL; + } + memcpy(pDst, pSrc, sizeof(SDbInfo)); + return pDst; +} + +static void ctgFreeDbInfo(void* p) { + taosMemoryFree(((SMetaRes*)p)->pRes); +} + +static void* ctgCloneTableMeta(void* pSrc) { + STableMeta* pMeta = pSrc; + int32_t size = sizeof(STableMeta) + (pMeta->tableInfo.numOfColumns + pMeta->tableInfo.numOfTags) * sizeof(SSchema); + STableMeta* pDst = taosMemoryMalloc(size); + if (NULL == pDst) { + return NULL; + } + memcpy(pDst, pSrc, size); + return pDst; +} + +static void ctgFreeTableMeta(void* p) { + taosMemoryFree(((SMetaRes*)p)->pRes); +} + +static void* ctgCloneVgroupInfo(void* pSrc) { + SVgroupInfo* pDst = taosMemoryMalloc(sizeof(SVgroupInfo)); + if (NULL == pDst) { + return NULL; + } + memcpy(pDst, pSrc, sizeof(SVgroupInfo)); + return pDst; +} + +static void ctgFreeVgroupInfo(void* p) { + taosMemoryFree(((SMetaRes*)p)->pRes); +} + +static void* ctgCloneTableIndices(void* pSrc) { + return taosArrayDup((const SArray*)pSrc); +} + +static void ctgFreeTableIndices(void* p) { + taosArrayDestroy((SArray*)((SMetaRes*)p)->pRes); +} + +static void* ctgCloneFuncInfo(void* pSrc) { + SFuncInfo* pDst = taosMemoryMalloc(sizeof(SFuncInfo)); + if (NULL == pDst) { + return NULL; + } + memcpy(pDst, pSrc, sizeof(SFuncInfo)); + return pDst; +} + +static void ctgFreeFuncInfo(void* p) { + taosMemoryFree(((SMetaRes*)p)->pRes); +} + +static void* ctgCloneIndexInfo(void* pSrc) { + SIndexInfo* pDst = taosMemoryMalloc(sizeof(SIndexInfo)); + if (NULL == pDst) { + return NULL; + } + memcpy(pDst, pSrc, sizeof(SIndexInfo)); + return pDst; +} + +static void ctgFreeIndexInfo(void* p) { + taosMemoryFree(((SMetaRes*)p)->pRes); +} + +static void* ctgCloneUserAuth(void* pSrc) { + bool* pDst = taosMemoryMalloc(sizeof(bool)); + if (NULL == pDst) { + return NULL; + } + *pDst = *(bool*)pSrc; + return pDst; +} + +static void ctgFreeUserAuth(void* p) { + taosMemoryFree(((SMetaRes*)p)->pRes); +} + +static void* ctgCloneQnodeList(void* pSrc) { + return taosArrayDup((const SArray*)pSrc); +} + +static void ctgFreeQnodeList(void* p) { + taosArrayDestroy((SArray*)((SMetaRes*)p)->pRes); +} + +static void* ctgCloneTableCfg(void* pSrc) { + STableCfg* pDst = taosMemoryMalloc(sizeof(STableCfg)); + if (NULL == pDst) { + return NULL; + } + memcpy(pDst, pSrc, sizeof(STableCfg)); + return pDst; +} + +static void ctgFreeTableCfg(void* p) { + taosMemoryFree(((SMetaRes*)p)->pRes); +} + +static void* ctgCloneDnodeList(void* pSrc) { + return taosArrayDup((const SArray*)pSrc); +} + +static void ctgFreeDnodeList(void* p) { + taosArrayDestroy((SArray*)((SMetaRes*)p)->pRes); +} + +static int32_t ctgCloneMetaDataArray(SArray* pSrc, FCopy copyFunc, SArray** pDst) { + if (NULL == pSrc) { + return TSDB_CODE_SUCCESS; + } + + int32_t size = taosArrayGetSize(pSrc); + *pDst = taosArrayInit(size, sizeof(SMetaRes)); + if (NULL == *pDst) { + return TSDB_CODE_OUT_OF_MEMORY; + } + for (int32_t i = 0; i < size; ++i) { + SMetaRes* pRes = taosArrayGet(pSrc, i); + SMetaRes res = {.code = pRes->code, .pRes = copyFunc(pRes->pRes)}; + if (NULL == res.pRes) { + return TSDB_CODE_OUT_OF_MEMORY; + } + taosArrayPush(*pDst, &res); + } + + return TSDB_CODE_SUCCESS; +} + +SMetaData* catalogCloneMetaData(SMetaData* pData) { + SMetaData* pRes = taosMemoryCalloc(1, sizeof(SMetaData)); + if (NULL == pRes) { + return NULL; + } + + int32_t code = ctgCloneMetaDataArray(pData->pDbVgroup, ctgCloneDbVgroup, &pRes->pDbVgroup); + if (TSDB_CODE_SUCCESS == code) { + code = ctgCloneMetaDataArray(pData->pDbCfg, ctgCloneDbCfgInfo, &pRes->pDbCfg); + } + if (TSDB_CODE_SUCCESS == code) { + code = ctgCloneMetaDataArray(pData->pDbInfo, ctgCloneDbInfo, &pRes->pDbInfo); + } + if (TSDB_CODE_SUCCESS == code) { + code = ctgCloneMetaDataArray(pData->pTableMeta, ctgCloneTableMeta, &pRes->pTableMeta); + } + if (TSDB_CODE_SUCCESS == code) { + code = ctgCloneMetaDataArray(pData->pTableHash, ctgCloneVgroupInfo, &pRes->pTableHash); + } + if (TSDB_CODE_SUCCESS == code) { + code = ctgCloneMetaDataArray(pData->pTableIndex, ctgCloneTableIndices, &pRes->pTableIndex); + } + if (TSDB_CODE_SUCCESS == code) { + code = ctgCloneMetaDataArray(pData->pUdfList, ctgCloneFuncInfo, &pRes->pUdfList); + } + if (TSDB_CODE_SUCCESS == code) { + code = ctgCloneMetaDataArray(pData->pIndex, ctgCloneIndexInfo, &pRes->pIndex); + } + if (TSDB_CODE_SUCCESS == code) { + code = ctgCloneMetaDataArray(pData->pUser, ctgCloneUserAuth, &pRes->pUser); + } + if (TSDB_CODE_SUCCESS == code) { + code = ctgCloneMetaDataArray(pData->pQnodeList, ctgCloneQnodeList, &pRes->pQnodeList); + } + if (TSDB_CODE_SUCCESS == code) { + code = ctgCloneMetaDataArray(pData->pTableCfg, ctgCloneTableCfg, &pRes->pTableCfg); + } + if (TSDB_CODE_SUCCESS == code) { + code = ctgCloneMetaDataArray(pData->pDnodeList, ctgCloneDnodeList, &pRes->pDnodeList); + } + + if (TSDB_CODE_SUCCESS != code) { + catalogFreeMetaData(pRes); + return NULL; + } + + return pRes; +} + +void catalogFreeMetaData(SMetaData* pData) { + if (NULL == pData) { + return; + } + + taosArrayDestroyEx(pData->pDbVgroup, ctgFreeDbVgroup); + taosArrayDestroyEx(pData->pDbCfg, ctgFreeDbCfgInfo); + taosArrayDestroyEx(pData->pDbInfo, ctgFreeDbInfo); + taosArrayDestroyEx(pData->pTableMeta, ctgFreeTableMeta); + taosArrayDestroyEx(pData->pTableHash, ctgFreeVgroupInfo); + taosArrayDestroyEx(pData->pTableIndex, ctgFreeTableIndices); + taosArrayDestroyEx(pData->pUdfList, ctgFreeFuncInfo); + taosArrayDestroyEx(pData->pIndex, ctgFreeIndexInfo); + taosArrayDestroyEx(pData->pUser, ctgFreeUserAuth); + taosArrayDestroyEx(pData->pQnodeList, ctgFreeQnodeList); + taosArrayDestroyEx(pData->pTableCfg, ctgFreeTableCfg); + taosArrayDestroyEx(pData->pDnodeList, ctgFreeDnodeList); + taosMemoryFreeClear(pData->pSvrVer); + taosMemoryFree(pData); +} diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index 86930268f1..eec20f91e4 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -266,7 +266,7 @@ static int32_t getTableVgroup(SInsertParseContext* pCxt, int32_t tbNo, SName* pT return catalogGetTableHashVgroup(pBasicCtx->pCatalog, &conn, pTbName, pVg); } -static int32_t getTableMetaImpl(SInsertParseContext* pCxt, int32_t tbNo, SName* name, char* dbFname, bool isStb) { +static int32_t getTableMetaImpl(SInsertParseContext* pCxt, int32_t tbNo, SName* name, bool isStb) { CHECK_CODE(getTableSchema(pCxt, tbNo, name, isStb, &pCxt->pTableMeta)); if (!isStb) { SVgroupInfo vg; @@ -276,12 +276,12 @@ static int32_t getTableMetaImpl(SInsertParseContext* pCxt, int32_t tbNo, SName* return TSDB_CODE_SUCCESS; } -static int32_t getTableMeta(SInsertParseContext* pCxt, int32_t tbNo, SName* name, char* dbFname) { - return getTableMetaImpl(pCxt, tbNo, name, dbFname, false); +static int32_t getTableMeta(SInsertParseContext* pCxt, int32_t tbNo, SName* name) { + return getTableMetaImpl(pCxt, tbNo, name, false); } -static int32_t getSTableMeta(SInsertParseContext* pCxt, int32_t tbNo, SName* name, char* dbFname) { - return getTableMetaImpl(pCxt, tbNo, name, dbFname, true); +static int32_t getSTableMeta(SInsertParseContext* pCxt, int32_t tbNo, SName* name) { + return getTableMetaImpl(pCxt, tbNo, name, true); } static int32_t getDBCfg(SInsertParseContext* pCxt, const char* pDbFName, SDbCfgInfo* pInfo) { @@ -1178,7 +1178,7 @@ static int32_t parseUsingClause(SInsertParseContext* pCxt, int32_t tbNo, SName* tNameGetFullDbName(&sname, dbFName); strcpy(pCxt->sTableName, sname.tname); - CHECK_CODE(getSTableMeta(pCxt, tbNo, &sname, dbFName)); + CHECK_CODE(getSTableMeta(pCxt, tbNo, &sname)); if (TSDB_SUPER_TABLE != pCxt->pTableMeta->tableType) { return buildInvalidOperationMsg(&pCxt->msg, "create table only from super table is allowed"); } @@ -1385,6 +1385,10 @@ static int32_t parseCsvFile(SInsertParseContext* pCxt, TdFilePtr fp, STableDataB (*numOfRows)++; } pCxt->pSql = pRawSql; + + if (pDataBlock->nAllocSize > tsMaxMemUsedByInsert * 1024 * 1024) { + break; + } } if (0 == (*numOfRows) && (!TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT))) { @@ -1393,23 +1397,13 @@ static int32_t parseCsvFile(SInsertParseContext* pCxt, TdFilePtr fp, STableDataB return TSDB_CODE_SUCCESS; } -static int32_t parseDataFromFile(SInsertParseContext* pCxt, SToken filePath, STableDataBlocks* dataBuf) { - char filePathStr[TSDB_FILENAME_LEN] = {0}; - if (TK_NK_STRING == filePath.type) { - trimString(filePath.z, filePath.n, filePathStr, sizeof(filePathStr)); - } else { - strncpy(filePathStr, filePath.z, filePath.n); - } - TdFilePtr fp = taosOpenFile(filePathStr, TD_FILE_READ | TD_FILE_STREAM); - if (NULL == fp) { - return TAOS_SYSTEM_ERROR(errno); - } - +static int32_t parseDataFromFileAgain(SInsertParseContext* pCxt, int16_t tableNo, const SName* pTableName, + STableDataBlocks* dataBuf) { int32_t maxNumOfRows; CHECK_CODE(allocateMemIfNeed(dataBuf, getExtendedRowSize(dataBuf), &maxNumOfRows)); int32_t numOfRows = 0; - CHECK_CODE(parseCsvFile(pCxt, fp, dataBuf, maxNumOfRows, &numOfRows)); + CHECK_CODE(parseCsvFile(pCxt, pCxt->pComCxt->csvCxt.fp, dataBuf, maxNumOfRows, &numOfRows)); SSubmitBlk* pBlocks = (SSubmitBlk*)(dataBuf->pData); if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, dataBuf, numOfRows)) { @@ -1417,12 +1411,38 @@ static int32_t parseDataFromFile(SInsertParseContext* pCxt, SToken filePath, STa "too many rows in sql, total number of rows should be less than INT32_MAX"); } + if (!taosEOFFile(pCxt->pComCxt->csvCxt.fp)) { + pCxt->pComCxt->needMultiParse = true; + pCxt->pComCxt->csvCxt.tableNo = tableNo; + memcpy(&pCxt->pComCxt->csvCxt.tableName, pTableName, sizeof(SName)); + pCxt->pComCxt->csvCxt.pLastSqlPos = pCxt->pSql; + } + dataBuf->numOfTables = 1; pCxt->totalNum += numOfRows; return TSDB_CODE_SUCCESS; } +static int32_t parseDataFromFile(SInsertParseContext* pCxt, int16_t tableNo, const SName* pTableName, SToken filePath, + STableDataBlocks* dataBuf) { + char filePathStr[TSDB_FILENAME_LEN] = {0}; + if (TK_NK_STRING == filePath.type) { + trimString(filePath.z, filePath.n, filePathStr, sizeof(filePathStr)); + } else { + strncpy(filePathStr, filePath.z, filePath.n); + } + pCxt->pComCxt->csvCxt.fp = taosOpenFile(filePathStr, TD_FILE_READ | TD_FILE_STREAM); + if (NULL == pCxt->pComCxt->csvCxt.fp) { + return TAOS_SYSTEM_ERROR(errno); + } + + return parseDataFromFileAgain(pCxt, tableNo, pTableName, dataBuf); +} + static void destroyInsertParseContextForTable(SInsertParseContext* pCxt) { + if (!pCxt->pComCxt->needMultiParse) { + taosCloseFile(&pCxt->pComCxt->csvCxt.fp); + } taosMemoryFreeClear(pCxt->pTableMeta); destroyBoundColumnInfo(&pCxt->tags); tdDestroySVCreateTbReq(&pCxt->createTblReq); @@ -1481,7 +1501,8 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) { return buildSyntaxErrMsg(&pCxt->msg, "invalid charactor in SQL", sToken.z); } - if (0 == pCxt->totalNum && (!TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT))) { + if (0 == pCxt->totalNum && (!TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT)) && + !pCxt->pComCxt->needMultiParse) { return buildInvalidOperationMsg(&pCxt->msg, "no data in sql"); } break; @@ -1536,7 +1557,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) { NEXT_TOKEN(pCxt->pSql, sToken); autoCreateTbl = true; } else if (!existedUsing) { - CHECK_CODE(getTableMeta(pCxt, tbNum, &name, dbFName)); + CHECK_CODE(getTableMeta(pCxt, tbNum, &name)); if (TSDB_SUPER_TABLE == pCxt->pTableMeta->tableType) { return buildInvalidOperationMsg(&pCxt->msg, "insert data into super table is not supported"); } @@ -1577,17 +1598,22 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) { if (0 == sToken.n || (TK_NK_STRING != sToken.type && TK_NK_ID != sToken.type)) { return buildSyntaxErrMsg(&pCxt->msg, "file path is required following keyword FILE", sToken.z); } - CHECK_CODE(parseDataFromFile(pCxt, sToken, dataBuf)); + CHECK_CODE(parseDataFromFile(pCxt, tbNum, &name, sToken, dataBuf)); pCxt->pOutput->insertType = TSDB_QUERY_TYPE_FILE_INSERT; tbNum++; - continue; + if (!pCxt->pComCxt->needMultiParse) { + continue; + } else { + parserInfo("0x%" PRIx64 " insert from csv. File is too large, do it in batches.", pCxt->pComCxt->requestId); + break; + } } return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is expected", sToken.z); } - qDebug("0x%" PRIx64 " insert input rows: %d", pCxt->pComCxt->requestId, pCxt->totalNum); + parserInfo("0x%" PRIx64 " insert input rows: %d", pCxt->pComCxt->requestId, pCxt->totalNum); if (TSDB_QUERY_HAS_TYPE(pCxt->pOutput->insertType, TSDB_QUERY_TYPE_STMT_INSERT)) { SParsedDataColInfo* tags = taosMemoryMalloc(sizeof(pCxt->tags)); @@ -1612,6 +1638,26 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) { return buildOutput(pCxt); } +static int32_t parseInsertBodyAgain(SInsertParseContext* pCxt) { + STableDataBlocks* dataBuf = NULL; + CHECK_CODE(getTableMeta(pCxt, pCxt->pComCxt->csvCxt.tableNo, &pCxt->pComCxt->csvCxt.tableName)); + CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, &pCxt->pTableMeta->uid, sizeof(pCxt->pTableMeta->uid), + TSDB_DEFAULT_PAYLOAD_SIZE, sizeof(SSubmitBlk), getTableInfo(pCxt->pTableMeta).rowSize, + pCxt->pTableMeta, &dataBuf, NULL, &pCxt->createTblReq)); + CHECK_CODE(parseDataFromFileAgain(pCxt, pCxt->pComCxt->csvCxt.tableNo, &pCxt->pComCxt->csvCxt.tableName, dataBuf)); + if (taosEOFFile(pCxt->pComCxt->csvCxt.fp)) { + CHECK_CODE(parseInsertBody(pCxt)); + pCxt->pComCxt->needMultiParse = false; + return TSDB_CODE_SUCCESS; + } + parserInfo("0x%" PRIx64 " insert again input rows: %d", pCxt->pComCxt->requestId, pCxt->totalNum); + // merge according to vgId + if (taosHashGetSize(pCxt->pTableBlockHashObj) > 0) { + CHECK_CODE(mergeTableDataBlocks(pCxt->pTableBlockHashObj, pCxt->pOutput->payloadType, &pCxt->pVgDataBlocks)); + } + return buildOutput(pCxt); +} + // INSERT INTO // tb_name // [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)] @@ -1621,7 +1667,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) { int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery, SParseMetaCache* pMetaCache) { SInsertParseContext context = { .pComCxt = pContext, - .pSql = (char*)pContext->pSql, + .pSql = pContext->needMultiParse ? (char*)pContext->csvCxt.pLastSqlPos : (char*)pContext->pSql, .msg = {.buf = pContext->pMsg, .len = pContext->msgLen}, .pTableMeta = NULL, .createTblReq = {0}, @@ -1691,10 +1737,16 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery, SParseMetaCache context.pOutput->payloadType = PAYLOAD_TYPE_KV; - int32_t code = skipInsertInto(&context.pSql, &context.msg); - if (TSDB_CODE_SUCCESS == code) { - code = parseInsertBody(&context); + int32_t code = TSDB_CODE_SUCCESS; + if (!context.pComCxt->needMultiParse) { + code = skipInsertInto(&context.pSql, &context.msg); + if (TSDB_CODE_SUCCESS == code) { + code = parseInsertBody(&context); + } + } else { + code = parseInsertBodyAgain(&context); } + if (TSDB_CODE_SUCCESS == code || NEED_CLIENT_HANDLE_ERROR(code)) { SName* pTable = taosHashIterate(context.pTableNameHashObj, NULL); while (NULL != pTable) { diff --git a/source/libs/parser/src/parser.c b/source/libs/parser/src/parser.c index 2fe6ebfb79..748478778a 100644 --- a/source/libs/parser/src/parser.c +++ b/source/libs/parser/src/parser.c @@ -214,6 +214,16 @@ int32_t qAnalyseSqlSemantic(SParseContext* pCxt, const struct SCatalogReq* pCata return code; } +void qDestroyParseContext(SParseContext* pCxt) { + if (NULL == pCxt) { + return; + } + + taosArrayDestroy(pCxt->pTableMetaPos); + taosArrayDestroy(pCxt->pTableVgroupPos); + taosMemoryFree(pCxt); +} + void qDestroyQuery(SQuery* pQueryNode) { nodesDestroyNode((SNode*)pQueryNode); } int32_t qExtractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema) { diff --git a/source/libs/parser/test/mockCatalog.cpp b/source/libs/parser/test/mockCatalog.cpp index fcaa5af05c..a9360f796c 100644 --- a/source/libs/parser/test/mockCatalog.cpp +++ b/source/libs/parser/test/mockCatalog.cpp @@ -130,16 +130,16 @@ void generatePerformanceSchema(MockCatalogService* mcs) { * c5 | column | DOUBLE | 8 | */ void generateTestTables(MockCatalogService* mcs, const std::string& db) { - ITableBuilder& builder = mcs->createTableBuilder(db, "t1", TSDB_NORMAL_TABLE, 6) - .setPrecision(TSDB_TIME_PRECISION_MILLI) - .setVgid(1) - .addColumn("ts", TSDB_DATA_TYPE_TIMESTAMP) - .addColumn("c1", TSDB_DATA_TYPE_INT) - .addColumn("c2", TSDB_DATA_TYPE_BINARY, 20) - .addColumn("c3", TSDB_DATA_TYPE_BIGINT) - .addColumn("c4", TSDB_DATA_TYPE_DOUBLE) - .addColumn("c5", TSDB_DATA_TYPE_DOUBLE); - builder.done(); + mcs->createTableBuilder(db, "t1", TSDB_NORMAL_TABLE, 6) + .setPrecision(TSDB_TIME_PRECISION_MILLI) + .setVgid(1) + .addColumn("ts", TSDB_DATA_TYPE_TIMESTAMP) + .addColumn("c1", TSDB_DATA_TYPE_INT) + .addColumn("c2", TSDB_DATA_TYPE_BINARY, 20) + .addColumn("c3", TSDB_DATA_TYPE_BIGINT) + .addColumn("c4", TSDB_DATA_TYPE_DOUBLE) + .addColumn("c5", TSDB_DATA_TYPE_DOUBLE) + .done(); } /* diff --git a/source/libs/parser/test/parTestUtil.cpp b/source/libs/parser/test/parTestUtil.cpp index 14c991917b..bf27fd2e13 100644 --- a/source/libs/parser/test/parTestUtil.cpp +++ b/source/libs/parser/test/parTestUtil.cpp @@ -343,7 +343,6 @@ class ParserTestBaseImpl { unique_ptr query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), destroyQuery); doParseSql(&cxt, query.get()); - SQuery* pQuery = *(query.get()); if (g_dump) { dump();