Merge pull request #15950 from taosdata/feature/stream
enh(tmq): enable background heartbeat by default
This commit is contained in:
commit
4352ae4f90
|
@ -1369,8 +1369,8 @@ typedef struct {
|
|||
int32_t numOfCols;
|
||||
int64_t skey;
|
||||
int64_t ekey;
|
||||
int64_t version; // for stream
|
||||
TSKEY watermark;// for stream
|
||||
int64_t version; // for stream
|
||||
TSKEY watermark; // for stream
|
||||
char data[];
|
||||
} SRetrieveTableRsp;
|
||||
|
||||
|
@ -3080,6 +3080,22 @@ typedef struct SDeleteRes {
|
|||
int32_t tEncodeDeleteRes(SEncoder* pCoder, const SDeleteRes* pRes);
|
||||
int32_t tDecodeDeleteRes(SDecoder* pCoder, SDeleteRes* pRes);
|
||||
|
||||
typedef struct {
|
||||
int64_t uid;
|
||||
int64_t ts;
|
||||
} SSingleDeleteReq;
|
||||
|
||||
int32_t tEncodeSSingleDeleteReq(SEncoder* pCoder, const SSingleDeleteReq* pReq);
|
||||
int32_t tDecodeSSingleDeleteReq(SDecoder* pCoder, SSingleDeleteReq* pReq);
|
||||
|
||||
typedef struct {
|
||||
int64_t suid;
|
||||
SArray* deleteReqs; // SArray<SSingleDeleteReq>
|
||||
} SBatchDeleteReq;
|
||||
|
||||
int32_t tEncodeSBatchDeleteReq(SEncoder* pCoder, const SBatchDeleteReq* pReq);
|
||||
int32_t tDecodeSBatchDeleteReq(SDecoder* pCoder, SBatchDeleteReq* pReq);
|
||||
|
||||
typedef struct {
|
||||
int32_t msgIdx;
|
||||
int32_t msgType;
|
||||
|
|
|
@ -202,6 +202,7 @@ enum {
|
|||
TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT_RSMA, "vnode-submit-rsma", SSubmitReq, SSubmitRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_FETCH_RSMA, "vnode-fetch-rsma", SRSmaFetchMsg, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_DELETE, "delete-data", SVDeleteReq, SVDeleteRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_BATCH_DEL, "batch-delete", SBatchDeleteReq, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIG, "alter-config", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_REPLICA, "alter-replica", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIRM, "alter-confirm", NULL, NULL)
|
||||
|
|
|
@ -212,6 +212,7 @@ tmq_conf_t* tmq_conf_new() {
|
|||
conf->autoCommit = true;
|
||||
conf->autoCommitInterval = 5000;
|
||||
conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
|
||||
conf->hbBgEnable = true;
|
||||
return conf;
|
||||
}
|
||||
|
||||
|
|
|
@ -305,7 +305,7 @@ static int32_t tDeserializeSClientHbReq(SDecoder *pDecoder, SClientHbReq *pReq)
|
|||
taosArrayPush(desc.subDesc, &sDesc);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
ASSERT(desc.subPlanNum == taosArrayGetSize(desc.subDesc));
|
||||
|
||||
taosArrayPush(pReq->query->queryDesc, &desc);
|
||||
|
@ -5770,3 +5770,40 @@ int32_t tDecodeSMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) {
|
|||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tEncodeSSingleDeleteReq(SEncoder *pEncoder, const SSingleDeleteReq *pReq) {
|
||||
if (tEncodeI64(pEncoder, pReq->uid) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pReq->ts) < 0) return -1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tDecodeSSingleDeleteReq(SDecoder *pDecoder, SSingleDeleteReq *pReq) {
|
||||
if (tDecodeI64(pDecoder, &pReq->uid) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pReq->ts) < 0) return -1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tEncodeSBatchDeleteReq(SEncoder *pEncoder, const SBatchDeleteReq *pReq) {
|
||||
if (tEncodeI64(pEncoder, pReq->suid) < 0) return -1;
|
||||
int32_t sz = taosArrayGetSize(pReq->deleteReqs);
|
||||
if (tEncodeI32(pEncoder, sz) < 0) return -1;
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SSingleDeleteReq *pOneReq = taosArrayGet(pReq->deleteReqs, i);
|
||||
if (tEncodeSSingleDeleteReq(pEncoder, pOneReq) < 0) return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tDecodeSBatchDeleteReq(SDecoder *pDecoder, SBatchDeleteReq *pReq) {
|
||||
if (tDecodeI64(pDecoder, &pReq->suid) < 0) return -1;
|
||||
int32_t sz;
|
||||
if (tDecodeI32(pDecoder, &sz) < 0) return -1;
|
||||
pReq->deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
|
||||
if (pReq->deleteReqs == NULL) return -1;
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SSingleDeleteReq deleteReq;
|
||||
if (tDecodeSSingleDeleteReq(pDecoder, &deleteReq) < 0) return -1;
|
||||
taosArrayPush(pReq->deleteReqs, &deleteReq);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -364,6 +364,7 @@ SArray *vmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_VND_CHECK_ALTER_INFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_CONSUME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_DEL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_COMMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||
|
||||
|
|
|
@ -929,14 +929,31 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
|
|||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pStream->status, true);
|
||||
|
||||
char sourceDB[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
tNameFromString(&n, pStream->sourceDb, T_NAME_ACCT | T_NAME_DB);
|
||||
tNameGetDbName(&n, varDataVal(sourceDB));
|
||||
varDataSetLen(sourceDB, strlen(varDataVal(sourceDB)));
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pStream->sourceDb, true);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&sourceDB, false);
|
||||
|
||||
char targetDB[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
tNameFromString(&n, pStream->targetDb, T_NAME_ACCT | T_NAME_DB);
|
||||
tNameGetDbName(&n, varDataVal(targetDB));
|
||||
varDataSetLen(targetDB, strlen(varDataVal(targetDB)));
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pStream->targetDb, true);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&targetDB, false);
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pStream->targetSTbName, true);
|
||||
if (pStream->targetSTbName[0] == 0) {
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, NULL, true);
|
||||
} else {
|
||||
char targetSTB[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
tNameFromString(&n, pStream->targetSTbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
strcpy(&targetSTB[VARSTR_HEADER_SIZE], tNameGetTableName(&n));
|
||||
varDataSetLen(targetSTB, strlen(varDataVal(targetSTB)));
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&targetSTB, false);
|
||||
}
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pStream->watermark, false);
|
||||
|
|
|
@ -398,13 +398,27 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
|||
}
|
||||
}
|
||||
|
||||
// 8. TODO generate logs
|
||||
mInfo("rebalance calculation completed, rebalanced vg:");
|
||||
// 8. generate logs
|
||||
mInfo("mq rebalance: calculation completed, rebalanced vg:");
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pOutput->rebVgs); i++) {
|
||||
SMqRebOutputVg *pOutputRebVg = taosArrayGet(pOutput->rebVgs, i);
|
||||
mInfo("vgId:%d, moved from consumer:%" PRId64 ", to consumer:%" PRId64, pOutputRebVg->pVgEp->vgId,
|
||||
mInfo("mq rebalance: vgId:%d, moved from consumer:%" PRId64 ", to consumer:%" PRId64, pOutputRebVg->pVgEp->vgId,
|
||||
pOutputRebVg->oldConsumerId, pOutputRebVg->newConsumerId);
|
||||
}
|
||||
{
|
||||
void *pIter = NULL;
|
||||
while (1) {
|
||||
pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
|
||||
if (pIter == NULL) break;
|
||||
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
|
||||
int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
|
||||
mInfo("mq rebalance: final cfg: consumer %ld has %d vg", pConsumerEp->consumerId, sz);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i);
|
||||
mInfo("mq rebalance: final cfg: vg %d to consumer %ld", pVgEp->vgId, pConsumerEp->consumerId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 9. clear
|
||||
taosHashCleanup(pHash);
|
||||
|
|
|
@ -172,7 +172,7 @@ int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg);
|
|||
int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list);
|
||||
|
||||
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid,
|
||||
const char* stbFullName, int32_t vgId);
|
||||
const char* stbFullName, int32_t vgId, SBatchDeleteReq* pDeleteReq);
|
||||
|
||||
// sma
|
||||
int32_t smaInit();
|
||||
|
|
|
@ -119,7 +119,7 @@ int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg) {
|
|||
SName stbFullName = {0};
|
||||
tNameFromString(&stbFullName, pCfg->dstTbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
SVCreateStbReq pReq = {0};
|
||||
pReq.name = (char*)tNameGetTableName(&stbFullName);
|
||||
pReq.name = (char *)tNameGetTableName(&stbFullName);
|
||||
pReq.suid = pCfg->dstTbUid;
|
||||
pReq.schemaRow = pCfg->schemaRow;
|
||||
pReq.schemaTag = pCfg->schemaTag;
|
||||
|
@ -200,8 +200,9 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
|
|||
goto _err;
|
||||
}
|
||||
|
||||
SBatchDeleteReq deleteReq;
|
||||
SSubmitReq *pSubmitReq = tdBlockToSubmit((const SArray *)msg, pTsmaStat->pTSchema, true, pTsmaStat->pTSma->dstTbUid,
|
||||
pTsmaStat->pTSma->dstTbName, pTsmaStat->pTSma->dstVgId);
|
||||
pTsmaStat->pTSma->dstTbName, pTsmaStat->pTSma->dstVgId, &deleteReq);
|
||||
|
||||
if (!pSubmitReq) {
|
||||
smaError("vgId:%d, failed to gen submit blk while tsma insert for smaIndex %" PRIi64 " since %s", SMA_VID(pSma),
|
||||
|
@ -230,4 +231,4 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
|
|||
_err:
|
||||
tdUnRefSmaStat(pSma, pStat);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
#include "tq.h"
|
||||
|
||||
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, bool createTb, int64_t suid,
|
||||
const char* stbFullName, int32_t vgId) {
|
||||
const char* stbFullName, int32_t vgId, SBatchDeleteReq* deleteReq) {
|
||||
SSubmitReq* ret = NULL;
|
||||
SArray* schemaReqs = NULL;
|
||||
SArray* schemaReqSz = NULL;
|
||||
|
@ -33,10 +33,13 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
|
|||
schemaReqSz = taosArrayInit(sz, sizeof(int32_t));
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
|
||||
STagVal tagVal = {
|
||||
.cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1,
|
||||
.type = TSDB_DATA_TYPE_UBIGINT,
|
||||
.i64 = (int64_t)pDataBlock->info.groupId,
|
||||
if (pDataBlock->info.type == STREAM_DELETE_DATA) {
|
||||
//
|
||||
}
|
||||
STagVal tagVal = {
|
||||
.cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1,
|
||||
.type = TSDB_DATA_TYPE_UBIGINT,
|
||||
.i64 = (int64_t)pDataBlock->info.groupId,
|
||||
};
|
||||
STag* pTag = NULL;
|
||||
taosArrayClear(tagArray);
|
||||
|
@ -177,17 +180,45 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
|
|||
}
|
||||
|
||||
void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
|
||||
const SArray* pRes = (const SArray*)data;
|
||||
SVnode* pVnode = (SVnode*)vnode;
|
||||
const SArray* pRes = (const SArray*)data;
|
||||
SVnode* pVnode = (SVnode*)vnode;
|
||||
SBatchDeleteReq deleteReq = {0};
|
||||
|
||||
tqDebug("vgId:%d, task %d write into table, block num: %d", TD_VID(pVnode), pTask->taskId, (int32_t)pRes->size);
|
||||
|
||||
ASSERT(pTask->tbSink.pTSchema);
|
||||
deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
|
||||
SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid,
|
||||
pTask->tbSink.stbFullName, pVnode->config.vgId);
|
||||
pTask->tbSink.stbFullName, pVnode->config.vgId, &deleteReq);
|
||||
|
||||
tqDebug("vgId:%d, task %d convert blocks over, put into write-queue", TD_VID(pVnode), pTask->taskId);
|
||||
|
||||
int32_t code;
|
||||
int32_t len;
|
||||
tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code);
|
||||
if (code < 0) {
|
||||
//
|
||||
ASSERT(0);
|
||||
}
|
||||
SEncoder encoder;
|
||||
void* buf = taosMemoryCalloc(1, len + sizeof(SMsgHead));
|
||||
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||
tEncoderInit(&encoder, abuf, len);
|
||||
tEncodeSBatchDeleteReq(&encoder, &deleteReq);
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
if (taosArrayGetSize(deleteReq.deleteReqs) != 0) {
|
||||
SRpcMsg msg = {
|
||||
.msgType = TDMT_VND_BATCH_DEL,
|
||||
.pCont = buf,
|
||||
.contLen = len + sizeof(SMsgHead),
|
||||
};
|
||||
if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
|
||||
tqDebug("failed to put into write-queue since %s", terrstr());
|
||||
}
|
||||
}
|
||||
taosArrayDestroy(deleteReq.deleteReqs);
|
||||
|
||||
/*tPrintFixedSchemaSubmitReq(pReq, pTask->tbSink.pTSchema);*/
|
||||
// build write msg
|
||||
SRpcMsg msg = {
|
||||
|
|
|
@ -29,6 +29,7 @@ static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void
|
|||
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
|
||||
int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||
int32_t code = 0;
|
||||
|
@ -190,6 +191,9 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
|
|||
case TDMT_VND_DELETE:
|
||||
if (vnodeProcessDeleteReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
|
||||
break;
|
||||
case TDMT_VND_BATCH_DEL:
|
||||
if (vnodeProcessBatchDeleteReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
|
||||
break;
|
||||
/* TQ */
|
||||
case TDMT_VND_MQ_VG_CHANGE:
|
||||
if (tqProcessVgChangeReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
|
||||
|
@ -1053,6 +1057,23 @@ static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
SBatchDeleteReq deleteReq;
|
||||
SDecoder decoder;
|
||||
tDecoderInit(&decoder, pReq, len);
|
||||
tDecodeSBatchDeleteReq(&decoder, &deleteReq);
|
||||
|
||||
int32_t sz = taosArrayGetSize(deleteReq.deleteReqs);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SSingleDeleteReq *pOneReq = taosArrayGet(deleteReq.deleteReqs, i);
|
||||
int32_t code = tsdbDeleteTableData(pVnode->pTsdb, version, deleteReq.suid, pOneReq->uid, pOneReq->ts, pOneReq->ts);
|
||||
if (code) {
|
||||
// TODO
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
int32_t code = 0;
|
||||
SDecoder *pCoder = &(SDecoder){0};
|
||||
|
|
|
@ -281,7 +281,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TOPIC, "Invalid topic")
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TOPIC_QUERY, "Topic with invalid query")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TOPIC_OPTION, "Topic with invalid option")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_CONSUMER_NOT_EXIST, "Consumer not exist")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOPIC_OPTION_UNCHNAGED, "Consumer unchanged")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOPIC_OPTION_UNCHNAGED, "Topic unchanged")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST, "Subcribe not exist")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_OFFSET_NOT_EXIST, "Offset not exist")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_CONSUMER_NOT_READY, "Consumer not ready")
|
||||
|
|
Loading…
Reference in New Issue