Merge branch '3.0' of github.com:taosdata/TDengine into 3.0
This commit is contained in:
commit
04d59ab836
|
@ -1474,7 +1474,6 @@ _err:
|
||||||
// this message is sent from mnode to mnode(read thread to write thread), so there is no need for serialization or
|
// this message is sent from mnode to mnode(read thread to write thread), so there is no need for serialization or
|
||||||
// deserialization
|
// deserialization
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t* mqInReb;
|
|
||||||
SHashObj* rebSubHash; // SHashObj<key, SMqRebSubscribe>
|
SHashObj* rebSubHash; // SHashObj<key, SMqRebSubscribe>
|
||||||
} SMqDoRebalanceMsg;
|
} SMqDoRebalanceMsg;
|
||||||
|
|
||||||
|
|
|
@ -44,6 +44,11 @@ SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw);
|
||||||
|
|
||||||
int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer);
|
int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer);
|
||||||
|
|
||||||
|
bool mndRebTryStart();
|
||||||
|
void mndRebEnd();
|
||||||
|
void mndRebCntInc();
|
||||||
|
void mndRebCntDec();
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -36,8 +36,8 @@ typedef struct {
|
||||||
typedef enum {
|
typedef enum {
|
||||||
TEST_TRANS_START_FUNC = 1,
|
TEST_TRANS_START_FUNC = 1,
|
||||||
TEST_TRANS_STOP_FUNC = 2,
|
TEST_TRANS_STOP_FUNC = 2,
|
||||||
CONSUME_TRANS_START_FUNC = 3,
|
MQ_REB_TRANS_START_FUNC = 3,
|
||||||
CONSUME_TRANS_STOP_FUNC = 4,
|
MQ_REB_TRANS_STOP_FUNC = 4,
|
||||||
} ETrnFuncType;
|
} ETrnFuncType;
|
||||||
|
|
||||||
typedef void (*TransCbFp)(SMnode *pMnode, void *param, int32_t paramLen);
|
typedef void (*TransCbFp)(SMnode *pMnode, void *param, int32_t paramLen);
|
||||||
|
|
|
@ -35,7 +35,7 @@
|
||||||
|
|
||||||
#define MND_CONSUMER_LOST_HB_CNT 3
|
#define MND_CONSUMER_LOST_HB_CNT 3
|
||||||
|
|
||||||
static int8_t mqInRebFlag = 0;
|
static int8_t mqRebLock = 0;
|
||||||
|
|
||||||
static const char *mndConsumerStatusName(int status);
|
static const char *mndConsumerStatusName(int status);
|
||||||
|
|
||||||
|
@ -75,6 +75,17 @@ int32_t mndInitConsumer(SMnode *pMnode) {
|
||||||
|
|
||||||
void mndCleanupConsumer(SMnode *pMnode) {}
|
void mndCleanupConsumer(SMnode *pMnode) {}
|
||||||
|
|
||||||
|
bool mndRebTryStart() {
|
||||||
|
int8_t old = atomic_val_compare_exchange_8(&mqRebLock, 0, 1);
|
||||||
|
return old == 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void mndRebEnd() { atomic_sub_fetch_8(&mqRebLock, 1); }
|
||||||
|
|
||||||
|
void mndRebCntInc() { atomic_add_fetch_8(&mqRebLock, 1); }
|
||||||
|
|
||||||
|
void mndRebCntDec() { atomic_sub_fetch_8(&mqRebLock, 1); }
|
||||||
|
|
||||||
static int32_t mndProcessConsumerLostMsg(SNodeMsg *pMsg) {
|
static int32_t mndProcessConsumerLostMsg(SNodeMsg *pMsg) {
|
||||||
SMnode *pMnode = pMsg->pNode;
|
SMnode *pMnode = pMsg->pNode;
|
||||||
SMqConsumerLostMsg *pLostMsg = pMsg->rpcMsg.pCont;
|
SMqConsumerLostMsg *pLostMsg = pMsg->rpcMsg.pCont;
|
||||||
|
@ -143,8 +154,7 @@ static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) {
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
|
|
||||||
// rebalance cannot be parallel
|
// rebalance cannot be parallel
|
||||||
int8_t old = atomic_val_compare_exchange_8(&mqInRebFlag, 0, 1);
|
if (!mndRebTryStart()) {
|
||||||
if (old != 0) {
|
|
||||||
mInfo("mq rebalance already in progress, do nothing");
|
mInfo("mq rebalance already in progress, do nothing");
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -152,7 +162,6 @@ static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) {
|
||||||
SMqDoRebalanceMsg *pRebMsg = rpcMallocCont(sizeof(SMqDoRebalanceMsg));
|
SMqDoRebalanceMsg *pRebMsg = rpcMallocCont(sizeof(SMqDoRebalanceMsg));
|
||||||
pRebMsg->rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
|
pRebMsg->rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
|
||||||
// TODO set cleanfp
|
// TODO set cleanfp
|
||||||
pRebMsg->mqInReb = &mqInRebFlag;
|
|
||||||
|
|
||||||
// iterate all consumers, find all modification
|
// iterate all consumers, find all modification
|
||||||
while (1) {
|
while (1) {
|
||||||
|
@ -223,7 +232,7 @@ static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) {
|
||||||
taosHashCleanup(pRebMsg->rebSubHash);
|
taosHashCleanup(pRebMsg->rebSubHash);
|
||||||
rpcFreeCont(pRebMsg);
|
rpcFreeCont(pRebMsg);
|
||||||
mTrace("mq rebalance finished, no modification");
|
mTrace("mq rebalance finished, no modification");
|
||||||
atomic_store_8(&mqInRebFlag, 0);
|
mndRebEnd();
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -308,8 +308,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
|
||||||
// sink part
|
// sink part
|
||||||
if (level == 0) {
|
if (level == 0) {
|
||||||
// only for inplace
|
// only for inplace
|
||||||
pTask->sinkType = TASK_SINK__SHOW;
|
pTask->sinkType = TASK_SINK__NONE;
|
||||||
pTask->showSink.reserved = 0;
|
|
||||||
if (!hasExtraSink) {
|
if (!hasExtraSink) {
|
||||||
#if 1
|
#if 1
|
||||||
if (pStream->createdBy == STREAM_CREATED_BY__SMA) {
|
if (pStream->createdBy == STREAM_CREATED_BY__SMA) {
|
||||||
|
@ -368,8 +367,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
|
||||||
pTask->sourceType = TASK_SOURCE__PIPE;
|
pTask->sourceType = TASK_SOURCE__PIPE;
|
||||||
|
|
||||||
// sink part
|
// sink part
|
||||||
pTask->sinkType = TASK_SINK__SHOW;
|
pTask->sinkType = TASK_SINK__NONE;
|
||||||
/*pTask->sinkType = TASK_SINK__NONE;*/
|
|
||||||
|
|
||||||
// dispatch part
|
// dispatch part
|
||||||
ASSERT(hasExtraSink);
|
ASSERT(hasExtraSink);
|
||||||
|
@ -456,7 +454,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
|
||||||
pTask->sourceType = TASK_SOURCE__MERGE;
|
pTask->sourceType = TASK_SOURCE__MERGE;
|
||||||
|
|
||||||
// sink part
|
// sink part
|
||||||
pTask->sinkType = TASK_SINK__SHOW;
|
pTask->sinkType = TASK_SINK__NONE;
|
||||||
|
|
||||||
// dispatch part
|
// dispatch part
|
||||||
pTask->dispatchType = TASK_DISPATCH__NONE;
|
pTask->dispatchType = TASK_DISPATCH__NONE;
|
||||||
|
|
|
@ -72,8 +72,6 @@ static int32_t convertToRetrieveType(char* name, int32_t len) {
|
||||||
// type = TSDB_MGMT_TABLE_INDEX;
|
// type = TSDB_MGMT_TABLE_INDEX;
|
||||||
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_STABLES, len) == 0) {
|
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_STABLES, len) == 0) {
|
||||||
type = TSDB_MGMT_TABLE_STB;
|
type = TSDB_MGMT_TABLE_STB;
|
||||||
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_STREAMS, len) == 0) {
|
|
||||||
type = TSDB_MGMT_TABLE_STREAMS;
|
|
||||||
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, len) == 0) {
|
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, len) == 0) {
|
||||||
type = TSDB_MGMT_TABLE_TABLE;
|
type = TSDB_MGMT_TABLE_TABLE;
|
||||||
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED, len) == 0) {
|
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED, len) == 0) {
|
||||||
|
@ -102,6 +100,8 @@ static int32_t convertToRetrieveType(char* name, int32_t len) {
|
||||||
type = TSDB_MGMT_TABLE_VNODES;
|
type = TSDB_MGMT_TABLE_VNODES;
|
||||||
} else if (strncasecmp(name, TSDB_PERFS_TABLE_TOPICS, len) == 0) {
|
} else if (strncasecmp(name, TSDB_PERFS_TABLE_TOPICS, len) == 0) {
|
||||||
type = TSDB_MGMT_TABLE_TOPICS;
|
type = TSDB_MGMT_TABLE_TOPICS;
|
||||||
|
} else if (strncasecmp(name, TSDB_PERFS_TABLE_STREAMS, len) == 0) {
|
||||||
|
type = TSDB_MGMT_TABLE_STREAMS;
|
||||||
} else {
|
} else {
|
||||||
// ASSERT(0);
|
// ASSERT(0);
|
||||||
}
|
}
|
||||||
|
|
|
@ -452,7 +452,10 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SNodeMsg *pMsg, const SMqRebO
|
||||||
}
|
}
|
||||||
// 4. TODO commit log: modification log
|
// 4. TODO commit log: modification log
|
||||||
|
|
||||||
// 5. execution
|
// 5. set cb
|
||||||
|
mndTransSetCb(pTrans, MQ_REB_TRANS_START_FUNC, MQ_REB_TRANS_STOP_FUNC, NULL, 0);
|
||||||
|
|
||||||
|
// 6. execution
|
||||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto REB_FAIL;
|
if (mndTransPrepare(pMnode, pTrans) != 0) goto REB_FAIL;
|
||||||
|
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
|
@ -518,9 +521,9 @@ static int32_t mndProcessRebalanceReq(SNodeMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// reset flag
|
// reset flag
|
||||||
atomic_store_8(pReq->mqInReb, 0);
|
|
||||||
mInfo("mq rebalance completed successfully");
|
mInfo("mq rebalance completed successfully");
|
||||||
taosHashCleanup(pReq->rebSubHash);
|
taosHashCleanup(pReq->rebSubHash);
|
||||||
|
mndRebEnd();
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "mndTrans.h"
|
#include "mndTrans.h"
|
||||||
#include "mndAuth.h"
|
#include "mndAuth.h"
|
||||||
|
#include "mndConsumer.h"
|
||||||
#include "mndDb.h"
|
#include "mndDb.h"
|
||||||
#include "mndShow.h"
|
#include "mndShow.h"
|
||||||
#include "mndSync.h"
|
#include "mndSync.h"
|
||||||
|
@ -442,6 +443,10 @@ static TransCbFp mndTransGetCbFp(ETrnFuncType ftype) {
|
||||||
return mndTransTestStartFunc;
|
return mndTransTestStartFunc;
|
||||||
case TEST_TRANS_STOP_FUNC:
|
case TEST_TRANS_STOP_FUNC:
|
||||||
return mndTransTestStopFunc;
|
return mndTransTestStopFunc;
|
||||||
|
case MQ_REB_TRANS_START_FUNC:
|
||||||
|
return mndRebCntInc;
|
||||||
|
case MQ_REB_TRANS_STOP_FUNC:
|
||||||
|
return mndRebCntDec;
|
||||||
default:
|
default:
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
|
@ -158,8 +158,6 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in
|
||||||
//
|
//
|
||||||
} else if (pTask->sinkType == TASK_SINK__FETCH) {
|
} else if (pTask->sinkType == TASK_SINK__FETCH) {
|
||||||
//
|
//
|
||||||
} else if (pTask->sinkType == TASK_SINK__SHOW) {
|
|
||||||
blockDebugShowData(pRes);
|
|
||||||
} else {
|
} else {
|
||||||
ASSERT(pTask->sinkType == TASK_SINK__NONE);
|
ASSERT(pTask->sinkType == TASK_SINK__NONE);
|
||||||
}
|
}
|
||||||
|
@ -280,8 +278,6 @@ int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask) {
|
||||||
if (tEncodeI64(pEncoder, pTask->smaSink.smaId) < 0) return -1;
|
if (tEncodeI64(pEncoder, pTask->smaSink.smaId) < 0) return -1;
|
||||||
} else if (pTask->sinkType == TASK_SINK__FETCH) {
|
} else if (pTask->sinkType == TASK_SINK__FETCH) {
|
||||||
if (tEncodeI8(pEncoder, pTask->fetchSink.reserved) < 0) return -1;
|
if (tEncodeI8(pEncoder, pTask->fetchSink.reserved) < 0) return -1;
|
||||||
} else if (pTask->sinkType == TASK_SINK__SHOW) {
|
|
||||||
if (tEncodeI8(pEncoder, pTask->showSink.reserved) < 0) return -1;
|
|
||||||
} else {
|
} else {
|
||||||
ASSERT(pTask->sinkType == TASK_SINK__NONE);
|
ASSERT(pTask->sinkType == TASK_SINK__NONE);
|
||||||
}
|
}
|
||||||
|
@ -326,8 +322,6 @@ int32_t tDecodeSStreamTask(SCoder* pDecoder, SStreamTask* pTask) {
|
||||||
if (tDecodeI64(pDecoder, &pTask->smaSink.smaId) < 0) return -1;
|
if (tDecodeI64(pDecoder, &pTask->smaSink.smaId) < 0) return -1;
|
||||||
} else if (pTask->sinkType == TASK_SINK__FETCH) {
|
} else if (pTask->sinkType == TASK_SINK__FETCH) {
|
||||||
if (tDecodeI8(pDecoder, &pTask->fetchSink.reserved) < 0) return -1;
|
if (tDecodeI8(pDecoder, &pTask->fetchSink.reserved) < 0) return -1;
|
||||||
} else if (pTask->sinkType == TASK_SINK__SHOW) {
|
|
||||||
if (tDecodeI8(pDecoder, &pTask->showSink.reserved) < 0) return -1;
|
|
||||||
} else {
|
} else {
|
||||||
ASSERT(pTask->sinkType == TASK_SINK__NONE);
|
ASSERT(pTask->sinkType == TASK_SINK__NONE);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue