refactor: remove assert.
This commit is contained in:
parent
82067f3bd2
commit
8532de31cf
|
@ -422,7 +422,6 @@ int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
|
||||||
|
|
||||||
static void tmqCommitRspCountDown(SMqCommitCbParamSet* pParamSet) {
|
static void tmqCommitRspCountDown(SMqCommitCbParamSet* pParamSet) {
|
||||||
int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
|
int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
|
||||||
ASSERT(waitingRspNum >= 0);
|
|
||||||
if (waitingRspNum == 0) {
|
if (waitingRspNum == 0) {
|
||||||
tmqCommitDone(pParamSet);
|
tmqCommitDone(pParamSet);
|
||||||
}
|
}
|
||||||
|
@ -551,7 +550,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT
|
||||||
int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_commit_cb* userCb, void* userParam) {
|
int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_commit_cb* userCb, void* userParam) {
|
||||||
char* topic;
|
char* topic;
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
ASSERT(msg != NULL);
|
|
||||||
if (TD_RES_TMQ(msg)) {
|
if (TD_RES_TMQ(msg)) {
|
||||||
SMqRspObj* pRspObj = (SMqRspObj*)msg;
|
SMqRspObj* pRspObj = (SMqRspObj*)msg;
|
||||||
topic = pRspObj->topic;
|
topic = pRspObj->topic;
|
||||||
|
@ -994,14 +993,12 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
|
||||||
const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
|
const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
|
||||||
const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;
|
const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;
|
||||||
|
|
||||||
ASSERT(conf->groupId[0]);
|
|
||||||
|
|
||||||
pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
|
pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
|
||||||
pTmq->mqueue = taosOpenQueue();
|
pTmq->mqueue = taosOpenQueue();
|
||||||
pTmq->qall = taosAllocateQall();
|
pTmq->qall = taosAllocateQall();
|
||||||
pTmq->delayedTask = taosOpenQueue();
|
pTmq->delayedTask = taosOpenQueue();
|
||||||
|
|
||||||
if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL) {
|
if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL || conf->groupId[0] == 0) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
tscError("consumer:0x%" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
|
tscError("consumer:0x%" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
|
||||||
pTmq->groupId);
|
pTmq->groupId);
|
||||||
|
|
|
@ -855,9 +855,7 @@ static int doBinarySearchKey(TSKEY* keyList, int num, int pos, TSKEY key, int or
|
||||||
s = pos;
|
s = pos;
|
||||||
|
|
||||||
// check
|
// check
|
||||||
assert(pos >= 0 && pos < num);
|
ASSERT(pos >= 0 && pos < num && num > 0);
|
||||||
assert(num > 0);
|
|
||||||
|
|
||||||
if (order == TSDB_ORDER_ASC) {
|
if (order == TSDB_ORDER_ASC) {
|
||||||
// find the first position which is smaller than the key
|
// find the first position which is smaller than the key
|
||||||
e = num - 1;
|
e = num - 1;
|
||||||
|
@ -2850,7 +2848,6 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
||||||
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
|
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
|
||||||
SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader;
|
SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader;
|
||||||
|
|
||||||
ASSERT(pBlockInfo != NULL);
|
|
||||||
pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr);
|
pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr);
|
||||||
if (pScanInfo == NULL) {
|
if (pScanInfo == NULL) {
|
||||||
return terrno;
|
return terrno;
|
||||||
|
|
|
@ -62,9 +62,6 @@ static void toDataCacheEntry(SDataDeleterHandle* pHandle, const SInputData* pInp
|
||||||
pEntry->numOfCols = taosArrayGetSize(pInput->pData->pDataBlock);
|
pEntry->numOfCols = taosArrayGetSize(pInput->pData->pDataBlock);
|
||||||
pEntry->dataLen = sizeof(SDeleterRes);
|
pEntry->dataLen = sizeof(SDeleterRes);
|
||||||
|
|
||||||
// ASSERT(1 == pEntry->numOfRows);
|
|
||||||
// ASSERT(3 == pEntry->numOfCols);
|
|
||||||
|
|
||||||
pBuf->useSize = sizeof(SDataCacheEntry);
|
pBuf->useSize = sizeof(SDataCacheEntry);
|
||||||
|
|
||||||
SColumnInfoData* pColRes = (SColumnInfoData*)taosArrayGet(pInput->pData->pDataBlock, 0);
|
SColumnInfoData* pColRes = (SColumnInfoData*)taosArrayGet(pInput->pData->pDataBlock, 0);
|
||||||
|
|
|
@ -170,7 +170,6 @@ void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, in
|
||||||
}
|
}
|
||||||
|
|
||||||
pGroupResInfo->index = 0;
|
pGroupResInfo->index = 0;
|
||||||
assert(pGroupResInfo->index <= getNumOfTotalRes(pGroupResInfo));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList) {
|
void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList) {
|
||||||
|
@ -334,10 +333,7 @@ int32_t isQualifiedTable(STableKeyInfo* info, SNode* pTagCond, void* metaHandle,
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(nodeType(pNew) == QUERY_NODE_VALUE);
|
|
||||||
SValueNode* pValue = (SValueNode*)pNew;
|
SValueNode* pValue = (SValueNode*)pNew;
|
||||||
|
|
||||||
ASSERT(pValue->node.resType.type == TSDB_DATA_TYPE_BOOL);
|
|
||||||
*pQualified = pValue->datum.b;
|
*pQualified = pValue->datum.b;
|
||||||
|
|
||||||
nodesDestroyNode(pNew);
|
nodesDestroyNode(pNew);
|
||||||
|
@ -1056,7 +1052,6 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode,
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!pTagCond) { // no tag filter condition exists, let's fetch all tables of this super table
|
if (!pTagCond) { // no tag filter condition exists, let's fetch all tables of this super table
|
||||||
ASSERT(pTagIndexCond == NULL);
|
|
||||||
vnodeGetCtbIdList(pVnode, pScanNode->suid, pUidList);
|
vnodeGetCtbIdList(pVnode, pScanNode->suid, pUidList);
|
||||||
} else {
|
} else {
|
||||||
// failed to find the result in the cache, let try to calculate the results
|
// failed to find the result in the cache, let try to calculate the results
|
||||||
|
@ -1366,7 +1361,6 @@ void createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) {
|
||||||
if (!pFuncNode->pParameterList && (memcmp(pExprNode->_function.functionName, name, len) == 0) &&
|
if (!pFuncNode->pParameterList && (memcmp(pExprNode->_function.functionName, name, len) == 0) &&
|
||||||
pExprNode->_function.functionName[len] == 0) {
|
pExprNode->_function.functionName[len] == 0) {
|
||||||
pFuncNode->pParameterList = nodesMakeList();
|
pFuncNode->pParameterList = nodesMakeList();
|
||||||
ASSERT(LIST_LENGTH(pFuncNode->pParameterList) == 0);
|
|
||||||
SValueNode* res = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
|
SValueNode* res = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
|
||||||
if (NULL == res) { // todo handle error
|
if (NULL == res) { // todo handle error
|
||||||
} else {
|
} else {
|
||||||
|
@ -1806,7 +1800,6 @@ uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) {
|
||||||
// TODO handle the group offset info, fix it, the rule of group output will be broken by this function
|
// TODO handle the group offset info, fix it, the rule of group output will be broken by this function
|
||||||
int32_t tableListAddTableInfo(STableListInfo* pTableList, uint64_t uid, uint64_t gid) {
|
int32_t tableListAddTableInfo(STableListInfo* pTableList, uint64_t uid, uint64_t gid) {
|
||||||
if (pTableList->map == NULL) {
|
if (pTableList->map == NULL) {
|
||||||
ASSERT(taosArrayGetSize(pTableList->pTableList) == 0);
|
|
||||||
pTableList->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
pTableList->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1956,7 +1949,6 @@ static int32_t sortTableGroup(STableListInfo* pTableListInfo) {
|
||||||
int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group,
|
int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group,
|
||||||
bool groupSort) {
|
bool groupSort) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
ASSERT(pTableListInfo->map != NULL);
|
|
||||||
|
|
||||||
bool groupByTbname = groupbyTbname(group);
|
bool groupByTbname = groupbyTbname(group);
|
||||||
size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
|
size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
|
||||||
|
@ -2013,7 +2005,6 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
|
int32_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
|
||||||
ASSERT(pTableListInfo->numOfOuputGroups == 1);
|
|
||||||
|
|
||||||
int64_t st1 = taosGetTimestampUs();
|
int64_t st1 = taosGetTimestampUs();
|
||||||
pTaskInfo->cost.extractListTime = (st1 - st) / 1000.0;
|
pTaskInfo->cost.extractListTime = (st1 - st) / 1000.0;
|
||||||
|
|
|
@ -35,7 +35,6 @@ static void initRefPool() {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doSetSMABlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) {
|
static int32_t doSetSMABlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) {
|
||||||
ASSERT(pOperator != NULL);
|
|
||||||
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||||
if (pOperator->numOfDownstream == 0) {
|
if (pOperator->numOfDownstream == 0) {
|
||||||
qError("failed to find stream scan operator to set the input data block, %s" PRIx64, id);
|
qError("failed to find stream scan operator to set the input data block, %s" PRIx64, id);
|
||||||
|
@ -75,27 +74,23 @@ static int32_t doSetSMABlock(SOperatorInfo* pOperator, void* input, size_t numOf
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doSetStreamOpOpen(SOperatorInfo* pOperator, char* id) {
|
static int32_t doSetStreamOpOpen(SOperatorInfo* pOperator, char* id) {
|
||||||
{
|
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||||
ASSERT(pOperator != NULL);
|
if (pOperator->numOfDownstream == 0) {
|
||||||
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
qError("failed to find stream scan operator to set the input data block, %s" PRIx64, id);
|
||||||
if (pOperator->numOfDownstream == 0) {
|
return TSDB_CODE_APP_ERROR;
|
||||||
qError("failed to find stream scan operator to set the input data block, %s" PRIx64, id);
|
|
||||||
return TSDB_CODE_APP_ERROR;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pOperator->numOfDownstream > 1) { // not handle this in join query
|
|
||||||
qError("join not supported for stream block scan, %s" PRIx64, id);
|
|
||||||
return TSDB_CODE_APP_ERROR;
|
|
||||||
}
|
|
||||||
pOperator->status = OP_NOT_OPENED;
|
|
||||||
return doSetStreamOpOpen(pOperator->pDownstream[0], id);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pOperator->numOfDownstream > 1) { // not handle this in join query
|
||||||
|
qError("join not supported for stream block scan, %s" PRIx64, id);
|
||||||
|
return TSDB_CODE_APP_ERROR;
|
||||||
|
}
|
||||||
|
pOperator->status = OP_NOT_OPENED;
|
||||||
|
return doSetStreamOpOpen(pOperator->pDownstream[0], id);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) {
|
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) {
|
||||||
ASSERT(pOperator != NULL);
|
|
||||||
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||||
if (pOperator->numOfDownstream == 0) {
|
if (pOperator->numOfDownstream == 0) {
|
||||||
qError("failed to find stream scan operator to set the input data block, %s" PRIx64, id);
|
qError("failed to find stream scan operator to set the input data block, %s" PRIx64, id);
|
||||||
|
|
|
@ -240,7 +240,6 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
|
||||||
|
|
||||||
// allocate a new buffer page
|
// allocate a new buffer page
|
||||||
if (pResult == NULL) {
|
if (pResult == NULL) {
|
||||||
ASSERT(pSup->resultRowSize > 0);
|
|
||||||
pResult = getNewResultRow(pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
|
pResult = getNewResultRow(pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
|
||||||
if (pResult == NULL) {
|
if (pResult == NULL) {
|
||||||
T_LONG_JMP(pTaskInfo->env, terrno);
|
T_LONG_JMP(pTaskInfo->env, terrno);
|
||||||
|
@ -310,7 +309,6 @@ static int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pRes
|
||||||
pWindowRes->offset = (int32_t)pData->num;
|
pWindowRes->offset = (int32_t)pData->num;
|
||||||
|
|
||||||
pData->num += size;
|
pData->num += size;
|
||||||
assert(pWindowRes->pageId >= 0);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -488,7 +486,6 @@ static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int
|
||||||
// todo: refactor this
|
// todo: refactor this
|
||||||
if (fmIsImplicitTsFunc(pCtx[i].functionId) && (j == pOneExpr->base.numOfParams - 1)) {
|
if (fmIsImplicitTsFunc(pCtx[i].functionId) && (j == pOneExpr->base.numOfParams - 1)) {
|
||||||
pInput->pPTS = pInput->pData[j]; // in case of merge function, this is not always the ts column data.
|
pInput->pPTS = pInput->pData[j]; // in case of merge function, this is not always the ts column data.
|
||||||
// ASSERT(pInput->pPTS->info.type == TSDB_DATA_TYPE_TIMESTAMP);
|
|
||||||
}
|
}
|
||||||
ASSERT(pInput->pData[j] != NULL);
|
ASSERT(pInput->pData[j] != NULL);
|
||||||
} else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
|
} else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
|
||||||
|
@ -1024,8 +1021,6 @@ void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uin
|
||||||
|
|
||||||
SResultRow* pResultRow = doSetResultOutBufByKey(pAggInfo->aggSup.pResultBuf, pResultRowInfo, (char*)&groupId,
|
SResultRow* pResultRow = doSetResultOutBufByKey(pAggInfo->aggSup.pResultBuf, pResultRowInfo, (char*)&groupId,
|
||||||
sizeof(groupId), true, groupId, pTaskInfo, false, &pAggInfo->aggSup);
|
sizeof(groupId), true, groupId, pTaskInfo, false, &pAggInfo->aggSup);
|
||||||
assert(pResultRow != NULL);
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* not assign result buffer yet, add new result buffer
|
* not assign result buffer yet, add new result buffer
|
||||||
* all group belong to one result set, and each group result has different group id so set the id to be one
|
* all group belong to one result set, and each group result has different group id so set the id to be one
|
||||||
|
@ -1279,7 +1274,6 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
|
||||||
// STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
|
// STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
|
||||||
// SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo;
|
// SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo;
|
||||||
//
|
//
|
||||||
// assert(pQueryAttr->limit.offset == 0);
|
|
||||||
// STimeWindow tw = *win;
|
// STimeWindow tw = *win;
|
||||||
// getNextTimeWindow(pQueryAttr, &tw);
|
// getNextTimeWindow(pQueryAttr, &tw);
|
||||||
//
|
//
|
||||||
|
@ -1294,7 +1288,6 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
|
||||||
// tw = *win;
|
// tw = *win;
|
||||||
// int32_t startPos =
|
// int32_t startPos =
|
||||||
// getNextQualifiedWindow(pQueryAttr, &tw, pBlockInfo, pColInfoData->pData, binarySearchForKey, -1);
|
// getNextQualifiedWindow(pQueryAttr, &tw, pBlockInfo, pColInfoData->pData, binarySearchForKey, -1);
|
||||||
// assert(startPos >= 0);
|
|
||||||
//
|
//
|
||||||
// // set the abort info
|
// // set the abort info
|
||||||
// pQueryAttr->pos = startPos;
|
// pQueryAttr->pos = startPos;
|
||||||
|
@ -1329,11 +1322,6 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
|
||||||
|
|
||||||
// static bool skipTimeInterval(STaskRuntimeEnv *pRuntimeEnv, TSKEY* start) {
|
// static bool skipTimeInterval(STaskRuntimeEnv *pRuntimeEnv, TSKEY* start) {
|
||||||
// STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
|
// STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
|
||||||
// if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
|
|
||||||
// assert(*start <= pRuntimeEnv->current->lastKey);
|
|
||||||
// } else {
|
|
||||||
// assert(*start >= pRuntimeEnv->current->lastKey);
|
|
||||||
// }
|
|
||||||
//
|
//
|
||||||
// // if queried with value filter, do NOT forward query start position
|
// // if queried with value filter, do NOT forward query start position
|
||||||
// if (pQueryAttr->limit.offset <= 0 || pQueryAttr->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf != NULL ||
|
// if (pQueryAttr->limit.offset <= 0 || pQueryAttr->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf != NULL ||
|
||||||
|
@ -1347,8 +1335,6 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
|
||||||
// value is
|
// value is
|
||||||
// * not valid. otherwise, we only forward pQueryAttr->limit.offset number of points
|
// * not valid. otherwise, we only forward pQueryAttr->limit.offset number of points
|
||||||
// */
|
// */
|
||||||
// assert(pRuntimeEnv->resultRowInfo.prevSKey == TSKEY_INITIAL_VAL);
|
|
||||||
//
|
|
||||||
// STimeWindow w = TSWINDOW_INITIALIZER;
|
// STimeWindow w = TSWINDOW_INITIALIZER;
|
||||||
// bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);
|
// bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);
|
||||||
//
|
//
|
||||||
|
@ -1418,8 +1404,6 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
|
||||||
// tw = win;
|
// tw = win;
|
||||||
// int32_t startPos =
|
// int32_t startPos =
|
||||||
// getNextQualifiedWindow(pQueryAttr, &tw, &blockInfo, pColInfoData->pData, binarySearchForKey, -1);
|
// getNextQualifiedWindow(pQueryAttr, &tw, &blockInfo, pColInfoData->pData, binarySearchForKey, -1);
|
||||||
// assert(startPos >= 0);
|
|
||||||
//
|
|
||||||
// // set the abort info
|
// // set the abort info
|
||||||
// pQueryAttr->pos = startPos;
|
// pQueryAttr->pos = startPos;
|
||||||
// pTableQueryInfo->lastKey = ((TSKEY *)pColInfoData->pData)[startPos];
|
// pTableQueryInfo->lastKey = ((TSKEY *)pColInfoData->pData)[startPos];
|
||||||
|
@ -1441,10 +1425,6 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
|
||||||
// }
|
// }
|
||||||
|
|
||||||
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num) {
|
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num) {
|
||||||
if (p->pDownstream == NULL) {
|
|
||||||
assert(p->numOfDownstream == 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
p->pDownstream = taosMemoryCalloc(1, num * POINTER_BYTES);
|
p->pDownstream = taosMemoryCalloc(1, num * POINTER_BYTES);
|
||||||
if (p->pDownstream == NULL) {
|
if (p->pDownstream == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -1800,7 +1780,10 @@ int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows) {
|
void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows) {
|
||||||
ASSERT(numOfRows != 0);
|
if (numOfRows == 0) {
|
||||||
|
numOfRows = 4096;
|
||||||
|
}
|
||||||
|
|
||||||
pResultInfo->capacity = numOfRows;
|
pResultInfo->capacity = numOfRows;
|
||||||
pResultInfo->threshold = numOfRows * 0.75;
|
pResultInfo->threshold = numOfRows * 0.75;
|
||||||
|
|
||||||
|
@ -1941,7 +1924,6 @@ _error:
|
||||||
}
|
}
|
||||||
|
|
||||||
void cleanupBasicInfo(SOptrBasicInfo* pInfo) {
|
void cleanupBasicInfo(SOptrBasicInfo* pInfo) {
|
||||||
assert(pInfo != NULL);
|
|
||||||
pInfo->pRes = blockDataDestroy(pInfo->pRes);
|
pInfo->pRes = blockDataDestroy(pInfo->pRes);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2253,7 +2235,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
|
||||||
pOperator = createProjectOperatorInfo(NULL, (SProjectPhysiNode*)pPhyNode, pTaskInfo);
|
pOperator = createProjectOperatorInfo(NULL, (SProjectPhysiNode*)pPhyNode, pTaskInfo);
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
terrno = TSDB_CODE_INVALID_PARA;
|
||||||
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pOperator != NULL) {
|
if (pOperator != NULL) {
|
||||||
|
@ -2345,7 +2328,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC == type) {
|
||||||
pOptr = createTimeSliceOperatorInfo(ops[0], pPhyNode, pTaskInfo);
|
pOptr = createTimeSliceOperatorInfo(ops[0], pPhyNode, pTaskInfo);
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
terrno = TSDB_CODE_INVALID_PARA;
|
||||||
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosMemoryFree(ops);
|
taosMemoryFree(ops);
|
||||||
|
@ -2583,7 +2567,6 @@ int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResul
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
*pResult = (SResultRow*)value;
|
*pResult = (SResultRow*)value;
|
||||||
ASSERT(*pResult);
|
|
||||||
// set time window for current result
|
// set time window for current result
|
||||||
(*pResult)->win = (*win);
|
(*pResult)->win = (*win);
|
||||||
setResultRowInitCtx(*pResult, pCtx, numOfOutput, rowEntryInfoOffset);
|
setResultRowInitCtx(*pResult, pCtx, numOfOutput, rowEntryInfoOffset);
|
||||||
|
|
|
@ -193,8 +193,6 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
|
||||||
return pResBlock;
|
return pResBlock;
|
||||||
}
|
}
|
||||||
} else if (pInfo->existNewGroupBlock) { // try next group
|
} else if (pInfo->existNewGroupBlock) { // try next group
|
||||||
assert(pBlock != NULL);
|
|
||||||
|
|
||||||
blockDataCleanup(pResBlock);
|
blockDataCleanup(pResBlock);
|
||||||
|
|
||||||
doHandleRemainBlockForNewGroupImpl(pOperator, pInfo, pResultInfo, pTaskInfo);
|
doHandleRemainBlockForNewGroupImpl(pOperator, pInfo, pResultInfo, pTaskInfo);
|
||||||
|
|
|
@ -204,7 +204,6 @@ static void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSData
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t buildGroupKeys(void* pKey, const SArray* pGroupColVals) {
|
static int32_t buildGroupKeys(void* pKey, const SArray* pGroupColVals) {
|
||||||
ASSERT(pKey != NULL);
|
|
||||||
size_t numOfGroupCols = taosArrayGetSize(pGroupColVals);
|
size_t numOfGroupCols = taosArrayGetSize(pGroupColVals);
|
||||||
|
|
||||||
char* isNull = (char*)pKey;
|
char* isNull = (char*)pKey;
|
||||||
|
@ -570,7 +569,6 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
||||||
}
|
}
|
||||||
|
|
||||||
(*columnLen) += contentLen;
|
(*columnLen) += contentLen;
|
||||||
ASSERT(*columnLen >= 0);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
(*rows) += 1;
|
(*rows) += 1;
|
||||||
|
@ -681,7 +679,6 @@ static int compareDataGroupInfo(const void* group1, const void* group2) {
|
||||||
const SDataGroupInfo* pGroupInfo2 = group2;
|
const SDataGroupInfo* pGroupInfo2 = group2;
|
||||||
|
|
||||||
if (pGroupInfo1->groupId == pGroupInfo2->groupId) {
|
if (pGroupInfo1->groupId == pGroupInfo2->groupId) {
|
||||||
ASSERT(0);
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -768,8 +768,6 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size
|
||||||
pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
|
pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
|
||||||
|
|
||||||
int32_t rowSize = pInfo->binfo.pRes->info.rowSize;
|
int32_t rowSize = pInfo->binfo.pRes->info.rowSize;
|
||||||
ASSERT(rowSize < 100 * 1024 * 1024);
|
|
||||||
|
|
||||||
int32_t numOfOutputCols = 0;
|
int32_t numOfOutputCols = 0;
|
||||||
code = extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID,
|
code = extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID,
|
||||||
&pInfo->matchInfo);
|
&pInfo->matchInfo);
|
||||||
|
|
Loading…
Reference in New Issue