fix res issue
This commit is contained in:
parent
333e1a97d5
commit
57068232ff
|
@ -218,6 +218,6 @@ uint64_t calcGroupId(char* pData, int32_t len);
|
|||
|
||||
SNodeList* makeColsNodeArrFromSortKeys(SNodeList* pSortKeys);
|
||||
|
||||
int32_t extractKeysLen(const SArray* keys);
|
||||
int32_t extractKeysLen(const SArray* keys, int32_t* pLen);
|
||||
|
||||
#endif // TDENGINE_EXECUTIL_H
|
||||
|
|
|
@ -405,6 +405,7 @@ static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBloc
|
|||
|
||||
for (int32_t i = 0; i < blockDataGetNumOfCols(pBlock); ++i) {
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
|
||||
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
|
||||
colDataSetNULL(pColInfoData, 0);
|
||||
}
|
||||
*ppBlock = pBlock;
|
||||
|
|
|
@ -58,13 +58,16 @@ static int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColMatchI
|
|||
|
||||
#define SCAN_ROW_TYPE(_t) ((_t) ? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW)
|
||||
|
||||
static void setColIdForCacheReadBlock(SSDataBlock* pBlock, SLastRowScanPhysiNode* pScan) {
|
||||
static int32_t setColIdForCacheReadBlock(SSDataBlock* pBlock, SLastRowScanPhysiNode* pScan) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
SNode* pNode;
|
||||
int32_t idx = 0;
|
||||
FOREACH(pNode, pScan->pTargets) {
|
||||
if (nodeType(pNode) == QUERY_NODE_COLUMN) {
|
||||
SColumnNode* pCol = (SColumnNode*)pNode;
|
||||
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, idx);
|
||||
QUERY_CHECK_NULL(pColInfo, code, lino, _end, terrno);
|
||||
pColInfo->info.colId = pCol->colId;
|
||||
}
|
||||
idx++;
|
||||
|
@ -72,6 +75,7 @@ static void setColIdForCacheReadBlock(SSDataBlock* pBlock, SLastRowScanPhysiNode
|
|||
|
||||
for (; idx < pBlock->pDataBlock->size; ++idx) {
|
||||
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, idx);
|
||||
QUERY_CHECK_NULL(pColInfo, code, lino, _end, terrno);
|
||||
if (pScan->scan.pScanPseudoCols) {
|
||||
FOREACH(pNode, pScan->scan.pScanPseudoCols) {
|
||||
STargetNode* pTarget = (STargetNode*)pNode;
|
||||
|
@ -82,6 +86,12 @@ static void setColIdForCacheReadBlock(SSDataBlock* pBlock, SLastRowScanPhysiNode
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle,
|
||||
|
@ -125,6 +135,7 @@ int32_t createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandl
|
|||
} else {
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) {
|
||||
SColMatchItem* pItem = taosArrayGet(pInfo->matchInfo.pList, i);
|
||||
QUERY_CHECK_NULL(pItem, code, lino, _error, terrno);
|
||||
if (pItem->isPk) {
|
||||
pInfo->numOfPks += 1;
|
||||
pInfo->pkCol.type = pItem->dataType.type; // only record one primary key
|
||||
|
@ -147,10 +158,14 @@ int32_t createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandl
|
|||
|
||||
for (int i = 0; i < TARRAY_SIZE(pInfo->matchInfo.pList); ++i) {
|
||||
SColMatchItem* pColInfo = taosArrayGet(pInfo->matchInfo.pList, i);
|
||||
QUERY_CHECK_NULL(pColInfo, code, lino, _error, terrno);
|
||||
|
||||
void* tmp = taosArrayPush(pCidList, &pColInfo->colId);
|
||||
QUERY_CHECK_NULL(tmp, code, lino, _error, terrno);
|
||||
if (pInfo->pFuncTypeList != NULL && taosArrayGetSize(pInfo->pFuncTypeList) > i) {
|
||||
pColInfo->funcType = *(int32_t*)taosArrayGet(pInfo->pFuncTypeList, i);
|
||||
void* pFuncType = taosArrayGet(pInfo->pFuncTypeList, i);
|
||||
QUERY_CHECK_NULL(pFuncType, code, lino, _error, terrno);
|
||||
pColInfo->funcType = *(int32_t*)pFuncType;
|
||||
}
|
||||
}
|
||||
pInfo->pCidList = pCidList;
|
||||
|
@ -186,13 +201,16 @@ int32_t createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandl
|
|||
code = createOneDataBlock(pInfo->pRes, false, &pInfo->pBufferedRes);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
setColIdForCacheReadBlock(pInfo->pBufferedRes, pScanNode);
|
||||
code = setColIdForCacheReadBlock(pInfo->pBufferedRes, pScanNode);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
code = blockDataEnsureCapacity(pInfo->pBufferedRes, capacity);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
} else { // by tags
|
||||
pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_SINGLE | SCAN_ROW_TYPE(pScanNode->ignoreNull);
|
||||
capacity = 1; // only one row output
|
||||
setColIdForCacheReadBlock(pInfo->pRes, pScanNode);
|
||||
code = setColIdForCacheReadBlock(pInfo->pRes, pScanNode);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
}
|
||||
|
||||
initResultSizeInfo(&pOperator->resultInfo, capacity);
|
||||
|
@ -282,10 +300,13 @@ int32_t doScanCacheNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
|||
if (pInfo->indexOfBufferedRes < pBufRes->info.rows) {
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pBufRes->pDataBlock); ++i) {
|
||||
SColumnInfoData* pCol = taosArrayGet(pRes->pDataBlock, i);
|
||||
QUERY_CHECK_NULL(pCol, code, lino, _end, terrno);
|
||||
int32_t slotId = pCol->info.slotId;
|
||||
|
||||
SColumnInfoData* pSrc = taosArrayGet(pBufRes->pDataBlock, slotId);
|
||||
QUERY_CHECK_NULL(pSrc, code, lino, _end, terrno);
|
||||
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, slotId);
|
||||
QUERY_CHECK_NULL(pDst, code, lino, _end, terrno);
|
||||
|
||||
if (colDataIsNull_s(pSrc, pInfo->indexOfBufferedRes)) {
|
||||
colDataSetNULL(pDst, 0);
|
||||
|
@ -298,7 +319,10 @@ int32_t doScanCacheNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
|||
}
|
||||
}
|
||||
|
||||
pRes->info.id.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, pInfo->indexOfBufferedRes);
|
||||
void* pUid = taosArrayGet(pInfo->pUidList, pInfo->indexOfBufferedRes);
|
||||
QUERY_CHECK_NULL(pUid, code, lino, _end, terrno);
|
||||
|
||||
pRes->info.id.uid = *(tb_uid_t*)pUid;
|
||||
pRes->info.rows = 1;
|
||||
pRes->info.scanFlag = MAIN_SCAN;
|
||||
|
||||
|
@ -367,7 +391,9 @@ int32_t doScanCacheNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
|||
pInfo->pRes->info.id.groupId = pKeyInfo->groupId;
|
||||
|
||||
if (taosArrayGetSize(pInfo->pUidList) > 0) {
|
||||
pInfo->pRes->info.id.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, 0);
|
||||
void* pUid = taosArrayGet(pInfo->pUidList, 0);
|
||||
QUERY_CHECK_NULL(pUid, code, lino, _end, terrno);
|
||||
pInfo->pRes->info.id.uid = *(tb_uid_t*)pUid;
|
||||
code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes,
|
||||
pInfo->pRes->info.rows, pTaskInfo, NULL);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -449,6 +475,9 @@ int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTask
|
|||
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColMatchItem* pColMatch = taosArrayGet(pColMatchInfo, i);
|
||||
if (!pColMatch) {
|
||||
return terrno;
|
||||
}
|
||||
bool found = false;
|
||||
for (int32_t j = 0; j < pWrapper->nCols; ++j) {
|
||||
/* if (pColMatch->colId == pWrapper->pSchema[j].colId && pColMatch->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
|
||||
|
@ -485,11 +514,16 @@ int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColMatchInfo* pC
|
|||
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
SColMatchItem* pColInfo = taosArrayGet(pColMatchInfo->pList, i);
|
||||
if (!pColInfo) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
int32_t slotId = pColInfo->dstSlotId;
|
||||
SNodeList* pList = pScanNode->scan.node.pOutputDataBlockDesc->pSlots;
|
||||
|
||||
SSlotDescNode* pDesc = (SSlotDescNode*)nodesListGetNode(pList, slotId);
|
||||
QUERY_CHECK_NULL(pDesc, code, lino, _end, terrno);
|
||||
|
||||
if (pDesc->dataType.type != TSDB_DATA_TYPE_TIMESTAMP) {
|
||||
void* tmp = taosArrayPush(pMatchInfo, pColInfo);
|
||||
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||
|
|
|
@ -65,6 +65,9 @@ static void clearWinStateBuff(SCountWindowResult* pBuff) { pBuff->winRows = 0; }
|
|||
|
||||
static SCountWindowResult* getCountWinStateInfo(SCountWindowSupp* pCountSup) {
|
||||
SCountWindowResult* pBuffInfo = taosArrayGet(pCountSup->pWinStates, pCountSup->stateIndex);
|
||||
if (!pBuffInfo) {
|
||||
return NULL;
|
||||
}
|
||||
int32_t size = taosArrayGetSize(pCountSup->pWinStates);
|
||||
// coverity scan
|
||||
ASSERTS(size > 0, "WinStates is empty");
|
||||
|
@ -99,7 +102,8 @@ void doCountWindowAggImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
|||
SCountWindowOperatorInfo* pInfo = pOperator->info;
|
||||
SSDataBlock* pRes = pInfo->binfo.pRes;
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
|
||||
TSKEY* tsCols = (TSKEY*)pColInfoData->pData;
|
||||
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
|
||||
TSKEY* tsCols = (TSKEY*)pColInfoData->pData;
|
||||
|
||||
for (int32_t i = 0; i < pBlock->info.rows;) {
|
||||
SCountWindowResult* pBuffInfo = NULL;
|
||||
|
@ -140,6 +144,13 @@ void doCountWindowAggImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
|||
}
|
||||
i += step;
|
||||
}
|
||||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
pTaskInfo->code = code;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
}
|
||||
|
||||
static void buildCountResult(SExprSupp* pExprSup, SCountWindowSupp* pCountSup, SExecTaskInfo* pTaskInfo,
|
||||
|
|
|
@ -284,6 +284,7 @@ int32_t eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* p
|
|||
SSDataBlock* pRes = pInfo->binfo.pRes;
|
||||
int64_t gid = pBlock->info.id.groupId;
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
|
||||
QUERY_CHECK_NULL(pColInfoData, code, lino, _return, terrno);
|
||||
TSKEY* tsList = (TSKEY*)pColInfoData->pData;
|
||||
SWindowRowsSup* pRowSup = &pInfo->winSup;
|
||||
SColumnInfoData *ps = NULL, *pe = NULL;
|
||||
|
|
|
@ -52,7 +52,7 @@ static void setAllSourcesCompleted(SOperatorInfo* pOperator);
|
|||
|
||||
static int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code);
|
||||
static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex);
|
||||
static int32_t getCompletedSources(const SArray* pArray);
|
||||
static int32_t getCompletedSources(const SArray* pArray, int32_t* pRes);
|
||||
static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator);
|
||||
static int32_t seqLoadRemoteData(SOperatorInfo* pOperator);
|
||||
static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator);
|
||||
|
@ -65,8 +65,14 @@ static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeIn
|
|||
static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo,
|
||||
SExecTaskInfo* pTaskInfo) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
|
||||
int32_t completed = getCompletedSources(pExchangeInfo->pSourceDataInfo);
|
||||
int32_t completed = 0;
|
||||
code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pTaskInfo->code = code;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
if (completed == totalSources) {
|
||||
setAllSourcesCompleted(pOperator);
|
||||
return;
|
||||
|
@ -84,6 +90,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
|
|||
|
||||
for (int32_t i = 0; i < totalSources; ++i) {
|
||||
pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i);
|
||||
QUERY_CHECK_NULL(pDataInfo, code, lino, _error, terrno);
|
||||
if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
|
||||
continue;
|
||||
}
|
||||
|
@ -100,6 +107,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
|
|||
tmemory_barrier();
|
||||
SRetrieveTableRsp* pRsp = pDataInfo->pRsp;
|
||||
SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pDataInfo->index);
|
||||
QUERY_CHECK_NULL(pSource, code, lino, _error, terrno);
|
||||
|
||||
// todo
|
||||
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
|
||||
|
@ -159,7 +167,12 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
|
|||
return;
|
||||
} // end loop
|
||||
|
||||
int32_t complete1 = getCompletedSources(pExchangeInfo->pSourceDataInfo);
|
||||
int32_t complete1 = 0;
|
||||
code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &complete1);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pTaskInfo->code = code;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
if (complete1 == totalSources) {
|
||||
qDebug("all sources are completed, %s", GET_TASKID(pTaskInfo));
|
||||
return;
|
||||
|
@ -350,9 +363,13 @@ static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo*
|
|||
|
||||
for (int32_t i = 0; i < numOfSources; ++i) {
|
||||
SDownstreamSourceNode* pNode = (SDownstreamSourceNode*)nodesListGetNode((SNodeList*)pExNode->pSrcEndPoints, i);
|
||||
void* tmp = taosArrayPush(pInfo->pSources, pNode);
|
||||
if (!pNode) {
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
void* tmp = taosArrayPush(pInfo->pSources, pNode);
|
||||
if (!tmp) {
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
SExchangeSrcIndex idx = {.srcIdx = i, .inUseIdx = -1};
|
||||
|
@ -477,6 +494,9 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
|
|||
|
||||
int32_t index = pWrapper->sourceIndex;
|
||||
SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index);
|
||||
if (!pSourceDataInfo) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
pSourceDataInfo->pRsp = pMsg->pData;
|
||||
|
@ -547,12 +567,20 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas
|
|||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex);
|
||||
if (!pDataInfo) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
if (EX_SOURCE_DATA_NOT_READY != pDataInfo->status) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
pDataInfo->status = EX_SOURCE_DATA_STARTED;
|
||||
SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pDataInfo->index);
|
||||
if (!pSource) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
pDataInfo->startTime = taosGetTimestampUs();
|
||||
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
|
||||
|
||||
|
@ -727,18 +755,26 @@ void setAllSourcesCompleted(SOperatorInfo* pOperator) {
|
|||
setOperatorCompleted(pOperator);
|
||||
}
|
||||
|
||||
int32_t getCompletedSources(const SArray* pArray) {
|
||||
size_t total = taosArrayGetSize(pArray);
|
||||
int32_t getCompletedSources(const SArray* pArray, int32_t* pRes) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
size_t total = taosArrayGetSize(pArray);
|
||||
|
||||
int32_t completed = 0;
|
||||
for (int32_t k = 0; k < total; ++k) {
|
||||
SSourceDataInfo* p = taosArrayGet(pArray, k);
|
||||
QUERY_CHECK_NULL(p, code, lino, _end, terrno);
|
||||
if (p->status == EX_SOURCE_DATA_EXHAUSTED) {
|
||||
completed += 1;
|
||||
}
|
||||
}
|
||||
|
||||
return completed;
|
||||
*pRes = completed;
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
|
||||
|
@ -855,6 +891,11 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
|
||||
if (!pDataInfo) {
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
|
||||
pTaskInfo->code = terrno;
|
||||
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
|
||||
}
|
||||
pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
|
||||
|
||||
code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
|
||||
|
@ -870,6 +911,11 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
|
||||
if (!pSource) {
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||
pTaskInfo->code = terrno;
|
||||
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
|
||||
}
|
||||
|
||||
if (pDataInfo->code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s vgId:%d, taskID:0x%" PRIx64 " execId:%d error happens, code:%s", GET_TASKID(pTaskInfo),
|
||||
|
@ -952,6 +998,9 @@ int32_t addSingleExchangeSource(SOperatorInfo* pOperator, SExchangeOperatorBasic
|
|||
pIdx->inUseIdx = taosArrayGetSize(pExchangeInfo->pSourceDataInfo) - 1;
|
||||
} else {
|
||||
SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pIdx->inUseIdx);
|
||||
if (!pDataInfo) {
|
||||
return terrno;
|
||||
}
|
||||
if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
|
||||
pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
|
||||
}
|
||||
|
|
|
@ -233,6 +233,12 @@ SArray* createSortInfo(SNodeList* pNodeList) {
|
|||
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SOrderByExprNode* pSortKey = (SOrderByExprNode*)nodesListGetNode(pNodeList, i);
|
||||
if (!pSortKey) {
|
||||
taosArrayDestroy(pList);
|
||||
pList = NULL;
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
break;
|
||||
}
|
||||
SBlockOrderInfo bi = {0};
|
||||
bi.order = (pSortKey->order == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
|
||||
bi.nullFirst = (pSortKey->nullOrder == NULL_ORDER_FIRST);
|
||||
|
@ -267,6 +273,13 @@ SSDataBlock* createDataBlockFromDescNode(SDataBlockDescNode* pNode) {
|
|||
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SSlotDescNode* pDescNode = (SSlotDescNode*)nodesListGetNode(pNode->pSlots, i);
|
||||
if (!pDescNode) {
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||
blockDataDestroy(pBlock);
|
||||
pBlock = NULL;
|
||||
terrno = code;
|
||||
break;
|
||||
}
|
||||
SColumnInfoData idata =
|
||||
createColumnInfoData(pDescNode->dataType.type, pDescNode->dataType.bytes, pDescNode->slotId);
|
||||
idata.info.scale = pDescNode->dataType.scale;
|
||||
|
@ -290,9 +303,15 @@ int32_t prepareDataBlockBuf(SSDataBlock* pDataBlock, SColMatchInfo* pMatchInfo)
|
|||
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pMatchInfo->pList); ++i) {
|
||||
SColMatchItem* pItem = taosArrayGet(pMatchInfo->pList, i);
|
||||
if (!pItem) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
if (pItem->isPk) {
|
||||
SColumnInfoData* pInfoData = taosArrayGet(pDataBlock->pDataBlock, pItem->dstSlotId);
|
||||
if (!pInfoData) {
|
||||
return terrno;
|
||||
}
|
||||
pBlockInfo->pks[0].type = pInfoData->info.type;
|
||||
pBlockInfo->pks[1].type = pInfoData->info.type;
|
||||
|
||||
|
@ -652,6 +671,7 @@ int32_t getColInfoResultForGroupby(void* pVnode, SNodeList* group, STableListInf
|
|||
|
||||
for (int32_t i = 0; i < rows; ++i) {
|
||||
STableKeyInfo* pkeyInfo = taosArrayGet(pTableListInfo->pTableList, i);
|
||||
QUERY_CHECK_NULL(pkeyInfo, code, lino, end, terrno);
|
||||
STUidTagInfo info = {.uid = pkeyInfo->uid};
|
||||
void* tmp = taosArrayPush(pUidTagList, &info);
|
||||
QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
|
||||
|
@ -706,6 +726,7 @@ int32_t getColInfoResultForGroupby(void* pVnode, SNodeList* group, STableListInf
|
|||
if (nodeType(pNode) == QUERY_NODE_COLUMN) {
|
||||
SColumnNode* pSColumnNode = (SColumnNode*)pNode;
|
||||
SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, pSColumnNode->slotId);
|
||||
QUERY_CHECK_NULL(pColInfo, code, lino, end, terrno);
|
||||
code = colDataAssign(output.columnData, pColInfo, rows, NULL);
|
||||
} else if (nodeType(pNode) == QUERY_NODE_VALUE) {
|
||||
continue;
|
||||
|
@ -749,6 +770,7 @@ int32_t getColInfoResultForGroupby(void* pVnode, SNodeList* group, STableListInf
|
|||
|
||||
for (int i = 0; i < rows; i++) {
|
||||
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
|
||||
QUERY_CHECK_NULL(info, code, lino, end, terrno);
|
||||
|
||||
char* isNull = (char*)keyBuf;
|
||||
char* pStart = (char*)keyBuf + sizeof(int8_t) * LIST_LENGTH(group);
|
||||
|
@ -867,12 +889,15 @@ static SArray* getTableNameList(const SNodeListNode* pList) {
|
|||
// remove the duplicates
|
||||
SArray* pNewList = taosArrayInit(taosArrayGetSize(pTbList), sizeof(void*));
|
||||
QUERY_CHECK_NULL(pNewList, code, lino, _end, terrno);
|
||||
void* tmp = taosArrayPush(pNewList, taosArrayGet(pTbList, 0));
|
||||
void* tmpTbl = taosArrayGet(pTbList, 0);
|
||||
QUERY_CHECK_NULL(tmpTbl, code, lino, _end, terrno);
|
||||
void* tmp = taosArrayPush(pNewList, tmpTbl);
|
||||
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||
|
||||
for (int32_t i = 1; i < numOfTables; ++i) {
|
||||
char** name = taosArrayGetLast(pNewList);
|
||||
char** nameInOldList = taosArrayGet(pTbList, i);
|
||||
QUERY_CHECK_NULL(nameInOldList, code, lino, _end, terrno);
|
||||
if (strcmp(*name, *nameInOldList) == 0) {
|
||||
continue;
|
||||
}
|
||||
|
@ -999,6 +1024,9 @@ static int32_t optimizeTbnameInCondImpl(void* pVnode, SArray* pExistedUidList, S
|
|||
|
||||
for (int i = 0; i < numOfExisted; i++) {
|
||||
STUidTagInfo* pTInfo = taosArrayGet(pExistedUidList, i);
|
||||
if (!pTInfo) {
|
||||
return terrno;
|
||||
}
|
||||
int32_t tempRes = taosHashPut(uHash, &pTInfo->uid, sizeof(uint64_t), &i, sizeof(i));
|
||||
if (tempRes != TSDB_CODE_SUCCESS && tempRes != TSDB_CODE_DUP_KEY) {
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(tempRes));
|
||||
|
@ -1050,7 +1078,9 @@ SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, S
|
|||
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pColList); ++i) {
|
||||
SColumnInfoData colInfo = {0};
|
||||
colInfo.info = *(SColumnInfo*)taosArrayGet(pColList, i);
|
||||
void* tmp = taosArrayGet(pColList, i);
|
||||
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||
colInfo.info = *(SColumnInfo*)tmp;
|
||||
code = blockDataAppendColInfo(pResBlock, &colInfo);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
|
@ -1068,9 +1098,11 @@ SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, S
|
|||
|
||||
for (int32_t i = 0; i < numOfTables; i++) {
|
||||
STUidTagInfo* p1 = taosArrayGet(pUidTagList, i);
|
||||
QUERY_CHECK_NULL(p1, code, lino, _end, terrno);
|
||||
|
||||
for (int32_t j = 0; j < numOfCols; j++) {
|
||||
SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, j);
|
||||
QUERY_CHECK_NULL(pColInfo, code, lino, _end, terrno);
|
||||
|
||||
if (pColInfo->info.colId == -1) { // tbname
|
||||
char str[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
|
@ -1144,7 +1176,11 @@ static int32_t doSetQualifiedUid(STableListInfo* pListInfo, SArray* pUidList, co
|
|||
int32_t numOfTables = taosArrayGetSize(pUidTagList);
|
||||
for (int32_t i = 0; i < numOfTables; ++i) {
|
||||
if (pResultList[i]) {
|
||||
uint64_t uid = ((STUidTagInfo*)taosArrayGet(pUidTagList, i))->uid;
|
||||
STUidTagInfo* tmpTag = (STUidTagInfo*)taosArrayGet(pUidTagList, i);
|
||||
if (!tmpTag) {
|
||||
return terrno;
|
||||
}
|
||||
uint64_t uid = tmpTag->uid;
|
||||
qDebug("tagfilter get uid:%" PRId64 ", res:%d", uid, pResultList[i]);
|
||||
|
||||
info.uid = uid;
|
||||
|
@ -1174,6 +1210,9 @@ static int32_t copyExistedUids(SArray* pUidTagList, const SArray* pUidList) {
|
|||
|
||||
for (int32_t i = 0; i < numOfExisted; ++i) {
|
||||
uint64_t* uid = taosArrayGet(pUidList, i);
|
||||
if (!uid) {
|
||||
return terrno;
|
||||
}
|
||||
STUidTagInfo info = {.uid = *uid};
|
||||
void* tmp = taosArrayPush(pUidTagList, &info);
|
||||
if (!tmp) {
|
||||
|
@ -1242,6 +1281,7 @@ static int32_t doFilterByTagCond(STableListInfo* pListInfo, SArray* pUidList, SN
|
|||
|
||||
for (int32_t i = 0; i < numOfRows; ++i) {
|
||||
STUidTagInfo* pInfo = taosArrayGet(pUidTagList, i);
|
||||
QUERY_CHECK_NULL(pInfo, code, lino, end, terrno);
|
||||
void* tmp = taosArrayPush(pUidList, &pInfo->uid);
|
||||
QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
|
||||
}
|
||||
|
@ -1392,7 +1432,9 @@ int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, S
|
|||
|
||||
*(int32_t*)pPayload = numOfTables;
|
||||
if (numOfTables > 0) {
|
||||
memcpy(pPayload + sizeof(int32_t), taosArrayGet(pUidList, 0), numOfTables * sizeof(uint64_t));
|
||||
void* tmp = taosArrayGet(pUidList, 0);
|
||||
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||
memcpy(pPayload + sizeof(int32_t), tmp, numOfTables * sizeof(uint64_t));
|
||||
}
|
||||
|
||||
code = pStorageAPI->metaFn.putCachedTableList(pVnode, pScanNode->suid, context.digest, tListLen(context.digest),
|
||||
|
@ -1408,7 +1450,9 @@ _end:
|
|||
if (!listAdded) {
|
||||
numOfTables = taosArrayGetSize(pUidList);
|
||||
for (int i = 0; i < numOfTables; i++) {
|
||||
STableKeyInfo info = {.uid = *(uint64_t*)taosArrayGet(pUidList, i), .groupId = 0};
|
||||
void* tmp = taosArrayGet(pUidList, 0);
|
||||
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||
STableKeyInfo info = {.uid = *(uint64_t*)tmp, .groupId = 0};
|
||||
|
||||
void* p = taosArrayPush(pListInfo->pTableList, &info);
|
||||
if (p == NULL) {
|
||||
|
@ -1549,6 +1593,11 @@ SArray* makeColumnArrayFromList(SNodeList* pNodeList) {
|
|||
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnNode* pColNode = (SColumnNode*)nodesListGetNode(pNodeList, i);
|
||||
if (!pColNode) {
|
||||
taosArrayDestroy(pList);
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// todo extract method
|
||||
SColumn c = {0};
|
||||
|
@ -1586,6 +1635,7 @@ int32_t extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod
|
|||
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i);
|
||||
QUERY_CHECK_NULL(pNode, code, lino, _end, terrno);
|
||||
if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN) {
|
||||
SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;
|
||||
|
||||
|
@ -1605,6 +1655,7 @@ int32_t extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod
|
|||
int32_t num = LIST_LENGTH(pOutputNodeList->pSlots);
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
SSlotDescNode* pNode = (SSlotDescNode*)nodesListGetNode(pOutputNodeList->pSlots, i);
|
||||
QUERY_CHECK_NULL(pNode, code, lino, _end, terrno);
|
||||
|
||||
// todo: add reserve flag check
|
||||
// it is a column reserved for the arithmetic expression calculation
|
||||
|
@ -1616,6 +1667,7 @@ int32_t extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod
|
|||
SColMatchItem* info = NULL;
|
||||
for (int32_t j = 0; j < taosArrayGetSize(pList); ++j) {
|
||||
info = taosArrayGet(pList, j);
|
||||
QUERY_CHECK_NULL(info, code, lino, _end, terrno);
|
||||
if (info->dstSlotId == pNode->slotId) {
|
||||
break;
|
||||
}
|
||||
|
@ -1755,6 +1807,7 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) {
|
|||
|
||||
for (int32_t j = 0; j < numOfParam; ++j) {
|
||||
SNode* p1 = nodesListGetNode(pFuncNode->pParameterList, j);
|
||||
QUERY_CHECK_NULL(p1, code, lino, _end, terrno);
|
||||
if (p1->type == QUERY_NODE_COLUMN) {
|
||||
SColumnNode* pcn = (SColumnNode*)p1;
|
||||
|
||||
|
@ -1830,6 +1883,7 @@ SExprInfo* createExpr(SNodeList* pNodeList, int32_t* numOfExprs) {
|
|||
int32_t code = createExprFromOneNode(pExp, nodesListGetNode(pNodeList, i), i + UD_TAG_COLUMN_INDEX);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
taosMemoryFreeClear(pExprs);
|
||||
terrno = code;
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||
return NULL;
|
||||
}
|
||||
|
@ -1865,6 +1919,10 @@ int32_t createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, SExprInfo**
|
|||
} else {
|
||||
pTargetNode = (STargetNode*)nodesListGetNode(pGroupKeys, i - numOfFuncs);
|
||||
}
|
||||
if (!pTargetNode) {
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
|
||||
return terrno;
|
||||
}
|
||||
|
||||
SExprInfo* pExp = &pExprs[i];
|
||||
code = createExprFromTargetNode(pExp, pTargetNode);
|
||||
|
@ -2040,10 +2098,19 @@ int32_t relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SAr
|
|||
int32_t i = 0, j = 0;
|
||||
while (i < numOfSrcCols && j < taosArrayGetSize(pColMatchInfo)) {
|
||||
SColumnInfoData* p = taosArrayGet(pCols, i);
|
||||
SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, j);
|
||||
if (!p) {
|
||||
return terrno;
|
||||
}
|
||||
SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, j);
|
||||
if (!pmInfo) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
if (p->info.colId == pmInfo->colId) {
|
||||
SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, pmInfo->dstSlotId);
|
||||
if (!pDst) {
|
||||
return terrno;
|
||||
}
|
||||
code = colDataAssign(pDst, p, pBlock->info.rows, &pBlock->info);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||
|
@ -2117,6 +2184,10 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi
|
|||
int32_t j = 0;
|
||||
for (int32_t i = 0; i < pCond->numOfCols; ++i) {
|
||||
STargetNode* pNode = (STargetNode*)nodesListGetNode(pTableScanNode->scan.pScanCols, i);
|
||||
if (!pNode) {
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
|
||||
return terrno;
|
||||
}
|
||||
SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;
|
||||
if (pColNode->colType == COLUMN_TYPE_TAG) {
|
||||
continue;
|
||||
|
@ -2307,6 +2378,9 @@ int32_t tableListFind(const STableListInfo* pTableList, uint64_t uid, int32_t st
|
|||
|
||||
for (int32_t i = startIndex; i < numOfTables; ++i) {
|
||||
STableKeyInfo* p = taosArrayGet(pTableList->pTableList, i);
|
||||
if (!p) {
|
||||
return -1;
|
||||
}
|
||||
if (p->uid == uid) {
|
||||
return i;
|
||||
}
|
||||
|
@ -2479,22 +2553,30 @@ static int32_t sortTableGroup(STableListInfo* pTableListInfo) {
|
|||
}
|
||||
|
||||
STableKeyInfo* pInfo = taosArrayGet(pTableListInfo->pTableList, 0);
|
||||
if (!pInfo) {
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
|
||||
return terrno;
|
||||
}
|
||||
uint64_t gid = pInfo->groupId;
|
||||
|
||||
int32_t start = 0;
|
||||
void* tmp = taosArrayPush(pList, &start);
|
||||
if (!tmp) {
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
|
||||
return terrno;
|
||||
}
|
||||
|
||||
for (int32_t i = 1; i < size; ++i) {
|
||||
pInfo = taosArrayGet(pTableListInfo->pTableList, i);
|
||||
if (!pInfo) {
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
|
||||
return terrno;
|
||||
}
|
||||
if (pInfo->groupId != gid) {
|
||||
tmp = taosArrayPush(pList, &i);
|
||||
if (!tmp) {
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
|
||||
return terrno;
|
||||
}
|
||||
gid = pInfo->groupId;
|
||||
}
|
||||
|
@ -2532,6 +2614,10 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle*
|
|||
|
||||
for (int i = 0; i < numOfTables; i++) {
|
||||
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
|
||||
if (!info) {
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
|
||||
return terrno;
|
||||
}
|
||||
info->groupId = groupByTbname ? info->uid : 0;
|
||||
|
||||
int32_t tempRes = taosHashPut(pTableListInfo->remainGroups, &(info->groupId), sizeof(info->groupId),
|
||||
|
@ -2544,6 +2630,10 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle*
|
|||
} else {
|
||||
for (int32_t i = 0; i < numOfTables; i++) {
|
||||
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
|
||||
if (!info) {
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
|
||||
return terrno;
|
||||
}
|
||||
info->groupId = groupByTbname ? info->uid : 0;
|
||||
}
|
||||
}
|
||||
|
@ -2587,6 +2677,10 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle*
|
|||
size_t size = taosArrayGetSize(pTableListInfo->pTableList);
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
STableKeyInfo* p = taosArrayGet(pTableListInfo->pTableList, i);
|
||||
if (!p) {
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
|
||||
return terrno;
|
||||
}
|
||||
int32_t tempRes = taosHashPut(pTableListInfo->map, &p->uid, sizeof(uint64_t), &i, sizeof(int32_t));
|
||||
if (tempRes != TSDB_CODE_SUCCESS && tempRes != TSDB_CODE_DUP_KEY) {
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(tempRes));
|
||||
|
@ -2812,13 +2906,22 @@ SNodeList* makeColsNodeArrFromSortKeys(SNodeList* pSortKeys) {
|
|||
return ret;
|
||||
}
|
||||
|
||||
int32_t extractKeysLen(const SArray* keys) {
|
||||
int32_t extractKeysLen(const SArray* keys, int32_t* pLen) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
int32_t len = 0;
|
||||
int32_t keyNum = taosArrayGetSize(keys);
|
||||
for (int32_t i = 0; i < keyNum; ++i) {
|
||||
SColumn* pCol = (SColumn*)taosArrayGet(keys, i);
|
||||
QUERY_CHECK_NULL(pCol, code, lino, _end, terrno);
|
||||
len += pCol->bytes;
|
||||
}
|
||||
len += sizeof(int8_t) * keyNum; // null flag
|
||||
return len;
|
||||
*pLen = len;
|
||||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -392,6 +392,7 @@ static int32_t filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const S
|
|||
pAPI->metaReaderFn.initReader(&mr, pScanInfo->readHandle.vnode, META_READER_LOCK, &pAPI->metaFn);
|
||||
for (int32_t i = 0; i < numOfUids; ++i) {
|
||||
uint64_t* id = (uint64_t*)taosArrayGet(tableIdList, i);
|
||||
QUERY_CHECK_NULL(id, code, lino, _end, terrno);
|
||||
|
||||
int32_t code = pAPI->metaReaderFn.getTableEntryByUid(&mr, *id);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -493,6 +494,12 @@ int32_t qUpdateTableListForStreamScanner(qTaskInfo_t tinfo, const SArray* tableI
|
|||
|
||||
for (int32_t i = 0; i < numOfQualifiedTables; ++i) {
|
||||
uint64_t* uid = taosArrayGet(qa, i);
|
||||
if (!uid) {
|
||||
taosMemoryFree(keyBuf);
|
||||
taosArrayDestroy(qa);
|
||||
taosWUnLockLatch(&pTaskInfo->lock);
|
||||
return terrno;
|
||||
}
|
||||
STableKeyInfo keyInfo = {.uid = *uid, .groupId = 0};
|
||||
|
||||
if (bufLen > 0) {
|
||||
|
@ -547,6 +554,9 @@ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* table
|
|||
}
|
||||
|
||||
SSchemaInfo* pSchemaInfo = taosArrayGet(pTaskInfo->schemaInfos, idx);
|
||||
if (!pSchemaInfo) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
*sversion = pSchemaInfo->sw->version;
|
||||
*tversion = pSchemaInfo->tversion;
|
||||
|
@ -704,7 +714,9 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo
|
|||
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
||||
p = p1;
|
||||
} else {
|
||||
p = *(SSDataBlock**)taosArrayGet(pTaskInfo->pResultBlockList, blockIndex);
|
||||
void* tmp = taosArrayGet(pTaskInfo->pResultBlockList, blockIndex);
|
||||
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||
p = *(SSDataBlock**)tmp;
|
||||
code = copyDataBlock(p, pRes);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
|
@ -752,7 +764,9 @@ void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo) {
|
|||
size_t num = taosArrayGetSize(pList);
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
SSDataBlock** p = taosArrayGet(pTaskInfo->pResultBlockList, i);
|
||||
blockDataDestroy(*p);
|
||||
if (p) {
|
||||
blockDataDestroy(*p);
|
||||
}
|
||||
}
|
||||
|
||||
taosArrayClear(pTaskInfo->pResultBlockList);
|
||||
|
@ -866,6 +880,9 @@ void qStopTaskOperators(SExecTaskInfo* pTaskInfo) {
|
|||
int32_t num = taosArrayGetSize(pTaskInfo->stopInfo.pStopInfo);
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
SExchangeOpStopInfo* pStop = taosArrayGet(pTaskInfo->stopInfo.pStopInfo, i);
|
||||
if (!pStop) {
|
||||
continue;
|
||||
}
|
||||
SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pStop->refId);
|
||||
if (pExchangeInfo) {
|
||||
(void)tsem_post(&pExchangeInfo->ready);
|
||||
|
|
|
@ -577,8 +577,10 @@ int32_t doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* p
|
|||
size_t size = taosArrayGetSize(pColMatchInfo->pList);
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
SColMatchItem* pInfo = taosArrayGet(pColMatchInfo->pList, i);
|
||||
QUERY_CHECK_NULL(pInfo, code, lino, _err, terrno);
|
||||
if (pInfo->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
|
||||
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pInfo->dstSlotId);
|
||||
QUERY_CHECK_NULL(pColData, code, lino, _err, terrno);
|
||||
if (pColData->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
|
||||
code = blockDataUpdateTsWindow(pBlock, pInfo->dstSlotId);
|
||||
QUERY_CHECK_CODE(code, lino, _err);
|
||||
|
@ -667,6 +669,7 @@ void copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultR
|
|||
// expand the result into multiple rows. E.g., _wstart, top(k, 20)
|
||||
// the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
|
||||
char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
|
||||
for (int32_t k = 0; k < pRow->numOfRows; ++k) {
|
||||
code = colDataSetVal(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
|
||||
|
@ -1046,6 +1049,10 @@ bool groupbyTbname(SNodeList* pGroupList) {
|
|||
bool bytbname = false;
|
||||
if (LIST_LENGTH(pGroupList) == 1) {
|
||||
SNode* p = nodesListGetNode(pGroupList, 0);
|
||||
if (!p) {
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
|
||||
return false;
|
||||
}
|
||||
if (p->type == QUERY_NODE_FUNCTION) {
|
||||
// partition by tbname/group by tbname
|
||||
bytbname = (strcmp(((struct SFunctionNode*)p)->functionName, "tbname") == 0);
|
||||
|
|
|
@ -58,7 +58,7 @@ typedef struct SFillOperatorInfo {
|
|||
static void revisedFillStartKey(SFillOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t order);
|
||||
static void destroyFillOperatorInfo(void* param);
|
||||
static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag);
|
||||
static void fillResetPrevForNewGroup(SFillInfo* pFillInfo);
|
||||
static int32_t fillResetPrevForNewGroup(SFillInfo* pFillInfo);
|
||||
static void reviseFillStartAndEndKey(SFillOperatorInfo* pInfo, int32_t order);
|
||||
|
||||
static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo,
|
||||
|
@ -83,7 +83,11 @@ static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo* pOperator, SFillOp
|
|||
|
||||
taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->pRes);
|
||||
if (pInfo->pFillInfo->type == TSDB_FILL_PREV || pInfo->pFillInfo->type == TSDB_FILL_LINEAR) {
|
||||
fillResetPrevForNewGroup(pInfo->pFillInfo);
|
||||
int32_t code = fillResetPrevForNewGroup(pInfo->pFillInfo);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t numOfResultRows = pResultInfo->capacity - pResBlock->info.rows;
|
||||
|
@ -146,13 +150,22 @@ _end:
|
|||
}
|
||||
}
|
||||
|
||||
static void fillResetPrevForNewGroup(SFillInfo* pFillInfo) {
|
||||
static int32_t fillResetPrevForNewGroup(SFillInfo* pFillInfo) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
for (int32_t colIdx = 0; colIdx < pFillInfo->numOfCols; ++colIdx) {
|
||||
if (!pFillInfo->pFillCol[colIdx].notFillCol) {
|
||||
SGroupKeys* key = taosArrayGet(pFillInfo->prev.pRowVal, colIdx);
|
||||
QUERY_CHECK_NULL(key, code, lino, _end, terrno);
|
||||
key->isNull = true;
|
||||
}
|
||||
}
|
||||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
// todo refactor: decide the start key according to the query time range.
|
||||
|
|
|
@ -100,6 +100,10 @@ static int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char**
|
|||
int32_t numOfGroupCols = taosArrayGetSize(pGroupColList);
|
||||
for (int32_t i = 0; i < numOfGroupCols; ++i) {
|
||||
SColumn* pCol = (SColumn*)taosArrayGet(pGroupColList, i);
|
||||
if (!pCol) {
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
|
||||
return terrno;
|
||||
}
|
||||
(*keyLen) += pCol->bytes; // actual data + null_flag
|
||||
|
||||
SGroupKeys key = {0};
|
||||
|
@ -888,10 +892,10 @@ static void clearPartitionOperator(SPartitionOperatorInfo* pInfo) {
|
|||
int32_t size = taosArrayGetSize(pInfo->sortedGroupArray);
|
||||
for (int32_t i = 0; i < size; i++) {
|
||||
SDataGroupInfo* pGp = taosArrayGet(pInfo->sortedGroupArray, i);
|
||||
if (pGp->blockForNotLoaded) {
|
||||
if (pGp && pGp->blockForNotLoaded) {
|
||||
for (int32_t i = 0; i < pGp->blockForNotLoaded->size; i++) {
|
||||
SSDataBlock** pBlock = taosArrayGet(pGp->blockForNotLoaded, i);
|
||||
blockDataDestroy(*pBlock);
|
||||
if (pBlock) blockDataDestroy(*pBlock);
|
||||
}
|
||||
taosArrayClear(pGp->blockForNotLoaded);
|
||||
pGp->offsetForNotLoaded = 0;
|
||||
|
@ -916,6 +920,9 @@ static int compareDataGroupInfo(const void* group1, const void* group2) {
|
|||
static SSDataBlock* buildPartitionResultForNotLoadBlock(SDataGroupInfo* pGroupInfo) {
|
||||
if (pGroupInfo->blockForNotLoaded && pGroupInfo->offsetForNotLoaded < pGroupInfo->blockForNotLoaded->size) {
|
||||
SSDataBlock** pBlock = taosArrayGet(pGroupInfo->blockForNotLoaded, pGroupInfo->offsetForNotLoaded);
|
||||
if (!pBlock) {
|
||||
return NULL;
|
||||
}
|
||||
pGroupInfo->offsetForNotLoaded++;
|
||||
return *pBlock;
|
||||
}
|
||||
|
@ -946,10 +953,18 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
|
|||
++pInfo->groupIndex;
|
||||
|
||||
pGroupInfo = taosArrayGet(pInfo->sortedGroupArray, pInfo->groupIndex);
|
||||
if (pGroupInfo == NULL) {
|
||||
qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
|
||||
T_LONG_JMP(pTaskInfo->env, terrno);
|
||||
}
|
||||
pInfo->pageIndex = 0;
|
||||
}
|
||||
|
||||
int32_t* pageId = taosArrayGet(pGroupInfo->pPageList, pInfo->pageIndex);
|
||||
if (pageId == NULL) {
|
||||
qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
|
||||
T_LONG_JMP(pTaskInfo->env, terrno);
|
||||
}
|
||||
void* page = getBufPage(pInfo->pBuf, *pageId);
|
||||
if (page == NULL) {
|
||||
qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
|
||||
|
@ -1104,7 +1119,9 @@ static void destroyPartitionOperatorInfo(void* param) {
|
|||
int32_t size = taosArrayGetSize(pInfo->sortedGroupArray);
|
||||
for (int32_t i = 0; i < size; i++) {
|
||||
SDataGroupInfo* pGp = taosArrayGet(pInfo->sortedGroupArray, i);
|
||||
taosArrayDestroy(pGp->pPageList);
|
||||
if (pGp) {
|
||||
taosArrayDestroy(pGp->pPageList);
|
||||
}
|
||||
}
|
||||
taosArrayDestroy(pInfo->sortedGroupArray);
|
||||
|
||||
|
@ -1275,7 +1292,9 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
|
|||
for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; j++) {
|
||||
int32_t slotId = pOperator->exprSupp.pExprInfo[j].base.pParam[0].pCol->slotId;
|
||||
SColumnInfoData* pSrcCol = taosArrayGet(pSrc->pDataBlock, slotId);
|
||||
QUERY_CHECK_NULL(pSrcCol, code, lino, _end, terrno);
|
||||
SColumnInfoData* pDestCol = taosArrayGet(pDest->pDataBlock, j);
|
||||
QUERY_CHECK_NULL(pDestCol, code, lino, _end, terrno);
|
||||
bool isNull = colDataIsNull(pSrcCol, pSrc->info.rows, rowIndex, NULL);
|
||||
char* pSrcData = NULL;
|
||||
if (!isNull) pSrcData = colDataGetData(pSrcCol, rowIndex);
|
||||
|
@ -1341,6 +1360,7 @@ int32_t appendCreateTableRow(void* pState, SExprSupp* pTableSup, SExprSupp* pTag
|
|||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
SColumnInfoData* pTbCol = taosArrayGet(pDestBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX);
|
||||
QUERY_CHECK_NULL(pTbCol, code, lino, _end, terrno);
|
||||
memset(tbName, 0, TSDB_TABLE_NAME_LEN);
|
||||
int32_t len = 0;
|
||||
if (colDataIsNull_s(pTbCol, pDestBlock->info.rows - 1)) {
|
||||
|
@ -1357,6 +1377,7 @@ int32_t appendCreateTableRow(void* pState, SExprSupp* pTableSup, SExprSupp* pTag
|
|||
pDestBlock->info.rows--;
|
||||
} else {
|
||||
void* pTbNameCol = taosArrayGet(pDestBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX);
|
||||
QUERY_CHECK_NULL(pTbNameCol, code, lino, _end, terrno);
|
||||
colDataSetNULL(pTbNameCol, pDestBlock->info.rows);
|
||||
tbName[0] = 0;
|
||||
}
|
||||
|
@ -1370,6 +1391,7 @@ int32_t appendCreateTableRow(void* pState, SExprSupp* pTableSup, SExprSupp* pTag
|
|||
}
|
||||
|
||||
void* pGpIdCol = taosArrayGet(pDestBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX);
|
||||
QUERY_CHECK_NULL(pGpIdCol, code, lino, _end, terrno);
|
||||
code = colDataSetVal(pGpIdCol, pDestBlock->info.rows, (const char*)&groupId, false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
pDestBlock->info.rows++;
|
||||
|
@ -1556,7 +1578,11 @@ static void destroyStreamPartitionOperatorInfo(void* param) {
|
|||
taosArrayDestroy(pInfo->partitionSup.pGroupCols);
|
||||
|
||||
for (int i = 0; i < taosArrayGetSize(pInfo->partitionSup.pGroupColVals); i++) {
|
||||
SGroupKeys key = *(SGroupKeys*)taosArrayGet(pInfo->partitionSup.pGroupColVals, i);
|
||||
void* tmp = taosArrayGet(pInfo->partitionSup.pGroupColVals, i);
|
||||
if (!tmp) {
|
||||
continue;
|
||||
}
|
||||
SGroupKeys key = *(SGroupKeys*)tmp;
|
||||
taosMemoryFree(key.pData);
|
||||
}
|
||||
taosArrayDestroy(pInfo->partitionSup.pGroupColVals);
|
||||
|
@ -1776,6 +1802,7 @@ int32_t extractColumnInfo(SNodeList* pNodeList, SArray** pArrayRes) {
|
|||
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i);
|
||||
QUERY_CHECK_NULL(pNode, code, lino, _end, terrno);
|
||||
|
||||
if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN) {
|
||||
SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;
|
||||
|
|
|
@ -1058,6 +1058,10 @@ static int32_t createTableListInfoFromParam(SOperatorInfo* pOperator) {
|
|||
int32_t tableIdx = 0;
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
uint64_t* pUid = taosArrayGet(pParam->pUidList, i);
|
||||
if (!pUid) {
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
|
||||
return terrno;
|
||||
}
|
||||
|
||||
if (taosHashPut(pListInfo->map, pUid, sizeof(uint64_t), &tableIdx, sizeof(int32_t))) {
|
||||
if (TSDB_CODE_DUP_KEY == terrno) {
|
||||
|
@ -3153,6 +3157,8 @@ FETCH_NEXT_BLOCK:
|
|||
qDebug("process %d/%d input data blocks, %s", current, (int32_t)total, id);
|
||||
|
||||
SPackedData* pPacked = taosArrayGet(pInfo->pBlockLists, current);
|
||||
QUERY_CHECK_NULL(pPacked, code, lino, _end, terrno);
|
||||
|
||||
SSDataBlock* pBlock = pPacked->pDataBlock;
|
||||
if (pBlock->info.parTbName[0]) {
|
||||
code =
|
||||
|
@ -3361,6 +3367,7 @@ FETCH_NEXT_BLOCK:
|
|||
|
||||
int32_t current = pInfo->validBlockIndex++;
|
||||
SPackedData* pSubmit = taosArrayGet(pInfo->pBlockLists, current);
|
||||
QUERY_CHECK_NULL(pSubmit, code, lino, _end, terrno);
|
||||
|
||||
qDebug("set %d/%d as the input submit block, %s", current + 1, totalBlocks, id);
|
||||
if (pAPI->tqReaderFn.tqReaderSetSubmitMsg(pInfo->tqReader, pSubmit->msgStr, pSubmit->msgLen, pSubmit->ver) <
|
||||
|
@ -3448,7 +3455,9 @@ FETCH_NEXT_BLOCK:
|
|||
qDebug("process %d/%d input data blocks, %s", current, (int32_t)total, id);
|
||||
|
||||
SPackedData* pData = taosArrayGet(pInfo->pBlockLists, current);
|
||||
QUERY_CHECK_NULL(pData, code, lino, _end, terrno);
|
||||
SSDataBlock* pBlock = taosArrayGet(pData->pDataBlock, 0);
|
||||
QUERY_CHECK_NULL(pBlock, code, lino, _end, terrno);
|
||||
|
||||
if (pBlock->info.type == STREAM_CHECKPOINT) {
|
||||
streamScanOperatorSaveCheckpoint(pInfo);
|
||||
|
@ -3876,6 +3885,7 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode*
|
|||
|
||||
for (int32_t i = 0; i < numOfOutput; ++i) {
|
||||
SColMatchItem* id = taosArrayGet(pInfo->matchInfo.pList, i);
|
||||
QUERY_CHECK_NULL(id, code, lino, _error, terrno);
|
||||
|
||||
int16_t colId = id->colId;
|
||||
void* tmp = taosArrayPush(pColIds, &colId);
|
||||
|
@ -4333,8 +4343,10 @@ static int32_t tagScanFillResultBlock(SOperatorInfo* pOperator, SSDataBlock* pRe
|
|||
for (int i = 0; i < szTables; ++i) {
|
||||
int32_t idx = *(int32_t*)taosArrayGet(aFilterIdxs, i);
|
||||
STUidTagInfo* pUidTagInfo = taosArrayGet(aUidTags, idx);
|
||||
QUERY_CHECK_NULL(pUidTagInfo, code, lino, _end, terrno);
|
||||
for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) {
|
||||
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId);
|
||||
QUERY_CHECK_NULL(pDst, code, lino, _end, terrno);
|
||||
code = tagScanFillOneCellWithTag(pOperator, pUidTagInfo, &pExprInfo[j], pDst, i, pAPI, pInfo->readHandle.vnode);
|
||||
}
|
||||
}
|
||||
|
@ -4342,8 +4354,10 @@ static int32_t tagScanFillResultBlock(SOperatorInfo* pOperator, SSDataBlock* pRe
|
|||
size_t szTables = taosArrayGetSize(aUidTags);
|
||||
for (int i = 0; i < szTables; ++i) {
|
||||
STUidTagInfo* pUidTagInfo = taosArrayGet(aUidTags, i);
|
||||
QUERY_CHECK_NULL(pUidTagInfo, code, lino, _end, terrno);
|
||||
for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) {
|
||||
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId);
|
||||
QUERY_CHECK_NULL(pDst, code, lino, _end, terrno);
|
||||
code = tagScanFillOneCellWithTag(pOperator, pUidTagInfo, &pExprInfo[j], pDst, i, pAPI, pInfo->readHandle.vnode);
|
||||
}
|
||||
}
|
||||
|
@ -4787,6 +4801,10 @@ static int32_t openSubTablesMergeSort(STmsSubTablesMergeInfo* pSubTblsInfo) {
|
|||
|
||||
pInput->pInputBlock = (pInput->type == SUB_TABLE_MEM_BLOCK) ? pInput->pReaderBlock : pInput->pPageBlock;
|
||||
SColumnInfoData* col = taosArrayGet(pInput->pInputBlock->pDataBlock, pSubTblsInfo->pTsOrderInfo->slotId);
|
||||
if (!col) {
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
|
||||
return terrno;
|
||||
}
|
||||
pInput->aTs = (int64_t*)col->pData;
|
||||
}
|
||||
|
||||
|
@ -4801,8 +4819,16 @@ static int32_t initSubTablesMergeInfo(STableMergeScanInfo* pInfo) {
|
|||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pSubTblsInfo->pTsOrderInfo = taosArrayGet(pInfo->pSortInfo, 0);
|
||||
if (!pSubTblsInfo->pTsOrderInfo) {
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
|
||||
return terrno;
|
||||
}
|
||||
if (taosArrayGetSize(pInfo->pSortInfo) == 2) {
|
||||
pSubTblsInfo->pPkOrderInfo = taosArrayGet(pInfo->pSortInfo, 1);
|
||||
if (!pSubTblsInfo->pPkOrderInfo) {
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
|
||||
return terrno;
|
||||
}
|
||||
} else {
|
||||
pSubTblsInfo->pPkOrderInfo = NULL;
|
||||
}
|
||||
|
@ -4924,6 +4950,10 @@ static int32_t adjustSubTableForNextRow(SOperatorInfo* pOperatorInfo, STmsSubTab
|
|||
}
|
||||
if (pInput->rowIdx != -1) {
|
||||
SColumnInfoData* col = taosArrayGet(pInputBlock->pDataBlock, pSubTblsInfo->pTsOrderInfo->slotId);
|
||||
if (!col) {
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
|
||||
return terrno;
|
||||
}
|
||||
pInput->pInputBlock = pInputBlock;
|
||||
pInput->aTs = (int64_t*)col->pData;
|
||||
}
|
||||
|
@ -4942,8 +4972,10 @@ static int32_t appendChosenRowToDataBlock(STmsSubTablesMergeInfo* pSubTblsInfo,
|
|||
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) {
|
||||
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
|
||||
QUERY_CHECK_NULL(pColInfo, code, lino, _end, terrno);
|
||||
|
||||
SColumnInfoData* pSrcColInfo = taosArrayGet(pInputBlock->pDataBlock, i);
|
||||
QUERY_CHECK_NULL(pSrcColInfo, code, lino, _end, terrno);
|
||||
bool isNull = colDataIsNull(pSrcColInfo, pInputBlock->info.rows, pInput->rowIdx, NULL);
|
||||
|
||||
if (isNull) {
|
||||
|
@ -5310,6 +5342,7 @@ int32_t generateSortByTsPkInfo(SArray* colMatchInfo, int32_t order, SArray** ppS
|
|||
int32_t pkTargetSlotId = -1;
|
||||
for (int32_t i = 0; i < taosArrayGetSize(colMatchInfo); ++i) {
|
||||
SColMatchItem* colInfo = taosArrayGet(colMatchInfo, i);
|
||||
QUERY_CHECK_NULL(colInfo, code, lino, _end, terrno);
|
||||
if (colInfo->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
|
||||
tsTargetSlotId = colInfo->dstSlotId;
|
||||
biTs.order = order;
|
||||
|
@ -5982,6 +6015,7 @@ int32_t fillTableCountScanDataBlock(STableCountScanSupp* pSupp, char* dbName, ch
|
|||
if (pSupp->dbNameSlotId != -1) {
|
||||
ASSERT(strlen(dbName));
|
||||
SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->dbNameSlotId);
|
||||
QUERY_CHECK_NULL(colInfoData, code, lino, _end, terrno);
|
||||
|
||||
char varDbName[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
tstrncpy(varDataVal(varDbName), dbName, TSDB_DB_NAME_LEN);
|
||||
|
@ -5993,6 +6027,7 @@ int32_t fillTableCountScanDataBlock(STableCountScanSupp* pSupp, char* dbName, ch
|
|||
|
||||
if (pSupp->stbNameSlotId != -1) {
|
||||
SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->stbNameSlotId);
|
||||
QUERY_CHECK_NULL(colInfoData, code, lino, _end, terrno);
|
||||
if (strlen(stbName) != 0) {
|
||||
char varStbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
strncpy(varDataVal(varStbName), stbName, TSDB_TABLE_NAME_LEN);
|
||||
|
@ -6006,6 +6041,7 @@ int32_t fillTableCountScanDataBlock(STableCountScanSupp* pSupp, char* dbName, ch
|
|||
|
||||
if (pSupp->tbCountSlotId != -1) {
|
||||
SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->tbCountSlotId);
|
||||
QUERY_CHECK_NULL(colInfoData, code, lino, _end, terrno);
|
||||
code = colDataSetVal(colInfoData, 0, (char*)&count, false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
|
|
|
@ -120,7 +120,8 @@ int32_t createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortN
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
// PK ts col should always at last, see partColOptCreateSort
|
||||
if (pSortNode->excludePkCol) taosArrayPop(pGroupIdCalc->pSortColsArr);
|
||||
keyLen = extractKeysLen(pGroupIdCalc->pSortColsArr);
|
||||
code = extractKeysLen(pGroupIdCalc->pSortColsArr, &keyLen);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
pGroupIdCalc->lastKeysLen = 0;
|
||||
|
|
|
@ -985,16 +985,19 @@ static int32_t sysTableUserTagsFillOneTableTags(const SSysTableScanInfo* pInfo,
|
|||
|
||||
// table name
|
||||
pColInfoData = taosArrayGet(dataBlock->pDataBlock, 0);
|
||||
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, tableName, false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
// database name
|
||||
pColInfoData = taosArrayGet(dataBlock->pDataBlock, 1);
|
||||
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, dbname, false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
// super table name
|
||||
pColInfoData = taosArrayGet(dataBlock->pDataBlock, 2);
|
||||
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, stableName, false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
|
@ -1002,12 +1005,14 @@ static int32_t sysTableUserTagsFillOneTableTags(const SSysTableScanInfo* pInfo,
|
|||
char tagName[TSDB_COL_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
STR_TO_VARSTR(tagName, (*smrSuperTable).me.stbEntry.schemaTag.pSchema[i].name);
|
||||
pColInfoData = taosArrayGet(dataBlock->pDataBlock, 3);
|
||||
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, tagName, false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
// tag type
|
||||
int8_t tagType = (*smrSuperTable).me.stbEntry.schemaTag.pSchema[i].type;
|
||||
pColInfoData = taosArrayGet(dataBlock->pDataBlock, 4);
|
||||
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
|
||||
char tagTypeStr[VARSTR_HEADER_SIZE + 32];
|
||||
int tagTypeLen = sprintf(varDataVal(tagTypeStr), "%s", tDataTypes[tagType].name);
|
||||
if (tagType == TSDB_DATA_TYPE_NCHAR) {
|
||||
|
@ -1073,6 +1078,7 @@ static int32_t sysTableUserTagsFillOneTableTags(const SSysTableScanInfo* pInfo,
|
|||
}
|
||||
}
|
||||
pColInfoData = taosArrayGet(dataBlock->pDataBlock, 5);
|
||||
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, tagVarChar,
|
||||
(tagData == NULL) || (tagType == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(tagData)));
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
@ -1108,15 +1114,18 @@ static int32_t sysTableUserColsFillOneTableCols(const SSysTableScanInfo* pInfo,
|
|||
|
||||
// table name
|
||||
pColInfoData = taosArrayGet(dataBlock->pDataBlock, 0);
|
||||
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, tName, false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
// database name
|
||||
pColInfoData = taosArrayGet(dataBlock->pDataBlock, 1);
|
||||
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, dbname, false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
pColInfoData = taosArrayGet(dataBlock->pDataBlock, 2);
|
||||
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, tableType, false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
|
@ -1124,12 +1133,14 @@ static int32_t sysTableUserColsFillOneTableCols(const SSysTableScanInfo* pInfo,
|
|||
char colName[TSDB_COL_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
STR_TO_VARSTR(colName, schemaRow->pSchema[i].name);
|
||||
pColInfoData = taosArrayGet(dataBlock->pDataBlock, 3);
|
||||
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, colName, false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
// col type
|
||||
int8_t colType = schemaRow->pSchema[i].type;
|
||||
pColInfoData = taosArrayGet(dataBlock->pDataBlock, 4);
|
||||
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
|
||||
char colTypeStr[VARSTR_HEADER_SIZE + 32];
|
||||
int colTypeLen = sprintf(varDataVal(colTypeStr), "%s", tDataTypes[colType].name);
|
||||
if (colType == TSDB_DATA_TYPE_VARCHAR) {
|
||||
|
@ -1144,11 +1155,13 @@ static int32_t sysTableUserColsFillOneTableCols(const SSysTableScanInfo* pInfo,
|
|||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
pColInfoData = taosArrayGet(dataBlock->pDataBlock, 5);
|
||||
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, (const char*)&schemaRow->pSchema[i].bytes, false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
for (int32_t j = 6; j <= 8; ++j) {
|
||||
pColInfoData = taosArrayGet(dataBlock->pDataBlock, j);
|
||||
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
|
||||
colDataSetNULL(pColInfoData, numOfRows);
|
||||
}
|
||||
++numOfRows;
|
||||
|
@ -1217,6 +1230,7 @@ int32_t buildDbTableInfoBlock(bool sysInfo, const SSDataBlock* p, const SSysTabl
|
|||
}
|
||||
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(p->pDataBlock, 0);
|
||||
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
|
||||
|
||||
STR_TO_VARSTR(n, pm->name);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, n, false);
|
||||
|
@ -1225,26 +1239,31 @@ int32_t buildDbTableInfoBlock(bool sysInfo, const SSDataBlock* p, const SSysTabl
|
|||
// database name
|
||||
STR_TO_VARSTR(n, dbName);
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, 1);
|
||||
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, n, false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
// create time
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, 2);
|
||||
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
|
||||
colDataSetNULL(pColInfoData, numOfRows);
|
||||
|
||||
// number of columns
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, 3);
|
||||
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, (char*)&pm->colNum, false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
for (int32_t j = 4; j <= 8; ++j) {
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, j);
|
||||
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
|
||||
colDataSetNULL(pColInfoData, numOfRows);
|
||||
}
|
||||
|
||||
STR_TO_VARSTR(n, "SYSTEM_TABLE");
|
||||
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, 9);
|
||||
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, n, false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
|
@ -1327,6 +1346,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) {
|
|||
int32_t i = pIdx->lastIdx;
|
||||
for (; i < taosArrayGetSize(pIdx->uids); i++) {
|
||||
tb_uid_t* uid = taosArrayGet(pIdx->uids, i);
|
||||
QUERY_CHECK_NULL(uid, code, lino, _end, terrno);
|
||||
|
||||
SMetaReader mr = {0};
|
||||
pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.vnode, META_READER_LOCK, &pAPI->metaFn);
|
||||
|
@ -1339,16 +1359,20 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) {
|
|||
|
||||
// table name
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(p->pDataBlock, 0);
|
||||
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
|
||||
|
||||
code = colDataSetVal(pColInfoData, numOfRows, n, false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
// database name
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, 1);
|
||||
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, dbname, false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
// vgId
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, 6);
|
||||
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, (char*)&vgId, false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
|
@ -1357,6 +1381,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) {
|
|||
// create time
|
||||
int64_t ts = mr.me.ctbEntry.btime;
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, 2);
|
||||
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, (char*)&ts, false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
|
@ -1373,18 +1398,21 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) {
|
|||
T_LONG_JMP(pTaskInfo->env, terrno);
|
||||
}
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, 3);
|
||||
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, (char*)&mr1.me.stbEntry.schemaRow.nCols, false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
// super table name
|
||||
STR_TO_VARSTR(n, mr1.me.name);
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, 4);
|
||||
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, n, false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
pAPI->metaReaderFn.clearReader(&mr1);
|
||||
|
||||
// table comment
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, 8);
|
||||
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
|
||||
if (mr.me.ctbEntry.commentLen > 0) {
|
||||
char comment[TSDB_TB_COMMENT_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
STR_TO_VARSTR(comment, mr.me.ctbEntry.comment);
|
||||
|
@ -1401,11 +1429,13 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) {
|
|||
|
||||
// uid
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, 5);
|
||||
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, (char*)&mr.me.uid, false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
// ttl
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, 7);
|
||||
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, (char*)&mr.me.ctbEntry.ttlDays, false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
|
@ -1414,20 +1444,24 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) {
|
|||
} else if (tableType == TSDB_NORMAL_TABLE) {
|
||||
// create time
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, 2);
|
||||
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ntbEntry.btime, false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
// number of columns
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, 3);
|
||||
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ntbEntry.schemaRow.nCols, false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
// super table name
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, 4);
|
||||
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
|
||||
colDataSetNULL(pColInfoData, numOfRows);
|
||||
|
||||
// table comment
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, 8);
|
||||
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
|
||||
if (mr.me.ntbEntry.commentLen > 0) {
|
||||
char comment[TSDB_TB_COMMENT_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
STR_TO_VARSTR(comment, mr.me.ntbEntry.comment);
|
||||
|
@ -1444,11 +1478,13 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) {
|
|||
|
||||
// uid
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, 5);
|
||||
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, (char*)&mr.me.uid, false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
// ttl
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, 7);
|
||||
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, (char*)&mr.me.ntbEntry.ttlDays, false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
|
@ -1459,6 +1495,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) {
|
|||
pAPI->metaReaderFn.clearReader(&mr);
|
||||
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, 9);
|
||||
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, n, false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
|
@ -1561,16 +1598,19 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
|
|||
|
||||
// table name
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(p->pDataBlock, 0);
|
||||
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, n, false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
// database name
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, 1);
|
||||
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, dbname, false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
// vgId
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, 6);
|
||||
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, (char*)&vgId, false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
|
@ -1579,6 +1619,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
|
|||
// create time
|
||||
int64_t ts = pInfo->pCur->mr.me.ctbEntry.btime;
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, 2);
|
||||
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, (char*)&ts, false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
|
@ -1603,18 +1644,21 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
|
|||
|
||||
// number of columns
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, 3);
|
||||
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, (char*)&mr.me.stbEntry.schemaRow.nCols, false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
// super table name
|
||||
STR_TO_VARSTR(n, mr.me.name);
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, 4);
|
||||
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, n, false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
pAPI->metaReaderFn.clearReader(&mr);
|
||||
|
||||
// table comment
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, 8);
|
||||
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
|
||||
if (pInfo->pCur->mr.me.ctbEntry.commentLen > 0) {
|
||||
char comment[TSDB_TB_COMMENT_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
STR_TO_VARSTR(comment, pInfo->pCur->mr.me.ctbEntry.comment);
|
||||
|
@ -1631,11 +1675,13 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
|
|||
|
||||
// uid
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, 5);
|
||||
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.uid, false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
// ttl
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, 7);
|
||||
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ctbEntry.ttlDays, false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
|
@ -1643,20 +1689,24 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
|
|||
} else if (tableType == TSDB_NORMAL_TABLE) {
|
||||
// create time
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, 2);
|
||||
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ntbEntry.btime, false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
// number of columns
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, 3);
|
||||
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ntbEntry.schemaRow.nCols, false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
// super table name
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, 4);
|
||||
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
|
||||
colDataSetNULL(pColInfoData, numOfRows);
|
||||
|
||||
// table comment
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, 8);
|
||||
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
|
||||
if (pInfo->pCur->mr.me.ntbEntry.commentLen > 0) {
|
||||
char comment[TSDB_TB_COMMENT_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
STR_TO_VARSTR(comment, pInfo->pCur->mr.me.ntbEntry.comment);
|
||||
|
@ -1673,11 +1723,13 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
|
|||
|
||||
// uid
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, 5);
|
||||
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.uid, false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
// ttl
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, 7);
|
||||
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ntbEntry.ttlDays, false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
|
@ -1685,6 +1737,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
pColInfoData = taosArrayGet(p->pDataBlock, 9);
|
||||
QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
|
||||
code = colDataSetVal(pColInfoData, numOfRows, n, false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
|
@ -1961,6 +2014,7 @@ static void sysTableScanFillTbName(SOperatorInfo* pOperator, const SSysTableScan
|
|||
|
||||
if (pInfo->tbnameSlotId != -1) {
|
||||
SColumnInfoData* pColumnInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, pInfo->tbnameSlotId);
|
||||
QUERY_CHECK_NULL(pColumnInfoData, code, lino, _end, terrno);
|
||||
char varTbName[TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE] = {0};
|
||||
STR_TO_VARSTR(varTbName, name);
|
||||
|
||||
|
@ -2612,6 +2666,8 @@ static int32_t doBlockInfoScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes
|
|||
|
||||
int32_t slotId = pOperator->exprSupp.pExprInfo->base.resSchema.slotId;
|
||||
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||
QUERY_CHECK_NULL(pColInfo, code, lino, _end, terrno);
|
||||
|
||||
|
||||
int32_t len = tSerializeBlockDistInfo(NULL, 0, &blockDistInfo);
|
||||
char* p = taosMemoryCalloc(1, len + VARSTR_HEADER_SIZE);
|
||||
|
@ -2632,6 +2688,7 @@ static int32_t doBlockInfoScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes
|
|||
// make the valgrind happy that all memory buffer has been initialized already.
|
||||
if (slotId != 0) {
|
||||
SColumnInfoData* p1 = taosArrayGet(pBlock->pDataBlock, 0);
|
||||
QUERY_CHECK_NULL(p1, code, lino, _end, terrno);
|
||||
int64_t v = 0;
|
||||
colDataSetInt64(p1, 0, &v);
|
||||
}
|
||||
|
|
|
@ -47,6 +47,10 @@ static void setNotFillColumn(SFillInfo* pFillInfo, SColumnInfoData* pDstColInfo,
|
|||
}
|
||||
|
||||
SGroupKeys* pKey = taosArrayGet(p->pRowVal, colIdx);
|
||||
if (!pKey) {
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
|
||||
T_LONG_JMP(pFillInfo->pTaskInfo->env, terrno);
|
||||
}
|
||||
int32_t code = doSetVal(pDstColInfo, rowIndex, pKey);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||
|
@ -426,6 +430,7 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t
|
|||
if (pFillInfo->type == TSDB_FILL_PREV) {
|
||||
SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->prev.pRowVal : pFillInfo->next.pRowVal;
|
||||
SGroupKeys* pKey = taosArrayGet(p, i);
|
||||
QUERY_CHECK_NULL(pKey, code, lino, _end, terrno);
|
||||
code = doSetVal(pDst, index, pKey);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
} else if (pFillInfo->type == TSDB_FILL_LINEAR) {
|
||||
|
@ -440,6 +445,7 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t
|
|||
} else if (pFillInfo->type == TSDB_FILL_NEXT) {
|
||||
SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->next.pRowVal : pFillInfo->prev.pRowVal;
|
||||
SGroupKeys* pKey = taosArrayGet(p, i);
|
||||
QUERY_CHECK_NULL(pKey, code, lino, _end, terrno);
|
||||
code = doSetVal(pDst, index, pKey);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
} else {
|
||||
|
@ -582,12 +588,12 @@ void* taosDestroyFillInfo(SFillInfo* pFillInfo) {
|
|||
}
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pFillInfo->prev.pRowVal); ++i) {
|
||||
SGroupKeys* pKey = taosArrayGet(pFillInfo->prev.pRowVal, i);
|
||||
taosMemoryFree(pKey->pData);
|
||||
if (pKey) taosMemoryFree(pKey->pData);
|
||||
}
|
||||
taosArrayDestroy(pFillInfo->prev.pRowVal);
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pFillInfo->next.pRowVal); ++i) {
|
||||
SGroupKeys* pKey = taosArrayGet(pFillInfo->next.pRowVal, i);
|
||||
taosMemoryFree(pKey->pData);
|
||||
if (pKey) taosMemoryFree(pKey->pData);
|
||||
}
|
||||
taosArrayDestroy(pFillInfo->next.pRowVal);
|
||||
|
||||
|
@ -732,6 +738,8 @@ int64_t getFillInfoStart(struct SFillInfo* pFillInfo) { return pFillInfo->start;
|
|||
|
||||
SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfFillExpr, SExprInfo* pNotFillExpr,
|
||||
int32_t numOfNoFillExpr, const struct SNodeListNode* pValNode) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
SFillColInfo* pFillCol = taosMemoryCalloc(numOfFillExpr + numOfNoFillExpr, sizeof(SFillColInfo));
|
||||
if (pFillCol == NULL) {
|
||||
return NULL;
|
||||
|
@ -749,6 +757,7 @@ SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfFillExpr, SExprIn
|
|||
int32_t index = (i >= len) ? (len - 1) : i;
|
||||
|
||||
SValueNode* pv = (SValueNode*)nodesListGetNode(pValNode->pNodeList, index);
|
||||
QUERY_CHECK_NULL(pv, code, lino, _end, terrno);
|
||||
nodesValueNodeToVariant(pv, &pFillCol[i].fillVal);
|
||||
}
|
||||
}
|
||||
|
@ -761,4 +770,14 @@ SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfFillExpr, SExprIn
|
|||
}
|
||||
|
||||
return pFillCol;
|
||||
|
||||
_end:
|
||||
for (int32_t i = 0; i < numOfFillExpr; ++i) {
|
||||
taosVariantDestroy(&pFillCol[i].fillVal);
|
||||
}
|
||||
taosMemoryFree(pFillCol);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
|
|
@ -614,6 +614,11 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num
|
|||
ASSERT(!isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP));
|
||||
|
||||
SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0);
|
||||
if (!pTsKey) {
|
||||
pTaskInfo->code = terrno;
|
||||
T_LONG_JMP(pTaskInfo->env, terrno);
|
||||
}
|
||||
|
||||
int64_t prevTs = *(int64_t*)pTsKey->pData;
|
||||
if (groupId == pBlock->info.id.groupId) {
|
||||
doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, -1, tsCols[startPos], startPos, w.ekey,
|
||||
|
@ -846,6 +851,11 @@ int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo
|
|||
|
||||
if (pBlock->pDataBlock != NULL && pBlock->info.dataLoad) {
|
||||
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
|
||||
if (!pColDataInfo) {
|
||||
pTaskInfo->code = terrno;
|
||||
T_LONG_JMP(pTaskInfo->env, terrno);
|
||||
}
|
||||
|
||||
tsCols = (int64_t*)pColDataInfo->pData;
|
||||
ASSERT(tsCols[0] != 0);
|
||||
|
||||
|
@ -924,6 +934,10 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
|
|||
SExprSupp* pSup = &pOperator->exprSupp;
|
||||
|
||||
SColumnInfoData* pStateColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->stateCol.slotId);
|
||||
if (!pStateColInfoData) {
|
||||
pTaskInfo->code = terrno;
|
||||
T_LONG_JMP(pTaskInfo->env, terrno);
|
||||
}
|
||||
int64_t gid = pBlock->info.id.groupId;
|
||||
|
||||
bool masterScan = true;
|
||||
|
@ -931,6 +945,10 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
|
|||
int32_t bytes = pStateColInfoData->info.bytes;
|
||||
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
|
||||
if (!pColInfoData) {
|
||||
pTaskInfo->code = terrno;
|
||||
T_LONG_JMP(pTaskInfo->env, terrno);
|
||||
}
|
||||
TSKEY* tsList = (TSKEY*)pColInfoData->pData;
|
||||
|
||||
SWindowRowsSup* pRowSup = &pInfo->winSup;
|
||||
|
@ -1394,6 +1412,10 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
|
|||
SExprSupp* pSup = &pOperator->exprSupp;
|
||||
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
|
||||
if (!pColInfoData) {
|
||||
pTaskInfo->code = terrno;
|
||||
T_LONG_JMP(pTaskInfo->env, terrno);
|
||||
}
|
||||
|
||||
bool masterScan = true;
|
||||
int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
|
||||
|
|
Loading…
Reference in New Issue