opti:consume performance if all data is in format of "insert into using"
This commit is contained in:
parent
31b8410c33
commit
a74f12e2e1
|
@ -1400,7 +1400,7 @@ void tFreeSCompactDbReq(SCompactDbReq* pReq);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t compactId;
|
int32_t compactId;
|
||||||
int8_t bAccepted;
|
int8_t bAccepted;
|
||||||
} SCompactDbRsp;
|
} SCompactDbRsp;
|
||||||
|
|
||||||
int32_t tSerializeSCompactDbRsp(void* buf, int32_t bufLen, SCompactDbRsp* pRsp);
|
int32_t tSerializeSCompactDbRsp(void* buf, int32_t bufLen, SCompactDbRsp* pRsp);
|
||||||
|
@ -1414,7 +1414,7 @@ typedef struct {
|
||||||
|
|
||||||
int32_t tSerializeSKillCompactReq(void* buf, int32_t bufLen, SKillCompactReq* pReq);
|
int32_t tSerializeSKillCompactReq(void* buf, int32_t bufLen, SKillCompactReq* pReq);
|
||||||
int32_t tDeserializeSKillCompactReq(void* buf, int32_t bufLen, SKillCompactReq* pReq);
|
int32_t tDeserializeSKillCompactReq(void* buf, int32_t bufLen, SKillCompactReq* pReq);
|
||||||
void tFreeSKillCompactReq(SKillCompactReq *pReq);
|
void tFreeSKillCompactReq(SKillCompactReq* pReq);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char name[TSDB_FUNC_NAME_LEN];
|
char name[TSDB_FUNC_NAME_LEN];
|
||||||
|
@ -1751,9 +1751,9 @@ int32_t tSerializeSCompactVnodeReq(void* buf, int32_t bufLen, SCompactVnodeReq*
|
||||||
int32_t tDeserializeSCompactVnodeReq(void* buf, int32_t bufLen, SCompactVnodeReq* pReq);
|
int32_t tDeserializeSCompactVnodeReq(void* buf, int32_t bufLen, SCompactVnodeReq* pReq);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t compactId;
|
int32_t compactId;
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
int32_t dnodeId;
|
int32_t dnodeId;
|
||||||
} SVKillCompactReq;
|
} SVKillCompactReq;
|
||||||
|
|
||||||
int32_t tSerializeSVKillCompactReq(void* buf, int32_t bufLen, SVKillCompactReq* pReq);
|
int32_t tSerializeSVKillCompactReq(void* buf, int32_t bufLen, SVKillCompactReq* pReq);
|
||||||
|
@ -1954,9 +1954,9 @@ typedef struct {
|
||||||
char db[TSDB_DB_FNAME_LEN];
|
char db[TSDB_DB_FNAME_LEN];
|
||||||
char tb[TSDB_TABLE_NAME_LEN];
|
char tb[TSDB_TABLE_NAME_LEN];
|
||||||
char user[TSDB_USER_LEN];
|
char user[TSDB_USER_LEN];
|
||||||
char filterTb[TSDB_TABLE_NAME_LEN]; // for ins_columns
|
char filterTb[TSDB_TABLE_NAME_LEN]; // for ins_columns
|
||||||
int64_t showId;
|
int64_t showId;
|
||||||
int64_t compactId; // for compact
|
int64_t compactId; // for compact
|
||||||
} SRetrieveTableReq;
|
} SRetrieveTableReq;
|
||||||
|
|
||||||
typedef struct SSysTableSchema {
|
typedef struct SSysTableSchema {
|
||||||
|
@ -3784,12 +3784,12 @@ typedef struct {
|
||||||
} SMqHbReq;
|
} SMqHbReq;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char topic[TSDB_TOPIC_FNAME_LEN];
|
char topic[TSDB_TOPIC_FNAME_LEN];
|
||||||
int8_t noPrivilege;
|
int8_t noPrivilege;
|
||||||
} STopicPrivilege;
|
} STopicPrivilege;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SArray* topicPrivileges; // SArray<STopicPrivilege>
|
SArray* topicPrivileges; // SArray<STopicPrivilege>
|
||||||
} SMqHbRsp;
|
} SMqHbRsp;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -3927,8 +3927,8 @@ int32_t tDeserializeSMqSeekReq(void* buf, int32_t bufLen, SMqSeekReq* pReq);
|
||||||
#define SUBMIT_REQ_COLUMN_DATA_FORMAT 0x2
|
#define SUBMIT_REQ_COLUMN_DATA_FORMAT 0x2
|
||||||
#define SUBMIT_REQ_FROM_FILE 0x4
|
#define SUBMIT_REQ_FROM_FILE 0x4
|
||||||
|
|
||||||
#define SOURCE_NULL 0
|
#define SOURCE_NULL 0
|
||||||
#define SOURCE_TAOSX 1
|
#define SOURCE_TAOSX 1
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t flags;
|
int32_t flags;
|
||||||
|
@ -3940,8 +3940,8 @@ typedef struct {
|
||||||
SArray* aRowP;
|
SArray* aRowP;
|
||||||
SArray* aCol;
|
SArray* aCol;
|
||||||
};
|
};
|
||||||
int64_t ctimeMs;
|
int64_t ctimeMs;
|
||||||
int8_t source;
|
int8_t source;
|
||||||
} SSubmitTbData;
|
} SSubmitTbData;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -50,21 +50,21 @@ extern "C" {
|
||||||
(_t)->hTaskInfo.id.streamId = 0; \
|
(_t)->hTaskInfo.id.streamId = 0; \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
#define STREAM_EXEC_T_EXTRACT_WAL_DATA (-1)
|
#define STREAM_EXEC_T_EXTRACT_WAL_DATA (-1)
|
||||||
#define STREAM_EXEC_T_START_ALL_TASKS (-2)
|
#define STREAM_EXEC_T_START_ALL_TASKS (-2)
|
||||||
#define STREAM_EXEC_T_START_ONE_TASK (-3)
|
#define STREAM_EXEC_T_START_ONE_TASK (-3)
|
||||||
#define STREAM_EXEC_T_RESTART_ALL_TASKS (-4)
|
#define STREAM_EXEC_T_RESTART_ALL_TASKS (-4)
|
||||||
#define STREAM_EXEC_T_STOP_ALL_TASKS (-5)
|
#define STREAM_EXEC_T_STOP_ALL_TASKS (-5)
|
||||||
#define STREAM_EXEC_T_RESUME_TASK (-6)
|
#define STREAM_EXEC_T_RESUME_TASK (-6)
|
||||||
#define STREAM_EXEC_T_UPDATE_TASK_EPSET (-7)
|
#define STREAM_EXEC_T_UPDATE_TASK_EPSET (-7)
|
||||||
|
|
||||||
typedef struct SStreamTask SStreamTask;
|
typedef struct SStreamTask SStreamTask;
|
||||||
typedef struct SStreamQueue SStreamQueue;
|
typedef struct SStreamQueue SStreamQueue;
|
||||||
typedef struct SStreamTaskSM SStreamTaskSM;
|
typedef struct SStreamTaskSM SStreamTaskSM;
|
||||||
|
|
||||||
#define SSTREAM_TASK_VER 3
|
#define SSTREAM_TASK_VER 3
|
||||||
#define SSTREAM_TASK_INCOMPATIBLE_VER 1
|
#define SSTREAM_TASK_INCOMPATIBLE_VER 1
|
||||||
#define SSTREAM_TASK_NEED_CONVERT_VER 2
|
#define SSTREAM_TASK_NEED_CONVERT_VER 2
|
||||||
#define SSTREAM_TASK_SUBTABLE_CHANGED_VER 3
|
#define SSTREAM_TASK_SUBTABLE_CHANGED_VER 3
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
|
@ -405,8 +405,8 @@ typedef struct SHistoryTaskInfo {
|
||||||
int32_t tickCount;
|
int32_t tickCount;
|
||||||
int32_t retryTimes;
|
int32_t retryTimes;
|
||||||
int32_t waitInterval;
|
int32_t waitInterval;
|
||||||
int64_t haltVer; // offset in wal when halt the stream task
|
int64_t haltVer; // offset in wal when halt the stream task
|
||||||
bool operatorOpen; // false by default
|
bool operatorOpen; // false by default
|
||||||
} SHistoryTaskInfo;
|
} SHistoryTaskInfo;
|
||||||
|
|
||||||
typedef struct STaskOutputInfo {
|
typedef struct STaskOutputInfo {
|
||||||
|
@ -469,15 +469,15 @@ struct SStreamTask {
|
||||||
typedef int32_t (*startComplete_fn_t)(struct SStreamMeta*);
|
typedef int32_t (*startComplete_fn_t)(struct SStreamMeta*);
|
||||||
|
|
||||||
typedef struct STaskStartInfo {
|
typedef struct STaskStartInfo {
|
||||||
int64_t startTs;
|
int64_t startTs;
|
||||||
int64_t readyTs;
|
int64_t readyTs;
|
||||||
int32_t tasksWillRestart;
|
int32_t tasksWillRestart;
|
||||||
int32_t taskStarting; // restart flag, sentinel to guard the restart procedure.
|
int32_t taskStarting; // restart flag, sentinel to guard the restart procedure.
|
||||||
SHashObj* pReadyTaskSet; // tasks that are all ready for running stream processing
|
SHashObj* pReadyTaskSet; // tasks that are all ready for running stream processing
|
||||||
SHashObj* pFailedTaskSet; // tasks that are done the check downstream process, may be successful or failed
|
SHashObj* pFailedTaskSet; // tasks that are done the check downstream process, may be successful or failed
|
||||||
int64_t elapsedTime;
|
int64_t elapsedTime;
|
||||||
int32_t restartCount; // restart task counter
|
int32_t restartCount; // restart task counter
|
||||||
startComplete_fn_t completeFn; // complete callback function
|
startComplete_fn_t completeFn; // complete callback function
|
||||||
} STaskStartInfo;
|
} STaskStartInfo;
|
||||||
|
|
||||||
typedef struct STaskUpdateInfo {
|
typedef struct STaskUpdateInfo {
|
||||||
|
@ -504,7 +504,7 @@ typedef struct SStreamMeta {
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
int64_t stage;
|
int64_t stage;
|
||||||
int32_t role;
|
int32_t role;
|
||||||
bool sendMsgBeforeClosing; // send hb to mnode before close all tasks when switch to follower.
|
bool sendMsgBeforeClosing; // send hb to mnode before close all tasks when switch to follower.
|
||||||
STaskStartInfo startInfo;
|
STaskStartInfo startInfo;
|
||||||
TdThreadRwlock lock;
|
TdThreadRwlock lock;
|
||||||
SScanWalInfo scanInfo;
|
SScanWalInfo scanInfo;
|
||||||
|
@ -678,18 +678,18 @@ typedef struct STaskStatusEntry {
|
||||||
int32_t statusLastDuration; // to record the last duration of current status
|
int32_t statusLastDuration; // to record the last duration of current status
|
||||||
int64_t stage;
|
int64_t stage;
|
||||||
int32_t nodeId;
|
int32_t nodeId;
|
||||||
int64_t verStart; // start version in WAL, only valid for source task
|
int64_t verStart; // start version in WAL, only valid for source task
|
||||||
int64_t verEnd; // end version in WAL, only valid for source task
|
int64_t verEnd; // end version in WAL, only valid for source task
|
||||||
int64_t processedVer; // only valid for source task
|
int64_t processedVer; // only valid for source task
|
||||||
int64_t checkpointId; // current active checkpoint id
|
int64_t checkpointId; // current active checkpoint id
|
||||||
int32_t chkpointTransId; // checkpoint trans id
|
int32_t chkpointTransId; // checkpoint trans id
|
||||||
int8_t checkpointFailed; // denote if the checkpoint is failed or not
|
int8_t checkpointFailed; // denote if the checkpoint is failed or not
|
||||||
bool inputQChanging; // inputQ is changing or not
|
bool inputQChanging; // inputQ is changing or not
|
||||||
int64_t inputQUnchangeCounter;
|
int64_t inputQUnchangeCounter;
|
||||||
double inputQUsed; // in MiB
|
double inputQUsed; // in MiB
|
||||||
double inputRate;
|
double inputRate;
|
||||||
double sinkQuota; // existed quota size for sink task
|
double sinkQuota; // existed quota size for sink task
|
||||||
double sinkDataSize; // sink to dst data size
|
double sinkDataSize; // sink to dst data size
|
||||||
} STaskStatusEntry;
|
} STaskStatusEntry;
|
||||||
|
|
||||||
typedef struct SStreamHbMsg {
|
typedef struct SStreamHbMsg {
|
||||||
|
@ -720,8 +720,8 @@ int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpda
|
||||||
int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* pMsg);
|
int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* pMsg);
|
||||||
|
|
||||||
typedef struct SStreamTaskState {
|
typedef struct SStreamTaskState {
|
||||||
ETaskStatus state;
|
ETaskStatus state;
|
||||||
char* name;
|
char* name;
|
||||||
} SStreamTaskState;
|
} SStreamTaskState;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -839,7 +839,8 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st);
|
||||||
// stream task meta
|
// stream task meta
|
||||||
void streamMetaInit();
|
void streamMetaInit();
|
||||||
void streamMetaCleanup();
|
void streamMetaCleanup();
|
||||||
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage, startComplete_fn_t fn);
|
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage,
|
||||||
|
startComplete_fn_t fn);
|
||||||
void streamMetaClose(SStreamMeta* streamMeta);
|
void streamMetaClose(SStreamMeta* streamMeta);
|
||||||
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); // save to stream meta store
|
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); // save to stream meta store
|
||||||
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pKey);
|
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pKey);
|
||||||
|
@ -858,22 +859,22 @@ void streamMetaNotifyClose(SStreamMeta* pMeta);
|
||||||
void streamMetaStartHb(SStreamMeta* pMeta);
|
void streamMetaStartHb(SStreamMeta* pMeta);
|
||||||
bool streamMetaTaskInTimer(SStreamMeta* pMeta);
|
bool streamMetaTaskInTimer(SStreamMeta* pMeta);
|
||||||
int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs,
|
int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs,
|
||||||
int64_t endTs, bool ready);
|
int64_t endTs, bool ready);
|
||||||
int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta);
|
int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta);
|
||||||
|
|
||||||
void streamMetaRLock(SStreamMeta* pMeta);
|
void streamMetaRLock(SStreamMeta* pMeta);
|
||||||
void streamMetaRUnLock(SStreamMeta* pMeta);
|
void streamMetaRUnLock(SStreamMeta* pMeta);
|
||||||
void streamMetaWLock(SStreamMeta* pMeta);
|
void streamMetaWLock(SStreamMeta* pMeta);
|
||||||
void streamMetaWUnLock(SStreamMeta* pMeta);
|
void streamMetaWUnLock(SStreamMeta* pMeta);
|
||||||
void streamMetaResetStartInfo(STaskStartInfo* pMeta);
|
void streamMetaResetStartInfo(STaskStartInfo* pMeta);
|
||||||
SArray* streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta);
|
SArray* streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta);
|
||||||
void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader);
|
void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader);
|
||||||
int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta);
|
int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta);
|
||||||
int32_t streamMetaStartAllTasks(SStreamMeta* pMeta);
|
int32_t streamMetaStartAllTasks(SStreamMeta* pMeta);
|
||||||
int32_t streamMetaStopAllTasks(SStreamMeta* pMeta);
|
int32_t streamMetaStopAllTasks(SStreamMeta* pMeta);
|
||||||
int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
|
int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
|
||||||
bool streamMetaAllTasksReady(const SStreamMeta* pMeta);
|
bool streamMetaAllTasksReady(const SStreamMeta* pMeta);
|
||||||
tmr_h streamTimerGetInstance();
|
tmr_h streamTimerGetInstance();
|
||||||
|
|
||||||
// checkpoint
|
// checkpoint
|
||||||
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);
|
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);
|
||||||
|
@ -890,8 +891,8 @@ int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInf
|
||||||
SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask);
|
SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask);
|
||||||
void* streamDestroyStateMachine(SStreamTaskSM* pSM);
|
void* streamDestroyStateMachine(SStreamTaskSM* pSM);
|
||||||
|
|
||||||
int32_t broadcastRetrieveMsg(SStreamTask* pTask, SStreamRetrieveReq *req);
|
int32_t broadcastRetrieveMsg(SStreamTask* pTask, SStreamRetrieveReq* req);
|
||||||
void sendRetrieveRsp(SStreamRetrieveReq *pReq, SRpcMsg* pRsp);
|
void sendRetrieveRsp(SStreamRetrieveReq* pReq, SRpcMsg* pRsp);
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -776,8 +776,8 @@ typedef enum {
|
||||||
GRANT_STATE_REASON_MAX,
|
GRANT_STATE_REASON_MAX,
|
||||||
} EGrantStateReason;
|
} EGrantStateReason;
|
||||||
|
|
||||||
#define GRANT_STATE_NUM 30
|
#define GRANT_STATE_NUM 30
|
||||||
#define GRANT_ACTIVE_NUM 10
|
#define GRANT_ACTIVE_NUM 10
|
||||||
#define GRANT_ACTIVE_HEAD_LEN 30
|
#define GRANT_ACTIVE_HEAD_LEN 30
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -812,7 +812,7 @@ typedef struct {
|
||||||
int64_t id : 24;
|
int64_t id : 24;
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
char machine[TSDB_MACHINE_ID_LEN + 1];
|
char machine[TSDB_MACHINE_ID_LEN + 1];
|
||||||
} SGrantMachine;
|
} SGrantMachine;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -14,13 +14,13 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "mndScheduler.h"
|
#include "mndScheduler.h"
|
||||||
#include "tmisce.h"
|
|
||||||
#include "mndMnode.h"
|
|
||||||
#include "mndDb.h"
|
#include "mndDb.h"
|
||||||
|
#include "mndMnode.h"
|
||||||
#include "mndSnode.h"
|
#include "mndSnode.h"
|
||||||
#include "mndVgroup.h"
|
#include "mndVgroup.h"
|
||||||
#include "parser.h"
|
#include "parser.h"
|
||||||
#include "tcompare.h"
|
#include "tcompare.h"
|
||||||
|
#include "tmisce.h"
|
||||||
#include "tname.h"
|
#include "tname.h"
|
||||||
#include "tuuid.h"
|
#include "tuuid.h"
|
||||||
|
|
||||||
|
@ -189,7 +189,7 @@ SVgObj* mndSchedFetchOneVg(SMnode* pMnode, SStreamObj* pStream) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if(pStream->indexForMultiAggBalance == -1){
|
if (pStream->indexForMultiAggBalance == -1) {
|
||||||
taosSeedRand(taosSafeRand());
|
taosSeedRand(taosSafeRand());
|
||||||
pStream->indexForMultiAggBalance = taosRand() % pDbObj->cfg.numOfVgroups;
|
pStream->indexForMultiAggBalance = taosRand() % pDbObj->cfg.numOfVgroups;
|
||||||
}
|
}
|
||||||
|
@ -204,7 +204,7 @@ SVgObj* mndSchedFetchOneVg(SMnode* pMnode, SStreamObj* pStream) {
|
||||||
sdbRelease(pMnode->pSdb, pVgroup);
|
sdbRelease(pMnode->pSdb, pVgroup);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (index++ == pStream->indexForMultiAggBalance){
|
if (index++ == pStream->indexForMultiAggBalance) {
|
||||||
pStream->indexForMultiAggBalance++;
|
pStream->indexForMultiAggBalance++;
|
||||||
pStream->indexForMultiAggBalance %= pDbObj->cfg.numOfVgroups;
|
pStream->indexForMultiAggBalance %= pDbObj->cfg.numOfVgroups;
|
||||||
sdbCancelFetch(pMnode->pSdb, pIter);
|
sdbCancelFetch(pMnode->pSdb, pIter);
|
||||||
|
@ -217,12 +217,12 @@ SVgObj* mndSchedFetchOneVg(SMnode* pMnode, SStreamObj* pStream) {
|
||||||
return pVgroup;
|
return pVgroup;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doAddSinkTask(SStreamObj* pStream, SMnode* pMnode, SVgObj* pVgroup,
|
static int32_t doAddSinkTask(SStreamObj* pStream, SMnode* pMnode, SVgObj* pVgroup, SEpSet* pEpset, bool isFillhistory) {
|
||||||
SEpSet* pEpset, bool isFillhistory) {
|
int64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid;
|
||||||
int64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid;
|
|
||||||
SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks);
|
SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks);
|
||||||
|
|
||||||
SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SINK, pEpset, isFillhistory, 0, *pTaskList, pStream->conf.fillHistory);
|
SStreamTask* pTask =
|
||||||
|
tNewStreamTask(uid, TASK_LEVEL__SINK, pEpset, isFillhistory, 0, *pTaskList, pStream->conf.fillHistory);
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return terrno;
|
return terrno;
|
||||||
|
@ -235,12 +235,12 @@ static int32_t doAddSinkTask(SStreamObj* pStream, SMnode* pMnode, SVgObj* pVgrou
|
||||||
return mndSetSinkTaskInfo(pStream, pTask);
|
return mndSetSinkTaskInfo(pStream, pTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doAddSinkTaskToVg(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset, SVgObj* vgObj){
|
static int32_t doAddSinkTaskToVg(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset, SVgObj* vgObj) {
|
||||||
int32_t code = doAddSinkTask(pStream, pMnode, vgObj, pEpset, false);
|
int32_t code = doAddSinkTask(pStream, pMnode, vgObj, pEpset, false);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
if(pStream->conf.fillHistory){
|
if (pStream->conf.fillHistory) {
|
||||||
code = doAddSinkTask(pStream, pMnode, vgObj, pEpset, true);
|
code = doAddSinkTask(pStream, pMnode, vgObj, pEpset, true);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
return code;
|
return code;
|
||||||
|
@ -267,7 +267,7 @@ static int32_t doAddShuffleSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet*
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = doAddSinkTaskToVg(pMnode, pStream, pEpset, pVgroup);
|
int32_t code = doAddSinkTaskToVg(pMnode, pStream, pEpset, pVgroup);
|
||||||
if(code != 0){
|
if (code != 0) {
|
||||||
sdbRelease(pSdb, pVgroup);
|
sdbRelease(pSdb, pVgroup);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -279,7 +279,7 @@ static int32_t doAddShuffleSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet*
|
||||||
}
|
}
|
||||||
|
|
||||||
static int64_t getVgroupLastVer(const SArray* pList, int32_t vgId) {
|
static int64_t getVgroupLastVer(const SArray* pList, int32_t vgId) {
|
||||||
for(int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
|
||||||
SVgroupVer* pVer = taosArrayGet(pList, i);
|
SVgroupVer* pVer = taosArrayGet(pList, i);
|
||||||
if (pVer->vgId == vgId) {
|
if (pVer->vgId == vgId) {
|
||||||
return pVer->ver;
|
return pVer->ver;
|
||||||
|
@ -315,19 +315,18 @@ static void streamTaskSetDataRange(SStreamTask* pTask, int64_t skey, SArray* pVe
|
||||||
pRange->range.minVer = latestVer + 1;
|
pRange->range.minVer = latestVer + 1;
|
||||||
pRange->range.maxVer = INT64_MAX;
|
pRange->range.maxVer = INT64_MAX;
|
||||||
|
|
||||||
mDebug("add source task 0x%x timeWindow:%" PRId64 "-%" PRId64 " verRange:%" PRId64 "-%" PRId64,
|
mDebug("add source task 0x%x timeWindow:%" PRId64 "-%" PRId64 " verRange:%" PRId64 "-%" PRId64, pTask->id.taskId,
|
||||||
pTask->id.taskId, pWindow->skey, pWindow->ekey, pRange->range.minVer, pRange->range.maxVer);
|
pWindow->skey, pWindow->ekey, pRange->range.minVer, pRange->range.maxVer);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static SStreamTask* buildSourceTask(SStreamObj* pStream, SEpSet* pEpset,
|
static SStreamTask* buildSourceTask(SStreamObj* pStream, SEpSet* pEpset, bool isFillhistory, bool useTriggerParam) {
|
||||||
bool isFillhistory, bool useTriggerParam) {
|
|
||||||
uint64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid;
|
uint64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid;
|
||||||
SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks);
|
SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks);
|
||||||
|
|
||||||
SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SOURCE, pEpset,
|
SStreamTask* pTask =
|
||||||
isFillhistory, useTriggerParam ? pStream->conf.triggerParam : 0,
|
tNewStreamTask(uid, TASK_LEVEL__SOURCE, pEpset, isFillhistory, useTriggerParam ? pStream->conf.triggerParam : 0,
|
||||||
*pTaskList, pStream->conf.fillHistory);
|
*pTaskList, pStream->conf.fillHistory);
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -335,7 +334,7 @@ static SStreamTask* buildSourceTask(SStreamObj* pStream, SEpSet* pEpset,
|
||||||
return pTask;
|
return pTask;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void addNewTaskList(SStreamObj* pStream){
|
static void addNewTaskList(SStreamObj* pStream) {
|
||||||
SArray* pTaskList = taosArrayInit(0, POINTER_BYTES);
|
SArray* pTaskList = taosArrayInit(0, POINTER_BYTES);
|
||||||
taosArrayPush(pStream->tasks, &pTaskList);
|
taosArrayPush(pStream->tasks, &pTaskList);
|
||||||
if (pStream->conf.fillHistory) {
|
if (pStream->conf.fillHistory) {
|
||||||
|
@ -364,11 +363,11 @@ static void setHTasksId(SStreamObj* pStream) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doAddSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset,
|
static int32_t doAddSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset, int64_t skey,
|
||||||
int64_t skey, SArray* pVerList, SVgObj* pVgroup, bool isFillhistory, bool useTriggerParam ){
|
SArray* pVerList, SVgObj* pVgroup, bool isFillhistory, bool useTriggerParam) {
|
||||||
// new stream task
|
// new stream task
|
||||||
SStreamTask* pTask = buildSourceTask(pStream, pEpset, isFillhistory, useTriggerParam);
|
SStreamTask* pTask = buildSourceTask(pStream, pEpset, isFillhistory, useTriggerParam);
|
||||||
if(pTask == NULL){
|
if (pTask == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
@ -377,15 +376,15 @@ static int32_t doAddSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStre
|
||||||
streamTaskSetDataRange(pTask, skey, pVerList, pVgroup->vgId);
|
streamTaskSetDataRange(pTask, skey, pVerList, pVgroup->vgId);
|
||||||
|
|
||||||
int32_t code = mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup);
|
int32_t code = mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup);
|
||||||
if(code != 0){
|
if (code != 0) {
|
||||||
terrno = code;
|
terrno = code;
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
return TDB_CODE_SUCCESS;
|
return TDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSubplan* getScanSubPlan(const SQueryPlan* pPlan){
|
static SSubplan* getScanSubPlan(const SQueryPlan* pPlan) {
|
||||||
int32_t numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans);
|
int32_t numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans);
|
||||||
SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, numOfPlanLevel - 1);
|
SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, numOfPlanLevel - 1);
|
||||||
if (LIST_LENGTH(inner->pNodeList) != 1) {
|
if (LIST_LENGTH(inner->pNodeList) != 1) {
|
||||||
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
||||||
|
@ -400,7 +399,7 @@ static SSubplan* getScanSubPlan(const SQueryPlan* pPlan){
|
||||||
return plan;
|
return plan;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSubplan* getAggSubPlan(const SQueryPlan* pPlan, int index){
|
static SSubplan* getAggSubPlan(const SQueryPlan* pPlan, int index) {
|
||||||
SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, index);
|
SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, index);
|
||||||
if (LIST_LENGTH(inner->pNodeList) != 1) {
|
if (LIST_LENGTH(inner->pNodeList) != 1) {
|
||||||
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
||||||
|
@ -415,8 +414,8 @@ static SSubplan* getAggSubPlan(const SQueryPlan* pPlan, int index){
|
||||||
return plan;
|
return plan;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t addSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream,
|
static int32_t addSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset,
|
||||||
SEpSet* pEpset, int64_t nextWindowSkey, SArray* pVerList, bool useTriggerParam) {
|
int64_t nextWindowSkey, SArray* pVerList, bool useTriggerParam) {
|
||||||
addNewTaskList(pStream);
|
addNewTaskList(pStream);
|
||||||
|
|
||||||
void* pIter = NULL;
|
void* pIter = NULL;
|
||||||
|
@ -433,15 +432,16 @@ static int32_t addSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
int code = doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVerList, pVgroup, false, useTriggerParam);
|
int code =
|
||||||
if(code != 0){
|
doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVerList, pVgroup, false, useTriggerParam);
|
||||||
|
if (code != 0) {
|
||||||
sdbRelease(pSdb, pVgroup);
|
sdbRelease(pSdb, pVgroup);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pStream->conf.fillHistory) {
|
if (pStream->conf.fillHistory) {
|
||||||
code = doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVerList, pVgroup, true, useTriggerParam);
|
code = doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVerList, pVgroup, true, useTriggerParam);
|
||||||
if(code != 0){
|
if (code != 0) {
|
||||||
sdbRelease(pSdb, pVgroup);
|
sdbRelease(pSdb, pVgroup);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -461,9 +461,9 @@ static SStreamTask* buildAggTask(SStreamObj* pStream, SEpSet* pEpset, bool isFil
|
||||||
uint64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid;
|
uint64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid;
|
||||||
SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks);
|
SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks);
|
||||||
|
|
||||||
SStreamTask* pAggTask = tNewStreamTask(uid, TASK_LEVEL__AGG, pEpset, isFillhistory,
|
SStreamTask* pAggTask =
|
||||||
useTriggerParam ? pStream->conf.triggerParam : 0,
|
tNewStreamTask(uid, TASK_LEVEL__AGG, pEpset, isFillhistory, useTriggerParam ? pStream->conf.triggerParam : 0,
|
||||||
*pTaskList, pStream->conf.fillHistory);
|
*pTaskList, pStream->conf.fillHistory);
|
||||||
if (pAggTask == NULL) {
|
if (pAggTask == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -472,8 +472,8 @@ static SStreamTask* buildAggTask(SStreamObj* pStream, SEpSet* pEpset, bool isFil
|
||||||
return pAggTask;
|
return pAggTask;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doAddAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, SEpSet* pEpset,
|
static int32_t doAddAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, SEpSet* pEpset, SVgObj* pVgroup,
|
||||||
SVgObj* pVgroup, SSnodeObj* pSnode, bool isFillhistory, bool useTriggerParam){
|
SSnodeObj* pSnode, bool isFillhistory, bool useTriggerParam) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SStreamTask* pTask = buildAggTask(pStream, pEpset, isFillhistory, useTriggerParam);
|
SStreamTask* pTask = buildAggTask(pStream, pEpset, isFillhistory, useTriggerParam);
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL) {
|
||||||
|
@ -490,7 +490,7 @@ static int32_t doAddAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan,
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, SEpSet* pEpset, bool useTriggerParam){
|
static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, SEpSet* pEpset, bool useTriggerParam) {
|
||||||
SVgObj* pVgroup = NULL;
|
SVgObj* pVgroup = NULL;
|
||||||
SSnodeObj* pSnode = NULL;
|
SSnodeObj* pSnode = NULL;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
@ -504,20 +504,20 @@ static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, S
|
||||||
}
|
}
|
||||||
|
|
||||||
code = doAddAggTask(pStream, pMnode, plan, pEpset, pVgroup, pSnode, false, useTriggerParam);
|
code = doAddAggTask(pStream, pMnode, plan, pEpset, pVgroup, pSnode, false, useTriggerParam);
|
||||||
if(code != 0){
|
if (code != 0) {
|
||||||
goto END;
|
goto END;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pStream->conf.fillHistory) {
|
if (pStream->conf.fillHistory) {
|
||||||
code = doAddAggTask(pStream, pMnode, plan, pEpset, pVgroup, pSnode, true, useTriggerParam);
|
code = doAddAggTask(pStream, pMnode, plan, pEpset, pVgroup, pSnode, true, useTriggerParam);
|
||||||
if(code != 0){
|
if (code != 0) {
|
||||||
goto END;
|
goto END;
|
||||||
}
|
}
|
||||||
|
|
||||||
setHTasksId(pStream);
|
setHTasksId(pStream);
|
||||||
}
|
}
|
||||||
|
|
||||||
END:
|
END:
|
||||||
if (pSnode != NULL) {
|
if (pSnode != NULL) {
|
||||||
sdbRelease(pMnode->pSdb, pSnode);
|
sdbRelease(pMnode->pSdb, pSnode);
|
||||||
} else {
|
} else {
|
||||||
|
@ -526,7 +526,7 @@ static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, S
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t addSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset){
|
static int32_t addSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
addNewTaskList(pStream);
|
addNewTaskList(pStream);
|
||||||
|
|
||||||
|
@ -548,9 +548,9 @@ static int32_t addSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset){
|
||||||
return TDB_CODE_SUCCESS;
|
return TDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void bindTaskToSinkTask(SStreamObj* pStream, SMnode* pMnode, SArray* pSinkTaskList, SStreamTask* task){
|
static void bindTaskToSinkTask(SStreamObj* pStream, SMnode* pMnode, SArray* pSinkTaskList, SStreamTask* task) {
|
||||||
mndAddDispatcherForInternalTask(pMnode, pStream, pSinkTaskList, task);
|
mndAddDispatcherForInternalTask(pMnode, pStream, pSinkTaskList, task);
|
||||||
for(int32_t k = 0; k < taosArrayGetSize(pSinkTaskList); k++) {
|
for (int32_t k = 0; k < taosArrayGetSize(pSinkTaskList); k++) {
|
||||||
SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, k);
|
SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, k);
|
||||||
streamTaskSetUpstreamInfo(pSinkTask, task);
|
streamTaskSetUpstreamInfo(pSinkTask, task);
|
||||||
}
|
}
|
||||||
|
@ -558,10 +558,10 @@ static void bindTaskToSinkTask(SStreamObj* pStream, SMnode* pMnode, SArray* pSin
|
||||||
}
|
}
|
||||||
|
|
||||||
static void bindAggSink(SStreamObj* pStream, SMnode* pMnode, SArray* tasks) {
|
static void bindAggSink(SStreamObj* pStream, SMnode* pMnode, SArray* tasks) {
|
||||||
SArray* pSinkTaskList = taosArrayGetP(tasks, SINK_NODE_LEVEL);
|
SArray* pSinkTaskList = taosArrayGetP(tasks, SINK_NODE_LEVEL);
|
||||||
SArray** pAggTaskList = taosArrayGetLast(tasks);
|
SArray** pAggTaskList = taosArrayGetLast(tasks);
|
||||||
|
|
||||||
for(int i = 0; i < taosArrayGetSize(*pAggTaskList); i++){
|
for (int i = 0; i < taosArrayGetSize(*pAggTaskList); i++) {
|
||||||
SStreamTask* pAggTask = taosArrayGetP(*pAggTaskList, i);
|
SStreamTask* pAggTask = taosArrayGetP(*pAggTaskList, i);
|
||||||
bindTaskToSinkTask(pStream, pMnode, pSinkTaskList, pAggTask);
|
bindTaskToSinkTask(pStream, pMnode, pSinkTaskList, pAggTask);
|
||||||
mDebug("bindAggSink taskId:%s to sink task list", pAggTask->id.idStr);
|
mDebug("bindAggSink taskId:%s to sink task list", pAggTask->id.idStr);
|
||||||
|
@ -572,7 +572,7 @@ static void bindSourceSink(SStreamObj* pStream, SMnode* pMnode, SArray* tasks, b
|
||||||
SArray* pSinkTaskList = taosArrayGetP(tasks, SINK_NODE_LEVEL);
|
SArray* pSinkTaskList = taosArrayGetP(tasks, SINK_NODE_LEVEL);
|
||||||
SArray* pSourceTaskList = taosArrayGetP(tasks, hasExtraSink ? SINK_NODE_LEVEL + 1 : SINK_NODE_LEVEL);
|
SArray* pSourceTaskList = taosArrayGetP(tasks, hasExtraSink ? SINK_NODE_LEVEL + 1 : SINK_NODE_LEVEL);
|
||||||
|
|
||||||
for(int i = 0; i < taosArrayGetSize(pSourceTaskList); i++){
|
for (int i = 0; i < taosArrayGetSize(pSourceTaskList); i++) {
|
||||||
SStreamTask* pSourceTask = taosArrayGetP(pSourceTaskList, i);
|
SStreamTask* pSourceTask = taosArrayGetP(pSourceTaskList, i);
|
||||||
mDebug("bindSourceSink taskId:%s to sink task list", pSourceTask->id.idStr);
|
mDebug("bindSourceSink taskId:%s to sink task list", pSourceTask->id.idStr);
|
||||||
|
|
||||||
|
@ -591,8 +591,8 @@ static void bindTwoLevel(SArray* tasks, int32_t begin, int32_t end) {
|
||||||
SArray* pUpTaskList = taosArrayGetP(tasks, size - 2);
|
SArray* pUpTaskList = taosArrayGetP(tasks, size - 2);
|
||||||
|
|
||||||
SStreamTask** pDownTask = taosArrayGetLast(pDownTaskList);
|
SStreamTask** pDownTask = taosArrayGetLast(pDownTaskList);
|
||||||
end = end > taosArrayGetSize(pUpTaskList) ? taosArrayGetSize(pUpTaskList): end;
|
end = end > taosArrayGetSize(pUpTaskList) ? taosArrayGetSize(pUpTaskList) : end;
|
||||||
for(int i = begin; i < end; i++){
|
for (int i = begin; i < end; i++) {
|
||||||
SStreamTask* pUpTask = taosArrayGetP(pUpTaskList, i);
|
SStreamTask* pUpTask = taosArrayGetP(pUpTaskList, i);
|
||||||
pUpTask->info.selfChildId = i - begin;
|
pUpTask->info.selfChildId = i - begin;
|
||||||
streamTaskSetFixedDownstreamInfo(pUpTask, *pDownTask);
|
streamTaskSetFixedDownstreamInfo(pUpTask, *pDownTask);
|
||||||
|
@ -616,8 +616,8 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
|
||||||
bool multiTarget = (pDbObj->cfg.numOfVgroups > 1);
|
bool multiTarget = (pDbObj->cfg.numOfVgroups > 1);
|
||||||
sdbRelease(pSdb, pDbObj);
|
sdbRelease(pSdb, pDbObj);
|
||||||
|
|
||||||
mDebug("doScheduleStream numOfPlanLevel:%d, exDb:%d, multiTarget:%d, fix vgId:%d, physicalPlan:%s",
|
mDebug("doScheduleStream numOfPlanLevel:%d, exDb:%d, multiTarget:%d, fix vgId:%d, physicalPlan:%s", numOfPlanLevel,
|
||||||
numOfPlanLevel, externalTargetDB, multiTarget, pStream->fixedSinkVgId, pStream->physicalPlan);
|
externalTargetDB, multiTarget, pStream->fixedSinkVgId, pStream->physicalPlan);
|
||||||
pStream->tasks = taosArrayInit(numOfPlanLevel + 1, POINTER_BYTES);
|
pStream->tasks = taosArrayInit(numOfPlanLevel + 1, POINTER_BYTES);
|
||||||
pStream->pHTasksList = taosArrayInit(numOfPlanLevel + 1, POINTER_BYTES);
|
pStream->pHTasksList = taosArrayInit(numOfPlanLevel + 1, POINTER_BYTES);
|
||||||
|
|
||||||
|
@ -632,7 +632,7 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
|
||||||
|
|
||||||
pStream->totalLevel = numOfPlanLevel + hasExtraSink;
|
pStream->totalLevel = numOfPlanLevel + hasExtraSink;
|
||||||
|
|
||||||
SSubplan* plan = getScanSubPlan(pPlan); // source plan
|
SSubplan* plan = getScanSubPlan(pPlan); // source plan
|
||||||
if (plan == NULL) {
|
if (plan == NULL) {
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
@ -649,32 +649,32 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
|
||||||
return TDB_CODE_SUCCESS;
|
return TDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
if(numOfPlanLevel == 3){
|
if (numOfPlanLevel == 3) {
|
||||||
plan = getAggSubPlan(pPlan, 1); // middle agg plan
|
plan = getAggSubPlan(pPlan, 1); // middle agg plan
|
||||||
if (plan == NULL) {
|
if (plan == NULL) {
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
do{
|
do {
|
||||||
SArray** list = taosArrayGetLast(pStream->tasks);
|
SArray** list = taosArrayGetLast(pStream->tasks);
|
||||||
float size = (float)taosArrayGetSize(*list);
|
float size = (float)taosArrayGetSize(*list);
|
||||||
size_t cnt = (size_t)ceil(size/tsStreamAggCnt);
|
size_t cnt = (size_t)ceil(size / tsStreamAggCnt);
|
||||||
if(cnt <= 1) break;
|
if (cnt <= 1) break;
|
||||||
|
|
||||||
mDebug("doScheduleStream add middle agg, size:%d, cnt:%d", (int)size, (int)cnt);
|
mDebug("doScheduleStream add middle agg, size:%d, cnt:%d", (int)size, (int)cnt);
|
||||||
addNewTaskList(pStream);
|
addNewTaskList(pStream);
|
||||||
|
|
||||||
for(int j = 0; j < cnt; j++){
|
for (int j = 0; j < cnt; j++) {
|
||||||
code = addAggTask(pStream, pMnode, plan, pEpset, false);
|
code = addAggTask(pStream, pMnode, plan, pEpset, false);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
bindTwoLevel(pStream->tasks, j*tsStreamAggCnt, (j+1)*tsStreamAggCnt);
|
bindTwoLevel(pStream->tasks, j * tsStreamAggCnt, (j + 1) * tsStreamAggCnt);
|
||||||
if (pStream->conf.fillHistory) {
|
if (pStream->conf.fillHistory) {
|
||||||
bindTwoLevel(pStream->pHTasksList, j*tsStreamAggCnt, (j+1)*tsStreamAggCnt);
|
bindTwoLevel(pStream->pHTasksList, j * tsStreamAggCnt, (j + 1) * tsStreamAggCnt);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}while(1);
|
} while (1);
|
||||||
}
|
}
|
||||||
|
|
||||||
plan = getAggSubPlan(pPlan, 0);
|
plan = getAggSubPlan(pPlan, 0);
|
||||||
|
@ -684,7 +684,7 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
|
||||||
|
|
||||||
mDebug("doScheduleStream add final agg");
|
mDebug("doScheduleStream add final agg");
|
||||||
SArray** list = taosArrayGetLast(pStream->tasks);
|
SArray** list = taosArrayGetLast(pStream->tasks);
|
||||||
size_t size = taosArrayGetSize(*list);
|
size_t size = taosArrayGetSize(*list);
|
||||||
addNewTaskList(pStream);
|
addNewTaskList(pStream);
|
||||||
code = addAggTask(pStream, pMnode, plan, pEpset, true);
|
code = addAggTask(pStream, pMnode, plan, pEpset, true);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
|
|
@ -898,11 +898,11 @@ _OVER:
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndDropSmasByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
|
int32_t mndDropSmasByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
SSmaObj *pSma = NULL;
|
SSmaObj *pSma = NULL;
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
SVgObj *pVgroup = NULL;
|
SVgObj *pVgroup = NULL;
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
|
pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
|
||||||
|
|
|
@ -27,7 +27,7 @@
|
||||||
#include "tmisce.h"
|
#include "tmisce.h"
|
||||||
#include "tname.h"
|
#include "tname.h"
|
||||||
|
|
||||||
#define MND_STREAM_MAX_NUM 60
|
#define MND_STREAM_MAX_NUM 60
|
||||||
|
|
||||||
typedef struct SMStreamNodeCheckMsg {
|
typedef struct SMStreamNodeCheckMsg {
|
||||||
int8_t placeHolder; // // to fix windows compile error, define place holder
|
int8_t placeHolder; // // to fix windows compile error, define place holder
|
||||||
|
@ -192,7 +192,7 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
|
||||||
STREAM_DECODE_OVER:
|
STREAM_DECODE_OVER:
|
||||||
taosMemoryFreeClear(buf);
|
taosMemoryFreeClear(buf);
|
||||||
if (terrno != TSDB_CODE_SUCCESS) {
|
if (terrno != TSDB_CODE_SUCCESS) {
|
||||||
char* p = (pStream == NULL) ? "null" : pStream->name;
|
char *p = (pStream == NULL) ? "null" : pStream->name;
|
||||||
mError("stream:%s, failed to decode from raw:%p since %s", p, pRaw, terrstr());
|
mError("stream:%s, failed to decode from raw:%p since %s", p, pRaw, terrstr());
|
||||||
taosMemoryFreeClear(pRow);
|
taosMemoryFreeClear(pRow);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -802,8 +802,7 @@ static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId,
|
static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId,
|
||||||
int64_t streamId, int32_t taskId, int32_t transId,
|
int64_t streamId, int32_t taskId, int32_t transId, int8_t mndTrigger) {
|
||||||
int8_t mndTrigger) {
|
|
||||||
SStreamCheckpointSourceReq req = {0};
|
SStreamCheckpointSourceReq req = {0};
|
||||||
req.checkpointId = checkpointId;
|
req.checkpointId = checkpointId;
|
||||||
req.nodeId = nodeId;
|
req.nodeId = nodeId;
|
||||||
|
@ -878,11 +877,10 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
int64_t ts = taosGetTimestampMs();
|
int64_t ts = taosGetTimestampMs();
|
||||||
if (mndTrigger == 1 && (ts - pStream->checkpointFreq < tsStreamCheckpointInterval * 1000)) {
|
if (mndTrigger == 1 && (ts - pStream->checkpointFreq < tsStreamCheckpointInterval * 1000)) {
|
||||||
// mWarn("checkpoint interval less than the threshold, ignore it");
|
// mWarn("checkpoint interval less than the threshold, ignore it");
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHECKPOINT_NAME, lock);
|
bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHECKPOINT_NAME, lock);
|
||||||
if (conflict) {
|
if (conflict) {
|
||||||
mndAddtoCheckpointWaitingList(pStream, checkpointId);
|
mndAddtoCheckpointWaitingList(pStream, checkpointId);
|
||||||
|
@ -1144,7 +1142,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
STrans* pTrans = doCreateTrans(pMnode, pStream, pReq, MND_STREAM_DROP_NAME, "drop stream");
|
STrans *pTrans = doCreateTrans(pMnode, pStream, pReq, MND_STREAM_DROP_NAME, "drop stream");
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
mError("stream:%s, failed to drop since %s", dropReq.name, terrstr());
|
mError("stream:%s, failed to drop since %s", dropReq.name, terrstr());
|
||||||
sdbRelease(pMnode->pSdb, pStream);
|
sdbRelease(pMnode->pSdb, pStream);
|
||||||
|
@ -1570,7 +1568,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
STrans* pTrans = doCreateTrans(pMnode, pStream, pReq, MND_STREAM_PAUSE_NAME, "pause the stream");
|
STrans *pTrans = doCreateTrans(pMnode, pStream, pReq, MND_STREAM_PAUSE_NAME, "pause the stream");
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
mError("stream:%s failed to pause stream since %s", pauseReq.name, terrstr());
|
mError("stream:%s failed to pause stream since %s", pauseReq.name, terrstr());
|
||||||
sdbRelease(pMnode->pSdb, pStream);
|
sdbRelease(pMnode->pSdb, pStream);
|
||||||
|
@ -1590,7 +1588,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
|
||||||
// pause stream
|
// pause stream
|
||||||
taosWLockLatch(&pStream->lock);
|
taosWLockLatch(&pStream->lock);
|
||||||
pStream->status = STREAM_STATUS__PAUSE;
|
pStream->status = STREAM_STATUS__PAUSE;
|
||||||
if (mndPersistTransLog(pStream, pTrans,SDB_STATUS_READY) < 0) {
|
if (mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY) < 0) {
|
||||||
taosWUnLockLatch(&pStream->lock);
|
taosWUnLockLatch(&pStream->lock);
|
||||||
|
|
||||||
sdbRelease(pMnode->pSdb, pStream);
|
sdbRelease(pMnode->pSdb, pStream);
|
||||||
|
@ -1617,7 +1615,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
|
||||||
SMnode *pMnode = pReq->info.node;
|
SMnode *pMnode = pReq->info.node;
|
||||||
SStreamObj *pStream = NULL;
|
SStreamObj *pStream = NULL;
|
||||||
|
|
||||||
if(grantCheckExpire(TSDB_GRANT_STREAMS) < 0){
|
if (grantCheckExpire(TSDB_GRANT_STREAMS) < 0) {
|
||||||
terrno = TSDB_CODE_GRANT_EXPIRED;
|
terrno = TSDB_CODE_GRANT_EXPIRED;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -1659,7 +1657,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
STrans* pTrans = doCreateTrans(pMnode, pStream, pReq, MND_STREAM_RESUME_NAME, "resume the stream");
|
STrans *pTrans = doCreateTrans(pMnode, pStream, pReq, MND_STREAM_RESUME_NAME, "resume the stream");
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
mError("stream:%s, failed to resume stream since %s", resumeReq.name, terrstr());
|
mError("stream:%s, failed to resume stream since %s", resumeReq.name, terrstr());
|
||||||
sdbRelease(pMnode->pSdb, pStream);
|
sdbRelease(pMnode->pSdb, pStream);
|
||||||
|
@ -2106,10 +2104,10 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
|
||||||
ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList));
|
ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList));
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doAddTaskId(SArray* pList, int32_t taskId, int64_t uid, int32_t numOfTotal) {
|
static void doAddTaskId(SArray *pList, int32_t taskId, int64_t uid, int32_t numOfTotal) {
|
||||||
int32_t num = taosArrayGetSize(pList);
|
int32_t num = taosArrayGetSize(pList);
|
||||||
for(int32_t i = 0; i < num; ++i) {
|
for (int32_t i = 0; i < num; ++i) {
|
||||||
int32_t* pId = taosArrayGet(pList, i);
|
int32_t *pId = taosArrayGet(pList, i);
|
||||||
if (taskId == *pId) {
|
if (taskId == *pId) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -2122,7 +2120,7 @@ static void doAddTaskId(SArray* pList, int32_t taskId, int64_t uid, int32_t numO
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
|
int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
|
||||||
SMnode *pMnode = pReq->info.node;
|
SMnode *pMnode = pReq->info.node;
|
||||||
SStreamTaskCheckpointReq req = {0};
|
SStreamTaskCheckpointReq req = {0};
|
||||||
|
|
||||||
SDecoder decoder = {0};
|
SDecoder decoder = {0};
|
||||||
|
@ -2143,7 +2141,7 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
|
||||||
|
|
||||||
SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId);
|
SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId);
|
||||||
if (pStream == NULL) {
|
if (pStream == NULL) {
|
||||||
mError("failed to find the stream:0x%"PRIx64" not handle the checkpoint req", req.streamId);
|
mError("failed to find the stream:0x%" PRIx64 " not handle the checkpoint req", req.streamId);
|
||||||
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
|
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
|
||||||
taosThreadMutexUnlock(&execInfo.lock);
|
taosThreadMutexUnlock(&execInfo.lock);
|
||||||
|
|
||||||
|
@ -2151,13 +2149,13 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t numOfTasks = mndGetNumOfStreamTasks(pStream);
|
int32_t numOfTasks = mndGetNumOfStreamTasks(pStream);
|
||||||
SArray **pReqTaskList = (SArray**)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId));
|
SArray **pReqTaskList = (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId));
|
||||||
if (pReqTaskList == NULL) {
|
if (pReqTaskList == NULL) {
|
||||||
SArray *pList = taosArrayInit(4, sizeof(int32_t));
|
SArray *pList = taosArrayInit(4, sizeof(int32_t));
|
||||||
doAddTaskId(pList, req.taskId, pStream->uid, numOfTasks);
|
doAddTaskId(pList, req.taskId, pStream->uid, numOfTasks);
|
||||||
taosHashPut(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t), &pList, sizeof(void *));
|
taosHashPut(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t), &pList, sizeof(void *));
|
||||||
|
|
||||||
pReqTaskList = (SArray**)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId));
|
pReqTaskList = (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId));
|
||||||
} else {
|
} else {
|
||||||
doAddTaskId(*pReqTaskList, req.taskId, pStream->uid, numOfTasks);
|
doAddTaskId(*pReqTaskList, req.taskId, pStream->uid, numOfTasks);
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,15 +33,18 @@ static int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSData
|
||||||
int64_t suid);
|
int64_t suid);
|
||||||
static int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2* pReq, int32_t numOfBlocks);
|
static int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2* pReq, int32_t numOfBlocks);
|
||||||
static int32_t buildSubmitMsgImpl(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int32_t* msgLen);
|
static int32_t buildSubmitMsgImpl(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int32_t* msgLen);
|
||||||
static int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDataBlock* pDataBlock, const char* id);
|
static int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDataBlock* pDataBlock,
|
||||||
|
const char* id);
|
||||||
static int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkInfo* pTableSinkInfo,
|
static int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkInfo* pTableSinkInfo,
|
||||||
const char* dstTableName, int64_t* uid);
|
const char* dstTableName, int64_t* uid);
|
||||||
static int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId, const char* id);
|
static int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId,
|
||||||
|
const char* id);
|
||||||
static bool isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbName, int64_t suid);
|
static bool isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbName, int64_t suid);
|
||||||
static int32_t initCreateTableMsg(SVCreateTbReq* pCreateTableReq, uint64_t suid, const char* stbFullName, int32_t numOfTags);
|
static int32_t initCreateTableMsg(SVCreateTbReq* pCreateTableReq, uint64_t suid, const char* stbFullName,
|
||||||
|
int32_t numOfTags);
|
||||||
static SArray* createDefaultTagColName();
|
static SArray* createDefaultTagColName();
|
||||||
static void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName,
|
static void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName,
|
||||||
int64_t gid, bool newSubTableRule);
|
int64_t gid, bool newSubTableRule);
|
||||||
|
|
||||||
int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
|
int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
|
||||||
const char* pIdStr, bool newSubTableRule) {
|
const char* pIdStr, bool newSubTableRule) {
|
||||||
|
@ -68,10 +71,7 @@ int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* p
|
||||||
if (varTbName != NULL && varTbName != (void*)-1) {
|
if (varTbName != NULL && varTbName != (void*)-1) {
|
||||||
name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
|
name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
|
||||||
memcpy(name, varDataVal(varTbName), varDataLen(varTbName));
|
memcpy(name, varDataVal(varTbName), varDataLen(varTbName));
|
||||||
if(newSubTableRule &&
|
if (newSubTableRule && !isAutoTableName(name) && !alreadyAddGroupId(name) && groupId != 0) {
|
||||||
!isAutoTableName(name) &&
|
|
||||||
!alreadyAddGroupId(name) &&
|
|
||||||
groupId != 0) {
|
|
||||||
buildCtbNameAddGruopId(name, groupId);
|
buildCtbNameAddGruopId(name, groupId);
|
||||||
}
|
}
|
||||||
} else if (stbFullName) {
|
} else if (stbFullName) {
|
||||||
|
@ -134,7 +134,7 @@ end:
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool tqGetTableInfo(SSHashObj* pTableInfoMap,uint64_t groupId, STableSinkInfo** pInfo) {
|
static bool tqGetTableInfo(SSHashObj* pTableInfoMap, uint64_t groupId, STableSinkInfo** pInfo) {
|
||||||
void* pVal = tSimpleHashGet(pTableInfoMap, &groupId, sizeof(uint64_t));
|
void* pVal = tSimpleHashGet(pTableInfoMap, &groupId, sizeof(uint64_t));
|
||||||
if (pVal) {
|
if (pVal) {
|
||||||
*pInfo = *(STableSinkInfo**)pVal;
|
*pInfo = *(STableSinkInfo**)pVal;
|
||||||
|
@ -149,7 +149,7 @@ static int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) {
|
||||||
int32_t tlen = 0;
|
int32_t tlen = 0;
|
||||||
encodeCreateChildTableForRPC(pReqs, TD_VID(pVnode), &buf, &tlen);
|
encodeCreateChildTableForRPC(pReqs, TD_VID(pVnode), &buf, &tlen);
|
||||||
|
|
||||||
SRpcMsg msg = { .msgType = TDMT_VND_CREATE_TABLE, .pCont = buf, .contLen = tlen };
|
SRpcMsg msg = {.msgType = TDMT_VND_CREATE_TABLE, .pCont = buf, .contLen = tlen};
|
||||||
if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
|
if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
|
||||||
tqError("failed to put into write-queue since %s", terrstr());
|
tqError("failed to put into write-queue since %s", terrstr());
|
||||||
}
|
}
|
||||||
|
@ -181,14 +181,12 @@ SArray* createDefaultTagColName() {
|
||||||
void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName,
|
void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName,
|
||||||
int64_t gid, bool newSubTableRule) {
|
int64_t gid, bool newSubTableRule) {
|
||||||
if (pDataBlock->info.parTbName[0]) {
|
if (pDataBlock->info.parTbName[0]) {
|
||||||
if(newSubTableRule &&
|
if (newSubTableRule && !isAutoTableName(pDataBlock->info.parTbName) &&
|
||||||
!isAutoTableName(pDataBlock->info.parTbName) &&
|
!alreadyAddGroupId(pDataBlock->info.parTbName) && gid != 0) {
|
||||||
!alreadyAddGroupId(pDataBlock->info.parTbName) &&
|
|
||||||
gid != 0) {
|
|
||||||
pCreateTableReq->name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
|
pCreateTableReq->name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
|
||||||
strcpy(pCreateTableReq->name, pDataBlock->info.parTbName);
|
strcpy(pCreateTableReq->name, pDataBlock->info.parTbName);
|
||||||
buildCtbNameAddGruopId(pCreateTableReq->name, gid);
|
buildCtbNameAddGruopId(pCreateTableReq->name, gid);
|
||||||
}else{
|
} else {
|
||||||
pCreateTableReq->name = taosStrdup(pDataBlock->info.parTbName);
|
pCreateTableReq->name = taosStrdup(pDataBlock->info.parTbName);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -196,17 +194,18 @@ void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDa
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask,
|
static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock,
|
||||||
int64_t suid) {
|
SStreamTask* pTask, int64_t suid) {
|
||||||
tqDebug("s-task:%s build create table msg", pTask->id.idStr);
|
tqDebug("s-task:%s build create table msg", pTask->id.idStr);
|
||||||
|
|
||||||
STSchema* pTSchema = pTask->outputInfo.tbSink.pTSchema;
|
STSchema* pTSchema = pTask->outputInfo.tbSink.pTSchema;
|
||||||
int32_t rows = pDataBlock->info.rows;
|
int32_t rows = pDataBlock->info.rows;
|
||||||
SArray* tagArray = taosArrayInit(4, sizeof(STagVal));;
|
SArray* tagArray = taosArrayInit(4, sizeof(STagVal));
|
||||||
int32_t code = 0;
|
;
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
SVCreateTbBatchReq reqs = {0};
|
SVCreateTbBatchReq reqs = {0};
|
||||||
SArray* crTblArray = reqs.pArray = taosArrayInit(1, sizeof(SVCreateTbReq));
|
SArray* crTblArray = reqs.pArray = taosArrayInit(1, sizeof(SVCreateTbReq));
|
||||||
if (NULL == reqs.pArray) {
|
if (NULL == reqs.pArray) {
|
||||||
tqError("s-task:%s failed to init create table msg, code:%s", pTask->id.idStr, tstrerror(terrno));
|
tqError("s-task:%s failed to init create table msg, code:%s", pTask->id.idStr, tstrerror(terrno));
|
||||||
goto _end;
|
goto _end;
|
||||||
|
@ -262,7 +261,8 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S
|
||||||
ASSERT(gid == *(int64_t*)pGpIdData);
|
ASSERT(gid == *(int64_t*)pGpIdData);
|
||||||
}
|
}
|
||||||
|
|
||||||
setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, gid, pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER);
|
setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, gid,
|
||||||
|
pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER);
|
||||||
|
|
||||||
taosArrayPush(reqs.pArray, pCreateTbReq);
|
taosArrayPush(reqs.pArray, pCreateTbReq);
|
||||||
tqDebug("s-task:%s build create table:%s msg complete", pTask->id.idStr, pCreateTbReq->name);
|
tqDebug("s-task:%s build create table:%s msg complete", pTask->id.idStr, pCreateTbReq->name);
|
||||||
|
@ -274,7 +274,7 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S
|
||||||
tqError("s-task:%s failed to send create table msg", pTask->id.idStr);
|
tqError("s-task:%s failed to send create table msg", pTask->id.idStr);
|
||||||
}
|
}
|
||||||
|
|
||||||
_end:
|
_end:
|
||||||
taosArrayDestroy(tagArray);
|
taosArrayDestroy(tagArray);
|
||||||
taosArrayDestroyEx(crTblArray, (FDelete)tdDestroySVCreateTbReq);
|
taosArrayDestroyEx(crTblArray, (FDelete)tdDestroySVCreateTbReq);
|
||||||
return code;
|
return code;
|
||||||
|
@ -361,7 +361,8 @@ int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, c
|
||||||
pExisted->aRowP = pFinal;
|
pExisted->aRowP = pFinal;
|
||||||
|
|
||||||
tqTrace("s-task:%s rows merged, final rows:%d, uid:%" PRId64 ", existed auto-create table:%d, new-block:%d", id,
|
tqTrace("s-task:%s rows merged, final rows:%d, uid:%" PRId64 ", existed auto-create table:%d, new-block:%d", id,
|
||||||
(int32_t)taosArrayGetSize(pFinal), pExisted->uid, (pExisted->pCreateTbReq != NULL), (pNew->pCreateTbReq != NULL));
|
(int32_t)taosArrayGetSize(pFinal), pExisted->uid, (pExisted->pCreateTbReq != NULL),
|
||||||
|
(pNew->pCreateTbReq != NULL));
|
||||||
|
|
||||||
tdDestroySVCreateTbReq(pNew->pCreateTbReq);
|
tdDestroySVCreateTbReq(pNew->pCreateTbReq);
|
||||||
taosMemoryFree(pNew->pCreateTbReq);
|
taosMemoryFree(pNew->pCreateTbReq);
|
||||||
|
@ -416,8 +417,8 @@ bool isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbNam
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pReader->me.ctbEntry.suid != suid) {
|
if (pReader->me.ctbEntry.suid != suid) {
|
||||||
tqError("vgId:%d, failed to write into %s, since suid mismatch, expect suid:%" PRId64 ", actual:%" PRId64,
|
tqError("vgId:%d, failed to write into %s, since suid mismatch, expect suid:%" PRId64 ", actual:%" PRId64, vgId,
|
||||||
vgId, ctbName, suid, pReader->me.ctbEntry.suid);
|
ctbName, suid, pReader->me.ctbEntry.suid);
|
||||||
terrno = TSDB_CODE_TDB_TABLE_IN_OTHER_STABLE;
|
terrno = TSDB_CODE_TDB_TABLE_IN_OTHER_STABLE;
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -437,10 +438,10 @@ SVCreateTbReq* buildAutoCreateTableReq(const char* stbFullName, int64_t suid, in
|
||||||
taosArrayClear(pTagArray);
|
taosArrayClear(pTagArray);
|
||||||
initCreateTableMsg(pCreateTbReq, suid, stbFullName, 1);
|
initCreateTableMsg(pCreateTbReq, suid, stbFullName, 1);
|
||||||
|
|
||||||
STagVal tagVal = { .cid = numOfCols, .type = TSDB_DATA_TYPE_UBIGINT, .i64 = pDataBlock->info.id.groupId};
|
STagVal tagVal = {.cid = numOfCols, .type = TSDB_DATA_TYPE_UBIGINT, .i64 = pDataBlock->info.id.groupId};
|
||||||
taosArrayPush(pTagArray, &tagVal);
|
taosArrayPush(pTagArray, &tagVal);
|
||||||
|
|
||||||
tTagNew(pTagArray, 1, false, (STag**) &pCreateTbReq->ctb.pTag);
|
tTagNew(pTagArray, 1, false, (STag**)&pCreateTbReq->ctb.pTag);
|
||||||
|
|
||||||
if (pCreateTbReq->ctb.pTag == NULL) {
|
if (pCreateTbReq->ctb.pTag == NULL) {
|
||||||
tdDestroySVCreateTbReq(pCreateTbReq);
|
tdDestroySVCreateTbReq(pCreateTbReq);
|
||||||
|
@ -513,13 +514,13 @@ int32_t buildSubmitMsgImpl(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, i
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsAscendingSortFn(const void* p1, const void* p2) {
|
int32_t tsAscendingSortFn(const void* p1, const void* p2) {
|
||||||
SRow* pRow1 = *(SRow**) p1;
|
SRow* pRow1 = *(SRow**)p1;
|
||||||
SRow* pRow2 = *(SRow**) p2;
|
SRow* pRow2 = *(SRow**)p2;
|
||||||
|
|
||||||
if (pRow1->ts == pRow2->ts) {
|
if (pRow1->ts == pRow2->ts) {
|
||||||
return 0;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
return pRow1->ts > pRow2->ts? 1:-1;
|
return pRow1->ts > pRow2->ts ? 1 : -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -563,7 +564,7 @@ int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDat
|
||||||
void* colData = colDataGetData(pColData, j);
|
void* colData = colDataGetData(pColData, j);
|
||||||
if (IS_STR_DATA_TYPE(pCol->type)) {
|
if (IS_STR_DATA_TYPE(pCol->type)) {
|
||||||
// address copy, no value
|
// address copy, no value
|
||||||
SValue sv = (SValue){.nData = varDataLen(colData), .pData = (uint8_t*) varDataVal(colData)};
|
SValue sv = (SValue){.nData = varDataLen(colData), .pData = (uint8_t*)varDataVal(colData)};
|
||||||
SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv);
|
SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv);
|
||||||
taosArrayPush(pVals, &cv);
|
taosArrayPush(pVals, &cv);
|
||||||
} else {
|
} else {
|
||||||
|
@ -666,11 +667,9 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
|
||||||
if (dstTableName[0] == 0) {
|
if (dstTableName[0] == 0) {
|
||||||
memset(dstTableName, 0, TSDB_TABLE_NAME_LEN);
|
memset(dstTableName, 0, TSDB_TABLE_NAME_LEN);
|
||||||
buildCtbNameByGroupIdImpl(stbFullName, groupId, dstTableName);
|
buildCtbNameByGroupIdImpl(stbFullName, groupId, dstTableName);
|
||||||
}else{
|
} else {
|
||||||
if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER &&
|
if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && !isAutoTableName(dstTableName) &&
|
||||||
!isAutoTableName(dstTableName) &&
|
!alreadyAddGroupId(dstTableName) && groupId != 0) {
|
||||||
!alreadyAddGroupId(dstTableName) &&
|
|
||||||
groupId != 0) {
|
|
||||||
buildCtbNameAddGruopId(dstTableName, groupId);
|
buildCtbNameAddGruopId(dstTableName, groupId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -693,7 +692,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
|
||||||
tqTrace("s-task:%s cached tableInfo uid is invalid, acquire it from meta", id);
|
tqTrace("s-task:%s cached tableInfo uid is invalid, acquire it from meta", id);
|
||||||
return doWaitForDstTableCreated(pVnode, pTask, pTableSinkInfo, dstTableName, &pTableData->uid);
|
return doWaitForDstTableCreated(pVnode, pTask, pTableSinkInfo, dstTableName, &pTableData->uid);
|
||||||
} else {
|
} else {
|
||||||
tqTrace("s-task:%s set the dstTable uid from cache:%"PRId64, id, pTableData->uid);
|
tqTrace("s-task:%s set the dstTable uid from cache:%" PRId64, id, pTableData->uid);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// The auto-create option will always set to be open for those submit messages, which arrive during the period
|
// The auto-create option will always set to be open for those submit messages, which arrive during the period
|
||||||
|
@ -713,8 +712,8 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
|
||||||
SArray* pTagArray = taosArrayInit(pTSchema->numOfCols + 1, sizeof(STagVal));
|
SArray* pTagArray = taosArrayInit(pTSchema->numOfCols + 1, sizeof(STagVal));
|
||||||
|
|
||||||
pTableData->flags = SUBMIT_REQ_AUTO_CREATE_TABLE;
|
pTableData->flags = SUBMIT_REQ_AUTO_CREATE_TABLE;
|
||||||
pTableData->pCreateTbReq =
|
pTableData->pCreateTbReq = buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock,
|
||||||
buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock, pTagArray, pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER);
|
pTagArray, pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER);
|
||||||
taosArrayDestroy(pTagArray);
|
taosArrayDestroy(pTagArray);
|
||||||
|
|
||||||
if (pTableData->pCreateTbReq == NULL) {
|
if (pTableData->pCreateTbReq == NULL) {
|
||||||
|
@ -746,12 +745,12 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
|
||||||
return TDB_CODE_SUCCESS;
|
return TDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqSetDstTableDataPayload(uint64_t suid, const STSchema *pTSchema, int32_t blockIndex, SSDataBlock* pDataBlock,
|
int32_t tqSetDstTableDataPayload(uint64_t suid, const STSchema* pTSchema, int32_t blockIndex, SSDataBlock* pDataBlock,
|
||||||
SSubmitTbData* pTableData, const char* id) {
|
SSubmitTbData* pTableData, const char* id) {
|
||||||
int32_t numOfRows = pDataBlock->info.rows;
|
int32_t numOfRows = pDataBlock->info.rows;
|
||||||
|
|
||||||
tqDebug("s-task:%s sink data pipeline, build submit msg from %dth resBlock, including %d rows, dst suid:%" PRId64,
|
tqDebug("s-task:%s sink data pipeline, build submit msg from %dth resBlock, including %d rows, dst suid:%" PRId64, id,
|
||||||
id, blockIndex + 1, numOfRows, suid);
|
blockIndex + 1, numOfRows, suid);
|
||||||
char* dstTableName = pDataBlock->info.parTbName;
|
char* dstTableName = pDataBlock->info.parTbName;
|
||||||
|
|
||||||
// convert all rows
|
// convert all rows
|
||||||
|
@ -767,14 +766,14 @@ int32_t tqSetDstTableDataPayload(uint64_t suid, const STSchema *pTSchema, int32_
|
||||||
}
|
}
|
||||||
|
|
||||||
bool hasOnlySubmitData(const SArray* pBlocks, int32_t numOfBlocks) {
|
bool hasOnlySubmitData(const SArray* pBlocks, int32_t numOfBlocks) {
|
||||||
for(int32_t i = 0; i < numOfBlocks; ++i) {
|
for (int32_t i = 0; i < numOfBlocks; ++i) {
|
||||||
SSDataBlock* p = taosArrayGet(pBlocks, i);
|
SSDataBlock* p = taosArrayGet(pBlocks, i);
|
||||||
if (p->info.type == STREAM_DELETE_RESULT || p->info.type == STREAM_CREATE_CHILD_TABLE) {
|
if (p->info.type == STREAM_DELETE_RESULT || p->info.type == STREAM_CREATE_CHILD_TABLE) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
|
void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
|
||||||
|
@ -793,7 +792,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
|
||||||
tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, has delete block, submit one-by-one", vgId, id,
|
tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, has delete block, submit one-by-one", vgId, id,
|
||||||
numOfBlocks);
|
numOfBlocks);
|
||||||
|
|
||||||
for(int32_t i = 0; i < numOfBlocks; ++i) {
|
for (int32_t i = 0; i < numOfBlocks; ++i) {
|
||||||
if (streamTaskShouldStop(pTask)) {
|
if (streamTaskShouldStop(pTask)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -832,7 +831,8 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, merge submit msg", vgId, id, numOfBlocks);
|
tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, merge submit msg", vgId, id, numOfBlocks);
|
||||||
SHashObj* pTableIndexMap = taosHashInit(numOfBlocks, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
SHashObj* pTableIndexMap =
|
||||||
|
taosHashInit(numOfBlocks, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||||
|
|
||||||
SSubmitReq2 submitReq = {.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData))};
|
SSubmitReq2 submitReq = {.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData))};
|
||||||
if (submitReq.aSubmitTbData == NULL) {
|
if (submitReq.aSubmitTbData == NULL) {
|
||||||
|
|
|
@ -72,10 +72,11 @@ static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, STqOffsetVal pOffset) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg, bool* pBlockReturned) {
|
static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
|
||||||
uint64_t consumerId = pRequest->consumerId;
|
SRpcMsg* pMsg, bool* pBlockReturned) {
|
||||||
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, pRequest->subKey);
|
uint64_t consumerId = pRequest->consumerId;
|
||||||
int32_t vgId = TD_VID(pTq->pVnode);
|
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, pRequest->subKey);
|
||||||
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
|
|
||||||
*pBlockReturned = false;
|
*pBlockReturned = false;
|
||||||
// In this vnode, data has been polled by consumer for this topic, so let's continue from the last offset value.
|
// In this vnode, data has been polled by consumer for this topic, so let's continue from the last offset value.
|
||||||
|
@ -132,7 +133,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
SRpcMsg* pMsg, STqOffsetVal* pOffset) {
|
SRpcMsg* pMsg, STqOffsetVal* pOffset) {
|
||||||
uint64_t consumerId = pRequest->consumerId;
|
uint64_t consumerId = pRequest->consumerId;
|
||||||
int32_t vgId = TD_VID(pTq->pVnode);
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
terrno = 0;
|
terrno = 0;
|
||||||
|
|
||||||
SMqDataRsp dataRsp = {0};
|
SMqDataRsp dataRsp = {0};
|
||||||
tqInitDataRsp(&dataRsp, *pOffset);
|
tqInitDataRsp(&dataRsp, *pOffset);
|
||||||
|
@ -156,7 +157,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
taosWUnLockLatch(&pTq->lock);
|
taosWUnLockLatch(&pTq->lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
dataRsp.reqOffset = *pOffset; // reqOffset represents the current date offset, may be changed if wal not exists
|
dataRsp.reqOffset = *pOffset; // reqOffset represents the current date offset, may be changed if wal not exists
|
||||||
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
|
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
|
||||||
|
|
||||||
end : {
|
end : {
|
||||||
|
@ -167,15 +168,15 @@ end : {
|
||||||
consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, code);
|
consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, code);
|
||||||
tDeleteMqDataRsp(&dataRsp);
|
tDeleteMqDataRsp(&dataRsp);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
|
static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
|
||||||
SRpcMsg* pMsg, STqOffsetVal* offset) {
|
SRpcMsg* pMsg, STqOffsetVal* offset) {
|
||||||
int code = 0;
|
int code = 0;
|
||||||
int32_t vgId = TD_VID(pTq->pVnode);
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
SMqMetaRsp metaRsp = {0};
|
SMqMetaRsp metaRsp = {0};
|
||||||
STaosxRsp taosxRsp = {0};
|
STaosxRsp taosxRsp = {0};
|
||||||
tqInitTaosxRsp(&taosxRsp, *offset);
|
tqInitTaosxRsp(&taosxRsp, *offset);
|
||||||
|
|
||||||
if (offset->type != TMQ_OFFSET__LOG) {
|
if (offset->type != TMQ_OFFSET__LOG) {
|
||||||
|
@ -211,14 +212,16 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
int64_t fetchVer = offset->version;
|
int64_t fetchVer = offset->version;
|
||||||
|
|
||||||
uint64_t st = taosGetTimestampMs();
|
uint64_t st = taosGetTimestampMs();
|
||||||
int totalRows = 0;
|
int totalRows = 0;
|
||||||
while (1) {
|
while (1) {
|
||||||
int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
|
int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
|
||||||
ASSERT(savedEpoch <= pRequest->epoch);
|
ASSERT(savedEpoch <= pRequest->epoch);
|
||||||
|
|
||||||
if (tqFetchLog(pTq, pHandle, &fetchVer, pRequest->reqId) < 0) {
|
if (tqFetchLog(pTq, pHandle, &fetchVer, pRequest->reqId) < 0) {
|
||||||
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
|
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
|
||||||
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
|
code = tqSendDataRsp(
|
||||||
|
pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp,
|
||||||
|
taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -230,7 +233,9 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
if (pHead->msgType != TDMT_VND_SUBMIT) {
|
if (pHead->msgType != TDMT_VND_SUBMIT) {
|
||||||
if (totalRows > 0) {
|
if (totalRows > 0) {
|
||||||
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
|
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
|
||||||
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
|
code = tqSendDataRsp(
|
||||||
|
pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp,
|
||||||
|
taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -257,9 +262,11 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (totalRows >= 4096 || taosxRsp.createTableNum > 0 || (taosGetTimestampMs() - st > 1000)) {
|
if (totalRows >= 4096 || (taosGetTimestampMs() - st > 1000)) {
|
||||||
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer + 1);
|
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer + 1);
|
||||||
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
|
code = tqSendDataRsp(
|
||||||
|
pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp,
|
||||||
|
taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
|
||||||
goto end;
|
goto end;
|
||||||
} else {
|
} else {
|
||||||
fetchVer++;
|
fetchVer++;
|
||||||
|
@ -279,7 +286,7 @@ int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequ
|
||||||
// 1. reset the offset if needed
|
// 1. reset the offset if needed
|
||||||
if (IS_OFFSET_RESET_TYPE(pRequest->reqOffset.type)) {
|
if (IS_OFFSET_RESET_TYPE(pRequest->reqOffset.type)) {
|
||||||
// handle the reset offset cases, according to the consumer's choice.
|
// handle the reset offset cases, according to the consumer's choice.
|
||||||
bool blockReturned = false;
|
bool blockReturned = false;
|
||||||
int32_t code = extractResetOffsetVal(&reqOffset, pTq, pHandle, pRequest, pMsg, &blockReturned);
|
int32_t code = extractResetOffsetVal(&reqOffset, pTq, pHandle, pRequest, pMsg, &blockReturned);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
return code;
|
return code;
|
||||||
|
@ -289,7 +296,7 @@ int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequ
|
||||||
if (blockReturned) {
|
if (blockReturned) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
} else if(reqOffset.type == 0){ // use the consumer specified offset
|
} else if (reqOffset.type == 0) { // use the consumer specified offset
|
||||||
uError("req offset type is 0");
|
uError("req offset type is 0");
|
||||||
return TSDB_CODE_TMQ_INVALID_MSG;
|
return TSDB_CODE_TMQ_INVALID_MSG;
|
||||||
}
|
}
|
||||||
|
@ -452,8 +459,8 @@ int32_t tqExtractDelDataBlock(const void* pData, int32_t len, int64_t ver, void*
|
||||||
|
|
||||||
int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, bool* fhFinished) {
|
int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, bool* fhFinished) {
|
||||||
SStreamMeta* pMeta = pVnode->pTq->pStreamMeta;
|
SStreamMeta* pMeta = pVnode->pTq->pStreamMeta;
|
||||||
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
|
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
if (pDelay != NULL) {
|
if (pDelay != NULL) {
|
||||||
*pDelay = 0;
|
*pDelay = 0;
|
||||||
|
|
|
@ -482,7 +482,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
|
||||||
|
|
||||||
pTask->execInfo.created = taosGetTimestampMs();
|
pTask->execInfo.created = taosGetTimestampMs();
|
||||||
SCheckpointInfo* pChkInfo = &pTask->chkInfo;
|
SCheckpointInfo* pChkInfo = &pTask->chkInfo;
|
||||||
SDataRange* pRange = &pTask->dataRange;
|
SDataRange* pRange = &pTask->dataRange;
|
||||||
|
|
||||||
// only set the version info for stream tasks without fill-history task
|
// only set the version info for stream tasks without fill-history task
|
||||||
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||||
|
@ -756,8 +756,8 @@ int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, bool metaLock) {
|
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, bool metaLock) {
|
||||||
SStreamMeta* pMeta = pTask->pMeta;
|
SStreamMeta* pMeta = pTask->pMeta;
|
||||||
STaskId sTaskId = {.streamId = pTask->streamTaskId.streamId, .taskId = pTask->streamTaskId.taskId};
|
STaskId sTaskId = {.streamId = pTask->streamTaskId.streamId, .taskId = pTask->streamTaskId.taskId};
|
||||||
if (pTask->info.fillHistory == 0) {
|
if (pTask->info.fillHistory == 0) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -864,7 +864,7 @@ void streamTaskPause(SStreamMeta* pMeta, SStreamTask* pTask) {
|
||||||
|
|
||||||
void streamTaskResume(SStreamTask* pTask) {
|
void streamTaskResume(SStreamTask* pTask) {
|
||||||
SStreamTaskState prevState = *streamTaskGetStatus(pTask);
|
SStreamTaskState prevState = *streamTaskGetStatus(pTask);
|
||||||
SStreamMeta* pMeta = pTask->pMeta;
|
SStreamMeta* pMeta = pTask->pMeta;
|
||||||
|
|
||||||
if (prevState.state == TASK_STATUS__PAUSE || prevState.state == TASK_STATUS__HALT) {
|
if (prevState.state == TASK_STATUS__PAUSE || prevState.state == TASK_STATUS__HALT) {
|
||||||
streamTaskRestoreStatus(pTask);
|
streamTaskRestoreStatus(pTask);
|
||||||
|
@ -881,9 +881,7 @@ void streamTaskResume(SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool streamTaskIsSinkTask(const SStreamTask* pTask) {
|
bool streamTaskIsSinkTask(const SStreamTask* pTask) { return pTask->info.taskLevel == TASK_LEVEL__SINK; }
|
||||||
return pTask->info.taskLevel == TASK_LEVEL__SINK;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t streamTaskSendCheckpointReq(SStreamTask* pTask) {
|
int32_t streamTaskSendCheckpointReq(SStreamTask* pTask) {
|
||||||
int32_t code;
|
int32_t code;
|
||||||
|
|
|
@ -311,44 +311,57 @@ class TDTestCase:
|
||||||
|
|
||||||
return
|
return
|
||||||
|
|
||||||
def consumeExcluded(self):
|
def consumeTest(self):
|
||||||
tdSql.execute(f'create topic topic_excluded as database db_taosx')
|
tdSql.execute(f'create database if not exists d1 vgroups 1')
|
||||||
|
tdSql.execute(f'use d1')
|
||||||
|
tdSql.execute(f'create table st(ts timestamp, i int) tags(t int)')
|
||||||
|
tdSql.execute(f'insert into t1 using st tags(1) values(now, 1) (now+1s, 2)')
|
||||||
|
tdSql.execute(f'insert into t2 using st tags(2) values(now, 1) (now+1s, 2)')
|
||||||
|
tdSql.execute(f'insert into t3 using st tags(3) values(now, 1) (now+1s, 2)')
|
||||||
|
tdSql.execute(f'insert into t1 using st tags(1) values(now+5s, 11) (now+10s, 12)')
|
||||||
|
|
||||||
|
tdSql.query("select * from st")
|
||||||
|
tdSql.checkRows(8)
|
||||||
|
|
||||||
|
tdSql.execute(f'create topic topic_excluded with meta as database d1')
|
||||||
consumer_dict = {
|
consumer_dict = {
|
||||||
"group.id": "g1",
|
"group.id": "g1",
|
||||||
"td.connect.user": "root",
|
"td.connect.user": "root",
|
||||||
"td.connect.pass": "taosdata",
|
"td.connect.pass": "taosdata",
|
||||||
"auto.offset.reset": "earliest",
|
"auto.offset.reset": "earliest",
|
||||||
"msg.consume.excluded": 1
|
|
||||||
}
|
}
|
||||||
consumer = Consumer(consumer_dict)
|
consumer = Consumer(consumer_dict)
|
||||||
|
|
||||||
tdLog.debug("test subscribe topic created by other user")
|
|
||||||
exceptOccured = False
|
|
||||||
try:
|
try:
|
||||||
consumer.subscribe(["topic_excluded"])
|
consumer.subscribe(["topic_excluded"])
|
||||||
except TmqError:
|
except TmqError:
|
||||||
exceptOccured = True
|
|
||||||
|
|
||||||
if exceptOccured:
|
|
||||||
tdLog.exit(f"subscribe error")
|
tdLog.exit(f"subscribe error")
|
||||||
|
|
||||||
|
index = 0
|
||||||
try:
|
try:
|
||||||
while True:
|
while True:
|
||||||
res = consumer.poll(1)
|
res = consumer.poll(1)
|
||||||
if not res:
|
if not res:
|
||||||
|
if index != 1:
|
||||||
|
tdLog.exit("consume error")
|
||||||
break
|
break
|
||||||
err = res.error()
|
|
||||||
if err is not None:
|
|
||||||
raise err
|
|
||||||
val = res.value()
|
val = res.value()
|
||||||
|
if val is None:
|
||||||
|
continue
|
||||||
|
cnt = 0;
|
||||||
for block in val:
|
for block in val:
|
||||||
print(block.fetchall())
|
cnt += len(block.fetchall())
|
||||||
|
|
||||||
|
if cnt != 8:
|
||||||
|
tdLog.exit("consume error")
|
||||||
|
|
||||||
|
index += 1
|
||||||
finally:
|
finally:
|
||||||
consumer.close()
|
consumer.close()
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
|
self.consumeTest()
|
||||||
|
|
||||||
tdSql.prepare()
|
tdSql.prepare()
|
||||||
self.checkWal1VgroupOnlyMeta()
|
self.checkWal1VgroupOnlyMeta()
|
||||||
|
|
||||||
|
@ -362,7 +375,6 @@ class TDTestCase:
|
||||||
self.checkSnapshotMultiVgroups()
|
self.checkSnapshotMultiVgroups()
|
||||||
|
|
||||||
self.checkWalMultiVgroupsWithDropTable()
|
self.checkWalMultiVgroupsWithDropTable()
|
||||||
# self.consumeExcluded()
|
|
||||||
|
|
||||||
self.checkSnapshotMultiVgroupsWithDropTable()
|
self.checkSnapshotMultiVgroupsWithDropTable()
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue