Merge pull request #26468 from taosdata/fix/TS-4331-3.0

protection when alter stt_trigger
This commit is contained in:
Hongze Cheng 2024-07-09 11:14:44 +08:00 committed by GitHub
commit 0b241fa3fd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 126 additions and 88 deletions

View File

@ -965,20 +965,20 @@ int32_t tSerializeSConnectReq(void* buf, int32_t bufLen, SConnectReq* pReq);
int32_t tDeserializeSConnectReq(void* buf, int32_t bufLen, SConnectReq* pReq);
typedef struct {
int32_t acctId;
int64_t clusterId;
uint32_t connId;
int32_t dnodeNum;
int8_t superUser;
int8_t sysInfo;
int8_t connType;
SEpSet epSet;
int32_t svrTimestamp;
int32_t passVer;
int32_t authVer;
char sVer[TSDB_VERSION_LEN];
char sDetailVer[128];
int64_t whiteListVer;
int32_t acctId;
int64_t clusterId;
uint32_t connId;
int32_t dnodeNum;
int8_t superUser;
int8_t sysInfo;
int8_t connType;
SEpSet epSet;
int32_t svrTimestamp;
int32_t passVer;
int32_t authVer;
char sVer[TSDB_VERSION_LEN];
char sDetailVer[128];
int64_t whiteListVer;
SMonitorParas monitorParas;
} SConnectRsp;
@ -1638,15 +1638,15 @@ void tFreeSFuncInfo(SFuncInfo* pInfo);
void tFreeSRetrieveFuncRsp(SRetrieveFuncRsp* pRsp);
typedef struct {
int32_t statusInterval;
int64_t checkTime; // 1970-01-01 00:00:00.000
char timezone[TD_TIMEZONE_LEN]; // tsTimezone
char locale[TD_LOCALE_LEN]; // tsLocale
char charset[TD_LOCALE_LEN]; // tsCharset
int8_t ttlChangeOnWrite;
int8_t enableWhiteList;
int8_t encryptionKeyStat;
uint32_t encryptionKeyChksum;
int32_t statusInterval;
int64_t checkTime; // 1970-01-01 00:00:00.000
char timezone[TD_TIMEZONE_LEN]; // tsTimezone
char locale[TD_LOCALE_LEN]; // tsLocale
char charset[TD_LOCALE_LEN]; // tsCharset
int8_t ttlChangeOnWrite;
int8_t enableWhiteList;
int8_t encryptionKeyStat;
uint32_t encryptionKeyChksum;
SMonitorParas monitorParas;
} SClusterCfg;
@ -1745,9 +1745,9 @@ typedef enum {
} MONITOR_TYPE;
typedef struct {
int32_t contLen;
char* pCont;
MONITOR_TYPE type;
int32_t contLen;
char* pCont;
MONITOR_TYPE type;
} SStatisReq;
int32_t tSerializeSStatisReq(void* buf, int32_t bufLen, SStatisReq* pReq);
@ -3035,8 +3035,8 @@ typedef struct {
int8_t source; // TD_REQ_FROM_TAOX-taosX or TD_REQ_FROM_APP-taosClient
} SVCreateTbBatchReq;
int tEncodeSVCreateTbBatchReq(SEncoder* pCoder, const SVCreateTbBatchReq* pReq);
int tDecodeSVCreateTbBatchReq(SDecoder* pCoder, SVCreateTbBatchReq* pReq);
int tEncodeSVCreateTbBatchReq(SEncoder* pCoder, const SVCreateTbBatchReq* pReq);
int tDecodeSVCreateTbBatchReq(SDecoder* pCoder, SVCreateTbBatchReq* pReq);
void tDeleteSVCreateTbBatchReq(SVCreateTbBatchReq* pReq);
typedef struct {
@ -3275,10 +3275,10 @@ typedef struct {
} SClientHbRsp;
typedef struct {
int64_t reqId;
int64_t rspId;
int32_t svrTimestamp;
SArray* rsps; // SArray<SClientHbRsp>
int64_t reqId;
int64_t rspId;
int32_t svrTimestamp;
SArray* rsps; // SArray<SClientHbRsp>
SMonitorParas monitorParas;
} SClientHbBatchRsp;
@ -3514,7 +3514,7 @@ typedef struct SVUpdateCheckpointInfoReq {
int64_t checkpointVer;
int64_t checkpointTs;
int32_t transId;
int64_t hStreamId; // add encode/decode
int64_t hStreamId; // add encode/decode
int64_t hTaskId;
int8_t dropRelHTask;
} SVUpdateCheckpointInfoReq;
@ -3993,7 +3993,7 @@ int32_t tDecodeSTaosxRsp(SDecoder* pDecoder, void* pRsp);
void tDeleteSTaosxRsp(void* pRsp);
typedef struct SMqBatchMetaRsp {
SMqRspHead head; // not serialize
SMqRspHead head; // not serialize
STqOffsetVal rspOffset;
SArray* batchMetaLen;
SArray* batchMetaReq;

View File

@ -46,9 +46,11 @@ const char* terrstr();
char* taosGetErrMsgReturn();
char* taosGetErrMsg();
int32_t* taosGetErrno();
int32_t* taosGetErrln();
int32_t taosGetErrSize();
#define terrno (*taosGetErrno())
#define terrMsg (taosGetErrMsg())
#define terrln (*taosGetErrln())
#define SET_ERROR_MSG(MSG, ...) \
snprintf(terrMsg, ERR_MSG_LEN, MSG, ##__VA_ARGS__)
@ -145,6 +147,7 @@ int32_t taosGetErrSize();
#define TSDB_CODE_IP_NOT_IN_WHITE_LIST TAOS_DEF_ERROR_CODE(0, 0x0134)
#define TSDB_CODE_FAILED_TO_CONNECT_S3 TAOS_DEF_ERROR_CODE(0, 0x0135)
#define TSDB_CODE_MSG_PREPROCESSED TAOS_DEF_ERROR_CODE(0, 0x0136) // internal
#define TSDB_CODE_OUT_OF_BUFFER TAOS_DEF_ERROR_CODE(0, 0x0137)
//client
#define TSDB_CODE_TSC_INVALID_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0200)

View File

@ -117,6 +117,15 @@ static FORCE_INLINE int32_t taosGetTbHashVal(const char *tbname, int32_t tblen,
}
}
#define TAOS_CHECK_ERRNO(CODE) \
do { \
terrno = (CODE); \
if (terrno != TSDB_CODE_SUCCESS) { \
terrln = __LINE__; \
goto _exit; \
} \
} while (0)
#define TSDB_CHECK_CODE(CODE, LINO, LABEL) \
do { \
if (TSDB_CODE_SUCCESS != (CODE)) { \

View File

@ -69,7 +69,7 @@
pReq->sql = NULL; \
} while (0)
static int32_t tSerializeSMonitorParas(SEncoder *encoder, const SMonitorParas* pMonitorParas) {
static int32_t tSerializeSMonitorParas(SEncoder *encoder, const SMonitorParas *pMonitorParas) {
if (tEncodeI8(encoder, pMonitorParas->tsEnableMonitor) < 0) return -1;
if (tEncodeI32(encoder, pMonitorParas->tsMonitorInterval) < 0) return -1;
if (tEncodeI32(encoder, pMonitorParas->tsSlowLogScope) < 0) return -1;
@ -80,7 +80,7 @@ static int32_t tSerializeSMonitorParas(SEncoder *encoder, const SMonitorParas* p
return 0;
}
static int32_t tDeserializeSMonitorParas(SDecoder *decoder, SMonitorParas* pMonitorParas){
static int32_t tDeserializeSMonitorParas(SDecoder *decoder, SMonitorParas *pMonitorParas) {
if (tDecodeI8(decoder, (int8_t *)&pMonitorParas->tsEnableMonitor) < 0) return -1;
if (tDecodeI32(decoder, &pMonitorParas->tsMonitorInterval) < 0) return -1;
if (tDecodeI32(decoder, &pMonitorParas->tsSlowLogScope) < 0) return -1;
@ -1577,7 +1577,7 @@ int32_t tDeserializeSStatisReq(void *buf, int32_t bufLen, SStatisReq *pReq) {
if (tDecodeCStrTo(&decoder, pReq->pCont) < 0) return -1;
}
if (!tDecodeIsEnd(&decoder)) {
if (tDecodeI8(&decoder, (int8_t*)&pReq->type) < 0) return -1;
if (tDecodeI8(&decoder, (int8_t *)&pReq->type) < 0) return -1;
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
@ -5737,65 +5737,74 @@ _exit:
}
int32_t tSerializeSAlterVnodeConfigReq(void *buf, int32_t bufLen, SAlterVnodeConfigReq *pReq) {
int32_t tlen;
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI32(&encoder, pReq->vgVersion) < 0) return -1;
if (tEncodeI32(&encoder, pReq->buffer) < 0) return -1;
if (tEncodeI32(&encoder, pReq->pageSize) < 0) return -1;
if (tEncodeI32(&encoder, pReq->pages) < 0) return -1;
if (tEncodeI32(&encoder, pReq->cacheLastSize) < 0) return -1;
if (tEncodeI32(&encoder, pReq->daysPerFile) < 0) return -1;
if (tEncodeI32(&encoder, pReq->daysToKeep0) < 0) return -1;
if (tEncodeI32(&encoder, pReq->daysToKeep1) < 0) return -1;
if (tEncodeI32(&encoder, pReq->daysToKeep2) < 0) return -1;
if (tEncodeI32(&encoder, pReq->walFsyncPeriod) < 0) return -1;
if (tEncodeI8(&encoder, pReq->walLevel) < 0) return -1;
if (tEncodeI8(&encoder, pReq->strict) < 0) return -1;
if (tEncodeI8(&encoder, pReq->cacheLast) < 0) return -1;
TAOS_CHECK_ERRNO(tStartEncode(&encoder));
TAOS_CHECK_ERRNO(tEncodeI32(&encoder, pReq->vgVersion));
TAOS_CHECK_ERRNO(tEncodeI32(&encoder, pReq->buffer));
TAOS_CHECK_ERRNO(tEncodeI32(&encoder, pReq->pageSize));
TAOS_CHECK_ERRNO(tEncodeI32(&encoder, pReq->pages));
TAOS_CHECK_ERRNO(tEncodeI32(&encoder, pReq->cacheLastSize));
TAOS_CHECK_ERRNO(tEncodeI32(&encoder, pReq->daysPerFile));
TAOS_CHECK_ERRNO(tEncodeI32(&encoder, pReq->daysToKeep0));
TAOS_CHECK_ERRNO(tEncodeI32(&encoder, pReq->daysToKeep1));
TAOS_CHECK_ERRNO(tEncodeI32(&encoder, pReq->daysToKeep2));
TAOS_CHECK_ERRNO(tEncodeI32(&encoder, pReq->walFsyncPeriod));
TAOS_CHECK_ERRNO(tEncodeI8(&encoder, pReq->walLevel));
TAOS_CHECK_ERRNO(tEncodeI8(&encoder, pReq->strict));
TAOS_CHECK_ERRNO(tEncodeI8(&encoder, pReq->cacheLast));
for (int32_t i = 0; i < 7; ++i) {
if (tEncodeI64(&encoder, pReq->reserved[i]) < 0) return -1;
TAOS_CHECK_ERRNO(tEncodeI64(&encoder, pReq->reserved[i]));
}
// 1st modification
if (tEncodeI16(&encoder, pReq->sttTrigger) < 0) return -1;
if (tEncodeI32(&encoder, pReq->minRows) < 0) return -1;
TAOS_CHECK_ERRNO(tEncodeI16(&encoder, pReq->sttTrigger));
TAOS_CHECK_ERRNO(tEncodeI32(&encoder, pReq->minRows));
// 2nd modification
if (tEncodeI32(&encoder, pReq->walRetentionPeriod) < 0) return -1;
if (tEncodeI32(&encoder, pReq->walRetentionSize) < 0) return -1;
if (tEncodeI32(&encoder, pReq->keepTimeOffset) < 0) return -1;
TAOS_CHECK_ERRNO(tEncodeI32(&encoder, pReq->walRetentionPeriod));
TAOS_CHECK_ERRNO(tEncodeI32(&encoder, pReq->walRetentionSize));
TAOS_CHECK_ERRNO(tEncodeI32(&encoder, pReq->keepTimeOffset));
if (tEncodeI32(&encoder, pReq->s3KeepLocal) < 0) return -1;
if (tEncodeI8(&encoder, pReq->s3Compact) < 0) return -1;
TAOS_CHECK_ERRNO(tEncodeI32(&encoder, pReq->s3KeepLocal));
TAOS_CHECK_ERRNO(tEncodeI8(&encoder, pReq->s3Compact));
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
_exit:
if (terrno) {
uError("%s failed at line %d since %s", __func__, terrln, terrstr());
tlen = -1;
} else {
tlen = encoder.pos;
}
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSAlterVnodeConfigReq(void *buf, int32_t bufLen, SAlterVnodeConfigReq *pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->vgVersion) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->buffer) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->pageSize) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->pages) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->cacheLastSize) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->daysPerFile) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->daysToKeep0) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->daysToKeep1) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->daysToKeep2) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->walFsyncPeriod) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->walLevel) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->strict) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->cacheLast) < 0) return -1;
TAOS_CHECK_ERRNO(tStartDecode(&decoder));
TAOS_CHECK_ERRNO(tDecodeI32(&decoder, &pReq->vgVersion));
TAOS_CHECK_ERRNO(tDecodeI32(&decoder, &pReq->buffer));
TAOS_CHECK_ERRNO(tDecodeI32(&decoder, &pReq->pageSize));
TAOS_CHECK_ERRNO(tDecodeI32(&decoder, &pReq->pages));
TAOS_CHECK_ERRNO(tDecodeI32(&decoder, &pReq->cacheLastSize));
TAOS_CHECK_ERRNO(tDecodeI32(&decoder, &pReq->daysPerFile));
TAOS_CHECK_ERRNO(tDecodeI32(&decoder, &pReq->daysToKeep0));
TAOS_CHECK_ERRNO(tDecodeI32(&decoder, &pReq->daysToKeep1));
TAOS_CHECK_ERRNO(tDecodeI32(&decoder, &pReq->daysToKeep2));
TAOS_CHECK_ERRNO(tDecodeI32(&decoder, &pReq->walFsyncPeriod));
TAOS_CHECK_ERRNO(tDecodeI8(&decoder, &pReq->walLevel));
TAOS_CHECK_ERRNO(tDecodeI8(&decoder, &pReq->strict));
TAOS_CHECK_ERRNO(tDecodeI8(&decoder, &pReq->cacheLast));
for (int32_t i = 0; i < 7; ++i) {
if (tDecodeI64(&decoder, &pReq->reserved[i]) < 0) return -1;
TAOS_CHECK_ERRNO(tDecodeI64(&decoder, &pReq->reserved[i]));
}
// 1st modification
@ -5803,8 +5812,8 @@ int32_t tDeserializeSAlterVnodeConfigReq(void *buf, int32_t bufLen, SAlterVnodeC
pReq->sttTrigger = -1;
pReq->minRows = -1;
} else {
if (tDecodeI16(&decoder, &pReq->sttTrigger) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->minRows) < 0) return -1;
TAOS_CHECK_ERRNO(tDecodeI16(&decoder, &pReq->sttTrigger));
TAOS_CHECK_ERRNO(tDecodeI32(&decoder, &pReq->minRows));
}
// 2n modification
@ -5812,24 +5821,29 @@ int32_t tDeserializeSAlterVnodeConfigReq(void *buf, int32_t bufLen, SAlterVnodeC
pReq->walRetentionPeriod = -1;
pReq->walRetentionSize = -1;
} else {
if (tDecodeI32(&decoder, &pReq->walRetentionPeriod) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->walRetentionSize) < 0) return -1;
TAOS_CHECK_ERRNO(tDecodeI32(&decoder, &pReq->walRetentionPeriod));
TAOS_CHECK_ERRNO(tDecodeI32(&decoder, &pReq->walRetentionSize));
}
pReq->keepTimeOffset = TSDB_DEFAULT_KEEP_TIME_OFFSET;
if (!tDecodeIsEnd(&decoder)) {
if (tDecodeI32(&decoder, &pReq->keepTimeOffset) < 0) return -1;
TAOS_CHECK_ERRNO(tDecodeI32(&decoder, &pReq->keepTimeOffset));
}
pReq->s3KeepLocal = TSDB_DEFAULT_S3_KEEP_LOCAL;
pReq->s3Compact = TSDB_DEFAULT_S3_COMPACT;
if (!tDecodeIsEnd(&decoder)) {
if (tDecodeI32(&decoder, &pReq->s3KeepLocal) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->s3Compact) < 0) return -1;
TAOS_CHECK_ERRNO(tDecodeI32(&decoder, &pReq->s3KeepLocal) < 0);
TAOS_CHECK_ERRNO(tDecodeI8(&decoder, &pReq->s3Compact) < 0);
}
tEndDecode(&decoder);
_exit:
tDecoderClear(&decoder);
return 0;
if (terrno) {
uError("%s failed at line %d since %s", __func__, terrln, terrstr());
}
return terrno;
}
int32_t tSerializeSAlterVnodeReplicaReq(void *buf, int32_t bufLen, SAlterVnodeReplicaReq *pReq) {
@ -9296,7 +9310,7 @@ int32_t tDecodeSTqCheckInfo(SDecoder *pDecoder, STqCheckInfo *pInfo) {
}
void tDeleteSTqCheckInfo(STqCheckInfo *pInfo) { taosArrayDestroy(pInfo->colIdList); }
int32_t tEncodeSMqRebVgReq(SEncoder* pCoder, const SMqRebVgReq* pReq) {
int32_t tEncodeSMqRebVgReq(SEncoder *pCoder, const SMqRebVgReq *pReq) {
if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeI64(pCoder, pReq->leftForVer) < 0) return -1;
if (tEncodeI32(pCoder, pReq->vgId) < 0) return -1;
@ -9316,7 +9330,7 @@ int32_t tEncodeSMqRebVgReq(SEncoder* pCoder, const SMqRebVgReq* pReq) {
return 0;
}
int32_t tDecodeSMqRebVgReq(SDecoder* pCoder, SMqRebVgReq* pReq) {
int32_t tDecodeSMqRebVgReq(SDecoder *pCoder, SMqRebVgReq *pReq) {
if (tStartDecode(pCoder) < 0) return -1;
if (tDecodeI64(pCoder, &pReq->leftForVer) < 0) return -1;
@ -9341,7 +9355,6 @@ int32_t tDecodeSMqRebVgReq(SDecoder* pCoder, SMqRebVgReq* pReq) {
return 0;
}
int32_t tEncodeDeleteRes(SEncoder *pCoder, const SDeleteRes *pRes) {
int32_t nUid = taosArrayGetSize(pRes->uidList);

View File

@ -738,7 +738,7 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
return qWorkerPreprocessQueryMsg(pVnode->pQuery, pMsg, TDMT_SCH_QUERY == pMsg->msgType);
}
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo* pInfo) {
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
vTrace("message in vnode query queue is processing");
if ((pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_VND_TMQ_CONSUME ||
pMsg->msgType == TDMT_VND_TMQ_CONSUME_PUSH) &&
@ -1978,6 +1978,9 @@ _exit:
return code;
}
extern int32_t tsdbDisableAndCancelAllBgTask(STsdb *pTsdb);
extern int32_t tsdbEnableBgTask(STsdb *pTsdb);
static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
bool walChanged = false;
bool tsdbChanged = false;
@ -2075,7 +2078,14 @@ static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t ver, void *pRe
}
if (req.sttTrigger != -1 && req.sttTrigger != pVnode->config.sttTrigger) {
pVnode->config.sttTrigger = req.sttTrigger;
if (req.sttTrigger > 1 && pVnode->config.sttTrigger > 1) {
pVnode->config.sttTrigger = req.sttTrigger;
} else {
vnodeAWait(&pVnode->commitTask);
tsdbDisableAndCancelAllBgTask(pVnode->pTsdb);
pVnode->config.sttTrigger = req.sttTrigger;
tsdbEnableBgTask(pVnode->pTsdb);
}
}
if (req.minRows != -1 && req.minRows != pVnode->config.tsdbCfg.minRows) {

View File

@ -22,10 +22,12 @@
#define TAOS_ERROR_C
static threadlocal int32_t tsErrno;
static threadlocal int32_t tsErrln;
static threadlocal char tsErrMsgDetail[ERR_MSG_LEN] = {0};
static threadlocal char tsErrMsgReturn[ERR_MSG_LEN] = {0};
int32_t* taosGetErrno() { return &tsErrno; }
int32_t* taosGetErrln() { return &tsErrln; }
char* taosGetErrMsg() { return tsErrMsgDetail; }
char* taosGetErrMsgReturn() { return tsErrMsgReturn; }
@ -104,6 +106,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_CFG_VALUE, "Invalid configuration
TAOS_DEFINE_ERROR(TSDB_CODE_IP_NOT_IN_WHITE_LIST, "Not allowed to connect")
TAOS_DEFINE_ERROR(TSDB_CODE_FAILED_TO_CONNECT_S3, "Failed to connect to s3 server")
TAOS_DEFINE_ERROR(TSDB_CODE_MSG_PREPROCESSED, "Message has been processed in preprocess")
TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_BUFFER, "Out of buffer")
//client
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_OPERATION, "Invalid operation")