Merge branch '3.0' into feature/TD-13066-3.0
This commit is contained in:
commit
96091ee10f
|
@ -81,9 +81,10 @@ int32_t create_stream() {
|
||||||
/*const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1";*/
|
/*const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1";*/
|
||||||
/*const char* sql = "select sum(k) from tu1 interval(10m)";*/
|
/*const char* sql = "select sum(k) from tu1 interval(10m)";*/
|
||||||
/*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/
|
/*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/
|
||||||
pRes = taos_query(
|
pRes =
|
||||||
pConn,
|
taos_query(pConn,
|
||||||
"create stream stream1 trigger window_close as select min(k), max(k), sum(k) as sum_of_k from tu1 interval(10m)");
|
"create stream stream1 trigger window_close as select _wstartts, min(k), max(k), sum(k) as sum_of_k "
|
||||||
|
"from tu1 interval(10m)");
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes));
|
printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes));
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -64,8 +64,7 @@ SEpSet getEpSet_s(SCorEpSet* pEpSet);
|
||||||
#define colDataGetNumData(p1_, r_) ((p1_)->pData + ((r_) * (p1_)->info.bytes))
|
#define colDataGetNumData(p1_, r_) ((p1_)->pData + ((r_) * (p1_)->info.bytes))
|
||||||
// SColumnInfoData, rowNumber
|
// SColumnInfoData, rowNumber
|
||||||
#define colDataGetData(p1_, r_) \
|
#define colDataGetData(p1_, r_) \
|
||||||
((IS_VAR_DATA_TYPE((p1_)->info.type)) ? colDataGetVarData(p1_, r_) \
|
((IS_VAR_DATA_TYPE((p1_)->info.type)) ? colDataGetVarData(p1_, r_) : colDataGetNumData(p1_, r_))
|
||||||
: colDataGetNumData(p1_, r_))
|
|
||||||
|
|
||||||
static FORCE_INLINE bool colDataIsNull_s(const SColumnInfoData* pColumnInfoData, uint32_t row) {
|
static FORCE_INLINE bool colDataIsNull_s(const SColumnInfoData* pColumnInfoData, uint32_t row) {
|
||||||
if (pColumnInfoData->info.type == TSDB_DATA_TYPE_JSON) {
|
if (pColumnInfoData->info.type == TSDB_DATA_TYPE_JSON) {
|
||||||
|
@ -228,6 +227,8 @@ void blockDebugShowData(const SArray* dataBlocks);
|
||||||
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId,
|
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId,
|
||||||
tb_uid_t uid, tb_uid_t suid);
|
tb_uid_t uid, tb_uid_t suid);
|
||||||
|
|
||||||
|
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema);
|
||||||
|
|
||||||
static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) {
|
static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) {
|
||||||
return blockDataGetSerialMetaSize(pBlock) + blockDataGetSize(pBlock);
|
return blockDataGetSerialMetaSize(pBlock) + blockDataGetSize(pBlock);
|
||||||
}
|
}
|
||||||
|
|
|
@ -62,10 +62,10 @@ extern "C" {
|
||||||
#pragma pack(push, 1)
|
#pragma pack(push, 1)
|
||||||
typedef struct {
|
typedef struct {
|
||||||
col_id_t colId; // column ID(start from PRIMARYKEY_TIMESTAMP_COL_ID(1))
|
col_id_t colId; // column ID(start from PRIMARYKEY_TIMESTAMP_COL_ID(1))
|
||||||
int32_t type : 8; // column type
|
int8_t type; // column type
|
||||||
int32_t bytes : 24; // column bytes (0~16M)
|
int8_t flags; // flags: 0 no index, 1 SCHEMA_SMA_ON, 2 SCHEMA_IDX_ON
|
||||||
int32_t flags : 8; // flags: 0 no index, 1 SCHEMA_SMA_ON, 2 SCHEMA_IDX_ON
|
int32_t bytes; // column bytes (0~16M)
|
||||||
int32_t offset : 24; // point offset in STpRow after the header part.
|
int32_t offset; // point offset in STpRow after the header part.
|
||||||
} STColumn;
|
} STColumn;
|
||||||
#pragma pack(pop)
|
#pragma pack(pop)
|
||||||
|
|
||||||
|
|
|
@ -245,6 +245,8 @@ int32_t tInitSubmitMsgIter(const SSubmitReq* pMsg, SSubmitMsgIter* pIter);
|
||||||
int32_t tGetSubmitMsgNext(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock);
|
int32_t tGetSubmitMsgNext(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock);
|
||||||
int32_t tInitSubmitBlkIter(SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkIter* pIter);
|
int32_t tInitSubmitBlkIter(SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkIter* pIter);
|
||||||
STSRow* tGetSubmitBlkNext(SSubmitBlkIter* pIter);
|
STSRow* tGetSubmitBlkNext(SSubmitBlkIter* pIter);
|
||||||
|
// for debug
|
||||||
|
int32_t tPrintFixedSchemaSubmitReq(const SSubmitReq* pReq, STSchema* pSchema);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t index; // index of failed block in submit blocks
|
int32_t index; // index of failed block in submit blocks
|
||||||
|
@ -2130,7 +2132,7 @@ static FORCE_INLINE int32_t tDecodeSSchemaWrapper(SCoder* pDecoder, SSchemaWrapp
|
||||||
if (tDecodeI32v(pDecoder, &pSW->nCols) < 0) return -1;
|
if (tDecodeI32v(pDecoder, &pSW->nCols) < 0) return -1;
|
||||||
if (tDecodeI32v(pDecoder, &pSW->sver) < 0) return -1;
|
if (tDecodeI32v(pDecoder, &pSW->sver) < 0) return -1;
|
||||||
|
|
||||||
pSW->pSchema = (SSchema*)tCoderMalloc(pDecoder, sizeof(SSchema) * pSW->nCols);
|
pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema));
|
||||||
if (pSW->pSchema == NULL) return -1;
|
if (pSW->pSchema == NULL) return -1;
|
||||||
for (int32_t i = 0; i < pSW->nCols; i++) {
|
for (int32_t i = 0; i < pSW->nCols; i++) {
|
||||||
if (tDecodeSSchema(pDecoder, &pSW->pSchema[i]) < 0) return -1;
|
if (tDecodeSSchema(pDecoder, &pSW->pSchema[i]) < 0) return -1;
|
||||||
|
|
|
@ -71,7 +71,9 @@ typedef struct {
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t reserved;
|
int8_t reserved;
|
||||||
|
SSchemaWrapper* pSchemaWrapper;
|
||||||
// not applicable to encoder and decoder
|
// not applicable to encoder and decoder
|
||||||
|
STSchema* pTSchema;
|
||||||
SHashObj* pHash; // groupId to tbuid
|
SHashObj* pHash; // groupId to tbuid
|
||||||
} STaskSinkTb;
|
} STaskSinkTb;
|
||||||
|
|
||||||
|
|
|
@ -1589,3 +1589,68 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema) {
|
||||||
|
SSubmitReq* ret = NULL;
|
||||||
|
|
||||||
|
// cal size
|
||||||
|
int32_t cap = sizeof(SSubmitReq);
|
||||||
|
int32_t sz = taosArrayGetSize(pBlocks);
|
||||||
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
|
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
|
||||||
|
int32_t rows = pDataBlock->info.rows;
|
||||||
|
// TODO min
|
||||||
|
int32_t rowSize = pDataBlock->info.rowSize;
|
||||||
|
int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema);
|
||||||
|
cap += sizeof(SSubmitBlk) + rows * maxLen;
|
||||||
|
}
|
||||||
|
|
||||||
|
// assign data
|
||||||
|
ret = taosMemoryCalloc(1, cap);
|
||||||
|
ret->version = htonl(1);
|
||||||
|
ret->length = htonl(cap - sizeof(SSubmitReq));
|
||||||
|
ret->numOfBlocks = htonl(sz);
|
||||||
|
|
||||||
|
void* submitBlk = POINTER_SHIFT(ret, sizeof(SSubmitReq));
|
||||||
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
|
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
|
||||||
|
|
||||||
|
SSubmitBlk* blkHead = submitBlk;
|
||||||
|
blkHead->numOfRows = htons(pDataBlock->info.rows);
|
||||||
|
blkHead->schemaLen = 0;
|
||||||
|
blkHead->sversion = htonl(pTSchema->version);
|
||||||
|
// TODO
|
||||||
|
blkHead->suid = 0;
|
||||||
|
blkHead->uid = htobe64(pDataBlock->info.uid);
|
||||||
|
|
||||||
|
int32_t rows = pDataBlock->info.rows;
|
||||||
|
int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema);
|
||||||
|
/*blkHead->dataLen = htonl(rows * maxLen);*/
|
||||||
|
blkHead->dataLen = 0;
|
||||||
|
|
||||||
|
void* blockData = POINTER_SHIFT(submitBlk, sizeof(SSubmitBlk));
|
||||||
|
STSRow* rowData = blockData;
|
||||||
|
|
||||||
|
for (int32_t j = 0; j < pDataBlock->info.rows; j++) {
|
||||||
|
SRowBuilder rb = {0};
|
||||||
|
tdSRowInit(&rb, pTSchema->version);
|
||||||
|
tdSRowSetTpInfo(&rb, pTSchema->numOfCols, pTSchema->flen);
|
||||||
|
tdSRowResetBuf(&rb, rowData);
|
||||||
|
|
||||||
|
for (int32_t k = 0; k < pTSchema->numOfCols; k++) {
|
||||||
|
const STColumn* pColumn = &pTSchema->columns[k];
|
||||||
|
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, k);
|
||||||
|
void* data = colDataGetData(pColData, j);
|
||||||
|
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, pColumn->offset, k);
|
||||||
|
}
|
||||||
|
int32_t rowLen = TD_ROW_LEN(rowData);
|
||||||
|
rowData = POINTER_SHIFT(rowData, rowLen);
|
||||||
|
blkHead->dataLen += rowLen;
|
||||||
|
}
|
||||||
|
int32_t len = blkHead->dataLen;
|
||||||
|
blkHead->dataLen = htonl(len);
|
||||||
|
blkHead = POINTER_SHIFT(blkHead, len);
|
||||||
|
}
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "tdataformat.h"
|
#include "tdataformat.h"
|
||||||
#include "tcoding.h"
|
#include "tcoding.h"
|
||||||
|
#include "tdatablock.h"
|
||||||
#include "tlog.h"
|
#include "tlog.h"
|
||||||
|
|
||||||
static void dataColSetNEleNull(SDataCol *pCol, int nEle);
|
static void dataColSetNEleNull(SDataCol *pCol, int nEle);
|
||||||
|
@ -128,6 +129,50 @@ void *tdDecodeSchema(void *buf, STSchema **pRSchema) {
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
int32_t tEncodeSTColumn(SCoder *pEncoder, const STColumn *pCol) {
|
||||||
|
if (tEncodeI16(pEncoder, pCol->colId) < 0) return -1;
|
||||||
|
if (tEncodeI8(pEncoder, pCol->type) < 0) return -1;
|
||||||
|
if (tEncodeI8(pEncoder, pCol->sma) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pCol->bytes) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pCol->offset) < 0) return -1;
|
||||||
|
return pEncoder->pos;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tDecodeSTColumn(SCoder *pDecoder, STColumn *pCol) {
|
||||||
|
if (tDecodeI16(pDecoder, &pCol->colId) < 0) return -1;
|
||||||
|
if (tDecodeI8(pDecoder, &pCol->type) < 0) return -1;
|
||||||
|
if (tDecodeI8(pDecoder, &pCol->sma) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pCol->bytes) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pCol->offset) < 0) return -1;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tEncodeSchema(SCoder *pEncoder, const STSchema *pSchema) {
|
||||||
|
if (tEncodeI32(pEncoder, pSchema->numOfCols) < 0) return -1;
|
||||||
|
if (tEncodeI16(pEncoder, pSchema->version) < 0) return -1;
|
||||||
|
if (tEncodeU16(pEncoder, pSchema->flen) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pSchema->vlen) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pSchema->tlen) < 0) return -1;
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < schemaNCols(pSchema); i++) {
|
||||||
|
const STColumn *pCol = schemaColAt(pSchema, i);
|
||||||
|
if (tEncodeSTColumn(pEncoder, pCol) < 0) return -1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tDecodeSchema(SCoder *pDecoder, STSchema *pSchema) {
|
||||||
|
if (tDecodeI32(pDecoder, &pSchema->numOfCols) < 0) return -1;
|
||||||
|
if (tDecodeI16(pDecoder, &pSchema->version) < 0) return -1;
|
||||||
|
if (tDecodeU16(pDecoder, &pSchema->flen) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pSchema->vlen) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pSchema->tlen) < 0) return -1;
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
int tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, schema_ver_t version) {
|
int tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, schema_ver_t version) {
|
||||||
if (pBuilder == NULL) return -1;
|
if (pBuilder == NULL) return -1;
|
||||||
|
|
||||||
|
|
|
@ -103,6 +103,25 @@ STSRow *tGetSubmitBlkNext(SSubmitBlkIter *pIter) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tPrintFixedSchemaSubmitReq(const SSubmitReq *pReq, STSchema *pTschema) {
|
||||||
|
SSubmitMsgIter msgIter = {0};
|
||||||
|
if (tInitSubmitMsgIter(pReq, &msgIter) < 0) return -1;
|
||||||
|
while (true) {
|
||||||
|
SSubmitBlk *pBlock = NULL;
|
||||||
|
if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1;
|
||||||
|
if (pBlock == NULL) break;
|
||||||
|
SSubmitBlkIter blkIter = {0};
|
||||||
|
tInitSubmitBlkIter(&msgIter, pBlock, &blkIter);
|
||||||
|
STSRowIter rowIter = {0};
|
||||||
|
tdSTSRowIterInit(&rowIter, pTschema);
|
||||||
|
STSRow *row;
|
||||||
|
while ((row = tGetSubmitBlkNext(&blkIter)) != NULL) {
|
||||||
|
tdSRowPrint(row, pTschema, "stream");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tEncodeSEpSet(SCoder *pEncoder, const SEpSet *pEp) {
|
int32_t tEncodeSEpSet(SCoder *pEncoder, const SEpSet *pEp) {
|
||||||
if (tEncodeI8(pEncoder, pEp->inUse) < 0) return -1;
|
if (tEncodeI8(pEncoder, pEp->inUse) < 0) return -1;
|
||||||
if (tEncodeI8(pEncoder, pEp->numOfEps) < 0) return -1;
|
if (tEncodeI8(pEncoder, pEp->numOfEps) < 0) return -1;
|
||||||
|
|
|
@ -419,6 +419,7 @@ typedef struct {
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char key[TSDB_PARTITION_KEY_LEN];
|
char key[TSDB_PARTITION_KEY_LEN];
|
||||||
|
int64_t dbUid;
|
||||||
int64_t offset;
|
int64_t offset;
|
||||||
} SMqOffsetObj;
|
} SMqOffsetObj;
|
||||||
|
|
||||||
|
|
|
@ -37,6 +37,8 @@ static FORCE_INLINE int32_t mndMakePartitionKey(char *key, const char *cgroup, c
|
||||||
return snprintf(key, TSDB_PARTITION_KEY_LEN, "%d:%s:%s", vgId, cgroup, topicName);
|
return snprintf(key, TSDB_PARTITION_KEY_LEN, "%d:%s:%s", vgId, cgroup, topicName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t mndDropOffsetByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -31,6 +31,8 @@ void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub);
|
||||||
|
|
||||||
int32_t mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName);
|
int32_t mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName);
|
||||||
|
|
||||||
|
int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -17,9 +17,12 @@
|
||||||
#include "mndDb.h"
|
#include "mndDb.h"
|
||||||
#include "mndAuth.h"
|
#include "mndAuth.h"
|
||||||
#include "mndDnode.h"
|
#include "mndDnode.h"
|
||||||
|
#include "mndOffset.h"
|
||||||
#include "mndShow.h"
|
#include "mndShow.h"
|
||||||
#include "mndSma.h"
|
#include "mndSma.h"
|
||||||
#include "mndStb.h"
|
#include "mndStb.h"
|
||||||
|
#include "mndSubscribe.h"
|
||||||
|
#include "mndTopic.h"
|
||||||
#include "mndTrans.h"
|
#include "mndTrans.h"
|
||||||
#include "mndUser.h"
|
#include "mndUser.h"
|
||||||
#include "mndVgroup.h"
|
#include "mndVgroup.h"
|
||||||
|
@ -1027,6 +1030,9 @@ static int32_t mndDropDb(SMnode *pMnode, SNodeMsg *pReq, SDbObj *pDb) {
|
||||||
|
|
||||||
if (mndSetDropDbRedoLogs(pMnode, pTrans, pDb) != 0) goto _OVER;
|
if (mndSetDropDbRedoLogs(pMnode, pTrans, pDb) != 0) goto _OVER;
|
||||||
if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) goto _OVER;
|
if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) goto _OVER;
|
||||||
|
/*if (mndDropOffsetByDB(pMnode, pTrans, pDb) != 0) goto _OVER;*/
|
||||||
|
/*if (mndDropSubByDB(pMnode, pTrans, pDb) != 0) goto _OVER;*/
|
||||||
|
/*if (mndDropTopicByDB(pMnode, pTrans, pDb) != 0) goto _OVER;*/
|
||||||
if (mndSetDropDbRedoActions(pMnode, pTrans, pDb) != 0) goto _OVER;
|
if (mndSetDropDbRedoActions(pMnode, pTrans, pDb) != 0) goto _OVER;
|
||||||
|
|
||||||
int32_t rspLen = 0;
|
int32_t rspLen = 0;
|
||||||
|
|
|
@ -231,3 +231,36 @@ static void mndCancelGetNextOffset(SMnode *pMnode, void *pIter) {
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
sdbCancelFetch(pSdb, pIter);
|
sdbCancelFetch(pSdb, pIter);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t mndSetDropOffsetCommitLogs(SMnode *pMnode, STrans *pTrans, SMqOffsetObj *pOffset) {
|
||||||
|
SSdbRaw *pCommitRaw = mndOffsetActionEncode(pOffset);
|
||||||
|
if (pCommitRaw == NULL) return -1;
|
||||||
|
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
|
||||||
|
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t mndDropOffsetByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
|
||||||
|
int32_t code = -1;
|
||||||
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
|
||||||
|
void *pIter = NULL;
|
||||||
|
SMqOffsetObj *pOffset = NULL;
|
||||||
|
while (1) {
|
||||||
|
pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pOffset);
|
||||||
|
if (pIter == NULL) break;
|
||||||
|
|
||||||
|
if (pOffset->dbUid != pDb->uid) {
|
||||||
|
sdbRelease(pSdb, pOffset);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (mndSetDropOffsetCommitLogs(pMnode, pTrans, pOffset) < 0) {
|
||||||
|
goto END;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
code = 0;
|
||||||
|
END:
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
|
@ -204,6 +204,8 @@ int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* p
|
||||||
pTask->smaSink.smaId = pStream->smaId;
|
pTask->smaSink.smaId = pStream->smaId;
|
||||||
} else {
|
} else {
|
||||||
pTask->sinkType = TASK_SINK__TABLE;
|
pTask->sinkType = TASK_SINK__TABLE;
|
||||||
|
pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
|
||||||
|
ASSERT(pTask->tbSink.pSchemaWrapper);
|
||||||
}
|
}
|
||||||
|
|
||||||
// dispatch
|
// dispatch
|
||||||
|
@ -242,6 +244,7 @@ int32_t mndAddFixedSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStr
|
||||||
pTask->smaSink.smaId = pStream->smaId;
|
pTask->smaSink.smaId = pStream->smaId;
|
||||||
} else {
|
} else {
|
||||||
pTask->sinkType = TASK_SINK__TABLE;
|
pTask->sinkType = TASK_SINK__TABLE;
|
||||||
|
pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
|
||||||
}
|
}
|
||||||
//
|
//
|
||||||
// dispatch
|
// dispatch
|
||||||
|
@ -316,6 +319,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
|
||||||
pTask->smaSink.smaId = pStream->smaId;
|
pTask->smaSink.smaId = pStream->smaId;
|
||||||
} else {
|
} else {
|
||||||
pTask->sinkType = TASK_SINK__TABLE;
|
pTask->sinkType = TASK_SINK__TABLE;
|
||||||
|
pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
|
@ -926,6 +926,12 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
pTask->ahandle = pTq->pVnode;
|
pTask->ahandle = pTq->pVnode;
|
||||||
if (pTask->sinkType == TASK_SINK__SMA) {
|
if (pTask->sinkType == TASK_SINK__SMA) {
|
||||||
pTask->smaSink.smaHandle = smaHandleRes;
|
pTask->smaSink.smaHandle = smaHandleRes;
|
||||||
|
} else if (pTask->sinkType == TASK_SINK__TABLE) {
|
||||||
|
ASSERT(pTask->tbSink.pSchemaWrapper);
|
||||||
|
ASSERT(pTask->tbSink.pSchemaWrapper->pSchema);
|
||||||
|
pTask->tbSink.pTSchema =
|
||||||
|
tdGetSTSChemaFromSSChema(&pTask->tbSink.pSchemaWrapper->pSchema, pTask->tbSink.pSchemaWrapper->nCols);
|
||||||
|
ASSERT(pTask->tbSink.pTSchema);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), pTask, sizeof(SStreamTask));
|
taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), pTask, sizeof(SStreamTask));
|
||||||
|
|
|
@ -152,8 +152,10 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in
|
||||||
|
|
||||||
// sink
|
// sink
|
||||||
if (pTask->sinkType == TASK_SINK__TABLE) {
|
if (pTask->sinkType == TASK_SINK__TABLE) {
|
||||||
//
|
/*blockDebugShowData(pRes);*/
|
||||||
blockDebugShowData(pRes);
|
ASSERT(pTask->tbSink.pTSchema);
|
||||||
|
SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema);
|
||||||
|
tPrintFixedSchemaSubmitReq(pReq, pTask->tbSink.pTSchema);
|
||||||
} else if (pTask->sinkType == TASK_SINK__SMA) {
|
} else if (pTask->sinkType == TASK_SINK__SMA) {
|
||||||
pTask->smaSink.smaHandle(pTask->ahandle, pTask->smaSink.smaId, pRes);
|
pTask->smaSink.smaHandle(pTask->ahandle, pTask->smaSink.smaId, pRes);
|
||||||
//
|
//
|
||||||
|
@ -274,7 +276,8 @@ int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTask->sinkType == TASK_SINK__TABLE) {
|
if (pTask->sinkType == TASK_SINK__TABLE) {
|
||||||
if (tEncodeI8(pEncoder, pTask->tbSink.reserved) < 0) return -1;
|
/*if (tEncodeI8(pEncoder, pTask->tbSink.reserved) < 0) return -1;*/
|
||||||
|
if (tEncodeSSchemaWrapper(pEncoder, pTask->tbSink.pSchemaWrapper) < 0) return -1;
|
||||||
} else if (pTask->sinkType == TASK_SINK__SMA) {
|
} else if (pTask->sinkType == TASK_SINK__SMA) {
|
||||||
if (tEncodeI64(pEncoder, pTask->smaSink.smaId) < 0) return -1;
|
if (tEncodeI64(pEncoder, pTask->smaSink.smaId) < 0) return -1;
|
||||||
} else if (pTask->sinkType == TASK_SINK__FETCH) {
|
} else if (pTask->sinkType == TASK_SINK__FETCH) {
|
||||||
|
@ -318,7 +321,10 @@ int32_t tDecodeSStreamTask(SCoder* pDecoder, SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTask->sinkType == TASK_SINK__TABLE) {
|
if (pTask->sinkType == TASK_SINK__TABLE) {
|
||||||
if (tDecodeI8(pDecoder, &pTask->tbSink.reserved) < 0) return -1;
|
/*if (tDecodeI8(pDecoder, &pTask->tbSink.reserved) < 0) return -1;*/
|
||||||
|
pTask->tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
|
||||||
|
if (pTask->tbSink.pSchemaWrapper == NULL) return -1;
|
||||||
|
if (tDecodeSSchemaWrapper(pDecoder, pTask->tbSink.pSchemaWrapper) < 0) return -1;
|
||||||
} else if (pTask->sinkType == TASK_SINK__SMA) {
|
} else if (pTask->sinkType == TASK_SINK__SMA) {
|
||||||
if (tDecodeI64(pDecoder, &pTask->smaSink.smaId) < 0) return -1;
|
if (tDecodeI64(pDecoder, &pTask->smaSink.smaId) < 0) return -1;
|
||||||
} else if (pTask->sinkType == TASK_SINK__FETCH) {
|
} else if (pTask->sinkType == TASK_SINK__FETCH) {
|
||||||
|
|
Loading…
Reference in New Issue