diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 007c9c81e0..0c29f35f5b 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3202,29 +3202,30 @@ int32_t tDeserializeSMqAskEpReq(void* buf, int32_t bufLen, SMqAskEpReq* pReq); int32_t tSerializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq); int32_t tDeserializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq); +#define SUBMIT_REQ_AUTO_CREATE_TABLE 0x1 +#define SUBMIT_REQ_COLUMN_DATA_FORMAT 0x2 + typedef struct { - bool isColFmt; - int64_t suid; - int64_t uid; - int32_t sver; + int32_t flags; + SVCreateTbReq* pCreateTbReq; + int64_t suid; + int64_t uid; + int32_t sver; union { SArray* aRowP; SArray* aCol; }; } SSubmitTbData; -#define SUBMIT_REQ_AUTO_CREATE_TABLE 0x1 -#define SUBMIT_REQ_COLUMN_DATA_FORMAT 0x2 typedef struct { - int32_t flag; - SArray* aCreateTbReq; - SArray* aSubmitTbData; + SArray* aSubmitTbData; // SArray } SSubmitReq2; int32_t tEncodeSSubmitReq2(SEncoder* pCoder, const SSubmitReq2* pReq); -int32_t tDecodeSSubmitReq2(SDecoder* pCoder, SSubmitReq2** ppReq); -void tDestroySSubmitTbData(SSubmitTbData* pTbData); -void tDestroySSubmitReq2(SSubmitReq2* pReq); +int32_t tDecodeSSubmitReq2(SDecoder* pCoder, SSubmitReq2* pReq); + +void tDestroySSubmitTbData(SSubmitTbData* pTbData); +void tDestroySSubmitReq2(SSubmitReq2* pReq); typedef struct { int32_t code; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 7c865723f3..28de3edd5b 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -6651,20 +6651,28 @@ int32_t tDecodeSBatchDeleteReq(SDecoder *pDecoder, SBatchDeleteReq *pReq) { return 0; } -static int32_t tEncodeSSubmitTbData(SEncoder *pCoder, const SSubmitTbData *pSubmitTbData, int8_t colFmt) { +static int32_t tEncodeSSubmitTbData(SEncoder *pCoder, const SSubmitTbData *pSubmitTbData) { if (tStartEncode(pCoder) < 0) return -1; + if (tEncodeI32v(pCoder, pSubmitTbData->flags) < 0) return -1; + + // auto create table + if (pSubmitTbData->flags & SUBMIT_REQ_AUTO_CREATE_TABLE) { + ASSERT(pSubmitTbData->pCreateTbReq); + if (tEncodeSVCreateTbReq(pCoder, pSubmitTbData->pCreateTbReq) < 0) return -1; + } + + // submit data if (tEncodeI64(pCoder, pSubmitTbData->suid) < 0) return -1; if (tEncodeI64(pCoder, pSubmitTbData->uid) < 0) return -1; if (tEncodeI32v(pCoder, pSubmitTbData->sver) < 0) return -1; - if (colFmt) { - // todo - ASSERT(0); + if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) { + ASSERT(0); // TODO } else { - if (tEncodeI64v(pCoder, taosArrayGetSize(pSubmitTbData->aRowP)) < 0) return -1; - for (int32_t i = 0; i < taosArrayGetSize(pSubmitTbData->aRowP); i++) { - SRow *pRow = taosArrayGetP(pSubmitTbData->aRowP, i); + if (tEncodeU64v(pCoder, taosArrayGetSize(pSubmitTbData->aRowP)) < 0) return -1; + for (int32_t iRow = 0; iRow < taosArrayGetSize(pSubmitTbData->aRowP); ++iRow) { + SRow *pRow = taosArrayGetP(pSubmitTbData->aRowP, iRow); if (pCoder->data) memcpy(pCoder->data + pCoder->pos, pRow, pRow->len); pCoder->pos += pRow->len; } @@ -6682,6 +6690,22 @@ static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbDa goto _exit; } + if (tDecodeI32v(pCoder, &pSubmitTbData->flags) < 0) return -1; + + if (pSubmitTbData->flags & SUBMIT_REQ_AUTO_CREATE_TABLE) { + pSubmitTbData->pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq)); + if (pSubmitTbData->pCreateTbReq == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + if (tDecodeSVCreateTbReq(pCoder, pSubmitTbData->pCreateTbReq) < 0) { + code = TSDB_CODE_INVALID_MSG; + goto _exit; + } + } + + // submit data if (tDecodeI64(pCoder, &pSubmitTbData->suid) < 0) { code = TSDB_CODE_INVALID_MSG; goto _exit; @@ -6695,24 +6719,24 @@ static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbDa goto _exit; } - if (colFmt) { - // todo - ASSERT(0); + if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) { + ASSERT(0); // TODO } else { - int64_t nRows = 0; - if (tDecodeI64v(pCoder, &nRows) < 0) { + uint64_t nRow; + if (tDecodeU64v(pCoder, &nRow) < 0) { code = TSDB_CODE_INVALID_MSG; goto _exit; } - pSubmitTbData->aRowP = taosArrayInit(nRows, sizeof(SRow *)); + pSubmitTbData->aRowP = taosArrayInit(nRow, sizeof(SRow *)); if (pSubmitTbData->aRowP == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } - for (int32_t i = 0; i < nRows; i++) { + for (int32_t iRow = 0; iRow < nRow; ++iRow) { SRow **ppRow = taosArrayReserve(pSubmitTbData->aRowP, 1); + *ppRow = (SRow *)(pCoder->data + pCoder->pos); pCoder->pos += (*ppRow)->len; } @@ -6722,7 +6746,7 @@ static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbDa _exit: if (code) { - // todo: do clear + // TODO: clear } return 0; } @@ -6730,35 +6754,19 @@ _exit: int32_t tEncodeSSubmitReq2(SEncoder *pCoder, const SSubmitReq2 *pReq) { if (tStartEncode(pCoder) < 0) return -1; - if (tEncodeI32v(pCoder, pReq->flag) < 0) return -1; - - if (pReq->flag & SUBMIT_REQ_AUTO_CREATE_TABLE) { - if (tEncodeI64v(pCoder, taosArrayGetSize(pReq->aCreateTbReq)) < 0) return -1; - for (int32_t i = 0; i < taosArrayGetSize(pReq->aCreateTbReq); ++i) { - SVCreateTbReq *pCreateTbReq = (SVCreateTbReq *)taosArrayGet(pReq->aCreateTbReq, i); - if (tEncodeSVCreateTbReq(pCoder, pCreateTbReq) < 0) return -1; - } - } - - if (tEncodeI64v(pCoder, taosArrayGetSize(pReq->aSubmitTbData)) < 0) return -1; - for (int32_t i = 0; i < taosArrayGetSize(pReq->aSubmitTbData); ++i) { - SSubmitTbData *pSubmitTbData = (SSubmitTbData *)taosArrayGet(pReq->aSubmitTbData, i); - if (tEncodeSSubmitTbData(pCoder, pSubmitTbData, pReq->flag & SUBMIT_REQ_COLUMN_DATA_FORMAT) < 0) return -1; + if (tEncodeU64v(pCoder, taosArrayGetSize(pReq->aSubmitTbData)) < 0) return -1; + for (uint64_t i = 0; i < taosArrayGetSize(pReq->aSubmitTbData); i++) { + if (tEncodeSSubmitTbData(pCoder, taosArrayGet(pReq->aSubmitTbData, i)) < 0) return -1; } tEndEncode(pCoder); return 0; } -int32_t tDecodeSSubmitReq2(SDecoder *pCoder, SSubmitReq2 **ppReq) { +int32_t tDecodeSSubmitReq2(SDecoder *pCoder, SSubmitReq2 *pReq) { int32_t code = 0; - // alloc - SSubmitReq2 *pReq = (SSubmitReq2 *)taosMemoryCalloc(1, sizeof(SSubmitReq2)); - if (pReq == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } + memset(pReq, 0, sizeof(*pReq)); // decode if (tStartDecode(pCoder) < 0) { diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index d3cc133be1..51defafe46 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -822,29 +822,12 @@ static int32_t vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, return TSDB_CODE_SUCCESS; } -// static int32_t vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char *tags) { -// ASSERT(pMsg != NULL); -// SSubmitMsgIter msgIter = {0}; -// SMeta *pMeta = pVnode->pMeta; -// SSubmitBlk *pBlock = NULL; - -// if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1; -// while (true) { -// if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1; -// if (pBlock == NULL) break; - -// vnodeDebugPrintSingleSubmitMsg(pMeta, pBlock, &msgIter, tags); -// } - -// return 0; -// } - static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { #if 1 int32_t code = 0; - SSubmitRsp2 submitRsp = {0}; SSubmitReq2 *pSubmitReq = NULL; + SSubmitRsp2 *pSubmitRsp = NULL; SArray *newTbUids = NULL; // decode @@ -856,25 +839,39 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq } tDecoderClear(&dc); - // auto create table - if (pSubmitReq->flag & SUBMIT_REQ_AUTO_CREATE_TABLE) { - for (int32_t iCreateTbReq = 0; iCreateTbReq < taosArrayGetSize(pSubmitReq->aCreateTbReq); iCreateTbReq++) { - SVCreateTbReq *pCreateTbReq = taosArrayGet(pSubmitReq->aCreateTbReq, iCreateTbReq); + // init + code = tCreateSSubmitRsp2(&pSubmitRsp); + if (code) goto _exit; - if (metaCreateTable(pVnode->pMeta, version, pCreateTbReq, NULL /* todo */) < 0) { - if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) { - // todo - } - } else { + // auto create table + for (int32_t iCreateTbReq = 0; iCreateTbReq < taosArrayGetSize(pSubmitReq->aCreateTbReq); iCreateTbReq++) { + SVCreateTbReq *pCreateTbReq = taosArrayGet(pSubmitReq->aCreateTbReq, iCreateTbReq); + + SVCreateTbRsp *pCreateTbRsp = taosArrayReserve(pSubmitRsp->aCreateTbRsp, 1); + if (pCreateTbRsp == NULL) { + code = TSDB_CODE_TDB_OUT_OF_MEMORY; + goto _exit; + } + + if (metaCreateTable(pVnode->pMeta, version, pCreateTbReq, &pCreateTbRsp->pMeta) < 0) { + if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) { + // todo } + } else { + // todo } } - // check - for (int32_t i = 0; i < taosArrayGetSize(pSubmitReq->aSubmitTbData); ++i) { - SSubmitTbData *pSubmitTbData = taosArrayGet(pSubmitReq->aSubmitTbData, i); - // TODO - } + // // check + // for (int32_t i = 0; i < taosArrayGetSize(pSubmitReq->aSubmitTbData); ++i) { + // SSubmitTbData *pSubmitTbData = taosArrayGet(pSubmitReq->aSubmitTbData, i); + + // SMetaInfo info = {0}; + // code = metaGetInfo(pVnode->pMeta, pSubmitTbData->uid, &info, NULL); + // if (code) { + // // TODO + // } + // } // insert table data for (int32_t iSubmitTbData = 0; iSubmitTbData < taosArrayGetSize(pSubmitReq->aSubmitTbData); iSubmitTbData++) { @@ -882,7 +879,10 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq SSubmitBlkRsp submitBlkRsp = {0}; - tsdbInsertTableData(pVnode->pTsdb, version, pSubmitTbData, &submitBlkRsp); + code = tsdbInsertTableData(pVnode->pTsdb, version, pSubmitTbData, &submitBlkRsp); + if (code) goto _exit; + + pSubmitRsp->affectedRows += taosArrayGetSize(pSubmitTbData->aRowP); } _exit: @@ -910,10 +910,6 @@ _exit: pSubmitReq->version = version; statis.nBatchInsert = 1; -#ifdef TD_DEBUG_PRINT_ROW - vnodeDebugPrintSubmitMsg(pVnode, pReq, __func__); -#endif - if (tsdbScanAndConvertSubmitMsg(pVnode->pTsdb, pSubmitReq) < 0) { pRsp->code = terrno; goto _exit; diff --git a/source/util/src/tarray.c b/source/util/src/tarray.c index eccc9fd499..eb2d40d4a6 100644 --- a/source/util/src/tarray.c +++ b/source/util/src/tarray.c @@ -191,6 +191,8 @@ void* taosArrayReserve(SArray* pArray, int32_t num) { void* dst = TARRAY_GET_ELEM(pArray, pArray->size); pArray->size += num; + memset(dst, 0, num * pArray->elemSize); + return dst; } @@ -333,9 +335,9 @@ SArray* taosArrayDup(const SArray* pSrc, __array_item_dup_fn_t fn) { } else { ASSERT(pSrc->elemSize == sizeof(void*)); - for(int32_t i = 0; i < pSrc->size; ++i) { + for (int32_t i = 0; i < pSrc->size; ++i) { void* p = fn(taosArrayGetP(pSrc, i)); - memcpy(((char*)dst->pData )+ i * dst->elemSize, &p, dst->elemSize); + memcpy(((char*)dst->pData) + i * dst->elemSize, &p, dst->elemSize); } }