refactor: do some internal refactor, and update the logs.
This commit is contained in:
parent
7f19909147
commit
1bd0cc4a5a
|
@ -239,6 +239,7 @@ typedef struct SSourceDataInfo {
|
||||||
int32_t index;
|
int32_t index;
|
||||||
SRetrieveTableRsp* pRsp;
|
SRetrieveTableRsp* pRsp;
|
||||||
uint64_t totalRows;
|
uint64_t totalRows;
|
||||||
|
int64_t startTime;
|
||||||
int32_t code;
|
int32_t code;
|
||||||
EX_SOURCE_STATUS status;
|
EX_SOURCE_STATUS status;
|
||||||
const char* taskId;
|
const char* taskId;
|
||||||
|
|
|
@ -44,7 +44,7 @@ typedef struct SFetchRspHandleWrapper {
|
||||||
static void destroyExchangeOperatorInfo(void* param);
|
static void destroyExchangeOperatorInfo(void* param);
|
||||||
static void freeBlock(void* pParam);
|
static void freeBlock(void* pParam);
|
||||||
static void freeSourceDataInfo(void* param);
|
static void freeSourceDataInfo(void* param);
|
||||||
static void* setAllSourcesCompleted(SOperatorInfo* pOperator, int64_t startTs);
|
static void* setAllSourcesCompleted(SOperatorInfo* pOperator);
|
||||||
|
|
||||||
static int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code);
|
static int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code);
|
||||||
static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex);
|
static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex);
|
||||||
|
@ -59,7 +59,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
|
||||||
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
|
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
|
||||||
int32_t completed = getCompletedSources(pExchangeInfo->pSourceDataInfo);
|
int32_t completed = getCompletedSources(pExchangeInfo->pSourceDataInfo);
|
||||||
if (completed == totalSources) {
|
if (completed == totalSources) {
|
||||||
setAllSourcesCompleted(pOperator, pExchangeInfo->openedTs);
|
setAllSourcesCompleted(pOperator);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -113,7 +113,8 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
|
||||||
taosArrayPush(pExchangeInfo->pResultBlockList, &pb);
|
taosArrayPush(pExchangeInfo->pResultBlockList, &pb);
|
||||||
}
|
}
|
||||||
|
|
||||||
updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, pExchangeInfo->openedTs, pOperator);
|
updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, pDataInfo->startTime, pOperator);
|
||||||
|
pDataInfo->totalRows += pRetrieveRsp->numOfRows;
|
||||||
|
|
||||||
if (pRsp->completed == 1) {
|
if (pRsp->completed == 1) {
|
||||||
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
|
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
|
||||||
|
@ -388,6 +389,7 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas
|
||||||
|
|
||||||
SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, sourceIndex);
|
SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, sourceIndex);
|
||||||
SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex);
|
SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex);
|
||||||
|
pDataInfo->startTime = taosGetTimestampUs();
|
||||||
|
|
||||||
ASSERT(pDataInfo->status == EX_SOURCE_DATA_NOT_READY);
|
ASSERT(pDataInfo->status == EX_SOURCE_DATA_NOT_READY);
|
||||||
|
|
||||||
|
@ -493,18 +495,14 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pCo
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
void* setAllSourcesCompleted(SOperatorInfo* pOperator, int64_t startTs) {
|
void* setAllSourcesCompleted(SOperatorInfo* pOperator) {
|
||||||
SExchangeInfo* pExchangeInfo = pOperator->info;
|
SExchangeInfo* pExchangeInfo = pOperator->info;
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
|
||||||
int64_t el = taosGetTimestampUs() - startTs;
|
|
||||||
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
|
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
|
||||||
|
|
||||||
pLoadInfo->totalElapsed += el;
|
|
||||||
|
|
||||||
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
|
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
|
||||||
qDebug("%s all %" PRIzu " sources are exhausted, total rows: %" PRIu64 " bytes:%" PRIu64 ", elapsed:%.2f ms",
|
qDebug("%s all %" PRIzu " sources are exhausted, total rows: %" PRIu64 ", %.2f Kb, elapsed:%.2f ms",
|
||||||
GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize,
|
GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0,
|
||||||
pLoadInfo->totalElapsed / 1000.0);
|
pLoadInfo->totalElapsed / 1000.0);
|
||||||
|
|
||||||
setOperatorCompleted(pOperator);
|
setOperatorCompleted(pOperator);
|
||||||
|
@ -566,7 +564,7 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
if (pExchangeInfo->current >= totalSources) {
|
if (pExchangeInfo->current >= totalSources) {
|
||||||
setAllSourcesCompleted(pOperator, startTs);
|
setAllSourcesCompleted(pOperator);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -690,6 +690,8 @@ SSDataBlock* doMultiwayMerge(SOperatorInfo* pOperator) {
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
qDebug("start to merge final sorted rows, %s", GET_TASKID(pTaskInfo));
|
||||||
|
|
||||||
SSDataBlock* pBlock = getMultiwaySortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pInfo->matchInfo.pList, pOperator);
|
SSDataBlock* pBlock = getMultiwaySortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pInfo->matchInfo.pList, pOperator);
|
||||||
if (pBlock != NULL) {
|
if (pBlock != NULL) {
|
||||||
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
||||||
|
@ -754,7 +756,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size
|
||||||
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0);
|
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0);
|
||||||
SSDataBlock* pInputBlock = createResDataBlock(pChildNode->pOutputDataBlockDesc);
|
SSDataBlock* pInputBlock = createResDataBlock(pChildNode->pOutputDataBlockDesc);
|
||||||
|
|
||||||
initResultSizeInfo(&pOperator->resultInfo, 1024);
|
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||||
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
||||||
|
|
||||||
pInfo->groupSort = pMergePhyNode->groupSort;
|
pInfo->groupSort = pMergePhyNode->groupSort;
|
||||||
|
|
Loading…
Reference in New Issue