fix:[TD-28567] do not add md5 to the end of subtable if create stream by sma

This commit is contained in:
wangmm0220 2024-02-19 13:58:58 +08:00
parent 8a4b79bf2c
commit ac9582b24a
12 changed files with 556 additions and 545 deletions

View File

@ -91,6 +91,7 @@ TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提
不同语言下, TMQ 订阅相关的 API 及数据结构如下详细的接口说明可以参考连接器章节注意consumer结构不是线程安全的在一个线程使用consumer时不要在另一个线程close这个consumer
<Tabs defaultValue="java" groupId="lang">
<TabItem value="c" label="C">
```c
@ -146,7 +147,6 @@ TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提
DLL_EXPORT int64_t tmq_get_vgroup_offset(TAOS_RES* res);
DLL_EXPORT const char *tmq_err2str(int32_t code);
```
</TabItem>
<TabItem value="java" label="Java">
@ -304,7 +304,6 @@ TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提
void Close()
```
</TabItem>
</Tabs>
@ -335,8 +334,8 @@ INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11');
对于不同编程语言,其设置方式如下:
<Tabs defaultValue="java" groupId="lang">
<TabItem value="c" label="C">
<TabItem value="c" label="C">
```c
/* 根据需要,设置消费组 (group.id)、自动提交 (enable.auto.commit)、
自动提交时间间隔 (auto.commit.interval.ms)、用户名 (td.connect.user)、密码 (td.connect.pass) 等参数 */
@ -353,8 +352,8 @@ INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11');
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
tmq_conf_destroy(conf);
```
</TabItem>
<TabItem value="java" label="Java">
对于 Java 程序,还可以使用如下配置项:
@ -388,7 +387,6 @@ INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11');
public class MetersDeserializer extends ReferenceDeserializer<Meters> {
}
```
</TabItem>
<TabItem label="Go" value="Go">
@ -502,6 +500,7 @@ INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11');
一个 consumer 支持同时订阅多个 topic。
<Tabs defaultValue="java" groupId="lang">
<TabItem value="c" label="C">
```c
@ -581,6 +580,7 @@ INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11');
以下代码展示了不同语言下如何对 TMQ 消息进行消费。
<Tabs defaultValue="java" groupId="lang">
<TabItem value="c" label="C">
```c
@ -717,6 +717,7 @@ INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11');
消费结束后,应当取消订阅。
<Tabs defaultValue="java" groupId="lang">
<TabItem value="c" label="C">
```c

View File

@ -463,7 +463,8 @@ struct SStreamTask {
struct SStreamMeta* pMeta;
SSHashObj* pNameMap;
void* pBackend;
char reserve[256];
int8_t subtableWithoutMd5;
char reserve[255];
};
typedef int32_t (*startComplete_fn_t)(struct SStreamMeta*);
@ -532,7 +533,7 @@ int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo)
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo);
SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool fillHistory, int64_t triggerParam,
SArray* pTaskList, bool hasFillhistory);
SArray* pTaskList, bool hasFillhistory, int8_t subtableWithoutMd5);
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask);
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
void tFreeStreamTask(SStreamTask* pTask);
@ -839,7 +840,8 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st);
// stream task meta
void streamMetaInit();
void streamMetaCleanup();
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage, startComplete_fn_t fn);
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage,
startComplete_fn_t fn);
void streamMetaClose(SStreamMeta* streamMeta);
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); // save to stream meta store
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pKey);

View File

@ -699,6 +699,7 @@ typedef struct {
int64_t checkpointId;
int32_t indexForMultiAggBalance;
int8_t subTableWithoutMd5;
char reserve[256];
} SStreamObj;

View File

@ -24,7 +24,7 @@ extern "C" {
#endif
#define MND_STREAM_RESERVE_SIZE 64
#define MND_STREAM_VER_NUMBER 4
#define MND_STREAM_VER_NUMBER 5
#define MND_STREAM_CREATE_NAME "stream-create"
#define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint"

View File

@ -85,6 +85,7 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
// 3.0.50 ver = 3
if (tEncodeI64(pEncoder, pObj->checkpointId) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->subTableWithoutMd5) < 0) return -1;
if (tEncodeCStrWithLen(pEncoder, pObj->reserve, sizeof(pObj->reserve) - 1) < 0) return -1;
@ -168,6 +169,10 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj, int32_t sver) {
if (sver >= 3) {
if (tDecodeI64(pDecoder, &pObj->checkpointId) < 0) return -1;
}
if (sver >= 5) {
if (tDecodeI8(pDecoder, &pObj->subTableWithoutMd5) < 0) return -1;
}
if (tDecodeCStrTo(pDecoder, pObj->reserve) < 0) return -1;
tEndDecode(pDecoder);

View File

@ -14,13 +14,13 @@
*/
#include "mndScheduler.h"
#include "tmisce.h"
#include "mndMnode.h"
#include "mndDb.h"
#include "mndMnode.h"
#include "mndSnode.h"
#include "mndVgroup.h"
#include "parser.h"
#include "tcompare.h"
#include "tmisce.h"
#include "tname.h"
#include "tuuid.h"
@ -217,12 +217,12 @@ SVgObj* mndSchedFetchOneVg(SMnode* pMnode, SStreamObj* pStream) {
return pVgroup;
}
static int32_t doAddSinkTask(SStreamObj* pStream, SMnode* pMnode, SVgObj* pVgroup,
SEpSet* pEpset, bool isFillhistory) {
static int32_t doAddSinkTask(SStreamObj* pStream, SMnode* pMnode, SVgObj* pVgroup, SEpSet* pEpset, bool isFillhistory) {
int64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid;
SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks);
SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SINK, pEpset, isFillhistory, 0, *pTaskList, pStream->conf.fillHistory);
SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SINK, pEpset, isFillhistory, 0, *pTaskList,
pStream->conf.fillHistory, pStream->subTableWithoutMd5);
if (pTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno;
@ -315,19 +315,18 @@ static void streamTaskSetDataRange(SStreamTask* pTask, int64_t skey, SArray* pVe
pRange->range.minVer = latestVer + 1;
pRange->range.maxVer = INT64_MAX;
mDebug("add source task 0x%x timeWindow:%" PRId64 "-%" PRId64 " verRange:%" PRId64 "-%" PRId64,
pTask->id.taskId, pWindow->skey, pWindow->ekey, pRange->range.minVer, pRange->range.maxVer);
mDebug("add source task 0x%x timeWindow:%" PRId64 "-%" PRId64 " verRange:%" PRId64 "-%" PRId64, pTask->id.taskId,
pWindow->skey, pWindow->ekey, pRange->range.minVer, pRange->range.maxVer);
}
}
static SStreamTask* buildSourceTask(SStreamObj* pStream, SEpSet* pEpset,
bool isFillhistory, bool useTriggerParam) {
static SStreamTask* buildSourceTask(SStreamObj* pStream, SEpSet* pEpset, bool isFillhistory, bool useTriggerParam) {
uint64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid;
SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks);
SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SOURCE, pEpset,
isFillhistory, useTriggerParam ? pStream->conf.triggerParam : 0,
*pTaskList, pStream->conf.fillHistory);
SStreamTask* pTask =
tNewStreamTask(uid, TASK_LEVEL__SOURCE, pEpset, isFillhistory, useTriggerParam ? pStream->conf.triggerParam : 0,
*pTaskList, pStream->conf.fillHistory, pStream->subTableWithoutMd5);
if (pTask == NULL) {
return NULL;
}
@ -364,8 +363,8 @@ static void setHTasksId(SStreamObj* pStream) {
}
}
static int32_t doAddSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset,
int64_t skey, SArray* pVerList, SVgObj* pVgroup, bool isFillhistory, bool useTriggerParam ){
static int32_t doAddSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset, int64_t skey,
SArray* pVerList, SVgObj* pVgroup, bool isFillhistory, bool useTriggerParam) {
// new stream task
SStreamTask* pTask = buildSourceTask(pStream, pEpset, isFillhistory, useTriggerParam);
if (pTask == NULL) {
@ -415,8 +414,8 @@ static SSubplan* getAggSubPlan(const SQueryPlan* pPlan, int index){
return plan;
}
static int32_t addSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream,
SEpSet* pEpset, int64_t nextWindowSkey, SArray* pVerList, bool useTriggerParam) {
static int32_t addSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset,
int64_t nextWindowSkey, SArray* pVerList, bool useTriggerParam) {
addNewTaskList(pStream);
void* pIter = NULL;
@ -433,7 +432,8 @@ static int32_t addSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream
continue;
}
int code = doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVerList, pVgroup, false, useTriggerParam);
int code =
doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVerList, pVgroup, false, useTriggerParam);
if (code != 0) {
sdbRelease(pSdb, pVgroup);
return code;
@ -461,9 +461,9 @@ static SStreamTask* buildAggTask(SStreamObj* pStream, SEpSet* pEpset, bool isFil
uint64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid;
SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks);
SStreamTask* pAggTask = tNewStreamTask(uid, TASK_LEVEL__AGG, pEpset, isFillhistory,
useTriggerParam ? pStream->conf.triggerParam : 0,
*pTaskList, pStream->conf.fillHistory);
SStreamTask* pAggTask =
tNewStreamTask(uid, TASK_LEVEL__AGG, pEpset, isFillhistory, useTriggerParam ? pStream->conf.triggerParam : 0,
*pTaskList, pStream->conf.fillHistory, pStream->subTableWithoutMd5);
if (pAggTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
@ -472,8 +472,8 @@ static SStreamTask* buildAggTask(SStreamObj* pStream, SEpSet* pEpset, bool isFil
return pAggTask;
}
static int32_t doAddAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, SEpSet* pEpset,
SVgObj* pVgroup, SSnodeObj* pSnode, bool isFillhistory, bool useTriggerParam){
static int32_t doAddAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, SEpSet* pEpset, SVgObj* pVgroup,
SSnodeObj* pSnode, bool isFillhistory, bool useTriggerParam) {
int32_t code = 0;
SStreamTask* pTask = buildAggTask(pStream, pEpset, isFillhistory, useTriggerParam);
if (pTask == NULL) {
@ -616,8 +616,8 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
bool multiTarget = (pDbObj->cfg.numOfVgroups > 1);
sdbRelease(pSdb, pDbObj);
mDebug("doScheduleStream numOfPlanLevel:%d, exDb:%d, multiTarget:%d, fix vgId:%d, physicalPlan:%s",
numOfPlanLevel, externalTargetDB, multiTarget, pStream->fixedSinkVgId, pStream->physicalPlan);
mDebug("doScheduleStream numOfPlanLevel:%d, exDb:%d, multiTarget:%d, fix vgId:%d, physicalPlan:%s", numOfPlanLevel,
externalTargetDB, multiTarget, pStream->fixedSinkVgId, pStream->physicalPlan);
pStream->tasks = taosArrayInit(numOfPlanLevel + 1, POINTER_BYTES);
pStream->pHTasksList = taosArrayInit(numOfPlanLevel + 1, POINTER_BYTES);

View File

@ -567,6 +567,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
streamObj.conf.triggerParam = pCreate->maxDelay;
streamObj.ast = taosStrdup(smaObj.ast);
streamObj.indexForMultiAggBalance = -1;
streamObj.subTableWithoutMd5 = 1;
// check the maxDelay
if (streamObj.conf.triggerParam < TSDB_MIN_ROLLUP_MAX_DELAY) {

View File

@ -802,8 +802,7 @@ static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) {
}
static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId,
int64_t streamId, int32_t taskId, int32_t transId,
int8_t mndTrigger) {
int64_t streamId, int32_t taskId, int32_t transId, int8_t mndTrigger) {
SStreamCheckpointSourceReq req = {0};
req.checkpointId = checkpointId;
req.nodeId = nodeId;
@ -882,7 +881,6 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
return -1;
}
bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHECKPOINT_NAME, lock);
if (conflict) {
mndAddtoCheckpointWaitingList(pStream, checkpointId);

View File

@ -33,12 +33,15 @@ static int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSData
int64_t suid);
static int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2* pReq, int32_t numOfBlocks);
static int32_t buildSubmitMsgImpl(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int32_t* msgLen);
static int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDataBlock* pDataBlock, const char* id);
static int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDataBlock* pDataBlock,
const char* id);
static int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkInfo* pTableSinkInfo,
const char* dstTableName, int64_t* uid);
static int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId, const char* id);
static int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId,
const char* id);
static bool isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbName, int64_t suid);
static int32_t initCreateTableMsg(SVCreateTbReq* pCreateTableReq, uint64_t suid, const char* stbFullName, int32_t numOfTags);
static int32_t initCreateTableMsg(SVCreateTbReq* pCreateTableReq, uint64_t suid, const char* stbFullName,
int32_t numOfTags);
static SArray* createDefaultTagColName();
static void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName,
int64_t gid, bool newSubTableRule);
@ -68,10 +71,7 @@ int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* p
if (varTbName != NULL && varTbName != (void*)-1) {
name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
memcpy(name, varDataVal(varTbName), varDataLen(varTbName));
if(newSubTableRule &&
!isAutoTableName(name) &&
!alreadyAddGroupId(name) &&
groupId != 0) {
if (newSubTableRule && !isAutoTableName(name) && !alreadyAddGroupId(name) && groupId != 0) {
buildCtbNameAddGruopId(name, groupId);
}
} else if (stbFullName) {
@ -181,10 +181,8 @@ SArray* createDefaultTagColName() {
void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName,
int64_t gid, bool newSubTableRule) {
if (pDataBlock->info.parTbName[0]) {
if(newSubTableRule &&
!isAutoTableName(pDataBlock->info.parTbName) &&
!alreadyAddGroupId(pDataBlock->info.parTbName) &&
gid != 0) {
if (newSubTableRule && !isAutoTableName(pDataBlock->info.parTbName) &&
!alreadyAddGroupId(pDataBlock->info.parTbName) && gid != 0) {
pCreateTableReq->name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
strcpy(pCreateTableReq->name, pDataBlock->info.parTbName);
buildCtbNameAddGruopId(pCreateTableReq->name, gid);
@ -196,13 +194,14 @@ void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDa
}
}
static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask,
int64_t suid) {
static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock,
SStreamTask* pTask, int64_t suid) {
tqDebug("s-task:%s build create table msg", pTask->id.idStr);
STSchema* pTSchema = pTask->outputInfo.tbSink.pTSchema;
int32_t rows = pDataBlock->info.rows;
SArray* tagArray = taosArrayInit(4, sizeof(STagVal));;
SArray* tagArray = taosArrayInit(4, sizeof(STagVal));
;
int32_t code = 0;
SVCreateTbBatchReq reqs = {0};
@ -262,7 +261,8 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S
ASSERT(gid == *(int64_t*)pGpIdData);
}
setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, gid, pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER);
setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, gid,
pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1);
taosArrayPush(reqs.pArray, pCreateTbReq);
tqDebug("s-task:%s build create table:%s msg complete", pTask->id.idStr, pCreateTbReq->name);
@ -361,7 +361,8 @@ int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, c
pExisted->aRowP = pFinal;
tqTrace("s-task:%s rows merged, final rows:%d, uid:%" PRId64 ", existed auto-create table:%d, new-block:%d", id,
(int32_t)taosArrayGetSize(pFinal), pExisted->uid, (pExisted->pCreateTbReq != NULL), (pNew->pCreateTbReq != NULL));
(int32_t)taosArrayGetSize(pFinal), pExisted->uid, (pExisted->pCreateTbReq != NULL),
(pNew->pCreateTbReq != NULL));
tdDestroySVCreateTbReq(pNew->pCreateTbReq);
taosMemoryFree(pNew->pCreateTbReq);
@ -373,7 +374,7 @@ int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock*
SBatchDeleteReq deleteReq = {.suid = suid, .deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq))};
int32_t code = tqBuildDeleteReq(pVnode->pTq, stbFullName, pDataBlock, &deleteReq, pTask->id.idStr,
pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER);
pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@ -416,8 +417,8 @@ bool isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbNam
}
if (pReader->me.ctbEntry.suid != suid) {
tqError("vgId:%d, failed to write into %s, since suid mismatch, expect suid:%" PRId64 ", actual:%" PRId64,
vgId, ctbName, suid, pReader->me.ctbEntry.suid);
tqError("vgId:%d, failed to write into %s, since suid mismatch, expect suid:%" PRId64 ", actual:%" PRId64, vgId,
ctbName, suid, pReader->me.ctbEntry.suid);
terrno = TSDB_CODE_TDB_TABLE_IN_OTHER_STABLE;
return false;
}
@ -667,10 +668,8 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
memset(dstTableName, 0, TSDB_TABLE_NAME_LEN);
buildCtbNameByGroupIdImpl(stbFullName, groupId, dstTableName);
} else {
if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER &&
!isAutoTableName(dstTableName) &&
!alreadyAddGroupId(dstTableName) &&
groupId != 0) {
if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1 &&
!isAutoTableName(dstTableName) && !alreadyAddGroupId(dstTableName) && groupId != 0) {
buildCtbNameAddGruopId(dstTableName, groupId);
}
}
@ -714,7 +713,8 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
pTableData->flags = SUBMIT_REQ_AUTO_CREATE_TABLE;
pTableData->pCreateTbReq =
buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock, pTagArray, pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER);
buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock, pTagArray,
pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1);
taosArrayDestroy(pTagArray);
if (pTableData->pCreateTbReq == NULL) {
@ -750,8 +750,8 @@ int32_t tqSetDstTableDataPayload(uint64_t suid, const STSchema *pTSchema, int32_
SSubmitTbData* pTableData, const char* id) {
int32_t numOfRows = pDataBlock->info.rows;
tqDebug("s-task:%s sink data pipeline, build submit msg from %dth resBlock, including %d rows, dst suid:%" PRId64,
id, blockIndex + 1, numOfRows, suid);
tqDebug("s-task:%s sink data pipeline, build submit msg from %dth resBlock, including %d rows, dst suid:%" PRId64, id,
blockIndex + 1, numOfRows, suid);
char* dstTableName = pDataBlock->info.parTbName;
// convert all rows
@ -832,7 +832,8 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
}
} else {
tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, merge submit msg", vgId, id, numOfBlocks);
SHashObj* pTableIndexMap = taosHashInit(numOfBlocks, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
SHashObj* pTableIndexMap =
taosHashInit(numOfBlocks, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
SSubmitReq2 submitReq = {.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData))};
if (submitReq.aSubmitTbData == NULL) {

View File

@ -569,6 +569,7 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
char ctbName[TSDB_TABLE_FNAME_LEN] = {0};
if (pDataBlock->info.parTbName[0]) {
if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER &&
pTask->subtableWithoutMd5 != 1 &&
!isAutoTableName(pDataBlock->info.parTbName) &&
!alreadyAddGroupId(pDataBlock->info.parTbName) &&
groupId != 0){

View File

@ -80,7 +80,7 @@ static SStreamChildEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) {
}
SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool fillHistory, int64_t triggerParam,
SArray* pTaskList, bool hasFillhistory) {
SArray* pTaskList, bool hasFillhistory, int8_t subtableWithoutMd5) {
SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask));
if (pTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -96,6 +96,7 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset,
pTask->info.taskLevel = taskLevel;
pTask->info.fillHistory = fillHistory;
pTask->info.triggerParam = triggerParam;
pTask->subtableWithoutMd5 = subtableWithoutMd5;
pTask->status.pSM = streamCreateStateMachine(pTask);
if (pTask->status.pSM == NULL) {
@ -205,6 +206,7 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
if (tEncodeCStr(pEncoder, pTask->outputInfo.shuffleDispatcher.stbFullName) < 0) return -1;
}
if (tEncodeI64(pEncoder, pTask->info.triggerParam) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->subtableWithoutMd5) < 0) return -1;
if (tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve) - 1) < 0) return -1;
tEndEncode(pEncoder);
@ -287,6 +289,7 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
if (tDecodeCStrTo(pDecoder, pTask->outputInfo.shuffleDispatcher.stbFullName) < 0) return -1;
}
if (tDecodeI64(pDecoder, &pTask->info.triggerParam) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->subtableWithoutMd5) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pTask->reserve) < 0) return -1;
tEndDecode(pDecoder);
@ -881,9 +884,7 @@ void streamTaskResume(SStreamTask* pTask) {
}
}
bool streamTaskIsSinkTask(const SStreamTask* pTask) {
return pTask->info.taskLevel == TASK_LEVEL__SINK;
}
bool streamTaskIsSinkTask(const SStreamTask* pTask) { return pTask->info.taskLevel == TASK_LEVEL__SINK; }
int32_t streamTaskSendCheckpointReq(SStreamTask* pTask) {
int32_t code;