enh:[TS-5441] cost too long in tmq write meta data by cache meta and vg info
This commit is contained in:
parent
bc05289192
commit
51c7f8b4f0
|
@ -888,9 +888,6 @@ end:
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
|
static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
|
||||||
if (taos == NULL || meta == NULL) {
|
|
||||||
return TSDB_CODE_INVALID_PARA;
|
|
||||||
}
|
|
||||||
SVCreateStbReq req = {0};
|
SVCreateStbReq req = {0};
|
||||||
SDecoder coder;
|
SDecoder coder;
|
||||||
SMCreateStbReq pReq = {0};
|
SMCreateStbReq pReq = {0};
|
||||||
|
@ -1001,9 +998,6 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
|
static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
|
||||||
if (taos == NULL || meta == NULL) {
|
|
||||||
return TSDB_CODE_INVALID_PARA;
|
|
||||||
}
|
|
||||||
SVDropStbReq req = {0};
|
SVDropStbReq req = {0};
|
||||||
SDecoder coder = {0};
|
SDecoder coder = {0};
|
||||||
SMDropStbReq pReq = {0};
|
SMDropStbReq pReq = {0};
|
||||||
|
@ -1113,9 +1107,6 @@ static void destroyCreateTbReqBatch(void* data) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
|
static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
|
||||||
if (taos == NULL || meta == NULL) {
|
|
||||||
return TSDB_CODE_INVALID_PARA;
|
|
||||||
}
|
|
||||||
SVCreateTbBatchReq req = {0};
|
SVCreateTbBatchReq req = {0};
|
||||||
SDecoder coder = {0};
|
SDecoder coder = {0};
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
@ -1302,9 +1293,6 @@ static void destroyDropTbReqBatch(void* data) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
|
static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
|
||||||
if (taos == NULL || meta == NULL) {
|
|
||||||
return TSDB_CODE_INVALID_PARA;
|
|
||||||
}
|
|
||||||
SVDropTbBatchReq req = {0};
|
SVDropTbBatchReq req = {0};
|
||||||
SDecoder coder = {0};
|
SDecoder coder = {0};
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
@ -1417,9 +1405,6 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) {
|
static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) {
|
||||||
if (taos == NULL || meta == NULL) {
|
|
||||||
return TSDB_CODE_INVALID_PARA;
|
|
||||||
}
|
|
||||||
SDeleteRes req = {0};
|
SDeleteRes req = {0};
|
||||||
SDecoder coder = {0};
|
SDecoder coder = {0};
|
||||||
char sql[256] = {0};
|
char sql[256] = {0};
|
||||||
|
@ -1455,9 +1440,6 @@ static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
|
static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
|
||||||
if (taos == NULL || meta == NULL) {
|
|
||||||
return TSDB_CODE_INVALID_PARA;
|
|
||||||
}
|
|
||||||
SVAlterTbReq req = {0};
|
SVAlterTbReq req = {0};
|
||||||
SDecoder dcoder = {0};
|
SDecoder dcoder = {0};
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
@ -1771,7 +1753,7 @@ static void freeRawCache(void *data) {
|
||||||
|
|
||||||
static int32_t initRawCacheHash(){
|
static int32_t initRawCacheHash(){
|
||||||
if (writeRawCache == NULL){
|
if (writeRawCache == NULL){
|
||||||
writeRawCache = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
writeRawCache = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
|
||||||
if (writeRawCache == NULL){
|
if (writeRawCache == NULL){
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
@ -1844,96 +1826,56 @@ end:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tmqWriteRawImpl(TAOS* taos, uint16_t type, void* data, int32_t dataLen) {
|
static int32_t buildRawRequest(TAOS* taos, SRequestObj** pRequest, SCatalog** pCatalog, SRequestConnInfo *conn){
|
||||||
if (taos == NULL || data == NULL) {
|
int32_t code = 0;
|
||||||
SET_ERROR_MSG("taos:%p or data:%p is NULL", taos, data);
|
RAW_RETURN_CHECK(createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0, pRequest));
|
||||||
return TSDB_CODE_INVALID_PARA;
|
(*pRequest)->syncQuery = true;
|
||||||
|
if (!(*pRequest)->pDb) {
|
||||||
|
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
|
||||||
|
goto end;
|
||||||
}
|
}
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
|
||||||
SQuery* pQuery = NULL;
|
|
||||||
SMqRspObj rspObj = {0};
|
|
||||||
SDecoder decoder = {0};
|
|
||||||
SHashObj* pCreateTbHash = NULL;
|
|
||||||
SRequestObj* pRequest = NULL;
|
|
||||||
RAW_RETURN_CHECK(createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0, &pRequest));
|
|
||||||
|
|
||||||
uDebug(LOG_ID_TAG " write raw metadata, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
|
RAW_RETURN_CHECK(catalogGetHandle((*pRequest)->pTscObj->pAppInfo->clusterId, pCatalog));
|
||||||
pRequest->syncQuery = true;
|
conn->pTrans = (*pRequest)->pTscObj->pAppInfo->pTransporter;
|
||||||
rspObj.resIter = -1;
|
conn->requestId = (*pRequest)->requestId;
|
||||||
// rspObj.resType = RES_TYPE__TMQ_METADATA;
|
conn->requestObjRefId = (*pRequest)->self;
|
||||||
|
conn->mgmtEps = getEpSet_s(&(*pRequest)->pTscObj->pAppInfo->mgmtEp);
|
||||||
|
|
||||||
|
end:
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
typedef int32_t _raw_decode_func_(SDecoder *pDecoder, SMqDataRsp *pRsp);
|
||||||
|
static int32_t decodeRawData(SDecoder *decoder, void* data, int32_t dataLen, _raw_decode_func_ func, SMqRspObj* rspObj){
|
||||||
int8_t dataVersion = *(int8_t*)data;
|
int8_t dataVersion = *(int8_t*)data;
|
||||||
if (dataVersion >= MQ_DATA_RSP_VERSION) {
|
if (dataVersion >= MQ_DATA_RSP_VERSION) {
|
||||||
data = POINTER_SHIFT(data, sizeof(int8_t) + sizeof(int32_t));
|
data = POINTER_SHIFT(data, sizeof(int8_t) + sizeof(int32_t));
|
||||||
dataLen -= sizeof(int8_t) + sizeof(int32_t);
|
dataLen -= sizeof(int8_t) + sizeof(int32_t);
|
||||||
}
|
}
|
||||||
|
|
||||||
tDecoderInit(&decoder, data, dataLen);
|
rspObj->resIter = -1;
|
||||||
code = (type == RES_TYPE__TMQ_METADATA) ? tDecodeSTaosxRsp(&decoder, &rspObj.dataRsp) : tDecodeMqDataRsp(&decoder, &rspObj.dataRsp);
|
tDecoderInit(decoder, data, dataLen);
|
||||||
|
int32_t code = func(decoder, &rspObj->dataRsp);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
SET_ERROR_MSG("decode mq taosx data rsp failed");
|
SET_ERROR_MSG("decode mq taosx data rsp failed");
|
||||||
code = TSDB_CODE_INVALID_MSG;
|
|
||||||
goto end;
|
|
||||||
}
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
if (!pRequest->pDb) {
|
static int32_t processCacheMeta(SHashObj *pVgHash, SHashObj *pNameHash, SHashObj *pMetaHash, SVCreateTbReq* pCreateReqDst,
|
||||||
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
|
SCatalog* pCatalog, SRequestConnInfo* conn, SName* pName,
|
||||||
goto end;
|
STableMeta** pMeta, SSchemaWrapper* pSW, void* rawData, int32_t retry){
|
||||||
}
|
int32_t code = 0;
|
||||||
|
|
||||||
struct SCatalog* pCatalog = NULL;
|
|
||||||
RAW_RETURN_CHECK(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog));
|
|
||||||
|
|
||||||
SRequestConnInfo conn = {0};
|
|
||||||
conn.pTrans = pRequest->pTscObj->pAppInfo->pTransporter;
|
|
||||||
conn.requestId = pRequest->requestId;
|
|
||||||
conn.requestObjRefId = pRequest->self;
|
|
||||||
conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
|
|
||||||
|
|
||||||
if (type == RES_TYPE__TMQ_METADATA) {
|
|
||||||
pCreateTbHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
|
||||||
RAW_NULL_CHECK(pCreateTbHash);
|
|
||||||
RAW_RETURN_CHECK(buildCreateTbMap(&rspObj.dataRsp, pCreateTbHash));
|
|
||||||
}
|
|
||||||
|
|
||||||
SHashObj *pVgHash = NULL;
|
|
||||||
SHashObj *pNameHash = NULL;
|
|
||||||
SHashObj *pMetaHash = NULL;
|
|
||||||
RAW_RETURN_CHECK(getRawCache(&pVgHash, &pNameHash, &pMetaHash, taos));
|
|
||||||
int retry = 0;
|
|
||||||
while(1){
|
|
||||||
RAW_RETURN_CHECK(smlInitHandle(&pQuery));
|
|
||||||
uDebug(LOG_ID_TAG " write raw data type:%d block num:%d", LOG_ID_VALUE, type, rspObj.dataRsp.blockNum);
|
|
||||||
while (++rspObj.resIter < rspObj.dataRsp.blockNum) {
|
|
||||||
void* pRetrieve = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter);
|
|
||||||
RAW_NULL_CHECK(pRetrieve);
|
|
||||||
if (!rspObj.dataRsp.withSchema) {
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
|
|
||||||
const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter);
|
|
||||||
RAW_NULL_CHECK(tbName);
|
|
||||||
|
|
||||||
uDebug(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName);
|
|
||||||
SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
|
|
||||||
(void)strcpy(pName.dbname, pRequest->pDb);
|
|
||||||
(void)strcpy(pName.tname, tbName);
|
|
||||||
|
|
||||||
// find schema data info
|
|
||||||
SVCreateTbReq* pCreateReqDst = NULL;
|
|
||||||
if (type == RES_TYPE__TMQ_METADATA){
|
|
||||||
pCreateReqDst = (SVCreateTbReq*)taosHashGet(pCreateTbHash, tbName, strlen(tbName));
|
|
||||||
}
|
|
||||||
STableMeta* pTableMeta = NULL;
|
STableMeta* pTableMeta = NULL;
|
||||||
tbInfo* tmpInfo = (tbInfo*)taosHashGet(pNameHash, tbName, strlen(tbName));
|
tbInfo* tmpInfo = (tbInfo*)taosHashGet(pNameHash, pName->tname, strlen(pName->tname));
|
||||||
if (tmpInfo == NULL || retry > 0) {
|
if (tmpInfo == NULL || retry > 0) {
|
||||||
tbInfo info = {0};
|
tbInfo info = {0};
|
||||||
|
|
||||||
RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &info.vgInfo));
|
RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, conn, pName, &info.vgInfo));
|
||||||
if (pCreateReqDst && tmpInfo == NULL) { // change stable name to get meta
|
if (pCreateReqDst && tmpInfo == NULL) { // change stable name to get meta
|
||||||
(void)strcpy(pName.tname, pCreateReqDst->ctb.stbName);
|
tstrncpy(pName->tname, pCreateReqDst->ctb.stbName, TSDB_TABLE_NAME_LEN);
|
||||||
}
|
}
|
||||||
RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta));
|
RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, conn, pName, &pTableMeta));
|
||||||
info.uid = pTableMeta->uid;
|
info.uid = pTableMeta->uid;
|
||||||
if (pTableMeta->tableType == TSDB_CHILD_TABLE){
|
if (pTableMeta->tableType == TSDB_CHILD_TABLE){
|
||||||
info.suid = pTableMeta->suid;
|
info.suid = pTableMeta->suid;
|
||||||
|
@ -1951,24 +1893,15 @@ static int32_t tmqWriteRawImpl(TAOS* taos, uint16_t type, void* data, int32_t da
|
||||||
pCreateReqDst->ctb.suid = pTableMeta->suid;
|
pCreateReqDst->ctb.suid = pTableMeta->suid;
|
||||||
}
|
}
|
||||||
|
|
||||||
RAW_RETURN_CHECK(taosHashPut(pNameHash, pName.tname, strlen(pName.tname), &info, sizeof(tbInfo)));
|
RAW_RETURN_CHECK(taosHashPut(pNameHash, pName->tname, strlen(pName->tname), &info, sizeof(tbInfo)));
|
||||||
tmpInfo = (tbInfo*)taosHashGet(pNameHash, pName.tname, strlen(pName.tname));
|
tmpInfo = (tbInfo*)taosHashGet(pNameHash, pName->tname, strlen(pName->tname));
|
||||||
// code = (code == TSDB_CODE_DUP_KEY) ? 0 : code;
|
|
||||||
// RAW_RETURN_CHECK(code);
|
|
||||||
RAW_RETURN_CHECK(taosHashPut(pVgHash, &info.vgInfo.vgId, sizeof(info.vgInfo.vgId), &info.vgInfo, sizeof(SVgroupInfo)));
|
RAW_RETURN_CHECK(taosHashPut(pVgHash, &info.vgInfo.vgId, sizeof(info.vgInfo.vgId), &info.vgInfo, sizeof(SVgroupInfo)));
|
||||||
// code = (code == TSDB_CODE_DUP_KEY) ? 0 : code;
|
|
||||||
// RAW_RETURN_CHECK(code);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.dataRsp.blockSchema, rspObj.resIter);
|
|
||||||
RAW_NULL_CHECK(pSW);
|
|
||||||
void* rawData = getRawDataFromRes(pRetrieve);
|
|
||||||
RAW_NULL_CHECK(rawData);
|
|
||||||
|
|
||||||
if (pTableMeta == NULL || retry > 0){
|
if (pTableMeta == NULL || retry > 0){
|
||||||
STableMeta** pTableMetaTmp = (STableMeta**)taosHashGet(pMetaHash, &tmpInfo->suid, LONG_BYTES);
|
STableMeta** pTableMetaTmp = (STableMeta**)taosHashGet(pMetaHash, &tmpInfo->suid, LONG_BYTES);
|
||||||
if (pTableMetaTmp == NULL || retry > 0 || needRefreshMeta(rawData, *pTableMetaTmp, pSW)) {
|
if (pTableMetaTmp == NULL || retry > 0 || needRefreshMeta(rawData, *pTableMetaTmp, pSW)) {
|
||||||
RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta));
|
RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, conn, pName, &pTableMeta));
|
||||||
code = taosHashPut(pMetaHash, &tmpInfo->suid, LONG_BYTES, &pTableMeta, POINTER_BYTES);
|
code = taosHashPut(pMetaHash, &tmpInfo->suid, LONG_BYTES, &pTableMeta, POINTER_BYTES);
|
||||||
if (code != 0){
|
if (code != 0){
|
||||||
taosMemoryFree(pTableMeta);
|
taosMemoryFree(pTableMeta);
|
||||||
|
@ -1981,11 +1914,60 @@ static int32_t tmqWriteRawImpl(TAOS* taos, uint16_t type, void* data, int32_t da
|
||||||
pTableMeta->vgId = tmpInfo->vgInfo.vgId;
|
pTableMeta->vgId = tmpInfo->vgInfo.vgId;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
*pMeta = pTableMeta;
|
||||||
|
|
||||||
|
end:
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen){
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
SQuery* pQuery = NULL;
|
||||||
|
SMqRspObj rspObj = {0};
|
||||||
|
SDecoder decoder = {0};
|
||||||
|
|
||||||
|
SRequestObj* pRequest = NULL;
|
||||||
|
SCatalog* pCatalog = NULL;
|
||||||
|
SRequestConnInfo conn = {0};
|
||||||
|
|
||||||
|
uDebug(LOG_ID_TAG " write raw data, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
|
||||||
|
RAW_RETURN_CHECK(buildRawRequest(taos, &pRequest, &pCatalog, &conn));
|
||||||
|
RAW_RETURN_CHECK(decodeRawData(&decoder, data, dataLen, tDecodeMqDataRsp, &rspObj));
|
||||||
|
|
||||||
|
SHashObj *pVgHash = NULL;
|
||||||
|
SHashObj *pNameHash = NULL;
|
||||||
|
SHashObj *pMetaHash = NULL;
|
||||||
|
RAW_RETURN_CHECK(getRawCache(&pVgHash, &pNameHash, &pMetaHash, taos));
|
||||||
|
int retry = 0;
|
||||||
|
while(1){
|
||||||
|
RAW_RETURN_CHECK(smlInitHandle(&pQuery));
|
||||||
|
uDebug(LOG_ID_TAG " write raw meta data block num:%d", LOG_ID_VALUE, rspObj.dataRsp.blockNum);
|
||||||
|
while (++rspObj.resIter < rspObj.dataRsp.blockNum) {
|
||||||
|
if (!rspObj.dataRsp.withSchema) {
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
|
||||||
|
const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter);
|
||||||
|
RAW_NULL_CHECK(tbName);
|
||||||
|
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.dataRsp.blockSchema, rspObj.resIter);
|
||||||
|
RAW_NULL_CHECK(pSW);
|
||||||
|
void* pRetrieve = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter);
|
||||||
|
RAW_NULL_CHECK(pRetrieve);
|
||||||
|
void* rawData = getRawDataFromRes(pRetrieve);
|
||||||
|
RAW_NULL_CHECK(rawData);
|
||||||
|
|
||||||
|
uDebug(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName);
|
||||||
|
SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
|
||||||
|
tstrncpy(pName.dbname, pRequest->pDb, TSDB_DB_NAME_LEN);
|
||||||
|
tstrncpy(pName.tname, tbName, TSDB_TABLE_NAME_LEN);
|
||||||
|
|
||||||
|
STableMeta* pTableMeta = NULL;
|
||||||
|
processCacheMeta(pVgHash, pNameHash, pMetaHash, NULL, pCatalog, &conn,
|
||||||
|
&pName, &pTableMeta, pSW, rawData, retry);
|
||||||
char err[ERR_MSG_LEN] = {0};
|
char err[ERR_MSG_LEN] = {0};
|
||||||
code = rawBlockBindData(pQuery, pTableMeta, rawData, pCreateReqDst, pSW, pSW->nCols, true, err, ERR_MSG_LEN, true);
|
code = rawBlockBindData(pQuery, pTableMeta, rawData, NULL, pSW, pSW->nCols, true, err, ERR_MSG_LEN, true);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
SET_ERROR_MSG("table:%s, err:%s", tbName, err);
|
SET_ERROR_MSG("table:%s, err:%s", pName.tname, err);
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1993,11 +1975,8 @@ static int32_t tmqWriteRawImpl(TAOS* taos, uint16_t type, void* data, int32_t da
|
||||||
launchQueryImpl(pRequest, pQuery, true, NULL);
|
launchQueryImpl(pRequest, pQuery, true, NULL);
|
||||||
code = pRequest->code;
|
code = pRequest->code;
|
||||||
|
|
||||||
if (NEED_CLIENT_HANDLE_ERROR(code)) {
|
if (NEED_CLIENT_HANDLE_ERROR(code) && retry++ < 3) {
|
||||||
uInfo("write raw retry:%d/3 end code:%d, msg:%s", retry, code, tstrerror(code));
|
uInfo("write raw retry:%d/3 end code:%d, msg:%s", retry, code, tstrerror(code));
|
||||||
if (retry++ >= 3) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
qDestroyQuery(pQuery);
|
qDestroyQuery(pQuery);
|
||||||
pQuery = NULL;
|
pQuery = NULL;
|
||||||
rspObj.resIter = -1;
|
rspObj.resIter = -1;
|
||||||
|
@ -2007,12 +1986,89 @@ static int32_t tmqWriteRawImpl(TAOS* taos, uint16_t type, void* data, int32_t da
|
||||||
}
|
}
|
||||||
|
|
||||||
end:
|
end:
|
||||||
uDebug(LOG_ID_TAG " write raw metadata return, msg:%s", LOG_ID_VALUE, tstrerror(code));
|
uDebug(LOG_ID_TAG " write raw data return, msg:%s", LOG_ID_VALUE, tstrerror(code));
|
||||||
if (type == RES_TYPE__TMQ_METADATA){
|
|
||||||
tDeleteSTaosxRsp(&rspObj.dataRsp);
|
|
||||||
}else {
|
|
||||||
tDeleteMqDataRsp(&rspObj.dataRsp);
|
tDeleteMqDataRsp(&rspObj.dataRsp);
|
||||||
|
tDecoderClear(&decoder);
|
||||||
|
qDestroyQuery(pQuery);
|
||||||
|
destroyRequest(pRequest);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
SQuery* pQuery = NULL;
|
||||||
|
SMqRspObj rspObj = {0};
|
||||||
|
SDecoder decoder = {0};
|
||||||
|
SHashObj* pCreateTbHash = NULL;
|
||||||
|
|
||||||
|
SRequestObj* pRequest = NULL;
|
||||||
|
SCatalog* pCatalog = NULL;
|
||||||
|
SRequestConnInfo conn = {0};
|
||||||
|
|
||||||
|
uDebug(LOG_ID_TAG " write raw metadata, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
|
||||||
|
RAW_RETURN_CHECK(buildRawRequest(taos, &pRequest, &pCatalog, &conn));
|
||||||
|
RAW_RETURN_CHECK(decodeRawData(&decoder, data, dataLen, tDecodeSTaosxRsp, &rspObj));
|
||||||
|
|
||||||
|
pCreateTbHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||||
|
RAW_NULL_CHECK(pCreateTbHash);
|
||||||
|
RAW_RETURN_CHECK(buildCreateTbMap(&rspObj.dataRsp, pCreateTbHash));
|
||||||
|
|
||||||
|
SHashObj *pVgHash = NULL;
|
||||||
|
SHashObj *pNameHash = NULL;
|
||||||
|
SHashObj *pMetaHash = NULL;
|
||||||
|
RAW_RETURN_CHECK(getRawCache(&pVgHash, &pNameHash, &pMetaHash, taos));
|
||||||
|
int retry = 0;
|
||||||
|
while(1){
|
||||||
|
RAW_RETURN_CHECK(smlInitHandle(&pQuery));
|
||||||
|
uDebug(LOG_ID_TAG " write raw meta data block num:%d", LOG_ID_VALUE, rspObj.dataRsp.blockNum);
|
||||||
|
while (++rspObj.resIter < rspObj.dataRsp.blockNum) {
|
||||||
|
if (!rspObj.dataRsp.withSchema) {
|
||||||
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter);
|
||||||
|
RAW_NULL_CHECK(tbName);
|
||||||
|
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.dataRsp.blockSchema, rspObj.resIter);
|
||||||
|
RAW_NULL_CHECK(pSW);
|
||||||
|
void* pRetrieve = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter);
|
||||||
|
RAW_NULL_CHECK(pRetrieve);
|
||||||
|
void* rawData = getRawDataFromRes(pRetrieve);
|
||||||
|
RAW_NULL_CHECK(rawData);
|
||||||
|
|
||||||
|
uDebug(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName);
|
||||||
|
SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
|
||||||
|
tstrncpy(pName.dbname, pRequest->pDb, TSDB_DB_NAME_LEN);
|
||||||
|
tstrncpy(pName.tname, tbName, TSDB_TABLE_NAME_LEN);
|
||||||
|
|
||||||
|
// find schema data info
|
||||||
|
SVCreateTbReq* pCreateReqDst = (SVCreateTbReq*)taosHashGet(pCreateTbHash, pName.tname, strlen(pName.tname));
|
||||||
|
STableMeta* pTableMeta = NULL;
|
||||||
|
processCacheMeta(pVgHash, pNameHash, pMetaHash, pCreateReqDst, pCatalog, &conn,
|
||||||
|
&pName, &pTableMeta, pSW, rawData, retry);
|
||||||
|
char err[ERR_MSG_LEN] = {0};
|
||||||
|
code = rawBlockBindData(pQuery, pTableMeta, rawData, pCreateReqDst, pSW, pSW->nCols, true, err, ERR_MSG_LEN, true);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
SET_ERROR_MSG("table:%s, err:%s", pName.tname, err);
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
|
||||||
|
launchQueryImpl(pRequest, pQuery, true, NULL);
|
||||||
|
code = pRequest->code;
|
||||||
|
|
||||||
|
if (NEED_CLIENT_HANDLE_ERROR(code) && retry++ < 3) {
|
||||||
|
uInfo("write raw retry:%d/3 end code:%d, msg:%s", retry, code, tstrerror(code));
|
||||||
|
qDestroyQuery(pQuery);
|
||||||
|
pQuery = NULL;
|
||||||
|
rspObj.resIter = -1;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
end:
|
||||||
|
uDebug(LOG_ID_TAG " write raw metadata return, msg:%s", LOG_ID_VALUE, tstrerror(code));
|
||||||
|
tDeleteSTaosxRsp(&rspObj.dataRsp);
|
||||||
void* pIter = taosHashIterate(pCreateTbHash, NULL);
|
void* pIter = taosHashIterate(pCreateTbHash, NULL);
|
||||||
while (pIter) {
|
while (pIter) {
|
||||||
tDestroySVCreateTbReq(pIter, TSDB_MSG_FLG_DECODE);
|
tDestroySVCreateTbReq(pIter, TSDB_MSG_FLG_DECODE);
|
||||||
|
@ -2243,8 +2299,10 @@ static int32_t writeRawImpl(TAOS* taos, void* buf, uint32_t len, uint16_t type)
|
||||||
return taosDropTable(taos, buf, len);
|
return taosDropTable(taos, buf, len);
|
||||||
} else if (type == TDMT_VND_DELETE) {
|
} else if (type == TDMT_VND_DELETE) {
|
||||||
return taosDeleteData(taos, buf, len);
|
return taosDeleteData(taos, buf, len);
|
||||||
} else if (type == RES_TYPE__TMQ_METADATA || type == RES_TYPE__TMQ) {
|
} else if (type == RES_TYPE__TMQ_METADATA){
|
||||||
return tmqWriteRawImpl(taos, type, buf, len);
|
return tmqWriteRawMetaDataImpl(taos, buf, len);
|
||||||
|
} else if (type == RES_TYPE__TMQ) {
|
||||||
|
return tmqWriteRawDataImpl(taos, buf, len);
|
||||||
} else if (type == RES_TYPE__TMQ_BATCH_META) {
|
} else if (type == RES_TYPE__TMQ_BATCH_META) {
|
||||||
return tmqWriteBatchMetaDataImpl(taos, buf, len);
|
return tmqWriteBatchMetaDataImpl(taos, buf, len);
|
||||||
}
|
}
|
||||||
|
@ -2252,7 +2310,8 @@ static int32_t writeRawImpl(TAOS* taos, void* buf, uint32_t len, uint16_t type)
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tmq_write_raw(TAOS* taos, tmq_raw_data raw) {
|
int32_t tmq_write_raw(TAOS* taos, tmq_raw_data raw) {
|
||||||
if (!taos) {
|
if (taos == NULL || raw.raw == NULL || raw.raw_len <= 0) {
|
||||||
|
SET_ERROR_MSG("taos:%p or data:%p is NULL or raw_len <= 0", taos, raw.raw);
|
||||||
return TSDB_CODE_INVALID_PARA;
|
return TSDB_CODE_INVALID_PARA;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue