diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index 206bc63d19..eb4eed8b6d 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -888,9 +888,6 @@ end: } static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) { - if (taos == NULL || meta == NULL) { - return TSDB_CODE_INVALID_PARA; - } SVCreateStbReq req = {0}; SDecoder coder; SMCreateStbReq pReq = {0}; @@ -1001,9 +998,6 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) { } static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) { - if (taos == NULL || meta == NULL) { - return TSDB_CODE_INVALID_PARA; - } SVDropStbReq req = {0}; SDecoder coder = {0}; SMDropStbReq pReq = {0}; @@ -1113,9 +1107,6 @@ static void destroyCreateTbReqBatch(void* data) { } static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) { - if (taos == NULL || meta == NULL) { - return TSDB_CODE_INVALID_PARA; - } SVCreateTbBatchReq req = {0}; SDecoder coder = {0}; int32_t code = TSDB_CODE_SUCCESS; @@ -1302,9 +1293,6 @@ static void destroyDropTbReqBatch(void* data) { } static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) { - if (taos == NULL || meta == NULL) { - return TSDB_CODE_INVALID_PARA; - } SVDropTbBatchReq req = {0}; SDecoder coder = {0}; int32_t code = TSDB_CODE_SUCCESS; @@ -1417,9 +1405,6 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) { } static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) { - if (taos == NULL || meta == NULL) { - return TSDB_CODE_INVALID_PARA; - } SDeleteRes req = {0}; SDecoder coder = {0}; char sql[256] = {0}; @@ -1455,9 +1440,6 @@ static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) { } static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) { - if (taos == NULL || meta == NULL) { - return TSDB_CODE_INVALID_PARA; - } SVAlterTbReq req = {0}; SDecoder dcoder = {0}; int32_t code = TSDB_CODE_SUCCESS; @@ -1771,7 +1753,7 @@ static void freeRawCache(void *data) { static int32_t initRawCacheHash(){ if (writeRawCache == NULL){ - writeRawCache = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); + writeRawCache = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); if (writeRawCache == NULL){ return terrno; } @@ -1844,57 +1826,113 @@ end: return code; } -static int32_t tmqWriteRawImpl(TAOS* taos, uint16_t type, void* data, int32_t dataLen) { - if (taos == NULL || data == NULL) { - SET_ERROR_MSG("taos:%p or data:%p is NULL", taos, data); - return TSDB_CODE_INVALID_PARA; +static int32_t buildRawRequest(TAOS* taos, SRequestObj** pRequest, SCatalog** pCatalog, SRequestConnInfo *conn){ + int32_t code = 0; + RAW_RETURN_CHECK(createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0, pRequest)); + (*pRequest)->syncQuery = true; + if (!(*pRequest)->pDb) { + code = TSDB_CODE_PAR_DB_NOT_SPECIFIED; + goto end; } - int32_t code = TSDB_CODE_SUCCESS; - SQuery* pQuery = NULL; - SMqRspObj rspObj = {0}; - SDecoder decoder = {0}; - SHashObj* pCreateTbHash = NULL; - SRequestObj* pRequest = NULL; - RAW_RETURN_CHECK(createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0, &pRequest)); - uDebug(LOG_ID_TAG " write raw metadata, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen); - pRequest->syncQuery = true; - rspObj.resIter = -1; -// rspObj.resType = RES_TYPE__TMQ_METADATA; + RAW_RETURN_CHECK(catalogGetHandle((*pRequest)->pTscObj->pAppInfo->clusterId, pCatalog)); + conn->pTrans = (*pRequest)->pTscObj->pAppInfo->pTransporter; + conn->requestId = (*pRequest)->requestId; + conn->requestObjRefId = (*pRequest)->self; + conn->mgmtEps = getEpSet_s(&(*pRequest)->pTscObj->pAppInfo->mgmtEp); +end: + return code; +} + +typedef int32_t _raw_decode_func_(SDecoder *pDecoder, SMqDataRsp *pRsp); +static int32_t decodeRawData(SDecoder *decoder, void* data, int32_t dataLen, _raw_decode_func_ func, SMqRspObj* rspObj){ int8_t dataVersion = *(int8_t*)data; if (dataVersion >= MQ_DATA_RSP_VERSION) { data = POINTER_SHIFT(data, sizeof(int8_t) + sizeof(int32_t)); dataLen -= sizeof(int8_t) + sizeof(int32_t); } - tDecoderInit(&decoder, data, dataLen); - code = (type == RES_TYPE__TMQ_METADATA) ? tDecodeSTaosxRsp(&decoder, &rspObj.dataRsp) : tDecodeMqDataRsp(&decoder, &rspObj.dataRsp); + rspObj->resIter = -1; + tDecoderInit(decoder, data, dataLen); + int32_t code = func(decoder, &rspObj->dataRsp); if (code != 0) { SET_ERROR_MSG("decode mq taosx data rsp failed"); - code = TSDB_CODE_INVALID_MSG; - goto end; + } + return code; +} + +static int32_t processCacheMeta(SHashObj *pVgHash, SHashObj *pNameHash, SHashObj *pMetaHash, SVCreateTbReq* pCreateReqDst, + SCatalog* pCatalog, SRequestConnInfo* conn, SName* pName, + STableMeta** pMeta, SSchemaWrapper* pSW, void* rawData, int32_t retry){ + int32_t code = 0; + STableMeta* pTableMeta = NULL; + tbInfo* tmpInfo = (tbInfo*)taosHashGet(pNameHash, pName->tname, strlen(pName->tname)); + if (tmpInfo == NULL || retry > 0) { + tbInfo info = {0}; + + RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, conn, pName, &info.vgInfo)); + if (pCreateReqDst && tmpInfo == NULL) { // change stable name to get meta + tstrncpy(pName->tname, pCreateReqDst->ctb.stbName, TSDB_TABLE_NAME_LEN); + } + RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, conn, pName, &pTableMeta)); + info.uid = pTableMeta->uid; + if (pTableMeta->tableType == TSDB_CHILD_TABLE){ + info.suid = pTableMeta->suid; + } else { + info.suid = pTableMeta->uid; + } + code = taosHashPut(pMetaHash, &info.suid, LONG_BYTES, &pTableMeta, POINTER_BYTES); + if (code != 0){ + taosMemoryFree(pTableMeta); + goto end; + } + if (pCreateReqDst) { + pTableMeta->vgId = info.vgInfo.vgId; + pTableMeta->uid = pCreateReqDst->uid; + pCreateReqDst->ctb.suid = pTableMeta->suid; + } + + RAW_RETURN_CHECK(taosHashPut(pNameHash, pName->tname, strlen(pName->tname), &info, sizeof(tbInfo))); + tmpInfo = (tbInfo*)taosHashGet(pNameHash, pName->tname, strlen(pName->tname)); + RAW_RETURN_CHECK(taosHashPut(pVgHash, &info.vgInfo.vgId, sizeof(info.vgInfo.vgId), &info.vgInfo, sizeof(SVgroupInfo))); } - if (!pRequest->pDb) { - code = TSDB_CODE_PAR_DB_NOT_SPECIFIED; - goto end; + if (pTableMeta == NULL || retry > 0){ + STableMeta** pTableMetaTmp = (STableMeta**)taosHashGet(pMetaHash, &tmpInfo->suid, LONG_BYTES); + if (pTableMetaTmp == NULL || retry > 0 || needRefreshMeta(rawData, *pTableMetaTmp, pSW)) { + RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, conn, pName, &pTableMeta)); + code = taosHashPut(pMetaHash, &tmpInfo->suid, LONG_BYTES, &pTableMeta, POINTER_BYTES); + if (code != 0){ + taosMemoryFree(pTableMeta); + goto end; + } + + }else{ + pTableMeta = *pTableMetaTmp; + pTableMeta->uid = tmpInfo->uid; + pTableMeta->vgId = tmpInfo->vgInfo.vgId; + } } + *pMeta = pTableMeta; - struct SCatalog* pCatalog = NULL; - RAW_RETURN_CHECK(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog)); +end: + return code; +} - SRequestConnInfo conn = {0}; - conn.pTrans = pRequest->pTscObj->pAppInfo->pTransporter; - conn.requestId = pRequest->requestId; - conn.requestObjRefId = pRequest->self; - conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp); +static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen){ + int32_t code = TSDB_CODE_SUCCESS; + SQuery* pQuery = NULL; + SMqRspObj rspObj = {0}; + SDecoder decoder = {0}; - if (type == RES_TYPE__TMQ_METADATA) { - pCreateTbHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); - RAW_NULL_CHECK(pCreateTbHash); - RAW_RETURN_CHECK(buildCreateTbMap(&rspObj.dataRsp, pCreateTbHash)); - } + SRequestObj* pRequest = NULL; + SCatalog* pCatalog = NULL; + SRequestConnInfo conn = {0}; + + uDebug(LOG_ID_TAG " write raw data, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen); + RAW_RETURN_CHECK(buildRawRequest(taos, &pRequest, &pCatalog, &conn)); + RAW_RETURN_CHECK(decodeRawData(&decoder, data, dataLen, tDecodeMqDataRsp, &rspObj)); SHashObj *pVgHash = NULL; SHashObj *pNameHash = NULL; @@ -1903,89 +1941,33 @@ static int32_t tmqWriteRawImpl(TAOS* taos, uint16_t type, void* data, int32_t da int retry = 0; while(1){ RAW_RETURN_CHECK(smlInitHandle(&pQuery)); - uDebug(LOG_ID_TAG " write raw data type:%d block num:%d", LOG_ID_VALUE, type, rspObj.dataRsp.blockNum); + uDebug(LOG_ID_TAG " write raw meta data block num:%d", LOG_ID_VALUE, rspObj.dataRsp.blockNum); while (++rspObj.resIter < rspObj.dataRsp.blockNum) { - void* pRetrieve = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter); - RAW_NULL_CHECK(pRetrieve); if (!rspObj.dataRsp.withSchema) { goto end; } const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter); RAW_NULL_CHECK(tbName); - - uDebug(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName); - SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}}; - (void)strcpy(pName.dbname, pRequest->pDb); - (void)strcpy(pName.tname, tbName); - - // find schema data info - SVCreateTbReq* pCreateReqDst = NULL; - if (type == RES_TYPE__TMQ_METADATA){ - pCreateReqDst = (SVCreateTbReq*)taosHashGet(pCreateTbHash, tbName, strlen(tbName)); - } - STableMeta* pTableMeta = NULL; - tbInfo* tmpInfo = (tbInfo*)taosHashGet(pNameHash, tbName, strlen(tbName)); - if (tmpInfo == NULL || retry > 0) { - tbInfo info = {0}; - - RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &info.vgInfo)); - if (pCreateReqDst && tmpInfo == NULL) { // change stable name to get meta - (void)strcpy(pName.tname, pCreateReqDst->ctb.stbName); - } - RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta)); - info.uid = pTableMeta->uid; - if (pTableMeta->tableType == TSDB_CHILD_TABLE){ - info.suid = pTableMeta->suid; - } else { - info.suid = pTableMeta->uid; - } - code = taosHashPut(pMetaHash, &info.suid, LONG_BYTES, &pTableMeta, POINTER_BYTES); - if (code != 0){ - taosMemoryFree(pTableMeta); - goto end; - } - if (pCreateReqDst) { - pTableMeta->vgId = info.vgInfo.vgId; - pTableMeta->uid = pCreateReqDst->uid; - pCreateReqDst->ctb.suid = pTableMeta->suid; - } - - RAW_RETURN_CHECK(taosHashPut(pNameHash, pName.tname, strlen(pName.tname), &info, sizeof(tbInfo))); - tmpInfo = (tbInfo*)taosHashGet(pNameHash, pName.tname, strlen(pName.tname)); -// code = (code == TSDB_CODE_DUP_KEY) ? 0 : code; -// RAW_RETURN_CHECK(code); - RAW_RETURN_CHECK(taosHashPut(pVgHash, &info.vgInfo.vgId, sizeof(info.vgInfo.vgId), &info.vgInfo, sizeof(SVgroupInfo))); -// code = (code == TSDB_CODE_DUP_KEY) ? 0 : code; -// RAW_RETURN_CHECK(code); - } - SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.dataRsp.blockSchema, rspObj.resIter); RAW_NULL_CHECK(pSW); + void* pRetrieve = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter); + RAW_NULL_CHECK(pRetrieve); void* rawData = getRawDataFromRes(pRetrieve); RAW_NULL_CHECK(rawData); - if (pTableMeta == NULL || retry > 0){ - STableMeta** pTableMetaTmp = (STableMeta**)taosHashGet(pMetaHash, &tmpInfo->suid, LONG_BYTES); - if (pTableMetaTmp == NULL || retry > 0 || needRefreshMeta(rawData, *pTableMetaTmp, pSW)) { - RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta)); - code = taosHashPut(pMetaHash, &tmpInfo->suid, LONG_BYTES, &pTableMeta, POINTER_BYTES); - if (code != 0){ - taosMemoryFree(pTableMeta); - goto end; - } - - }else{ - pTableMeta = *pTableMetaTmp; - pTableMeta->uid = tmpInfo->uid; - pTableMeta->vgId = tmpInfo->vgInfo.vgId; - } - } + uDebug(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName); + SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}}; + tstrncpy(pName.dbname, pRequest->pDb, TSDB_DB_NAME_LEN); + tstrncpy(pName.tname, tbName, TSDB_TABLE_NAME_LEN); + STableMeta* pTableMeta = NULL; + processCacheMeta(pVgHash, pNameHash, pMetaHash, NULL, pCatalog, &conn, + &pName, &pTableMeta, pSW, rawData, retry); char err[ERR_MSG_LEN] = {0}; - code = rawBlockBindData(pQuery, pTableMeta, rawData, pCreateReqDst, pSW, pSW->nCols, true, err, ERR_MSG_LEN, true); + code = rawBlockBindData(pQuery, pTableMeta, rawData, NULL, pSW, pSW->nCols, true, err, ERR_MSG_LEN, true); if (code != TSDB_CODE_SUCCESS) { - SET_ERROR_MSG("table:%s, err:%s", tbName, err); + SET_ERROR_MSG("table:%s, err:%s", pName.tname, err); goto end; } } @@ -1993,11 +1975,8 @@ static int32_t tmqWriteRawImpl(TAOS* taos, uint16_t type, void* data, int32_t da launchQueryImpl(pRequest, pQuery, true, NULL); code = pRequest->code; - if (NEED_CLIENT_HANDLE_ERROR(code)) { + if (NEED_CLIENT_HANDLE_ERROR(code) && retry++ < 3) { uInfo("write raw retry:%d/3 end code:%d, msg:%s", retry, code, tstrerror(code)); - if (retry++ >= 3) { - break; - } qDestroyQuery(pQuery); pQuery = NULL; rspObj.resIter = -1; @@ -2007,12 +1986,89 @@ static int32_t tmqWriteRawImpl(TAOS* taos, uint16_t type, void* data, int32_t da } end: - uDebug(LOG_ID_TAG " write raw metadata return, msg:%s", LOG_ID_VALUE, tstrerror(code)); - if (type == RES_TYPE__TMQ_METADATA){ - tDeleteSTaosxRsp(&rspObj.dataRsp); - }else { - tDeleteMqDataRsp(&rspObj.dataRsp); + uDebug(LOG_ID_TAG " write raw data return, msg:%s", LOG_ID_VALUE, tstrerror(code)); + tDeleteMqDataRsp(&rspObj.dataRsp); + tDecoderClear(&decoder); + qDestroyQuery(pQuery); + destroyRequest(pRequest); + return code; +} + +static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) { + int32_t code = TSDB_CODE_SUCCESS; + SQuery* pQuery = NULL; + SMqRspObj rspObj = {0}; + SDecoder decoder = {0}; + SHashObj* pCreateTbHash = NULL; + + SRequestObj* pRequest = NULL; + SCatalog* pCatalog = NULL; + SRequestConnInfo conn = {0}; + + uDebug(LOG_ID_TAG " write raw metadata, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen); + RAW_RETURN_CHECK(buildRawRequest(taos, &pRequest, &pCatalog, &conn)); + RAW_RETURN_CHECK(decodeRawData(&decoder, data, dataLen, tDecodeSTaosxRsp, &rspObj)); + + pCreateTbHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + RAW_NULL_CHECK(pCreateTbHash); + RAW_RETURN_CHECK(buildCreateTbMap(&rspObj.dataRsp, pCreateTbHash)); + + SHashObj *pVgHash = NULL; + SHashObj *pNameHash = NULL; + SHashObj *pMetaHash = NULL; + RAW_RETURN_CHECK(getRawCache(&pVgHash, &pNameHash, &pMetaHash, taos)); + int retry = 0; + while(1){ + RAW_RETURN_CHECK(smlInitHandle(&pQuery)); + uDebug(LOG_ID_TAG " write raw meta data block num:%d", LOG_ID_VALUE, rspObj.dataRsp.blockNum); + while (++rspObj.resIter < rspObj.dataRsp.blockNum) { + if (!rspObj.dataRsp.withSchema) { + goto end; + } + + const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter); + RAW_NULL_CHECK(tbName); + SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.dataRsp.blockSchema, rspObj.resIter); + RAW_NULL_CHECK(pSW); + void* pRetrieve = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter); + RAW_NULL_CHECK(pRetrieve); + void* rawData = getRawDataFromRes(pRetrieve); + RAW_NULL_CHECK(rawData); + + uDebug(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName); + SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}}; + tstrncpy(pName.dbname, pRequest->pDb, TSDB_DB_NAME_LEN); + tstrncpy(pName.tname, tbName, TSDB_TABLE_NAME_LEN); + + // find schema data info + SVCreateTbReq* pCreateReqDst = (SVCreateTbReq*)taosHashGet(pCreateTbHash, pName.tname, strlen(pName.tname)); + STableMeta* pTableMeta = NULL; + processCacheMeta(pVgHash, pNameHash, pMetaHash, pCreateReqDst, pCatalog, &conn, + &pName, &pTableMeta, pSW, rawData, retry); + char err[ERR_MSG_LEN] = {0}; + code = rawBlockBindData(pQuery, pTableMeta, rawData, pCreateReqDst, pSW, pSW->nCols, true, err, ERR_MSG_LEN, true); + if (code != TSDB_CODE_SUCCESS) { + SET_ERROR_MSG("table:%s, err:%s", pName.tname, err); + goto end; + } + } + RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash)); + launchQueryImpl(pRequest, pQuery, true, NULL); + code = pRequest->code; + + if (NEED_CLIENT_HANDLE_ERROR(code) && retry++ < 3) { + uInfo("write raw retry:%d/3 end code:%d, msg:%s", retry, code, tstrerror(code)); + qDestroyQuery(pQuery); + pQuery = NULL; + rspObj.resIter = -1; + continue; + } + break; } + +end: + uDebug(LOG_ID_TAG " write raw metadata return, msg:%s", LOG_ID_VALUE, tstrerror(code)); + tDeleteSTaosxRsp(&rspObj.dataRsp); void* pIter = taosHashIterate(pCreateTbHash, NULL); while (pIter) { tDestroySVCreateTbReq(pIter, TSDB_MSG_FLG_DECODE); @@ -2243,8 +2299,10 @@ static int32_t writeRawImpl(TAOS* taos, void* buf, uint32_t len, uint16_t type) return taosDropTable(taos, buf, len); } else if (type == TDMT_VND_DELETE) { return taosDeleteData(taos, buf, len); - } else if (type == RES_TYPE__TMQ_METADATA || type == RES_TYPE__TMQ) { - return tmqWriteRawImpl(taos, type, buf, len); + } else if (type == RES_TYPE__TMQ_METADATA){ + return tmqWriteRawMetaDataImpl(taos, buf, len); + } else if (type == RES_TYPE__TMQ) { + return tmqWriteRawDataImpl(taos, buf, len); } else if (type == RES_TYPE__TMQ_BATCH_META) { return tmqWriteBatchMetaDataImpl(taos, buf, len); } @@ -2252,7 +2310,8 @@ static int32_t writeRawImpl(TAOS* taos, void* buf, uint32_t len, uint16_t type) } int32_t tmq_write_raw(TAOS* taos, tmq_raw_data raw) { - if (!taos) { + if (taos == NULL || raw.raw == NULL || raw.raw_len <= 0) { + SET_ERROR_MSG("taos:%p or data:%p is NULL or raw_len <= 0", taos, raw.raw); return TSDB_CODE_INVALID_PARA; }