Merge pull request #13008 from taosdata/feature/stream
fix(stream): tb name error
This commit is contained in:
commit
d54d80f63b
|
@ -232,7 +232,8 @@ void blockDebugShowData(const SArray* dataBlocks);
|
|||
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId,
|
||||
tb_uid_t uid, tb_uid_t suid);
|
||||
|
||||
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid, int32_t vgId);
|
||||
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid,
|
||||
const char* stbFullName, int32_t vgId);
|
||||
|
||||
static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) {
|
||||
return blockDataGetSerialMetaSize(pBlock) + blockDataGetSize(pBlock);
|
||||
|
|
|
@ -1646,8 +1646,8 @@ _err:
|
|||
return NULL;
|
||||
}
|
||||
|
||||
// this message is sent from mnode to mnode(read thread to write thread), so there is no need for serialization or
|
||||
// deserialization
|
||||
// this message is sent from mnode to mnode(read thread to write thread),
|
||||
// so there is no need for serialization or deserialization
|
||||
typedef struct {
|
||||
SHashObj* rebSubHash; // SHashObj<key, SMqRebSubscribe>
|
||||
} SMqDoRebalanceMsg;
|
||||
|
|
|
@ -142,6 +142,7 @@ typedef void FTbSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
|
|||
|
||||
typedef struct {
|
||||
int64_t stbUid;
|
||||
char stbFullName[TSDB_TABLE_FNAME_LEN];
|
||||
SSchemaWrapper* pSchemaWrapper;
|
||||
// not applicable to encoder and decoder
|
||||
void* vnode;
|
||||
|
|
|
@ -1630,7 +1630,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
|
|||
}
|
||||
|
||||
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, bool createTb, int64_t suid,
|
||||
int32_t vgId) {
|
||||
const char* stbFullName, int32_t vgId) {
|
||||
SSubmitReq* ret = NULL;
|
||||
|
||||
// cal size
|
||||
|
@ -1646,10 +1646,12 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
|
|||
|
||||
if (createTb) {
|
||||
SVCreateTbReq createTbReq = {0};
|
||||
createTbReq.name = "a";
|
||||
char* cname = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN);
|
||||
snprintf(cname, TSDB_TABLE_FNAME_LEN, "%s:%ld", stbFullName, pDataBlock->info.groupId);
|
||||
createTbReq.name = cname;
|
||||
createTbReq.flags = 0;
|
||||
createTbReq.type = TSDB_CHILD_TABLE;
|
||||
createTbReq.ctb.suid = htobe64(suid);
|
||||
createTbReq.ctb.suid = suid;
|
||||
|
||||
SKVRowBuilder kvRowBuilder = {0};
|
||||
if (tdInitKVRowBuilder(&kvRowBuilder) < 0) {
|
||||
|
@ -1662,6 +1664,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
|
|||
int32_t code;
|
||||
tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code);
|
||||
if (code < 0) return NULL;
|
||||
taosMemoryFree(cname);
|
||||
}
|
||||
|
||||
cap += sizeof(SSubmitBlk) + schemaLen + rows * maxLen;
|
||||
|
@ -1697,7 +1700,9 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
|
|||
int32_t schemaLen = 0;
|
||||
if (createTb) {
|
||||
SVCreateTbReq createTbReq = {0};
|
||||
createTbReq.name = "a";
|
||||
char* cname = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN);
|
||||
snprintf(cname, TSDB_TABLE_FNAME_LEN, "%s:%ld", stbFullName, pDataBlock->info.groupId);
|
||||
createTbReq.name = cname;
|
||||
createTbReq.flags = 0;
|
||||
createTbReq.type = TSDB_CHILD_TABLE;
|
||||
createTbReq.ctb.suid = suid;
|
||||
|
|
|
@ -3702,6 +3702,7 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS
|
|||
|
||||
if (tStartEncode(&encoder) < 0) return -1;
|
||||
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
|
||||
if (tEncodeCStr(&encoder, pReq->sourceDB) < 0) return -1;
|
||||
if (tEncodeCStr(&encoder, pReq->targetStbFullName) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, sqlLen) < 0) return -1;
|
||||
|
@ -3727,6 +3728,7 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea
|
|||
|
||||
if (tStartDecode(&decoder) < 0) return -1;
|
||||
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
|
||||
if (tDecodeCStrTo(&decoder, pReq->sourceDB) < 0) return -1;
|
||||
if (tDecodeCStrTo(&decoder, pReq->targetStbFullName) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &sqlLen) < 0) return -1;
|
||||
|
|
|
@ -127,7 +127,7 @@ int32_t tNameExtractFullName(const SName* name, char* dst) {
|
|||
|
||||
size_t tnameLen = strlen(name->tname);
|
||||
if (tnameLen > 0) {
|
||||
assert(name->type == TSDB_TABLE_NAME_T);
|
||||
/*assert(name->type == TSDB_TABLE_NAME_T);*/
|
||||
dst[len] = TS_PATH_DELIMITER[0];
|
||||
|
||||
memcpy(dst + len + 1, name->tname, tnameLen);
|
||||
|
@ -314,9 +314,9 @@ void buildChildTableName(RandTableName* rName) {
|
|||
for (int j = 0; j < taosArrayGetSize(rName->tags); ++j) {
|
||||
SSmlKv* tagKv = taosArrayGetP(rName->tags, j);
|
||||
taosStringBuilderAppendStringLen(&sb, tagKv->key, tagKv->keyLen);
|
||||
if(IS_VAR_DATA_TYPE(tagKv->type)){
|
||||
if (IS_VAR_DATA_TYPE(tagKv->type)) {
|
||||
taosStringBuilderAppendStringLen(&sb, tagKv->value, tagKv->length);
|
||||
}else{
|
||||
} else {
|
||||
taosStringBuilderAppendStringLen(&sb, (char*)(&(tagKv->value)), tagKv->length);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -206,6 +206,7 @@ int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* p
|
|||
} else {
|
||||
pTask->sinkType = TASK_SINK__TABLE;
|
||||
pTask->tbSink.stbUid = pStream->targetStbUid;
|
||||
memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
|
||||
pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
|
||||
ASSERT(pTask->tbSink.pSchemaWrapper);
|
||||
}
|
||||
|
@ -248,6 +249,7 @@ int32_t mndAddFixedSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStr
|
|||
} else {
|
||||
pTask->sinkType = TASK_SINK__TABLE;
|
||||
pTask->tbSink.stbUid = pStream->targetStbUid;
|
||||
memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
|
||||
pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
|
||||
}
|
||||
|
||||
|
@ -325,6 +327,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
|
|||
} else {
|
||||
pTask->sinkType = TASK_SINK__TABLE;
|
||||
pTask->tbSink.stbUid = pStream->targetStbUid;
|
||||
memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
|
||||
pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -456,7 +456,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
|||
goto CREATE_STREAM_OVER;
|
||||
}
|
||||
|
||||
pDb = mndAcquireDbByStream(pMnode, createStreamReq.name);
|
||||
pDb = mndAcquireDb(pMnode, createStreamReq.sourceDB);
|
||||
if (pDb == NULL) {
|
||||
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
|
||||
goto CREATE_STREAM_OVER;
|
||||
|
|
|
@ -748,7 +748,8 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
|
|||
SVnode* pVnode = (SVnode*)vnode;
|
||||
|
||||
ASSERT(pTask->tbSink.pTSchema);
|
||||
SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid, pVnode->config.vgId);
|
||||
SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid,
|
||||
pTask->tbSink.stbFullName, pVnode->config.vgId);
|
||||
/*tPrintFixedSchemaSubmitReq(pReq, pTask->tbSink.pTSchema);*/
|
||||
// build write msg
|
||||
SRpcMsg msg = {
|
||||
|
|
|
@ -3386,9 +3386,9 @@ static int32_t buildCreateStreamReq(STranslateContext* pCxt, SCreateStreamStmt*
|
|||
pReq->igExists = pStmt->ignoreExists;
|
||||
|
||||
SName name;
|
||||
// tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->streamName, strlen(pStmt->streamName));
|
||||
// tNameGetFullDbName(&name, pReq->name);
|
||||
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, pStmt->streamName, &name), pReq->name);
|
||||
tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->streamName, strlen(pStmt->streamName));
|
||||
tNameGetFullDbName(&name, pReq->name);
|
||||
// tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, pStmt->streamName, &name), pReq->name);
|
||||
|
||||
if ('\0' != pStmt->targetTabName[0]) {
|
||||
strcpy(name.dbname, pStmt->targetDbName);
|
||||
|
|
|
@ -505,6 +505,7 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
|
|||
|
||||
if (pTask->sinkType == TASK_SINK__TABLE) {
|
||||
if (tEncodeI64(pEncoder, pTask->tbSink.stbUid) < 0) return -1;
|
||||
if (tEncodeCStr(pEncoder, pTask->tbSink.stbFullName) < 0) return -1;
|
||||
if (tEncodeSSchemaWrapper(pEncoder, pTask->tbSink.pSchemaWrapper) < 0) return -1;
|
||||
} else if (pTask->sinkType == TASK_SINK__SMA) {
|
||||
if (tEncodeI64(pEncoder, pTask->smaSink.smaId) < 0) return -1;
|
||||
|
@ -551,6 +552,7 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
|
|||
|
||||
if (pTask->sinkType == TASK_SINK__TABLE) {
|
||||
if (tDecodeI64(pDecoder, &pTask->tbSink.stbUid) < 0) return -1;
|
||||
if (tDecodeCStrTo(pDecoder, pTask->tbSink.stbFullName) < 0) return -1;
|
||||
pTask->tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
|
||||
if (pTask->tbSink.pSchemaWrapper == NULL) return -1;
|
||||
if (tDecodeSSchemaWrapper(pDecoder, pTask->tbSink.pSchemaWrapper) < 0) return -1;
|
||||
|
|
Loading…
Reference in New Issue