add stream sink

This commit is contained in:
Liu Jicong 2022-03-24 11:34:43 +08:00
parent 492cb92348
commit b0ebd174c1
8 changed files with 38 additions and 15 deletions

View File

@ -2379,7 +2379,7 @@ typedef struct {
int64_t streamId; int64_t streamId;
int64_t version; int64_t version;
SArray* res; // SArray<SSDataBlock> SArray* res; // SArray<SSDataBlock>
} SStreamSmaSinkReq; } SStreamSinkReq;
#pragma pack(pop) #pragma pack(pop)

View File

@ -17,7 +17,9 @@
#define _TD_MND_H_ #define _TD_MND_H_
#include "monitor.h" #include "monitor.h"
#include "tmsg.h"
#include "tmsgcb.h" #include "tmsgcb.h"
#include "trpc.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {

View File

@ -693,11 +693,13 @@ typedef struct {
SRWLatch lock; SRWLatch lock;
int8_t status; int8_t status;
// int32_t sqlLen; // int32_t sqlLen;
int32_t sinkVgId; // 0 for automatic
char* sql; char* sql;
char* logicalPlan; char* logicalPlan;
char* physicalPlan; char* physicalPlan;
SArray* tasks; // SArray<SArray<SStreamTask>> SArray* tasks; // SArray<SArray<SStreamTask>>
SArray* outputName; SArray* ColAlias;
char* outputSTbName;
} SStreamObj; } SStreamObj;
int32_t tEncodeSStreamObj(SCoder* pEncoder, const SStreamObj* pObj); int32_t tEncodeSStreamObj(SCoder* pEncoder, const SStreamObj* pObj);

View File

@ -45,12 +45,12 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) {
tEncodeI32(pEncoder, 0); tEncodeI32(pEncoder, 0);
} }
if (pObj->outputName != NULL) { if (pObj->ColAlias != NULL) {
outputNameSz = taosArrayGetSize(pObj->outputName); outputNameSz = taosArrayGetSize(pObj->ColAlias);
} }
if (tEncodeI32(pEncoder, outputNameSz) < 0) return -1; if (tEncodeI32(pEncoder, outputNameSz) < 0) return -1;
for (int32_t i = 0; i < outputNameSz; i++) { for (int32_t i = 0; i < outputNameSz; i++) {
char *name = taosArrayGetP(pObj->outputName, i); char *name = taosArrayGetP(pObj->ColAlias, i);
if (tEncodeCStr(pEncoder, name) < 0) return -1; if (tEncodeCStr(pEncoder, name) < 0) return -1;
} }
return pEncoder->pos; return pEncoder->pos;
@ -88,14 +88,14 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) {
} }
int32_t outputNameSz; int32_t outputNameSz;
if (tDecodeI32(pDecoder, &outputNameSz) < 0) return -1; if (tDecodeI32(pDecoder, &outputNameSz) < 0) return -1;
pObj->outputName = taosArrayInit(outputNameSz, sizeof(void *)); pObj->ColAlias = taosArrayInit(outputNameSz, sizeof(void *));
if (pObj->outputName == NULL) { if (pObj->ColAlias == NULL) {
return -1; return -1;
} }
for (int32_t i = 0; i < outputNameSz; i++) { for (int32_t i = 0; i < outputNameSz; i++) {
char *name; char *name;
if (tDecodeCStrAlloc(pDecoder, &name) < 0) return -1; if (tDecodeCStrAlloc(pDecoder, &name) < 0) return -1;
taosArrayPush(pObj->outputName, &name); taosArrayPush(pObj->ColAlias, &name);
} }
return 0; return 0;
} }

View File

@ -292,7 +292,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe
} }
printf("\n=======================================================\n"); printf("\n=======================================================\n");
streamObj.outputName = names; streamObj.ColAlias = names;
if (TSDB_CODE_SUCCESS != mndStreamGetPlanString(pCreate, &streamObj.physicalPlan)) { if (TSDB_CODE_SUCCESS != mndStreamGetPlanString(pCreate, &streamObj.physicalPlan)) {
mError("topic:%s, failed to get plan since %s", pCreate->name, terrstr()); mError("topic:%s, failed to get plan since %s", pCreate->name, terrstr());

View File

@ -161,15 +161,16 @@ typedef struct {
struct STQ { struct STQ {
// the collection of groups // the collection of groups
// the handle of meta kvstore // the handle of meta kvstore
bool writeTrigger;
char* path; char* path;
STqCfg* tqConfig; STqCfg* tqConfig;
STqMemRef tqMemRef; STqMemRef tqMemRef;
STqMetaStore* tqMeta; STqMetaStore* tqMeta;
STqPushMgr* tqPushMgr; // STqPushMgr* tqPushMgr;
SHashObj* pStreamTasks; SHashObj* pStreamTasks;
SVnode* pVnode; SVnode* pVnode;
SWal* pWal; SWal* pWal;
SMeta* pVnodeMeta; SMeta* pVnodeMeta;
}; };
typedef struct { typedef struct {

View File

@ -55,6 +55,21 @@ typedef struct SVnodeMgr {
TD_DLIST(SVnodeTask) queue; TD_DLIST(SVnodeTask) queue;
} SVnodeMgr; } SVnodeMgr;
typedef struct {
int8_t streamType; // sma or other
int8_t dstType;
int16_t padding;
int32_t smaId;
int64_t tbUid;
int64_t lastReceivedVer;
int64_t lastCommittedVer;
} SStreamSinkInfo;
typedef struct {
SVnode* pVnode;
SHashObj* pHash; // streamId -> SStreamSinkInfo
} SSink;
extern SVnodeMgr vnodeMgr; extern SVnodeMgr vnodeMgr;
// SVState // SVState
@ -72,8 +87,9 @@ struct SVnode {
SVBufPool* pBufPool; SVBufPool* pBufPool;
SMeta* pMeta; SMeta* pMeta;
STsdb* pTsdb; STsdb* pTsdb;
STQ* pTq;
SWal* pWal; SWal* pWal;
STQ* pTq;
SSink* pSink;
tsem_t canCommit; tsem_t canCommit;
SQHandle* pQuery; SQHandle* pQuery;
SMsgCb msgCb; SMsgCb msgCb;

View File

@ -52,12 +52,14 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pVnodeMeta, STq
return NULL; return NULL;
} }
#if 0
pTq->tqPushMgr = tqPushMgrOpen(); pTq->tqPushMgr = tqPushMgrOpen();
if (pTq->tqPushMgr == NULL) { if (pTq->tqPushMgr == NULL) {
// free store // free store
free(pTq); free(pTq);
return NULL; return NULL;
} }
#endif
pTq->pStreamTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); pTq->pStreamTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);