Merge pull request #15361 from taosdata/feature/tq
fix(tmq): correctly set reader status
This commit is contained in:
commit
072fe10013
|
@ -135,6 +135,7 @@ typedef struct {
|
||||||
int64_t curVersion;
|
int64_t curVersion;
|
||||||
int64_t capacity;
|
int64_t capacity;
|
||||||
int8_t curInvalid;
|
int8_t curInvalid;
|
||||||
|
int8_t curStopped;
|
||||||
TdThreadMutex mutex;
|
TdThreadMutex mutex;
|
||||||
SWalFilterCond cond;
|
SWalFilterCond cond;
|
||||||
SWalCkHead *pHead;
|
SWalCkHead *pHead;
|
||||||
|
|
|
@ -138,6 +138,14 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
|
||||||
ASSERT(taosArrayGetSize(pRsp->blockSchema) == 0);
|
ASSERT(taosArrayGetSize(pRsp->blockSchema) == 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) {
|
||||||
|
if (pRsp->blockNum > 0) {
|
||||||
|
ASSERT(pRsp->rspOffset.version > pRsp->reqOffset.version);
|
||||||
|
} else {
|
||||||
|
ASSERT(pRsp->rspOffset.version >= pRsp->reqOffset.version);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int32_t len;
|
int32_t len;
|
||||||
int32_t code;
|
int32_t code;
|
||||||
tEncodeSize(tEncodeSMqDataRsp, pRsp, len, code);
|
tEncodeSize(tEncodeSMqDataRsp, pRsp, len, code);
|
||||||
|
|
|
@ -65,12 +65,14 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa
|
||||||
qTaskInfo_t task = pExec->execCol.task;
|
qTaskInfo_t task = pExec->execCol.task;
|
||||||
|
|
||||||
if (qStreamPrepareScan(task, pOffset) < 0) {
|
if (qStreamPrepareScan(task, pOffset) < 0) {
|
||||||
|
tqDebug("prepare scan failed, return");
|
||||||
if (pOffset->type == TMQ_OFFSET__LOG) {
|
if (pOffset->type == TMQ_OFFSET__LOG) {
|
||||||
pRsp->rspOffset = *pOffset;
|
pRsp->rspOffset = *pOffset;
|
||||||
return 0;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
tqOffsetResetToLog(pOffset, pHandle->snapshotVer);
|
tqOffsetResetToLog(pOffset, pHandle->snapshotVer);
|
||||||
if (qStreamPrepareScan(task, pOffset) < 0) {
|
if (qStreamPrepareScan(task, pOffset) < 0) {
|
||||||
|
tqDebug("prepare scan failed, return");
|
||||||
pRsp->rspOffset = *pOffset;
|
pRsp->rspOffset = *pOffset;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -126,9 +128,16 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa
|
||||||
|
|
||||||
ASSERT(pRsp->rspOffset.type != 0);
|
ASSERT(pRsp->rspOffset.type != 0);
|
||||||
|
|
||||||
|
#if 0
|
||||||
if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) {
|
if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) {
|
||||||
ASSERT(pRsp->rspOffset.version + 1 >= pRsp->reqOffset.version);
|
if (pRsp->blockNum > 0) {
|
||||||
|
ASSERT(pRsp->rspOffset.version > pRsp->reqOffset.version);
|
||||||
|
} else {
|
||||||
|
ASSERT(pRsp->rspOffset.version >= pRsp->reqOffset.version);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
tqDebug("task exec exited");
|
tqDebug("task exec exited");
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -132,10 +132,12 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
|
||||||
while (1) {
|
while (1) {
|
||||||
if (!fromProcessedMsg) {
|
if (!fromProcessedMsg) {
|
||||||
if (walNextValidMsg(pReader->pWalReader) < 0) {
|
if (walNextValidMsg(pReader->pWalReader) < 0) {
|
||||||
pReader->ver = pReader->pWalReader->curVersion - pReader->pWalReader->curInvalid;
|
pReader->ver =
|
||||||
|
pReader->pWalReader->curVersion - (pReader->pWalReader->curInvalid | pReader->pWalReader->curStopped);
|
||||||
ret->offset.type = TMQ_OFFSET__LOG;
|
ret->offset.type = TMQ_OFFSET__LOG;
|
||||||
ret->offset.version = pReader->ver;
|
ret->offset.version = pReader->ver;
|
||||||
ret->fetchType = FETCH_TYPE__NONE;
|
ret->fetchType = FETCH_TYPE__NONE;
|
||||||
|
tqDebug("return offset %ld, no more valid", ret->offset.version);
|
||||||
ASSERT(ret->offset.version >= 0);
|
ASSERT(ret->offset.version >= 0);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -167,6 +169,7 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
|
||||||
ret->offset.version = pReader->ver;
|
ret->offset.version = pReader->ver;
|
||||||
ASSERT(pReader->ver >= 0);
|
ASSERT(pReader->ver >= 0);
|
||||||
ret->fetchType = FETCH_TYPE__NONE;
|
ret->fetchType = FETCH_TYPE__NONE;
|
||||||
|
tqDebug("return offset %ld, processed finish", ret->offset.version);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -948,7 +948,7 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr
|
||||||
}
|
}
|
||||||
blockDataCleanup(pDestBlock);
|
blockDataCleanup(pDestBlock);
|
||||||
int32_t code = blockDataEnsureCapacity(pDestBlock, pSrcBlock->info.rows);
|
int32_t code = blockDataEnsureCapacity(pDestBlock, pSrcBlock->info.rows);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
ASSERT(taosArrayGetSize(pSrcBlock->pDataBlock) >= 3);
|
ASSERT(taosArrayGetSize(pSrcBlock->pDataBlock) >= 3);
|
||||||
|
@ -996,16 +996,16 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
|
||||||
|
|
||||||
SColumnInfoData* pTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
SColumnInfoData* pTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
||||||
SColumnInfoData* pUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
|
SColumnInfoData* pUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
|
||||||
uint64_t* uidCol = (uint64_t*)pUidCol->pData;
|
uint64_t* uidCol = (uint64_t*)pUidCol->pData;
|
||||||
ASSERT(pTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
|
ASSERT(pTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
|
||||||
TSKEY* tsCol = (TSKEY*)pTsCol->pData;
|
TSKEY* tsCol = (TSKEY*)pTsCol->pData;
|
||||||
SColumnInfoData* pStartTsCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
SColumnInfoData* pStartTsCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
||||||
SColumnInfoData* pEndTsCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
SColumnInfoData* pEndTsCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
||||||
SColumnInfoData* pGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
|
SColumnInfoData* pGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
|
||||||
SColumnInfoData* pCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
|
SColumnInfoData* pCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
|
||||||
SColumnInfoData* pCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
|
SColumnInfoData* pCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
|
||||||
uint64_t groupId = getGroupId(pInfo->pTableScanOp, uidCol[0]);
|
uint64_t groupId = getGroupId(pInfo->pTableScanOp, uidCol[0]);
|
||||||
for (int32_t i = 0; i < rows; ) {
|
for (int32_t i = 0; i < rows;) {
|
||||||
colDataAppend(pCalStartTsCol, pDestBlock->info.rows, (const char*)(tsCol + i), false);
|
colDataAppend(pCalStartTsCol, pDestBlock->info.rows, (const char*)(tsCol + i), false);
|
||||||
STimeWindow win = getSlidingWindow(tsCol, &pInfo->interval, &pSrcBlock->info, &i);
|
STimeWindow win = getSlidingWindow(tsCol, &pInfo->interval, &pSrcBlock->info, &i);
|
||||||
colDataAppend(pCalEndTsCol, pDestBlock->info.rows, (const char*)(tsCol + i - 1), false);
|
colDataAppend(pCalEndTsCol, pDestBlock->info.rows, (const char*)(tsCol + i - 1), false);
|
||||||
|
@ -1169,8 +1169,11 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
||||||
return NULL;
|
return NULL;
|
||||||
} else if (ret.fetchType == FETCH_TYPE__NONE) {
|
} else if (ret.fetchType == FETCH_TYPE__NONE) {
|
||||||
pTaskInfo->streamInfo.lastStatus = ret.offset;
|
pTaskInfo->streamInfo.lastStatus = ret.offset;
|
||||||
ASSERT(pTaskInfo->streamInfo.lastStatus.version + 1 >= pTaskInfo->streamInfo.prepareStatus.version);
|
ASSERT(pTaskInfo->streamInfo.lastStatus.version >= pTaskInfo->streamInfo.prepareStatus.version);
|
||||||
qDebug("stream scan log return null");
|
ASSERT(pTaskInfo->streamInfo.lastStatus.version + 1 == pInfo->tqReader->pWalReader->curVersion);
|
||||||
|
char formatBuf[80];
|
||||||
|
tFormatOffset(formatBuf, 80, &ret.offset);
|
||||||
|
qDebug("stream scan log return null, offset %s", formatBuf);
|
||||||
return NULL;
|
return NULL;
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
|
@ -1274,7 +1277,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
SStreamAggSupporter* pSup = pInfo->sessionSup.pStreamAggSup;
|
SStreamAggSupporter* pSup = pInfo->sessionSup.pStreamAggSup;
|
||||||
if (isStateWindow(pInfo) && pSup->pScanBlock->info.rows > 0) {
|
if (isStateWindow(pInfo) && pSup->pScanBlock->info.rows > 0) {
|
||||||
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
|
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
|
||||||
|
|
|
@ -21,7 +21,7 @@ static int32_t walFetchBodyNew(SWalReader *pRead);
|
||||||
static int32_t walSkipFetchBodyNew(SWalReader *pRead);
|
static int32_t walSkipFetchBodyNew(SWalReader *pRead);
|
||||||
|
|
||||||
SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) {
|
SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) {
|
||||||
SWalReader *pRead = taosMemoryMalloc(sizeof(SWalReader));
|
SWalReader *pRead = taosMemoryCalloc(1, sizeof(SWalReader));
|
||||||
if (pRead == NULL) {
|
if (pRead == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -75,6 +75,7 @@ int32_t walNextValidMsg(SWalReader *pRead) {
|
||||||
|
|
||||||
wDebug("vgId:%d wal start to fetch, ver %ld, last ver %ld commit ver %ld, applied ver %ld, end ver %ld",
|
wDebug("vgId:%d wal start to fetch, ver %ld, last ver %ld commit ver %ld, applied ver %ld, end ver %ld",
|
||||||
pRead->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer, endVer);
|
pRead->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer, endVer);
|
||||||
|
pRead->curStopped = 0;
|
||||||
while (fetchVer <= endVer) {
|
while (fetchVer <= endVer) {
|
||||||
if (walFetchHeadNew(pRead, fetchVer) < 0) {
|
if (walFetchHeadNew(pRead, fetchVer) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -93,6 +94,7 @@ int32_t walNextValidMsg(SWalReader *pRead) {
|
||||||
ASSERT(fetchVer == pRead->curVersion);
|
ASSERT(fetchVer == pRead->curVersion);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
pRead->curStopped = 1;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -221,6 +223,8 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
|
||||||
int64_t contLen;
|
int64_t contLen;
|
||||||
bool seeked = false;
|
bool seeked = false;
|
||||||
|
|
||||||
|
wDebug("vgId:%d, wal starts to fetch head %d", pRead->pWal->cfg.vgId, fetchVer);
|
||||||
|
|
||||||
if (pRead->curInvalid || pRead->curVersion != fetchVer) {
|
if (pRead->curInvalid || pRead->curVersion != fetchVer) {
|
||||||
if (walReadSeekVer(pRead, fetchVer) < 0) {
|
if (walReadSeekVer(pRead, fetchVer) < 0) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
|
@ -257,6 +261,8 @@ static int32_t walFetchBodyNew(SWalReader *pRead) {
|
||||||
SWalCont *pReadHead = &pRead->pHead->head;
|
SWalCont *pReadHead = &pRead->pHead->head;
|
||||||
int64_t ver = pReadHead->version;
|
int64_t ver = pReadHead->version;
|
||||||
|
|
||||||
|
wDebug("vgId:%d, wal starts to fetch body %ld", pRead->pWal->cfg.vgId, ver);
|
||||||
|
|
||||||
if (pRead->capacity < pReadHead->bodyLen) {
|
if (pRead->capacity < pReadHead->bodyLen) {
|
||||||
void *ptr = taosMemoryRealloc(pRead->pHead, sizeof(SWalCkHead) + pReadHead->bodyLen);
|
void *ptr = taosMemoryRealloc(pRead->pHead, sizeof(SWalCkHead) + pReadHead->bodyLen);
|
||||||
if (ptr == NULL) {
|
if (ptr == NULL) {
|
||||||
|
@ -300,8 +306,8 @@ static int32_t walFetchBodyNew(SWalReader *pRead) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
wDebug("version %ld is fetched, cursor advance", ver);
|
||||||
pRead->curVersion = ver + 1;
|
pRead->curVersion = ver + 1;
|
||||||
wDebug("version advance to %ld, fetch body", pRead->curVersion);
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue