refactor(tmq)

This commit is contained in:
Liu Jicong 2022-09-02 22:09:58 +08:00
parent 004a84ac81
commit b050d1c33a
2 changed files with 20 additions and 19 deletions

View File

@ -766,22 +766,23 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
taosArrayPush(pRequest->tableList, &pName); taosArrayPush(pRequest->tableList, &pName);
// change tag cid to new cid // change tag cid to new cid
if(pCreateReq->type == TSDB_CHILD_TABLE){ if (pCreateReq->type == TSDB_CHILD_TABLE) {
STableMeta* pTableMeta = NULL; STableMeta* pTableMeta = NULL;
SName sName = {0}; SName sName = {0};
toName(pTscObj->acctId, pRequest->pDb, pCreateReq->ctb.name, &sName); toName(pTscObj->acctId, pRequest->pDb, pCreateReq->ctb.name, &sName);
code = catalogGetTableMeta(pCatalog, &conn, &sName, &pTableMeta); code = catalogGetTableMeta(pCatalog, &conn, &sName, &pTableMeta);
if(code != TSDB_CODE_SUCCESS){ if (code != TSDB_CODE_SUCCESS) {
uError("taosCreateTable:catalogGetTableMeta failed. table name: %s", pCreateReq->ctb.name); uError("taosCreateTable:catalogGetTableMeta failed. table name: %s", pCreateReq->ctb.name);
goto end; goto end;
} }
for(int32_t i = 0; i < taosArrayGetSize(pCreateReq->ctb.tagName); i++){ for (int32_t i = 0; i < taosArrayGetSize(pCreateReq->ctb.tagName); i++) {
char* tName = taosArrayGet(pCreateReq->ctb.tagName, i); char* tName = taosArrayGet(pCreateReq->ctb.tagName, i);
for(int32_t j = pTableMeta->tableInfo.numOfColumns; j < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; j++){ for (int32_t j = pTableMeta->tableInfo.numOfColumns;
SSchema *tag = &pTableMeta->schema[j]; j < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; j++) {
if(strcmp(tag->name, tName) == 0 && tag->type != TSDB_DATA_TYPE_JSON){ SSchema* tag = &pTableMeta->schema[j];
tTagSetCid((STag *)pCreateReq->ctb.pTag, i, tag->colId); if (strcmp(tag->name, tName) == 0 && tag->type != TSDB_DATA_TYPE_JSON) {
tTagSetCid((STag*)pCreateReq->ctb.pTag, i, tag->colId);
} }
} }
} }
@ -1322,12 +1323,12 @@ end:
return code; return code;
} }
static int32_t tmqWriteRaw(TAOS* taos, void* data, int32_t dataLen) { static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SHashObj* pVgHash = NULL; SHashObj* pVgHash = NULL;
SQuery* pQuery = NULL; SQuery* pQuery = NULL;
SMqRspObj rspObj = {0}; SMqRspObj rspObj = {0};
SDecoder decoder = {0}; SDecoder decoder = {0};
STableMeta* pTableMeta = NULL; STableMeta* pTableMeta = NULL;
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
@ -1405,7 +1406,7 @@ static int32_t tmqWriteRaw(TAOS* taos, void* data, int32_t dataLen) {
} }
code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta); code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST){ if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
uError("WriteRaw:catalogGetTableMeta table not exist. table name: %s", tbName); uError("WriteRaw:catalogGetTableMeta table not exist. table name: %s", tbName);
code = TSDB_CODE_SUCCESS; code = TSDB_CODE_SUCCESS;
continue; continue;
@ -1466,7 +1467,7 @@ static int32_t tmqWriteRaw(TAOS* taos, void* data, int32_t dataLen) {
} }
// pSW->pSchema should be same as pTableMeta->schema // pSW->pSchema should be same as pTableMeta->schema
// ASSERT(pSW->nCols == pTableMeta->tableInfo.numOfColumns); // ASSERT(pSW->nCols == pTableMeta->tableInfo.numOfColumns);
uint64_t suid = (TSDB_NORMAL_TABLE == pTableMeta->tableType ? 0 : pTableMeta->suid); uint64_t suid = (TSDB_NORMAL_TABLE == pTableMeta->tableType ? 0 : pTableMeta->suid);
uint64_t uid = pTableMeta->uid; uint64_t uid = pTableMeta->uid;
int16_t sver = pTableMeta->sversion; int16_t sver = pTableMeta->sversion;
@ -1494,10 +1495,10 @@ static int32_t tmqWriteRaw(TAOS* taos, void* data, int32_t dataLen) {
int32_t offset = 0; int32_t offset = 0;
for (int32_t k = 0; k < pTableMeta->tableInfo.numOfColumns; k++) { for (int32_t k = 0; k < pTableMeta->tableInfo.numOfColumns; k++) {
const SSchema* pColumn = &pTableMeta->schema[k]; const SSchema* pColumn = &pTableMeta->schema[k];
int32_t* index = taosHashGet(schemaHash, pColumn->name, strlen(pColumn->name)); int32_t* index = taosHashGet(schemaHash, pColumn->name, strlen(pColumn->name));
if(!index){ if (!index) {
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k); tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k);
}else{ } else {
char* colData = rspObj.resInfo.row[*index]; char* colData = rspObj.resInfo.row[*index];
if (!colData) { if (!colData) {
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k); tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k);
@ -1668,7 +1669,7 @@ int32_t tmq_write_raw(TAOS* taos, tmq_raw_data raw) {
} else if (raw.raw_type == TDMT_VND_DELETE) { } else if (raw.raw_type == TDMT_VND_DELETE) {
return taosDeleteData(taos, raw.raw, raw.raw_len); return taosDeleteData(taos, raw.raw, raw.raw_len);
} else if (raw.raw_type == RES_TYPE__TMQ) { } else if (raw.raw_type == RES_TYPE__TMQ) {
return tmqWriteRaw(taos, raw.raw, raw.raw_len); return tmqWriteRawDataImpl(taos, raw.raw, raw.raw_len);
} }
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }