enh:[TS-5441] cost too long in tmq write meta data by cache meta and vg info
This commit is contained in:
parent
94a70c9389
commit
0100d383ee
|
@ -23,12 +23,12 @@
|
|||
#include "tglobal.h"
|
||||
#include "tmsgtype.h"
|
||||
|
||||
#define RAW_NULL_CHECK(c) \
|
||||
do { \
|
||||
if (c == NULL) { \
|
||||
code = terrno; \
|
||||
goto end; \
|
||||
} \
|
||||
#define RAW_NULL_CHECK(c) \
|
||||
do { \
|
||||
if (c == NULL) { \
|
||||
code = terrno; \
|
||||
goto end; \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#define RAW_FALSE_CHECK(c) \
|
||||
|
@ -52,7 +52,7 @@
|
|||
|
||||
#define TMQ_META_VERSION "1.0"
|
||||
|
||||
static int32_t tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, int32_t metaLen);
|
||||
static int32_t tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, int32_t metaLen);
|
||||
static tb_uid_t processSuid(tb_uid_t suid, char* db) { return suid + MurmurHash3_32(db, strlen(db)); }
|
||||
static void buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id, int8_t t,
|
||||
SColCmprWrapper* pColCmprRow, cJSON** pJson) {
|
||||
|
@ -163,7 +163,7 @@ static void buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* sche
|
|||
}
|
||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tags", tags));
|
||||
|
||||
end:
|
||||
end:
|
||||
*pJson = json;
|
||||
}
|
||||
|
||||
|
@ -197,7 +197,7 @@ static int32_t setCompressOption(cJSON* json, uint32_t para) {
|
|||
return code;
|
||||
}
|
||||
|
||||
end:
|
||||
end:
|
||||
return code;
|
||||
}
|
||||
static void buildAlterSTableJson(void* alterData, int32_t alterDataLen, cJSON** pJson) {
|
||||
|
@ -338,7 +338,7 @@ static void buildAlterSTableJson(void* alterData, int32_t alterDataLen, cJSON**
|
|||
break;
|
||||
}
|
||||
|
||||
end:
|
||||
end:
|
||||
tFreeSMAltertbReq(&req);
|
||||
*pJson = json;
|
||||
}
|
||||
|
@ -455,7 +455,7 @@ static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) {
|
|||
|
||||
cJSON* tvalue = NULL;
|
||||
if (IS_VAR_DATA_TYPE(pTagVal->type)) {
|
||||
char* buf = NULL;
|
||||
char* buf = NULL;
|
||||
int64_t bufSize = 0;
|
||||
if (pTagVal->type == TSDB_DATA_TYPE_VARBINARY) {
|
||||
bufSize = pTagVal->nData * 2 + 2 + 3;
|
||||
|
@ -485,7 +485,7 @@ static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) {
|
|||
RAW_FALSE_CHECK(cJSON_AddItemToArray(tags, tag));
|
||||
}
|
||||
|
||||
end:
|
||||
end:
|
||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tags", tags));
|
||||
taosArrayDestroy(pTagVals);
|
||||
}
|
||||
|
@ -514,7 +514,7 @@ static void buildCreateCTableJson(SVCreateTbReq* pCreateReq, int32_t nReqs, cJSO
|
|||
}
|
||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "createList", createList));
|
||||
|
||||
end:
|
||||
end:
|
||||
*pJson = json;
|
||||
}
|
||||
|
||||
|
@ -585,7 +585,7 @@ static void processAutoCreateTable(SMqDataRsp* rsp, char** string) {
|
|||
*string = cJSON_PrintUnformatted(pJson);
|
||||
cJSON_Delete(pJson);
|
||||
|
||||
end:
|
||||
end:
|
||||
uDebug("auto created table return, sql json:%s", *string);
|
||||
for (int i = 0; decoder && pCreateReq && i < rsp->createTableNum; i++) {
|
||||
tDecoderClear(&decoder[i]);
|
||||
|
@ -989,7 +989,7 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
|
|||
|
||||
code = pRequest->code;
|
||||
|
||||
end:
|
||||
end:
|
||||
uDebug(LOG_ID_TAG " create stable return, msg:%s", LOG_ID_VALUE, tstrerror(code));
|
||||
destroyRequest(pRequest);
|
||||
tFreeSMCreateStbReq(&pReq);
|
||||
|
@ -1023,9 +1023,9 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
|
|||
SCatalog* pCatalog = NULL;
|
||||
RAW_RETURN_CHECK(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog));
|
||||
SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
|
||||
.requestId = pRequest->requestId,
|
||||
.requestObjRefId = pRequest->self,
|
||||
.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp)};
|
||||
.requestId = pRequest->requestId,
|
||||
.requestObjRefId = pRequest->self,
|
||||
.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp)};
|
||||
SName pName = {0};
|
||||
toName(pRequest->pTscObj->acctId, pRequest->pDb, req.name, &pName);
|
||||
STableMeta* pTableMeta = NULL;
|
||||
|
@ -1088,7 +1088,7 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
|
|||
|
||||
code = pRequest->code;
|
||||
|
||||
end:
|
||||
end:
|
||||
uDebug(LOG_ID_TAG " drop stable return, msg:%s", LOG_ID_VALUE, tstrerror(code));
|
||||
destroyRequest(pRequest);
|
||||
tDecoderClear(&coder);
|
||||
|
@ -1142,9 +1142,9 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
|
|||
taosHashSetFreeFp(pVgroupHashmap, destroyCreateTbReqBatch);
|
||||
|
||||
SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
|
||||
.requestId = pRequest->requestId,
|
||||
.requestObjRefId = pRequest->self,
|
||||
.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
|
||||
.requestId = pRequest->requestId,
|
||||
.requestObjRefId = pRequest->self,
|
||||
.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
|
||||
|
||||
pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName));
|
||||
RAW_NULL_CHECK(pRequest->tableList);
|
||||
|
@ -1269,7 +1269,7 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
|
|||
|
||||
code = pRequest->code;
|
||||
|
||||
end:
|
||||
end:
|
||||
uDebug(LOG_ID_TAG " create table return, msg:%s", LOG_ID_VALUE, tstrerror(code));
|
||||
tDeleteSVCreateTbBatchReq(&req);
|
||||
|
||||
|
@ -1328,9 +1328,9 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
|
|||
taosHashSetFreeFp(pVgroupHashmap, destroyDropTbReqBatch);
|
||||
|
||||
SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
|
||||
.requestId = pRequest->requestId,
|
||||
.requestObjRefId = pRequest->self,
|
||||
.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
|
||||
.requestId = pRequest->requestId,
|
||||
.requestObjRefId = pRequest->self,
|
||||
.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
|
||||
pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName));
|
||||
RAW_NULL_CHECK(pRequest->tableList);
|
||||
// loop to create table
|
||||
|
@ -1395,7 +1395,7 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
|
|||
}
|
||||
code = pRequest->code;
|
||||
|
||||
end:
|
||||
end:
|
||||
uDebug(LOG_ID_TAG " drop table return, msg:%s", LOG_ID_VALUE, tstrerror(code));
|
||||
taosHashCleanup(pVgroupHashmap);
|
||||
destroyRequest(pRequest);
|
||||
|
@ -1433,7 +1433,7 @@ static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) {
|
|||
}
|
||||
taos_free_result(res);
|
||||
|
||||
end:
|
||||
end:
|
||||
uDebug("connId:0x%" PRIx64 " delete data sql:%s, code:%s", *(int64_t*)taos, sql, tstrerror(code));
|
||||
tDecoderClear(&coder);
|
||||
return code;
|
||||
|
@ -1473,9 +1473,9 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
|
|||
SCatalog* pCatalog = NULL;
|
||||
RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
|
||||
SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
|
||||
.requestId = pRequest->requestId,
|
||||
.requestObjRefId = pRequest->self,
|
||||
.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
|
||||
.requestId = pRequest->requestId,
|
||||
.requestObjRefId = pRequest->self,
|
||||
.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
|
||||
|
||||
SVgroupInfo pInfo = {0};
|
||||
SName pName = {0};
|
||||
|
@ -1543,7 +1543,7 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
|
|||
code = handleAlterTbExecRes(pRes->res, pCatalog);
|
||||
}
|
||||
}
|
||||
end:
|
||||
end:
|
||||
uDebug(LOG_ID_TAG " alter table return, meta:%p, len:%d, msg:%s", LOG_ID_VALUE, meta, metaLen, tstrerror(code));
|
||||
taosArrayDestroy(pArray);
|
||||
if (pVgData) taosMemoryFreeClear(pVgData->pData);
|
||||
|
@ -1608,7 +1608,7 @@ int taos_write_raw_block_with_fields_with_reqid(TAOS* taos, int rows, char* pDat
|
|||
launchQueryImpl(pRequest, pQuery, true, NULL);
|
||||
code = pRequest->code;
|
||||
|
||||
end:
|
||||
end:
|
||||
uDebug(LOG_ID_TAG " write raw block with field return, msg:%s", LOG_ID_VALUE, tstrerror(code));
|
||||
taosMemoryFreeClear(pTableMeta);
|
||||
qDestroyQuery(pQuery);
|
||||
|
@ -1668,7 +1668,7 @@ int taos_write_raw_block_with_reqid(TAOS* taos, int rows, char* pData, const cha
|
|||
launchQueryImpl(pRequest, pQuery, true, NULL);
|
||||
code = pRequest->code;
|
||||
|
||||
end:
|
||||
end:
|
||||
uDebug(LOG_ID_TAG " write raw block return, msg:%s", LOG_ID_VALUE, tstrerror(code));
|
||||
taosMemoryFreeClear(pTableMeta);
|
||||
qDestroyQuery(pQuery);
|
||||
|
@ -1708,7 +1708,8 @@ static int32_t buildCreateTbMap(SMqDataRsp* rsp, SHashObj* pHashObj) {
|
|||
goto end;
|
||||
}
|
||||
if (taosHashGet(pHashObj, pCreateReq.name, strlen(pCreateReq.name)) == NULL) {
|
||||
RAW_RETURN_CHECK(taosHashPut(pHashObj, pCreateReq.name, strlen(pCreateReq.name), &pCreateReq, sizeof(SVCreateTbReq)));
|
||||
RAW_RETURN_CHECK(
|
||||
taosHashPut(pHashObj, pCreateReq.name, strlen(pCreateReq.name), &pCreateReq, sizeof(SVCreateTbReq)));
|
||||
} else {
|
||||
tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
|
||||
pCreateReq = (SVCreateTbReq){0};
|
||||
|
@ -1734,34 +1735,34 @@ static SHashObj* writeRawCache = NULL;
|
|||
static int8_t initFlag = 0;
|
||||
static int8_t initedFlag = WRITE_RAW_INIT_START;
|
||||
|
||||
typedef struct{
|
||||
SHashObj* pVgHash;
|
||||
SHashObj* pNameHash;
|
||||
SHashObj* pMetaHash;
|
||||
}rawCacheInfo;
|
||||
typedef struct {
|
||||
SHashObj* pVgHash;
|
||||
SHashObj* pNameHash;
|
||||
SHashObj* pMetaHash;
|
||||
} rawCacheInfo;
|
||||
|
||||
typedef struct{
|
||||
typedef struct {
|
||||
SVgroupInfo vgInfo;
|
||||
int64_t uid;
|
||||
int64_t suid;
|
||||
}tbInfo;
|
||||
} tbInfo;
|
||||
|
||||
static void tmqFreeMeta(void *data){
|
||||
static void tmqFreeMeta(void* data) {
|
||||
STableMeta* pTableMeta = *(STableMeta**)data;
|
||||
taosMemoryFree(pTableMeta);
|
||||
}
|
||||
|
||||
static void freeRawCache(void *data) {
|
||||
static void freeRawCache(void* data) {
|
||||
rawCacheInfo* pRawCache = (rawCacheInfo*)data;
|
||||
taosHashCleanup(pRawCache->pMetaHash);
|
||||
taosHashCleanup(pRawCache->pNameHash);
|
||||
taosHashCleanup(pRawCache->pVgHash);
|
||||
}
|
||||
|
||||
static int32_t initRawCacheHash(){
|
||||
if (writeRawCache == NULL){
|
||||
static int32_t initRawCacheHash() {
|
||||
if (writeRawCache == NULL) {
|
||||
writeRawCache = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
|
||||
if (writeRawCache == NULL){
|
||||
if (writeRawCache == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
taosHashSetFreeFp(writeRawCache, freeRawCache);
|
||||
|
@ -1769,7 +1770,7 @@ static int32_t initRawCacheHash(){
|
|||
return 0;
|
||||
}
|
||||
|
||||
static bool needRefreshMeta(void* rawData, STableMeta* pTableMeta, SSchemaWrapper* pSW){
|
||||
static bool needRefreshMeta(void* rawData, STableMeta* pTableMeta, SSchemaWrapper* pSW) {
|
||||
char* p = (char*)rawData;
|
||||
// | version | total length | total rows | blankFill | total columns | flag seg| block group id | column schema | each
|
||||
// column length |
|
||||
|
@ -1799,16 +1800,15 @@ static bool needRefreshMeta(void* rawData, STableMeta* pTableMeta, SSchemaWrappe
|
|||
}
|
||||
fields += sizeof(int8_t) + sizeof(int32_t);
|
||||
|
||||
if (j == pTableMeta->tableInfo.numOfColumns)
|
||||
return true;
|
||||
if (j == pTableMeta->tableInfo.numOfColumns) return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
static int32_t getRawCache(SHashObj **pVgHash, SHashObj **pNameHash, SHashObj **pMetaHash, void *key) {
|
||||
static int32_t getRawCache(SHashObj** pVgHash, SHashObj** pNameHash, SHashObj** pMetaHash, void* key) {
|
||||
int32_t code = 0;
|
||||
void* cacheInfo = taosHashGet(writeRawCache, &key, POINTER_BYTES);
|
||||
if (cacheInfo == NULL){
|
||||
void* cacheInfo = taosHashGet(writeRawCache, &key, POINTER_BYTES);
|
||||
if (cacheInfo == NULL) {
|
||||
*pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
|
||||
RAW_NULL_CHECK(*pVgHash);
|
||||
*pNameHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||
|
@ -1819,7 +1819,7 @@ static int32_t getRawCache(SHashObj **pVgHash, SHashObj **pNameHash, SHashObj **
|
|||
rawCacheInfo info = {*pVgHash, *pNameHash, *pMetaHash};
|
||||
RAW_RETURN_CHECK(taosHashPut(writeRawCache, &key, POINTER_BYTES, &info, sizeof(rawCacheInfo)));
|
||||
} else {
|
||||
rawCacheInfo *info = (rawCacheInfo *)cacheInfo;
|
||||
rawCacheInfo* info = (rawCacheInfo*)cacheInfo;
|
||||
*pVgHash = info->pVgHash;
|
||||
*pNameHash = info->pNameHash;
|
||||
*pMetaHash = info->pMetaHash;
|
||||
|
@ -1833,7 +1833,7 @@ end:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t buildRawRequest(TAOS* taos, SRequestObj** pRequest, SCatalog** pCatalog, SRequestConnInfo *conn){
|
||||
static int32_t buildRawRequest(TAOS* taos, SRequestObj** pRequest, SCatalog** pCatalog, SRequestConnInfo* conn) {
|
||||
int32_t code = 0;
|
||||
RAW_RETURN_CHECK(createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0, pRequest));
|
||||
(*pRequest)->syncQuery = true;
|
||||
|
@ -1852,29 +1852,30 @@ end:
|
|||
return code;
|
||||
}
|
||||
|
||||
typedef int32_t _raw_decode_func_(SDecoder *pDecoder, SMqDataRsp *pRsp);
|
||||
static int32_t decodeRawData(SDecoder *decoder, void* data, int32_t dataLen, _raw_decode_func_ func, SMqRspObj* rspObj){
|
||||
int8_t dataVersion = *(int8_t*)data;
|
||||
if (dataVersion >= MQ_DATA_RSP_VERSION) {
|
||||
data = POINTER_SHIFT(data, sizeof(int8_t) + sizeof(int32_t));
|
||||
dataLen -= sizeof(int8_t) + sizeof(int32_t);
|
||||
typedef int32_t _raw_decode_func_(SDecoder* pDecoder, SMqDataRsp* pRsp);
|
||||
static int32_t decodeRawData(SDecoder* decoder, void* data, int32_t dataLen, _raw_decode_func_ func,
|
||||
SMqRspObj* rspObj) {
|
||||
int8_t dataVersion = *(int8_t*)data;
|
||||
if (dataVersion >= MQ_DATA_RSP_VERSION) {
|
||||
data = POINTER_SHIFT(data, sizeof(int8_t) + sizeof(int32_t));
|
||||
dataLen -= sizeof(int8_t) + sizeof(int32_t);
|
||||
}
|
||||
|
||||
rspObj->resIter = -1;
|
||||
tDecoderInit(decoder, data, dataLen);
|
||||
int32_t code = func(decoder, &rspObj->dataRsp);
|
||||
if (code != 0) {
|
||||
SET_ERROR_MSG("decode mq taosx data rsp failed");
|
||||
rspObj->resIter = -1;
|
||||
tDecoderInit(decoder, data, dataLen);
|
||||
int32_t code = func(decoder, &rspObj->dataRsp);
|
||||
if (code != 0) {
|
||||
SET_ERROR_MSG("decode mq taosx data rsp failed");
|
||||
}
|
||||
return code;
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t processCacheMeta(SHashObj *pVgHash, SHashObj *pNameHash, SHashObj *pMetaHash, SVCreateTbReq* pCreateReqDst,
|
||||
SCatalog* pCatalog, SRequestConnInfo* conn, SName* pName,
|
||||
STableMeta** pMeta, SSchemaWrapper* pSW, void* rawData, int32_t retry){
|
||||
int32_t code = 0;
|
||||
static int32_t processCacheMeta(SHashObj* pVgHash, SHashObj* pNameHash, SHashObj* pMetaHash,
|
||||
SVCreateTbReq* pCreateReqDst, SCatalog* pCatalog, SRequestConnInfo* conn, SName* pName,
|
||||
STableMeta** pMeta, SSchemaWrapper* pSW, void* rawData, int32_t retry) {
|
||||
int32_t code = 0;
|
||||
STableMeta* pTableMeta = NULL;
|
||||
tbInfo* tmpInfo = (tbInfo*)taosHashGet(pNameHash, pName->tname, strlen(pName->tname));
|
||||
tbInfo* tmpInfo = (tbInfo*)taosHashGet(pNameHash, pName->tname, strlen(pName->tname));
|
||||
if (tmpInfo == NULL || retry > 0) {
|
||||
tbInfo info = {0};
|
||||
|
||||
|
@ -1884,13 +1885,13 @@ static int32_t processCacheMeta(SHashObj *pVgHash, SHashObj *pNameHash, SHashObj
|
|||
}
|
||||
RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, conn, pName, &pTableMeta));
|
||||
info.uid = pTableMeta->uid;
|
||||
if (pTableMeta->tableType == TSDB_CHILD_TABLE){
|
||||
if (pTableMeta->tableType == TSDB_CHILD_TABLE) {
|
||||
info.suid = pTableMeta->suid;
|
||||
} else {
|
||||
info.suid = pTableMeta->uid;
|
||||
}
|
||||
code = taosHashPut(pMetaHash, &info.suid, LONG_BYTES, &pTableMeta, POINTER_BYTES);
|
||||
if (code != 0){
|
||||
if (code != 0) {
|
||||
taosMemoryFree(pTableMeta);
|
||||
goto end;
|
||||
}
|
||||
|
@ -1902,20 +1903,21 @@ static int32_t processCacheMeta(SHashObj *pVgHash, SHashObj *pNameHash, SHashObj
|
|||
|
||||
RAW_RETURN_CHECK(taosHashPut(pNameHash, pName->tname, strlen(pName->tname), &info, sizeof(tbInfo)));
|
||||
tmpInfo = (tbInfo*)taosHashGet(pNameHash, pName->tname, strlen(pName->tname));
|
||||
RAW_RETURN_CHECK(taosHashPut(pVgHash, &info.vgInfo.vgId, sizeof(info.vgInfo.vgId), &info.vgInfo, sizeof(SVgroupInfo)));
|
||||
RAW_RETURN_CHECK(
|
||||
taosHashPut(pVgHash, &info.vgInfo.vgId, sizeof(info.vgInfo.vgId), &info.vgInfo, sizeof(SVgroupInfo)));
|
||||
}
|
||||
|
||||
if (pTableMeta == NULL || retry > 0){
|
||||
if (pTableMeta == NULL || retry > 0) {
|
||||
STableMeta** pTableMetaTmp = (STableMeta**)taosHashGet(pMetaHash, &tmpInfo->suid, LONG_BYTES);
|
||||
if (pTableMetaTmp == NULL || retry > 0 || needRefreshMeta(rawData, *pTableMetaTmp, pSW)) {
|
||||
RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, conn, pName, &pTableMeta));
|
||||
code = taosHashPut(pMetaHash, &tmpInfo->suid, LONG_BYTES, &pTableMeta, POINTER_BYTES);
|
||||
if (code != 0){
|
||||
if (code != 0) {
|
||||
taosMemoryFree(pTableMeta);
|
||||
goto end;
|
||||
}
|
||||
|
||||
}else{
|
||||
} else {
|
||||
pTableMeta = *pTableMetaTmp;
|
||||
pTableMeta->uid = tmpInfo->uid;
|
||||
pTableMeta->vgId = tmpInfo->vgInfo.vgId;
|
||||
|
@ -1927,25 +1929,25 @@ end:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen){
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SQuery* pQuery = NULL;
|
||||
SMqRspObj rspObj = {0};
|
||||
SDecoder decoder = {0};
|
||||
static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SQuery* pQuery = NULL;
|
||||
SMqRspObj rspObj = {0};
|
||||
SDecoder decoder = {0};
|
||||
|
||||
SRequestObj* pRequest = NULL;
|
||||
SCatalog* pCatalog = NULL;
|
||||
SRequestConnInfo conn = {0};
|
||||
SRequestObj* pRequest = NULL;
|
||||
SCatalog* pCatalog = NULL;
|
||||
SRequestConnInfo conn = {0};
|
||||
RAW_RETURN_CHECK(buildRawRequest(taos, &pRequest, &pCatalog, &conn));
|
||||
uDebug(LOG_ID_TAG " write raw data, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
|
||||
RAW_RETURN_CHECK(decodeRawData(&decoder, data, dataLen, tDecodeMqDataRsp, &rspObj));
|
||||
|
||||
SHashObj *pVgHash = NULL;
|
||||
SHashObj *pNameHash = NULL;
|
||||
SHashObj *pMetaHash = NULL;
|
||||
SHashObj* pVgHash = NULL;
|
||||
SHashObj* pNameHash = NULL;
|
||||
SHashObj* pMetaHash = NULL;
|
||||
RAW_RETURN_CHECK(getRawCache(&pVgHash, &pNameHash, &pMetaHash, taos));
|
||||
int retry = 0;
|
||||
while(1){
|
||||
while (1) {
|
||||
RAW_RETURN_CHECK(smlInitHandle(&pQuery));
|
||||
uDebug(LOG_ID_TAG " write raw meta data block num:%d", LOG_ID_VALUE, rspObj.dataRsp.blockNum);
|
||||
while (++rspObj.resIter < rspObj.dataRsp.blockNum) {
|
||||
|
@ -1968,9 +1970,9 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen){
|
|||
tstrncpy(pName.tname, tbName, TSDB_TABLE_NAME_LEN);
|
||||
|
||||
STableMeta* pTableMeta = NULL;
|
||||
RAW_RETURN_CHECK(processCacheMeta(pVgHash, pNameHash, pMetaHash, NULL, pCatalog, &conn,
|
||||
&pName, &pTableMeta, pSW, rawData, retry));
|
||||
char err[ERR_MSG_LEN] = {0};
|
||||
RAW_RETURN_CHECK(processCacheMeta(pVgHash, pNameHash, pMetaHash, NULL, pCatalog, &conn, &pName, &pTableMeta, pSW,
|
||||
rawData, retry));
|
||||
char err[ERR_MSG_LEN] = {0};
|
||||
code = rawBlockBindData(pQuery, pTableMeta, rawData, NULL, pSW, pSW->nCols, true, err, ERR_MSG_LEN, true);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
SET_ERROR_MSG("table:%s, err:%s", pName.tname, err);
|
||||
|
@ -1991,7 +1993,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen){
|
|||
break;
|
||||
}
|
||||
|
||||
end:
|
||||
end:
|
||||
uDebug(LOG_ID_TAG " write raw data return, msg:%s", LOG_ID_VALUE, tstrerror(code));
|
||||
tDeleteMqDataRsp(&rspObj.dataRsp);
|
||||
tDecoderClear(&decoder);
|
||||
|
@ -2001,15 +2003,15 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen){
|
|||
}
|
||||
|
||||
static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SQuery* pQuery = NULL;
|
||||
SMqRspObj rspObj = {0};
|
||||
SDecoder decoder = {0};
|
||||
SHashObj* pCreateTbHash = NULL;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SQuery* pQuery = NULL;
|
||||
SMqRspObj rspObj = {0};
|
||||
SDecoder decoder = {0};
|
||||
SHashObj* pCreateTbHash = NULL;
|
||||
|
||||
SRequestObj* pRequest = NULL;
|
||||
SCatalog* pCatalog = NULL;
|
||||
SRequestConnInfo conn = {0};
|
||||
SRequestObj* pRequest = NULL;
|
||||
SCatalog* pCatalog = NULL;
|
||||
SRequestConnInfo conn = {0};
|
||||
|
||||
RAW_RETURN_CHECK(buildRawRequest(taos, &pRequest, &pCatalog, &conn));
|
||||
uDebug(LOG_ID_TAG " write raw metadata, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
|
||||
|
@ -2019,12 +2021,12 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
|
|||
RAW_NULL_CHECK(pCreateTbHash);
|
||||
RAW_RETURN_CHECK(buildCreateTbMap(&rspObj.dataRsp, pCreateTbHash));
|
||||
|
||||
SHashObj *pVgHash = NULL;
|
||||
SHashObj *pNameHash = NULL;
|
||||
SHashObj *pMetaHash = NULL;
|
||||
SHashObj* pVgHash = NULL;
|
||||
SHashObj* pNameHash = NULL;
|
||||
SHashObj* pMetaHash = NULL;
|
||||
RAW_RETURN_CHECK(getRawCache(&pVgHash, &pNameHash, &pMetaHash, taos));
|
||||
int retry = 0;
|
||||
while(1){
|
||||
while (1) {
|
||||
RAW_RETURN_CHECK(smlInitHandle(&pQuery));
|
||||
uDebug(LOG_ID_TAG " write raw meta data block num:%d", LOG_ID_VALUE, rspObj.dataRsp.blockNum);
|
||||
while (++rspObj.resIter < rspObj.dataRsp.blockNum) {
|
||||
|
@ -2048,11 +2050,12 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
|
|||
|
||||
// find schema data info
|
||||
SVCreateTbReq* pCreateReqDst = (SVCreateTbReq*)taosHashGet(pCreateTbHash, pName.tname, strlen(pName.tname));
|
||||
STableMeta* pTableMeta = NULL;
|
||||
RAW_RETURN_CHECK(processCacheMeta(pVgHash, pNameHash, pMetaHash, pCreateReqDst, pCatalog, &conn,
|
||||
&pName, &pTableMeta, pSW, rawData, retry));
|
||||
char err[ERR_MSG_LEN] = {0};
|
||||
code = rawBlockBindData(pQuery, pTableMeta, rawData, pCreateReqDst, pSW, pSW->nCols, true, err, ERR_MSG_LEN, true);
|
||||
STableMeta* pTableMeta = NULL;
|
||||
RAW_RETURN_CHECK(processCacheMeta(pVgHash, pNameHash, pMetaHash, pCreateReqDst, pCatalog, &conn, &pName,
|
||||
&pTableMeta, pSW, rawData, retry));
|
||||
char err[ERR_MSG_LEN] = {0};
|
||||
code =
|
||||
rawBlockBindData(pQuery, pTableMeta, rawData, pCreateReqDst, pSW, pSW->nCols, true, err, ERR_MSG_LEN, true);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
SET_ERROR_MSG("table:%s, err:%s", pName.tname, err);
|
||||
goto end;
|
||||
|
@ -2147,7 +2150,7 @@ static void processBatchMetaToJson(SMqBatchMetaRsp* pMsgRsp, char** string) {
|
|||
*string = fullStr;
|
||||
return;
|
||||
|
||||
end:
|
||||
end:
|
||||
cJSON_Delete(pJson);
|
||||
tDeleteMqBatchMetaRsp(&rsp);
|
||||
}
|
||||
|
@ -2159,18 +2162,18 @@ char* tmq_get_json_meta(TAOS_RES* res) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
char* string = NULL;
|
||||
char* string = NULL;
|
||||
SMqRspObj* rspObj = (SMqRspObj*)res;
|
||||
if (TD_RES_TMQ_METADATA(res)) {
|
||||
processAutoCreateTable(&rspObj->dataRsp, &string);
|
||||
} else if (TD_RES_TMQ_BATCH_META(res)) {
|
||||
processBatchMetaToJson(&rspObj->batchMetaRsp, &string);
|
||||
} else if (TD_RES_TMQ_META(res)) {
|
||||
cJSON* pJson = NULL;
|
||||
cJSON* pJson = NULL;
|
||||
processSimpleMeta(&rspObj->metaRsp, &pJson);
|
||||
string = cJSON_PrintUnformatted(pJson);
|
||||
cJSON_Delete(pJson);
|
||||
} else{
|
||||
} else {
|
||||
uError("tmq_get_json_meta res:%d, invalid type", *(int8_t*)res);
|
||||
}
|
||||
|
||||
|
@ -2181,7 +2184,7 @@ char* tmq_get_json_meta(TAOS_RES* res) {
|
|||
void tmq_free_json_meta(char* jsonMeta) { taosMemoryFreeClear(jsonMeta); }
|
||||
|
||||
static int32_t getOffSetLen(const SMqDataRsp* pRsp) {
|
||||
SEncoder coder = {0};
|
||||
SEncoder coder = {0};
|
||||
tEncoderInit(&coder, NULL, 0);
|
||||
if (tEncodeSTqOffsetVal(&coder, &pRsp->reqOffset) < 0) return -1;
|
||||
if (tEncodeSTqOffsetVal(&coder, &pRsp->rspOffset) < 0) return -1;
|
||||
|
@ -2191,46 +2194,46 @@ static int32_t getOffSetLen(const SMqDataRsp* pRsp) {
|
|||
}
|
||||
|
||||
typedef int32_t __encode_func__(SEncoder* pEncoder, const SMqDataRsp* pRsp);
|
||||
static int32_t encodeMqDataRsp(__encode_func__* encodeFunc, SMqDataRsp* rspObj, tmq_raw_data* raw) {
|
||||
int32_t len = 0;
|
||||
int32_t code = 0;
|
||||
SEncoder encoder = {0};
|
||||
void* buf = NULL;
|
||||
tEncodeSize(encodeFunc, rspObj, len, code);
|
||||
if (code < 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
goto FAILED;
|
||||
static int32_t encodeMqDataRsp(__encode_func__* encodeFunc, SMqDataRsp* rspObj, tmq_raw_data* raw) {
|
||||
int32_t len = 0;
|
||||
int32_t code = 0;
|
||||
SEncoder encoder = {0};
|
||||
void* buf = NULL;
|
||||
tEncodeSize(encodeFunc, rspObj, len, code);
|
||||
if (code < 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
goto FAILED;
|
||||
}
|
||||
len += sizeof(int8_t) + sizeof(int32_t);
|
||||
buf = taosMemoryCalloc(1, len);
|
||||
if (buf == NULL) {
|
||||
code = terrno;
|
||||
goto FAILED;
|
||||
len += sizeof(int8_t) + sizeof(int32_t);
|
||||
buf = taosMemoryCalloc(1, len);
|
||||
if (buf == NULL) {
|
||||
code = terrno;
|
||||
goto FAILED;
|
||||
}
|
||||
tEncoderInit(&encoder, buf, len);
|
||||
if (tEncodeI8(&encoder, MQ_DATA_RSP_VERSION) < 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
goto FAILED;
|
||||
tEncoderInit(&encoder, buf, len);
|
||||
if (tEncodeI8(&encoder, MQ_DATA_RSP_VERSION) < 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
goto FAILED;
|
||||
}
|
||||
int32_t offsetLen = getOffSetLen(rspObj);
|
||||
if (offsetLen <= 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
goto FAILED;
|
||||
int32_t offsetLen = getOffSetLen(rspObj);
|
||||
if (offsetLen <= 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
goto FAILED;
|
||||
}
|
||||
if (tEncodeI32(&encoder, offsetLen) < 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
goto FAILED;
|
||||
if (tEncodeI32(&encoder, offsetLen) < 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
goto FAILED;
|
||||
}
|
||||
if (encodeFunc(&encoder, rspObj) < 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
goto FAILED;
|
||||
if (encodeFunc(&encoder, rspObj) < 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
goto FAILED;
|
||||
}
|
||||
tEncoderClear(&encoder);
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
raw->raw = buf;
|
||||
raw->raw_len = len;
|
||||
return code;
|
||||
FAILED:
|
||||
raw->raw = buf;
|
||||
raw->raw_len = len;
|
||||
return code;
|
||||
FAILED:
|
||||
tEncoderClear(&encoder);
|
||||
taosMemoryFree(buf);
|
||||
return code;
|
||||
|
@ -2247,7 +2250,7 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) {
|
|||
raw->raw_type = rspObj->metaRsp.resMsgType;
|
||||
uDebug("tmq get raw type meta:%p", raw);
|
||||
} else if (TD_RES_TMQ(res)) {
|
||||
int32_t code = encodeMqDataRsp(tEncodeMqDataRsp, &rspObj->dataRsp, raw);
|
||||
int32_t code = encodeMqDataRsp(tEncodeMqDataRsp, &rspObj->dataRsp, raw);
|
||||
if (code != 0) {
|
||||
uError("tmq get raw type error:%d", terrno);
|
||||
return code;
|
||||
|
@ -2282,7 +2285,7 @@ void tmq_free_raw(tmq_raw_data raw) {
|
|||
(void)memset(terrMsg, 0, ERR_MSG_LEN);
|
||||
}
|
||||
|
||||
static int32_t writeRawInit(){
|
||||
static int32_t writeRawInit() {
|
||||
while (atomic_load_8(&initedFlag) == WRITE_RAW_INIT_START) {
|
||||
int8_t old = atomic_val_compare_exchange_8(&initFlag, 0, 1);
|
||||
if (old == 0) {
|
||||
|
@ -2296,7 +2299,7 @@ static int32_t writeRawInit(){
|
|||
}
|
||||
}
|
||||
|
||||
if (atomic_load_8(&initedFlag) == WRITE_RAW_INIT_FAIL){
|
||||
if (atomic_load_8(&initedFlag) == WRITE_RAW_INIT_FAIL) {
|
||||
return TSDB_CODE_INTERNAL_ERROR;
|
||||
}
|
||||
return 0;
|
||||
|
@ -2321,7 +2324,7 @@ static int32_t writeRawImpl(TAOS* taos, void* buf, uint32_t len, uint16_t type)
|
|||
return taosDropTable(taos, buf, len);
|
||||
} else if (type == TDMT_VND_DELETE) {
|
||||
return taosDeleteData(taos, buf, len);
|
||||
} else if (type == RES_TYPE__TMQ_METADATA){
|
||||
} else if (type == RES_TYPE__TMQ_METADATA) {
|
||||
return tmqWriteRawMetaDataImpl(taos, buf, len);
|
||||
} else if (type == RES_TYPE__TMQ) {
|
||||
return tmqWriteRawDataImpl(taos, buf, len);
|
||||
|
@ -2344,9 +2347,9 @@ static int32_t tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, int32_t metaLen
|
|||
if (taos == NULL || meta == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
SMqBatchMetaRsp rsp = {0};
|
||||
SMqBatchMetaRsp rsp = {0};
|
||||
SDecoder coder = {0};
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
// decode and process req
|
||||
tDecoderInit(&coder, meta, metaLen);
|
||||
|
@ -2374,7 +2377,7 @@ static int32_t tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, int32_t metaLen
|
|||
}
|
||||
}
|
||||
|
||||
end:
|
||||
end:
|
||||
tDeleteMqBatchMetaRsp(&rsp);
|
||||
return code;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue