Merge branch 'main' into feature/stream

This commit is contained in:
Liu Jicong 2022-12-17 18:30:03 +08:00 committed by GitHub
commit 233f742b41
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 38 additions and 15 deletions

View File

@ -89,6 +89,7 @@ create stream if not exists s1 fill_history 1 into st1 as select count(*) from
If some streams are totally outdated, and you do not want it to monitor or process anymore, those streams can be manually dropped and output data will be still kept. If some streams are totally outdated, and you do not want it to monitor or process anymore, those streams can be manually dropped and output data will be still kept.
## Delete a Stream ## Delete a Stream
```sql ```sql

View File

@ -58,8 +58,7 @@ typedef enum {
#define QUERY_RSP_POLICY_QUICK 1 #define QUERY_RSP_POLICY_QUICK 1
#define QUERY_MSG_MASK_SHOW_REWRITE() (1 << 0) #define QUERY_MSG_MASK_SHOW_REWRITE() (1 << 0)
#define TEST_SHOW_REWRITE_MASK(m) (((m) & QUERY_MSG_MASK_SHOW_REWRITE()) != 0) #define TEST_SHOW_REWRITE_MASK(m) (((m)&QUERY_MSG_MASK_SHOW_REWRITE()) != 0)
typedef struct STableComInfo { typedef struct STableComInfo {
uint8_t numOfTags; // the number of tags in schema uint8_t numOfTags; // the number of tags in schema
@ -128,7 +127,7 @@ typedef struct SDBVgInfo {
int8_t hashMethod; int8_t hashMethod;
int32_t numOfTable; // DB's table num, unit is TSDB_TABLE_NUM_UNIT int32_t numOfTable; // DB's table num, unit is TSDB_TABLE_NUM_UNIT
int64_t stateTs; int64_t stateTs;
SHashObj* vgHash; // key:vgId, value:SVgroupInfo SHashObj* vgHash; // key:vgId, value:SVgroupInfo
SArray* vgArray; SArray* vgArray;
} SDBVgInfo; } SDBVgInfo;
@ -262,23 +261,26 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
(NEED_CLIENT_RM_TBLMETA_ERROR(_code) || NEED_CLIENT_REFRESH_VG_ERROR(_code) || \ (NEED_CLIENT_RM_TBLMETA_ERROR(_code) || NEED_CLIENT_REFRESH_VG_ERROR(_code) || \
NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code)) NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code))
#define SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_SYN_INTERNAL_ERROR || (_code) == TSDB_CODE_VND_STOPPED || (_code) == TSDB_CODE_APP_IS_STARTING || (_code) == TSDB_CODE_APP_IS_STOPPING) #define SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) \
#define SYNC_SELF_LEADER_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_SYN_RESTORING || (_code) == TSDB_CODE_SYN_INTERNAL_ERROR) ((_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_SYN_INTERNAL_ERROR || \
#define SYNC_OTHER_LEADER_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_MNODE_NOT_FOUND) (_code) == TSDB_CODE_VND_STOPPED || (_code) == TSDB_CODE_APP_IS_STARTING || (_code) == TSDB_CODE_APP_IS_STOPPING)
#define SYNC_SELF_LEADER_REDIRECT_ERROR(_code) \
((_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_SYN_RESTORING || (_code) == TSDB_CODE_SYN_INTERNAL_ERROR)
#define SYNC_OTHER_LEADER_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_MNODE_NOT_FOUND)
#define NO_RET_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL) #define NO_RET_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL)
#define NEED_REDIRECT_ERROR(_code) \ #define NEED_REDIRECT_ERROR(_code) \
(NO_RET_REDIRECT_ERROR(_code) || SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) || \ (NO_RET_REDIRECT_ERROR(_code) || SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) || \
SYNC_SELF_LEADER_REDIRECT_ERROR(_code) || SYNC_OTHER_LEADER_REDIRECT_ERROR(_code)) SYNC_SELF_LEADER_REDIRECT_ERROR(_code) || SYNC_OTHER_LEADER_REDIRECT_ERROR(_code))
#define NEED_CLIENT_RM_TBLMETA_REQ(_type) \ #define NEED_CLIENT_RM_TBLMETA_REQ(_type) \
((_type) == TDMT_VND_CREATE_TABLE || (_type) == TDMT_MND_CREATE_STB || (_type) == TDMT_VND_DROP_TABLE || \ ((_type) == TDMT_VND_CREATE_TABLE || (_type) == TDMT_MND_CREATE_STB || (_type) == TDMT_VND_DROP_TABLE || \
(_type) == TDMT_MND_DROP_STB) (_type) == TDMT_MND_DROP_STB)
#define NEED_SCHEDULER_REDIRECT_ERROR(_code) \ #define NEED_SCHEDULER_REDIRECT_ERROR(_code) \
(SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) || SYNC_SELF_LEADER_REDIRECT_ERROR(_code) || SYNC_OTHER_LEADER_REDIRECT_ERROR(_code)) (SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) || SYNC_SELF_LEADER_REDIRECT_ERROR(_code) || \
SYNC_OTHER_LEADER_REDIRECT_ERROR(_code))
#define REQUEST_TOTAL_EXEC_TIMES 2 #define REQUEST_TOTAL_EXEC_TIMES 2

View File

@ -214,6 +214,9 @@ int32_t dmMarkWrapper(SMgmtWrapper *pWrapper) {
case SNODE: case SNODE:
terrno = TSDB_CODE_SNODE_NOT_FOUND; terrno = TSDB_CODE_SNODE_NOT_FOUND;
break; break;
case VNODE:
terrno = TSDB_CODE_VND_STOPPED;
break;
default: default:
terrno = TSDB_CODE_APP_IS_STOPPING; terrno = TSDB_CODE_APP_IS_STOPPING;
break; break;

View File

@ -663,7 +663,9 @@ _OVER:
const STraceId *trace = &pMsg->info.traceId; const STraceId *trace = &pMsg->info.traceId;
SEpSet epSet = {0}; SEpSet epSet = {0};
int32_t tmpCode = terrno;
mndGetMnodeEpSet(pMnode, &epSet); mndGetMnodeEpSet(pMnode, &epSet);
terrno = tmpCode;
mGDebug( mGDebug(
"msg:%p, type:%s failed to process since %s, mnode restored:%d stopped:%d, sync restored:%d " "msg:%p, type:%s failed to process since %s, mnode restored:%d stopped:%d, sync restored:%d "

View File

@ -927,7 +927,8 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
} }
} else { } else {
if (pTrans->stage == TRN_STAGE_REDO_ACTION) { if (pTrans->stage == TRN_STAGE_REDO_ACTION) {
if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_APP_IS_STARTING) { if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_APP_IS_STARTING ||
code == TSDB_CODE_SYN_PROPOSE_NOT_READY) {
if (pTrans->failedTimes > 60) sendRsp = true; if (pTrans->failedTimes > 60) sendRsp = true;
} else { } else {
if (pTrans->failedTimes > 6) sendRsp = true; if (pTrans->failedTimes > 6) sendRsp = true;
@ -1336,6 +1337,7 @@ static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) {
} }
if (mndCannotExecuteTransAction(pMnode)) return false; if (mndCannotExecuteTransAction(pMnode)) return false;
terrno = code;
if (code == 0) { if (code == 0) {
pTrans->code = 0; pTrans->code = 0;

View File

@ -115,7 +115,6 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) {
if (tjsonAddIntegerToObject(pJson, "hashMethod", pCfg->hashMethod) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "hashMethod", pCfg->hashMethod) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "hashPrefix", pCfg->hashPrefix) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "hashPrefix", pCfg->hashPrefix) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "hashSuffix", pCfg->hashSuffix) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "hashSuffix", pCfg->hashSuffix) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "tsdbPageSize", pCfg->tsdbPageSize) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "syncCfg.replicaNum", pCfg->syncCfg.replicaNum) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "syncCfg.replicaNum", pCfg->syncCfg.replicaNum) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "syncCfg.myIndex", pCfg->syncCfg.myIndex) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "syncCfg.myIndex", pCfg->syncCfg.myIndex) < 0) return -1;
@ -256,7 +255,9 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
} }
tjsonGetNumberValue(pJson, "tsdbPageSize", pCfg->tsdbPageSize, code); tjsonGetNumberValue(pJson, "tsdbPageSize", pCfg->tsdbPageSize, code);
if (code < 0) pCfg->tsdbPageSize = TSDB_DEFAULT_TSDB_PAGESIZE * 1024; if (code < 0 || pCfg->tsdbPageSize < TSDB_MIN_PAGESIZE_PER_VNODE * 1024) {
pCfg->tsdbPageSize = TSDB_DEFAULT_TSDB_PAGESIZE * 1024;
}
return 0; return 0;
} }

View File

@ -1598,7 +1598,7 @@ bool cliGenRetryRule(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
} else if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_INTERNAL_ERROR || } else if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_INTERNAL_ERROR ||
code == TSDB_CODE_SYN_PROPOSE_NOT_READY || code == TSDB_CODE_VND_STOPPED || code == TSDB_CODE_SYN_PROPOSE_NOT_READY || code == TSDB_CODE_VND_STOPPED ||
code == TSDB_CODE_MNODE_NOT_FOUND || code == TSDB_CODE_APP_IS_STARTING || code == TSDB_CODE_MNODE_NOT_FOUND || code == TSDB_CODE_APP_IS_STARTING ||
code == TSDB_CODE_APP_IS_STOPPING) { code == TSDB_CODE_APP_IS_STOPPING || code == TSDB_CODE_VND_STOPPED) {
tTrace("code str %s, contlen:%d 1", tstrerror(code), pResp->contLen); tTrace("code str %s, contlen:%d 1", tstrerror(code), pResp->contLen);
noDelay = cliResetEpset(pCtx, pResp, true); noDelay = cliResetEpset(pCtx, pResp, true);
transFreeMsg(pResp->pCont); transFreeMsg(pResp->pCont);

View File

@ -464,7 +464,9 @@ int64_t taosWriteFile(TdFilePtr pFile, const void *buf, int64_t count) {
#if FILE_WITH_LOCK #if FILE_WITH_LOCK
taosThreadRwlockWrlock(&(pFile->rwlock)); taosThreadRwlockWrlock(&(pFile->rwlock));
#endif #endif
assert(pFile->fd >= 0); // Please check if you have closed the file. if (pFile->fd < 0) {
return 0;
}
int64_t nleft = count; int64_t nleft = count;
int64_t nwritten = 0; int64_t nwritten = 0;

View File

@ -445,6 +445,9 @@ static inline int32_t taosBuildLogHead(char *buffer, const char *flags) {
static inline void taosPrintLogImp(ELogLevel level, int32_t dflag, const char *buffer, int32_t len) { static inline void taosPrintLogImp(ELogLevel level, int32_t dflag, const char *buffer, int32_t len) {
if ((dflag & DEBUG_FILE) && tsLogObj.logHandle && tsLogObj.logHandle->pFile != NULL && osLogSpaceAvailable()) { if ((dflag & DEBUG_FILE) && tsLogObj.logHandle && tsLogObj.logHandle->pFile != NULL && osLogSpaceAvailable()) {
taosUpdateLogNums(level); taosUpdateLogNums(level);
#if 0
// DEBUG_FATAL and DEBUG_ERROR are duplicated
// fsync will cause thread blocking and may also generate log misalignment in case of asyncLog
if (tsAsyncLog && level != DEBUG_FATAL) { if (tsAsyncLog && level != DEBUG_FATAL) {
taosPushLogBuffer(tsLogObj.logHandle, buffer, len); taosPushLogBuffer(tsLogObj.logHandle, buffer, len);
} else { } else {
@ -453,6 +456,13 @@ static inline void taosPrintLogImp(ELogLevel level, int32_t dflag, const char *b
taosFsyncFile(tsLogObj.logHandle->pFile); taosFsyncFile(tsLogObj.logHandle->pFile);
} }
} }
#else
if (tsAsyncLog) {
taosPushLogBuffer(tsLogObj.logHandle, buffer, len);
} else {
taosWriteFile(tsLogObj.logHandle->pFile, buffer, len);
}
#endif
if (tsLogObj.maxLines > 0) { if (tsLogObj.maxLines > 0) {
atomic_add_fetch_32(&tsLogObj.lines, 1); atomic_add_fetch_32(&tsLogObj.lines, 1);