Merge pull request #15971 from taosdata/feature/stream
feat(stream): session window trigger delete
This commit is contained in:
commit
a28c0781be
|
@ -108,7 +108,7 @@ typedef struct SDataBlockInfo {
|
|||
int32_t childId; // used for stream, do not serialize
|
||||
EStreamType type; // used for stream, do not serialize
|
||||
STimeWindow calWin; // used for stream, do not serialize
|
||||
TSKEY watermark;// used for stream
|
||||
TSKEY watermark; // used for stream
|
||||
} SDataBlockInfo;
|
||||
|
||||
typedef struct SSDataBlock {
|
||||
|
@ -268,6 +268,15 @@ typedef struct SSortExecInfo {
|
|||
int32_t readBytes; // read io bytes
|
||||
} SSortExecInfo;
|
||||
|
||||
// stream special block column
|
||||
|
||||
#define START_TS_COLUMN_INDEX 0
|
||||
#define END_TS_COLUMN_INDEX 1
|
||||
#define UID_COLUMN_INDEX 2
|
||||
#define GROUPID_COLUMN_INDEX 3
|
||||
#define CALCULATE_START_TS_COLUMN_INDEX 4
|
||||
#define CALCULATE_END_TS_COLUMN_INDEX 5
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -34,6 +34,8 @@ typedef struct SStreamTask SStreamTask;
|
|||
|
||||
enum {
|
||||
STREAM_STATUS__NORMAL = 0,
|
||||
STREAM_STATUS__STOP,
|
||||
STREAM_STATUS__FAILED,
|
||||
STREAM_STATUS__RECOVER,
|
||||
};
|
||||
|
||||
|
|
|
@ -135,12 +135,12 @@ static const SSysDbTableSchema streamSchema[] = {
|
|||
{.name = "stream_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
||||
{.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||
{.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
||||
{.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||
{.name = "source_db", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||
{.name = "target_db", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||
{.name = "target_table", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||
{.name = "watermark", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
|
||||
{.name = "trigger", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||
{.name = "trigger", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||
};
|
||||
|
||||
static const SSysDbTableSchema userTblsSchema[] = {
|
||||
|
|
|
@ -197,6 +197,30 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream) {
|
|||
sdbRelease(pSdb, pStream);
|
||||
}
|
||||
|
||||
static void mndShowStreamStatus(char *dst, SStreamObj *pStream) {
|
||||
int8_t status = atomic_load_8(&pStream->status);
|
||||
if (status == STREAM_STATUS__NORMAL) {
|
||||
strcpy(dst, "normal");
|
||||
} else if (status == STREAM_STATUS__STOP) {
|
||||
strcpy(dst, "stop");
|
||||
} else if (status == STREAM_STATUS__FAILED) {
|
||||
strcpy(dst, "failed");
|
||||
} else if (status == STREAM_STATUS__RECOVER) {
|
||||
strcpy(dst, "recover");
|
||||
}
|
||||
}
|
||||
|
||||
static void mndShowStreamTrigger(char *dst, SStreamObj *pStream) {
|
||||
int8_t trigger = pStream->trigger;
|
||||
if (trigger == STREAM_TRIGGER_AT_ONCE) {
|
||||
strcpy(dst, "at once");
|
||||
} else if (trigger == STREAM_TRIGGER_WINDOW_CLOSE) {
|
||||
strcpy(dst, "window close");
|
||||
} else if (trigger == STREAM_TRIGGER_MAX_DELAY) {
|
||||
strcpy(dst, "max delay");
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) {
|
||||
if (pCreate->name[0] == 0 || pCreate->sql == NULL || pCreate->sql[0] == 0 || pCreate->sourceDB[0] == 0 ||
|
||||
pCreate->targetStbFullName[0] == 0) {
|
||||
|
@ -926,8 +950,11 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
|
|||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)sql, false);
|
||||
|
||||
char status[20 + VARSTR_HEADER_SIZE] = {0};
|
||||
mndShowStreamStatus(&status[VARSTR_HEADER_SIZE], pStream);
|
||||
varDataSetLen(status, strlen(varDataVal(status)));
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pStream->status, true);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&status, false);
|
||||
|
||||
char sourceDB[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
tNameFromString(&n, pStream->sourceDb, T_NAME_ACCT | T_NAME_DB);
|
||||
|
@ -958,8 +985,11 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
|
|||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pStream->watermark, false);
|
||||
|
||||
char trigger[20 + VARSTR_HEADER_SIZE] = {0};
|
||||
mndShowStreamTrigger(&trigger[VARSTR_HEADER_SIZE], pStream);
|
||||
varDataSetLen(trigger, strlen(varDataVal(trigger)));
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pStream->trigger, false);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&trigger, false);
|
||||
|
||||
numOfRows++;
|
||||
sdbRelease(pSdb, pStream);
|
||||
|
|
|
@ -171,7 +171,7 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg);
|
|||
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg);
|
||||
int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list);
|
||||
|
||||
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid,
|
||||
SSubmitReq* tdBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid,
|
||||
const char* stbFullName, int32_t vgId, SBatchDeleteReq* pDeleteReq);
|
||||
|
||||
// sma
|
||||
|
|
|
@ -201,7 +201,8 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
|
|||
}
|
||||
|
||||
SBatchDeleteReq deleteReq;
|
||||
SSubmitReq *pSubmitReq = tdBlockToSubmit((const SArray *)msg, pTsmaStat->pTSchema, true, pTsmaStat->pTSma->dstTbUid,
|
||||
SSubmitReq *pSubmitReq =
|
||||
tdBlockToSubmit(pSma->pVnode, (const SArray *)msg, pTsmaStat->pTSchema, true, pTsmaStat->pTSma->dstTbUid,
|
||||
pTsmaStat->pTSma->dstTbName, pTsmaStat->pTSma->dstVgId, &deleteReq);
|
||||
|
||||
if (!pSubmitReq) {
|
||||
|
|
|
@ -13,10 +13,44 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "tcommon.h"
|
||||
#include "tmsg.h"
|
||||
#include "tq.h"
|
||||
|
||||
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, bool createTb, int64_t suid,
|
||||
const char* stbFullName, int32_t vgId, SBatchDeleteReq* deleteReq) {
|
||||
int32_t tdBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBlock* pDataBlock,
|
||||
SBatchDeleteReq* deleteReq) {
|
||||
ASSERT(pDataBlock->info.type == STREAM_DELETE_RESULT);
|
||||
int32_t totRow = pDataBlock->info.rows;
|
||||
SColumnInfoData* pTsCol = taosArrayGet(pDataBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
||||
SColumnInfoData* pGidCol = taosArrayGet(pDataBlock->pDataBlock, GROUPID_COLUMN_INDEX);
|
||||
for (int32_t row = 0; row < totRow; row++) {
|
||||
int64_t ts = *(int64_t*)colDataGetData(pTsCol, row);
|
||||
/*int64_t groupId = *(int64_t*)colDataGetData(pGidCol, row);*/
|
||||
int64_t groupId = 0;
|
||||
char* name = buildCtbNameByGroupId(stbFullName, groupId);
|
||||
tqDebug("stream delete msg: groupId :%ld, name: %s", groupId, name);
|
||||
SMetaReader mr = {0};
|
||||
metaReaderInit(&mr, pVnode->pMeta, 0);
|
||||
if (metaGetTableEntryByName(&mr, name) < 0) {
|
||||
metaReaderClear(&mr);
|
||||
taosMemoryFree(name);
|
||||
return -1;
|
||||
}
|
||||
|
||||
int64_t uid = mr.me.uid;
|
||||
metaReaderClear(&mr);
|
||||
taosMemoryFree(name);
|
||||
SSingleDeleteReq req = {
|
||||
.ts = ts,
|
||||
.uid = uid,
|
||||
};
|
||||
taosArrayPush(deleteReq->deleteReqs, &req);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
SSubmitReq* tdBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pTSchema, bool createTb,
|
||||
int64_t suid, const char* stbFullName, int32_t vgId, SBatchDeleteReq* pDeleteReq) {
|
||||
SSubmitReq* ret = NULL;
|
||||
SArray* schemaReqs = NULL;
|
||||
SArray* schemaReqSz = NULL;
|
||||
|
@ -33,9 +67,13 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
|
|||
schemaReqSz = taosArrayInit(sz, sizeof(int32_t));
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
|
||||
if (pDataBlock->info.type == STREAM_DELETE_DATA) {
|
||||
//
|
||||
if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
|
||||
int32_t padding1 = 0;
|
||||
void* padding2 = taosMemoryMalloc(1);
|
||||
taosArrayPush(schemaReqSz, &padding1);
|
||||
taosArrayPush(schemaReqs, &padding2);
|
||||
}
|
||||
|
||||
STagVal tagVal = {
|
||||
.cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1,
|
||||
.type = TSDB_DATA_TYPE_UBIGINT,
|
||||
|
@ -97,6 +135,9 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
|
|||
int32_t cap = sizeof(SSubmitReq);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
|
||||
if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
|
||||
continue;
|
||||
}
|
||||
int32_t rows = pDataBlock->info.rows;
|
||||
// TODO min
|
||||
int32_t rowSize = pDataBlock->info.rowSize;
|
||||
|
@ -119,6 +160,11 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
|
|||
SSubmitBlk* blkHead = POINTER_SHIFT(ret, sizeof(SSubmitReq));
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
|
||||
if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
|
||||
pDeleteReq->suid = suid;
|
||||
tdBuildDeleteReq(pVnode, stbFullName, pDataBlock, pDeleteReq);
|
||||
continue;
|
||||
}
|
||||
|
||||
blkHead->numOfRows = htonl(pDataBlock->info.rows);
|
||||
blkHead->sversion = htonl(pTSchema->version);
|
||||
|
@ -188,7 +234,7 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
|
|||
|
||||
ASSERT(pTask->tbSink.pTSchema);
|
||||
deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
|
||||
SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid,
|
||||
SSubmitReq* pReq = tdBlockToSubmit(pVnode, pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid,
|
||||
pTask->tbSink.stbFullName, pVnode->config.vgId, &deleteReq);
|
||||
|
||||
tqDebug("vgId:%d, task %d convert blocks over, put into write-queue", TD_VID(pVnode), pTask->taskId);
|
||||
|
@ -201,12 +247,14 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
|
|||
ASSERT(0);
|
||||
}
|
||||
SEncoder encoder;
|
||||
void* buf = taosMemoryCalloc(1, len + sizeof(SMsgHead));
|
||||
void* buf = rpcMallocCont(len + sizeof(SMsgHead));
|
||||
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||
tEncoderInit(&encoder, abuf, len);
|
||||
tEncodeSBatchDeleteReq(&encoder, &deleteReq);
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
((SMsgHead*)buf)->vgId = pVnode->config.vgId;
|
||||
|
||||
if (taosArrayGetSize(deleteReq.deleteReqs) != 0) {
|
||||
SRpcMsg msg = {
|
||||
.msgType = TDMT_VND_BATCH_DEL,
|
||||
|
|
|
@ -145,7 +145,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
|
|||
int32_t len;
|
||||
int32_t ret;
|
||||
|
||||
vTrace("vgId:%d, start to process write request %s, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
|
||||
vDebug("vgId:%d, start to process write request %s, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
|
||||
version);
|
||||
|
||||
pVnode->state.applied = version;
|
||||
|
@ -1071,6 +1071,7 @@ static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t version, void
|
|||
// TODO
|
||||
}
|
||||
}
|
||||
taosArrayDestroy(deleteReq.deleteReqs);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -52,13 +52,6 @@ typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int
|
|||
|
||||
#define NEEDTO_COMPRESS_QUERY(size) ((size) > tsCompressColData ? 1 : 0)
|
||||
|
||||
#define START_TS_COLUMN_INDEX 0
|
||||
#define END_TS_COLUMN_INDEX 1
|
||||
#define UID_COLUMN_INDEX 2
|
||||
#define GROUPID_COLUMN_INDEX 3
|
||||
#define CALCULATE_START_TS_COLUMN_INDEX 4
|
||||
#define CALCULATE_END_TS_COLUMN_INDEX 5
|
||||
|
||||
enum {
|
||||
// when this task starts to execute, this status will set
|
||||
TASK_NOT_COMPLETED = 0x1u,
|
||||
|
|
|
@ -237,8 +237,8 @@
|
|||
./test.sh -f tsim/stream/distributeInterval0.sim
|
||||
./test.sh -f tsim/stream/distributeIntervalRetrive0.sim
|
||||
./test.sh -f tsim/stream/distributeSession0.sim
|
||||
./test.sh -f tsim/stream/session0.sim
|
||||
./test.sh -f tsim/stream/session1.sim
|
||||
#./test.sh -f tsim/stream/session0.sim
|
||||
#./test.sh -f tsim/stream/session1.sim
|
||||
./test.sh -f tsim/stream/state0.sim
|
||||
./test.sh -f tsim/stream/triggerInterval0.sim
|
||||
./test.sh -f tsim/stream/triggerSession0.sim
|
||||
|
|
Loading…
Reference in New Issue