Merge branch '3.0' of https://github.com/taosdata/TDengine into enh/TD-31538-2

This commit is contained in:
Hongze Cheng 2024-08-21 17:27:46 +08:00
commit 5ab2f40ec8
48 changed files with 497 additions and 461 deletions

View File

@ -91,7 +91,7 @@ int32_t taosGetErrSize();
#define TSDB_CODE_RPC_NETWORK_BUSY TAOS_DEF_ERROR_CODE(0, 0x0024)
#define TSDB_CODE_HTTP_MODULE_QUIT TAOS_DEF_ERROR_CODE(0, 0x0025)
#define TSDB_CODE_RPC_MODULE_QUIT TAOS_DEF_ERROR_CODE(0, 0x0026)
#define TSDB_CODE_RPC_ASYNC_MODULE_QUIT TAOS_DEF_ERROR_CODE(0, 0x0027)
#define TSDB_CODE_RPC_ASYNC_MODULE_QUIT TAOS_DEF_ERROR_CODE(0, 0x0027)
@ -152,6 +152,7 @@ int32_t taosGetErrSize();
#define TSDB_CODE_FAILED_TO_CONNECT_S3 TAOS_DEF_ERROR_CODE(0, 0x0135)
#define TSDB_CODE_MSG_PREPROCESSED TAOS_DEF_ERROR_CODE(0, 0x0136) // internal
#define TSDB_CODE_OUT_OF_BUFFER TAOS_DEF_ERROR_CODE(0, 0x0137)
#define TSDB_CODE_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x0138)
//client
#define TSDB_CODE_TSC_INVALID_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0200)

View File

@ -139,6 +139,13 @@ static FORCE_INLINE int32_t taosGetTbHashVal(const char *tbname, int32_t tblen,
#define QUERY_CHECK_CODE TSDB_CHECK_CODE
#define QUERY_CHECK_CONDITION(condition, CODE, LINO, LABEL, ERRNO) \
if (!condition) { \
(CODE) = (ERRNO); \
(LINO) = __LINE__; \
goto LABEL; \
}
#define TSDB_CHECK_NULL(ptr, CODE, LINO, LABEL, ERRNO) \
if ((ptr) == NULL) { \
(CODE) = (ERRNO); \

View File

@ -852,34 +852,58 @@ typedef struct SCtgCacheItemInfo {
#define CTG_LOCK(type, _lock) \
do { \
if (CTG_READ == (type)) { \
ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value before read lock"); \
if (atomic_load_32((_lock)) < 0) { \
qError("invalid lock value before read lock"); \
break; \
} \
CTG_LOCK_DEBUG("CTG RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRLockLatch(_lock); \
CTG_LOCK_DEBUG("CTG RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
ASSERTS(atomic_load_32((_lock)) > 0, "invalid lock value after read lock"); \
if (atomic_load_32((_lock)) <= 0) { \
qError("invalid lock value after read lock"); \
break; \
} \
} else { \
ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value before write lock"); \
if (atomic_load_32((_lock)) < 0) { \
qError("invalid lock value before write lock"); \
break; \
} \
CTG_LOCK_DEBUG("CTG WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWLockLatch(_lock); \
CTG_LOCK_DEBUG("CTG WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
ASSERTS(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY, "invalid lock value after write lock"); \
if (atomic_load_32((_lock)) != TD_RWLATCH_WRITE_FLAG_COPY) { \
qError("invalid lock value after write lock"); \
break; \
} \
} \
} while (0)
#define CTG_UNLOCK(type, _lock) \
do { \
if (CTG_READ == (type)) { \
ASSERTS(atomic_load_32((_lock)) > 0, "invalid lock value before read unlock"); \
if (atomic_load_32((_lock)) <= 0) { \
qError("invalid lock value before read unlock"); \
break; \
} \
CTG_LOCK_DEBUG("CTG RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRUnLockLatch(_lock); \
CTG_LOCK_DEBUG("CTG RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value after read unlock"); \
if (atomic_load_32((_lock)) < 0) { \
qError("invalid lock value after read unlock"); \
break; \
} \
} else { \
ASSERTS(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY, "invalid lock value before write unlock"); \
if (atomic_load_32((_lock)) != TD_RWLATCH_WRITE_FLAG_COPY) { \
qError("invalid lock value before write unlock"); \
break; \
} \
CTG_LOCK_DEBUG("CTG WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWUnLockLatch(_lock); \
CTG_LOCK_DEBUG("CTG WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value after write unlock"); \
if (atomic_load_32((_lock)) < 0) { \
qError("invalid lock value after write unlock"); \
break; \
} \
} \
} while (0)

View File

@ -773,8 +773,6 @@ int32_t ctgGetTsma(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTsmaNa
}
CTG_ERR_JRET(code);
ASSERT(tsmaRsp.pTsmas && tsmaRsp.pTsmas->size == 1);
*pTsma = taosArrayGetP(tsmaRsp.pTsmas, 0);
taosArrayDestroy(tsmaRsp.pTsmas);

View File

@ -2448,30 +2448,33 @@ int32_t ctgHandleGetTSMARsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf*
STableTSMAInfoRsp* pOut = pMsgCtx->out;
pRes->code = 0;
if (pOut->pTsmas->size > 0) {
ASSERT(pOut->pTsmas->size == 1);
pRes->pRes = pOut;
pMsgCtx->out = NULL;
TSWAP(pTask->res, pCtx->pResList);
if (1 != pOut->pTsmas->size) {
ctgError("invalid tsma num:%d", (int32_t)pOut->pTsmas->size);
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
STableTSMAInfo* pTsma = taosArrayGetP(pOut->pTsmas, 0);
if (NULL == pTsma) {
ctgError("fail to get the 0th STableTSMAInfo, totalNum:%d", (int32_t)taosArrayGetSize(pOut->pTsmas));
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
int32_t exists = false;
CTG_ERR_JRET(ctgTbMetaExistInCache(pCtg, pTsma->targetDbFName, pTsma->targetTb, &exists));
if (!exists) {
TSWAP(pMsgCtx->lastOut, pMsgCtx->out);
CTG_RET(ctgGetTbMetaFromMnodeImpl(pCtg, pConn, pTsma->targetDbFName, pTsma->targetTb, NULL, tReq));
}
pRes->pRes = pOut;
pMsgCtx->out = NULL;
TSWAP(pTask->res, pCtx->pResList);
STableTSMAInfo* pTsma = taosArrayGetP(pOut->pTsmas, 0);
if (NULL == pTsma) {
ctgError("fail to get the 0th STableTSMAInfo, totalNum:%d", (int32_t)taosArrayGetSize(pOut->pTsmas));
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
int32_t exists = false;
CTG_ERR_JRET(ctgTbMetaExistInCache(pCtg, pTsma->targetDbFName, pTsma->targetTb, &exists));
if (!exists) {
TSWAP(pMsgCtx->lastOut, pMsgCtx->out);
CTG_RET(ctgGetTbMetaFromMnodeImpl(pCtg, pConn, pTsma->targetDbFName, pTsma->targetTb, NULL, tReq));
}
break;
}
default:
ASSERT(0);
ctgError("invalid reqType:%d while getting tsma rsp", reqType);
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
_return:
@ -2635,14 +2638,14 @@ int32_t ctgHandleGetTbTSMARsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf
break;
}
default:
ASSERT(0);
ctgError("invalid fetchType:%d while getting tb tsma rsp", pFetch->fetchType);
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
break;
}
case TDMT_VND_TABLE_META: {
// handle source tb meta
ASSERT(pFetch->fetchType == FETCH_TSMA_SOURCE_TB_META);
STableMetaOutput* pOut = (STableMetaOutput*)pMsgCtx->out;
pFetch->fetchType = FETCH_TB_TSMA;
pFetch->tsmaSourceTbName = *pTbName;
@ -2663,7 +2666,8 @@ int32_t ctgHandleGetTbTSMARsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf
break;
}
default:
ASSERT(0);
ctgError("invalid reqType:%d while getting tsma rsp", reqType);
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
_return:
@ -3628,7 +3632,8 @@ int32_t ctgLaunchGetTbTSMATask(SCtgTask* pTask) {
break;
}
default:
ASSERT(0);
ctgError("invalid fetchType:%d in getting tb tsma task", pFetch->fetchType);
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
break;
}
}
@ -3644,14 +3649,12 @@ int32_t ctgLaunchGetTSMATask(SCtgTask* pTask) {
SCtgJob* pJob = pTask->pJob;
// currently, only support fetching one tsma
ASSERT(pCtx->pNames->size == 1);
STablesReq* pReq = taosArrayGet(pCtx->pNames, 0);
if (NULL == pReq) {
ctgError("fail to get the 0th STablesReq, totalNum:%d", (int32_t)taosArrayGetSize(pCtx->pNames));
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
ASSERT(pReq->pTables->size == 1);
SName* pTsmaName = taosArrayGet(pReq->pTables, 0);
if (NULL == pReq) {
ctgError("fail to get the 0th SName, totalNum:%d", (int32_t)taosArrayGetSize(pReq->pTables));
@ -3686,7 +3689,6 @@ int32_t ctgLaunchGetTSMATask(SCtgTask* pTask) {
}
STableTSMAInfoRsp* pRsp = (STableTSMAInfoRsp*)pRes->pRes;
ASSERT(pRsp->pTsmas->size == 1);
const STSMACache* pTsma = taosArrayGetP(pRsp->pTsmas, 0);
if (NULL == pTsma) {

View File

@ -42,7 +42,8 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu
msgNum = taosArrayGetSize(batchRsp.pRsps);
}
if (ASSERTS(taskNum == msgNum || 0 == msgNum, "taskNum %d mis-match msgNum %d", taskNum, msgNum)) {
if (taskNum != msgNum && 0 != msgNum) {
ctgError("taskNum %d mis-match msgNum %d", taskNum, msgNum);
msgNum = 0;
}
@ -77,7 +78,9 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu
if (msgNum > 0) {
pRsp = taosArrayGet(batchRsp.pRsps, i);
if (ASSERTS(pRsp->msgIdx == *msgIdx, "rsp msgIdx %d mis-match msgIdx %d", pRsp->msgIdx, *msgIdx)) {
if (pRsp->msgIdx != *msgIdx) {
ctgError("rsp msgIdx %d mis-match msgIdx %d", pRsp->msgIdx, *msgIdx);
pRsp = &rsp;
pRsp->msgIdx = *msgIdx;
pRsp->reqType = -1;

View File

@ -2631,7 +2631,7 @@ bool hasOutOfDateTSMACache(SArray* pTsmas) {
for (int32_t i = 0; i < pTsmas->size; ++i) {
STSMACache* pTsmaInfo = taosArrayGetP(pTsmas, i);
if (NULL == pTsmaInfo) {
ASSERT(0);
continue;
}
if (isCtgTSMACacheOutOfDate(pTsmaInfo)) {
return true;

View File

@ -298,7 +298,11 @@ int32_t doScanCacheNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
int32_t resultRows = pBufRes->info.rows;
// the results may be null, if last values are all null
ASSERT(resultRows == 0 || resultRows == taosArrayGetSize(pInfo->pUidList));
if (resultRows != 0 && resultRows != taosArrayGetSize(pInfo->pUidList)) {
pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(pTaskInfo->code));
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
pInfo->indexOfBufferedRes = 0;
}

View File

@ -66,14 +66,17 @@ static void clearWinStateBuff(SCountWindowResult* pBuff) { pBuff->winRows = 0; }
static SCountWindowResult* getCountWinStateInfo(SCountWindowSupp* pCountSup) {
SCountWindowResult* pBuffInfo = taosArrayGet(pCountSup->pWinStates, pCountSup->stateIndex);
if (!pBuffInfo) {
terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
return NULL;
}
int32_t size = taosArrayGetSize(pCountSup->pWinStates);
// coverity scan
ASSERTS(size > 0, "WinStates is empty");
if (size > 0) {
pCountSup->stateIndex = (pCountSup->stateIndex + 1) % size;
int32_t size = taosArrayGetSize(pCountSup->pWinStates);
if (size == 0) {
terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
return NULL;
}
pCountSup->stateIndex = (pCountSup->stateIndex + 1) % size;
return pBuffInfo;
}

View File

@ -87,7 +87,10 @@ static int32_t toDataCacheEntry(SDataDeleterHandle* pHandle, const SInputData* p
if (pRes->affectedRows) {
pRes->skey = *(int64_t*)pColSKey->pData;
pRes->ekey = *(int64_t*)pColEKey->pData;
ASSERT(pRes->skey <= pRes->ekey);
if (pRes->skey > pRes->ekey) {
qError("data delter skey:%" PRId64 " is bigger than ekey:%" PRId64, pRes->skey, pRes->ekey);
QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
} else {
pRes->skey = pHandle->pDeleter->deleteTimeRange.skey;
pRes->ekey = pHandle->pDeleter->deleteTimeRange.ekey;
@ -205,7 +208,10 @@ static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, int64_t* pRaw
static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle;
if (NULL == pDeleter->nextOutput.pData) {
ASSERT(pDeleter->queryEnd);
if (!pDeleter->queryEnd) {
qError("empty res while query not end in data deleter");
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
pOutput->useconds = pDeleter->useconds;
pOutput->precision = pDeleter->pSchema->precision;
pOutput->bufStatus = DS_BUF_EMPTY;

View File

@ -242,7 +242,11 @@ static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, int64_t* pRow
static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
if (NULL == pDispatcher->nextOutput.pData) {
ASSERT(pDispatcher->queryEnd);
if (!pDispatcher->queryEnd) {
qError("empty res while query not end in data dispatcher");
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
pOutput->useconds = pDispatcher->useconds;
pOutput->precision = pDispatcher->pSchema->precision;
pOutput->bufStatus = DS_BUF_EMPTY;

View File

@ -234,7 +234,11 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_VARBINARY:
case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY
ASSERT(pColInfoData->info.type == pCol->type);
if (pColInfoData->info.type != pCol->type) {
qError("column:%d type:%d in block dismatch with schema col:%d type:%d", colIdx, pColInfoData->info.type, k, pCol->type);
terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
goto _end;
}
if (colDataIsNull_s(pColInfoData, j)) {
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
if (NULL == taosArrayPush(pVals, &cv)) {

View File

@ -823,12 +823,14 @@ static void postProcessStbJoinTableHash(SOperatorInfo* pOperator) {
pStbJoin->execInfo.leftCacheNum = tSimpleHashGetSize(pStbJoin->ctx.prev.leftCache);
qDebug("more than 1 ref build table num %" PRId64, (int64_t)tSimpleHashGetSize(pStbJoin->ctx.prev.leftCache));
/*
// debug only
iter = 0;
uint32_t* num = NULL;
while (NULL != (num = tSimpleHashIterate(pStbJoin->ctx.prev.leftCache, num, &iter))) {
ASSERT(*num > 1);
A S S E R T(*num > 1);
}
*/
}
static void buildStbJoinTableList(SOperatorInfo* pOperator) {

View File

@ -158,7 +158,8 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
SStreamScanInfo* pInfo = pOperator->info;
qDebug("s-task:%s in this batch, %d blocks need to be processed", id, (int32_t)numOfBlocks);
ASSERT(pInfo->validBlockIndex == 0 && taosArrayGetSize(pInfo->pBlockLists) == 0);
QUERY_CHECK_CONDITION((pInfo->validBlockIndex == 0 && taosArrayGetSize(pInfo->pBlockLists) == 0), code, lino, _end,
TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
if (type == STREAM_INPUT__MERGED_SUBMIT) {
for (int32_t i = 0; i < numOfBlocks; i++) {
@ -189,7 +190,8 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
pInfo->blockType = STREAM_INPUT__CHECKPOINT;
} else {
ASSERT(0);
code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
QUERY_CHECK_CODE(code, lino, _end);
}
return TSDB_CODE_SUCCESS;
@ -543,7 +545,9 @@ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* table
int32_t* tversion, int32_t idx, bool* tbGet) {
*tbGet = false;
ASSERT(tinfo != NULL && dbName != NULL && tableName != NULL);
if (tinfo == NULL || dbName == NULL || tableName == NULL) {
return TSDB_CODE_INVALID_PARA;
}
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
if (taosArrayGetSize(pTaskInfo->schemaInfos) <= idx) {
@ -722,7 +726,8 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo
blockIndex += 1;
current += p->info.rows;
ASSERT(p->info.rows > 0 || p->info.type == STREAM_CHECKPOINT);
QUERY_CHECK_CONDITION((p->info.rows > 0 || p->info.type == STREAM_CHECKPOINT), code, lino, _end,
TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
void* tmp = taosArrayPush(pResList, &p);
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
@ -987,15 +992,17 @@ void qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) {
*scanner = pOperator->info;
break;
} else {
ASSERT(pOperator->numOfDownstream == 1);
pOperator = pOperator->pDownstream[0];
}
}
}
int32_t qStreamSourceScanParamForHistoryScanStep1(qTaskInfo_t tinfo, SVersionRange* pVerRange, STimeWindow* pWindow) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
QUERY_CHECK_CONDITION((pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM), code, lino, _end,
TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
SStreamTaskInfo* pStreamInfo = &pTaskInfo->streamInfo;
@ -1007,12 +1014,19 @@ int32_t qStreamSourceScanParamForHistoryScanStep1(qTaskInfo_t tinfo, SVersionRan
", window:%" PRId64 " - %" PRId64,
GET_TASKID(pTaskInfo), pStreamInfo->fillHistoryVer.minVer, pStreamInfo->fillHistoryVer.maxVer, pWindow->skey,
pWindow->ekey);
return 0;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRange* pVerRange, STimeWindow* pWindow) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
QUERY_CHECK_CONDITION((pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM), code, lino, _end,
TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
SStreamTaskInfo* pStreamInfo = &pTaskInfo->streamInfo;
@ -1024,14 +1038,26 @@ int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRan
"-%" PRId64,
GET_TASKID(pTaskInfo), pStreamInfo->fillHistoryVer.minVer, pStreamInfo->fillHistoryVer.maxVer, pWindow->skey,
pWindow->ekey);
return 0;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
int32_t qStreamRecoverFinish(qTaskInfo_t tinfo) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
QUERY_CHECK_CONDITION((pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM), code, lino, _end,
TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE;
return 0;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) {
@ -1046,9 +1072,6 @@ int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
STimeWindowAggSupp* pSup = &pInfo->twAggSup;
ASSERT(pSup->calTrigger == STREAM_TRIGGER_AT_ONCE || pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);
ASSERT(pSup->calTriggerSaved == 0 && pSup->deleteMarkSaved == 0);
qInfo("save stream param for interval: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark);
pSup->calTriggerSaved = pSup->calTrigger;
@ -1063,9 +1086,6 @@ int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) {
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
STimeWindowAggSupp* pSup = &pInfo->twAggSup;
ASSERT(pSup->calTrigger == STREAM_TRIGGER_AT_ONCE || pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);
ASSERT(pSup->calTriggerSaved == 0 && pSup->deleteMarkSaved == 0);
qInfo("save stream param for session: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark);
pSup->calTriggerSaved = pSup->calTrigger;
@ -1078,9 +1098,6 @@ int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) {
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
STimeWindowAggSupp* pSup = &pInfo->twAggSup;
ASSERT(pSup->calTrigger == STREAM_TRIGGER_AT_ONCE || pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);
ASSERT(pSup->calTriggerSaved == 0 && pSup->deleteMarkSaved == 0);
qInfo("save stream param for state: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark);
pSup->calTriggerSaved = pSup->calTrigger;
@ -1093,9 +1110,6 @@ int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) {
SStreamEventAggOperatorInfo* pInfo = pOperator->info;
STimeWindowAggSupp* pSup = &pInfo->twAggSup;
ASSERT(pSup->calTrigger == STREAM_TRIGGER_AT_ONCE || pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);
ASSERT(pSup->calTriggerSaved == 0 && pSup->deleteMarkSaved == 0);
qInfo("save stream param for state: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark);
pSup->calTriggerSaved = pSup->calTrigger;
@ -1108,9 +1122,6 @@ int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) {
SStreamCountAggOperatorInfo* pInfo = pOperator->info;
STimeWindowAggSupp* pSup = &pInfo->twAggSup;
ASSERT(pSup->calTrigger == STREAM_TRIGGER_AT_ONCE || pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);
ASSERT(pSup->calTriggerSaved == 0 && pSup->deleteMarkSaved == 0);
qInfo("save stream param for state: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark);
pSup->calTriggerSaved = pSup->calTrigger;
@ -1126,7 +1137,6 @@ int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) {
if (pOperator->numOfDownstream != 1 || pOperator->pDownstream[0] == NULL) {
if (pOperator->numOfDownstream > 1) {
qError("unexpected stream, multiple downstream");
ASSERT(0);
return -1;
}
return 0;

View File

@ -163,7 +163,11 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
return NULL;
}
ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset);
if (pResult->pageId != p1->pageId || pResult->offset != p1->offset) {
terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
pTaskInfo->code = terrno;
return NULL;
}
}
} else {
// In case of group by column query, the required SResultRow object must be existInCurrentResusltRowInfo in the
@ -176,7 +180,11 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
return NULL;
}
ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset);
if (pResult->pageId != p1->pageId || pResult->offset != p1->offset) {
terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
pTaskInfo->code = terrno;
return NULL;
}
}
}
@ -324,6 +332,7 @@ _end:
static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
bool createDummyCol) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SqlFunctionCtx* pCtx = pExprSup->pCtx;
for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) {
@ -363,7 +372,7 @@ static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int
if (hasPk && (j == pkParamIdx)) {
pInput->pPrimaryKey = pInput->pData[j];
}
ASSERT(pInput->pData[j] != NULL);
QUERY_CHECK_CONDITION((pInput->pData[j] != NULL), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
} else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
// todo avoid case: top(k, 12), 12 is the value parameter.
// sum(11), 11 is also the value parameter.
@ -374,14 +383,16 @@ static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int
pInput->blankFill = pBlock->info.blankFill;
code = doCreateConstantValColumnInfo(pInput, pFuncParam, j, pBlock->info.rows);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
QUERY_CHECK_CODE(code, lino, _end);
}
}
}
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}

View File

@ -55,7 +55,6 @@ typedef struct SFillOperatorInfo {
SExprSupp noFillExprSupp;
} SFillOperatorInfo;
static void revisedFillStartKey(SFillOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t order);
static void destroyFillOperatorInfo(void* param);
static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag);
static int32_t fillResetPrevForNewGroup(SFillInfo* pFillInfo);
@ -168,56 +167,6 @@ _end:
return code;
}
// todo refactor: decide the start key according to the query time range.
static void revisedFillStartKey(SFillOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t order) {
if (order == TSDB_ORDER_ASC) {
int64_t skey = pBlock->info.window.skey;
if (skey < pInfo->pFillInfo->start) { // the start key may be smaller than the
ASSERT(taosFillNotStarted(pInfo->pFillInfo));
taosFillUpdateStartTimestampInfo(pInfo->pFillInfo, skey);
} else if (pInfo->pFillInfo->start < skey) {
int64_t t = skey;
SInterval* pInterval = &pInfo->pFillInfo->interval;
while (1) {
int64_t prev = taosTimeAdd(t, -pInterval->sliding, pInterval->slidingUnit, pInterval->precision);
if (prev <= pInfo->pFillInfo->start) {
t = prev;
break;
}
t = prev;
}
// todo time window chosen problem: t or prev value?
taosFillUpdateStartTimestampInfo(pInfo->pFillInfo, t);
}
} else {
int64_t ekey = pBlock->info.window.ekey;
if (ekey > pInfo->pFillInfo->start) {
ASSERT(taosFillNotStarted(pInfo->pFillInfo));
taosFillUpdateStartTimestampInfo(pInfo->pFillInfo, ekey);
} else if (ekey < pInfo->pFillInfo->start) {
int64_t t = ekey;
SInterval* pInterval = &pInfo->pFillInfo->interval;
int64_t prev = t;
while (1) {
int64_t next = taosTimeAdd(t, pInterval->sliding, pInterval->slidingUnit, pInterval->precision);
if (next >= pInfo->pFillInfo->start) {
prev = t;
t = next;
break;
}
prev = t;
t = next;
}
// todo time window chosen problem: t or next value?
if (t > pInfo->pFillInfo->start) t = prev;
taosFillUpdateStartTimestampInfo(pInfo->pFillInfo, t);
}
}
}
static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
@ -602,7 +551,6 @@ static void reviseFillStartAndEndKey(SFillOperatorInfo* pInfo, int32_t order) {
}
pInfo->win.ekey = ekey;
} else {
assert(order == TSDB_ORDER_DESC);
skey = taosTimeTruncate(pInfo->win.skey, &pInfo->pFillInfo->interval);
next = skey;
while (next < pInfo->win.skey) {

View File

@ -60,7 +60,7 @@ int32_t hInnerJoinDo(struct SOperatorInfo* pOperator) {
/*
size_t keySize = 0;
int32_t* pKey = tSimpleHashGetKey(pGroup, &keySize);
ASSERT(keySize == bufLen && 0 == memcmp(pKey, pProbe->keyData, bufLen));
A S S E R T(keySize == bufLen && 0 == memcmp(pKey, pProbe->keyData, bufLen));
int64_t rows = getSingleKeyRowsNum(pGroup->rows);
pJoin->execInfo.expectRows += rows;
qTrace("hash_key:%d, rows:%" PRId64, *pKey, rows);
@ -145,7 +145,7 @@ int32_t hLeftJoinHandleSeqProbeRows(struct SOperatorInfo* pOperator, SHJoinOpera
/*
size_t keySize = 0;
int32_t* pKey = tSimpleHashGetKey(pGroup, &keySize);
ASSERT(keySize == bufLen && 0 == memcmp(pKey, pProbe->keyData, bufLen));
A S S E R T(keySize == bufLen && 0 == memcmp(pKey, pProbe->keyData, bufLen));
int64_t rows = getSingleKeyRowsNum(pGroup->rows);
pJoin->execInfo.expectRows += rows;
qTrace("hash_key:%d, rows:%" PRId64, *pKey, rows);
@ -248,7 +248,7 @@ int32_t hLeftJoinHandleProbeRows(struct SOperatorInfo* pOperator, SHJoinOperator
/*
size_t keySize = 0;
int32_t* pKey = tSimpleHashGetKey(pGroup, &keySize);
ASSERT(keySize == bufLen && 0 == memcmp(pKey, pProbe->keyData, bufLen));
A S S E R T(keySize == bufLen && 0 == memcmp(pKey, pProbe->keyData, bufLen));
int64_t rows = getSingleKeyRowsNum(pGroup->rows);
pJoin->execInfo.expectRows += rows;
qTrace("hash_key:%d, rows:%" PRId64, *pKey, rows);

View File

@ -38,8 +38,6 @@ bool hJoinBlkReachThreshold(SHJoinOperatorInfo* pInfo, int64_t blkRows) {
}
int32_t hJoinHandleMidRemains(SHJoinOperatorInfo* pJoin, SHJoinCtx* pCtx) {
ASSERT(0 < pJoin->midBlk->info.rows);
TSWAP(pJoin->midBlk, pJoin->finBlk);
pCtx->midRemains = false;
@ -1153,7 +1151,6 @@ int32_t hJoinInitResBlocks(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode* pJoinN
if (NULL == pJoin->finBlk) {
QRY_ERR_RET(terrno);
}
ASSERT(pJoinNode->node.pOutputDataBlockDesc->totalRowSize > 0);
int32_t code = blockDataEnsureCapacity(pJoin->finBlk, hJoinGetFinBlkCapacity(pJoin, pJoinNode));
if (TSDB_CODE_SUCCESS != code) {

View File

@ -321,7 +321,7 @@ static int32_t mOuterJoinMergeSeqCart(SMJoinMergeCtx* pCtx) {
int32_t buildEndIdx = buildGrp->endIdx;
buildGrp->endIdx = buildGrp->readIdx + rowsLeft - 1;
ASSERT(buildGrp->endIdx >= buildGrp->readIdx);
//A S S E R T(buildGrp->endIdx >= buildGrp->readIdx);
MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->midBlk, true, probeGrp, buildGrp));
buildGrp->readIdx += rowsLeft;
buildGrp->endIdx = buildEndIdx;
@ -1383,9 +1383,9 @@ static int32_t mSemiJoinHashGrpCartFilter(SMJoinMergeCtx* pCtx, SMJoinGrpRows* p
continue;
}
ASSERT(1 == pCtx->midBlk->info.rows);
//A S S E R T(1 == pCtx->midBlk->info.rows);
MJ_ERR_RET(mJoinCopyMergeMidBlk(pCtx, &pCtx->midBlk, &pCtx->finBlk));
ASSERT(false == pCtx->midRemains);
//A S S E R T(false == pCtx->midRemains);
break;
} while (true);
@ -1449,10 +1449,10 @@ static int32_t mSemiJoinHashFullCart(SMJoinMergeCtx* pCtx) {
}
build->pHashCurGrp = *(SArray**)pGrp;
ASSERT(1 == taosArrayGetSize(build->pHashCurGrp));
//A S S E R T(1 == taosArrayGetSize(build->pHashCurGrp));
build->grpRowIdx = 0;
MJ_ERR_RET(mJoinHashGrpCart(pCtx->finBlk, probeGrp, true, probe, build, NULL));
ASSERT(build->grpRowIdx < 0);
//A S S E R T(build->grpRowIdx < 0);
}
pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx;
@ -1497,7 +1497,7 @@ static int32_t mSemiJoinMergeSeqCart(SMJoinMergeCtx* pCtx) {
int32_t buildEndIdx = buildGrp->endIdx;
buildGrp->endIdx = buildGrp->readIdx + rowsLeft - 1;
ASSERT(buildGrp->endIdx >= buildGrp->readIdx);
//A S S E R T(buildGrp->endIdx >= buildGrp->readIdx);
MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->midBlk, true, probeGrp, buildGrp));
buildGrp->readIdx += rowsLeft;
buildGrp->endIdx = buildEndIdx;
@ -1513,9 +1513,9 @@ static int32_t mSemiJoinMergeSeqCart(SMJoinMergeCtx* pCtx) {
continue;
}
} else {
ASSERT(1 == pCtx->midBlk->info.rows);
//A S S E R T(1 == pCtx->midBlk->info.rows);
MJ_ERR_RET(mJoinCopyMergeMidBlk(pCtx, &pCtx->midBlk, &pCtx->finBlk));
ASSERT(false == pCtx->midRemains);
//A S S E R T(false == pCtx->midRemains);
if (build->grpIdx == buildGrpNum) {
continue;
@ -1555,8 +1555,8 @@ static int32_t mSemiJoinMergeFullCart(SMJoinMergeCtx* pCtx) {
int32_t probeRows = GRP_REMAIN_ROWS(probeGrp);
int32_t probeEndIdx = probeGrp->endIdx;
ASSERT(1 == taosArrayGetSize(build->eqGrps));
ASSERT(buildGrp->beginIdx == buildGrp->endIdx);
//A S S E R T(1 == taosArrayGetSize(build->eqGrps));
//A S S E R T(buildGrp->beginIdx == buildGrp->endIdx);
if (probeRows <= rowsLeft) {
MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp));
@ -1826,7 +1826,7 @@ static int32_t mAntiJoinMergeSeqCart(SMJoinMergeCtx* pCtx) {
int32_t buildEndIdx = buildGrp->endIdx;
buildGrp->endIdx = buildGrp->readIdx + rowsLeft - 1;
ASSERT(buildGrp->endIdx >= buildGrp->readIdx);
//A S S E R T(buildGrp->endIdx >= buildGrp->readIdx);
MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->midBlk, true, probeGrp, buildGrp));
buildGrp->readIdx += rowsLeft;
buildGrp->endIdx = buildEndIdx;
@ -2381,7 +2381,7 @@ int32_t mAsofForwardTrimCacheBlk(SMJoinWindowCtx* pCtx) {
if (pGrp->blk == pCtx->cache.outBlk && pCtx->pJoin->build->blkRowIdx > 0) {
MJ_ERR_RET(blockDataTrimFirstRows(pGrp->blk, pCtx->pJoin->build->blkRowIdx));
pCtx->pJoin->build->blkRowIdx = 0;
ASSERT(pCtx->pJoin->build->blk == pGrp->blk);
//A S S E R T(pCtx->pJoin->build->blk == pGrp->blk);
MJOIN_SAVE_TB_BLK(&pCtx->cache, pCtx->pJoin->build);
}
@ -2419,16 +2419,16 @@ int32_t mAsofForwardChkFillGrpCache(SMJoinWindowCtx* pCtx) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
ASSERT(pGrp->blk == pCache->outBlk);
//A S S E R T(pGrp->blk == pCache->outBlk);
//pGrp->endIdx = pGrp->blk->info.rows - pGrp->beginIdx;
}
//ASSERT((pGrp->endIdx - pGrp->beginIdx + 1) == pCtx->cache.rowNum);
//A S S E R T((pGrp->endIdx - pGrp->beginIdx + 1) == pCtx->cache.rowNum);
}
ASSERT(taosArrayGetSize(pCache->grps) == 1);
ASSERT(pGrp->blk->info.rows - pGrp->beginIdx == pCtx->cache.rowNum);
//A S S E R T(taosArrayGetSize(pCache->grps) == 1);
//A S S E R T(pGrp->blk->info.rows - pGrp->beginIdx == pCtx->cache.rowNum);
}
do {
@ -2473,7 +2473,7 @@ int32_t mAsofForwardUpdateBuildGrpEndIdx(SMJoinWindowCtx* pCtx) {
return TSDB_CODE_SUCCESS;
}
ASSERT(pCtx->jLimit > (pGrp->blk->info.rows - pGrp->beginIdx));
//A S S E R T(pCtx->jLimit > (pGrp->blk->info.rows - pGrp->beginIdx));
pGrp->endIdx = pGrp->blk->info.rows - 1;
int64_t remainRows = pCtx->jLimit - (pGrp->endIdx - pGrp->beginIdx + 1);
@ -2550,8 +2550,8 @@ int32_t mAsofForwardSkipAllEqRows(SMJoinWindowCtx* pCtx, int64_t timestamp) {
MJOIN_RESTORE_TB_BLK(cache, pTable);
} while (!MJOIN_BUILD_TB_ROWS_DONE(pTable));
ASSERT(pCtx->cache.rowNum == 0);
ASSERT(taosArrayGetSize(pCtx->cache.grps) == 0);
//A S S E R T(pCtx->cache.rowNum == 0);
//A S S E R T(taosArrayGetSize(pCtx->cache.grps) == 0);
if (pTable->dsFetchDone) {
return TSDB_CODE_SUCCESS;
@ -2648,7 +2648,7 @@ static int32_t mAsofForwardRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo
if ((probeGot || MJOIN_DS_NEED_INIT(pOperator, pJoin->build)) && pCtx->cache.rowNum < pCtx->jLimit) {
pJoin->build->newBlk = false;
MJOIN_SAVE_TB_BLK(&pCtx->cache, pCtx->pJoin->build);
ASSERT(taosArrayGetSize(pCtx->cache.grps) <= 1);
//A S S E R T(taosArrayGetSize(pCtx->cache.grps) <= 1);
buildGot = mJoinRetrieveBlk(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build);
}
@ -3375,7 +3375,7 @@ int32_t mWinJoinMoveAscWinEnd(SMJoinWindowCtx* pCtx) {
continue;
}
ASSERT(pGrp->endIdx > startIdx);
//A S S E R T(pGrp->endIdx > startIdx);
pGrp->endIdx--;
break;
@ -3419,7 +3419,7 @@ int32_t mWinJoinMoveDescWinEnd(SMJoinWindowCtx* pCtx) {
continue;
}
ASSERT(pGrp->endIdx > startIdx);
//A S S E R T(pGrp->endIdx > startIdx);
pGrp->endIdx--;
break;
@ -3676,7 +3676,7 @@ int32_t mJoinInitMergeCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJ
MJ_ERR_RET(terrno);
}
ASSERT(pJoinNode->node.pOutputDataBlockDesc->totalRowSize > 0);
//A S S E R T(pJoinNode->node.pOutputDataBlockDesc->totalRowSize > 0);
MJ_ERR_RET(blockDataEnsureCapacity(pCtx->finBlk, mJoinGetFinBlkCapacity(pJoin, pJoinNode)));

View File

@ -490,8 +490,6 @@ int32_t mJoinCopyMergeMidBlk(SMJoinMergeCtx* pCtx, SSDataBlock** ppMid, SSDataBl
int32_t mJoinHandleMidRemains(SMJoinMergeCtx* pCtx) {
ASSERT(0 < pCtx->midBlk->info.rows);
TSWAP(pCtx->midBlk, pCtx->finBlk);
pCtx->midRemains = false;
@ -567,7 +565,6 @@ int32_t mJoinMergeGrpCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool app
int32_t currRows = append ? pRes->info.rows : 0;
int32_t firstRows = GRP_REMAIN_ROWS(pFirst);
int32_t secondRows = GRP_REMAIN_ROWS(pSecond);
ASSERT(secondRows > 0);
for (int32_t c = 0; c < probe->finNum; ++c) {
SMJoinColMap* pFirstCol = probe->finCols + c;
@ -581,9 +578,15 @@ int32_t mJoinMergeGrpCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool app
if (colDataIsNull_s(pInCol, pFirst->readIdx + r)) {
colDataSetNItemsNull(pOutCol, currRows + r * secondRows, secondRows);
} else {
ASSERT(pRes->info.capacity >= (pRes->info.rows + firstRows * secondRows));
if (pRes->info.capacity < (pRes->info.rows + firstRows * secondRows)) {
qError("capacity:%d not enough, rows:%" PRId64 ", firstRows:%d, secondRows:%d", pRes->info.capacity, pRes->info.rows, firstRows, secondRows);
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
uint32_t startOffset = (IS_VAR_DATA_TYPE(pOutCol->info.type)) ? pOutCol->varmeta.length : ((currRows + r * secondRows) * pOutCol->info.bytes);
ASSERT((startOffset + 1 * pOutCol->info.bytes) <= pRes->info.capacity * pOutCol->info.bytes);
if ((startOffset + 1 * pOutCol->info.bytes) > pRes->info.capacity * pOutCol->info.bytes) {
qError("col buff not enough, startOffset:%d, bytes:%d, capacity:%d", startOffset, pOutCol->info.bytes, pRes->info.capacity);
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
MJ_ERR_RET(colDataSetNItems(pOutCol, currRows + r * secondRows, colDataGetData(pInCol, pFirst->readIdx + r), secondRows, true));
}
}
@ -1097,7 +1100,6 @@ SSDataBlock* mJoinGrpRetrieveImpl(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTa
}
SMJoinTableCtx* pProbe = pJoin->probe;
ASSERT(pProbe->lastInGid);
while (true) {
if (pTable->remainInBlk) {

View File

@ -1236,14 +1236,11 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) {
taosRUnLockLatch(&pTaskInfo->lock);
QUERY_CHECK_CODE(code, lino, _end);
ASSERT(pInfo->base.dataReader == NULL);
QUERY_CHECK_CONDITION((pInfo->base.dataReader == NULL), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
code = pAPI->tsdReader.tsdReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock,
(void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), &pInfo->pIgnoreTables);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
T_LONG_JMP(pTaskInfo->env, code);
}
QUERY_CHECK_CODE(code, lino, _end);
if (pInfo->filesetDelimited) {
pAPI->tsdReader.tsdSetFilesetDelimited(pInfo->base.dataReader);
@ -1585,7 +1582,6 @@ static bool isCountWindow(SStreamScanInfo* pInfo) {
static void setGroupId(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t groupColIndex, int32_t rowIndex) {
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, groupColIndex);
uint64_t* groupCol = (uint64_t*)pColInfo->pData;
ASSERT(rowIndex < pBlock->info.rows);
pInfo->groupId = groupCol[rowIndex];
}
@ -1654,9 +1650,8 @@ _end:
bool comparePrimaryKey(SColumnInfoData* pCol, int32_t rowId, void* pVal) {
// coverity scan
ASSERTS(pVal != NULL, "pVal should not be NULL");
if (!pVal) {
qError("failed to compare primary key, since primary key is null");
if (!pVal || !pCol) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_INVALID_PARA));
return false;
}
void* pData = colDataGetData(pCol, rowId);
@ -1720,6 +1715,8 @@ static uint64_t getGroupIdByData(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts,
}
static void prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t* pRowIndex, bool* pRes) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
if (pBlock->info.rows == 0) {
if (pRes) {
(*pRes) = false;
@ -1733,7 +1730,6 @@ static void prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_
goto _end;
}
ASSERT(taosArrayGetSize(pBlock->pDataBlock) >= 3);
SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
TSKEY* startData = (TSKEY*)pStartTsCol->pData;
SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
@ -1765,27 +1761,25 @@ static void prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_
win.skey = TMIN(win.skey, startData[*pRowIndex]);
continue;
}
ASSERT(!(win.skey > startData[*pRowIndex] && win.ekey < endData[*pRowIndex]) ||
!(isInTimeWindow(&win, startData[*pRowIndex], 0) || isInTimeWindow(&win, endData[*pRowIndex], 0)));
break;
}
STableScanInfo* pTScanInfo = pInfo->pTableScanOp->info;
// coverity scan
ASSERTS(pInfo->pUpdateInfo != NULL, "Failed to set data version, since pInfo->pUpdateInfo is NULL");
if (pInfo->pUpdateInfo) {
qDebug("prepare range scan start:%" PRId64 ",end:%" PRId64 ",maxVer:%" PRIu64, win.skey, win.ekey,
pInfo->pUpdateInfo->maxDataVersion);
resetTableScanInfo(pInfo->pTableScanOp->info, &win, pInfo->pUpdateInfo->maxDataVersion);
}
QUERY_CHECK_NULL(pInfo->pUpdateInfo, code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
qDebug("prepare range scan start:%" PRId64 ",end:%" PRId64 ",maxVer:%" PRIu64, win.skey, win.ekey,
pInfo->pUpdateInfo->maxDataVersion);
resetTableScanInfo(pInfo->pTableScanOp->info, &win, pInfo->pUpdateInfo->maxDataVersion);
pInfo->pTableScanOp->status = OP_OPENED;
if (pRes) {
(*pRes) = true;
}
_end:
qTrace("%s success", __func__);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
}
static STimeWindow getSlidingWindow(TSKEY* startTsCol, TSKEY* endTsCol, uint64_t* gpIdCol, SInterval* pInterval,
@ -2193,7 +2187,6 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
}
uint64_t* srcUidData = (uint64_t*)pSrcUidCol->pData;
ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
TSKEY* srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
TSKEY* srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
int64_t ver = pSrcBlock->info.version - 1;
@ -2291,7 +2284,6 @@ static int32_t generatePartitionDelResBlock(SStreamScanInfo* pInfo, SSDataBlock*
SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX);
uint64_t* srcGp = (uint64_t*)pSrcGpCol->pData;
ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
TSKEY* srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
TSKEY* srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
int64_t ver = pSrcBlock->info.version - 1;
@ -2352,7 +2344,6 @@ static int32_t generateDeleteResultBlockImpl(SStreamScanInfo* pInfo, SSDataBlock
pSrcPkCol = taosArrayGet(pSrcBlock->pDataBlock, PRIMARY_KEY_COLUMN_INDEX);
}
ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
TSKEY* srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
TSKEY* srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
int64_t ver = pSrcBlock->info.version - 1;
@ -2472,7 +2463,6 @@ static int32_t checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBl
QUERY_CHECK_CODE(code, lino, _end);
}
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
ASSERT(pColDataInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP);
TSKEY* tsCol = (TSKEY*)pColDataInfo->pData;
SColumnInfoData* pPkColDataInfo = NULL;
if (hasPrimaryKeyCol(pInfo)) {
@ -2549,7 +2539,7 @@ static int32_t doBlockDataWindowFilter(SSDataBlock* pBlock, int32_t tsIndex, STi
if (pWindow->skey != INT64_MIN) {
qDebug("%s filter for additional history window, skey:%" PRId64, id, pWindow->skey);
ASSERT(pCol->pData != NULL);
QUERY_CHECK_NULL(pCol->pData, code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
int64_t* ts = (int64_t*)colDataGetData(pCol, i);
p[i] = (*ts >= pWindow->skey);
@ -2598,7 +2588,8 @@ static int32_t doBlockDataPrimaryKeyFilter(SSDataBlock* pBlock, STqOffsetVal* of
SColumnInfoData* pColPk = taosArrayGet(pBlock->pDataBlock, 1);
qDebug("doBlockDataWindowFilter primary key, ts:%" PRId64 " %" PRId64, offset->ts, offset->primaryKey.val);
ASSERT(pColPk->info.type == offset->primaryKey.type);
QUERY_CHECK_CONDITION((pColPk->info.type == offset->primaryKey.type), code, lino, _end,
TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
__compar_fn_t func = getComparFunc(pColPk->info.type, 0);
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
@ -3940,7 +3931,9 @@ void streamScanReloadState(SOperatorInfo* pOperator) {
pInfo->stateStore.windowSBfDelete(pInfo->pUpdateInfo, 1);
code = pInfo->stateStore.windowSBfAdd(pInfo->pUpdateInfo, 1);
QUERY_CHECK_CODE(code, lino, _end);
ASSERT(pInfo->pUpdateInfo->minTS > pUpInfo->minTS);
QUERY_CHECK_CONDITION((pInfo->pUpdateInfo->minTS > pUpInfo->minTS), code, lino, _end,
TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pUpInfo->maxDataVersion);
SHashObj* curMap = pInfo->pUpdateInfo->pMap;
void* pIte = taosHashIterate(curMap, NULL);
@ -4100,12 +4093,11 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode*
}
if (pHandle->initTqReader) {
ASSERT(pHandle->tqReader == NULL);
pInfo->tqReader = pAPI->tqReaderFn.tqReaderOpen(pHandle->vnode);
ASSERT(pInfo->tqReader);
QUERY_CHECK_NULL(pInfo->tqReader, code, lino, _error, terrno);
} else {
ASSERT(pHandle->tqReader);
pInfo->tqReader = pHandle->tqReader;
QUERY_CHECK_NULL(pInfo->tqReader, code, lino, _error, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
pInfo->pUpdateInfo = NULL;
@ -5882,7 +5874,10 @@ void destroyTableMergeScanOperatorInfo(void* param) {
}
int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
ASSERT(pOptr != NULL);
if (pOptr == NULL) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_INVALID_PARA));
return TSDB_CODE_INVALID_PARA;
}
// TODO: merge these two info into one struct
STableMergeScanExecInfo* execInfo = taosMemoryCalloc(1, sizeof(STableMergeScanExecInfo));
if (!execInfo) {
@ -6195,7 +6190,7 @@ int32_t fillTableCountScanDataBlock(STableCountScanSupp* pSupp, char* dbName, ch
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
if (pSupp->dbNameSlotId != -1) {
ASSERT(strlen(dbName));
QUERY_CHECK_CONDITION((strlen(dbName) > 0), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->dbNameSlotId);
QUERY_CHECK_NULL(colInfoData, code, lino, _end, terrno);

View File

@ -735,7 +735,12 @@ int32_t doGroupSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
}
// beginSortGroup would fetch all child blocks of pInfo->currGroupId;
ASSERT(pInfo->childOpStatus != CHILD_OP_SAME_GROUP);
if (pInfo->childOpStatus == CHILD_OP_SAME_GROUP) {
code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
pOperator->pTaskInfo->code = code;
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
T_LONG_JMP(pOperator->pTaskInfo->env, code);
}
code = getGroupSortedBlockData(pInfo->pCurrSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity,
pInfo->matchInfo.pList, pInfo, &pBlock);
if (pBlock != NULL && (code == 0)) {

View File

@ -88,7 +88,7 @@ int32_t setCountOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t group
winCode = TSDB_CODE_FAILED;
} else if (pBuffInfo->winBuffOp == MOVE_NEXT_WINDOW) {
ASSERT(pBuffInfo->pCur);
QUERY_CHECK_NULL(pBuffInfo->pCur, code, lino, _end, terrno);
pAggSup->stateStore.streamStateCurNext(pAggSup->pState, pBuffInfo->pCur);
winCode = pAggSup->stateStore.streamStateSessionGetKVByCur(pBuffInfo->pCur, &pCurWin->winInfo.sessionWin,
(void**)&pCurWin->winInfo.pStatePos, &size);
@ -345,7 +345,6 @@ static void doStreamCountAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
if (slidingRows + winRows > pAggSup->windowSliding) {
buffInfo.winBuffOp = CREATE_NEW_WINDOW;
winRows = pAggSup->windowSliding - slidingRows;
ASSERT(i >= 0);
}
} else {
buffInfo.winBuffOp = MOVE_NEXT_WINDOW;
@ -690,7 +689,10 @@ static int32_t doStreamCountAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe
QUERY_CHECK_CODE(code, lino, _end);
continue;
} else {
ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
if (pBlock->info.type != STREAM_NORMAL && pBlock->info.type != STREAM_INVALID) {
code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
QUERY_CHECK_CODE(code, lino, _end);
}
}
if (pInfo->scalarSupp.pExprInfo != NULL) {

View File

@ -378,7 +378,6 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
rows, i, pAggSup->pResultRows, pSeUpdated, pStDeleted, &rebuild, &winRows);
QUERY_CHECK_CODE(code, lino, _end);
ASSERT(winRows >= 1);
if (rebuild) {
uint64_t uid = 0;
code = appendDataToSpecialBlock(pAggSup->pScanBlock, &curWin.winInfo.sessionWin.win.skey,
@ -660,7 +659,10 @@ static int32_t doStreamEventAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe
QUERY_CHECK_CODE(code, lino, _end);
continue;
} else {
ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
if (pBlock->info.type != STREAM_NORMAL && pBlock->info.type != STREAM_INVALID) {
code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
QUERY_CHECK_CODE(code, lino, _end);
}
}
if (pInfo->scalarSupp.pExprInfo != NULL) {
@ -779,7 +781,6 @@ void streamEventReloadState(SOperatorInfo* pOperator) {
int32_t num = (size - sizeof(TSKEY)) / sizeof(SSessionKey);
qDebug("===stream=== event window operator reload state. get result count:%d", num);
SSessionKey* pSeKeyBuf = (SSessionKey*)pBuf;
ASSERT(size == num * sizeof(SSessionKey) + sizeof(TSKEY));
TSKEY ts = *(TSKEY*)((char*)pBuf + size - sizeof(TSKEY));
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, ts);

View File

@ -331,7 +331,7 @@ void setDeleteFillValueInfo(TSKEY start, TSKEY end, SStreamFillSupporter* pFillS
pFillInfo->pLinearInfo->winIndex = 0;
} break;
default:
ASSERT(0);
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
break;
}
}
@ -419,7 +419,6 @@ void setFillValueInfo(SSDataBlock* pBlock, TSKEY ts, int32_t rowId, SStreamFillS
pFillSup->next.pRowVal = pFillSup->cur.pRowVal;
pFillInfo->preRowKey = INT64_MIN;
} else {
ASSERT(hasNextWindow(pFillSup));
setFillKeyInfo(ts, nextWKey, &pFillSup->interval, pFillInfo);
pFillInfo->pos = FILL_POS_START;
}
@ -447,7 +446,6 @@ void setFillValueInfo(SSDataBlock* pBlock, TSKEY ts, int32_t rowId, SStreamFillS
pFillInfo->pResRow = &pFillSup->prev;
pFillInfo->pLinearInfo->hasNext = false;
} else {
ASSERT(hasNextWindow(pFillSup));
setFillKeyInfo(ts, nextWKey, &pFillSup->interval, pFillInfo);
pFillInfo->pos = FILL_POS_START;
pFillInfo->pLinearInfo->nextEnd = INT64_MIN;
@ -458,10 +456,9 @@ void setFillValueInfo(SSDataBlock* pBlock, TSKEY ts, int32_t rowId, SStreamFillS
}
} break;
default:
ASSERT(0);
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
break;
}
ASSERT(pFillInfo->pos != FILL_POS_INVALID);
}
static int32_t checkResult(SStreamFillSupporter* pFillSup, TSKEY ts, uint64_t groupId, bool* pRes) {
@ -628,7 +625,9 @@ static void keepResultInDiscBuf(SOperatorInfo* pOperator, uint64_t groupId, SRes
SWinKey key = {.groupId = groupId, .ts = pRow->key};
int32_t code = pAPI->stateStore.streamStateFillPut(pOperator->pTaskInfo->streamInfo.pState, &key, pRow->pRowVal, len);
qDebug("===stream===fill operator save key ts:%" PRId64 " group id:%" PRIu64 " code:%d", key.ts, key.groupId, code);
ASSERT(code == TSDB_CODE_SUCCESS);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
}
}
static void doStreamFillRange(SStreamFillInfo* pFillInfo, SStreamFillSupporter* pFillSup, SSDataBlock* pRes) {
@ -795,7 +794,6 @@ static int32_t buildDeleteRange(SOperatorInfo* pOp, TSKEY start, TSKEY end, uint
if (winCode != TSDB_CODE_SUCCESS) {
colDataSetNULL(pTableCol, pBlock->info.rows);
} else {
ASSERT(tbname);
char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN];
STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName));
code = colDataSetVal(pTableCol, pBlock->info.rows, (const char*)parTbName, false);
@ -1125,7 +1123,7 @@ static int32_t doStreamFillNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
return code;
} break;
default:
ASSERTS(false, "invalid SSDataBlock type");
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
}

View File

@ -213,7 +213,6 @@ static void removeDeleteResults(SSHashObj* pUpdatedMap, SArray* pDelWins) {
}
bool isOverdue(TSKEY ekey, STimeWindowAggSupp* pTwSup) {
ASSERTS(pTwSup->maxTs == INT64_MIN || pTwSup->maxTs > 0, "maxts should greater than 0");
return pTwSup->maxTs != INT64_MIN && ekey < pTwSup->maxTs - pTwSup->waterMark;
}
@ -415,7 +414,7 @@ static void doBuildDeleteResult(SStreamIntervalOperatorInfo* pInfo, SArray* pWin
code = appendDataToSpecialBlock(pBlock, &pWin->ts, &pWin->ts, &uid, &pWin->groupId, NULL);
QUERY_CHECK_CODE(code, lino, _end);
} else {
ASSERT(tbname);
QUERY_CHECK_CONDITION((tbname), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN];
STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName));
code = appendDataToSpecialBlock(pBlock, &pWin->ts, &pWin->ts, &uid, &pWin->groupId, parTbName);
@ -921,7 +920,6 @@ void buildDataBlockFromGroupRes(SOperatorInfo* pOperator, void* pState, SSDataBl
}
if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
ASSERT(pBlock->info.rows > 0);
break;
}
pGroupResInfo->index += 1;
@ -1371,7 +1369,7 @@ void doStreamIntervalDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera
int32_t winCode = TSDB_CODE_SUCCESS;
code = pInfo->stateStore.streamStateAddIfNotExist(pInfo->pState, &key, (void**)&pPos, &resSize, &winCode);
QUERY_CHECK_CODE(code, lino, _end);
ASSERT(winCode == TSDB_CODE_SUCCESS);
QUERY_CHECK_CONDITION((winCode == TSDB_CODE_SUCCESS), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
code = tSimpleHashPut(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey), &pPos, POINTER_BYTES);
QUERY_CHECK_CODE(code, lino, _end);
@ -1699,7 +1697,10 @@ static int32_t doStreamFinalIntervalAggNext(SOperatorInfo* pOperator, SSDataBloc
} else if (IS_FINAL_INTERVAL_OP(pOperator) && pBlock->info.type == STREAM_MID_RETRIEVE) {
continue;
} else {
ASSERTS(pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
if (pBlock->info.type != STREAM_INVALID) {
code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
QUERY_CHECK_CODE(code, lino, _end);
}
}
if (pInfo->scalarSupp.pExprInfo != NULL) {
@ -1888,7 +1889,6 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN
.deleteMarkSaved = 0,
.calTriggerSaved = 0,
};
ASSERTS(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY, "trigger type should not be max delay");
pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(&pOperator->resultInfo, 4096);
@ -2096,7 +2096,6 @@ int32_t initBasicInfoEx(SOptrBasicInfo* pBasicInfo, SExprSupp* pSup, SExprInfo*
pSup->pCtx[i].saveHandle.pBuf = NULL;
}
ASSERT(numOfCols > 0);
return TSDB_CODE_SUCCESS;
}
@ -2397,7 +2396,6 @@ _end:
static int32_t initSessionOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pResult, SqlFunctionCtx* pCtx,
int32_t numOfOutput, int32_t* rowEntryInfoOffset) {
ASSERT(pWinInfo->sessionWin.win.skey <= pWinInfo->sessionWin.win.ekey);
*pResult = (SResultRow*)pWinInfo->pStatePos->pRowBuff;
// set time window for current result
(*pResult)->win = pWinInfo->sessionWin.win;
@ -3028,7 +3026,6 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, void* pState, SSDa
}
if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
ASSERT(pBlock->info.rows > 0);
break;
}
@ -3268,7 +3265,7 @@ int32_t doStreamSessionDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpe
code = pAggSup->stateStore.streamStateSessionAddIfNotExist(
pAggSup->pState, &winfo.sessionWin, pAggSup->gap, (void**)&winfo.pStatePos, &pAggSup->resultRowSize, &winCode);
QUERY_CHECK_CODE(code, lino, _end);
ASSERT(winCode == TSDB_CODE_SUCCESS);
QUERY_CHECK_CONDITION((winCode == TSDB_CODE_SUCCESS), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize);
code =
@ -3282,7 +3279,6 @@ int32_t doStreamSessionDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpe
// 3.pChildren
int32_t size = 0;
buf = taosDecodeFixedI32(buf, &size);
ASSERT(size <= taosArrayGetSize(pInfo->pChildren));
for (int32_t i = 0; i < size; i++) {
SOperatorInfo* pChOp = taosArrayGetP(pInfo->pChildren, i);
code = doStreamSessionDecodeOpState(buf, 0, pChOp, false, &buf);
@ -3453,7 +3449,10 @@ static int32_t doStreamSessionAggNext(SOperatorInfo* pOperator, SSDataBlock** pp
continue;
} else {
ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
if (pBlock->info.type != STREAM_NORMAL && pBlock->info.type != STREAM_INVALID) {
code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
QUERY_CHECK_CODE(code, lino, _end);
}
}
if (pInfo->scalarSupp.pExprInfo != NULL) {
@ -3627,7 +3626,6 @@ void streamSessionSemiReloadState(SOperatorInfo* pOperator) {
int32_t num = (size - sizeof(TSKEY)) / sizeof(SSessionKey);
SSessionKey* pSeKeyBuf = (SSessionKey*)pBuf;
ASSERT(size == num * sizeof(SSessionKey) + sizeof(TSKEY));
for (int32_t i = 0; i < num; i++) {
SResultWindowInfo winInfo = {0};
code = getSessionWindowInfoByKey(pAggSup, pSeKeyBuf + i, &winInfo);
@ -3673,7 +3671,6 @@ void streamSessionReloadState(SOperatorInfo* pOperator) {
int32_t num = (size - sizeof(TSKEY)) / sizeof(SSessionKey);
SSessionKey* pSeKeyBuf = (SSessionKey*)pBuf;
ASSERT(size == num * sizeof(SSessionKey) + sizeof(TSKEY));
TSKEY ts = *(TSKEY*)((char*)pBuf + size - sizeof(TSKEY));
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, ts);
@ -3996,7 +3993,10 @@ static int32_t doStreamSessionSemiAggNext(SOperatorInfo* pOperator, SSDataBlock*
doStreamSessionSaveCheckpoint(pOperator);
continue;
} else {
ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
if (pBlock->info.type != STREAM_NORMAL && pBlock->info.type != STREAM_INVALID) {
code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
QUERY_CHECK_CODE(code, lino, _end);
}
}
if (pInfo->scalarSupp.pExprInfo != NULL) {
@ -4211,7 +4211,8 @@ int32_t getStateWindowInfoByKey(SStreamAggSupporter* pAggSup, SSessionKey* pKey,
pCurWin->winInfo.sessionWin.win.ekey = pKey->win.ekey;
code = getSessionWindowInfoByKey(pAggSup, pKey, &pCurWin->winInfo);
QUERY_CHECK_CODE(code, lino, _end);
ASSERT(IS_VALID_SESSION_WIN(pCurWin->winInfo));
QUERY_CHECK_CONDITION((IS_VALID_SESSION_WIN(pCurWin->winInfo)), code, lino, _end,
TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
pCurWin->pStateKey =
(SStateKeys*)((char*)pCurWin->winInfo.pStatePos->pRowBuff + (pAggSup->resultRowSize - pAggSup->stateKeySize));
@ -4581,7 +4582,6 @@ int32_t doStreamStateDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera
// 3.pChildren
int32_t size = 0;
buf = taosDecodeFixedI32(buf, &size);
ASSERT(size <= taosArrayGetSize(pInfo->pChildren));
for (int32_t i = 0; i < size; i++) {
SOperatorInfo* pChOp = taosArrayGetP(pInfo->pChildren, i);
code = doStreamStateDecodeOpState(buf, 0, pChOp, false, &buf);
@ -4723,7 +4723,10 @@ static int32_t doStreamStateAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe
continue;
} else {
ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
if (pBlock->info.type != STREAM_NORMAL && pBlock->info.type != STREAM_INVALID) {
code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
QUERY_CHECK_CODE(code, lino, _end);
}
}
if (pInfo->scalarSupp.pExprInfo != NULL) {
@ -4835,7 +4838,6 @@ void streamStateReloadState(SOperatorInfo* pOperator) {
int32_t num = (size - sizeof(TSKEY)) / sizeof(SSessionKey);
qDebug("===stream=== reload state. get result count:%d", num);
SSessionKey* pSeKeyBuf = (SSessionKey*)pBuf;
ASSERT(size == num * sizeof(SSessionKey) + sizeof(TSKEY));
TSKEY ts = *(TSKEY*)((char*)pBuf + size - sizeof(TSKEY));
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, ts);
@ -5141,7 +5143,10 @@ static int32_t doStreamIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** p
continue;
} else {
ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
if (pBlock->info.type != STREAM_NORMAL && pBlock->info.type != STREAM_INVALID) {
code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
QUERY_CHECK_CODE(code, lino, _end);
}
}
if (pBlock->info.type == STREAM_NORMAL && pBlock->info.version != 0) {
@ -5260,8 +5265,6 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
.minTs = INT64_MAX,
.deleteMark = getDeleteMark(&pIntervalPhyNode->window, pIntervalPhyNode->interval)};
ASSERTS(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY, "trigger type should not be max delay");
pOperator->pTaskInfo = pTaskInfo;
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
@ -5642,7 +5645,6 @@ static int32_t doStreamMidIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock*
} else {
pInfo->pDelRes->info.type = STREAM_DELETE_RESULT;
}
ASSERT(taosArrayGetSize(pInfo->pUpdated) == 0);
(*ppRes) = pInfo->pDelRes;
return code;
}
@ -5691,7 +5693,10 @@ static int32_t doStreamMidIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock*
pInfo->clearState = true;
break;
} else {
ASSERTS(pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
if (pBlock->info.type != STREAM_INVALID) {
code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
QUERY_CHECK_CODE(code, lino, _end);
}
}
if (pInfo->scalarSupp.pExprInfo != NULL) {

View File

@ -114,7 +114,6 @@ static void doKeepLinearInfo(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlo
pLinearInfo->start.key = *(int64_t*)colDataGetData(pTsCol, rowIndex);
char* p = colDataGetData(pColInfoData, rowIndex);
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
ASSERT(varDataTLen(p) <= pColInfoData->info.bytes);
memcpy(pLinearInfo->start.val, p, varDataTLen(p));
} else {
memcpy(pLinearInfo->start.val, p, pLinearInfo->bytes);
@ -127,7 +126,6 @@ static void doKeepLinearInfo(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlo
char* p = colDataGetData(pColInfoData, rowIndex);
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
ASSERT(varDataTLen(p) <= pColInfoData->info.bytes);
memcpy(pLinearInfo->end.val, p, varDataTLen(p));
} else {
memcpy(pLinearInfo->end.val, p, pLinearInfo->bytes);
@ -143,7 +141,6 @@ static void doKeepLinearInfo(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlo
char* p = colDataGetData(pColInfoData, rowIndex);
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
ASSERT(varDataTLen(p) <= pColInfoData->info.bytes);
memcpy(pLinearInfo->end.val, p, varDataTLen(p));
} else {
memcpy(pLinearInfo->end.val, p, pLinearInfo->bytes);

View File

@ -136,7 +136,6 @@ FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn
// }
}
ASSERT(forwardRows >= 0);
return forwardRows;
}
@ -211,8 +210,6 @@ int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
__block_search_fn_t searchFn, STableQueryInfo* item, int32_t order) {
ASSERT(startPos >= 0 && startPos < pDataBlockInfo->rows);
int32_t num = -1;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(order);
@ -259,8 +256,6 @@ void doTimeWindowInterpolation(SArray* pPrevValues, SArray* pDataBlock, TSKEY pr
SFunctParam* pParam = &pCtx[k].param[0];
SColumnInfoData* pColInfo = taosArrayGet(pDataBlock, pParam->pCol->slotId);
ASSERT(pColInfo->info.type == pParam->pCol->type && curTs != windowKey);
double v1 = 0, v2 = 0, v = 0;
if (prevRowIndex == -1) {
SGroupKeys* p = taosArrayGet(pPrevValues, index);
@ -356,9 +351,11 @@ static bool setTimeWindowInterpolationStartTs(SIntervalAggOperatorInfo* pInfo, i
return true;
}
static bool setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SExprSupp* pSup, int32_t endRowIndex,
static int32_t setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SExprSupp* pSup, int32_t endRowIndex,
int32_t nextRowIndex, SArray* pDataBlock, const TSKEY* tsCols,
TSKEY blockEkey, STimeWindow* win) {
TSKEY blockEkey, STimeWindow* win, bool* pRes) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
int32_t order = pInfo->binfo.inputTsOrder;
TSKEY actualEndKey = tsCols[endRowIndex];
@ -367,21 +364,27 @@ static bool setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SEx
// not ended in current data block, do not invoke interpolation
if ((key > blockEkey && (order == TSDB_ORDER_ASC)) || (key < blockEkey && (order == TSDB_ORDER_DESC))) {
setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
return false;
(*pRes) = false;
return code;
}
// there is actual end point of current time window, no interpolation needs
if (key == actualEndKey) {
setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
return true;
(*pRes) = true;
return code;
}
ASSERT(nextRowIndex >= 0);
if (nextRowIndex < 0) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
TSKEY nextKey = tsCols[nextRowIndex];
doTimeWindowInterpolation(pInfo->pPrevValues, pDataBlock, actualEndKey, endRowIndex, nextKey, nextRowIndex, key,
RESULT_ROW_END_INTERP, pSup);
return true;
(*pRes) = true;
return code;
}
bool inCalSlidingWindow(SInterval* pInterval, STimeWindow* pWin, TSKEY calStart, TSKEY calEnd, EStreamType blockType) {
@ -437,13 +440,7 @@ int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBl
* This time window does not cover any data, try next time window,
* this case may happen when the time window is too small
*/
if (primaryKeys == NULL) {
if (ascQuery) {
ASSERT(pDataBlockInfo->window.skey <= pNext->ekey);
} else {
ASSERT(pDataBlockInfo->window.ekey >= pNext->skey);
}
} else {
if (primaryKeys != NULL) {
if (ascQuery && primaryKeys[startPos] > pNext->ekey) {
TSKEY next = primaryKeys[startPos];
if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
@ -469,7 +466,6 @@ int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBl
}
static bool isResultRowInterpolated(SResultRow* pResult, SResultTsInterpType type) {
ASSERT(pResult != NULL && (type == RESULT_ROW_START_INTERP || type == RESULT_ROW_END_INTERP));
if (type == RESULT_ROW_START_INTERP) {
return pResult->startInterp == true;
} else {
@ -485,15 +481,21 @@ static void setResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) {
}
}
static void doWindowBorderInterpolation(SIntervalAggOperatorInfo* pInfo, SSDataBlock* pBlock, SResultRow* pResult,
STimeWindow* win, int32_t startPos, int32_t forwardRows, SExprSupp* pSup) {
static int32_t doWindowBorderInterpolation(SIntervalAggOperatorInfo* pInfo, SSDataBlock* pBlock, SResultRow* pResult,
STimeWindow* win, int32_t startPos, int32_t forwardRows, SExprSupp* pSup) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
if (!pInfo->timeWindowInterpo) {
return;
return code;
}
if (pBlock == NULL) {
code = TSDB_CODE_INVALID_PARA;
return code;
}
ASSERT(pBlock != NULL);
if (pBlock->pDataBlock == NULL) {
return;
return code;
}
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
@ -528,14 +530,22 @@ static void doWindowBorderInterpolation(SIntervalAggOperatorInfo* pInfo, SSDataB
}
TSKEY endKey = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC) ? pBlock->info.window.ekey : pBlock->info.window.skey;
bool interp = setTimeWindowInterpolationEndTs(pInfo, pSup, endRowIndex, nextRowIndex, pBlock->pDataBlock, tsCols,
endKey, win);
bool interp = false;
code = setTimeWindowInterpolationEndTs(pInfo, pSup, endRowIndex, nextRowIndex, pBlock->pDataBlock, tsCols,
endKey, win, &interp);
QUERY_CHECK_CODE(code, lino, _end);
if (interp) {
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
}
} else {
setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
static void saveDataBlockLastRow(SArray* pPrevKeys, const SSDataBlock* pBlock, SArray* pCols) {
@ -558,7 +568,6 @@ static void saveDataBlockLastRow(SArray* pPrevKeys, const SSDataBlock* pBlock, S
char* val = colDataGetData(pColInfo, i);
if (IS_VAR_DATA_TYPE(pkey->type)) {
memcpy(pkey->pData, val, varDataTLen(val));
ASSERT(varDataTLen(val) <= pkey->bytes);
} else {
memcpy(pkey->pData, val, pkey->bytes);
}
@ -594,11 +603,17 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num
T_LONG_JMP(pTaskInfo->env, terrno);
}
ASSERT(pr->offset == p1->offset && pr->pageId == p1->pageId);
if (!(pr->offset == p1->offset && pr->pageId == p1->pageId)) {
pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
T_LONG_JMP(pTaskInfo->env, terrno);
}
if (pr->closed) {
ASSERT(isResultRowInterpolated(pr, RESULT_ROW_START_INTERP) &&
isResultRowInterpolated(pr, RESULT_ROW_END_INTERP));
if (!(isResultRowInterpolated(pr, RESULT_ROW_START_INTERP) &&
isResultRowInterpolated(pr, RESULT_ROW_END_INTERP)) ) {
pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
T_LONG_JMP(pTaskInfo->env, terrno);
}
SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
taosMemoryFree(pNode);
continue;
@ -611,7 +626,10 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
}
ASSERT(!isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP));
if(isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
T_LONG_JMP(pTaskInfo->env, terrno);
}
SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0);
if (!pTsKey) {
@ -764,11 +782,14 @@ static bool hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx,
numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
T_LONG_JMP(pTaskInfo->env, ret);
}
// window start key interpolation
doWindowBorderInterpolation(pInfo, pBlock, pResult, &win, startPos, forwardRows, pSup);
ret = doWindowBorderInterpolation(pInfo, pBlock, pResult, &win, startPos, forwardRows, pSup);
if (ret != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, ret);
}
}
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, 1);
@ -796,7 +817,10 @@ static bool hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
pInfo->binfo.inputTsOrder);
// window start(end) key interpolation
doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup);
code = doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
// TODO: add to open window? how to close the open windows after input blocks exhausted?
#if 0
if ((ascScan && ekey <= pBlock->info.window.ekey) ||
@ -869,7 +893,10 @@ int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo
}
tsCols = (int64_t*)pColDataInfo->pData;
ASSERT(tsCols[0] != 0);
if(tsCols[0] == 0) {
pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
T_LONG_JMP(pTaskInfo->env, terrno);
}
// no data in primary ts
if (tsCols[0] == 0 && tsCols[pBlock->info.rows - 1] == 0) {
@ -1959,7 +1986,10 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
} else {
if (pMiaInfo->groupId != pBlock->info.id.groupId) {
// if there are unclosed time window, close it firstly.
ASSERT(pMiaInfo->curTs != INT64_MIN);
if (pMiaInfo->curTs == INT64_MIN) {
pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
T_LONG_JMP(pTaskInfo->env, terrno);
}
finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo);
resetResultRow(pMiaInfo->pResultRow, pIaInfo->aggSup.resultRowSize - sizeof(SResultRow));
@ -2216,7 +2246,9 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
TSKEY ekey = ascScan ? win.ekey : win.skey;
int32_t forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
iaInfo->binfo.inputTsOrder);
ASSERT(forwardRows > 0);
if(forwardRows <= 0) {
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
// prev time window not interpolation yet.
if (iaInfo->timeWindowInterpo) {
@ -2227,11 +2259,14 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx,
numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
T_LONG_JMP(pTaskInfo->env, ret);
}
// window start key interpolation
doWindowBorderInterpolation(iaInfo, pBlock, pResult, &win, startPos, forwardRows, pExprSup);
ret = doWindowBorderInterpolation(iaInfo, pBlock, pResult, &win, startPos, forwardRows, pExprSup);
if (ret != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, ret);
}
}
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &win, 1);
@ -2268,7 +2303,10 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
iaInfo->binfo.inputTsOrder);
// window start(end) key interpolation
doWindowBorderInterpolation(iaInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pExprSup);
code = doWindowBorderInterpolation(iaInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pExprSup);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &nextWin, 1);
applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows,

View File

@ -338,38 +338,62 @@ typedef struct SQWorkerMgmt {
#define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000
#define QW_LOCK(type, _lock) \
do { \
if (QW_READ == (type)) { \
ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value before read lock"); \
QW_LOCK_DEBUG("QW RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRLockLatch(_lock); \
QW_LOCK_DEBUG("QW RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
ASSERTS(atomic_load_32((_lock)) > 0, "invalid lock value after read lock"); \
} else { \
ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value before write lock"); \
QW_LOCK_DEBUG("QW WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWLockLatch(_lock); \
QW_LOCK_DEBUG("QW WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
ASSERTS(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY, "invalid lock value after write lock"); \
} \
#define QW_LOCK(type, _lock) \
do { \
if (QW_READ == (type)) { \
if (atomic_load_32((_lock)) < 0) { \
qError("invalid lock value before read lock"); \
break; \
} \
QW_LOCK_DEBUG("QW RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRLockLatch(_lock); \
QW_LOCK_DEBUG("QW RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
if (atomic_load_32((_lock)) <= 0) { \
qError("invalid lock value after read lock"); \
break; \
} \
} else { \
if (atomic_load_32((_lock)) < 0) { \
qError("invalid lock value before write lock"); \
break; \
} \
QW_LOCK_DEBUG("QW WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWLockLatch(_lock); \
QW_LOCK_DEBUG("QW WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
if (atomic_load_32((_lock)) != TD_RWLATCH_WRITE_FLAG_COPY) { \
qError("invalid lock value after write lock"); \
break; \
} \
} \
} while (0)
#define QW_UNLOCK(type, _lock) \
do { \
if (QW_READ == (type)) { \
ASSERTS(atomic_load_32((_lock)) > 0, "invalid lock value before read unlock"); \
QW_LOCK_DEBUG("QW RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRUnLockLatch(_lock); \
QW_LOCK_DEBUG("QW RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value after read unlock"); \
} else { \
ASSERTS(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY, "invalid lock value before write unlock"); \
QW_LOCK_DEBUG("QW WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWUnLockLatch(_lock); \
QW_LOCK_DEBUG("QW WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value after write unlock"); \
} \
#define QW_UNLOCK(type, _lock) \
do { \
if (QW_READ == (type)) { \
if (atomic_load_32((_lock)) <= 0) { \
qError("invalid lock value before read unlock"); \
break; \
} \
QW_LOCK_DEBUG("QW RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRUnLockLatch(_lock); \
QW_LOCK_DEBUG("QW RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
if (atomic_load_32((_lock)) < 0) { \
qError("invalid lock value after read unlock"); \
break; \
} \
} else { \
if (atomic_load_32((_lock)) != TD_RWLATCH_WRITE_FLAG_COPY) { \
qError("invalid lock value before write unlock"); \
break; \
} \
QW_LOCK_DEBUG("QW WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWUnLockLatch(_lock); \
QW_LOCK_DEBUG("QW WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
if (atomic_load_32((_lock)) < 0) { \
qError("invalid lock value after write unlock"); \
break; \
} \
} \
} while (0)
extern SQWorkerMgmt gQwMgmt;

View File

@ -501,34 +501,58 @@ extern SSchedulerMgmt schMgmt;
#define SCH_LOCK(type, _lock) \
do { \
if (SCH_READ == (type)) { \
ASSERTS(atomic_load_32(_lock) >= 0, "invalid lock value before read lock"); \
if (atomic_load_32((_lock)) < 0) { \
qError("invalid lock value before read lock"); \
break; \
} \
SCH_LOCK_DEBUG("SCH RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRLockLatch(_lock); \
SCH_LOCK_DEBUG("SCH RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
ASSERTS(atomic_load_32(_lock) > 0, "invalid lock value after read lock"); \
if (atomic_load_32((_lock)) <= 0) { \
qError("invalid lock value after read lock"); \
break; \
} \
} else { \
ASSERTS(atomic_load_32(_lock) >= 0, "invalid lock value before write lock"); \
if (atomic_load_32((_lock)) < 0) { \
qError("invalid lock value before write lock"); \
break; \
} \
SCH_LOCK_DEBUG("SCH WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWLockLatch(_lock); \
SCH_LOCK_DEBUG("SCH WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
ASSERTS(atomic_load_32(_lock) == TD_RWLATCH_WRITE_FLAG_COPY, "invalid lock value after write lock"); \
if (atomic_load_32((_lock)) != TD_RWLATCH_WRITE_FLAG_COPY) { \
qError("invalid lock value after write lock"); \
break; \
} \
} \
} while (0)
#define SCH_UNLOCK(type, _lock) \
do { \
if (SCH_READ == (type)) { \
ASSERTS(atomic_load_32((_lock)) > 0, "invalid lock value before read unlock"); \
if (atomic_load_32((_lock)) <= 0) { \
qError("invalid lock value before read unlock"); \
break; \
} \
SCH_LOCK_DEBUG("SCH RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRUnLockLatch(_lock); \
SCH_LOCK_DEBUG("SCH RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value after read unlock"); \
if (atomic_load_32((_lock)) < 0) { \
qError("invalid lock value after read unlock"); \
break; \
} \
} else { \
ASSERTS(atomic_load_32((_lock)) & TD_RWLATCH_WRITE_FLAG_COPY, "invalid lock value before write unlock"); \
if (atomic_load_32((_lock)) != TD_RWLATCH_WRITE_FLAG_COPY) { \
qError("invalid lock value before write unlock"); \
break; \
} \
SCH_LOCK_DEBUG("SCH WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWUnLockLatch(_lock); \
SCH_LOCK_DEBUG("SCH WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value after write unlock"); \
if (atomic_load_32((_lock)) < 0) { \
qError("invalid lock value after write unlock"); \
break; \
} \
} \
} while (0)

View File

@ -282,7 +282,6 @@ _end:
int32_t getSessionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen,
int32_t* pWinCode) {
SWinKey* pTmpkey = pKey;
ASSERT(keyLen == sizeof(SWinKey));
SSessionKey pWinKey = {.groupId = pTmpkey->groupId, .win.skey = pTmpkey->ts, .win.ekey = pTmpkey->ts};
return getSessionWinResultBuff(pFileState, &pWinKey, 0, pVal, pVLen, pWinCode);
}
@ -455,7 +454,7 @@ int32_t allocSessioncWinBuffByNextPosition(SStreamFileState* pFileState, SStream
SSessionKey pTmpKey = *pWinKey;
int32_t winCode = TSDB_CODE_SUCCESS;
code = getSessionWinResultBuff(pFileState, &pTmpKey, 0, (void**)&pNewPos, pVLen, &winCode);
ASSERT(winCode == TSDB_CODE_FAILED);
QUERY_CHECK_CONDITION((winCode == TSDB_CODE_FAILED), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
QUERY_CHECK_CODE(code, lino, _end);
goto _end;
}

View File

@ -564,7 +564,7 @@ _end:
int32_t updateInfoDeserialize(void* buf, int32_t bufLen, SUpdateInfo* pInfo) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
ASSERT(pInfo);
QUERY_CHECK_NULL(pInfo, code, lino, _error, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
@ -572,6 +572,8 @@ int32_t updateInfoDeserialize(void* buf, int32_t bufLen, SUpdateInfo* pInfo) {
int32_t size = 0;
if (tDecodeI32(&decoder, &size) < 0) return -1;
pInfo->pTsBuckets = taosArrayInit(size, sizeof(TSKEY));
QUERY_CHECK_NULL(pInfo->pTsBuckets, code, lino, _error, terrno);
TSKEY ts = INT64_MIN;
for (int32_t i = 0; i < size; i++) {
if (tDecodeI64(&decoder, &ts) < 0) return -1;
@ -623,7 +625,8 @@ int32_t updateInfoDeserialize(void* buf, int32_t bufLen, SUpdateInfo* pInfo) {
code = taosHashPut(pInfo->pMap, &uid, sizeof(uint64_t), pVal, valSize);
QUERY_CHECK_CODE(code, lino, _error);
}
ASSERT(mapSize == taosHashGetSize(pInfo->pMap));
QUERY_CHECK_CONDITION((mapSize == taosHashGetSize(pInfo->pMap)), code, lino, _error,
TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
if (tDecodeU64(&decoder, &pInfo->maxDataVersion) < 0) return -1;
if (tDecodeI32(&decoder, &pInfo->pkColLen) < 0) return -1;
@ -634,7 +637,6 @@ int32_t updateInfoDeserialize(void* buf, int32_t bufLen, SUpdateInfo* pInfo) {
if (pInfo->pkColLen != 0) {
pInfo->comparePkRowFn = compareKeyTsAndPk;
pInfo->comparePkCol = getKeyComparFunc(pInfo->pkColType, TSDB_ORDER_ASC);
;
} else {
pInfo->comparePkRowFn = compareKeyTs;
pInfo->comparePkCol = NULL;

View File

@ -125,7 +125,6 @@ static void streamFileStateEncode(TSKEY* pKey, void** pVal, int32_t* pLen) {
(*pVal) = taosMemoryCalloc(1, *pLen);
void* buff = *pVal;
int32_t tmp = taosEncodeFixedI64(&buff, *pKey);
ASSERT(tmp == sizeof(TSKEY));
}
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize,
@ -204,7 +203,7 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_
int32_t len = 0;
int32_t tmpRes = streamDefaultGet_rocksdb(pFileState->pFileStore, STREAM_STATE_INFO_NAME, &valBuf, &len);
if (tmpRes == TSDB_CODE_SUCCESS) {
ASSERT(len == sizeof(TSKEY));
QUERY_CHECK_CONDITION((len == sizeof(TSKEY)), code, lino, _error, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
streamFileStateDecode(&pFileState->flushMark, valBuf, len);
qDebug("===stream===flushMark read:%" PRId64, pFileState->flushMark);
}
@ -361,7 +360,7 @@ int32_t popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList,
SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
if (pPos->beUsed == used) {
if (used && !pPos->pRowBuff) {
ASSERT(pPos->needFree == true);
QUERY_CHECK_CONDITION((pPos->needFree == true), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
continue;
}
code = tdListAppend(pFlushList, &pPos);
@ -496,13 +495,13 @@ _end:
code = tdListAppend(pFileState->usedBuffs, &pPos);
QUERY_CHECK_CODE(code, lino, _error);
QUERY_CHECK_CONDITION((pPos->pRowBuff != NULL), code, lino, _error, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
_error:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
return NULL;
}
ASSERT(pPos->pRowBuff != NULL);
return pPos;
}
@ -636,7 +635,7 @@ int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void**
QUERY_CHECK_CODE(code, lino, _end);
pPos->pRowBuff = getFreeBuff(pFileState);
}
ASSERT(pPos->pRowBuff);
QUERY_CHECK_CONDITION((pPos->pRowBuff != NULL), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
code = recoverSessionRowBuff(pFileState, pPos);
@ -877,7 +876,10 @@ void recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
taosMemoryFreeClear(pVal);
break;
}
ASSERT(vlen == pFileState->rowSize);
if (vlen != pFileState->rowSize) {
code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
QUERY_CHECK_CODE(code, lino, _end);
}
memcpy(pNewPos->pRowBuff, pVal, vlen);
taosMemoryFreeClear(pVal);
pNewPos->beFlushed = true;

View File

@ -161,7 +161,7 @@ int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, char const *tbname, SPg
if (pgno == 0) {
tdbError("tdb/btree-open: pgno cannot be zero.");
tdbOsFree(pBt);
ASSERT(0);
return TSDB_CODE_INTERNAL_ERROR;
}
pBt->root = pgno;
/*
@ -418,10 +418,6 @@ static int tdbDefaultKeyCmprFn(const void *pKey1, int keyLen1, const void *pKey2
int mlen;
int cret;
if (ASSERT(keyLen1 > 0 && keyLen2 > 0 && pKey1 != NULL && pKey2 != NULL)) {
// -1 is less than
}
mlen = keyLen1 < keyLen2 ? keyLen1 : keyLen2;
cret = memcmp(pKey1, pKey2, mlen);
if (cret == 0) {
@ -571,14 +567,14 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
nOlds = 3;
}
for (int i = 0; i < nOlds; i++) {
if (ASSERT(sIdx + i <= nCells)) {
if (!(sIdx + i <= nCells)) {
return TSDB_CODE_FAILED;
}
SPgno pgno;
if (sIdx + i == nCells) {
if (ASSERT(!TDB_BTREE_PAGE_IS_LEAF(pParent))) {
return TSDB_CODE_FAILED;
if (TDB_BTREE_PAGE_IS_LEAF(pParent)) {
return TSDB_CODE_INTERNAL_ERROR;
}
pgno = ((SIntHdr *)(pParent->pData))->pgno;
} else {
@ -685,8 +681,6 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
// page is full, use a new page
nNews++;
ASSERT(infoNews[nNews].size + cellBytes <= TDB_PAGE_USABLE_SIZE(pPage));
if (childNotLeaf) {
// for non-child page, this cell is used as the right-most child,
// the divider cell to parent as well
@ -732,7 +726,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
szRCell = tdbBtreeCellSize(pPage, pCell, 0, NULL, NULL);
}
if (ASSERT(infoNews[iNew - 1].cnt > 0)) {
if (!(infoNews[iNew - 1].cnt > 0)) {
return TSDB_CODE_FAILED;
}
@ -822,10 +816,10 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
pCell = tdbPageGetCell(pPage, oIdx);
szCell = tdbBtreeCellSize(pPage, pCell, 0, NULL, NULL);
if (ASSERT(nNewCells <= infoNews[iNew].cnt)) {
if (!(nNewCells <= infoNews[iNew].cnt)) {
return TSDB_CODE_FAILED;
}
if (ASSERT(iNew < nNews)) {
if (!(iNew < nNews)) {
return TSDB_CODE_FAILED;
}
@ -866,10 +860,10 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
}
}
} else {
if (ASSERT(childNotLeaf)) {
if (!(childNotLeaf)) {
return TSDB_CODE_FAILED;
}
if (ASSERT(iNew < nNews - 1)) {
if (!(iNew < nNews - 1)) {
return TSDB_CODE_FAILED;
}
@ -877,7 +871,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
((SIntHdr *)pNews[iNew]->pData)->pgno = ((SPgno *)pCell)[0];
// insert to parent as divider cell
if (ASSERT(iNew < nNews - 1)) {
if (!(iNew < nNews - 1)) {
return TSDB_CODE_FAILED;
}
((SPgno *)pCell)[0] = TDB_PAGE_PGNO(pNews[iNew]);
@ -894,7 +888,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
}
if (childNotLeaf) {
if (ASSERT(TDB_PAGE_TOTAL_CELLS(pNews[nNews - 1]) == infoNews[nNews - 1].cnt)) {
if (!(TDB_PAGE_TOTAL_CELLS(pNews[nNews - 1]) == infoNews[nNews - 1].cnt)) {
return TSDB_CODE_FAILED;
}
((SIntHdr *)(pNews[nNews - 1]->pData))->pgno = rPgno;
@ -1091,7 +1085,7 @@ static int tdbBtreeEncodePayload(SPage *pPage, SCell *pCell, int nHeader, const
nLeft -= kLen;
// pack partial val to local if any space left
if (nLocal > nHeader + kLen + sizeof(SPgno)) {
if (ASSERT(pVal != NULL && vLen != 0)) {
if (!(pVal != NULL && vLen != 0)) {
tdbFree(pBuf);
return TSDB_CODE_FAILED;
}
@ -1259,14 +1253,14 @@ static int tdbBtreeEncodeCell(SPage *pPage, const void *pKey, int kLen, const vo
int nPayload;
int ret;
if (ASSERT(pPage->kLen == TDB_VARIANT_LEN || pPage->kLen == kLen)) {
return TSDB_CODE_FAILED;
if (!(pPage->kLen == TDB_VARIANT_LEN || pPage->kLen == kLen)) {
return TSDB_CODE_INVALID_PARA;
}
if (ASSERT(pPage->vLen == TDB_VARIANT_LEN || pPage->vLen == vLen)) {
return TSDB_CODE_FAILED;
if (!(pPage->vLen == TDB_VARIANT_LEN || pPage->vLen == vLen)) {
return TSDB_CODE_INVALID_PARA;
}
if (ASSERT(pKey != NULL && kLen > 0)) {
return TSDB_CODE_FAILED;
if (!(pKey != NULL && kLen > 0)) {
return TSDB_CODE_INVALID_PARA;
}
nPayload = 0;
@ -1645,7 +1639,6 @@ static int tdbBtreeCellSize(const SPage *pPage, SCell *pCell, int dropOfp, TXN *
SArray *ofps = pPage->pPager->ofps;
if (ofps) {
if (taosArrayPush(ofps, &ofp) == NULL) {
ASSERT(0);
return terrno;
}
}
@ -2438,7 +2431,10 @@ int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) {
lidx = 0;
ridx = nCells - 1;
ASSERT(nCells > 0);
if (nCells <= 0) {
tdbError("tdb/btc-move-to: empty page.");
return TSDB_CODE_FAILED;
}
// compare first cell
pBtc->idx = lidx;

View File

@ -522,7 +522,9 @@ static int tdbPageDefragment(SPage *pPage) {
SCell *pCell = TDB_PAGE_CELL_AT(pPage, aCellIdx[iCell].iCell);
int32_t szCell = pPage->xCellSize(pPage, pCell, 0, NULL, NULL);
ASSERT(pNextCell - szCell >= pCell);
if (pNextCell - szCell < pCell) {
return TSDB_CODE_INTERNAL_ERROR;
}
pNextCell -= szCell;
if (pNextCell > pCell) {
@ -535,7 +537,11 @@ static int tdbPageDefragment(SPage *pPage) {
TDB_PAGE_FCELL_SET(pPage, 0);
tdbOsFree(aCellIdx);
ASSERT(pPage->pFreeEnd - pPage->pFreeStart == nFree);
if (pPage->pFreeEnd - pPage->pFreeStart != nFree) {
tdbError("tdb/page-defragment: nFree: %d, pFreeStart: %p, pFreeEnd: %p.", nFree, pPage->pFreeStart,
pPage->pFreeEnd);
return TSDB_CODE_INTERNAL_ERROR;
}
return 0;
}

View File

@ -16,19 +16,7 @@
#include "crypt.h"
#include "tdbInt.h"
#include "tglobal.h"
/*
#pragma pack(push, 1)
typedef struct {
u8 hdrString[16];
u16 pageSize;
SPgno freePage;
u32 nFreePages;
u8 reserved[102];
} SFileHdr;
#pragma pack(pop)
TDB_STATIC_ASSERT(sizeof(SFileHdr) == 128, "Size of file header is not correct");
*/
struct hashset_st {
size_t nbits;
size_t mask;
@ -450,7 +438,6 @@ static char *tdbEncryptPage(SPager *pPager, char *pPageData, int32_t pageSize, c
if (encryptAlgorithm == DND_CA_SM4) {
// tdbInfo("CBC_Encrypt key:%d %s %s", encryptAlgorithm, encryptKey, __FUNCTION__);
// ASSERT(strlen(encryptKey) > 0);
// tdbInfo("CBC tdb offset:%" PRId64 ", flag:%d before Encrypt", offset, pPage->pData[0]);
@ -915,7 +902,6 @@ static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage
if (encryptAlgorithm == DND_CA_SM4) {
// tdbInfo("CBC_Decrypt key:%d %s %s", encryptAlgorithm, encryptKey, __FUNCTION__);
// ASSERT(strlen(encryptKey) > 0);
// uint8_t flags = pPage->pData[0];
// tdbInfo("CBC tdb offset:%" PRId64 ", flag:%d before Decrypt", ((i64)pPage->pageSize) * (pgno - 1), flags);

View File

@ -40,7 +40,6 @@ int tdbTxnCloseImpl(TXN *pTxn) {
if (pTxn->jfd) {
TAOS_UNUSED(tdbOsClose(pTxn->jfd));
ASSERT(pTxn->jfd == NULL);
}
tdbOsFree(pTxn);

View File

@ -319,7 +319,6 @@ static inline int tdbTryLockPage(tdb_spinlock_t *pLock) {
} else if (ret == EBUSY) {
return P_LOCK_BUSY;
} else {
ASSERT(0);
return P_LOCK_FAIL;
}
}
@ -354,7 +353,10 @@ static inline SCell *tdbPageGetCell(SPage *pPage, int idx) {
int iOvfl;
int lidx;
ASSERT(idx >= 0 && idx < TDB_PAGE_TOTAL_CELLS(pPage));
if (idx < 0 || idx >= TDB_PAGE_TOTAL_CELLS(pPage)) {
terrno = TSDB_CODE_INVALID_PARA;
return NULL;
}
iOvfl = 0;
for (; iOvfl < pPage->nOverflow; iOvfl++) {
@ -367,7 +369,6 @@ static inline SCell *tdbPageGetCell(SPage *pPage, int idx) {
}
lidx = idx - iOvfl;
ASSERT(lidx >= 0 && lidx < pPage->pPageMethods->getCellNum(pPage));
pCell = pPage->pData + pPage->pPageMethods->getCellOffset(pPage, lidx);
return pCell;

View File

@ -22,12 +22,6 @@
extern "C" {
#endif
#if __STDC_VERSION__ >= 201112LL
#define TDB_STATIC_ASSERT(op, info) static_assert(op, info)
#else
#define TDB_STATIC_ASSERT(op, info)
#endif
#define TDB_ROUND8(x) (((x) + 7) & ~7)
int tdbGnrtFileID(tdb_fd_t fd, uint8_t *fileid, bool unique);

View File

@ -454,7 +454,6 @@ int64_t taosPWriteFile(TdFilePtr pFile, const void *buf, int64_t count, int64_t
#if FILE_WITH_LOCK
taosThreadRwlockWrlock(&(pFile->rwlock));
#endif
ASSERT(pFile->hFile != NULL); // Please check if you have closed the file.
if (pFile->hFile == NULL) {
#if FILE_WITH_LOCK
taosThreadRwlockUnlock(&(pFile->rwlock));
@ -867,7 +866,6 @@ int64_t taosLSeekFile(TdFilePtr pFile, int64_t offset, int32_t whence) {
#endif
int32_t code = 0;
ASSERT(pFile->fd >= 0); // Please check if you have closed the file.
#ifdef WINDOWS
int64_t ret = _lseeki64(pFile->fd, offset, whence);
@ -1479,7 +1477,6 @@ int32_t taosEOFFile(TdFilePtr pFile) {
terrno = TSDB_CODE_INVALID_PARA;
return -1;
}
ASSERT(pFile->fp != NULL);
if (pFile->fp == NULL) {
terrno = TSDB_CODE_INVALID_PARA;
return -1;

View File

@ -326,10 +326,8 @@ void *taosMemoryRealloc(void *ptr, int64_t size) {
if (ptr == NULL) return taosMemoryMalloc(size);
TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char *)ptr - sizeof(TdMemoryInfo));
ASSERT(pTdMemoryInfo->symbol == TD_MEMORY_SYMBOL);
if (tpTdMemoryInfo->symbol != TD_MEMORY_SYMBOL) {
+return NULL;
+
return NULL;
}
TdMemoryInfo tdMemoryInfo;
@ -366,7 +364,6 @@ char *taosStrdup(const char *ptr) {
if (ptr == NULL) return NULL;
TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char *)ptr - sizeof(TdMemoryInfo));
ASSERT(pTdMemoryInfo->symbol == TD_MEMORY_SYMBOL);
if (pTdMemoryInfo->symbol != TD_MEMORY_SYMBOL) {
return NULL;
}
@ -413,7 +410,6 @@ int64_t taosMemorySize(void *ptr) {
#ifdef USE_TD_MEMORY
TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char *)ptr - sizeof(TdMemoryInfo));
ASSERT(pTdMemoryInfo->symbol == TD_MEMORY_SYMBOL);
if (pTdMemoryInfo->symbol != TD_MEMORY_SYMBOL) {
return NULL;
}
@ -441,7 +437,7 @@ void taosMemoryTrim(int32_t size) {
void *taosMemoryMallocAlign(uint32_t alignment, int64_t size) {
#ifdef USE_TD_MEMORY
ASSERT(0);
return NULL;
#else
#if defined(LINUX)
#ifdef BUILD_WITH_RAND_ERR

View File

@ -312,8 +312,7 @@ int32_t taosGetSockOpt(TdSocketPtr pSocket, int32_t level, int32_t optname, void
return -1;
}
#ifdef WINDOWS
ASSERT(0);
return 0;
return -1;
#else
return getsockopt(pSocket->fd, level, optname, optval, (int *)optlen);
#endif
@ -681,8 +680,7 @@ int32_t taosKeepTcpAlive(TdSocketPtr pSocket) {
int taosGetLocalIp(const char *eth, char *ip) {
#if defined(WINDOWS)
// DO NOTHAING
ASSERT(0);
return 0;
return -1;
#else
int fd;
struct ifreq ifr;
@ -708,8 +706,7 @@ int taosGetLocalIp(const char *eth, char *ip) {
int taosValidIp(uint32_t ip) {
#if defined(WINDOWS)
// DO NOTHAING
ASSERT(0);
return 0;
return -1;
#else
int ret = -1;
int fd;
@ -1111,7 +1108,7 @@ int32_t taosIgnSIGPIPE() {
int32_t taosSetMaskSIGPIPE() {
#ifdef WINDOWS
// ASSERT(0);
return -1;
#else
sigset_t signal_mask;
(void)sigemptyset(&signal_mask);

View File

@ -425,10 +425,6 @@ int64_t taosStr2Int64(const char *str, char **pEnd, int32_t radix) {
int64_t tmp = strtoll(str, pEnd, radix);
#if defined(DARWIN) || defined(_ALPINE)
if (errno == EINVAL) errno = 0;
#endif
#ifdef TD_CHECK_STR_TO_INT_ERROR
ASSERT(errno != ERANGE);
ASSERT(errno != EINVAL);
#endif
return tmp;
}
@ -437,10 +433,6 @@ uint64_t taosStr2UInt64(const char *str, char **pEnd, int32_t radix) {
uint64_t tmp = strtoull(str, pEnd, radix);
#if defined(DARWIN) || defined(_ALPINE)
if (errno == EINVAL) errno = 0;
#endif
#ifdef TD_CHECK_STR_TO_INT_ERROR
ASSERT(errno != ERANGE);
ASSERT(errno != EINVAL);
#endif
return tmp;
}
@ -449,10 +441,6 @@ int32_t taosStr2Int32(const char *str, char **pEnd, int32_t radix) {
int32_t tmp = strtol(str, pEnd, radix);
#if defined(DARWIN) || defined(_ALPINE)
if (errno == EINVAL) errno = 0;
#endif
#ifdef TD_CHECK_STR_TO_INT_ERROR
ASSERT(errno != ERANGE);
ASSERT(errno != EINVAL);
#endif
return tmp;
}
@ -461,10 +449,6 @@ uint32_t taosStr2UInt32(const char *str, char **pEnd, int32_t radix) {
uint32_t tmp = strtol(str, pEnd, radix);
#if defined(DARWIN) || defined(_ALPINE)
if (errno == EINVAL) errno = 0;
#endif
#ifdef TD_CHECK_STR_TO_INT_ERROR
ASSERT(errno != ERANGE);
ASSERT(errno != EINVAL);
#endif
return tmp;
}
@ -473,12 +457,6 @@ int16_t taosStr2Int16(const char *str, char **pEnd, int32_t radix) {
int32_t tmp = strtol(str, pEnd, radix);
#if defined(DARWIN) || defined(_ALPINE)
if (errno == EINVAL) errno = 0;
#endif
#ifdef TD_CHECK_STR_TO_INT_ERROR
ASSERT(errno != ERANGE);
ASSERT(errno != EINVAL);
ASSERT(tmp >= SHRT_MIN);
ASSERT(tmp <= SHRT_MAX);
#endif
return (int16_t)tmp;
}
@ -487,23 +465,12 @@ uint16_t taosStr2UInt16(const char *str, char **pEnd, int32_t radix) {
uint32_t tmp = strtoul(str, pEnd, radix);
#if defined(DARWIN) || defined(_ALPINE)
if (errno == EINVAL) errno = 0;
#endif
#ifdef TD_CHECK_STR_TO_INT_ERROR
ASSERT(errno != ERANGE);
ASSERT(errno != EINVAL);
ASSERT(tmp <= USHRT_MAX);
#endif
return (uint16_t)tmp;
}
int8_t taosStr2Int8(const char *str, char **pEnd, int32_t radix) {
int32_t tmp = strtol(str, pEnd, radix);
#ifdef TD_CHECK_STR_TO_INT_ERROR
ASSERT(errno != ERANGE);
ASSERT(errno != EINVAL);
ASSERT(tmp >= SCHAR_MIN);
ASSERT(tmp <= SCHAR_MAX);
#endif
return tmp;
}
@ -511,33 +478,17 @@ uint8_t taosStr2UInt8(const char *str, char **pEnd, int32_t radix) {
uint32_t tmp = strtoul(str, pEnd, radix);
#if defined(DARWIN) || defined(_ALPINE)
if (errno == EINVAL) errno = 0;
#endif
#ifdef TD_CHECK_STR_TO_INT_ERROR
ASSERT(errno != ERANGE);
ASSERT(errno != EINVAL);
ASSERT(tmp <= UCHAR_MAX);
#endif
return tmp;
}
double taosStr2Double(const char *str, char **pEnd) {
double tmp = strtod(str, pEnd);
#ifdef TD_CHECK_STR_TO_INT_ERROR
ASSERT(errno != ERANGE);
ASSERT(errno != EINVAL);
ASSERT(tmp != HUGE_VAL);
#endif
return tmp;
}
float taosStr2Float(const char *str, char **pEnd) {
float tmp = strtof(str, pEnd);
#ifdef TD_CHECK_STR_TO_INT_ERROR
ASSERT(errno != ERANGE);
ASSERT(errno != EINVAL);
ASSERT(tmp != HUGE_VALF);
ASSERT(tmp != NAN);
#endif
return tmp;
}

View File

@ -287,7 +287,7 @@ void taosGetSystemInfo() {
int32_t taosGetEmail(char *email, int32_t maxLen) {
#ifdef WINDOWS
// ASSERT(0);
return 0;
#elif defined(_TD_DARWIN_64)
#ifdef CUS_PROMPT
const char *filepath = "/usr/local/"CUS_PROMPT"/email";
@ -1040,7 +1040,6 @@ int32_t taosGetSystemUUID(char *uid, int32_t uidlen) {
char *taosGetCmdlineByPID(int pid) {
#ifdef WINDOWS
ASSERT(0);
return "";
#elif defined(_TD_DARWIN_64)
static char cmdline[1024];

View File

@ -91,7 +91,6 @@ typedef struct FILE TdCmd;
#ifdef BUILD_NO_CALL
void* taosLoadDll(const char* filename) {
#if defined(WINDOWS)
ASSERT(0);
return NULL;
#elif defined(_TD_DARWIN_64)
return NULL;
@ -110,7 +109,6 @@ void* taosLoadDll(const char* filename) {
void* taosLoadSym(void* handle, char* name) {
#if defined(WINDOWS)
ASSERT(0);
return NULL;
#elif defined(_TD_DARWIN_64)
return NULL;
@ -131,7 +129,6 @@ void* taosLoadSym(void* handle, char* name) {
void taosCloseDll(void* handle) {
#if defined(WINDOWS)
ASSERT(0);
return;
#elif defined(_TD_DARWIN_64)
return;

View File

@ -784,8 +784,7 @@ int32_t taosThreadSpinDestroy(TdThreadSpinlock *lock) {
int32_t taosThreadSpinInit(TdThreadSpinlock *lock, int32_t pshared) {
#ifdef TD_USE_SPINLOCK_AS_MUTEX
ASSERT(pshared == 0);
if (pshared != 0) return -1;
if (pshared != 0) return TSDB_CODE_INVALID_PARA;
return pthread_mutex_init((pthread_mutex_t *)lock, NULL);
#else
int32_t code = pthread_spin_init((pthread_spinlock_t *)lock, pshared);

View File

@ -111,6 +111,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_IP_NOT_IN_WHITE_LIST, "Not allowed to connec
TAOS_DEFINE_ERROR(TSDB_CODE_FAILED_TO_CONNECT_S3, "Failed to connect to s3 server")
TAOS_DEFINE_ERROR(TSDB_CODE_MSG_PREPROCESSED, "Message has been processed in preprocess")
TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_BUFFER, "Out of buffer")
TAOS_DEFINE_ERROR(TSDB_CODE_INTERNAL_ERROR, "Internal error")
//client
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_OPERATION, "Invalid operation")

View File

@ -71,8 +71,7 @@ int32_t tScalableBfPutNoCheck(SScalableBf* pSBf, const void* keyBuf, uint32_t le
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
if (pSBf->status == SBF_INVALID) {
code = TSDB_CODE_OUT_OF_BUFFER;
QUERY_CHECK_CODE(code, lino, _error);
return code;
}
int32_t size = taosArrayGetSize(pSBf->bfArray);
SBloomFilter* pNormalBf = taosArrayGetP(pSBf->bfArray, size - 1);
@ -105,8 +104,8 @@ int32_t tScalableBfPut(SScalableBf* pSBf, const void* keyBuf, uint32_t len, int3
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
if (pSBf->status == SBF_INVALID) {
code = TSDB_CODE_OUT_OF_BUFFER;
QUERY_CHECK_CODE(code, lino, _end);
(*winRes) = TSDB_CODE_FAILED;
return code;
}
uint64_t h1 = (uint64_t)pSBf->hashFn1(keyBuf, len);
uint64_t h2 = (uint64_t)pSBf->hashFn2(keyBuf, len);