enh: add binary serialization method to node structure
This commit is contained in:
parent
2f905064e5
commit
ff9d673489
|
@ -52,10 +52,14 @@ int32_t qSetSubplanExecutionNode(SSubplan* pSubplan, int32_t groupId, SDownstrea
|
|||
|
||||
void qClearSubplanExecutionNode(SSubplan* pSubplan);
|
||||
|
||||
// Convert to subplan to string for the scheduler to send to the executor
|
||||
// Convert to subplan to display string for the scheduler to send to the executor
|
||||
int32_t qSubPlanToString(const SSubplan* pSubplan, char** pStr, int32_t* pLen);
|
||||
int32_t qStringToSubplan(const char* pStr, SSubplan** pSubplan);
|
||||
|
||||
// Convert to subplan to msg for the scheduler to send to the executor
|
||||
int32_t qSubPlanToMsg(const SSubplan* pSubplan, char** pStr, int32_t* pLen);
|
||||
int32_t qMsgToSubplan(const char* pStr, int32_t len, SSubplan** pSubplan);
|
||||
|
||||
char* qQueryPlanToString(const SQueryPlan* pPlan);
|
||||
SQueryPlan* qStringToQueryPlan(const char* pStr);
|
||||
|
||||
|
|
|
@ -1082,6 +1082,170 @@ static int32_t msgToSlotDescNode(STlvDecoder* pDecoder, void* pObj) {
|
|||
return code;
|
||||
}
|
||||
|
||||
enum { EP_CODE_FQDN = 1, EP_CODE_port };
|
||||
|
||||
static int32_t epToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
||||
const SEp* pNode = (const SEp*)pObj;
|
||||
|
||||
int32_t code = tlvEncodeCStr(pEncoder, EP_CODE_FQDN, pNode->fqdn);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeU16(pEncoder, EP_CODE_port, pNode->port);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t msgToEp(STlvDecoder* pDecoder, void* pObj) {
|
||||
SEp* pNode = (SEp*)pObj;
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
STlv* pTlv = NULL;
|
||||
tlvForEach(pDecoder, pTlv, code) {
|
||||
switch (pTlv->type) {
|
||||
case EP_CODE_FQDN:
|
||||
code = tlvDecodeCStr(pTlv, pNode->fqdn);
|
||||
break;
|
||||
case EP_CODE_port:
|
||||
code = tlvDecodeU16(pTlv, &pNode->port);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
enum { EP_SET_CODE_IN_USE = 1, EP_SET_CODE_NUM_OF_EPS, EP_SET_CODE_EPS };
|
||||
|
||||
static int32_t epSetToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
||||
const SEpSet* pNode = (const SEpSet*)pObj;
|
||||
|
||||
int32_t code = tlvEncodeI8(pEncoder, EP_SET_CODE_IN_USE, pNode->inUse);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeI8(pEncoder, EP_SET_CODE_NUM_OF_EPS, pNode->numOfEps);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeObjArray(pEncoder, EP_SET_CODE_EPS, epToMsg, pNode->eps, sizeof(SEp), pNode->numOfEps);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t msgToEpSet(STlvDecoder* pDecoder, void* pObj) {
|
||||
SEpSet* pNode = (SEpSet*)pObj;
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
STlv* pTlv = NULL;
|
||||
tlvForEach(pDecoder, pTlv, code) {
|
||||
switch (pTlv->type) {
|
||||
case EP_SET_CODE_IN_USE:
|
||||
code = tlvDecodeI8(pTlv, &pNode->inUse);
|
||||
break;
|
||||
case EP_SET_CODE_NUM_OF_EPS:
|
||||
code = tlvDecodeI8(pTlv, &pNode->numOfEps);
|
||||
break;
|
||||
case EP_SET_CODE_EPS:
|
||||
code = tlvDecodeObjArrayFromTlv(pTlv, msgToEp, pNode->eps, sizeof(SEp));
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
enum { QUERY_NODE_ADDR_CODE_NODE_ID = 1, QUERY_NODE_ADDR_CODE_EP_SET };
|
||||
|
||||
static int32_t queryNodeAddrToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
||||
const SQueryNodeAddr* pNode = (const SQueryNodeAddr*)pObj;
|
||||
|
||||
int32_t code = tlvEncodeI32(pEncoder, QUERY_NODE_ADDR_CODE_NODE_ID, pNode->nodeId);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeObj(pEncoder, QUERY_NODE_ADDR_CODE_EP_SET, epSetToMsg, &pNode->epSet);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t msgToQueryNodeAddr(STlvDecoder* pDecoder, void* pObj) {
|
||||
SQueryNodeAddr* pNode = (SQueryNodeAddr*)pObj;
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
STlv* pTlv = NULL;
|
||||
tlvForEach(pDecoder, pTlv, code) {
|
||||
switch (pTlv->type) {
|
||||
case QUERY_NODE_ADDR_CODE_NODE_ID:
|
||||
code = tlvDecodeI32(pTlv, &pNode->nodeId);
|
||||
break;
|
||||
case QUERY_NODE_ADDR_CODE_EP_SET:
|
||||
code = tlvDecodeObjFromTlv(pTlv, msgToEpSet, &pNode->epSet);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
enum {
|
||||
DOWNSTREAM_SOURCE_CODE_ADDR = 1,
|
||||
DOWNSTREAM_SOURCE_CODE_TASK_ID,
|
||||
DOWNSTREAM_SOURCE_CODE_SCHED_ID,
|
||||
DOWNSTREAM_SOURCE_CODE_EXEC_ID,
|
||||
DOWNSTREAM_SOURCE_CODE_FETCH_MSG_TYPE
|
||||
};
|
||||
|
||||
static int32_t downstreamSourceNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
||||
const SDownstreamSourceNode* pNode = (const SDownstreamSourceNode*)pObj;
|
||||
|
||||
int32_t code = tlvEncodeObj(pEncoder, DOWNSTREAM_SOURCE_CODE_ADDR, queryNodeAddrToMsg, &pNode->addr);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeU64(pEncoder, DOWNSTREAM_SOURCE_CODE_TASK_ID, pNode->taskId);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeU64(pEncoder, DOWNSTREAM_SOURCE_CODE_SCHED_ID, pNode->schedId);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeI32(pEncoder, DOWNSTREAM_SOURCE_CODE_EXEC_ID, pNode->execId);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeI32(pEncoder, DOWNSTREAM_SOURCE_CODE_FETCH_MSG_TYPE, pNode->fetchMsgType);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t msgToDownstreamSourceNode(STlvDecoder* pDecoder, void* pObj) {
|
||||
SDownstreamSourceNode* pNode = (SDownstreamSourceNode*)pObj;
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
STlv* pTlv = NULL;
|
||||
tlvForEach(pDecoder, pTlv, code) {
|
||||
switch (pTlv->type) {
|
||||
case DOWNSTREAM_SOURCE_CODE_ADDR:
|
||||
code = tlvDecodeObjFromTlv(pTlv, msgToQueryNodeAddr, &pNode->addr);
|
||||
break;
|
||||
case DOWNSTREAM_SOURCE_CODE_TASK_ID:
|
||||
code = tlvDecodeU64(pTlv, &pNode->taskId);
|
||||
break;
|
||||
case DOWNSTREAM_SOURCE_CODE_SCHED_ID:
|
||||
code = tlvDecodeU64(pTlv, &pNode->schedId);
|
||||
break;
|
||||
case DOWNSTREAM_SOURCE_CODE_EXEC_ID:
|
||||
code = tlvDecodeI32(pTlv, &pNode->execId);
|
||||
break;
|
||||
case DOWNSTREAM_SOURCE_CODE_FETCH_MSG_TYPE:
|
||||
code = tlvDecodeI32(pTlv, &pNode->fetchMsgType);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
enum {
|
||||
PHY_NODE_CODE_OUTPUT_DESC = 1,
|
||||
PHY_NODE_CODE_CONDITIONS,
|
||||
|
@ -1401,80 +1565,6 @@ static int32_t msgToPhysiTableScanNode(STlvDecoder* pDecoder, void* pObj) {
|
|||
return code;
|
||||
}
|
||||
|
||||
enum { EP_CODE_FQDN = 1, EP_CODE_port };
|
||||
|
||||
static int32_t epToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
||||
const SEp* pNode = (const SEp*)pObj;
|
||||
|
||||
int32_t code = tlvEncodeCStr(pEncoder, EP_CODE_FQDN, pNode->fqdn);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeU16(pEncoder, EP_CODE_port, pNode->port);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t msgToEp(STlvDecoder* pDecoder, void* pObj) {
|
||||
SEp* pNode = (SEp*)pObj;
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
STlv* pTlv = NULL;
|
||||
tlvForEach(pDecoder, pTlv, code) {
|
||||
switch (pTlv->type) {
|
||||
case EP_CODE_FQDN:
|
||||
code = tlvDecodeCStr(pTlv, pNode->fqdn);
|
||||
break;
|
||||
case EP_CODE_port:
|
||||
code = tlvDecodeU16(pTlv, &pNode->port);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
enum { EP_SET_CODE_IN_USE = 1, EP_SET_CODE_NUM_OF_EPS, EP_SET_CODE_EPS };
|
||||
|
||||
static int32_t epSetToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
||||
const SEpSet* pNode = (const SEpSet*)pObj;
|
||||
|
||||
int32_t code = tlvEncodeI8(pEncoder, EP_SET_CODE_IN_USE, pNode->inUse);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeI8(pEncoder, EP_SET_CODE_NUM_OF_EPS, pNode->numOfEps);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeObjArray(pEncoder, EP_SET_CODE_EPS, epToMsg, pNode->eps, sizeof(SEp), pNode->numOfEps);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t msgToEpSet(STlvDecoder* pDecoder, void* pObj) {
|
||||
SEpSet* pNode = (SEpSet*)pObj;
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
STlv* pTlv = NULL;
|
||||
tlvForEach(pDecoder, pTlv, code) {
|
||||
switch (pTlv->type) {
|
||||
case EP_SET_CODE_IN_USE:
|
||||
code = tlvDecodeI8(pTlv, &pNode->inUse);
|
||||
break;
|
||||
case EP_SET_CODE_NUM_OF_EPS:
|
||||
code = tlvDecodeI8(pTlv, &pNode->numOfEps);
|
||||
break;
|
||||
case EP_SET_CODE_EPS:
|
||||
code = tlvDecodeObjArrayFromTlv(pTlv, msgToEp, pNode->eps, sizeof(SEp));
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
enum {
|
||||
PHY_SYSTABLE_SCAN_CODE_SCAN = 1,
|
||||
PHY_SYSTABLE_SCAN_CODE_MGMT_EP_SET,
|
||||
|
@ -2594,38 +2684,6 @@ static int32_t msgToSubplanId(STlvDecoder* pDecoder, void* pObj) {
|
|||
return code;
|
||||
}
|
||||
|
||||
enum { QUERY_NODE_ADDR_CODE_NODE_ID = 1, QUERY_NODE_ADDR_CODE_EP_SET };
|
||||
|
||||
static int32_t queryNodeAddrToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
||||
const SQueryNodeAddr* pNode = (const SQueryNodeAddr*)pObj;
|
||||
|
||||
int32_t code = tlvEncodeI32(pEncoder, QUERY_NODE_ADDR_CODE_NODE_ID, pNode->nodeId);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeObj(pEncoder, QUERY_NODE_ADDR_CODE_EP_SET, epSetToMsg, &pNode->epSet);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t msgToQueryNodeAddr(STlvDecoder* pDecoder, void* pObj) {
|
||||
SQueryNodeAddr* pNode = (SQueryNodeAddr*)pObj;
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
STlv* pTlv = NULL;
|
||||
tlvForEach(pDecoder, pTlv, code) {
|
||||
switch (pTlv->type) {
|
||||
case QUERY_NODE_ADDR_CODE_NODE_ID:
|
||||
code = tlvDecodeI32(pTlv, &pNode->nodeId);
|
||||
break;
|
||||
case QUERY_NODE_ADDR_CODE_EP_SET:
|
||||
code = tlvDecodeObjFromTlv(pTlv, msgToEpSet, &pNode->epSet);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
enum {
|
||||
SUBPLAN_CODE_SUBPLAN_ID = 1,
|
||||
SUBPLAN_CODE_SUBPLAN_TYPE,
|
||||
|
@ -2802,6 +2860,8 @@ static int32_t specificNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
|||
case QUERY_NODE_SLOT_DESC:
|
||||
code = slotDescNodeToMsg(pObj, pEncoder);
|
||||
break;
|
||||
case QUERY_NODE_DOWNSTREAM_SOURCE:
|
||||
return downstreamSourceNodeToMsg(pObj, pEncoder);
|
||||
case QUERY_NODE_LEFT_VALUE:
|
||||
break;
|
||||
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
|
||||
|
@ -2929,6 +2989,8 @@ static int32_t msgToSpecificNode(STlvDecoder* pDecoder, void* pObj) {
|
|||
case QUERY_NODE_SLOT_DESC:
|
||||
code = msgToSlotDescNode(pDecoder, pObj);
|
||||
break;
|
||||
case QUERY_NODE_DOWNSTREAM_SOURCE:
|
||||
return msgToDownstreamSourceNode(pDecoder, pObj);
|
||||
case QUERY_NODE_LEFT_VALUE:
|
||||
break;
|
||||
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
|
||||
|
|
|
@ -123,6 +123,21 @@ int32_t qSubPlanToString(const SSubplan* pSubplan, char** pStr, int32_t* pLen) {
|
|||
|
||||
int32_t qStringToSubplan(const char* pStr, SSubplan** pSubplan) { return nodesStringToNode(pStr, (SNode**)pSubplan); }
|
||||
|
||||
int32_t qSubPlanToMsg(const SSubplan* pSubplan, char** pStr, int32_t* pLen) {
|
||||
if (SUBPLAN_TYPE_MODIFY == pSubplan->subplanType && NULL == pSubplan->pNode) {
|
||||
SDataInserterNode* insert = (SDataInserterNode*)pSubplan->pDataSink;
|
||||
*pLen = insert->size;
|
||||
*pStr = insert->pData;
|
||||
insert->pData = NULL;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
return nodesNodeToMsg((const SNode*)pSubplan, pStr, pLen);
|
||||
}
|
||||
|
||||
int32_t qMsgToSubplan(const char* pStr, int32_t len, SSubplan** pSubplan) {
|
||||
return nodesMsgToNode(pStr, len, (SNode**)pSubplan);
|
||||
}
|
||||
|
||||
char* qQueryPlanToString(const SQueryPlan* pPlan) {
|
||||
char* pStr = NULL;
|
||||
int32_t len = 0;
|
||||
|
|
|
@ -559,7 +559,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) {
|
|||
|
||||
// QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg);
|
||||
|
||||
code = qStringToSubplan(qwMsg->msg, &plan);
|
||||
code = qMsgToSubplan(qwMsg->msg, qwMsg->msgLen, &plan);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
QW_TASK_ELOG("task physical plan to subplan failed, code:%x - %s", code, tstrerror(code));
|
||||
|
@ -968,7 +968,7 @@ int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SDeleteRes *pRes) {
|
|||
DataSinkHandle sinkHandle = NULL;
|
||||
SQWTaskCtx ctx = {0};
|
||||
|
||||
code = qStringToSubplan(qwMsg->msg, &plan);
|
||||
code = qMsgToSubplan(qwMsg->msg, qwMsg->msgLen, &plan);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
QW_TASK_ELOG("task physical plan to subplan failed, code:%x - %s", code, tstrerror(code));
|
||||
|
|
|
@ -860,7 +860,7 @@ int32_t schLaunchTaskImpl(void *param) {
|
|||
SSubplan *plan = pTask->plan;
|
||||
|
||||
if (NULL == pTask->msg) { // TODO add more detailed reason for failure
|
||||
code = qSubPlanToString(plan, &pTask->msg, &pTask->msgLen);
|
||||
code = qSubPlanToMsg(plan, &pTask->msg, &pTask->msgLen);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(code), pTask->msg,
|
||||
pTask->msgLen);
|
||||
|
|
Loading…
Reference in New Issue