refactor: do some internal refactor.
This commit is contained in:
parent
b02b3117b5
commit
8a1b0ed052
|
@ -256,7 +256,7 @@ int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList);
|
||||||
|
|
||||||
int32_t tqSeekVer(STqReader *pReader, int64_t ver, const char *id);
|
int32_t tqSeekVer(STqReader *pReader, int64_t ver, const char *id);
|
||||||
int32_t tqNextBlockInWal(STqReader* pReader);
|
int32_t tqNextBlockInWal(STqReader* pReader);
|
||||||
bool tqNextBlockImpl(STqReader *pReader);
|
bool tqNextBlockImpl(STqReader *pReader, const char* idstr);
|
||||||
|
|
||||||
int32_t extractSubmitMsgFromWal(SWalReader *pReader, SPackedData *pPackedData);
|
int32_t extractSubmitMsgFromWal(SWalReader *pReader, SPackedData *pPackedData);
|
||||||
int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver);
|
int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver);
|
||||||
|
|
|
@ -608,6 +608,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
|
||||||
} else if (pTask->taskLevel == TASK_LEVEL__AGG) {
|
} else if (pTask->taskLevel == TASK_LEVEL__AGG) {
|
||||||
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
|
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
|
||||||
if (pTask->pState == NULL) {
|
if (pTask->pState == NULL) {
|
||||||
|
@ -621,6 +622,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
||||||
if (pTask->exec.pExecutor == NULL) {
|
if (pTask->exec.pExecutor == NULL) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
|
||||||
}
|
}
|
||||||
|
|
||||||
// sink
|
// sink
|
||||||
|
|
|
@ -419,15 +419,15 @@ int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, i
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool tqNextBlockImpl(STqReader* pReader) {
|
bool tqNextBlockImpl(STqReader* pReader, const char* idstr) {
|
||||||
if (pReader->msg.msgStr == NULL) {
|
if (pReader->msg.msgStr == NULL) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
|
int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
|
||||||
while (pReader->nextBlk < blockSz) {
|
while (pReader->nextBlk < numOfBlocks) {
|
||||||
tqDebug("tq reader next data block %p, %d %" PRId64 " %d", pReader->msg.msgStr, pReader->msg.msgLen,
|
tqDebug("tq reader next data block, len:%d ver:%" PRId64 " index:%d/%d, %s", pReader->msg.msgLen,
|
||||||
pReader->msg.ver, pReader->nextBlk);
|
pReader->msg.ver, pReader->nextBlk, numOfBlocks, idstr);
|
||||||
|
|
||||||
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
|
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
|
||||||
if (pReader->tbIdHash == NULL) {
|
if (pReader->tbIdHash == NULL) {
|
||||||
|
|
|
@ -205,7 +205,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
|
||||||
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
|
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
|
||||||
STqReader* pReader = pExec->pTqReader;
|
STqReader* pReader = pExec->pTqReader;
|
||||||
tqReaderSetSubmitMsg(pReader, submit.msgStr, submit.msgLen, submit.ver);
|
tqReaderSetSubmitMsg(pReader, submit.msgStr, submit.msgLen, submit.ver);
|
||||||
while (tqNextBlockImpl(pReader)) {
|
while (tqNextBlockImpl(pReader, NULL)) {
|
||||||
taosArrayClear(pBlocks);
|
taosArrayClear(pBlocks);
|
||||||
taosArrayClear(pSchemas);
|
taosArrayClear(pSchemas);
|
||||||
SSubmitTbData* pSubmitTbDataRet = NULL;
|
SSubmitTbData* pSubmitTbDataRet = NULL;
|
||||||
|
|
|
@ -1626,7 +1626,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
|
||||||
blockDataCleanup(pInfo->pRes);
|
blockDataCleanup(pInfo->pRes);
|
||||||
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
|
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
|
||||||
|
|
||||||
while (tqNextBlockImpl(pInfo->tqReader)) {
|
while (tqNextBlockImpl(pInfo->tqReader, NULL)) {
|
||||||
int32_t code = tqRetrieveDataBlock(pInfo->tqReader, NULL);
|
int32_t code = tqRetrieveDataBlock(pInfo->tqReader, NULL);
|
||||||
if (code != TSDB_CODE_SUCCESS || pInfo->tqReader->pResBlock->info.rows == 0) {
|
if (code != TSDB_CODE_SUCCESS || pInfo->tqReader->pResBlock->info.rows == 0) {
|
||||||
continue;
|
continue;
|
||||||
|
@ -2077,7 +2077,7 @@ FETCH_NEXT_BLOCK:
|
||||||
|
|
||||||
blockDataCleanup(pInfo->pRes);
|
blockDataCleanup(pInfo->pRes);
|
||||||
|
|
||||||
while (tqNextBlockImpl(pInfo->tqReader)) {
|
while (tqNextBlockImpl(pInfo->tqReader, pTaskInfo->id.str)) {
|
||||||
int32_t code = tqRetrieveDataBlock(pInfo->tqReader, pTaskInfo->id.str);
|
int32_t code = tqRetrieveDataBlock(pInfo->tqReader, pTaskInfo->id.str);
|
||||||
if (code != TSDB_CODE_SUCCESS || pInfo->tqReader->pResBlock->info.rows == 0) {
|
if (code != TSDB_CODE_SUCCESS || pInfo->tqReader->pResBlock->info.rows == 0) {
|
||||||
continue;
|
continue;
|
||||||
|
|
Loading…
Reference in New Issue