enh:[TS-5441] cost too long in tmq write meta data
This commit is contained in:
parent
33cab144fa
commit
5399bffe99
|
@ -1790,7 +1790,7 @@ static int32_t buildCreateTbMap(STaosxRsp* rsp, SHashObj* pHashObj) {
|
||||||
void** dataTmp = taosArrayGet(rsp->createTableReq, j);
|
void** dataTmp = taosArrayGet(rsp->createTableReq, j);
|
||||||
RAW_NULL_CHECK(dataTmp);
|
RAW_NULL_CHECK(dataTmp);
|
||||||
int32_t* lenTmp = taosArrayGet(rsp->createTableLen, j);
|
int32_t* lenTmp = taosArrayGet(rsp->createTableLen, j);
|
||||||
RAW_NULL_CHECK(dataTmp);
|
RAW_NULL_CHECK(lenTmp);
|
||||||
|
|
||||||
tDecoderInit(&decoderTmp, *dataTmp, *lenTmp);
|
tDecoderInit(&decoderTmp, *dataTmp, *lenTmp);
|
||||||
RAW_RETURN_CHECK(tDecodeSVCreateTbReq(&decoderTmp, &pCreateReq));
|
RAW_RETURN_CHECK(tDecodeSVCreateTbReq(&decoderTmp, &pCreateReq));
|
||||||
|
@ -1799,14 +1799,19 @@ static int32_t buildCreateTbMap(STaosxRsp* rsp, SHashObj* pHashObj) {
|
||||||
code = TSDB_CODE_INVALID_MSG;
|
code = TSDB_CODE_INVALID_MSG;
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
if (taosHashGet(pHashObj, pCreateReq.name, strlen(pCreateReq.name)) == NULL){
|
SVCreateTbReq** pCreateReqDst = taosHashGet(pHashObj, pCreateReq.name, strlen(pCreateReq.name));
|
||||||
RAW_RETURN_CHECK(taosHashPut(pHashObj, pCreateReq.name, strlen(pCreateReq.name), &pCreateReq, sizeof(SVCreateTbReq)));
|
if (pCreateReqDst == NULL){
|
||||||
} else{
|
RAW_RETURN_CHECK(cloneSVreateTbReq(&pCreateReq, pCreateReqDst));
|
||||||
tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
|
code = taosHashPut(pHashObj, pCreateReq.name, strlen(pCreateReq.name), &pCreateReqDst, POINTER_BYTES);
|
||||||
pCreateReq = (SVCreateTbReq){0};
|
if (code != 0){
|
||||||
|
tdDestroySVCreateTbReq(*pCreateReqDst);
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
tDecoderClear(&decoderTmp);
|
tDecoderClear(&decoderTmp);
|
||||||
|
tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
|
||||||
|
pCreateReq = (SVCreateTbReq){0};
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
|
@ -1816,19 +1821,20 @@ end:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static threadlocal SHashObj* pVgHash = NULL;
|
||||||
|
static threadlocal SHashObj* pCreateTbHash = NULL;
|
||||||
|
static threadlocal SHashObj* pNameHash = NULL;
|
||||||
|
static threadlocal SHashObj* pMetaHash = NULL;
|
||||||
|
|
||||||
static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) {
|
static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) {
|
||||||
if (taos == NULL || data == NULL) {
|
if (taos == NULL || data == NULL) {
|
||||||
SET_ERROR_MSG("taos:%p or data:%p is NULL", taos, data);
|
SET_ERROR_MSG("taos:%p or data:%p is NULL", taos, data);
|
||||||
return TSDB_CODE_INVALID_PARA;
|
return TSDB_CODE_INVALID_PARA;
|
||||||
}
|
}
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
SHashObj* pVgHash = NULL;
|
|
||||||
SHashObj* pCreateTbHash = NULL;
|
|
||||||
SQuery* pQuery = NULL;
|
SQuery* pQuery = NULL;
|
||||||
SMqTaosxRspObj rspObj = {0};
|
SMqTaosxRspObj rspObj = {0};
|
||||||
SDecoder decoder = {0};
|
SDecoder decoder = {0};
|
||||||
STableMeta* pTableMeta = NULL;
|
|
||||||
|
|
||||||
SRequestObj* pRequest = NULL;
|
SRequestObj* pRequest = NULL;
|
||||||
RAW_RETURN_CHECK(createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0, &pRequest));
|
RAW_RETURN_CHECK(createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0, &pRequest));
|
||||||
|
|
||||||
|
@ -1856,6 +1862,23 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pVgHash == NULL){
|
||||||
|
pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
|
||||||
|
RAW_NULL_CHECK(pVgHash);
|
||||||
|
}
|
||||||
|
if (pCreateTbHash == NULL){
|
||||||
|
pCreateTbHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||||
|
RAW_NULL_CHECK(pCreateTbHash);
|
||||||
|
}
|
||||||
|
if (pNameHash == NULL){
|
||||||
|
pNameHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||||
|
RAW_NULL_CHECK(pNameHash);
|
||||||
|
}
|
||||||
|
if (pMetaHash == NULL){
|
||||||
|
pMetaHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||||
|
RAW_NULL_CHECK(pMetaHash);
|
||||||
|
taosHashSetFreeFp(pMetaHash, taosMemoryFree);
|
||||||
|
}
|
||||||
struct SCatalog* pCatalog = NULL;
|
struct SCatalog* pCatalog = NULL;
|
||||||
RAW_RETURN_CHECK(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog));
|
RAW_RETURN_CHECK(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog));
|
||||||
|
|
||||||
|
@ -1866,11 +1889,6 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
|
||||||
conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
|
conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
|
||||||
|
|
||||||
RAW_RETURN_CHECK(smlInitHandle(&pQuery));
|
RAW_RETURN_CHECK(smlInitHandle(&pQuery));
|
||||||
pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
|
|
||||||
RAW_NULL_CHECK(pVgHash);
|
|
||||||
pCreateTbHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
|
||||||
RAW_NULL_CHECK(pCreateTbHash);
|
|
||||||
RAW_RETURN_CHECK(buildCreateTbMap(&rspObj.rsp, pCreateTbHash));
|
|
||||||
|
|
||||||
uDebug(LOG_ID_TAG " write raw metadata block num:%d", LOG_ID_VALUE, rspObj.rsp.common.blockNum);
|
uDebug(LOG_ID_TAG " write raw metadata block num:%d", LOG_ID_VALUE, rspObj.rsp.common.blockNum);
|
||||||
while (++rspObj.common.resIter < rspObj.rsp.common.blockNum) {
|
while (++rspObj.common.resIter < rspObj.rsp.common.blockNum) {
|
||||||
|
@ -1894,21 +1912,37 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
|
||||||
|
|
||||||
// find schema data info
|
// find schema data info
|
||||||
SVCreateTbReq* pCreateReqDst = (SVCreateTbReq*)taosHashGet(pCreateTbHash, tbName, strlen(tbName));
|
SVCreateTbReq* pCreateReqDst = (SVCreateTbReq*)taosHashGet(pCreateTbHash, tbName, strlen(tbName));
|
||||||
SVgroupInfo vg = {0};
|
if (pCreateReqDst == NULL) {
|
||||||
RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vg));
|
RAW_RETURN_CHECK(buildCreateTbMap(&rspObj.rsp, pCreateTbHash));
|
||||||
|
pCreateReqDst = (SVCreateTbReq*)taosHashGet(pCreateTbHash, tbName, strlen(tbName));
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t vgId = 0;
|
||||||
|
SVgroupInfo* vg = (SVgroupInfo*)taosHashGet(pNameHash, tbName, strlen(tbName));
|
||||||
|
if (vg == NULL) {
|
||||||
|
SVgroupInfo vgTmp = {0};
|
||||||
|
RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vgTmp));
|
||||||
|
RAW_RETURN_CHECK(taosHashPut(pNameHash, tbName, strlen(tbName), &vgTmp, sizeof(SVgroupInfo)));
|
||||||
|
code = taosHashPut(pVgHash, &vgTmp.vgId, sizeof(vgTmp.vgId), &vgTmp, sizeof(SVgroupInfo));
|
||||||
|
code = (code == TSDB_CODE_DUP_KEY) ? 0 : code;
|
||||||
|
RAW_RETURN_CHECK(code);
|
||||||
|
vgId = vgTmp.vgId;
|
||||||
|
} else {
|
||||||
|
vgId = vg->vgId;
|
||||||
|
}
|
||||||
|
|
||||||
|
STableMeta** pTableMeta = (STableMeta**)taosHashGet(pMetaHash, tbName, strlen(tbName));
|
||||||
|
if (pTableMeta == NULL) {
|
||||||
if (pCreateReqDst) { // change stable name to get meta
|
if (pCreateReqDst) { // change stable name to get meta
|
||||||
(void)strcpy(pName.tname, pCreateReqDst->ctb.stbName);
|
(void)strcpy(pName.tname, pCreateReqDst->ctb.stbName);
|
||||||
}
|
}
|
||||||
RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta));
|
RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, &conn, &pName, pTableMeta));
|
||||||
|
RAW_RETURN_CHECK(taosHashPut(pMetaHash, tbName, strlen(tbName), pTableMeta, POINTER_BYTES));
|
||||||
if (pCreateReqDst) {
|
if (pCreateReqDst) {
|
||||||
pTableMeta->vgId = vg.vgId;
|
(*pTableMeta)->vgId = vgId;
|
||||||
pTableMeta->uid = pCreateReqDst->uid;
|
(*pTableMeta)->uid = pCreateReqDst->uid;
|
||||||
pCreateReqDst->ctb.suid = pTableMeta->suid;
|
pCreateReqDst->ctb.suid = (*pTableMeta)->suid;
|
||||||
}
|
}
|
||||||
void* hData = taosHashGet(pVgHash, &vg.vgId, sizeof(vg.vgId));
|
|
||||||
if (hData == NULL) {
|
|
||||||
RAW_RETURN_CHECK(taosHashPut(pVgHash, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.rsp.common.blockSchema, rspObj.common.resIter);
|
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.rsp.common.blockSchema, rspObj.common.resIter);
|
||||||
|
@ -1930,13 +1964,12 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
|
||||||
if (pCreateReqDst){
|
if (pCreateReqDst){
|
||||||
RAW_RETURN_CHECK(cloneSVreateTbReq(pCreateReqDst, &pCreateReqTmp));
|
RAW_RETURN_CHECK(cloneSVreateTbReq(pCreateReqDst, &pCreateReqTmp));
|
||||||
}
|
}
|
||||||
code = rawBlockBindData(pQuery, pTableMeta, rawData, &pCreateReqTmp, fields, pSW->nCols, true, err, ERR_MSG_LEN);
|
code = rawBlockBindData(pQuery, *pTableMeta, rawData, &pCreateReqTmp, fields, pSW->nCols, true, err, ERR_MSG_LEN);
|
||||||
if (pCreateReqTmp != NULL) {
|
if (pCreateReqTmp != NULL) {
|
||||||
tdDestroySVCreateTbReq(pCreateReqTmp);
|
tdDestroySVCreateTbReq(pCreateReqTmp);
|
||||||
taosMemoryFree(pCreateReqTmp);
|
taosMemoryFree(pCreateReqTmp);
|
||||||
}
|
}
|
||||||
taosMemoryFree(fields);
|
taosMemoryFree(fields);
|
||||||
taosMemoryFreeClear(pTableMeta);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
SET_ERROR_MSG("table:%s, err:%s", tbName, err);
|
SET_ERROR_MSG("table:%s, err:%s", tbName, err);
|
||||||
goto end;
|
goto end;
|
||||||
|
@ -1950,21 +1983,24 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
|
||||||
|
|
||||||
end:
|
end:
|
||||||
uDebug(LOG_ID_TAG " write raw metadata return, msg:%s", LOG_ID_VALUE, tstrerror(code));
|
uDebug(LOG_ID_TAG " write raw metadata return, msg:%s", LOG_ID_VALUE, tstrerror(code));
|
||||||
|
tDeleteSTaosxRsp(&rspObj.rsp);
|
||||||
|
tDecoderClear(&decoder);
|
||||||
|
qDestroyQuery(pQuery);
|
||||||
|
destroyRequest(pRequest);
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
void tmqClean() {
|
||||||
|
taosHashCleanup(pMetaHash);
|
||||||
|
taosHashCleanup(pNameHash);
|
||||||
void* pIter = taosHashIterate(pCreateTbHash, NULL);
|
void* pIter = taosHashIterate(pCreateTbHash, NULL);
|
||||||
while (pIter) {
|
while (pIter) {
|
||||||
tDestroySVCreateTbReq(pIter, TSDB_MSG_FLG_DECODE);
|
tDestroySVCreateTbReq(pIter, TSDB_MSG_FLG_DECODE);
|
||||||
pIter = taosHashIterate(pCreateTbHash, pIter);
|
pIter = taosHashIterate(pCreateTbHash, pIter);
|
||||||
}
|
}
|
||||||
taosHashCleanup(pCreateTbHash);
|
taosHashCleanup(pCreateTbHash);
|
||||||
|
|
||||||
tDeleteSTaosxRsp(&rspObj.rsp);
|
|
||||||
tDecoderClear(&decoder);
|
|
||||||
qDestroyQuery(pQuery);
|
|
||||||
destroyRequest(pRequest);
|
|
||||||
taosHashCleanup(pVgHash);
|
taosHashCleanup(pVgHash);
|
||||||
taosMemoryFreeClear(pTableMeta);
|
|
||||||
|
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void processSimpleMeta(SMqMetaRsp* pMetaRsp, cJSON** meta) {
|
static void processSimpleMeta(SMqMetaRsp* pMetaRsp, cJSON** meta) {
|
||||||
|
|
Loading…
Reference in New Issue