commit
8989579525
|
@ -1281,8 +1281,9 @@ typedef struct {
|
||||||
#define STREAM_TRIGGER_WINDOW_CLOSE 2
|
#define STREAM_TRIGGER_WINDOW_CLOSE 2
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char name[TSDB_TOPIC_FNAME_LEN];
|
char name[TSDB_TABLE_FNAME_LEN];
|
||||||
char outputSTbName[TSDB_TABLE_FNAME_LEN];
|
char sourceDB[TSDB_DB_FNAME_LEN];
|
||||||
|
char targetStbFullName[TSDB_TABLE_FNAME_LEN];
|
||||||
int8_t igExists;
|
int8_t igExists;
|
||||||
char* sql;
|
char* sql;
|
||||||
char* ast;
|
char* ast;
|
||||||
|
|
|
@ -739,7 +739,7 @@ TAOS_RES* tmq_create_stream(TAOS* taos, const char* streamName, const char* tbNa
|
||||||
.sql = (char*)sql,
|
.sql = (char*)sql,
|
||||||
};
|
};
|
||||||
tNameExtractFullName(&name, req.name);
|
tNameExtractFullName(&name, req.name);
|
||||||
strcpy(req.outputSTbName, tbName);
|
strcpy(req.targetStbFullName, tbName);
|
||||||
|
|
||||||
int tlen = tSerializeSCMCreateStreamReq(NULL, 0, &req);
|
int tlen = tSerializeSCMCreateStreamReq(NULL, 0, &req);
|
||||||
void* buf = taosMemoryMalloc(tlen);
|
void* buf = taosMemoryMalloc(tlen);
|
||||||
|
|
|
@ -3569,10 +3569,12 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS
|
||||||
|
|
||||||
if (tStartEncode(&encoder) < 0) return -1;
|
if (tStartEncode(&encoder) < 0) return -1;
|
||||||
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
|
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
|
||||||
if (tEncodeCStr(&encoder, pReq->outputSTbName) < 0) return -1;
|
if (tEncodeCStr(&encoder, pReq->targetStbFullName) < 0) return -1;
|
||||||
if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1;
|
if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1;
|
||||||
if (tEncodeI32(&encoder, sqlLen) < 0) return -1;
|
if (tEncodeI32(&encoder, sqlLen) < 0) return -1;
|
||||||
if (tEncodeI32(&encoder, astLen) < 0) return -1;
|
if (tEncodeI32(&encoder, astLen) < 0) return -1;
|
||||||
|
if (tEncodeI8(&encoder, pReq->triggerType) < 0) return -1;
|
||||||
|
if (tEncodeI64(&encoder, pReq->watermark) < 0) return -1;
|
||||||
if (sqlLen > 0 && tEncodeCStr(&encoder, pReq->sql) < 0) return -1;
|
if (sqlLen > 0 && tEncodeCStr(&encoder, pReq->sql) < 0) return -1;
|
||||||
if (astLen > 0 && tEncodeCStr(&encoder, pReq->ast) < 0) return -1;
|
if (astLen > 0 && tEncodeCStr(&encoder, pReq->ast) < 0) return -1;
|
||||||
|
|
||||||
|
@ -3592,7 +3594,7 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea
|
||||||
|
|
||||||
if (tStartDecode(&decoder) < 0) return -1;
|
if (tStartDecode(&decoder) < 0) return -1;
|
||||||
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
|
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
|
||||||
if (tDecodeCStrTo(&decoder, pReq->outputSTbName) < 0) return -1;
|
if (tDecodeCStrTo(&decoder, pReq->targetStbFullName) < 0) return -1;
|
||||||
if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1;
|
if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1;
|
||||||
if (tDecodeI32(&decoder, &sqlLen) < 0) return -1;
|
if (tDecodeI32(&decoder, &sqlLen) < 0) return -1;
|
||||||
if (tDecodeI32(&decoder, &astLen) < 0) return -1;
|
if (tDecodeI32(&decoder, &astLen) < 0) return -1;
|
||||||
|
|
|
@ -295,7 +295,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe
|
||||||
SStreamObj streamObj = {0};
|
SStreamObj streamObj = {0};
|
||||||
tstrncpy(streamObj.name, pCreate->name, TSDB_STREAM_FNAME_LEN);
|
tstrncpy(streamObj.name, pCreate->name, TSDB_STREAM_FNAME_LEN);
|
||||||
tstrncpy(streamObj.sourceDb, pDb->name, TSDB_DB_FNAME_LEN);
|
tstrncpy(streamObj.sourceDb, pDb->name, TSDB_DB_FNAME_LEN);
|
||||||
tstrncpy(streamObj.targetSTbName, pCreate->outputSTbName, TSDB_TABLE_FNAME_LEN);
|
tstrncpy(streamObj.targetSTbName, pCreate->targetStbFullName, TSDB_TABLE_FNAME_LEN);
|
||||||
streamObj.createTime = taosGetTimestampMs();
|
streamObj.createTime = taosGetTimestampMs();
|
||||||
streamObj.updateTime = streamObj.createTime;
|
streamObj.updateTime = streamObj.createTime;
|
||||||
streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
|
streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
|
||||||
|
|
|
@ -2658,7 +2658,7 @@ static int32_t translateCreateStream(STranslateContext* pCxt, SCreateStreamStmt*
|
||||||
if ('\0' != pStmt->targetTabName[0]) {
|
if ('\0' != pStmt->targetTabName[0]) {
|
||||||
strcpy(name.dbname, pStmt->targetDbName);
|
strcpy(name.dbname, pStmt->targetDbName);
|
||||||
strcpy(name.tname, pStmt->targetTabName);
|
strcpy(name.tname, pStmt->targetTabName);
|
||||||
tNameExtractFullName(&name, createReq.outputSTbName);
|
tNameExtractFullName(&name, createReq.targetStbFullName);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = translateQuery(pCxt, pStmt->pQuery);
|
int32_t code = translateQuery(pCxt, pStmt->pQuery);
|
||||||
|
|
Loading…
Reference in New Issue