more code

This commit is contained in:
Hongze Cheng 2022-11-29 17:29:49 +08:00
parent c519e51d0b
commit 25ec924c91
4 changed files with 131 additions and 89 deletions

View File

@ -3223,9 +3223,8 @@ typedef struct {
int32_t tEncodeSSubmitReq2(SEncoder* pCoder, const SSubmitReq2* pReq); int32_t tEncodeSSubmitReq2(SEncoder* pCoder, const SSubmitReq2* pReq);
int32_t tDecodeSSubmitReq2(SDecoder* pCoder, SSubmitReq2* pReq); int32_t tDecodeSSubmitReq2(SDecoder* pCoder, SSubmitReq2* pReq);
void tDestroySSubmitTbData(SSubmitTbData* pTbData);
void tDestroySSubmitTbData(SSubmitTbData* pTbData); void tDestroySSubmitReq2(SSubmitReq2* pReq);
void tDestroySSubmitReq2(SSubmitReq2* pReq);
typedef struct { typedef struct {
int32_t code; int32_t code;

View File

@ -160,7 +160,7 @@ int32_t tsdbRollbackCommit(STsdb* pTsdb);
int32_t tsdbDoRetention(STsdb* pTsdb, int64_t now); int32_t tsdbDoRetention(STsdb* pTsdb, int64_t now);
int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq* pMsg); int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq* pMsg);
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp); int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp);
int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitTbData* pSubmitTbData, SSubmitBlkRsp* pRsp); int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitTbData* pSubmitTbData, int32_t* affectedRows);
int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey); int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
int32_t tsdbSetKeepCfg(STsdb* pTsdb, STsdbCfg* pCfg); int32_t tsdbSetKeepCfg(STsdb* pTsdb, STsdbCfg* pCfg);

View File

@ -29,7 +29,7 @@
static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *pKey, int32_t flags); static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *pKey, int32_t flags);
static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, STbData **ppTbData); static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, STbData **ppTbData);
static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, int64_t version, static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, int64_t version,
SSubmitTbData *pSubmitTbData, SSubmitBlkRsp *pRsp); SSubmitTbData *pSubmitTbData, int32_t *affectedRows);
int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) { int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) {
int32_t code = 0; int32_t code = 0;
@ -95,7 +95,7 @@ STbData *tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t
return pTbData; return pTbData;
} }
int32_t tsdbInsertTableData(STsdb *pTsdb, int64_t version, SSubmitTbData *pSubmitTbData, SSubmitBlkRsp *pRsp) { int32_t tsdbInsertTableData(STsdb *pTsdb, int64_t version, SSubmitTbData *pSubmitTbData, int32_t *affectedRows) {
int32_t code = 0; int32_t code = 0;
SMemTable *pMemTable = pTsdb->mem; SMemTable *pMemTable = pTsdb->mem;
STbData *pTbData = NULL; STbData *pTbData = NULL;
@ -133,7 +133,7 @@ int32_t tsdbInsertTableData(STsdb *pTsdb, int64_t version, SSubmitTbData *pSubmi
} }
// do insert impl // do insert impl
code = tsdbInsertTableDataImpl(pMemTable, pTbData, version, pSubmitTbData, pRsp); code = tsdbInsertTableDataImpl(pMemTable, pTbData, version, pSubmitTbData, affectedRows);
if (code) { if (code) {
goto _err; goto _err;
} }
@ -539,7 +539,7 @@ _exit:
} }
static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, int64_t version, static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, int64_t version,
SSubmitTbData *pSubmitTbData, SSubmitBlkRsp *pRsp) { SSubmitTbData *pSubmitTbData, int32_t *affectedRows) {
int32_t code = 0; int32_t code = 0;
SRow **rows = (SRow **)TARRAY_DATA(pSubmitTbData->aRowP); SRow **rows = (SRow **)TARRAY_DATA(pSubmitTbData->aRowP);
@ -608,8 +608,7 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i
pMemTable->maxKey = TMAX(pMemTable->maxKey, pTbData->maxKey); pMemTable->maxKey = TMAX(pMemTable->maxKey, pTbData->maxKey);
pMemTable->nRow += nRow; pMemTable->nRow += nRow;
if (pRsp) pRsp->numOfRows = nRow; if (affectedRows) *affectedRows = nRow;
if (pRsp) pRsp->affectedRows = nRow;
return code; return code;

View File

@ -851,8 +851,12 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
int32_t code = 0; int32_t code = 0;
SSubmitReq2 *pSubmitReq = &(SSubmitReq2){0}; SSubmitReq2 *pSubmitReq = &(SSubmitReq2){0};
SSubmitRsp2 *pSubmitRsp = NULL; SSubmitRsp2 *pSubmitRsp = &(SSubmitRsp2){0};
SArray *newTbUids = NULL; SArray *newTbUids = NULL;
int32_t ret;
SEncoder ec = {0};
pRsp->code = TSDB_CODE_SUCCESS;
// decode // decode
SDecoder dc = {0}; SDecoder dc = {0};
@ -864,7 +868,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
tDecoderClear(&dc); tDecoderClear(&dc);
// check // check
for (int32_t i = 0; i < taosArrayGetSize(pSubmitReq->aSubmitTbData); ++i) { for (int32_t i = 0; i < TARRAY_SIZE(pSubmitReq->aSubmitTbData); ++i) {
SSubmitTbData *pSubmitTbData = taosArrayGet(pSubmitReq->aSubmitTbData, i); SSubmitTbData *pSubmitTbData = taosArrayGet(pSubmitReq->aSubmitTbData, i);
if (pSubmitTbData->pCreateTbReq) { if (pSubmitTbData->pCreateTbReq) {
@ -895,42 +899,91 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
} }
// loop to handle // loop to handle
for (int32_t i = 0; i < taosArrayGetSize(pSubmitReq->aSubmitTbData); ++i) { for (int32_t i = 0; i < TARRAY_SIZE(pSubmitReq->aSubmitTbData); ++i) {
SSubmitTbData *pSubmitTbData = taosArrayGet(pSubmitReq->aSubmitTbData, i); SSubmitTbData *pSubmitTbData = taosArrayGet(pSubmitReq->aSubmitTbData, i);
// create table // create table
if (pSubmitTbData->pCreateTbReq) { if (pSubmitTbData->pCreateTbReq) {
if (metaCreateTable(pVnode->pMeta, version, pSubmitTbData->pCreateTbReq, NULL /* todo */) < 0) { // check (TODO: move check to create table)
// TODO code = grantCheck(TSDB_GRANT_TIMESERIES);
} else { if (code) goto _exit;
// TODO
code = grantCheck(TSDB_GRANT_TABLE);
if (code) goto _exit;
// alloc if need
if (pSubmitRsp->aCreateTbRsp == NULL &&
(pSubmitRsp->aCreateTbRsp = taosArrayInit(TARRAY_SIZE(pSubmitReq->aSubmitTbData), sizeof(SVCreateTbRsp))) ==
NULL) {
code = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _exit;
}
SVCreateTbRsp *pCreateTbRsp = taosArrayReserve(pSubmitRsp->aCreateTbRsp, 1);
// create table
if (metaCreateTable(pVnode->pMeta, version, pSubmitTbData->pCreateTbReq, &pCreateTbRsp->pMeta) ==
0) { // create table success
if (newTbUids == NULL &&
(newTbUids = taosArrayInit(TARRAY_SIZE(pSubmitReq->aSubmitTbData), sizeof(int64_t))) == NULL) {
code = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _exit;
}
taosArrayPush(newTbUids, &pSubmitTbData->uid);
if (pCreateTbRsp->pMeta) {
// TODO
}
} else { // create table failed
if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
code = terrno;
goto _exit;
}
} }
} }
// insert data // insert data
tsdbInsertTableData(pVnode->pTsdb, version, pSubmitTbData, NULL /* todo */); int32_t affectedRows;
// TODO: handle result code = tsdbInsertTableData(pVnode->pTsdb, version, pSubmitTbData, &affectedRows);
if (code) goto _exit;
pSubmitRsp->affectedRows += affectedRows;
}
// update table uid list
if (taosArrayGetSize(newTbUids) > 0) {
vDebug("vgId:%d, add %d table into query table list in handling submit", TD_VID(pVnode),
(int32_t)taosArrayGetSize(newTbUids));
tqUpdateTbUidList(pVnode->pTq, newTbUids, true);
} }
_exit: _exit:
if (code) { // message
} else { pRsp->code = code;
} tEncodeSize(tEncodeSSubmitRsp2, pSubmitRsp, pRsp->contLen, ret);
pRsp->pCont = rpcMallocCont(pRsp->contLen);
tEncoderInit(&ec, pRsp->pCont, pRsp->contLen);
tEncodeSSubmitRsp2(&ec, pSubmitRsp);
tEncoderClear(&ec);
// clear
taosArrayDestroy(newTbUids);
tDestroySSubmitReq2(pSubmitReq);
tDestroySSubmitRsp2(pSubmitRsp);
return code; return code;
#else #else
SSubmitReq *pSubmitReq = (SSubmitReq *)pReq; SSubmitReq *pSubmitReq = (SSubmitReq *)pReq;
SSubmitRsp submitRsp = {0}; SSubmitRsp submitRsp = {0};
SSubmitMsgIter msgIter = {0}; int32_t nRows = 0;
SSubmitBlk *pBlock = NULL; int32_t tsize, ret;
SVCreateTbReq createTbReq = {0}; SEncoder encoder = {0};
SDecoder decoder = {0}; SArray *newTbUids = NULL;
int32_t nRows = 0; SVStatis statis = {0};
int32_t tsize, ret; bool tbCreated = false;
SEncoder encoder = {0};
SArray *newTbUids = NULL;
SVStatis statis = {0};
bool tbCreated = false;
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
pRsp->code = 0; pRsp->code = 0;
@ -942,12 +995,6 @@ _exit:
goto _exit; goto _exit;
} }
// handle the request
if (tInitSubmitMsgIter(pSubmitReq, &msgIter) < 0) {
pRsp->code = TSDB_CODE_INVALID_MSG;
goto _exit;
}
submitRsp.pArray = taosArrayInit(msgIter.numOfBlocks, sizeof(SSubmitBlkRsp)); submitRsp.pArray = taosArrayInit(msgIter.numOfBlocks, sizeof(SSubmitBlkRsp));
newTbUids = taosArrayInit(msgIter.numOfBlocks, sizeof(int64_t)); newTbUids = taosArrayInit(msgIter.numOfBlocks, sizeof(int64_t));
if (!submitRsp.pArray || !newTbUids) { if (!submitRsp.pArray || !newTbUids) {
@ -964,42 +1011,42 @@ _exit:
// create table for auto create table mode // create table for auto create table mode
if (msgIter.schemaLen > 0) { if (msgIter.schemaLen > 0) {
tDecoderInit(&decoder, pBlock->data, msgIter.schemaLen); // tDecoderInit(&decoder, pBlock->data, msgIter.schemaLen);
if (tDecodeSVCreateTbReq(&decoder, &createTbReq) < 0) { // if (tDecodeSVCreateTbReq(&decoder, &createTbReq) < 0) {
pRsp->code = TSDB_CODE_INVALID_MSG; // pRsp->code = TSDB_CODE_INVALID_MSG;
tDecoderClear(&decoder); // tDecoderClear(&decoder);
taosArrayDestroy(createTbReq.ctb.tagName); // taosArrayDestroy(createTbReq.ctb.tagName);
goto _exit; // goto _exit;
} // }
if ((terrno = grantCheck(TSDB_GRANT_TIMESERIES)) < 0) { // if ((terrno = grantCheck(TSDB_GRANT_TIMESERIES)) < 0) {
pRsp->code = terrno; // pRsp->code = terrno;
tDecoderClear(&decoder); // tDecoderClear(&decoder);
taosArrayDestroy(createTbReq.ctb.tagName); // taosArrayDestroy(createTbReq.ctb.tagName);
goto _exit; // goto _exit;
} // }
if ((terrno = grantCheck(TSDB_GRANT_TABLE)) < 0) { // if ((terrno = grantCheck(TSDB_GRANT_TABLE)) < 0) {
pRsp->code = terrno; // pRsp->code = terrno;
tDecoderClear(&decoder); // tDecoderClear(&decoder);
taosArrayDestroy(createTbReq.ctb.tagName); // taosArrayDestroy(createTbReq.ctb.tagName);
goto _exit; // goto _exit;
} // }
if (metaCreateTable(pVnode->pMeta, version, &createTbReq, &submitBlkRsp.pMeta) < 0) { if (metaCreateTable(pVnode->pMeta, version, &createTbReq, &submitBlkRsp.pMeta) < 0) {
if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) { // if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
submitBlkRsp.code = terrno; // submitBlkRsp.code = terrno;
pRsp->code = terrno; // pRsp->code = terrno;
tDecoderClear(&decoder); // tDecoderClear(&decoder);
taosArrayDestroy(createTbReq.ctb.tagName); // taosArrayDestroy(createTbReq.ctb.tagName);
goto _exit; // goto _exit;
} // }
} else { } else {
if (NULL != submitBlkRsp.pMeta) { if (NULL != submitBlkRsp.pMeta) {
vnodeUpdateMetaRsp(pVnode, submitBlkRsp.pMeta); vnodeUpdateMetaRsp(pVnode, submitBlkRsp.pMeta);
} }
taosArrayPush(newTbUids, &createTbReq.uid); // taosArrayPush(newTbUids, &createTbReq.uid);
submitBlkRsp.uid = createTbReq.uid; submitBlkRsp.uid = createTbReq.uid;
submitBlkRsp.tblFName = taosMemoryMalloc(strlen(pVnode->config.dbname) + strlen(createTbReq.name) + 2); submitBlkRsp.tblFName = taosMemoryMalloc(strlen(pVnode->config.dbname) + strlen(createTbReq.name) + 2);
@ -1007,18 +1054,15 @@ _exit:
tbCreated = true; tbCreated = true;
} }
msgIter.uid = createTbReq.uid; // msgIter.uid = createTbReq.uid;
if (createTbReq.type == TSDB_CHILD_TABLE) { // if (createTbReq.type == TSDB_CHILD_TABLE) {
msgIter.suid = createTbReq.ctb.suid; // msgIter.suid = createTbReq.ctb.suid;
} else { // } else {
msgIter.suid = 0; // msgIter.suid = 0;
} // }
#ifdef TD_DEBUG_PRINT_ROW // tDecoderClear(&decoder);
vnodeDebugPrintSingleSubmitMsg(pVnode->pMeta, pBlock, &msgIter, "real uid"); // taosArrayDestroy(createTbReq.ctb.tagName);
#endif
tDecoderClear(&decoder);
taosArrayDestroy(createTbReq.ctb.tagName);
} }
if (tsdbInsertTableData(pVnode->pTsdb, version, &msgIter, pBlock, &submitBlkRsp) < 0) { if (tsdbInsertTableData(pVnode->pTsdb, version, &msgIter, pBlock, &submitBlkRsp) < 0) {
@ -1032,21 +1076,21 @@ _exit:
} }
} }
if (taosArrayGetSize(newTbUids) > 0) { // if (taosArrayGetSize(newTbUids) > 0) {
vDebug("vgId:%d, add %d table into query table list in handling submit", TD_VID(pVnode), // vDebug("vgId:%d, add %d table into query table list in handling submit", TD_VID(pVnode),
(int32_t)taosArrayGetSize(newTbUids)); // (int32_t)taosArrayGetSize(newTbUids));
} // }
tqUpdateTbUidList(pVnode->pTq, newTbUids, true); // tqUpdateTbUidList(pVnode->pTq, newTbUids, true);
_exit: _exit:
taosArrayDestroy(newTbUids); taosArrayDestroy(newTbUids);
tEncodeSize(tEncodeSSubmitRsp, &submitRsp, tsize, ret); // tEncodeSize(tEncodeSSubmitRsp, &submitRsp, tsize, ret);
pRsp->pCont = rpcMallocCont(tsize); // pRsp->pCont = rpcMallocCont(tsize);
pRsp->contLen = tsize; // pRsp->contLen = tsize;
tEncoderInit(&encoder, pRsp->pCont, tsize); // tEncoderInit(&encoder, pRsp->pCont, tsize);
tEncodeSSubmitRsp(&encoder, &submitRsp); // tEncodeSSubmitRsp(&encoder, &submitRsp);
tEncoderClear(&encoder); // tEncoderClear(&encoder);
taosArrayDestroyEx(submitRsp.pArray, tFreeSSubmitBlkRsp); taosArrayDestroyEx(submitRsp.pArray, tFreeSSubmitBlkRsp);