Merge pull request #15451 from taosdata/feature/catalogBatchFetch
feat: support vnode meta batch fetching
This commit is contained in:
commit
08223c1153
|
@ -3051,6 +3051,34 @@ typedef struct SDeleteRes {
|
|||
int32_t tEncodeDeleteRes(SEncoder* pCoder, const SDeleteRes* pRes);
|
||||
int32_t tDecodeDeleteRes(SDecoder* pCoder, SDeleteRes* pRes);
|
||||
|
||||
typedef struct {
|
||||
int32_t msgType;
|
||||
int32_t msgLen;
|
||||
void* msg;
|
||||
} SBatchMsg;
|
||||
|
||||
typedef struct {
|
||||
SMsgHead header;
|
||||
int32_t msgNum;
|
||||
SBatchMsg msg[];
|
||||
} SBatchReq;
|
||||
|
||||
typedef struct {
|
||||
int32_t reqType;
|
||||
int32_t msgLen;
|
||||
int32_t rspCode;
|
||||
void* msg;
|
||||
} SBatchRsp;
|
||||
|
||||
static FORCE_INLINE void tFreeSBatchRsp(void *p) {
|
||||
if (NULL == p) {
|
||||
return;
|
||||
}
|
||||
|
||||
SBatchRsp* pRsp = (SBatchRsp*)p;
|
||||
taosMemoryFree(pRsp->msg);
|
||||
}
|
||||
|
||||
#pragma pack(pop)
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -136,6 +136,7 @@ enum {
|
|||
TD_DEF_MSG_TYPE(TDMT_MND_DROP_INDEX, "drop-index", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_GET_INDEX, "get-index", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_GET_TABLE_INDEX, "get-table-index", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_BATCH_META, "batch-meta", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_TABLE_CFG, "table-cfg", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "create-topic", SMCreateTopicReq, SMCreateTopicRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "alter-topic", NULL, NULL)
|
||||
|
@ -180,6 +181,7 @@ enum {
|
|||
TD_DEF_MSG_TYPE(TDMT_VND_TABLE_META, "vnode-table-meta", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_TABLES_META, "vnode-tables-meta", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_TABLE_CFG, "vnode-table-cfg", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_BATCH_META, "vnode-batch-meta", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_STB, "vnode-create-stb", SVCreateStbReq, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_STB, "vnode-alter-stb", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_DROP_STB, "vnode-drop-stb", SVDropStbReq, NULL)
|
||||
|
|
|
@ -184,6 +184,7 @@ SArray *mmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_STB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_STB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_TABLE_META, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_BATCH_META, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_TABLE_CFG, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_SMA, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_SMA, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
|
|
|
@ -343,6 +343,7 @@ SArray *vmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_VND_UPDATE_TAG_VAL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_CFG, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_TABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SCH_CANCEL_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||
|
|
|
@ -63,6 +63,106 @@ int32_t mndProcessQueryMsg(SRpcMsg *pMsg) {
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) {
|
||||
int32_t code = 0;
|
||||
int32_t offset = 0;
|
||||
int32_t rspSize = 0;
|
||||
SBatchReq *batchReq = (SBatchReq*)pMsg->pCont;
|
||||
int32_t msgNum = ntohl(batchReq->msgNum);
|
||||
offset += sizeof(SBatchReq);
|
||||
SBatchMsg req = {0};
|
||||
SBatchRsp rsp = {0};
|
||||
SRpcMsg reqMsg = *pMsg;
|
||||
SRpcMsg rspMsg = {0};
|
||||
void* pRsp = NULL;
|
||||
SMnode *pMnode = pMsg->info.node;
|
||||
|
||||
SArray* batchRsp = taosArrayInit(msgNum, sizeof(SBatchRsp));
|
||||
if (NULL == batchRsp) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < msgNum; ++i) {
|
||||
req.msgType = ntohl(*(int32_t*)((char*)pMsg->pCont + offset));
|
||||
offset += sizeof(req.msgType);
|
||||
|
||||
req.msgLen = ntohl(*(int32_t*)((char*)pMsg->pCont + offset));
|
||||
offset += sizeof(req.msgLen);
|
||||
|
||||
req.msg = (char*)pMsg->pCont + offset;
|
||||
offset += req.msgLen;
|
||||
|
||||
reqMsg.msgType = req.msgType;
|
||||
reqMsg.pCont = req.msg;
|
||||
reqMsg.contLen = req.msgLen;
|
||||
reqMsg.info.rsp = NULL;
|
||||
reqMsg.info.rspLen = 0;
|
||||
|
||||
MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(req.msgType)];
|
||||
if (fp == NULL) {
|
||||
mError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
|
||||
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if ((*fp)(&reqMsg)) {
|
||||
rsp.rspCode = terrno;
|
||||
} else {
|
||||
rsp.rspCode = 0;
|
||||
}
|
||||
rsp.reqType = reqMsg.msgType;
|
||||
rsp.msgLen = reqMsg.info.rspLen;
|
||||
rsp.msg = reqMsg.info.rsp;
|
||||
|
||||
taosArrayPush(batchRsp, &rsp);
|
||||
|
||||
rspSize += sizeof(rsp) + rsp.msgLen - POINTER_BYTES;
|
||||
}
|
||||
|
||||
rspSize += sizeof(int32_t);
|
||||
offset = 0;
|
||||
|
||||
pRsp = rpcMallocCont(rspSize);
|
||||
if (pRsp == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
*(int32_t*)((char*)pRsp + offset) = htonl(msgNum);
|
||||
offset += sizeof(msgNum);
|
||||
for (int32_t i = 0; i < msgNum; ++i) {
|
||||
SBatchRsp *p = taosArrayGet(batchRsp, i);
|
||||
|
||||
*(int32_t*)((char*)pRsp + offset) = htonl(p->reqType);
|
||||
offset += sizeof(p->reqType);
|
||||
*(int32_t*)((char*)pRsp + offset) = htonl(p->msgLen);
|
||||
offset += sizeof(p->msgLen);
|
||||
*(int32_t*)((char*)pRsp + offset) = htonl(p->rspCode);
|
||||
offset += sizeof(p->rspCode);
|
||||
memcpy((char*)pRsp + offset, p->msg, p->msgLen);
|
||||
offset += p->msgLen;
|
||||
|
||||
rpcFreeCont(p->msg);
|
||||
}
|
||||
|
||||
taosArrayDestroy(batchRsp);
|
||||
batchRsp = NULL;
|
||||
|
||||
_exit:
|
||||
|
||||
pMsg->info.rsp = pRsp;
|
||||
pMsg->info.rspLen = rspSize;
|
||||
|
||||
if (code) {
|
||||
mError("mnd get batch meta failed cause of %s", tstrerror(code));
|
||||
}
|
||||
|
||||
taosArrayDestroyEx(batchRsp, tFreeSBatchRsp);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t mndInitQuery(SMnode *pMnode) {
|
||||
if (qWorkerInit(NODE_TYPE_MNODE, MNODE_HANDLE, NULL, (void **)&pMnode->pQuery, &pMnode->msgCb) != 0) {
|
||||
mError("failed to init qworker in mnode since %s", terrstr());
|
||||
|
@ -76,6 +176,7 @@ int32_t mndInitQuery(SMnode *pMnode) {
|
|||
mndSetMsgHandle(pMnode, TDMT_SCH_MERGE_FETCH, mndProcessQueryMsg);
|
||||
mndSetMsgHandle(pMnode, TDMT_SCH_DROP_TASK, mndProcessQueryMsg);
|
||||
mndSetMsgHandle(pMnode, TDMT_SCH_QUERY_HEARTBEAT, mndProcessQueryMsg);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_BATCH_META, mndProcessBatchMetaMsg);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -78,8 +78,9 @@ void vnodeBufPoolReset(SVBufPool* pPool);
|
|||
// vnodeQuery.c
|
||||
int32_t vnodeQueryOpen(SVnode* pVnode);
|
||||
void vnodeQueryClose(SVnode* pVnode);
|
||||
int32_t vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg);
|
||||
int vnodeGetTableCfg(SVnode* pVnode, SRpcMsg* pMsg);
|
||||
int32_t vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg, bool direct);
|
||||
int vnodeGetTableCfg(SVnode* pVnode, SRpcMsg* pMsg, bool direct);
|
||||
int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg);
|
||||
|
||||
// vnodeCommit.c
|
||||
int32_t vnodeBegin(SVnode* pVnode);
|
||||
|
|
|
@ -21,7 +21,7 @@ int vnodeQueryOpen(SVnode *pVnode) {
|
|||
|
||||
void vnodeQueryClose(SVnode *pVnode) { qWorkerDestroy((void **)&pVnode->pQuery); }
|
||||
|
||||
int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||
int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
|
||||
STableInfoReq infoReq = {0};
|
||||
STableMetaRsp metaRsp = {0};
|
||||
SMetaReader mer1 = {0};
|
||||
|
@ -99,7 +99,12 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
|
|||
goto _exit;
|
||||
}
|
||||
|
||||
pRsp = rpcMallocCont(rspLen);
|
||||
if (direct) {
|
||||
pRsp = rpcMallocCont(rspLen);
|
||||
} else {
|
||||
pRsp = taosMemoryCalloc(1, rspLen);
|
||||
}
|
||||
|
||||
if (pRsp == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
|
@ -117,15 +122,19 @@ _exit:
|
|||
qError("get table %s meta failed cause of %s", infoReq.tbName, tstrerror(code));
|
||||
}
|
||||
|
||||
tmsgSendRsp(&rpcMsg);
|
||||
|
||||
if (direct) {
|
||||
tmsgSendRsp(&rpcMsg);
|
||||
} else {
|
||||
*pMsg = rpcMsg;
|
||||
}
|
||||
|
||||
taosMemoryFree(metaRsp.pSchemas);
|
||||
metaReaderClear(&mer2);
|
||||
metaReaderClear(&mer1);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int vnodeGetTableCfg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||
int vnodeGetTableCfg(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
|
||||
STableCfgReq cfgReq = {0};
|
||||
STableCfgRsp cfgRsp = {0};
|
||||
SMetaReader mer1 = {0};
|
||||
|
@ -209,7 +218,12 @@ int vnodeGetTableCfg(SVnode *pVnode, SRpcMsg *pMsg) {
|
|||
goto _exit;
|
||||
}
|
||||
|
||||
pRsp = rpcMallocCont(rspLen);
|
||||
if (direct) {
|
||||
pRsp = rpcMallocCont(rspLen);
|
||||
} else {
|
||||
pRsp = taosMemoryCalloc(1, rspLen);
|
||||
}
|
||||
|
||||
if (pRsp == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
|
@ -227,14 +241,124 @@ _exit:
|
|||
qError("get table %s cfg failed cause of %s", cfgReq.tbName, tstrerror(code));
|
||||
}
|
||||
|
||||
tmsgSendRsp(&rpcMsg);
|
||||
|
||||
if (direct) {
|
||||
tmsgSendRsp(&rpcMsg);
|
||||
} else {
|
||||
*pMsg = rpcMsg;
|
||||
}
|
||||
|
||||
tFreeSTableCfgRsp(&cfgRsp);
|
||||
metaReaderClear(&mer2);
|
||||
metaReaderClear(&mer1);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||
int32_t code = 0;
|
||||
int32_t offset = 0;
|
||||
int32_t rspSize = 0;
|
||||
SBatchReq *batchReq = (SBatchReq*)pMsg->pCont;
|
||||
int32_t msgNum = ntohl(batchReq->msgNum);
|
||||
offset += sizeof(SBatchReq);
|
||||
SBatchMsg req = {0};
|
||||
SBatchRsp rsp = {0};
|
||||
SRpcMsg reqMsg = *pMsg;
|
||||
SRpcMsg rspMsg = {0};
|
||||
void* pRsp = NULL;
|
||||
|
||||
SArray* batchRsp = taosArrayInit(msgNum, sizeof(SBatchRsp));
|
||||
if (NULL == batchRsp) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < msgNum; ++i) {
|
||||
req.msgType = ntohl(*(int32_t*)((char*)pMsg->pCont + offset));
|
||||
offset += sizeof(req.msgType);
|
||||
|
||||
req.msgLen = ntohl(*(int32_t*)((char*)pMsg->pCont + offset));
|
||||
offset += sizeof(req.msgLen);
|
||||
|
||||
req.msg = (char*)pMsg->pCont + offset;
|
||||
offset += req.msgLen;
|
||||
|
||||
reqMsg.msgType = req.msgType;
|
||||
reqMsg.pCont = req.msg;
|
||||
reqMsg.contLen = req.msgLen;
|
||||
|
||||
switch (req.msgType) {
|
||||
case TDMT_VND_TABLE_META:
|
||||
vnodeGetTableMeta(pVnode, &reqMsg, false);
|
||||
break;
|
||||
case TDMT_VND_TABLE_CFG:
|
||||
vnodeGetTableCfg(pVnode, &reqMsg, false);
|
||||
break;
|
||||
default:
|
||||
qError("invalid req msgType %d", req.msgType);
|
||||
reqMsg.code = TSDB_CODE_INVALID_MSG;
|
||||
reqMsg.pCont = NULL;
|
||||
reqMsg.contLen = 0;
|
||||
break;
|
||||
}
|
||||
|
||||
rsp.reqType = reqMsg.msgType;
|
||||
rsp.msgLen = reqMsg.contLen;
|
||||
rsp.rspCode = reqMsg.code;
|
||||
rsp.msg = reqMsg.pCont;
|
||||
|
||||
taosArrayPush(batchRsp, &rsp);
|
||||
|
||||
rspSize += sizeof(rsp) + rsp.msgLen - POINTER_BYTES;
|
||||
}
|
||||
|
||||
rspSize += sizeof(int32_t);
|
||||
offset = 0;
|
||||
|
||||
pRsp = rpcMallocCont(rspSize);
|
||||
if (pRsp == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
*(int32_t*)((char*)pRsp + offset) = htonl(msgNum);
|
||||
offset += sizeof(msgNum);
|
||||
for (int32_t i = 0; i < msgNum; ++i) {
|
||||
SBatchRsp *p = taosArrayGet(batchRsp, i);
|
||||
|
||||
*(int32_t*)((char*)pRsp + offset) = htonl(p->reqType);
|
||||
offset += sizeof(p->reqType);
|
||||
*(int32_t*)((char*)pRsp + offset) = htonl(p->msgLen);
|
||||
offset += sizeof(p->msgLen);
|
||||
*(int32_t*)((char*)pRsp + offset) = htonl(p->rspCode);
|
||||
offset += sizeof(p->rspCode);
|
||||
memcpy((char*)pRsp + offset, p->msg, p->msgLen);
|
||||
offset += p->msgLen;
|
||||
|
||||
taosMemoryFreeClear(p->msg);
|
||||
}
|
||||
|
||||
taosArrayDestroy(batchRsp);
|
||||
batchRsp = NULL;
|
||||
|
||||
_exit:
|
||||
|
||||
rspMsg.info = pMsg->info;
|
||||
rspMsg.pCont = pRsp;
|
||||
rspMsg.contLen = rspSize;
|
||||
rspMsg.code = code;
|
||||
rspMsg.msgType = pMsg->msgType;
|
||||
|
||||
if (code) {
|
||||
qError("vnd get batch meta failed cause of %s", tstrerror(code));
|
||||
}
|
||||
|
||||
taosArrayDestroyEx(batchRsp, tFreeSBatchRsp);
|
||||
|
||||
tmsgSendRsp(&rspMsg);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {
|
||||
pLoad->vgId = TD_VID(pVnode);
|
||||
pLoad->syncState = syncGetMyRole(pVnode->sync);
|
||||
|
|
|
@ -298,7 +298,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
|||
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
||||
vTrace("message in fetch queue is processing");
|
||||
if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META ||
|
||||
pMsg->msgType == TDMT_VND_TABLE_CFG) &&
|
||||
pMsg->msgType == TDMT_VND_TABLE_CFG || pMsg->msgType == TDMT_VND_BATCH_META) &&
|
||||
!vnodeIsLeader(pVnode)) {
|
||||
vnodeRedirectRpcMsg(pVnode, pMsg);
|
||||
return 0;
|
||||
|
@ -320,9 +320,11 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
|||
case TDMT_SCH_QUERY_HEARTBEAT:
|
||||
return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg, 0);
|
||||
case TDMT_VND_TABLE_META:
|
||||
return vnodeGetTableMeta(pVnode, pMsg);
|
||||
return vnodeGetTableMeta(pVnode, pMsg, true);
|
||||
case TDMT_VND_TABLE_CFG:
|
||||
return vnodeGetTableCfg(pVnode, pMsg);
|
||||
return vnodeGetTableCfg(pVnode, pMsg, true);
|
||||
case TDMT_VND_BATCH_META:
|
||||
return vnodeGetBatchMeta(pVnode, pMsg);
|
||||
case TDMT_VND_CONSUME:
|
||||
return tqProcessPollReq(pVnode->pTq, pMsg);
|
||||
case TDMT_STREAM_TASK_RUN:
|
||||
|
|
|
@ -31,6 +31,7 @@ extern "C" {
|
|||
#define CTG_DEFAULT_RENT_SECOND 10
|
||||
#define CTG_DEFAULT_RENT_SLOT_SIZE 10
|
||||
#define CTG_DEFAULT_MAX_RETRY_TIMES 3
|
||||
#define CTG_DEFAULT_BATCH_NUM 64
|
||||
|
||||
#define CTG_RENT_SLOT_SECOND 1.5
|
||||
|
||||
|
@ -38,6 +39,8 @@ extern "C" {
|
|||
|
||||
#define CTG_ERR_CODE_TABLE_NOT_EXIST TSDB_CODE_PAR_TABLE_NOT_EXIST
|
||||
|
||||
#define CTG_BATCH_FETCH 1
|
||||
|
||||
enum {
|
||||
CTG_READ = 1,
|
||||
CTG_WRITE,
|
||||
|
@ -200,8 +203,20 @@ typedef struct SCatalog {
|
|||
SCtgRentMgmt stbRent;
|
||||
} SCatalog;
|
||||
|
||||
typedef struct SCtgBatch {
|
||||
int32_t batchId;
|
||||
int32_t msgType;
|
||||
int32_t msgSize;
|
||||
SArray* pMsgs;
|
||||
SRequestConnInfo conn;
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
SArray* pTaskIds;
|
||||
} SCtgBatch;
|
||||
|
||||
typedef struct SCtgJob {
|
||||
int64_t refId;
|
||||
int32_t batchId;
|
||||
SHashObj* pBatchs;
|
||||
SArray* pTasks;
|
||||
int32_t taskDone;
|
||||
SMetaData jobRes;
|
||||
|
@ -236,6 +251,16 @@ typedef struct SCtgMsgCtx {
|
|||
char* target;
|
||||
} SCtgMsgCtx;
|
||||
|
||||
|
||||
typedef struct SCtgTaskCallbackParam {
|
||||
uint64_t queryId;
|
||||
int64_t refId;
|
||||
SArray* taskId;
|
||||
int32_t reqType;
|
||||
int32_t batchId;
|
||||
} SCtgTaskCallbackParam;
|
||||
|
||||
|
||||
typedef struct SCtgTask SCtgTask;
|
||||
typedef int32_t (*ctgSubTaskCbFp)(SCtgTask*);
|
||||
|
||||
|
@ -258,6 +283,7 @@ typedef struct SCtgTask {
|
|||
SRWLatch lock;
|
||||
SArray* pParents;
|
||||
SCtgSubRes subRes;
|
||||
SHashObj* pBatchs;
|
||||
} SCtgTask;
|
||||
|
||||
typedef int32_t (*ctgInitTaskFp)(SCtgJob*, int32_t, void*);
|
||||
|
@ -618,6 +644,7 @@ int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SNa
|
|||
int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, SVgroupInfo *vgroupInfo, STableCfg **out, SCtgTask* pTask);
|
||||
int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, STableCfg **out, SCtgTask* pTask);
|
||||
int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, char **out, SCtgTask* pTask);
|
||||
int32_t ctgLaunchBatchs(SCatalog* pCtg, SCtgJob *pJob, SHashObj* pBatchs);
|
||||
|
||||
int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const SCatalogReq* pReq, catalogCallback fp, void* param);
|
||||
int32_t ctgLaunchJob(SCtgJob *pJob);
|
||||
|
@ -626,6 +653,9 @@ int32_t ctgLaunchSubTask(SCtgTask *pTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp,
|
|||
int32_t ctgGetTbCfgCb(SCtgTask *pTask);
|
||||
void ctgFreeHandle(SCatalog* pCatalog);
|
||||
|
||||
void ctgFreeMsgSendParam(void* param);
|
||||
void ctgFreeBatch(SCtgBatch *pBatch);
|
||||
void ctgFreeBatchs(SHashObj *pBatchs);
|
||||
int32_t ctgCloneVgInfo(SDBVgInfo *src, SDBVgInfo **dst);
|
||||
int32_t ctgCloneMetaOutput(STableMetaOutput *output, STableMetaOutput **pOutput);
|
||||
int32_t ctgGenerateVgList(SCatalog *pCtg, SHashObj *vgHash, SArray** pList);
|
||||
|
@ -642,7 +672,7 @@ int32_t ctgDbVgVersionSearchCompare(const void* key1, const void* key2);
|
|||
void ctgFreeSTableMetaOutput(STableMetaOutput* pOutput);
|
||||
int32_t ctgUpdateMsgCtx(SCtgMsgCtx* pCtx, int32_t reqType, void* out, char* target);
|
||||
char * ctgTaskTypeStr(CTG_TASK_TYPE type);
|
||||
int32_t ctgUpdateSendTargetInfo(SMsgSendInfo *pMsgSendInfo, int32_t msgType, SCtgTask* pTask);
|
||||
int32_t ctgUpdateSendTargetInfo(SMsgSendInfo *pMsgSendInfo, int32_t msgType, char* dbFName, int32_t vgId);
|
||||
int32_t ctgCloneTableIndex(SArray* pIndex, SArray** pRes);
|
||||
void ctgFreeSTableIndex(void *info);
|
||||
void ctgClearSubTaskRes(SCtgSubRes *pRes);
|
||||
|
|
|
@ -20,12 +20,6 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
typedef struct SCtgTaskCallbackParam {
|
||||
uint64_t queryId;
|
||||
int64_t refId;
|
||||
uint64_t taskId;
|
||||
int32_t reqType;
|
||||
} SCtgTaskCallbackParam;
|
||||
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -473,8 +473,15 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const
|
|||
pJob->tbCfgNum = tbCfgNum;
|
||||
pJob->svrVerNum = svrVerNum;
|
||||
|
||||
pJob->pTasks = taosArrayInit(taskNum, sizeof(SCtgTask));
|
||||
#if CTG_BATCH_FETCH
|
||||
pJob->pBatchs = taosHashInit(CTG_DEFAULT_BATCH_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
|
||||
if (NULL == pJob->pBatchs) {
|
||||
ctgError("taosHashInit %d batch failed", CTG_DEFAULT_BATCH_NUM);
|
||||
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
#endif
|
||||
|
||||
pJob->pTasks = taosArrayInit(taskNum, sizeof(SCtgTask));
|
||||
if (NULL == pJob->pTasks) {
|
||||
ctgError("taosArrayInit %d tasks failed", taskNum);
|
||||
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
|
@ -560,7 +567,7 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const
|
|||
|
||||
_return:
|
||||
|
||||
taosMemoryFreeClear(*job);
|
||||
ctgFreeJob(*job);
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
||||
|
@ -776,7 +783,8 @@ int32_t ctgCallSubCb(SCtgTask *pTask) {
|
|||
pParent->subRes.code = code;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
pParent->pBatchs = pTask->pBatchs;
|
||||
CTG_ERR_JRET(pParent->subRes.fp(pParent));
|
||||
}
|
||||
|
||||
|
@ -872,7 +880,7 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *
|
|||
SVgroupInfo vgInfo = {0};
|
||||
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, ctx->pName, &vgInfo));
|
||||
|
||||
ctgDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(ctx->pName), ctx->flag);
|
||||
ctgDebug("will refresh tbmeta, supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(ctx->pName), ctx->flag);
|
||||
|
||||
ctx->vgId = vgInfo.vgId;
|
||||
CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, ctx->pName, &vgInfo, NULL, pTask));
|
||||
|
@ -890,7 +898,7 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
ctgError("no tbmeta got, tbNmae:%s", tNameGetTableName(ctx->pName));
|
||||
ctgError("no tbmeta got, tbName:%s", tNameGetTableName(ctx->pName));
|
||||
ctgRemoveTbMetaFromCache(pCtg, ctx->pName, false);
|
||||
|
||||
CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST);
|
||||
|
@ -1653,6 +1661,7 @@ int32_t ctgSetSubTaskCb(SCtgTask *pSub, SCtgTask *pTask) {
|
|||
if (CTG_TASK_DONE == pSub->status) {
|
||||
pTask->subRes.code = pSub->code;
|
||||
CTG_ERR_JRET((*gCtgAsyncFps[pTask->type].cloneFp)(pSub, &pTask->subRes.res));
|
||||
pTask->pBatchs = pSub->pBatchs;
|
||||
CTG_ERR_JRET(pTask->subRes.fp(pTask));
|
||||
} else {
|
||||
if (NULL == pSub->pParents) {
|
||||
|
@ -1690,6 +1699,7 @@ int32_t ctgLaunchSubTask(SCtgTask *pTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp,
|
|||
CTG_ERR_RET(ctgSetSubTaskCb(pSub, pTask));
|
||||
|
||||
if (newTask) {
|
||||
pSub->pBatchs = pTask->pBatchs;
|
||||
CTG_ERR_RET((*gCtgAsyncFps[pSub->type].launchFp)(pSub));
|
||||
pSub->status = CTG_TASK_LAUNCHED;
|
||||
}
|
||||
|
@ -1702,9 +1712,11 @@ int32_t ctgLaunchJob(SCtgJob *pJob) {
|
|||
|
||||
for (int32_t i = 0; i < taskNum; ++i) {
|
||||
SCtgTask *pTask = taosArrayGet(pJob->pTasks, i);
|
||||
pTask->pBatchs = pJob->pBatchs;
|
||||
|
||||
qDebug("QID:0x%" PRIx64 " ctg launch [%dth] task", pJob->queryId, pTask->taskId);
|
||||
CTG_ERR_RET((*gCtgAsyncFps[pTask->type].launchFp)(pTask));
|
||||
|
||||
pTask->status = CTG_TASK_LAUNCHED;
|
||||
}
|
||||
|
||||
|
@ -1712,6 +1724,10 @@ int32_t ctgLaunchJob(SCtgJob *pJob) {
|
|||
qDebug("QID:0x%" PRIx64 " ctg call user callback with rsp %s", pJob->queryId, tstrerror(pJob->jobResCode));
|
||||
|
||||
taosAsyncExec(ctgCallUserCb, pJob, NULL);
|
||||
#if CTG_BATCH_FETCH
|
||||
} else {
|
||||
ctgLaunchBatchs(pJob->pCtg, pJob, pJob->pBatchs);
|
||||
#endif
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
|
|
@ -18,9 +18,67 @@
|
|||
#include "tname.h"
|
||||
#include "catalogInt.h"
|
||||
#include "systable.h"
|
||||
#include "ctgRemote.h"
|
||||
#include "tref.h"
|
||||
|
||||
int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBuf *pMsg, int32_t rspCode) {
|
||||
int32_t code = 0;
|
||||
SArray* pTaskId = cbParam->taskId;
|
||||
SCatalog* pCtg = pJob->pCtg;
|
||||
int32_t taskNum = taosArrayGetSize(pTaskId);
|
||||
SDataBuf taskMsg = *pMsg;
|
||||
int32_t offset = 0;
|
||||
int32_t msgNum = (TSDB_CODE_SUCCESS == rspCode && pMsg->pData && (pMsg->len > 0)) ? ntohl(*(int32_t*)pMsg->pData) : 0;
|
||||
ASSERT(taskNum == msgNum || 0 == msgNum);
|
||||
|
||||
ctgDebug("QID:0x%" PRIx64 " ctg got batch %d rsp %s", pJob->queryId, cbParam->batchId, TMSG_INFO(cbParam->reqType + 1));
|
||||
|
||||
offset += sizeof(msgNum);
|
||||
SBatchRsp rsp = {0};
|
||||
SHashObj* pBatchs = taosHashInit(taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
|
||||
if (NULL == pBatchs) {
|
||||
ctgError("taosHashInit %d batch failed", taskNum);
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < taskNum; ++i) {
|
||||
int32_t* taskId = taosArrayGet(pTaskId, i);
|
||||
SCtgTask *pTask = taosArrayGet(pJob->pTasks, *taskId);
|
||||
if (msgNum > 0) {
|
||||
rsp.reqType = ntohl(*(int32_t*)((char*)pMsg->pData + offset));
|
||||
offset += sizeof(rsp.reqType);
|
||||
rsp.msgLen = ntohl(*(int32_t*)((char*)pMsg->pData + offset));
|
||||
offset += sizeof(rsp.msgLen);
|
||||
rsp.rspCode = ntohl(*(int32_t*)((char*)pMsg->pData + offset));
|
||||
offset += sizeof(rsp.rspCode);
|
||||
rsp.msg = ((char*)pMsg->pData) + offset;
|
||||
offset += rsp.msgLen;
|
||||
|
||||
taskMsg.msgType = rsp.reqType;
|
||||
taskMsg.pData = rsp.msg;
|
||||
taskMsg.len = rsp.msgLen;
|
||||
} else {
|
||||
rsp.reqType = -1;
|
||||
taskMsg.msgType = -1;
|
||||
taskMsg.pData = NULL;
|
||||
taskMsg.len = 0;
|
||||
}
|
||||
|
||||
pTask->pBatchs = pBatchs;
|
||||
|
||||
ctgDebug("QID:0x%" PRIx64 " ctg task %d start to handle rsp %s", pJob->queryId, pTask->taskId, TMSG_INFO(taskMsg.msgType + 1));
|
||||
|
||||
(*gCtgAsyncFps[pTask->type].handleRspFp)(pTask, rsp.reqType, &taskMsg, (rsp.rspCode ? rsp.rspCode : rspCode));
|
||||
}
|
||||
|
||||
CTG_ERR_JRET(ctgLaunchBatchs(pJob->pCtg, pJob, pBatchs));
|
||||
|
||||
_return:
|
||||
|
||||
ctgFreeBatchs(pBatchs);
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
||||
|
||||
int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, int32_t rspCode, char* target) {
|
||||
int32_t code = 0;
|
||||
|
||||
|
@ -233,6 +291,11 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize,
|
|||
break;
|
||||
}
|
||||
default:
|
||||
if (TSDB_CODE_SUCCESS != rspCode) {
|
||||
qError("Got error rsp, error:%s", tstrerror(rspCode));
|
||||
CTG_ERR_RET(rspCode);
|
||||
}
|
||||
|
||||
qError("invalid req type %s", TMSG_INFO(reqType));
|
||||
return TSDB_CODE_APP_ERROR;
|
||||
}
|
||||
|
@ -254,12 +317,32 @@ int32_t ctgHandleMsgCallback(void *param, SDataBuf *pMsg, int32_t rspCode) {
|
|||
goto _return;
|
||||
}
|
||||
|
||||
SCtgTask *pTask = taosArrayGet(pJob->pTasks, cbParam->taskId);
|
||||
SCatalog* pCtg = pJob->pCtg;
|
||||
|
||||
qDebug("QID:0x%" PRIx64 " ctg task %d start to handle rsp %s", pJob->queryId, pTask->taskId, TMSG_INFO(cbParam->reqType + 1));
|
||||
if (TDMT_VND_BATCH_META == cbParam->reqType || TDMT_MND_BATCH_META == cbParam->reqType) {
|
||||
CTG_ERR_JRET(ctgHandleBatchRsp(pJob, cbParam, pMsg, rspCode));
|
||||
} else {
|
||||
int32_t *taskId = taosArrayGet(cbParam->taskId, 0);
|
||||
SCtgTask *pTask = taosArrayGet(pJob->pTasks, *taskId);
|
||||
|
||||
qDebug("QID:0x%" PRIx64 " ctg task %d start to handle rsp %s", pJob->queryId, pTask->taskId, TMSG_INFO(cbParam->reqType + 1));
|
||||
|
||||
#if CTG_BATCH_FETCH
|
||||
SHashObj* pBatchs = taosHashInit(CTG_DEFAULT_BATCH_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
|
||||
if (NULL == pBatchs) {
|
||||
ctgError("taosHashInit %d batch failed", CTG_DEFAULT_BATCH_NUM);
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
pTask->pBatchs = pBatchs;
|
||||
#endif
|
||||
|
||||
CTG_ERR_JRET((*gCtgAsyncFps[pTask->type].handleRspFp)(pTask, cbParam->reqType, pMsg, rspCode));
|
||||
|
||||
#if CTG_BATCH_FETCH
|
||||
CTG_ERR_JRET(ctgLaunchBatchs(pJob->pCtg, pJob, pBatchs));
|
||||
#endif
|
||||
}
|
||||
|
||||
CTG_ERR_JRET((*gCtgAsyncFps[pTask->type].handleRspFp)(pTask, cbParam->reqType, pMsg, rspCode));
|
||||
|
||||
_return:
|
||||
|
||||
taosMemoryFree(pMsg->pData);
|
||||
|
@ -272,12 +355,12 @@ _return:
|
|||
}
|
||||
|
||||
|
||||
int32_t ctgMakeMsgSendInfo(SCtgTask* pTask, int32_t msgType, SMsgSendInfo **pMsgSendInfo) {
|
||||
int32_t ctgMakeMsgSendInfo(SCtgJob* pJob, SArray* pTaskId, int32_t batchId, int32_t msgType, SMsgSendInfo **pMsgSendInfo) {
|
||||
int32_t code = 0;
|
||||
SMsgSendInfo *msgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
||||
if (NULL == msgSendInfo) {
|
||||
qError("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
|
||||
CTG_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
CTG_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
SCtgTaskCallbackParam *param = taosMemoryCalloc(1, sizeof(SCtgTaskCallbackParam));
|
||||
|
@ -287,12 +370,13 @@ int32_t ctgMakeMsgSendInfo(SCtgTask* pTask, int32_t msgType, SMsgSendInfo **pMsg
|
|||
}
|
||||
|
||||
param->reqType = msgType;
|
||||
param->queryId = pTask->pJob->queryId;
|
||||
param->refId = pTask->pJob->refId;
|
||||
param->taskId = pTask->taskId;
|
||||
param->queryId = pJob->queryId;
|
||||
param->refId = pJob->refId;
|
||||
param->taskId = pTaskId;
|
||||
param->batchId = batchId;
|
||||
|
||||
msgSendInfo->param = param;
|
||||
msgSendInfo->paramFreeFp = taosMemoryFree;
|
||||
msgSendInfo->paramFreeFp = ctgFreeMsgSendParam;
|
||||
msgSendInfo->fp = ctgHandleMsgCallback;
|
||||
|
||||
*pMsgSendInfo = msgSendInfo;
|
||||
|
@ -301,18 +385,19 @@ int32_t ctgMakeMsgSendInfo(SCtgTask* pTask, int32_t msgType, SMsgSendInfo **pMsg
|
|||
|
||||
_return:
|
||||
|
||||
taosMemoryFree(param);
|
||||
taosMemoryFree(msgSendInfo);
|
||||
taosArrayDestroy(pTaskId);
|
||||
destroySendMsgInfo(msgSendInfo);
|
||||
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
||||
int32_t ctgAsyncSendMsg(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTask* pTask, int32_t msgType, void *msg, uint32_t msgSize) {
|
||||
int32_t ctgAsyncSendMsg(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob* pJob, SArray* pTaskId,
|
||||
int32_t batchId, char* dbFName, int32_t vgId, int32_t msgType, void *msg, uint32_t msgSize) {
|
||||
int32_t code = 0;
|
||||
SMsgSendInfo *pMsgSendInfo = NULL;
|
||||
CTG_ERR_JRET(ctgMakeMsgSendInfo(pTask, msgType, &pMsgSendInfo));
|
||||
CTG_ERR_JRET(ctgMakeMsgSendInfo(pJob, pTaskId, batchId, msgType, &pMsgSendInfo));
|
||||
|
||||
ctgUpdateSendTargetInfo(pMsgSendInfo, msgType, pTask);
|
||||
ctgUpdateSendTargetInfo(pMsgSendInfo, msgType, dbFName, vgId);
|
||||
|
||||
pMsgSendInfo->requestId = pConn->requestId;
|
||||
pMsgSendInfo->requestObjRefId = pConn->requestObjRefId;
|
||||
|
@ -323,24 +408,178 @@ int32_t ctgAsyncSendMsg(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTask* pTask
|
|||
|
||||
int64_t transporterId = 0;
|
||||
code = asyncSendMsgToServer(pConn->pTrans, &pConn->mgmtEps, &transporterId, pMsgSendInfo);
|
||||
pMsgSendInfo = NULL;
|
||||
if (code) {
|
||||
ctgError("asyncSendMsgToSever failed, error: %s", tstrerror(code));
|
||||
CTG_ERR_JRET(code);
|
||||
}
|
||||
|
||||
ctgDebug("ctg req msg sent, reqId:0x%" PRIx64 ", msg type:%d, %s", pTask->pJob->queryId, msgType, TMSG_INFO(msgType));
|
||||
ctgDebug("ctg req msg sent, reqId:0x%" PRIx64 ", msg type:%d, %s", pJob->queryId, msgType, TMSG_INFO(msgType));
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
_return:
|
||||
|
||||
if (pMsgSendInfo) {
|
||||
taosMemoryFreeClear(pMsgSendInfo->param);
|
||||
taosMemoryFreeClear(pMsgSendInfo);
|
||||
destroySendMsgInfo(pMsgSendInfo);
|
||||
}
|
||||
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
||||
int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo *pConn, SCtgTask* pTask, int32_t msgType, void *msg, uint32_t msgSize) {
|
||||
int32_t code = 0;
|
||||
SHashObj* pBatchs = pTask->pBatchs;
|
||||
SCtgJob* pJob = pTask->pJob;
|
||||
SCtgBatch* pBatch = taosHashGet(pBatchs, &vgId, sizeof(vgId));
|
||||
int32_t taskNum = taosArrayGetSize(pTask->pJob->pTasks);
|
||||
SCtgBatch newBatch = {0};
|
||||
SBatchMsg req = {0};
|
||||
|
||||
if (NULL == pBatch) {
|
||||
newBatch.pMsgs = taosArrayInit(taskNum, sizeof(SBatchMsg));
|
||||
newBatch.pTaskIds = taosArrayInit(taskNum, sizeof(int32_t));
|
||||
if (NULL == newBatch.pMsgs || NULL == newBatch.pTaskIds) {
|
||||
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
newBatch.conn = *pConn;
|
||||
|
||||
req.msgType = msgType;
|
||||
req.msgLen = msgSize;
|
||||
req.msg = msg;
|
||||
if (NULL == taosArrayPush(newBatch.pMsgs, &req)) {
|
||||
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
if (NULL == taosArrayPush(newBatch.pTaskIds, &pTask->taskId)) {
|
||||
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
newBatch.msgSize = sizeof(SBatchReq) + sizeof(req) + msgSize - POINTER_BYTES;
|
||||
|
||||
if (vgId > 0) {
|
||||
if (TDMT_VND_TABLE_CFG == msgType) {
|
||||
SCtgTbCfgCtx* ctx = (SCtgTbCfgCtx*)pTask->taskCtx;
|
||||
tNameGetFullDbName(ctx->pName, newBatch.dbFName);
|
||||
} else if (TDMT_VND_TABLE_META == msgType) {
|
||||
SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx;
|
||||
tNameGetFullDbName(ctx->pName, newBatch.dbFName);
|
||||
} else {
|
||||
ctgError("invalid vnode msgType %d", msgType);
|
||||
CTG_ERR_JRET(TSDB_CODE_APP_ERROR);
|
||||
}
|
||||
}
|
||||
|
||||
newBatch.msgType = (vgId > 0) ? TDMT_VND_BATCH_META : TDMT_MND_BATCH_META;
|
||||
newBatch.batchId = atomic_add_fetch_32(&pJob->batchId, 1);
|
||||
|
||||
if (0 != taosHashPut(pBatchs, &vgId, sizeof(vgId), &newBatch, sizeof(newBatch))) {
|
||||
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
ctgDebug("task %d %s req added to batch %d, target vgId %d", pTask->taskId, TMSG_INFO(msgType), newBatch.batchId, vgId);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
req.msgType = msgType;
|
||||
req.msgLen = msgSize;
|
||||
req.msg = msg;
|
||||
if (NULL == taosArrayPush(pBatch->pMsgs, &req)) {
|
||||
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
if (NULL == taosArrayPush(pBatch->pTaskIds, &pTask->taskId)) {
|
||||
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
pBatch->msgSize += sizeof(req) + msgSize - POINTER_BYTES;
|
||||
|
||||
if (vgId > 0) {
|
||||
if (TDMT_VND_TABLE_CFG == msgType) {
|
||||
SCtgTbCfgCtx* ctx = (SCtgTbCfgCtx*)pTask->taskCtx;
|
||||
tNameGetFullDbName(ctx->pName, newBatch.dbFName);
|
||||
} else if (TDMT_VND_TABLE_META == msgType) {
|
||||
SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx;
|
||||
tNameGetFullDbName(ctx->pName, newBatch.dbFName);
|
||||
} else {
|
||||
ctgError("invalid vnode msgType %d", msgType);
|
||||
CTG_ERR_JRET(TSDB_CODE_APP_ERROR);
|
||||
}
|
||||
}
|
||||
|
||||
ctgDebug("task %d %s req added to batch %d, target vgId %d", pTask->taskId, TMSG_INFO(msgType), pBatch->batchId, vgId);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
_return:
|
||||
|
||||
ctgFreeBatch(&newBatch);
|
||||
taosMemoryFree(msg);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, int32_t vgId, void** msg) {
|
||||
*msg = taosMemoryMalloc(pBatch->msgSize);
|
||||
if (NULL == (*msg)) {
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
int32_t offset = 0;
|
||||
int32_t num = taosArrayGetSize(pBatch->pMsgs);
|
||||
SBatchReq *pBatchReq = (SBatchReq*)(*msg);
|
||||
|
||||
pBatchReq->header.vgId = htonl(vgId);
|
||||
pBatchReq->msgNum = htonl(num);
|
||||
offset += sizeof(SBatchReq);
|
||||
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
SBatchMsg* pReq = taosArrayGet(pBatch->pMsgs, i);
|
||||
*(int32_t*)((char*)(*msg) + offset) = htonl(pReq->msgType);
|
||||
offset += sizeof(pReq->msgType);
|
||||
*(int32_t*)((char*)(*msg) + offset) = htonl(pReq->msgLen);
|
||||
offset += sizeof(pReq->msgLen);
|
||||
memcpy((char*)(*msg) + offset, pReq->msg, pReq->msgLen);
|
||||
offset += pReq->msgLen;
|
||||
}
|
||||
|
||||
ASSERT(pBatch->msgSize == offset);
|
||||
|
||||
qDebug("batch req %d to vg %d msg built with %d meta reqs", pBatch->batchId, vgId, num);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgLaunchBatchs(SCatalog* pCtg, SCtgJob *pJob, SHashObj* pBatchs) {
|
||||
int32_t code = 0;
|
||||
void* msg = NULL;
|
||||
void* p = taosHashIterate(pBatchs, NULL);
|
||||
while (NULL != p) {
|
||||
size_t len = 0;
|
||||
int32_t* vgId = taosHashGetKey(p, &len);
|
||||
SCtgBatch* pBatch = (SCtgBatch*)p;
|
||||
|
||||
ctgDebug("QID:0x%" PRIx64 " ctg start to launch batch %d", pJob->queryId, pBatch->batchId);
|
||||
|
||||
CTG_ERR_JRET(ctgBuildBatchReqMsg(pBatch, *vgId, &msg));
|
||||
code = ctgAsyncSendMsg(pCtg, &pBatch->conn, pJob, pBatch->pTaskIds, pBatch->batchId,
|
||||
pBatch->dbFName, *vgId, pBatch->msgType, msg, pBatch->msgSize);
|
||||
pBatch->pTaskIds = NULL;
|
||||
CTG_ERR_JRET(code);
|
||||
|
||||
p = taosHashIterate(pBatchs, p);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
_return:
|
||||
|
||||
if (p) {
|
||||
taosHashCancelIterate(pBatchs, p);
|
||||
}
|
||||
taosMemoryFree(msg);
|
||||
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
||||
|
||||
int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray *out, SCtgTask* pTask) {
|
||||
char *msg = NULL;
|
||||
int32_t msgLen = 0;
|
||||
|
@ -361,7 +600,18 @@ int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray
|
|||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, NULL));
|
||||
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask, reqType, msg, msgLen));
|
||||
|
||||
#if CTG_BATCH_FETCH
|
||||
CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen));
|
||||
#else
|
||||
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
|
||||
if (NULL == pTaskId) {
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
taosArrayPush(pTaskId, &pTask->taskId);
|
||||
|
||||
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen));
|
||||
#endif
|
||||
}
|
||||
|
||||
SRpcMsg rpcMsg = {
|
||||
|
@ -396,7 +646,18 @@ int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray
|
|||
|
||||
if (pTask) {
|
||||
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, NULL, NULL));
|
||||
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask, reqType, msg, msgLen));
|
||||
|
||||
#if CTG_BATCH_FETCH
|
||||
CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen));
|
||||
#else
|
||||
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
|
||||
if (NULL == pTaskId) {
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
taosArrayPush(pTaskId, &pTask->taskId);
|
||||
|
||||
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen));
|
||||
#endif
|
||||
}
|
||||
|
||||
SRpcMsg rpcMsg = {
|
||||
|
@ -436,8 +697,18 @@ int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SBuildU
|
|||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, input->db));
|
||||
|
||||
#if CTG_BATCH_FETCH
|
||||
CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen));
|
||||
#else
|
||||
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
|
||||
if (NULL == pTaskId) {
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
taosArrayPush(pTaskId, &pTask->taskId);
|
||||
|
||||
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask, reqType, msg, msgLen));
|
||||
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen));
|
||||
#endif
|
||||
}
|
||||
|
||||
SRpcMsg rpcMsg = {
|
||||
|
@ -476,8 +747,18 @@ int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char
|
|||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)dbFName));
|
||||
|
||||
#if CTG_BATCH_FETCH
|
||||
CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen));
|
||||
#else
|
||||
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
|
||||
if (NULL == pTaskId) {
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
taosArrayPush(pTaskId, &pTask->taskId);
|
||||
|
||||
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask, reqType, msg, msgLen));
|
||||
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen));
|
||||
#endif
|
||||
}
|
||||
|
||||
SRpcMsg rpcMsg = {
|
||||
|
@ -516,8 +797,18 @@ int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const
|
|||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)indexName));
|
||||
|
||||
#if CTG_BATCH_FETCH
|
||||
CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen));
|
||||
#else
|
||||
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
|
||||
if (NULL == pTaskId) {
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
taosArrayPush(pTaskId, &pTask->taskId);
|
||||
|
||||
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask, reqType, msg, msgLen));
|
||||
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen));
|
||||
#endif
|
||||
}
|
||||
|
||||
SRpcMsg rpcMsg = {
|
||||
|
@ -559,8 +850,18 @@ int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName *n
|
|||
}
|
||||
|
||||
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)tbFName));
|
||||
|
||||
#if CTG_BATCH_FETCH
|
||||
CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen));
|
||||
#else
|
||||
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
|
||||
if (NULL == pTaskId) {
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
taosArrayPush(pTaskId, &pTask->taskId);
|
||||
|
||||
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask, reqType, msg, msgLen));
|
||||
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen));
|
||||
#endif
|
||||
}
|
||||
|
||||
SRpcMsg rpcMsg = {
|
||||
|
@ -599,8 +900,18 @@ int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const ch
|
|||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)funcName));
|
||||
|
||||
#if CTG_BATCH_FETCH
|
||||
CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen));
|
||||
#else
|
||||
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
|
||||
if (NULL == pTaskId) {
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
taosArrayPush(pTaskId, &pTask->taskId);
|
||||
|
||||
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask, reqType, msg, msgLen));
|
||||
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen));
|
||||
#endif
|
||||
}
|
||||
|
||||
SRpcMsg rpcMsg = {
|
||||
|
@ -639,8 +950,18 @@ int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const
|
|||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)user));
|
||||
|
||||
#if CTG_BATCH_FETCH
|
||||
CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen));
|
||||
#else
|
||||
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
|
||||
if (NULL == pTaskId) {
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
taosArrayPush(pTaskId, &pTask->taskId);
|
||||
|
||||
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask, reqType, msg, msgLen));
|
||||
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen));
|
||||
#endif
|
||||
}
|
||||
|
||||
SRpcMsg rpcMsg = {
|
||||
|
@ -684,8 +1005,17 @@ int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo *pConn, char
|
|||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, tbFName));
|
||||
#if CTG_BATCH_FETCH
|
||||
CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen));
|
||||
#else
|
||||
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
|
||||
if (NULL == pTaskId) {
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
taosArrayPush(pTaskId, &pTask->taskId);
|
||||
|
||||
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask, reqType, msg, msgLen));
|
||||
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen));
|
||||
#endif
|
||||
}
|
||||
|
||||
SRpcMsg rpcMsg = {
|
||||
|
@ -744,7 +1074,21 @@ int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SNa
|
|||
.requestId = pConn->requestId,
|
||||
.requestObjRefId = pConn->requestObjRefId,
|
||||
.mgmtEps = vgroupInfo->epSet};
|
||||
CTG_RET(ctgAsyncSendMsg(pCtg, &vConn, pTask, reqType, msg, msgLen));
|
||||
|
||||
#if CTG_BATCH_FETCH
|
||||
CTG_RET(ctgAddBatch(pCtg, vgroupInfo->vgId, &vConn, pTask, reqType, msg, msgLen));
|
||||
#else
|
||||
SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx;
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
tNameGetFullDbName(ctx->pName, dbFName);
|
||||
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
|
||||
if (NULL == pTaskId) {
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
taosArrayPush(pTaskId, &pTask->taskId);
|
||||
|
||||
CTG_RET(ctgAsyncSendMsg(pCtg, &vConn, pTask->pJob, pTaskId, -1, dbFName, ctx->vgId, reqType, msg, msgLen));
|
||||
#endif
|
||||
}
|
||||
|
||||
SRpcMsg rpcMsg = {
|
||||
|
@ -791,7 +1135,20 @@ int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const S
|
|||
.requestId = pConn->requestId,
|
||||
.requestObjRefId = pConn->requestObjRefId,
|
||||
.mgmtEps = vgroupInfo->epSet};
|
||||
CTG_RET(ctgAsyncSendMsg(pCtg, &vConn, pTask, reqType, msg, msgLen));
|
||||
#if CTG_BATCH_FETCH
|
||||
CTG_RET(ctgAddBatch(pCtg, vgroupInfo->vgId, &vConn, pTask, reqType, msg, msgLen));
|
||||
#else
|
||||
SCtgTbCfgCtx* ctx = (SCtgTbCfgCtx*)pTask->taskCtx;
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
tNameGetFullDbName(ctx->pName, dbFName);
|
||||
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
|
||||
if (NULL == pTaskId) {
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
taosArrayPush(pTaskId, &pTask->taskId);
|
||||
|
||||
CTG_RET(ctgAsyncSendMsg(pCtg, &vConn, pTask->pJob, pTaskId, -1, dbFName, ctx->pVgInfo->vgId, reqType, msg, msgLen));
|
||||
#endif
|
||||
}
|
||||
|
||||
SRpcMsg rpcMsg = {
|
||||
|
@ -832,8 +1189,17 @@ int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const S
|
|||
|
||||
if (pTask) {
|
||||
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, NULL, (char*)tbFName));
|
||||
#if CTG_BATCH_FETCH
|
||||
CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen));
|
||||
#else
|
||||
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
|
||||
if (NULL == pTaskId) {
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
taosArrayPush(pTaskId, &pTask->taskId);
|
||||
|
||||
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask, reqType, msg, msgLen));
|
||||
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen));
|
||||
#endif
|
||||
}
|
||||
|
||||
SRpcMsg rpcMsg = {
|
||||
|
@ -868,8 +1234,18 @@ int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, char **ou
|
|||
|
||||
if (pTask) {
|
||||
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, NULL, NULL));
|
||||
|
||||
#if CTG_BATCH_FETCH
|
||||
CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen));
|
||||
#else
|
||||
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
|
||||
if (NULL == pTaskId) {
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
taosArrayPush(pTaskId, &pTask->taskId);
|
||||
|
||||
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask, reqType, msg, msgLen));
|
||||
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen));
|
||||
#endif
|
||||
}
|
||||
|
||||
SRpcMsg rpcMsg = {
|
||||
|
|
|
@ -19,6 +19,39 @@
|
|||
#include "catalogInt.h"
|
||||
#include "systable.h"
|
||||
|
||||
void ctgFreeMsgSendParam(void* param) {
|
||||
if (NULL == param) {
|
||||
return;
|
||||
}
|
||||
|
||||
SCtgTaskCallbackParam* pParam = (SCtgTaskCallbackParam*)param;
|
||||
taosArrayDestroy(pParam->taskId);
|
||||
|
||||
taosMemoryFree(param);
|
||||
}
|
||||
|
||||
void ctgFreeBatch(SCtgBatch *pBatch) {
|
||||
if (NULL == pBatch) {
|
||||
return;
|
||||
}
|
||||
|
||||
taosArrayDestroy(pBatch->pMsgs);
|
||||
taosArrayDestroy(pBatch->pTaskIds);
|
||||
}
|
||||
|
||||
void ctgFreeBatchs(SHashObj *pBatchs) {
|
||||
void* p = taosHashIterate(pBatchs, NULL);
|
||||
while (NULL != p) {
|
||||
SCtgBatch* pBatch = (SCtgBatch*)p;
|
||||
|
||||
ctgFreeBatch(pBatch);
|
||||
|
||||
p = taosHashIterate(pBatchs, p);
|
||||
}
|
||||
|
||||
taosHashCleanup(pBatchs);
|
||||
}
|
||||
|
||||
char *ctgTaskTypeStr(CTG_TASK_TYPE type) {
|
||||
switch (type) {
|
||||
case CTG_TASK_GET_QNODE:
|
||||
|
@ -612,6 +645,7 @@ void ctgFreeJob(void* job) {
|
|||
uint64_t qid = pJob->queryId;
|
||||
|
||||
ctgFreeTasks(pJob->pTasks);
|
||||
ctgFreeBatchs(pJob->pBatchs);
|
||||
|
||||
ctgFreeSMetaData(&pJob->jobRes);
|
||||
|
||||
|
@ -867,14 +901,10 @@ int32_t ctgCloneTableIndex(SArray* pIndex, SArray** pRes) {
|
|||
}
|
||||
|
||||
|
||||
int32_t ctgUpdateSendTargetInfo(SMsgSendInfo *pMsgSendInfo, int32_t msgType, SCtgTask* pTask) {
|
||||
if (msgType == TDMT_VND_TABLE_META) {
|
||||
SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx;
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
tNameGetFullDbName(ctx->pName, dbFName);
|
||||
|
||||
int32_t ctgUpdateSendTargetInfo(SMsgSendInfo *pMsgSendInfo, int32_t msgType, char* dbFName, int32_t vgId) {
|
||||
if (msgType == TDMT_VND_TABLE_META || msgType == TDMT_VND_TABLE_CFG || msgType == TDMT_VND_BATCH_META) {
|
||||
pMsgSendInfo->target.type = TARGET_TYPE_VNODE;
|
||||
pMsgSendInfo->target.vgId = ctx->vgId;
|
||||
pMsgSendInfo->target.vgId = vgId;
|
||||
pMsgSendInfo->target.dbFName = strdup(dbFName);
|
||||
} else {
|
||||
pMsgSendInfo->target.type = TARGET_TYPE_MNODE;
|
||||
|
|
Loading…
Reference in New Issue