enh:[TS-5441] cost too long in tmq write meta data
This commit is contained in:
parent
54bc8c3e22
commit
865183cda8
|
@ -1780,6 +1780,42 @@ end:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t buildCreateTbMap(STaosxRsp* rsp, SHashObj* pHashObj) {
|
||||
// find schema data info
|
||||
int32_t code = 0;
|
||||
SVCreateTbReq pCreateReq = {0};
|
||||
SDecoder decoderTmp = {0};
|
||||
|
||||
for (int j = 0; j < rsp->createTableNum; j++) {
|
||||
void** dataTmp = taosArrayGet(rsp->createTableReq, j);
|
||||
RAW_NULL_CHECK(dataTmp);
|
||||
int32_t* lenTmp = taosArrayGet(rsp->createTableLen, j);
|
||||
RAW_NULL_CHECK(dataTmp);
|
||||
|
||||
tDecoderInit(&decoderTmp, *dataTmp, *lenTmp);
|
||||
RAW_RETURN_CHECK (tDecodeSVCreateTbReq(&decoderTmp, &pCreateReq));
|
||||
|
||||
if (pCreateReq.type != TSDB_CHILD_TABLE) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
goto end;
|
||||
}
|
||||
if (taosHashGet(pHashObj, pCreateReq.name, strlen(pCreateReq.name)) == NULL){
|
||||
RAW_RETURN_CHECK(taosHashPut(pHashObj, pCreateReq.name, strlen(pCreateReq.name), &pCreateReq, sizeof(SVCreateTbReq)));
|
||||
} else{
|
||||
tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
|
||||
pCreateReq = (SVCreateTbReq){0};
|
||||
}
|
||||
|
||||
tDecoderClear(&decoderTmp);
|
||||
}
|
||||
return 0;
|
||||
|
||||
end:
|
||||
tDecoderClear(&decoderTmp);
|
||||
tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) {
|
||||
if (taos == NULL || data == NULL) {
|
||||
SET_ERROR_MSG("taos:%p or data:%p is NULL", taos, data);
|
||||
|
@ -1791,7 +1827,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
|
|||
SMqTaosxRspObj rspObj = {0};
|
||||
SDecoder decoder = {0};
|
||||
STableMeta* pTableMeta = NULL;
|
||||
SVCreateTbReq* pCreateReqDst = NULL;
|
||||
SHashObj* pCreateTbHash = NULL;
|
||||
|
||||
SRequestObj* pRequest = NULL;
|
||||
RAW_RETURN_CHECK(createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0, &pRequest));
|
||||
|
@ -1832,6 +1868,9 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
|
|||
RAW_RETURN_CHECK(smlInitHandle(&pQuery));
|
||||
pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
|
||||
RAW_NULL_CHECK(pVgHash);
|
||||
pCreateTbHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||
RAW_NULL_CHECK(pCreateTbHash);
|
||||
RAW_RETURN_CHECK(buildCreateTbMap(&rspObj.rsp, pCreateTbHash));
|
||||
|
||||
uDebug(LOG_ID_TAG " write raw metadata block num:%d", LOG_ID_VALUE, rspObj.rsp.common.blockNum);
|
||||
while (++rspObj.common.resIter < rspObj.rsp.common.blockNum) {
|
||||
|
@ -1854,40 +1893,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
|
|||
(void)strcpy(pName.tname, tbName);
|
||||
|
||||
// find schema data info
|
||||
for (int j = 0; j < rspObj.rsp.createTableNum; j++) {
|
||||
void** dataTmp = taosArrayGet(rspObj.rsp.createTableReq, j);
|
||||
RAW_NULL_CHECK(dataTmp);
|
||||
int32_t* lenTmp = taosArrayGet(rspObj.rsp.createTableLen, j);
|
||||
RAW_NULL_CHECK(dataTmp);
|
||||
|
||||
SDecoder decoderTmp = {0};
|
||||
SVCreateTbReq pCreateReq = {0};
|
||||
tDecoderInit(&decoderTmp, *dataTmp, *lenTmp);
|
||||
if (tDecodeSVCreateTbReq(&decoderTmp, &pCreateReq) < 0) {
|
||||
tDecoderClear(&decoderTmp);
|
||||
tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
|
||||
code = TSDB_CODE_TMQ_INVALID_MSG;
|
||||
SET_ERROR_MSG("decode create table:%s req failed", tbName);
|
||||
goto end;
|
||||
}
|
||||
|
||||
if (pCreateReq.type != TSDB_CHILD_TABLE) {
|
||||
code = TSDB_CODE_TSC_INVALID_VALUE;
|
||||
tDecoderClear(&decoderTmp);
|
||||
tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
|
||||
SET_ERROR_MSG("create table req type is not child table: %s, type: %d", tbName, pCreateReq.type);
|
||||
goto end;
|
||||
}
|
||||
if (strcmp(tbName, pCreateReq.name) == 0) {
|
||||
RAW_RETURN_CHECK(cloneSVreateTbReq(&pCreateReq, &pCreateReqDst));
|
||||
tDecoderClear(&decoderTmp);
|
||||
tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
|
||||
break;
|
||||
}
|
||||
tDecoderClear(&decoderTmp);
|
||||
tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
|
||||
}
|
||||
|
||||
SVCreateTbReq* pCreateReqDst = (SVCreateTbReq*)taosHashGet(pCreateTbHash, tbName, strlen(tbName));
|
||||
SVgroupInfo vg = {0};
|
||||
RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vg));
|
||||
if (pCreateReqDst) { // change stable name to get meta
|
||||
|
@ -1920,13 +1926,17 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
|
|||
}
|
||||
void* rawData = getRawDataFromRes(pRetrieve);
|
||||
char err[ERR_MSG_LEN] = {0};
|
||||
code = rawBlockBindData(pQuery, pTableMeta, rawData, &pCreateReqDst, fields, pSW->nCols, true, err, ERR_MSG_LEN);
|
||||
SVCreateTbReq* pCreateReqTmp = NULL;
|
||||
if (pCreateReqDst){
|
||||
RAW_RETURN_CHECK(cloneSVreateTbReq(pCreateReqDst, &pCreateReqTmp));
|
||||
}
|
||||
code = rawBlockBindData(pQuery, pTableMeta, rawData, &pCreateReqTmp, fields, pSW->nCols, true, err, ERR_MSG_LEN);
|
||||
if (pCreateReqTmp != NULL) {
|
||||
tdDestroySVCreateTbReq(pCreateReqTmp);
|
||||
taosMemoryFree(pCreateReqTmp);
|
||||
}
|
||||
taosMemoryFree(fields);
|
||||
taosMemoryFreeClear(pTableMeta);
|
||||
if (pCreateReqDst) {
|
||||
tdDestroySVCreateTbReq(pCreateReqDst);
|
||||
taosMemoryFreeClear(pCreateReqDst);
|
||||
}
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
SET_ERROR_MSG("table:%s, err:%s", tbName, err);
|
||||
goto end;
|
||||
|
@ -1940,16 +1950,18 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
|
|||
|
||||
end:
|
||||
uDebug(LOG_ID_TAG " write raw metadata return, msg:%s", LOG_ID_VALUE, tstrerror(code));
|
||||
void* pIter = taosHashIterate(pCreateTbHash, NULL);
|
||||
while (pIter) {
|
||||
tDestroySVCreateTbReq(pIter, TSDB_MSG_FLG_DECODE);
|
||||
pIter = taosHashIterate(pCreateTbHash, pIter);
|
||||
}
|
||||
taosHashCleanup(pCreateTbHash);
|
||||
tDeleteSTaosxRsp(&rspObj.rsp);
|
||||
tDecoderClear(&decoder);
|
||||
qDestroyQuery(pQuery);
|
||||
destroyRequest(pRequest);
|
||||
taosHashCleanup(pVgHash);
|
||||
taosMemoryFreeClear(pTableMeta);
|
||||
if (pCreateReqDst) {
|
||||
tdDestroySVCreateTbReq(pCreateReqDst);
|
||||
taosMemoryFreeClear(pCreateReqDst);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue