diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 0c29f35f5b..8ba2fee3fe 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3223,9 +3223,8 @@ typedef struct { int32_t tEncodeSSubmitReq2(SEncoder* pCoder, const SSubmitReq2* pReq); int32_t tDecodeSSubmitReq2(SDecoder* pCoder, SSubmitReq2* pReq); - -void tDestroySSubmitTbData(SSubmitTbData* pTbData); -void tDestroySSubmitReq2(SSubmitReq2* pReq); +void tDestroySSubmitTbData(SSubmitTbData* pTbData); +void tDestroySSubmitReq2(SSubmitReq2* pReq); typedef struct { int32_t code; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 49a1f5fdad..64a2712c13 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -160,7 +160,7 @@ int32_t tsdbRollbackCommit(STsdb* pTsdb); int32_t tsdbDoRetention(STsdb* pTsdb, int64_t now); int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq* pMsg); int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp); -int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitTbData* pSubmitTbData, SSubmitBlkRsp* pRsp); +int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitTbData* pSubmitTbData, int32_t* affectedRows); int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey); int32_t tsdbSetKeepCfg(STsdb* pTsdb, STsdbCfg* pCfg); diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 6bb4111be5..bb6ee6d7c0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -29,7 +29,7 @@ static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *pKey, int32_t flags); static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, STbData **ppTbData); static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, int64_t version, - SSubmitTbData *pSubmitTbData, SSubmitBlkRsp *pRsp); + SSubmitTbData *pSubmitTbData, int32_t *affectedRows); int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) { int32_t code = 0; @@ -95,7 +95,7 @@ STbData *tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t return pTbData; } -int32_t tsdbInsertTableData(STsdb *pTsdb, int64_t version, SSubmitTbData *pSubmitTbData, SSubmitBlkRsp *pRsp) { +int32_t tsdbInsertTableData(STsdb *pTsdb, int64_t version, SSubmitTbData *pSubmitTbData, int32_t *affectedRows) { int32_t code = 0; SMemTable *pMemTable = pTsdb->mem; STbData *pTbData = NULL; @@ -133,7 +133,7 @@ int32_t tsdbInsertTableData(STsdb *pTsdb, int64_t version, SSubmitTbData *pSubmi } // do insert impl - code = tsdbInsertTableDataImpl(pMemTable, pTbData, version, pSubmitTbData, pRsp); + code = tsdbInsertTableDataImpl(pMemTable, pTbData, version, pSubmitTbData, affectedRows); if (code) { goto _err; } @@ -539,7 +539,7 @@ _exit: } static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, int64_t version, - SSubmitTbData *pSubmitTbData, SSubmitBlkRsp *pRsp) { + SSubmitTbData *pSubmitTbData, int32_t *affectedRows) { int32_t code = 0; SRow **rows = (SRow **)TARRAY_DATA(pSubmitTbData->aRowP); @@ -608,8 +608,7 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i pMemTable->maxKey = TMAX(pMemTable->maxKey, pTbData->maxKey); pMemTable->nRow += nRow; - if (pRsp) pRsp->numOfRows = nRow; - if (pRsp) pRsp->affectedRows = nRow; + if (affectedRows) *affectedRows = nRow; return code; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 7bf9fb5f04..2f77da1489 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -851,8 +851,12 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq int32_t code = 0; SSubmitReq2 *pSubmitReq = &(SSubmitReq2){0}; - SSubmitRsp2 *pSubmitRsp = NULL; + SSubmitRsp2 *pSubmitRsp = &(SSubmitRsp2){0}; SArray *newTbUids = NULL; + int32_t ret; + SEncoder ec = {0}; + + pRsp->code = TSDB_CODE_SUCCESS; // decode SDecoder dc = {0}; @@ -864,7 +868,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq tDecoderClear(&dc); // check - for (int32_t i = 0; i < taosArrayGetSize(pSubmitReq->aSubmitTbData); ++i) { + for (int32_t i = 0; i < TARRAY_SIZE(pSubmitReq->aSubmitTbData); ++i) { SSubmitTbData *pSubmitTbData = taosArrayGet(pSubmitReq->aSubmitTbData, i); if (pSubmitTbData->pCreateTbReq) { @@ -895,42 +899,91 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq } // loop to handle - for (int32_t i = 0; i < taosArrayGetSize(pSubmitReq->aSubmitTbData); ++i) { + for (int32_t i = 0; i < TARRAY_SIZE(pSubmitReq->aSubmitTbData); ++i) { SSubmitTbData *pSubmitTbData = taosArrayGet(pSubmitReq->aSubmitTbData, i); // create table if (pSubmitTbData->pCreateTbReq) { - if (metaCreateTable(pVnode->pMeta, version, pSubmitTbData->pCreateTbReq, NULL /* todo */) < 0) { - // TODO - } else { - // TODO + // check (TODO: move check to create table) + code = grantCheck(TSDB_GRANT_TIMESERIES); + if (code) goto _exit; + + code = grantCheck(TSDB_GRANT_TABLE); + if (code) goto _exit; + + // alloc if need + if (pSubmitRsp->aCreateTbRsp == NULL && + (pSubmitRsp->aCreateTbRsp = taosArrayInit(TARRAY_SIZE(pSubmitReq->aSubmitTbData), sizeof(SVCreateTbRsp))) == + NULL) { + code = TSDB_CODE_TDB_OUT_OF_MEMORY; + goto _exit; + } + + SVCreateTbRsp *pCreateTbRsp = taosArrayReserve(pSubmitRsp->aCreateTbRsp, 1); + + // create table + if (metaCreateTable(pVnode->pMeta, version, pSubmitTbData->pCreateTbReq, &pCreateTbRsp->pMeta) == + 0) { // create table success + + if (newTbUids == NULL && + (newTbUids = taosArrayInit(TARRAY_SIZE(pSubmitReq->aSubmitTbData), sizeof(int64_t))) == NULL) { + code = TSDB_CODE_TDB_OUT_OF_MEMORY; + goto _exit; + } + + taosArrayPush(newTbUids, &pSubmitTbData->uid); + + if (pCreateTbRsp->pMeta) { + // TODO + } + } else { // create table failed + if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) { + code = terrno; + goto _exit; + } } } // insert data - tsdbInsertTableData(pVnode->pTsdb, version, pSubmitTbData, NULL /* todo */); - // TODO: handle result + int32_t affectedRows; + code = tsdbInsertTableData(pVnode->pTsdb, version, pSubmitTbData, &affectedRows); + if (code) goto _exit; + + pSubmitRsp->affectedRows += affectedRows; + } + + // update table uid list + if (taosArrayGetSize(newTbUids) > 0) { + vDebug("vgId:%d, add %d table into query table list in handling submit", TD_VID(pVnode), + (int32_t)taosArrayGetSize(newTbUids)); + tqUpdateTbUidList(pVnode->pTq, newTbUids, true); } _exit: - if (code) { - } else { - } + // message + pRsp->code = code; + tEncodeSize(tEncodeSSubmitRsp2, pSubmitRsp, pRsp->contLen, ret); + pRsp->pCont = rpcMallocCont(pRsp->contLen); + tEncoderInit(&ec, pRsp->pCont, pRsp->contLen); + tEncodeSSubmitRsp2(&ec, pSubmitRsp); + tEncoderClear(&ec); + + // clear + taosArrayDestroy(newTbUids); + tDestroySSubmitReq2(pSubmitReq); + tDestroySSubmitRsp2(pSubmitRsp); + return code; #else - SSubmitReq *pSubmitReq = (SSubmitReq *)pReq; - SSubmitRsp submitRsp = {0}; - SSubmitMsgIter msgIter = {0}; - SSubmitBlk *pBlock = NULL; - SVCreateTbReq createTbReq = {0}; - SDecoder decoder = {0}; - int32_t nRows = 0; - int32_t tsize, ret; - SEncoder encoder = {0}; - SArray *newTbUids = NULL; - SVStatis statis = {0}; - bool tbCreated = false; + SSubmitReq *pSubmitReq = (SSubmitReq *)pReq; + SSubmitRsp submitRsp = {0}; + int32_t nRows = 0; + int32_t tsize, ret; + SEncoder encoder = {0}; + SArray *newTbUids = NULL; + SVStatis statis = {0}; + bool tbCreated = false; terrno = TSDB_CODE_SUCCESS; pRsp->code = 0; @@ -942,12 +995,6 @@ _exit: goto _exit; } - // handle the request - if (tInitSubmitMsgIter(pSubmitReq, &msgIter) < 0) { - pRsp->code = TSDB_CODE_INVALID_MSG; - goto _exit; - } - submitRsp.pArray = taosArrayInit(msgIter.numOfBlocks, sizeof(SSubmitBlkRsp)); newTbUids = taosArrayInit(msgIter.numOfBlocks, sizeof(int64_t)); if (!submitRsp.pArray || !newTbUids) { @@ -964,42 +1011,42 @@ _exit: // create table for auto create table mode if (msgIter.schemaLen > 0) { - tDecoderInit(&decoder, pBlock->data, msgIter.schemaLen); - if (tDecodeSVCreateTbReq(&decoder, &createTbReq) < 0) { - pRsp->code = TSDB_CODE_INVALID_MSG; - tDecoderClear(&decoder); - taosArrayDestroy(createTbReq.ctb.tagName); - goto _exit; - } + // tDecoderInit(&decoder, pBlock->data, msgIter.schemaLen); + // if (tDecodeSVCreateTbReq(&decoder, &createTbReq) < 0) { + // pRsp->code = TSDB_CODE_INVALID_MSG; + // tDecoderClear(&decoder); + // taosArrayDestroy(createTbReq.ctb.tagName); + // goto _exit; + // } - if ((terrno = grantCheck(TSDB_GRANT_TIMESERIES)) < 0) { - pRsp->code = terrno; - tDecoderClear(&decoder); - taosArrayDestroy(createTbReq.ctb.tagName); - goto _exit; - } + // if ((terrno = grantCheck(TSDB_GRANT_TIMESERIES)) < 0) { + // pRsp->code = terrno; + // tDecoderClear(&decoder); + // taosArrayDestroy(createTbReq.ctb.tagName); + // goto _exit; + // } - if ((terrno = grantCheck(TSDB_GRANT_TABLE)) < 0) { - pRsp->code = terrno; - tDecoderClear(&decoder); - taosArrayDestroy(createTbReq.ctb.tagName); - goto _exit; - } + // if ((terrno = grantCheck(TSDB_GRANT_TABLE)) < 0) { + // pRsp->code = terrno; + // tDecoderClear(&decoder); + // taosArrayDestroy(createTbReq.ctb.tagName); + // goto _exit; + // } if (metaCreateTable(pVnode->pMeta, version, &createTbReq, &submitBlkRsp.pMeta) < 0) { - if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) { - submitBlkRsp.code = terrno; - pRsp->code = terrno; - tDecoderClear(&decoder); - taosArrayDestroy(createTbReq.ctb.tagName); - goto _exit; - } + // if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) { + // submitBlkRsp.code = terrno; + // pRsp->code = terrno; + // tDecoderClear(&decoder); + // taosArrayDestroy(createTbReq.ctb.tagName); + // goto _exit; + // } } else { if (NULL != submitBlkRsp.pMeta) { vnodeUpdateMetaRsp(pVnode, submitBlkRsp.pMeta); } - taosArrayPush(newTbUids, &createTbReq.uid); + // taosArrayPush(newTbUids, &createTbReq.uid); submitBlkRsp.uid = createTbReq.uid; submitBlkRsp.tblFName = taosMemoryMalloc(strlen(pVnode->config.dbname) + strlen(createTbReq.name) + 2); @@ -1007,18 +1054,15 @@ _exit: tbCreated = true; } - msgIter.uid = createTbReq.uid; - if (createTbReq.type == TSDB_CHILD_TABLE) { - msgIter.suid = createTbReq.ctb.suid; - } else { - msgIter.suid = 0; - } + // msgIter.uid = createTbReq.uid; + // if (createTbReq.type == TSDB_CHILD_TABLE) { + // msgIter.suid = createTbReq.ctb.suid; + // } else { + // msgIter.suid = 0; + // } -#ifdef TD_DEBUG_PRINT_ROW - vnodeDebugPrintSingleSubmitMsg(pVnode->pMeta, pBlock, &msgIter, "real uid"); -#endif - tDecoderClear(&decoder); - taosArrayDestroy(createTbReq.ctb.tagName); + // tDecoderClear(&decoder); + // taosArrayDestroy(createTbReq.ctb.tagName); } if (tsdbInsertTableData(pVnode->pTsdb, version, &msgIter, pBlock, &submitBlkRsp) < 0) { @@ -1032,21 +1076,21 @@ _exit: } } - if (taosArrayGetSize(newTbUids) > 0) { - vDebug("vgId:%d, add %d table into query table list in handling submit", TD_VID(pVnode), - (int32_t)taosArrayGetSize(newTbUids)); - } + // if (taosArrayGetSize(newTbUids) > 0) { + // vDebug("vgId:%d, add %d table into query table list in handling submit", TD_VID(pVnode), + // (int32_t)taosArrayGetSize(newTbUids)); + // } - tqUpdateTbUidList(pVnode->pTq, newTbUids, true); + // tqUpdateTbUidList(pVnode->pTq, newTbUids, true); _exit: taosArrayDestroy(newTbUids); - tEncodeSize(tEncodeSSubmitRsp, &submitRsp, tsize, ret); - pRsp->pCont = rpcMallocCont(tsize); - pRsp->contLen = tsize; - tEncoderInit(&encoder, pRsp->pCont, tsize); - tEncodeSSubmitRsp(&encoder, &submitRsp); - tEncoderClear(&encoder); + // tEncodeSize(tEncodeSSubmitRsp, &submitRsp, tsize, ret); + // pRsp->pCont = rpcMallocCont(tsize); + // pRsp->contLen = tsize; + // tEncoderInit(&encoder, pRsp->pCont, tsize); + // tEncodeSSubmitRsp(&encoder, &submitRsp); + // tEncoderClear(&encoder); taosArrayDestroyEx(submitRsp.pArray, tFreeSSubmitBlkRsp);