Merge pull request #21362 from taosdata/feature/3_liaohj
refactor: do some internal refactor.
This commit is contained in:
commit
e2f43d4ac8
|
@ -208,11 +208,6 @@ typedef struct SSDataBlock {
|
||||||
SDataBlockInfo info;
|
SDataBlockInfo info;
|
||||||
} SSDataBlock;
|
} SSDataBlock;
|
||||||
|
|
||||||
enum {
|
|
||||||
FETCH_TYPE__DATA = 0,
|
|
||||||
FETCH_TYPE__NONE,
|
|
||||||
};
|
|
||||||
|
|
||||||
typedef struct SVarColAttr {
|
typedef struct SVarColAttr {
|
||||||
int32_t* offset; // start position for each entry in the list
|
int32_t* offset; // start position for each entry in the list
|
||||||
uint32_t length; // used buffer size that contain the valid data
|
uint32_t length; // used buffer size that contain the valid data
|
||||||
|
|
|
@ -190,8 +190,6 @@ STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int
|
||||||
|
|
||||||
SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo);
|
SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo);
|
||||||
|
|
||||||
void verifyOffset(void *pWalReader, STqOffsetVal* pOffset);
|
|
||||||
|
|
||||||
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType);
|
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType);
|
||||||
|
|
||||||
void qStreamSetOpen(qTaskInfo_t tinfo);
|
void qStreamSetOpen(qTaskInfo_t tinfo);
|
||||||
|
|
|
@ -142,7 +142,7 @@ typedef struct {
|
||||||
typedef struct SWalReader SWalReader;
|
typedef struct SWalReader SWalReader;
|
||||||
|
|
||||||
// todo hide this struct
|
// todo hide this struct
|
||||||
typedef struct SWalReader {
|
struct SWalReader {
|
||||||
SWal *pWal;
|
SWal *pWal;
|
||||||
int64_t readerId;
|
int64_t readerId;
|
||||||
TdFilePtr pLogFile;
|
TdFilePtr pLogFile;
|
||||||
|
@ -154,7 +154,7 @@ typedef struct SWalReader {
|
||||||
SWalFilterCond cond;
|
SWalFilterCond cond;
|
||||||
// TODO remove it
|
// TODO remove it
|
||||||
SWalCkHead *pHead;
|
SWalCkHead *pHead;
|
||||||
} SWalReader;
|
};
|
||||||
|
|
||||||
// module initialization
|
// module initialization
|
||||||
int32_t walInit();
|
int32_t walInit();
|
||||||
|
@ -201,6 +201,7 @@ int32_t walNextValidMsg(SWalReader *pRead);
|
||||||
int64_t walReaderGetCurrentVer(const SWalReader *pReader);
|
int64_t walReaderGetCurrentVer(const SWalReader *pReader);
|
||||||
int64_t walReaderGetValidFirstVer(const SWalReader *pReader);
|
int64_t walReaderGetValidFirstVer(const SWalReader *pReader);
|
||||||
void walReaderValidVersionRange(SWalReader *pReader, int64_t *sver, int64_t *ever);
|
void walReaderValidVersionRange(SWalReader *pReader, int64_t *sver, int64_t *ever);
|
||||||
|
void walReaderVerifyOffset(SWalReader *pWalReader, STqOffsetVal* pOffset);
|
||||||
|
|
||||||
// only for tq usage
|
// only for tq usage
|
||||||
void walSetReaderCapacity(SWalReader *pRead, int32_t capacity);
|
void walSetReaderCapacity(SWalReader *pRead, int32_t capacity);
|
||||||
|
|
|
@ -185,7 +185,6 @@ typedef struct STsdbReader STsdbReader;
|
||||||
int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables,
|
int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables,
|
||||||
SSDataBlock *pResBlock, STsdbReader **ppReader, const char *idstr, bool countOnly, SHashObj** pIgnoreTables);
|
SSDataBlock *pResBlock, STsdbReader **ppReader, const char *idstr, bool countOnly, SHashObj** pIgnoreTables);
|
||||||
int32_t tsdbSetTableList(STsdbReader *pReader, const void *pTableList, int32_t num);
|
int32_t tsdbSetTableList(STsdbReader *pReader, const void *pTableList, int32_t num);
|
||||||
|
|
||||||
void tsdbReaderSetId(STsdbReader *pReader, const char *idstr);
|
void tsdbReaderSetId(STsdbReader *pReader, const char *idstr);
|
||||||
void tsdbReaderClose(STsdbReader *pReader);
|
void tsdbReaderClose(STsdbReader *pReader);
|
||||||
int32_t tsdbNextDataBlock(STsdbReader *pReader, bool *hasNext);
|
int32_t tsdbNextDataBlock(STsdbReader *pReader, bool *hasNext);
|
||||||
|
@ -198,8 +197,6 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle);
|
||||||
void *tsdbGetIdx(SMeta *pMeta);
|
void *tsdbGetIdx(SMeta *pMeta);
|
||||||
void *tsdbGetIvtIdx(SMeta *pMeta);
|
void *tsdbGetIvtIdx(SMeta *pMeta);
|
||||||
uint64_t tsdbGetReaderMaxVersion(STsdbReader *pReader);
|
uint64_t tsdbGetReaderMaxVersion(STsdbReader *pReader);
|
||||||
int32_t tsdbSetTableList(STsdbReader *pReader, const void *pTableList, int32_t num);
|
|
||||||
void tsdbReaderSetId(STsdbReader *pReader, const char *idstr);
|
|
||||||
void tsdbReaderSetCloseFlag(STsdbReader *pReader);
|
void tsdbReaderSetCloseFlag(STsdbReader *pReader);
|
||||||
|
|
||||||
int32_t tsdbReuseCacherowsReader(void* pReader, void* pTableIdList, int32_t numOfTables);
|
int32_t tsdbReuseCacherowsReader(void* pReader, void* pTableIdList, int32_t numOfTables);
|
||||||
|
@ -267,7 +264,7 @@ int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *pTableUidList);
|
||||||
int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList);
|
int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList);
|
||||||
|
|
||||||
int32_t tqSeekVer(STqReader *pReader, int64_t ver, const char *id);
|
int32_t tqSeekVer(STqReader *pReader, int64_t ver, const char *id);
|
||||||
int32_t tqNextBlockInWal(STqReader* pReader);
|
bool tqNextBlockInWal(STqReader* pReader, const char* idstr);
|
||||||
bool tqNextBlockImpl(STqReader *pReader, const char* idstr);
|
bool tqNextBlockImpl(STqReader *pReader, const char* idstr);
|
||||||
|
|
||||||
int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, const char* id);
|
int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, const char* id);
|
||||||
|
|
|
@ -219,8 +219,6 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t version, char* msg, int32_t msgL
|
||||||
int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
|
int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
|
||||||
int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg);
|
||||||
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
|
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
|
||||||
int32_t tqProcessSubmitReqForSubscribe(STQ* pTq);
|
|
||||||
int32_t tqProcessDeleteDataReq(STQ* pTq, void* pReq, int32_t len, int64_t ver);
|
|
||||||
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg);
|
||||||
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec);
|
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec);
|
||||||
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg);
|
||||||
|
|
|
@ -1195,159 +1195,6 @@ int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStream
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqProcessDeleteDataReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
|
|
||||||
bool failed = false;
|
|
||||||
SDecoder* pCoder = &(SDecoder){0};
|
|
||||||
SDeleteRes* pRes = &(SDeleteRes){0};
|
|
||||||
|
|
||||||
pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t));
|
|
||||||
if (pRes->uidList == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
failed = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
tDecoderInit(pCoder, pReq, len);
|
|
||||||
tDecodeDeleteRes(pCoder, pRes);
|
|
||||||
tDecoderClear(pCoder);
|
|
||||||
|
|
||||||
int32_t sz = taosArrayGetSize(pRes->uidList);
|
|
||||||
if (sz == 0 || pRes->affectedRows == 0) {
|
|
||||||
taosArrayDestroy(pRes->uidList);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
|
|
||||||
blockDataEnsureCapacity(pDelBlock, sz);
|
|
||||||
pDelBlock->info.rows = sz;
|
|
||||||
pDelBlock->info.version = ver;
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
|
||||||
// start key column
|
|
||||||
SColumnInfoData* pStartCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
|
||||||
colDataSetVal(pStartCol, i, (const char*)&pRes->skey, false); // end key column
|
|
||||||
SColumnInfoData* pEndCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
|
||||||
colDataSetVal(pEndCol, i, (const char*)&pRes->ekey, false);
|
|
||||||
// uid column
|
|
||||||
SColumnInfoData* pUidCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX);
|
|
||||||
int64_t* pUid = taosArrayGet(pRes->uidList, i);
|
|
||||||
colDataSetVal(pUidCol, i, (const char*)pUid, false);
|
|
||||||
|
|
||||||
colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, GROUPID_COLUMN_INDEX), i);
|
|
||||||
colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX), i);
|
|
||||||
colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX), i);
|
|
||||||
}
|
|
||||||
|
|
||||||
taosArrayDestroy(pRes->uidList);
|
|
||||||
|
|
||||||
int32_t* pRef = taosMemoryMalloc(sizeof(int32_t));
|
|
||||||
*pRef = 1;
|
|
||||||
|
|
||||||
taosWLockLatch(&pTq->pStreamMeta->lock);
|
|
||||||
|
|
||||||
void* pIter = NULL;
|
|
||||||
while (1) {
|
|
||||||
pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
|
|
||||||
if (pIter == NULL) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
SStreamTask* pTask = *(SStreamTask**)pIter;
|
|
||||||
if (pTask->taskLevel != TASK_LEVEL__SOURCE) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
qDebug("s-task:%s delete req enqueue, ver: %" PRId64, pTask->id.idStr, ver);
|
|
||||||
|
|
||||||
if (!failed) {
|
|
||||||
SStreamRefDataBlock* pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
|
|
||||||
pRefBlock->type = STREAM_INPUT__REF_DATA_BLOCK;
|
|
||||||
pRefBlock->pBlock = pDelBlock;
|
|
||||||
|
|
||||||
if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pRefBlock) < 0) {
|
|
||||||
atomic_sub_fetch_32(pRef, 1);
|
|
||||||
taosFreeQitem(pRefBlock);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (streamSchedExec(pTask) < 0) {
|
|
||||||
qError("s-task:%s stream task launch failed", pTask->id.idStr);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
} else {
|
|
||||||
streamTaskInputFail(pTask);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
taosWUnLockLatch(&pTq->pStreamMeta->lock);
|
|
||||||
|
|
||||||
int32_t ref = atomic_sub_fetch_32(pRef, 1);
|
|
||||||
if (ref == 0) {
|
|
||||||
blockDataDestroy(pDelBlock);
|
|
||||||
taosMemoryFree(pRef);
|
|
||||||
}
|
|
||||||
|
|
||||||
#if 0
|
|
||||||
SStreamDataBlock* pStreamBlock = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
|
|
||||||
pStreamBlock->type = STREAM_INPUT__DATA_BLOCK;
|
|
||||||
pStreamBlock->blocks = taosArrayInit(0, sizeof(SSDataBlock));
|
|
||||||
SSDataBlock block = {0};
|
|
||||||
assignOneDataBlock(&block, pDelBlock);
|
|
||||||
block.info.type = STREAM_DELETE_DATA;
|
|
||||||
taosArrayPush(pStreamBlock->blocks, &block);
|
|
||||||
|
|
||||||
if (!failed) {
|
|
||||||
if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pStreamBlock) < 0) {
|
|
||||||
qError("stream task input del failed, task id %d", pTask->id.taskId);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (streamSchedExec(pTask) < 0) {
|
|
||||||
qError("stream task launch failed, task id %d", pTask->id.taskId);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
streamTaskInputFail(pTask);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
blockDataDestroy(pDelBlock);
|
|
||||||
#endif
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tqProcessSubmitReqForSubscribe(STQ* pTq) {
|
|
||||||
int32_t vgId = TD_VID(pTq->pVnode);
|
|
||||||
|
|
||||||
taosWLockLatch(&pTq->lock);
|
|
||||||
|
|
||||||
if (taosHashGetSize(pTq->pPushMgr) > 0) {
|
|
||||||
void* pIter = taosHashIterate(pTq->pPushMgr, NULL);
|
|
||||||
|
|
||||||
while (pIter) {
|
|
||||||
STqHandle* pHandle = *(STqHandle**)pIter;
|
|
||||||
tqDebug("vgId:%d start set submit for pHandle:%p, consumer:0x%" PRIx64, vgId, pHandle, pHandle->consumerId);
|
|
||||||
|
|
||||||
if (ASSERT(pHandle->msg != NULL)) {
|
|
||||||
tqError("pHandle->msg should not be null");
|
|
||||||
break;
|
|
||||||
}else{
|
|
||||||
SRpcMsg msg = {.msgType = TDMT_VND_TMQ_CONSUME, .pCont = pHandle->msg->pCont, .contLen = pHandle->msg->contLen, .info = pHandle->msg->info};
|
|
||||||
tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &msg);
|
|
||||||
taosMemoryFree(pHandle->msg);
|
|
||||||
pHandle->msg = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
pIter = taosHashIterate(pTq->pPushMgr, pIter);
|
|
||||||
}
|
|
||||||
|
|
||||||
taosHashClear(pTq->pPushMgr);
|
|
||||||
}
|
|
||||||
|
|
||||||
// unlock
|
|
||||||
taosWUnLockLatch(&pTq->lock);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
|
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
SStreamTaskRunReq* pReq = pMsg->pCont;
|
SStreamTaskRunReq* pReq = pMsg->pCont;
|
||||||
|
|
||||||
|
|
|
@ -16,8 +16,40 @@
|
||||||
#include "tq.h"
|
#include "tq.h"
|
||||||
#include "vnd.h"
|
#include "vnd.h"
|
||||||
|
|
||||||
int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
|
int32_t tqProcessSubmitReqForSubscribe(STQ* pTq) {
|
||||||
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
|
|
||||||
|
taosWLockLatch(&pTq->lock);
|
||||||
|
|
||||||
|
if (taosHashGetSize(pTq->pPushMgr) > 0) {
|
||||||
|
void* pIter = taosHashIterate(pTq->pPushMgr, NULL);
|
||||||
|
|
||||||
|
while (pIter) {
|
||||||
|
STqHandle* pHandle = *(STqHandle**)pIter;
|
||||||
|
tqDebug("vgId:%d start set submit for pHandle:%p, consumer:0x%" PRIx64, vgId, pHandle, pHandle->consumerId);
|
||||||
|
|
||||||
|
if (ASSERT(pHandle->msg != NULL)) {
|
||||||
|
tqError("pHandle->msg should not be null");
|
||||||
|
break;
|
||||||
|
}else{
|
||||||
|
SRpcMsg msg = {.msgType = TDMT_VND_TMQ_CONSUME, .pCont = pHandle->msg->pCont, .contLen = pHandle->msg->contLen, .info = pHandle->msg->info};
|
||||||
|
tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &msg);
|
||||||
|
taosMemoryFree(pHandle->msg);
|
||||||
|
pHandle->msg = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
pIter = taosHashIterate(pTq->pPushMgr, pIter);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosHashClear(pTq->pPushMgr);
|
||||||
|
}
|
||||||
|
|
||||||
|
// unlock
|
||||||
|
taosWUnLockLatch(&pTq->lock);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
|
||||||
if (msgType == TDMT_VND_SUBMIT) {
|
if (msgType == TDMT_VND_SUBMIT) {
|
||||||
tqProcessSubmitReqForSubscribe(pTq);
|
tqProcessSubmitReqForSubscribe(pTq);
|
||||||
}
|
}
|
||||||
|
@ -37,10 +69,6 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v
|
||||||
if (msgType == TDMT_VND_SUBMIT || msgType == TDMT_VND_DELETE) {
|
if (msgType == TDMT_VND_SUBMIT || msgType == TDMT_VND_DELETE) {
|
||||||
tqStartStreamTasks(pTq);
|
tqStartStreamTasks(pTq);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (msgType == TDMT_VND_DELETE) {
|
|
||||||
// tqProcessDeleteDataReq(pTq, POINTER_SHIFT(msg, sizeof(SMsgHead)), msgLen - sizeof(SMsgHead), ver);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -344,7 +344,7 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, const char* id) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo ignore the error in wal?
|
// todo ignore the error in wal?
|
||||||
int32_t tqNextBlockInWal(STqReader* pReader) {
|
bool tqNextBlockInWal(STqReader* pReader, const char* id) {
|
||||||
SWalReader* pWalReader = pReader->pWalReader;
|
SWalReader* pWalReader = pReader->pWalReader;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
|
@ -353,7 +353,7 @@ int32_t tqNextBlockInWal(STqReader* pReader) {
|
||||||
// try next message in wal file
|
// try next message in wal file
|
||||||
// todo always retry to avoid read failure caused by wal file deletion
|
// todo always retry to avoid read failure caused by wal file deletion
|
||||||
if (walNextValidMsg(pWalReader) < 0) {
|
if (walNextValidMsg(pWalReader) < 0) {
|
||||||
return FETCH_TYPE__NONE;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
void* pBody = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
|
void* pBody = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
|
||||||
|
@ -379,24 +379,24 @@ int32_t tqNextBlockInWal(STqReader* pReader) {
|
||||||
if (tDecodeSubmitReq(&decoder, &pReader->submit) < 0) {
|
if (tDecodeSubmitReq(&decoder, &pReader->submit) < 0) {
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
tqError("decode wal file error, msgLen:%d, ver:%" PRId64, bodyLen, ver);
|
tqError("decode wal file error, msgLen:%d, ver:%" PRId64, bodyLen, ver);
|
||||||
return FETCH_TYPE__NONE;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
pReader->nextBlk = 0;
|
pReader->nextBlk = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
|
int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
|
||||||
while (pReader->nextBlk < numOfBlocks) {
|
while (pReader->nextBlk < numOfBlocks) {
|
||||||
tqDebug("tq reader next data block %p, %d %" PRId64 " %d", pReader->msg.msgStr, pReader->msg.msgLen,
|
tqDebug("tq reader next data block %d/%d, len:%d %" PRId64 " %d", pReader->nextBlk,
|
||||||
pReader->msg.ver, pReader->nextBlk);
|
numOfBlocks, pReader->msg.msgLen, pReader->msg.ver, pReader->nextBlk);
|
||||||
|
|
||||||
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
|
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
|
||||||
|
|
||||||
if (pReader->tbIdHash == NULL) {
|
if (pReader->tbIdHash == NULL) {
|
||||||
int32_t code = tqRetrieveDataBlock(pReader, NULL);
|
int32_t code = tqRetrieveDataBlock(pReader, NULL);
|
||||||
if (code == TSDB_CODE_SUCCESS && pReader->pResBlock->info.rows > 0) {
|
if (code == TSDB_CODE_SUCCESS && pReader->pResBlock->info.rows > 0) {
|
||||||
return FETCH_TYPE__DATA;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -406,7 +406,7 @@ int32_t tqNextBlockInWal(STqReader* pReader) {
|
||||||
|
|
||||||
int32_t code = tqRetrieveDataBlock(pReader, NULL);
|
int32_t code = tqRetrieveDataBlock(pReader, NULL);
|
||||||
if (code == TSDB_CODE_SUCCESS && pReader->pResBlock->info.rows > 0) {
|
if (code == TSDB_CODE_SUCCESS && pReader->pResBlock->info.rows > 0) {
|
||||||
return FETCH_TYPE__DATA;
|
return true;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
pReader->nextBlk += 1;
|
pReader->nextBlk += 1;
|
||||||
|
@ -414,7 +414,9 @@ int32_t tqNextBlockInWal(STqReader* pReader) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
qDebug("stream scan return empty, all %d submit blocks consumed, %s", numOfBlocks, id);
|
||||||
tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
|
tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
|
||||||
|
|
||||||
pReader->msg.msgStr = NULL;
|
pReader->msg.msgStr = NULL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -730,13 +732,17 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, const char* id) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// todo refactor:
|
||||||
int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas, SSubmitTbData** pSubmitTbDataRet) {
|
int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas, SSubmitTbData** pSubmitTbDataRet) {
|
||||||
tqDebug("tq reader retrieve data block %p, %d", pReader->msg.msgStr, pReader->nextBlk);
|
tqDebug("tq reader retrieve data block %p, %d", pReader->msg.msgStr, pReader->nextBlk);
|
||||||
|
|
||||||
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
|
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
|
||||||
pReader->nextBlk++;
|
pReader->nextBlk++;
|
||||||
|
|
||||||
if (pSubmitTbDataRet) *pSubmitTbDataRet = pSubmitTbData;
|
if (pSubmitTbDataRet) {
|
||||||
|
*pSubmitTbDataRet = pSubmitTbData;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t sversion = pSubmitTbData->sver;
|
int32_t sversion = pSubmitTbData->sver;
|
||||||
int64_t suid = pSubmitTbData->suid;
|
int64_t suid = pSubmitTbData->suid;
|
||||||
int64_t uid = pSubmitTbData->uid;
|
int64_t uid = pSubmitTbData->uid;
|
||||||
|
|
|
@ -15,7 +15,7 @@
|
||||||
|
|
||||||
#include "tq.h"
|
#include "tq.h"
|
||||||
|
|
||||||
static int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle);
|
static int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle);
|
||||||
|
|
||||||
// this function should be executed by stream threads.
|
// this function should be executed by stream threads.
|
||||||
// extract submit block from WAL, and add them into the input queue for the sources tasks.
|
// extract submit block from WAL, and add them into the input queue for the sources tasks.
|
||||||
|
@ -30,7 +30,7 @@ int32_t tqStreamTasksScanWal(STQ* pTq) {
|
||||||
|
|
||||||
// check all restore tasks
|
// check all restore tasks
|
||||||
bool shouldIdle = true;
|
bool shouldIdle = true;
|
||||||
createStreamRunReq(pTq->pStreamMeta, &shouldIdle);
|
createStreamTaskRunReq(pTq->pStreamMeta, &shouldIdle);
|
||||||
|
|
||||||
int32_t times = 0;
|
int32_t times = 0;
|
||||||
|
|
||||||
|
@ -57,7 +57,39 @@ int32_t tqStreamTasksScanWal(STQ* pTq) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
static int32_t doSetOffsetForWalReader(SStreamTask *pTask, int32_t vgId) {
|
||||||
|
// seek the stored version and extract data from WAL
|
||||||
|
int64_t firstVer = walReaderGetValidFirstVer(pTask->exec.pWalReader);
|
||||||
|
if (pTask->chkInfo.currentVer < firstVer) {
|
||||||
|
pTask->chkInfo.currentVer = firstVer;
|
||||||
|
tqWarn("vgId:%d s-task:%s ver earlier than the first ver of wal range %" PRId64 ", forward to %" PRId64, vgId,
|
||||||
|
pTask->id.idStr, firstVer, pTask->chkInfo.currentVer);
|
||||||
|
|
||||||
|
// todo need retry if failed
|
||||||
|
int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
// append the data for the stream
|
||||||
|
tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer);
|
||||||
|
} else {
|
||||||
|
int64_t currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
|
||||||
|
if (currentVer == -1) { // we only seek the read for the first time
|
||||||
|
int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
// append the data for the stream
|
||||||
|
tqDebug("vgId:%d s-task:%s wal reader initial seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
||||||
*pScanIdle = true;
|
*pScanIdle = true;
|
||||||
bool noNewDataInWal = true;
|
bool noNewDataInWal = true;
|
||||||
int32_t vgId = pStreamMeta->vgId;
|
int32_t vgId = pStreamMeta->vgId;
|
||||||
|
@ -67,6 +99,7 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// clone the task list, to avoid the task update during scan wal files
|
||||||
SArray* pTaskList = NULL;
|
SArray* pTaskList = NULL;
|
||||||
taosWLockLatch(&pStreamMeta->lock);
|
taosWLockLatch(&pStreamMeta->lock);
|
||||||
pTaskList = taosArrayDup(pStreamMeta->pTaskList, NULL);
|
pTaskList = taosArrayDup(pStreamMeta->pTaskList, NULL);
|
||||||
|
@ -107,39 +140,15 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
||||||
*pScanIdle = false;
|
*pScanIdle = false;
|
||||||
|
|
||||||
// seek the stored version and extract data from WAL
|
// seek the stored version and extract data from WAL
|
||||||
int64_t firstVer = walReaderGetValidFirstVer(pTask->exec.pWalReader);
|
int32_t code = doSetOffsetForWalReader(pTask, vgId);
|
||||||
if (pTask->chkInfo.currentVer < firstVer) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
pTask->chkInfo.currentVer = firstVer;
|
streamMetaReleaseTask(pStreamMeta, pTask);
|
||||||
tqWarn("vgId:%d s-task:%s ver earlier than the first ver of wal range %" PRId64 ", forward to %" PRId64, vgId,
|
continue;
|
||||||
pTask->id.idStr, firstVer, pTask->chkInfo.currentVer);
|
|
||||||
|
|
||||||
// todo need retry if failed
|
|
||||||
int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
streamMetaReleaseTask(pStreamMeta, pTask);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// append the data for the stream
|
|
||||||
tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer);
|
|
||||||
} else {
|
|
||||||
int64_t currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
|
|
||||||
if (currentVer == -1) {
|
|
||||||
int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit
|
|
||||||
streamMetaReleaseTask(pStreamMeta, pTask);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// append the data for the stream
|
|
||||||
tqDebug("vgId:%d s-task:%s wal reader initial seek to ver:%" PRId64, vgId, pTask->id.idStr,
|
|
||||||
pTask->chkInfo.currentVer);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// append the data for the stream
|
// append the data for the stream
|
||||||
SStreamQueueItem* pItem = NULL;
|
SStreamQueueItem* pItem = NULL;
|
||||||
int32_t code = extractMsgFromWal(pTask->exec.pWalReader, (void**)&pItem, pTask->id.idStr);
|
code = extractMsgFromWal(pTask->exec.pWalReader, (void**) &pItem, pTask->id.idStr);
|
||||||
if (code != TSDB_CODE_SUCCESS) { // failed, continue
|
if (code != TSDB_CODE_SUCCESS) { // failed, continue
|
||||||
streamMetaReleaseTask(pStreamMeta, pTask);
|
streamMetaReleaseTask(pStreamMeta, pTask);
|
||||||
continue;
|
continue;
|
||||||
|
|
|
@ -249,7 +249,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
}
|
}
|
||||||
|
|
||||||
if (offset->type == TMQ_OFFSET__LOG) {
|
if (offset->type == TMQ_OFFSET__LOG) {
|
||||||
verifyOffset(pHandle->pWalReader, offset);
|
walReaderVerifyOffset(pHandle->pWalReader, offset);
|
||||||
int64_t fetchVer = offset->version + 1;
|
int64_t fetchVer = offset->version + 1;
|
||||||
pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
|
pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
|
||||||
if (pCkHead == NULL) {
|
if (pCkHead == NULL) {
|
||||||
|
|
|
@ -1058,17 +1058,6 @@ void qStreamSetOpen(qTaskInfo_t tinfo) {
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
}
|
}
|
||||||
|
|
||||||
void verifyOffset(void *pWalReader, STqOffsetVal* pOffset){
|
|
||||||
// if offset version is small than first version , let's seek to first version
|
|
||||||
taosThreadMutexLock(&((SWalReader*)pWalReader)->pWal->mutex);
|
|
||||||
int64_t firstVer = walGetFirstVer(((SWalReader*)pWalReader)->pWal);
|
|
||||||
taosThreadMutexUnlock(&((SWalReader*)pWalReader)->pWal->mutex);
|
|
||||||
|
|
||||||
if (pOffset->version + 1 < firstVer){
|
|
||||||
pOffset->version = firstVer - 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) {
|
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) {
|
||||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||||
SOperatorInfo* pOperator = pTaskInfo->pRoot;
|
SOperatorInfo* pOperator = pTaskInfo->pRoot;
|
||||||
|
@ -1095,7 +1084,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
||||||
tsdbReaderClose(pScanBaseInfo->dataReader);
|
tsdbReaderClose(pScanBaseInfo->dataReader);
|
||||||
pScanBaseInfo->dataReader = NULL;
|
pScanBaseInfo->dataReader = NULL;
|
||||||
|
|
||||||
verifyOffset(pInfo->tqReader->pWalReader, pOffset);
|
walReaderVerifyOffset(pInfo->tqReader->pWalReader, pOffset);
|
||||||
if (tqSeekVer(pInfo->tqReader, pOffset->version + 1, id) < 0) {
|
if (tqSeekVer(pInfo->tqReader, pOffset->version + 1, id) < 0) {
|
||||||
qError("tqSeekVer failed ver:%" PRId64 ", %s", pOffset->version + 1, id);
|
qError("tqSeekVer failed ver:%" PRId64 ", %s", pOffset->version + 1, id);
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -1697,13 +1697,13 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
|
||||||
if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__LOG) {
|
if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__LOG) {
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
int32_t type = tqNextBlockInWal(pInfo->tqReader);
|
bool hasResult = tqNextBlockInWal(pInfo->tqReader, id);
|
||||||
SSDataBlock* pRes = pInfo->tqReader->pResBlock;
|
SSDataBlock* pRes = pInfo->tqReader->pResBlock;
|
||||||
|
|
||||||
// curVersion move to next, so currentOffset = curVersion - 1
|
// curVersion move to next, so currentOffset = curVersion - 1
|
||||||
tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pInfo->tqReader->pWalReader->curVersion - 1);
|
tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pInfo->tqReader->pWalReader->curVersion - 1);
|
||||||
|
|
||||||
if (type == FETCH_TYPE__DATA) {
|
if (hasResult) {
|
||||||
qDebug("doQueueScan get data from log %" PRId64 " rows, version:%" PRId64, pRes->info.rows,
|
qDebug("doQueueScan get data from log %" PRId64 " rows, version:%" PRId64, pRes->info.rows,
|
||||||
pTaskInfo->streamInfo.currentOffset.version);
|
pTaskInfo->streamInfo.currentOffset.version);
|
||||||
blockDataCleanup(pInfo->pRes);
|
blockDataCleanup(pInfo->pRes);
|
||||||
|
@ -1711,7 +1711,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
|
||||||
if (pInfo->pRes->info.rows > 0) {
|
if (pInfo->pRes->info.rows > 0) {
|
||||||
return pInfo->pRes;
|
return pInfo->pRes;
|
||||||
}
|
}
|
||||||
} else if (type == FETCH_TYPE__NONE) {
|
} else {
|
||||||
qDebug("doQueueScan get none from log, return, version:%" PRId64, pTaskInfo->streamInfo.currentOffset.version);
|
qDebug("doQueueScan get none from log, return, version:%" PRId64, pTaskInfo->streamInfo.currentOffset.version);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -2074,8 +2074,9 @@ FETCH_NEXT_BLOCK:
|
||||||
return pInfo->pUpdateRes;
|
return pInfo->pUpdateRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
const char* id = GET_TASKID(pTaskInfo);
|
const char* id = GET_TASKID(pTaskInfo);
|
||||||
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
|
SSDataBlock* pBlock = pInfo->pRes;
|
||||||
|
SDataBlockInfo* pBlockInfo = &pBlock->info;
|
||||||
int32_t totalBlocks = taosArrayGetSize(pInfo->pBlockLists);
|
int32_t totalBlocks = taosArrayGetSize(pInfo->pBlockLists);
|
||||||
|
|
||||||
NEXT_SUBMIT_BLK:
|
NEXT_SUBMIT_BLK:
|
||||||
|
@ -2086,12 +2087,6 @@ FETCH_NEXT_BLOCK:
|
||||||
doClearBufferedBlocks(pInfo);
|
doClearBufferedBlocks(pInfo);
|
||||||
|
|
||||||
qDebug("stream scan return empty, all %d submit blocks consumed, %s", totalBlocks, id);
|
qDebug("stream scan return empty, all %d submit blocks consumed, %s", totalBlocks, id);
|
||||||
void* buff = NULL;
|
|
||||||
// int32_t len = streamScanOperatorEncode(pInfo, &buff);
|
|
||||||
// if (len > 0) {
|
|
||||||
// streamStateSaveInfo(pInfo->pState, STREAM_SCAN_OP_NAME, strlen(STREAM_SCAN_OP_NAME), buff, len);
|
|
||||||
// }
|
|
||||||
taosMemoryFreeClear(buff);
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2105,7 +2100,7 @@ FETCH_NEXT_BLOCK:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
blockDataCleanup(pInfo->pRes);
|
blockDataCleanup(pBlock);
|
||||||
|
|
||||||
while (tqNextBlockImpl(pInfo->tqReader, id)) {
|
while (tqNextBlockImpl(pInfo->tqReader, id)) {
|
||||||
int32_t code = tqRetrieveDataBlock(pInfo->tqReader, id);
|
int32_t code = tqRetrieveDataBlock(pInfo->tqReader, id);
|
||||||
|
@ -2120,10 +2115,10 @@ FETCH_NEXT_BLOCK:
|
||||||
return pInfo->pCreateTbRes;
|
return pInfo->pCreateTbRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
doCheckUpdate(pInfo, pBlockInfo->window.ekey, pInfo->pRes);
|
doCheckUpdate(pInfo, pBlockInfo->window.ekey, pBlock);
|
||||||
doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
|
doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL);
|
||||||
pInfo->pRes->info.dataLoad = 1;
|
pBlock->info.dataLoad = 1;
|
||||||
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
|
blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
|
||||||
|
|
||||||
if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
|
if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
|
||||||
break;
|
break;
|
||||||
|
@ -2143,7 +2138,7 @@ FETCH_NEXT_BLOCK:
|
||||||
|
|
||||||
qDebug("stream scan get source rows:%" PRId64", %s", pBlockInfo->rows, id);
|
qDebug("stream scan get source rows:%" PRId64", %s", pBlockInfo->rows, id);
|
||||||
if (pBlockInfo->rows > 0) {
|
if (pBlockInfo->rows > 0) {
|
||||||
return pInfo->pRes;
|
return pBlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pInfo->pUpdateDataRes->info.rows > 0) {
|
if (pInfo->pUpdateDataRes->info.rows > 0) {
|
||||||
|
@ -2151,10 +2146,9 @@ FETCH_NEXT_BLOCK:
|
||||||
}
|
}
|
||||||
|
|
||||||
goto NEXT_SUBMIT_BLK;
|
goto NEXT_SUBMIT_BLK;
|
||||||
} else {
|
|
||||||
ASSERT(0);
|
|
||||||
return NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SArray* extractTableIdList(const STableListInfo* pTableListInfo) {
|
static SArray* extractTableIdList(const STableListInfo* pTableListInfo) {
|
||||||
|
@ -2242,44 +2236,6 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
// else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) {
|
|
||||||
// int64_t fetchVer = pTaskInfo->streamInfo.prepareStatus.version + 1;
|
|
||||||
//
|
|
||||||
// while(1){
|
|
||||||
// if (tqFetchLog(pInfo->tqReader->pWalReader, pInfo->sContext->withMeta, &fetchVer, &pInfo->pCkHead) < 0) {
|
|
||||||
// qDebug("tmqsnap tmq poll: consumer log end. offset %" PRId64, fetchVer);
|
|
||||||
// pTaskInfo->streamInfo.lastStatus.version = fetchVer;
|
|
||||||
// pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG;
|
|
||||||
// return NULL;
|
|
||||||
// }
|
|
||||||
// SWalCont* pHead = &pInfo->pCkHead->head;
|
|
||||||
// qDebug("tmqsnap tmq poll: consumer log offset %" PRId64 " msgType %d", fetchVer, pHead->msgType);
|
|
||||||
//
|
|
||||||
// if (pHead->msgType == TDMT_VND_SUBMIT) {
|
|
||||||
// SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
|
|
||||||
// tqReaderSetDataMsg(pInfo->tqReader, pCont, 0);
|
|
||||||
// SSDataBlock* block = tqLogScanExec(pInfo->sContext->subType, pInfo->tqReader, pInfo->pFilterOutTbUid,
|
|
||||||
// &pInfo->pRes); if(block){
|
|
||||||
// pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG;
|
|
||||||
// pTaskInfo->streamInfo.lastStatus.version = fetchVer;
|
|
||||||
// qDebug("tmqsnap fetch data msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType);
|
|
||||||
// return block;
|
|
||||||
// }else{
|
|
||||||
// fetchVer++;
|
|
||||||
// }
|
|
||||||
// } else{
|
|
||||||
// ASSERT(pInfo->sContext->withMeta);
|
|
||||||
// ASSERT(IS_META_MSG(pHead->msgType));
|
|
||||||
// qDebug("tmqsnap fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType);
|
|
||||||
// pTaskInfo->streamInfo.metaRsp.rspOffset.version = fetchVer;
|
|
||||||
// pTaskInfo->streamInfo.metaRsp.rspOffset.type = TMQ_OFFSET__LOG;
|
|
||||||
// pTaskInfo->streamInfo.metaRsp.resMsgType = pHead->msgType;
|
|
||||||
// pTaskInfo->streamInfo.metaRsp.metaRspLen = pHead->bodyLen;
|
|
||||||
// pTaskInfo->streamInfo.metaRsp.metaRsp = taosMemoryMalloc(pHead->bodyLen);
|
|
||||||
// memcpy(pTaskInfo->streamInfo.metaRsp.metaRsp, pHead->body, pHead->bodyLen);
|
|
||||||
// return NULL;
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -27,6 +27,7 @@ SStreamQueue* streamQueueOpen(int64_t cap) {
|
||||||
taosSetQueueCapacity(pQueue->queue, cap);
|
taosSetQueueCapacity(pQueue->queue, cap);
|
||||||
taosSetQueueMemoryCapacity(pQueue->queue, cap * 1024);
|
taosSetQueueMemoryCapacity(pQueue->queue, cap * 1024);
|
||||||
return pQueue;
|
return pQueue;
|
||||||
|
|
||||||
FAIL:
|
FAIL:
|
||||||
if (pQueue->queue) taosCloseQueue(pQueue->queue);
|
if (pQueue->queue) taosCloseQueue(pQueue->queue);
|
||||||
if (pQueue->qall) taosFreeQall(pQueue->qall);
|
if (pQueue->qall) taosFreeQall(pQueue->qall);
|
||||||
|
|
|
@ -116,6 +116,17 @@ void walReaderValidVersionRange(SWalReader *pReader, int64_t *sver, int64_t *eve
|
||||||
*ever = pReader->cond.scanUncommited ? lastVer : committedVer;
|
*ever = pReader->cond.scanUncommited ? lastVer : committedVer;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void walReaderVerifyOffset(SWalReader *pWalReader, STqOffsetVal* pOffset){
|
||||||
|
// if offset version is small than first version , let's seek to first version
|
||||||
|
taosThreadMutexLock(&pWalReader->pWal->mutex);
|
||||||
|
int64_t firstVer = walGetFirstVer((pWalReader)->pWal);
|
||||||
|
taosThreadMutexUnlock(&pWalReader->pWal->mutex);
|
||||||
|
|
||||||
|
if (pOffset->version + 1 < firstVer){
|
||||||
|
pOffset->version = firstVer - 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static int64_t walReadSeekFilePos(SWalReader *pReader, int64_t fileFirstVer, int64_t ver) {
|
static int64_t walReadSeekFilePos(SWalReader *pReader, int64_t fileFirstVer, int64_t ver) {
|
||||||
int64_t ret = 0;
|
int64_t ret = 0;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue