enh: refact insert into select processing

This commit is contained in:
dapan1121 2022-12-20 16:55:11 +08:00
parent 129770d5ad
commit 32e8202a25
6 changed files with 224 additions and 130 deletions

View File

@ -265,7 +265,7 @@ void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag);
// for debug // for debug
char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf); char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf);
int32_t buildSubmitReqFromDataBlock(SSubmitReq2** pReq, const SSDataBlock* pDataBlocks, STSchema* pTSchema, int32_t vgId, int32_t buildSubmitReqFromDataBlock(SSubmitReq2** pReq, const SSDataBlock* pDataBlocks, const STSchema* pTSchema, int64_t uid, int32_t vgId,
tb_uid_t suid); tb_uid_t suid);
char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId); char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId);

View File

@ -90,7 +90,7 @@ int32_t tBufferReserve(SBuffer *pBuffer, int64_t nData, void **ppData);
#define COL_VAL_IS_VALUE(CV) ((CV)->flag == CV_FLAG_VALUE) #define COL_VAL_IS_VALUE(CV) ((CV)->flag == CV_FLAG_VALUE)
// SRow ================================ // SRow ================================
int32_t tRowBuild(SArray *aColVal, STSchema *pTSchema, SRow **ppRow); int32_t tRowBuild(SArray *aColVal, const STSchema *pTSchema, SRow **ppRow);
void tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal); void tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal);
void tRowDestroy(SRow *pRow); void tRowDestroy(SRow *pRow);
void tRowSort(SArray *aRowP); void tRowSort(SArray *aRowP);

View File

@ -2226,15 +2226,16 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataB
} }
#endif #endif
int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDataBlock, STSchema* pTSchema, int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDataBlock, const STSchema* pTSchema,
int32_t vgId, tb_uid_t suid) { int64_t uid, int32_t vgId, tb_uid_t suid) {
SSubmitReq2* pReq = NULL; SSubmitReq2* pReq = *ppReq;
SArray* pVals = NULL; SArray* pVals = NULL;
int32_t numOfBlks = 0; int32_t numOfBlks = 0;
int32_t sz = 1; int32_t sz = 1;
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
if (NULL == pReq) {
if (!(pReq = taosMemoryMalloc(sizeof(SSubmitReq2)))) { if (!(pReq = taosMemoryMalloc(sizeof(SSubmitReq2)))) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _end; goto _end;
@ -2243,6 +2244,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat
if (!(pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) { if (!(pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) {
goto _end; goto _end;
} }
}
for (int32_t i = 0; i < sz; ++i) { for (int32_t i = 0; i < sz; ++i) {
int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock); int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
@ -2261,7 +2263,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat
goto _end; goto _end;
} }
tbData.suid = suid; tbData.suid = suid;
tbData.uid = pDataBlock->info.id.groupId; tbData.uid = uid;
tbData.sver = pTSchema->version; tbData.sver = pTSchema->version;
if (!pVals && !(pVals = taosArrayInit(colNum, sizeof(SColVal)))) { if (!pVals && !(pVals = taosArrayInit(colNum, sizeof(SColVal)))) {
@ -2277,7 +2279,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat
int32_t offset = 0; int32_t offset = 0;
for (int32_t k = 0; k < colNum; ++k) { // iterate by column for (int32_t k = 0; k < colNum; ++k) { // iterate by column
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k); SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
STColumn* pCol = &pTSchema->columns[k]; const STColumn* pCol = &pTSchema->columns[k];
void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes); void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
switch (pColInfoData->info.type) { switch (pColInfoData->info.type) {

View File

@ -97,7 +97,7 @@ typedef struct {
} \ } \
} while (0) } while (0)
int32_t tRowBuild(SArray *aColVal, STSchema *pTSchema, SRow **ppRow) { int32_t tRowBuild(SArray *aColVal, const STSchema *pTSchema, SRow **ppRow) {
int32_t code = 0; int32_t code = 0;
ASSERT(TARRAY_SIZE(aColVal) > 0); ASSERT(TARRAY_SIZE(aColVal) > 0);
@ -112,7 +112,7 @@ int32_t tRowBuild(SArray *aColVal, STSchema *pTSchema, SRow **ppRow) {
const int32_t nColVal = TARRAY_SIZE(aColVal); const int32_t nColVal = TARRAY_SIZE(aColVal);
SColVal *pColVal = (iColVal < nColVal) ? &colVals[iColVal] : NULL; SColVal *pColVal = (iColVal < nColVal) ? &colVals[iColVal] : NULL;
int32_t iTColumn = 1; int32_t iTColumn = 1;
STColumn *pTColumn = pTSchema->columns + iTColumn; const STColumn *pTColumn = pTSchema->columns + iTColumn;
int32_t ntp = 0; int32_t ntp = 0;
int32_t nkv = 0; int32_t nkv = 0;
int32_t maxIdx = 0; int32_t maxIdx = 0;

View File

@ -705,7 +705,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
SSubmitReq2 *pReq = NULL; SSubmitReq2 *pReq = NULL;
// TODO: the schema update should be handled later(TD-17965) // TODO: the schema update should be handled later(TD-17965)
if (buildSubmitReqFromDataBlock(&pReq, output, pTSchema, SMA_VID(pSma), suid) < 0) { if (buildSubmitReqFromDataBlock(&pReq, output, pTSchema, output->info.id.groupId, SMA_VID(pSma), suid) < 0) {
smaError("vgId:%d, build submit req for rsma table suid:%" PRIu64 ", uid:%" PRIu64 ", level %" PRIi8 smaError("vgId:%d, build submit req for rsma table suid:%" PRIu64 ", uid:%" PRIu64 ", level %" PRIi8
" failed since %s", " failed since %s",
SMA_VID(pSma), suid, output->info.id.groupId, pItem->level, terrstr()); SMA_VID(pSma), suid, output->info.id.groupId, pItem->level, terrstr());

View File

@ -58,7 +58,7 @@ int32_t inserterCallback(void* param, SDataBuf* pMsg, int32_t code) {
pInserter->submitRes.code = code; pInserter->submitRes.code = code;
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
pInserter->submitRes.pRsp = taosMemoryCalloc(1, sizeof(SSubmitRsp)); pInserter->submitRes.pRsp = taosMemoryCalloc(1, sizeof(SSubmitRsp2));
SDecoder coder = {0}; SDecoder coder = {0};
tDecoderInit(&coder, pMsg->pData, pMsg->len); tDecoderInit(&coder, pMsg->pData, pMsg->len);
code = tDecodeSSubmitRsp2(&coder, pInserter->submitRes.pRsp); code = tDecodeSSubmitRsp2(&coder, pInserter->submitRes.pRsp);
@ -83,8 +83,7 @@ int32_t inserterCallback(void* param, SDataBuf* pMsg, int32_t code) {
} }
} }
// pInserter->submitRes.affectedRows += pInserter->submitRes.pRsp->affectedRows; pInserter->submitRes.affectedRows += pInserter->submitRes.pRsp->affectedRows;
// pInserter->submitRes.affectedRows += pInserter->submitRes.
qDebug("submit rsp received, affectedRows:%d, total:%"PRId64, pInserter->submitRes.pRsp->affectedRows, qDebug("submit rsp received, affectedRows:%d, total:%"PRId64, pInserter->submitRes.pRsp->affectedRows,
pInserter->submitRes.affectedRows); pInserter->submitRes.affectedRows);
tDecoderClear(&coder); tDecoderClear(&coder);
@ -97,7 +96,7 @@ _return:
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t sendSubmitRequest(SDataInserterHandle* pInserter, SSubmitReq* pMsg, void* pTransporter, SEpSet* pEpset) { static int32_t sendSubmitRequest(SDataInserterHandle* pInserter, void* pMsg, int32_t msgLen, void* pTransporter, SEpSet* pEpset) {
// send the fetch remote task result reques // send the fetch remote task result reques
SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (NULL == pMsgSendInfo) { if (NULL == pMsgSendInfo) {
@ -112,7 +111,7 @@ static int32_t sendSubmitRequest(SDataInserterHandle* pInserter, SSubmitReq* pMs
pMsgSendInfo->param = pParam; pMsgSendInfo->param = pParam;
pMsgSendInfo->paramFreeFp = taosMemoryFree; pMsgSendInfo->paramFreeFp = taosMemoryFree;
pMsgSendInfo->msgInfo.pData = pMsg; pMsgSendInfo->msgInfo.pData = pMsg;
pMsgSendInfo->msgInfo.len = ntohl(pMsg->length); pMsgSendInfo->msgInfo.len = msgLen;
pMsgSendInfo->msgType = TDMT_VND_SUBMIT; pMsgSendInfo->msgType = TDMT_VND_SUBMIT;
pMsgSendInfo->fp = inserterCallback; pMsgSendInfo->fp = inserterCallback;
@ -120,65 +119,85 @@ static int32_t sendSubmitRequest(SDataInserterHandle* pInserter, SSubmitReq* pMs
return asyncSendMsgToServer(pTransporter, pEpset, &transporterId, pMsgSendInfo); return asyncSendMsgToServer(pTransporter, pEpset, &transporterId, pMsgSendInfo);
} }
int32_t dataBlockToSubmit(SDataInserterHandle* pInserter, SSubmitReq** pReq) { static int32_t submitReqToMsg(int32_t vgId, SSubmitReq2* pReq, void** pData, int32_t* pLen) {
const SArray* pBlocks = pInserter->pDataBlocks; int32_t code = TSDB_CODE_SUCCESS;
const STSchema* pTSchema = pInserter->pSchema; int32_t len = 0;
int64_t uid = pInserter->pNode->tableId; void* pBuf = NULL;
int64_t suid = pInserter->pNode->stableId; tEncodeSize(tEncodeSSubmitReq2, pReq, len, code);
int32_t vgId = pInserter->pNode->vgId; if (TSDB_CODE_SUCCESS == code) {
bool fullCol = (pInserter->pNode->pCols->length == pTSchema->numOfCols); SEncoder encoder;
len += sizeof(SMsgHead);
SSubmitReq* ret = NULL; pBuf = taosMemoryMalloc(len);
int32_t sz = taosArrayGetSize(pBlocks); if (NULL == pBuf) {
return TSDB_CODE_OUT_OF_MEMORY;
// cal size }
int32_t cap = sizeof(SSubmitReq); ((SMsgHead*)pBuf)->vgId = htonl(vgId);
for (int32_t i = 0; i < sz; i++) { ((SMsgHead*)pBuf)->contLen = htonl(len);
SSDataBlock* pDataBlock = taosArrayGetP(pBlocks, i); tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len - sizeof(SMsgHead));
int32_t rows = pDataBlock->info.rows; code = tEncodeSSubmitReq2(&encoder, pReq);
// TODO min tEncoderClear(&encoder);
int32_t rowSize = pDataBlock->info.rowSize;
int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema);
cap += sizeof(SSubmitBlk) + rows * maxLen;
} }
// assign data if (TSDB_CODE_SUCCESS == code) {
// TODO *pData = pBuf;
ret = taosMemoryCalloc(1, cap); *pLen = len;
ret->header.vgId = htonl(vgId); } else {
ret->version = htonl(pTSchema->version); taosMemoryFree(pBuf);
ret->length = sizeof(SSubmitReq); }
ret->numOfBlocks = htonl(sz); return code;
}
SSubmitBlk* blkHead = POINTER_SHIFT(ret, sizeof(SSubmitReq));
for (int32_t i = 0; i < sz; i++) {
SSDataBlock* pDataBlock = taosArrayGetP(pBlocks, i);
blkHead->sversion = htonl(pTSchema->version); int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** ppReq, const SSDataBlock* pDataBlock, const STSchema* pTSchema,
// TODO int64_t uid, int32_t vgId, tb_uid_t suid) {
blkHead->suid = htobe64(suid); SSubmitReq2* pReq = *ppReq;
blkHead->uid = htobe64(uid); SArray* pVals = NULL;
blkHead->schemaLen = htonl(0); int32_t numOfBlks = 0;
bool fullCol = (pInserter->pNode->pCols->length == pTSchema->numOfCols);
terrno = TSDB_CODE_SUCCESS;
if (NULL == pReq) {
if (!(pReq = taosMemoryMalloc(sizeof(SSubmitReq2)))) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
if (!(pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
}
int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
int32_t rows = pDataBlock->info.rows;
SSubmitTbData tbData = {0};
if (!(tbData.aRowP = taosArrayInit(rows, sizeof(SRow*)))) {
goto _end;
}
tbData.suid = suid;
tbData.uid = uid;
tbData.sver = pTSchema->version;
if (!pVals && !(pVals = taosArrayInit(colNum, sizeof(SColVal)))) {
taosArrayDestroy(tbData.aRowP);
goto _end;
}
int32_t rows = 0;
int32_t dataLen = 0;
STSRow* rowData = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk));
int64_t lastTs = TSKEY_MIN; int64_t lastTs = TSKEY_MIN;
bool ignoreRow = false; bool ignoreRow = false;
for (int32_t j = 0; j < pDataBlock->info.rows; j++) { bool disorderTs = false;
SRowBuilder rb = {0};
tdSRowInit(&rb, pTSchema->version);
tdSRowSetTpInfo(&rb, pTSchema->numOfCols, pTSchema->flen);
tdSRowResetBuf(&rb, rowData);
ignoreRow = false; for (int32_t j = 0; j < rows; ++j) { // iterate by row
for (int32_t k = 0; k < pTSchema->numOfCols; k++) { taosArrayClear(pVals);
const STColumn* pColumn = &pTSchema->columns[k];
SColumnInfoData* pColData = NULL; int32_t offset = 0;
for (int32_t k = 0; k < pTSchema->numOfCols; ++k) { // iterate by column
int16_t colIdx = k; int16_t colIdx = k;
const STColumn* pCol = &pTSchema->columns[k];
if (!fullCol) { if (!fullCol) {
int16_t* slotId = taosHashGet(pInserter->pCols, &pColumn->colId, sizeof(pColumn->colId)); int16_t* slotId = taosHashGet(pInserter->pCols, &pCol->colId, sizeof(pCol->colId));
if (NULL == slotId) { if (NULL == slotId) {
continue; continue;
} }
@ -186,74 +205,147 @@ int32_t dataBlockToSubmit(SDataInserterHandle* pInserter, SSubmitReq** pReq) {
colIdx = *slotId; colIdx = *slotId;
} }
pColData = taosArrayGet(pDataBlock->pDataBlock, colIdx); SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, colIdx);
if (pColData->info.type != pColumn->type) { void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
qError("col type mis-match, schema type:%d, type in block:%d", pColumn->type, pColData->info.type);
terrno = TSDB_CODE_APP_ERROR;
return TSDB_CODE_APP_ERROR;
}
if (colDataIsNull_s(pColData, j)) { switch (pColInfoData->info.type) {
if (0 == k && TSDB_DATA_TYPE_TIMESTAMP == pColumn->type) { case TSDB_DATA_TYPE_NCHAR:
ignoreRow = true; case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY
ASSERT(pColInfoData->info.type == pCol->type);
if (colDataIsNull_s(pColInfoData, j)) {
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
taosArrayPush(pVals, &cv);
} else {
void* data = colDataGetVarData(pColInfoData, j);
SValue sv = (SValue){.nData = varDataLen(data), .pData = varDataVal(data)}; // address copy, no value
SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv);
taosArrayPush(pVals, &cv);
}
break; break;
} }
case TSDB_DATA_TYPE_VARBINARY:
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, pColumn->offset, k); case TSDB_DATA_TYPE_DECIMAL:
} else { case TSDB_DATA_TYPE_BLOB:
void* data = colDataGetData(pColData, j); case TSDB_DATA_TYPE_JSON:
if (0 == k && TSDB_DATA_TYPE_TIMESTAMP == pColumn->type) { case TSDB_DATA_TYPE_MEDIUMBLOB:
if (*(int64_t*)data == lastTs) { uError("the column type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type);
ignoreRow = true; ASSERT(0);
break; break;
default:
if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
if (colDataIsNull_s(pColInfoData, j)) {
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); // should use pCol->type
taosArrayPush(pVals, &cv);
} else { } else {
lastTs = *(int64_t*)data; if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId) {
if (*(int64_t*)var == lastTs) {
ignoreRow = true;
} else if (*(int64_t*)var < lastTs) {
disorderTs = true;
} else {
lastTs = *(int64_t*)var;
} }
} }
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, pColumn->offset, k);
SValue sv;
memcpy(&sv.val, var, tDataTypes[pCol->type].bytes);
SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv);
taosArrayPush(pVals, &cv);
} }
} else {
uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
ASSERT(0);
} }
if (!fullCol) { break;
rb.hasNone = true;
} }
tdSRowEnd(&rb);
if (ignoreRow) { if (ignoreRow) {
break;
}
}
if (ignoreRow) {
ignoreRow = false;
continue; continue;
} }
rows++; SRow* pRow = NULL;
int32_t rowLen = TD_ROW_LEN(rowData); if ((terrno = tRowBuild(pVals, pTSchema, &pRow)) < 0) {
rowData = POINTER_SHIFT(rowData, rowLen); tDestroySSubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
dataLen += rowLen; goto _end;
}
taosArrayPush(tbData.aRowP, &pRow);
} }
blkHead->dataLen = htonl(dataLen); if (disorderTs) {
blkHead->numOfRows = htonl(rows); tRowSort(tbData.aRowP);
if ((terrno = tRowMerge(tbData.aRowP, (STSchema*)pTSchema, 0)) != 0) {
ret->length += sizeof(SSubmitBlk) + dataLen; goto _end;
blkHead = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk) + dataLen); }
} }
ret->length = htonl(ret->length); taosArrayPush(pReq->aSubmitTbData, &tbData);
*pReq = ret;
_end:
taosArrayDestroy(pVals);
if (terrno != 0) {
*ppReq = NULL;
if (pReq) {
tDestroySSubmitReq2(pReq, TSDB_MSG_FLG_ENCODE);
taosMemoryFree(pReq);
}
return TSDB_CODE_FAILED;
}
*ppReq = pReq;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t dataBlocksToSubmitReq(SDataInserterHandle* pInserter, void** pMsg, int32_t* msgLen) {
const SArray* pBlocks = pInserter->pDataBlocks;
const STSchema* pTSchema = pInserter->pSchema;
int64_t uid = pInserter->pNode->tableId;
int64_t suid = pInserter->pNode->stableId;
int32_t vgId = pInserter->pNode->vgId;
int32_t sz = taosArrayGetSize(pBlocks);
int32_t code = 0;
SSubmitReq2 *pReq = NULL;
for (int32_t i = 0; i < sz; i++) {
SSDataBlock* pDataBlock = taosArrayGetP(pBlocks, i);
code = buildSubmitReqFromBlock(pInserter, &pReq, pDataBlock, pTSchema, uid, vgId, suid);
if (code) {
if (pReq) {
tDestroySSubmitReq2(pReq, TSDB_MSG_FLG_ENCODE);
taosMemoryFree(pReq);
}
return code;
}
}
code = submitReqToMsg(vgId, pReq, pMsg, msgLen);
tDestroySSubmitReq2(pReq, TSDB_MSG_FLG_ENCODE);
taosMemoryFree(pReq);
return code;
}
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) { static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle; SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle;
taosArrayPush(pInserter->pDataBlocks, &pInput->pData); taosArrayPush(pInserter->pDataBlocks, &pInput->pData);
SSubmitReq* pMsg = NULL; void* pMsg = NULL;
int32_t code = dataBlockToSubmit(pInserter, &pMsg); int32_t msgLen = 0;
int32_t code = dataBlocksToSubmitReq(pInserter, &pMsg, &msgLen);
if (code) { if (code) {
return code; return code;
} }
taosArrayClear(pInserter->pDataBlocks); taosArrayClear(pInserter->pDataBlocks);
code = sendSubmitRequest(pInserter, pMsg, pInserter->pParam->readHandle->pMsgCb->clientRpc, &pInserter->pNode->epSet); code = sendSubmitRequest(pInserter, pMsg, msgLen, pInserter->pParam->readHandle->pMsgCb->clientRpc, &pInserter->pNode->epSet);
if (code) { if (code) {
return code; return code;
} }