fix valgrind error
This commit is contained in:
parent
70e806b0d2
commit
8c050431d1
|
@ -809,6 +809,7 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
|
|||
|
||||
code = catalogAsyncGetAllMeta(pCxt->pCatalog, &conn, &catalogReq, retrieveMetaCallback, pWrapper,
|
||||
&pRequest->body.queryJob);
|
||||
pCxt = NULL;
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
return;
|
||||
}
|
||||
|
@ -816,6 +817,8 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
|
|||
_error:
|
||||
tscError("0x%" PRIx64 " error happens, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code, tstrerror(code),
|
||||
pRequest->requestId);
|
||||
taosMemoryFree(pCxt);
|
||||
|
||||
terrno = code;
|
||||
pRequest->code = code;
|
||||
pRequest->body.queryFp(pRequest->body.param, pRequest, code);
|
||||
|
|
|
@ -2196,7 +2196,7 @@ static char* buildCreateCTableJson(STag* pTag, char* sname, char* name, SArray*
|
|||
cJSON* tvalue = NULL;
|
||||
if (IS_VAR_DATA_TYPE(pTagVal->type)) {
|
||||
char* buf = taosMemoryCalloc(pTagVal->nData + 3, 1);
|
||||
if(!buf) goto end;
|
||||
if (!buf) goto end;
|
||||
dataConverToStr(buf, pTagVal->type, pTagVal->pData, pTagVal->nData, NULL);
|
||||
tvalue = cJSON_CreateString(buf);
|
||||
taosMemoryFree(buf);
|
||||
|
@ -2506,7 +2506,7 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
|
|||
|
||||
launchQueryImpl(pRequest, &pQuery, true, NULL);
|
||||
|
||||
if(pRequest->code == TSDB_CODE_SUCCESS){
|
||||
if (pRequest->code == TSDB_CODE_SUCCESS) {
|
||||
SCatalog* pCatalog = NULL;
|
||||
catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
|
||||
catalogRemoveTableMeta(pCatalog, &tableName);
|
||||
|
@ -2575,7 +2575,7 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
|
|||
|
||||
launchQueryImpl(pRequest, &pQuery, true, NULL);
|
||||
|
||||
if(pRequest->code == TSDB_CODE_SUCCESS){
|
||||
if (pRequest->code == TSDB_CODE_SUCCESS) {
|
||||
SCatalog* pCatalog = NULL;
|
||||
catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
|
||||
catalogRemoveTableMeta(pCatalog, &tableName);
|
||||
|
@ -2695,7 +2695,7 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
|
|||
}
|
||||
|
||||
launchQueryImpl(pRequest, pQuery, true, NULL);
|
||||
if (pRequest->code == TSDB_CODE_SUCCESS){
|
||||
if (pRequest->code == TSDB_CODE_SUCCESS) {
|
||||
removeMeta(pTscObj, pRequest->tableList);
|
||||
}
|
||||
|
||||
|
@ -2812,7 +2812,7 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
|
|||
}
|
||||
|
||||
launchQueryImpl(pRequest, pQuery, true, NULL);
|
||||
if (pRequest->code == TSDB_CODE_SUCCESS){
|
||||
if (pRequest->code == TSDB_CODE_SUCCESS) {
|
||||
removeMeta(pTscObj, pRequest->tableList);
|
||||
}
|
||||
code = pRequest->code;
|
||||
|
@ -2827,7 +2827,7 @@ end:
|
|||
|
||||
// delete from db.tabl where .. -> delete from tabl where ..
|
||||
// delete from db .tabl where .. -> delete from tabl where ..
|
||||
//static void getTbName(char *sql){
|
||||
// static void getTbName(char *sql){
|
||||
// char *ch = sql;
|
||||
//
|
||||
// bool inBackQuote = false;
|
||||
|
@ -2871,13 +2871,14 @@ static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) {
|
|||
goto end;
|
||||
}
|
||||
|
||||
// getTbName(req.tableFName);
|
||||
// getTbName(req.tableFName);
|
||||
char sql[256] = {0};
|
||||
sprintf(sql, "delete from `%s` where `%s` >= %" PRId64" and `%s` <= %" PRId64, req.tableFName, req.tsColName, req.skey, req.tsColName, req.ekey);
|
||||
sprintf(sql, "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64, req.tableFName, req.tsColName,
|
||||
req.skey, req.tsColName, req.ekey);
|
||||
printf("delete sql:%s\n", sql);
|
||||
|
||||
TAOS_RES* res = taos_query(taos, sql);
|
||||
SRequestObj *pRequest = (SRequestObj *)res;
|
||||
SRequestObj* pRequest = (SRequestObj*)res;
|
||||
code = pRequest->code;
|
||||
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
|
||||
code = TSDB_CODE_SUCCESS;
|
||||
|
@ -2985,9 +2986,9 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
|
|||
code = TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if(pRequest->code == TSDB_CODE_SUCCESS){
|
||||
if (pRequest->code == TSDB_CODE_SUCCESS) {
|
||||
SExecResult* pRes = &pRequest->body.resInfo.execRes;
|
||||
if(pRes->res != NULL){
|
||||
if (pRes->res != NULL) {
|
||||
code = handleAlterTbExecRes(pRes->res, pCatalog);
|
||||
}
|
||||
}
|
||||
|
@ -3001,23 +3002,23 @@ end:
|
|||
return code;
|
||||
}
|
||||
|
||||
typedef struct{
|
||||
typedef struct {
|
||||
SVgroupInfo vg;
|
||||
void *data;
|
||||
}VgData;
|
||||
void* data;
|
||||
} VgData;
|
||||
|
||||
static void destroyVgHash(void* data) {
|
||||
VgData* vgData = (VgData*)data;
|
||||
taosMemoryFreeClear(vgData->data);
|
||||
}
|
||||
|
||||
int taos_write_raw_block(TAOS *taos, int rows, char *pData, const char* tbname){
|
||||
int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
STableMeta* pTableMeta = NULL;
|
||||
SQuery *pQuery = NULL;
|
||||
SQuery* pQuery = NULL;
|
||||
|
||||
SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT);
|
||||
if(!pRequest){
|
||||
if (!pRequest) {
|
||||
uError("WriteRaw:createRequest error request is null");
|
||||
code = terrno;
|
||||
goto end;
|
||||
|
@ -3033,9 +3034,9 @@ int taos_write_raw_block(TAOS *taos, int rows, char *pData, const char* tbname){
|
|||
strcpy(pName.dbname, pRequest->pDb);
|
||||
strcpy(pName.tname, tbname);
|
||||
|
||||
struct SCatalog *pCatalog = NULL;
|
||||
struct SCatalog* pCatalog = NULL;
|
||||
code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
|
||||
if(code != TSDB_CODE_SUCCESS){
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
uError("WriteRaw: get gatlog error");
|
||||
goto end;
|
||||
}
|
||||
|
@ -3066,11 +3067,11 @@ int taos_write_raw_block(TAOS *taos, int rows, char *pData, const char* tbname){
|
|||
int32_t rowSize = 0;
|
||||
int16_t nVar = 0;
|
||||
for (int i = 0; i < numOfCols; i++) {
|
||||
SSchema *schema = pTableMeta->schema + i;
|
||||
SSchema* schema = pTableMeta->schema + i;
|
||||
fLen += TYPE_BYTES[schema->type];
|
||||
rowSize += schema->bytes;
|
||||
if(IS_VAR_DATA_TYPE(schema->type)){
|
||||
nVar ++;
|
||||
if (IS_VAR_DATA_TYPE(schema->type)) {
|
||||
nVar++;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3094,7 +3095,7 @@ int taos_write_raw_block(TAOS *taos, int rows, char *pData, const char* tbname){
|
|||
int32_t* colLength = (int32_t*)pStart;
|
||||
pStart += sizeof(int32_t) * numOfCols;
|
||||
|
||||
SResultColumn *pCol = taosMemoryCalloc(numOfCols, sizeof(SResultColumn));
|
||||
SResultColumn* pCol = taosMemoryCalloc(numOfCols, sizeof(SResultColumn));
|
||||
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
if (IS_VAR_DATA_TYPE(pTableMeta->schema[i].type)) {
|
||||
|
@ -3159,17 +3160,17 @@ int taos_write_raw_block(TAOS *taos, int rows, char *pData, const char* tbname){
|
|||
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
|
||||
pQuery->haveResultSet = false;
|
||||
pQuery->msgType = TDMT_VND_SUBMIT;
|
||||
pQuery->pRoot = (SNode *)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT);
|
||||
pQuery->pRoot = (SNode*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT);
|
||||
if (NULL == pQuery->pRoot) {
|
||||
uError("create pQuery->pRoot error");
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto end;
|
||||
}
|
||||
SVnodeModifOpStmt *nodeStmt = (SVnodeModifOpStmt *)(pQuery->pRoot);
|
||||
SVnodeModifOpStmt* nodeStmt = (SVnodeModifOpStmt*)(pQuery->pRoot);
|
||||
nodeStmt->payloadType = PAYLOAD_TYPE_KV;
|
||||
nodeStmt->pDataBlocks = taosArrayInit(1, POINTER_BYTES);
|
||||
|
||||
SVgDataBlocks *dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
|
||||
SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
|
||||
if (NULL == dst) {
|
||||
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
goto end;
|
||||
|
@ -3195,16 +3196,16 @@ end:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t tmqWriteRaw(TAOS *taos, void* data, int32_t dataLen){
|
||||
static int32_t tmqWriteRaw(TAOS* taos, void* data, int32_t dataLen) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SHashObj *pVgHash = NULL;
|
||||
SQuery *pQuery = NULL;
|
||||
SHashObj* pVgHash = NULL;
|
||||
SQuery* pQuery = NULL;
|
||||
SMqRspObj rspObj = {0};
|
||||
SDecoder decoder = {0};
|
||||
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT);
|
||||
if(!pRequest){
|
||||
if (!pRequest) {
|
||||
uError("WriteRaw:createRequest error request is null");
|
||||
return terrno;
|
||||
}
|
||||
|
@ -3214,7 +3215,7 @@ static int32_t tmqWriteRaw(TAOS *taos, void* data, int32_t dataLen){
|
|||
|
||||
tDecoderInit(&decoder, data, dataLen);
|
||||
code = tDecodeSMqDataRsp(&decoder, &rspObj.rsp);
|
||||
if (code != 0){
|
||||
if (code != 0) {
|
||||
uError("WriteRaw:decode smqDataRsp error");
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
goto end;
|
||||
|
@ -3228,9 +3229,9 @@ static int32_t tmqWriteRaw(TAOS *taos, void* data, int32_t dataLen){
|
|||
|
||||
pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
|
||||
taosHashSetFreeFp(pVgHash, destroyVgHash);
|
||||
struct SCatalog *pCatalog = NULL;
|
||||
struct SCatalog* pCatalog = NULL;
|
||||
code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
|
||||
if(code != TSDB_CODE_SUCCESS){
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
uError("WriteRaw: get gatlog error");
|
||||
goto end;
|
||||
}
|
||||
|
@ -3252,7 +3253,7 @@ static int32_t tmqWriteRaw(TAOS *taos, void* data, int32_t dataLen){
|
|||
setResSchemaInfo(&rspObj.resInfo, pSW->pSchema, pSW->nCols);
|
||||
|
||||
code = setQueryResultFromRsp(&rspObj.resInfo, pRetrieve, false, false);
|
||||
if(code != TSDB_CODE_SUCCESS){
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
uError("WriteRaw: setQueryResultFromRsp error");
|
||||
goto end;
|
||||
}
|
||||
|
@ -3261,11 +3262,11 @@ static int32_t tmqWriteRaw(TAOS *taos, void* data, int32_t dataLen){
|
|||
int32_t rowSize = 0;
|
||||
int16_t nVar = 0;
|
||||
for (int i = 0; i < pSW->nCols; i++) {
|
||||
SSchema *schema = pSW->pSchema + i;
|
||||
SSchema* schema = pSW->pSchema + i;
|
||||
fLen += TYPE_BYTES[schema->type];
|
||||
rowSize += schema->bytes;
|
||||
if(IS_VAR_DATA_TYPE(schema->type)){
|
||||
nVar ++;
|
||||
if (IS_VAR_DATA_TYPE(schema->type)) {
|
||||
nVar++;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3276,7 +3277,7 @@ static int32_t tmqWriteRaw(TAOS *taos, void* data, int32_t dataLen){
|
|||
int32_t submitLen = sizeof(SSubmitBlk) + schemaLen + rows * extendedRowSize;
|
||||
|
||||
const char* tbName = (const char*)taosArrayGetP(rspObj.rsp.blockTbName, rspObj.resIter);
|
||||
if(!tbName){
|
||||
if (!tbName) {
|
||||
uError("WriteRaw: tbname is null");
|
||||
code = TSDB_CODE_TMQ_INVALID_MSG;
|
||||
goto end;
|
||||
|
@ -3296,12 +3297,12 @@ static int32_t tmqWriteRaw(TAOS *taos, void* data, int32_t dataLen){
|
|||
|
||||
SSubmitReq* subReq = NULL;
|
||||
SSubmitBlk* blk = NULL;
|
||||
void *hData = taosHashGet(pVgHash, &vgData.vg.vgId, sizeof(vgData.vg.vgId));
|
||||
if(hData){
|
||||
void* hData = taosHashGet(pVgHash, &vgData.vg.vgId, sizeof(vgData.vg.vgId));
|
||||
if (hData) {
|
||||
vgData = *(VgData*)hData;
|
||||
|
||||
int32_t totalLen = ((SSubmitReq*)(vgData.data))->length + submitLen;
|
||||
void *tmp = taosMemoryRealloc(vgData.data, totalLen);
|
||||
void* tmp = taosMemoryRealloc(vgData.data, totalLen);
|
||||
if (tmp == NULL) {
|
||||
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
goto end;
|
||||
|
@ -3310,15 +3311,15 @@ static int32_t tmqWriteRaw(TAOS *taos, void* data, int32_t dataLen){
|
|||
((VgData*)hData)->data = tmp;
|
||||
subReq = (SSubmitReq*)(vgData.data);
|
||||
blk = POINTER_SHIFT(vgData.data, subReq->length);
|
||||
}else{
|
||||
} else {
|
||||
int32_t totalLen = sizeof(SSubmitReq) + submitLen;
|
||||
void *tmp = taosMemoryCalloc(1, totalLen);
|
||||
void* tmp = taosMemoryCalloc(1, totalLen);
|
||||
if (tmp == NULL) {
|
||||
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
goto end;
|
||||
}
|
||||
vgData.data = tmp;
|
||||
taosHashPut(pVgHash, (const char *)&vgData.vg.vgId, sizeof(vgData.vg.vgId), (char *)&vgData, sizeof(vgData));
|
||||
taosHashPut(pVgHash, (const char*)&vgData.vg.vgId, sizeof(vgData.vg.vgId), (char*)&vgData, sizeof(vgData));
|
||||
subReq = (SSubmitReq*)(vgData.data);
|
||||
subReq->length = sizeof(SSubmitReq);
|
||||
subReq->numOfBlocks = 0;
|
||||
|
@ -3353,11 +3354,11 @@ static int32_t tmqWriteRaw(TAOS *taos, void* data, int32_t dataLen){
|
|||
int32_t offset = 0;
|
||||
for (int32_t k = 0; k < pSW->nCols; k++) {
|
||||
const SSchema* pColumn = &pSW->pSchema[k];
|
||||
char *data = rspObj.resInfo.row[k];
|
||||
char* data = rspObj.resInfo.row[k];
|
||||
if (!data) {
|
||||
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k);
|
||||
} else {
|
||||
if(IS_VAR_DATA_TYPE(pColumn->type)){
|
||||
if (IS_VAR_DATA_TYPE(pColumn->type)) {
|
||||
data -= VARSTR_HEADER_SIZE;
|
||||
}
|
||||
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, offset, k);
|
||||
|
@ -3389,21 +3390,21 @@ static int32_t tmqWriteRaw(TAOS *taos, void* data, int32_t dataLen){
|
|||
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
|
||||
pQuery->haveResultSet = false;
|
||||
pQuery->msgType = TDMT_VND_SUBMIT;
|
||||
pQuery->pRoot = (SNode *)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT);
|
||||
pQuery->pRoot = (SNode*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT);
|
||||
if (NULL == pQuery->pRoot) {
|
||||
uError("create pQuery->pRoot error");
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto end;
|
||||
}
|
||||
SVnodeModifOpStmt *nodeStmt = (SVnodeModifOpStmt *)(pQuery->pRoot);
|
||||
SVnodeModifOpStmt* nodeStmt = (SVnodeModifOpStmt*)(pQuery->pRoot);
|
||||
nodeStmt->payloadType = PAYLOAD_TYPE_KV;
|
||||
|
||||
int32_t numOfVg = taosHashGetSize(pVgHash);
|
||||
nodeStmt->pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES);
|
||||
|
||||
VgData *vData = (VgData *)taosHashIterate(pVgHash, NULL);
|
||||
VgData* vData = (VgData*)taosHashIterate(pVgHash, NULL);
|
||||
while (vData) {
|
||||
SVgDataBlocks *dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
|
||||
SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
|
||||
if (NULL == dst) {
|
||||
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
goto end;
|
||||
|
@ -3420,7 +3421,7 @@ static int32_t tmqWriteRaw(TAOS *taos, void* data, int32_t dataLen){
|
|||
subReq->length = htonl(subReq->length);
|
||||
subReq->numOfBlocks = htonl(subReq->numOfBlocks);
|
||||
taosArrayPush(nodeStmt->pDataBlocks, &dst);
|
||||
vData = (VgData *)taosHashIterate(pVgHash, vData);
|
||||
vData = (VgData*)taosHashIterate(pVgHash, vData);
|
||||
}
|
||||
|
||||
launchQueryImpl(pRequest, pQuery, true, NULL);
|
||||
|
@ -3459,8 +3460,8 @@ char* tmq_get_json_meta(TAOS_RES* res) {
|
|||
|
||||
void tmq_free_json_meta(char* jsonMeta) { taosMemoryFreeClear(jsonMeta); }
|
||||
|
||||
int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data *raw) {
|
||||
if (!raw || !res){
|
||||
int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) {
|
||||
if (!raw || !res) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
if (TD_RES_TMQ_META(res)) {
|
||||
|
@ -3468,8 +3469,8 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data *raw) {
|
|||
raw->raw = pMetaRspObj->metaRsp.metaRsp;
|
||||
raw->raw_len = pMetaRspObj->metaRsp.metaRspLen;
|
||||
raw->raw_type = pMetaRspObj->metaRsp.resMsgType;
|
||||
} else if(TD_RES_TMQ(res)){
|
||||
SMqRspObj *rspObj = ((SMqRspObj*)res);
|
||||
} else if (TD_RES_TMQ(res)) {
|
||||
SMqRspObj* rspObj = ((SMqRspObj*)res);
|
||||
|
||||
int32_t len = 0;
|
||||
int32_t code = 0;
|
||||
|
@ -3478,7 +3479,7 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data *raw) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
void *buf = taosMemoryCalloc(1, len);
|
||||
void* buf = taosMemoryCalloc(1, len);
|
||||
SEncoder encoder = {0};
|
||||
tEncoderInit(&encoder, buf, len);
|
||||
tEncodeSMqDataRsp(&encoder, &rspObj->rsp);
|
||||
|
@ -3494,31 +3495,31 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data *raw) {
|
|||
}
|
||||
|
||||
void tmq_free_raw(tmq_raw_data raw) {
|
||||
if (raw.raw_type == RES_TYPE__TMQ){
|
||||
if (raw.raw_type == RES_TYPE__TMQ) {
|
||||
taosMemoryFree(raw.raw);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t tmq_write_raw(TAOS *taos, tmq_raw_data raw){
|
||||
int32_t tmq_write_raw(TAOS* taos, tmq_raw_data raw) {
|
||||
if (!taos) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
if(raw.raw_type == TDMT_VND_CREATE_STB) {
|
||||
if (raw.raw_type == TDMT_VND_CREATE_STB) {
|
||||
return taosCreateStb(taos, raw.raw, raw.raw_len);
|
||||
}else if(raw.raw_type == TDMT_VND_ALTER_STB){
|
||||
} else if (raw.raw_type == TDMT_VND_ALTER_STB) {
|
||||
return taosCreateStb(taos, raw.raw, raw.raw_len);
|
||||
}else if(raw.raw_type == TDMT_VND_DROP_STB){
|
||||
} else if (raw.raw_type == TDMT_VND_DROP_STB) {
|
||||
return taosDropStb(taos, raw.raw, raw.raw_len);
|
||||
}else if(raw.raw_type == TDMT_VND_CREATE_TABLE){
|
||||
} else if (raw.raw_type == TDMT_VND_CREATE_TABLE) {
|
||||
return taosCreateTable(taos, raw.raw, raw.raw_len);
|
||||
}else if(raw.raw_type == TDMT_VND_ALTER_TABLE){
|
||||
} else if (raw.raw_type == TDMT_VND_ALTER_TABLE) {
|
||||
return taosAlterTable(taos, raw.raw, raw.raw_len);
|
||||
}else if(raw.raw_type == TDMT_VND_DROP_TABLE) {
|
||||
} else if (raw.raw_type == TDMT_VND_DROP_TABLE) {
|
||||
return taosDropTable(taos, raw.raw, raw.raw_len);
|
||||
}else if(raw.raw_type == TDMT_VND_DELETE){
|
||||
} else if (raw.raw_type == TDMT_VND_DELETE) {
|
||||
return taosDeleteData(taos, raw.raw, raw.raw_len);
|
||||
}else if(raw.raw_type == RES_TYPE__TMQ){
|
||||
} else if (raw.raw_type == RES_TYPE__TMQ) {
|
||||
return tmqWriteRaw(taos, raw.raw, raw.raw_len);
|
||||
}
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
|
|
|
@ -13,14 +13,14 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "trpc.h"
|
||||
#include "query.h"
|
||||
#include "tname.h"
|
||||
#include "catalogInt.h"
|
||||
#include "query.h"
|
||||
#include "systable.h"
|
||||
#include "tname.h"
|
||||
#include "tref.h"
|
||||
#include "trpc.h"
|
||||
|
||||
int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBuf *pMsg, int32_t rspCode) {
|
||||
int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBuf* pMsg, int32_t rspCode) {
|
||||
int32_t code = 0;
|
||||
SArray* pTaskId = cbParam->taskId;
|
||||
SCatalog* pCtg = pJob->pCtg;
|
||||
|
@ -30,7 +30,8 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu
|
|||
int32_t msgNum = (TSDB_CODE_SUCCESS == rspCode && pMsg->pData && (pMsg->len > 0)) ? ntohl(*(int32_t*)pMsg->pData) : 0;
|
||||
ASSERT(taskNum == msgNum || 0 == msgNum);
|
||||
|
||||
ctgDebug("QID:0x%" PRIx64 " ctg got batch %d rsp %s", pJob->queryId, cbParam->batchId, TMSG_INFO(cbParam->reqType + 1));
|
||||
ctgDebug("QID:0x%" PRIx64 " ctg got batch %d rsp %s", pJob->queryId, cbParam->batchId,
|
||||
TMSG_INFO(cbParam->reqType + 1));
|
||||
|
||||
offset += sizeof(msgNum);
|
||||
SBatchRsp rsp = {0};
|
||||
|
@ -42,7 +43,7 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu
|
|||
|
||||
for (int32_t i = 0; i < taskNum; ++i) {
|
||||
int32_t* taskId = taosArrayGet(pTaskId, i);
|
||||
SCtgTask *pTask = taosArrayGet(pJob->pTasks, *taskId);
|
||||
SCtgTask* pTask = taosArrayGet(pJob->pTasks, *taskId);
|
||||
if (msgNum > 0) {
|
||||
rsp.reqType = ntohl(*(int32_t*)((char*)pMsg->pData + offset));
|
||||
offset += sizeof(rsp.reqType);
|
||||
|
@ -65,7 +66,8 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu
|
|||
|
||||
pTask->pBatchs = pBatchs;
|
||||
|
||||
ctgDebug("QID:0x%" PRIx64 " ctg task %d start to handle rsp %s", pJob->queryId, pTask->taskId, TMSG_INFO(taskMsg.msgType + 1));
|
||||
ctgDebug("QID:0x%" PRIx64 " ctg task %d start to handle rsp %s", pJob->queryId, pTask->taskId,
|
||||
TMSG_INFO(taskMsg.msgType + 1));
|
||||
|
||||
(*gCtgAsyncFps[pTask->type].handleRspFp)(pTask, rsp.reqType, &taskMsg, (rsp.rspCode ? rsp.rspCode : rspCode));
|
||||
}
|
||||
|
@ -78,7 +80,6 @@ _return:
|
|||
CTG_RET(code);
|
||||
}
|
||||
|
||||
|
||||
int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, int32_t rspCode, char* target) {
|
||||
int32_t code = 0;
|
||||
|
||||
|
@ -303,8 +304,7 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize,
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t ctgHandleMsgCallback(void *param, SDataBuf *pMsg, int32_t rspCode) {
|
||||
int32_t ctgHandleMsgCallback(void* param, SDataBuf* pMsg, int32_t rspCode) {
|
||||
SCtgTaskCallbackParam* cbParam = (SCtgTaskCallbackParam*)param;
|
||||
int32_t code = 0;
|
||||
SCtgJob* pJob = NULL;
|
||||
|
@ -322,13 +322,15 @@ int32_t ctgHandleMsgCallback(void *param, SDataBuf *pMsg, int32_t rspCode) {
|
|||
if (TDMT_VND_BATCH_META == cbParam->reqType || TDMT_MND_BATCH_META == cbParam->reqType) {
|
||||
CTG_ERR_JRET(ctgHandleBatchRsp(pJob, cbParam, pMsg, rspCode));
|
||||
} else {
|
||||
int32_t *taskId = taosArrayGet(cbParam->taskId, 0);
|
||||
SCtgTask *pTask = taosArrayGet(pJob->pTasks, *taskId);
|
||||
int32_t* taskId = taosArrayGet(cbParam->taskId, 0);
|
||||
SCtgTask* pTask = taosArrayGet(pJob->pTasks, *taskId);
|
||||
|
||||
qDebug("QID:0x%" PRIx64 " ctg task %d start to handle rsp %s", pJob->queryId, pTask->taskId, TMSG_INFO(cbParam->reqType + 1));
|
||||
qDebug("QID:0x%" PRIx64 " ctg task %d start to handle rsp %s", pJob->queryId, pTask->taskId,
|
||||
TMSG_INFO(cbParam->reqType + 1));
|
||||
|
||||
#if CTG_BATCH_FETCH
|
||||
SHashObj* pBatchs = taosHashInit(CTG_DEFAULT_BATCH_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
|
||||
SHashObj* pBatchs =
|
||||
taosHashInit(CTG_DEFAULT_BATCH_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
|
||||
if (NULL == pBatchs) {
|
||||
ctgError("taosHashInit %d batch failed", CTG_DEFAULT_BATCH_NUM);
|
||||
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
|
@ -354,16 +356,16 @@ _return:
|
|||
CTG_API_LEAVE(code);
|
||||
}
|
||||
|
||||
|
||||
int32_t ctgMakeMsgSendInfo(SCtgJob* pJob, SArray* pTaskId, int32_t batchId, int32_t msgType, SMsgSendInfo **pMsgSendInfo) {
|
||||
int32_t ctgMakeMsgSendInfo(SCtgJob* pJob, SArray* pTaskId, int32_t batchId, int32_t msgType,
|
||||
SMsgSendInfo** pMsgSendInfo) {
|
||||
int32_t code = 0;
|
||||
SMsgSendInfo *msgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
||||
SMsgSendInfo* msgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
||||
if (NULL == msgSendInfo) {
|
||||
qError("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
|
||||
CTG_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
SCtgTaskCallbackParam *param = taosMemoryCalloc(1, sizeof(SCtgTaskCallbackParam));
|
||||
SCtgTaskCallbackParam* param = taosMemoryCalloc(1, sizeof(SCtgTaskCallbackParam));
|
||||
if (NULL == param) {
|
||||
qError("calloc %d failed", (int32_t)sizeof(SCtgTaskCallbackParam));
|
||||
CTG_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
|
@ -391,10 +393,10 @@ _return:
|
|||
CTG_RET(code);
|
||||
}
|
||||
|
||||
int32_t ctgAsyncSendMsg(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob* pJob, SArray* pTaskId,
|
||||
int32_t batchId, char* dbFName, int32_t vgId, int32_t msgType, void *msg, uint32_t msgSize) {
|
||||
int32_t ctgAsyncSendMsg(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob* pJob, SArray* pTaskId, int32_t batchId,
|
||||
char* dbFName, int32_t vgId, int32_t msgType, void* msg, uint32_t msgSize) {
|
||||
int32_t code = 0;
|
||||
SMsgSendInfo *pMsgSendInfo = NULL;
|
||||
SMsgSendInfo* pMsgSendInfo = NULL;
|
||||
CTG_ERR_JRET(ctgMakeMsgSendInfo(pJob, pTaskId, batchId, msgType, &pMsgSendInfo));
|
||||
|
||||
ctgUpdateSendTargetInfo(pMsgSendInfo, msgType, dbFName, vgId);
|
||||
|
@ -426,7 +428,8 @@ _return:
|
|||
CTG_RET(code);
|
||||
}
|
||||
|
||||
int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo *pConn, SCtgTask* pTask, int32_t msgType, void *msg, uint32_t msgSize) {
|
||||
int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgTask* pTask, int32_t msgType, void* msg,
|
||||
uint32_t msgSize) {
|
||||
int32_t code = 0;
|
||||
SHashObj* pBatchs = pTask->pBatchs;
|
||||
SCtgJob* pJob = pTask->pJob;
|
||||
|
@ -475,7 +478,8 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo *pConn, SCtgT
|
|||
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
ctgDebug("task %d %s req added to batch %d, target vgId %d", pTask->taskId, TMSG_INFO(msgType), newBatch.batchId, vgId);
|
||||
ctgDebug("task %d %s req added to batch %d, target vgId %d", pTask->taskId, TMSG_INFO(msgType), newBatch.batchId,
|
||||
vgId);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -504,7 +508,8 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo *pConn, SCtgT
|
|||
}
|
||||
}
|
||||
|
||||
ctgDebug("task %d %s req added to batch %d, target vgId %d", pTask->taskId, TMSG_INFO(msgType), pBatch->batchId, vgId);
|
||||
ctgDebug("task %d %s req added to batch %d, target vgId %d", pTask->taskId, TMSG_INFO(msgType), pBatch->batchId,
|
||||
vgId);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
|
@ -517,14 +522,14 @@ _return:
|
|||
}
|
||||
|
||||
int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, int32_t vgId, void** msg) {
|
||||
*msg = taosMemoryMalloc(pBatch->msgSize);
|
||||
*msg = taosMemoryCalloc(1, pBatch->msgSize);
|
||||
if (NULL == (*msg)) {
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
int32_t offset = 0;
|
||||
int32_t num = taosArrayGetSize(pBatch->pMsgs);
|
||||
SBatchReq *pBatchReq = (SBatchReq*)(*msg);
|
||||
SBatchReq* pBatchReq = (SBatchReq*)(*msg);
|
||||
|
||||
pBatchReq->header.vgId = htonl(vgId);
|
||||
pBatchReq->msgNum = htonl(num);
|
||||
|
@ -547,7 +552,7 @@ int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, int32_t vgId, void** msg) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgLaunchBatchs(SCatalog* pCtg, SCtgJob *pJob, SHashObj* pBatchs) {
|
||||
int32_t ctgLaunchBatchs(SCatalog* pCtg, SCtgJob* pJob, SHashObj* pBatchs) {
|
||||
int32_t code = 0;
|
||||
void* msg = NULL;
|
||||
void* p = taosHashIterate(pBatchs, NULL);
|
||||
|
@ -559,8 +564,8 @@ int32_t ctgLaunchBatchs(SCatalog* pCtg, SCtgJob *pJob, SHashObj* pBatchs) {
|
|||
ctgDebug("QID:0x%" PRIx64 " ctg start to launch batch %d", pJob->queryId, pBatch->batchId);
|
||||
|
||||
CTG_ERR_JRET(ctgBuildBatchReqMsg(pBatch, *vgId, &msg));
|
||||
code = ctgAsyncSendMsg(pCtg, &pBatch->conn, pJob, pBatch->pTaskIds, pBatch->batchId,
|
||||
pBatch->dbFName, *vgId, pBatch->msgType, msg, pBatch->msgSize);
|
||||
code = ctgAsyncSendMsg(pCtg, &pBatch->conn, pJob, pBatch->pTaskIds, pBatch->batchId, pBatch->dbFName, *vgId,
|
||||
pBatch->msgType, msg, pBatch->msgSize);
|
||||
pBatch->pTaskIds = NULL;
|
||||
CTG_ERR_JRET(code);
|
||||
|
||||
|
@ -579,12 +584,11 @@ _return:
|
|||
CTG_RET(code);
|
||||
}
|
||||
|
||||
|
||||
int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray *out, SCtgTask* pTask) {
|
||||
char *msg = NULL;
|
||||
int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SArray* out, SCtgTask* pTask) {
|
||||
char* msg = NULL;
|
||||
int32_t msgLen = 0;
|
||||
int32_t reqType = TDMT_MND_QNODE_LIST;
|
||||
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
|
||||
void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
|
||||
|
||||
ctgDebug("try to get qnode list from mnode, mgmtEpInUse:%d", pConn->mgmtEps.inUse);
|
||||
|
||||
|
@ -630,11 +634,11 @@ int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray **out, SCtgTask* pTask) {
|
||||
char *msg = NULL;
|
||||
int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SArray** out, SCtgTask* pTask) {
|
||||
char* msg = NULL;
|
||||
int32_t msgLen = 0;
|
||||
int32_t reqType = TDMT_MND_DNODE_LIST;
|
||||
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
|
||||
void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
|
||||
|
||||
ctgDebug("try to get dnode list from mnode, mgmtEpInUse:%d", pConn->mgmtEps.inUse);
|
||||
|
||||
|
@ -676,12 +680,12 @@ int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SBuildUseDBInput *input, SUseDbOutput *out, SCtgTask* pTask) {
|
||||
char *msg = NULL;
|
||||
int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SBuildUseDBInput* input, SUseDbOutput* out,
|
||||
SCtgTask* pTask) {
|
||||
char* msg = NULL;
|
||||
int32_t msgLen = 0;
|
||||
int32_t reqType = TDMT_MND_USE_DB;
|
||||
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
|
||||
void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
|
||||
|
||||
ctgDebug("try to get db vgInfo from mnode, dbFName:%s", input->db);
|
||||
|
||||
|
@ -727,15 +731,16 @@ int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SBuildU
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *dbFName, SDbCfgInfo *out, SCtgTask* pTask) {
|
||||
char *msg = NULL;
|
||||
int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char* dbFName, SDbCfgInfo* out,
|
||||
SCtgTask* pTask) {
|
||||
char* msg = NULL;
|
||||
int32_t msgLen = 0;
|
||||
int32_t reqType = TDMT_MND_GET_DB_CFG;
|
||||
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
|
||||
void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
|
||||
|
||||
ctgDebug("try to get db cfg from mnode, dbFName:%s", dbFName);
|
||||
|
||||
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void *)dbFName, &msg, 0, &msgLen, mallocFp);
|
||||
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)dbFName, &msg, 0, &msgLen, mallocFp);
|
||||
if (code) {
|
||||
ctgError("Build get db cfg msg failed, code:%x, db:%s", code, dbFName);
|
||||
CTG_ERR_RET(code);
|
||||
|
@ -777,15 +782,16 @@ int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *indexName, SIndexInfo *out, SCtgTask* pTask) {
|
||||
char *msg = NULL;
|
||||
int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char* indexName, SIndexInfo* out,
|
||||
SCtgTask* pTask) {
|
||||
char* msg = NULL;
|
||||
int32_t msgLen = 0;
|
||||
int32_t reqType = TDMT_MND_GET_INDEX;
|
||||
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
|
||||
void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
|
||||
|
||||
ctgDebug("try to get index from mnode, indexName:%s", indexName);
|
||||
|
||||
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void *)indexName, &msg, 0, &msgLen, mallocFp);
|
||||
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)indexName, &msg, 0, &msgLen, mallocFp);
|
||||
if (code) {
|
||||
ctgError("Build get index msg failed, code:%x, db:%s", code, indexName);
|
||||
CTG_ERR_RET(code);
|
||||
|
@ -827,17 +833,18 @@ int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName *name, STableIndex* out, SCtgTask* pTask) {
|
||||
char *msg = NULL;
|
||||
int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SName* name, STableIndex* out,
|
||||
SCtgTask* pTask) {
|
||||
char* msg = NULL;
|
||||
int32_t msgLen = 0;
|
||||
int32_t reqType = TDMT_MND_GET_TABLE_INDEX;
|
||||
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
|
||||
void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
|
||||
char tbFName[TSDB_TABLE_FNAME_LEN];
|
||||
tNameExtractFullName(name, tbFName);
|
||||
|
||||
ctgDebug("try to get tb index from mnode, tbFName:%s", tbFName);
|
||||
|
||||
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void *)tbFName, &msg, 0, &msgLen, mallocFp);
|
||||
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)tbFName, &msg, 0, &msgLen, mallocFp);
|
||||
if (code) {
|
||||
ctgError("Build get index msg failed, code:%s, tbFName:%s", tstrerror(code), tbFName);
|
||||
CTG_ERR_RET(code);
|
||||
|
@ -880,15 +887,16 @@ int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName *n
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *funcName, SFuncInfo *out, SCtgTask* pTask) {
|
||||
char *msg = NULL;
|
||||
int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char* funcName, SFuncInfo* out,
|
||||
SCtgTask* pTask) {
|
||||
char* msg = NULL;
|
||||
int32_t msgLen = 0;
|
||||
int32_t reqType = TDMT_MND_RETRIEVE_FUNC;
|
||||
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
|
||||
void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
|
||||
|
||||
ctgDebug("try to get udf info from mnode, funcName:%s", funcName);
|
||||
|
||||
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void *)funcName, &msg, 0, &msgLen, mallocFp);
|
||||
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)funcName, &msg, 0, &msgLen, mallocFp);
|
||||
if (code) {
|
||||
ctgError("Build get udf msg failed, code:%x, db:%s", code, funcName);
|
||||
CTG_ERR_RET(code);
|
||||
|
@ -930,15 +938,16 @@ int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const ch
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *user, SGetUserAuthRsp *out, SCtgTask* pTask) {
|
||||
char *msg = NULL;
|
||||
int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char* user, SGetUserAuthRsp* out,
|
||||
SCtgTask* pTask) {
|
||||
char* msg = NULL;
|
||||
int32_t msgLen = 0;
|
||||
int32_t reqType = TDMT_MND_GET_USER_AUTH;
|
||||
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
|
||||
void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
|
||||
|
||||
ctgDebug("try to get user auth from mnode, user:%s", user);
|
||||
|
||||
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void *)user, &msg, 0, &msgLen, mallocFp);
|
||||
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)user, &msg, 0, &msgLen, mallocFp);
|
||||
if (code) {
|
||||
ctgError("Build get user auth msg failed, code:%x, db:%s", code, user);
|
||||
CTG_ERR_RET(code);
|
||||
|
@ -980,16 +989,16 @@ int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo *pConn, char *dbFName, char* tbName, STableMetaOutput* out, SCtgTask* pTask) {
|
||||
int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo* pConn, char* dbFName, char* tbName,
|
||||
STableMetaOutput* out, SCtgTask* pTask) {
|
||||
SBuildTableInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = tbName};
|
||||
char *msg = NULL;
|
||||
SEpSet *pVnodeEpSet = NULL;
|
||||
char* msg = NULL;
|
||||
SEpSet* pVnodeEpSet = NULL;
|
||||
int32_t msgLen = 0;
|
||||
int32_t reqType = TDMT_MND_TABLE_META;
|
||||
char tbFName[TSDB_TABLE_FNAME_LEN];
|
||||
sprintf(tbFName, "%s.%s", dbFName, tbName);
|
||||
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
|
||||
void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
|
||||
|
||||
ctgDebug("try to get table meta from mnode, tbFName:%s", tbFName);
|
||||
|
||||
|
@ -1034,27 +1043,30 @@ int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo *pConn, char
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgGetTbMetaFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, STableMetaOutput* out, SCtgTask* pTask) {
|
||||
int32_t ctgGetTbMetaFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, STableMetaOutput* out,
|
||||
SCtgTask* pTask) {
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
tNameGetFullDbName(pTableName, dbFName);
|
||||
|
||||
return ctgGetTbMetaFromMnodeImpl(pCtg, pConn, dbFName, (char *)pTableName->tname, out, pTask);
|
||||
return ctgGetTbMetaFromMnodeImpl(pCtg, pConn, dbFName, (char*)pTableName->tname, out, pTask);
|
||||
}
|
||||
|
||||
int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* out, SCtgTask* pTask) {
|
||||
int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SVgroupInfo* vgroupInfo,
|
||||
STableMetaOutput* out, SCtgTask* pTask) {
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
tNameGetFullDbName(pTableName, dbFName);
|
||||
int32_t reqType = TDMT_VND_TABLE_META;
|
||||
char tbFName[TSDB_TABLE_FNAME_LEN];
|
||||
sprintf(tbFName, "%s.%s", dbFName, pTableName->tname);
|
||||
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
|
||||
void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
|
||||
|
||||
SEp* pEp = &vgroupInfo->epSet.eps[vgroupInfo->epSet.inUse];
|
||||
ctgDebug("try to get table meta from vnode, vgId:%d, ep num:%d, ep %s:%d, tbFName:%s",
|
||||
vgroupInfo->vgId, vgroupInfo->epSet.numOfEps, pEp->fqdn, pEp->port, tbFName);
|
||||
ctgDebug("try to get table meta from vnode, vgId:%d, ep num:%d, ep %s:%d, tbFName:%s", vgroupInfo->vgId,
|
||||
vgroupInfo->epSet.numOfEps, pEp->fqdn, pEp->port, tbFName);
|
||||
|
||||
SBuildTableInput bInput = {.vgId = vgroupInfo->vgId, .dbFName = dbFName, .tbName = (char *)tNameGetTableName(pTableName)};
|
||||
char *msg = NULL;
|
||||
SBuildTableInput bInput = {
|
||||
.vgId = vgroupInfo->vgId, .dbFName = dbFName, .tbName = (char*)tNameGetTableName(pTableName)};
|
||||
char* msg = NULL;
|
||||
int32_t msgLen = 0;
|
||||
|
||||
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen, mallocFp);
|
||||
|
@ -1107,20 +1119,21 @@ int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SNa
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, SVgroupInfo *vgroupInfo, STableCfg **out, SCtgTask* pTask) {
|
||||
char *msg = NULL;
|
||||
int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName,
|
||||
SVgroupInfo* vgroupInfo, STableCfg** out, SCtgTask* pTask) {
|
||||
char* msg = NULL;
|
||||
int32_t msgLen = 0;
|
||||
int32_t reqType = TDMT_VND_TABLE_CFG;
|
||||
char tbFName[TSDB_TABLE_FNAME_LEN];
|
||||
tNameExtractFullName(pTableName, tbFName);
|
||||
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
|
||||
void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
tNameGetFullDbName(pTableName, dbFName);
|
||||
SBuildTableInput bInput = {.vgId = vgroupInfo->vgId, .dbFName = dbFName, .tbName = (char*)pTableName->tname};
|
||||
|
||||
SEp* pEp = &vgroupInfo->epSet.eps[vgroupInfo->epSet.inUse];
|
||||
ctgDebug("try to get table cfg from vnode, vgId:%d, ep num:%d, ep %s:%d, tbFName:%s",
|
||||
vgroupInfo->vgId, vgroupInfo->epSet.numOfEps, pEp->fqdn, pEp->port, tbFName);
|
||||
ctgDebug("try to get table cfg from vnode, vgId:%d, ep num:%d, ep %s:%d, tbFName:%s", vgroupInfo->vgId,
|
||||
vgroupInfo->epSet.numOfEps, pEp->fqdn, pEp->port, tbFName);
|
||||
|
||||
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen, mallocFp);
|
||||
if (code) {
|
||||
|
@ -1167,14 +1180,14 @@ int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const S
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, STableCfg **out, SCtgTask* pTask) {
|
||||
char *msg = NULL;
|
||||
int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, STableCfg** out,
|
||||
SCtgTask* pTask) {
|
||||
char* msg = NULL;
|
||||
int32_t msgLen = 0;
|
||||
int32_t reqType = TDMT_MND_TABLE_CFG;
|
||||
char tbFName[TSDB_TABLE_FNAME_LEN];
|
||||
tNameExtractFullName(pTableName, tbFName);
|
||||
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
|
||||
void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
tNameGetFullDbName(pTableName, dbFName);
|
||||
SBuildTableInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = (char*)pTableName->tname};
|
||||
|
@ -1218,11 +1231,11 @@ int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const S
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, char **out, SCtgTask* pTask) {
|
||||
char *msg = NULL;
|
||||
int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, char** out, SCtgTask* pTask) {
|
||||
char* msg = NULL;
|
||||
int32_t msgLen = 0;
|
||||
int32_t reqType = TDMT_MND_SERVER_VERSION;
|
||||
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
|
||||
void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
|
||||
|
||||
qDebug("try to get svr ver from mnode");
|
||||
|
||||
|
@ -1263,5 +1276,3 @@ int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, char **ou
|
|||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue