fix:modify log & sleep 500ms if data is inserting while consume
This commit is contained in:
parent
446f31925e
commit
2d6436c57d
|
@ -533,8 +533,7 @@ static int32_t smlGenerateSchemaAction(SSchema *colField, SHashObj *colHash, SSm
|
||||||
uint16_t *index = colHash ? (uint16_t *)taosHashGet(colHash, kv->key, kv->keyLen) : NULL;
|
uint16_t *index = colHash ? (uint16_t *)taosHashGet(colHash, kv->key, kv->keyLen) : NULL;
|
||||||
if (index) {
|
if (index) {
|
||||||
if (colField[*index].type != kv->type) {
|
if (colField[*index].type != kv->type) {
|
||||||
uError("SML:0x%" PRIx64 " point type and db type mismatch. key: %s. point type: %d, db type: %d", info->id,
|
uError("SML:0x%" PRIx64 " point type and db type mismatch. point type: %d, db type: %d, key: %s", info->id, colField[*index].type, kv->type, kv->key);
|
||||||
kv->key, colField[*index].type, kv->type);
|
|
||||||
return TSDB_CODE_TSC_INVALID_VALUE;
|
return TSDB_CODE_TSC_INVALID_VALUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1269,6 +1269,8 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
|
||||||
|
|
||||||
pRspWrapper->tmqRspType = TMQ_MSG_TYPE__END_RSP;
|
pRspWrapper->tmqRspType = TMQ_MSG_TYPE__END_RSP;
|
||||||
taosWriteQitem(tmq->mqueue, pRspWrapper);
|
taosWriteQitem(tmq->mqueue, pRspWrapper);
|
||||||
|
}else if(code == TSDB_CODE_WAL_LOG_NOT_EXIST){ //poll data while insert
|
||||||
|
taosMsleep(500);
|
||||||
}
|
}
|
||||||
|
|
||||||
goto CREATE_MSG_FAIL;
|
goto CREATE_MSG_FAIL;
|
||||||
|
|
|
@ -448,9 +448,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
|
qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
|
||||||
int code = tqScanData(pTq, pHandle, &dataRsp, pOffset);
|
int code = tqScanData(pTq, pHandle, &dataRsp, pOffset);
|
||||||
if(code != 0) {
|
if(code != 0) {
|
||||||
tDeleteSMqDataRsp(&dataRsp);
|
goto end;
|
||||||
taosWUnLockLatch(&pTq->lock);
|
|
||||||
return TSDB_CODE_TMQ_CONSUMER_ERROR;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// till now, all data has been transferred to consumer, new data needs to push client once arrived.
|
// till now, all data has been transferred to consumer, new data needs to push client once arrived.
|
||||||
|
@ -461,16 +459,20 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosWUnLockLatch(&pTq->lock);
|
|
||||||
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP);
|
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP);
|
||||||
|
|
||||||
// NOTE: this pHandle->consumerId may have been changed already.
|
// NOTE: this pHandle->consumerId may have been changed already.
|
||||||
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, offset type:%d, uid/version:%" PRId64
|
|
||||||
", ts:%" PRId64 ", reqId:0x%" PRIx64,
|
|
||||||
consumerId, pHandle->subKey, vgId, dataRsp.blockNum, dataRsp.rspOffset.type, dataRsp.rspOffset.uid,
|
|
||||||
dataRsp.rspOffset.ts, pRequest->reqId);
|
|
||||||
|
|
||||||
tDeleteSMqDataRsp(&dataRsp);
|
end:
|
||||||
|
{
|
||||||
|
char buf[80] = {0};
|
||||||
|
tFormatOffset(buf, 80, &dataRsp.rspOffset);
|
||||||
|
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s, reqId:0x%" PRIx64 " code:%d",
|
||||||
|
consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, code);
|
||||||
|
taosWUnLockLatch(&pTq->lock);
|
||||||
|
tDeleteSMqDataRsp(&dataRsp);
|
||||||
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -82,13 +82,13 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
|
||||||
SSDataBlock* pDataBlock = NULL;
|
SSDataBlock* pDataBlock = NULL;
|
||||||
uint64_t ts = 0;
|
uint64_t ts = 0;
|
||||||
|
|
||||||
tqDebug("consumer:0x%"PRIx64" vgId:%d, tmq task start execute", pHandle->consumerId, vgId);
|
tqDebug("consumer:0x%"PRIx64" vgId:%d, tmq one task start execute", pHandle->consumerId, vgId);
|
||||||
if (qExecTask(task, &pDataBlock, &ts) != TSDB_CODE_SUCCESS) {
|
if (qExecTask(task, &pDataBlock, &ts) != TSDB_CODE_SUCCESS) {
|
||||||
tqError("consumer:0x%"PRIx64" vgId:%d, task exec error since %s", pHandle->consumerId, vgId, terrstr());
|
tqError("consumer:0x%"PRIx64" vgId:%d, task exec error since %s", pHandle->consumerId, vgId, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
tqDebug("consumer:0x%"PRIx64" vgId:%d tmq task executed, total blocks:%d, pDataBlock:%p", pHandle->consumerId, vgId, pRsp->blockNum, pDataBlock);
|
tqDebug("consumer:0x%"PRIx64" vgId:%d tmq one task end executed, pDataBlock:%p", pHandle->consumerId, vgId, pDataBlock);
|
||||||
// current scan should be stopped asap, since the rebalance occurs.
|
// current scan should be stopped asap, since the rebalance occurs.
|
||||||
if (pDataBlock == NULL) {
|
if (pDataBlock == NULL) {
|
||||||
break;
|
break;
|
||||||
|
@ -107,6 +107,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tqDebug("consumer:0x%"PRIx64" vgId:%d tmq task executed finished, total blocks:%d, totalRows:%d", pHandle->consumerId, vgId, pRsp->blockNum, totalRows);
|
||||||
qStreamExtractOffset(task, &pRsp->rspOffset);
|
qStreamExtractOffset(task, &pRsp->rspOffset);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1672,7 +1672,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
|
||||||
tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pInfo->tqReader->pWalReader->curVersion - 1); //curVersion move to next, so currentOffset = curVersion - 1
|
tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pInfo->tqReader->pWalReader->curVersion - 1); //curVersion move to next, so currentOffset = curVersion - 1
|
||||||
|
|
||||||
if (ret.fetchType == FETCH_TYPE__DATA) {
|
if (ret.fetchType == FETCH_TYPE__DATA) {
|
||||||
qDebug("doQueueScan get data from log %"PRId64" rows, version:%" PRId64, pInfo->pRes->info.rows, pTaskInfo->streamInfo.currentOffset.version);
|
qDebug("doQueueScan get data from log %"PRId64" rows, version:%" PRId64, ret.data.info.rows, pTaskInfo->streamInfo.currentOffset.version);
|
||||||
blockDataCleanup(pInfo->pRes);
|
blockDataCleanup(pInfo->pRes);
|
||||||
setBlockIntoRes(pInfo, &ret.data, true);
|
setBlockIntoRes(pInfo, &ret.data, true);
|
||||||
if (pInfo->pRes->info.rows > 0) {
|
if (pInfo->pRes->info.rows > 0) {
|
||||||
|
|
Loading…
Reference in New Issue