change rpc ctx api
This commit is contained in:
parent
7259a35620
commit
c867c6812a
|
@ -643,13 +643,14 @@ int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
|
||||||
pMsgSendInfo->param = param;
|
pMsgSendInfo->param = param;
|
||||||
pMsgSendInfo->fp = fp;
|
pMsgSendInfo->fp = fp;
|
||||||
|
|
||||||
SRpcCtxVal ctxVal = {.val = pMsgSendInfo, .clone = schCloneSMsgSendInfo, .freeFunc = schFreeRpcCtxVal};
|
SRpcCtxVal ctxVal = {.val = pMsgSendInfo, .clone = schCloneSMsgSendInfo};
|
||||||
if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) {
|
if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) {
|
||||||
SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType);
|
SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType);
|
||||||
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
SCH_ERR_JRET(schMakeBrokenLinkVal(pJob, pTask, &pCtx->brokenVal, true));
|
SCH_ERR_JRET(schMakeBrokenLinkVal(pJob, pTask, &pCtx->brokenVal, true));
|
||||||
|
pCtx->freeFunc = schFreeRpcCtxVal;
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
@ -938,7 +939,7 @@ int32_t schMakeQueryRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
|
||||||
SCH_ERR_JRET(schGenerateCallBackInfo(pJob, pTask, TDMT_VND_EXPLAIN, &pExplainMsgSendInfo));
|
SCH_ERR_JRET(schGenerateCallBackInfo(pJob, pTask, TDMT_VND_EXPLAIN, &pExplainMsgSendInfo));
|
||||||
|
|
||||||
int32_t msgType = TDMT_VND_RES_READY_RSP;
|
int32_t msgType = TDMT_VND_RES_READY_RSP;
|
||||||
SRpcCtxVal ctxVal = {.val = pReadyMsgSendInfo, .clone = schCloneSMsgSendInfo, .freeFunc = schFreeRpcCtxVal};
|
SRpcCtxVal ctxVal = {.val = pReadyMsgSendInfo, .clone = schCloneSMsgSendInfo};
|
||||||
if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) {
|
if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) {
|
||||||
SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType);
|
SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType);
|
||||||
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
|
@ -952,6 +953,7 @@ int32_t schMakeQueryRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SCH_ERR_JRET(schMakeBrokenLinkVal(pJob, pTask, &pCtx->brokenVal, false));
|
SCH_ERR_JRET(schMakeBrokenLinkVal(pJob, pTask, &pCtx->brokenVal, false));
|
||||||
|
pCtx->freeFunc = schFreeRpcCtxVal;
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
|
|
@ -77,16 +77,14 @@ void schFreeRpcCtx(SRpcCtx *pCtx) {
|
||||||
while (pIter) {
|
while (pIter) {
|
||||||
SRpcCtxVal *ctxVal = (SRpcCtxVal *)pIter;
|
SRpcCtxVal *ctxVal = (SRpcCtxVal *)pIter;
|
||||||
|
|
||||||
(*ctxVal->freeFunc)(ctxVal->val);
|
(*pCtx->freeFunc)(ctxVal->val);
|
||||||
|
|
||||||
pIter = taosHashIterate(pCtx->args, pIter);
|
pIter = taosHashIterate(pCtx->args, pIter);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosHashCleanup(pCtx->args);
|
taosHashCleanup(pCtx->args);
|
||||||
|
|
||||||
if (pCtx->brokenVal.freeFunc) {
|
(*pCtx->freeFunc)(pCtx->brokenVal.val);
|
||||||
(*pCtx->brokenVal.freeFunc)(pCtx->brokenVal.val);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue