commit ed16b1e8db

    fix:conflict from 3.0
@@ -70,7 +70,7 @@ typedef uint16_t tmsg_t;
 static inline bool tmsgIsValid(tmsg_t type) {
   // static int8_t sz = sizeof(tMsgRangeDict) / sizeof(tMsgRangeDict[0]);
-  int8_t maxSegIdx = TMSG_SEG_CODE(TDMT_MAX_MSG);
+  int8_t maxSegIdx = TMSG_SEG_CODE(TDMT_MAX_MSG_MIN);
   int segIdx = TMSG_SEG_CODE(type);
   if (segIdx >= 0 && segIdx < maxSegIdx) {
     return type < tMsgRangeDict[segIdx];
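The check above pairs the renamed *_MIN segment sentinel with the generated range dictionary: a type is valid when its segment index is below the last segment and its code is below that segment's recorded upper bound. A minimal sketch of the idea (hypothetical, simplified names — not the actual generated tables):

#include <stdbool.h>
#include <stdint.h>

// Sketch: segment k owns codes [k << 8, rangeDict[k]); rangeDict plays the
// role of tMsgRangeDict, which the TD_CLOSE_MSG_SEG expansion fills in.
static inline bool msgIsValid(uint16_t type, const int32_t *rangeDict, int8_t maxSegIdx) {
  int8_t segIdx = (int8_t)(type >> 8);  // analogue of TMSG_SEG_CODE(type)
  return segIdx >= 0 && segIdx < maxSegIdx && type < rangeDict[segIdx];
}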

@@ -3261,6 +3261,7 @@ typedef struct {
 typedef struct {
   int64_t reqId;
   SArray* reqs;  // SArray<SClientHbReq>
+  int64_t ipWhiteList;
 } SClientHbBatchReq;

 typedef struct {

@@ -41,7 +41,7 @@
 #undef TD_CLOSE_MSG_SEG
 #define TD_NEW_MSG_SEG(TYPE)
 #define TD_DEF_MSG_TYPE(TYPE, MSG, REQ, RSP)
-#define TD_CLOSE_MSG_SEG(TYPE) TYPE,
+#define TD_CLOSE_MSG_SEG(TYPE) TYPE##_MAX,
 int32_t tMsgRangeDict[] = {

 #elif defined(TD_MSG_NUMBER_)

@@ -49,7 +49,7 @@
 #undef TD_NEW_MSG_SEG
 #undef TD_DEF_MSG_TYPE
 #undef TD_CLOSE_MSG_SEG
-#define TD_NEW_MSG_SEG(TYPE) TYPE##_NUM,
+#define TD_NEW_MSG_SEG(TYPE) TYPE##_NUM_MIN,
 #define TD_DEF_MSG_TYPE(TYPE, MSG, REQ, RSP) TYPE##_NUM, TYPE##_RSP_NUM,
 #define TD_CLOSE_MSG_SEG(TYPE)

@@ -60,7 +60,7 @@
 #undef TD_NEW_MSG_SEG
 #undef TD_DEF_MSG_TYPE
 #undef TD_CLOSE_MSG_SEG
-#define TD_NEW_MSG_SEG(TYPE) TYPE##_NUM,
+#define TD_NEW_MSG_SEG(TYPE) TYPE##_NUM_MIN,
 #define TD_DEF_MSG_TYPE(TYPE, MSG, REQ, RSP)
 #define TD_CLOSE_MSG_SEG(type)

@@ -99,9 +99,9 @@
 #undef TD_NEW_MSG_SEG
 #undef TD_DEF_MSG_TYPE
 #undef TD_CLOSE_MSG_SEG
-#define TD_NEW_MSG_SEG(TYPE) TYPE = ((TYPE##_SEG_CODE) << 8),
+#define TD_NEW_MSG_SEG(TYPE) TYPE##_MIN = ((TYPE##_SEG_CODE) << 8),
 #define TD_DEF_MSG_TYPE(TYPE, MSG, REQ, RSP) TYPE, TYPE##_RSP,
-#define TD_CLOSE_MSG_SEG(TYPE) TYPE,
+#define TD_CLOSE_MSG_SEG(TYPE) TYPE##_MAX,

 enum { // WARN: new msg should be appended to segment tail
 #endif
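These re-definitions are the usual X-macro trick: the same message list is included several times, and each pass expands TD_NEW_MSG_SEG / TD_DEF_MSG_TYPE / TD_CLOSE_MSG_SEG differently, so one list yields the enum, the per-segment counters, and the range dictionary. A compilable sketch of the scheme (hypothetical demo segment with a hard-coded segment code of 0):

#include <stdio.h>

// One list, three possible expansions; here only the enum expansion is shown.
#define MSG_LIST(SEG, DEF, CLOSE) \
  SEG(TDMT_DEMO_MSG)              \
  DEF(TDMT_DEMO_PING)             \
  DEF(TDMT_DEMO_PONG)             \
  CLOSE(TDMT_DEMO_MSG)

#define SEG_ENUM(TYPE)   TYPE##_MIN = ((0) << 8),
#define DEF_ENUM(TYPE)   TYPE, TYPE##_RSP,
#define CLOSE_ENUM(TYPE) TYPE##_MAX,

enum { MSG_LIST(SEG_ENUM, DEF_ENUM, CLOSE_ENUM) };

int main(void) {
  // The MIN/MAX sentinels bracket the segment, mirroring TDMT_*_MSG_MIN/_MAX.
  printf("segment spans [%d, %d)\n", TDMT_DEMO_MSG_MIN, TDMT_DEMO_MSG_MAX);
  return 0;
}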

@@ -125,8 +125,7 @@
   TD_DEF_MSG_TYPE(TDMT_DND_ALTER_VNODE_TYPE, "dnode-alter-vnode-type", NULL, NULL)
   TD_DEF_MSG_TYPE(TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP, "dnode-check-vnode-learner-catchup", NULL, NULL)
   TD_DEF_MSG_TYPE(TDMT_DND_CREATE_ENCRYPT_KEY, "create-encrypt-key", NULL, NULL)
-  TD_DEF_MSG_TYPE(TDMT_DND_MAX_MSG, "dnd-max", NULL, NULL)
-  TD_CLOSE_MSG_SEG(TDMT_END_DND_MSG)
+  TD_CLOSE_MSG_SEG(TDMT_DND_MSG)

   TD_NEW_MSG_SEG(TDMT_MND_MSG) // 1<<8
   TD_DEF_MSG_TYPE(TDMT_MND_CONNECT, "connect", NULL, NULL)
@@ -225,9 +224,9 @@
   TD_DEF_MSG_TYPE(TDMT_MND_RESTORE_DNODE, "restore-dnode", NULL, NULL)
   TD_DEF_MSG_TYPE(TDMT_MND_PAUSE_STREAM, "pause-stream", NULL, NULL)
   TD_DEF_MSG_TYPE(TDMT_MND_RESUME_STREAM, "resume-stream", NULL, NULL)
-  TD_DEF_MSG_TYPE(TDMT_MND_STREAM_UPDATE_CHKPT_EVT, "stream-update-chkpt-evt", NULL, NULL)
+  TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHECKPOINT_TIMER, "stream-checkpoint-tmr", NULL, NULL)
   TD_DEF_MSG_TYPE(TDMT_MND_STREAM_BEGIN_CHECKPOINT, "stream-begin-checkpoint", NULL, NULL)
-  TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHKPT_REPORT, "stream-chkpt-report", NULL, NULL)
+  TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHECKPOINT_CANDIDITATE, "stream-checkpoint-remain", NULL, NULL)
   TD_DEF_MSG_TYPE(TDMT_MND_STREAM_NODECHANGE_CHECK, "stream-nodechange-check", NULL, NULL)
   TD_DEF_MSG_TYPE(TDMT_MND_TRIM_DB_TIMER, "trim-db-tmr", NULL, NULL)
   TD_DEF_MSG_TYPE(TDMT_MND_GRANT_NOTIFY, "grant-notify", NULL, NULL)
@@ -249,8 +248,9 @@
   TD_DEF_MSG_TYPE(TDMT_MND_GET_TABLE_TSMA, "get-table-tsma", NULL, NULL)
   TD_DEF_MSG_TYPE(TDMT_MND_GET_TSMA, "get-tsma", NULL, NULL)
   TD_DEF_MSG_TYPE(TDMT_MND_DROP_TB_WITH_TSMA, "drop-tb-with-tsma", NULL, NULL)
-  TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL)
-  TD_CLOSE_MSG_SEG(TDMT_END_MND_MSG)
+  TD_DEF_MSG_TYPE(TDMT_MND_STREAM_UPDATE_CHKPT_EVT, "stream-update-chkpt-evt", NULL, NULL)
+  TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHKPT_REPORT, "stream-chkpt-report", NULL, NULL)
+  TD_CLOSE_MSG_SEG(TDMT_MND_MSG)

   TD_NEW_MSG_SEG(TDMT_VND_MSG) // 2<<8
   TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp)
@@ -305,8 +305,7 @@
   TD_DEF_MSG_TYPE(TDMT_VND_ARB_HEARTBEAT, "vnode-arb-hb", NULL, NULL)
   TD_DEF_MSG_TYPE(TDMT_VND_ARB_CHECK_SYNC, "vnode-arb-check-sync", NULL, NULL)
   TD_DEF_MSG_TYPE(TDMT_VND_FETCH_TTL_EXPIRED_TBS, "vnode-fetch-ttl-expired-tbs", NULL, NULL)
-  TD_DEF_MSG_TYPE(TDMT_VND_MAX_MSG, "vnd-max", NULL, NULL)
-  TD_CLOSE_MSG_SEG(TDMT_END_VND_MSG)
+  TD_CLOSE_MSG_SEG(TDMT_VND_MSG)

   TD_NEW_MSG_SEG(TDMT_SCH_MSG) // 3<<8
   TD_DEF_MSG_TYPE(TDMT_SCH_QUERY, "query", NULL, NULL)
@@ -320,8 +319,7 @@
   TD_DEF_MSG_TYPE(TDMT_SCH_EXPLAIN, "explain", NULL, NULL)
   TD_DEF_MSG_TYPE(TDMT_SCH_LINK_BROKEN, "link-broken", NULL, NULL)
   TD_DEF_MSG_TYPE(TDMT_SCH_TASK_NOTIFY, "task-notify", NULL, NULL)
-  TD_DEF_MSG_TYPE(TDMT_SCH_MAX_MSG, "sch-max", NULL, NULL)
-  TD_CLOSE_MSG_SEG(TDMT_END_SCH_MSG)
+  TD_CLOSE_MSG_SEG(TDMT_SCH_MSG)

   TD_NEW_MSG_SEG(TDMT_STREAM_MSG) //4 << 8
@@ -341,13 +339,10 @@
   TD_DEF_MSG_TYPE(TDMT_STREAM_CREATE, "stream-create", NULL, NULL)
   TD_DEF_MSG_TYPE(TDMT_STREAM_DROP, "stream-drop", NULL, NULL)
   TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE_TRIGGER, "stream-retri-trigger", NULL, NULL)
-
-  TD_DEF_MSG_TYPE(TDMT_STREAM_MAX_MSG, "stream-max", NULL, NULL)
-  TD_CLOSE_MSG_SEG(TDMT_END_STREAM_MSG)
+  TD_CLOSE_MSG_SEG(TDMT_STREAM_MSG)

   TD_NEW_MSG_SEG(TDMT_MON_MSG) //5 << 8
-  TD_DEF_MSG_TYPE(TDMT_MON_MAX_MSG, "monitor-max", NULL, NULL)
-  TD_CLOSE_MSG_SEG(TDMT_END_MON_MSG)
+  TD_CLOSE_MSG_SEG(TDMT_MON_MSG)

   TD_NEW_MSG_SEG(TDMT_SYNC_MSG) //6 << 8
   TD_DEF_MSG_TYPE(TDMT_SYNC_TIMEOUT, "sync-timer", NULL, NULL)
@@ -380,8 +375,7 @@
   TD_DEF_MSG_TYPE(TDMT_SYNC_UNUSED_CODE, "sync-unused", NULL, NULL)
   TD_DEF_MSG_TYPE(TDMT_SYNC_FORCE_FOLLOWER, "sync-force-become-follower", NULL, NULL)
   TD_DEF_MSG_TYPE(TDMT_SYNC_SET_ASSIGNED_LEADER, "sync-set-assigned-leader", NULL, NULL)
-  TD_DEF_MSG_TYPE(TDMT_SYNC_MAX_MSG, "sync-max", NULL, NULL)
-  TD_CLOSE_MSG_SEG(TDMT_END_SYNC_MSG)
+  TD_CLOSE_MSG_SEG(TDMT_SYNC_MSG)

   TD_NEW_MSG_SEG(TDMT_VND_STREAM_MSG) //7 << 8
   TD_DEF_MSG_TYPE(TDMT_VND_STREAM_SCAN_HISTORY, "vnode-stream-scan-history", NULL, NULL)
@@ -390,10 +384,8 @@
   TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_RESET, "vnode-stream-reset", NULL, NULL)
   TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_CHECK, "vnode-stream-task-check", NULL, NULL)
   TD_DEF_MSG_TYPE(TDMT_VND_STREAM_UNUSED, "vnd-stream-unused", NULL, NULL)
-
   TD_DEF_MSG_TYPE(TDMT_VND_GET_STREAM_PROGRESS, "vnd-stream-progress", NULL, NULL)
-  TD_DEF_MSG_TYPE(TDMT_VND_STREAM_MAX_MSG, "vnd-stream-max", NULL, NULL)
-  TD_CLOSE_MSG_SEG(TDMT_END_VND_STREAM_MSG)
+  TD_CLOSE_MSG_SEG(TDMT_VND_STREAM_MSG)

   TD_NEW_MSG_SEG(TDMT_VND_TMQ_MSG) //8 << 8
   TD_DEF_MSG_TYPE(TDMT_VND_TMQ_SUBSCRIBE, "vnode-tmq-subscribe", SMqRebVgReq, SMqRebVgRsp)
@@ -406,19 +398,17 @@
   TD_DEF_MSG_TYPE(TDMT_VND_TMQ_CONSUME_PUSH, "vnode-tmq-consume-push", NULL, NULL)
   TD_DEF_MSG_TYPE(TDMT_VND_TMQ_VG_WALINFO, "vnode-tmq-vg-walinfo", SMqPollReq, SMqDataBlkRsp)
   TD_DEF_MSG_TYPE(TDMT_VND_TMQ_VG_COMMITTEDINFO, "vnode-tmq-committedinfo", NULL, NULL)
-  TD_DEF_MSG_TYPE(TDMT_VND_TMQ_MAX_MSG, "vnd-tmq-max", NULL, NULL)
-  TD_CLOSE_MSG_SEG(TDMT_END_TMQ_MSG)
+  TD_CLOSE_MSG_SEG(TDMT_VND_TMQ_MSG)

   TD_NEW_MSG_SEG(TDMT_MND_ARB_MSG) //9 << 8
   TD_DEF_MSG_TYPE(TDMT_MND_ARB_HEARTBEAT_TIMER, "mnd-arb-hb-tmr", NULL, NULL)
   TD_DEF_MSG_TYPE(TDMT_MND_ARB_CHECK_SYNC_TIMER, "mnd-arb-check-sync-tmr", NULL, NULL)
   TD_DEF_MSG_TYPE(TDMT_MND_ARB_UPDATE_GROUP, "mnd-arb-update-group", NULL, NULL) // no longer used
   TD_DEF_MSG_TYPE(TDMT_MND_ARB_UPDATE_GROUP_BATCH, "mnd-arb-update-group-batch", NULL, NULL)
-  TD_DEF_MSG_TYPE(TDMT_MND_ARB_MAX_MSG, "mnd-arb-max", NULL, NULL)
-  TD_CLOSE_MSG_SEG(TDMT_END_ARB_MSG)
+  TD_CLOSE_MSG_SEG(TDMT_MND_ARB_MSG)

   TD_NEW_MSG_SEG(TDMT_MAX_MSG) // msg end mark
-  TD_CLOSE_MSG_SEG(TDMT_END_MAX_MSG)
+  TD_CLOSE_MSG_SEG(TDMT_MAX_MSG)

 #if defined(TD_MSG_NUMBER_)
   TDMT_MAX

@@ -41,7 +41,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg)
 int32_t tqStreamTaskProcessRetrieveTriggerRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
 int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg);
 int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* pMsg, bool fromVnode);
-int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, char* msg, int32_t msgLen);
+int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, bool restored, char* msg, int32_t msgLen);

 void    tqSetRestoreVersionInfo(SStreamTask* pTask);
 int32_t tqExpandStreamTask(SStreamTask* pTask);

@@ -681,7 +681,8 @@ bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId);
 void streamTaskGetTriggerRecvStatus(SStreamTask* pTask, int32_t* pRecved, int32_t* pTotal);
 void streamTaskInitTriggerDispatchInfo(SStreamTask* pTask);
 void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId);
-int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, SRpcHandleInfo* pInfo, int32_t code);
+int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, int32_t downstreamNodeId,
+                                           SRpcHandleInfo* pInfo, int32_t code);

 int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue);
 int32_t streamQueueGetNumOfUnAccessedItems(const SStreamQueue* pQueue);
@@ -770,7 +771,7 @@ int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo,
 int32_t streamTaskBuildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg,
                                            int32_t setCode);
 int32_t streamSendChkptReportMsg(SStreamTask* pTask, SCheckpointInfo* pCheckpointInfo, int8_t dropRelHTask);
-int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpointInfoReq* pReq);
+int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SVUpdateCheckpointInfoReq* pReq);
 SActiveCheckpointInfo* streamTaskCreateActiveChkptInfo();

 // stream task state machine, and event handling

@@ -85,8 +85,8 @@ int32_t taosGetErrSize();
 #define TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED  TAOS_DEF_ERROR_CODE(0, 0x0020) // "Vgroup could not be connected"
 #define TSDB_CODE_RPC_SOMENODE_BROKEN_LINK    TAOS_DEF_ERROR_CODE(0, 0x0021) //
 #define TSDB_CODE_RPC_MAX_SESSIONS            TAOS_DEF_ERROR_CODE(0, 0x0022) //
-#define TSDB_CODE_RPC_NETWORK_ERROR           TAOS_DEF_ERROR_CODE(0, 0x0023)
-#define TSDB_CODE_RPC_NETWORK_BUSY            TAOS_DEF_ERROR_CODE(0, 0x0024)
+#define TSDB_CODE_RPC_NETWORK_ERROR           TAOS_DEF_ERROR_CODE(0, 0x0023)
+#define TSDB_CODE_RPC_NETWORK_BUSY            TAOS_DEF_ERROR_CODE(0, 0x0024)

@@ -274,8 +274,8 @@ int32_t taosGetErrSize();
 #define TSDB_CODE_MND_PRIVILEDGE_EXIST          TAOS_DEF_ERROR_CODE(0, 0x0359)
 #define TSDB_CODE_MND_USER_HOST_EXIST           TAOS_DEF_ERROR_CODE(0, 0x035A)
 #define TSDB_CODE_MND_USER_HOST_NOT_EXIST       TAOS_DEF_ERROR_CODE(0, 0x035B)
-#define TSDB_CODE_MND_TOO_MANY_USER_HOST        TAOS_DEF_ERROR_CODE(0, 0x035C)
-#define TSDB_CODE_MND_USER_LOCAL_HOST_NOT_DROP  TAOS_DEF_ERROR_CODE(0, 0x035D)
+#define TSDB_CODE_MND_TOO_MANY_USER_HOST        TAOS_DEF_ERROR_CODE(0, 0x035C)
+#define TSDB_CODE_MND_USER_LOCAL_HOST_NOT_DROP  TAOS_DEF_ERROR_CODE(0, 0x035D)

 // mnode-stable-part1
 #define TSDB_CODE_MND_STB_ALREADY_EXIST         TAOS_DEF_ERROR_CODE(0, 0x0360)
@@ -294,7 +294,7 @@ int32_t taosGetErrSize();
 // #define TSDB_CODE_MND_INVALID_STABLE_NAME    TAOS_DEF_ERROR_CODE(0, 0x036D) // 2.x
 #define TSDB_CODE_MND_INVALID_STB_OPTION        TAOS_DEF_ERROR_CODE(0, 0x036E)
 #define TSDB_CODE_MND_INVALID_ROW_BYTES         TAOS_DEF_ERROR_CODE(0, 0x036F)
-#define TSDB_CODE_MND_FIELD_VALUE_OVERFLOW      TAOS_DEF_ERROR_CODE(0, 0x0370)
+// #define TSDB_CODE_MND_FIELD_VALUE_OVERFLOW   TAOS_DEF_ERROR_CODE(0, 0x0370) // unused

 // mnode-func
@@ -516,7 +516,7 @@ int32_t taosGetErrSize();
 #define TSDB_CODE_VND_DIR_ALREADY_EXIST              TAOS_DEF_ERROR_CODE(0, 0x0534)
 #define TSDB_CODE_VND_META_DATA_UNSAFE_DELETE        TAOS_DEF_ERROR_CODE(0, 0x0535)
 #define TSDB_CODE_VND_COLUMN_COMPRESS_ALREADY_EXIST  TAOS_DEF_ERROR_CODE(0, 0x0536)
-#define TSDB_CODE_VND_ARB_NOT_SYNCED                 TAOS_DEF_ERROR_CODE(0, 0x0536) // internal
+#define TSDB_CODE_VND_ARB_NOT_SYNCED                 TAOS_DEF_ERROR_CODE(0, 0x0537) // internal

 // tsdb
 #define TSDB_CODE_TDB_INVALID_TABLE_ID               TAOS_DEF_ERROR_CODE(0, 0x0600)
@@ -927,7 +927,7 @@ int32_t taosGetErrSize();
 #define TSDB_CODE_TDLITE_IVLD_OPEN_DIR      TAOS_DEF_ERROR_CODE(0, 0x5101)

 // UTIL
-#define TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY  TAOS_DEF_ERROR_CODE(0, 0x6000)
+#define TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY  TAOS_DEF_ERROR_CODE(0, 0x6000)

 #ifdef __cplusplus
 }

@@ -135,8 +135,7 @@ static int32_t hbUpdateUserAuthInfo(SAppHbMgr *pAppHbMgr, SUserAuthBatchRsp *batchRsp) {
     if (pTscObj->whiteListInfo.fp) {
       SWhiteListInfo *whiteListInfo = &pTscObj->whiteListInfo;
       int64_t         oldVer = atomic_load_64(&whiteListInfo->ver);
-
-      if (oldVer < pRsp->whiteListVer || pRsp->whiteListVer == 0) {
+      if (oldVer != pRsp->whiteListVer) {
         atomic_store_64(&whiteListInfo->ver, pRsp->whiteListVer);
         if (whiteListInfo->fp) {
           (*whiteListInfo->fp)(whiteListInfo->param, &pRsp->whiteListVer, TAOS_NOTIFY_WHITELIST_VER);
@@ -144,6 +143,14 @@ static int32_t hbUpdateUserAuthInfo(SAppHbMgr *pAppHbMgr, SUserAuthBatchRsp *batchRsp) {
         tscDebug("update whitelist version of user %s from %" PRId64 " to %" PRId64 ", tscRid:%" PRIi64, pRsp->user,
                  oldVer, atomic_load_64(&whiteListInfo->ver), pTscObj->id);
       }
+    } else {
+      // Need to update version information to prevent frequent fetching of authentication
+      // information.
+      SWhiteListInfo *whiteListInfo = &pTscObj->whiteListInfo;
+      int64_t         oldVer = atomic_load_64(&whiteListInfo->ver);
+      atomic_store_64(&whiteListInfo->ver, pRsp->whiteListVer);
+      tscDebug("update whitelist version of user %s from %" PRId64 " to %" PRId64 ", tscRid:%" PRIi64, pRsp->user,
+               oldVer, atomic_load_64(&whiteListInfo->ver), pTscObj->id);
     }
     releaseTscObj(pReq->connKey.tscRid);
   }
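Both branches above reduce to the same rule: persist the server-reported whitelist version unconditionally, and fire the notification callback only when the version actually changed, so the client stops re-fetching unchanged authentication info on every heartbeat. A hedged sketch of that rule (hypothetical types, not the client's real structs):

#include <stdatomic.h>
#include <stdint.h>
#include <stddef.h>

typedef void (*notify_fp)(void *param, void *ext, int type);

typedef struct {
  _Atomic int64_t ver;    // last whitelist version seen from the server
  notify_fp       fp;     // optional user callback
  void           *param;
} WhiteListInfo;

static void updateWhiteListVer(WhiteListInfo *info, int64_t rspVer, int notifyType) {
  int64_t oldVer = atomic_load(&info->ver);
  atomic_store(&info->ver, rspVer);               // always remember the latest version
  if (info->fp != NULL && oldVer != rspVer) {
    info->fp(info->param, &rspVer, notifyType);   // notify only on a real change
  }
}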

@@ -1052,6 +1059,7 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
     return NULL;
   }

+  int64_t  maxIpWhiteVer = 0;
   void    *pIter = NULL;
   SHbParam param = {0};
   while ((pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter))) {
@@ -1087,8 +1095,11 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
       }
     }

+    int64_t ver = atomic_load_64(&pTscObj->whiteListInfo.ver);
+    maxIpWhiteVer = TMAX(maxIpWhiteVer, ver);
     releaseTscObj(connKey->tscRid);
   }
+  pBatchReq->ipWhiteList = maxIpWhiteVer;

   return pBatchReq;
 }

@@ -1258,8 +1258,12 @@ int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) {
       pStmt->bInfo.sBindRowNum = bind->num;
     }

-    qBindStmtSingleColValue(*pDataBlock, pCols, bind, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen, colIdx,
-                            pStmt->bInfo.sBindRowNum);
+    code = qBindStmtSingleColValue(*pDataBlock, pCols, bind, pStmt->exec.pRequest->msgBuf,
+                                   pStmt->exec.pRequest->msgBufLen, colIdx, pStmt->bInfo.sBindRowNum);
+    if (code) {
+      tscError("qBindStmtSingleColValue failed, error:%s", tstrerror(code));
+      STMT_ERR_RET(code);
+    }
   }

   int64_t startUs4 = taosGetTimestampUs();

@@ -485,6 +485,8 @@ int32_t tSerializeSClientHbBatchReq(void *buf, int32_t bufLen, const SClientHbBatchReq *pBatchReq) {
     SClientHbReq *pReq = taosArrayGet(pBatchReq->reqs, i);
     if (tSerializeSClientHbReq(&encoder, pReq) < 0) return -1;
   }
+
+  if (tEncodeI64(&encoder, pBatchReq->ipWhiteList) < 0) return -1;
   tEndEncode(&encoder);

   int32_t tlen = encoder.pos;
@@ -511,6 +513,10 @@ int32_t tDeserializeSClientHbBatchReq(void *buf, int32_t bufLen, SClientHbBatchReq *pBatchReq) {
     taosArrayPush(pBatchReq->reqs, &req);
   }

+  if (!tDecodeIsEnd(&decoder)) {
+    tDecodeI64(&decoder, &pBatchReq->ipWhiteList);
+  }
+
   tEndDecode(&decoder);
   tDecoderClear(&decoder);
   return 0;
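The tDecodeIsEnd guard is what keeps the new trailing ipWhiteList field wire-compatible: payloads from older peers simply end earlier, and the decoder leaves the field at its default instead of failing. A minimal sketch of the pattern (hypothetical cursor-style decoder, not the real tEncoder/tDecoder API):

#include <stdbool.h>
#include <stdint.h>
#include <string.h>

typedef struct {
  const uint8_t *buf;
  size_t         pos, len;
} Decoder;

static bool decodeIsEnd(const Decoder *d) { return d->pos >= d->len; }

static int decodeI64(Decoder *d, int64_t *v) {
  if (d->pos + sizeof(*v) > d->len) return -1;
  memcpy(v, d->buf + d->pos, sizeof(*v));
  d->pos += sizeof(*v);
  return 0;
}

// Trailing fields appended in newer versions are read only when present;
// otherwise the field keeps its zero default and old payloads still parse.
static int decodeBatchReqTail(Decoder *d, int64_t *ipWhiteListVer) {
  *ipWhiteListVer = 0;
  if (!decodeIsEnd(d)) {
    return decodeI64(d, ipWhiteListVer);
  }
  return 0;
}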

@@ -36,8 +36,6 @@ TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP = 35
 TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP_RSP = 36
 TDMT_DND_CREATE_ENCRYPT_KEY = 37
 TDMT_DND_CREATE_ENCRYPT_KEY_RSP = 38
-TDMT_DND_MAX_MSG = 39
-TDMT_DND_MAX_MSG_RSP = 40
 TDMT_MND_CONNECT = 257
 TDMT_MND_CONNECT_RSP = 258
 TDMT_MND_CREATE_ACCT = 259
@@ -228,12 +226,12 @@ TDMT_MND_PAUSE_STREAM = 443
 TDMT_MND_PAUSE_STREAM_RSP = 444
 TDMT_MND_RESUME_STREAM = 445
 TDMT_MND_RESUME_STREAM_RSP = 446
-TDMT_MND_STREAM_UPDATE_CHKPT_EVT = 447
-TDMT_MND_STREAM_UPDATE_CHKPT_EVT_RSP = 448
+TDMT_MND_STREAM_CHECKPOINT_TIMER = 447
+TDMT_MND_STREAM_CHECKPOINT_TIMER_RSP = 448
 TDMT_MND_STREAM_BEGIN_CHECKPOINT = 449
 TDMT_MND_STREAM_BEGIN_CHECKPOINT_RSP = 450
-TDMT_MND_STREAM_CHKPT_REPORT = 451
-TDMT_MND_STREAM_CHKPT_REPORT_RSP = 452
+TDMT_MND_STREAM_CHECKPOINT_CANDIDITATE = 451
+TDMT_MND_STREAM_CHECKPOINT_CANDIDITATE_RSP = 452
 TDMT_MND_STREAM_NODECHANGE_CHECK = 453
 TDMT_MND_STREAM_NODECHANGE_CHECK_RSP = 454
 TDMT_MND_TRIM_DB_TIMER = 455
@@ -276,8 +274,10 @@ TDMT_MND_GET_TSMA = 491
 TDMT_MND_GET_TSMA_RSP = 492
 TDMT_MND_DROP_TB_WITH_TSMA = 493
 TDMT_MND_DROP_TB_WITH_TSMA_RSP = 494
-TDMT_MND_MAX_MSG = 495
-TDMT_MND_MAX_MSG_RSP = 496
+TDMT_MND_STREAM_UPDATE_CHKPT_EVT = 495
+TDMT_MND_STREAM_UPDATE_CHKPT_EVT_RSP = 496
+TDMT_MND_STREAM_CHKPT_REPORT = 497
+TDMT_MND_STREAM_CHKPT_REPORT_RSP = 498
 TDMT_VND_SUBMIT = 513
 TDMT_VND_SUBMIT_RSP = 514
 TDMT_VND_CREATE_TABLE = 515
@@ -382,8 +382,6 @@ TDMT_VND_ARB_CHECK_SYNC = 613
 TDMT_VND_ARB_CHECK_SYNC_RSP = 614
 TDMT_VND_FETCH_TTL_EXPIRED_TBS = 615
 TDMT_VND_FETCH_TTL_EXPIRED_TBS_RSP = 616
-TDMT_VND_MAX_MSG = 617
-TDMT_VND_MAX_MSG_RSP = 618
 TDMT_SCH_QUERY = 769
 TDMT_SCH_QUERY_RSP = 770
 TDMT_SCH_MERGE_QUERY = 771
@@ -406,8 +404,6 @@ TDMT_SCH_LINK_BROKEN = 787
 TDMT_SCH_LINK_BROKEN_RSP = 788
 TDMT_SCH_TASK_NOTIFY = 789
 TDMT_SCH_TASK_NOTIFY_RSP = 790
-TDMT_SCH_MAX_MSG = 791
-TDMT_SCH_MAX_MSG_RSP = 792
 TDMT_STREAM_TASK_DEPLOY = 1025
 TDMT_STREAM_TASK_DEPLOY_RSP = 1026
 TDMT_STREAM_TASK_DROP = 1027
@@ -440,10 +436,6 @@ TDMT_STREAM_DROP = 1053
 TDMT_STREAM_DROP_RSP = 1054
 TDMT_STREAM_RETRIEVE_TRIGGER = 1055
 TDMT_STREAM_RETRIEVE_TRIGGER_RSP = 1056
-TDMT_STREAM_MAX_MSG = 1057
-TDMT_STREAM_MAX_MSG_RSP = 1058
-TDMT_MON_MAX_MSG = 1281
-TDMT_MON_MAX_MSG_RSP = 1282
 TDMT_SYNC_TIMEOUT = 1537
 TDMT_SYNC_TIMEOUT_RSP = 1538
 TDMT_SYNC_TIMEOUT_ELECTION = 1539
@@ -504,8 +496,6 @@ TDMT_SYNC_FORCE_FOLLOWER = 1593
 TDMT_SYNC_FORCE_FOLLOWER_RSP = 1594
 TDMT_SYNC_SET_ASSIGNED_LEADER = 1595
 TDMT_SYNC_SET_ASSIGNED_LEADER_RSP = 1596
-TDMT_SYNC_MAX_MSG = 1597
-TDMT_SYNC_MAX_MSG_RSP = 1598
 TDMT_VND_STREAM_SCAN_HISTORY = 1793
 TDMT_VND_STREAM_SCAN_HISTORY_RSP = 1794
 TDMT_VND_STREAM_CHECK_POINT_SOURCE = 1795
@@ -520,8 +510,6 @@ TDMT_VND_STREAM_UNUSED = 1803
 TDMT_VND_STREAM_UNUSED_RSP = 1804
 TDMT_VND_GET_STREAM_PROGRESS = 1805
 TDMT_VND_GET_STREAM_PROGRESS_RSP = 1806
-TDMT_VND_STREAM_MAX_MSG = 1807
-TDMT_VND_STREAM_MAX_MSG_RSP = 1808
 TDMT_VND_TMQ_SUBSCRIBE = 2049
 TDMT_VND_TMQ_SUBSCRIBE_RSP = 2050
 TDMT_VND_TMQ_DELETE_SUB = 2051
@@ -542,8 +530,6 @@ TDMT_VND_TMQ_VG_WALINFO = 2065
 TDMT_VND_TMQ_VG_WALINFO_RSP = 2066
 TDMT_VND_TMQ_VG_COMMITTEDINFO = 2067
 TDMT_VND_TMQ_VG_COMMITTEDINFO_RSP = 2068
-TDMT_VND_TMQ_MAX_MSG = 2069
-TDMT_VND_TMQ_MAX_MSG_RSP = 2070
 TDMT_MND_ARB_HEARTBEAT_TIMER = 2305
 TDMT_MND_ARB_HEARTBEAT_TIMER_RSP = 2306
 TDMT_MND_ARB_CHECK_SYNC_TIMER = 2307
@@ -552,5 +538,3 @@ TDMT_MND_ARB_UPDATE_GROUP = 2309
 TDMT_MND_ARB_UPDATE_GROUP_RSP = 2310
 TDMT_MND_ARB_UPDATE_GROUP_BATCH = 2311
 TDMT_MND_ARB_UPDATE_GROUP_BATCH_RSP = 2312
-TDMT_MND_ARB_MAX_MSG = 2313
-TDMT_MND_ARB_MAX_MSG_RSP = 2314
@ -58,8 +58,8 @@ static void dmConvertErrCode(tmsg_t msgType) {
|
|||
if (terrno != TSDB_CODE_APP_IS_STOPPING) {
|
||||
return;
|
||||
}
|
||||
if ((msgType > TDMT_VND_MSG && msgType < TDMT_VND_MAX_MSG) ||
|
||||
(msgType > TDMT_SCH_MSG && msgType < TDMT_SCH_MAX_MSG)) {
|
||||
if ((msgType > TDMT_VND_MSG_MIN && msgType < TDMT_VND_MSG_MAX) ||
|
||||
(msgType > TDMT_SCH_MSG_MIN && msgType < TDMT_SCH_MSG_MAX)) {
|
||||
terrno = TSDB_CODE_VND_STOPPED;
|
||||
}
|
||||
}
|
||||
|
@@ -279,7 +279,7 @@ int32_t dmInitMsgHandle(SDnode *pDnode) {

 static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
   SDnode *pDnode = dmInstance();
-  if (pDnode->status != DND_STAT_RUNNING && pMsg->msgType < TDMT_SYNC_MSG) {
+  if (pDnode->status != DND_STAT_RUNNING && pMsg->msgType < TDMT_SYNC_MSG_MIN) {
     rpcFreeCont(pMsg->pCont);
     pMsg->pCont = NULL;
     if (pDnode->status == DND_STAT_INIT) {
@@ -296,7 +296,7 @@ static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
 }
 static inline int32_t dmSendSyncReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
   SDnode *pDnode = dmInstance();
-  if (pDnode->status != DND_STAT_RUNNING && pMsg->msgType < TDMT_SYNC_MSG) {
+  if (pDnode->status != DND_STAT_RUNNING && pMsg->msgType < TDMT_SYNC_MSG_MIN) {
     rpcFreeCont(pMsg->pCont);
     pMsg->pCont = NULL;
     if (pDnode->status == DND_STAT_INIT) {

@@ -38,7 +38,7 @@ SHashObj *mndDupDbHash(SHashObj *pOld);
 SHashObj *mndDupTableHash(SHashObj *pOld);
 SHashObj *mndDupTopicHash(SHashObj *pOld);
 int32_t   mndValidateUserAuthInfo(SMnode *pMnode, SUserAuthVersion *pUsers, int32_t numOfUses, void **ppRsp,
-                                  int32_t *pRspLen);
+                                  int32_t *pRspLen, int64_t ipWhiteListVer);
 int32_t   mndUserRemoveDb(SMnode *pMnode, STrans *pTrans, char *db);
 int32_t   mndUserRemoveStb(SMnode *pMnode, STrans *pTrans, char *stb);
 int32_t   mndUserRemoveView(SMnode *pMnode, STrans *pTrans, char *view);

@@ -542,6 +542,8 @@ void mndCompactSendProgressReq(SMnode *pMnode, SCompactObj *pCompact) {
     int32_t contLen = tSerializeSQueryCompactProgressReq(NULL, 0, &req);
     if (contLen < 0) {
       terrno = TSDB_CODE_OUT_OF_MEMORY;
+      sdbCancelFetch(pMnode->pSdb, pDetail);
+      sdbRelease(pMnode->pSdb, pDetail);
       continue;
     }


@@ -1125,11 +1125,22 @@ static int32_t mndSetAlterDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb,
       if (pNewDb->cfg.withArbitrator) {
         SArbGroup arbGroup = {0};
         mndArbGroupInitFromVgObj(&newVgroup, &arbGroup);
-        if (mndSetCreateArbGroupCommitLogs(pTrans, &arbGroup) != 0) return -1;
+        if (mndSetCreateArbGroupCommitLogs(pTrans, &arbGroup) != 0) {
+          sdbCancelFetch(pSdb, pIter);
+          sdbRelease(pSdb, pVgroup);
+          taosArrayDestroy(pArray);
+          return -1;
+        }
+
       } else {
         SArbGroup arbGroup = {0};
         mndArbGroupInitFromVgObj(pVgroup, &arbGroup);
-        if (mndSetDropArbGroupCommitLogs(pTrans, &arbGroup) != 0) return -1;
+        if (mndSetDropArbGroupCommitLogs(pTrans, &arbGroup) != 0) {
+          sdbCancelFetch(pSdb, pIter);
+          sdbRelease(pSdb, pVgroup);
+          taosArrayDestroy(pArray);
+          return -1;
+        }
       }
     }
   }

@@ -537,7 +537,7 @@ SMqSubscribeObj *tNewSubscribeObj(const char *key) {
   memcpy(pSubObj->key, key, TSDB_SUBSCRIBE_KEY_LEN);
   taosInitRWLatch(&pSubObj->lock);
   pSubObj->vgNum = 0;
-  pSubObj->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
+  pSubObj->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);

   // TODO set hash free fp
   /*taosHashSetFreeFp(pSubObj->consumerHash, tDeleteSMqConsumerEp);*/
@@ -557,7 +557,7 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) {
   pSubNew->withMeta = pSub->withMeta;

   pSubNew->vgNum = pSub->vgNum;
-  pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
+  pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
   // TODO set hash free fp
   /*taosHashSetFreeFp(pSubNew->consumerHash, tDeleteSMqConsumerEp);*/
   void *pIter = NULL;

@@ -1351,7 +1351,7 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
     terrno = TSDB_CODE_INVALID_MSG;
     return -1;
   }
-
+  int8_t updateIpWhiteList = 0;
   mInfo("dnode:%d, start to config, option:%s, value:%s", cfgReq.dnodeId, cfgReq.config, cfgReq.value);
   if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CONFIG_DNODE) != 0) {
     tFreeSMCfgDnodeReq(&cfgReq);
@@ -1386,6 +1386,9 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
       terrno = TSDB_CODE_INVALID_CFG;
       goto _err_out;
     }
+    if (strncasecmp(dcfgReq.config, "enableWhiteList", strlen("enableWhiteList")) == 0) {
+      updateIpWhiteList = 1;
+    }

     if (cfgCheckRangeForDynUpdate(taosGetCfg(), dcfgReq.config, dcfgReq.value, true) != 0) goto _err_out;
   }
@@ -1399,7 +1402,11 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {

   tFreeSMCfgDnodeReq(&cfgReq);

-  return mndSendCfgDnodeReq(pMnode, cfgReq.dnodeId, &dcfgReq);
+  int32_t code = mndSendCfgDnodeReq(pMnode, cfgReq.dnodeId, &dcfgReq);
+
+  // refresh regardless of whether the config request succeeds or fails
+  if (updateIpWhiteList) mndRefreshUserIpWhiteList(pMnode);
+  return code;

 _err_out:
   tFreeSMCfgDnodeReq(&cfgReq);

@@ -50,7 +50,7 @@ int32_t mndSetUserAuthRsp(SMnode *pMnode, SUserObj *pUser, SGetUserAuthRsp *pRsp) {
   pRsp->sysInfo = pUser->sysInfo;
   pRsp->version = pUser->authVersion;
   pRsp->passVer = pUser->passVersion;
-  pRsp->whiteListVer = mndGetUserIpWhiteListVer(pMnode, pUser);
+  pRsp->whiteListVer = pMnode->ipWhiteVer;
   return 0;
 }


@@ -62,6 +62,7 @@ typedef struct {
   int32_t onlineDnodes;
   SEpSet  epSet;
   SArray *pQnodeList;
+  int64_t ipWhiteListVer;
 } SConnPreparedObj;

 static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType, uint32_t ip, uint16_t port,
@@ -299,12 +300,12 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
   connectRsp.svrTimestamp = taosGetTimestampSec();
   connectRsp.passVer = pUser->passVersion;
   connectRsp.authVer = pUser->authVersion;
-  connectRsp.whiteListVer = mndGetUserIpWhiteListVer(pMnode, pUser);
   connectRsp.monitorParas.tsEnableMonitor = tsEnableMonitor;
   connectRsp.monitorParas.tsMonitorInterval = tsMonitorInterval;
   connectRsp.monitorParas.tsSlowLogScope = tsSlowLogScope;
   connectRsp.monitorParas.tsSlowLogMaxLen = tsSlowLogMaxLen;
   connectRsp.monitorParas.tsSlowLogThreshold = tsSlowLogThreshold;
+  connectRsp.whiteListVer = pUser->ipWhiteListVer;

   strcpy(connectRsp.sVer, version);
   snprintf(connectRsp.sDetailVer, sizeof(connectRsp.sDetailVer), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo,
@@ -572,7 +573,8 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHbReq *pHbReq,
       case HEARTBEAT_KEY_USER_AUTHINFO: {
         void   *rspMsg = NULL;
         int32_t rspLen = 0;
-        mndValidateUserAuthInfo(pMnode, kv->value, kv->valueLen / sizeof(SUserAuthVersion), &rspMsg, &rspLen);
+        mndValidateUserAuthInfo(pMnode, kv->value, kv->valueLen / sizeof(SUserAuthVersion), &rspMsg, &rspLen,
+                                pObj->ipWhiteListVer);
         if (rspMsg && rspLen > 0) {
           SKv kv1 = {.key = HEARTBEAT_KEY_USER_AUTHINFO, .valueLen = rspLen, .value = rspMsg};
           taosArrayPush(hbRsp.info, &kv1);
@@ -654,6 +656,7 @@ static int32_t mndProcessHeartBeatReq(SRpcMsg *pReq) {

   SConnPreparedObj obj = {0};
   obj.totalDnodes = mndGetDnodeSize(pMnode);
+  obj.ipWhiteListVer = batchReq.ipWhiteList;
   mndGetOnlineDnodeNum(pMnode, &obj.onlineDnodes);
   mndGetMnodeEpSet(pMnode, &obj.epSet);
   mndCreateQnodeList(pMnode, &obj.pQnodeList, -1);

@@ -884,7 +884,7 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreate,
   }

   if (pDst->nextColId < 0 || pDst->nextColId >= 0x7fff - pDst->numOfColumns - pDst->numOfTags) {
-    terrno = TSDB_CODE_MND_FIELD_VALUE_OVERFLOW;
+    terrno = TSDB_CODE_OUT_OF_RANGE;
     return -1;
   }

@@ -1148,7 +1148,7 @@ static int32_t mndBuildStbFromAlter(SStbObj *pStb, SStbObj *pDst, SMCreateStbReq *pAlter) {
   }

   if (pDst->nextColId < 0 || pDst->nextColId >= 0x7fff - pDst->numOfColumns - pDst->numOfTags) {
-    terrno = TSDB_CODE_MND_FIELD_VALUE_OVERFLOW;
+    terrno = TSDB_CODE_OUT_OF_RANGE;
     return -1;
   }

@@ -1414,7 +1414,7 @@ static int32_t mndAddSuperTableTag(const SStbObj *pOld, SStbObj *pNew, SArray *pFields,
   }

   if (pNew->nextColId < 0 || pNew->nextColId >= 0x7fff - ntags) {
-    terrno = TSDB_CODE_MND_FIELD_VALUE_OVERFLOW;
+    terrno = TSDB_CODE_OUT_OF_RANGE;
     return -1;
   }

@@ -1806,7 +1806,7 @@ static int32_t mndAddSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, SArray *pFields,
   }

   if (pNew->nextColId < 0 || pNew->nextColId >= 0x7fff - ncols) {
-    terrno = TSDB_CODE_MND_FIELD_VALUE_OVERFLOW;
+    terrno = TSDB_CODE_OUT_OF_RANGE;
     return -1;
   }


@@ -1075,7 +1075,7 @@ static bool taskNodeIsUpdated(SMnode *pMnode) {
     mWarn("not all vnodes ready, quit from vnodes status check");
     taosArrayDestroy(pNodeSnapshot);
     taosThreadMutexUnlock(&execInfo.lock);
-    return 0;
+    return true;
   }

   SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot);
@@ -1911,9 +1911,51 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
   bool updated = taskNodeIsUpdated(pMnode);
   if (updated) {
     mError("tasks are not ready for pause, node update detected");
     sdbRelease(pMnode->pSdb, pStream);
     return -1;
   }

+  { // check for tasks, if tasks are not ready, not allowed to pause
+    bool found = false;
+    bool readyToPause = true;
+    taosThreadMutexLock(&execInfo.lock);
+
+    for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
+      STaskId *p = taosArrayGet(execInfo.pTaskList, i);
+
+      STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
+      if (pEntry == NULL) {
+        continue;
+      }
+
+      if (pEntry->id.streamId != pStream->uid) {
+        continue;
+      }
+
+      if (pEntry->status == TASK_STATUS__UNINIT || pEntry->status == TASK_STATUS__CK) {
+        mError("stream:%s uid:0x%" PRIx64 " vgId:%d task:0x%" PRIx64 " status:%s, not ready for pause", pStream->name,
+               pStream->uid, pEntry->nodeId, pEntry->id.taskId, streamTaskGetStatusStr(pEntry->status));
+        readyToPause = false;
+      }
+
+      found = true;
+    }
+
+    taosThreadMutexUnlock(&execInfo.lock);
+    if (!found) {
+      mError("stream:%s task not report status yet, not ready for pause", pauseReq.name);
+      sdbRelease(pMnode->pSdb, pStream);
+      return -1;
+    }
+
+    if (!readyToPause) {
+      mError("stream:%s task not ready for pause yet", pauseReq.name);
+      sdbRelease(pMnode->pSdb, pStream);
+      return -1;
+    }
+  }
+
   STrans *pTrans =
       doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_PAUSE_NAME, "pause the stream");
   if (pTrans == NULL) {

@@ -799,6 +799,29 @@ static int32_t initRebOutput(SMqRebOutputObj *rebOutput) {
   return 0;
 }

+// This function only works when there are dirty consumers
+static void checkConsumer(SMnode *pMnode, SMqSubscribeObj *pSub) {
+  void *pIter = NULL;
+  while (1) {
+    pIter = taosHashIterate(pSub->consumerHash, pIter);
+    if (pIter == NULL) {
+      break;
+    }
+
+    SMqConsumerEp  *pConsumerEp = (SMqConsumerEp *)pIter;
+    SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pConsumerEp->consumerId);
+    if (pConsumer != NULL) {
+      mndReleaseConsumer(pMnode, pConsumer);
+      continue;
+    }
+    mError("consumer:0x%" PRIx64 " not exists in sdb for exception", pConsumerEp->consumerId);
+    taosArrayAddAll(pSub->unassignedVgs, pConsumerEp->vgs);
+
+    taosArrayDestroy(pConsumerEp->vgs);
+    taosHashRemove(pSub->consumerHash, &pConsumerEp->consumerId, sizeof(int64_t));
+  }
+}
+
 static int32_t buildRebOutput(SMnode *pMnode, SMqRebInputObj *rebInput, SMqRebOutputObj *rebOutput) {
   const char      *key = rebInput->pRebInfo->key;
   SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, key);
@@ -834,8 +857,9 @@ static int32_t buildRebOutput(SMnode *pMnode, SMqRebInputObj *rebInput, SMqRebOutputObj *rebOutput) {
     mInfo("[rebalance] sub topic:%s has no consumers sub yet", key);
   } else {
     taosRLockLatch(&pSub->lock);
-    rebInput->oldConsumerNum = taosHashGetSize(pSub->consumerHash);
     rebOutput->pSub = tCloneSubscribeObj(pSub);
+    checkConsumer(pMnode, rebOutput->pSub);
+    rebInput->oldConsumerNum = taosHashGetSize(rebOutput->pSub->consumerHash);
     taosRUnLockLatch(&pSub->lock);

     mInfo("[rebalance] sub topic:%s has %d consumers sub till now", key, rebInput->oldConsumerNum);
@@ -910,6 +934,7 @@ END:
 static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STrans *pTrans) {
   void   *pIter = NULL;
   SVgObj *pVgObj = NULL;
+  int32_t ret = 0;
   while (1) {
     pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgObj);
     if (pIter == NULL) {
@@ -923,8 +948,8 @@ static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STrans *pTrans) {
     SMqVDeleteReq *pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq));
     if (pReq == NULL) {
       terrno = TSDB_CODE_OUT_OF_MEMORY;
-      sdbRelease(pMnode->pSdb, pVgObj);
-      return -1;
+      ret = -1;
+      goto END;
     }
     pReq->head.vgId = htonl(pVgObj->vgId);
     pReq->vgId = pVgObj->vgId;
@@ -940,33 +965,50 @@ static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STrans *pTrans) {

     sdbRelease(pMnode->pSdb, pVgObj);
     if (mndTransAppendRedoAction(pTrans, &action) != 0) {
       taosMemoryFree(pReq);
-      return -1;
+      ret = -1;
+      goto END;
     }
   }
   return 0;
+END:
+  sdbRelease(pMnode->pSdb, pVgObj);
+  sdbCancelFetch(pMnode->pSdb, pIter);
+  return ret;
 }

 static int32_t mndDropConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgroup, char *topic) {
   void           *pIter = NULL;
   SMqConsumerObj *pConsumer = NULL;
+  int             ret = 0;
   while (1) {
     pIter = sdbFetch(pMnode->pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
     if (pIter == NULL) {
       break;
     }

-    if (strcmp(cgroup, pConsumer->cgroup) == 0 && taosArrayGetSize(pConsumer->currentTopics) == 0) {
-      int32_t code = mndSetConsumerDropLogs(pTrans, pConsumer);
-      if (code != 0) {
-        sdbRelease(pMnode->pSdb, pConsumer);
-        sdbCancelFetch(pMnode->pSdb, pIter);
-        return code;
+    // drop consumer in lost status, other consumers not in lost status already deleted by rebalance
+    if (pConsumer->status != MQ_CONSUMER_STATUS_LOST || strcmp(cgroup, pConsumer->cgroup) != 0) {
+      sdbRelease(pMnode->pSdb, pConsumer);
+      continue;
+    }
+    int32_t sz = taosArrayGetSize(pConsumer->assignedTopics);
+    for (int32_t i = 0; i < sz; i++) {
+      char *name = taosArrayGetP(pConsumer->assignedTopics, i);
+      if (strcmp(topic, name) == 0) {
+        int32_t code = mndSetConsumerDropLogs(pTrans, pConsumer);
+        if (code != 0) {
+          ret = code;
+          goto END;
+        }
       }
     }

     sdbRelease(pMnode->pSdb, pConsumer);
   }
   return 0;
+
+END:
+  sdbRelease(pMnode->pSdb, pConsumer);
+  sdbCancelFetch(pMnode->pSdb, pIter);
+  return ret;
 }
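Both reworked loops route every early exit through a single END label that releases the in-flight object and cancels the fetch, because abandoning an sdbFetch cursor without sdbCancelFetch leaks the iterator. A hedged sketch of the shape (SObj and processObj are hypothetical stand-ins for the real row type and per-row work):

// Sketch of the cancel-on-early-exit iteration pattern used above.
static int32_t iterateAll(SSdb *pSdb, ESdbType type) {
  void   *pIter = NULL;
  SObj   *pObj = NULL;
  int32_t ret = 0;

  while ((pIter = sdbFetch(pSdb, type, pIter, (void **)&pObj)) != NULL) {
    if (processObj(pObj) != 0) {
      ret = -1;
      goto END;  // single exit path: release + cancel exactly once
    }
    sdbRelease(pSdb, pObj);
  }
  return 0;

END:
  sdbRelease(pSdb, pObj);       // release the row we stopped on
  sdbCancelFetch(pSdb, pIter);  // free the abandoned cursor
  return ret;
}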

 static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {

@@ -2805,7 +2805,7 @@ static void mndCancelGetNextPrivileges(SMnode *pMnode, void *pIter) {
 }

 int32_t mndValidateUserAuthInfo(SMnode *pMnode, SUserAuthVersion *pUsers, int32_t numOfUses, void **ppRsp,
-                                int32_t *pRspLen) {
+                                int32_t *pRspLen, int64_t ipWhiteListVer) {
   SUserAuthBatchRsp batchRsp = {0};
   batchRsp.pArray = taosArrayInit(numOfUses, sizeof(SGetUserAuthRsp));
   if (batchRsp.pArray == NULL) {
@@ -2827,7 +2827,7 @@ int32_t mndValidateUserAuthInfo(SMnode *pMnode, SUserAuthVersion *pUsers, int32_t numOfUses,
     }

     pUsers[i].version = ntohl(pUsers[i].version);
-    if (pUser->authVersion <= pUsers[i].version) {
+    if (pUser->authVersion <= pUsers[i].version && ipWhiteListVer == pMnode->ipWhiteVer) {
       mndReleaseUser(pMnode, pUser);
       continue;
     }

@@ -70,6 +70,8 @@ const char *sdbTableName(ESdbType type) {
       return "compact";
     case SDB_COMPACT_DETAIL:
       return "compact_detail";
+    case SDB_GRANT:
+      return "grant";
    case SDB_ARBGROUP:
      return "arb_group";
    default:

@@ -154,7 +154,7 @@ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) {
     case TDMT_STREAM_TASK_RESUME:
       return tqStreamTaskProcessTaskResumeReq(pSnode->pMeta, pMsg->info.conn.applyIndex, pMsg->pCont, false);
     case TDMT_STREAM_TASK_UPDATE_CHKPT:
-      return tqStreamTaskProcessUpdateCheckpointReq(pSnode->pMeta, pMsg->pCont, pMsg->contLen);
+      return tqStreamTaskProcessUpdateCheckpointReq(pSnode->pMeta, true, pMsg->pCont, pMsg->contLen);
     default:
       ASSERT(0);
   }

@@ -1013,7 +1013,7 @@ int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) {
 }

 int32_t tqProcessTaskUpdateCheckpointReq(STQ* pTq, char* msg, int32_t msgLen) {
-  return tqStreamTaskProcessUpdateCheckpointReq(pTq->pStreamMeta, msg, msgLen);
+  return tqStreamTaskProcessUpdateCheckpointReq(pTq->pStreamMeta, pTq->pVnode->restored, msg, msgLen);
 }

 int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {

@@ -640,7 +640,7 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen) {
   return 0;
 }

-int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, char* msg, int32_t msgLen) {
+int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, bool restored, char* msg, int32_t msgLen) {
   SVUpdateCheckpointInfoReq* pReq = (SVUpdateCheckpointInfoReq*)msg;

   int32_t vgId = pMeta->vgId;
@@ -652,13 +652,14 @@ int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, bool restored, char* msg,
   SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));

   if (ppTask != NULL && (*ppTask) != NULL) {
-    streamTaskUpdateTaskCheckpointInfo(*ppTask, pReq);
+    streamTaskUpdateTaskCheckpointInfo(*ppTask, restored, pReq);
   } else {  // failed to get the task.
     tqError("vgId:%d failed to locate the s-task:0x%x to update the checkpoint info, it may have been dropped already",
             vgId, pReq->taskId);
   }

   streamMetaWUnLock(pMeta);
+  // always return success when handling the requirement issued by mnode during transaction.
   return TSDB_CODE_SUCCESS;
 }

@@ -853,7 +854,6 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
   } else if (pState->state == TASK_STATUS__UNINIT) {
     tqDebug("s-task:%s start task by checking downstream tasks", pTask->id.idStr);
-    ASSERT(pTask->status.downstreamReady == 0);
     // /*int32_t ret = */ streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
     tqStreamStartOneTaskAsync(pMeta, pTask->pMsgCb, pTask->id.streamId, pTask->id.taskId);
   } else {
     tqDebug("s-task:%s status:%s do nothing after receiving reset-task from mnode", pTask->id.idStr, pState->name);
@@ -883,7 +883,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
     tqError("s-task:%s not ready for checkpoint-trigger retrieve from 0x%x, since downstream not ready",
             pTask->id.idStr, (int32_t)pReq->downstreamTaskId);

-    streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, &pMsg->info, TSDB_CODE_STREAM_TASK_IVLD_STATUS);
+    streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info, TSDB_CODE_STREAM_TASK_IVLD_STATUS);
     streamMetaReleaseTask(pMeta, pTask);

     return TSDB_CODE_SUCCESS;
@@ -901,7 +901,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
     // re-send the lost checkpoint-trigger msg to downstream task
     tqDebug("s-task:%s re-send checkpoint-trigger to:0x%x, checkpointId:%" PRId64 ", transId:%d", pTask->id.idStr,
             (int32_t)pReq->downstreamTaskId, checkpointId, transId);
-    streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, &pMsg->info, TSDB_CODE_SUCCESS);
+    streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info, TSDB_CODE_SUCCESS);
   } else {  // not send checkpoint-trigger yet, wait
     int32_t recv = 0, total = 0;
     streamTaskGetTriggerRecvStatus(pTask, &recv, &total);
@@ -914,7 +914,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
               "sending checkpoint-source/trigger",
               pTask->id.idStr, recv, total);
     }
-    streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, &pMsg->info, TSDB_CODE_ACTION_IN_PROGRESS);
+    streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info, TSDB_CODE_ACTION_IN_PROGRESS);
   }
 } else {  // upstream not recv the checkpoint-source/trigger till now
   ASSERT(pState->state == TASK_STATUS__READY || pState->state == TASK_STATUS__HALT);
@@ -922,7 +922,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
         "s-task:%s not recv checkpoint-source from mnode or checkpoint-trigger from upstream yet, wait for all "
         "upstream sending checkpoint-source/trigger",
         pTask->id.idStr);
-    streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, &pMsg->info, TSDB_CODE_ACTION_IN_PROGRESS);
+    streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info, TSDB_CODE_ACTION_IN_PROGRESS);
   }

   streamMetaReleaseTask(pMeta, pTask);
@@ -998,8 +998,18 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t sversion,

   int32_t level = pTask->info.taskLevel;
   if (level == TASK_LEVEL__SINK) {
-    if (status == TASK_STATUS__UNINIT) {
-    }
+    ASSERT(status != TASK_STATUS__UNINIT); /*{
+    // tqDebug("s-task:%s initialize the uninit sink stream task after resume from pause", pTask->id.idStr);
+    //
+    // if (pTask->pBackend == NULL) { // TODO: add test cases for this
+    //   int32_t code = pMeta->expandTaskFn(pTask);
+    //   if (code != TSDB_CODE_SUCCESS) {
+    //     tqError("s-task:%s vgId:%d failed to expand stream backend", pTask->id.idStr, vgId);
+    //     streamMetaAddFailedTaskSelf(pTask, pTask->execInfo.readyTs);
+    //   }
+    // }
+    // int32_t ret = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
+    }*/
     streamMetaReleaseTask(pMeta, pTask);
     return 0;
   }
@@ -1025,11 +1035,21 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t sversion,
     } else {
       streamTrySchedExec(pTask);
     }
-  } else if (status == TASK_STATUS__UNINIT) {
-    // todo: fill-history task init ?
-    if (pTask->info.fillHistory == 0) {
-      streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
-    }
+  } else {
+    ASSERT(status != TASK_STATUS__UNINIT); // { // todo: fill-history task init ?
+    //   if (pTask->info.fillHistory == 0) {
+    //     tqDebug("s-task:%s initialize the uninit task after resume from pause", pTask->id.idStr);
+    //
+    //     if (pTask->pBackend == NULL) { // TODO: add test cases for this
+    //       int32_t code = pMeta->expandTaskFn(pTask);
+    //       if (code != TSDB_CODE_SUCCESS) {
+    //         tqError("s-task:%s vgId:%d failed to expand stream backend", pTask->id.idStr, vgId);
+    //         streamMetaAddFailedTaskSelf(pTask, pTask->execInfo.readyTs);
+    //       }
+    //     }
+    //     int32_t ret = */streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
+    //   }
+    // }
   }

   streamMetaReleaseTask(pMeta, pTask);

@@ -410,6 +410,11 @@ int32_t qBindStmtSingleColValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bind,
     return buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same");
   }

+  // Column index exceeds the number of columns
+  if (colIdx >= pCols->size && pCol == NULL) {
+    return buildInvalidOperationMsg(&pBuf, "column index exceeds the number of columns");
+  }
+
   if (bind->buffer_type != pColSchema->type) {
     return buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type");
   }

@@ -257,7 +257,8 @@ int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list, char* dname);
 int32_t bkdMgtDumpTo(SBkdMgt* bm, char* taskId, char* dname);
 void    bkdMgtDestroy(SBkdMgt* bm);

-int32_t taskDbGenChkpUploadData(void* arg, void* bkdMgt, int64_t chkpId, int8_t type, char** path, SArray* list);
+int32_t taskDbGenChkpUploadData(void* arg, void* bkdMgt, int64_t chkpId, int8_t type, char** path, SArray* list,
+                                const char* id);

 void* taskAcquireDb(int64_t refId);
 void  taskReleaseDb(int64_t refId);

@@ -194,7 +194,7 @@ void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo);
 int32_t streamTaskResetTimewindowFilter(SStreamTask* pTask);
 void    streamTaskClearActiveInfo(SActiveCheckpointInfo* pInfo);

-void streamClearChkptReadyMsg(SStreamTask* pTask);
+void streamClearChkptReadyMsg(SActiveCheckpointInfo* pActiveInfo);
 EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
                                              int32_t* blockSize);
 int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem);

@@ -2053,7 +2053,11 @@ STaskDbWrapper* taskDbOpenImpl(const char* key, char* statePath, char* dbPath) {
   stInfo("%s newly create db in state-backend", key);
   // pre create db
   pTaskDb->db = rocksdb_open(pTaskDb->pCfOpts[0], dbPath, &err);
-  if (pTaskDb->db == NULL) goto _EXIT;
+  if (pTaskDb->db == NULL) {
+    stError("%s open state-backend failed, reason:%s", key, err);
+    goto _EXIT;
+  }

   rocksdb_close(pTaskDb->db);

   if (cfNames != NULL) {
@@ -2181,7 +2185,6 @@ void taskDbDestroy(void* pDb, bool flush) {
 void taskDbDestroy2(void* pDb) { taskDbDestroy(pDb, true); }
-
 int32_t taskDbGenChkpUploadData__rsync(STaskDbWrapper* pDb, int64_t chkpId, char** path) {
   int64_t st = taosGetTimestampMs();
   int32_t code = -1;
   int64_t refId = pDb->refId;

@@ -2202,15 +2205,15 @@ int32_t taskDbGenChkpUploadData__rsync(STaskDbWrapper* pDb, int64_t chkpId, char** path) {
   return code;
 }

-int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64_t chkpId, char** path, SArray* list) {
+int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64_t chkpId, char** path, SArray* list, const char* idStr) {
   int32_t code = 0;
   SBkdMgt* p = (SBkdMgt*)bkdChkpMgt;

   char* temp = taosMemoryCalloc(1, strlen(pDb->path) + 32);
-  sprintf(temp, "%s%s%s%" PRId64 "", pDb->path, TD_DIRSEP, "tmp", chkpId);
+  sprintf(temp, "%s%s%s%" PRId64, pDb->path, TD_DIRSEP, "tmp", chkpId);

   if (taosDirExist(temp)) {
-    cleanDir(temp, "");
+    cleanDir(temp, idStr);
   } else {
     taosMkDir(temp);
   }
@@ -2220,7 +2223,8 @@ int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64_t chkpId,

   return code;
 }
-int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t type, char** path, SArray* list) {
+
+int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t type, char** path, SArray* list, const char* idStr) {
   int32_t code = -1;
   STaskDbWrapper* pDb = arg;
   ECHECKPOINT_BACKUP_TYPE utype = type;
@@ -2229,7 +2233,7 @@ int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t type, char** path,
   if (utype == DATA_UPLOAD_RSYNC) {
     code = taskDbGenChkpUploadData__rsync(pDb, chkpId, path);
   } else if (utype == DATA_UPLOAD_S3) {
-    code = taskDbGenChkpUploadData__s3(pDb, mgt, chkpId, path, list);
+    code = taskDbGenChkpUploadData__s3(pDb, mgt, chkpId, path, list, idStr);
   }
   taskDbUnRefChkp(pDb, chkpId);
   return code;

@@ -58,8 +58,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId,
   }

   if (pInfo->stage < stage) {
-    stError("s-task:%s receive check msg from upstream task:0x%x(vgId:%d), new stage received:%" PRId64
-            ", prev:%" PRId64,
+    stError("s-task:%s receive check msg from upstream task:0x%x(vgId:%d), new stage received:%" PRId64 ", prev:%" PRId64,
             id, upstreamTaskId, vgId, stage, pInfo->stage);
     // record the checkpoint failure id and sent to mnode
     taosThreadMutexLock(&pTask->lock);
@@ -170,13 +169,13 @@ void streamTaskProcessCheckMsg(SStreamMeta* pMeta, SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp) {
   SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, taskId);
   if (pTask != NULL) {
     pRsp->status = streamTaskCheckStatus(pTask, pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->stage, &pRsp->oldStage);
-    streamMetaReleaseTask(pMeta, pTask);

     SStreamTaskState* pState = streamTaskGetStatus(pTask);
     stDebug("s-task:%s status:%s, stage:%" PRId64 " recv task check req(reqId:0x%" PRIx64
             ") task:0x%x (vgId:%d), check_status:%d",
             pTask->id.idStr, pState->name, pRsp->oldStage, pRsp->reqId, pRsp->upstreamTaskId, pRsp->upstreamNodeId,
             pRsp->status);
+    streamMetaReleaseTask(pMeta, pTask);
   } else {
     pRsp->status = TASK_DOWNSTREAM_NOT_READY;
     stDebug("tq recv task check(taskId:0x%" PRIx64 "-0x%x not built yet) req(reqId:0x%" PRIx64

@@ -18,16 +18,6 @@
 #include "streamBackendRocksdb.h"
 #include "streamInt.h"

-typedef struct {
-  ECHECKPOINT_BACKUP_TYPE type;
-
-  char*        taskId;
-  int64_t      chkpId;
-  SStreamTask* pTask;
-  int64_t      dbRefId;
-  void*        pMeta;
-} SAsyncUploadArg;
-
 static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName);
 static int32_t deleteCheckpointFile(const char* id, const char* name);
 static int32_t streamTaskUploadCheckpoint(const char* id, const char* path);
@@ -114,8 +104,15 @@ int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTriggerRsp* pRsp) {
   return TSDB_CODE_SUCCESS;
 }

-int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, SRpcHandleInfo* pRpcInfo, int32_t code) {
-  SCheckpointTriggerRsp* pRsp = rpcMallocCont(sizeof(SCheckpointTriggerRsp));
+int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, int32_t downstreamNodeId,
+                                           SRpcHandleInfo* pRpcInfo, int32_t code) {
+  int32_t size = sizeof(SMsgHead) + sizeof(SCheckpointTriggerRsp);
+
+  void*                  pBuf = rpcMallocCont(size);
+  SCheckpointTriggerRsp* pRsp = POINTER_SHIFT(pBuf, sizeof(SMsgHead));
+
+  ((SMsgHead*)pBuf)->vgId = htonl(downstreamNodeId);
+
   pRsp->streamId = pTask->id.streamId;
   pRsp->upstreamTaskId = pTask->id.taskId;
   pRsp->taskId = dstTaskId;
@@ -130,7 +127,7 @@ int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId,

   pRsp->rspCode = code;

-  SRpcMsg rspMsg = {.code = 0, .pCont = pRsp, .contLen = sizeof(SCheckpointTriggerRsp), .info = *pRpcInfo};
+  SRpcMsg rspMsg = {.code = 0, .pCont = pRsp, .contLen = size, .info = *pRpcInfo};
   tmsgSendRsp(&rspMsg);
   return 0;
 }
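The response now carries an SMsgHead in front of the payload; its vgId (stored in network byte order) is what the transport layer uses to route the trigger response to the correct downstream vnode, with the payload laid out immediately after the head. A minimal layout sketch (TriggerRsp is a trimmed, hypothetical payload; buildRsp is not a real API):

#include <arpa/inet.h>  // htonl
#include <stdint.h>
#include <stdlib.h>

typedef struct { int32_t contLen; int32_t vgId; } MsgHead;
typedef struct { int64_t streamId; int32_t taskId; } TriggerRsp;  // trimmed payload

// Sketch: allocate head + body in one contiguous buffer and point the payload
// view just past the head, as the POINTER_SHIFT call above does.
static void *buildRsp(int32_t dstVgId, TriggerRsp **ppRsp, int32_t *pSize) {
  int32_t size = (int32_t)(sizeof(MsgHead) + sizeof(TriggerRsp));
  void   *pBuf = calloc(1, (size_t)size);
  if (pBuf == NULL) return NULL;

  ((MsgHead *)pBuf)->vgId = (int32_t)htonl((uint32_t)dstVgId);  // routed by vgId
  *ppRsp = (TriggerRsp *)((char *)pBuf + sizeof(MsgHead));
  *pSize = size;
  return pBuf;
}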

@@ -408,11 +405,11 @@ void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg) {
   streamTaskClearActiveInfo(pTask->chkInfo.pActiveInfo);
   streamTaskOpenAllUpstreamInput(pTask);  // open inputQ for all upstream tasks
   if (clearChkpReadyMsg) {
-    streamClearChkptReadyMsg(pTask);
+    streamClearChkptReadyMsg(pTask->chkInfo.pActiveInfo);
   }
 }

-int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpointInfoReq* pReq) {
+int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SVUpdateCheckpointInfoReq* pReq) {
   SStreamMeta* pMeta = pTask->pMeta;
   int32_t      vgId = pMeta->vgId;
   int32_t      code = 0;
@@ -429,7 +426,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored,
             pReq->transId);
     taosThreadMutexUnlock(&pTask->lock);

-    { // destroy the related fill-history tasks
+    {  // destroy the related fill-history tasks
       // drop task should not in the meta-lock, and drop the related fill-history task now
       streamMetaWUnLock(pMeta);
       if (pReq->dropRelHTask) {
@ -446,34 +443,42 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin
  SStreamTaskState* pStatus = streamTaskGetStatus(pTask);

  stDebug("s-task:%s vgId:%d status:%s start to update the checkpoint info, checkpointId:%" PRId64 "->%" PRId64
          " checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64,
          id, vgId, pStatus->name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer,
          pInfo->checkpointTime, pReq->checkpointTs);

  if (pStatus->state != TASK_STATUS__DROPPING) {
    ASSERT(pInfo->checkpointId <= pReq->checkpointId && pInfo->checkpointVer <= pReq->checkpointVer);

    pInfo->checkpointId = pReq->checkpointId;
    pInfo->checkpointVer = pReq->checkpointVer;
    pInfo->checkpointTime = pReq->checkpointTs;

    streamTaskClearCheckInfo(pTask, false);

    // todo handle error
    if (pStatus->state == TASK_STATUS__CK) {
      code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
    } else {
      stDebug("s-task:0x%x vgId:%d not handle checkpoint-done event, status:%s", pReq->taskId, vgId, pStatus->name);
    }
  } else {
    stDebug("s-task:0x%x vgId:%d status:%s not update checkpoint info, checkpointId:%" PRId64 "->%" PRId64 " failed",
            pReq->taskId, vgId, pStatus->name, pInfo->checkpointId, pReq->checkpointId);
  if ((!restored) && (pStatus->state != TASK_STATUS__CK)) {
    stDebug("s-task:0x%x vgId:%d restored:%d status:%s not update checkpoint-info, checkpointId:%" PRId64 "->%" PRId64
            " failed",
            pReq->taskId, vgId, restored, pStatus->name, pInfo->checkpointId, pReq->checkpointId);
    taosThreadMutexUnlock(&pTask->lock);

    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
  }

  if (!restored) {  // during restore procedure, do update checkpoint-info
    stDebug("s-task:%s vgId:%d status:%s update the checkpoint-info during restore, checkpointId:%" PRId64 "->%" PRId64
            " checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64,
            id, vgId, pStatus->name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer,
            pInfo->checkpointTime, pReq->checkpointTs);
  } else {  // not in restore status, must be in checkpoint status
    stDebug("s-task:%s vgId:%d status:%s start to update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64
            " checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64,
            id, vgId, pStatus->name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer,
            pInfo->checkpointTime, pReq->checkpointTs);
  }

  ASSERT(pInfo->checkpointId <= pReq->checkpointId && pInfo->checkpointVer <= pReq->checkpointVer &&
         pInfo->processedVer <= pReq->checkpointVer);

  pInfo->checkpointId = pReq->checkpointId;
  pInfo->checkpointVer = pReq->checkpointVer;
  pInfo->checkpointTime = pReq->checkpointTs;

  streamTaskClearCheckInfo(pTask, true);

  if (pStatus->state == TASK_STATUS__CK) {
    // todo handle error
    code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
  } else {
    stDebug("s-task:0x%x vgId:%d not handle checkpoint-done event, status:%s", pReq->taskId, vgId, pStatus->name);
  }

  if (pReq->dropRelHTask) {
    stDebug("s-task:0x%x vgId:%d drop the related fill-history task:0x%" PRIx64 " after update checkpoint",
            pReq->taskId, vgId, pReq->hTaskId);
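The rewritten guard above rejects a checkpoint-info update when the vnode is not yet restored and the task is not in checkpoint status; past the guard, the ASSERT requires the metadata to only move forward. A sketch of that monotonicity check, with a hypothetical ChkInfo mirror of the compared fields:

#include <stdbool.h>
#include <stdint.h>

// Hypothetical mirror of the fields the ASSERT above compares.
typedef struct {
  int64_t checkpointId;
  int64_t checkpointVer;
  int64_t processedVer;
} ChkInfo;

// An update may only move the checkpoint forward; a stale request is a protocol error.
static bool chkptUpdateIsMonotonic(const ChkInfo* cur, const ChkInfo* req) {
  return cur->checkpointId <= req->checkpointId &&
         cur->checkpointVer <= req->checkpointVer &&
         cur->processedVer <= req->checkpointVer;
}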
@ -562,78 +567,70 @@ static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* l
  return code;
}

int32_t uploadCheckpointData(void* param) {
  SAsyncUploadArg* pParam = param;
int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t dbRefId, ECHECKPOINT_BACKUP_TYPE type) {
  char*   path = NULL;
  int32_t code = 0;
  SArray* toDelFiles = taosArrayInit(4, POINTER_BYTES);
  char* taskStr = pParam->taskId ? pParam->taskId : "NULL";
  int64_t      now = taosGetTimestampMs();
  SStreamMeta* pMeta = pTask->pMeta;
  const char*  idStr = pTask->id.idStr;

  void* pBackend = taskAcquireDb(pParam->dbRefId);
  if (pBackend == NULL) {
    stError("s-task:%s failed to acquire db", taskStr);
    taosMemoryFree(pParam->taskId);
    taosMemoryFree(pParam);
    return -1;
  if ((code = taskDbGenChkpUploadData(pTask->pBackend, pMeta->bkdChkptMgt, checkpointId, type, &path, toDelFiles,
                                      pTask->id.idStr)) != 0) {
    stError("s-task:%s failed to gen upload checkpoint:%" PRId64, idStr, checkpointId);
  }

  if ((code = taskDbGenChkpUploadData(pParam->pTask->pBackend, ((SStreamMeta*)pParam->pMeta)->bkdChkptMgt,
                                      pParam->chkpId, (int8_t)(pParam->type), &path, toDelFiles)) != 0) {
    stError("s-task:%s failed to gen upload checkpoint:%" PRId64, taskStr, pParam->chkpId);
  }

  if (pParam->type == DATA_UPLOAD_S3) {
    if (code == 0 && (code = getCheckpointDataMeta(pParam->taskId, path, toDelFiles)) != 0) {
      stError("s-task:%s failed to get checkpointData for checkpointId:%" PRId64 " meta", taskStr, pParam->chkpId);
  if (type == DATA_UPLOAD_S3) {
    if (code == TSDB_CODE_SUCCESS && (code = getCheckpointDataMeta(idStr, path, toDelFiles)) != 0) {
      stError("s-task:%s failed to get checkpointData for checkpointId:%" PRId64 " meta", idStr, checkpointId);
    }
  }

  if (code == TSDB_CODE_SUCCESS) {
    code = streamTaskUploadCheckpoint(pParam->taskId, path);
    code = streamTaskUploadCheckpoint(idStr, path);
    if (code == TSDB_CODE_SUCCESS) {
      stDebug("s-task:%s upload checkpointId:%" PRId64 " to remote succ", taskStr, pParam->chkpId);
      stDebug("s-task:%s upload checkpointId:%" PRId64 " to remote succ", idStr, checkpointId);
    } else {
      stError("s-task:%s failed to upload checkpointId:%" PRId64 " data:%s", taskStr, pParam->chkpId, path);
      stError("s-task:%s failed to upload checkpointId:%" PRId64 " data:%s", idStr, checkpointId, path);
    }
  }

  taskReleaseDb(pParam->dbRefId);

  if (code == 0) {
  if (code == TSDB_CODE_SUCCESS) {
    int32_t size = taosArrayGetSize(toDelFiles);
    stDebug("s-task:%s remove redundant %d files", taskStr, size);
    stDebug("s-task:%s remove redundant %d files", idStr, size);

    for (int i = 0; i < size; i++) {
      char* pName = taosArrayGetP(toDelFiles, i);
      code = deleteCheckpointFile(pParam->taskId, pName);
      code = deleteCheckpointFile(idStr, pName);
      if (code != 0) {
        stDebug("s-task:%s failed to del file: %s", taskStr, pName);
        stDebug("s-task:%s failed to remove file: %s", idStr, pName);
        break;
      }
    }

    stDebug("s-task:%s remove redundant files done", taskStr);
    stDebug("s-task:%s remove redundant files in uploading checkpointId:%" PRId64 " data", idStr, checkpointId);
  }

  taosArrayDestroyP(toDelFiles, taosMemoryFree);
  double el = (taosGetTimestampMs() - now) / 1000.0;

  if (code == TSDB_CODE_SUCCESS) {
    stDebug("s-task:%s remove local checkpointId:%" PRId64 " data %s", taskStr, pParam->chkpId, path);
    stDebug("s-task:%s complete update checkpointId:%" PRId64 ", elapsed time:%.2fs remove local checkpoint data %s",
            idStr, checkpointId, el, path);
    taosRemoveDir(path);
  } else {
    stDebug("s-task:%s update checkpointId:%" PRId64 " keep local checkpoint data", taskStr, pParam->chkpId);
    stDebug("s-task:%s failed to upload checkpointId:%" PRId64 " keep local checkpoint data, elapsed time:%.2fs",
            idStr, checkpointId, el);
  }

  taosMemoryFree(path);
  taosMemoryFree(pParam->taskId);
  taosMemoryFree(pParam);

  return code;
}

int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t checkpointId, char* taskId) {
int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t checkpointId) {
  ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType();
  if (type == DATA_UPLOAD_DISABLE) {
    stDebug("s-task:%s not allowed to upload checkpoint data", pTask->id.idStr);
    return 0;
  }
@ -641,15 +638,17 @@ int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t checkpointI
    return 0;
  }

  SAsyncUploadArg* arg = taosMemoryCalloc(1, sizeof(SAsyncUploadArg));
  arg->type = type;
  arg->taskId = taosStrdup(taskId);
  arg->chkpId = checkpointId;
  arg->pTask = pTask;
  arg->dbRefId = taskGetDBRef(pTask->pBackend);
  arg->pMeta = pTask->pMeta;
  int64_t dbRefId = taskGetDBRef(pTask->pBackend);
  void*   pBackend = taskAcquireDb(dbRefId);
  if (pBackend == NULL) {
    stError("s-task:%s failed to acquire db during update checkpoint data, failed to upload checkpointData", pTask->id.idStr);
    return -1;
  }

  return streamMetaAsyncExec(pTask->pMeta, uploadCheckpointData, arg, NULL);
  int32_t code = uploadCheckpointData(pTask, checkpointId, taskGetDBRef(pTask->pBackend), type);
  taskReleaseDb(dbRefId);

  return code;
}
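The backup path thus changes from an async job (a heap-allocated SAsyncUploadArg consumed by streamMetaAsyncExec) to a direct call bracketed by acquiring and releasing the db reference. A minimal sketch of that synchronous shape, with hypothetical acquireDb/releaseDb/doUpload stand-ins:

#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

// Hypothetical stand-ins for taskAcquireDb/taskReleaseDb and the upload body.
static int     gDb = 0;
static void*   acquireDb(int64_t refId) { (void)refId; return &gDb; }
static void    releaseDb(int64_t refId) { (void)refId; }
static int32_t doUpload(void* pTask, int64_t checkpointId) { (void)pTask; (void)checkpointId; return 0; }

// Synchronous variant: the caller pins the db ref for the duration of the call,
// so no ownership of a param struct has to be transferred to a worker thread.
static int32_t backupCheckpointSync(void* pTask, int64_t refId, int64_t checkpointId) {
  void* pDb = acquireDb(refId);
  if (pDb == NULL) {
    fprintf(stderr, "failed to acquire db ref:%" PRId64 "\n", refId);
    return -1;
  }

  int32_t code = doUpload(pTask, checkpointId);
  releaseDb(refId);  // always paired with the acquire above
  return code;
}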

int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
@ -670,6 +669,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
    }
  }

  // TODO: monitoring the checkpoint-source msg
  // send check point response to upstream task
  if (code == TSDB_CODE_SUCCESS) {
    if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {

@ -679,38 +679,39 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
    }

    if (code != TSDB_CODE_SUCCESS) {
      // todo: let's retry send rsp to upstream/mnode
      // todo: let's retry send rsp to mnode, checkpoint-ready has monitor now
      stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%" PRId64 ", code:%s", id, ckId,
              tstrerror(code));
    }
  }

  // update the latest checkpoint info if all works are done successfully, for rsma, the pMsgCb is null.
  if (code == TSDB_CODE_SUCCESS && (pTask->pMsgCb != NULL)) {
    code = streamSendChkptReportMsg(pTask, &pTask->chkInfo, dropRelHTask);
    if (code == TSDB_CODE_SUCCESS) {
      code = streamTaskRemoteBackupCheckpoint(pTask, ckId, (char*)id);
      if (code != TSDB_CODE_SUCCESS) {
        stError("s-task:%s failed to upload checkpoint:%" PRId64 " failed", id, ckId);
      }
    } else {
      stError("s-task:%s commit taskInfo failed, checkpoint:%" PRId64 " failed, code:%s", id, ckId, tstrerror(code));
  if (code == TSDB_CODE_SUCCESS) {
    code = streamTaskRemoteBackupCheckpoint(pTask, ckId);
    if (code != TSDB_CODE_SUCCESS) {
      stError("s-task:%s upload checkpointId:%" PRId64 " data failed, code:%s", id, ckId, tstrerror(code));
    }
  } else {
    stError("s-task:%s taskInfo failed, checkpoint:%" PRId64 " failed, code:%s", id, ckId, tstrerror(code));
  }

  // clear the checkpoint info if failed
  if (code != TSDB_CODE_SUCCESS) {
  // TODO: monitoring the checkpoint-report msg
  // update the latest checkpoint info if all works are done successfully, for rsma, the pMsgCb is null.
  if (code == TSDB_CODE_SUCCESS) {
    if (pTask->pMsgCb != NULL) {
      code = streamSendChkptReportMsg(pTask, &pTask->chkInfo, dropRelHTask);
    }
  } else {  // clear the checkpoint info if failed
    taosThreadMutexLock(&pTask->lock);
    streamTaskClearCheckInfo(pTask, false);
    code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
    taosThreadMutexUnlock(&pTask->lock);

    code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
    streamTaskSetFailedCheckpointId(pTask);
    stDebug("s-task:%s clear checkpoint flag since gen checkpoint failed, checkpointId:%" PRId64, id, ckId);
  }

  double el = (taosGetTimestampMs() - startTs) / 1000.0;
  stInfo("s-task:%s vgId:%d level:%d, checkpointId:%" PRId64 " ver:%" PRId64 " elapsed time:%.2f Sec, %s ", id,
  stInfo("s-task:%s vgId:%d level:%d, checkpointId:%" PRId64 " ver:%" PRId64 " elapsed time:%.2fs, %s ", id,
         pMeta->vgId, pTask->info.taskLevel, ckId, pTask->chkInfo.checkpointVer, el,
         (code == TSDB_CODE_SUCCESS) ? "succ" : "failed");
@ -739,14 +740,13 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
  }

  pActiveInfo->checkCounter = 0;
  stDebug("s-task:%s vgId:%d checkpoint-trigger monitor in tmr, ts:%" PRId64, pTask->id.idStr, vgId, now);
  stDebug("s-task:%s vgId:%d checkpoint-trigger monitor in tmr, ts:%" PRId64, id, vgId, now);

  taosThreadMutexLock(&pTask->lock);
  SStreamTaskState* pState = streamTaskGetStatus(pTask);
  if (pState->state != TASK_STATUS__CK) {
    int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
    stDebug("s-task:%s vgId:%d not in checkpoint status, quit from monitor checkpoint-trigger, ref:%d", pTask->id.idStr,
            vgId, ref);
    stDebug("s-task:%s vgId:%d not in checkpoint status, quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref);

    taosThreadMutexUnlock(&pTask->lock);
    streamMetaReleaseTask(pTask->pMeta, pTask);

@ -756,8 +756,8 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
  // checkpoint-trigger recv flag is set, quit
  if (pActiveInfo->allUpstreamTriggerRecv) {
    int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
    stDebug("s-task:%s vgId:%d all checkpoint-trigger recv, quit from monitor checkpoint-trigger, ref:%d",
            pTask->id.idStr, vgId, ref);
    stDebug("s-task:%s vgId:%d all checkpoint-trigger recv, quit from monitor checkpoint-trigger, ref:%d", id, vgId,
            ref);

    taosThreadMutexUnlock(&pTask->lock);
    streamMetaReleaseTask(pTask->pMeta, pTask);
@ -815,6 +815,7 @@ int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList) {
  const char* pId = pTask->id.idStr;
  int32_t     size = taosArrayGetSize(pNotSendList);
  int32_t     numOfUpstream = streamTaskGetNumOfUpstream(pTask);
  int64_t     checkpointId = pTask->chkInfo.pActiveInfo->activeId;

  if (size <= 0) {
    stDebug("s-task:%s all upstream checkpoint trigger recved, no need to send retrieve", pId);

@ -840,15 +841,14 @@ int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList) {
    pReq->downstreamNodeId = vgId;
    pReq->upstreamTaskId = pUpstreamTask->taskId;
    pReq->upstreamNodeId = pUpstreamTask->nodeId;
    pReq->checkpointId = pTask->chkInfo.pActiveInfo->activeId;

    pReq->checkpointId = checkpointId;

    SRpcMsg rpcMsg = {0};
    initRpcMsg(&rpcMsg, TDMT_STREAM_RETRIEVE_TRIGGER, pReq, sizeof(SRetrieveChkptTriggerReq));

    code = tmsgSendReq(&pUpstreamTask->epSet, &rpcMsg);
    stDebug("s-task:%s vgId:%d send checkpoint-trigger retrieve msg to 0x%x(vgId:%d) checkpointId:%" PRId64, pId, vgId,
            pUpstreamTask->taskId, pUpstreamTask->nodeId, pReq->checkpointId);
            pUpstreamTask->taskId, pUpstreamTask->nodeId, checkpointId);
  }

  return TSDB_CODE_SUCCESS;
@ -781,7 +781,7 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) {
  // check the status every 100ms
  if (streamTaskShouldStop(pTask)) {
    int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
    stDebug("s-task:%s vgId:%d quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref);
    stDebug("s-task:%s vgId:%d status:stop, quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref);
    streamMetaReleaseTask(pTask->pMeta, pTask);
    return;
  }

@ -795,6 +795,18 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) {
  pActiveInfo->sendReadyCheckCounter = 0;
  stDebug("s-task:%s in sending checkpoint-ready msg monitor timer", id);

  taosThreadMutexLock(&pTask->lock);
  SStreamTaskState* pState = streamTaskGetStatus(pTask);
  if (pState->state != TASK_STATUS__CK) {
    int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
    stDebug("s-task:%s vgId:%d status:%s not in checkpoint, quit from monitor checkpoint-ready send, ref:%d", id, vgId,
            pState->name, ref);
    taosThreadMutexUnlock(&pTask->lock);
    streamMetaReleaseTask(pTask->pMeta, pTask);
    return;
  }
  taosThreadMutexUnlock(&pTask->lock);

  taosThreadMutexLock(&pActiveInfo->lock);

  SArray* pList = pActiveInfo->pReadyMsgList;
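Both monitor timers now follow the same early-exit discipline: take the task lock, check the state, and on exit decrement the timer's active ref before releasing the task. A compact sketch of the guard, with hypothetical lock/state/ref names:

#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>

// Hypothetical task shape: a lock, a state word, and a count of live timers.
typedef struct {
  pthread_mutex_t lock;
  int             state;        // e.g. ST_CHECKPOINTING vs anything else
  atomic_int      timerActive;  // how many timer callbacks still reference us
} Task;

enum { ST_CHECKPOINTING = 1 };

// Returns true if the timer should keep running; on false the caller must stop
// rearming the timer, because the ref it held has already been dropped here.
static bool timerGuard(Task* t) {
  pthread_mutex_lock(&t->lock);
  bool inCkpt = (t->state == ST_CHECKPOINTING);
  pthread_mutex_unlock(&t->lock);

  if (!inCkpt) {
    atomic_fetch_sub(&t->timerActive, 1);  // balance the ref taken at arm time
    return false;
  }
  return true;
}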
@ -844,7 +856,7 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) {
            "and quit from timer, ref:%d",
            id, vgId, ref);

    streamClearChkptReadyMsg(pTask);
    streamClearChkptReadyMsg(pActiveInfo);
    taosThreadMutexUnlock(&pActiveInfo->lock);
    streamMetaReleaseTask(pTask->pMeta, pTask);
  }

@ -906,9 +918,9 @@ int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) {
    tmsgSendRsp(&pInfo->msg);

    taosArrayClear(pList);
    stDebug("s-task:%s level:%d source checkpoint completed msg sent to mnode", pTask->id.idStr, pTask->info.taskLevel);
    stDebug("s-task:%s level:%d checkpoint-source rsp completed msg sent to mnode", pTask->id.idStr, pTask->info.taskLevel);
  } else {
    stDebug("s-task:%s level:%d already send rsp checkpoint success to mnode", pTask->id.idStr, pTask->info.taskLevel);
    stDebug("s-task:%s level:%d already send checkpoint-source rsp success to mnode", pTask->id.idStr, pTask->info.taskLevel);
  }

  taosThreadMutexUnlock(&pTask->chkInfo.pActiveInfo->lock);

@ -1116,8 +1128,7 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId,
  return 0;
}

void streamClearChkptReadyMsg(SStreamTask* pTask) {
  SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
void streamClearChkptReadyMsg(SActiveCheckpointInfo* pActiveInfo) {
  if (pActiveInfo == NULL) {
    return;
  }
@ -231,7 +231,6 @@ int32_t streamMetaMayCvtDbFormat(SStreamMeta* pMeta) {
    return 0;
  } else if (compatible == STREAM_STATA_NEED_CONVERT) {
    stInfo("vgId:%d stream state need covert backend format", pMeta->vgId);

    return streamMetaCvtDbFormat(pMeta);
  } else if (compatible == STREAM_STATA_NO_COMPATIBLE) {
    stError(

@ -240,7 +240,6 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
      if (code == TSDB_CODE_SUCCESS) {
        checkFillhistoryTaskStatus(pTask, pHisTask);
      }

    }

    streamMetaReleaseTask(pMeta, pHisTask);

@ -254,7 +254,7 @@ void tFreeStreamTask(SStreamTask* pTask) {
    walCloseReader(pTask->exec.pWalReader);
  }

  streamClearChkptReadyMsg(pTask);
  streamClearChkptReadyMsg(pTask->chkInfo.pActiveInfo);

  if (pTask->msgInfo.pData != NULL) {
    clearBufferedDispatchMsg(pTask);

@ -836,7 +836,7 @@ static int32_t taskPauseCallback(SStreamTask* pTask, void* param) {
  SStreamMeta* pMeta = pTask->pMeta;

  int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
  stInfo("vgId:%d s-task:%s pause stream task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num);
  stInfo("vgId:%d s-task:%s pause stream task. paused task num:%d", pMeta->vgId, pTask->id.idStr, num);

  // in case of fill-history task, stop the tsdb file scan operation.
  if (pTask->info.fillHistory == 1) {
@ -623,9 +623,9 @@ void doInitStateTransferTable(void) {
  taosArrayPush(streamTaskSMTrans, &trans);
  trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info);
  taosArrayPush(streamTaskSMTrans, &trans);

  trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL);
  trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info);
  taosArrayPush(streamTaskSMTrans, &trans);

  trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL);
  taosArrayPush(streamTaskSMTrans, &trans);
  trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_PAUSE, NULL, NULL, NULL);
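The fix above attaches the transition info (&info) to the UNINIT to PAUSE entry, matching the other pause transitions. The table itself is the usual (state, event) to transition lookup; a minimal sketch of that pattern with illustrative names, not the project's actual API:

#include <stddef.h>

// Hypothetical minimal transition table in the spirit of
// createStateTransform/doInitStateTransferTable.
typedef enum { ST_UNINIT, ST_READY, ST_PAUSE, ST_STOP } State;
typedef enum { EV_PAUSE, EV_RESUME, EV_STOP } Event;

typedef struct {
  State from, to;
  Event event;
  void (*onTransit)(void*);  // optional hook, may be NULL
} Transition;

static const Transition kTrans[] = {
    {ST_READY, ST_PAUSE, EV_PAUSE, NULL},
    {ST_UNINIT, ST_PAUSE, EV_PAUSE, NULL},
    {ST_PAUSE, ST_PAUSE, EV_PAUSE, NULL},  // pause is idempotent
};

// Linear lookup: small tables don't need anything fancier.
static const Transition* findTransition(State s, Event e) {
  for (size_t i = 0; i < sizeof(kTrans) / sizeof(kTrans[0]); ++i) {
    if (kTrans[i].from == s && kTrans[i].event == e) return &kTrans[i];
  }
  return NULL;  // event not legal in this state
}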
@ -220,7 +220,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_COLUMN_ALREADY_EXIST, "Column already exists
TAOS_DEFINE_ERROR(TSDB_CODE_MND_COLUMN_NOT_EXIST, "Column does not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_STB_OPTION, "Invalid stable options")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_ROW_BYTES, "Invalid row bytes")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_FIELD_VALUE_OVERFLOW, "out of range and overflow")
// TAOS_DEFINE_ERROR(TSDB_CODE_MND_FIELD_VALUE_OVERFLOW, "out of range and overflow") // unused
TAOS_DEFINE_ERROR(TSDB_CODE_MND_COLUMN_COMPRESS_ALREADY_EXIST, "Same with old param")

@ -462,7 +462,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_QWORKER_QUIT, "Vnode/Qnode is quitti
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_GEO_NOT_SUPPORT_ERROR, "Geometry not support in this operator")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_WINDOW_CONDITION, "The time pseudo column is illegally used in the condition of the event window.")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR, "Executor internal error")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR, "Executor internal error")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_JOIN_CONDITION, "Not supported join on condition")

// grant

@ -507,7 +506,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_OBJECT_STROAGE_EXPIRED, "License expired for o
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_DUAL_REPLICA_HA_EXPIRED,"License expired for dual-replica HA function")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_DB_ENCRYPTION_EXPIRED, "License expired for database encryption function")

// sync
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_TIMEOUT, "Sync timeout")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_MISMATCHED_SIGNATURE, "Sync signature mismatch")

@ -659,7 +657,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_SYSTABLE_NOT_ALLOWED_FUNC, "System table not al
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_SYSTABLE_NOT_ALLOWED, "System table not allowed")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_VARBINARY, "Invalid varbinary value")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_IP_RANGE, "Invalid IPV4 address ranges")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Invalid stream query")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_VIEW_QUERY, "Invalid view query type")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_COL_QUERY_MISMATCH, "Columns number mismatch with query result")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_VIEW_CONFLICT_WITH_TABLE, "View name is conflict with table")

File diff suppressed because it is too large
@ -73,7 +73,24 @@ pair<string, int32_t> parseKeyValuePair(const string &line, char delim = '=') {
  key = key.substr(1, key.size() - 2);

  string valStr = line.substr(pos + 1);
  int32_t val = stoi(valStr);

  // remove leading spaces
  firstNotSpace = valStr.find_first_not_of(" ");
  if (firstNotSpace != string::npos) {
    valStr = valStr.substr(firstNotSpace);
  } else {
    valStr.clear();
  }

  // remove ending spaces
  lastNotSpace = valStr.find_last_not_of(" ");
  if (lastNotSpace != string::npos) {
    valStr = valStr.substr(0, lastNotSpace + 1);
  }

  valStr = valStr.substr(2);
  int32_t val = int32_t(std::stol(valStr, &pos, 16));

  return make_pair(key, val);
}

@ -188,7 +205,8 @@ void generateConfigFile(const string& filePath) {

  for (int32_t i = 0; i < errSize; ++i) {
    STaosError *pInfo = &errors[i];
    file << std::left << std::setw(maxStringLength) << pInfo->macro << "= " << pInfo->val << endl;
    file << std::left << std::setw(maxStringLength) << pInfo->macro << "= ";
    file << "0x" << std::uppercase << std::hex << pInfo->val << endl;
  }

  if (file.fail()) {
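These two hunks move the error-code config file to hexadecimal: the generator emits 0x-prefixed uppercase hex and the parser trims spaces, skips the prefix, and parses base 16. A small C round-trip sketch of the same pairing (not the project's actual helpers):

#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

int main(void) {
  uint32_t code = 0x80000205u;  // example error-code value

  // Emit the way the generator does: macro name, "= ", then "0x" + uppercase hex.
  char line[64];
  snprintf(line, sizeof(line), "TSDB_CODE_EXAMPLE = 0x%08" PRIX32, code);

  // Parse it back: find '=', skip spaces, then read base 16.
  const char* p = strchr(line, '=') + 1;
  while (*p == ' ') p++;
  uint32_t parsed = (uint32_t)strtoul(p, NULL, 16);  // strtoul accepts the 0x prefix itself

  printf("%s -> 0x%08" PRIX32 "\n", line, parsed);
  return parsed == code ? 0 : 1;
}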
@ -0,0 +1,65 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################

# -*- coding: utf-8 -*-

import frame.etool

from frame.log import *
from frame.cases import *
from frame.sql import *
from frame.caseBase import *
from frame import *
from frame.autogen import *


class TDTestCase(TBase):

    def td_30642(self):
        sqls = [
            "CREATE DATABASE IF NOT EXISTS `_xTest2`",
            "CREATE USER `_xTest` PASS 'taosdata'",
            "CREATE TABLE IF NOT EXISTS `_xTest2`.`meters` (ts timestamp, v1 int) tags(t1 int)",

            "CREATE DATABASE IF NOT EXISTS `test2`",
            "CREATE USER `user1` PASS 'taosdata'",
            "CREATE TABLE IF NOT EXISTS `test2`.`meters2` (ts timestamp, v1 int) tags(t1 int)"
        ]
        tdSql.executes(sqls)

        sql1 = 'GRANT read ON `_xTest2`.`meters` WITH (t1 = 1) TO `_xTest`'
        tdSql.query(sql1)
        sql1_verify = "select * from information_schema.ins_user_privileges where user_name='_xTest' and privilege='read' and db_name='_xTest2' and table_name='meters'"
        tdSql.query(sql1_verify)
        tdSql.checkRows(1)
        tdSql.checkData(0, 4, '(`_xTest2`.`meters`.`t1` = 1)')

        sql2 = 'GRANT write ON test2.meters2 WITH (t1 = 1) TO user1'
        tdSql.query(sql2)
        sql2_verify = "select * from information_schema.ins_user_privileges where user_name='user1' and privilege='write' and db_name='test2' and table_name='meters2'"
        tdSql.query(sql2_verify)
        tdSql.checkRows(1)
        tdSql.checkData(0, 4, '(`test2`.`meters2`.`t1` = 1)')

    # run
    def run(self):
        tdLog.debug(f"start to excute {__file__}")

        # TD-30642
        self.td_30642()

        tdLog.success(f"{__file__} successfully executed")


tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
@ -194,11 +194,11 @@ class TDTestCase(TBase):
        # alter float(c9) double(c10) to tsz
        comp = "tsz"
        sql = f"alter table {tbname} modify column c9 COMPRESS '{comp}';"
        tdSql.execute(sql)
        tdSql.execute(sql, show=True)
        self.checkDataDesc(tbname, 10, 5, comp)
        self.writeData(10000)
        sql = f"alter table {tbname} modify column c10 COMPRESS '{comp}';"
        tdSql.execute(sql)
        tdSql.execute(sql, show=True)
        self.checkDataDesc(tbname, 11, 5, comp)
        self.writeData(10000)

@ -207,9 +207,48 @@ class TDTestCase(TBase):
        for i in range(self.colCnt - 1):
            col = f"c{i}"
            sql = f"alter table {tbname} modify column {col} LEVEL '{level}';"
            tdSql.execute(sql)
            tdSql.execute(sql, show=True)
            self.checkDataDesc(tbname, i + 1, 6, level)
            self.writeData(1000)

        # modify two combine

        i = 9
        encode = "delta-d"
        compress = "zlib"
        sql = f"alter table {tbname} modify column c{i} ENCODE '{encode}' COMPRESS '{compress}';"
        tdSql.execute(sql, show=True)
        self.checkDataDesc(tbname, i + 1, 4, encode)
        self.checkDataDesc(tbname, i + 1, 5, compress)

        i = 10
        encode = "delta-d"
        level = "high"
        sql = f"alter table {tbname} modify column c{i} ENCODE '{encode}' LEVEL '{level}';"
        tdSql.execute(sql, show=True)
        self.checkDataDesc(tbname, i + 1, 4, encode)
        self.checkDataDesc(tbname, i + 1, 6, level)

        i = 2
        compress = "zlib"
        level = "high"
        sql = f"alter table {tbname} modify column c{i} COMPRESS '{compress}' LEVEL '{level}';"
        tdSql.execute(sql, show=True)
        self.checkDataDesc(tbname, i + 1, 5, compress)
        self.checkDataDesc(tbname, i + 1, 6, level)

        # modify three combine
        i = 7
        encode = "simple8b"
        compress = "zstd"
        level = "medium"
        sql = f"alter table {tbname} modify column c{i} ENCODE '{encode}' COMPRESS '{compress}' LEVEL '{level}';"
        tdSql.execute(sql, show=True)
        self.checkDataDesc(tbname, i + 1, 4, encode)
        self.checkDataDesc(tbname, i + 1, 5, compress)
        self.checkDataDesc(tbname, i + 1, 6, level)

        # alter error
        sqls = [
            "alter table nodb.nostb modify column ts LEVEL 'high';",
@ -30,6 +30,7 @@
,,y,army,./pytest.sh python3 ./test.py -f query/subquery/subqueryBugs.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f storage/oneStageComp.py -N 3 -L 3 -D 1
,,y,army,./pytest.sh python3 ./test.py -f storage/compressBasic.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f grant/grantBugs.py -N 3

#
# system test
@ -15,8 +15,38 @@ sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stream streams1 trigger at_once into streamt1 as select _wstart, count(*) c1, count(a) c2 from st interval(1s) ;

print ====check task status start

$loop_count = 0

loopCheck:

sleep 1000

$loop_count = $loop_count + 1
if $loop_count == 30 then
return -1
endi

print 1 select * from information_schema.ins_stream_tasks;
sql select * from information_schema.ins_stream_tasks;

if $rows == 0 then
print rows=$rows
goto loopCheck
endi

print 1 select * from information_schema.ins_stream_tasks where status != "ready";
sql select * from information_schema.ins_stream_tasks where status != "ready";

if $rows != 0 then
print rows=$rows
goto loopCheck
endi

print ====check task status end

sql insert into t1 values(1648791211000,1,2,3);
sql insert into t1 values(1648791212000,2,2,3);
@ -46,8 +76,38 @@ sql alter table streamt1 add column c3 double;

print create stream streams1 trigger at_once into streamt1 as select _wstart, count(*) c1, count(a) c2, avg(b) c3 from st interval(1s) ;
sql create stream streams1 trigger at_once into streamt1 as select _wstart, count(*) c1, count(a) c2, avg(b) c3 from st interval(1s) ;

print ====check task status start

$loop_count = 0

loopCheck1:

sleep 1000

$loop_count = $loop_count + 1
if $loop_count == 30 then
return -1
endi

print 1 select * from information_schema.ins_stream_tasks;
sql select * from information_schema.ins_stream_tasks;

if $rows == 0 then
print rows=$rows
goto loopCheck1
endi

print 1 select * from information_schema.ins_stream_tasks where status != "ready";
sql select * from information_schema.ins_stream_tasks where status != "ready";

if $rows != 0 then
print rows=$rows
goto loopCheck1
endi

print ====check task status end

sql insert into t2 values(1648791213000,1,2,3);
sql insert into t1 values(1648791214000,1,2,3);
@ -496,7 +496,71 @@ class TDTestCase:
        consumer.close()
        print("consume_ts_4551 ok")

    def consume_TS_5067_Test(self):
        tdSql.execute(f'create database if not exists d1 vgroups 1')
        tdSql.execute(f'use d1')
        tdSql.execute(f'create table st(ts timestamp, i int) tags(t int)')
        tdSql.execute(f'insert into t1 using st tags(1) values(now, 1) (now+1s, 2)')
        tdSql.execute(f'insert into t2 using st tags(2) values(now, 1) (now+1s, 2)')
        tdSql.execute(f'insert into t3 using st tags(3) values(now, 1) (now+1s, 2)')
        tdSql.execute(f'insert into t1 using st tags(1) values(now+5s, 11) (now+10s, 12)')

        tdSql.query("select * from st")
        tdSql.checkRows(8)

        tdSql.execute(f'create topic t1 as select * from st')
        tdSql.execute(f'create topic t2 as select * from st')
        consumer_dict = {
            "group.id": "g1",
            "td.connect.user": "root",
            "td.connect.pass": "taosdata",
            "auto.offset.reset": "earliest",
        }
        consumer = Consumer(consumer_dict)

        try:
            consumer.subscribe(["t1"])
        except TmqError:
            tdLog.exit(f"subscribe error")

        index = 0
        try:
            while True:
                res = consumer.poll(1)
                if not res:
                    if index != 1:
                        tdLog.exit("consume error")
                    break
                val = res.value()
                if val is None:
                    continue
                cnt = 0;
                for block in val:
                    cnt += len(block.fetchall())

                if cnt != 8:
                    tdLog.exit("consume error")

                index += 1
        finally:
            consumer.close()

        consumer1 = Consumer(consumer_dict)
        try:
            consumer1.subscribe(["t2"])
        except TmqError:
            tdLog.exit(f"subscribe error")

        tdSql.execute(f'drop consumer group g1 on t1')
        tdSql.query(f'show consumers')
        tdSql.checkRows(1)
        consumer1.close()
        tdSql.execute(f'drop topic t1')
        tdSql.execute(f'drop topic t2')
        tdSql.execute(f'drop database d1')

    def run(self):
        self.consume_TS_5067_Test()
        self.consumeTest()
        self.consume_ts_4544()
        self.consume_ts_4551()