diff --git a/Jenkinsfile2 b/Jenkinsfile2 index 083d96e975..904c8b1651 100644 --- a/Jenkinsfile2 +++ b/Jenkinsfile2 @@ -416,7 +416,7 @@ pipeline { echo "${WKDIR}/restore.sh -p ${BRANCH_NAME} -n ${BUILD_ID} -c {container name}" } catchError(buildResult: 'FAILURE', stageResult: 'FAILURE') { - timeout(time: 150, unit: 'MINUTES'){ + timeout(time: 200, unit: 'MINUTES'){ pre_test() script { sh ''' diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 347fb203ef..f56860dd4f 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -335,7 +335,7 @@ SSchema createSchema(int8_t type, int32_t bytes, col_id_t colId, const char* nam void destroyQueryExecRes(SExecResult* pRes); int32_t dataConverToStr(char* str, int type, void* buf, int32_t bufSize, int32_t* len); -char* parseTagDatatoJson(void* p); +void parseTagDatatoJson(void* p, char** jsonStr); int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst); void getColumnTypeFromMeta(STableMeta* pMeta, char* pName, ETableColumnType* pType); int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 9c760a465a..22da6d5390 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -53,7 +53,7 @@ int32_t taosGetErrSize(); #define terrln (*taosGetErrln()) #define SET_ERROR_MSG(MSG, ...) \ - snprintf(terrMsg, ERR_MSG_LEN, MSG, ##__VA_ARGS__) + (void)snprintf(terrMsg, ERR_MSG_LEN, MSG, ##__VA_ARGS__) #define TSDB_CODE_SUCCESS 0 #define TSDB_CODE_FAILED -1 // unknown or needn't tell detail error @@ -556,6 +556,7 @@ int32_t taosGetErrSize(); #define TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER TAOS_DEF_ERROR_CODE(0, 0x061B) #define TSDB_CODE_TDB_TDB_ENV_OPEN_ERROR TAOS_DEF_ERROR_CODE(0, 0x061C) #define TSDB_CODE_TDB_TABLE_IN_OTHER_STABLE TAOS_DEF_ERROR_CODE(0, 0x061D) +#define TSDB_CODE_TDB_INCONSISTENT_DB_ID TAOS_DEF_ERROR_CODE(0, 0x061E) // query #define TSDB_CODE_QRY_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0700) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index ffbc56415d..3fb61b6902 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -2007,7 +2007,8 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int sprintf(varDataVal(dst), "%s", TSDB_DATA_NULL_STR_L); varDataSetLen(dst, strlen(varDataVal(dst))); } else if (tTagIsJson(data)) { - char* jsonString = parseTagDatatoJson(data); + char* jsonString = NULL; + parseTagDatatoJson(data, &jsonString); STR_TO_VARSTR(dst, jsonString); taosMemoryFree(jsonString); } else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) { // value -> "value" diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index f9fff732e9..2fabf01ef4 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -23,6 +23,31 @@ #include "tglobal.h" #include "tmsgtype.h" +#define RAW_NULL_CHECK(c) \ + do { \ + if (c == NULL) { \ + code = TSDB_CODE_OUT_OF_MEMORY; \ + goto end; \ + } \ + } while (0) + +#define RAW_FALSE_CHECK(c) \ + do { \ + if (!c) { \ + code = TSDB_CODE_INVALID_PARA; \ + goto end; \ + } \ + } while (0) + +#define RAW_RETURN_CHECK(c) \ + do { \ + code = c; \ + if (code != 0) { \ + goto end; \ + } \ + } while (0) + + #define LOG_ID_TAG "connId:0x%" PRIx64 ",reqId:0x%" PRIx64 #define LOG_ID_VALUE *(int64_t*)taos, pRequest->requestId @@ -32,8 +57,9 @@ static int32_t tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, int32_t metaLen static tb_uid_t processSuid(tb_uid_t suid, char* db) { return suid + MurmurHash3_32(db, strlen(db)); } -static cJSON* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id, - int8_t t, SColCmprWrapper* pColCmprRow) { +static void buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id, + int8_t t, SColCmprWrapper* pColCmprRow, cJSON** pJson) { + int32_t code = TSDB_CODE_SUCCESS; int8_t buildDefaultCompress = 0; if (pColCmprRow->nCols <= 0) { buildDefaultCompress = 1; @@ -41,43 +67,45 @@ static cJSON* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* sc char* string = NULL; cJSON* json = cJSON_CreateObject(); - if (json == NULL) { - uError("create json object failed") return NULL; - } + RAW_NULL_CHECK(json); cJSON* type = cJSON_CreateString("create"); - cJSON_AddItemToObject(json, "type", type); + RAW_NULL_CHECK(type); - // char uid[32] = {0}; - // sprintf(uid, "%"PRIi64, id); - // cJSON* id_ = cJSON_CreateString(uid); - // cJSON_AddItemToObject(json, "id", id_); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "type", type)); cJSON* tableType = cJSON_CreateString(t == TSDB_NORMAL_TABLE ? "normal" : "super"); - cJSON_AddItemToObject(json, "tableType", tableType); + RAW_NULL_CHECK(tableType); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tableType", tableType)); cJSON* tableName = cJSON_CreateString(name); - cJSON_AddItemToObject(json, "tableName", tableName); - // cJSON* version = cJSON_CreateNumber(1); - // cJSON_AddItemToObject(json, "version", version); + RAW_NULL_CHECK(tableName); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tableName", tableName)); cJSON* columns = cJSON_CreateArray(); + RAW_NULL_CHECK(columns); for (int i = 0; i < schemaRow->nCols; i++) { cJSON* column = cJSON_CreateObject(); + RAW_NULL_CHECK(column); SSchema* s = schemaRow->pSchema + i; cJSON* cname = cJSON_CreateString(s->name); - cJSON_AddItemToObject(column, "name", cname); + RAW_NULL_CHECK(cname); + RAW_FALSE_CHECK(cJSON_AddItemToObject(column, "name", cname)); cJSON* ctype = cJSON_CreateNumber(s->type); - cJSON_AddItemToObject(column, "type", ctype); + RAW_NULL_CHECK(ctype); + RAW_FALSE_CHECK(cJSON_AddItemToObject(column, "type", ctype)); if (s->type == TSDB_DATA_TYPE_BINARY || s->type == TSDB_DATA_TYPE_VARBINARY || s->type == TSDB_DATA_TYPE_GEOMETRY) { int32_t length = s->bytes - VARSTR_HEADER_SIZE; cJSON* cbytes = cJSON_CreateNumber(length); - cJSON_AddItemToObject(column, "length", cbytes); + RAW_NULL_CHECK(cbytes); + RAW_FALSE_CHECK(cJSON_AddItemToObject(column, "length", cbytes)); } else if (s->type == TSDB_DATA_TYPE_NCHAR) { int32_t length = (s->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE; cJSON* cbytes = cJSON_CreateNumber(length); - cJSON_AddItemToObject(column, "length", cbytes); + RAW_NULL_CHECK(cbytes); + RAW_FALSE_CHECK(cJSON_AddItemToObject(column, "length", cbytes)); } cJSON* isPk = cJSON_CreateBool(s->flags & COL_IS_KEY); - cJSON_AddItemToObject(column, "isPrimarykey", isPk); - cJSON_AddItemToArray(columns, column); + RAW_NULL_CHECK(isPk); + RAW_FALSE_CHECK(cJSON_AddItemToObject(column, "isPrimarykey", isPk)); + RAW_FALSE_CHECK(cJSON_AddItemToArray(columns, column)); if (pColCmprRow == NULL) { continue; @@ -91,177 +119,222 @@ static cJSON* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* sc alg = pColCmpr->alg; } const char* encode = columnEncodeStr(COMPRESS_L1_TYPE_U32(alg)); + RAW_NULL_CHECK(encode); const char* compress = columnCompressStr(COMPRESS_L2_TYPE_U32(alg)); + RAW_NULL_CHECK(compress); const char* level = columnLevelStr(COMPRESS_L2_TYPE_LEVEL_U32(alg)); + RAW_NULL_CHECK(level); cJSON* encodeJson = cJSON_CreateString(encode); - cJSON_AddItemToObject(column, "encode", encodeJson); + RAW_NULL_CHECK(encodeJson); + RAW_FALSE_CHECK(cJSON_AddItemToObject(column, "encode", encodeJson)); cJSON* compressJson = cJSON_CreateString(compress); - cJSON_AddItemToObject(column, "compress", compressJson); + RAW_NULL_CHECK(compressJson); + RAW_FALSE_CHECK(cJSON_AddItemToObject(column, "compress", compressJson)); cJSON* levelJson = cJSON_CreateString(level); - cJSON_AddItemToObject(column, "level", levelJson); + RAW_NULL_CHECK(levelJson); + RAW_FALSE_CHECK(cJSON_AddItemToObject(column, "level", levelJson)); } - cJSON_AddItemToObject(json, "columns", columns); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "columns", columns)); cJSON* tags = cJSON_CreateArray(); + RAW_NULL_CHECK(tags); for (int i = 0; schemaTag && i < schemaTag->nCols; i++) { cJSON* tag = cJSON_CreateObject(); + RAW_NULL_CHECK(tag); SSchema* s = schemaTag->pSchema + i; cJSON* tname = cJSON_CreateString(s->name); - cJSON_AddItemToObject(tag, "name", tname); + RAW_NULL_CHECK(tname); + RAW_FALSE_CHECK(cJSON_AddItemToObject(tag, "name", tname)); cJSON* ttype = cJSON_CreateNumber(s->type); - cJSON_AddItemToObject(tag, "type", ttype); + RAW_NULL_CHECK(ttype); + RAW_FALSE_CHECK(cJSON_AddItemToObject(tag, "type", ttype)); if (s->type == TSDB_DATA_TYPE_BINARY || s->type == TSDB_DATA_TYPE_VARBINARY || s->type == TSDB_DATA_TYPE_GEOMETRY) { int32_t length = s->bytes - VARSTR_HEADER_SIZE; cJSON* cbytes = cJSON_CreateNumber(length); - cJSON_AddItemToObject(tag, "length", cbytes); + RAW_NULL_CHECK(cbytes); + RAW_FALSE_CHECK(cJSON_AddItemToObject(tag, "length", cbytes)); } else if (s->type == TSDB_DATA_TYPE_NCHAR) { int32_t length = (s->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE; cJSON* cbytes = cJSON_CreateNumber(length); - cJSON_AddItemToObject(tag, "length", cbytes); + RAW_NULL_CHECK(cbytes); + RAW_FALSE_CHECK(cJSON_AddItemToObject(tag, "length", cbytes)); } - cJSON_AddItemToArray(tags, tag); + RAW_FALSE_CHECK(cJSON_AddItemToArray(tags, tag)); } - cJSON_AddItemToObject(json, "tags", tags); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tags", tags)); - return json; +end: + *pJson = json; } static int32_t setCompressOption(cJSON* json, uint32_t para) { uint8_t encode = COMPRESS_L1_TYPE_U32(para); + int32_t code = 0; if (encode != 0) { const char* encodeStr = columnEncodeStr(encode); + RAW_NULL_CHECK(encodeStr); cJSON* encodeJson = cJSON_CreateString(encodeStr); - cJSON_AddItemToObject(json, "encode", encodeJson); - return 0; + RAW_NULL_CHECK(encodeJson); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "encode", encodeJson)); + return code; } uint8_t compress = COMPRESS_L2_TYPE_U32(para); if (compress != 0) { const char* compressStr = columnCompressStr(compress); + RAW_NULL_CHECK(compressStr); cJSON* compressJson = cJSON_CreateString(compressStr); - cJSON_AddItemToObject(json, "compress", compressJson); - return 0; + RAW_NULL_CHECK(compressJson); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "compress", compressJson)); + return code; } uint8_t level = COMPRESS_L2_TYPE_LEVEL_U32(para); if (level != 0) { const char* levelStr = columnLevelStr(level); + RAW_NULL_CHECK(levelStr); cJSON* levelJson = cJSON_CreateString(levelStr); - cJSON_AddItemToObject(json, "level", levelJson); - return 0; + RAW_NULL_CHECK(levelJson); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "level", levelJson)); + return code; } - return 0; + +end: + return code; } -static cJSON* buildAlterSTableJson(void* alterData, int32_t alterDataLen) { +static void buildAlterSTableJson(void* alterData, int32_t alterDataLen, cJSON** pJson) { SMAlterStbReq req = {0}; cJSON* json = NULL; char* string = NULL; + int32_t code = 0; if (tDeserializeSMAlterStbReq(alterData, alterDataLen, &req) != 0) { goto end; } json = cJSON_CreateObject(); - if (json == NULL) { - uError("create json object failed"); - goto end; - } + RAW_NULL_CHECK(json); cJSON* type = cJSON_CreateString("alter"); - cJSON_AddItemToObject(json, "type", type); - // cJSON* uid = cJSON_CreateNumber(id); - // cJSON_AddItemToObject(json, "uid", uid); + RAW_NULL_CHECK(type); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "type", type)); SName name = {0}; - tNameFromString(&name, req.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); + RAW_RETURN_CHECK(tNameFromString(&name, req.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE)); cJSON* tableType = cJSON_CreateString("super"); - cJSON_AddItemToObject(json, "tableType", tableType); + RAW_NULL_CHECK(tableType); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tableType", tableType)); cJSON* tableName = cJSON_CreateString(name.tname); - cJSON_AddItemToObject(json, "tableName", tableName); + RAW_NULL_CHECK(tableName); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tableName", tableName)); cJSON* alterType = cJSON_CreateNumber(req.alterType); - cJSON_AddItemToObject(json, "alterType", alterType); + RAW_NULL_CHECK(alterType); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "alterType", alterType)); switch (req.alterType) { case TSDB_ALTER_TABLE_ADD_TAG: case TSDB_ALTER_TABLE_ADD_COLUMN: { TAOS_FIELD* field = taosArrayGet(req.pFields, 0); + RAW_NULL_CHECK(field); cJSON* colName = cJSON_CreateString(field->name); - cJSON_AddItemToObject(json, "colName", colName); + RAW_NULL_CHECK(colName); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", colName)); cJSON* colType = cJSON_CreateNumber(field->type); - cJSON_AddItemToObject(json, "colType", colType); + RAW_NULL_CHECK(colType); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colType", colType)); if (field->type == TSDB_DATA_TYPE_BINARY || field->type == TSDB_DATA_TYPE_VARBINARY || field->type == TSDB_DATA_TYPE_GEOMETRY) { int32_t length = field->bytes - VARSTR_HEADER_SIZE; cJSON* cbytes = cJSON_CreateNumber(length); - cJSON_AddItemToObject(json, "colLength", cbytes); + RAW_NULL_CHECK(cbytes); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colLength", cbytes)); } else if (field->type == TSDB_DATA_TYPE_NCHAR) { int32_t length = (field->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE; cJSON* cbytes = cJSON_CreateNumber(length); - cJSON_AddItemToObject(json, "colLength", cbytes); + RAW_NULL_CHECK(cbytes); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colLength", cbytes)); } break; } case TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION: { SFieldWithOptions* field = taosArrayGet(req.pFields, 0); + RAW_NULL_CHECK(field); cJSON* colName = cJSON_CreateString(field->name); - cJSON_AddItemToObject(json, "colName", colName); + RAW_NULL_CHECK(colName); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", colName)); cJSON* colType = cJSON_CreateNumber(field->type); - cJSON_AddItemToObject(json, "colType", colType); + RAW_NULL_CHECK(colType); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colType", colType)); if (field->type == TSDB_DATA_TYPE_BINARY || field->type == TSDB_DATA_TYPE_VARBINARY || field->type == TSDB_DATA_TYPE_GEOMETRY) { int32_t length = field->bytes - VARSTR_HEADER_SIZE; cJSON* cbytes = cJSON_CreateNumber(length); - cJSON_AddItemToObject(json, "colLength", cbytes); + RAW_NULL_CHECK(cbytes); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colLength", cbytes)); } else if (field->type == TSDB_DATA_TYPE_NCHAR) { int32_t length = (field->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE; cJSON* cbytes = cJSON_CreateNumber(length); - cJSON_AddItemToObject(json, "colLength", cbytes); + RAW_NULL_CHECK(cbytes); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colLength", cbytes)); } - setCompressOption(json, field->compress); + RAW_RETURN_CHECK(setCompressOption(json, field->compress)); break; } case TSDB_ALTER_TABLE_DROP_TAG: case TSDB_ALTER_TABLE_DROP_COLUMN: { TAOS_FIELD* field = taosArrayGet(req.pFields, 0); + RAW_NULL_CHECK(field); cJSON* colName = cJSON_CreateString(field->name); - cJSON_AddItemToObject(json, "colName", colName); + RAW_NULL_CHECK(colName); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", colName)); break; } case TSDB_ALTER_TABLE_UPDATE_TAG_BYTES: case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: { TAOS_FIELD* field = taosArrayGet(req.pFields, 0); + RAW_NULL_CHECK(field); cJSON* colName = cJSON_CreateString(field->name); - cJSON_AddItemToObject(json, "colName", colName); + RAW_NULL_CHECK(colName); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", colName)); cJSON* colType = cJSON_CreateNumber(field->type); - cJSON_AddItemToObject(json, "colType", colType); + RAW_NULL_CHECK(colType); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colType", colType)); if (field->type == TSDB_DATA_TYPE_BINARY || field->type == TSDB_DATA_TYPE_VARBINARY || field->type == TSDB_DATA_TYPE_GEOMETRY) { int32_t length = field->bytes - VARSTR_HEADER_SIZE; cJSON* cbytes = cJSON_CreateNumber(length); - cJSON_AddItemToObject(json, "colLength", cbytes); + RAW_NULL_CHECK(cbytes); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colLength", cbytes)); } else if (field->type == TSDB_DATA_TYPE_NCHAR) { int32_t length = (field->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE; cJSON* cbytes = cJSON_CreateNumber(length); - cJSON_AddItemToObject(json, "colLength", cbytes); + RAW_NULL_CHECK(cbytes); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colLength", cbytes)); } break; } case TSDB_ALTER_TABLE_UPDATE_TAG_NAME: case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: { TAOS_FIELD* oldField = taosArrayGet(req.pFields, 0); + RAW_NULL_CHECK(oldField); TAOS_FIELD* newField = taosArrayGet(req.pFields, 1); + RAW_NULL_CHECK(newField); cJSON* colName = cJSON_CreateString(oldField->name); - cJSON_AddItemToObject(json, "colName", colName); + RAW_NULL_CHECK(colName); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", colName)); cJSON* colNewName = cJSON_CreateString(newField->name); - cJSON_AddItemToObject(json, "colNewName", colNewName); + RAW_NULL_CHECK(colNewName); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colNewName", colNewName)); break; } case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS: { TAOS_FIELD* field = taosArrayGet(req.pFields, 0); + RAW_NULL_CHECK(field); cJSON* colName = cJSON_CreateString(field->name); - cJSON_AddItemToObject(json, "colName", colName); - setCompressOption(json, field->bytes); + RAW_NULL_CHECK(colName); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", colName)); + RAW_RETURN_CHECK(setCompressOption(json, field->bytes)); break; } default: @@ -270,13 +343,12 @@ static cJSON* buildAlterSTableJson(void* alterData, int32_t alterDataLen) { end: tFreeSMAltertbReq(&req); - return json; + *pJson = json; } -static cJSON* processCreateStb(SMqMetaRsp* metaRsp) { +static void processCreateStb(SMqMetaRsp* metaRsp, cJSON** pJson) { SVCreateStbReq req = {0}; SDecoder coder; - cJSON* pJson = NULL; uDebug("create stable data:%p", metaRsp); // decode and process req @@ -285,19 +357,18 @@ static cJSON* processCreateStb(SMqMetaRsp* metaRsp) { tDecoderInit(&coder, data, len); if (tDecodeSVCreateStbReq(&coder, &req) < 0) { - goto _err; + goto end; } - pJson = buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.name, req.suid, TSDB_SUPER_TABLE, &req.colCmpr); -_err: - uDebug("create stable return, sql json:%s", cJSON_PrintUnformatted(pJson)); + buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.name, req.suid, TSDB_SUPER_TABLE, &req.colCmpr, pJson); + +end: + uDebug("create stable return, sql json:%s", cJSON_PrintUnformatted(*pJson)); tDecoderClear(&coder); - return pJson; } -static cJSON* processAlterStb(SMqMetaRsp* metaRsp) { +static void processAlterStb(SMqMetaRsp* metaRsp, cJSON** pJson) { SVCreateStbReq req = {0}; - SDecoder coder; - cJSON* pJson = NULL; + SDecoder coder = {0}; uDebug("alter stable data:%p", metaRsp); // decode and process req @@ -306,13 +377,13 @@ static cJSON* processAlterStb(SMqMetaRsp* metaRsp) { tDecoderInit(&coder, data, len); if (tDecodeSVCreateStbReq(&coder, &req) < 0) { - goto _err; + goto end; } - pJson = buildAlterSTableJson(req.alterOriData, req.alterOriDataLen); -_err: - uDebug("alter stable return, sql json:%s", cJSON_PrintUnformatted(pJson)); + buildAlterSTableJson(req.alterOriData, req.alterOriDataLen, pJson); + +end: + uDebug("alter stable return, sql json:%s", cJSON_PrintUnformatted(*pJson)); tDecoderClear(&coder); - return pJson; } static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) { @@ -322,23 +393,22 @@ static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) { SArray* tagName = pCreateReq->ctb.tagName; int64_t id = pCreateReq->uid; uint8_t tagNum = pCreateReq->ctb.tagNum; + int32_t code = 0; cJSON* tableName = cJSON_CreateString(name); - cJSON_AddItemToObject(json, "tableName", tableName); + RAW_NULL_CHECK(tableName); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tableName", tableName)); cJSON* using = cJSON_CreateString(sname); - cJSON_AddItemToObject(json, "using", using); + RAW_NULL_CHECK(using); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "using", using)); cJSON* tagNumJson = cJSON_CreateNumber(tagNum); - cJSON_AddItemToObject(json, "tagNum", tagNumJson); - // cJSON* version = cJSON_CreateNumber(1); - // cJSON_AddItemToObject(json, "version", version); + RAW_NULL_CHECK(tagNumJson); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tagNum", tagNumJson)); cJSON* tags = cJSON_CreateArray(); + RAW_NULL_CHECK(tags); SArray* pTagVals = NULL; - int32_t code = tTagToValArray(pTag, &pTagVals); - if (code) { - uError("tTagToValArray failed code:%d", code); - goto end; - } + RAW_RETURN_CHECK(tTagToValArray(pTag, &pTagVals)); if (tTagIsJson(pTag)) { STag* p = (STag*)pTag; @@ -346,36 +416,41 @@ static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) { uError("p->nTag == 0"); goto end; } - char* pJson = parseTagDatatoJson(pTag); + char* pJson = NULL; + parseTagDatatoJson(pTag, &pJson); cJSON* tag = cJSON_CreateObject(); + RAW_NULL_CHECK(tag); STagVal* pTagVal = taosArrayGet(pTagVals, 0); - + RAW_NULL_CHECK(pTagVal); char* ptname = taosArrayGet(tagName, 0); + RAW_NULL_CHECK(ptname); cJSON* tname = cJSON_CreateString(ptname); - cJSON_AddItemToObject(tag, "name", tname); - // cJSON* cid_ = cJSON_CreateString(""); - // cJSON_AddItemToObject(tag, "cid", cid_); + RAW_NULL_CHECK(tname); + RAW_FALSE_CHECK(cJSON_AddItemToObject(tag, "name", tname)); cJSON* ttype = cJSON_CreateNumber(TSDB_DATA_TYPE_JSON); - cJSON_AddItemToObject(tag, "type", ttype); + RAW_NULL_CHECK(ttype); + RAW_FALSE_CHECK(cJSON_AddItemToObject(tag, "type", ttype)); cJSON* tvalue = cJSON_CreateString(pJson); - cJSON_AddItemToObject(tag, "value", tvalue); - cJSON_AddItemToArray(tags, tag); + RAW_NULL_CHECK(tvalue); + RAW_FALSE_CHECK(cJSON_AddItemToObject(tag, "value", tvalue)); + RAW_FALSE_CHECK(cJSON_AddItemToArray(tags, tag)); taosMemoryFree(pJson); goto end; } for (int i = 0; i < taosArrayGetSize(pTagVals); i++) { STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, i); - + RAW_NULL_CHECK(pTagVal); cJSON* tag = cJSON_CreateObject(); - + RAW_NULL_CHECK(tag); char* ptname = taosArrayGet(tagName, i); + RAW_NULL_CHECK(ptname); cJSON* tname = cJSON_CreateString(ptname); - cJSON_AddItemToObject(tag, "name", tname); - // cJSON* cid = cJSON_CreateNumber(pTagVal->cid); - // cJSON_AddItemToObject(tag, "cid", cid); + RAW_NULL_CHECK(tname); + RAW_FALSE_CHECK(cJSON_AddItemToObject(tag, "name", tname)); cJSON* ttype = cJSON_CreateNumber(pTagVal->type); - cJSON_AddItemToObject(tag, "type", ttype); + RAW_NULL_CHECK(ttype); + RAW_FALSE_CHECK(cJSON_AddItemToObject(tag, "type", ttype)); cJSON* tvalue = NULL; if (IS_VAR_DATA_TYPE(pTagVal->type)) { @@ -385,134 +460,141 @@ static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) { } else { buf = taosMemoryCalloc(pTagVal->nData + 3, 1); } - - if (!buf) goto end; - dataConverToStr(buf, pTagVal->type, pTagVal->pData, pTagVal->nData, NULL); + RAW_NULL_CHECK(buf); + RAW_RETURN_CHECK(dataConverToStr(buf, pTagVal->type, pTagVal->pData, pTagVal->nData, NULL)); tvalue = cJSON_CreateString(buf); + RAW_NULL_CHECK(tvalue); taosMemoryFree(buf); } else { double val = 0; GET_TYPED_DATA(val, double, pTagVal->type, &pTagVal->i64); tvalue = cJSON_CreateNumber(val); + RAW_NULL_CHECK(tvalue); } - cJSON_AddItemToObject(tag, "value", tvalue); - cJSON_AddItemToArray(tags, tag); + RAW_FALSE_CHECK(cJSON_AddItemToObject(tag, "value", tvalue)); + RAW_FALSE_CHECK(cJSON_AddItemToArray(tags, tag)); } end: - cJSON_AddItemToObject(json, "tags", tags); - taosArrayDestroy(pTagVals); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tags", tags)); + (void)taosArrayDestroy(pTagVals); } -static cJSON* buildCreateCTableJson(SVCreateTbReq* pCreateReq, int32_t nReqs) { +static void buildCreateCTableJson(SVCreateTbReq* pCreateReq, int32_t nReqs, cJSON** pJson) { + int32_t code = 0; char* string = NULL; cJSON* json = cJSON_CreateObject(); - if (json == NULL) { - uError("create json object failed"); - return NULL; - } + RAW_NULL_CHECK(json); cJSON* type = cJSON_CreateString("create"); - cJSON_AddItemToObject(json, "type", type); - // char cid[32] = {0}; - // sprintf(cid, "%"PRIi64, id); - // cJSON* cid_ = cJSON_CreateString(cid); - // cJSON_AddItemToObject(json, "id", cid_); + RAW_NULL_CHECK(type); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "type", type)); cJSON* tableType = cJSON_CreateString("child"); - cJSON_AddItemToObject(json, "tableType", tableType); + RAW_NULL_CHECK(tableType); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tableType", tableType)); buildChildElement(json, pCreateReq); cJSON* createList = cJSON_CreateArray(); + RAW_NULL_CHECK(createList); for (int i = 0; nReqs > 1 && i < nReqs; i++) { cJSON* create = cJSON_CreateObject(); + RAW_NULL_CHECK(create); buildChildElement(create, pCreateReq + i); - cJSON_AddItemToArray(createList, create); + RAW_FALSE_CHECK(cJSON_AddItemToArray(createList, create)); } - cJSON_AddItemToObject(json, "createList", createList); - return json; + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "createList", createList)); + +end: + *pJson = json; } -static cJSON* processCreateTable(SMqMetaRsp* metaRsp) { +static void processCreateTable(SMqMetaRsp* metaRsp, cJSON** pJson) { SDecoder decoder = {0}; SVCreateTbBatchReq req = {0}; SVCreateTbReq* pCreateReq; - cJSON* pJson = NULL; // decode uDebug("create table data:%p", metaRsp); void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead)); int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead); tDecoderInit(&decoder, data, len); if (tDecodeSVCreateTbBatchReq(&decoder, &req) < 0) { - goto _exit; + goto end; } // loop to create table if (req.nReqs > 0) { pCreateReq = req.pReqs; if (pCreateReq->type == TSDB_CHILD_TABLE) { - pJson = buildCreateCTableJson(req.pReqs, req.nReqs); + buildCreateCTableJson(req.pReqs, req.nReqs, pJson); } else if (pCreateReq->type == TSDB_NORMAL_TABLE) { - pJson = buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, - TSDB_NORMAL_TABLE, &pCreateReq->colCmpr); + buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, + TSDB_NORMAL_TABLE, &pCreateReq->colCmpr, pJson); } } -_exit: - uDebug("create table return, sql json:%s", cJSON_PrintUnformatted(pJson)); +end: + uDebug("create table return, sql json:%s", cJSON_PrintUnformatted(*pJson)); tDeleteSVCreateTbBatchReq(&req); tDecoderClear(&decoder); - return pJson; } -static char* processAutoCreateTable(STaosxRsp* rsp) { +static void processAutoCreateTable(STaosxRsp* rsp, char** string) { + SDecoder* decoder = NULL; + SVCreateTbReq* pCreateReq = NULL; + int32_t code = 0; uDebug("auto create table data:%p", rsp); if (rsp->createTableNum <= 0) { uError("processAutoCreateTable rsp->createTableNum <= 0"); - goto _exit; + goto end; } - SDecoder* decoder = taosMemoryCalloc(rsp->createTableNum, sizeof(SDecoder)); - SVCreateTbReq* pCreateReq = taosMemoryCalloc(rsp->createTableNum, sizeof(SVCreateTbReq)); - char* string = NULL; + decoder = taosMemoryCalloc(rsp->createTableNum, sizeof(SDecoder)); + RAW_NULL_CHECK(decoder); + pCreateReq = taosMemoryCalloc(rsp->createTableNum, sizeof(SVCreateTbReq)); + RAW_NULL_CHECK(pCreateReq); // loop to create table for (int32_t iReq = 0; iReq < rsp->createTableNum; iReq++) { // decode void** data = taosArrayGet(rsp->createTableReq, iReq); + RAW_NULL_CHECK(data); int32_t* len = taosArrayGet(rsp->createTableLen, iReq); + RAW_NULL_CHECK(len); tDecoderInit(&decoder[iReq], *data, *len); if (tDecodeSVCreateTbReq(&decoder[iReq], pCreateReq + iReq) < 0) { - goto _exit; + goto end; } if (pCreateReq[iReq].type != TSDB_CHILD_TABLE) { uError("processAutoCreateTable pCreateReq[iReq].type != TSDB_CHILD_TABLE"); - goto _exit; + goto end; } } - cJSON* pJson = buildCreateCTableJson(pCreateReq, rsp->createTableNum); - string = cJSON_PrintUnformatted(pJson); + cJSON* pJson = NULL; + buildCreateCTableJson(pCreateReq, rsp->createTableNum, &pJson); + *string = cJSON_PrintUnformatted(pJson); cJSON_Delete(pJson); -_exit: - uDebug("auto created table return, sql json:%s", string); - for (int i = 0; i < rsp->createTableNum; i++) { + +end: + uDebug("auto created table return, sql json:%s", *string); + for (int i = 0; decoder && pCreateReq && i < rsp->createTableNum; i++) { tDecoderClear(&decoder[i]); taosMemoryFreeClear(pCreateReq[i].comment); if (pCreateReq[i].type == TSDB_CHILD_TABLE) { - taosArrayDestroy(pCreateReq[i].ctb.tagName); + (void)taosArrayDestroy(pCreateReq[i].ctb.tagName); } } taosMemoryFree(decoder); taosMemoryFree(pCreateReq); - return string; } -static cJSON* processAlterTable(SMqMetaRsp* metaRsp) { +static void processAlterTable(SMqMetaRsp* metaRsp, cJSON** pJson) { SDecoder decoder = {0}; SVAlterTbReq vAlterTbReq = {0}; char* string = NULL; cJSON* json = NULL; + int32_t code = 0; uDebug("alter table data:%p", metaRsp); // decode @@ -521,95 +603,110 @@ static cJSON* processAlterTable(SMqMetaRsp* metaRsp) { tDecoderInit(&decoder, data, len); if (tDecodeSVAlterTbReq(&decoder, &vAlterTbReq) < 0) { uError("tDecodeSVAlterTbReq error"); - goto _exit; + goto end; } json = cJSON_CreateObject(); - if (json == NULL) { - uError("create json object failed"); - goto _exit; - } + RAW_NULL_CHECK(json); cJSON* type = cJSON_CreateString("alter"); - cJSON_AddItemToObject(json, "type", type); - // cJSON* uid = cJSON_CreateNumber(id); - // cJSON_AddItemToObject(json, "uid", uid); + RAW_NULL_CHECK(type); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "type", type)); cJSON* tableType = cJSON_CreateString(vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_TAG_VAL ? "child" : "normal"); - cJSON_AddItemToObject(json, "tableType", tableType); + RAW_NULL_CHECK(tableType); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tableType", tableType)); cJSON* tableName = cJSON_CreateString(vAlterTbReq.tbName); - cJSON_AddItemToObject(json, "tableName", tableName); + RAW_NULL_CHECK(tableName); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tableName", tableName)); cJSON* alterType = cJSON_CreateNumber(vAlterTbReq.action); - cJSON_AddItemToObject(json, "alterType", alterType); + RAW_NULL_CHECK(alterType); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "alterType", alterType)); switch (vAlterTbReq.action) { case TSDB_ALTER_TABLE_ADD_COLUMN: { cJSON* colName = cJSON_CreateString(vAlterTbReq.colName); - cJSON_AddItemToObject(json, "colName", colName); + RAW_NULL_CHECK(colName); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", colName)); cJSON* colType = cJSON_CreateNumber(vAlterTbReq.type); - cJSON_AddItemToObject(json, "colType", colType); + RAW_NULL_CHECK(colType); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colType", colType)); if (vAlterTbReq.type == TSDB_DATA_TYPE_BINARY || vAlterTbReq.type == TSDB_DATA_TYPE_VARBINARY || vAlterTbReq.type == TSDB_DATA_TYPE_GEOMETRY) { int32_t length = vAlterTbReq.bytes - VARSTR_HEADER_SIZE; cJSON* cbytes = cJSON_CreateNumber(length); - cJSON_AddItemToObject(json, "colLength", cbytes); + RAW_NULL_CHECK(cbytes); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colLength", cbytes)); } else if (vAlterTbReq.type == TSDB_DATA_TYPE_NCHAR) { int32_t length = (vAlterTbReq.bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE; cJSON* cbytes = cJSON_CreateNumber(length); - cJSON_AddItemToObject(json, "colLength", cbytes); + RAW_NULL_CHECK(cbytes); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colLength", cbytes)); } break; } case TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION: { cJSON* colName = cJSON_CreateString(vAlterTbReq.colName); - cJSON_AddItemToObject(json, "colName", colName); + RAW_NULL_CHECK(colName); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", colName)); cJSON* colType = cJSON_CreateNumber(vAlterTbReq.type); - cJSON_AddItemToObject(json, "colType", colType); + RAW_NULL_CHECK(colType); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colType", colType)); if (vAlterTbReq.type == TSDB_DATA_TYPE_BINARY || vAlterTbReq.type == TSDB_DATA_TYPE_VARBINARY || vAlterTbReq.type == TSDB_DATA_TYPE_GEOMETRY) { int32_t length = vAlterTbReq.bytes - VARSTR_HEADER_SIZE; cJSON* cbytes = cJSON_CreateNumber(length); - cJSON_AddItemToObject(json, "colLength", cbytes); + RAW_NULL_CHECK(cbytes); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colLength", cbytes)); } else if (vAlterTbReq.type == TSDB_DATA_TYPE_NCHAR) { int32_t length = (vAlterTbReq.bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE; cJSON* cbytes = cJSON_CreateNumber(length); - cJSON_AddItemToObject(json, "colLength", cbytes); + RAW_NULL_CHECK(cbytes); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colLength", cbytes)); } - setCompressOption(json, vAlterTbReq.compress); + RAW_RETURN_CHECK(setCompressOption(json, vAlterTbReq.compress)); break; } case TSDB_ALTER_TABLE_DROP_COLUMN: { cJSON* colName = cJSON_CreateString(vAlterTbReq.colName); - cJSON_AddItemToObject(json, "colName", colName); + RAW_NULL_CHECK(colName); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", colName)); break; } case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: { cJSON* colName = cJSON_CreateString(vAlterTbReq.colName); - cJSON_AddItemToObject(json, "colName", colName); + RAW_NULL_CHECK(colName); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", colName)); cJSON* colType = cJSON_CreateNumber(vAlterTbReq.colModType); - cJSON_AddItemToObject(json, "colType", colType); + RAW_NULL_CHECK(colType); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colType", colType)); if (vAlterTbReq.colModType == TSDB_DATA_TYPE_BINARY || vAlterTbReq.colModType == TSDB_DATA_TYPE_VARBINARY || vAlterTbReq.colModType == TSDB_DATA_TYPE_GEOMETRY) { int32_t length = vAlterTbReq.colModBytes - VARSTR_HEADER_SIZE; cJSON* cbytes = cJSON_CreateNumber(length); - cJSON_AddItemToObject(json, "colLength", cbytes); + RAW_NULL_CHECK(cbytes); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colLength", cbytes)); } else if (vAlterTbReq.colModType == TSDB_DATA_TYPE_NCHAR) { int32_t length = (vAlterTbReq.colModBytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE; cJSON* cbytes = cJSON_CreateNumber(length); - cJSON_AddItemToObject(json, "colLength", cbytes); + RAW_NULL_CHECK(cbytes); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colLength", cbytes)); } break; } case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: { cJSON* colName = cJSON_CreateString(vAlterTbReq.colName); - cJSON_AddItemToObject(json, "colName", colName); + RAW_NULL_CHECK(colName); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", colName)); cJSON* colNewName = cJSON_CreateString(vAlterTbReq.colNewName); - cJSON_AddItemToObject(json, "colNewName", colNewName); + RAW_NULL_CHECK(colNewName); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colNewName", colNewName)); break; } case TSDB_ALTER_TABLE_UPDATE_TAG_VAL: { cJSON* tagName = cJSON_CreateString(vAlterTbReq.tagName); - cJSON_AddItemToObject(json, "colName", tagName); + RAW_NULL_CHECK(tagName); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", tagName)); bool isNull = vAlterTbReq.isNull; if (vAlterTbReq.tagType == TSDB_DATA_TYPE_JSON) { @@ -622,47 +719,53 @@ static cJSON* processAlterTable(SMqMetaRsp* metaRsp) { if (vAlterTbReq.tagType == TSDB_DATA_TYPE_JSON) { if (!tTagIsJson(vAlterTbReq.pTagVal)) { uError("processAlterTable isJson false"); - goto _exit; + goto end; } - buf = parseTagDatatoJson(vAlterTbReq.pTagVal); + parseTagDatatoJson(vAlterTbReq.pTagVal, &buf); } else { if (vAlterTbReq.tagType == TSDB_DATA_TYPE_VARBINARY) { buf = taosMemoryCalloc(vAlterTbReq.nTagVal * 2 + 2 + 3, 1); } else { buf = taosMemoryCalloc(vAlterTbReq.nTagVal + 3, 1); } - dataConverToStr(buf, vAlterTbReq.tagType, vAlterTbReq.pTagVal, vAlterTbReq.nTagVal, NULL); + RAW_NULL_CHECK(buf); + RAW_RETURN_CHECK(dataConverToStr(buf, vAlterTbReq.tagType, vAlterTbReq.pTagVal, vAlterTbReq.nTagVal, NULL)); } cJSON* colValue = cJSON_CreateString(buf); - cJSON_AddItemToObject(json, "colValue", colValue); + RAW_NULL_CHECK(colValue); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colValue", colValue)); taosMemoryFree(buf); } cJSON* isNullCJson = cJSON_CreateBool(isNull); - cJSON_AddItemToObject(json, "colValueNull", isNullCJson); + RAW_NULL_CHECK(isNullCJson); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colValueNull", isNullCJson)); break; } case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS: { cJSON* colName = cJSON_CreateString(vAlterTbReq.colName); - cJSON_AddItemToObject(json, "colName", colName); - setCompressOption(json, vAlterTbReq.compress); + RAW_NULL_CHECK(colName); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", colName)); + RAW_RETURN_CHECK(setCompressOption(json, vAlterTbReq.compress)); break; } default: break; } -_exit: +end: uDebug("alter table return, sql json:%s", cJSON_PrintUnformatted(json)); tDecoderClear(&decoder); - return json; + *pJson = json; } -static cJSON* processDropSTable(SMqMetaRsp* metaRsp) { +static void processDropSTable(SMqMetaRsp* metaRsp, cJSON** pJson) { SDecoder decoder = {0}; SVDropStbReq req = {0}; cJSON* json = NULL; + int32_t code = 0; + uDebug("processDropSTable data:%p", metaRsp); // decode @@ -671,30 +774,31 @@ static cJSON* processDropSTable(SMqMetaRsp* metaRsp) { tDecoderInit(&decoder, data, len); if (tDecodeSVDropStbReq(&decoder, &req) < 0) { uError("tDecodeSVDropStbReq failed"); - goto _exit; + goto end; } json = cJSON_CreateObject(); - if (json == NULL) { - uError("create json object failed"); - goto _exit; - } + RAW_NULL_CHECK(json); cJSON* type = cJSON_CreateString("drop"); - cJSON_AddItemToObject(json, "type", type); + RAW_NULL_CHECK(type); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "type", type)); cJSON* tableType = cJSON_CreateString("super"); - cJSON_AddItemToObject(json, "tableType", tableType); + RAW_NULL_CHECK(tableType); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tableType", tableType)); cJSON* tableName = cJSON_CreateString(req.name); - cJSON_AddItemToObject(json, "tableName", tableName); + RAW_NULL_CHECK(tableName); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tableName", tableName)); -_exit: +end: uDebug("processDropSTable return, sql json:%s", cJSON_PrintUnformatted(json)); tDecoderClear(&decoder); - return json; + *pJson = json; } -static cJSON* processDeleteTable(SMqMetaRsp* metaRsp) { +static void processDeleteTable(SMqMetaRsp* metaRsp, cJSON** pJson) { SDeleteRes req = {0}; SDecoder coder = {0}; cJSON* json = NULL; + int32_t code = 0; uDebug("processDeleteTable data:%p", metaRsp); // decode and process req @@ -704,34 +808,34 @@ static cJSON* processDeleteTable(SMqMetaRsp* metaRsp) { tDecoderInit(&coder, data, len); if (tDecodeDeleteRes(&coder, &req) < 0) { uError("tDecodeDeleteRes failed"); - goto _exit; + goto end; } // getTbName(req.tableFName); char sql[256] = {0}; - snprintf(sql, sizeof(sql), "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64, req.tableFName, + (void)snprintf(sql, sizeof(sql), "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64, req.tableFName, req.tsColName, req.skey, req.tsColName, req.ekey); json = cJSON_CreateObject(); - if (json == NULL) { - uError("creaet json object failed"); - goto _exit; - } + RAW_NULL_CHECK(json); cJSON* type = cJSON_CreateString("delete"); - cJSON_AddItemToObject(json, "type", type); + RAW_NULL_CHECK(type); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "type", type)); cJSON* sqlJson = cJSON_CreateString(sql); - cJSON_AddItemToObject(json, "sql", sqlJson); + RAW_NULL_CHECK(sqlJson); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "sql", sqlJson)); -_exit: +end: uDebug("processDeleteTable return, sql json:%s", cJSON_PrintUnformatted(json)); tDecoderClear(&coder); - return json; + *pJson = json; } -static cJSON* processDropTable(SMqMetaRsp* metaRsp) { +static void processDropTable(SMqMetaRsp* metaRsp, cJSON** pJson) { SDecoder decoder = {0}; SVDropTbBatchReq req = {0}; cJSON* json = NULL; + int32_t code = 0; uDebug("processDropTable data:%p", metaRsp); // decode @@ -740,40 +844,33 @@ static cJSON* processDropTable(SMqMetaRsp* metaRsp) { tDecoderInit(&decoder, data, len); if (tDecodeSVDropTbBatchReq(&decoder, &req) < 0) { uError("tDecodeSVDropTbBatchReq failed"); - goto _exit; + goto end; } json = cJSON_CreateObject(); - if (json == NULL) { - uError("create json object failed"); - goto _exit; - } + RAW_NULL_CHECK(json); cJSON* type = cJSON_CreateString("drop"); - cJSON_AddItemToObject(json, "type", type); - // cJSON* uid = cJSON_CreateNumber(id); - // cJSON_AddItemToObject(json, "uid", uid); - // cJSON* tableType = cJSON_CreateString("normal"); - // cJSON_AddItemToObject(json, "tableType", tableType); - + RAW_NULL_CHECK(type); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "type", type)); cJSON* tableNameList = cJSON_CreateArray(); + RAW_NULL_CHECK(tableNameList); for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { SVDropTbReq* pDropTbReq = req.pReqs + iReq; - cJSON* tableName = cJSON_CreateString(pDropTbReq->name); - cJSON_AddItemToArray(tableNameList, tableName); + RAW_NULL_CHECK(tableName); + RAW_FALSE_CHECK(cJSON_AddItemToArray(tableNameList, tableName)); } - cJSON_AddItemToObject(json, "tableNameList", tableNameList); + RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tableNameList", tableNameList)); -_exit: +end: uDebug("processDropTable return, json sql:%s", cJSON_PrintUnformatted(json)); tDecoderClear(&decoder); - return json; + *pJson = json; } static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) { if (taos == NULL || meta == NULL) { - terrno = TSDB_CODE_INVALID_PARA; - return terrno; + return TSDB_CODE_INVALID_PARA; } SVCreateStbReq req = {0}; SDecoder coder; @@ -781,11 +878,7 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) { int32_t code = TSDB_CODE_SUCCESS; SRequestObj* pRequest = NULL; - code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0); - if (code != TSDB_CODE_SUCCESS) { - terrno = code; - return code; - } + RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0)); uDebug(LOG_ID_TAG " create stable, meta:%p, metaLen:%d", LOG_ID_VALUE, meta, metaLen); pRequest->syncQuery = true; if (!pRequest->pDb) { @@ -808,25 +901,27 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) { } // build create stable pReq.pColumns = taosArrayInit(req.schemaRow.nCols, sizeof(SFieldWithOptions)); + RAW_NULL_CHECK (pReq.pColumns); for (int32_t i = 0; i < req.schemaRow.nCols; i++) { SSchema* pSchema = req.schemaRow.pSchema + i; SFieldWithOptions field = {.type = pSchema->type, .flags = pSchema->flags, .bytes = pSchema->bytes}; - strcpy(field.name, pSchema->name); + (void)strcpy(field.name, pSchema->name); if (createDefaultCompress) { field.compress = createDefaultColCmprByType(pSchema->type); } else { - SColCmpr* p = &req.colCmpr.pColCmpr[i]; - field.compress = p->alg; + SColCmpr* pCmp = &req.colCmpr.pColCmpr[i]; + field.compress = pCmp->alg; } - taosArrayPush(pReq.pColumns, &field); + RAW_NULL_CHECK(taosArrayPush(pReq.pColumns, &field)); } pReq.pTags = taosArrayInit(req.schemaTag.nCols, sizeof(SField)); + RAW_NULL_CHECK(pReq.pTags); for (int32_t i = 0; i < req.schemaTag.nCols; i++) { SSchema* pSchema = req.schemaTag.pSchema + i; SField field = {.type = pSchema->type, .flags = pSchema->flags, .bytes = pSchema->bytes}; - strcpy(field.name, pSchema->name); - taosArrayPush(pReq.pTags, &field); + (void)strcpy(field.name, pSchema->name); + RAW_NULL_CHECK(taosArrayPush(pReq.pTags, &field)); } pReq.colVer = req.schemaRow.version; @@ -841,19 +936,22 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) { uDebug(LOG_ID_TAG " create stable name:%s suid:%" PRId64 " processSuid:%" PRId64, LOG_ID_VALUE, req.name, req.suid, pReq.suid); STscObj* pTscObj = pRequest->pTscObj; - SName tableName; - tNameExtractFullName(toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName), pReq.name); - + SName tableName = {0}; + RAW_RETURN_CHECK(tNameExtractFullName(toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName), pReq.name)); SCmdMsgInfo pCmdMsg = {0}; pCmdMsg.epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); pCmdMsg.msgType = TDMT_MND_CREATE_STB; pCmdMsg.msgLen = tSerializeSMCreateStbReq(NULL, 0, &pReq); - pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen); - if (NULL == pCmdMsg.pMsg) { - code = TSDB_CODE_OUT_OF_MEMORY; + if (pCmdMsg.msgLen <= 0) { + code = TSDB_CODE_INVALID_PARA; + goto end; + } + pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen); + RAW_NULL_CHECK(pCmdMsg.pMsg); + if (tSerializeSMCreateStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq) <= 0){ + code = TSDB_CODE_INVALID_PARA; goto end; } - tSerializeSMCreateStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq); SQuery pQuery = {0}; pQuery.execMode = QUERY_EXEC_MODE_RPC; @@ -861,12 +959,13 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) { pQuery.msgType = pQuery.pCmdMsg->msgType; pQuery.stableQuery = true; - launchQueryImpl(pRequest, &pQuery, true, NULL); + (void)launchQueryImpl(pRequest, &pQuery, true, NULL); //ignore, because return value is pRequest if (pRequest->code == TSDB_CODE_SUCCESS) { SCatalog* pCatalog = NULL; - catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog); - catalogRemoveTableMeta(pCatalog, &tableName); + // ignore the return value + (void)catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog); + (void)catalogRemoveTableMeta(pCatalog, &tableName); } code = pRequest->code; @@ -877,14 +976,12 @@ end: destroyRequest(pRequest); tFreeSMCreateStbReq(&pReq); tDecoderClear(&coder); - terrno = code; return code; } static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) { if (taos == NULL || meta == NULL) { - terrno = TSDB_CODE_INVALID_PARA; - return terrno; + return TSDB_CODE_INVALID_PARA; } SVDropStbReq req = {0}; SDecoder coder = {0}; @@ -892,12 +989,7 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) { int32_t code = TSDB_CODE_SUCCESS; SRequestObj* pRequest = NULL; - code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0); - if (code != TSDB_CODE_SUCCESS) { - terrno = code; - return code; - } - + RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0)); uDebug(LOG_ID_TAG " drop stable, meta:%p, metaLen:%d", LOG_ID_VALUE, meta, metaLen); pRequest->syncQuery = true; if (!pRequest->pDb) { @@ -914,16 +1006,13 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) { } SCatalog* pCatalog = NULL; - code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); - if (code != TSDB_CODE_SUCCESS) { - goto end; - } + RAW_RETURN_CHECK(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog)); SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter, .requestId = pRequest->requestId, .requestObjRefId = pRequest->self, .mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp)}; SName pName = {0}; - toName(pRequest->pTscObj->acctId, pRequest->pDb, req.name, &pName); + (void)toName(pRequest->pTscObj->acctId, pRequest->pDb, req.name, &pName); // ignore the return value, always return pName STableMeta* pTableMeta = NULL; code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta); if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) { @@ -946,18 +1035,26 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) { pReq.suid); STscObj* pTscObj = pRequest->pTscObj; SName tableName = {0}; - tNameExtractFullName(toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName), pReq.name); + if (tNameExtractFullName(toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName), pReq.name) != 0) { + code = TSDB_CODE_INVALID_PARA; + goto end; + } SCmdMsgInfo pCmdMsg = {0}; pCmdMsg.epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); pCmdMsg.msgType = TDMT_MND_DROP_STB; pCmdMsg.msgLen = tSerializeSMDropStbReq(NULL, 0, &pReq); - pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen); - if (NULL == pCmdMsg.pMsg) { - code = TSDB_CODE_OUT_OF_MEMORY; + if (pCmdMsg.msgLen <= 0) { + code = TSDB_CODE_INVALID_PARA; goto end; } - tSerializeSMDropStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq); + pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen); + RAW_NULL_CHECK(pCmdMsg.pMsg); + if (tSerializeSMDropStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq) <= 0){ + code = TSDB_CODE_INVALID_PARA; + goto end; + } + SQuery pQuery = {0}; pQuery.execMode = QUERY_EXEC_MODE_RPC; @@ -965,12 +1062,12 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) { pQuery.msgType = pQuery.pCmdMsg->msgType; pQuery.stableQuery = true; - launchQueryImpl(pRequest, &pQuery, true, NULL); + (void)launchQueryImpl(pRequest, &pQuery, true, NULL); //ignore, because return value is pRequest if (pRequest->code == TSDB_CODE_SUCCESS) { - // SCatalog* pCatalog = NULL; - catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog); - catalogRemoveTableMeta(pCatalog, &tableName); + // ignore the error code + (void)catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog); + (void)catalogRemoveTableMeta(pCatalog, &tableName); } code = pRequest->code; @@ -980,7 +1077,6 @@ end: uDebug(LOG_ID_TAG " drop stable return, msg:%s", LOG_ID_VALUE, tstrerror(code)); destroyRequest(pRequest); tDecoderClear(&coder); - terrno = code; return code; } @@ -992,13 +1088,12 @@ typedef struct SVgroupCreateTableBatch { static void destroyCreateTbReqBatch(void* data) { SVgroupCreateTableBatch* pTbBatch = (SVgroupCreateTableBatch*)data; - taosArrayDestroy(pTbBatch->req.pArray); + (void)taosArrayDestroy(pTbBatch->req.pArray); } static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) { if (taos == NULL || meta == NULL) { - terrno = TSDB_CODE_INVALID_PARA; - return terrno; + return TSDB_CODE_INVALID_PARA; } SVCreateTbBatchReq req = {0}; SDecoder coder = {0}; @@ -1007,12 +1102,7 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) { SQuery* pQuery = NULL; SHashObj* pVgroupHashmap = NULL; - code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0); - if (code != TSDB_CODE_SUCCESS) { - terrno = code; - return code; - } - + RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0)); uDebug(LOG_ID_TAG " create table, meta:%p, metaLen:%d", LOG_ID_VALUE, meta, metaLen); pRequest->syncQuery = true; @@ -1033,16 +1123,9 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) { SVCreateTbReq* pCreateReq = NULL; SCatalog* pCatalog = NULL; - code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog); - if (code != TSDB_CODE_SUCCESS) { - goto end; - } - + RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog)); pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); - if (NULL == pVgroupHashmap) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto end; - } + RAW_NULL_CHECK(pVgroupHashmap); taosHashSetFreeFp(pVgroupHashmap, destroyCreateTbReqBatch); SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter, @@ -1051,13 +1134,14 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) { .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName)); + RAW_NULL_CHECK(pRequest->tableList); // loop to create table for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { pCreateReq = req.pReqs + iReq; SVgroupInfo pInfo = {0}; SName pName = {0}; - toName(pTscObj->acctId, pRequest->pDb, pCreateReq->name, &pName); + (void)toName(pTscObj->acctId, pRequest->pDb, pCreateReq->name, &pName); code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo); if (code != TSDB_CODE_SUCCESS) { goto end; @@ -1070,7 +1154,7 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) { SName sName = {0}; tb_uid_t oldSuid = pCreateReq->ctb.suid; // pCreateReq->ctb.suid = processSuid(pCreateReq->ctb.suid, pRequest->pDb); - toName(pTscObj->acctId, pRequest->pDb, pCreateReq->ctb.stbName, &sName); + (void)toName(pTscObj->acctId, pRequest->pDb, pCreateReq->ctb.stbName, &sName); code = catalogGetTableMeta(pCatalog, &conn, &sName, &pTableMeta); if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) { code = TSDB_CODE_SUCCESS; @@ -1085,6 +1169,9 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) { for (int32_t i = 0; i < taosArrayGetSize(pCreateReq->ctb.tagName); i++) { char* tName = taosArrayGet(pCreateReq->ctb.tagName, i); + if (tName == NULL) { + continue; + } for (int32_t j = pTableMeta->tableInfo.numOfColumns; j < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; j++) { SSchema* tag = &pTableMeta->schema[j]; @@ -1095,21 +1182,21 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) { } taosMemoryFreeClear(pTableMeta); } - taosArrayPush(pRequest->tableList, &pName); + RAW_NULL_CHECK(taosArrayPush(pRequest->tableList, &pName)); SVgroupCreateTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId)); if (pTableBatch == NULL) { SVgroupCreateTableBatch tBatch = {0}; tBatch.info = pInfo; - strcpy(tBatch.dbName, pRequest->pDb); + (void)strcpy(tBatch.dbName, pRequest->pDb); tBatch.req.pArray = taosArrayInit(4, sizeof(struct SVCreateTbReq)); - taosArrayPush(tBatch.req.pArray, pCreateReq); + RAW_NULL_CHECK(tBatch.req.pArray); + RAW_NULL_CHECK(taosArrayPush(tBatch.req.pArray, pCreateReq)); tBatch.req.source = TD_REQ_FROM_TAOX; - - taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch)); + RAW_RETURN_CHECK(taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch))); } else { // add to the correct vgroup - taosArrayPush(pTableBatch->req.pArray, pCreateReq); + RAW_NULL_CHECK(taosArrayPush(pTableBatch->req.pArray, pCreateReq)); } } @@ -1117,25 +1204,20 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) { goto end; } SArray* pBufArray = NULL; - code = serializeVgroupsCreateTableBatch(pVgroupHashmap, &pBufArray); - if (TSDB_CODE_SUCCESS != code) { - goto end; - } - + RAW_RETURN_CHECK(serializeVgroupsCreateTableBatch(pVgroupHashmap, &pBufArray)); pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY); + RAW_NULL_CHECK(pQuery); pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE; pQuery->msgType = TDMT_VND_CREATE_TABLE; pQuery->stableQuery = false; pQuery->pRoot = nodesMakeNode(QUERY_NODE_CREATE_TABLE_STMT); + RAW_NULL_CHECK(pQuery->pRoot); - code = rewriteToVnodeModifyOpStmt(pQuery, pBufArray); - if (code != TSDB_CODE_SUCCESS) { - goto end; - } + RAW_RETURN_CHECK(rewriteToVnodeModifyOpStmt(pQuery, pBufArray)); - launchQueryImpl(pRequest, pQuery, true, NULL); + (void)launchQueryImpl(pRequest, pQuery, true, NULL); if (pRequest->code == TSDB_CODE_SUCCESS) { - removeMeta(pTscObj, pRequest->tableList, false); + (void)removeMeta(pTscObj, pRequest->tableList, false); } code = pRequest->code; @@ -1148,7 +1230,6 @@ end: destroyRequest(pRequest); tDecoderClear(&coder); qDestroyQuery(pQuery); - terrno = code; return code; } @@ -1160,13 +1241,12 @@ typedef struct SVgroupDropTableBatch { static void destroyDropTbReqBatch(void* data) { SVgroupDropTableBatch* pTbBatch = (SVgroupDropTableBatch*)data; - taosArrayDestroy(pTbBatch->req.pArray); + (void)taosArrayDestroy(pTbBatch->req.pArray); } static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) { if (taos == NULL || meta == NULL) { - terrno = TSDB_CODE_INVALID_PARA; - return terrno; + return TSDB_CODE_INVALID_PARA; } SVDropTbBatchReq req = {0}; SDecoder coder = {0}; @@ -1175,11 +1255,7 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) { SQuery* pQuery = NULL; SHashObj* pVgroupHashmap = NULL; - code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0); - if (code != TSDB_CODE_SUCCESS) { - terrno = code; - return code; - } + RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0)); uDebug(LOG_ID_TAG " drop table, meta:%p, len:%d", LOG_ID_VALUE, meta, metaLen); pRequest->syncQuery = true; @@ -1200,16 +1276,10 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) { SVDropTbReq* pDropReq = NULL; SCatalog* pCatalog = NULL; - code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog); - if (code != TSDB_CODE_SUCCESS) { - goto end; - } + RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog)); pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); - if (NULL == pVgroupHashmap) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto end; - } + RAW_NULL_CHECK(pVgroupHashmap); taosHashSetFreeFp(pVgroupHashmap, destroyDropTbReqBatch); SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter, @@ -1217,6 +1287,7 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) { .requestObjRefId = pRequest->self, .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName)); + RAW_NULL_CHECK(pRequest->tableList); // loop to create table for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { pDropReq = req.pReqs + iReq; @@ -1225,11 +1296,8 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) { SVgroupInfo pInfo = {0}; SName pName = {0}; - toName(pTscObj->acctId, pRequest->pDb, pDropReq->name, &pName); - code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo); - if (code != TSDB_CODE_SUCCESS) { - goto end; - } + (void)toName(pTscObj->acctId, pRequest->pDb, pDropReq->name, &pName); + RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo)); STableMeta* pTableMeta = NULL; code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta); @@ -1247,17 +1315,17 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) { uDebug(LOG_ID_TAG " drop table name:%s suid:%" PRId64 " new suid:%" PRId64, LOG_ID_VALUE, pDropReq->name, oldSuid, pDropReq->suid); - taosArrayPush(pRequest->tableList, &pName); + RAW_NULL_CHECK(taosArrayPush(pRequest->tableList, &pName)); SVgroupDropTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId)); if (pTableBatch == NULL) { SVgroupDropTableBatch tBatch = {0}; tBatch.info = pInfo; tBatch.req.pArray = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SVDropTbReq)); - taosArrayPush(tBatch.req.pArray, pDropReq); - - taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch)); + RAW_NULL_CHECK(tBatch.req.pArray); + RAW_NULL_CHECK(taosArrayPush(tBatch.req.pArray, pDropReq)); + RAW_RETURN_CHECK(taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch))); } else { // add to the correct vgroup - taosArrayPush(pTableBatch->req.pArray, pDropReq); + RAW_NULL_CHECK(taosArrayPush(pTableBatch->req.pArray, pDropReq)); } } @@ -1265,25 +1333,19 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) { goto end; } SArray* pBufArray = NULL; - code = serializeVgroupsDropTableBatch(pVgroupHashmap, &pBufArray); - if (TSDB_CODE_SUCCESS != code) { - goto end; - } - + RAW_RETURN_CHECK(serializeVgroupsDropTableBatch(pVgroupHashmap, &pBufArray)); pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY); + RAW_NULL_CHECK(pQuery); pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE; pQuery->msgType = TDMT_VND_DROP_TABLE; pQuery->stableQuery = false; pQuery->pRoot = nodesMakeNode(QUERY_NODE_DROP_TABLE_STMT); + RAW_NULL_CHECK(pQuery->pRoot); + RAW_RETURN_CHECK(rewriteToVnodeModifyOpStmt(pQuery, pBufArray)); - code = rewriteToVnodeModifyOpStmt(pQuery, pBufArray); - if (code != TSDB_CODE_SUCCESS) { - goto end; - } - - launchQueryImpl(pRequest, pQuery, true, NULL); + (void)launchQueryImpl(pRequest, pQuery, true, NULL); if (pRequest->code == TSDB_CODE_SUCCESS) { - removeMeta(pTscObj, pRequest->tableList, false); + (void)removeMeta(pTscObj, pRequest->tableList, false); } code = pRequest->code; @@ -1293,46 +1355,12 @@ end: destroyRequest(pRequest); tDecoderClear(&coder); qDestroyQuery(pQuery); - terrno = code; return code; } -// delete from db.tabl where .. -> delete from tabl where .. -// delete from db .tabl where .. -> delete from tabl where .. -// static void getTbName(char *sql){ -// char *ch = sql; -// -// bool inBackQuote = false; -// int8_t dotIndex = 0; -// while(*ch != '\0'){ -// if(!inBackQuote && *ch == '`'){ -// inBackQuote = true; -// ch++; -// continue; -// } -// -// if(inBackQuote && *ch == '`'){ -// inBackQuote = false; -// ch++; -// -// continue; -// } -// -// if(!inBackQuote && *ch == '.'){ -// dotIndex ++; -// if(dotIndex == 2){ -// memmove(sql, ch + 1, strlen(ch + 1) + 1); -// break; -// } -// } -// ch++; -// } -//} - static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) { if (taos == NULL || meta == NULL) { - terrno = TSDB_CODE_INVALID_PARA; - return terrno; + return TSDB_CODE_INVALID_PARA; } SDeleteRes req = {0}; SDecoder coder = {0}; @@ -1350,11 +1378,11 @@ static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) { goto end; } - // getTbName(req.tableFName); - snprintf(sql, sizeof(sql), "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64, req.tableFName, + (void)snprintf(sql, sizeof(sql), "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64, req.tableFName, req.tsColName, req.skey, req.tsColName, req.ekey); TAOS_RES* res = taosQueryImpl(taos, sql, false, TD_REQ_FROM_TAOX); + RAW_NULL_CHECK(res); SRequestObj* pRequest = (SRequestObj*)res; code = pRequest->code; if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_PAR_GET_META_ERROR) { @@ -1365,14 +1393,12 @@ static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) { end: uDebug("connId:0x%" PRIx64 " delete data sql:%s, code:%s", *(int64_t*)taos, sql, tstrerror(code)); tDecoderClear(&coder); - terrno = code; return code; } static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) { if (taos == NULL || meta == NULL) { - terrno = TSDB_CODE_INVALID_PARA; - return terrno; + return TSDB_CODE_INVALID_PARA; } SVAlterTbReq req = {0}; SDecoder dcoder = {0}; @@ -1382,11 +1408,7 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) { SArray* pArray = NULL; SVgDataBlocks* pVgData = NULL; - code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0); - if (code != TSDB_CODE_SUCCESS) { - terrno = code; - return code; - } + RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0)); uDebug(LOG_ID_TAG " alter table, meta:%p, len:%d", LOG_ID_VALUE, meta, metaLen); pRequest->syncQuery = true; if (!pRequest->pDb) { @@ -1409,11 +1431,7 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) { STscObj* pTscObj = pRequest->pTscObj; SCatalog* pCatalog = NULL; - code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog); - if (code != TSDB_CODE_SUCCESS) { - goto end; - } - + RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog)); SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter, .requestId = pRequest->requestId, .requestObjRefId = pRequest->self, @@ -1421,23 +1439,13 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) { SVgroupInfo pInfo = {0}; SName pName = {0}; - toName(pTscObj->acctId, pRequest->pDb, req.tbName, &pName); - code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo); - if (code != TSDB_CODE_SUCCESS) { - goto end; - } - + (void)toName(pTscObj->acctId, pRequest->pDb, req.tbName, &pName); + RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo)); pArray = taosArrayInit(1, sizeof(void*)); - if (NULL == pArray) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto end; - } + RAW_NULL_CHECK(pArray); pVgData = taosMemoryCalloc(1, sizeof(SVgDataBlocks)); - if (NULL == pVgData) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto end; - } + RAW_NULL_CHECK(pVgData); pVgData->vg = pInfo; int tlen = 0; @@ -1449,10 +1457,7 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) { } tlen += sizeof(SMsgHead); void* pMsg = taosMemoryMalloc(tlen); - if (NULL == pMsg) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto end; - } + RAW_NULL_CHECK(pMsg); ((SMsgHead*)pMsg)->vgId = htonl(pInfo.vgId); ((SMsgHead*)pMsg)->contLen = htonl(tlen); void* pBuf = POINTER_SHIFT(pMsg, sizeof(SMsgHead)); @@ -1470,24 +1475,19 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) { pVgData->size = tlen; pVgData->numOfTables = 1; - taosArrayPush(pArray, &pVgData); + RAW_NULL_CHECK(taosArrayPush(pArray, &pVgData)); pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY); - if (NULL == pQuery) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto end; - } + RAW_NULL_CHECK(pQuery); pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE; pQuery->msgType = TDMT_VND_ALTER_TABLE; pQuery->stableQuery = false; pQuery->pRoot = nodesMakeNode(QUERY_NODE_ALTER_TABLE_STMT); + RAW_NULL_CHECK(pQuery->pRoot); + RAW_RETURN_CHECK(rewriteToVnodeModifyOpStmt(pQuery, pArray)); - code = rewriteToVnodeModifyOpStmt(pQuery, pArray); - if (code != TSDB_CODE_SUCCESS) { - goto end; - } - launchQueryImpl(pRequest, pQuery, true, NULL); + (void)launchQueryImpl(pRequest, pQuery, true, NULL); pVgData = NULL; pArray = NULL; @@ -1504,13 +1504,12 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) { } end: uDebug(LOG_ID_TAG " alter table return, meta:%p, len:%d, msg:%s", LOG_ID_VALUE, meta, metaLen, tstrerror(code)); - taosArrayDestroy(pArray); + (void)taosArrayDestroy(pArray); if (pVgData) taosMemoryFreeClear(pVgData->pData); taosMemoryFreeClear(pVgData); destroyRequest(pRequest); tDecoderClear(&dcoder); qDestroyQuery(pQuery); - terrno = code; return code; } @@ -1522,8 +1521,7 @@ int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const ch int taos_write_raw_block_with_fields_with_reqid(TAOS* taos, int rows, char* pData, const char* tbname, TAOS_FIELD* fields, int numFields, int64_t reqid) { if (!taos || !pData || !tbname) { - terrno = TSDB_CODE_INVALID_PARA; - return terrno; + return TSDB_CODE_INVALID_PARA; } int32_t code = TSDB_CODE_SUCCESS; STableMeta* pTableMeta = NULL; @@ -1531,9 +1529,7 @@ int taos_write_raw_block_with_fields_with_reqid(TAOS* taos, int rows, char* pDat SHashObj* pVgHash = NULL; SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, reqid); - if (!pRequest) { - return terrno; - } + RAW_NULL_CHECK(pRequest); uDebug(LOG_ID_TAG " write raw block with field, rows:%d, pData:%p, tbname:%s, fields:%p, numFields:%d", LOG_ID_VALUE, rows, pData, tbname, fields, numFields); @@ -1549,10 +1545,7 @@ int taos_write_raw_block_with_fields_with_reqid(TAOS* taos, int rows, char* pDat tstrncpy(pName.tname, tbname, sizeof(pName.tname)); struct SCatalog* pCatalog = NULL; - code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); - if (code != TSDB_CODE_SUCCESS) { - goto end; - } + RAW_RETURN_CHECK(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog)); SRequestConnInfo conn = {0}; conn.pTrans = pRequest->pTscObj->pAppInfo->pTransporter; @@ -1561,35 +1554,16 @@ int taos_write_raw_block_with_fields_with_reqid(TAOS* taos, int rows, char* pDat conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp); SVgroupInfo vgData = {0}; - code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vgData); - if (code != TSDB_CODE_SUCCESS) { - goto end; - } - - code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta); - if (code != TSDB_CODE_SUCCESS) { - goto end; - } - // uError("td23101 0vgId:%d, vgId:%d, name:%s, uid:%"PRIu64, vgData.vgId, pTableMeta->vgId, tbname, pTableMeta->uid); - - code = smlInitHandle(&pQuery); - if (code != TSDB_CODE_SUCCESS) { - goto end; - } + RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vgData)); + RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta)); + RAW_RETURN_CHECK(smlInitHandle(&pQuery)); pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); - taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData)); + RAW_NULL_CHECK(pVgHash); + RAW_RETURN_CHECK(taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData))); + RAW_RETURN_CHECK(rawBlockBindData(pQuery, pTableMeta, pData, NULL, fields, numFields, false, NULL, 0)); + RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash)); - code = rawBlockBindData(pQuery, pTableMeta, pData, NULL, fields, numFields, false, NULL, 0); - if (code != TSDB_CODE_SUCCESS) { - goto end; - } - - code = smlBuildOutput(pQuery, pVgHash); - if (code != TSDB_CODE_SUCCESS) { - goto end; - } - - launchQueryImpl(pRequest, pQuery, true, NULL); + (void)launchQueryImpl(pRequest, pQuery, true, NULL); code = pRequest->code; end: @@ -1598,7 +1572,6 @@ end: qDestroyQuery(pQuery); destroyRequest(pRequest); taosHashCleanup(pVgHash); - terrno = code; return code; } @@ -1608,8 +1581,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) int taos_write_raw_block_with_reqid(TAOS* taos, int rows, char* pData, const char* tbname, int64_t reqid) { if (!taos || !pData || !tbname) { - terrno = TSDB_CODE_INVALID_PARA; - return terrno; + return TSDB_CODE_INVALID_PARA; } int32_t code = TSDB_CODE_SUCCESS; STableMeta* pTableMeta = NULL; @@ -1617,9 +1589,7 @@ int taos_write_raw_block_with_reqid(TAOS* taos, int rows, char* pData, const cha SHashObj* pVgHash = NULL; SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, reqid); - if (!pRequest) { - return terrno; - } + RAW_NULL_CHECK(pRequest); uDebug(LOG_ID_TAG " write raw block, rows:%d, pData:%p, tbname:%s", LOG_ID_VALUE, rows, pData, tbname); @@ -1634,10 +1604,7 @@ int taos_write_raw_block_with_reqid(TAOS* taos, int rows, char* pData, const cha tstrncpy(pName.tname, tbname, sizeof(pName.tname)); struct SCatalog* pCatalog = NULL; - code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); - if (code != TSDB_CODE_SUCCESS) { - goto end; - } + RAW_RETURN_CHECK(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog)); SRequestConnInfo conn = {0}; conn.pTrans = pRequest->pTscObj->pAppInfo->pTransporter; @@ -1646,34 +1613,16 @@ int taos_write_raw_block_with_reqid(TAOS* taos, int rows, char* pData, const cha conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp); SVgroupInfo vgData = {0}; - code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vgData); - if (code != TSDB_CODE_SUCCESS) { - goto end; - } - - code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta); - if (code != TSDB_CODE_SUCCESS) { - goto end; - } - - code = smlInitHandle(&pQuery); - if (code != TSDB_CODE_SUCCESS) { - goto end; - } + RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vgData)); + RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta)); + RAW_RETURN_CHECK(smlInitHandle(&pQuery)); pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); - taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData)); + RAW_NULL_CHECK(pVgHash); + RAW_RETURN_CHECK(taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData))); + RAW_RETURN_CHECK(rawBlockBindData(pQuery, pTableMeta, pData, NULL, NULL, 0, false, NULL, 0)); + RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash)); - code = rawBlockBindData(pQuery, pTableMeta, pData, NULL, NULL, 0, false, NULL, 0); - if (code != TSDB_CODE_SUCCESS) { - goto end; - } - - code = smlBuildOutput(pQuery, pVgHash); - if (code != TSDB_CODE_SUCCESS) { - goto end; - } - - launchQueryImpl(pRequest, pQuery, true, NULL); + (void)launchQueryImpl(pRequest, pQuery, true, NULL); code = pRequest->code; end: @@ -1682,7 +1631,6 @@ end: qDestroyQuery(pQuery); destroyRequest(pRequest); taosHashCleanup(pVgHash); - terrno = code; return code; } @@ -1700,9 +1648,8 @@ static void* getRawDataFromRes(void* pRetrieve) { static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { if (taos == NULL || data == NULL) { - terrno = TSDB_CODE_INVALID_PARA; SET_ERROR_MSG("taos:%p or data:%p is NULL", taos, data); - return terrno; + return TSDB_CODE_INVALID_PARA; } int32_t code = TSDB_CODE_SUCCESS; SHashObj* pVgHash = NULL; @@ -1711,12 +1658,8 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { SDecoder decoder = {0}; STableMeta* pTableMeta = NULL; - terrno = TSDB_CODE_SUCCESS; SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0); - if (!pRequest) { - SET_ERROR_MSG("pRequest is NULL"); - return terrno; - } + RAW_NULL_CHECK(pRequest); uDebug(LOG_ID_TAG " write raw data, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen); pRequest->syncQuery = true; @@ -1742,11 +1685,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { } struct SCatalog* pCatalog = NULL; - code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); - if (code != TSDB_CODE_SUCCESS) { - SET_ERROR_MSG("cata log get handle failed"); - goto end; - } + RAW_RETURN_CHECK(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog)); SRequestConnInfo conn = {0}; conn.pTrans = pRequest->pTscObj->pAppInfo->pTransporter; @@ -1754,60 +1693,37 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { conn.requestObjRefId = pRequest->self; conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp); - code = smlInitHandle(&pQuery); - if (code != TSDB_CODE_SUCCESS) { - SET_ERROR_MSG("init sml handle failed"); - goto end; - } - + RAW_RETURN_CHECK(smlInitHandle(&pQuery)); pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); + RAW_NULL_CHECK(pVgHash); while (++rspObj.common.resIter < rspObj.rsp.common.blockNum) { void* pRetrieve = taosArrayGetP(rspObj.rsp.common.blockData, rspObj.common.resIter); + RAW_NULL_CHECK(pRetrieve); if (!rspObj.rsp.common.withSchema) { goto end; } const char* tbName = (const char*)taosArrayGetP(rspObj.rsp.common.blockTbName, rspObj.common.resIter); - if (!tbName) { - SET_ERROR_MSG("block tbname is null"); - code = TSDB_CODE_TMQ_INVALID_MSG; - goto end; - } + RAW_NULL_CHECK(tbName); SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}}; - strcpy(pName.dbname, pRequest->pDb); - strcpy(pName.tname, tbName); + (void)strcpy(pName.dbname, pRequest->pDb); + (void)strcpy(pName.tname, tbName); - code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta); - // if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) { - // uError("WriteRaw:catalogGetTableMeta table not exist. table name: %s", tbName); - // code = TSDB_CODE_SUCCESS; - // continue; - // } - if (code != TSDB_CODE_SUCCESS) { - SET_ERROR_MSG("cata log get table:%s meta failed", tbName); - goto end; - } + RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta)); - SVgroupInfo vg; - code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vg); - if (code != TSDB_CODE_SUCCESS) { - SET_ERROR_MSG("cata log get table:%s vgroup failed", tbName); - goto end; - } + SVgroupInfo vg = {0}; + RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vg)); void* hData = taosHashGet(pVgHash, &vg.vgId, sizeof(vg.vgId)); if (hData == NULL) { - taosHashPut(pVgHash, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)); + RAW_RETURN_CHECK(taosHashPut(pVgHash, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg))); } SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.rsp.common.blockSchema, rspObj.common.resIter); + RAW_NULL_CHECK(pSW); TAOS_FIELD* fields = taosMemoryCalloc(pSW->nCols, sizeof(TAOS_FIELD)); - if (fields == NULL) { - SET_ERROR_MSG("calloc fields failed"); - code = TSDB_CODE_OUT_OF_MEMORY; - goto end; - } + RAW_NULL_CHECK(fields); for (int i = 0; i < pSW->nCols; i++) { fields[i].type = pSW->pSchema[i].type; fields[i].bytes = pSW->pSchema[i].bytes; @@ -1824,13 +1740,9 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { } } - code = smlBuildOutput(pQuery, pVgHash); - if (code != TSDB_CODE_SUCCESS) { - SET_ERROR_MSG("sml build output failed"); - goto end; - } + RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash)); - launchQueryImpl(pRequest, pQuery, true, NULL); + (void)launchQueryImpl(pRequest, pQuery, true, NULL); code = pRequest->code; end: @@ -1841,15 +1753,13 @@ end: destroyRequest(pRequest); taosHashCleanup(pVgHash); taosMemoryFreeClear(pTableMeta); - terrno = code; return code; } static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) { if (taos == NULL || data == NULL) { - terrno = TSDB_CODE_INVALID_PARA; SET_ERROR_MSG("taos:%p or data:%p is NULL", taos, data); - return terrno; + return TSDB_CODE_INVALID_PARA; } int32_t code = TSDB_CODE_SUCCESS; SHashObj* pVgHash = NULL; @@ -1859,12 +1769,9 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) STableMeta* pTableMeta = NULL; SVCreateTbReq* pCreateReqDst = NULL; - terrno = TSDB_CODE_SUCCESS; SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0); - if (!pRequest) { - SET_ERROR_MSG("pRequest is NULL"); - return terrno; - } + RAW_NULL_CHECK(pRequest); + uDebug(LOG_ID_TAG " write raw metadata, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen); pRequest->syncQuery = true; rspObj.common.resIter = -1; @@ -1890,11 +1797,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) } struct SCatalog* pCatalog = NULL; - code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); - if (code != TSDB_CODE_SUCCESS) { - SET_ERROR_MSG("cata log get handle failed"); - goto end; - } + RAW_RETURN_CHECK(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog)); SRequestConnInfo conn = {0}; conn.pTrans = pRequest->pTscObj->pAppInfo->pTransporter; @@ -1902,17 +1805,14 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) conn.requestObjRefId = pRequest->self; conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp); - code = smlInitHandle(&pQuery); - if (code != TSDB_CODE_SUCCESS) { - SET_ERROR_MSG("init sml handle failed"); - goto end; - } - + RAW_RETURN_CHECK(smlInitHandle(&pQuery)); pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); + RAW_NULL_CHECK(pVgHash); uDebug(LOG_ID_TAG " write raw metadata block num:%d", LOG_ID_VALUE, rspObj.rsp.common.blockNum); while (++rspObj.common.resIter < rspObj.rsp.common.blockNum) { void* pRetrieve = taosArrayGetP(rspObj.rsp.common.blockData, rspObj.common.resIter); + RAW_NULL_CHECK(pRetrieve); if (!rspObj.rsp.common.withSchema) { goto end; } @@ -1926,13 +1826,15 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) uDebug(LOG_ID_TAG " write raw metadata block tbname:%s", LOG_ID_VALUE, tbName); SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}}; - strcpy(pName.dbname, pRequest->pDb); - strcpy(pName.tname, tbName); + (void)strcpy(pName.dbname, pRequest->pDb); + (void)strcpy(pName.tname, tbName); // find schema data info for (int j = 0; j < rspObj.rsp.createTableNum; j++) { void** dataTmp = taosArrayGet(rspObj.rsp.createTableReq, j); + RAW_NULL_CHECK(dataTmp); int32_t* lenTmp = taosArrayGet(rspObj.rsp.createTableLen, j); + RAW_NULL_CHECK(dataTmp); SDecoder decoderTmp = {0}; SVCreateTbReq pCreateReq = {0}; @@ -1953,8 +1855,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) goto end; } if (strcmp(tbName, pCreateReq.name) == 0) { - cloneSVreateTbReq(&pCreateReq, &pCreateReqDst); - // pCreateReqDst->ctb.suid = processSuid(pCreateReqDst->ctb.suid, pRequest->pDb); + RAW_RETURN_CHECK(cloneSVreateTbReq(&pCreateReq, &pCreateReqDst)); tDecoderClear(&decoderTmp); tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE); break; @@ -1963,26 +1864,12 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE); } - SVgroupInfo vg; - code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vg); - if (code != TSDB_CODE_SUCCESS) { - SET_ERROR_MSG("cata log get table:%s vgroup failed", tbName); - goto end; - } - + SVgroupInfo vg = {0}; + RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vg)); if (pCreateReqDst) { // change stable name to get meta - strcpy(pName.tname, pCreateReqDst->ctb.stbName); - } - code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta); - // if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) { - // uError("WriteRaw:catalogGetTableMeta table not exist. table name: %s", tbName); - // code = TSDB_CODE_SUCCESS; - // continue; - // } - if (code != TSDB_CODE_SUCCESS) { - SET_ERROR_MSG("cata log get table:%s meta failed", tbName); - goto end; + (void)strcpy(pName.tname, pCreateReqDst->ctb.stbName); } + RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta)); if (pCreateReqDst) { pTableMeta->vgId = vg.vgId; @@ -1991,10 +1878,11 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) } void* hData = taosHashGet(pVgHash, &vg.vgId, sizeof(vg.vgId)); if (hData == NULL) { - taosHashPut(pVgHash, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)); + RAW_RETURN_CHECK(taosHashPut(pVgHash, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg))); } SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.rsp.common.blockSchema, rspObj.common.resIter); + RAW_NULL_CHECK(pSW); TAOS_FIELD* fields = taosMemoryCalloc(pSW->nCols, sizeof(TAOS_FIELD)); if (fields == NULL) { SET_ERROR_MSG("calloc fields failed"); @@ -2017,12 +1905,9 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) } } - code = smlBuildOutput(pQuery, pVgHash); - if (code != TSDB_CODE_SUCCESS) { - goto end; - } + RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash)); - launchQueryImpl(pRequest, pQuery, true, NULL); + (void)launchQueryImpl(pRequest, pQuery, true, NULL); code = pRequest->code; end: @@ -2037,67 +1922,72 @@ end: tdDestroySVCreateTbReq(pCreateReqDst); taosMemoryFree(pCreateReqDst); } - terrno = code; return code; } -static cJSON* processSimpleMeta(SMqMetaRsp* pMetaRsp) { +static void processSimpleMeta(SMqMetaRsp* pMetaRsp, cJSON** meta) { if (pMetaRsp->resMsgType == TDMT_VND_CREATE_STB) { - return processCreateStb(pMetaRsp); + processCreateStb(pMetaRsp, meta); } else if (pMetaRsp->resMsgType == TDMT_VND_ALTER_STB) { - return processAlterStb(pMetaRsp); + processAlterStb(pMetaRsp, meta); } else if (pMetaRsp->resMsgType == TDMT_VND_DROP_STB) { - return processDropSTable(pMetaRsp); + processDropSTable(pMetaRsp, meta); } else if (pMetaRsp->resMsgType == TDMT_VND_CREATE_TABLE) { - return processCreateTable(pMetaRsp); + processCreateTable(pMetaRsp, meta); } else if (pMetaRsp->resMsgType == TDMT_VND_ALTER_TABLE) { - return processAlterTable(pMetaRsp); + processAlterTable(pMetaRsp, meta); } else if (pMetaRsp->resMsgType == TDMT_VND_DROP_TABLE) { - return processDropTable(pMetaRsp); + processDropTable(pMetaRsp, meta); } else if (pMetaRsp->resMsgType == TDMT_VND_DROP_TABLE) { - return processDropTable(pMetaRsp); + processDropTable(pMetaRsp, meta); } else if (pMetaRsp->resMsgType == TDMT_VND_DELETE) { - return processDeleteTable(pMetaRsp); + processDeleteTable(pMetaRsp, meta); } - - return NULL; } -static char* processBatchMetaToJson(SMqBatchMetaRsp* pMsgRsp) { + +static void processBatchMetaToJson(SMqBatchMetaRsp* pMsgRsp, char** string) { SDecoder coder; SMqBatchMetaRsp rsp = {0}; + int32_t code = 0; + cJSON* pJson = NULL; tDecoderInit(&coder, pMsgRsp->pMetaBuff, pMsgRsp->metaBuffLen); if (tDecodeMqBatchMetaRsp(&coder, &rsp) < 0) { - goto _end; + goto end; } - cJSON* pJson = cJSON_CreateObject(); - cJSON_AddStringToObject(pJson, "tmq_meta_version", TMQ_META_VERSION); + pJson = cJSON_CreateObject(); + RAW_NULL_CHECK(pJson); + RAW_FALSE_CHECK(cJSON_AddStringToObject(pJson, "tmq_meta_version", TMQ_META_VERSION)); cJSON* pMetaArr = cJSON_CreateArray(); + RAW_NULL_CHECK(pMetaArr); int32_t num = taosArrayGetSize(rsp.batchMetaReq); for (int32_t i = 0; i < num; i++) { - int32_t len = *(int32_t*)taosArrayGet(rsp.batchMetaLen, i); + int32_t* len = taosArrayGet(rsp.batchMetaLen, i); + RAW_NULL_CHECK(len); void* tmpBuf = taosArrayGetP(rsp.batchMetaReq, i); + RAW_NULL_CHECK(tmpBuf); SDecoder metaCoder = {0}; SMqMetaRsp metaRsp = {0}; - tDecoderInit(&metaCoder, POINTER_SHIFT(tmpBuf, sizeof(SMqRspHead)), len - sizeof(SMqRspHead)); + tDecoderInit(&metaCoder, POINTER_SHIFT(tmpBuf, sizeof(SMqRspHead)), *len - sizeof(SMqRspHead)); if(tDecodeMqMetaRsp(&metaCoder, &metaRsp) < 0 ) { - goto _end; + goto end; } - cJSON* pItem = processSimpleMeta(&metaRsp); + cJSON* pItem = NULL; + processSimpleMeta(&metaRsp, &pItem); tDeleteMqMetaRsp(&metaRsp); - cJSON_AddItemToArray(pMetaArr, pItem); + RAW_FALSE_CHECK(cJSON_AddItemToArray(pMetaArr, pItem)); } - cJSON_AddItemToObject(pJson, "metas", pMetaArr); + RAW_FALSE_CHECK(cJSON_AddItemToObject(pJson, "metas", pMetaArr)); tDeleteMqBatchMetaRsp(&rsp); char* fullStr = cJSON_PrintUnformatted(pJson); cJSON_Delete(pJson); - return fullStr; + *string = fullStr; + return; -_end: +end: cJSON_Delete(pJson); tDeleteMqBatchMetaRsp(&rsp); - return NULL; } char* tmq_get_json_meta(TAOS_RES* res) { @@ -2109,14 +1999,19 @@ char* tmq_get_json_meta(TAOS_RES* res) { if (TD_RES_TMQ_METADATA(res)) { SMqTaosxRspObj* pMetaDataRspObj = (SMqTaosxRspObj*)res; - return processAutoCreateTable(&pMetaDataRspObj->rsp); + char* string = NULL; + processAutoCreateTable(&pMetaDataRspObj->rsp, &string); + return string; } else if (TD_RES_TMQ_BATCH_META(res)) { SMqBatchMetaRspObj* pBatchMetaRspObj = (SMqBatchMetaRspObj*)res; - return processBatchMetaToJson(&pBatchMetaRspObj->rsp); + char* string = NULL; + processBatchMetaToJson(&pBatchMetaRspObj->rsp, &string); + return string; } SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; - cJSON* pJson = processSimpleMeta(&pMetaRspObj->metaRsp); + cJSON* pJson = NULL; + processSimpleMeta(&pMetaRspObj->metaRsp, &pJson); char* string = cJSON_PrintUnformatted(pJson); cJSON_Delete(pJson); return string; @@ -2144,48 +2039,47 @@ static int32_t encodeMqDataRsp(__encode_func__* encodeFunc, void* rspObj, tmq_ra void* buf = NULL; tEncodeSize(encodeFunc, rspObj, len, code); if (code < 0) { - terrno = TSDB_CODE_INVALID_MSG; + code = TSDB_CODE_INVALID_MSG; goto FAILED; } len += sizeof(int8_t) + sizeof(int32_t); buf = taosMemoryCalloc(1, len); if (buf == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; goto FAILED; } tEncoderInit(&encoder, buf, len); if (tEncodeI8(&encoder, MQ_DATA_RSP_VERSION) < 0) { - terrno = TSDB_CODE_INVALID_MSG; + code = TSDB_CODE_INVALID_MSG; goto FAILED; } int32_t offsetLen = getOffSetLen(rspObj); if (offsetLen <= 0) { - terrno = TSDB_CODE_INVALID_MSG; + code = TSDB_CODE_INVALID_MSG; goto FAILED; } if (tEncodeI32(&encoder, offsetLen) < 0) { - terrno = TSDB_CODE_INVALID_MSG; + code = TSDB_CODE_INVALID_MSG; goto FAILED; } if (encodeFunc(&encoder, rspObj) < 0) { - terrno = TSDB_CODE_INVALID_MSG; + code = TSDB_CODE_INVALID_MSG; goto FAILED; } tEncoderClear(&encoder); raw->raw = buf; raw->raw_len = len; - return 0; + return code; FAILED: tEncoderClear(&encoder); taosMemoryFree(buf); - return terrno; + return code; } int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) { if (!raw || !res) { - terrno = TSDB_CODE_INVALID_PARA; - return terrno; + return TSDB_CODE_INVALID_PARA; } if (TD_RES_TMQ_META(res)) { SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; @@ -2195,18 +2089,20 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) { uDebug("tmq get raw type meta:%p", raw); } else if (TD_RES_TMQ(res)) { SMqRspObj* rspObj = ((SMqRspObj*)res); - if (encodeMqDataRsp(tEncodeMqDataRsp, &rspObj->rsp, raw) != 0) { + int32_t code = encodeMqDataRsp(tEncodeMqDataRsp, &rspObj->rsp, raw); + if (code != 0) { uError("tmq get raw type error:%d", terrno); - return terrno; + return code; } raw->raw_type = RES_TYPE__TMQ; uDebug("tmq get raw type data:%p", raw); } else if (TD_RES_TMQ_METADATA(res)) { SMqTaosxRspObj* rspObj = ((SMqTaosxRspObj*)res); - if (encodeMqDataRsp(tEncodeSTaosxRsp, &rspObj->rsp, raw) != 0) { + int32_t code = encodeMqDataRsp(tEncodeSTaosxRsp, &rspObj->rsp, raw); + if (code != 0) { uError("tmq get raw type error:%d", terrno); - return terrno; + return code; } raw->raw_type = RES_TYPE__TMQ_METADATA; uDebug("tmq get raw type metadata:%p", raw); @@ -2218,8 +2114,7 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) { uDebug("tmq get raw batch meta:%p", raw); } else { uError("tmq get raw error type:%d", *(int8_t*)res); - terrno = TSDB_CODE_TMQ_INVALID_MSG; - return terrno; + return TSDB_CODE_TMQ_INVALID_MSG; } return TSDB_CODE_SUCCESS; } @@ -2229,7 +2124,7 @@ void tmq_free_raw(tmq_raw_data raw) { if (raw.raw_type == RES_TYPE__TMQ || raw.raw_type == RES_TYPE__TMQ_METADATA) { taosMemoryFree(raw.raw); } - memset(terrMsg, 0, ERR_MSG_LEN); + (void)memset(terrMsg, 0, ERR_MSG_LEN); } static int32_t writeRawImpl(TAOS* taos, void* buf, uint32_t len, uint16_t type) { @@ -2267,39 +2162,39 @@ int32_t tmq_write_raw(TAOS* taos, tmq_raw_data raw) { static int32_t tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, int32_t metaLen) { if (taos == NULL || meta == NULL) { - terrno = TSDB_CODE_INVALID_PARA; - return terrno; + return TSDB_CODE_INVALID_PARA; } SMqBatchMetaRsp rsp = {0}; - SDecoder coder; + SDecoder coder = {0}; int32_t code = TSDB_CODE_SUCCESS; // decode and process req tDecoderInit(&coder, meta, metaLen); if (tDecodeMqBatchMetaRsp(&coder, &rsp) < 0) { code = TSDB_CODE_INVALID_PARA; - goto _end; + goto end; } int32_t num = taosArrayGetSize(rsp.batchMetaReq); for (int32_t i = 0; i < num; i++) { - int32_t len = *(int32_t*)taosArrayGet(rsp.batchMetaLen, i); - void* tmpBuf = taosArrayGetP(rsp.batchMetaReq, i); + int32_t* len = taosArrayGet(rsp.batchMetaLen, i); + RAW_NULL_CHECK(len); + void* tmpBuf = taosArrayGetP(rsp.batchMetaReq, i); + RAW_NULL_CHECK(tmpBuf); SDecoder metaCoder = {0}; SMqMetaRsp metaRsp = {0}; - tDecoderInit(&metaCoder, POINTER_SHIFT(tmpBuf, sizeof(SMqRspHead)), len - sizeof(SMqRspHead)); + tDecoderInit(&metaCoder, POINTER_SHIFT(tmpBuf, sizeof(SMqRspHead)), *len - sizeof(SMqRspHead)); if (tDecodeMqMetaRsp(&metaCoder, &metaRsp) < 0) { code = TSDB_CODE_INVALID_PARA; - goto _end; + goto end; } code = writeRawImpl(taos, metaRsp.metaRsp, metaRsp.metaRspLen, metaRsp.resMsgType); tDeleteMqMetaRsp(&metaRsp); if (code != TSDB_CODE_SUCCESS) { - goto _end; + goto end; } } -_end: +end: tDeleteMqBatchMetaRsp(&rsp); - errno = code; return code; } diff --git a/source/client/test/CMakeLists.txt b/source/client/test/CMakeLists.txt index 7c3847e4a1..3d70c67661 100644 --- a/source/client/test/CMakeLists.txt +++ b/source/client/test/CMakeLists.txt @@ -23,11 +23,11 @@ TARGET_LINK_LIBRARIES( PUBLIC os util common transport parser catalog scheduler function gtest taos_static qcom geometry ) -ADD_EXECUTABLE(clientMonitorTest clientMonitorTests.cpp) -TARGET_LINK_LIBRARIES( - clientMonitorTest - PUBLIC os util common transport monitor parser catalog scheduler function gtest taos_static qcom executor -) +#ADD_EXECUTABLE(clientMonitorTest clientMonitorTests.cpp) +#TARGET_LINK_LIBRARIES( +# clientMonitorTest +# PUBLIC os util common transport monitor parser catalog scheduler function gtest taos_static qcom executor +#) TARGET_INCLUDE_DIRECTORIES( clientTest @@ -47,11 +47,11 @@ TARGET_INCLUDE_DIRECTORIES( PRIVATE "${TD_SOURCE_DIR}/source/client/inc" ) -TARGET_INCLUDE_DIRECTORIES( - clientMonitorTest - PUBLIC "${TD_SOURCE_DIR}/include/client/" - PRIVATE "${TD_SOURCE_DIR}/source/client/inc" -) +#TARGET_INCLUDE_DIRECTORIES( +# clientMonitorTest +# PUBLIC "${TD_SOURCE_DIR}/include/client/" +# PRIVATE "${TD_SOURCE_DIR}/source/client/inc" +#) add_test( NAME smlTest diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 98e6530be0..9efff055bf 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -10828,7 +10828,9 @@ int32_t tDecodeMqBatchMetaRsp(SDecoder *pDecoder, SMqBatchMetaRsp *pRsp) { if (tDecodeI32(pDecoder, &size) < 0) return -1; if (size > 0) { pRsp->batchMetaReq = taosArrayInit(size, POINTER_BYTES); + if (!pRsp->batchMetaReq) return -1; pRsp->batchMetaLen = taosArrayInit(size, sizeof(int32_t)); + if (!pRsp->batchMetaLen) return -1; for (int32_t i = 0; i < size; i++) { void *pCreate = NULL; uint64_t len = 0; diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index a985b804be..ca536c5bad 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -103,31 +103,31 @@ int32_t tqMetaSaveInfo(STQ* pTq, TTB* ttb, const void* key, int32_t kLen, const int32_t code = TDB_CODE_SUCCESS; TXN* txn = NULL; - TQ_ERR_RETURN(tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED)); - TQ_ERR_RETURN(tdbTbUpsert(ttb, key, kLen, value, vLen, txn)); - TQ_ERR_RETURN(tdbCommit(pTq->pMetaDB, txn)); - TQ_ERR_RETURN(tdbPostCommit(pTq->pMetaDB, txn)); + TQ_ERR_GO_TO_END(tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED)); + TQ_ERR_GO_TO_END(tdbTbUpsert(ttb, key, kLen, value, vLen, txn)); + TQ_ERR_GO_TO_END(tdbCommit(pTq->pMetaDB, txn)); + TQ_ERR_GO_TO_END(tdbPostCommit(pTq->pMetaDB, txn)); return 0; + +END: + tdbAbort(pTq->pMetaDB, txn); + return code; } int32_t tqMetaDeleteInfo(STQ* pTq, TTB* ttb, const void* key, int32_t kLen) { int32_t code = TDB_CODE_SUCCESS; TXN* txn = NULL; - TQ_ERR_RETURN(tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED)); - TQ_ERR_RETURN(tdbTbDelete(ttb, key, kLen, txn)); - TQ_ERR_RETURN(tdbCommit(pTq->pMetaDB, txn)); - TQ_ERR_RETURN(tdbPostCommit(pTq->pMetaDB, txn)); + TQ_ERR_GO_TO_END(tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED)); + TQ_ERR_GO_TO_END(tdbTbDelete(ttb, key, kLen, txn)); + TQ_ERR_GO_TO_END(tdbCommit(pTq->pMetaDB, txn)); + TQ_ERR_GO_TO_END(tdbPostCommit(pTq->pMetaDB, txn)); return 0; -} - -static int32_t tqMetaTransformOffsetInfo(STQ* pTq, char* path) { - int32_t code = TDB_CODE_SUCCESS; - TQ_ERR_RETURN(tqOffsetRestoreFromFile(pTq, path)); END: + tdbAbort(pTq->pMetaDB, txn); return code; } @@ -408,7 +408,8 @@ END: } int32_t tqMetaOpen(STQ* pTq) { - char* maindb = NULL; + char* maindb = NULL; + char* offsetNew = NULL; int32_t code = TDB_CODE_SUCCESS; TQ_ERR_GO_TO_END(tqBuildFName(&maindb, pTq->path, TDB_MAINDB_NAME)); if(!taosCheckExistFile(maindb)){ @@ -416,12 +417,20 @@ int32_t tqMetaOpen(STQ* pTq) { TQ_ERR_GO_TO_END(tqMetaOpenTdb(pTq)); }else{ TQ_ERR_GO_TO_END(tqMetaTransform(pTq)); - taosRemoveFile(maindb); + (void)taosRemoveFile(maindb); } + + TQ_ERR_GO_TO_END(tqBuildFName(&offsetNew, pTq->path, TQ_OFFSET_NAME)); + if(taosCheckExistFile(offsetNew)){ + TQ_ERR_GO_TO_END(tqOffsetRestoreFromFile(pTq, offsetNew)); + (void)taosRemoveFile(offsetNew); + } + TQ_ERR_GO_TO_END(tqMetaRestoreCheckInfo(pTq)); END: taosMemoryFree(maindb); + taosMemoryFree(offsetNew); return code; } @@ -445,14 +454,14 @@ int32_t tqMetaTransform(STQ* pTq) { TQ_ERR_GO_TO_END(tqMetaTransformInfo(pTq->pMetaDB, pCheckStore, pTq->pCheckStore)); TQ_ERR_GO_TO_END(tqBuildFName(&offsetNew, pTq->path, TQ_OFFSET_NAME)); - if(taosCheckExistFile(offset) && taosCopyFile(offset, offsetNew) < 0){ - tqError("copy offset file error"); + if(taosCheckExistFile(offset)) { + if (taosCopyFile(offset, offsetNew) < 0) { + tqError("copy offset file error"); + } else { + (void)taosRemoveFile(offset); + } } - TQ_ERR_GO_TO_END(tqMetaTransformOffsetInfo(pTq, offsetNew)); - (void)taosRemoveFile(offset); - (void)taosRemoveFile(offsetNew); - END: taosMemoryFree(offset); taosMemoryFree(offsetNew); diff --git a/source/dnode/vnode/src/tq/tqOffset.c b/source/dnode/vnode/src/tq/tqOffset.c index f9faf611e1..63257d7fab 100644 --- a/source/dnode/vnode/src/tq/tqOffset.c +++ b/source/dnode/vnode/src/tq/tqOffset.c @@ -73,6 +73,7 @@ int32_t tqOffsetRestoreFromFile(STQ* pTq, char* name) { } TQ_ERR_GO_TO_END(tqMetaSaveInfo(pTq, pTq->pOffsetStore, offset.subKey, strlen(offset.subKey), pMemBuf, size)); + tqInfo("tq: offset restore from file to tdb, subkey:%s", offset.subKey); taosMemoryFree(pMemBuf); pMemBuf = NULL; } diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index b4829d1dd8..0e5b1b6fb7 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -365,6 +365,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { code = streamTrySchedExec(pTask); if (code != TSDB_CODE_SUCCESS) { streamMetaReleaseTask(pStreamMeta, pTask); + taosArrayDestroy(pTaskList); return -1; } } diff --git a/source/dnode/vnode/src/vnd/vnodeAsync.c b/source/dnode/vnode/src/vnd/vnodeAsync.c index 6d6533463b..126c394f54 100644 --- a/source/dnode/vnode/src/vnd/vnodeAsync.c +++ b/source/dnode/vnode/src/vnd/vnodeAsync.c @@ -190,10 +190,10 @@ static int32_t vnodeAsyncCancelAllTasks(SVAsync *async, SArray *cancelArray) { task->prev->next = task->next; task->next->prev = task->prev; if (task->cancel) { - taosArrayPush(cancelArray, &(SVATaskCancelInfo){ - .cancel = task->cancel, - .arg = task->arg, - }); + TAOS_UNUSED(taosArrayPush(cancelArray, &(SVATaskCancelInfo){ + .cancel = task->cancel, + .arg = task->arg, + })); } vnodeAsyncTaskDone(async, task); } @@ -206,6 +206,9 @@ static void *vnodeAsyncLoop(void *arg) { SVWorker *worker = (SVWorker *)arg; SVAsync *async = worker->async; SArray *cancelArray = taosArrayInit(0, sizeof(SVATaskCancelInfo)); + if (cancelArray == NULL) { + return NULL; + } setThreadName(async->label); @@ -466,7 +469,7 @@ int32_t vnodeAsyncOpen(int32_t numOfThreads) { vnodeAsyncSetWorkers(2, numOfThreads); _exit: - return 0; + return code; } int32_t vnodeAsyncClose() { @@ -748,10 +751,10 @@ int32_t vnodeAChannelDestroy(SVAChannelID *channelID, bool waitRunning) { task->prev->next = task->next; task->next->prev = task->prev; if (task->cancel) { - taosArrayPush(cancelArray, &(SVATaskCancelInfo){ - .cancel = task->cancel, - .arg = task->arg, - }); + TAOS_UNUSED(taosArrayPush(cancelArray, &(SVATaskCancelInfo){ + .cancel = task->cancel, + .arg = task->arg, + })); } vnodeAsyncTaskDone(async, task); } @@ -763,10 +766,10 @@ int32_t vnodeAChannelDestroy(SVAChannelID *channelID, bool waitRunning) { channel->scheduled->prev->next = channel->scheduled->next; channel->scheduled->next->prev = channel->scheduled->prev; if (channel->scheduled->cancel) { - taosArrayPush(cancelArray, &(SVATaskCancelInfo){ - .cancel = channel->scheduled->cancel, - .arg = channel->scheduled->arg, - }); + TAOS_UNUSED(taosArrayPush(cancelArray, &(SVATaskCancelInfo){ + .cancel = channel->scheduled->cancel, + .arg = channel->scheduled->arg, + })); } vnodeAsyncTaskDone(async, channel->scheduled); } diff --git a/source/dnode/vnode/src/vnd/vnodeBufPool.c b/source/dnode/vnode/src/vnd/vnodeBufPool.c index 40112c5579..ad183839d7 100644 --- a/source/dnode/vnode/src/vnd/vnodeBufPool.c +++ b/source/dnode/vnode/src/vnd/vnodeBufPool.c @@ -16,13 +16,12 @@ #include "vnd.h" /* ------------------------ STRUCTURES ------------------------ */ -static int vnodeBufPoolCreate(SVnode *pVnode, int32_t id, int64_t size, SVBufPool **ppPool) { +static int32_t vnodeBufPoolCreate(SVnode *pVnode, int32_t id, int64_t size, SVBufPool **ppPool) { SVBufPool *pPool; pPool = taosMemoryMalloc(sizeof(SVBufPool) + size); if (pPool == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + return terrno = TSDB_CODE_OUT_OF_MEMORY; } memset(pPool, 0, sizeof(SVBufPool)); @@ -44,14 +43,12 @@ static int vnodeBufPoolCreate(SVnode *pVnode, int32_t id, int64_t size, SVBufPoo pPool->lock = taosMemoryMalloc(sizeof(TdThreadSpinlock)); if (!pPool->lock) { taosMemoryFree(pPool); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + return terrno = TSDB_CODE_OUT_OF_MEMORY; } if (taosThreadSpinInit(pPool->lock, 0) != 0) { taosMemoryFree((void *)pPool->lock); taosMemoryFree(pPool); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; + return terrno = TAOS_SYSTEM_ERROR(errno); } } else { pPool->lock = NULL; @@ -77,10 +74,11 @@ int vnodeOpenBufPool(SVnode *pVnode) { for (int i = 0; i < VNODE_BUFPOOL_SEGMENTS; i++) { // create pool - if (vnodeBufPoolCreate(pVnode, i, size, &pVnode->aBufPool[i])) { + int32_t code; + if ((code = vnodeBufPoolCreate(pVnode, i, size, &pVnode->aBufPool[i]))) { vError("vgId:%d, failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno)); vnodeCloseBufPool(pVnode); - return -1; + return code; } // add to free list @@ -274,8 +272,6 @@ _exit: } int32_t vnodeBufPoolRegisterQuery(SVBufPool *pPool, SQueryNode *pQNode) { - int32_t code = 0; - taosThreadMutexLock(&pPool->mutex); pQNode->pNext = pPool->qList.pNext; @@ -285,9 +281,7 @@ int32_t vnodeBufPoolRegisterQuery(SVBufPool *pPool, SQueryNode *pQNode) { pPool->nQuery++; taosThreadMutexUnlock(&pPool->mutex); - -_exit: - return code; + return 0; } void vnodeBufPoolDeregisterQuery(SVBufPool *pPool, SQueryNode *pQNode, bool proactive) { diff --git a/source/dnode/vnode/src/vnd/vnodeCfg.c b/source/dnode/vnode/src/vnd/vnodeCfg.c index c7b54d36b6..85981ac9aa 100644 --- a/source/dnode/vnode/src/vnd/vnodeCfg.c +++ b/source/dnode/vnode/src/vnd/vnodeCfg.c @@ -394,8 +394,7 @@ int vnodeValidateTableHash(SVnode *pVnode, char *tableFName) { } if (hashValue < pVnode->config.hashBegin || hashValue > pVnode->config.hashEnd) { - terrno = TSDB_CODE_VND_HASH_MISMATCH; - return -1; + return terrno = TSDB_CODE_VND_HASH_MISMATCH; } return 0; diff --git a/source/dnode/vnode/src/vnd/vnodeInitApi.c b/source/dnode/vnode/src/vnd/vnodeInitApi.c index 48852dd159..9be84b99f4 100644 --- a/source/dnode/vnode/src/vnd/vnodeInitApi.c +++ b/source/dnode/vnode/src/vnd/vnodeInitApi.c @@ -129,17 +129,17 @@ void initTqAPI(SStoreTqReader* pTq) { pTq->tqReaderIsQueriedTable = tqReaderIsQueriedTable; pTq->tqReaderCurrentBlockConsumed = tqCurrentBlockConsumed; - pTq->tqReaderGetWalReader = tqGetWalReader; // todo remove it -// pTq->tqReaderRetrieveTaosXBlock = tqRetrieveTaosxBlock; // todo remove it + pTq->tqReaderGetWalReader = tqGetWalReader; // todo remove it + // pTq->tqReaderRetrieveTaosXBlock = tqRetrieveTaosxBlock; // todo remove it pTq->tqReaderSetSubmitMsg = tqReaderSetSubmitMsg; // todo remove it pTq->tqGetResultBlock = tqGetResultBlock; -// pTq->tqReaderNextBlockFilterOut = tqNextDataBlockFilterOut; + // pTq->tqReaderNextBlockFilterOut = tqNextDataBlockFilterOut; pTq->tqGetResultBlockTime = tqGetResultBlockTime; pTq->tqGetStreamExecProgress = tqGetStreamExecInfo; - } +} void initStateStoreAPI(SStateStore* pStore) { pStore->streamFileStateInit = streamFileStateInit; diff --git a/source/dnode/vnode/src/vnd/vnodeModule.c b/source/dnode/vnode/src/vnd/vnodeModule.c index 3a454c53ef..228cc9e0b2 100644 --- a/source/dnode/vnode/src/vnd/vnodeModule.c +++ b/source/dnode/vnode/src/vnd/vnodeModule.c @@ -19,31 +19,19 @@ static volatile int32_t VINIT = 0; int vnodeInit(int nthreads) { - int32_t init; - - init = atomic_val_compare_exchange_32(&VINIT, 0, 1); - if (init) { + if (atomic_val_compare_exchange_32(&VINIT, 0, 1)) { return 0; } - if (vnodeAsyncOpen(nthreads) != 0) { - return -1; - } - - if (walInit() < 0) { - return -1; - } + TAOS_CHECK_RETURN(vnodeAsyncOpen(nthreads)); + TAOS_CHECK_RETURN(walInit()); return 0; } void vnodeCleanup() { - int32_t init = atomic_val_compare_exchange_32(&VINIT, 1, 0); - if (init == 0) return; - - // set stop + if (atomic_val_compare_exchange_32(&VINIT, 1, 0) == 0) return; vnodeAsyncClose(); - walCleanUp(); smaCleanUp(); } diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 07982afd5d..27a7ce1022 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -446,7 +446,8 @@ int32_t ctgGetTbTag(SCatalog* pCtg, SRequestConnInfo* pConn, SName* pTableName, CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } - char* pJson = parseTagDatatoJson(pTag); + char* pJson = NULL; + parseTagDatatoJson(pTag, &pJson); STagVal tagVal; tagVal.cid = 0; tagVal.type = TSDB_DATA_TYPE_JSON; diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index 13bf2d08e6..e1d7362f0a 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -2079,7 +2079,8 @@ int32_t ctgHandleGetTbTagRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } - char* pJson = parseTagDatatoJson(pTag); + char* pJson = NULL; + parseTagDatatoJson(pTag, &pJson); STagVal tagVal; tagVal.cid = 0; tagVal.type = TSDB_DATA_TYPE_JSON; diff --git a/source/libs/command/src/command.c b/source/libs/command/src/command.c index 40e85e90d2..9506e801c5 100644 --- a/source/libs/command/src/command.c +++ b/source/libs/command/src/command.c @@ -503,11 +503,10 @@ int32_t appendTagValues(char* buf, int32_t* len, STableCfg* pCfg) { } if (tTagIsJson(pTag)) { - char* pJson = parseTagDatatoJson(pTag); - if (pJson) { - *len += sprintf(buf + VARSTR_HEADER_SIZE + *len, "%s", pJson); - taosMemoryFree(pJson); - } + char* pJson = NULL; + parseTagDatatoJson(pTag, &pJson); + *len += sprintf(buf + VARSTR_HEADER_SIZE + *len, "%s", pJson); + taosMemoryFree(pJson); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index 415c7d1f0e..dd6c653a8a 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -966,7 +966,8 @@ static int32_t sysTableUserTagsFillOneTableTags(const SSysTableScanInfo* pInfo, char* tagVarChar = NULL; if (tagData != NULL) { if (tagType == TSDB_DATA_TYPE_JSON) { - char* tagJson = parseTagDatatoJson(tagData); + char* tagJson = NULL; + parseTagDatatoJson(tagData, &tagJson); tagVarChar = taosMemoryMalloc(strlen(tagJson) + VARSTR_HEADER_SIZE); memcpy(varDataVal(tagVarChar), tagJson, strlen(tagJson)); varDataSetLen(tagVarChar, strlen(tagJson)); diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index 8502856754..3d344a8a20 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -417,7 +417,7 @@ int32_t dataConverToStr(char* str, int type, void* buf, int32_t bufSize, int32_t return TSDB_CODE_SUCCESS; } -char* parseTagDatatoJson(void* p) { +void parseTagDatatoJson(void* p, char** jsonStr) { char* string = NULL; SArray* pTagVals = NULL; cJSON* json = NULL; @@ -436,6 +436,9 @@ char* parseTagDatatoJson(void* p) { } for (int j = 0; j < nCols; ++j) { STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, j); + if (pTagVal == NULL) { + continue; + } // json key encode by binary tstrncpy(tagJsonKey, pTagVal->pKey, sizeof(tagJsonKey)); // json value @@ -445,11 +448,16 @@ char* parseTagDatatoJson(void* p) { if (value == NULL) { goto end; } - cJSON_AddItemToObject(json, tagJsonKey, value); + if(!cJSON_AddItemToObject(json, tagJsonKey, value)){ + goto end; + } } else if (type == TSDB_DATA_TYPE_NCHAR) { cJSON* value = NULL; if (pTagVal->nData > 0) { char* tagJsonValue = taosMemoryCalloc(pTagVal->nData, 1); + if (tagJsonValue == NULL) { + goto end; + } int32_t length = taosUcs4ToMbs((TdUcs4*)pTagVal->pData, pTagVal->nData, tagJsonValue); if (length < 0) { qError("charset:%s to %s. val:%s convert json value failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, @@ -464,25 +472,34 @@ char* parseTagDatatoJson(void* p) { } } else if (pTagVal->nData == 0) { value = cJSON_CreateString(""); + if (value == NULL) { + goto end; + } } else { goto end; } - cJSON_AddItemToObject(json, tagJsonKey, value); + if(!cJSON_AddItemToObject(json, tagJsonKey, value)){ + goto end; + } } else if (type == TSDB_DATA_TYPE_DOUBLE) { double jsonVd = *(double*)(&pTagVal->i64); cJSON* value = cJSON_CreateNumber(jsonVd); if (value == NULL) { goto end; } - cJSON_AddItemToObject(json, tagJsonKey, value); + if(!cJSON_AddItemToObject(json, tagJsonKey, value)){ + goto end; + } } else if (type == TSDB_DATA_TYPE_BOOL) { char jsonVd = *(char*)(&pTagVal->i64); cJSON* value = cJSON_CreateBool(jsonVd); if (value == NULL) { goto end; } - cJSON_AddItemToObject(json, tagJsonKey, value); + if(!cJSON_AddItemToObject(json, tagJsonKey, value)){ + goto end; + } } else { goto end; } @@ -494,7 +511,7 @@ end: if (string == NULL) { string = taosStrdup(TSDB_DATA_NULL_STR_L); } - return string; + *jsonStr = string; } int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) { diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 45e0ecf738..3ce5cd5714 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -214,7 +214,6 @@ void schedulerDestroy(void) { } SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock); - taosTmrCleanUp(schMgmt.timer); qWorkerDestroy(&schMgmt.queryMgmt); schMgmt.queryMgmt = NULL; } diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index 5e67f1766f..55209fc427 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -280,6 +280,13 @@ void streamTaskStartMonitorCheckRsp(SStreamTask* pTask) { streamMutexLock(&pInfo->checkInfoLock); + // drop procedure already started, not start check downstream now + ETaskStatus s = streamTaskGetStatus(pTask).state; + if (s == TASK_STATUS__DROPPING) { + streamMutexUnlock(&pInfo->checkInfoLock); + return; + } + int32_t code = streamTaskStartCheckDownstream(pInfo, pTask->id.idStr); if (code != TSDB_CODE_SUCCESS) { streamMutexUnlock(&pInfo->checkInfoLock); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 42d2f86dac..6f3b7d8b32 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -742,7 +742,10 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t streamMetaRLock(pMeta); ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask) { + // to make sure check status will not start the check downstream status when we start to check timerActive count. + streamMutexLock(&pTask->taskCheckInfo.checkInfoLock); timerActive = (*ppTask)->status.timerActive; + streamMutexUnlock(&pTask->taskCheckInfo.checkInfoLock); } streamMetaRUnLock(pMeta); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 990c84371f..4db1475fa9 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -435,6 +435,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_STB_NOT_EXIST, "Stable not exists") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER, "Table schema is old") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TDB_ENV_OPEN_ERROR, "TDB env open error") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TABLE_IN_OTHER_STABLE, "Table already exists in other stables") +TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INCONSISTENT_DB_ID, "Inconsistent database id") // query TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_QHANDLE, "Invalid handle") diff --git a/source/util/src/tjson.c b/source/util/src/tjson.c index 82993e8449..1d2b3f003c 100644 --- a/source/util/src/tjson.c +++ b/source/util/src/tjson.c @@ -49,8 +49,7 @@ int32_t tjsonAddIntegerToObject(SJson* pJson, const char* pName, const uint64_t int32_t tjsonAddDoubleToObject(SJson* pJson, const char* pName, const double number) { if (NULL == cJSON_AddNumberToObject((cJSON*)pJson, pName, number)) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return TSDB_CODE_FAILED; + return terrno = TSDB_CODE_OUT_OF_MEMORY; } return TSDB_CODE_SUCCESS; @@ -58,8 +57,7 @@ int32_t tjsonAddDoubleToObject(SJson* pJson, const char* pName, const double num int32_t tjsonAddBoolToObject(SJson* pJson, const char* pName, const bool boolean) { if (NULL == cJSON_AddBoolToObject((cJSON*)pJson, pName, boolean)) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return TSDB_CODE_FAILED; + return terrno = TSDB_CODE_OUT_OF_MEMORY; } return TSDB_CODE_SUCCESS; @@ -67,8 +65,7 @@ int32_t tjsonAddBoolToObject(SJson* pJson, const char* pName, const bool boolean int32_t tjsonAddStringToObject(SJson* pJson, const char* pName, const char* pVal) { if (NULL == cJSON_AddStringToObject((cJSON*)pJson, pName, pVal)) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return TSDB_CODE_FAILED; + return terrno = TSDB_CODE_OUT_OF_MEMORY; } return TSDB_CODE_SUCCESS; @@ -87,8 +84,7 @@ int32_t tjsonAddItemToObject(SJson* pJson, const char* pName, SJson* pItem) { return TSDB_CODE_SUCCESS; } - terrno = TSDB_CODE_OUT_OF_MEMORY; - return TSDB_CODE_FAILED; + return terrno = TSDB_CODE_OUT_OF_MEMORY; } int32_t tjsonAddItemToArray(SJson* pJson, SJson* pItem) { @@ -96,8 +92,7 @@ int32_t tjsonAddItemToArray(SJson* pJson, SJson* pItem) { return TSDB_CODE_SUCCESS; } - terrno = TSDB_CODE_OUT_OF_MEMORY; - return TSDB_CODE_FAILED; + return terrno = TSDB_CODE_OUT_OF_MEMORY; } int32_t tjsonAddObject(SJson* pJson, const char* pName, FToJson func, const void* pObj) { @@ -106,18 +101,27 @@ int32_t tjsonAddObject(SJson* pJson, const char* pName, FToJson func, const void } SJson* pJobj = tjsonCreateObject(); - if (NULL == pJobj || TSDB_CODE_SUCCESS != func(pObj, pJobj)) { + if (NULL == pJobj) { + return terrno; + } + int32_t rc = func(pObj, pJobj); + if (rc != TSDB_CODE_SUCCESS) { tjsonDelete(pJobj); - return TSDB_CODE_FAILED; + return rc; } return tjsonAddItemToObject(pJson, pName, pJobj); } int32_t tjsonAddItem(SJson* pJson, FToJson func, const void* pObj) { SJson* pJobj = tjsonCreateObject(); - if (NULL == pJobj || TSDB_CODE_SUCCESS != func(pObj, pJobj)) { + if (pJobj == NULL) { + return terrno; + } + + int32_t rc = func(pObj, pJobj); + if (rc != TSDB_CODE_SUCCESS) { tjsonDelete(pJobj); - return TSDB_CODE_FAILED; + return rc; } return tjsonAddItemToArray(pJson, pJobj); } @@ -156,9 +160,21 @@ int32_t tjsonAddTArray(SJson* pJson, const char* pName, FToJson func, const SArr return TSDB_CODE_SUCCESS; } -char* tjsonToString(const SJson* pJson) { return cJSON_Print((cJSON*)pJson); } +char* tjsonToString(const SJson* pJson) { + char* p = cJSON_Print((cJSON*)pJson); + if (!p) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + } + return p; +} -char* tjsonToUnformattedString(const SJson* pJson) { return cJSON_PrintUnformatted((cJSON*)pJson); } +char* tjsonToUnformattedString(const SJson* pJson) { + char* p = cJSON_PrintUnformatted((cJSON*)pJson); + if (!p) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + } + return p; +} SJson* tjsonGetObjectItem(const SJson* pJson, const char* pName) { return cJSON_GetObjectItem(pJson, pName); } diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index bb67aa5a1f..c0f1d6e443 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -55,6 +55,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/window_close_session_ext.py ,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/partition_interval.py ,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/pause_resume_test.py +,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/state_window_case.py #,,n,system-test,python3 ./test.py -f 8-stream/vnode_restart.py -N 4 #,,n,system-test,python3 ./test.py -f 8-stream/snode_restart.py -N 4 ,,n,system-test,python3 ./test.py -f 8-stream/snode_restart_with_checkpoint.py -N 4 @@ -300,6 +301,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TS-3581.py ,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TS-3311.py ,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TS-3821.py +,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TS-5130.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/balance_vgroups_r1.py -N 6 ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/taosShell.py diff --git a/tests/pytest/util/common.py b/tests/pytest/util/common.py index 63dc9773c4..8c0e9b17ad 100644 --- a/tests/pytest/util/common.py +++ b/tests/pytest/util/common.py @@ -538,21 +538,21 @@ class TDCom: tdLog.info("cfgPath: %s" % cfgPath) return cfgPath - def newcon(self,host='localhost',port=6030,user='root',password='taosdata'): - con=taos.connect(host=host, user=user, password=password, port=port) + def newcon(self,host='localhost',port=6030,user='root',password='taosdata', database=None): + con=taos.connect(host=host, user=user, password=password, port=port, database=database) # print(con) return con - def newcur(self,host='localhost',port=6030,user='root',password='taosdata'): + def newcur(self,host='localhost',port=6030,user='root',password='taosdata',database=None): cfgPath = self.getClientCfgPath() - con=taos.connect(host=host, user=user, password=password, config=cfgPath, port=port) + con=taos.connect(host=host, user=user, password=password, config=cfgPath, port=port,database=database) cur=con.cursor() # print(cur) return cur - def newTdSql(self, host='localhost',port=6030,user='root',password='taosdata'): + def newTdSql(self, host='localhost',port=6030,user='root',password='taosdata', database = None): newTdSql = TDSql() - cur = self.newcur(host=host,port=port,user=user,password=password) + cur = self.newcur(host=host,port=port,user=user,password=password, database=database) newTdSql.init(cur, False) return newTdSql diff --git a/tests/system-test/8-stream/state_window_case.py b/tests/system-test/8-stream/state_window_case.py new file mode 100644 index 0000000000..5ecf8d7832 --- /dev/null +++ b/tests/system-test/8-stream/state_window_case.py @@ -0,0 +1,60 @@ +import sys +from util.log import * +from util.sql import * + +from util.cases import * +from util.common import * + + +class TDTestCase: + updatecfgDict = {'debugFlag':135,} + def init(self, conn, logSql, replicaVar = 1): + self.replicaVar = replicaVar + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), logSql) + self.tdCom = tdCom + def init_case(self): + tdLog.debug("==========init case==========") + tdSql.execute("create database test") + tdSql.execute("use test") + tdSql.execute("CREATE STABLE `st_variable_data` (`load_time` TIMESTAMP, `collect_time` TIMESTAMP, `var_value` NCHAR(300)) TAGS (`factory_id` NCHAR(30), `device_code` NCHAR(80), `var_name` NCHAR(120), `var_type` NCHAR(30), `var_address` NCHAR(100), `var_attribute` NCHAR(30), `device_name` NCHAR(150), `var_desc` NCHAR(200), `trigger_value` NCHAR(50), `var_category` NCHAR(50), `var_category_desc` NCHAR(200));") + tdSql.execute('CREATE TABLE aaa using `st_variable_data` tags("a1","a2", "a3","a4","a5","a6","a7","a8","a9","a10","a11")') + time.sleep(2) + + def create_stream(self): + tdLog.debug("==========create stream==========") + tdSql.execute("CREATE STREAM stream_device_alarm TRIGGER AT_ONCE DELETE_MARK 30d INTO st_device_alarm tags(factory_id varchar(20), device_code varchar(80), var_name varchar(200))\ + as select _wstart start_time, last(load_time) end_time, first(var_value) var_value, (case when lower(var_value)=lower(trigger_value) then '1' else '0' end) state_flag from st_variable_data\ + PARTITION BY tbname tname, factory_id, device_code, var_name STATE_WINDOW(case when lower(var_value)=lower(trigger_value) then '1' else '0' end)") + time.sleep(2) + tdSql.execute("CREATE STREAM stream_device_alarm2 TRIGGER AT_ONCE DELETE_MARK 30d INTO st_device_alarm2 tags(factory_id varchar(20), device_code varchar(80), var_name varchar(200))\ + as select _wstart start_time, last(load_time) end_time, first(var_value) var_value, 1 state_flag from st_variable_data\ + PARTITION BY tbname tname, factory_id, device_code, var_name STATE_WINDOW(case when lower(var_value)=lower(trigger_value) then '1' else '0' end)") + time.sleep(2) + + def insert_data(self): + try: + tdSql.execute("insert into aaa values('2024-07-15 14:00:00', '2024-07-15 14:00:00', 'a8')", queryTimes=5, show=True) + time.sleep(0.01) + tdSql.execute("insert into aaa values('2024-07-15 14:10:00', '2024-07-15 14:10:00', 'a9')", queryTimes=5, show=True) + time.sleep(1) + except Exception as error: + tdLog.exit(f"insert data failed {error}") + + def run(self): + self.init_case() + self.create_stream() + self.insert_data() + tdSql.query("select state_flag from st_device_alarm") + tdSql.checkData(0, 0, 0, show=True) + tdSql.checkData(1, 0, 1, show=True) + tdSql.query("select state_flag from st_device_alarm2") + tdSql.checkData(0, 0, 1, show=True) + tdSql.checkData(1, 0, 1, show=True) + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) \ No newline at end of file diff --git a/tests/system-test/99-TDcase/TS-5130.py b/tests/system-test/99-TDcase/TS-5130.py new file mode 100644 index 0000000000..504500fa0e --- /dev/null +++ b/tests/system-test/99-TDcase/TS-5130.py @@ -0,0 +1,45 @@ +from util.log import * +from util.cases import * +from util.sql import * +from util.common import * +import taos + + + +class TDTestCase: + def init(self, conn, logSQl, replicaVal=1): + self.replicaVar = int(replicaVal) + tdLog.debug(f"start to excute {__file__}") + self.conn = conn + tdSql.init(conn.cursor(), False) + self.passwd = {'root':'taosdata', + 'test':'test'} + def prepare_user(self): + tdSql.execute(f"create user test pass 'test' sysinfo 1") + + def test_connect_user(self, uname): + try: + for db in ['information_schema', 'performance_schema']: + new_tdsql = tdCom.newTdSql(user=uname, password=self.passwd[uname], database=db) + new_tdsql.query('show databases') + new_tdsql.checkData(0, 0, 'information_schema') + new_tdsql.checkData(1, 0, 'performance_schema') + tdLog.success(f"Test User {uname} for {db} .......[OK]") + except: + tdLog.exit(f'{__file__} failed') + + def run(self): + self.prepare_user() + self.test_connect_user('root') + self.test_connect_user('test') + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) + + + + diff --git a/utils/test/c/write_raw_block_test.c b/utils/test/c/write_raw_block_test.c index 0f123fb560..8ed997bc92 100644 --- a/utils/test/c/write_raw_block_test.c +++ b/utils/test/c/write_raw_block_test.c @@ -171,7 +171,7 @@ int32_t init_env() { } // pass NULL return last error code describe - const char* err = taos_errstr(NULL); + const char* err = tmq_err2str(error_code); printf("write_raw_block return code =0x%x err=%s\n", error_code, err); if(strcmp(err, "success") == 0) { printf("expect failed , but error string is success! err=%s\n", err); @@ -185,7 +185,7 @@ int32_t init_env() { goto END; } - err = taos_errstr(NULL); + err = tmq_err2str(error_code); printf("write_raw_block no exist table return code =0x%x err=%s\n", error_code, err); if(strcmp(err, "success") == 0) { printf("expect failed write no exist table, but error string is success! err=%s\n", err);