diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index daf008108b..f4ccc05208 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -182,8 +182,10 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t #define SET_META_TYPE_TABLE(t) (t) = META_TYPE_TABLE #define SET_META_TYPE_BOTH_TABLE(t) (t) = META_TYPE_BOTH_TABLE -#define NEED_CLIENT_RM_TBLMETA_ERROR(_code) \ - ((_code) == TSDB_CODE_PAR_TABLE_NOT_EXIST || (_code) == TSDB_CODE_VND_TB_NOT_EXIST) +#define NEED_CLIENT_RM_TBLMETA_ERROR(_code) \ + ((_code) == TSDB_CODE_PAR_TABLE_NOT_EXIST || (_code) == TSDB_CODE_VND_TB_NOT_EXIST || \ + (_code) == TSDB_CODE_PAR_INVALID_COLUMNS_NUM || (_code) == TSDB_CODE_PAR_INVALID_COLUMN || \ + (_code) == TSDB_CODE_PAR_TAGS_NOT_MATCHED) #define NEED_CLIENT_REFRESH_VG_ERROR(_code) \ ((_code) == TSDB_CODE_VND_HASH_MISMATCH || (_code) == TSDB_CODE_VND_INVALID_VGROUP_ID) #define NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code) ((_code) == TSDB_CODE_TDB_TABLE_RECREATED) @@ -194,7 +196,7 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t #define NEED_SCHEDULER_RETRY_ERROR(_code) \ ((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL) -#define REQUEST_MAX_TRY_TIMES 5 +#define REQUEST_MAX_TRY_TIMES 1 #define qFatal(...) \ do { \ diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 1dda4c3024..f493f02cd6 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -13,18 +13,18 @@ * along with this program. If not, see . */ +#include "cJSON.h" #include "clientInt.h" #include "clientLog.h" #include "command.h" #include "scheduler.h" #include "tdatablock.h" +#include "tdataformat.h" #include "tdef.h" #include "tglobal.h" #include "tmsgtype.h" #include "tpagedbuf.h" #include "tref.h" -#include "cJSON.h" -#include "tdataformat.h" static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet); static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest); @@ -189,7 +189,8 @@ int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtC setResSchemaInfo(&pRequest->body.resInfo, (*pQuery)->pResSchema, (*pQuery)->numOfResCols); setResPrecision(&pRequest->body.resInfo, (*pQuery)->precision); } - + } + if (TSDB_CODE_SUCCESS == code || NEED_CLIENT_HANDLE_ERROR(code)) { TSWAP(pRequest->dbList, (*pQuery)->pDbList); TSWAP(pRequest->tableList, (*pQuery)->pTableList); } @@ -293,7 +294,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf}; int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr, - pRequest->metric.start, &res); + pRequest->metric.start, &res); if (code != TSDB_CODE_SUCCESS) { if (pRequest->body.queryJob != 0) { schedulerFreeJob(pRequest->body.queryJob); @@ -483,7 +484,8 @@ SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) { int32_t retryNum = 0; int32_t code = 0; - while (retryNum++ < REQUEST_MAX_TRY_TIMES) { + do { + destroyRequest(pRequest); pRequest = launchQuery(pTscObj, sql, sqlLen); if (pRequest == NULL || TSDB_CODE_SUCCESS == pRequest->code || !NEED_CLIENT_HANDLE_ERROR(pRequest->code)) { break; @@ -494,9 +496,7 @@ SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) { pRequest->code = code; break; } - - destroyRequest(pRequest); - } + } while (retryNum++ < REQUEST_MAX_TRY_TIMES); return pRequest; } @@ -805,21 +805,20 @@ static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) { return TSDB_CODE_SUCCESS; } -static char* parseTagDatatoJson(void *p){ - char* string = NULL; - cJSON *json = cJSON_CreateObject(); - if (json == NULL) - { +static char* parseTagDatatoJson(void* p) { + char* string = NULL; + cJSON* json = cJSON_CreateObject(); + if (json == NULL) { goto end; } int16_t nCols = kvRowNCols(p); - char tagJsonKey[256] = {0}; + char tagJsonKey[256] = {0}; for (int j = 0; j < nCols; ++j) { - SColIdx * pColIdx = kvRowColIdxAt(p, j); - char* val = (char*)(kvRowColVal(p, pColIdx)); - if (j == 0){ - if(*val == TSDB_DATA_TYPE_NULL){ + SColIdx* pColIdx = kvRowColIdxAt(p, j); + char* val = (char*)(kvRowColVal(p, pColIdx)); + if (j == 0) { + if (*val == TSDB_DATA_TYPE_NULL) { string = taosMemoryCalloc(1, 8); sprintf(varDataVal(string), "%s", TSDB_DATA_NULL_STR_L); varDataSetLen(string, strlen(varDataVal(string))); @@ -834,19 +833,18 @@ static char* parseTagDatatoJson(void *p){ // json value val += varDataTLen(val); char* realData = POINTER_SHIFT(val, CHAR_BYTES); - char type = *val; - if(type == TSDB_DATA_TYPE_NULL) { + char type = *val; + if (type == TSDB_DATA_TYPE_NULL) { cJSON* value = cJSON_CreateNull(); - if (value == NULL) - { + if (value == NULL) { goto end; } cJSON_AddItemToObject(json, tagJsonKey, value); - }else if(type == TSDB_DATA_TYPE_NCHAR) { + } else if (type == TSDB_DATA_TYPE_NCHAR) { cJSON* value = NULL; - if (varDataLen(realData) > 0){ - char *tagJsonValue = taosMemoryCalloc(varDataLen(realData), 1); - int32_t length = taosUcs4ToMbs((TdUcs4 *)varDataVal(realData), varDataLen(realData), tagJsonValue); + if (varDataLen(realData) > 0) { + char* tagJsonValue = taosMemoryCalloc(varDataLen(realData), 1); + int32_t length = taosUcs4ToMbs((TdUcs4*)varDataVal(realData), varDataLen(realData), tagJsonValue); if (length < 0) { tscError("charset:%s to %s. val:%s convert json value failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, val); taosMemoryFree(tagJsonValue); @@ -854,45 +852,41 @@ static char* parseTagDatatoJson(void *p){ } value = cJSON_CreateString(tagJsonValue); taosMemoryFree(tagJsonValue); - if (value == NULL) - { + if (value == NULL) { goto end; } - }else if(varDataLen(realData) == 0){ + } else if (varDataLen(realData) == 0) { value = cJSON_CreateString(""); - }else{ + } else { ASSERT(0); } cJSON_AddItemToObject(json, tagJsonKey, value); - }else if(type == TSDB_DATA_TYPE_DOUBLE){ + } else if (type == TSDB_DATA_TYPE_DOUBLE) { double jsonVd = *(double*)(realData); cJSON* value = cJSON_CreateNumber(jsonVd); - if (value == NULL) - { + if (value == NULL) { goto end; } cJSON_AddItemToObject(json, tagJsonKey, value); -// }else if(type == TSDB_DATA_TYPE_BIGINT){ -// int64_t jsonVd = *(int64_t*)(realData); -// cJSON* value = cJSON_CreateNumber((double)jsonVd); -// if (value == NULL) -// { -// goto end; -// } -// cJSON_AddItemToObject(json, tagJsonKey, value); - }else if (type == TSDB_DATA_TYPE_BOOL) { - char jsonVd = *(char*)(realData); + // }else if(type == TSDB_DATA_TYPE_BIGINT){ + // int64_t jsonVd = *(int64_t*)(realData); + // cJSON* value = cJSON_CreateNumber((double)jsonVd); + // if (value == NULL) + // { + // goto end; + // } + // cJSON_AddItemToObject(json, tagJsonKey, value); + } else if (type == TSDB_DATA_TYPE_BOOL) { + char jsonVd = *(char*)(realData); cJSON* value = cJSON_CreateBool(jsonVd); - if (value == NULL) - { + if (value == NULL) { goto end; } cJSON_AddItemToObject(json, tagJsonKey, value); - }else{ + } else { ASSERT(0); } - } string = cJSON_PrintUnformatted(json); end: @@ -930,7 +924,7 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int pResultInfo->pCol[i].pData = pResultInfo->convertBuf[i]; pResultInfo->row[i] = pResultInfo->pCol[i].pData; - }else if (type == TSDB_DATA_TYPE_JSON && colLength[i] > 0) { + } else if (type == TSDB_DATA_TYPE_JSON && colLength[i] > 0) { char* p = taosMemoryRealloc(pResultInfo->convertBuf[i], colLength[i]); if (p == NULL) { return TSDB_CODE_OUT_OF_MEMORY; @@ -943,7 +937,6 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int if (pCol->offset[j] != -1) { char* pStart = pCol->offset[j] + pCol->pData; - int32_t jsonInnerType = *pStart; char* jsonInnerData = pStart + CHAR_BYTES; char dst[TSDB_MAX_JSON_TAG_LEN] = {0}; @@ -951,7 +944,7 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int sprintf(varDataVal(dst), "%s", TSDB_DATA_NULL_STR_L); varDataSetLen(dst, strlen(varDataVal(dst))); } else if (jsonInnerType == TSDB_DATA_TYPE_JSON) { - char *jsonString = parseTagDatatoJson(jsonInnerData); + char* jsonString = parseTagDatatoJson(jsonInnerData); STR_TO_VARSTR(dst, jsonString); taosMemoryFree(jsonString); } else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) { // value -> "value" diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index d1b4a745b9..b452950624 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -41,6 +41,13 @@ sToken = tStrGetToken(pSql, &index, false); \ } while (0) +#define NEXT_VALID_TOKEN(pSql, sToken) \ + do { \ + sToken.n = tGetToken(pSql, &sToken.type); \ + sToken.z = pSql; \ + pSql += sToken.n; \ + } while (TK_NK_SPACE == sToken.type) + typedef struct SInsertParseContext { SParseContext* pComCxt; // input char* pSql; // input @@ -482,9 +489,11 @@ static int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z); } } else if (pToken->type == TK_NK_INTEGER) { - return func(pMsgBuf, ((taosStr2Int64(pToken->z, NULL, 10) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes, param); + return func(pMsgBuf, ((taosStr2Int64(pToken->z, NULL, 10) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes, + param); } else if (pToken->type == TK_NK_FLOAT) { - return func(pMsgBuf, ((taosStr2Double(pToken->z, NULL) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes, param); + return func(pMsgBuf, ((taosStr2Double(pToken->z, NULL) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes, + param); } else { return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z); } @@ -685,7 +694,7 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo* isOrdered = false; } if (index < 0) { - return buildSyntaxErrMsg(&pCxt->msg, "invalid column/tag name", sToken.z); + return generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMN, sToken.z); } if (pColList->cols[index].valStat == VAL_STAT_HAS) { return buildSyntaxErrMsg(&pCxt->msg, "duplicated column name", sToken.z); @@ -895,8 +904,10 @@ static int32_t parseUsingClause(SInsertParseContext* pCxt, SName* name, char* tb return buildSyntaxErrMsg(&pCxt->msg, "( is expected", sToken.z); } CHECK_CODE(parseTagsClause(pCxt, pCxt->pTableMeta->schema, getTableInfo(pCxt->pTableMeta).precision, name->tname)); - NEXT_TOKEN(pCxt->pSql, sToken); - if (TK_NK_RP != sToken.type) { + NEXT_VALID_TOKEN(pCxt->pSql, sToken); + if (TK_NK_COMMA == sToken.type) { + return generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_TAGS_NOT_MATCHED); + } else if (TK_NK_RP != sToken.type) { return buildSyntaxErrMsg(&pCxt->msg, ") is expected", sToken.z); } @@ -996,8 +1007,10 @@ static int32_t parseValues(SInsertParseContext* pCxt, STableDataBlocks* pDataBlo pDataBlock->size += extendedRowSize; // len; } - NEXT_TOKEN(pCxt->pSql, sToken); - if (TK_NK_RP != sToken.type) { + NEXT_VALID_TOKEN(pCxt->pSql, sToken); + if (TK_NK_COMMA == sToken.type) { + return generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMNS_NUM); + } else if (TK_NK_RP != sToken.type) { return buildSyntaxErrMsg(&pCxt->msg, ") expected", sToken.z); } @@ -1057,10 +1070,10 @@ static void destroyInsertParseContext(SInsertParseContext* pCxt) { // VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path // [...]; static int32_t parseInsertBody(SInsertParseContext* pCxt) { - int32_t tbNum = 0; - char tbFName[TSDB_TABLE_FNAME_LEN]; - bool autoCreateTbl = false; - STableMeta *pMeta = NULL; + int32_t tbNum = 0; + char tbFName[TSDB_TABLE_FNAME_LEN]; + bool autoCreateTbl = false; + STableMeta* pMeta = NULL; // for each table while (1) { @@ -1121,7 +1134,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) { &dataBuf, NULL, &pCxt->createTblReq)); pMeta = pCxt->pTableMeta; pCxt->pTableMeta = NULL; - + if (TK_NK_LP == sToken.type) { // pSql -> field1_name, ...) CHECK_CODE(parseBoundColumns(pCxt, &dataBuf->boundColumnInfo, getTableColumnSchema(pMeta))); @@ -1160,7 +1173,8 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } memcpy(tags, &pCxt->tags, sizeof(pCxt->tags)); - (*pCxt->pStmtCb->setInfoFn)(pCxt->pStmtCb->pStmt, pMeta, tags, tbFName, autoCreateTbl, pCxt->pVgroupsHashObj, pCxt->pTableBlockHashObj); + (*pCxt->pStmtCb->setInfoFn)(pCxt->pStmtCb->pStmt, pMeta, tags, tbFName, autoCreateTbl, pCxt->pVgroupsHashObj, + pCxt->pTableBlockHashObj); memset(&pCxt->tags, 0, sizeof(pCxt->tags)); pCxt->pVgroupsHashObj = NULL; @@ -1231,14 +1245,14 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) { return TSDB_CODE_OUT_OF_MEMORY; } } - + context.pOutput->payloadType = PAYLOAD_TYPE_KV; int32_t code = skipInsertInto(&context); if (TSDB_CODE_SUCCESS == code) { code = parseInsertBody(&context); } - if (TSDB_CODE_SUCCESS == code) { + if (TSDB_CODE_SUCCESS == code || NEED_CLIENT_HANDLE_ERROR(code)) { SName* pTable = taosHashIterate(context.pTableNameHashObj, NULL); while (NULL != pTable) { taosArrayPush((*pQuery)->pTableList, pTable); @@ -1579,9 +1593,9 @@ typedef struct SmlExecTableHandle { } SmlExecTableHandle; typedef struct SmlExecHandle { - SHashObj* pBlockHash; - SmlExecTableHandle tableExecHandle; - SQuery *pQuery; + SHashObj* pBlockHash; + SmlExecTableHandle tableExecHandle; + SQuery* pQuery; } SSmlExecHandle; static void smlDestroyTableHandle(void* pHandle) { @@ -1673,9 +1687,9 @@ static int32_t smlBuildTagRow(SArray* cols, SKVRowBuilder* tagsBuilder, SParsedD SSchema* pTagSchema = &pSchema[tags->boundColumns[i] - 1]; // colId starts with 1 param.schema = pTagSchema; SSmlKv* kv = taosArrayGetP(cols, i); - if(IS_VAR_DATA_TYPE(kv->type)){ + if (IS_VAR_DATA_TYPE(kv->type)) { KvRowAppend(msg, kv->value, kv->length, ¶m); - }else{ + } else { KvRowAppend(msg, &(kv->value), kv->length, ¶m); } } @@ -1688,13 +1702,13 @@ static int32_t smlBuildTagRow(SArray* cols, SKVRowBuilder* tagsBuilder, SParsedD return TSDB_CODE_SUCCESS; } -int32_t smlBindData(void *handle, SArray *tags, SArray *colsSchema, SArray *cols, bool format, - STableMeta *pTableMeta, char *tableName, char *msgBuf, int16_t msgBufLen) { +int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols, bool format, STableMeta* pTableMeta, + char* tableName, char* msgBuf, int16_t msgBufLen) { SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen}; SSmlExecHandle* smlHandle = (SSmlExecHandle*)handle; - smlDestroyTableHandle(&smlHandle->tableExecHandle); // free for each table - SSchema* pTagsSchema = getTableTagSchema(pTableMeta); + smlDestroyTableHandle(&smlHandle->tableExecHandle); // free for each table + SSchema* pTagsSchema = getTableTagSchema(pTableMeta); setBoundColumnInfo(&smlHandle->tableExecHandle.tags, pTagsSchema, getNumOfTags(pTableMeta)); int ret = smlBoundColumnData(tags, &smlHandle->tableExecHandle.tags, pTagsSchema); if (ret != TSDB_CODE_SUCCESS) { @@ -1702,7 +1716,8 @@ int32_t smlBindData(void *handle, SArray *tags, SArray *colsSchema, SArray *cols return ret; } SKVRow row = NULL; - ret = smlBuildTagRow(tags, &smlHandle->tableExecHandle.tagsBuilder, &smlHandle->tableExecHandle.tags, pTagsSchema, &row, &pBuf); + ret = smlBuildTagRow(tags, &smlHandle->tableExecHandle.tagsBuilder, &smlHandle->tableExecHandle.tags, pTagsSchema, + &row, &pBuf); if (ret != TSDB_CODE_SUCCESS) { return ret; } @@ -1733,7 +1748,7 @@ int32_t smlBindData(void *handle, SArray *tags, SArray *colsSchema, SArray *cols initRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo); int32_t rowNum = taosArrayGetSize(cols); - if(rowNum <= 0) { + if (rowNum <= 0) { return buildInvalidOperationMsg(&pBuf, "cols size <= 0"); } ret = allocateMemForSize(pDataBlock, extendedRowSize * rowNum); @@ -1744,9 +1759,9 @@ int32_t smlBindData(void *handle, SArray *tags, SArray *colsSchema, SArray *cols for (int32_t r = 0; r < rowNum; ++r) { STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size); // skip the SSubmitBlk header tdSRowResetBuf(pBuilder, row); - void *rowData = taosArrayGetP(cols, r); + void* rowData = taosArrayGetP(cols, r); size_t rowDataSize = 0; - if(format){ + if (format) { rowDataSize = taosArrayGetSize(rowData); } @@ -1781,9 +1796,9 @@ int32_t smlBindData(void *handle, SArray *tags, SArray *colsSchema, SArray *cols kv->i = convertTimePrecision(kv->i, TSDB_TIME_PRECISION_NANO, pTableMeta->tableInfo.precision); } - if(IS_VAR_DATA_TYPE(kv->type)){ + if (IS_VAR_DATA_TYPE(kv->type)) { MemRowAppend(&pBuf, kv->value, colLen, ¶m); - }else{ + } else { MemRowAppend(&pBuf, &(kv->value), colLen, ¶m); } } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index dbb29699fc..f6d53dd15a 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -120,6 +120,20 @@ static int32_t getTableMeta(STranslateContext* pCxt, const char* pDbName, const return getTableMetaImpl(pCxt, toName(pCxt->pParseCxt->acctId, pDbName, pTableName, &name), pMeta); } +static int32_t refreshGetTableMeta(STranslateContext* pCxt, const char* pDbName, const char* pTableName, + STableMeta** pMeta) { + SParseContext* pParCxt = pCxt->pParseCxt; + SName name; + toName(pCxt->pParseCxt->acctId, pDbName, pTableName, &name); + int32_t code = + catalogRefreshGetTableMeta(pParCxt->pCatalog, pParCxt->pTransporter, &pParCxt->mgmtEpSet, &name, pMeta, false); + if (TSDB_CODE_SUCCESS != code) { + parserError("catalogRefreshGetTableMeta error, code:%s, dbName:%s, tbName:%s", tstrerror(code), pDbName, + pTableName); + } + return code; +} + static int32_t getTableDistVgInfo(STranslateContext* pCxt, const SName* pName, SArray** pVgInfo) { SParseContext* pParCxt = pCxt->pParseCxt; int32_t code = collectUseDatabase(pName, pCxt->pDbs); @@ -3201,7 +3215,7 @@ static int32_t translateExplain(STranslateContext* pCxt, SExplainStmt* pStmt) { } static int32_t translateDescribe(STranslateContext* pCxt, SDescribeStmt* pStmt) { - return getTableMeta(pCxt, pStmt->dbName, pStmt->tableName, &pStmt->pMeta); + return refreshGetTableMeta(pCxt, pStmt->dbName, pStmt->tableName, &pStmt->pMeta); } static int32_t translateKillConnection(STranslateContext* pCxt, SKillStmt* pStmt) {