Merge pull request #26968 from taosdata/fix/syntax
refactor: do some internal refactor.
This commit is contained in:
commit
9f161c4326
|
@ -534,7 +534,7 @@ void tFreeStreamTask(SStreamTask* pTask);
|
|||
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask);
|
||||
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
|
||||
int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver);
|
||||
void streamFreeTaskState(SStreamTask* pTask, ETaskStatus status);
|
||||
void streamFreeTaskState(SStreamTask* pTask, int8_t remove);
|
||||
|
||||
int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo);
|
||||
int32_t tDecodeStreamTaskId(SDecoder* pDecoder, STaskId* pTaskId);
|
||||
|
|
|
@ -644,6 +644,10 @@ int32_t blockDataUpdatePkRange(SSDataBlock* pDataBlock, int32_t pkColumnIndex, b
|
|||
|
||||
SDataBlockInfo* pInfo = &pDataBlock->info;
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, pkColumnIndex);
|
||||
if (pColInfoData == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
if (!IS_NUMERIC_TYPE(pColInfoData->info.type) && (pColInfoData->info.type != TSDB_DATA_TYPE_VARCHAR)) {
|
||||
return 0;
|
||||
}
|
||||
|
@ -685,6 +689,9 @@ int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc) {
|
|||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i);
|
||||
SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, i);
|
||||
if (pCol1 == NULL || pCol2 == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
capacity = pDest->info.capacity;
|
||||
int32_t ret = colDataMergeCol(pCol2, pDest->info.rows, &capacity, pCol1, pSrc->info.rows);
|
||||
|
@ -709,6 +716,9 @@ int32_t blockDataMergeNRows(SSDataBlock* pDest, const SSDataBlock* pSrc, int32_t
|
|||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i);
|
||||
SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, i);
|
||||
if (pCol2 == NULL || pCol1 == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
code = colDataAssignNRows(pCol2, pDest->info.rows, pCol1, srcIdx, numOfRows);
|
||||
if (code) {
|
||||
|
@ -729,6 +739,10 @@ void blockDataShrinkNRows(SSDataBlock* pBlock, int32_t numOfRows) {
|
|||
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
|
||||
if (pCol == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (IS_VAR_DATA_TYPE(pCol->info.type)) {
|
||||
pCol->varmeta.length = pCol->varmeta.offset[pBlock->info.rows - numOfRows];
|
||||
memset(pCol->varmeta.offset + pBlock->info.rows - numOfRows, 0, sizeof(*pCol->varmeta.offset) * numOfRows);
|
||||
|
@ -760,6 +774,10 @@ size_t blockDataGetSize(const SSDataBlock* pBlock) {
|
|||
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
|
||||
if (pColInfoData == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
total += colDataGetFullLength(pColInfoData, pBlock->info.rows);
|
||||
}
|
||||
|
||||
|
@ -861,6 +879,10 @@ int32_t blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t r
|
|||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
|
||||
SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, i);
|
||||
if (pColData == NULL || pDstCol == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
for (int32_t j = startIndex; j < (startIndex + rowCount); ++j) {
|
||||
bool isNull = false;
|
||||
if (pBlock->pBlockAgg == NULL) {
|
||||
|
@ -908,6 +930,10 @@ int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock) {
|
|||
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
|
||||
if (pCol == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (IS_VAR_DATA_TYPE(pCol->info.type)) {
|
||||
memcpy(pStart, pCol->varmeta.offset, numOfRows * sizeof(int32_t));
|
||||
pStart += numOfRows * sizeof(int32_t);
|
||||
|
@ -958,6 +984,9 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) {
|
|||
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
|
||||
if (pCol == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (IS_VAR_DATA_TYPE(pCol->info.type)) {
|
||||
size_t metaSize = pBlock->info.rows * sizeof(int32_t);
|
||||
|
@ -965,6 +994,7 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) {
|
|||
if (tmp == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pCol->varmeta.offset = (int32_t*)tmp;
|
||||
memcpy(pCol->varmeta.offset, pStart, metaSize);
|
||||
pStart += metaSize;
|
||||
|
@ -1039,6 +1069,10 @@ int32_t blockDataFromBuf1(SSDataBlock* pBlock, const char* buf, size_t capacity)
|
|||
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
|
||||
if (pCol == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
pCol->hasNull = true;
|
||||
|
||||
if (IS_VAR_DATA_TYPE(pCol->info.type)) {
|
||||
|
@ -1087,6 +1121,10 @@ size_t blockDataGetRowSize(SSDataBlock* pBlock) {
|
|||
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
|
||||
if (pColInfo == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
rowSize += pColInfo->info.bytes;
|
||||
}
|
||||
|
||||
|
@ -1114,8 +1152,11 @@ double blockDataGetSerialRowSize(const SSDataBlock* pBlock) {
|
|||
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
|
||||
rowSize += pColInfo->info.bytes;
|
||||
if (pColInfo == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
rowSize += pColInfo->info.bytes;
|
||||
if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
|
||||
rowSize += sizeof(int32_t);
|
||||
} else {
|
||||
|
@ -1193,6 +1234,9 @@ static int32_t blockDataAssign(SColumnInfoData* pCols, const SSDataBlock* pDataB
|
|||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData* pDst = &pCols[i];
|
||||
SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, i);
|
||||
if (pSrc == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (IS_VAR_DATA_TYPE(pSrc->info.type)) {
|
||||
if (pSrc->varmeta.length != 0) {
|
||||
|
@ -1228,8 +1272,11 @@ static SColumnInfoData* createHelpColInfoData(const SSDataBlock* pDataBlock) {
|
|||
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, i);
|
||||
pCols[i].info = pColInfoData->info;
|
||||
if (pColInfoData == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
pCols[i].info = pColInfoData->info;
|
||||
if (IS_VAR_DATA_TYPE(pCols[i].info.type)) {
|
||||
pCols[i].varmeta.offset = taosMemoryCalloc(rows, sizeof(int32_t));
|
||||
pCols[i].pData = taosMemoryCalloc(1, pColInfoData->varmeta.length);
|
||||
|
@ -1256,8 +1303,11 @@ static void copyBackToBlock(SSDataBlock* pDataBlock, SColumnInfoData* pCols) {
|
|||
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, i);
|
||||
pColInfoData->info = pCols[i].info;
|
||||
if (pColInfoData == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
pColInfoData->info = pCols[i].info;
|
||||
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
|
||||
taosMemoryFreeClear(pColInfoData->varmeta.offset);
|
||||
pColInfoData->varmeta = pCols[i].varmeta;
|
||||
|
@ -1301,8 +1351,15 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
|
|||
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pOrderInfo); ++i) {
|
||||
SBlockOrderInfo* pInfo = taosArrayGet(pOrderInfo, i);
|
||||
if (pInfo == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, pInfo->slotId);
|
||||
if (pColInfoData == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (pColInfoData->hasNull) {
|
||||
sortColumnHasNull = true;
|
||||
}
|
||||
|
@ -1319,6 +1376,9 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
|
|||
if (!varTypeSort) {
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, 0);
|
||||
SBlockOrderInfo* pOrder = taosArrayGet(pOrderInfo, 0);
|
||||
if (pColInfoData == NULL || pOrder == NULL) {
|
||||
return errno;
|
||||
}
|
||||
|
||||
int64_t p0 = taosGetTimestampUs();
|
||||
|
||||
|
@ -1346,7 +1406,14 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
|
|||
SSDataBlockSortHelper helper = {.pDataBlock = pDataBlock, .orderInfo = pOrderInfo};
|
||||
for (int32_t i = 0; i < taosArrayGetSize(helper.orderInfo); ++i) {
|
||||
struct SBlockOrderInfo* pInfo = taosArrayGet(helper.orderInfo, i);
|
||||
if (pInfo == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
pInfo->pColData = taosArrayGet(pDataBlock->pDataBlock, pInfo->slotId);
|
||||
if (pInfo->pColData == NULL) {
|
||||
continue;
|
||||
}
|
||||
pInfo->compFn = getKeyComparFunc(pInfo->pColData->info.type, pInfo->order);
|
||||
}
|
||||
|
||||
|
@ -1399,6 +1466,10 @@ void blockDataEmpty(SSDataBlock* pDataBlock) {
|
|||
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
|
||||
if (p == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
colInfoDataCleanup(p, pInfo->capacity);
|
||||
}
|
||||
|
||||
|
@ -1417,6 +1488,10 @@ void blockDataReset(SSDataBlock* pDataBlock) {
|
|||
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
|
||||
if (p == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
p->hasNull = false;
|
||||
p->reassigned = false;
|
||||
if (IS_VAR_DATA_TYPE(p->info.type)) {
|
||||
|
@ -1527,6 +1602,10 @@ int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) {
|
|||
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
|
||||
if (p == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
code = doEnsureCapacity(p, &pDataBlock->info, numOfRows, false);
|
||||
if (code) {
|
||||
return code;
|
||||
|
@ -1544,6 +1623,10 @@ void blockDataFreeRes(SSDataBlock* pBlock) {
|
|||
int32_t numOfOutput = taosArrayGetSize(pBlock->pDataBlock);
|
||||
for (int32_t i = 0; i < numOfOutput; ++i) {
|
||||
SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
|
||||
if (pColInfoData == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
colDataDestroy(pColInfoData);
|
||||
}
|
||||
|
||||
|
@ -1579,6 +1662,10 @@ int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src) {
|
|||
size_t numOfCols = taosArrayGetSize(src->pDataBlock);
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData* p = taosArrayGet(src->pDataBlock, i);
|
||||
if (p == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
SColumnInfoData colInfo = {.hasNull = true, .info = p->info};
|
||||
code = blockDataAppendColInfo(dst, &colInfo);
|
||||
if (code) {
|
||||
|
@ -1594,7 +1681,7 @@ int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src) {
|
|||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData* pDst = taosArrayGet(dst->pDataBlock, i);
|
||||
SColumnInfoData* pSrc = taosArrayGet(src->pDataBlock, i);
|
||||
if (pSrc->pData == NULL && (!IS_VAR_DATA_TYPE(pSrc->info.type))) {
|
||||
if (pSrc == NULL || pDst == NULL || (pSrc->pData == NULL && (!IS_VAR_DATA_TYPE(pSrc->info.type)))) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -1622,6 +1709,10 @@ int32_t copyDataBlock(SSDataBlock* pDst, const SSDataBlock* pSrc) {
|
|||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, i);
|
||||
SColumnInfoData* pSrcCol = taosArrayGet(pSrc->pDataBlock, i);
|
||||
if (pDstCol == NULL || pSrcCol == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
int32_t ret = colDataAssign(pDstCol, pSrcCol, pSrc->info.rows, &pSrc->info);
|
||||
if (ret < 0) {
|
||||
code = ret;
|
||||
|
@ -3149,15 +3240,26 @@ int32_t blockDataGetSortedRows(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
|
|||
if (!pDataBlock || !pOrderInfo) return 0;
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pOrderInfo); ++i) {
|
||||
SBlockOrderInfo* pOrder = taosArrayGet(pOrderInfo, i);
|
||||
if (pOrder == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
pOrder->pColData = taosArrayGet(pDataBlock->pDataBlock, pOrder->slotId);
|
||||
if (pOrder->pColData == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
pOrder->compFn = getKeyComparFunc(pOrder->pColData->info.type, pOrder->order);
|
||||
}
|
||||
|
||||
SSDataBlockSortHelper sortHelper = {.orderInfo = pOrderInfo, .pDataBlock = pDataBlock};
|
||||
|
||||
int32_t rowIdx = 0, nextRowIdx = 1;
|
||||
for (; rowIdx < pDataBlock->info.rows && nextRowIdx < pDataBlock->info.rows; ++rowIdx, ++nextRowIdx) {
|
||||
if (dataBlockCompar(&nextRowIdx, &rowIdx, &sortHelper) < 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return nextRowIdx;
|
||||
}
|
||||
|
|
|
@ -553,8 +553,15 @@ int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg)
|
|||
return code;
|
||||
}
|
||||
|
||||
if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
|
||||
tqDebug("vgId:%d s-task:%s recv invalid the checkpoint-ready msg from task:0x%x (vgId:%d), discard", vgId,
|
||||
pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId);
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
return TSDB_CODE_INVALID_MSG;
|
||||
} else {
|
||||
tqDebug("vgId:%d s-task:%s received the checkpoint-ready msg from task:0x%x (vgId:%d), handle it", vgId,
|
||||
pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId);
|
||||
}
|
||||
|
||||
code = streamProcessCheckpointReadyMsg(pTask, req.checkpointId, req.downstreamTaskId, req.downstreamNodeId);
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
|
|
|
@ -176,7 +176,7 @@ int32_t extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod
|
|||
|
||||
int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId);
|
||||
int32_t createExprFromTargetNode(SExprInfo* pExp, STargetNode* pTargetNode);
|
||||
SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* numOfExprs);
|
||||
int32_t createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, SExprInfo** pExprInfo, int32_t* numOfExprs);
|
||||
|
||||
SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowEntryInfoOffset,
|
||||
SFunctionStateStore* pStore);
|
||||
|
@ -197,9 +197,6 @@ char* getStreamOpName(uint16_t opType);
|
|||
void printDataBlock(SSDataBlock* pBlock, const char* flag, const char* taskIdStr);
|
||||
void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr, const char* taskIdStr);
|
||||
|
||||
void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t order);
|
||||
void getInitialStartTimeWindow(SInterval* pInterval, TSKEY ts, STimeWindow* w, bool ascQuery);
|
||||
|
||||
TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols);
|
||||
void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t delta);
|
||||
|
||||
|
|
|
@ -73,7 +73,13 @@ int32_t createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pA
|
|||
SOperatorInfo** pOptrInfo) {
|
||||
QRY_OPTR_CHECK(pOptrInfo);
|
||||
|
||||
int32_t lino = 0;
|
||||
int32_t code = 0;
|
||||
int32_t num = 0;
|
||||
SExprInfo* pExprInfo = NULL;
|
||||
int32_t numOfScalarExpr = 0;
|
||||
SExprInfo* pScalarExprInfo = NULL;
|
||||
|
||||
SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
|
@ -89,29 +95,23 @@ int32_t createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pA
|
|||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
|
||||
int32_t num = 0;
|
||||
SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
|
||||
code = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &pExprInfo, &num);
|
||||
TSDB_CHECK_CODE(code, lino, _error);
|
||||
|
||||
code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
|
||||
pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
TSDB_CHECK_CODE(code, lino, _error);
|
||||
|
||||
int32_t numOfScalarExpr = 0;
|
||||
SExprInfo* pScalarExprInfo = NULL;
|
||||
if (pAggNode->pExprs != NULL) {
|
||||
pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr);
|
||||
code = createExprInfo(pAggNode->pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr);
|
||||
TSDB_CHECK_CODE(code, lino, _error);
|
||||
}
|
||||
|
||||
code = initExprSupp(&pInfo->scalarExprSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
TSDB_CHECK_CODE(code, lino, _error);
|
||||
|
||||
code = filterInitFromNode((SNode*)pAggNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
TSDB_CHECK_CODE(code, lino, _error);
|
||||
|
||||
pInfo->binfo.mergeResultBlock = pAggNode->mergeDataBlock;
|
||||
pInfo->groupKeyOptimized = pAggNode->groupKeyOptimized;
|
||||
|
|
|
@ -200,7 +200,9 @@ int32_t createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandl
|
|||
|
||||
if (pScanNode->scan.pScanPseudoCols != NULL) {
|
||||
SExprSupp* p = &pInfo->pseudoExprSup;
|
||||
p->pExprInfo = createExprInfo(pScanNode->scan.pScanPseudoCols, NULL, &p->numOfExprs);
|
||||
code = createExprInfo(pScanNode->scan.pScanPseudoCols, NULL, &p->pExprInfo, &p->numOfExprs);
|
||||
TSDB_CHECK_CODE(code, lino, _error);
|
||||
|
||||
p->pCtx =
|
||||
createSqlFunctionCtx(p->pExprInfo, p->numOfExprs, &p->rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore);
|
||||
}
|
||||
|
|
|
@ -256,14 +256,19 @@ int32_t createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy
|
|||
|
||||
if (pCountWindowNode->window.pExprs != NULL) {
|
||||
int32_t numOfScalarExpr = 0;
|
||||
SExprInfo* pScalarExprInfo = createExprInfo(pCountWindowNode->window.pExprs, NULL, &numOfScalarExpr);
|
||||
SExprInfo* pScalarExprInfo = NULL;
|
||||
code = createExprInfo(pCountWindowNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
}
|
||||
|
||||
size_t keyBufSize = 0;
|
||||
int32_t num = 0;
|
||||
SExprInfo* pExprInfo = createExprInfo(pCountWindowNode->window.pFuncs, NULL, &num);
|
||||
SExprInfo* pExprInfo = NULL;
|
||||
code = createExprInfo(pCountWindowNode->window.pFuncs, NULL, &pExprInfo, &num);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
|
||||
code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
|
||||
|
@ -286,6 +291,7 @@ int32_t createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy
|
|||
if (pInfo->windowCount != pInfo->windowSliding) {
|
||||
numOfItem = pInfo->windowCount / pInfo->windowSliding + 1;
|
||||
}
|
||||
|
||||
pInfo->countSup.pWinStates = taosArrayInit_s(itemSize, numOfItem);
|
||||
if (!pInfo->countSup.pWinStates) {
|
||||
goto _error;
|
||||
|
|
|
@ -84,7 +84,10 @@ int32_t createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy
|
|||
|
||||
if (pEventWindowNode->window.pExprs != NULL) {
|
||||
int32_t numOfScalarExpr = 0;
|
||||
SExprInfo* pScalarExprInfo = createExprInfo(pEventWindowNode->window.pExprs, NULL, &numOfScalarExpr);
|
||||
SExprInfo* pScalarExprInfo = NULL;
|
||||
|
||||
code = createExprInfo(pEventWindowNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
}
|
||||
|
@ -95,7 +98,10 @@ int32_t createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy
|
|||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||
|
||||
int32_t num = 0;
|
||||
SExprInfo* pExprInfo = createExprInfo(pEventWindowNode->window.pFuncs, NULL, &num);
|
||||
SExprInfo* pExprInfo = NULL;
|
||||
code = createExprInfo(pEventWindowNode->window.pFuncs, NULL, &pExprInfo, &num);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
|
||||
code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
|
||||
|
|
|
@ -1822,7 +1822,10 @@ SExprInfo* createExpr(SNodeList* pNodeList, int32_t* numOfExprs) {
|
|||
return pExprs;
|
||||
}
|
||||
|
||||
SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* numOfExprs) {
|
||||
int32_t createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, SExprInfo** pExprInfo, int32_t* numOfExprs) {
|
||||
QRY_OPTR_CHECK(pExprInfo);
|
||||
|
||||
int32_t code = 0;
|
||||
int32_t numOfFuncs = LIST_LENGTH(pNodeList);
|
||||
int32_t numOfGroupKeys = 0;
|
||||
if (pGroupKeys != NULL) {
|
||||
|
@ -1831,10 +1834,13 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
|
|||
|
||||
*numOfExprs = numOfFuncs + numOfGroupKeys;
|
||||
if (*numOfExprs == 0) {
|
||||
return NULL;
|
||||
return code;
|
||||
}
|
||||
|
||||
SExprInfo* pExprs = taosMemoryCalloc(*numOfExprs, sizeof(SExprInfo));
|
||||
if (pExprs == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < (*numOfExprs); ++i) {
|
||||
STargetNode* pTargetNode = NULL;
|
||||
|
@ -1845,15 +1851,16 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
|
|||
}
|
||||
|
||||
SExprInfo* pExp = &pExprs[i];
|
||||
int32_t code = createExprFromTargetNode(pExp, pTargetNode);
|
||||
code = createExprFromTargetNode(pExp, pTargetNode);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
taosMemoryFreeClear(pExprs);
|
||||
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||
return NULL;
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
return pExprs;
|
||||
*pExprInfo = pExprs;
|
||||
return code;
|
||||
}
|
||||
|
||||
// set the output buffer for the selectivity + tag query
|
||||
|
|
|
@ -455,6 +455,7 @@ int32_t createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFi
|
|||
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
|
||||
QRY_OPTR_CHECK(pOptrInfo);
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
SFillOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SFillOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
|
@ -464,21 +465,23 @@ int32_t createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFi
|
|||
}
|
||||
|
||||
pInfo->pRes = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc);
|
||||
SExprInfo* pExprInfo = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pInfo->numOfExpr);
|
||||
SExprInfo* pExprInfo = NULL;
|
||||
|
||||
code = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pExprInfo, &pInfo->numOfExpr);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
pOperator->exprSupp.pExprInfo = pExprInfo;
|
||||
|
||||
SExprSupp* pNoFillSupp = &pInfo->noFillExprSupp;
|
||||
pNoFillSupp->pExprInfo = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &pNoFillSupp->numOfExprs);
|
||||
code = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &pNoFillSupp->pExprInfo, &pNoFillSupp->numOfExprs);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
code = createPrimaryTsExprIfNeeded(pInfo, pPhyFillNode, pNoFillSupp, pTaskInfo->id.str);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
code =
|
||||
initExprSupp(pNoFillSupp, pNoFillSupp->pExprInfo, pNoFillSupp->numOfExprs, &pTaskInfo->storageAPI.functionStore);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
SInterval* pInterval =
|
||||
QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == downstream->operatorType
|
||||
|
|
|
@ -560,7 +560,8 @@ int32_t createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNo
|
|||
int32_t numOfScalarExpr = 0;
|
||||
SExprInfo* pScalarExprInfo = NULL;
|
||||
if (pAggNode->pExprs != NULL) {
|
||||
pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr);
|
||||
code = createExprInfo(pAggNode->pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
}
|
||||
|
||||
pInfo->pGroupCols = NULL;
|
||||
|
@ -578,7 +579,11 @@ int32_t createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNo
|
|||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
int32_t num = 0;
|
||||
SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
|
||||
SExprInfo* pExprInfo = NULL;
|
||||
|
||||
code = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &pExprInfo, &num);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, pInfo->groupKeyLen, pTaskInfo->id.str,
|
||||
pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
@ -1125,42 +1130,42 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo
|
|||
SPartitionOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SPartitionOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
pTaskInfo->code = code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
pTaskInfo->code = code = terrno;
|
||||
goto _error;
|
||||
}
|
||||
|
||||
int32_t numOfCols = 0;
|
||||
SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &numOfCols);
|
||||
SExprInfo* pExprInfo = NULL;
|
||||
|
||||
code = createExprInfo(pPartNode->pTargets, NULL, &pExprInfo, &numOfCols);
|
||||
|
||||
pInfo->pGroupCols = makeColumnArrayFromList(pPartNode->pPartitionKeys);
|
||||
|
||||
if (pPartNode->needBlockOutputTsOrder) {
|
||||
SBlockOrderInfo order = {.order = ORDER_ASC, .pColData = NULL, .nullFirst = false, .slotId = pPartNode->tsSlotId};
|
||||
pInfo->pOrderInfoArr = taosArrayInit(1, sizeof(SBlockOrderInfo));
|
||||
if (!pInfo->pOrderInfoArr) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
pTaskInfo->code = terrno;
|
||||
goto _error;
|
||||
}
|
||||
|
||||
void* tmp = taosArrayPush(pInfo->pOrderInfoArr, &order);
|
||||
QUERY_CHECK_NULL(tmp, code, lino, _error, terrno);
|
||||
}
|
||||
|
||||
if (pPartNode->pExprs != NULL) {
|
||||
int32_t num = 0;
|
||||
SExprInfo* pExprInfo1 = createExprInfo(pPartNode->pExprs, NULL, &num);
|
||||
SExprInfo* pExprInfo1 = NULL;
|
||||
code = createExprInfo(pPartNode->pExprs, NULL, &pExprInfo1, &num);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
code = initExprSupp(&pInfo->scalarSup, pExprInfo1, num, &pTaskInfo->storageAPI.functionStore);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = code;
|
||||
pTaskInfo->code = terrno;
|
||||
goto _error;
|
||||
}
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
}
|
||||
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pGroupSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK);
|
||||
if (pInfo->pGroupSet == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
pTaskInfo->code = terrno;
|
||||
goto _error;
|
||||
}
|
||||
|
||||
|
@ -1170,22 +1175,17 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo
|
|||
pInfo->binfo.pRes = createDataBlockFromDescNode(pPartNode->node.pOutputDataBlockDesc);
|
||||
code = getBufferPgSize(pInfo->binfo.pRes->info.rowSize, &defaultPgsz, &defaultBufsz);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = code;
|
||||
pTaskInfo->code = code;
|
||||
goto _error;
|
||||
}
|
||||
|
||||
if (!osTempSpaceAvailable()) {
|
||||
terrno = TSDB_CODE_NO_DISKSPACE;
|
||||
pTaskInfo->code = terrno;
|
||||
qError("Create partition operator info failed since %s, tempDir:%s", terrstr(), tsTempDir);
|
||||
goto _error;
|
||||
}
|
||||
|
||||
code = createDiskbasedBuf(&pInfo->pBuf, defaultPgsz, defaultBufsz, pTaskInfo->id.str, tsTempDir);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = code;
|
||||
pTaskInfo->code = code;
|
||||
goto _error;
|
||||
}
|
||||
|
||||
|
@ -1195,8 +1195,6 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo
|
|||
pInfo->columnOffset = setupColumnOffset(pInfo->binfo.pRes, pInfo->rowCapacity);
|
||||
code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pInfo->pGroupCols);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = code;
|
||||
pTaskInfo->code = code;
|
||||
goto _error;
|
||||
}
|
||||
|
||||
|
@ -1210,8 +1208,6 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo
|
|||
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = code;
|
||||
pTaskInfo->code = code;
|
||||
goto _error;
|
||||
}
|
||||
|
||||
|
@ -1224,7 +1220,7 @@ _error:
|
|||
}
|
||||
pTaskInfo->code = code;
|
||||
taosMemoryFreeClear(pOperator);
|
||||
return code;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo, int32_t numOfCols, char* pData,
|
||||
|
@ -1663,7 +1659,10 @@ int32_t createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPart
|
|||
|
||||
if (pPartNode->part.pExprs != NULL) {
|
||||
int32_t num = 0;
|
||||
SExprInfo* pCalExprInfo = createExprInfo(pPartNode->part.pExprs, NULL, &num);
|
||||
SExprInfo* pCalExprInfo = NULL;
|
||||
code = createExprInfo(pPartNode->part.pExprs, NULL, &pCalExprInfo, &num);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
code = initExprSupp(&pInfo->scalarSup, pCalExprInfo, num, &pTaskInfo->storageAPI.functionStore);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
}
|
||||
|
@ -1724,7 +1723,9 @@ int32_t createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPart
|
|||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
int32_t numOfCols = 0;
|
||||
SExprInfo* pExprInfo = createExprInfo(pPartNode->part.pTargets, NULL, &numOfCols);
|
||||
SExprInfo* pExprInfo = NULL;
|
||||
code = createExprInfo(pPartNode->part.pTargets, NULL, &pExprInfo, &numOfCols);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
setOperatorInfo(pOperator, "StreamPartitionOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION, false, OP_NOT_OPENED,
|
||||
pInfo, pTaskInfo);
|
||||
|
|
|
@ -108,9 +108,13 @@ int32_t createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode*
|
|||
|
||||
int32_t lino = 0;
|
||||
int32_t numOfCols = 0;
|
||||
SExprInfo* pExprInfo = createExprInfo(pProjPhyNode->pProjections, NULL, &numOfCols);
|
||||
SExprInfo* pExprInfo = NULL;
|
||||
code = createExprInfo(pProjPhyNode->pProjections, NULL, &pExprInfo, &numOfCols);
|
||||
TSDB_CHECK_CODE(code, lino, _error);
|
||||
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pProjPhyNode->node.pOutputDataBlockDesc);
|
||||
TSDB_CHECK_NULL(pResBlock, code, lino, _error, terrno);
|
||||
|
||||
initLimitInfo(pProjPhyNode->node.pLimit, pProjPhyNode->node.pSlimit, &pInfo->limitInfo);
|
||||
|
||||
pInfo->binfo.pRes = pResBlock;
|
||||
|
@ -258,7 +262,6 @@ int32_t doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
|
|||
|
||||
SProjectOperatorInfo* pProjectInfo = pOperator->info;
|
||||
SOptrBasicInfo* pInfo = &pProjectInfo->binfo;
|
||||
|
||||
SExprSupp* pSup = &pOperator->exprSupp;
|
||||
SSDataBlock* pRes = pInfo->pRes;
|
||||
SSDataBlock* pFinalRes = pProjectInfo->pFinalRes;
|
||||
|
@ -465,11 +468,16 @@ int32_t createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
|||
SIndefRowsFuncPhysiNode* pPhyNode = (SIndefRowsFuncPhysiNode*)pNode;
|
||||
|
||||
int32_t numOfExpr = 0;
|
||||
SExprInfo* pExprInfo = createExprInfo(pPhyNode->pFuncs, NULL, &numOfExpr);
|
||||
SExprInfo* pExprInfo = NULL;
|
||||
code = createExprInfo(pPhyNode->pFuncs, NULL, &pExprInfo, &numOfExpr);
|
||||
TSDB_CHECK_CODE(code, lino, _error);
|
||||
|
||||
if (pPhyNode->pExprs != NULL) {
|
||||
int32_t num = 0;
|
||||
SExprInfo* pSExpr = createExprInfo(pPhyNode->pExprs, NULL, &num);
|
||||
SExprInfo* pSExpr = NULL;
|
||||
code = createExprInfo(pPhyNode->pExprs, NULL, &pSExpr, &num);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
code = initExprSupp(&pInfo->scalarSup, pSExpr, num, &pTaskInfo->storageAPI.functionStore);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
|
|
|
@ -1338,7 +1338,10 @@ int32_t createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHa
|
|||
|
||||
if (pScanNode->pScanPseudoCols != NULL) {
|
||||
SExprSupp* pSup = &pInfo->base.pseudoSup;
|
||||
pSup->pExprInfo = createExprInfo(pScanNode->pScanPseudoCols, NULL, &pSup->numOfExprs);
|
||||
pSup->pExprInfo = NULL;
|
||||
code = createExprInfo(pScanNode->pScanPseudoCols, NULL, &pSup->pExprInfo, &pSup->numOfExprs);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset,
|
||||
&pTaskInfo->storageAPI.functionStore);
|
||||
}
|
||||
|
@ -3981,13 +3984,12 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode*
|
|||
|
||||
// create the pseduo columns info
|
||||
if (pTableScanNode->scan.pScanPseudoCols != NULL) {
|
||||
pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr);
|
||||
code = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->pPseudoExpr, &pInfo->numOfPseudoExpr);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
}
|
||||
|
||||
code = filterInitFromNode((SNode*)pScanPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
pInfo->pRes = createDataBlockFromDescNode(pDescNode);
|
||||
code = createSpecialDataBlock(STREAM_CLEAR, &pInfo->pUpdateRes);
|
||||
|
@ -4539,7 +4541,11 @@ int32_t createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* p
|
|||
SDataBlockDescNode* pDescNode = pPhyNode->node.pOutputDataBlockDesc;
|
||||
|
||||
int32_t numOfExprs = 0;
|
||||
SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs);
|
||||
SExprInfo* pExprInfo = NULL;
|
||||
|
||||
code = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &pExprInfo, &numOfExprs);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs, &pTaskInfo->storageAPI.functionStore);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
|
@ -5694,7 +5700,9 @@ int32_t createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SR
|
|||
|
||||
if (pTableScanNode->scan.pScanPseudoCols != NULL) {
|
||||
SExprSupp* pSup = &pInfo->base.pseudoSup;
|
||||
pSup->pExprInfo = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pSup->numOfExprs);
|
||||
code = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pSup->pExprInfo, &pSup->numOfExprs);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset,
|
||||
&pTaskInfo->storageAPI.functionStore);
|
||||
}
|
||||
|
|
|
@ -60,6 +60,8 @@ int32_t createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortN
|
|||
QRY_OPTR_CHECK(pOptrInfo);
|
||||
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
|
@ -71,7 +73,9 @@ int32_t createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortN
|
|||
SDataBlockDescNode* pDescNode = pSortNode->node.pOutputDataBlockDesc;
|
||||
|
||||
int32_t numOfCols = 0;
|
||||
pOperator->exprSupp.pExprInfo = createExprInfo(pSortNode->pExprs, NULL, &numOfCols);
|
||||
code = createExprInfo(pSortNode->pExprs, NULL, &pOperator->exprSupp.pExprInfo, &numOfCols);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
pOperator->exprSupp.numOfExprs = numOfCols;
|
||||
int32_t numOfOutputCols = 0;
|
||||
code =
|
||||
|
@ -770,7 +774,9 @@ int32_t createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNo
|
|||
SDataBlockDescNode* pDescNode = pSortPhyNode->node.pOutputDataBlockDesc;
|
||||
|
||||
int32_t numOfCols = 0;
|
||||
SExprInfo* pExprInfo = createExprInfo(pSortPhyNode->pExprs, NULL, &numOfCols);
|
||||
SExprInfo* pExprInfo = NULL;
|
||||
code = createExprInfo(pSortPhyNode->pExprs, NULL, &pExprInfo, &numOfCols);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
pSup->pExprInfo = pExprInfo;
|
||||
pSup->numOfExprs = numOfCols;
|
||||
|
|
|
@ -823,13 +823,19 @@ int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
|||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
if (pCountNode->window.pExprs != NULL) {
|
||||
int32_t numOfScalar = 0;
|
||||
SExprInfo* pScalarExprInfo = createExprInfo(pCountNode->window.pExprs, NULL, &numOfScalar);
|
||||
SExprInfo* pScalarExprInfo = NULL;
|
||||
code = createExprInfo(pCountNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
}
|
||||
SExprSupp* pExpSup = &pOperator->exprSupp;
|
||||
|
||||
SExprInfo* pExprInfo = createExprInfo(pCountNode->window.pFuncs, NULL, &numOfCols);
|
||||
SExprInfo* pExprInfo = NULL;
|
||||
code = createExprInfo(pCountNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
|
||||
code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
|
|
@ -864,7 +864,10 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
|||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
if (pEventNode->window.pExprs != NULL) {
|
||||
int32_t numOfScalar = 0;
|
||||
SExprInfo* pScalarExprInfo = createExprInfo(pEventNode->window.pExprs, NULL, &numOfScalar);
|
||||
SExprInfo* pScalarExprInfo = NULL;
|
||||
code = createExprInfo(pEventNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
|
@ -884,7 +887,10 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
|||
|
||||
SExprSupp* pExpSup = &pOperator->exprSupp;
|
||||
int32_t numOfCols = 0;
|
||||
SExprInfo* pExprInfo = createExprInfo(pEventNode->window.pFuncs, NULL, &numOfCols);
|
||||
SExprInfo* pExprInfo = NULL;
|
||||
code = createExprInfo(pEventNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
|
||||
code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
|
|
@ -1190,7 +1190,11 @@ static SStreamFillSupporter* initStreamFillSup(SStreamFillPhysiNode* pPhyFillNod
|
|||
}
|
||||
pFillSup->numOfFillCols = numOfFillCols;
|
||||
int32_t numOfNotFillCols = 0;
|
||||
SExprInfo* noFillExprInfo = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &numOfNotFillCols);
|
||||
SExprInfo* noFillExprInfo = NULL;
|
||||
|
||||
code = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &noFillExprInfo, &numOfNotFillCols);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
pFillSup->pAllColInfo = createFillColInfo(pFillExprInfo, pFillSup->numOfFillCols, noFillExprInfo, numOfNotFillCols,
|
||||
(const SNodeListNode*)(pPhyFillNode->pValues));
|
||||
pFillSup->type = convertFillType(pPhyFillNode->mode);
|
||||
|
@ -1201,7 +1205,10 @@ static SStreamFillSupporter* initStreamFillSup(SStreamFillPhysiNode* pPhyFillNod
|
|||
code = initResultBuf(pFillSup);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
SExprInfo* noFillExpr = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &numOfNotFillCols);
|
||||
SExprInfo* noFillExpr = NULL;
|
||||
code = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &noFillExpr, &numOfNotFillCols);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
code = initExprSupp(&pFillSup->notFillExprSup, noFillExpr, numOfNotFillCols, &pAPI->functionStore);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
|
@ -1343,7 +1350,11 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi
|
|||
|
||||
SInterval* pInterval = &((SStreamIntervalOperatorInfo*)downstream->info)->interval;
|
||||
int32_t numOfFillCols = 0;
|
||||
SExprInfo* pFillExprInfo = createExprInfo(pPhyFillNode->pFillExprs, NULL, &numOfFillCols);
|
||||
SExprInfo* pFillExprInfo = NULL;
|
||||
|
||||
code = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pFillExprInfo, &numOfFillCols);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
pInfo->pFillSup = initStreamFillSup(pPhyFillNode, pInterval, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI);
|
||||
if (!pInfo->pFillSup) {
|
||||
code = TSDB_CODE_FAILED;
|
||||
|
|
|
@ -1880,13 +1880,20 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN
|
|||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
if (pIntervalPhyNode->window.pExprs != NULL) {
|
||||
int32_t numOfScalar = 0;
|
||||
SExprInfo* pScalarExprInfo = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &numOfScalar);
|
||||
SExprInfo* pScalarExprInfo = NULL;
|
||||
|
||||
code = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
}
|
||||
|
||||
int32_t numOfCols = 0;
|
||||
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols);
|
||||
SExprInfo* pExprInfo = NULL;
|
||||
code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
|
||||
initBasicInfo(&pInfo->binfo, pResBlock);
|
||||
|
||||
|
@ -3690,7 +3697,10 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode
|
|||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
if (pSessionNode->window.pExprs != NULL) {
|
||||
int32_t numOfScalar = 0;
|
||||
SExprInfo* pScalarExprInfo = createExprInfo(pSessionNode->window.pExprs, NULL, &numOfScalar);
|
||||
SExprInfo* pScalarExprInfo = NULL;
|
||||
code = createExprInfo(pSessionNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
|
@ -3698,7 +3708,10 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode
|
|||
}
|
||||
SExprSupp* pExpSup = &pOperator->exprSupp;
|
||||
|
||||
SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols);
|
||||
SExprInfo* pExprInfo = NULL;
|
||||
code = createExprInfo(pSessionNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
|
||||
code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -4831,7 +4844,10 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
|||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
if (pStateNode->window.pExprs != NULL) {
|
||||
int32_t numOfScalar = 0;
|
||||
SExprInfo* pScalarExprInfo = createExprInfo(pStateNode->window.pExprs, NULL, &numOfScalar);
|
||||
SExprInfo* pScalarExprInfo = NULL;
|
||||
code = createExprInfo(pStateNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
}
|
||||
|
@ -4849,7 +4865,10 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
|||
|
||||
SExprSupp* pExpSup = &pOperator->exprSupp;
|
||||
int32_t numOfCols = 0;
|
||||
SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &numOfCols);
|
||||
SExprInfo* pExprInfo = NULL;
|
||||
code = createExprInfo(pStateNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
|
||||
code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -5126,7 +5145,10 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
|||
}
|
||||
|
||||
SStreamIntervalPhysiNode* pIntervalPhyNode = (SStreamIntervalPhysiNode*)pPhyNode;
|
||||
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols);
|
||||
|
||||
SExprInfo* pExprInfo = NULL;
|
||||
code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
|
||||
pInfo->interval = (SInterval){
|
||||
|
@ -5174,7 +5196,11 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
|||
|
||||
if (pIntervalPhyNode->window.pExprs != NULL) {
|
||||
int32_t numOfScalar = 0;
|
||||
SExprInfo* pScalarExprInfo = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &numOfScalar);
|
||||
SExprInfo* pScalarExprInfo = NULL;
|
||||
|
||||
code = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
}
|
||||
|
|
|
@ -2716,7 +2716,10 @@ int32_t createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanP
|
|||
pInfo->uid = (pBlockScanNode->suid != 0) ? pBlockScanNode->suid : pBlockScanNode->uid;
|
||||
|
||||
int32_t numOfCols = 0;
|
||||
SExprInfo* pExprInfo = createExprInfo(pBlockScanNode->pScanPseudoCols, NULL, &numOfCols);
|
||||
SExprInfo* pExprInfo = NULL;
|
||||
code = createExprInfo(pBlockScanNode->pScanPseudoCols, NULL, &pExprInfo, &numOfCols);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfCols, &pTaskInfo->storageAPI.functionStore);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
|
|
|
@ -1126,13 +1126,19 @@ int32_t createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyN
|
|||
SExprSupp* pSup = &pOperator->exprSupp;
|
||||
|
||||
int32_t numOfExprs = 0;
|
||||
SExprInfo* pExprInfo = createExprInfo(pInterpPhyNode->pFuncs, NULL, &numOfExprs);
|
||||
SExprInfo* pExprInfo = NULL;
|
||||
code = createExprInfo(pInterpPhyNode->pFuncs, NULL, &pExprInfo, &numOfExprs);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
code = initExprSupp(pSup, pExprInfo, numOfExprs, &pTaskInfo->storageAPI.functionStore);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
if (pInterpPhyNode->pExprs != NULL) {
|
||||
int32_t num = 0;
|
||||
SExprInfo* pScalarExprInfo = createExprInfo(pInterpPhyNode->pExprs, NULL, &num);
|
||||
SExprInfo* pScalarExprInfo = NULL;
|
||||
code = createExprInfo(pInterpPhyNode->pExprs, NULL, &pScalarExprInfo, &num);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, num, &pTaskInfo->storageAPI.functionStore);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
}
|
||||
|
|
|
@ -1298,7 +1298,10 @@ int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode
|
|||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
int32_t num = 0;
|
||||
SExprInfo* pExprInfo = createExprInfo(pPhyNode->window.pFuncs, NULL, &num);
|
||||
SExprInfo* pExprInfo = NULL;
|
||||
code = createExprInfo(pPhyNode->window.pFuncs, NULL, &pExprInfo, &num);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
|
||||
&pTaskInfo->storageAPI.functionStore);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
@ -1336,7 +1339,10 @@ int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode
|
|||
|
||||
if (pPhyNode->window.pExprs != NULL) {
|
||||
int32_t numOfScalar = 0;
|
||||
SExprInfo* pScalarExprInfo = createExprInfo(pPhyNode->window.pExprs, NULL, &numOfScalar);
|
||||
SExprInfo* pScalarExprInfo = NULL;
|
||||
code = createExprInfo(pPhyNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
|
@ -1578,7 +1584,10 @@ int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhy
|
|||
|
||||
if (pStateNode->window.pExprs != NULL) {
|
||||
int32_t numOfScalarExpr = 0;
|
||||
SExprInfo* pScalarExprInfo = createExprInfo(pStateNode->window.pExprs, NULL, &numOfScalarExpr);
|
||||
SExprInfo* pScalarExprInfo = NULL;
|
||||
code = createExprInfo(pStateNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
|
@ -1603,7 +1612,10 @@ int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhy
|
|||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||
|
||||
int32_t num = 0;
|
||||
SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num);
|
||||
SExprInfo* pExprInfo = NULL;
|
||||
code = createExprInfo(pStateNode->window.pFuncs, NULL, &pExprInfo, &num);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
|
||||
code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
|
||||
|
@ -1682,7 +1694,10 @@ int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPh
|
|||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
|
||||
int32_t numOfCols = 0;
|
||||
SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols);
|
||||
SExprInfo* pExprInfo = NULL;
|
||||
code = createExprInfo(pSessionNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pSessionNode->window.node.pOutputDataBlockDesc);
|
||||
initBasicInfo(&pInfo->binfo, pResBlock);
|
||||
|
||||
|
@ -1709,7 +1724,10 @@ int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPh
|
|||
|
||||
if (pSessionNode->window.pExprs != NULL) {
|
||||
int32_t numOfScalar = 0;
|
||||
SExprInfo* pScalarExprInfo = createExprInfo(pSessionNode->window.pExprs, NULL, &numOfScalar);
|
||||
SExprInfo* pScalarExprInfo = NULL;
|
||||
code = createExprInfo(pSessionNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
|
@ -2012,7 +2030,9 @@ int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMerge
|
|||
initResultSizeInfo(&pOperator->resultInfo, 512);
|
||||
|
||||
int32_t num = 0;
|
||||
SExprInfo* pExprInfo = createExprInfo(pNode->window.pFuncs, NULL, &num);
|
||||
SExprInfo* pExprInfo = NULL;
|
||||
code = createExprInfo(pNode->window.pFuncs, NULL, &pExprInfo, &num);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
code = initAggSup(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
|
||||
pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
|
||||
|
@ -2312,7 +2332,9 @@ int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeInterva
|
|||
}
|
||||
|
||||
int32_t num = 0;
|
||||
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
|
||||
SExprInfo* pExprInfo = NULL;
|
||||
code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &num);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
SInterval interval = {.interval = pIntervalPhyNode->interval,
|
||||
.sliding = pIntervalPhyNode->sliding,
|
||||
|
|
|
@ -81,6 +81,7 @@ typedef struct {
|
|||
int64_t dataWritten;
|
||||
|
||||
void* pMeta;
|
||||
int8_t removeAllFiles;
|
||||
|
||||
} STaskDbWrapper;
|
||||
|
||||
|
@ -152,6 +153,8 @@ void taskDbUpdateChkpId(void* pTaskDb, int64_t chkpId);
|
|||
void* taskDbAddRef(void* pTaskDb);
|
||||
void taskDbRemoveRef(void* pTaskDb);
|
||||
|
||||
void taskDbSetClearFileFlag(void* pTaskDb);
|
||||
|
||||
int streamStateOpenBackend(void* backend, SStreamState* pState);
|
||||
void streamStateCloseBackend(SStreamState* pState, bool remove);
|
||||
void streamStateDestroyCompar(void* arg);
|
||||
|
|
|
@ -2331,6 +2331,15 @@ void taskDbRemoveRef(void* pTaskDb) {
|
|||
(void)taosReleaseRef(taskDbWrapperId, pBackend->refId);
|
||||
}
|
||||
|
||||
void taskDbSetClearFileFlag(void* pTaskDb) {
|
||||
if (pTaskDb == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
STaskDbWrapper* pBackend = pTaskDb;
|
||||
atomic_store_8(&pBackend->removeAllFiles, 1);
|
||||
}
|
||||
|
||||
void taskDbInitOpt(STaskDbWrapper* pTaskDb) {
|
||||
rocksdb_env_t* env = rocksdb_create_default_env();
|
||||
|
||||
|
@ -2573,8 +2582,7 @@ void taskDbDestroy(void* pDb, bool flush) {
|
|||
stDebug("succ to destroy stream backend:%p", wrapper);
|
||||
|
||||
int8_t nCf = tListLen(ginitDict);
|
||||
|
||||
if (flush) {
|
||||
if (flush && wrapper->removeAllFiles == 0) {
|
||||
if (wrapper->db && wrapper->pCf) {
|
||||
rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create();
|
||||
rocksdb_flushoptions_set_wait(flushOpt, 1);
|
||||
|
@ -2636,6 +2644,11 @@ void taskDbDestroy(void* pDb, bool flush) {
|
|||
taskDbDestroyChkpOpt(wrapper);
|
||||
|
||||
taosMemoryFree(wrapper->idstr);
|
||||
|
||||
if (wrapper->removeAllFiles) {
|
||||
char* err = NULL;
|
||||
taosRemoveDir(wrapper->path);
|
||||
}
|
||||
taosMemoryFree(wrapper->path);
|
||||
taosMemoryFree(wrapper);
|
||||
|
||||
|
|
|
@ -94,12 +94,17 @@ int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType, i
|
|||
}
|
||||
|
||||
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq) {
|
||||
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
|
||||
if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
|
||||
return TSDB_CODE_INVALID_MSG;
|
||||
}
|
||||
|
||||
// todo this status may not be set here.
|
||||
// 1. set task status to be prepared for check point, no data are allowed to put into inputQ.
|
||||
int32_t code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT);
|
||||
ASSERT(code == TSDB_CODE_SUCCESS);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
stError("s-task:%s failed to handle gen-checkpoint event, failed to start checkpoint procedure", pTask->id.idStr);
|
||||
return code;
|
||||
}
|
||||
|
||||
pTask->chkInfo.pActiveInfo->transId = pReq->transId;
|
||||
pTask->chkInfo.pActiveInfo->activeId = pReq->checkpointId;
|
||||
|
@ -112,7 +117,10 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo
|
|||
}
|
||||
|
||||
int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTriggerRsp* pRsp) {
|
||||
ASSERT(pTask->info.taskLevel != TASK_LEVEL__SOURCE);
|
||||
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||
stError("s-task:%s invalid msg recv, checkpoint-trigger rsp not handled", pTask->id.idStr);
|
||||
return TSDB_CODE_INVALID_MSG;
|
||||
}
|
||||
|
||||
if (pRsp->rspCode != TSDB_CODE_SUCCESS) {
|
||||
stDebug("s-task:%s retrieve checkpoint-trgger rsp from upstream:0x%x invalid, code:%s", pTask->id.idStr,
|
||||
|
@ -258,7 +266,6 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
|
|||
}
|
||||
|
||||
if (p->upstreamTaskId == pBlock->srcTaskId) {
|
||||
ASSERT(p->checkpointId == checkpointId);
|
||||
stWarn("s-task:%s repeatly recv checkpoint-source msg from task:0x%x vgId:%d, checkpointId:%" PRId64
|
||||
", prev recvTs:%" PRId64 " discard",
|
||||
pTask->id.idStr, p->upstreamTaskId, p->upstreamNodeId, p->checkpointId, p->recvTs);
|
||||
|
@ -320,7 +327,6 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
|
|||
streamFreeQitem((SStreamQueueItem*)pBlock);
|
||||
}
|
||||
} else if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) {
|
||||
ASSERT(taosArrayGetSize(pTask->upstreamInfo.pList) > 0);
|
||||
if (pTask->chkInfo.startTs == 0) {
|
||||
pTask->chkInfo.startTs = taosGetTimestampMs();
|
||||
pTask->execInfo.checkpoint += 1;
|
||||
|
@ -410,8 +416,6 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId
|
|||
int32_t notReady = 0;
|
||||
int32_t transId = 0;
|
||||
|
||||
ASSERT(total > 0 && (pTask->info.taskLevel == TASK_LEVEL__SOURCE || pTask->info.taskLevel == TASK_LEVEL__AGG));
|
||||
|
||||
// 1. not in checkpoint status now
|
||||
SStreamTaskState pStat = streamTaskGetStatus(pTask);
|
||||
if (pStat.state != TASK_STATUS__CK) {
|
||||
|
@ -799,6 +803,13 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
|
|||
|
||||
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
|
||||
|
||||
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
||||
stError("s-task:%s source task should not start the checkpoint-trigger monitor fn, ref:%d quit", id, ref);
|
||||
streamMetaReleaseTask(pTask->pMeta, pTask);
|
||||
return;
|
||||
}
|
||||
|
||||
// check the status every 100ms
|
||||
if (streamTaskShouldStop(pTask)) {
|
||||
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
||||
|
@ -843,7 +854,6 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
|
|||
|
||||
// send msg to retrieve checkpoint trigger msg
|
||||
SArray* pList = pTask->upstreamInfo.pList;
|
||||
ASSERT(pTask->info.taskLevel > TASK_LEVEL__SOURCE);
|
||||
SArray* pNotSendList = taosArrayInit(4, sizeof(SStreamUpstreamEpInfo));
|
||||
if (pNotSendList == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -1085,10 +1095,12 @@ void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId) {
|
|||
|
||||
int32_t numOfConfirmed = streamTaskGetNumOfConfirmed(pTask);
|
||||
int32_t total = streamTaskGetNumOfDownstream(pTask);
|
||||
if (taskId == 0) {
|
||||
stError("s-task:%s recv invalid trigger-dispatch confirm, vgId:%d", pTask->id.idStr, vgId);
|
||||
} else {
|
||||
stDebug("s-task:%s set downstream:0x%x(vgId:%d) checkpoint-trigger dispatch confirmed, total confirmed:%d/%d",
|
||||
pTask->id.idStr, taskId, vgId, numOfConfirmed, total);
|
||||
|
||||
ASSERT(taskId != 0);
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t uploadCheckpointToS3(const char* id, const char* path) {
|
||||
|
|
|
@ -175,10 +175,11 @@ int32_t streamDataSubmitNew(SPackedData* pData, int32_t type, SStreamDataSubmit*
|
|||
}
|
||||
|
||||
void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit) {
|
||||
ASSERT(pDataSubmit->type == STREAM_INPUT__DATA_SUBMIT);
|
||||
if (pDataSubmit != NULL && pDataSubmit->type == STREAM_INPUT__DATA_SUBMIT) {
|
||||
taosMemoryFree(pDataSubmit->submit.msgStr);
|
||||
taosFreeQitem(pDataSubmit);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t streamMergedSubmitNew(SStreamMergedSubmit** pSubmit) {
|
||||
*pSubmit = NULL;
|
||||
|
|
|
@ -96,8 +96,6 @@ int32_t streamTaskBroadcastRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* r
|
|||
int32_t code = 0;
|
||||
void* buf = NULL;
|
||||
int32_t sz = taosArrayGetSize(pTask->upstreamInfo.pList);
|
||||
ASSERT(sz > 0);
|
||||
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
req->reqId = tGenIdPI64();
|
||||
SStreamUpstreamEpInfo* pEpInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
|
||||
|
@ -107,7 +105,6 @@ int32_t streamTaskBroadcastRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* r
|
|||
|
||||
tEncodeSize(tEncodeStreamRetrieveReq, req, len, code);
|
||||
if (code != 0) {
|
||||
ASSERT(0);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -946,8 +943,6 @@ int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) {
|
|||
SArray* pList = pTask->chkInfo.pActiveInfo->pReadyMsgList;
|
||||
|
||||
streamMutexLock(&pTask->chkInfo.pActiveInfo->lock);
|
||||
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
|
||||
|
||||
if (taosArrayGetSize(pList) == 1) {
|
||||
STaskCheckpointReadyInfo* pInfo = taosArrayGet(pList, 0);
|
||||
tmsgSendRsp(&pInfo->msg);
|
||||
|
@ -1122,8 +1117,6 @@ int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHa
|
|||
|
||||
void initCheckpointReadyInfo(STaskCheckpointReadyInfo* pReadyInfo, int32_t upstreamNodeId, int32_t upstreamTaskId,
|
||||
int32_t childId, SEpSet* pEpset, int64_t checkpointId) {
|
||||
ASSERT(upstreamTaskId != 0);
|
||||
|
||||
pReadyInfo->upstreamTaskId = upstreamTaskId;
|
||||
pReadyInfo->upstreamNodeEpset = *pEpset;
|
||||
pReadyInfo->upstreamNodeId = upstreamNodeId;
|
||||
|
|
|
@ -275,7 +275,7 @@ void tFreeStreamTask(SStreamTask* pTask) {
|
|||
}
|
||||
|
||||
streamTaskCleanupCheckInfo(&pTask->taskCheckInfo);
|
||||
streamFreeTaskState(pTask, status1);
|
||||
streamFreeTaskState(pTask, pTask->status.removeBackendFiles ? 1 : 0);
|
||||
|
||||
if (pTask->pNameMap) {
|
||||
tSimpleHashCleanup(pTask->pNameMap);
|
||||
|
@ -296,14 +296,14 @@ void tFreeStreamTask(SStreamTask* pTask) {
|
|||
taosArrayDestroy(pTask->outputInfo.pNodeEpsetUpdateList);
|
||||
pTask->outputInfo.pNodeEpsetUpdateList = NULL;
|
||||
|
||||
if ((pTask->status.removeBackendFiles) && (pTask->pMeta != NULL)) {
|
||||
char* path = taosMemoryCalloc(1, strlen(pTask->pMeta->path) + 128);
|
||||
sprintf(path, "%s%s%s", pTask->pMeta->path, TD_DIRSEP, pTask->id.idStr);
|
||||
taosRemoveDir(path);
|
||||
// if ((pTask->status.removeBackendFiles) && (pTask->pMeta != NULL)) {
|
||||
// char* path = taosMemoryCalloc(1, strlen(pTask->pMeta->path) + 128);
|
||||
// sprintf(path, "%s%s%s", pTask->pMeta->path, TD_DIRSEP, pTask->id.idStr);
|
||||
// taosRemoveDir(path);
|
||||
|
||||
stInfo("s-task:0x%x vgId:%d remove all backend files:%s", taskId, pTask->pMeta->vgId, path);
|
||||
taosMemoryFree(path);
|
||||
}
|
||||
// stInfo("s-task:0x%x vgId:%d remove all backend files:%s", taskId, pTask->pMeta->vgId, path);
|
||||
// taosMemoryFree(path);
|
||||
// }
|
||||
|
||||
if (pTask->id.idStr != NULL) {
|
||||
taosMemoryFree((void*)pTask->id.idStr);
|
||||
|
@ -316,10 +316,12 @@ void tFreeStreamTask(SStreamTask* pTask) {
|
|||
stDebug("s-task:0x%x free task completed", taskId);
|
||||
}
|
||||
|
||||
void streamFreeTaskState(SStreamTask* pTask, ETaskStatus status) {
|
||||
void streamFreeTaskState(SStreamTask* pTask, int8_t remove) {
|
||||
if (pTask->pState != NULL) {
|
||||
stDebug("s-task:0x%x start to free task state", pTask->id.taskId);
|
||||
streamStateClose(pTask->pState, status == TASK_STATUS__DROPPING);
|
||||
streamStateClose(pTask->pState, remove);
|
||||
|
||||
taskDbSetClearFileFlag(pTask->pBackend);
|
||||
taskDbRemoveRef(pTask->pBackend);
|
||||
pTask->pBackend = NULL;
|
||||
pTask->pState = NULL;
|
||||
|
|
|
@ -98,7 +98,7 @@ static int32_t attachWaitedEvent(SStreamTask* pTask, SFutureHandleEventInfo* pEv
|
|||
|
||||
static int32_t stopTaskSuccFn(SStreamTask* pTask) {
|
||||
SStreamTaskSM* pSM = pTask->status.pSM;
|
||||
streamFreeTaskState(pTask, pSM->current.state);
|
||||
streamFreeTaskState(pTask,pSM->current.state == TASK_STATUS__DROPPING ? 1 : 0);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue