fix(tmq): seek snapshot offset
This commit is contained in:
parent
52e0482013
commit
31bbef82d1
|
@ -247,7 +247,6 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
|
||||||
pObj->uid = mndGenerateUid(pObj->name, strlen(pObj->name));
|
pObj->uid = mndGenerateUid(pObj->name, strlen(pObj->name));
|
||||||
pObj->status = 0;
|
pObj->status = 0;
|
||||||
|
|
||||||
// TODO
|
|
||||||
pObj->igExpired = pCreate->igExpired;
|
pObj->igExpired = pCreate->igExpired;
|
||||||
pObj->trigger = pCreate->triggerType;
|
pObj->trigger = pCreate->triggerType;
|
||||||
pObj->triggerParam = pCreate->maxDelay;
|
pObj->triggerParam = pCreate->maxDelay;
|
||||||
|
|
|
@ -406,193 +406,6 @@ OVER:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
|
||||||
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|
||||||
SMqPollReq* pReq = pMsg->pCont;
|
|
||||||
int64_t consumerId = pReq->consumerId;
|
|
||||||
int64_t timeout = pReq->timeout;
|
|
||||||
int32_t reqEpoch = pReq->epoch;
|
|
||||||
int64_t fetchOffset;
|
|
||||||
int32_t code = 0;
|
|
||||||
|
|
||||||
// get offset to fetch message
|
|
||||||
if (pReq->currentOffset >= 0) {
|
|
||||||
fetchOffset = pReq->currentOffset + 1;
|
|
||||||
} else {
|
|
||||||
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, pReq->subKey);
|
|
||||||
if (pOffset != NULL) {
|
|
||||||
ASSERT(pOffset->val.type == TMQ_OFFSET__LOG);
|
|
||||||
tqDebug("consumer %ld, restore offset of %s on vg %d, offset(type:log) version: %ld", consumerId, pReq->subKey,
|
|
||||||
TD_VID(pTq->pVnode), pOffset->val.version);
|
|
||||||
fetchOffset = pOffset->val.version + 1;
|
|
||||||
} else {
|
|
||||||
if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) {
|
|
||||||
fetchOffset = walGetFirstVer(pTq->pWal);
|
|
||||||
} else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__LATEST) {
|
|
||||||
fetchOffset = walGetCommittedVer(pTq->pWal);
|
|
||||||
} else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__NONE) {
|
|
||||||
tqError("tmq poll: no offset committed for consumer %ld in vg %d, subkey %s", consumerId, TD_VID(pTq->pVnode),
|
|
||||||
pReq->subKey);
|
|
||||||
terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
tqDebug("consumer %ld, restore offset of %s on vg %d failed, config is %ld, set to %ld", consumerId, pReq->subKey,
|
|
||||||
TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
tqDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req offset %ld fetch offset %ld", consumerId,
|
|
||||||
pReq->epoch, TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset);
|
|
||||||
|
|
||||||
STqHandle* pHandle = taosHashGet(pTq->handles, pReq->subKey, strlen(pReq->subKey));
|
|
||||||
/*ASSERT(pHandle);*/
|
|
||||||
if (pHandle == NULL) {
|
|
||||||
tqError("tmq poll: no consumer handle for consumer %ld in vg %d, subkey %s", consumerId, TD_VID(pTq->pVnode),
|
|
||||||
pReq->subKey);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pHandle->consumerId != consumerId) {
|
|
||||||
tqError("tmq poll: consumer handle mismatch for consumer %ld in vg %d, subkey %s, handle consumer id %ld",
|
|
||||||
consumerId, TD_VID(pTq->pVnode), pReq->subKey, pHandle->consumerId);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t consumerEpoch = atomic_load_32(&pHandle->epoch);
|
|
||||||
while (consumerEpoch < reqEpoch) {
|
|
||||||
consumerEpoch = atomic_val_compare_exchange_32(&pHandle->epoch, consumerEpoch, reqEpoch);
|
|
||||||
}
|
|
||||||
|
|
||||||
SMqDataBlkRsp rsp = {0};
|
|
||||||
rsp.reqOffset = pReq->currentOffset;
|
|
||||||
|
|
||||||
rsp.blockData = taosArrayInit(0, sizeof(void*));
|
|
||||||
rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t));
|
|
||||||
|
|
||||||
if (rsp.blockData == NULL || rsp.blockDataLen == NULL) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
rsp.withTbName = pReq->withTbName;
|
|
||||||
if (rsp.withTbName) {
|
|
||||||
rsp.blockTbName = taosArrayInit(0, sizeof(void*));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
|
||||||
rsp.withSchema = false;
|
|
||||||
} else {
|
|
||||||
rsp.withSchema = true;
|
|
||||||
rsp.blockSchema = taosArrayInit(0, sizeof(void*));
|
|
||||||
}
|
|
||||||
|
|
||||||
#if 1
|
|
||||||
if (pReq->useSnapshot) {
|
|
||||||
// TODO set ver into snapshot
|
|
||||||
int64_t lastVer = walGetCommittedVer(pTq->pWal);
|
|
||||||
if (rsp.reqOffset < lastVer) {
|
|
||||||
tqInfo("retrieve using snapshot req offset %ld last ver %ld", rsp.reqOffset, lastVer);
|
|
||||||
tqScanSnapshot(pTq, &pHandle->execHandle, &rsp, workerId);
|
|
||||||
|
|
||||||
if (rsp.blockNum != 0) {
|
|
||||||
rsp.withTbName = false;
|
|
||||||
rsp.rspOffset = lastVer;
|
|
||||||
tqInfo("direct send by snapshot req offset %ld rsp offset %ld", rsp.reqOffset, rsp.rspOffset);
|
|
||||||
fetchOffset = lastVer;
|
|
||||||
goto SEND_RSP;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
SWalHead* pHeadWithCkSum = taosMemoryMalloc(sizeof(SWalHead) + 2048);
|
|
||||||
if (pHeadWithCkSum == NULL) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
walSetReaderCapacity(pHandle->pWalReader, 2048);
|
|
||||||
|
|
||||||
while (1) {
|
|
||||||
consumerEpoch = atomic_load_32(&pHandle->epoch);
|
|
||||||
if (consumerEpoch > reqEpoch) {
|
|
||||||
tqWarn("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d, discard req epoch %d",
|
|
||||||
consumerId, pReq->epoch, TD_VID(pTq->pVnode), fetchOffset, consumerEpoch, reqEpoch);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tqFetchLog(pTq, pHandle, &fetchOffset, &pHeadWithCkSum) < 0) {
|
|
||||||
// TODO add push mgr
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
SWalCont* pHead = &pHeadWithCkSum->head;
|
|
||||||
|
|
||||||
tqDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch,
|
|
||||||
TD_VID(pTq->pVnode), fetchOffset, pHead->msgType);
|
|
||||||
|
|
||||||
if (pHead->msgType == TDMT_VND_SUBMIT) {
|
|
||||||
SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
|
|
||||||
|
|
||||||
if (tqDataExec(pTq, &pHandle->execHandle, pCont, &rsp, workerId) < 0) {
|
|
||||||
/*ASSERT(0);*/
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
ASSERT(pHandle->fetchMeta);
|
|
||||||
ASSERT(IS_META_MSG(pHead->msgType));
|
|
||||||
tqInfo("fetch meta msg, ver: %ld, type: %d", pHead->version, pHead->msgType);
|
|
||||||
SMqMetaRsp metaRsp = {0};
|
|
||||||
metaRsp.reqOffset = pReq->currentOffset;
|
|
||||||
metaRsp.rspOffset = fetchOffset;
|
|
||||||
metaRsp.resMsgType = pHead->msgType;
|
|
||||||
metaRsp.metaRspLen = pHead->bodyLen;
|
|
||||||
metaRsp.metaRsp = pHead->body;
|
|
||||||
if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) {
|
|
||||||
code = -1;
|
|
||||||
goto OVER;
|
|
||||||
}
|
|
||||||
code = 0;
|
|
||||||
goto OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO batch optimization:
|
|
||||||
// TODO continue scan until meeting batch requirement
|
|
||||||
if (rsp.blockNum > 0 /* threshold */) {
|
|
||||||
break;
|
|
||||||
} else {
|
|
||||||
fetchOffset++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
taosMemoryFree(pHeadWithCkSum);
|
|
||||||
|
|
||||||
SEND_RSP:
|
|
||||||
ASSERT(taosArrayGetSize(rsp.blockData) == rsp.blockNum);
|
|
||||||
ASSERT(taosArrayGetSize(rsp.blockDataLen) == rsp.blockNum);
|
|
||||||
if (rsp.withSchema) {
|
|
||||||
ASSERT(taosArrayGetSize(rsp.blockSchema) == rsp.blockNum);
|
|
||||||
}
|
|
||||||
|
|
||||||
rsp.rspOffset = fetchOffset;
|
|
||||||
|
|
||||||
if (tqSendDataRsp(pTq, pMsg, pReq, &rsp) < 0) {
|
|
||||||
code = -1;
|
|
||||||
}
|
|
||||||
OVER:
|
|
||||||
// TODO wrap in destroy func
|
|
||||||
taosArrayDestroy(rsp.blockDataLen);
|
|
||||||
taosArrayDestroyP(rsp.blockData, (FDelete)taosMemoryFree);
|
|
||||||
|
|
||||||
if (rsp.withSchema) {
|
|
||||||
taosArrayDestroyP(rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (rsp.withTbName) {
|
|
||||||
taosArrayDestroyP(rsp.blockTbName, (FDelete)taosMemoryFree);
|
|
||||||
}
|
|
||||||
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) {
|
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
|
SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
|
||||||
|
|
||||||
|
|
|
@ -1422,7 +1422,7 @@ void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
#ifdef BUF_PAGE_DEBUG
|
#ifdef BUF_PAGE_DEBUG
|
||||||
qDebug("page_setbuf, groupId:%"PRIu64, groupId);
|
qDebug("page_setbuf, groupId:%" PRIu64, groupId);
|
||||||
#endif
|
#endif
|
||||||
doSetTableGroupOutputBuf(pOperator, pAggInfo, numOfOutput, groupId);
|
doSetTableGroupOutputBuf(pOperator, pAggInfo, numOfOutput, groupId);
|
||||||
|
|
||||||
|
@ -1570,9 +1570,9 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprI
|
||||||
|
|
||||||
releaseBufPage(pBuf, page);
|
releaseBufPage(pBuf, page);
|
||||||
pBlock->info.rows += pRow->numOfRows;
|
pBlock->info.rows += pRow->numOfRows;
|
||||||
// if (pBlock->info.rows >= pBlock->info.capacity) { // output buffer is full
|
// if (pBlock->info.rows >= pBlock->info.capacity) { // output buffer is full
|
||||||
// break;
|
// break;
|
||||||
// }
|
// }
|
||||||
}
|
}
|
||||||
|
|
||||||
qDebug("%s result generated, rows:%d, groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
|
qDebug("%s result generated, rows:%d, groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
|
||||||
|
@ -2868,7 +2868,24 @@ int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) {
|
||||||
pInfo->cond.twindows[0].skey = oldSkey;
|
pInfo->cond.twindows[0].skey = oldSkey;
|
||||||
pInfo->scanTimes = 0;
|
pInfo->scanTimes = 0;
|
||||||
pInfo->curTWinIdx = 0;
|
pInfo->curTWinIdx = 0;
|
||||||
|
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
|
||||||
|
int32_t tableSz = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList);
|
||||||
|
bool found = false;
|
||||||
|
for (int32_t i = 0; i < tableSz; i++) {
|
||||||
|
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, i);
|
||||||
|
if (pTableInfo->uid == uid) {
|
||||||
|
found = true;
|
||||||
|
pInfo->currentTable = i;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// TODO after processing drop,
|
||||||
|
ASSERT(found);
|
||||||
|
qDebug("tsdb reader offset seek to uid %ld ts %ld, table cur set to %d , all table num %d", uid, ts,
|
||||||
|
pInfo->currentTable, tableSz);
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
|
@ -4107,8 +4124,8 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
|
||||||
} else {
|
} else {
|
||||||
isNull[index++] = 0;
|
isNull[index++] = 0;
|
||||||
char* data = nodesGetValueFromNode(pValue);
|
char* data = nodesGetValueFromNode(pValue);
|
||||||
if (pValue->node.resType.type == TSDB_DATA_TYPE_JSON){
|
if (pValue->node.resType.type == TSDB_DATA_TYPE_JSON) {
|
||||||
if(tTagIsJson(data)){
|
if (tTagIsJson(data)) {
|
||||||
terrno = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
|
terrno = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
|
||||||
taosMemoryFree(keyBuf);
|
taosMemoryFree(keyBuf);
|
||||||
nodesClearList(groupNew);
|
nodesClearList(groupNew);
|
||||||
|
@ -4173,7 +4190,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) {
|
||||||
STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;
|
STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;
|
||||||
int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
|
int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
|
||||||
if(code){
|
if (code) {
|
||||||
pTaskInfo->code = code;
|
pTaskInfo->code = code;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -4202,7 +4219,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
};
|
};
|
||||||
if (pHandle) {
|
if (pHandle) {
|
||||||
int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
|
int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
|
||||||
if(code){
|
if (code) {
|
||||||
pTaskInfo->code = code;
|
pTaskInfo->code = code;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
|
@ -157,3 +157,4 @@ python3 ./test.py -f 7-tmq/tmqShow.py
|
||||||
python3 ./test.py -f 7-tmq/tmqAlterSchema.py
|
python3 ./test.py -f 7-tmq/tmqAlterSchema.py
|
||||||
python3 ./test.py -f 7-tmq/tmqConsFromTsdb.py
|
python3 ./test.py -f 7-tmq/tmqConsFromTsdb.py
|
||||||
python3 ./test.py -f 7-tmq/tmqConsFromTsdb1.py
|
python3 ./test.py -f 7-tmq/tmqConsFromTsdb1.py
|
||||||
|
python3 ./test.py -f 7-tmq/tmqConsFromTsdb-mutilVg.py
|
||||||
|
|
Loading…
Reference in New Issue