Merge pull request #12128 from taosdata/feature/tq
feat(stream): insert data
This commit is contained in:
commit
ea2247e289
|
@ -81,9 +81,10 @@ int32_t create_stream() {
|
|||
/*const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1";*/
|
||||
/*const char* sql = "select sum(k) from tu1 interval(10m)";*/
|
||||
/*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/
|
||||
pRes = taos_query(
|
||||
pConn,
|
||||
"create stream stream1 trigger window_close as select min(k), max(k), sum(k) as sum_of_k from tu1 interval(10m)");
|
||||
pRes =
|
||||
taos_query(pConn,
|
||||
"create stream stream1 trigger window_close as select _wstartts, min(k), max(k), sum(k) as sum_of_k "
|
||||
"from tu1 interval(10m)");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
|
|
|
@ -54,8 +54,8 @@ SEpSet getEpSet_s(SCorEpSet* pEpSet);
|
|||
BMCharPos(bm_, r_) |= (1u << (7u - BitPos(r_))); \
|
||||
} while (0)
|
||||
|
||||
#define colDataIsNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] == -1)
|
||||
#define colDataSetNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] = -1)
|
||||
#define colDataIsNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] == -1)
|
||||
#define colDataSetNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] = -1)
|
||||
|
||||
#define BitmapLen(_n) (((_n) + ((1 << NBIT) - 1)) >> NBIT)
|
||||
|
||||
|
@ -63,16 +63,15 @@ SEpSet getEpSet_s(SCorEpSet* pEpSet);
|
|||
|
||||
#define colDataGetNumData(p1_, r_) ((p1_)->pData + ((r_) * (p1_)->info.bytes))
|
||||
// SColumnInfoData, rowNumber
|
||||
#define colDataGetData(p1_, r_) \
|
||||
((IS_VAR_DATA_TYPE((p1_)->info.type)) ? colDataGetVarData(p1_, r_) \
|
||||
: colDataGetNumData(p1_, r_))
|
||||
#define colDataGetData(p1_, r_) \
|
||||
((IS_VAR_DATA_TYPE((p1_)->info.type)) ? colDataGetVarData(p1_, r_) : colDataGetNumData(p1_, r_))
|
||||
|
||||
static FORCE_INLINE bool colDataIsNull_s(const SColumnInfoData* pColumnInfoData, uint32_t row) {
|
||||
if (pColumnInfoData->info.type == TSDB_DATA_TYPE_JSON){
|
||||
if(colDataIsNull_var(pColumnInfoData, row)){
|
||||
if (pColumnInfoData->info.type == TSDB_DATA_TYPE_JSON) {
|
||||
if (colDataIsNull_var(pColumnInfoData, row)) {
|
||||
return true;
|
||||
}
|
||||
char *data = colDataGetVarData(pColumnInfoData, row);
|
||||
char* data = colDataGetVarData(pColumnInfoData, row);
|
||||
return (*data == TSDB_DATA_TYPE_NULL);
|
||||
}
|
||||
|
||||
|
@ -80,7 +79,7 @@ static FORCE_INLINE bool colDataIsNull_s(const SColumnInfoData* pColumnInfoData,
|
|||
return false;
|
||||
}
|
||||
|
||||
if (pColumnInfoData->info.type== TSDB_DATA_TYPE_VARCHAR || pColumnInfoData->info.type == TSDB_DATA_TYPE_NCHAR) {
|
||||
if (pColumnInfoData->info.type == TSDB_DATA_TYPE_VARCHAR || pColumnInfoData->info.type == TSDB_DATA_TYPE_NCHAR) {
|
||||
return colDataIsNull_var(pColumnInfoData, row);
|
||||
} else {
|
||||
if (pColumnInfoData->nullbitmap == NULL) {
|
||||
|
@ -132,7 +131,7 @@ static FORCE_INLINE void colDataAppendNULL(SColumnInfoData* pColumnInfoData, uin
|
|||
static FORCE_INLINE void colDataAppendNNULL(SColumnInfoData* pColumnInfoData, uint32_t start, size_t nRows) {
|
||||
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
|
||||
for (int32_t i = start; i < start + nRows; ++i) {
|
||||
colDataSetNull_var(pColumnInfoData,i); // it is a null value of VAR type.
|
||||
colDataSetNull_var(pColumnInfoData, i); // it is a null value of VAR type.
|
||||
}
|
||||
} else {
|
||||
for (int32_t i = start; i < start + nRows; ++i) {
|
||||
|
@ -228,6 +227,8 @@ void blockDebugShowData(const SArray* dataBlocks);
|
|||
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId,
|
||||
tb_uid_t uid, tb_uid_t suid);
|
||||
|
||||
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema);
|
||||
|
||||
static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) {
|
||||
return blockDataGetSerialMetaSize(pBlock) + blockDataGetSize(pBlock);
|
||||
}
|
||||
|
@ -241,10 +242,10 @@ static FORCE_INLINE int32_t blockCompressColData(SColumnInfoData* pColRes, int32
|
|||
|
||||
static FORCE_INLINE void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols,
|
||||
int8_t needCompress) {
|
||||
int32_t* actualLen = (int32_t*) data;
|
||||
int32_t* actualLen = (int32_t*)data;
|
||||
data += sizeof(int32_t);
|
||||
|
||||
uint64_t* groupId = (uint64_t*) data;
|
||||
uint64_t* groupId = (uint64_t*)data;
|
||||
data += sizeof(uint64_t);
|
||||
|
||||
int32_t* colSizes = (int32_t*)data;
|
||||
|
|
|
@ -61,11 +61,11 @@ extern "C" {
|
|||
// ----------------- TSDB COLUMN DEFINITION
|
||||
#pragma pack(push, 1)
|
||||
typedef struct {
|
||||
col_id_t colId; // column ID(start from PRIMARYKEY_TIMESTAMP_COL_ID(1))
|
||||
int32_t type : 8; // column type
|
||||
int32_t bytes : 24; // column bytes (0~16M)
|
||||
int32_t flags : 8; // flags: 0 no index, 1 SCHEMA_SMA_ON, 2 SCHEMA_IDX_ON
|
||||
int32_t offset : 24; // point offset in STpRow after the header part.
|
||||
col_id_t colId; // column ID(start from PRIMARYKEY_TIMESTAMP_COL_ID(1))
|
||||
int8_t type; // column type
|
||||
int8_t flags; // flags: 0 no index, 1 SCHEMA_SMA_ON, 2 SCHEMA_IDX_ON
|
||||
int32_t bytes; // column bytes (0~16M)
|
||||
int32_t offset; // point offset in STpRow after the header part.
|
||||
} STColumn;
|
||||
#pragma pack(pop)
|
||||
|
||||
|
|
|
@ -245,6 +245,8 @@ int32_t tInitSubmitMsgIter(const SSubmitReq* pMsg, SSubmitMsgIter* pIter);
|
|||
int32_t tGetSubmitMsgNext(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock);
|
||||
int32_t tInitSubmitBlkIter(SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkIter* pIter);
|
||||
STSRow* tGetSubmitBlkNext(SSubmitBlkIter* pIter);
|
||||
// for debug
|
||||
int32_t tPrintFixedSchemaSubmitReq(const SSubmitReq* pReq, STSchema* pSchema);
|
||||
|
||||
typedef struct {
|
||||
int32_t index; // index of failed block in submit blocks
|
||||
|
@ -2130,7 +2132,7 @@ static FORCE_INLINE int32_t tDecodeSSchemaWrapper(SCoder* pDecoder, SSchemaWrapp
|
|||
if (tDecodeI32v(pDecoder, &pSW->nCols) < 0) return -1;
|
||||
if (tDecodeI32v(pDecoder, &pSW->sver) < 0) return -1;
|
||||
|
||||
pSW->pSchema = (SSchema*)tCoderMalloc(pDecoder, sizeof(SSchema) * pSW->nCols);
|
||||
pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema));
|
||||
if (pSW->pSchema == NULL) return -1;
|
||||
for (int32_t i = 0; i < pSW->nCols; i++) {
|
||||
if (tDecodeSSchema(pDecoder, &pSW->pSchema[i]) < 0) return -1;
|
||||
|
|
|
@ -622,7 +622,6 @@ static FORCE_INLINE int32_t tdSRowSetTpInfo(SRowBuilder *pBuilder, int32_t nCols
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @brief To judge row type: STpRow/SKvRow
|
||||
*
|
||||
|
@ -758,7 +757,6 @@ static int32_t tdSRowGetBuf(SRowBuilder *pBuilder, void *pBuf) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @brief 由调用方管理存储空间的分配及释放,一次输入多个参数
|
||||
*
|
||||
|
@ -1250,16 +1248,16 @@ static FORCE_INLINE int32_t tdGetColDataOfRow(SCellVal *pVal, SDataCol *pCol, in
|
|||
}
|
||||
|
||||
/**
|
||||
* @brief
|
||||
*
|
||||
* @param pRow
|
||||
* @param colId
|
||||
* @param colType
|
||||
* @param flen
|
||||
* @param offset
|
||||
* @brief
|
||||
*
|
||||
* @param pRow
|
||||
* @param colId
|
||||
* @param colType
|
||||
* @param flen
|
||||
* @param offset
|
||||
* @param colIdx start from 0
|
||||
* @param pVal
|
||||
* @return FORCE_INLINE
|
||||
* @param pVal
|
||||
* @return FORCE_INLINE
|
||||
*/
|
||||
static FORCE_INLINE bool tdSTpRowGetVal(STSRow *pRow, col_id_t colId, col_type_t colType, int32_t flen, uint32_t offset,
|
||||
col_id_t colIdx, SCellVal *pVal) {
|
||||
|
@ -1273,14 +1271,14 @@ static FORCE_INLINE bool tdSTpRowGetVal(STSRow *pRow, col_id_t colId, col_type_t
|
|||
}
|
||||
|
||||
/**
|
||||
* @brief
|
||||
*
|
||||
* @param pRow
|
||||
* @param colId
|
||||
* @param offset
|
||||
* @brief
|
||||
*
|
||||
* @param pRow
|
||||
* @param colId
|
||||
* @param offset
|
||||
* @param colIdx start from 0
|
||||
* @param pVal
|
||||
* @return FORCE_INLINE
|
||||
* @param pVal
|
||||
* @return FORCE_INLINE
|
||||
*/
|
||||
static FORCE_INLINE bool tdSKvRowGetVal(STSRow *pRow, col_id_t colId, uint32_t offset, col_id_t colIdx,
|
||||
SCellVal *pVal) {
|
||||
|
@ -1397,14 +1395,14 @@ static void tdSCellValPrint(SCellVal *pVal, int8_t colType) {
|
|||
}
|
||||
}
|
||||
|
||||
static void tdSRowPrint(STSRow *row, STSchema *pSchema, const char* tag) {
|
||||
static void tdSRowPrint(STSRow *row, STSchema *pSchema, const char *tag) {
|
||||
STSRowIter iter = {0};
|
||||
tdSTSRowIterInit(&iter, pSchema);
|
||||
tdSTSRowIterReset(&iter, row);
|
||||
printf("%s >>>", tag);
|
||||
for (int i = 0; i < pSchema->numOfCols; ++i) {
|
||||
STColumn *stCol = pSchema->columns + i;
|
||||
SCellVal sVal = { 255, NULL};
|
||||
SCellVal sVal = {255, NULL};
|
||||
if (!tdSTSRowIterNext(&iter, stCol->colId, stCol->type, &sVal)) {
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -70,8 +70,10 @@ typedef struct {
|
|||
} STaskDispatcherShuffle;
|
||||
|
||||
typedef struct {
|
||||
int8_t reserved;
|
||||
int8_t reserved;
|
||||
SSchemaWrapper* pSchemaWrapper;
|
||||
// not applicable to encoder and decoder
|
||||
STSchema* pTSchema;
|
||||
SHashObj* pHash; // groupId to tbuid
|
||||
} STaskSinkTb;
|
||||
|
||||
|
|
|
@ -1589,3 +1589,68 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
|
|||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema) {
|
||||
SSubmitReq* ret = NULL;
|
||||
|
||||
// cal size
|
||||
int32_t cap = sizeof(SSubmitReq);
|
||||
int32_t sz = taosArrayGetSize(pBlocks);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
|
||||
int32_t rows = pDataBlock->info.rows;
|
||||
// TODO min
|
||||
int32_t rowSize = pDataBlock->info.rowSize;
|
||||
int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema);
|
||||
cap += sizeof(SSubmitBlk) + rows * maxLen;
|
||||
}
|
||||
|
||||
// assign data
|
||||
ret = taosMemoryCalloc(1, cap);
|
||||
ret->version = htonl(1);
|
||||
ret->length = htonl(cap - sizeof(SSubmitReq));
|
||||
ret->numOfBlocks = htonl(sz);
|
||||
|
||||
void* submitBlk = POINTER_SHIFT(ret, sizeof(SSubmitReq));
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
|
||||
|
||||
SSubmitBlk* blkHead = submitBlk;
|
||||
blkHead->numOfRows = htons(pDataBlock->info.rows);
|
||||
blkHead->schemaLen = 0;
|
||||
blkHead->sversion = htonl(pTSchema->version);
|
||||
// TODO
|
||||
blkHead->suid = 0;
|
||||
blkHead->uid = htobe64(pDataBlock->info.uid);
|
||||
|
||||
int32_t rows = pDataBlock->info.rows;
|
||||
int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema);
|
||||
/*blkHead->dataLen = htonl(rows * maxLen);*/
|
||||
blkHead->dataLen = 0;
|
||||
|
||||
void* blockData = POINTER_SHIFT(submitBlk, sizeof(SSubmitBlk));
|
||||
STSRow* rowData = blockData;
|
||||
|
||||
for (int32_t j = 0; j < pDataBlock->info.rows; j++) {
|
||||
SRowBuilder rb = {0};
|
||||
tdSRowInit(&rb, pTSchema->version);
|
||||
tdSRowSetTpInfo(&rb, pTSchema->numOfCols, pTSchema->flen);
|
||||
tdSRowResetBuf(&rb, rowData);
|
||||
|
||||
for (int32_t k = 0; k < pTSchema->numOfCols; k++) {
|
||||
const STColumn* pColumn = &pTSchema->columns[k];
|
||||
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, k);
|
||||
void* data = colDataGetData(pColData, j);
|
||||
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, pColumn->offset, k);
|
||||
}
|
||||
int32_t rowLen = TD_ROW_LEN(rowData);
|
||||
rowData = POINTER_SHIFT(rowData, rowLen);
|
||||
blkHead->dataLen += rowLen;
|
||||
}
|
||||
int32_t len = blkHead->dataLen;
|
||||
blkHead->dataLen = htonl(len);
|
||||
blkHead = POINTER_SHIFT(blkHead, len);
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
#define _DEFAULT_SOURCE
|
||||
#include "tdataformat.h"
|
||||
#include "tcoding.h"
|
||||
#include "tdatablock.h"
|
||||
#include "tlog.h"
|
||||
|
||||
static void dataColSetNEleNull(SDataCol *pCol, int nEle);
|
||||
|
@ -128,6 +129,50 @@ void *tdDecodeSchema(void *buf, STSchema **pRSchema) {
|
|||
return buf;
|
||||
}
|
||||
|
||||
#if 0
|
||||
int32_t tEncodeSTColumn(SCoder *pEncoder, const STColumn *pCol) {
|
||||
if (tEncodeI16(pEncoder, pCol->colId) < 0) return -1;
|
||||
if (tEncodeI8(pEncoder, pCol->type) < 0) return -1;
|
||||
if (tEncodeI8(pEncoder, pCol->sma) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pCol->bytes) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pCol->offset) < 0) return -1;
|
||||
return pEncoder->pos;
|
||||
}
|
||||
|
||||
int32_t tDecodeSTColumn(SCoder *pDecoder, STColumn *pCol) {
|
||||
if (tDecodeI16(pDecoder, &pCol->colId) < 0) return -1;
|
||||
if (tDecodeI8(pDecoder, &pCol->type) < 0) return -1;
|
||||
if (tDecodeI8(pDecoder, &pCol->sma) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pCol->bytes) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pCol->offset) < 0) return -1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tEncodeSchema(SCoder *pEncoder, const STSchema *pSchema) {
|
||||
if (tEncodeI32(pEncoder, pSchema->numOfCols) < 0) return -1;
|
||||
if (tEncodeI16(pEncoder, pSchema->version) < 0) return -1;
|
||||
if (tEncodeU16(pEncoder, pSchema->flen) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pSchema->vlen) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pSchema->tlen) < 0) return -1;
|
||||
|
||||
for (int32_t i = 0; i < schemaNCols(pSchema); i++) {
|
||||
const STColumn *pCol = schemaColAt(pSchema, i);
|
||||
if (tEncodeSTColumn(pEncoder, pCol) < 0) return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tDecodeSchema(SCoder *pDecoder, STSchema *pSchema) {
|
||||
if (tDecodeI32(pDecoder, &pSchema->numOfCols) < 0) return -1;
|
||||
if (tDecodeI16(pDecoder, &pSchema->version) < 0) return -1;
|
||||
if (tDecodeU16(pDecoder, &pSchema->flen) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pSchema->vlen) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pSchema->tlen) < 0) return -1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
|
||||
int tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, schema_ver_t version) {
|
||||
if (pBuilder == NULL) return -1;
|
||||
|
||||
|
@ -908,4 +953,4 @@ SMemRow mergeTwoMemRows(void *buffer, SMemRow row1, SMemRow row2, STSchema *pSch
|
|||
taosArrayDestroy(stashRow);
|
||||
return buffer;
|
||||
}
|
||||
#endif
|
||||
#endif
|
||||
|
|
|
@ -103,6 +103,25 @@ STSRow *tGetSubmitBlkNext(SSubmitBlkIter *pIter) {
|
|||
}
|
||||
}
|
||||
|
||||
int32_t tPrintFixedSchemaSubmitReq(const SSubmitReq *pReq, STSchema *pTschema) {
|
||||
SSubmitMsgIter msgIter = {0};
|
||||
if (tInitSubmitMsgIter(pReq, &msgIter) < 0) return -1;
|
||||
while (true) {
|
||||
SSubmitBlk *pBlock = NULL;
|
||||
if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1;
|
||||
if (pBlock == NULL) break;
|
||||
SSubmitBlkIter blkIter = {0};
|
||||
tInitSubmitBlkIter(&msgIter, pBlock, &blkIter);
|
||||
STSRowIter rowIter = {0};
|
||||
tdSTSRowIterInit(&rowIter, pTschema);
|
||||
STSRow *row;
|
||||
while ((row = tGetSubmitBlkNext(&blkIter)) != NULL) {
|
||||
tdSRowPrint(row, pTschema, "stream");
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tEncodeSEpSet(SCoder *pEncoder, const SEpSet *pEp) {
|
||||
if (tEncodeI8(pEncoder, pEp->inUse) < 0) return -1;
|
||||
if (tEncodeI8(pEncoder, pEp->numOfEps) < 0) return -1;
|
||||
|
|
|
@ -419,6 +419,7 @@ typedef struct {
|
|||
|
||||
typedef struct {
|
||||
char key[TSDB_PARTITION_KEY_LEN];
|
||||
int64_t dbUid;
|
||||
int64_t offset;
|
||||
} SMqOffsetObj;
|
||||
|
||||
|
|
|
@ -37,6 +37,8 @@ static FORCE_INLINE int32_t mndMakePartitionKey(char *key, const char *cgroup, c
|
|||
return snprintf(key, TSDB_PARTITION_KEY_LEN, "%d:%s:%s", vgId, cgroup, topicName);
|
||||
}
|
||||
|
||||
int32_t mndDropOffsetByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -31,6 +31,8 @@ void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub);
|
|||
|
||||
int32_t mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName);
|
||||
|
||||
int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -17,9 +17,12 @@
|
|||
#include "mndDb.h"
|
||||
#include "mndAuth.h"
|
||||
#include "mndDnode.h"
|
||||
#include "mndOffset.h"
|
||||
#include "mndShow.h"
|
||||
#include "mndSma.h"
|
||||
#include "mndStb.h"
|
||||
#include "mndSubscribe.h"
|
||||
#include "mndTopic.h"
|
||||
#include "mndTrans.h"
|
||||
#include "mndUser.h"
|
||||
#include "mndVgroup.h"
|
||||
|
@ -1027,6 +1030,9 @@ static int32_t mndDropDb(SMnode *pMnode, SNodeMsg *pReq, SDbObj *pDb) {
|
|||
|
||||
if (mndSetDropDbRedoLogs(pMnode, pTrans, pDb) != 0) goto _OVER;
|
||||
if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) goto _OVER;
|
||||
/*if (mndDropOffsetByDB(pMnode, pTrans, pDb) != 0) goto _OVER;*/
|
||||
/*if (mndDropSubByDB(pMnode, pTrans, pDb) != 0) goto _OVER;*/
|
||||
/*if (mndDropTopicByDB(pMnode, pTrans, pDb) != 0) goto _OVER;*/
|
||||
if (mndSetDropDbRedoActions(pMnode, pTrans, pDb) != 0) goto _OVER;
|
||||
|
||||
int32_t rspLen = 0;
|
||||
|
@ -1387,7 +1393,7 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in
|
|||
bool sysDb) {
|
||||
int32_t cols = 0;
|
||||
|
||||
int32_t bytes = pShow->pMeta->pSchemas[cols].bytes;
|
||||
int32_t bytes = pShow->pMeta->pSchemas[cols].bytes;
|
||||
char *buf = taosMemoryMalloc(bytes);
|
||||
const char *name = mndGetDbStr(pDb->name);
|
||||
if (name != NULL) {
|
||||
|
|
|
@ -231,3 +231,36 @@ static void mndCancelGetNextOffset(SMnode *pMnode, void *pIter) {
|
|||
SSdb *pSdb = pMnode->pSdb;
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
}
|
||||
|
||||
static int32_t mndSetDropOffsetCommitLogs(SMnode *pMnode, STrans *pTrans, SMqOffsetObj *pOffset) {
|
||||
SSdbRaw *pCommitRaw = mndOffsetActionEncode(pOffset);
|
||||
if (pCommitRaw == NULL) return -1;
|
||||
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
|
||||
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t mndDropOffsetByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
|
||||
int32_t code = -1;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
|
||||
void *pIter = NULL;
|
||||
SMqOffsetObj *pOffset = NULL;
|
||||
while (1) {
|
||||
pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pOffset);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
if (pOffset->dbUid != pDb->uid) {
|
||||
sdbRelease(pSdb, pOffset);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (mndSetDropOffsetCommitLogs(pMnode, pTrans, pOffset) < 0) {
|
||||
goto END;
|
||||
}
|
||||
}
|
||||
|
||||
code = 0;
|
||||
END:
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -204,6 +204,8 @@ int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* p
|
|||
pTask->smaSink.smaId = pStream->smaId;
|
||||
} else {
|
||||
pTask->sinkType = TASK_SINK__TABLE;
|
||||
pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
|
||||
ASSERT(pTask->tbSink.pSchemaWrapper);
|
||||
}
|
||||
|
||||
// dispatch
|
||||
|
@ -242,6 +244,7 @@ int32_t mndAddFixedSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStr
|
|||
pTask->smaSink.smaId = pStream->smaId;
|
||||
} else {
|
||||
pTask->sinkType = TASK_SINK__TABLE;
|
||||
pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
|
||||
}
|
||||
//
|
||||
// dispatch
|
||||
|
@ -316,6 +319,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
|
|||
pTask->smaSink.smaId = pStream->smaId;
|
||||
} else {
|
||||
pTask->sinkType = TASK_SINK__TABLE;
|
||||
pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
|
|
@ -926,6 +926,12 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
|
|||
pTask->ahandle = pTq->pVnode;
|
||||
if (pTask->sinkType == TASK_SINK__SMA) {
|
||||
pTask->smaSink.smaHandle = smaHandleRes;
|
||||
} else if (pTask->sinkType == TASK_SINK__TABLE) {
|
||||
ASSERT(pTask->tbSink.pSchemaWrapper);
|
||||
ASSERT(pTask->tbSink.pSchemaWrapper->pSchema);
|
||||
pTask->tbSink.pTSchema =
|
||||
tdGetSTSChemaFromSSChema(&pTask->tbSink.pSchemaWrapper->pSchema, pTask->tbSink.pSchemaWrapper->nCols);
|
||||
ASSERT(pTask->tbSink.pTSchema);
|
||||
}
|
||||
|
||||
taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), pTask, sizeof(SStreamTask));
|
||||
|
|
|
@ -152,8 +152,10 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in
|
|||
|
||||
// sink
|
||||
if (pTask->sinkType == TASK_SINK__TABLE) {
|
||||
//
|
||||
blockDebugShowData(pRes);
|
||||
/*blockDebugShowData(pRes);*/
|
||||
ASSERT(pTask->tbSink.pTSchema);
|
||||
SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema);
|
||||
tPrintFixedSchemaSubmitReq(pReq, pTask->tbSink.pTSchema);
|
||||
} else if (pTask->sinkType == TASK_SINK__SMA) {
|
||||
pTask->smaSink.smaHandle(pTask->ahandle, pTask->smaSink.smaId, pRes);
|
||||
//
|
||||
|
@ -274,7 +276,8 @@ int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask) {
|
|||
}
|
||||
|
||||
if (pTask->sinkType == TASK_SINK__TABLE) {
|
||||
if (tEncodeI8(pEncoder, pTask->tbSink.reserved) < 0) return -1;
|
||||
/*if (tEncodeI8(pEncoder, pTask->tbSink.reserved) < 0) return -1;*/
|
||||
if (tEncodeSSchemaWrapper(pEncoder, pTask->tbSink.pSchemaWrapper) < 0) return -1;
|
||||
} else if (pTask->sinkType == TASK_SINK__SMA) {
|
||||
if (tEncodeI64(pEncoder, pTask->smaSink.smaId) < 0) return -1;
|
||||
} else if (pTask->sinkType == TASK_SINK__FETCH) {
|
||||
|
@ -318,7 +321,10 @@ int32_t tDecodeSStreamTask(SCoder* pDecoder, SStreamTask* pTask) {
|
|||
}
|
||||
|
||||
if (pTask->sinkType == TASK_SINK__TABLE) {
|
||||
if (tDecodeI8(pDecoder, &pTask->tbSink.reserved) < 0) return -1;
|
||||
/*if (tDecodeI8(pDecoder, &pTask->tbSink.reserved) < 0) return -1;*/
|
||||
pTask->tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
|
||||
if (pTask->tbSink.pSchemaWrapper == NULL) return -1;
|
||||
if (tDecodeSSchemaWrapper(pDecoder, pTask->tbSink.pSchemaWrapper) < 0) return -1;
|
||||
} else if (pTask->sinkType == TASK_SINK__SMA) {
|
||||
if (tDecodeI64(pDecoder, &pTask->smaSink.smaId) < 0) return -1;
|
||||
} else if (pTask->sinkType == TASK_SINK__FETCH) {
|
||||
|
|
Loading…
Reference in New Issue