diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index 2bd815b460..80403986aa 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -23,12 +23,12 @@ #include "tglobal.h" #include "tmsgtype.h" -#define RAW_NULL_CHECK(c) \ - do { \ - if (c == NULL) { \ - code = terrno; \ - goto end; \ - } \ +#define RAW_NULL_CHECK(c) \ + do { \ + if (c == NULL) { \ + code = terrno; \ + goto end; \ + } \ } while (0) #define RAW_FALSE_CHECK(c) \ @@ -52,7 +52,7 @@ #define TMQ_META_VERSION "1.0" -static int32_t tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, int32_t metaLen); +static int32_t tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, int32_t metaLen); static tb_uid_t processSuid(tb_uid_t suid, char* db) { return suid + MurmurHash3_32(db, strlen(db)); } static void buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id, int8_t t, SColCmprWrapper* pColCmprRow, cJSON** pJson) { @@ -163,7 +163,7 @@ static void buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* sche } RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tags", tags)); - end: +end: *pJson = json; } @@ -197,7 +197,7 @@ static int32_t setCompressOption(cJSON* json, uint32_t para) { return code; } - end: +end: return code; } static void buildAlterSTableJson(void* alterData, int32_t alterDataLen, cJSON** pJson) { @@ -338,7 +338,7 @@ static void buildAlterSTableJson(void* alterData, int32_t alterDataLen, cJSON** break; } - end: +end: tFreeSMAltertbReq(&req); *pJson = json; } @@ -455,7 +455,7 @@ static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) { cJSON* tvalue = NULL; if (IS_VAR_DATA_TYPE(pTagVal->type)) { - char* buf = NULL; + char* buf = NULL; int64_t bufSize = 0; if (pTagVal->type == TSDB_DATA_TYPE_VARBINARY) { bufSize = pTagVal->nData * 2 + 2 + 3; @@ -485,7 +485,7 @@ static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) { RAW_FALSE_CHECK(cJSON_AddItemToArray(tags, tag)); } - end: +end: RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tags", tags)); taosArrayDestroy(pTagVals); } @@ -514,7 +514,7 @@ static void buildCreateCTableJson(SVCreateTbReq* pCreateReq, int32_t nReqs, cJSO } RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "createList", createList)); - end: +end: *pJson = json; } @@ -585,7 +585,7 @@ static void processAutoCreateTable(SMqDataRsp* rsp, char** string) { *string = cJSON_PrintUnformatted(pJson); cJSON_Delete(pJson); - end: +end: uDebug("auto created table return, sql json:%s", *string); for (int i = 0; decoder && pCreateReq && i < rsp->createTableNum; i++) { tDecoderClear(&decoder[i]); @@ -989,7 +989,7 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) { code = pRequest->code; - end: +end: uDebug(LOG_ID_TAG " create stable return, msg:%s", LOG_ID_VALUE, tstrerror(code)); destroyRequest(pRequest); tFreeSMCreateStbReq(&pReq); @@ -1023,9 +1023,9 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) { SCatalog* pCatalog = NULL; RAW_RETURN_CHECK(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog)); SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter, - .requestId = pRequest->requestId, - .requestObjRefId = pRequest->self, - .mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp)}; + .requestId = pRequest->requestId, + .requestObjRefId = pRequest->self, + .mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp)}; SName pName = {0}; toName(pRequest->pTscObj->acctId, pRequest->pDb, req.name, &pName); STableMeta* pTableMeta = NULL; @@ -1088,7 +1088,7 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) { code = pRequest->code; - end: +end: uDebug(LOG_ID_TAG " drop stable return, msg:%s", LOG_ID_VALUE, tstrerror(code)); destroyRequest(pRequest); tDecoderClear(&coder); @@ -1142,9 +1142,9 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) { taosHashSetFreeFp(pVgroupHashmap, destroyCreateTbReqBatch); SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter, - .requestId = pRequest->requestId, - .requestObjRefId = pRequest->self, - .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; + .requestId = pRequest->requestId, + .requestObjRefId = pRequest->self, + .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName)); RAW_NULL_CHECK(pRequest->tableList); @@ -1269,7 +1269,7 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) { code = pRequest->code; - end: +end: uDebug(LOG_ID_TAG " create table return, msg:%s", LOG_ID_VALUE, tstrerror(code)); tDeleteSVCreateTbBatchReq(&req); @@ -1328,9 +1328,9 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) { taosHashSetFreeFp(pVgroupHashmap, destroyDropTbReqBatch); SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter, - .requestId = pRequest->requestId, - .requestObjRefId = pRequest->self, - .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; + .requestId = pRequest->requestId, + .requestObjRefId = pRequest->self, + .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName)); RAW_NULL_CHECK(pRequest->tableList); // loop to create table @@ -1395,7 +1395,7 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) { } code = pRequest->code; - end: +end: uDebug(LOG_ID_TAG " drop table return, msg:%s", LOG_ID_VALUE, tstrerror(code)); taosHashCleanup(pVgroupHashmap); destroyRequest(pRequest); @@ -1433,7 +1433,7 @@ static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) { } taos_free_result(res); - end: +end: uDebug("connId:0x%" PRIx64 " delete data sql:%s, code:%s", *(int64_t*)taos, sql, tstrerror(code)); tDecoderClear(&coder); return code; @@ -1473,9 +1473,9 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) { SCatalog* pCatalog = NULL; RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog)); SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter, - .requestId = pRequest->requestId, - .requestObjRefId = pRequest->self, - .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; + .requestId = pRequest->requestId, + .requestObjRefId = pRequest->self, + .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; SVgroupInfo pInfo = {0}; SName pName = {0}; @@ -1543,7 +1543,7 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) { code = handleAlterTbExecRes(pRes->res, pCatalog); } } - end: +end: uDebug(LOG_ID_TAG " alter table return, meta:%p, len:%d, msg:%s", LOG_ID_VALUE, meta, metaLen, tstrerror(code)); taosArrayDestroy(pArray); if (pVgData) taosMemoryFreeClear(pVgData->pData); @@ -1608,7 +1608,7 @@ int taos_write_raw_block_with_fields_with_reqid(TAOS* taos, int rows, char* pDat launchQueryImpl(pRequest, pQuery, true, NULL); code = pRequest->code; - end: +end: uDebug(LOG_ID_TAG " write raw block with field return, msg:%s", LOG_ID_VALUE, tstrerror(code)); taosMemoryFreeClear(pTableMeta); qDestroyQuery(pQuery); @@ -1668,7 +1668,7 @@ int taos_write_raw_block_with_reqid(TAOS* taos, int rows, char* pData, const cha launchQueryImpl(pRequest, pQuery, true, NULL); code = pRequest->code; - end: +end: uDebug(LOG_ID_TAG " write raw block return, msg:%s", LOG_ID_VALUE, tstrerror(code)); taosMemoryFreeClear(pTableMeta); qDestroyQuery(pQuery); @@ -1708,7 +1708,8 @@ static int32_t buildCreateTbMap(SMqDataRsp* rsp, SHashObj* pHashObj) { goto end; } if (taosHashGet(pHashObj, pCreateReq.name, strlen(pCreateReq.name)) == NULL) { - RAW_RETURN_CHECK(taosHashPut(pHashObj, pCreateReq.name, strlen(pCreateReq.name), &pCreateReq, sizeof(SVCreateTbReq))); + RAW_RETURN_CHECK( + taosHashPut(pHashObj, pCreateReq.name, strlen(pCreateReq.name), &pCreateReq, sizeof(SVCreateTbReq))); } else { tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE); pCreateReq = (SVCreateTbReq){0}; @@ -1734,34 +1735,34 @@ static SHashObj* writeRawCache = NULL; static int8_t initFlag = 0; static int8_t initedFlag = WRITE_RAW_INIT_START; -typedef struct{ - SHashObj* pVgHash; - SHashObj* pNameHash; - SHashObj* pMetaHash; -}rawCacheInfo; +typedef struct { + SHashObj* pVgHash; + SHashObj* pNameHash; + SHashObj* pMetaHash; +} rawCacheInfo; -typedef struct{ +typedef struct { SVgroupInfo vgInfo; int64_t uid; int64_t suid; -}tbInfo; +} tbInfo; -static void tmqFreeMeta(void *data){ +static void tmqFreeMeta(void* data) { STableMeta* pTableMeta = *(STableMeta**)data; taosMemoryFree(pTableMeta); } -static void freeRawCache(void *data) { +static void freeRawCache(void* data) { rawCacheInfo* pRawCache = (rawCacheInfo*)data; taosHashCleanup(pRawCache->pMetaHash); taosHashCleanup(pRawCache->pNameHash); taosHashCleanup(pRawCache->pVgHash); } -static int32_t initRawCacheHash(){ - if (writeRawCache == NULL){ +static int32_t initRawCacheHash() { + if (writeRawCache == NULL) { writeRawCache = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); - if (writeRawCache == NULL){ + if (writeRawCache == NULL) { return terrno; } taosHashSetFreeFp(writeRawCache, freeRawCache); @@ -1769,7 +1770,7 @@ static int32_t initRawCacheHash(){ return 0; } -static bool needRefreshMeta(void* rawData, STableMeta* pTableMeta, SSchemaWrapper* pSW){ +static bool needRefreshMeta(void* rawData, STableMeta* pTableMeta, SSchemaWrapper* pSW) { char* p = (char*)rawData; // | version | total length | total rows | blankFill | total columns | flag seg| block group id | column schema | each // column length | @@ -1799,16 +1800,15 @@ static bool needRefreshMeta(void* rawData, STableMeta* pTableMeta, SSchemaWrappe } fields += sizeof(int8_t) + sizeof(int32_t); - if (j == pTableMeta->tableInfo.numOfColumns) - return true; + if (j == pTableMeta->tableInfo.numOfColumns) return true; } return false; } -static int32_t getRawCache(SHashObj **pVgHash, SHashObj **pNameHash, SHashObj **pMetaHash, void *key) { +static int32_t getRawCache(SHashObj** pVgHash, SHashObj** pNameHash, SHashObj** pMetaHash, void* key) { int32_t code = 0; - void* cacheInfo = taosHashGet(writeRawCache, &key, POINTER_BYTES); - if (cacheInfo == NULL){ + void* cacheInfo = taosHashGet(writeRawCache, &key, POINTER_BYTES); + if (cacheInfo == NULL) { *pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); RAW_NULL_CHECK(*pVgHash); *pNameHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); @@ -1819,7 +1819,7 @@ static int32_t getRawCache(SHashObj **pVgHash, SHashObj **pNameHash, SHashObj ** rawCacheInfo info = {*pVgHash, *pNameHash, *pMetaHash}; RAW_RETURN_CHECK(taosHashPut(writeRawCache, &key, POINTER_BYTES, &info, sizeof(rawCacheInfo))); } else { - rawCacheInfo *info = (rawCacheInfo *)cacheInfo; + rawCacheInfo* info = (rawCacheInfo*)cacheInfo; *pVgHash = info->pVgHash; *pNameHash = info->pNameHash; *pMetaHash = info->pMetaHash; @@ -1833,7 +1833,7 @@ end: return code; } -static int32_t buildRawRequest(TAOS* taos, SRequestObj** pRequest, SCatalog** pCatalog, SRequestConnInfo *conn){ +static int32_t buildRawRequest(TAOS* taos, SRequestObj** pRequest, SCatalog** pCatalog, SRequestConnInfo* conn) { int32_t code = 0; RAW_RETURN_CHECK(createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0, pRequest)); (*pRequest)->syncQuery = true; @@ -1852,29 +1852,30 @@ end: return code; } -typedef int32_t _raw_decode_func_(SDecoder *pDecoder, SMqDataRsp *pRsp); -static int32_t decodeRawData(SDecoder *decoder, void* data, int32_t dataLen, _raw_decode_func_ func, SMqRspObj* rspObj){ - int8_t dataVersion = *(int8_t*)data; - if (dataVersion >= MQ_DATA_RSP_VERSION) { - data = POINTER_SHIFT(data, sizeof(int8_t) + sizeof(int32_t)); - dataLen -= sizeof(int8_t) + sizeof(int32_t); +typedef int32_t _raw_decode_func_(SDecoder* pDecoder, SMqDataRsp* pRsp); +static int32_t decodeRawData(SDecoder* decoder, void* data, int32_t dataLen, _raw_decode_func_ func, + SMqRspObj* rspObj) { + int8_t dataVersion = *(int8_t*)data; + if (dataVersion >= MQ_DATA_RSP_VERSION) { + data = POINTER_SHIFT(data, sizeof(int8_t) + sizeof(int32_t)); + dataLen -= sizeof(int8_t) + sizeof(int32_t); } - rspObj->resIter = -1; - tDecoderInit(decoder, data, dataLen); - int32_t code = func(decoder, &rspObj->dataRsp); - if (code != 0) { - SET_ERROR_MSG("decode mq taosx data rsp failed"); + rspObj->resIter = -1; + tDecoderInit(decoder, data, dataLen); + int32_t code = func(decoder, &rspObj->dataRsp); + if (code != 0) { + SET_ERROR_MSG("decode mq taosx data rsp failed"); } - return code; + return code; } -static int32_t processCacheMeta(SHashObj *pVgHash, SHashObj *pNameHash, SHashObj *pMetaHash, SVCreateTbReq* pCreateReqDst, - SCatalog* pCatalog, SRequestConnInfo* conn, SName* pName, - STableMeta** pMeta, SSchemaWrapper* pSW, void* rawData, int32_t retry){ - int32_t code = 0; +static int32_t processCacheMeta(SHashObj* pVgHash, SHashObj* pNameHash, SHashObj* pMetaHash, + SVCreateTbReq* pCreateReqDst, SCatalog* pCatalog, SRequestConnInfo* conn, SName* pName, + STableMeta** pMeta, SSchemaWrapper* pSW, void* rawData, int32_t retry) { + int32_t code = 0; STableMeta* pTableMeta = NULL; - tbInfo* tmpInfo = (tbInfo*)taosHashGet(pNameHash, pName->tname, strlen(pName->tname)); + tbInfo* tmpInfo = (tbInfo*)taosHashGet(pNameHash, pName->tname, strlen(pName->tname)); if (tmpInfo == NULL || retry > 0) { tbInfo info = {0}; @@ -1884,13 +1885,13 @@ static int32_t processCacheMeta(SHashObj *pVgHash, SHashObj *pNameHash, SHashObj } RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, conn, pName, &pTableMeta)); info.uid = pTableMeta->uid; - if (pTableMeta->tableType == TSDB_CHILD_TABLE){ + if (pTableMeta->tableType == TSDB_CHILD_TABLE) { info.suid = pTableMeta->suid; } else { info.suid = pTableMeta->uid; } code = taosHashPut(pMetaHash, &info.suid, LONG_BYTES, &pTableMeta, POINTER_BYTES); - if (code != 0){ + if (code != 0) { taosMemoryFree(pTableMeta); goto end; } @@ -1902,20 +1903,21 @@ static int32_t processCacheMeta(SHashObj *pVgHash, SHashObj *pNameHash, SHashObj RAW_RETURN_CHECK(taosHashPut(pNameHash, pName->tname, strlen(pName->tname), &info, sizeof(tbInfo))); tmpInfo = (tbInfo*)taosHashGet(pNameHash, pName->tname, strlen(pName->tname)); - RAW_RETURN_CHECK(taosHashPut(pVgHash, &info.vgInfo.vgId, sizeof(info.vgInfo.vgId), &info.vgInfo, sizeof(SVgroupInfo))); + RAW_RETURN_CHECK( + taosHashPut(pVgHash, &info.vgInfo.vgId, sizeof(info.vgInfo.vgId), &info.vgInfo, sizeof(SVgroupInfo))); } - if (pTableMeta == NULL || retry > 0){ + if (pTableMeta == NULL || retry > 0) { STableMeta** pTableMetaTmp = (STableMeta**)taosHashGet(pMetaHash, &tmpInfo->suid, LONG_BYTES); if (pTableMetaTmp == NULL || retry > 0 || needRefreshMeta(rawData, *pTableMetaTmp, pSW)) { RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, conn, pName, &pTableMeta)); code = taosHashPut(pMetaHash, &tmpInfo->suid, LONG_BYTES, &pTableMeta, POINTER_BYTES); - if (code != 0){ + if (code != 0) { taosMemoryFree(pTableMeta); goto end; } - }else{ + } else { pTableMeta = *pTableMetaTmp; pTableMeta->uid = tmpInfo->uid; pTableMeta->vgId = tmpInfo->vgInfo.vgId; @@ -1927,25 +1929,25 @@ end: return code; } -static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen){ - int32_t code = TSDB_CODE_SUCCESS; - SQuery* pQuery = NULL; - SMqRspObj rspObj = {0}; - SDecoder decoder = {0}; +static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { + int32_t code = TSDB_CODE_SUCCESS; + SQuery* pQuery = NULL; + SMqRspObj rspObj = {0}; + SDecoder decoder = {0}; - SRequestObj* pRequest = NULL; - SCatalog* pCatalog = NULL; - SRequestConnInfo conn = {0}; + SRequestObj* pRequest = NULL; + SCatalog* pCatalog = NULL; + SRequestConnInfo conn = {0}; RAW_RETURN_CHECK(buildRawRequest(taos, &pRequest, &pCatalog, &conn)); uDebug(LOG_ID_TAG " write raw data, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen); RAW_RETURN_CHECK(decodeRawData(&decoder, data, dataLen, tDecodeMqDataRsp, &rspObj)); - SHashObj *pVgHash = NULL; - SHashObj *pNameHash = NULL; - SHashObj *pMetaHash = NULL; + SHashObj* pVgHash = NULL; + SHashObj* pNameHash = NULL; + SHashObj* pMetaHash = NULL; RAW_RETURN_CHECK(getRawCache(&pVgHash, &pNameHash, &pMetaHash, taos)); int retry = 0; - while(1){ + while (1) { RAW_RETURN_CHECK(smlInitHandle(&pQuery)); uDebug(LOG_ID_TAG " write raw meta data block num:%d", LOG_ID_VALUE, rspObj.dataRsp.blockNum); while (++rspObj.resIter < rspObj.dataRsp.blockNum) { @@ -1968,9 +1970,9 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen){ tstrncpy(pName.tname, tbName, TSDB_TABLE_NAME_LEN); STableMeta* pTableMeta = NULL; - RAW_RETURN_CHECK(processCacheMeta(pVgHash, pNameHash, pMetaHash, NULL, pCatalog, &conn, - &pName, &pTableMeta, pSW, rawData, retry)); - char err[ERR_MSG_LEN] = {0}; + RAW_RETURN_CHECK(processCacheMeta(pVgHash, pNameHash, pMetaHash, NULL, pCatalog, &conn, &pName, &pTableMeta, pSW, + rawData, retry)); + char err[ERR_MSG_LEN] = {0}; code = rawBlockBindData(pQuery, pTableMeta, rawData, NULL, pSW, pSW->nCols, true, err, ERR_MSG_LEN, true); if (code != TSDB_CODE_SUCCESS) { SET_ERROR_MSG("table:%s, err:%s", pName.tname, err); @@ -1991,7 +1993,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen){ break; } - end: +end: uDebug(LOG_ID_TAG " write raw data return, msg:%s", LOG_ID_VALUE, tstrerror(code)); tDeleteMqDataRsp(&rspObj.dataRsp); tDecoderClear(&decoder); @@ -2001,15 +2003,15 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen){ } static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) { - int32_t code = TSDB_CODE_SUCCESS; - SQuery* pQuery = NULL; - SMqRspObj rspObj = {0}; - SDecoder decoder = {0}; - SHashObj* pCreateTbHash = NULL; + int32_t code = TSDB_CODE_SUCCESS; + SQuery* pQuery = NULL; + SMqRspObj rspObj = {0}; + SDecoder decoder = {0}; + SHashObj* pCreateTbHash = NULL; - SRequestObj* pRequest = NULL; - SCatalog* pCatalog = NULL; - SRequestConnInfo conn = {0}; + SRequestObj* pRequest = NULL; + SCatalog* pCatalog = NULL; + SRequestConnInfo conn = {0}; RAW_RETURN_CHECK(buildRawRequest(taos, &pRequest, &pCatalog, &conn)); uDebug(LOG_ID_TAG " write raw metadata, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen); @@ -2019,12 +2021,12 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) RAW_NULL_CHECK(pCreateTbHash); RAW_RETURN_CHECK(buildCreateTbMap(&rspObj.dataRsp, pCreateTbHash)); - SHashObj *pVgHash = NULL; - SHashObj *pNameHash = NULL; - SHashObj *pMetaHash = NULL; + SHashObj* pVgHash = NULL; + SHashObj* pNameHash = NULL; + SHashObj* pMetaHash = NULL; RAW_RETURN_CHECK(getRawCache(&pVgHash, &pNameHash, &pMetaHash, taos)); int retry = 0; - while(1){ + while (1) { RAW_RETURN_CHECK(smlInitHandle(&pQuery)); uDebug(LOG_ID_TAG " write raw meta data block num:%d", LOG_ID_VALUE, rspObj.dataRsp.blockNum); while (++rspObj.resIter < rspObj.dataRsp.blockNum) { @@ -2048,11 +2050,12 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) // find schema data info SVCreateTbReq* pCreateReqDst = (SVCreateTbReq*)taosHashGet(pCreateTbHash, pName.tname, strlen(pName.tname)); - STableMeta* pTableMeta = NULL; - RAW_RETURN_CHECK(processCacheMeta(pVgHash, pNameHash, pMetaHash, pCreateReqDst, pCatalog, &conn, - &pName, &pTableMeta, pSW, rawData, retry)); - char err[ERR_MSG_LEN] = {0}; - code = rawBlockBindData(pQuery, pTableMeta, rawData, pCreateReqDst, pSW, pSW->nCols, true, err, ERR_MSG_LEN, true); + STableMeta* pTableMeta = NULL; + RAW_RETURN_CHECK(processCacheMeta(pVgHash, pNameHash, pMetaHash, pCreateReqDst, pCatalog, &conn, &pName, + &pTableMeta, pSW, rawData, retry)); + char err[ERR_MSG_LEN] = {0}; + code = + rawBlockBindData(pQuery, pTableMeta, rawData, pCreateReqDst, pSW, pSW->nCols, true, err, ERR_MSG_LEN, true); if (code != TSDB_CODE_SUCCESS) { SET_ERROR_MSG("table:%s, err:%s", pName.tname, err); goto end; @@ -2147,7 +2150,7 @@ static void processBatchMetaToJson(SMqBatchMetaRsp* pMsgRsp, char** string) { *string = fullStr; return; - end: +end: cJSON_Delete(pJson); tDeleteMqBatchMetaRsp(&rsp); } @@ -2159,18 +2162,18 @@ char* tmq_get_json_meta(TAOS_RES* res) { return NULL; } - char* string = NULL; + char* string = NULL; SMqRspObj* rspObj = (SMqRspObj*)res; if (TD_RES_TMQ_METADATA(res)) { processAutoCreateTable(&rspObj->dataRsp, &string); } else if (TD_RES_TMQ_BATCH_META(res)) { processBatchMetaToJson(&rspObj->batchMetaRsp, &string); } else if (TD_RES_TMQ_META(res)) { - cJSON* pJson = NULL; + cJSON* pJson = NULL; processSimpleMeta(&rspObj->metaRsp, &pJson); string = cJSON_PrintUnformatted(pJson); cJSON_Delete(pJson); - } else{ + } else { uError("tmq_get_json_meta res:%d, invalid type", *(int8_t*)res); } @@ -2181,7 +2184,7 @@ char* tmq_get_json_meta(TAOS_RES* res) { void tmq_free_json_meta(char* jsonMeta) { taosMemoryFreeClear(jsonMeta); } static int32_t getOffSetLen(const SMqDataRsp* pRsp) { - SEncoder coder = {0}; + SEncoder coder = {0}; tEncoderInit(&coder, NULL, 0); if (tEncodeSTqOffsetVal(&coder, &pRsp->reqOffset) < 0) return -1; if (tEncodeSTqOffsetVal(&coder, &pRsp->rspOffset) < 0) return -1; @@ -2191,46 +2194,46 @@ static int32_t getOffSetLen(const SMqDataRsp* pRsp) { } typedef int32_t __encode_func__(SEncoder* pEncoder, const SMqDataRsp* pRsp); -static int32_t encodeMqDataRsp(__encode_func__* encodeFunc, SMqDataRsp* rspObj, tmq_raw_data* raw) { - int32_t len = 0; - int32_t code = 0; - SEncoder encoder = {0}; - void* buf = NULL; - tEncodeSize(encodeFunc, rspObj, len, code); - if (code < 0) { - code = TSDB_CODE_INVALID_MSG; - goto FAILED; +static int32_t encodeMqDataRsp(__encode_func__* encodeFunc, SMqDataRsp* rspObj, tmq_raw_data* raw) { + int32_t len = 0; + int32_t code = 0; + SEncoder encoder = {0}; + void* buf = NULL; + tEncodeSize(encodeFunc, rspObj, len, code); + if (code < 0) { + code = TSDB_CODE_INVALID_MSG; + goto FAILED; } - len += sizeof(int8_t) + sizeof(int32_t); - buf = taosMemoryCalloc(1, len); - if (buf == NULL) { - code = terrno; - goto FAILED; + len += sizeof(int8_t) + sizeof(int32_t); + buf = taosMemoryCalloc(1, len); + if (buf == NULL) { + code = terrno; + goto FAILED; } - tEncoderInit(&encoder, buf, len); - if (tEncodeI8(&encoder, MQ_DATA_RSP_VERSION) < 0) { - code = TSDB_CODE_INVALID_MSG; - goto FAILED; + tEncoderInit(&encoder, buf, len); + if (tEncodeI8(&encoder, MQ_DATA_RSP_VERSION) < 0) { + code = TSDB_CODE_INVALID_MSG; + goto FAILED; } - int32_t offsetLen = getOffSetLen(rspObj); - if (offsetLen <= 0) { - code = TSDB_CODE_INVALID_MSG; - goto FAILED; + int32_t offsetLen = getOffSetLen(rspObj); + if (offsetLen <= 0) { + code = TSDB_CODE_INVALID_MSG; + goto FAILED; } - if (tEncodeI32(&encoder, offsetLen) < 0) { - code = TSDB_CODE_INVALID_MSG; - goto FAILED; + if (tEncodeI32(&encoder, offsetLen) < 0) { + code = TSDB_CODE_INVALID_MSG; + goto FAILED; } - if (encodeFunc(&encoder, rspObj) < 0) { - code = TSDB_CODE_INVALID_MSG; - goto FAILED; + if (encodeFunc(&encoder, rspObj) < 0) { + code = TSDB_CODE_INVALID_MSG; + goto FAILED; } - tEncoderClear(&encoder); + tEncoderClear(&encoder); - raw->raw = buf; - raw->raw_len = len; - return code; - FAILED: + raw->raw = buf; + raw->raw_len = len; + return code; +FAILED: tEncoderClear(&encoder); taosMemoryFree(buf); return code; @@ -2247,7 +2250,7 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) { raw->raw_type = rspObj->metaRsp.resMsgType; uDebug("tmq get raw type meta:%p", raw); } else if (TD_RES_TMQ(res)) { - int32_t code = encodeMqDataRsp(tEncodeMqDataRsp, &rspObj->dataRsp, raw); + int32_t code = encodeMqDataRsp(tEncodeMqDataRsp, &rspObj->dataRsp, raw); if (code != 0) { uError("tmq get raw type error:%d", terrno); return code; @@ -2282,7 +2285,7 @@ void tmq_free_raw(tmq_raw_data raw) { (void)memset(terrMsg, 0, ERR_MSG_LEN); } -static int32_t writeRawInit(){ +static int32_t writeRawInit() { while (atomic_load_8(&initedFlag) == WRITE_RAW_INIT_START) { int8_t old = atomic_val_compare_exchange_8(&initFlag, 0, 1); if (old == 0) { @@ -2296,7 +2299,7 @@ static int32_t writeRawInit(){ } } - if (atomic_load_8(&initedFlag) == WRITE_RAW_INIT_FAIL){ + if (atomic_load_8(&initedFlag) == WRITE_RAW_INIT_FAIL) { return TSDB_CODE_INTERNAL_ERROR; } return 0; @@ -2321,7 +2324,7 @@ static int32_t writeRawImpl(TAOS* taos, void* buf, uint32_t len, uint16_t type) return taosDropTable(taos, buf, len); } else if (type == TDMT_VND_DELETE) { return taosDeleteData(taos, buf, len); - } else if (type == RES_TYPE__TMQ_METADATA){ + } else if (type == RES_TYPE__TMQ_METADATA) { return tmqWriteRawMetaDataImpl(taos, buf, len); } else if (type == RES_TYPE__TMQ) { return tmqWriteRawDataImpl(taos, buf, len); @@ -2344,9 +2347,9 @@ static int32_t tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, int32_t metaLen if (taos == NULL || meta == NULL) { return TSDB_CODE_INVALID_PARA; } - SMqBatchMetaRsp rsp = {0}; + SMqBatchMetaRsp rsp = {0}; SDecoder coder = {0}; - int32_t code = TSDB_CODE_SUCCESS; + int32_t code = TSDB_CODE_SUCCESS; // decode and process req tDecoderInit(&coder, meta, metaLen); @@ -2374,7 +2377,7 @@ static int32_t tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, int32_t metaLen } } - end: +end: tDeleteMqBatchMetaRsp(&rsp); return code; }