commit
09a384acac
|
@ -28,7 +28,7 @@ int32_t init_env() {
|
|||
return -1;
|
||||
}
|
||||
|
||||
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2");
|
||||
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in create db, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
|
@ -62,6 +62,23 @@ int32_t init_env() {
|
|||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t create_topic() {
|
||||
printf("create topic");
|
||||
TAOS_RES* pRes;
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
if (pConn == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
pRes = taos_query(pConn, "use abc1");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in use db, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
const char* sql = "select * from tu1";
|
||||
pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql));
|
||||
|
@ -193,6 +210,7 @@ int main(int argc, char* argv[]) {
|
|||
printf("env init\n");
|
||||
code = init_env();
|
||||
}
|
||||
create_topic();
|
||||
tmq_t* tmq = build_consumer();
|
||||
tmq_list_t* topic_list = build_topic_list();
|
||||
/*perf_loop(tmq, topic_list);*/
|
||||
|
|
|
@ -745,6 +745,7 @@ typedef struct {
|
|||
int8_t cacheLastRow;
|
||||
int8_t replica;
|
||||
int8_t selfIndex;
|
||||
int8_t streamMode;
|
||||
SReplica replicas[TSDB_MAX_REPLICA];
|
||||
} SCreateVnodeReq, SAlterVnodeReq;
|
||||
|
||||
|
@ -1759,6 +1760,11 @@ typedef struct {
|
|||
char cgroup[TSDB_CONSUMER_GROUP_LEN];
|
||||
} SMqOffset;
|
||||
|
||||
typedef struct {
|
||||
int32_t vgId;
|
||||
SArray* offsets; // SArray<SMqOffset>
|
||||
} SMqVgOffsets;
|
||||
|
||||
typedef struct {
|
||||
int32_t num;
|
||||
SMqOffset* offsets;
|
||||
|
@ -1769,8 +1775,8 @@ typedef struct {
|
|||
} SMqCMResetOffsetRsp;
|
||||
|
||||
typedef struct {
|
||||
int32_t num;
|
||||
SMqOffset* offsets;
|
||||
int64_t leftForVer;
|
||||
SMqVgOffsets offsets;
|
||||
} SMqMVResetOffsetReq;
|
||||
|
||||
typedef struct {
|
||||
|
@ -1781,7 +1787,6 @@ int32_t tEncodeSMqOffset(SCoder* encoder, const SMqOffset* pOffset);
|
|||
int32_t tDecodeSMqOffset(SCoder* decoder, SMqOffset* pOffset);
|
||||
int32_t tEncodeSMqCMResetOffsetReq(SCoder* encoder, const SMqCMResetOffsetReq* pReq);
|
||||
int32_t tDecodeSMqCMResetOffsetReq(SCoder* decoder, SMqCMResetOffsetReq* pReq);
|
||||
|
||||
int32_t tEncodeSMqMVResetOffsetReq(SCoder* encoder, const SMqMVResetOffsetReq* pReq);
|
||||
int32_t tDecodeSMqMVResetOffsetReq(SCoder* decoder, SMqMVResetOffsetReq* pReq);
|
||||
|
||||
|
|
|
@ -269,7 +269,7 @@ tmq_resp_err_t tmq_reset_offset(tmq_t* tmq, const tmq_topic_vgroup_list_t* offse
|
|||
tsem_wait(¶m.rspSem);
|
||||
tsem_destroy(¶m.rspSem);
|
||||
|
||||
return TMQ_RESP_ERR__SUCCESS;
|
||||
return param.rspErr;
|
||||
}
|
||||
|
||||
tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
|
||||
|
|
|
@ -45,6 +45,8 @@ int32_t tInitSubmitMsgIter(SSubmitReq *pMsg, SSubmitMsgIter *pIter) {
|
|||
}
|
||||
|
||||
int32_t tGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) {
|
||||
ASSERT(pIter->len >= 0);
|
||||
|
||||
if (pIter->len == 0) {
|
||||
pIter->len += sizeof(SSubmitReq);
|
||||
} else {
|
||||
|
@ -2109,6 +2111,7 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR
|
|||
if (tEncodeI8(&encoder, pReq->cacheLastRow) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->replica) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->selfIndex) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->streamMode) < 0) return -1;
|
||||
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
||||
SReplica *pReplica = &pReq->replicas[i];
|
||||
if (tEncodeSReplica(&encoder, pReplica) < 0) return -1;
|
||||
|
@ -2148,6 +2151,7 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
|
|||
if (tDecodeI8(&decoder, &pReq->cacheLastRow) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->replica) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->selfIndex) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->streamMode) < 0) return -1;
|
||||
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
||||
SReplica *pReplica = &pReq->replicas[i];
|
||||
if (tDecodeSReplica(&decoder, pReplica) < 0) return -1;
|
||||
|
@ -2350,6 +2354,34 @@ int32_t tDecodeSMqOffset(SCoder *decoder, SMqOffset *pOffset) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tEncodeSMqVgOffsets(SCoder *encoder, const SMqVgOffsets *pOffsets) {
|
||||
if (tStartEncode(encoder) < 0) return -1;
|
||||
if (tEncodeI32(encoder, pOffsets->vgId) < 0) return -1;
|
||||
int32_t sz = taosArrayGetSize(pOffsets->offsets);
|
||||
if (tEncodeI32(encoder, sz) < 0) return -1;
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMqOffset *offset = taosArrayGet(pOffsets->offsets, i);
|
||||
if (tEncodeSMqOffset(encoder, offset) < 0) return -1;
|
||||
}
|
||||
tEndEncode(encoder);
|
||||
return encoder->pos;
|
||||
}
|
||||
|
||||
int32_t tDecodeSMqVgOffsets(SCoder *decoder, SMqVgOffsets *pOffsets) {
|
||||
int32_t sz;
|
||||
if (tStartDecode(decoder) < 0) return -1;
|
||||
if (tDecodeI32(decoder, &pOffsets->vgId) < 0) return -1;
|
||||
if (tDecodeI32(decoder, &sz) < 0) return -1;
|
||||
pOffsets->offsets = taosArrayInit(sz, sizeof(SMqOffset));
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMqOffset offset;
|
||||
if (tDecodeSMqOffset(decoder, &offset) < 0) return -1;
|
||||
taosArrayPush(pOffsets->offsets, &offset);
|
||||
}
|
||||
tEndDecode(decoder);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tEncodeSMqCMResetOffsetReq(SCoder *encoder, const SMqCMResetOffsetReq *pReq) {
|
||||
if (tStartEncode(encoder) < 0) return -1;
|
||||
if (tEncodeI32(encoder, pReq->num) < 0) return -1;
|
||||
|
@ -2361,17 +2393,20 @@ int32_t tEncodeSMqCMResetOffsetReq(SCoder *encoder, const SMqCMResetOffsetReq *p
|
|||
}
|
||||
|
||||
int32_t tDecodeSMqCMResetOffsetReq(SCoder *decoder, SMqCMResetOffsetReq *pReq) {
|
||||
if (tStartDecode(decoder) < 0) return -1;
|
||||
if (tDecodeI32(decoder, &pReq->num) < 0) return -1;
|
||||
pReq->offsets = TCODER_MALLOC(pReq->num * sizeof(SMqOffset), decoder);
|
||||
if (pReq->offsets == NULL) return -1;
|
||||
for (int32_t i = 0; i < pReq->num; i++) {
|
||||
tDecodeSMqOffset(decoder, &pReq->offsets[i]);
|
||||
}
|
||||
tEndDecode(decoder);
|
||||
return 0;
|
||||
}
|
||||
|
||||
#if 0
|
||||
int32_t tEncodeSMqMVResetOffsetReq(SCoder *encoder, const SMqMVResetOffsetReq *pReq) {
|
||||
if (tEncodeI32(encoder, pReq->num) < 0) return -1;
|
||||
if (tEncodeI64(encoder, pReq->leftForVer) < 0) return -1;
|
||||
for (int32_t i = 0; i < pReq->num; i++) {
|
||||
tEncodeSMqOffset(encoder, &pReq->offsets[i]);
|
||||
}
|
||||
|
@ -2387,3 +2422,4 @@ int32_t tDecodeSMqMVResetOffsetReq(SCoder *decoder, SMqMVResetOffsetReq *pReq) {
|
|||
}
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -507,6 +507,7 @@ static void dndGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
|
|||
pCfg->isHeapAllocator = true;
|
||||
pCfg->ttl = 4;
|
||||
pCfg->keep = pCreate->daysToKeep0;
|
||||
pCfg->streamMode = pCreate->streamMode;
|
||||
pCfg->isWeak = true;
|
||||
pCfg->tsdbCfg.keep = pCreate->daysToKeep0;
|
||||
pCfg->tsdbCfg.keep1 = pCreate->daysToKeep2;
|
||||
|
|
|
@ -300,6 +300,7 @@ typedef struct {
|
|||
int8_t quorum;
|
||||
int8_t update;
|
||||
int8_t cacheLastRow;
|
||||
int8_t streamMode;
|
||||
} SDbCfg;
|
||||
|
||||
typedef struct {
|
||||
|
@ -336,6 +337,7 @@ typedef struct {
|
|||
int64_t pointsWritten;
|
||||
int8_t compact;
|
||||
int8_t replica;
|
||||
int8_t streamMode;
|
||||
SVnodeGid vnodeGid[TSDB_MAX_REPLICA];
|
||||
} SVgObj;
|
||||
|
||||
|
@ -583,13 +585,13 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub)
|
|||
static FORCE_INLINE void tDeleteSMqSubscribeObj(SMqSubscribeObj* pSub) {
|
||||
if (pSub->consumers) {
|
||||
taosArrayDestroyEx(pSub->consumers, (void (*)(void*))tDeleteSMqSubConsumer);
|
||||
//taosArrayDestroy(pSub->consumers);
|
||||
// taosArrayDestroy(pSub->consumers);
|
||||
pSub->consumers = NULL;
|
||||
}
|
||||
|
||||
if (pSub->unassignedVg) {
|
||||
taosArrayDestroyEx(pSub->unassignedVg, (void (*)(void*))tDeleteSMqConsumerEp);
|
||||
//taosArrayDestroy(pSub->unassignedVg);
|
||||
// taosArrayDestroy(pSub->unassignedVg);
|
||||
pSub->unassignedVg = NULL;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -395,7 +395,8 @@ static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pReq, SCreateDbReq *pCreat
|
|||
dbObj.vgVersion = 1;
|
||||
dbObj.hashMethod = 1;
|
||||
memcpy(dbObj.createUser, pUser->user, TSDB_USER_LEN);
|
||||
dbObj.cfg = (SDbCfg){.numOfVgroups = pCreate->numOfVgroups,
|
||||
dbObj.cfg = (SDbCfg){
|
||||
.numOfVgroups = pCreate->numOfVgroups,
|
||||
.cacheBlockSize = pCreate->cacheBlockSize,
|
||||
.totalBlocks = pCreate->totalBlocks,
|
||||
.daysPerFile = pCreate->daysPerFile,
|
||||
|
@ -412,7 +413,9 @@ static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pReq, SCreateDbReq *pCreat
|
|||
.replications = pCreate->replications,
|
||||
.quorum = pCreate->quorum,
|
||||
.update = pCreate->update,
|
||||
.cacheLastRow = pCreate->cacheLastRow};
|
||||
.cacheLastRow = pCreate->cacheLastRow,
|
||||
.streamMode = pCreate->streamMode,
|
||||
};
|
||||
|
||||
mndSetDefaultDbCfg(&dbObj.cfg);
|
||||
|
||||
|
|
|
@ -53,6 +53,7 @@ static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg);
|
|||
static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg);
|
||||
static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg);
|
||||
static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg);
|
||||
static int32_t mndProcessResetOffsetReq(SMnodeMsg *pMsg);
|
||||
|
||||
static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup,
|
||||
const SMqConsumerEp *pConsumerEp);
|
||||
|
@ -205,6 +206,45 @@ static int32_t mndPersistCancelConnReq(SMnode *pMnode, STrans *pTrans, const SMq
|
|||
return 0;
|
||||
}
|
||||
|
||||
#if 0
|
||||
static int32_t mndProcessResetOffsetReq(SMnodeMsg *pMsg) {
|
||||
SMnode *pMnode = pMsg->pMnode;
|
||||
uint8_t *str = pMsg->rpcMsg.pCont;
|
||||
SMqCMResetOffsetReq req;
|
||||
|
||||
SCoder decoder;
|
||||
tCoderInit(&decoder, TD_LITTLE_ENDIAN, str, pMsg->rpcMsg.contLen, TD_DECODER);
|
||||
tDecodeSMqCMResetOffsetReq(&decoder, &req);
|
||||
|
||||
SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
|
||||
if (pHash == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < req.num; i++) {
|
||||
SMqOffset *pOffset = &req.offsets[i];
|
||||
SMqVgOffsets *pVgOffset = taosHashGet(pHash, &pOffset->vgId, sizeof(int32_t));
|
||||
if (pVgOffset == NULL) {
|
||||
pVgOffset = malloc(sizeof(SMqVgOffsets));
|
||||
if (pVgOffset == NULL) {
|
||||
return -1;
|
||||
}
|
||||
pVgOffset->offsets = taosArrayInit(0, sizeof(void *));
|
||||
taosArrayPush(pVgOffset->offsets, &pOffset);
|
||||
}
|
||||
taosHashPut(pHash, &pOffset->vgId, sizeof(int32_t), &pVgOffset, sizeof(void *));
|
||||
}
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg);
|
||||
if (pTrans == NULL) {
|
||||
mError("mq-reset-offset: failed since %s", terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
|
||||
static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
|
||||
SMnode *pMnode = pMsg->pMnode;
|
||||
SMqCMGetSubEpReq *pReq = (SMqCMGetSubEpReq *)pMsg->rpcMsg.pCont;
|
||||
|
@ -1059,6 +1099,8 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
|
|||
pConsumerEp->consumerId = consumerId;
|
||||
taosArrayPush(mqSubConsumer.vgInfo, pConsumerEp);
|
||||
if (pConsumerEp->oldConsumerId == -1) {
|
||||
mInfo("mq set conn: assign vgroup %d of topic %s to consumer %ld", pConsumerEp->vgId, newTopicName,
|
||||
pConsumerEp->consumerId);
|
||||
mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp);
|
||||
} else {
|
||||
mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp);
|
||||
|
|
|
@ -214,6 +214,7 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg
|
|||
createReq.cacheLastRow = pDb->cfg.cacheLastRow;
|
||||
createReq.replica = pVgroup->replica;
|
||||
createReq.selfIndex = -1;
|
||||
createReq.streamMode = pVgroup->streamMode;
|
||||
|
||||
for (int32_t v = 0; v < pVgroup->replica; ++v) {
|
||||
SReplica *pReplica = &createReq.replicas[v];
|
||||
|
@ -255,8 +256,7 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg
|
|||
return pReq;
|
||||
}
|
||||
|
||||
void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup,
|
||||
int32_t *pContLen) {
|
||||
void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
|
||||
SDropVnodeReq dropReq = {0};
|
||||
dropReq.dnodeId = pDnode->id;
|
||||
dropReq.vgId = pVgroup->vgId;
|
||||
|
@ -399,6 +399,7 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) {
|
|||
pVgroup->createdTime = taosGetTimestampMs();
|
||||
pVgroup->updateTime = pVgroups->createdTime;
|
||||
pVgroup->version = 1;
|
||||
pVgroup->streamMode = pDb->cfg.streamMode;
|
||||
pVgroup->hashBegin = hashMin + hashInterval * v;
|
||||
if (v == pDb->cfg.numOfVgroups - 1) {
|
||||
pVgroup->hashEnd = hashMax;
|
||||
|
|
|
@ -51,6 +51,7 @@ typedef struct {
|
|||
bool isHeapAllocator;
|
||||
uint32_t ttl;
|
||||
uint32_t keep;
|
||||
int8_t streamMode;
|
||||
bool isWeak;
|
||||
STsdbCfg tsdbCfg;
|
||||
SMetaCfg metaCfg;
|
||||
|
|
|
@ -43,13 +43,17 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
|
|||
int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||
SVCreateTbReq vCreateTbReq;
|
||||
SVCreateTbBatchReq vCreateTbBatchReq;
|
||||
void *ptr = vnodeMalloc(pVnode, pMsg->contLen);
|
||||
void *ptr = NULL;
|
||||
|
||||
if (pVnode->config.streamMode == 0) {
|
||||
ptr = vnodeMalloc(pVnode, pMsg->contLen);
|
||||
if (ptr == NULL) {
|
||||
// TODO: handle error
|
||||
}
|
||||
|
||||
// TODO: copy here need to be extended
|
||||
memcpy(ptr, pMsg->pCont, pMsg->contLen);
|
||||
}
|
||||
|
||||
// todo: change the interface here
|
||||
int64_t ver;
|
||||
|
@ -109,17 +113,19 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
|||
// }
|
||||
break;
|
||||
case TDMT_VND_SUBMIT:
|
||||
if (pVnode->config.streamMode == 0) {
|
||||
if (tsdbInsertData(pVnode->pTsdb, (SSubmitReq *)ptr, NULL) < 0) {
|
||||
// TODO: handle error
|
||||
}
|
||||
}
|
||||
break;
|
||||
case TDMT_VND_MQ_SET_CONN: {
|
||||
if (tqProcessSetConnReq(pVnode->pTq, POINTER_SHIFT(ptr, sizeof(SMsgHead))) < 0) {
|
||||
if (tqProcessSetConnReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) {
|
||||
// TODO: handle error
|
||||
}
|
||||
} break;
|
||||
case TDMT_VND_MQ_REB: {
|
||||
if (tqProcessRebReq(pVnode->pTq, POINTER_SHIFT(ptr, sizeof(SMsgHead))) < 0) {
|
||||
if (tqProcessRebReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) {
|
||||
}
|
||||
} break;
|
||||
default:
|
||||
|
|
Loading…
Reference in New Issue