batch meta
This commit is contained in:
parent
d33b2521fd
commit
7fc2e8cc26
|
@ -2051,6 +2051,7 @@ static char* processSimpleMeta(SMqMetaRsp* pMetaRsp) {
|
||||||
static char* processBatchMetaToJson(SMqBatchMetaRsp* pMsgRsp) {
|
static char* processBatchMetaToJson(SMqBatchMetaRsp* pMsgRsp) {
|
||||||
SDecoder coder;
|
SDecoder coder;
|
||||||
SMqBatchMetaRsp rsp = {0};
|
SMqBatchMetaRsp rsp = {0};
|
||||||
|
SArray* strArray = NULL;
|
||||||
tDecoderInit(&coder, pMsgRsp->pMetaBuff, pMsgRsp->metaBuffLen);
|
tDecoderInit(&coder, pMsgRsp->pMetaBuff, pMsgRsp->metaBuffLen);
|
||||||
if (tDecodeMqBatchMetaRsp(&coder, &rsp) < 0) {
|
if (tDecodeMqBatchMetaRsp(&coder, &rsp) < 0) {
|
||||||
goto _end;
|
goto _end;
|
||||||
|
@ -2058,7 +2059,7 @@ static char* processBatchMetaToJson(SMqBatchMetaRsp* pMsgRsp) {
|
||||||
|
|
||||||
int64_t fullSize = 0;
|
int64_t fullSize = 0;
|
||||||
int32_t num = taosArrayGetSize(rsp.batchMetaReq);
|
int32_t num = taosArrayGetSize(rsp.batchMetaReq);
|
||||||
SArray* strArray = taosArrayInit(num, POINTER_BYTES);
|
strArray = taosArrayInit(num, POINTER_BYTES);
|
||||||
for (int32_t i = 0; i < num; i++) {
|
for (int32_t i = 0; i < num; i++) {
|
||||||
int32_t len = *(int32_t*)taosArrayGet(rsp.batchMetaLen, i);
|
int32_t len = *(int32_t*)taosArrayGet(rsp.batchMetaLen, i);
|
||||||
void* tmpBuf = taosArrayGetP(rsp.batchMetaReq, i);
|
void* tmpBuf = taosArrayGetP(rsp.batchMetaReq, i);
|
||||||
|
@ -2078,10 +2079,17 @@ static char* processBatchMetaToJson(SMqBatchMetaRsp* pMsgRsp) {
|
||||||
for (int32_t i = 0; i < num; i++) {
|
for (int32_t i = 0; i < num; i++) {
|
||||||
char* subStr = taosArrayGetP(strArray, i);
|
char* subStr = taosArrayGetP(strArray, i);
|
||||||
strcat(buf, subStr);
|
strcat(buf, subStr);
|
||||||
strcat(buf, "\n");
|
if (i < num - 1) {
|
||||||
|
strcat(buf, "\n");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
taosArrayDestroyP(strArray, taosMemoryFree);
|
||||||
|
tDeleteMqBatchMetaRsp(&rsp);
|
||||||
|
return buf;
|
||||||
|
|
||||||
_end:
|
_end:
|
||||||
|
taosArrayDestroyP(strArray, taosMemoryFree);
|
||||||
|
tDeleteMqBatchMetaRsp(&rsp);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2009,7 +2009,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
|
||||||
tmqFreeRspWrapper(pRspWrapper);
|
tmqFreeRspWrapper(pRspWrapper);
|
||||||
taosFreeQitem(pRspWrapper);
|
taosFreeQitem(pRspWrapper);
|
||||||
}
|
}
|
||||||
} else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP || pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) {
|
} else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
|
||||||
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
|
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
|
||||||
int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
|
int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
|
||||||
|
|
||||||
|
@ -2032,12 +2032,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
|
||||||
updateVgInfo(pVg, &pollRspWrapper->metaRsp.rspOffset, &pollRspWrapper->metaRsp.rspOffset,
|
updateVgInfo(pVg, &pollRspWrapper->metaRsp.rspOffset, &pollRspWrapper->metaRsp.rspOffset,
|
||||||
pollRspWrapper->metaRsp.head.walsver, pollRspWrapper->metaRsp.head.walever, tmq->consumerId, true);
|
pollRspWrapper->metaRsp.head.walsver, pollRspWrapper->metaRsp.head.walever, tmq->consumerId, true);
|
||||||
// build rsp
|
// build rsp
|
||||||
void* pRsp = NULL;
|
SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper);
|
||||||
if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
|
|
||||||
pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper);
|
|
||||||
} else {
|
|
||||||
pRsp = tmqBuildBatchMetaRspFromWrapper(pollRspWrapper);
|
|
||||||
}
|
|
||||||
taosFreeQitem(pRspWrapper);
|
taosFreeQitem(pRspWrapper);
|
||||||
taosWUnLockLatch(&tmq->lock);
|
taosWUnLockLatch(&tmq->lock);
|
||||||
return pRsp;
|
return pRsp;
|
||||||
|
@ -2048,6 +2043,42 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
|
||||||
tmqFreeRspWrapper(pRspWrapper);
|
tmqFreeRspWrapper(pRspWrapper);
|
||||||
taosFreeQitem(pRspWrapper);
|
taosFreeQitem(pRspWrapper);
|
||||||
}
|
}
|
||||||
|
} else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) {
|
||||||
|
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
|
||||||
|
int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
|
||||||
|
|
||||||
|
tscDebug("consumer:0x%" PRIx64 " process meta rsp", tmq->consumerId);
|
||||||
|
|
||||||
|
if (pollRspWrapper->batchMetaRsp.head.epoch == consumerEpoch) {
|
||||||
|
taosWLockLatch(&tmq->lock);
|
||||||
|
SMqClientVg* pVg = getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
|
||||||
|
pollRspWrapper->vgHandle = pVg;
|
||||||
|
pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName);
|
||||||
|
if (pollRspWrapper->vgHandle == NULL || pollRspWrapper->topicHandle == NULL) {
|
||||||
|
tscError("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId,
|
||||||
|
pollRspWrapper->topicName, pollRspWrapper->vgId);
|
||||||
|
tmqFreeRspWrapper(pRspWrapper);
|
||||||
|
taosFreeQitem(pRspWrapper);
|
||||||
|
taosWUnLockLatch(&tmq->lock);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
// build rsp
|
||||||
|
void* pRsp = NULL;
|
||||||
|
updateVgInfo(pVg, &pollRspWrapper->batchMetaRsp.rspOffset, &pollRspWrapper->batchMetaRsp.rspOffset,
|
||||||
|
pollRspWrapper->batchMetaRsp.head.walsver, pollRspWrapper->batchMetaRsp.head.walever,
|
||||||
|
tmq->consumerId, true);
|
||||||
|
pRsp = tmqBuildBatchMetaRspFromWrapper(pollRspWrapper);
|
||||||
|
taosFreeQitem(pRspWrapper);
|
||||||
|
taosWUnLockLatch(&tmq->lock);
|
||||||
|
return pRsp;
|
||||||
|
} else {
|
||||||
|
tscInfo("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
|
||||||
|
tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->batchMetaRsp.head.epoch, consumerEpoch);
|
||||||
|
setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
|
||||||
|
tmqFreeRspWrapper(pRspWrapper);
|
||||||
|
taosFreeQitem(pRspWrapper);
|
||||||
|
}
|
||||||
} else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
|
} else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
|
||||||
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
|
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
|
||||||
int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
|
int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
|
||||||
|
|
|
@ -191,14 +191,13 @@ end : {
|
||||||
|
|
||||||
static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
|
static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
|
||||||
SRpcMsg* pMsg, STqOffsetVal* offset) {
|
SRpcMsg* pMsg, STqOffsetVal* offset) {
|
||||||
int code = 0;
|
int code = 0;
|
||||||
int32_t vgId = TD_VID(pTq->pVnode);
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
SMqMetaRsp metaRsp = {0};
|
STaosxRsp taosxRsp = {0};
|
||||||
STaosxRsp taosxRsp = {0};
|
SMqBatchMetaRsp btMetaRsp = {0};
|
||||||
tqInitTaosxRsp(&taosxRsp.common, *offset);
|
tqInitTaosxRsp(&taosxRsp.common, *offset);
|
||||||
|
|
||||||
if (offset->type != TMQ_OFFSET__LOG) {
|
if (offset->type != TMQ_OFFSET__LOG) {
|
||||||
SMqBatchMetaRsp btMetaRsp = {0};
|
|
||||||
if (tqScanTaosx(pTq, pHandle, &taosxRsp, &btMetaRsp, offset) < 0) {
|
if (tqScanTaosx(pTq, pHandle, &taosxRsp, &btMetaRsp, offset) < 0) {
|
||||||
code = -1;
|
code = -1;
|
||||||
goto end;
|
goto end;
|
||||||
|
@ -232,11 +231,18 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
|
|
||||||
uint64_t st = taosGetTimestampMs();
|
uint64_t st = taosGetTimestampMs();
|
||||||
int totalRows = 0;
|
int totalRows = 0;
|
||||||
|
int32_t totalMetaRows = 0;
|
||||||
while (1) {
|
while (1) {
|
||||||
int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
|
int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
|
||||||
ASSERT(savedEpoch <= pRequest->epoch);
|
ASSERT(savedEpoch <= pRequest->epoch);
|
||||||
|
|
||||||
if (tqFetchLog(pTq, pHandle, &fetchVer, pRequest->reqId) < 0) {
|
if (tqFetchLog(pTq, pHandle, &fetchVer, pRequest->reqId) < 0) {
|
||||||
|
if (totalMetaRows > 0) {
|
||||||
|
tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer);
|
||||||
|
tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);
|
||||||
|
ASSERT(totalRows == 0);
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
tqOffsetResetToLog(&taosxRsp.common.rspOffset, fetchVer);
|
tqOffsetResetToLog(&taosxRsp.common.rspOffset, fetchVer);
|
||||||
code = tqSendDataRsp(
|
code = tqSendDataRsp(
|
||||||
pHandle, pMsg, pRequest, &taosxRsp,
|
pHandle, pMsg, pRequest, &taosxRsp,
|
||||||
|
@ -270,12 +276,48 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
tqDebug("fetch meta msg, ver:%" PRId64 ", type:%s", pHead->version, TMSG_INFO(pHead->msgType));
|
tqDebug("fetch meta msg, ver:%" PRId64 ", vgId:%d, type:%s", pHead->version, vgId, TMSG_INFO(pHead->msgType));
|
||||||
tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer + 1);
|
if (!btMetaRsp.batchMetaReq) {
|
||||||
metaRsp.resMsgType = pHead->msgType;
|
btMetaRsp.batchMetaReq = taosArrayInit(4, POINTER_BYTES);
|
||||||
metaRsp.metaRspLen = pHead->bodyLen;
|
btMetaRsp.batchMetaLen = taosArrayInit(4, sizeof(int32_t));
|
||||||
metaRsp.metaRsp = pHead->body;
|
}
|
||||||
code = tqSendMetaPollRsp(pHandle, pMsg, pRequest, &metaRsp, vgId);
|
fetchVer++;
|
||||||
|
|
||||||
|
SMqMetaRsp tmpMetaRsp = {0};
|
||||||
|
tmpMetaRsp.resMsgType = pHead->msgType;
|
||||||
|
tmpMetaRsp.metaRspLen = pHead->bodyLen;
|
||||||
|
tmpMetaRsp.metaRsp = pHead->body;
|
||||||
|
uint32_t len = 0;
|
||||||
|
tEncodeSize(tEncodeMqMetaRsp, &tmpMetaRsp, len, code);
|
||||||
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
|
tqError("tmq extract meta from log, tEncodeMqMetaRsp error");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
int32_t tLen = sizeof(SMqRspHead) + len;
|
||||||
|
void* tBuf = taosMemoryCalloc(1, tLen);
|
||||||
|
void* metaBuff = POINTER_SHIFT(tBuf, sizeof(SMqRspHead));
|
||||||
|
SEncoder encoder = {0};
|
||||||
|
tEncoderInit(&encoder, metaBuff, len);
|
||||||
|
code = tEncodeMqMetaRsp(&encoder, &tmpMetaRsp);
|
||||||
|
if (code < 0) {
|
||||||
|
tEncoderClear(&encoder);
|
||||||
|
tqError("tmq extract meta from log, tEncodeMqMetaRsp error");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
taosArrayPush(btMetaRsp.batchMetaReq, &tBuf);
|
||||||
|
taosArrayPush(btMetaRsp.batchMetaLen, &tLen);
|
||||||
|
totalMetaRows++;
|
||||||
|
if ((taosArrayGetSize(btMetaRsp.batchMetaReq) >= tmqRowSize) || (taosGetTimestampMs() - st > 1000)) {
|
||||||
|
tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer);
|
||||||
|
tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (totalMetaRows > 0) {
|
||||||
|
tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer);
|
||||||
|
tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -382,8 +424,8 @@ int32_t tqSendBatchMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SM
|
||||||
SRpcMsg resp = {.info = pMsg->info, .pCont = buf, .contLen = tlen, .code = 0};
|
SRpcMsg resp = {.info = pMsg->info, .pCont = buf, .contLen = tlen, .code = 0};
|
||||||
|
|
||||||
tmsgSendRsp(&resp);
|
tmsgSendRsp(&resp);
|
||||||
tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, res msg type: batch meta, offset type:%d", vgId,
|
tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, res msg type: batch meta, size:%d offset type:%d", vgId,
|
||||||
pReq->consumerId, pReq->epoch, pRsp->rspOffset.type);
|
pReq->consumerId, pReq->epoch, taosArrayGetSize(pRsp->batchMetaReq), pRsp->rspOffset.type);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -44,7 +44,6 @@ int32_t scanDebug = 0;
|
||||||
#define STREAM_SCAN_OP_NAME "StreamScanOperator"
|
#define STREAM_SCAN_OP_NAME "StreamScanOperator"
|
||||||
#define STREAM_SCAN_OP_STATE_NAME "StreamScanFillHistoryState"
|
#define STREAM_SCAN_OP_STATE_NAME "StreamScanFillHistoryState"
|
||||||
#define STREAM_SCAN_OP_CHECKPOINT_NAME "StreamScanOperator_Checkpoint"
|
#define STREAM_SCAN_OP_CHECKPOINT_NAME "StreamScanOperator_Checkpoint"
|
||||||
#define TMQ_MAX_BATCH_SIZE 4096
|
|
||||||
|
|
||||||
typedef struct STableMergeScanExecInfo {
|
typedef struct STableMergeScanExecInfo {
|
||||||
SFileBlockLoadRecorder blockRecorder;
|
SFileBlockLoadRecorder blockRecorder;
|
||||||
|
@ -2894,7 +2893,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
|
||||||
return NULL;
|
return NULL;
|
||||||
} else if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_META) {
|
} else if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_META) {
|
||||||
SSnapContext* sContext = pInfo->sContext;
|
SSnapContext* sContext = pInfo->sContext;
|
||||||
for(int32_t i = 0; i < TMQ_MAX_BATCH_SIZE; i++) {
|
for(int32_t i = 0; i < tmqRowSize; i++) {
|
||||||
void* data = NULL;
|
void* data = NULL;
|
||||||
int32_t dataLen = 0;
|
int32_t dataLen = 0;
|
||||||
int16_t type = 0;
|
int16_t type = 0;
|
||||||
|
|
|
@ -1064,7 +1064,7 @@ void testConsumeExcluded(int topic_type) {
|
||||||
char* topic = "create topic topic_excluded with meta as database db_taosx";
|
char* topic = "create topic topic_excluded with meta as database db_taosx";
|
||||||
pRes = taos_query(pConn, topic);
|
pRes = taos_query(pConn, topic);
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
printf("failed to create topic topic_excluded, reason:%s\n", taos_errstr(pRes));
|
printf("failed to create topic topic_excluded1, reason:%s\n", taos_errstr(pRes));
|
||||||
taos_close(pConn);
|
taos_close(pConn);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -1073,7 +1073,7 @@ void testConsumeExcluded(int topic_type) {
|
||||||
char* topic = "create topic topic_excluded as select * from stt";
|
char* topic = "create topic topic_excluded as select * from stt";
|
||||||
pRes = taos_query(pConn, topic);
|
pRes = taos_query(pConn, topic);
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
printf("failed to create topic topic_excluded, reason:%s\n", taos_errstr(pRes));
|
printf("failed to create topic topic_excluded2, reason:%s\n", taos_errstr(pRes));
|
||||||
taos_close(pConn);
|
taos_close(pConn);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -1115,7 +1115,7 @@ void testConsumeExcluded(int topic_type) {
|
||||||
assert(raw.raw_type != 2 && raw.raw_type != 4 && raw.raw_type != TDMT_VND_CREATE_STB &&
|
assert(raw.raw_type != 2 && raw.raw_type != 4 && raw.raw_type != TDMT_VND_CREATE_STB &&
|
||||||
raw.raw_type != TDMT_VND_ALTER_STB && raw.raw_type != TDMT_VND_CREATE_TABLE &&
|
raw.raw_type != TDMT_VND_ALTER_STB && raw.raw_type != TDMT_VND_CREATE_TABLE &&
|
||||||
raw.raw_type != TDMT_VND_ALTER_TABLE && raw.raw_type != TDMT_VND_DELETE);
|
raw.raw_type != TDMT_VND_ALTER_TABLE && raw.raw_type != TDMT_VND_DELETE);
|
||||||
assert(raw.raw_type == TDMT_VND_DROP_STB || raw.raw_type == TDMT_VND_DROP_TABLE);
|
assert(raw.raw_type == TDMT_VND_DROP_STB || raw.raw_type == TDMT_VND_DROP_TABLE || raw.raw_type == 5);
|
||||||
} else if (topic_type == 2) {
|
} else if (topic_type == 2) {
|
||||||
assert(0);
|
assert(0);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue