enh(stream): fix create stb in stream
This commit is contained in:
parent
6b18948884
commit
3ae79a77c7
|
@ -1478,11 +1478,11 @@ void blockDebugShowData(const SArray* dataBlocks) {
|
|||
* @param uid set as parameter temporarily // TODO: remove this parameter, and the executor should set uid in
|
||||
* SDataBlock->info.uid
|
||||
* @param suid // TODO: check with Liao whether suid response is reasonable
|
||||
*
|
||||
*
|
||||
* TODO: colId should be set
|
||||
*/
|
||||
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema *pTSchema, int32_t vgId, tb_uid_t uid,
|
||||
tb_uid_t suid) {
|
||||
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId,
|
||||
tb_uid_t uid, tb_uid_t suid) {
|
||||
int32_t sz = taosArrayGetSize(pDataBlocks);
|
||||
int32_t bufSize = sizeof(SSubmitReq);
|
||||
for (int32_t i = 0; i < sz; ++i) {
|
||||
|
@ -1494,16 +1494,16 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
|
|||
ASSERT(bufSize < 3 * 1024 * 1024);
|
||||
|
||||
*pReq = taosMemoryCalloc(1, bufSize);
|
||||
if(!(*pReq)) {
|
||||
if (!(*pReq)) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
void* pDataBuf = *pReq;
|
||||
|
||||
int32_t msgLen = sizeof(SSubmitReq);
|
||||
int32_t msgLen = sizeof(SSubmitReq);
|
||||
int32_t numOfBlks = 0;
|
||||
SRowBuilder rb = {0};
|
||||
tdSRowInit(&rb, 0); // TODO: use the latest version
|
||||
tdSRowInit(&rb, 0); // TODO: use the latest version
|
||||
|
||||
for (int32_t i = 0; i < sz; ++i) {
|
||||
SSDataBlock* pDataBlock = taosArrayGet(pDataBlocks, i);
|
||||
|
@ -1511,8 +1511,8 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
|
|||
int32_t rows = pDataBlock->info.rows;
|
||||
int32_t rowSize = pDataBlock->info.rowSize;
|
||||
int64_t groupId = pDataBlock->info.groupId;
|
||||
|
||||
if(rb.nCols != colNum) {
|
||||
|
||||
if (rb.nCols != colNum) {
|
||||
tdSRowSetTpInfo(&rb, colNum, pTSchema->flen);
|
||||
}
|
||||
|
||||
|
@ -1525,10 +1525,10 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
|
|||
|
||||
msgLen += sizeof(SSubmitBlk);
|
||||
int32_t dataLen = 0;
|
||||
for (int32_t j = 0; j < rows; ++j) { // iterate by row
|
||||
tdSRowResetBuf(&rb, POINTER_SHIFT(pDataBuf, msgLen)); // set row buf
|
||||
for (int32_t j = 0; j < rows; ++j) { // iterate by row
|
||||
tdSRowResetBuf(&rb, POINTER_SHIFT(pDataBuf, msgLen)); // set row buf
|
||||
printf("|");
|
||||
bool isStartKey = false;
|
||||
bool isStartKey = false;
|
||||
for (int32_t k = 0; k < colNum; ++k) { // iterate by column
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
|
||||
void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
|
||||
|
@ -1536,7 +1536,8 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
|
|||
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||
if (!isStartKey) {
|
||||
isStartKey = true;
|
||||
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NORM, var, true, 0, 0);
|
||||
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NORM, var, true,
|
||||
0, 0);
|
||||
} else {
|
||||
tdAppendColValToRow(&rb, 2, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NORM, var, true, 8, k);
|
||||
break;
|
||||
|
@ -1629,14 +1630,14 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema) {
|
|||
blkHead->uid = htobe64(pDataBlock->info.uid);
|
||||
|
||||
int32_t rows = pDataBlock->info.rows;
|
||||
int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema);
|
||||
/*int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema);*/
|
||||
/*blkHead->dataLen = htonl(rows * maxLen);*/
|
||||
blkHead->dataLen = 0;
|
||||
|
||||
void* blockData = POINTER_SHIFT(submitBlk, sizeof(SSubmitBlk));
|
||||
STSRow* rowData = blockData;
|
||||
|
||||
for (int32_t j = 0; j < pDataBlock->info.rows; j++) {
|
||||
for (int32_t j = 0; j < rows; j++) {
|
||||
SRowBuilder rb = {0};
|
||||
tdSRowInit(&rb, pTSchema->version);
|
||||
tdSRowSetTpInfo(&rb, pTSchema->numOfCols, pTSchema->flen);
|
||||
|
|
|
@ -31,6 +31,11 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *pStbs, int32_t n
|
|||
int32_t *pRspLen);
|
||||
int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs);
|
||||
|
||||
int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate);
|
||||
SDbObj *mndAcquireDbByStb(SMnode *pMnode, const char *stbName);
|
||||
int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreate, SDbObj *pDb);
|
||||
int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -342,7 +342,7 @@ void mndReleaseStb(SMnode *pMnode, SStbObj *pStb) {
|
|||
sdbRelease(pSdb, pStb);
|
||||
}
|
||||
|
||||
static SDbObj *mndAcquireDbByStb(SMnode *pMnode, const char *stbName) {
|
||||
SDbObj *mndAcquireDbByStb(SMnode *pMnode, const char *stbName) {
|
||||
SName name = {0};
|
||||
tNameFromString(&name, stbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
|
||||
|
@ -463,7 +463,7 @@ static void *mndBuildVDropStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb,
|
|||
return pHead;
|
||||
}
|
||||
|
||||
static int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate) {
|
||||
int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate) {
|
||||
if (pCreate->igExists < 0 || pCreate->igExists > 1) {
|
||||
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
|
||||
return -1;
|
||||
|
@ -634,91 +634,96 @@ static SSchema *mndFindStbColumns(const SStbObj *pStb, const char *colName) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
static int32_t mndCreateStb(SMnode *pMnode, SNodeMsg *pReq, SMCreateStbReq *pCreate, SDbObj *pDb) {
|
||||
SStbObj stbObj = {0};
|
||||
memcpy(stbObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN);
|
||||
memcpy(stbObj.db, pDb->name, TSDB_DB_FNAME_LEN);
|
||||
stbObj.createdTime = taosGetTimestampMs();
|
||||
stbObj.updateTime = stbObj.createdTime;
|
||||
stbObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
|
||||
stbObj.dbUid = pDb->uid;
|
||||
stbObj.version = 1;
|
||||
stbObj.nextColId = 1;
|
||||
stbObj.xFilesFactor = pCreate->xFilesFactor;
|
||||
stbObj.delay = pCreate->delay;
|
||||
stbObj.ttl = pCreate->ttl;
|
||||
stbObj.numOfColumns = pCreate->numOfColumns;
|
||||
stbObj.numOfTags = pCreate->numOfTags;
|
||||
stbObj.commentLen = pCreate->commentLen;
|
||||
if (stbObj.commentLen > 0) {
|
||||
stbObj.comment = taosMemoryCalloc(stbObj.commentLen, 1);
|
||||
if (stbObj.comment == NULL) {
|
||||
int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreate, SDbObj *pDb) {
|
||||
memcpy(pDst->name, pCreate->name, TSDB_TABLE_FNAME_LEN);
|
||||
memcpy(pDst->db, pDb->name, TSDB_DB_FNAME_LEN);
|
||||
pDst->createdTime = taosGetTimestampMs();
|
||||
pDst->updateTime = pDst->createdTime;
|
||||
pDst->uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
|
||||
pDst->dbUid = pDb->uid;
|
||||
pDst->version = 1;
|
||||
pDst->nextColId = 1;
|
||||
pDst->xFilesFactor = pCreate->xFilesFactor;
|
||||
pDst->delay = pCreate->delay;
|
||||
pDst->ttl = pCreate->ttl;
|
||||
pDst->numOfColumns = pCreate->numOfColumns;
|
||||
pDst->numOfTags = pCreate->numOfTags;
|
||||
pDst->commentLen = pCreate->commentLen;
|
||||
if (pDst->commentLen > 0) {
|
||||
pDst->comment = taosMemoryCalloc(pDst->commentLen, 1);
|
||||
if (pDst->comment == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
memcpy(stbObj.comment, pCreate->comment, stbObj.commentLen);
|
||||
memcpy(pDst->comment, pCreate->comment, pDst->commentLen);
|
||||
}
|
||||
|
||||
stbObj.ast1Len = pCreate->ast1Len;
|
||||
if (stbObj.ast1Len > 0) {
|
||||
stbObj.pAst1 = taosMemoryCalloc(stbObj.ast1Len, 1);
|
||||
if (stbObj.pAst1 == NULL) {
|
||||
pDst->ast1Len = pCreate->ast1Len;
|
||||
if (pDst->ast1Len > 0) {
|
||||
pDst->pAst1 = taosMemoryCalloc(pDst->ast1Len, 1);
|
||||
if (pDst->pAst1 == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
memcpy(stbObj.pAst1, pCreate->pAst1, stbObj.ast1Len);
|
||||
memcpy(pDst->pAst1, pCreate->pAst1, pDst->ast1Len);
|
||||
}
|
||||
|
||||
stbObj.ast2Len = pCreate->ast2Len;
|
||||
if (stbObj.ast2Len > 0) {
|
||||
stbObj.pAst2 = taosMemoryCalloc(stbObj.ast2Len, 1);
|
||||
if (stbObj.pAst2 == NULL) {
|
||||
pDst->ast2Len = pCreate->ast2Len;
|
||||
if (pDst->ast2Len > 0) {
|
||||
pDst->pAst2 = taosMemoryCalloc(pDst->ast2Len, 1);
|
||||
if (pDst->pAst2 == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
memcpy(stbObj.pAst2, pCreate->pAst2, stbObj.ast2Len);
|
||||
memcpy(pDst->pAst2, pCreate->pAst2, pDst->ast2Len);
|
||||
}
|
||||
|
||||
stbObj.pColumns = taosMemoryCalloc(1, stbObj.numOfColumns * sizeof(SSchema));
|
||||
stbObj.pTags = taosMemoryCalloc(1, stbObj.numOfTags * sizeof(SSchema));
|
||||
if (stbObj.pColumns == NULL || stbObj.pTags == NULL) {
|
||||
pDst->pColumns = taosMemoryCalloc(1, pDst->numOfColumns * sizeof(SSchema));
|
||||
pDst->pTags = taosMemoryCalloc(1, pDst->numOfTags * sizeof(SSchema));
|
||||
if (pDst->pColumns == NULL || pDst->pTags == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < stbObj.numOfColumns; ++i) {
|
||||
for (int32_t i = 0; i < pDst->numOfColumns; ++i) {
|
||||
SField *pField = taosArrayGet(pCreate->pColumns, i);
|
||||
SSchema *pSchema = &stbObj.pColumns[i];
|
||||
SSchema *pSchema = &pDst->pColumns[i];
|
||||
pSchema->type = pField->type;
|
||||
pSchema->bytes = pField->bytes;
|
||||
pSchema->flags = pField->flags;
|
||||
memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN);
|
||||
pSchema->colId = stbObj.nextColId;
|
||||
stbObj.nextColId++;
|
||||
pSchema->colId = pDst->nextColId;
|
||||
pDst->nextColId++;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < stbObj.numOfTags; ++i) {
|
||||
for (int32_t i = 0; i < pDst->numOfTags; ++i) {
|
||||
SField *pField = taosArrayGet(pCreate->pTags, i);
|
||||
SSchema *pSchema = &stbObj.pTags[i];
|
||||
SSchema *pSchema = &pDst->pTags[i];
|
||||
pSchema->type = pField->type;
|
||||
pSchema->bytes = pField->bytes;
|
||||
memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN);
|
||||
pSchema->colId = stbObj.nextColId;
|
||||
stbObj.nextColId++;
|
||||
pSchema->colId = pDst->nextColId;
|
||||
pDst->nextColId++;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndCreateStb(SMnode *pMnode, SNodeMsg *pReq, SMCreateStbReq *pCreate, SDbObj *pDb) {
|
||||
SStbObj stbObj = {0};
|
||||
|
||||
int32_t code = -1;
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_STB, &pReq->rpcMsg);
|
||||
if (pTrans == NULL) goto _OVER;
|
||||
|
||||
mDebug("trans:%d, used to create stb:%s", pTrans->id, pCreate->name);
|
||||
mndTransSetDbInfo(pTrans, pDb);
|
||||
|
||||
if (mndSetCreateStbRedoLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto _OVER;
|
||||
if (mndSetCreateStbUndoLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto _OVER;
|
||||
if (mndSetCreateStbCommitLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto _OVER;
|
||||
if (mndSetCreateStbRedoActions(pMnode, pTrans, pDb, &stbObj) != 0) goto _OVER;
|
||||
if (mndSetCreateStbUndoActions(pMnode, pTrans, pDb, &stbObj) != 0) goto _OVER;
|
||||
if (mndBuildStbFromReq(pMnode, &stbObj, pCreate, pDb) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj) < 0) goto _OVER;
|
||||
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
|
||||
|
||||
code = 0;
|
||||
|
@ -727,6 +732,15 @@ _OVER:
|
|||
mndTransDrop(pTrans);
|
||||
return code;
|
||||
}
|
||||
int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
|
||||
mndTransSetDbInfo(pTrans, pDb);
|
||||
if (mndSetCreateStbRedoLogs(pMnode, pTrans, pDb, pStb) != 0) return -1;
|
||||
if (mndSetCreateStbUndoLogs(pMnode, pTrans, pDb, pStb) != 0) return -1;
|
||||
if (mndSetCreateStbCommitLogs(pMnode, pTrans, pDb, pStb) != 0) return -1;
|
||||
if (mndSetCreateStbRedoActions(pMnode, pTrans, pDb, pStb) != 0) return -1;
|
||||
if (mndSetCreateStbUndoActions(pMnode, pTrans, pDb, pStb) != 0) return -1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndProcessMCreateStbReq(SNodeMsg *pReq) {
|
||||
SMnode *pMnode = pReq->pNode;
|
||||
|
|
|
@ -290,6 +290,72 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast
|
|||
return 0;
|
||||
}
|
||||
|
||||
static SStbObj *mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStreamObj *pStream, const char *user) {
|
||||
SStbObj *pStb = NULL;
|
||||
SDbObj *pDb = NULL;
|
||||
SUserObj *pUser = NULL;
|
||||
|
||||
SMCreateStbReq createReq = {0};
|
||||
tstrncpy(createReq.name, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
|
||||
createReq.numOfColumns = pStream->outputSchema.nCols;
|
||||
createReq.numOfTags = 1; // group id
|
||||
createReq.pColumns = taosArrayInit(createReq.numOfColumns, sizeof(SField));
|
||||
// build fields
|
||||
// build tags
|
||||
|
||||
if (mndCheckCreateStbReq(&createReq) != 0) {
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
pStb = mndAcquireStb(pMnode, createReq.name);
|
||||
if (pStb != NULL) {
|
||||
terrno = TSDB_CODE_MND_STB_ALREADY_EXIST;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
pDb = mndAcquireDbByStb(pMnode, createReq.name);
|
||||
if (pDb == NULL) {
|
||||
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
pUser = mndAcquireUser(pMnode, user);
|
||||
if (pUser == NULL) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckWriteAuth(pUser, pDb) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
int32_t numOfStbs = -1;
|
||||
mndGetNumOfStbs(pMnode, pDb->name, &numOfStbs);
|
||||
if (pDb->cfg.numOfStables == 1 && numOfStbs != 0) {
|
||||
terrno = TSDB_CODE_MND_SINGLE_STB_MODE_DB;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
SStbObj stbObj = {0};
|
||||
|
||||
if (mndBuildStbFromReq(pMnode, &stbObj, &createReq, pDb) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndBuildStbFromReq(pMnode, pStb, &createReq, pDb) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj) < 0) goto _OVER;
|
||||
|
||||
return pStb;
|
||||
_OVER:
|
||||
mndReleaseStb(pMnode, pStb);
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
mndReleaseUser(pMnode, pUser);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamReq *pCreate, SDbObj *pDb) {
|
||||
mDebug("stream:%s to create", pCreate->name);
|
||||
SStreamObj streamObj = {0};
|
||||
|
@ -311,13 +377,21 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe
|
|||
streamObj.trigger = pCreate->triggerType;
|
||||
streamObj.waterMark = pCreate->watermark;
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_STREAM, &pReq->rpcMsg);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_STREAM, &pReq->rpcMsg);
|
||||
if (pTrans == NULL) {
|
||||
mError("stream:%s, failed to create since %s", pCreate->name, terrstr());
|
||||
return -1;
|
||||
}
|
||||
mDebug("trans:%d, used to create stream:%s", pTrans->id, pCreate->name);
|
||||
|
||||
#if 0
|
||||
if (mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->user) < 0) {
|
||||
mError("trans:%d, failed to add stream since %s", pTrans->id, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
}
|
||||
#endif
|
||||
|
||||
if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pCreate->triggerType, pCreate->watermark, pTrans) != 0) {
|
||||
mError("trans:%d, failed to add stream since %s", pTrans->id, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
|
|
|
@ -57,7 +57,7 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
|
|||
syncMeta.seqNum = pEntry->seqNum;
|
||||
syncMeta.term = pEntry->term;
|
||||
code = walWriteWithSyncInfo(pWal, pEntry->index, pEntry->originalRpcType, syncMeta, pEntry->data, pEntry->dataLen);
|
||||
perror("wal write error: ");
|
||||
if (code < 0) perror("wal write error: ");
|
||||
assert(code == 0);
|
||||
|
||||
walFsync(pWal, true);
|
||||
|
|
Loading…
Reference in New Issue