From 3ae79a77c75b0a3a4443f7d1017342438743e005 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Sat, 7 May 2022 01:47:45 +0800 Subject: [PATCH] enh(stream): fix create stb in stream --- source/common/src/tdatablock.c | 29 +++--- source/dnode/mnode/impl/inc/mndStb.h | 5 ++ source/dnode/mnode/impl/src/mndStb.c | 112 +++++++++++++----------- source/dnode/mnode/impl/src/mndStream.c | 76 +++++++++++++++- source/libs/sync/src/syncRaftLog.c | 2 +- 5 files changed, 159 insertions(+), 65 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index a4b36dcb67..2c17f2c2fc 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1478,11 +1478,11 @@ void blockDebugShowData(const SArray* dataBlocks) { * @param uid set as parameter temporarily // TODO: remove this parameter, and the executor should set uid in * SDataBlock->info.uid * @param suid // TODO: check with Liao whether suid response is reasonable - * + * * TODO: colId should be set */ -int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema *pTSchema, int32_t vgId, tb_uid_t uid, - tb_uid_t suid) { +int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId, + tb_uid_t uid, tb_uid_t suid) { int32_t sz = taosArrayGetSize(pDataBlocks); int32_t bufSize = sizeof(SSubmitReq); for (int32_t i = 0; i < sz; ++i) { @@ -1494,16 +1494,16 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks ASSERT(bufSize < 3 * 1024 * 1024); *pReq = taosMemoryCalloc(1, bufSize); - if(!(*pReq)) { + if (!(*pReq)) { terrno = TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_FAILED; } void* pDataBuf = *pReq; - int32_t msgLen = sizeof(SSubmitReq); + int32_t msgLen = sizeof(SSubmitReq); int32_t numOfBlks = 0; SRowBuilder rb = {0}; - tdSRowInit(&rb, 0); // TODO: use the latest version + tdSRowInit(&rb, 0); // TODO: use the latest version for (int32_t i = 0; i < sz; ++i) { SSDataBlock* pDataBlock = taosArrayGet(pDataBlocks, i); @@ -1511,8 +1511,8 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks int32_t rows = pDataBlock->info.rows; int32_t rowSize = pDataBlock->info.rowSize; int64_t groupId = pDataBlock->info.groupId; - - if(rb.nCols != colNum) { + + if (rb.nCols != colNum) { tdSRowSetTpInfo(&rb, colNum, pTSchema->flen); } @@ -1525,10 +1525,10 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks msgLen += sizeof(SSubmitBlk); int32_t dataLen = 0; - for (int32_t j = 0; j < rows; ++j) { // iterate by row - tdSRowResetBuf(&rb, POINTER_SHIFT(pDataBuf, msgLen)); // set row buf + for (int32_t j = 0; j < rows; ++j) { // iterate by row + tdSRowResetBuf(&rb, POINTER_SHIFT(pDataBuf, msgLen)); // set row buf printf("|"); - bool isStartKey = false; + bool isStartKey = false; for (int32_t k = 0; k < colNum; ++k) { // iterate by column SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k); void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes); @@ -1536,7 +1536,8 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks case TSDB_DATA_TYPE_TIMESTAMP: if (!isStartKey) { isStartKey = true; - tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NORM, var, true, 0, 0); + tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NORM, var, true, + 0, 0); } else { tdAppendColValToRow(&rb, 2, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NORM, var, true, 8, k); break; @@ -1629,14 +1630,14 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema) { blkHead->uid = htobe64(pDataBlock->info.uid); int32_t rows = pDataBlock->info.rows; - int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema); + /*int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema);*/ /*blkHead->dataLen = htonl(rows * maxLen);*/ blkHead->dataLen = 0; void* blockData = POINTER_SHIFT(submitBlk, sizeof(SSubmitBlk)); STSRow* rowData = blockData; - for (int32_t j = 0; j < pDataBlock->info.rows; j++) { + for (int32_t j = 0; j < rows; j++) { SRowBuilder rb = {0}; tdSRowInit(&rb, pTSchema->version); tdSRowSetTpInfo(&rb, pTSchema->numOfCols, pTSchema->flen); diff --git a/source/dnode/mnode/impl/inc/mndStb.h b/source/dnode/mnode/impl/inc/mndStb.h index a415d39434..28d3215b98 100644 --- a/source/dnode/mnode/impl/inc/mndStb.h +++ b/source/dnode/mnode/impl/inc/mndStb.h @@ -31,6 +31,11 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *pStbs, int32_t n int32_t *pRspLen); int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs); +int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate); +SDbObj *mndAcquireDbByStb(SMnode *pMnode, const char *stbName); +int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreate, SDbObj *pDb); +int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 8ae0d5d19c..cd76c7c8bb 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -342,7 +342,7 @@ void mndReleaseStb(SMnode *pMnode, SStbObj *pStb) { sdbRelease(pSdb, pStb); } -static SDbObj *mndAcquireDbByStb(SMnode *pMnode, const char *stbName) { +SDbObj *mndAcquireDbByStb(SMnode *pMnode, const char *stbName) { SName name = {0}; tNameFromString(&name, stbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); @@ -463,7 +463,7 @@ static void *mndBuildVDropStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, return pHead; } -static int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate) { +int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate) { if (pCreate->igExists < 0 || pCreate->igExists > 1) { terrno = TSDB_CODE_MND_INVALID_STB_OPTION; return -1; @@ -634,91 +634,96 @@ static SSchema *mndFindStbColumns(const SStbObj *pStb, const char *colName) { return NULL; } -static int32_t mndCreateStb(SMnode *pMnode, SNodeMsg *pReq, SMCreateStbReq *pCreate, SDbObj *pDb) { - SStbObj stbObj = {0}; - memcpy(stbObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN); - memcpy(stbObj.db, pDb->name, TSDB_DB_FNAME_LEN); - stbObj.createdTime = taosGetTimestampMs(); - stbObj.updateTime = stbObj.createdTime; - stbObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN); - stbObj.dbUid = pDb->uid; - stbObj.version = 1; - stbObj.nextColId = 1; - stbObj.xFilesFactor = pCreate->xFilesFactor; - stbObj.delay = pCreate->delay; - stbObj.ttl = pCreate->ttl; - stbObj.numOfColumns = pCreate->numOfColumns; - stbObj.numOfTags = pCreate->numOfTags; - stbObj.commentLen = pCreate->commentLen; - if (stbObj.commentLen > 0) { - stbObj.comment = taosMemoryCalloc(stbObj.commentLen, 1); - if (stbObj.comment == NULL) { +int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreate, SDbObj *pDb) { + memcpy(pDst->name, pCreate->name, TSDB_TABLE_FNAME_LEN); + memcpy(pDst->db, pDb->name, TSDB_DB_FNAME_LEN); + pDst->createdTime = taosGetTimestampMs(); + pDst->updateTime = pDst->createdTime; + pDst->uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN); + pDst->dbUid = pDb->uid; + pDst->version = 1; + pDst->nextColId = 1; + pDst->xFilesFactor = pCreate->xFilesFactor; + pDst->delay = pCreate->delay; + pDst->ttl = pCreate->ttl; + pDst->numOfColumns = pCreate->numOfColumns; + pDst->numOfTags = pCreate->numOfTags; + pDst->commentLen = pCreate->commentLen; + if (pDst->commentLen > 0) { + pDst->comment = taosMemoryCalloc(pDst->commentLen, 1); + if (pDst->comment == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - memcpy(stbObj.comment, pCreate->comment, stbObj.commentLen); + memcpy(pDst->comment, pCreate->comment, pDst->commentLen); } - stbObj.ast1Len = pCreate->ast1Len; - if (stbObj.ast1Len > 0) { - stbObj.pAst1 = taosMemoryCalloc(stbObj.ast1Len, 1); - if (stbObj.pAst1 == NULL) { + pDst->ast1Len = pCreate->ast1Len; + if (pDst->ast1Len > 0) { + pDst->pAst1 = taosMemoryCalloc(pDst->ast1Len, 1); + if (pDst->pAst1 == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - memcpy(stbObj.pAst1, pCreate->pAst1, stbObj.ast1Len); + memcpy(pDst->pAst1, pCreate->pAst1, pDst->ast1Len); } - stbObj.ast2Len = pCreate->ast2Len; - if (stbObj.ast2Len > 0) { - stbObj.pAst2 = taosMemoryCalloc(stbObj.ast2Len, 1); - if (stbObj.pAst2 == NULL) { + pDst->ast2Len = pCreate->ast2Len; + if (pDst->ast2Len > 0) { + pDst->pAst2 = taosMemoryCalloc(pDst->ast2Len, 1); + if (pDst->pAst2 == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - memcpy(stbObj.pAst2, pCreate->pAst2, stbObj.ast2Len); + memcpy(pDst->pAst2, pCreate->pAst2, pDst->ast2Len); } - stbObj.pColumns = taosMemoryCalloc(1, stbObj.numOfColumns * sizeof(SSchema)); - stbObj.pTags = taosMemoryCalloc(1, stbObj.numOfTags * sizeof(SSchema)); - if (stbObj.pColumns == NULL || stbObj.pTags == NULL) { + pDst->pColumns = taosMemoryCalloc(1, pDst->numOfColumns * sizeof(SSchema)); + pDst->pTags = taosMemoryCalloc(1, pDst->numOfTags * sizeof(SSchema)); + if (pDst->pColumns == NULL || pDst->pTags == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - for (int32_t i = 0; i < stbObj.numOfColumns; ++i) { + for (int32_t i = 0; i < pDst->numOfColumns; ++i) { SField *pField = taosArrayGet(pCreate->pColumns, i); - SSchema *pSchema = &stbObj.pColumns[i]; + SSchema *pSchema = &pDst->pColumns[i]; pSchema->type = pField->type; pSchema->bytes = pField->bytes; pSchema->flags = pField->flags; memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN); - pSchema->colId = stbObj.nextColId; - stbObj.nextColId++; + pSchema->colId = pDst->nextColId; + pDst->nextColId++; } - for (int32_t i = 0; i < stbObj.numOfTags; ++i) { + for (int32_t i = 0; i < pDst->numOfTags; ++i) { SField *pField = taosArrayGet(pCreate->pTags, i); - SSchema *pSchema = &stbObj.pTags[i]; + SSchema *pSchema = &pDst->pTags[i]; pSchema->type = pField->type; pSchema->bytes = pField->bytes; memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN); - pSchema->colId = stbObj.nextColId; - stbObj.nextColId++; + pSchema->colId = pDst->nextColId; + pDst->nextColId++; } + return 0; +} + +static int32_t mndCreateStb(SMnode *pMnode, SNodeMsg *pReq, SMCreateStbReq *pCreate, SDbObj *pDb) { + SStbObj stbObj = {0}; int32_t code = -1; + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_STB, &pReq->rpcMsg); if (pTrans == NULL) goto _OVER; mDebug("trans:%d, used to create stb:%s", pTrans->id, pCreate->name); - mndTransSetDbInfo(pTrans, pDb); - if (mndSetCreateStbRedoLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto _OVER; - if (mndSetCreateStbUndoLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto _OVER; - if (mndSetCreateStbCommitLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto _OVER; - if (mndSetCreateStbRedoActions(pMnode, pTrans, pDb, &stbObj) != 0) goto _OVER; - if (mndSetCreateStbUndoActions(pMnode, pTrans, pDb, &stbObj) != 0) goto _OVER; + if (mndBuildStbFromReq(pMnode, &stbObj, pCreate, pDb) != 0) { + goto _OVER; + } + + if (mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj) < 0) goto _OVER; + if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; code = 0; @@ -727,6 +732,15 @@ _OVER: mndTransDrop(pTrans); return code; } +int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) { + mndTransSetDbInfo(pTrans, pDb); + if (mndSetCreateStbRedoLogs(pMnode, pTrans, pDb, pStb) != 0) return -1; + if (mndSetCreateStbUndoLogs(pMnode, pTrans, pDb, pStb) != 0) return -1; + if (mndSetCreateStbCommitLogs(pMnode, pTrans, pDb, pStb) != 0) return -1; + if (mndSetCreateStbRedoActions(pMnode, pTrans, pDb, pStb) != 0) return -1; + if (mndSetCreateStbUndoActions(pMnode, pTrans, pDb, pStb) != 0) return -1; + return 0; +} static int32_t mndProcessMCreateStbReq(SNodeMsg *pReq) { SMnode *pMnode = pReq->pNode; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 5c6e2ce771..1404b1cd94 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -290,6 +290,72 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast return 0; } +static SStbObj *mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStreamObj *pStream, const char *user) { + SStbObj *pStb = NULL; + SDbObj *pDb = NULL; + SUserObj *pUser = NULL; + + SMCreateStbReq createReq = {0}; + tstrncpy(createReq.name, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN); + createReq.numOfColumns = pStream->outputSchema.nCols; + createReq.numOfTags = 1; // group id + createReq.pColumns = taosArrayInit(createReq.numOfColumns, sizeof(SField)); + // build fields + // build tags + + if (mndCheckCreateStbReq(&createReq) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + goto _OVER; + } + + pStb = mndAcquireStb(pMnode, createReq.name); + if (pStb != NULL) { + terrno = TSDB_CODE_MND_STB_ALREADY_EXIST; + goto _OVER; + } + + pDb = mndAcquireDbByStb(pMnode, createReq.name); + if (pDb == NULL) { + terrno = TSDB_CODE_MND_DB_NOT_SELECTED; + goto _OVER; + } + + pUser = mndAcquireUser(pMnode, user); + if (pUser == NULL) { + goto _OVER; + } + + if (mndCheckWriteAuth(pUser, pDb) != 0) { + goto _OVER; + } + + int32_t numOfStbs = -1; + mndGetNumOfStbs(pMnode, pDb->name, &numOfStbs); + if (pDb->cfg.numOfStables == 1 && numOfStbs != 0) { + terrno = TSDB_CODE_MND_SINGLE_STB_MODE_DB; + goto _OVER; + } + + SStbObj stbObj = {0}; + + if (mndBuildStbFromReq(pMnode, &stbObj, &createReq, pDb) != 0) { + goto _OVER; + } + + if (mndBuildStbFromReq(pMnode, pStb, &createReq, pDb) != 0) { + goto _OVER; + } + + if (mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj) < 0) goto _OVER; + + return pStb; +_OVER: + mndReleaseStb(pMnode, pStb); + mndReleaseDb(pMnode, pDb); + mndReleaseUser(pMnode, pUser); + return NULL; +} + static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamReq *pCreate, SDbObj *pDb) { mDebug("stream:%s to create", pCreate->name); SStreamObj streamObj = {0}; @@ -311,13 +377,21 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe streamObj.trigger = pCreate->triggerType; streamObj.waterMark = pCreate->watermark; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_STREAM, &pReq->rpcMsg); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_STREAM, &pReq->rpcMsg); if (pTrans == NULL) { mError("stream:%s, failed to create since %s", pCreate->name, terrstr()); return -1; } mDebug("trans:%d, used to create stream:%s", pTrans->id, pCreate->name); +#if 0 + if (mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->user) < 0) { + mError("trans:%d, failed to add stream since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + return -1; + } +#endif + if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pCreate->triggerType, pCreate->watermark, pTrans) != 0) { mError("trans:%d, failed to add stream since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 5fa77c9964..6bf17b8a82 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -57,7 +57,7 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { syncMeta.seqNum = pEntry->seqNum; syncMeta.term = pEntry->term; code = walWriteWithSyncInfo(pWal, pEntry->index, pEntry->originalRpcType, syncMeta, pEntry->data, pEntry->dataLen); - perror("wal write error: "); + if (code < 0) perror("wal write error: "); assert(code == 0); walFsync(pWal, true);