Merge pull request #11134 from taosdata/feature/3.0_liaohj
[td-14392] fix show bug.
This commit is contained in:
commit
1e5bdbfa60
|
@ -310,6 +310,8 @@ static int32_t mndProcessRetrieveSysTableReq(SNodeMsg *pReq) {
|
||||||
mError("failed to process show-retrieve req:%p since %s", pShow, terrstr());
|
mError("failed to process show-retrieve req:%p since %s", pShow, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pShow->numOfReads = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
ShowRetrieveFp retrieveFp = pMgmt->retrieveFps[pShow->type];
|
ShowRetrieveFp retrieveFp = pMgmt->retrieveFps[pShow->type];
|
||||||
|
@ -374,7 +376,7 @@ static int32_t mndProcessRetrieveSysTableReq(SNodeMsg *pReq) {
|
||||||
pReq->pRsp = pRsp;
|
pReq->pRsp = pRsp;
|
||||||
pReq->rspLen = size;
|
pReq->rspLen = size;
|
||||||
|
|
||||||
if (rowsRead == 0 || rowsToRead == 0 || (rowsRead == rowsToRead && pShow->numOfRows == pShow->numOfReads)) {
|
if (rowsRead == 0 || rowsToRead == 0 || (rowsRead < rowsToRead)) {
|
||||||
pRsp->completed = 1;
|
pRsp->completed = 1;
|
||||||
mDebug("show:0x%" PRIx64 ", retrieve completed", pShow->id);
|
mDebug("show:0x%" PRIx64 ", retrieve completed", pShow->id);
|
||||||
mndReleaseShowObj((SShowObj*) pShow, true);
|
mndReleaseShowObj((SShowObj*) pShow, true);
|
||||||
|
|
|
@ -1608,7 +1608,6 @@ static int32_t mndRetrieveStb(SNodeMsg *pReq, SShowObj *pShow, char *data, int32
|
||||||
SStbObj *pStb = NULL;
|
SStbObj *pStb = NULL;
|
||||||
int32_t cols = 0;
|
int32_t cols = 0;
|
||||||
char *pWrite;
|
char *pWrite;
|
||||||
char prefix[TSDB_DB_FNAME_LEN] = {0};
|
|
||||||
|
|
||||||
SDbObj* pDb = NULL;
|
SDbObj* pDb = NULL;
|
||||||
if (strlen(pShow->db) > 0) {
|
if (strlen(pShow->db) > 0) {
|
||||||
|
|
|
@ -167,7 +167,7 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
|
||||||
if (!tdSTSRowIterNext(&iter, pColData->info.colId, pColData->info.type, &sVal)) {
|
if (!tdSTSRowIterNext(&iter, pColData->info.colId, pColData->info.type, &sVal)) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if (colDataAppend(pColData, curRow, sVal.val, sVal.valType == TD_VTYPE_NULL) < 0) {
|
if (colDataAppend(pColData, curRow, sVal.val, false) < 0) {
|
||||||
taosArrayDestroyEx(pArray, (void (*)(void*))tDeleteSSDataBlock);
|
taosArrayDestroyEx(pArray, (void (*)(void*))tDeleteSSDataBlock);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
|
@ -69,9 +69,9 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
||||||
return tqProcessPollReq(pVnode->pTq, pMsg);
|
return tqProcessPollReq(pVnode->pTq, pMsg);
|
||||||
case TDMT_VND_TASK_PIPE_EXEC:
|
case TDMT_VND_TASK_PIPE_EXEC:
|
||||||
case TDMT_VND_TASK_MERGE_EXEC:
|
case TDMT_VND_TASK_MERGE_EXEC:
|
||||||
return tqProcessTaskExec(pVnode->pTq, msgstr, msgLen, pInfo->workerId);
|
return tqProcessTaskExec(pVnode->pTq, msgstr, msgLen, 0);
|
||||||
case TDMT_VND_STREAM_TRIGGER:
|
case TDMT_VND_STREAM_TRIGGER:
|
||||||
return tqProcessStreamTrigger(pVnode->pTq, pMsg->pCont, pMsg->contLen, pInfo->workerId);
|
return tqProcessStreamTrigger(pVnode->pTq, pMsg->pCont, pMsg->contLen, 0);
|
||||||
case TDMT_VND_QUERY_HEARTBEAT:
|
case TDMT_VND_QUERY_HEARTBEAT:
|
||||||
return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg);
|
return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg);
|
||||||
default:
|
default:
|
||||||
|
|
|
@ -156,6 +156,8 @@ SArray* interResFromBinary(const char* data, int32_t len);
|
||||||
void freeInterResult(void* param);
|
void freeInterResult(void* param);
|
||||||
|
|
||||||
void initGroupResInfo(SGroupResInfo* pGroupResInfo, SResultRowInfo* pResultInfo);
|
void initGroupResInfo(SGroupResInfo* pGroupResInfo, SResultRowInfo* pResultInfo);
|
||||||
|
void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList);
|
||||||
|
|
||||||
void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo);
|
void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo);
|
||||||
bool hasRemainDataInCurrentGroup(SGroupResInfo* pGroupResInfo);
|
bool hasRemainDataInCurrentGroup(SGroupResInfo* pGroupResInfo);
|
||||||
bool hasRemainData(SGroupResInfo* pGroupResInfo);
|
bool hasRemainData(SGroupResInfo* pGroupResInfo);
|
||||||
|
|
|
@ -422,6 +422,7 @@ typedef struct SStreamBlockScanInfo {
|
||||||
uint64_t numOfRows; // total scanned rows
|
uint64_t numOfRows; // total scanned rows
|
||||||
uint64_t numOfExec; // execution times
|
uint64_t numOfExec; // execution times
|
||||||
void* readerHandle; // stream block reader handle
|
void* readerHandle; // stream block reader handle
|
||||||
|
SArray* pColMatchInfo; //
|
||||||
} SStreamBlockScanInfo;
|
} SStreamBlockScanInfo;
|
||||||
|
|
||||||
typedef struct SSysTableScanInfo {
|
typedef struct SSysTableScanInfo {
|
||||||
|
|
|
@ -222,6 +222,16 @@ void initGroupResInfo(SGroupResInfo* pGroupResInfo, SResultRowInfo* pResultInfo)
|
||||||
assert(pGroupResInfo->index <= getNumOfTotalRes(pGroupResInfo));
|
assert(pGroupResInfo->index <= getNumOfTotalRes(pGroupResInfo));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList) {
|
||||||
|
if (pGroupResInfo->pRows != NULL) {
|
||||||
|
taosArrayDestroy(pGroupResInfo->pRows);
|
||||||
|
}
|
||||||
|
|
||||||
|
pGroupResInfo->pRows = pArrayList;
|
||||||
|
pGroupResInfo->index = 0;
|
||||||
|
ASSERT(pGroupResInfo->index <= getNumOfTotalRes(pGroupResInfo));
|
||||||
|
}
|
||||||
|
|
||||||
bool hasRemainDataInCurrentGroup(SGroupResInfo* pGroupResInfo) {
|
bool hasRemainDataInCurrentGroup(SGroupResInfo* pGroupResInfo) {
|
||||||
if (pGroupResInfo->pRows == NULL) {
|
if (pGroupResInfo->pRows == NULL) {
|
||||||
return false;
|
return false;
|
||||||
|
|
|
@ -352,7 +352,7 @@ SSDataBlock* createOutputBuf_rv1(SDataBlockDescNode* pNode) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
idata.info.type = pDescNode->dataType.type;
|
idata.info.type = pDescNode->dataType.type;
|
||||||
idata.info.bytes = pDescNode->dataType.bytes;
|
idata.info.bytes = pDescNode->dataType.bytes;
|
||||||
idata.info.scale = pDescNode->dataType.scale;
|
idata.info.scale = pDescNode->dataType.scale;
|
||||||
idata.info.slotId = pDescNode->slotId;
|
idata.info.slotId = pDescNode->slotId;
|
||||||
|
@ -1551,8 +1551,8 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
|
||||||
if (pSDataBlock->pDataBlock != NULL) {
|
if (pSDataBlock->pDataBlock != NULL) {
|
||||||
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, 0);
|
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, 0);
|
||||||
tsCols = (int64_t*)pColDataInfo->pData;
|
tsCols = (int64_t*)pColDataInfo->pData;
|
||||||
assert(tsCols[0] == pSDataBlock->info.window.skey &&
|
// assert(tsCols[0] == pSDataBlock->info.window.skey &&
|
||||||
tsCols[pSDataBlock->info.rows - 1] == pSDataBlock->info.window.ekey);
|
// tsCols[pSDataBlock->info.rows - 1] == pSDataBlock->info.window.ekey);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t startPos = ascScan? 0 : (pSDataBlock->info.rows - 1);
|
int32_t startPos = ascScan? 0 : (pSDataBlock->info.rows - 1);
|
||||||
|
@ -3660,6 +3660,7 @@ void finalizeUpdatedResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbased
|
||||||
|
|
||||||
if (pCtx[j].fpSet.process) { // TODO set the dummy function.
|
if (pCtx[j].fpSet.process) { // TODO set the dummy function.
|
||||||
pCtx[j].fpSet.finalize(&pCtx[j]);
|
pCtx[j].fpSet.finalize(&pCtx[j]);
|
||||||
|
pResInfo->initialized = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pRow->numOfRows < pResInfo->numOfRes) {
|
if (pRow->numOfRows < pResInfo->numOfRes) {
|
||||||
|
@ -3778,10 +3779,6 @@ void setResultRowOutputBufInitCtx_rv(SDiskbasedBuf* pBuf, SResultRow* pResult, S
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// int32_t functionId = pCtx[i].functionId;
|
|
||||||
// if (functionId < 0) {
|
|
||||||
// continue;
|
|
||||||
// }
|
|
||||||
// if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM || functionId == FUNCTION_DIFF) {
|
// if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM || functionId == FUNCTION_DIFF) {
|
||||||
// if (i > 0) pCtx[i].ptsOutputBuf = pCtx[i - 1].pOutput;
|
// if (i > 0) pCtx[i].ptsOutputBuf = pCtx[i - 1].pOutput;
|
||||||
// }
|
// }
|
||||||
|
@ -4971,7 +4968,20 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup)
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->pRes->pDataBlock = tqRetrieveDataBlock(pInfo->readerHandle);
|
SArray* pCols = tqRetrieveDataBlock(pInfo->readerHandle);
|
||||||
|
|
||||||
|
int32_t numOfCols = pInfo->pRes->info.numOfCols;
|
||||||
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
|
SColumnInfoData* p = taosArrayGet(pCols, i);
|
||||||
|
SColMatchInfo* pColMatchInfo = taosArrayGet(pInfo->pColMatchInfo, i);
|
||||||
|
if (!pColMatchInfo->output) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
ASSERT(pColMatchInfo->colId == p->info.colId);
|
||||||
|
taosArraySet(pInfo->pRes->pDataBlock, pColMatchInfo->targetSlotId, p);
|
||||||
|
}
|
||||||
|
|
||||||
if (pInfo->pRes->pDataBlock == NULL) {
|
if (pInfo->pRes->pDataBlock == NULL) {
|
||||||
// TODO add log
|
// TODO add log
|
||||||
pTaskInfo->code = terrno;
|
pTaskInfo->code = terrno;
|
||||||
|
@ -5625,8 +5635,18 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock*
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t numOfOutput = taosArrayGetSize(pColList);
|
||||||
|
|
||||||
|
SArray* pColIds = taosArrayInit(4, sizeof(int16_t));
|
||||||
|
for(int32_t i = 0; i < numOfOutput; ++i) {
|
||||||
|
int16_t* id = taosArrayGet(pColList, i);
|
||||||
|
taosArrayPush(pColIds, id);
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->pColMatchInfo = pColList;
|
||||||
|
|
||||||
// set the extract column id to streamHandle
|
// set the extract column id to streamHandle
|
||||||
tqReadHandleSetColIdList((STqReadHandle*)streamReadHandle, pColList);
|
tqReadHandleSetColIdList((STqReadHandle*)streamReadHandle, pColIds);
|
||||||
int32_t code = tqReadHandleSetTbUidList(streamReadHandle, pTableIdList);
|
int32_t code = tqReadHandleSetTbUidList(streamReadHandle, pTableIdList);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
taosMemoryFreeClear(pInfo);
|
taosMemoryFreeClear(pInfo);
|
||||||
|
@ -5665,9 +5685,9 @@ static int32_t loadSysTableContentCb(void* param, const SDataBuf* pMsg, int32_t
|
||||||
|
|
||||||
SRetrieveMetaTableRsp* pRsp = pScanResInfo->pRsp;
|
SRetrieveMetaTableRsp* pRsp = pScanResInfo->pRsp;
|
||||||
pRsp->numOfRows = htonl(pRsp->numOfRows);
|
pRsp->numOfRows = htonl(pRsp->numOfRows);
|
||||||
pRsp->useconds = htobe64(pRsp->useconds);
|
pRsp->useconds = htobe64(pRsp->useconds);
|
||||||
pRsp->handle = htobe64(pRsp->handle);
|
pRsp->handle = htobe64(pRsp->handle);
|
||||||
pRsp->compLen = htonl(pRsp->compLen);
|
pRsp->compLen = htonl(pRsp->compLen);
|
||||||
} else {
|
} else {
|
||||||
operator->pTaskInfo->code = code;
|
operator->pTaskInfo->code = code;
|
||||||
}
|
}
|
||||||
|
@ -5824,6 +5844,10 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator, bool* newgroup) {
|
||||||
// pInfo->totalBytes;
|
// pInfo->totalBytes;
|
||||||
return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
|
return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
|
||||||
} else { // load the meta from mnode of the given epset
|
} else { // load the meta from mnode of the given epset
|
||||||
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
int64_t startTs = taosGetTimestampUs();
|
int64_t startTs = taosGetTimestampUs();
|
||||||
|
|
||||||
pInfo->req.type = pInfo->type;
|
pInfo->req.type = pInfo->type;
|
||||||
|
@ -5863,6 +5887,10 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator, bool* newgroup) {
|
||||||
SRetrieveMetaTableRsp* pRsp = pInfo->pRsp;
|
SRetrieveMetaTableRsp* pRsp = pInfo->pRsp;
|
||||||
pInfo->req.showId = pRsp->handle;
|
pInfo->req.showId = pRsp->handle;
|
||||||
|
|
||||||
|
if (pRsp->numOfRows == 0 || pRsp->completed) {
|
||||||
|
pOperator->status = OP_EXEC_DONE;
|
||||||
|
}
|
||||||
|
|
||||||
if (pRsp->numOfRows == 0) {
|
if (pRsp->numOfRows == 0) {
|
||||||
// qDebug("%s vgId:%d, taskID:0x%"PRIx64" %d of total completed, rowsOfSource:%"PRIu64", totalRows:%"PRIu64"
|
// qDebug("%s vgId:%d, taskID:0x%"PRIx64" %d of total completed, rowsOfSource:%"PRIu64", totalRows:%"PRIu64"
|
||||||
// try next",
|
// try next",
|
||||||
|
@ -6964,7 +6992,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo *pOperator, bool* newgroup
|
||||||
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
||||||
pOperator->status = OP_EXEC_DONE;
|
pOperator->status = OP_EXEC_DONE;
|
||||||
}
|
}
|
||||||
return pInfo->binfo.pRes;
|
return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
// STimeWindow win = {0};
|
// STimeWindow win = {0};
|
||||||
|
@ -6993,6 +7021,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo *pOperator, bool* newgroup
|
||||||
|
|
||||||
finalizeUpdatedResult(pInfo->binfo.pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset);
|
finalizeUpdatedResult(pInfo->binfo.pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset);
|
||||||
|
|
||||||
|
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
|
||||||
blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->binfo.capacity);
|
blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->binfo.capacity);
|
||||||
toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pInfo->binfo.pRes, pInfo->binfo.capacity,
|
toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pInfo->binfo.pRes, pInfo->binfo.capacity,
|
||||||
pInfo->binfo.rowCellInfoOffset);
|
pInfo->binfo.rowCellInfoOffset);
|
||||||
|
@ -8792,10 +8821,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
SArray* tableIdList = extractTableIdList(pTableGroupInfo);
|
SArray* tableIdList = extractTableIdList(pTableGroupInfo);
|
||||||
|
|
||||||
SSDataBlock* pResBlock = createOutputBuf_rv1(pScanPhyNode->node.pOutputDataBlockDesc);
|
SSDataBlock* pResBlock = createOutputBuf_rv1(pScanPhyNode->node.pOutputDataBlockDesc);
|
||||||
SArray* colList = extractScanColumnId(pScanPhyNode->pScanCols);
|
|
||||||
|
|
||||||
SOperatorInfo* pOperator =
|
int32_t numOfCols = 0;
|
||||||
createStreamScanOperatorInfo(pHandle->reader, pResBlock, colList, tableIdList, pTaskInfo);
|
SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols);
|
||||||
|
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pResBlock, pColList, tableIdList, pTaskInfo);
|
||||||
taosArrayDestroy(tableIdList);
|
taosArrayDestroy(tableIdList);
|
||||||
return pOperator;
|
return pOperator;
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == nodeType(pPhyNode)) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == nodeType(pPhyNode)) {
|
||||||
|
|
|
@ -152,6 +152,7 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in
|
||||||
// sink
|
// sink
|
||||||
if (pTask->sinkType == TASK_SINK__TABLE) {
|
if (pTask->sinkType == TASK_SINK__TABLE) {
|
||||||
//
|
//
|
||||||
|
blockDebugShowData(pRes);
|
||||||
} else if (pTask->sinkType == TASK_SINK__SMA) {
|
} else if (pTask->sinkType == TASK_SINK__SMA) {
|
||||||
pTask->smaSink.smaHandle(pTask->ahandle, pTask->smaSink.smaId, pRes);
|
pTask->smaSink.smaHandle(pTask->ahandle, pTask->smaSink.smaId, pRes);
|
||||||
//
|
//
|
||||||
|
|
Loading…
Reference in New Issue