enh:[TS-5441] cost too long in tmq write meta data

This commit is contained in:
wangmm0220 2024-09-24 18:54:52 +08:00
parent cf75cf4fe2
commit f84fe05a67
1 changed files with 55 additions and 41 deletions

View File

@ -26,7 +26,7 @@
#define RAW_NULL_CHECK(c) \
do { \
if (c == NULL) { \
code = TSDB_CODE_OUT_OF_MEMORY; \
code = terrno; \
goto end; \
} \
} while (0)
@ -1780,6 +1780,42 @@ end:
return code;
}
static int32_t buildCreateTbMap(STaosxRsp* rsp, SHashObj* pHashObj) {
// find schema data info
int32_t code = 0;
SVCreateTbReq pCreateReq = {0};
SDecoder decoderTmp = {0};
for (int j = 0; j < rsp->createTableNum; j++) {
void** dataTmp = taosArrayGet(rsp->createTableReq, j);
RAW_NULL_CHECK(dataTmp);
int32_t* lenTmp = taosArrayGet(rsp->createTableLen, j);
RAW_NULL_CHECK(dataTmp);
tDecoderInit(&decoderTmp, *dataTmp, *lenTmp);
RAW_RETURN_CHECK (tDecodeSVCreateTbReq(&decoderTmp, &pCreateReq));
if (pCreateReq.type != TSDB_CHILD_TABLE) {
code = TSDB_CODE_INVALID_MSG;
goto end;
}
if (taosHashGet(pHashObj, pCreateReq.name, strlen(pCreateReq.name)) == NULL){
RAW_RETURN_CHECK(taosHashPut(pHashObj, pCreateReq.name, strlen(pCreateReq.name), &pCreateReq, sizeof(SVCreateTbReq)));
} else{
tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
pCreateReq = (SVCreateTbReq){0};
}
tDecoderClear(&decoderTmp);
}
return 0;
end:
tDecoderClear(&decoderTmp);
tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
return code;
}
static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) {
if (taos == NULL || data == NULL) {
SET_ERROR_MSG("taos:%p or data:%p is NULL", taos, data);
@ -1787,11 +1823,11 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
}
int32_t code = TSDB_CODE_SUCCESS;
SHashObj* pVgHash = NULL;
SHashObj* pCreateTbHash = NULL;
SQuery* pQuery = NULL;
SMqTaosxRspObj rspObj = {0};
SDecoder decoder = {0};
STableMeta* pTableMeta = NULL;
SVCreateTbReq* pCreateReqDst = NULL;
SRequestObj* pRequest = NULL;
RAW_RETURN_CHECK(createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0, &pRequest));
@ -1832,6 +1868,11 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
RAW_RETURN_CHECK(smlInitHandle(&pQuery));
pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
RAW_NULL_CHECK(pVgHash);
pCreateTbHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
RAW_NULL_CHECK(pCreateTbHash);
taosHashSetFreeFp(pCreateTbHash, (_hash_free_fn_t)tdDestroySVCreateTbReq);
RAW_RETURN_CHECK(buildCreateTbMap(&rspObj.rsp, pCreateTbHash));
uDebug(LOG_ID_TAG " write raw metadata block num:%d", LOG_ID_VALUE, rspObj.rsp.common.blockNum);
while (++rspObj.common.resIter < rspObj.rsp.common.blockNum) {
@ -1854,40 +1895,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
(void)strcpy(pName.tname, tbName);
// find schema data info
for (int j = 0; j < rspObj.rsp.createTableNum; j++) {
void** dataTmp = taosArrayGet(rspObj.rsp.createTableReq, j);
RAW_NULL_CHECK(dataTmp);
int32_t* lenTmp = taosArrayGet(rspObj.rsp.createTableLen, j);
RAW_NULL_CHECK(dataTmp);
SDecoder decoderTmp = {0};
SVCreateTbReq pCreateReq = {0};
tDecoderInit(&decoderTmp, *dataTmp, *lenTmp);
if (tDecodeSVCreateTbReq(&decoderTmp, &pCreateReq) < 0) {
tDecoderClear(&decoderTmp);
tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
code = TSDB_CODE_TMQ_INVALID_MSG;
SET_ERROR_MSG("decode create table:%s req failed", tbName);
goto end;
}
if (pCreateReq.type != TSDB_CHILD_TABLE) {
code = TSDB_CODE_TSC_INVALID_VALUE;
tDecoderClear(&decoderTmp);
tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
SET_ERROR_MSG("create table req type is not child table: %s, type: %d", tbName, pCreateReq.type);
goto end;
}
if (strcmp(tbName, pCreateReq.name) == 0) {
RAW_RETURN_CHECK(cloneSVreateTbReq(&pCreateReq, &pCreateReqDst));
tDecoderClear(&decoderTmp);
tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
break;
}
tDecoderClear(&decoderTmp);
tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
}
SVCreateTbReq* pCreateReqDst = (SVCreateTbReq*)taosHashGet(pCreateTbHash, tbName, strlen(tbName));
SVgroupInfo vg = {0};
RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vg));
if (pCreateReqDst) { // change stable name to get meta
@ -1920,7 +1928,15 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
}
void* rawData = getRawDataFromRes(pRetrieve);
char err[ERR_MSG_LEN] = {0};
code = rawBlockBindData(pQuery, pTableMeta, rawData, &pCreateReqDst, fields, pSW->nCols, true, err, ERR_MSG_LEN);
SVCreateTbReq* pCreateReqTmp = NULL;
if (pCreateReqDst){
RAW_RETURN_CHECK(cloneSVreateTbReq(pCreateReqDst, &pCreateReqTmp));
}
code = rawBlockBindData(pQuery, pTableMeta, rawData, &pCreateReqTmp, fields, pSW->nCols, true, err, ERR_MSG_LEN);
if (pCreateReqTmp != NULL) {
tdDestroySVCreateTbReq(pCreateReqTmp);
taosMemoryFree(pCreateReqTmp);
}
taosMemoryFree(fields);
taosMemoryFreeClear(pTableMeta);
if (code != TSDB_CODE_SUCCESS) {
@ -1936,16 +1952,14 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
end:
uDebug(LOG_ID_TAG " write raw metadata return, msg:%s", LOG_ID_VALUE, tstrerror(code));
taosHashCleanup(pCreateTbHash);
tDeleteSTaosxRsp(&rspObj.rsp);
tDecoderClear(&decoder);
qDestroyQuery(pQuery);
destroyRequest(pRequest);
taosHashCleanup(pVgHash);
taosMemoryFreeClear(pTableMeta);
if (pCreateReqDst) {
tdDestroySVCreateTbReq(pCreateReqDst);
taosMemoryFree(pCreateReqDst);
}
return code;
}