fix:modify commit version to next validate version
This commit is contained in:
parent
d2971230e0
commit
bbdcbcb75b
|
@ -287,11 +287,20 @@ DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t timeout);
|
|||
DLL_EXPORT int32_t tmq_consumer_close(tmq_t *tmq);
|
||||
DLL_EXPORT int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg);
|
||||
DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param);
|
||||
DLL_EXPORT int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset);
|
||||
DLL_EXPORT void tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param);
|
||||
DLL_EXPORT int32_t tmq_get_topic_assignment(tmq_t *tmq, const char *pTopicName, tmq_topic_assignment **assignment,
|
||||
int32_t *numOfAssignment);
|
||||
DLL_EXPORT void tmq_free_assignment(tmq_topic_assignment* pAssignment);
|
||||
DLL_EXPORT int32_t tmq_offset_seek(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset);
|
||||
|
||||
DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res);
|
||||
DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res);
|
||||
DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
|
||||
DLL_EXPORT int64_t tmq_get_vgroup_offset(TAOS_RES* res);
|
||||
DLL_EXPORT int64_t tmq_position(tmq_t *tmq, const char *pTopicName, int32_t vgId);
|
||||
DLL_EXPORT int64_t tmq_committed(tmq_t *tmq, const char *pTopicName, int32_t vgId);
|
||||
|
||||
/* ----------------------TMQ CONFIGURATION INTERFACE---------------------- */
|
||||
|
||||
enum tmq_conf_res_t {
|
||||
|
@ -309,11 +318,6 @@ DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_comm
|
|||
|
||||
/* -------------------------TMQ MSG HANDLE INTERFACE---------------------- */
|
||||
|
||||
DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res);
|
||||
DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res);
|
||||
DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
|
||||
DLL_EXPORT int64_t tmq_get_vgroup_offset(TAOS_RES* res);
|
||||
|
||||
/* ------------------------------ TAOSX -----------------------------------*/
|
||||
// note: following apis are unstable
|
||||
enum tmq_res_t {
|
||||
|
|
|
@ -228,7 +228,7 @@ typedef struct SStoreTqReader {
|
|||
} SStoreTqReader;
|
||||
|
||||
typedef struct SStoreSnapshotFn {
|
||||
int32_t (*createSnapshot)(SSnapContext* ctx, int64_t uid);
|
||||
int32_t (*setForSnapShot)(SSnapContext* ctx, int64_t uid);
|
||||
int32_t (*destroySnapshot)(SSnapContext* ctx);
|
||||
SMetaTableInfo (*getMetaTableInfoFromSnapshot)(SSnapContext* ctx);
|
||||
int32_t (*getTableInfoFromSnapshot)(SSnapContext* ctx, void** pBuf, int32_t* contLen, int16_t* type, int64_t* uid);
|
||||
|
|
|
@ -1901,7 +1901,7 @@ static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal
|
|||
|
||||
// update the valid wal version range
|
||||
pVg->offsetInfo.walVerBegin = sver;
|
||||
pVg->offsetInfo.walVerEnd = ever;
|
||||
pVg->offsetInfo.walVerEnd = ever + 1;
|
||||
// pVg->receivedInfoFromVnode = true;
|
||||
}
|
||||
|
||||
|
@ -2541,7 +2541,7 @@ static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) {
|
|||
SMqRspHead* pHead = pMsg->pData;
|
||||
|
||||
tmq_topic_assignment assignment = {.begin = pHead->walsver,
|
||||
.end = pHead->walever,
|
||||
.end = pHead->walever + 1,
|
||||
.currentOffset = rsp.rspOffset.version,
|
||||
.vgId = pParam->vgId};
|
||||
|
||||
|
@ -2600,7 +2600,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
|
|||
*numOfAssignment = taosArrayGetSize(pTopic->vgs);
|
||||
for (int32_t j = 0; j < (*numOfAssignment); ++j) {
|
||||
SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j);
|
||||
int32_t type = pClientVg->offsetInfo.currentOffset.type;
|
||||
int32_t type = pClientVg->offsetInfo.seekOffset.type;
|
||||
if (isInSnapshotMode(type, tmq->useSnapshot)) {
|
||||
tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, assignment not allowed", tmq->consumerId, type);
|
||||
code = TSDB_CODE_TMQ_SNAPSHOT_ERROR;
|
||||
|
@ -2620,7 +2620,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
|
|||
|
||||
for (int32_t j = 0; j < (*numOfAssignment); ++j) {
|
||||
SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j);
|
||||
if (pClientVg->offsetInfo.currentOffset.type != TMQ_OFFSET__LOG) {
|
||||
if (pClientVg->offsetInfo.seekOffset.type != TMQ_OFFSET__LOG) {
|
||||
needFetch = true;
|
||||
break;
|
||||
}
|
||||
|
@ -2705,7 +2705,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
|
|||
|
||||
int64_t transporterId = 0;
|
||||
char offsetFormatBuf[TSDB_OFFSET_LEN] = {0};
|
||||
tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.currentOffset);
|
||||
tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.seekOffset);
|
||||
|
||||
tscInfo("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64,
|
||||
tmq->consumerId, pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId);
|
||||
|
@ -2825,7 +2825,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
|
|||
|
||||
// update the offset, and then commit to vnode
|
||||
pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG;
|
||||
pOffsetInfo->currentOffset.version = offset >= 1 ? offset - 1 : 0;
|
||||
pOffsetInfo->currentOffset.version = offset;
|
||||
pOffsetInfo->seekOffset = pOffsetInfo->currentOffset;
|
||||
// pOffsetInfo->committedOffset.version = INT64_MIN;
|
||||
pVg->seekUpdated = true;
|
||||
|
|
|
@ -196,7 +196,7 @@ int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea
|
|||
tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64
|
||||
", no more log to return, reqId:0x%" PRIx64,
|
||||
pHandle->consumerId, pHandle->epoch, vgId, offset, reqId);
|
||||
*fetchOffset = offset - 1;
|
||||
*fetchOffset = offset;
|
||||
code = -1;
|
||||
goto END;
|
||||
}
|
||||
|
|
|
@ -119,7 +119,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
|
|||
}
|
||||
} else {
|
||||
walRefFirstVer(pTq->pVnode->pWal, pHandle->pRef);
|
||||
tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer - 1);
|
||||
tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer);
|
||||
}
|
||||
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
|
||||
walRefLastVer(pTq->pVnode->pWal, pHandle->pRef);
|
||||
|
@ -127,7 +127,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
|
|||
SMqDataRsp dataRsp = {0};
|
||||
tqInitDataRsp(&dataRsp, pRequest);
|
||||
|
||||
tqOffsetResetToLog(&dataRsp.rspOffset, pHandle->pRef->refVer);
|
||||
tqOffsetResetToLog(&dataRsp.rspOffset, pHandle->pRef->refVer + 1);
|
||||
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId,
|
||||
pHandle->subKey, vgId, dataRsp.rspOffset.version);
|
||||
int32_t code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_RSP, vgId);
|
||||
|
@ -138,7 +138,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
|
|||
} else {
|
||||
STaosxRsp taosxRsp = {0};
|
||||
tqInitTaosxRsp(&taosxRsp, pRequest);
|
||||
tqOffsetResetToLog(&taosxRsp.rspOffset, pHandle->pRef->refVer);
|
||||
tqOffsetResetToLog(&taosxRsp.rspOffset, pHandle->pRef->refVer + 1);
|
||||
int32_t code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId);
|
||||
tDeleteSTaosxRsp(&taosxRsp);
|
||||
|
||||
|
@ -246,7 +246,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
|||
|
||||
if (offset->type == TMQ_OFFSET__LOG) {
|
||||
walReaderVerifyOffset(pHandle->pWalReader, offset);
|
||||
int64_t fetchVer = offset->version + 1;
|
||||
int64_t fetchVer = offset->version;
|
||||
pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
|
||||
if (pCkHead == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -279,14 +279,14 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
|||
// process meta
|
||||
if (pHead->msgType != TDMT_VND_SUBMIT) {
|
||||
if (totalRows > 0) {
|
||||
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer - 1);
|
||||
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
|
||||
setRequestVersion(&taosxRsp.reqOffset, offset->version);
|
||||
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId);
|
||||
goto end;
|
||||
}
|
||||
|
||||
tqDebug("fetch meta msg, ver:%" PRId64 ", type:%s", pHead->version, TMSG_INFO(pHead->msgType));
|
||||
tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer);
|
||||
tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer + 1);
|
||||
metaRsp.resMsgType = pHead->msgType;
|
||||
metaRsp.metaRspLen = pHead->bodyLen;
|
||||
metaRsp.metaRsp = pHead->body;
|
||||
|
@ -309,7 +309,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
|||
}
|
||||
|
||||
if (totalRows >= 4096 || taosxRsp.createTableNum > 0) {
|
||||
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
|
||||
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer + 1);
|
||||
setRequestVersion(&taosxRsp.reqOffset, offset->version);
|
||||
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId);
|
||||
goto end;
|
||||
|
|
|
@ -237,7 +237,7 @@ void initCacheFn(SStoreCacheReader* pCache) {
|
|||
}
|
||||
|
||||
void initSnapshotFn(SStoreSnapshotFn* pSnapshot) {
|
||||
pSnapshot->createSnapshot = setForSnapShot;
|
||||
pSnapshot->setForSnapShot = setForSnapShot;
|
||||
pSnapshot->destroySnapshot = destroySnapContext;
|
||||
pSnapshot->getMetaTableInfoFromSnapshot = getMetaTableInfoFromSnapshot;
|
||||
pSnapshot->getTableInfoFromSnapshot = getTableInfoFromSnapshot;
|
||||
|
|
|
@ -1112,8 +1112,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
|||
SStoreTqReader* pReaderAPI = &pTaskInfo->storageAPI.tqReaderFn;
|
||||
SWalReader* pWalReader = pReaderAPI->tqReaderGetWalReader(pInfo->tqReader);
|
||||
walReaderVerifyOffset(pWalReader, pOffset);
|
||||
if (pReaderAPI->tqReaderSeek(pInfo->tqReader, pOffset->version + 1, id) < 0) {
|
||||
qError("tqReaderSeek failed ver:%" PRId64 ", %s", pOffset->version + 1, id);
|
||||
if (pReaderAPI->tqReaderSeek(pInfo->tqReader, pOffset->version, id) < 0) {
|
||||
qError("tqReaderSeek failed ver:%" PRId64 ", %s", pOffset->version, id);
|
||||
return -1;
|
||||
}
|
||||
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
||||
|
@ -1202,7 +1202,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
|||
SOperatorInfo* p = extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, id);
|
||||
STableListInfo* pTableListInfo = ((SStreamRawScanInfo*)(p->info))->pTableListInfo;
|
||||
|
||||
if (pAPI->snapshotFn.createSnapshot(sContext, pOffset->uid) != 0) {
|
||||
if (pAPI->snapshotFn.setForSnapShot(sContext, pOffset->uid) != 0) {
|
||||
qError("setDataForSnapShot error. uid:%" PRId64 " , %s", pOffset->uid, id);
|
||||
terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
|
||||
return -1;
|
||||
|
@ -1239,7 +1239,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
|||
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_META) {
|
||||
SStreamRawScanInfo* pInfo = pOperator->info;
|
||||
SSnapContext* sContext = pInfo->sContext;
|
||||
if (pTaskInfo->storageAPI.snapshotFn.createSnapshot(sContext, pOffset->uid) != 0) {
|
||||
if (pTaskInfo->storageAPI.snapshotFn.setForSnapShot(sContext, pOffset->uid) != 0) {
|
||||
qError("setForSnapShot error. uid:%" PRIu64 " ,version:%" PRId64, pOffset->uid, pOffset->version);
|
||||
terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
|
||||
return -1;
|
||||
|
|
|
@ -1644,12 +1644,13 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
|
|||
pAPI->tsdReader.tsdReaderClose(pTSInfo->base.dataReader);
|
||||
|
||||
pTSInfo->base.dataReader = NULL;
|
||||
qDebug("queue scan tsdb over, switch to wal ver %" PRId64 "", pTaskInfo->streamInfo.snapshotVer + 1);
|
||||
if (pAPI->tqReaderFn.tqReaderSeek(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1, pTaskInfo->id.str) < 0) {
|
||||
int64_t validVer = pTaskInfo->streamInfo.snapshotVer + 1;
|
||||
qDebug("queue scan tsdb over, switch to wal ver %" PRId64 "", validVer);
|
||||
if (pAPI->tqReaderFn.tqReaderSeek(pInfo->tqReader, validVer, pTaskInfo->id.str) < 0) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pTaskInfo->streamInfo.snapshotVer);
|
||||
tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, validVer);
|
||||
}
|
||||
|
||||
if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__LOG) {
|
||||
|
@ -1660,8 +1661,8 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
|
|||
SSDataBlock* pRes = pAPI->tqReaderFn.tqGetResultBlock(pInfo->tqReader);
|
||||
struct SWalReader* pWalReader = pAPI->tqReaderFn.tqReaderGetWalReader(pInfo->tqReader);
|
||||
|
||||
// curVersion move to next, so currentOffset = curVersion - 1
|
||||
tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pWalReader->curVersion - 1);
|
||||
// curVersion move to next
|
||||
tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pWalReader->curVersion);
|
||||
|
||||
if (hasResult) {
|
||||
qDebug("doQueueScan get data from log %" PRId64 " rows, version:%" PRId64, pRes->info.rows,
|
||||
|
@ -2182,7 +2183,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
|
|||
STqOffsetVal offset = {0};
|
||||
if (mtInfo.uid == 0) { // read snapshot done, change to get data from wal
|
||||
qDebug("tmqsnap read snapshot done, change to get data from wal");
|
||||
tqOffsetResetToLog(&offset, pInfo->sContext->snapVersion);
|
||||
tqOffsetResetToLog(&offset, pInfo->sContext->snapVersion + 1);
|
||||
} else {
|
||||
tqOffsetResetToData(&offset, mtInfo.uid, INT64_MIN);
|
||||
qDebug("tmqsnap change get data uid:%" PRId64 "", mtInfo.uid);
|
||||
|
|
|
@ -135,8 +135,8 @@ void walReaderVerifyOffset(SWalReader *pWalReader, STqOffsetVal* pOffset){
|
|||
int64_t firstVer = walGetFirstVer((pWalReader)->pWal);
|
||||
taosThreadMutexUnlock(&pWalReader->pWal->mutex);
|
||||
|
||||
if (pOffset->version + 1 < firstVer){
|
||||
pOffset->version = firstVer - 1;
|
||||
if (pOffset->version < firstVer){
|
||||
pOffset->version = firstVer;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue