Merge pull request #19214 from taosdata/feat/TD-21454
feat(stream):stream insert data into an existing table
This commit is contained in:
commit
8f9cf766a4
|
@ -1751,6 +1751,8 @@ typedef struct {
|
||||||
#define STREAM_FILL_HISTORY_ON 1
|
#define STREAM_FILL_HISTORY_ON 1
|
||||||
#define STREAM_FILL_HISTORY_OFF 0
|
#define STREAM_FILL_HISTORY_OFF 0
|
||||||
#define STREAM_DEFAULT_FILL_HISTORY STREAM_FILL_HISTORY_OFF
|
#define STREAM_DEFAULT_FILL_HISTORY STREAM_FILL_HISTORY_OFF
|
||||||
|
#define STREAM_CREATE_STABLE_TRUE 1
|
||||||
|
#define STREAM_CREATE_STABLE_FALSE 0
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char name[TSDB_STREAM_FNAME_LEN];
|
char name[TSDB_STREAM_FNAME_LEN];
|
||||||
|
@ -1768,6 +1770,8 @@ typedef struct {
|
||||||
SArray* pTags; // array of SField
|
SArray* pTags; // array of SField
|
||||||
// 3.0.20
|
// 3.0.20
|
||||||
int64_t checkpointFreq; // ms
|
int64_t checkpointFreq; // ms
|
||||||
|
// 3.0.2.3
|
||||||
|
int8_t createStb;
|
||||||
} SCMCreateStreamReq;
|
} SCMCreateStreamReq;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -5424,6 +5424,7 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS
|
||||||
if (tEncodeI32(&encoder, pField->bytes) < 0) return -1;
|
if (tEncodeI32(&encoder, pField->bytes) < 0) return -1;
|
||||||
if (tEncodeCStr(&encoder, pField->name) < 0) return -1;
|
if (tEncodeCStr(&encoder, pField->name) < 0) return -1;
|
||||||
}
|
}
|
||||||
|
if (tEncodeI8(&encoder, pReq->createStb) < 0) return -1;
|
||||||
|
|
||||||
tEndEncode(&encoder);
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
|
@ -5484,6 +5485,7 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (tDecodeI8(&decoder, &pReq->createStb) < 0) return -1;
|
||||||
|
|
||||||
tEndDecode(&decoder);
|
tEndDecode(&decoder);
|
||||||
|
|
||||||
|
|
|
@ -637,7 +637,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
||||||
if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER;
|
if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER;
|
||||||
|
|
||||||
// create stb for stream
|
// create stb for stream
|
||||||
if (mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user) < 0) {
|
if (createStreamReq.createStb == STREAM_CREATE_STABLE_TRUE && mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user) < 0) {
|
||||||
mError("trans:%d, failed to create stb for stream %s since %s", pTrans->id, createStreamReq.name, terrstr());
|
mError("trans:%d, failed to create stb for stream %s since %s", pTrans->id, createStreamReq.name, terrstr());
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
|
|
|
@ -5771,6 +5771,8 @@ static int32_t buildCreateStreamReq(STranslateContext* pCxt, SCreateStreamStmt*
|
||||||
pReq->numOfTags = LIST_LENGTH(pStmt->pTags);
|
pReq->numOfTags = LIST_LENGTH(pStmt->pTags);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pReq->createStb = STREAM_CREATE_STABLE_TRUE;
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue