diff --git a/include/common/tmsg.h b/include/common/tmsg.h index a82364f347..ede750b72f 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3307,6 +3307,12 @@ typedef struct { SArray* aSubmitTbData; // SArray } SSubmitReq2; +typedef struct { + SMsgHead header; + int64_t version; + char data[]; // SSubmitReq2 +} SSubmitReq2Msg; + int32_t tEncodeSSubmitReq2(SEncoder* pCoder, const SSubmitReq2* pReq); int32_t tDecodeSSubmitReq2(SDecoder* pCoder, SSubmitReq2* pReq); void tDestroySSubmitTbData(SSubmitTbData* pTbData, int32_t flag); @@ -3323,6 +3329,7 @@ void tDestroySSubmitRsp2(SSubmitRsp2* pRsp, int32_t flag); #define TSDB_MSG_FLG_ENCODE 0x1 #define TSDB_MSG_FLG_DECODE 0x2 +#define TSDB_MSG_FLG_CMPT 0x3 typedef struct { union { diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 2ceeeefa5f..9c1589eacc 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -6995,9 +6995,13 @@ void tDestroySSubmitTbData(SSubmitTbData *pTbData, int32_t flag) { return; } - if (flag == TSDB_MSG_FLG_ENCODE) { + if (flag == TSDB_MSG_FLG_ENCODE || flag == TSDB_MSG_FLG_CMPT) { if (pTbData->pCreateTbReq) { - tdDestroySVCreateTbReq(pTbData->pCreateTbReq); + if (flag == TSDB_MSG_FLG_ENCODE) { + tdDestroySVCreateTbReq(pTbData->pCreateTbReq); + } else { + tDestroySVCreateTbReq(pTbData->pCreateTbReq, TSDB_MSG_FLG_DECODE); + } taosMemoryFree(pTbData->pCreateTbReq); } diff --git a/source/dnode/vnode/src/sma/smaTimeRange.c b/source/dnode/vnode/src/sma/smaTimeRange.c index 1b191dd5a5..e7f03d668e 100644 --- a/source/dnode/vnode/src/sma/smaTimeRange.c +++ b/source/dnode/vnode/src/sma/smaTimeRange.c @@ -257,7 +257,6 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema * int32_t rows = pDataBlock->info.rows; SSubmitTbData tbData = {0}; - if (!(tbData.aRowP = taosArrayInit(rows, sizeof(SRow *)))) { goto _end; @@ -313,14 +312,15 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema * tEncodeSize(tEncodeSSubmitReq2, pReq, len, terrno); if (TSDB_CODE_SUCCESS == terrno) { SEncoder encoder; - len += sizeof(SMsgHead); + len += sizeof(SSubmitReq2Msg); pBuf = rpcMallocCont(len); if (NULL == pBuf) { goto _end; } - ((SMsgHead *)pBuf)->vgId = TD_VID(pVnode); - ((SMsgHead *)pBuf)->contLen = htonl(len); - tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len - sizeof(SMsgHead)); + ((SSubmitReq2Msg *)pBuf)->header.vgId = TD_VID(pVnode); + ((SSubmitReq2Msg *)pBuf)->header.contLen = htonl(len); + ((SSubmitReq2Msg *)pBuf)->version = htobe64(1); + tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg)); if (tEncodeSSubmitReq2(&encoder, pReq) < 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; /*vError("failed to encode submit req since %s", terrstr());*/ diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 753c42ff33..e85dfc66de 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -683,13 +683,13 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { if (pHead->msgType == TDMT_VND_SUBMIT) { SPackedData submit = { - .msgStr = POINTER_SHIFT(pHead->body, sizeof(SMsgHead)), - .msgLen = pHead->bodyLen - sizeof(SMsgHead), + .msgStr = POINTER_SHIFT(pHead->body, sizeof(SSubmitReq2Msg)), + .msgLen = pHead->bodyLen - sizeof(SSubmitReq2Msg), .ver = pHead->version, }; if (tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp) < 0) { - tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", consumerId, - TD_VID(pTq->pVnode), req.subKey); + tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", consumerId, TD_VID(pTq->pVnode), + req.subKey); return -1; } if (taosxRsp.blockNum > 0 /* threshold */) { diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 559a3b76fe..2e3dc86ce9 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -207,8 +207,8 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_ #endif int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) { - void* pReq = POINTER_SHIFT(msg, sizeof(SMsgHead)); - int32_t len = msgLen - sizeof(SMsgHead); + void* pReq = POINTER_SHIFT(msg, sizeof(SSubmitReq2Msg)); + int32_t len = msgLen - sizeof(SSubmitReq2Msg); tqDebug("vgId:%d, tq push msg ver %" PRId64 ", type: %s, p head %p, p body %p, len %d", pTq->pVnode->config.vgId, ver, TMSG_INFO(msgType), msg, pReq, len); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 04af05cc44..414ffda544 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -311,8 +311,8 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { tqDebug("return offset %" PRId64 ", no more valid", ret->offset.version); return -1; } - void* body = POINTER_SHIFT(pReader->pWalReader->pHead->head.body, sizeof(SMsgHead)); - int32_t bodyLen = pReader->pWalReader->pHead->head.bodyLen - sizeof(SMsgHead); + void* body = POINTER_SHIFT(pReader->pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg)); + int32_t bodyLen = pReader->pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg); int64_t ver = pReader->pWalReader->pHead->head.version; #if 0 if (pReader->pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT) { @@ -560,7 +560,7 @@ int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbD SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); pReader->nextBlk++; - if(pSubmitTbDataRet) *pSubmitTbDataRet = pSubmitTbData; + if (pSubmitTbDataRet) *pSubmitTbDataRet = pSubmitTbData; int32_t sversion = pSubmitTbData->sver; int64_t suid = pSubmitTbData->suid; int64_t uid = pSubmitTbData->uid; @@ -1012,7 +1012,7 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); pReader->nextBlk++; - if(pSubmitTbDataRet) *pSubmitTbDataRet = pSubmitTbData; + if (pSubmitTbDataRet) *pSubmitTbDataRet = pSubmitTbData; int32_t sversion = pSubmitTbData->sver; int64_t suid = pSubmitTbData->suid; int64_t uid = pSubmitTbData->uid; @@ -1022,7 +1022,7 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema pReader->pSchema = metaGetTbTSchema(pReader->pVnodeMeta, uid, sversion, 1); if (pReader->pSchema == NULL) { tqWarn("vgId:%d, cannot found tsschema for table: uid:%" PRId64 " (suid:%" PRId64 - "), version %d, possibly dropped table", + "), version %d, possibly dropped table", pReader->pWalReader->pWal->cfg.vgId, uid, suid, sversion); pReader->cachedSchemaSuid = 0; terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND; @@ -1041,7 +1041,7 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema STSchema* pTschema = pReader->pSchema; SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper; - int32_t numOfRows = 0; + int32_t numOfRows = 0; if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) { SArray* pCols = pSubmitTbData->aCol; @@ -1054,7 +1054,7 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema int32_t curRow = 0; int32_t lastRow = 0; - char* assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols); + char* assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols); if (assigned == NULL) return -1; // convert and scan one block @@ -1064,9 +1064,9 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema for (int32_t i = 0; i < numOfRows; i++) { bool buildNew = false; - for (int32_t j = 0; j < numOfCols; j++){ + for (int32_t j = 0; j < numOfCols; j++) { SColData* pCol = taosArrayGet(pCols, j); - SColVal colVal; + SColVal colVal; tColDataGetValue(pCol, i, &colVal); if (curRow == 0) { assigned[j] = !COL_VAL_IS_NONE(&colVal); @@ -1087,9 +1087,9 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema lastRow = curRow; } - SSDataBlock block = {0}; + SSDataBlock block = {0}; SSchemaWrapper* pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper)); - if(pSW == NULL){ + if (pSW == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto FAIL; } @@ -1158,10 +1158,10 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema } else { SArray* pRows = pSubmitTbData->aRowP; for (int32_t i = 0; i < numOfRows; i++) { - SRow* pRow = taosArrayGetP(pRows, i); - bool buildNew = false; + SRow* pRow = taosArrayGetP(pRows, i); + bool buildNew = false; - for (int32_t j = 0; j < pTschema->numOfCols; j++){ + for (int32_t j = 0; j < pTschema->numOfCols; j++) { SColVal colVal; tRowGet(pRow, pTschema, j, &colVal); if (curRow == 0) { @@ -1183,9 +1183,9 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema lastRow = curRow; } - SSDataBlock block = {0}; + SSDataBlock block = {0}; SSchemaWrapper* pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper)); - if(pSW == NULL){ + if (pSW == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto FAIL; } @@ -1220,7 +1220,7 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema int32_t colActual = blockDataGetNumOfCols(pBlock); while (targetIdx < colActual) { SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx); - SColVal colVal; + SColVal colVal; tRowGet(pRow, pTschema, sourceIdx, &colVal); if (colVal.cid < pColData->info.colId) { @@ -1256,7 +1256,7 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema taosMemoryFree(assigned); return 0; - FAIL: +FAIL: taosMemoryFree(assigned); return -1; } diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 29a25e4cd0..7a8d899a19 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -71,7 +71,6 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl return 0; } - void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { const SArray* pBlocks = (const SArray*)data; SVnode* pVnode = (SVnode*)vnode; @@ -324,7 +323,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d } static int32_t encodeCreateChildTableForRPC(SVCreateTbBatchReq* pReqs, int32_t vgId, void** pBuf, int32_t* contLen) { - int32_t ret = 0; + int32_t ret = 0; tEncodeSize(tEncodeSVCreateTbBatchReq, pReqs, *contLen, ret); if (ret < 0) { @@ -340,7 +339,7 @@ static int32_t encodeCreateChildTableForRPC(SVCreateTbBatchReq* pReqs, int32_t v ((SMsgHead*)(*pBuf))->vgId = vgId; ((SMsgHead*)(*pBuf))->contLen = htonl(*contLen); SEncoder coder = {0}; - tEncoderInit(&coder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), (*contLen) - sizeof(SMsgHead) ); + tEncoderInit(&coder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), (*contLen) - sizeof(SMsgHead)); if (tEncodeSVCreateTbBatchReq(&coder, pReqs) < 0) { rpcFreeCont(*pBuf); *pBuf = NULL; @@ -440,7 +439,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* goto _end; } for (int32_t rowId = 0; rowId < rows; rowId++) { - SVCreateTbReq createTbReq = {0}; + SVCreateTbReq createTbReq = {0}; SVCreateTbReq* pCreateTbReq = &createTbReq; if (!pCreateTbReq) { goto _end; @@ -482,9 +481,9 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* } for (int32_t tagId = UD_TAG_COLUMN_INDEX, step = 1; tagId < size; tagId++, step++) { SColumnInfoData* pTagData = taosArrayGet(pDataBlock->pDataBlock, tagId); - STagVal tagVal = { - .cid = pTSchema->numOfCols + step, - .type = pTagData->info.type, + STagVal tagVal = { + .cid = pTSchema->numOfCols + step, + .type = pTagData->info.type, }; void* pData = colDataGetData(pTagData, rowId); if (colDataIsNull_s(pTagData, rowId)) { @@ -514,7 +513,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* SColumnInfoData* pTbColInfo = taosArrayGet(pDataBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX); if (colDataIsNull_s(pTbColInfo, rowId)) { SColumnInfoData* pGpIdColInfo = taosArrayGet(pDataBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX); - void* pGpIdData = colDataGetData(pGpIdColInfo, rowId); + void* pGpIdData = colDataGetData(pGpIdColInfo, rowId); pCreateTbReq->name = buildCtbNameByGroupId(stbFullName, *(uint64_t*)pGpIdData); } else { void* pTbData = colDataGetData(pTbColInfo, rowId); @@ -639,16 +638,16 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* taosArrayClear(pVals); int32_t dataIndex = 0; for (int32_t k = 0; k < pTSchema->numOfCols; k++) { - const STColumn* pCol = &pTSchema->columns[k]; + const STColumn* pCol = &pTSchema->columns[k]; if (k == 0) { SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex); - void* colData = colDataGetData(pColData, j); + void* colData = colDataGetData(pColData, j); tqDebug("tq sink pipe2, row %d, col %d ts %" PRId64, j, k, *(int64_t*)colData); } if (IS_SET_NULL(pCol)) { SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); taosArrayPush(pVals, &cv); - } else{ + } else { SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex); if (colDataIsNull_s(pColData, j)) { SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); @@ -692,14 +691,15 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* int32_t code; tEncodeSize(tEncodeSSubmitReq2, &submitReq, len, code); SEncoder encoder; - len += sizeof(SMsgHead); + len += sizeof(SSubmitReq2Msg); pBuf = rpcMallocCont(len); if (NULL == pBuf) { goto _end; } - ((SMsgHead*)pBuf)->vgId = TD_VID(pVnode); - ((SMsgHead*)pBuf)->contLen = htonl(len); - tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len - sizeof(SMsgHead)); + ((SSubmitReq2Msg*)pBuf)->header.vgId = TD_VID(pVnode); + ((SSubmitReq2Msg*)pBuf)->header.contLen = htonl(len); + ((SSubmitReq2Msg*)pBuf)->version = htobe64(1); + tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg)); if (tEncodeSSubmitReq2(&encoder, &submitReq) < 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; tqError("failed to encode submit req since %s", terrstr()); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index ae33da66b9..4c5202bc19 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -201,7 +201,7 @@ static int32_t vnodePreProcessSubmitMsg(SVnode *pVnode, SRpcMsg *pMsg) { SDecoder *pCoder = &(SDecoder){0}; - tDecoderInit(pCoder, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead)); + tDecoderInit(pCoder, (uint8_t *)pMsg->pCont + sizeof(SSubmitReq2Msg), pMsg->contLen - sizeof(SSubmitReq2Msg)); if (tStartDecode(pCoder) < 0) { code = TSDB_CODE_INVALID_MSG; @@ -356,7 +356,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp break; /* TSDB */ case TDMT_VND_SUBMIT: - if (vnodeProcessSubmitReq(pVnode, version, pReq, len, pRsp) < 0) goto _err; + if (vnodeProcessSubmitReq(pVnode, version, pMsg->pCont, pMsg->contLen, pRsp) < 0) goto _err; break; case TDMT_VND_DELETE: if (vnodeProcessDeleteReq(pVnode, version, pReq, len, pRsp) < 0) goto _err; @@ -981,6 +981,186 @@ static int32_t vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, return TSDB_CODE_SUCCESS; } +typedef struct SSubmitReqConvertCxt { + SSubmitMsgIter msgIter; + SSubmitBlk *pBlock; + SSubmitBlkIter blkIter; + STSRow *pRow; + STSRowIter rowIter; + SSubmitTbData *pTbData; + STSchema *pTbSchema; + SArray *pColValues; +} SSubmitReqConvertCxt; + +static int32_t vnodeResetTableCxt(SMeta *pMeta, SSubmitReqConvertCxt *pCxt) { + taosMemoryFreeClear(pCxt->pTbSchema); + pCxt->pTbSchema = metaGetTbTSchema(pMeta, pCxt->msgIter.suid, pCxt->msgIter.sversion, 1); + if (NULL == pCxt->pTbSchema) { + return TSDB_CODE_INVALID_MSG; + } + tdSTSRowIterInit(&pCxt->rowIter, pCxt->pTbSchema); + + tDestroySSubmitTbData(pCxt->pTbData, TSDB_MSG_FLG_ENCODE); + if (NULL == pCxt->pTbData) { + pCxt->pTbData = taosMemoryCalloc(1, sizeof(SSubmitTbData)); + if (NULL == pCxt->pTbData) { + return TSDB_CODE_OUT_OF_MEMORY; + } + } + pCxt->pTbData->flags = 0; + pCxt->pTbData->suid = pCxt->msgIter.suid; + pCxt->pTbData->uid = pCxt->msgIter.uid; + pCxt->pTbData->sver = pCxt->msgIter.sversion; + pCxt->pTbData->pCreateTbReq = NULL; + pCxt->pTbData->aRowP = taosArrayInit(128, POINTER_BYTES); + if (NULL == pCxt->pTbData->aRowP) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + taosArrayDestroy(pCxt->pColValues); + pCxt->pColValues = taosArrayInit(pCxt->pTbSchema->numOfCols, sizeof(SColVal)); + if (NULL == pCxt->pColValues) { + return TSDB_CODE_OUT_OF_MEMORY; + } + for (int32_t i = 0; i < pCxt->pTbSchema->numOfCols; ++i) { + SColVal val = COL_VAL_NONE(pCxt->pTbSchema->columns[i].colId, pCxt->pTbSchema->columns[i].type); + taosArrayPush(pCxt->pColValues, &val); + } + + return TSDB_CODE_SUCCESS; +} + +static void vnodeDestroySubmitReqConvertCxt(SSubmitReqConvertCxt *pCxt) { + taosMemoryFreeClear(pCxt->pTbSchema); + tDestroySSubmitTbData(pCxt->pTbData, TSDB_MSG_FLG_ENCODE); + taosMemoryFreeClear(pCxt->pTbData); + taosArrayDestroy(pCxt->pColValues); +} + +static int32_t vnodeCellValConvertToColVal(STColumn *pCol, SCellVal *pCellVal, SColVal *pColVal) { + if (tdValTypeIsNone(pCellVal->valType)) { + pColVal->flag = CV_FLAG_NONE; + return TSDB_CODE_SUCCESS; + } + + if (tdValTypeIsNull(pCellVal->valType)) { + pColVal->flag = CV_FLAG_NULL; + return TSDB_CODE_SUCCESS; + } + + if (IS_VAR_DATA_TYPE(pCol->type)) { + pColVal->value.nData = varDataLen(pCellVal->val); + pColVal->value.pData = varDataVal(pCellVal->val); + } else if (TSDB_DATA_TYPE_FLOAT == pCol->type) { + float f = GET_FLOAT_VAL(pCellVal->val); + memcpy(&pColVal->value.val, &f, sizeof(f)); + } else if (TSDB_DATA_TYPE_DOUBLE == pCol->type) { + pColVal->value.val = *(int64_t *)pCellVal->val; + } else { + GET_TYPED_DATA(pColVal->value.val, int64_t, pCol->type, pCellVal->val); + } + + pColVal->flag = CV_FLAG_VALUE; + return TSDB_CODE_SUCCESS; +} + +static int32_t vnodeTSRowConvertToColValArray(SSubmitReqConvertCxt *pCxt) { + int32_t code = TSDB_CODE_SUCCESS; + tdSTSRowIterReset(&pCxt->rowIter, pCxt->pRow); + for (int32_t i = 0; TSDB_CODE_SUCCESS == code && i < pCxt->pTbSchema->numOfCols; ++i) { + STColumn *pCol = pCxt->pTbSchema->columns + i; + SCellVal cellVal = {0}; + if (!tdSTSRowIterFetch(&pCxt->rowIter, pCol->colId, pCol->type, &cellVal)) { + break; + } + code = vnodeCellValConvertToColVal(pCol, &cellVal, (SColVal *)taosArrayGet(pCxt->pColValues, i)); + } + return code; +} + +static int32_t vnodeDecodeCreateTbReq(SSubmitReqConvertCxt *pCxt) { + if (pCxt->msgIter.schemaLen <= 0) { + return TSDB_CODE_SUCCESS; + } + + pCxt->pTbData->pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq)); + if (NULL == pCxt->pTbData->pCreateTbReq) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + SDecoder decoder = {0}; + tDecoderInit(&decoder, pCxt->pBlock->data, pCxt->msgIter.schemaLen); + int32_t code = tDecodeSVCreateTbReq(&decoder, pCxt->pTbData->pCreateTbReq); + tDecoderClear(&decoder); + + return code; +} + +static int32_t vnodeSubmitReqConvertToSubmitReq2(SVnode *pVnode, SSubmitReq *pReq, SSubmitReq2 *pReq2) { + pReq2->aSubmitTbData = taosArrayInit(128, sizeof(SSubmitTbData)); + if (NULL == pReq2->aSubmitTbData) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + SSubmitReqConvertCxt cxt = {0}; + + int32_t code = tInitSubmitMsgIter(pReq, &cxt.msgIter); + while (TSDB_CODE_SUCCESS == code) { + code = tGetSubmitMsgNext(&cxt.msgIter, &cxt.pBlock); + if (TSDB_CODE_SUCCESS == code) { + if (NULL == cxt.pBlock) { + break; + } + code = vnodeResetTableCxt(pVnode->pMeta, &cxt); + } + if (TSDB_CODE_SUCCESS == code) { + code = tInitSubmitBlkIter(&cxt.msgIter, cxt.pBlock, &cxt.blkIter); + } + if (TSDB_CODE_SUCCESS == code) { + code = vnodeDecodeCreateTbReq(&cxt); + } + while (TSDB_CODE_SUCCESS == code && (cxt.pRow = tGetSubmitBlkNext(&cxt.blkIter)) != NULL) { + code = vnodeTSRowConvertToColValArray(&cxt); + if (TSDB_CODE_SUCCESS == code) { + SRow **pNewRow = taosArrayReserve(cxt.pTbData->aRowP, 1); + code = tRowBuild(cxt.pColValues, cxt.pTbSchema, pNewRow); + } + } + if (TSDB_CODE_SUCCESS == code) { + code = (NULL == taosArrayPush(pReq2->aSubmitTbData, cxt.pTbData) ? TSDB_CODE_OUT_OF_MEMORY : TSDB_CODE_SUCCESS); + } + if (TSDB_CODE_SUCCESS == code) { + taosMemoryFreeClear(cxt.pTbData); + } + } + + vnodeDestroySubmitReqConvertCxt(&cxt); + return code; +} + +static int32_t vnodeRebuildSubmitReqMsg(SSubmitReq2 *pSubmitReq, void **ppMsg) { + int32_t code = TSDB_CODE_SUCCESS; + char *pMsg = NULL; + uint32_t msglen = 0; + tEncodeSize(tEncodeSSubmitReq2, pSubmitReq, msglen, code); + if (TSDB_CODE_SUCCESS == code) { + pMsg = taosMemoryMalloc(msglen); + if (NULL == pMsg) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + } + if (TSDB_CODE_SUCCESS == code) { + SEncoder encoder; + tEncoderInit(&encoder, pMsg, msglen); + code = tEncodeSSubmitReq2(&encoder, pSubmitReq); + tEncoderClear(&encoder); + } + if (TSDB_CODE_SUCCESS == code) { + *ppMsg = pMsg; + } + return code; +} + static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { int32_t code = 0; terrno = 0; @@ -993,14 +1173,27 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq pRsp->code = TSDB_CODE_SUCCESS; - // decode - SDecoder dc = {0}; - tDecoderInit(&dc, pReq, len); - if (tDecodeSSubmitReq2(&dc, pSubmitReq) < 0) { - code = TSDB_CODE_INVALID_MSG; - goto _exit; + SSubmitReq2Msg *pMsg = (SSubmitReq2Msg *)pReq; + if (0 == pMsg->version) { + code = vnodeSubmitReqConvertToSubmitReq2(pVnode, (SSubmitReq *)pMsg, pSubmitReq); + if (TSDB_CODE_SUCCESS == code) { + code = vnodeRebuildSubmitReqMsg(pSubmitReq, &pReq); + } + if (TSDB_CODE_SUCCESS != code) { + goto _exit; + } + } else { + // decode + pReq = POINTER_SHIFT(pReq, sizeof(SSubmitReq2Msg)); + len -= sizeof(SSubmitReq2Msg); + SDecoder dc = {0}; + tDecoderInit(&dc, pReq, len); + if (tDecodeSSubmitReq2(&dc, pSubmitReq) < 0) { + code = TSDB_CODE_INVALID_MSG; + goto _exit; + } + tDecoderClear(&dc); } - tDecoderClear(&dc); for (int32_t i = 0; i < TARRAY_SIZE(pSubmitReq->aSubmitTbData); ++i) { SSubmitTbData *pSubmitTbData = taosArrayGet(pSubmitReq->aSubmitTbData, i); @@ -1140,11 +1333,15 @@ _exit: // clear taosArrayDestroy(newTbUids); - tDestroySSubmitReq2(pSubmitReq, TSDB_MSG_FLG_DECODE); + tDestroySSubmitReq2(pSubmitReq, 0 == pMsg->version ? TSDB_MSG_FLG_CMPT : TSDB_MSG_FLG_DECODE); tDestroySSubmitRsp2(pSubmitRsp, TSDB_MSG_FLG_ENCODE); if (code) terrno = code; + if (0 == pMsg->version) { + taosMemoryFree(pReq); + } + return code; } diff --git a/source/libs/executor/src/dataInserter.c b/source/libs/executor/src/dataInserter.c index a823baa2ae..16b43b560c 100644 --- a/source/libs/executor/src/dataInserter.c +++ b/source/libs/executor/src/dataInserter.c @@ -129,14 +129,15 @@ static int32_t submitReqToMsg(int32_t vgId, SSubmitReq2* pReq, void** pData, int tEncodeSize(tEncodeSSubmitReq2, pReq, len, code); if (TSDB_CODE_SUCCESS == code) { SEncoder encoder; - len += sizeof(SMsgHead); + len += sizeof(SSubmitReq2Msg); pBuf = taosMemoryMalloc(len); if (NULL == pBuf) { return TSDB_CODE_OUT_OF_MEMORY; } - ((SMsgHead*)pBuf)->vgId = htonl(vgId); - ((SMsgHead*)pBuf)->contLen = htonl(len); - tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len - sizeof(SMsgHead)); + ((SSubmitReq2Msg*)pBuf)->header.vgId = htonl(vgId); + ((SSubmitReq2Msg*)pBuf)->header.contLen = htonl(len); + ((SSubmitReq2Msg*)pBuf)->version = htobe64(1); + tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg)); code = tEncodeSSubmitReq2(&encoder, pReq); tEncoderClear(&encoder); } diff --git a/source/libs/parser/src/parInsertUtil.c b/source/libs/parser/src/parInsertUtil.c index fbdaf6fc9f..602d82cb38 100644 --- a/source/libs/parser/src/parInsertUtil.c +++ b/source/libs/parser/src/parInsertUtil.c @@ -290,7 +290,7 @@ int32_t insGetTableDataCxt(SHashObj* pHash, void* id, int32_t idLen, STableMeta* } int32_t code = createTableDataCxt(pTableMeta, pCreateTbReq, pTableCxt, colMode); if (TSDB_CODE_SUCCESS == code) { - void* pData = *pTableCxt; // deal scan coverity + void* pData = *pTableCxt; // deal scan coverity code = taosHashPut(pHash, id, idLen, &pData, POINTER_BYTES); } return code; @@ -501,14 +501,15 @@ static int32_t buildSubmitReq(int32_t vgId, SSubmitReq2* pReq, void** pData, uin tEncodeSize(tEncodeSSubmitReq2, pReq, len, code); if (TSDB_CODE_SUCCESS == code) { SEncoder encoder; - len += sizeof(SMsgHead); + len += sizeof(SSubmitReq2Msg); pBuf = taosMemoryMalloc(len); if (NULL == pBuf) { return TSDB_CODE_OUT_OF_MEMORY; } - ((SMsgHead*)pBuf)->vgId = htonl(vgId); - ((SMsgHead*)pBuf)->contLen = htonl(len); - tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len - sizeof(SMsgHead)); + ((SSubmitReq2Msg*)pBuf)->header.vgId = htonl(vgId); + ((SSubmitReq2Msg*)pBuf)->header.contLen = htonl(len); + ((SSubmitReq2Msg*)pBuf)->version = htobe64(1); + tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg)); code = tEncodeSSubmitReq2(&encoder, pReq); tEncoderClear(&encoder); } @@ -679,13 +680,12 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate pStart += BitmapLen(numOfRows); } char* pData = pStart; -// uError("rawBlockBindData col bytes:%d, type:%d, size:%d, htonl size:%d", pColSchema->bytes, pColSchema->type, colLength[c], htonl(colLength[c])); tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData); fields += sizeof(int8_t) + sizeof(int32_t); - if(needChangeLength) { + if (needChangeLength) { pStart += htonl(colLength[c]); - }else{ + } else { pStart += colLength[c]; } }