Merge pull request #20418 from taosdata/fix/TD-22671
fix:deal with ASSERT in tmq
This commit is contained in:
commit
6989f1be61
|
@ -1243,7 +1243,6 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
|
|||
/*pRspWrapper->vgHandle = pVg;*/
|
||||
/*pRspWrapper->topicHandle = pTopic;*/
|
||||
taosWriteQitem(tmq->mqueue, pRspWrapper);
|
||||
tsem_post(&tmq->rspSem);
|
||||
}
|
||||
|
||||
goto CREATE_MSG_FAIL;
|
||||
|
@ -1923,7 +1922,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) {
|
||||
while (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) {
|
||||
int32_t retryCnt = 0;
|
||||
while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
|
||||
if (retryCnt++ > 40) {
|
||||
|
|
|
@ -169,18 +169,6 @@ int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq,
|
|||
int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) {
|
||||
SMqDataRsp* pRsp = &pPushEntry->dataRsp;
|
||||
|
||||
#if 0
|
||||
A(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum);
|
||||
A(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum);
|
||||
|
||||
A(!pRsp->withSchema);
|
||||
A(taosArrayGetSize(pRsp->blockSchema) == 0);
|
||||
|
||||
if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) {
|
||||
A(pRsp->rspOffset.version > pRsp->reqOffset.version);
|
||||
}
|
||||
#endif
|
||||
|
||||
int32_t len = 0;
|
||||
int32_t code = 0;
|
||||
tEncodeSize(tEncodeSMqDataRsp, pRsp, len, code);
|
||||
|
@ -224,22 +212,6 @@ int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) {
|
|||
}
|
||||
|
||||
int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp) {
|
||||
#if 0
|
||||
A(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum);
|
||||
A(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum);
|
||||
|
||||
A(!pRsp->withSchema);
|
||||
A(taosArrayGetSize(pRsp->blockSchema) == 0);
|
||||
|
||||
if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) {
|
||||
if (pRsp->blockNum > 0) {
|
||||
A(pRsp->rspOffset.version > pRsp->reqOffset.version);
|
||||
} else {
|
||||
A(pRsp->rspOffset.version >= pRsp->reqOffset.version);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
int32_t len = 0;
|
||||
int32_t code = 0;
|
||||
tEncodeSize(tEncodeSMqDataRsp, pRsp, len, code);
|
||||
|
@ -282,25 +254,6 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
|
|||
}
|
||||
|
||||
int32_t tqSendTaosxRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const STaosxRsp* pRsp) {
|
||||
#if 0
|
||||
A(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum);
|
||||
A(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum);
|
||||
|
||||
if (pRsp->withSchema) {
|
||||
A(taosArrayGetSize(pRsp->blockSchema) == pRsp->blockNum);
|
||||
} else {
|
||||
A(taosArrayGetSize(pRsp->blockSchema) == 0);
|
||||
}
|
||||
|
||||
if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) {
|
||||
if (pRsp->blockNum > 0) {
|
||||
A(pRsp->rspOffset.version > pRsp->reqOffset.version);
|
||||
} else {
|
||||
A(pRsp->rspOffset.version >= pRsp->reqOffset.version);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
int32_t len = 0;
|
||||
int32_t code = 0;
|
||||
tEncodeSize(tEncodeSTaosxRsp, pRsp, len, code);
|
||||
|
@ -429,18 +382,6 @@ static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t su
|
|||
}
|
||||
|
||||
pRsp->withTbName = 0;
|
||||
#if 0
|
||||
pRsp->withTbName = pReq->withTbName;
|
||||
if (pRsp->withTbName) {
|
||||
pRsp->blockTbName = taosArrayInit(0, sizeof(void*));
|
||||
if (pRsp->blockTbName == NULL) {
|
||||
// TODO free
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
/*A(subType == TOPIC_SUB_TYPE__COLUMN);*/
|
||||
pRsp->withSchema = false;
|
||||
|
||||
return 0;
|
||||
|
@ -992,7 +933,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
|||
|
||||
pTask->tbSink.pTSchema =
|
||||
tBuildTSchema(pTask->tbSink.pSchemaWrapper->pSchema, pTask->tbSink.pSchemaWrapper->nCols, version);
|
||||
ASSERT(pTask->tbSink.pTSchema);
|
||||
if(pTask->tbSink.pTSchema == NULL) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
streamSetupTrigger(pTask);
|
||||
|
|
|
@ -113,7 +113,10 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
|
|||
return -1;
|
||||
}
|
||||
|
||||
ASSERT(!(pRsp->withTbName || pRsp->withSchema));
|
||||
if(pRsp->withTbName || pRsp->withSchema){
|
||||
tqError("get column should not with meta:%d,%d", pRsp->withTbName, pRsp->withSchema);
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -265,7 +265,9 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
|
|||
.msgLen = len,
|
||||
.ver = ver,
|
||||
};
|
||||
qStreamSetScanMemData(task, submit);
|
||||
if(qStreamSetScanMemData(task, submit) != 0){
|
||||
continue;
|
||||
}
|
||||
|
||||
// here start to scan submit block to extract the subscribed data
|
||||
while (1) {
|
||||
|
|
|
@ -992,19 +992,16 @@ const char* qExtractTbnameFromTask(qTaskInfo_t tinfo) {
|
|||
|
||||
SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo) {
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE);
|
||||
return &pTaskInfo->streamInfo.metaRsp;
|
||||
}
|
||||
|
||||
int64_t qStreamExtractPrepareUid(qTaskInfo_t tinfo) {
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE);
|
||||
return pTaskInfo->streamInfo.prepareStatus.uid;
|
||||
}
|
||||
|
||||
int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) {
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE);
|
||||
memcpy(pOffset, &pTaskInfo->streamInfo.lastStatus, sizeof(STqOffsetVal));
|
||||
return 0;
|
||||
}
|
||||
|
@ -1040,20 +1037,12 @@ int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* s
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
#if 0
|
||||
int32_t qStreamScanMemData(qTaskInfo_t tinfo, const SSubmitReq* pReq, int64_t scanVer) {
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE);
|
||||
ASSERT(pTaskInfo->streamInfo.pReq == NULL);
|
||||
pTaskInfo->streamInfo.pReq = pReq;
|
||||
pTaskInfo->streamInfo.scanVer = scanVer;
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
|
||||
int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedData submit) {
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
ASSERT((pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE )&& (pTaskInfo->streamInfo.submit.msgStr == NULL));
|
||||
if((pTaskInfo->execModel != OPTR_EXEC_MODEL_QUEUE) || (pTaskInfo->streamInfo.submit.msgStr != NULL)){
|
||||
qError("qStreamSetScanMemData err:%d,%p", pTaskInfo->execModel, pTaskInfo->streamInfo.submit.msgStr);
|
||||
return -1;
|
||||
}
|
||||
qDebug("set the submit block for future scan");
|
||||
|
||||
pTaskInfo->streamInfo.submit = submit;
|
||||
|
@ -1063,7 +1052,6 @@ int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedData submit) {
|
|||
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) {
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
SOperatorInfo* pOperator = pTaskInfo->pRoot;
|
||||
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE);
|
||||
pTaskInfo->streamInfo.prepareStatus = *pOffset;
|
||||
pTaskInfo->streamInfo.returned = 0;
|
||||
|
||||
|
@ -1076,7 +1064,10 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
|||
|
||||
// TODO add more check
|
||||
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||
ASSERT(pOperator->numOfDownstream == 1);
|
||||
if(pOperator->numOfDownstream != 1){
|
||||
qError("pOperator->numOfDownstream != 1:%d", pOperator->numOfDownstream);
|
||||
return -1;
|
||||
}
|
||||
pOperator = pOperator->pDownstream[0];
|
||||
}
|
||||
|
||||
|
@ -1087,6 +1078,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
|||
pTSInfo->base.dataReader = NULL;
|
||||
// let's seek to the next version in wal file
|
||||
if (tqSeekVer(pInfo->tqReader, pOffset->version + 1, pTaskInfo->id.str) < 0) {
|
||||
qError("tqSeekVer failed ver:%" PRId64, pOffset->version + 1);
|
||||
return -1;
|
||||
}
|
||||
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
||||
|
@ -1101,6 +1093,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
|||
uid = pTableInfo->uid;
|
||||
ts = INT64_MIN;
|
||||
} else {
|
||||
qError("uid == 0 and tablelist size is 0");
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
@ -1124,7 +1117,10 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
|||
}
|
||||
|
||||
// TODO after dropping table, table may not found
|
||||
ASSERT(found);
|
||||
if(!found){
|
||||
qError("uid not found in tablelist %" PRId64, uid);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (pTableScanInfo->base.dataReader == NULL) {
|
||||
STableKeyInfo* pList = tableListGetInfo(pTaskInfo->pTableInfoList, 0);
|
||||
|
@ -1133,7 +1129,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
|||
if (tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &pTableScanInfo->base.cond, pList, num,
|
||||
pTableScanInfo->pResBlock, &pTableScanInfo->base.dataReader, NULL) < 0 ||
|
||||
pTableScanInfo->base.dataReader == NULL) {
|
||||
ASSERT(0);
|
||||
qError("tsdbReaderOpen failed. uid:%" PRIi64, pOffset->uid);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1148,7 +1145,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
|||
qDebug("tsdb reader offset seek to uid %" PRId64 " ts %" PRId64 ", table cur set to %d , all table num %d", uid,
|
||||
ts, pTableScanInfo->currentTable, numOfTables);
|
||||
} else {
|
||||
ASSERT(0);
|
||||
qError("invalid pOffset->type:%d", pOffset->type);
|
||||
return -1;
|
||||
}
|
||||
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
||||
SStreamRawScanInfo* pInfo = pOperator->info;
|
||||
|
@ -1180,7 +1178,6 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
|||
|
||||
STableKeyInfo* pList = tableListGetInfo(pTaskInfo->pTableInfoList, 0);
|
||||
int32_t size = tableListGetSize(pTaskInfo->pTableInfoList);
|
||||
ASSERT(size == 1);
|
||||
|
||||
tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, NULL, &pInfo->dataReader, NULL);
|
||||
|
||||
|
@ -1209,7 +1206,10 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
|||
|
||||
void qProcessRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
|
||||
SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
|
||||
assert(pMsg->info.ahandle != NULL);
|
||||
if(pMsg->info.ahandle == NULL){
|
||||
qError("pMsg->info.ahandle is NULL");
|
||||
return;
|
||||
}
|
||||
|
||||
SDataBuf buf = {.len = pMsg->contLen, .pData = NULL};
|
||||
|
||||
|
|
Loading…
Reference in New Issue