other: merge 3.0

This commit is contained in:
Haojun Liao 2024-09-08 22:52:17 +08:00
commit 31ab46f210
57 changed files with 733 additions and 483 deletions

View File

@ -1384,7 +1384,7 @@ SELECT SERVER_VERSION();
SELECT SERVER_STATUS();
```
**Description**: The server status.
**Description**: The server status. When checking the status of a cluster, the recommended way is to use `SHOW CLUSTER ALIVE;`. Unlike `SELECT SERVER_STATUS();`, it does not return an error when some nodes in the cluster are unavailable; instead, it returns different status codes. Plese check [SHOW CLUSTER ALIVE](https://docs.tdengine.com/reference/taos-sql/show/#show-cluster-alive) for details.
### CURRENT_USER

View File

@ -1374,7 +1374,7 @@ SELECT SERVER_VERSION();
SELECT SERVER_STATUS();
```
**说明**:检测服务端是否所有 dnode 都在线,如果是则返回成功,否则返回无法建立连接的错误。
**说明**:检测服务端是否所有 dnode 都在线,如果是则返回成功,否则返回无法建立连接的错误。如果想要查询集群的状态,推荐使用 `SHOW CLUSTER ALIVE;`, 与 `SELECT SERVER_STATUS();` 不同,当集群中的部分节点不可用时,它不会返回错误,而是返回不同的状态码,详见:[SHOW CLUSTER ALIVE](https://docs.taosdata.com/reference/taos-sql/show/#show-cluster-alive)
### CURRENT_USER

View File

@ -41,7 +41,7 @@ typedef struct SBlockOrderInfo {
#define BMCharPos(bm_, r_) ((bm_)[(r_) >> NBIT])
#define colDataIsNull_f(bm_, r_) ((BMCharPos(bm_, r_) & (1u << (7u - BitPos(r_)))) == (1u << (7u - BitPos(r_))))
#define QRY_OPTR_CHECK(_o) \
#define QRY_PARAM_CHECK(_o) \
do { \
if ((_o) == NULL) { \
return TSDB_CODE_INVALID_PARA; \

View File

@ -687,7 +687,9 @@ void doDestroyRequest(void *p) {
taosMemoryFreeClear(pRequest->msgBuf);
doFreeReqResultInfo(&pRequest->body.resInfo);
(void)tsem_destroy(&pRequest->body.rspSem);
if (TSDB_CODE_SUCCESS != tsem_destroy(&pRequest->body.rspSem)) {
tscError("failed to destroy semaphore");
}
taosArrayDestroy(pRequest->tableList);
taosArrayDestroy(pRequest->targetTableList);
@ -700,7 +702,9 @@ void doDestroyRequest(void *p) {
taosMemoryFreeClear(pRequest->pDb);
taosArrayDestroy(pRequest->dbList);
if (pRequest->body.interParam) {
(void)tsem_destroy(&((SSyncQueryParam *)pRequest->body.interParam)->sem);
if (TSDB_CODE_SUCCESS != tsem_destroy(&((SSyncQueryParam *)pRequest->body.interParam)->sem)) {
tscError("failed to destroy semaphore in pRequest");
}
}
taosMemoryFree(pRequest->body.interParam);

View File

@ -1636,7 +1636,9 @@ void hbDeregisterConn(STscObj *pTscObj, SClientHbKey connKey) {
SClientHbReq *pReq = taosHashAcquire(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
if (pReq) {
tFreeClientHbReq(pReq);
(void)taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
if (TSDB_CODE_SUCCESS != taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey))) {
tscError("failed to remove connKey from activeInfo");
}
taosHashRelease(pAppHbMgr->activeInfo, pReq);
(void)atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1);
}

View File

@ -209,7 +209,9 @@ _return:
void freeQueryParam(SSyncQueryParam* param) {
if (param == NULL) return;
(void)tsem_destroy(&param->sem);
if (TSDB_CODE_SUCCESS != tsem_destroy(&param->sem)) {
tscError("failed to destroy semaphore in freeQueryParam");
}
taosMemoryFree(param);
}
@ -351,7 +353,7 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
// int64_t transporterId = 0;
TSC_ERR_RET(asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pMsgInfo->epSet, NULL, pSendMsg));
(void)tsem_wait(&pRequest->body.rspSem);
TSC_ERR_RET(tsem_wait(&pRequest->body.rspSem));
return TSDB_CODE_SUCCESS;
}
@ -1165,14 +1167,18 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
if (code != TSDB_CODE_SUCCESS && NEED_CLIENT_HANDLE_ERROR(code) && pRequest->sqlstr != NULL) {
tscDebug("0x%" PRIx64 " client retry to handle the error, code:%s, tryCount:%d,QID:0x%" PRIx64, pRequest->self,
tstrerror(code), pRequest->retry, pRequest->requestId);
(void)removeMeta(pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type));
if (TSDB_CODE_SUCCESS != removeMeta(pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type))) {
tscError("0x%" PRIx64 " remove meta failed,QID:0x%" PRIx64, pRequest->self, pRequest->requestId);
}
restartAsyncQuery(pRequest, code);
return;
}
tscDebug("schedulerExecCb request type %s", TMSG_INFO(pRequest->type));
if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type) && NULL == pRequest->body.resInfo.execRes.res) {
(void)removeMeta(pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type));
if (TSDB_CODE_SUCCESS != removeMeta(pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type))) {
tscError("0x%" PRIx64 " remove meta failed,QID:0x%" PRIx64, pRequest->self, pRequest->requestId);
}
}
pRequest->metric.execCostUs = taosGetTimestampUs() - pRequest->metric.execStart;
@ -1266,7 +1272,10 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQue
}
if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type) && NULL == pRequest->body.resInfo.execRes.res) {
(void)removeMeta(pRequest->pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type));
code = removeMeta(pRequest->pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type));
if (TSDB_CODE_SUCCESS != code) {
tscError("0x%" PRIx64 " remove meta failed,QID:0x%" PRIx64, pRequest->self, pRequest->requestId);
}
}
if (TSDB_CODE_SUCCESS == code) {
@ -1479,12 +1488,12 @@ int32_t removeMeta(STscObj* pTscObj, SArray* tbList, bool isView) {
continue;
}
(void)tNameGetFullDbName(pViewName, dbFName);
(void)catalogRemoveViewMeta(pCatalog, dbFName, 0, pViewName->tname, 0);
TSC_ERR_RET(catalogRemoveViewMeta(pCatalog, dbFName, 0, pViewName->tname, 0));
}
} else {
for (int32_t i = 0; i < tbNum; ++i) {
SName* pTbName = taosArrayGet(tbList, i);
(void)catalogRemoveTableMeta(pCatalog, pTbName);
TSC_ERR_RET(catalogRemoveTableMeta(pCatalog, pTbName));
}
}
@ -1584,7 +1593,11 @@ int32_t taosConnectImpl(const char* user, const char* auth, const char* db, __ta
tscError("failed to send connect msg to server, code:%s", tstrerror(code));
return code;
}
(void)tsem_wait(&pRequest->body.rspSem);
if (TSDB_CODE_SUCCESS != tsem_wait(&pRequest->body.rspSem)) {
destroyTscObj(*pTscObj);
tscError("failed to wait sem, code:%s", terrstr());
return terrno;
}
if (pRequest->code != TSDB_CODE_SUCCESS) {
const char* errorMsg = (code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code);
tscError("failed to connect to server, reason: %s", errorMsg);
@ -1736,7 +1749,9 @@ int32_t doProcessMsgFromServer(void* param) {
tscError("doProcessMsgFromServer pRequest->self:%" PRId64 " != pSendInfo->requestObjRefId:%" PRId64,
pRequest->self, pSendInfo->requestObjRefId);
(void)taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
if (TSDB_CODE_SUCCESS != taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId)) {
tscError("doProcessMsgFromServer taosReleaseRef failed");
}
taosMemoryFree(arg->pEpset);
rpcFreeCont(pMsg->pCont);
destroySendMsgInfo(pSendInfo);
@ -1769,7 +1784,12 @@ int32_t doProcessMsgFromServer(void* param) {
(void)pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
if (pTscObj) {
(void)taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
int32_t code = taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
if (TSDB_CODE_SUCCESS != code) {
tscError("doProcessMsgFromServer taosReleaseRef failed");
terrno = code;
pMsg->code = code;
}
}
rpcFreeCont(pMsg->pCont);
@ -1946,7 +1966,9 @@ void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4)
static void syncFetchFn(void* param, TAOS_RES* res, int32_t numOfRows) {
tsem_t* sem = param;
(void)tsem_post(sem);
if (TSDB_CODE_SUCCESS != tsem_post(sem)) {
tscError("failed to post sem, code:%s", terrstr());
}
}
void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
@ -1965,10 +1987,16 @@ void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertU
// convert ucs4 to native multi-bytes string
pResultInfo->convertUcs4 = convertUcs4;
tsem_t sem;
(void)tsem_init(&sem, 0, 0);
if (TSDB_CODE_SUCCESS != tsem_init(&sem, 0, 0)) {
tscError("failed to init sem, code:%s", terrstr());
}
taos_fetch_rows_a(pRequest, syncFetchFn, &sem);
(void)tsem_wait(&sem);
(void)tsem_destroy(&sem);
if (TSDB_CODE_SUCCESS != tsem_wait(&sem)) {
tscError("failed to wait sem, code:%s", terrstr());
}
if (TSDB_CODE_SUCCESS != tsem_destroy(&sem)) {
tscError("failed to destroy sem, code:%s", terrstr());
}
pRequest->inCallback = false;
}
@ -2742,7 +2770,9 @@ void syncCatalogFn(SMetaData* pResult, void* param, int32_t code) {
SSyncQueryParam* pParam = param;
pParam->pRequest->code = code;
(void)tsem_post(&pParam->sem);
if (TSDB_CODE_SUCCESS != tsem_post(&pParam->sem)) {
tscError("failed to post semaphore since %s", tstrerror(terrno));
}
}
void syncQueryFn(void* param, void* res, int32_t code) {
@ -2753,7 +2783,9 @@ void syncQueryFn(void* param, void* res, int32_t code) {
pParam->pRequest->code = code;
}
(void)tsem_post(&pParam->sem);
if (TSDB_CODE_SUCCESS != tsem_post(&pParam->sem)) {
tscError("failed to post semaphore since %s", tstrerror(terrno));
}
}
void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly,
@ -2831,10 +2863,20 @@ TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly, int8_t s
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
(void)tsem_init(&param->sem, 0, 0);
int32_t code = tsem_init(&param->sem, 0, 0);
if (TSDB_CODE_SUCCESS != code) {
terrno = code;
taosMemoryFree(param);
return NULL;
}
taosAsyncQueryImpl(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly, source);
(void)tsem_wait(&param->sem);
code = tsem_wait(&param->sem);
if (TSDB_CODE_SUCCESS != code) {
terrno = code;
taosMemoryFree(param);
return NULL;
}
SRequestObj* pRequest = NULL;
if (param->pRequest != NULL) {
@ -2860,10 +2902,20 @@ TAOS_RES* taosQueryImplWithReqid(TAOS* taos, const char* sql, bool validateOnly,
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
(void)tsem_init(&param->sem, 0, 0);
int32_t code = tsem_init(&param->sem, 0, 0);
if (TSDB_CODE_SUCCESS != code) {
terrno = code;
taosMemoryFree(param);
return NULL;
}
taosAsyncQueryImplWithReqid(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly, reqid);
(void)tsem_wait(&param->sem);
code = tsem_wait(&param->sem);
if (TSDB_CODE_SUCCESS != code) {
terrno = code;
taosMemoryFree(param);
return NULL;
}
SRequestObj* pRequest = NULL;
if (param->pRequest != NULL) {
param->pRequest->syncQuery = true;

View File

@ -312,7 +312,9 @@ void taos_close_internal(void *taos) {
STscObj *pTscObj = (STscObj *)taos;
tscDebug("0x%" PRIx64 " try to close connection, numOfReq:%d", pTscObj->id, pTscObj->numOfReqs);
(void)taosRemoveRef(clientConnRefPool, pTscObj->id);
if (TSDB_CODE_SUCCESS != taosRemoveRef(clientConnRefPool, pTscObj->id)) {
tscError("0x%" PRIx64 " failed to remove ref from conn pool", pTscObj->id);
}
}
void taos_close(TAOS *taos) {
@ -1313,7 +1315,10 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
if (NEED_CLIENT_HANDLE_ERROR(code)) {
tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d,QID:0x%" PRIx64,
pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId);
(void)refreshMeta(pRequest->pTscObj, pRequest); // ignore return code,try again
if (TSDB_CODE_SUCCESS != refreshMeta(pRequest->pTscObj, pRequest)) {
tscWarn("0x%" PRIx64 " refresh meta failed, code:%d - %s,QID:0x%" PRIx64, pRequest->self, code,
tstrerror(code), pRequest->requestId);
}
pRequest->prevCode = code;
doAsyncQuery(pRequest, true);
return;
@ -1614,8 +1619,11 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
}
SSyncQueryParam *pParam = pRequest->body.interParam;
(void)tsem_wait(&pParam->sem);
code = tsem_wait(&pParam->sem);
if (code) {
tscError("tsem wait failed, code:%d - %s", code, tstrerror(code));
goto _return;
}
_return:
destoryCatalogReq(&catalogReq);
destroyRequest(pRequest);

View File

@ -849,7 +849,7 @@ int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startInd
int32_t blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t rowCount, SSDataBlock** pResBlock) {
int32_t code = 0;
QRY_OPTR_CHECK(pResBlock);
QRY_PARAM_CHECK(pResBlock);
if (pBlock == NULL || startIndex < 0 || rowCount > pBlock->info.rows || rowCount + startIndex > pBlock->info.rows) {
return TSDB_CODE_INVALID_PARA;
@ -1263,18 +1263,19 @@ static void blockDataAssign(SColumnInfoData* pCols, const SSDataBlock* pDataBloc
}
static int32_t createHelpColInfoData(const SSDataBlock* pDataBlock, SColumnInfoData** ppCols) {
*ppCols = NULL;
int32_t code = 0;
int32_t rows = pDataBlock->info.capacity;
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
int32_t i = 0;
*ppCols = NULL;
SColumnInfoData* pCols = taosMemoryCalloc(numOfCols, sizeof(SColumnInfoData));
if (pCols == NULL) {
return terrno;
}
for (int32_t i = 0; i < numOfCols; ++i) {
for (i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, i);
if (pColInfoData == NULL) {
continue;
@ -1309,6 +1310,10 @@ static int32_t createHelpColInfoData(const SSDataBlock* pDataBlock, SColumnInfoD
return code;
_error:
for(int32_t j = 0; j < i; ++j) {
colDataDestroy(&pCols[j]);
}
taosMemoryFree(pCols);
return code;
}
@ -1753,7 +1758,7 @@ int32_t copyDataBlock(SSDataBlock* pDst, const SSDataBlock* pSrc) {
}
int32_t createSpecialDataBlock(EStreamType type, SSDataBlock** pBlock) {
QRY_OPTR_CHECK(pBlock);
QRY_PARAM_CHECK(pBlock);
int32_t code = 0;
SSDataBlock* p = taosMemoryCalloc(1, sizeof(SSDataBlock));
@ -1846,7 +1851,7 @@ _err:
}
int32_t blockCopyOneRow(const SSDataBlock* pDataBlock, int32_t rowIdx, SSDataBlock** pResBlock) {
QRY_OPTR_CHECK(pResBlock);
QRY_PARAM_CHECK(pResBlock);
if (pDataBlock == NULL) {
return TSDB_CODE_INVALID_PARA;
@ -1946,7 +1951,7 @@ _end:
}
int32_t createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData, SSDataBlock** pResBlock) {
QRY_OPTR_CHECK(pResBlock);
QRY_PARAM_CHECK(pResBlock);
if (pDataBlock == NULL) {
return TSDB_CODE_INVALID_PARA;
}
@ -2029,7 +2034,7 @@ int32_t createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData, SSDataB
}
int32_t createDataBlock(SSDataBlock** pResBlock) {
QRY_OPTR_CHECK(pResBlock);
QRY_PARAM_CHECK(pResBlock);
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
if (pBlock == NULL) {
return terrno;
@ -2080,7 +2085,7 @@ SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId)
int32_t bdGetColumnInfoData(const SSDataBlock* pBlock, int32_t index, SColumnInfoData** pColInfoData) {
int32_t code = 0;
QRY_OPTR_CHECK(pColInfoData);
QRY_PARAM_CHECK(pColInfoData);
if (index >= taosArrayGetSize(pBlock->pDataBlock)) {
return TSDB_CODE_INVALID_PARA;
@ -2402,10 +2407,18 @@ void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock) {
} else {
buf = taosDecodeBinary(buf, (void**)&data.nullbitmap, BitmapLen(pBlock->info.rows));
}
if(buf == NULL) {
uError("failed to decode null bitmap/offset, type:%d", data.info.type);
goto _error;
}
int32_t len = 0;
buf = taosDecodeFixedI32(buf, &len);
buf = taosDecodeBinary(buf, (void**)&data.pData, len);
if (buf == NULL) {
uError("failed to decode data, type:%d", data.info.type);
goto _error;
}
if (IS_VAR_DATA_TYPE(data.info.type)) {
data.varmeta.length = len;
data.varmeta.allocLen = len;
@ -2418,6 +2431,15 @@ void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock) {
}
return (void*)buf;
_error:
for (int32_t i = 0; i < sz; ++i) {
SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
if (pColInfoData == NULL) {
break;
}
colDataDestroy(pColInfoData);
}
return NULL;
}
static char* formatTimestamp(char* buf, int64_t val, int precision) {
@ -2854,7 +2876,7 @@ bool alreadyAddGroupId(char* ctbName, int64_t groupId) {
}
int32_t buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId, char** pName) {
QRY_OPTR_CHECK(pName);
QRY_PARAM_CHECK(pName);
char* pBuf = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + 1);
if (!pBuf) {

View File

@ -147,12 +147,14 @@ static void dmSetSignalHandle() {
(void)taosSetSignal(SIGQUIT, dmStopDnode);
#endif
#if 0
#ifndef WINDOWS
(void)taosSetSignal(SIGBUS, dmLogCrash);
#endif
(void)taosSetSignal(SIGABRT, dmLogCrash);
(void)taosSetSignal(SIGFPE, dmLogCrash);
(void)taosSetSignal(SIGSEGV, dmLogCrash);
#endif
}
static int32_t dmParseArgs(int32_t argc, char const *argv[]) {

View File

@ -1859,3 +1859,73 @@ int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDataBlo
}
return code;
}
uint32_t seed = 0;
static SRpcMsg createRpcMsg(STransAction* pAction, int64_t traceId, int64_t signature) {
SRpcMsg rpcMsg = {.msgType = pAction->msgType, .contLen = pAction->contLen, .info.ahandle = (void *)signature};
rpcMsg.pCont = rpcMallocCont(pAction->contLen);
if (rpcMsg.pCont == NULL) {
return rpcMsg;
}
rpcMsg.info.traceId.rootId = traceId;
rpcMsg.info.notFreeAhandle = 1;
memcpy(rpcMsg.pCont, pAction->pCont, pAction->contLen);
return rpcMsg;
}
void streamTransRandomErrorGen(STransAction *pAction, STrans *pTrans, int64_t signature) {
if ((pAction->msgType == TDMT_STREAM_TASK_UPDATE_CHKPT && pAction->id > 2) ||
(pAction->msgType == TDMT_STREAM_CONSEN_CHKPT) ||
(pAction->msgType == TDMT_VND_STREAM_CHECK_POINT_SOURCE && pAction->id > 2)) {
if (seed == 0) {
seed = taosGetTimestampSec();
}
uint32_t v = taosRandR(&seed);
int32_t choseItem = v % 5;
if (choseItem == 0) {
// 1. one of update-checkpoint not send, restart and send it again
taosMsleep(5000);
if (pAction->msgType == TDMT_STREAM_TASK_UPDATE_CHKPT) {
mError(
"***sleep 5s and core dump, following tasks will not recv update-checkpoint info, so the checkpoint will "
"rollback***");
exit(-1);
} else if (pAction->msgType == TDMT_STREAM_CONSEN_CHKPT) { // pAction->msgType == TDMT_STREAM_CONSEN_CHKPT
mError(
"***sleep 5s and core dump, following tasks will not recv consen-checkpoint info, so the tasks will "
"not started***");
} else { // pAction->msgType == TDMT_VND_STREAM_CHECK_POINT_SOURCE
mError(
"***sleep 5s and core dump, following tasks will not recv checkpoint-source info, so the tasks will "
"started after restart***");
exit(-1);
}
} else if (choseItem == 1) {
// 2. repeat send update chkpt msg
mError("***repeat send update-checkpoint/consensus/checkpoint trans msg 3times to vnode***");
mError("***repeat 1***");
SRpcMsg rpcMsg1 = createRpcMsg(pAction, pTrans->mTraceId, signature);
int32_t code = tmsgSendReq(&pAction->epSet, &rpcMsg1);
mError("***repeat 2***");
SRpcMsg rpcMsg2 = createRpcMsg(pAction, pTrans->mTraceId, signature);
code = tmsgSendReq(&pAction->epSet, &rpcMsg2);
mError("***repeat 3***");
SRpcMsg rpcMsg3 = createRpcMsg(pAction, pTrans->mTraceId, signature);
code = tmsgSendReq(&pAction->epSet, &rpcMsg3);
} else if (choseItem == 2) {
// 3. sleep 40s and then send msg
mError("***idle for 30s, and then send msg***");
taosMsleep(30000);
} else {
// do nothing
// mInfo("no error triggered");
}
}
}

View File

@ -2158,6 +2158,11 @@ static int32_t loadTombFromBlk(const TTombBlkArray *pTombBlkArray, SCacheRowsRea
uint64_t uid = uidList[j];
STableLoadInfo *pInfo = getTableLoadInfo(pReader, uid);
if (!pInfo) {
(void)tTombBlockDestroy(&block);
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
}
if (pInfo->pTombData == NULL) {
pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
}
@ -2200,8 +2205,18 @@ static int32_t loadTombFromBlk(const TTombBlkArray *pTombBlkArray, SCacheRowsRea
if (newTable) {
pInfo = getTableLoadInfo(pReader, uid);
if (!pInfo) {
code = TSDB_CODE_OUT_OF_MEMORY;
finished = true;
break;
}
if (pInfo->pTombData == NULL) {
pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
if (!pInfo->pTombData) {
code = TSDB_CODE_OUT_OF_MEMORY;
finished = true;
break;
}
}
}
@ -2998,6 +3013,8 @@ static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow, bool *pI
uint64_t uid = pIter->idx.uid;
STableLoadInfo *pInfo = getTableLoadInfo(pIter->pr, uid);
TSDB_CHECK_NULL(pInfo, code, lino, _err, TSDB_CODE_OUT_OF_MEMORY);
if (pInfo->pTombData == NULL) {
pInfo->pTombData = taosArrayInit(4, sizeof(SDelData));
TSDB_CHECK_NULL(pInfo->pTombData, code, lino, _err, TSDB_CODE_OUT_OF_MEMORY);

View File

@ -50,7 +50,11 @@ static int32_t tsdbSttLvlInitEx(STsdb *pTsdb, const SSttLvl *lvl1, SSttLvl **lvl
}
code = TARRAY2_APPEND(lvl[0]->fobjArr, fobj);
if (code) return code;
if (code) {
(void)tsdbSttLvlClear(lvl);
taosMemoryFree(fobj);
return code;
}
}
return 0;
}

View File

@ -379,7 +379,7 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl
int32_t lino = 0;
void *px = NULL;
int32_t startIndex = 0;
double el = 0;
int32_t ret = 0;
int32_t numOfBlocks = TARRAY2_SIZE(pStatisBlkArray);
if (numOfBlocks <= 0) {
@ -409,9 +409,7 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl
for (int32_t k = startIndex; k < endIndex; ++k) {
code = tsdbSttFileReadStatisBlock(pSttFileReader, &pStatisBlkArray->data[k], &block);
if (code) {
return code;
}
QUERY_CHECK_CODE(code, lino, _end);
int32_t i = 0;
int32_t rows = block.numOfRecords;
@ -539,16 +537,15 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl
}
_end:
el = (taosGetTimestampUs() - st) / 1000.0;
pBlockLoadInfo->cost.statisElapsedTime += el;
ret = tStatisBlockDestroy(&block);
if (code != 0) {
tsdbError("%s failed to load block data statistics, %s at line:%d, code:%s", id, __func__, lino, tstrerror(code));
tsdbError("%s error happens at:%s line number: %d, code:%s", id, __func__, lino, tstrerror(code));
} else {
double el = (taosGetTimestampUs() - st) / 1000.0;
pBlockLoadInfo->cost.statisElapsedTime += el;
tsdbDebug("%s load %d statis blocks into buf, elapsed time:%.2fms", id, num, el);
}
int32_t ret = tStatisBlockDestroy(&block);
return code;
}
@ -964,6 +961,7 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoF
pMTree->pIter = NULL;
pMTree->backward = pConf->backward;
pMTree->idStr = pConf->idstr;
int32_t lino = 0;
if (!pMTree->backward) { // asc
tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn);
@ -1038,9 +1036,8 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoF
// let's record the time window for current table of uid in the stt files
if (pSttDataInfo != NULL && numOfRows > 0) {
void *px = taosArrayPush(pSttDataInfo->pKeyRangeList, &range);
if (px == NULL) {
return terrno;
}
QUERY_CHECK_NULL(px, code, lino, _end, terrno);
pSttDataInfo->numOfRows += numOfRows;
}
} else {

View File

@ -403,7 +403,7 @@ static void initReaderStatus(SReaderStatus* pStatus) {
}
static int32_t createResBlock(SQueryTableDataCond* pCond, int32_t capacity, SSDataBlock** pResBlock) {
QRY_OPTR_CHECK(pResBlock);
QRY_PARAM_CHECK(pResBlock);
SSDataBlock* pBlock = NULL;
int32_t code = createDataBlock(&pBlock);

View File

@ -396,15 +396,21 @@ int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) {
switch (req->msgType) {
case TDMT_VND_TABLE_META:
// error code has been set into reqMsg, no need to handle it here.
(void)vnodeGetTableMeta(pVnode, &reqMsg, false);
if (TSDB_CODE_SUCCESS != vnodeGetTableMeta(pVnode, &reqMsg, false)) {
qWarn("vnodeGetBatchMeta failed, msgType:%d", req->msgType);
}
break;
case TDMT_VND_TABLE_CFG:
// error code has been set into reqMsg, no need to handle it here.
(void)vnodeGetTableCfg(pVnode, &reqMsg, false);
if (TSDB_CODE_SUCCESS != vnodeGetTableCfg(pVnode, &reqMsg, false)) {
qWarn("vnodeGetBatchMeta failed, msgType:%d", req->msgType);
}
break;
case TDMT_VND_GET_STREAM_PROGRESS:
// error code has been set into reqMsg, no need to handle it here.
(void)vnodeGetStreamProgress(pVnode, &reqMsg, false);
if (TSDB_CODE_SUCCESS != vnodeGetStreamProgress(pVnode, &reqMsg, false)) {
qWarn("vnodeGetBatchMeta failed, msgType:%d", req->msgType);
}
break;
default:
qError("invalid req msgType %d", req->msgType);

View File

@ -78,7 +78,7 @@ static int32_t getSchemaBytes(const SSchema* pSchema) {
}
static int32_t buildDescResultDataBlock(SSDataBlock** pOutput) {
QRY_OPTR_CHECK(pOutput);
QRY_PARAM_CHECK(pOutput);
SSDataBlock* pBlock = NULL;
int32_t code = createDataBlock(&pBlock);
@ -236,7 +236,7 @@ static int32_t execDescribe(bool sysInfoUser, SNode* pStmt, SRetrieveTableRsp**
static int32_t execResetQueryCache() { return catalogClearCache(); }
static int32_t buildCreateDBResultDataBlock(SSDataBlock** pOutput) {
QRY_OPTR_CHECK(pOutput);
QRY_PARAM_CHECK(pOutput);
SSDataBlock* pBlock = NULL;
int32_t code = createDataBlock(&pBlock);
@ -475,7 +475,7 @@ static int32_t execShowCreateDatabase(SShowCreateDatabaseStmt* pStmt, SRetrieveT
}
static int32_t buildCreateTbResultDataBlock(SSDataBlock** pOutput) {
QRY_OPTR_CHECK(pOutput);
QRY_PARAM_CHECK(pOutput);
SSDataBlock* pBlock = NULL;
int32_t code = createDataBlock(&pBlock);
@ -499,7 +499,7 @@ static int32_t buildCreateTbResultDataBlock(SSDataBlock** pOutput) {
}
static int32_t buildCreateViewResultDataBlock(SSDataBlock** pOutput) {
QRY_OPTR_CHECK(pOutput);
QRY_PARAM_CHECK(pOutput);
SSDataBlock* pBlock = NULL;
int32_t code = createDataBlock(&pBlock);
@ -929,7 +929,7 @@ static int32_t execShowLocalVariables(SRetrieveTableRsp** pRsp) {
}
static int32_t createSelectResultDataBlock(SNodeList* pProjects, SSDataBlock** pOutput) {
QRY_OPTR_CHECK(pOutput);
QRY_PARAM_CHECK(pOutput);
SSDataBlock* pBlock = NULL;
int32_t code = createDataBlock(&pBlock);

View File

@ -70,7 +70,7 @@ static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus
int32_t createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo,
SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t lino = 0;
int32_t code = 0;

View File

@ -97,7 +97,7 @@ _end:
int32_t createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle,
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo,
SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;

View File

@ -295,7 +295,7 @@ _end:
int32_t createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode,
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;

View File

@ -204,6 +204,7 @@ static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput,
_return:
taosMemoryFreeClear(pBuf->pData);
taosFreeQitem(pBuf);
return code;
}

View File

@ -889,7 +889,7 @@ int32_t seqStableJoin(SOperatorInfo* pOperator, SSDataBlock** pRes) {
SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
QRY_OPTR_CHECK(pRes);
QRY_PARAM_CHECK(pRes);
if (pOperator->status == OP_EXEC_DONE) {
return code;
}
@ -958,7 +958,7 @@ int32_t initSeqStbJoinTableHash(SStbJoinPrevJoinCtx* pPrev, bool batchFetch) {
int32_t createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo,
SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
__optr_fn_t nextFp = NULL;

View File

@ -61,7 +61,7 @@ static void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, uint64_t groupId) {
int32_t createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode,
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;

View File

@ -401,7 +401,7 @@ static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo*
int32_t createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo,
SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = 0;
int32_t lino = 0;

View File

@ -1924,7 +1924,7 @@ SExprInfo* createExpr(SNodeList* pNodeList, int32_t* numOfExprs) {
}
int32_t createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, SExprInfo** pExprInfo, int32_t* numOfExprs) {
QRY_OPTR_CHECK(pExprInfo);
QRY_PARAM_CHECK(pExprInfo);
int32_t code = 0;
int32_t numOfFuncs = LIST_LENGTH(pNodeList);

View File

@ -1280,7 +1280,7 @@ void freeResetOperatorParams(struct SOperatorInfo* pOperator, SOperatorParamType
FORCE_INLINE int32_t getNextBlockFromDownstreamImpl(struct SOperatorInfo* pOperator, int32_t idx, bool clearParam,
SSDataBlock** pResBlock) {
QRY_OPTR_CHECK(pResBlock);
QRY_PARAM_CHECK(pResBlock);
int32_t code = 0;
if (pOperator->pDownstreamGetParams && pOperator->pDownstreamGetParams[idx]) {

View File

@ -423,7 +423,7 @@ static int32_t createPrimaryTsExprIfNeeded(SFillOperatorInfo* pInfo, SFillPhysiN
int32_t createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode,
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = 0;
int32_t lino = 0;

View File

@ -1444,7 +1444,7 @@ static int32_t groupCacheTableCacheEnd(SOperatorInfo* pOperator, SOperatorParam*
int32_t createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
SGroupCachePhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo,
SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
SGroupCacheOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupCacheOperatorInfo));

View File

@ -353,38 +353,6 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
}
}
static SSDataBlock* buildGroupResultDataBlock(SOperatorInfo* pOperator) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SGroupbyOperatorInfo* pInfo = pOperator->info;
SSDataBlock* pRes = pInfo->binfo.pRes;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
while (1) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
QUERY_CHECK_CODE(code, lino, _end);
if (!hasRemainResults(&pInfo->groupResInfo)) {
setOperatorCompleted(pOperator);
break;
}
if (pRes->info.rows > 0) {
break;
}
}
pOperator->resultInfo.totalRows += pRes->info.rows;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
T_LONG_JMP(pTaskInfo->env, code);
}
return (pRes->info.rows == 0) ? NULL : pRes;
}
bool hasRemainResultByHash(SOperatorInfo* pOperator) {
SGroupbyOperatorInfo* pInfo = pOperator->info;
SSHashObj* pHashmap = pInfo->aggSup.pResultRowHashTable;
@ -463,25 +431,23 @@ _end:
}
static int32_t hashGroupbyAggregateNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SGroupbyOperatorInfo* pInfo = pOperator->info;
SGroupResInfo* pGroupResInfo = &pInfo->groupResInfo;
int32_t order = pInfo->binfo.inputTsOrder;
int64_t st = taosGetTimestampUs();
QRY_PARAM_CHECK(ppRes);
if (pOperator->status == OP_EXEC_DONE) {
(*ppRes) = NULL;
return TSDB_CODE_SUCCESS;
}
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SGroupbyOperatorInfo* pInfo = pOperator->info;
if (pOperator->status == OP_RES_TO_RETURN) {
(*ppRes) = buildGroupResultDataBlockByHash(pOperator);
return code;
}
SGroupResInfo* pGroupResInfo = &pInfo->groupResInfo;
int32_t order = pInfo->binfo.inputTsOrder;
int64_t st = taosGetTimestampUs();
SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) {
SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
@ -511,10 +477,12 @@ static int32_t hashGroupbyAggregateNext(SOperatorInfo* pOperator, SSDataBlock**
if (pGroupResInfo->pRows != NULL) {
taosArrayDestroy(pGroupResInfo->pRows);
}
if (pGroupResInfo->pBuf) {
taosMemoryFree(pGroupResInfo->pBuf);
pGroupResInfo->pBuf = NULL;
}
pGroupResInfo->index = 0;
pGroupResInfo->iter = 0;
pGroupResInfo->dataPos = NULL;
@ -525,15 +493,16 @@ _end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
} else {
(*ppRes) = buildGroupResultDataBlockByHash(pOperator);
}
(*ppRes) = buildGroupResultDataBlockByHash(pOperator);
return code;
}
int32_t createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo,
SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
@ -1127,7 +1096,7 @@ static void destroyPartitionOperatorInfo(void* param) {
int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode,
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
@ -1668,7 +1637,7 @@ void freePartItem(void* ptr) {
int32_t createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPartitionPhysiNode* pPartNode,
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;

View File

@ -996,7 +996,7 @@ static int32_t hJoinMainProcess(struct SOperatorInfo* pOperator, SSDataBlock** p
SSDataBlock* pRes = pJoin->finBlk;
int64_t st = 0;
QRY_OPTR_CHECK(pResBlock);
QRY_PARAM_CHECK(pResBlock);
if (pOperator->cost.openCost == 0) {
st = taosGetTimestampUs();
}
@ -1182,7 +1182,7 @@ int32_t hJoinInitResBlocks(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode* pJoinN
int32_t createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
SHashJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
SHJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SHJoinOperatorInfo));

View File

@ -1867,7 +1867,7 @@ int32_t mJoinSetImplFp(SMJoinOperatorInfo* pJoin) {
int32_t createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t oldNum = numOfDownstream;
bool newDownstreams = false;

View File

@ -336,7 +336,7 @@ int32_t openNonSortMergeOperator(SOperatorInfo* pOperator) {
}
int32_t doNonSortMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
QRY_OPTR_CHECK(pResBlock);
QRY_PARAM_CHECK(pResBlock);
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
@ -419,7 +419,7 @@ int32_t copyColumnsValue(SNodeList* pNodeList, uint64_t targetBlkId, SSDataBlock
}
int32_t doColsMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
QRY_OPTR_CHECK(pResBlock);
QRY_PARAM_CHECK(pResBlock);
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
@ -499,7 +499,7 @@ int32_t openMultiwayMergeOperator(SOperatorInfo* pOperator) {
}
int32_t doMultiwayMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
QRY_OPTR_CHECK(pResBlock);
QRY_PARAM_CHECK(pResBlock);
if (pOperator->status == OP_EXEC_DONE) {
return 0;
@ -556,7 +556,7 @@ int32_t getMultiwayMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplai
int32_t createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size_t numStreams, SMergePhysiNode* pMergePhyNode,
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
SPhysiNode* pPhyNode = (SPhysiNode*)pMergePhyNode;
int32_t lino = 0;

View File

@ -180,7 +180,7 @@ ERetType extractOperatorInfo(SOperatorInfo* pOperator, STraverParam* pParam, con
// QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN
int32_t extractOperatorInTree(SOperatorInfo* pOperator, int32_t type, const char* id, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
if (pOperator == NULL) {
qError("invalid operator, failed to find tableScanOperator %s", id);
@ -282,7 +282,7 @@ int32_t stopTableScanOperator(SOperatorInfo* pOperator, const char* pIdStr, SSto
int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SNode* pTagCond,
SNode* pTagIndexCond, const char* pUser, const char* dbname, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = 0;
int32_t type = nodeType(pPhyNode);
@ -878,7 +878,7 @@ SSDataBlock* getNextBlockFromDownstreamRemain(struct SOperatorInfo* pOperator, i
}
int32_t optrDefaultGetNextExtFn(struct SOperatorInfo* pOperator, SOperatorParam* pParam, SSDataBlock** pRes) {
QRY_OPTR_CHECK(pRes);
QRY_PARAM_CHECK(pRes);
int32_t code = setOperatorParams(pOperator, pParam, OP_GET_PARAM);
if (TSDB_CODE_SUCCESS != code) {

View File

@ -93,7 +93,7 @@ void streamOperatorReloadState(SOperatorInfo* pOperator) {
int32_t createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, SExecTaskInfo* pTaskInfo,
SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
SProjectOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SProjectOperatorInfo));
@ -262,7 +262,7 @@ static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SS
}
int32_t doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
QRY_OPTR_CHECK(pResBlock);
QRY_PARAM_CHECK(pResBlock);
SProjectOperatorInfo* pProjectInfo = pOperator->info;
SOptrBasicInfo* pInfo = &pProjectInfo->binfo;
@ -441,7 +441,7 @@ int32_t doProjectOperation(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
int32_t createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode,
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = 0;
int32_t lino = 0;
int32_t numOfRows = 4096;
@ -572,7 +572,7 @@ SSDataBlock* doApplyIndefinitFunction1(SOperatorInfo* pOperator) {
}
int32_t doApplyIndefinitFunction(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
QRY_OPTR_CHECK(pResBlock);
QRY_PARAM_CHECK(pResBlock);
SIndefOperatorInfo* pIndefInfo = pOperator->info;
SOptrBasicInfo* pInfo = &pIndefInfo->binfo;
@ -717,7 +717,7 @@ int32_t setFunctionResultOutput(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo,
}
int32_t setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols, SArray** pResList) {
QRY_OPTR_CHECK(pResList);
QRY_PARAM_CHECK(pResList);
SArray* pList = taosArrayInit(4, sizeof(int32_t));
if (pList == NULL) {
return terrno;

View File

@ -67,7 +67,7 @@ typedef struct STableCountScanOperatorInfo {
SArray* stbUidList; // when group by db_name and/or stable_name
} STableCountScanOperatorInfo;
static bool processBlockWithProbability(const SSampleExecInfo* pInfo);
static bool processBlockWithProbability(const SSampleExecInfo* pInfo);
static int32_t doTableCountScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes);
bool processBlockWithProbability(const SSampleExecInfo* pInfo) {
@ -937,13 +937,13 @@ static int32_t doTableScanImplNext(SOperatorInfo* pOperator, SSDataBlock** ppRes
STableScanInfo* pTableScanInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
bool hasNext = false;
int64_t st = taosGetTimestampUs();
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
bool hasNext = false;
QRY_PARAM_CHECK(ppRes);
pBlock->info.dataLoad = false;
int64_t st = taosGetTimestampUs();
while (true) {
code = pAPI->tsdReader.tsdNextDataBlock(pTableScanInfo->base.dataReader, &hasNext);
if (code != TSDB_CODE_SUCCESS) {
@ -957,7 +957,7 @@ static int32_t doTableScanImplNext(SOperatorInfo* pOperator, SSDataBlock** ppRes
if (isTaskKilled(pTaskInfo)) {
pAPI->tsdReader.tsdReaderReleaseDataBlock(pTableScanInfo->base.dataReader);
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
return pTaskInfo->code;
}
if (pOperator->status == OP_EXEC_DONE) {
@ -993,6 +993,7 @@ static int32_t doTableScanImplNext(SOperatorInfo* pOperator, SSDataBlock** ppRes
pOperator->cost.totalCost = pTableScanInfo->base.readRecorder.elapsedTime;
pBlock->info.scanFlag = pTableScanInfo->base.scanFlag;
(*ppRes) = pBlock;
return code;
}
@ -1001,9 +1002,7 @@ _end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
(*ppRes) = NULL;
return code;
}
@ -1014,7 +1013,7 @@ static int32_t doGroupedTableScan(SOperatorInfo* pOperator, SSDataBlock** pBlock
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
QRY_OPTR_CHECK(pBlock);
QRY_PARAM_CHECK(pBlock);
// The read handle is not initialized yet, since no qualified tables exists
if (pTableScanInfo->base.dataReader == NULL || pOperator->status == OP_EXEC_DONE) {
@ -1194,7 +1193,7 @@ static int32_t startNextGroupScan(SOperatorInfo* pOperator, SSDataBlock** pResul
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
int32_t numOfTables = 0;
QRY_OPTR_CHECK(pResult);
QRY_PARAM_CHECK(pResult);
code = tableListGetSize(pInfo->base.pTableListInfo, &numOfTables);
QUERY_CHECK_CODE(code, lino, _end);
@ -1242,7 +1241,7 @@ _end:
return code;
}
static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) {
static int32_t groupSeqTableScan(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
STableScanInfo* pInfo = pOperator->info;
@ -1250,12 +1249,14 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) {
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
int32_t num = 0;
STableKeyInfo* pList = NULL;
SSDataBlock* result = NULL;
SSDataBlock* pResult = NULL;
QRY_PARAM_CHECK(pResBlock);
if (pInfo->currentGroupId == -1) {
if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) {
setOperatorCompleted(pOperator);
return NULL;
return code;
}
taosRLockLatch(&pTaskInfo->lock);
@ -1273,28 +1274,32 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) {
if (pInfo->filesetDelimited) {
pAPI->tsdReader.tsdSetFilesetDelimited(pInfo->base.dataReader);
}
if (pInfo->pResBlock->info.capacity > pOperator->resultInfo.capacity) {
pOperator->resultInfo.capacity = pInfo->pResBlock->info.capacity;
}
}
result = NULL;
code = doGroupedTableScan(pOperator, &result);
pResult = NULL;
code = doGroupedTableScan(pOperator, &pResult);
QUERY_CHECK_CODE(code, lino, _end);
if (result != NULL) {
if (pResult != NULL) {
if (pOperator->dynamicTask) {
result->info.id.groupId = result->info.id.uid;
pResult->info.id.groupId = pResult->info.id.uid;
}
return result;
*pResBlock = pResult;
return code;
}
while (true) {
code = startNextGroupScan(pOperator, &result);
code = startNextGroupScan(pOperator, &pResult);
QUERY_CHECK_CODE(code, lino, _end);
if (result || pOperator->status == OP_EXEC_DONE) {
return result;
if (pResult || pOperator->status == OP_EXEC_DONE) {
*pResBlock = pResult;
return code;
}
}
@ -1302,9 +1307,9 @@ _end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
return result;
return code;
}
static int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
@ -1313,7 +1318,7 @@ static int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
STableScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
QRY_OPTR_CHECK(ppRes);
QRY_PARAM_CHECK(ppRes);
if (pOperator->pOperatorGetParam) {
pOperator->dynamicTask = true;
@ -1394,8 +1399,7 @@ static int32_t doTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
pInfo->scanTimes = 0;
}
} else { // scan table group by group sequentially
(*ppRes) = groupSeqTableScan(pOperator);
return code;
code = groupSeqTableScan(pOperator, ppRes);
}
_end:
@ -1447,7 +1451,7 @@ static void destroyTableScanOperatorInfo(void* param) {
int32_t createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo,
SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
@ -1547,7 +1551,7 @@ _error:
}
int32_t createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = 0;
STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
@ -3704,7 +3708,7 @@ static int32_t doRawScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
if (pInfo->dataReader && hasNext) {
if (isTaskKilled(pTaskInfo)) {
pAPI->tsdReader.tsdReaderReleaseDataBlock(pInfo->dataReader);
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
return code;
}
SSDataBlock* pBlock = NULL;
@ -3806,6 +3810,7 @@ static int32_t doRawScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
}
}
(*ppRes) = NULL;
return code;
}
@ -3814,8 +3819,8 @@ _end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
(*ppRes) = NULL;
return code;
}
@ -3837,7 +3842,7 @@ int32_t createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pTaskInfo
// create meta reader
// create tq reader
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
@ -4029,7 +4034,7 @@ _end:
int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond,
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo,
SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
@ -4262,7 +4267,7 @@ _error:
return code;
}
static void doTagScanOneTable(SOperatorInfo* pOperator, const SSDataBlock* pRes, int32_t count, SMetaReader* mr,
static int32_t doTagScanOneTable(SOperatorInfo* pOperator, const SSDataBlock* pRes, int32_t count, SMetaReader* mr,
SStorageAPI* pAPI) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
@ -4276,7 +4281,7 @@ static void doTagScanOneTable(SOperatorInfo* pOperator, const SSDataBlock* pRes,
GET_TASKID(pTaskInfo));
tDecoderClear(&(*mr).coder);
pAPI->metaReaderFn.clearReader(mr);
T_LONG_JMP(pTaskInfo->env, terrno);
goto _end;
}
code = pAPI->metaReaderFn.getTableEntryByUid(mr, item->uid);
@ -4285,7 +4290,7 @@ static void doTagScanOneTable(SOperatorInfo* pOperator, const SSDataBlock* pRes,
qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", item->uid, tstrerror(terrno),
GET_TASKID(pTaskInfo));
pAPI->metaReaderFn.clearReader(mr);
T_LONG_JMP(pTaskInfo->env, terrno);
goto _end;
}
char str[512];
@ -4314,12 +4319,13 @@ static void doTagScanOneTable(SOperatorInfo* pOperator, const SSDataBlock* pRes,
} else {
data = (char*)p;
}
code = colDataSetVal(pDst, (count), data,
(data == NULL) || (pDst->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data)));
QUERY_CHECK_CODE(code, lino, _end);
if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL && IS_VAR_DATA_TYPE(((const STagVal*)p)->type) &&
data != NULL) {
if ((pDst->info.type != TSDB_DATA_TYPE_JSON) && (p != NULL) && IS_VAR_DATA_TYPE(((const STagVal*)p)->type) &&
(data != NULL)) {
taosMemoryFree(data);
}
}
@ -4329,8 +4335,9 @@ _end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
return code;
}
static void tagScanFreeUidTag(void* p) {
@ -4558,10 +4565,6 @@ _end:
}
static int32_t doTagScanFromCtbIdxNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
if (pOperator->status == OP_EXEC_DONE) {
(*ppRes) = NULL;
return TSDB_CODE_SUCCESS;
}
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
@ -4569,6 +4572,12 @@ static int32_t doTagScanFromCtbIdxNext(SOperatorInfo* pOperator, SSDataBlock** p
STagScanInfo* pInfo = pOperator->info;
SSDataBlock* pRes = pInfo->pRes;
QRY_PARAM_CHECK(ppRes);
if (pOperator->status == OP_EXEC_DONE) {
return TSDB_CODE_SUCCESS;
}
blockDataCleanup(pRes);
if (pInfo->pCtbCursor == NULL) {
@ -4635,28 +4644,23 @@ static int32_t doTagScanFromCtbIdxNext(SOperatorInfo* pOperator, SSDataBlock** p
pInfo->pCtbCursor = NULL;
setOperatorCompleted(pOperator);
}
pRes->info.rows = count;
pRes->info.rows = count;
bool bLimitReached = applyLimitOffset(&pInfo->limitInfo, pRes, pTaskInfo);
if (bLimitReached) {
setOperatorCompleted(pOperator);
}
pOperator->resultInfo.totalRows += pRes->info.rows;
(*ppRes) = (pRes->info.rows == 0) ? NULL : pInfo->pRes;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
pOperator->resultInfo.totalRows += pRes->info.rows;
(*ppRes) = (pRes->info.rows == 0) ? NULL : pInfo->pRes;
return code;
}
static SSDataBlock* doTagScanFromCtbIdx(SOperatorInfo* pOperator) {
SSDataBlock* pRes = NULL;
int32_t code = doTagScanFromCtbIdxNext(pOperator, &pRes);
return pRes;
return code;
}
static int32_t doTagScanFromMetaEntryNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
@ -4687,18 +4691,18 @@ static int32_t doTagScanFromMetaEntryNext(SOperatorInfo* pOperator, SSDataBlock*
return code;
}
char str[512] = {0};
int32_t count = 0;
SMetaReader mr = {0};
pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.vnode, META_READER_LOCK, &pAPI->metaFn);
while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) {
doTagScanOneTable(pOperator, pRes, count, &mr, &pTaskInfo->storageAPI);
code = doTagScanOneTable(pOperator, pRes, count, &mr, &pTaskInfo->storageAPI);
++count;
if (++pInfo->curPos >= size) {
setOperatorCompleted(pOperator);
}
}
pRes->info.rows = count;
pAPI->metaReaderFn.clearReader(&mr);
@ -4706,6 +4710,7 @@ static int32_t doTagScanFromMetaEntryNext(SOperatorInfo* pOperator, SSDataBlock*
if (bLimitReached) {
setOperatorCompleted(pOperator);
}
// qDebug("QInfo:0x%"PRIx64" create tag values results completed, rows:%d", GET_TASKID(pRuntimeEnv), count);
if (pOperator->status == OP_EXEC_DONE) {
setTaskStatus(pTaskInfo, TASK_COMPLETED);
@ -4745,7 +4750,7 @@ static void destroyTagScanOperatorInfo(void* param) {
int32_t createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pTagScanNode,
STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond,
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
@ -4913,7 +4918,7 @@ static int32_t fetchNextSubTableBlockFromReader(SOperatorInfo* pOperator, STmsSu
code = pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInput->tblCond, pInput->pKeyInfo, 1, pInput->pReaderBlock,
(void**)&pInput->pReader, GET_TASKID(pTaskInfo), NULL);
if (code != 0) {
T_LONG_JMP(pTaskInfo->env, code);
return code;
}
}
@ -4921,18 +4926,20 @@ static int32_t fetchNextSubTableBlockFromReader(SOperatorInfo* pOperator, STmsSu
while (true) {
bool hasNext = false;
int32_t code = pAPI->tsdReader.tsdNextDataBlock(pInfo->base.dataReader, &hasNext);
code = pAPI->tsdReader.tsdNextDataBlock(pInfo->base.dataReader, &hasNext);
if (code != 0) {
pAPI->tsdReader.tsdReaderReleaseDataBlock(pInfo->base.dataReader);
pInfo->base.dataReader = NULL;
T_LONG_JMP(pTaskInfo->env, code);
return code;
}
if (!hasNext || isTaskKilled(pTaskInfo)) {
if (isTaskKilled(pTaskInfo)) {
pAPI->tsdReader.tsdReaderReleaseDataBlock(pInfo->base.dataReader);
pInfo->base.dataReader = NULL;
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
return code;
}
*pSubTableHasBlock = false;
break;
}
@ -4947,8 +4954,9 @@ static int32_t fetchNextSubTableBlockFromReader(SOperatorInfo* pOperator, STmsSu
code = loadDataBlock(pOperator, &pInfo->base, pInput->pReaderBlock, &status);
if (code != 0) {
pInfo->base.dataReader = NULL;
T_LONG_JMP(pTaskInfo->env, code);
return code;
}
if (status == FUNC_DATA_REQUIRED_ALL_FILTEROUT) {
*pSubTableHasBlock = false;
break;
@ -5215,49 +5223,56 @@ _end:
return code;
}
static SSDataBlock* getSubTablesSortedBlock(SOperatorInfo* pOperator, SSDataBlock* pResBlock, int32_t capacity) {
static int32_t getSubTablesSortedBlock(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t capacity, SSDataBlock** pResBlock) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
STableMergeScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STmsSubTablesMergeInfo* pSubTblsInfo = pInfo->pSubTablesMergeInfo;
bool finished = false;
QRY_PARAM_CHECK(pResBlock);
blockDataCleanup(pBlock);
blockDataCleanup(pResBlock);
bool finished = false;
while (true) {
while (1) {
while (true) {
if (pSubTblsInfo->numSubTablesCompleted >= pSubTblsInfo->numSubTables) {
finished = true;
break;
}
code = appendChosenRowToDataBlock(pSubTblsInfo, pResBlock);
code = appendChosenRowToDataBlock(pSubTblsInfo, pBlock);
QUERY_CHECK_CODE(code, lino, _end);
code = adjustSubTableForNextRow(pOperator, pSubTblsInfo);
QUERY_CHECK_CODE(code, lino, _end);
if (pResBlock->info.rows >= capacity) {
if (pBlock->info.rows >= capacity) {
break;
}
}
if (isTaskKilled(pTaskInfo)) {
T_LONG_JMP(pOperator->pTaskInfo->env, pTaskInfo->code);
return pTaskInfo->code;
}
bool limitReached = applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo);
if (finished || limitReached || pResBlock->info.rows > 0) {
bool limitReached = applyLimitOffset(&pInfo->limitInfo, pBlock, pTaskInfo);
if (finished || limitReached || pBlock->info.rows > 0) {
break;
}
}
if (pBlock->info.rows > 0) {
*pResBlock = pBlock;
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
return (pResBlock->info.rows > 0) ? pResBlock : NULL;
return code;
}
static int32_t startSubTablesTableMergeScan(SOperatorInfo* pOperator) {
@ -5307,23 +5322,22 @@ static void stopSubTablesTableMergeScan(STableMergeScanInfo* pInfo) {
}
int32_t doTableMergeScanParaSubTablesNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
if (pOperator->status == OP_EXEC_DONE) {
(*ppRes) = NULL;
return TSDB_CODE_SUCCESS;
}
QRY_PARAM_CHECK(ppRes);
int32_t lino = 0;
int32_t tableListSize = 0;
int64_t st = taosGetTimestampUs();
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STableMergeScanInfo* pInfo = pOperator->info;
int32_t lino = 0;
int32_t code = pOperator->fpSet._openFn(pOperator);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
if (pOperator->status == OP_EXEC_DONE) {
return TSDB_CODE_SUCCESS;
}
int64_t st = taosGetTimestampUs();
int32_t code = pOperator->fpSet._openFn(pOperator);
QUERY_CHECK_CODE(code, lino, _end);
int32_t tableListSize = 0;
code = tableListGetSize(pInfo->base.pTableListInfo, &tableListSize);
QUERY_CHECK_CODE(code, lino, _end);
@ -5335,6 +5349,7 @@ int32_t doTableMergeScanParaSubTablesNext(SOperatorInfo* pOperator, SSDataBlock*
(*ppRes) = NULL;
return code;
}
pInfo->tableStartIndex = 0;
STableKeyInfo* pTmpGpId = (STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex);
QUERY_CHECK_NULL(pTmpGpId, code, lino, _end, terrno);
@ -5346,15 +5361,19 @@ int32_t doTableMergeScanParaSubTablesNext(SOperatorInfo* pOperator, SSDataBlock*
SSDataBlock* pBlock = NULL;
while (pInfo->tableStartIndex < tableListSize) {
if (isTaskKilled(pTaskInfo)) {
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
break;
}
pBlock = getSubTablesSortedBlock(pOperator, pInfo->pResBlock, pOperator->resultInfo.capacity);
code = getSubTablesSortedBlock(pOperator, pInfo->pResBlock, pOperator->resultInfo.capacity, &pBlock);
QUERY_CHECK_CODE(code, lino, _end);
if (pBlock == NULL && !pInfo->bGroupProcessed && pInfo->needCountEmptyTable) {
STableKeyInfo* tbInfo = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex);
QUERY_CHECK_NULL(tbInfo, code, lino, _end, terrno);
pBlock = getOneRowResultBlock(pTaskInfo, &pInfo->base, pInfo->pResBlock, tbInfo);
}
if (pBlock != NULL) {
pBlock->info.id.groupId = pInfo->groupId;
pOperator->resultInfo.totalRows += pBlock->info.rows;
@ -5385,16 +5404,11 @@ _end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
} else {
(*ppRes) = pBlock;
}
(*ppRes) = pBlock;
return code;
}
static SSDataBlock* doTableMergeScanParaSubTables(SOperatorInfo* pOperator) {
SSDataBlock* pRes = NULL;
int32_t code = doTableMergeScanParaSubTablesNext(pOperator, &pRes);
return pRes;
return code;
}
static void tableMergeScanDoSkipTable(uint64_t uid, void* pTableMergeOpInfo) {
@ -5426,7 +5440,7 @@ _end:
}
}
static void doGetBlockForTableMergeScan(SOperatorInfo* pOperator, bool* pFinished, bool* pSkipped) {
static int32_t doGetBlockForTableMergeScan(SOperatorInfo* pOperator, bool* pFinished, bool* pSkipped) {
STableMergeScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
@ -5439,7 +5453,8 @@ static void doGetBlockForTableMergeScan(SOperatorInfo* pOperator, bool* pFinishe
if (code != 0) {
pAPI->tsdReader.tsdReaderReleaseDataBlock(reader);
qError("table merge scan fetch next data block error code: %d, %s", code, GET_TASKID(pTaskInfo));
T_LONG_JMP(pTaskInfo->env, code);
pTaskInfo->code = code;
return code;
}
if (!hasNext || isTaskKilled(pTaskInfo)) {
@ -5448,7 +5463,7 @@ static void doGetBlockForTableMergeScan(SOperatorInfo* pOperator, bool* pFinishe
pAPI->tsdReader.tsdReaderReleaseDataBlock(reader);
}
*pFinished = true;
return;
return code;
}
uint32_t status = 0;
@ -5456,21 +5471,22 @@ static void doGetBlockForTableMergeScan(SOperatorInfo* pOperator, bool* pFinishe
if (code != TSDB_CODE_SUCCESS) {
qInfo("table merge scan load datablock code %d, %s", code, GET_TASKID(pTaskInfo));
T_LONG_JMP(pTaskInfo->env, code);
pTaskInfo->code = code;
return code;
}
if (status == FUNC_DATA_REQUIRED_ALL_FILTEROUT) {
*pFinished = true;
return;
return code;
}
// current block is filter out according to filter condition, continue load the next block
if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
*pSkipped = true;
return;
return code;
}
return;
return code;
}
static int32_t getBlockForTableMergeScan(void* param, SSDataBlock** ppBlock) {
@ -5481,7 +5497,7 @@ static int32_t getBlockForTableMergeScan(void* param, SSDataBlock** ppBlock) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SSDataBlock* pBlock = NULL;
int64_t st = taosGetTimestampUs();
int32_t code = TSDB_CODE_SUCCESS;
int32_t code = TSDB_CODE_SUCCESS;
while (true) {
if (pInfo->rtnNextDurationBlocks) {
@ -5505,7 +5521,12 @@ static int32_t getBlockForTableMergeScan(void* param, SSDataBlock** ppBlock) {
} else {
bool bFinished = false;
bool bSkipped = false;
doGetBlockForTableMergeScan(pOperator, &bFinished, &bSkipped);
code = doGetBlockForTableMergeScan(pOperator, &bFinished, &bSkipped);
if (code != 0) {
return code;
}
pBlock = pInfo->pReaderBlock;
qDebug("%s table merge scan fetch block. finished %d skipped %d next-duration-block %d new-fileset %d",
GET_TASKID(pTaskInfo), bFinished, bSkipped, pInfo->bNextDurationBlockEvent, pInfo->bNewFilesetEvent);
@ -5518,7 +5539,6 @@ static int32_t getBlockForTableMergeScan(void* param, SSDataBlock** ppBlock) {
if (!bSkipped) {
code = createOneDataBlock(pBlock, true, &pInfo->nextDurationBlocks[pInfo->numNextDurationBlocks]);
if (code) {
terrno = code;
*ppBlock = NULL;
return code;
}
@ -5657,7 +5677,11 @@ int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) {
param->pOperator = pOperator;
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
QUERY_CHECK_NULL(ps, code, lino, _end, terrno);
if (ps == NULL) {
taosMemoryFree(param);
QUERY_CHECK_NULL(ps, code, lino, _end, terrno);
}
ps->param = param;
ps->onlyRef = false;
code = tsortAddSource(pInfo->pSortHandle, ps);
@ -5852,7 +5876,7 @@ int32_t doTableMergeScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
SSDataBlock* pBlock = NULL;
while (pInfo->tableStartIndex < tableListSize) {
if (isTaskKilled(pTaskInfo)) {
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
goto _end;
}
pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pInfo->pResBlock, pOperator->resultInfo.capacity,
@ -5896,9 +5920,10 @@ _end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
} else {
(*ppRes) = pBlock;
}
(*ppRes) = pBlock;
return code;
}
@ -5971,7 +5996,7 @@ int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExpla
int32_t createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo,
SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
@ -6106,11 +6131,11 @@ static int32_t buildVnodeGroupedStbTableCount(STableCountScanOperatorInfo*
SSDataBlock* pRes, char* dbName, tb_uid_t stbUid, SStorageAPI* pAPI);
static int32_t buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
SSDataBlock* pRes, char* dbName, SStorageAPI* pAPI);
static void buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
static int32_t buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
STableCountScanSupp* pSupp, SSDataBlock* pRes, char* dbName);
static void buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
static int32_t buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
STableCountScanSupp* pSupp, SSDataBlock* pRes, int32_t vgId, char* dbName);
static SSDataBlock* buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
static int32_t buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
STableCountScanSupp* pSupp, SSDataBlock* pRes);
static void buildSysDbGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
STableCountScanSupp* pSupp, SSDataBlock* pRes, size_t infodbTableNum,
@ -6213,7 +6238,7 @@ int32_t getTableCountScanSupp(SNodeList* groupTags, SName* tableName, SNodeList*
int32_t createTableCountScanOperatorInfo(SReadHandle* readHandle, STableCountScanPhysiNode* pTblCountScanNode,
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
@ -6396,23 +6421,29 @@ static int32_t doTableCountScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRe
STableCountScanOperatorInfo* pInfo = pOperator->info;
STableCountScanSupp* pSupp = &pInfo->supp;
SSDataBlock* pRes = pInfo->pRes;
blockDataCleanup(pRes);
QRY_PARAM_CHECK(ppRes);
if (pOperator->status == OP_EXEC_DONE) {
(*ppRes) = NULL;
return code;
}
if (pInfo->readHandle.mnd != NULL) {
(*ppRes) = buildSysDbTableCount(pOperator, pInfo);
return code;
}
(*ppRes) = buildVnodeDbTableCount(pOperator, pInfo, pSupp, pRes);
code = buildVnodeDbTableCount(pOperator, pInfo, pSupp, pRes);
if ((pRes->info.rows > 0) && (code == 0)) {
*ppRes = pRes;
}
return code;
}
static SSDataBlock* buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
STableCountScanSupp* pSupp, SSDataBlock* pRes) {
static int32_t buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
STableCountScanSupp* pSupp, SSDataBlock* pRes) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
const char* db = NULL;
@ -6424,27 +6455,29 @@ static SSDataBlock* buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCount
// get dbname
pAPI->metaFn.getBasicInfo(pInfo->readHandle.vnode, &db, &vgId, NULL, NULL);
SName sn = {0};
code = tNameFromString(&sn, db, T_NAME_ACCT | T_NAME_DB);
QUERY_CHECK_CODE(code, lino, _end);
code = tNameGetDbName(&sn, dbName);
QUERY_CHECK_CODE(code, lino, _end);
if (pSupp->groupByDbName || pSupp->groupByStbName) {
buildVnodeGroupedTableCount(pOperator, pInfo, pSupp, pRes, vgId, dbName);
code = buildVnodeGroupedTableCount(pOperator, pInfo, pSupp, pRes, vgId, dbName);
} else {
buildVnodeFilteredTbCount(pOperator, pInfo, pSupp, pRes, dbName);
code = buildVnodeFilteredTbCount(pOperator, pInfo, pSupp, pRes, dbName);
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
return pRes->info.rows > 0 ? pRes : NULL;
return code;
}
static void buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
static int32_t buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
STableCountScanSupp* pSupp, SSDataBlock* pRes, int32_t vgId, char* dbName) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
@ -6458,6 +6491,7 @@ static void buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountSca
code = pAPI->metaFn.storeGetTableList(pInfo->readHandle.vnode, TSDB_SUPER_TABLE, pInfo->stbUidList);
QUERY_CHECK_CODE(code, lino, _end);
}
if (pInfo->currGrpIdx < taosArrayGetSize(pInfo->stbUidList)) {
tb_uid_t stbUid = *(tb_uid_t*)taosArrayGet(pInfo->stbUidList, pInfo->currGrpIdx);
code = buildVnodeGroupedStbTableCount(pInfo, pSupp, pRes, dbName, stbUid, pAPI);
@ -6487,11 +6521,11 @@ _end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
return code;
}
static void buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
static int32_t buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
STableCountScanSupp* pSupp, SSDataBlock* pRes, char* dbName) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
@ -6527,9 +6561,10 @@ _end:
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
T_LONG_JMP(pTaskInfo->env, code);
}
setOperatorCompleted(pOperator);
return code;
}
static int32_t buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,

View File

@ -55,7 +55,7 @@ static void destroySortOpGroupIdCalc(SSortOpGroupIdCalc* pCalc);
// todo add limit/offset impl
int32_t createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = 0;
int32_t lino = 0;
@ -252,7 +252,7 @@ static STupleHandle* nextTupleWithGroupId(SSortHandle* pHandle, SSortOperatorInf
static int32_t getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo,
SSortOperatorInfo* pInfo, SSDataBlock** pResBlock) {
QRY_OPTR_CHECK(pResBlock);
QRY_PARAM_CHECK(pResBlock);
blockDataCleanup(pDataBlock);
int32_t lino = 0;
int32_t code = 0;
@ -355,25 +355,26 @@ int32_t doOpenSortOperator(SOperatorInfo* pOperator) {
pInfo->startTs = taosGetTimestampUs();
// pInfo->binfo.pRes is not equalled to the input datablock.
pInfo->pSortHandle = NULL;
int32_t code = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, NULL, pTaskInfo->id.str,
pInfo->maxRows, pInfo->maxTupleLength, tsPQSortMemThreshold * 1024 * 1024, &pInfo->pSortHandle);
int32_t code =
tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, NULL, pTaskInfo->id.str, pInfo->maxRows,
pInfo->maxTupleLength, tsPQSortMemThreshold * 1024 * 1024, &pInfo->pSortHandle);
if (code) {
return code;
}
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, applyScalarFunction, pOperator);
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
if (ps == NULL) {
SSortSource* pSource = taosMemoryCalloc(1, sizeof(SSortSource));
if (pSource == NULL) {
return terrno;
}
ps->param = pOperator->pDownstream[0];
ps->onlyRef = true;
pSource->param = pOperator->pDownstream[0];
pSource->onlyRef = true;
code = tsortAddSource(pInfo->pSortHandle, ps);
code = tsortAddSource(pInfo->pSortHandle, pSource);
if (code) {
taosMemoryFree(ps);
taosMemoryFree(pSource);
return code;
}
@ -390,7 +391,7 @@ int32_t doOpenSortOperator(SOperatorInfo* pOperator) {
}
int32_t doSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
QRY_OPTR_CHECK(pResBlock);
QRY_PARAM_CHECK(pResBlock);
if (pOperator->status == OP_EXEC_DONE) {
return 0;
}
@ -400,7 +401,7 @@ int32_t doSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
int32_t code = pOperator->fpSet._openFn(pOperator);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
return code;
}
// multi-group case not handle here
@ -408,7 +409,7 @@ int32_t doSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
while (1) {
if (tsortIsClosed(pInfo->pSortHandle)) {
code = TSDB_CODE_TSC_QUERY_CANCELLED;
T_LONG_JMP(pOperator->pTaskInfo->env, code);
T_LONG_JMP(pTaskInfo->env, code);
}
code = getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity,
@ -516,7 +517,7 @@ typedef struct SGroupSortOperatorInfo {
int32_t getGroupSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo,
SGroupSortOperatorInfo* pInfo, SSDataBlock** pResBlock) {
QRY_OPTR_CHECK(pResBlock);
QRY_PARAM_CHECK(pResBlock);
blockDataCleanup(pDataBlock);
int32_t code = blockDataEnsureCapacity(pDataBlock, capacity);
@ -598,7 +599,7 @@ int32_t fetchNextGroupSortDataBlock(void* param, SSDataBlock** ppBlock) {
SGroupSortOperatorInfo* grpSortOpInfo = source->grpSortOpInfo;
SSDataBlock* block = NULL;
QRY_OPTR_CHECK(ppBlock);
QRY_PARAM_CHECK(ppBlock);
if (grpSortOpInfo->prefetchedSortInput) {
block = grpSortOpInfo->prefetchedSortInput;
@ -648,23 +649,22 @@ int32_t beginSortGroup(SOperatorInfo* pOperator) {
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
SGroupSortSourceParam* param = taosMemoryCalloc(1, sizeof(SGroupSortSourceParam));
if (ps == NULL || param == NULL) {
T_LONG_JMP(pTaskInfo->env, terrno);
taosMemoryFree(ps);
taosMemoryFree(param);
return terrno;
}
param->childOpInfo = pOperator->pDownstream[0];
param->grpSortOpInfo = pInfo;
ps->param = param;
ps->onlyRef = false;
code = tsortAddSource(pInfo->pCurrSortHandle, ps);
if (code) {
T_LONG_JMP(pTaskInfo->env, code);
if (code != 0) {
return code;
}
code = tsortOpen(pInfo->pCurrSortHandle);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
return code;
}
@ -686,7 +686,7 @@ int32_t finishSortGroup(SOperatorInfo* pOperator) {
}
int32_t doGroupSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
QRY_OPTR_CHECK(pResBlock);
QRY_PARAM_CHECK(pResBlock);
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SGroupSortOperatorInfo* pInfo = pOperator->info;
@ -696,7 +696,7 @@ int32_t doGroupSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
int32_t code = pOperator->fpSet._openFn(pOperator);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
return code;
}
if (!pInfo->hasGroupId) {
@ -720,15 +720,14 @@ int32_t doGroupSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
while (pInfo->pCurrSortHandle != NULL) {
if (tsortIsClosed(pInfo->pCurrSortHandle)) {
code = TSDB_CODE_TSC_QUERY_CANCELLED;
T_LONG_JMP(pOperator->pTaskInfo->env, code);
T_LONG_JMP(pTaskInfo->env, code);
}
// beginSortGroup would fetch all child blocks of pInfo->currGroupId;
if (pInfo->childOpStatus == CHILD_OP_SAME_GROUP) {
code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
pOperator->pTaskInfo->code = code;
pTaskInfo->code = code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
T_LONG_JMP(pOperator->pTaskInfo->env, code);
return code;
}
code = getGroupSortedBlockData(pInfo->pCurrSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity,
@ -777,7 +776,7 @@ void destroyGroupSortOperatorInfo(void* param) {
int32_t createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode,
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = 0;
int32_t lino = 0;

View File

@ -807,7 +807,7 @@ _end:
int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
SCountWinodwPhysiNode* pCountNode = (SCountWinodwPhysiNode*)pPhyNode;
int32_t numOfCols = 0;

View File

@ -858,7 +858,7 @@ _end:
int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
SReadHandle* pHandle, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
SStreamEventWinodwPhysiNode* pEventNode = (SStreamEventWinodwPhysiNode*)pPhyNode;
int32_t tsSlotId = ((SColumnNode*)pEventNode->window.pTspk)->slotId;

View File

@ -1349,7 +1349,7 @@ _end:
int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysiNode* pPhyFillNode,
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;

View File

@ -1886,7 +1886,7 @@ _end:
int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo, int32_t numOfChild,
SReadHandle* pHandle, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
@ -3754,7 +3754,7 @@ _end:
int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
int32_t numOfCols = 0;
@ -4081,7 +4081,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
int32_t createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo, int32_t numOfChild,
SReadHandle* pHandle, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
@ -4931,7 +4931,7 @@ _end:
int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
SReadHandle* pHandle, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = 0;
int32_t lino = 0;
@ -5248,7 +5248,7 @@ _end:
int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
SReadHandle* pHandle, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;

View File

@ -2233,7 +2233,7 @@ static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableSca
int32_t createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode* pScanPhyNode, const char* pUser,
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
@ -2864,7 +2864,7 @@ static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pC
int32_t createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanPhysiNode* pBlockScanNode,
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo,
SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = 0;
int32_t lino = 0;

View File

@ -1104,7 +1104,7 @@ static int32_t extractPkColumnFromFuncs(SNodeList* pFuncs, bool* pHasPk, SColumn
}
int32_t createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = 0;
int32_t lino = 0;

View File

@ -1320,7 +1320,7 @@ _end:
int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
@ -1616,7 +1616,7 @@ _end:
// todo make this as an non-blocking operator
int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhysiNode* pStateNode,
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
@ -1727,7 +1727,7 @@ void destroySWindowOperatorInfo(void* param) {
int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode,
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
@ -2027,7 +2027,7 @@ static int32_t mergeAlignedIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock
int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMergeAlignedIntervalPhysiNode* pNode,
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
@ -2366,7 +2366,7 @@ _end:
int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeIntervalPhysiNode* pIntervalPhyNode,
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
QRY_PARAM_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;

View File

@ -79,9 +79,8 @@ struct SSortHandle {
bool forceUsePQSort;
BoundedQueue* pBoundedQueue;
uint32_t tmpRowIdx;
int64_t mergeLimit;
int64_t currMergeLimitTs;
int64_t mergeLimit;
int64_t currMergeLimitTs;
int32_t sourceId;
SSDataBlock* pDataBlock;
@ -288,7 +287,7 @@ int32_t tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t pageSize,
int32_t code = 0;
int32_t lino = 0;
QRY_OPTR_CHECK(pHandle);
QRY_PARAM_CHECK(pHandle);
SSortHandle* pSortHandle = taosMemoryCalloc(1, sizeof(SSortHandle));
QUERY_CHECK_NULL(pSortHandle, code, lino, _err, terrno);
@ -367,7 +366,7 @@ static int32_t sortComparCleanup(SMsortComparParam* cmpParam) {
return TSDB_CODE_SUCCESS;
}
void tsortClearOrderdSource(SArray* pOrderedSource, int64_t *fetchUs, int64_t *fetchNum) {
void tsortClearOrderedSource(SArray* pOrderedSource, int64_t *fetchUs, int64_t *fetchNum) {
for (size_t i = 0; i < taosArrayGetSize(pOrderedSource); i++) {
SSortSource** pSource = taosArrayGet(pOrderedSource, i);
if (NULL == *pSource) {
@ -413,10 +412,11 @@ void tsortDestroySortHandle(SSortHandle* pSortHandle) {
destroyDiskbasedBuf(pSortHandle->pBuf);
taosMemoryFreeClear(pSortHandle->idStr);
blockDataDestroy(pSortHandle->pDataBlock);
if (pSortHandle->pBoundedQueue) destroyBoundedQueue(pSortHandle->pBoundedQueue);
int64_t fetchUs = 0, fetchNum = 0;
tsortClearOrderdSource(pSortHandle->pOrderedSource, &fetchUs, &fetchNum);
tsortClearOrderedSource(pSortHandle->pOrderedSource, &fetchUs, &fetchNum);
qDebug("all source fetch time: %" PRId64 "us num:%" PRId64 " %s", fetchUs, fetchNum, pSortHandle->idStr);
taosArrayDestroy(pSortHandle->pOrderedSource);
@ -465,7 +465,13 @@ static int32_t doAddNewExternalMemSource(SDiskbasedBuf* pBuf, SArray* pAllSource
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
return blockDataEnsureCapacity(pSource->src.pBlock, numOfRows);
int32_t code = blockDataEnsureCapacity(pSource->src.pBlock, numOfRows);
if (code != 0) {
qError("sort failed at: %s:%d", __func__, __LINE__);
taosArrayDestroy(pPageIdList);
}
return code;
}
static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
@ -935,9 +941,99 @@ int32_t msortComparFn(const void* pLeft, const void* pRight, void* param) {
}
}
}
return 0;
}
static int32_t doSortForEachGroup(SSortHandle* pHandle, int32_t sortTimes, int32_t numOfSorted,
int32_t numOfInputSources, SArray* pResList, int32_t sortGroup, int32_t numOfRows) {
int32_t code = 0;
int32_t lino = 0;
SArray* pPageIdList = NULL;
for (int32_t i = 0; i < sortGroup; ++i) {
qDebug("internal merge sort pass %d group %d. num input sources %d ", sortTimes, i, numOfInputSources);
pHandle->sourceId += 1;
int32_t end = (i + 1) * numOfInputSources - 1;
if (end > numOfSorted - 1) {
end = numOfSorted - 1;
}
pHandle->cmpParam.numOfSources = end - i * numOfInputSources + 1;
code = sortComparInit(&pHandle->cmpParam, pHandle->pOrderedSource, i * numOfInputSources, end, pHandle);
QUERY_CHECK_CODE(code, lino, _err);
code =
tMergeTreeCreate(&pHandle->pMergeTree, pHandle->cmpParam.numOfSources, &pHandle->cmpParam, pHandle->comparFn);
QUERY_CHECK_CODE(code, lino, _err);
int32_t nMergedRows = 0;
pPageIdList = taosArrayInit(4, sizeof(int32_t));
QUERY_CHECK_NULL(pPageIdList, code, lino, _err, terrno);
while (1) {
if (tsortIsClosed(pHandle) || (pHandle->abortCheckFn && pHandle->abortCheckFn(pHandle->abortCheckParam))) {
code = TSDB_CODE_TSC_QUERY_CANCELLED;
goto _err;
}
SSDataBlock* pDataBlock = NULL;
code = getSortedBlockDataInner(pHandle, &pHandle->cmpParam, numOfRows, &pDataBlock);
if (pDataBlock == NULL || code != 0) {
break;
}
int32_t pageId = -1;
void* pPage = getNewBufPage(pHandle->pBuf, &pageId);
QUERY_CHECK_NULL(pPage, code, lino, _err, terrno);
void* px = taosArrayPush(pPageIdList, &pageId);
QUERY_CHECK_NULL(px, code, lino, _err, terrno);
int32_t size =
blockDataGetSize(pDataBlock) + sizeof(int32_t) + taosArrayGetSize(pDataBlock->pDataBlock) * sizeof(int32_t);
if (size > getBufPageSize(pHandle->pBuf)) {
code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
goto _err;
}
code = blockDataToBuf(pPage, pDataBlock);
QUERY_CHECK_CODE(code, lino, _err);
setBufPageDirty(pPage, true);
releaseBufPage(pHandle->pBuf, pPage);
nMergedRows += pDataBlock->info.rows;
blockDataCleanup(pDataBlock);
if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) {
break;
}
}
code = sortComparCleanup(&pHandle->cmpParam);
QUERY_CHECK_CODE(code, lino, _err);
tMergeTreeDestroy(&pHandle->pMergeTree);
pHandle->numOfCompletedSources = 0;
SSDataBlock* pBlock = NULL;
code = createOneDataBlock(pHandle->pDataBlock, false, &pBlock);
QUERY_CHECK_CODE(code, lino, _err);
code = doAddNewExternalMemSource(pHandle->pBuf, pResList, pBlock, &pHandle->sourceId, pPageIdList);
QUERY_CHECK_CODE(code, lino, _err);
}
return code;
_err:
taosArrayDestroy(pPageIdList);
qError("%s error happens:%s line:%d, code:%s", pHandle->idStr, __func__, lino, tstrerror(code));
return code;
}
static int32_t doInternalMergeSort(SSortHandle* pHandle) {
size_t numOfSources = taosArrayGetSize(pHandle->pOrderedSource);
if (numOfSources == 0) {
@ -959,8 +1055,8 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
pHandle->numOfPages);
}
int32_t numOfRows = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize,
blockDataGetSerialMetaSize(taosArrayGetSize(pHandle->pDataBlock->pDataBlock)));
int32_t size = (int32_t) blockDataGetSerialMetaSize(taosArrayGetSize(pHandle->pDataBlock->pDataBlock));
int32_t numOfRows = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize, size);
if (numOfRows < 0) {
return terrno;
}
@ -985,117 +1081,22 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
int32_t sortGroup = (numOfSorted + numOfInputSources - 1) / numOfInputSources;
// Only *numOfInputSources* can be loaded into buffer to perform the external sort.
for (int32_t i = 0; i < sortGroup; ++i) {
qDebug("internal merge sort pass %d group %d. num input sources %d ", t, i, numOfInputSources);
pHandle->sourceId += 1;
int32_t end = (i + 1) * numOfInputSources - 1;
if (end > numOfSorted - 1) {
end = numOfSorted - 1;
}
pHandle->cmpParam.numOfSources = end - i * numOfInputSources + 1;
code = sortComparInit(&pHandle->cmpParam, pHandle->pOrderedSource, i * numOfInputSources, end, pHandle);
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(pResList);
return code;
}
code =
tMergeTreeCreate(&pHandle->pMergeTree, pHandle->cmpParam.numOfSources, &pHandle->cmpParam, pHandle->comparFn);
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(pResList);
return code;
}
int32_t nMergedRows = 0;
SArray* pPageIdList = taosArrayInit(4, sizeof(int32_t));
if (pPageIdList == NULL) {
taosArrayDestroy(pResList);
return terrno;
}
while (1) {
if (tsortIsClosed(pHandle) || (pHandle->abortCheckFn && pHandle->abortCheckFn(pHandle->abortCheckParam))) {
code = terrno = TSDB_CODE_TSC_QUERY_CANCELLED;
return code;
}
SSDataBlock* pDataBlock = NULL;
code = getSortedBlockDataInner(pHandle, &pHandle->cmpParam, numOfRows, &pDataBlock);
if (pDataBlock == NULL || code != 0) {
break;
}
int32_t pageId = -1;
void* pPage = getNewBufPage(pHandle->pBuf, &pageId);
if (pPage == NULL) {
taosArrayDestroy(pResList);
taosArrayDestroy(pPageIdList);
return terrno;
}
void* px = taosArrayPush(pPageIdList, &pageId);
if (px == NULL) {
taosArrayDestroy(pResList);
taosArrayDestroy(pPageIdList);
return terrno;
}
int32_t size =
blockDataGetSize(pDataBlock) + sizeof(int32_t) + taosArrayGetSize(pDataBlock->pDataBlock) * sizeof(int32_t);
if (size > getBufPageSize(pHandle->pBuf)) {
qError("sort failed at: %s:%d", __func__, __LINE__);
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
code= blockDataToBuf(pPage, pDataBlock);
if (code) {
return code;
}
setBufPageDirty(pPage, true);
releaseBufPage(pHandle->pBuf, pPage);
nMergedRows += pDataBlock->info.rows;
blockDataCleanup(pDataBlock);
if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) {
break;
}
}
code = sortComparCleanup(&pHandle->cmpParam);
if (code) {
return code;
}
tMergeTreeDestroy(&pHandle->pMergeTree);
pHandle->numOfCompletedSources = 0;
SSDataBlock* pBlock = NULL;
code = createOneDataBlock(pHandle->pDataBlock, false, &pBlock);
if (code) {
taosArrayDestroy(pResList);
return code;
}
code = doAddNewExternalMemSource(pHandle->pBuf, pResList, pBlock, &pHandle->sourceId, pPageIdList);
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(pResList);
return code;
}
code = doSortForEachGroup(pHandle, t, numOfSorted, numOfInputSources, pResList, sortGroup, numOfRows);
if (code != 0) {
tsortClearOrderedSource(pResList, NULL, NULL);
taosArrayDestroy(pResList);
return code;
}
tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL);
tsortClearOrderedSource(pHandle->pOrderedSource, NULL, NULL);
void* px = taosArrayAddAll(pHandle->pOrderedSource, pResList);
if (px == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
tsortClearOrderedSource(pResList, NULL, NULL);
taosArrayDestroy(pResList);
return terrno;
}
taosArrayDestroy(pResList);
numOfSorted = taosArrayGetSize(pHandle->pOrderedSource);
int64_t el = taosGetTimestampUs() - st;
@ -2346,7 +2347,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
}
}
tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL);
tsortClearOrderedSource(pHandle->pOrderedSource, NULL, NULL);
if (!tsortIsClosed(pHandle)) {
void* px = taosArrayAddAll(pHandle->pOrderedSource, aExtSrc);
QUERY_CHECK_NULL(px, code, lino, _err, terrno);
@ -2378,37 +2379,44 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
return code;
}
static void freeSSortSource(SSortSource* source) {
if (NULL == source) {
static void freeSortSource(SSortSource* pSource) {
if (NULL == pSource) {
return;
}
if (source->param && !source->onlyRef) {
taosMemoryFree(source->param);
if (!pSource->onlyRef && pSource->param) {
taosMemoryFree(pSource->param);
}
if (!source->onlyRef && source->src.pBlock) {
blockDataDestroy(source->src.pBlock);
source->src.pBlock = NULL;
if (!pSource->onlyRef && pSource->src.pBlock) {
blockDataDestroy(pSource->src.pBlock);
pSource->src.pBlock = NULL;
}
taosMemoryFree(source);
taosMemoryFree(pSource);
}
static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) {
int32_t code = 0;
size_t sortBufSize = pHandle->numOfPages * pHandle->pageSize;
SSortSource** pSource = taosArrayGet(pHandle->pOrderedSource, 0);
if (pSource == NULL) {
SSortSource** p = taosArrayGet(pHandle->pOrderedSource, 0);
if (p == NULL) {
return terrno;
}
SSortSource* source = *pSource;
*pSource = NULL;
SSortSource* pSource = *p;
tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL);
taosArrayRemove(pHandle->pOrderedSource, 0);
tsortClearOrderedSource(pHandle->pOrderedSource, NULL, NULL);
while (1) {
SSDataBlock* pBlock = NULL;
TAOS_CHECK_RETURN(pHandle->fetchfp(source->param, &pBlock));
code = pHandle->fetchfp(pSource->param, &pBlock);
if (code != 0) {
freeSortSource(pSource);
return code;
}
if (pBlock == NULL) {
break;
}
@ -2422,7 +2430,7 @@ static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) {
sortBufSize = pHandle->numOfPages * pHandle->pageSize;
code = createOneDataBlock(pBlock, false, &pHandle->pDataBlock);
if (code) {
freeSSortSource(source);
freeSortSource(pSource);
return code;
}
}
@ -2433,47 +2441,45 @@ static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) {
code = blockDataMerge(pHandle->pDataBlock, pBlock);
if (code != TSDB_CODE_SUCCESS) {
freeSSortSource(source);
freeSortSource(pSource);
return code;
}
size_t size = blockDataGetSize(pHandle->pDataBlock);
if (size > sortBufSize) {
// Perform the in-memory sort and then flush data in the buffer into disk.
int64_t p = taosGetTimestampUs();
int64_t st = taosGetTimestampUs();
code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
if (code != 0) {
freeSSortSource(source);
freeSortSource(pSource);
return code;
}
int64_t el = taosGetTimestampUs() - p;
pHandle->sortElapsed += el;
pHandle->sortElapsed += (taosGetTimestampUs() - st);
if (pHandle->pqMaxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->pqMaxRows);
code = doAddToBuf(pHandle->pDataBlock, pHandle);
if (code != TSDB_CODE_SUCCESS) {
freeSSortSource(source);
freeSortSource(pSource);
return code;
}
}
}
freeSSortSource(source);
freeSortSource(pSource);
if (pHandle->pDataBlock != NULL && pHandle->pDataBlock->info.rows > 0) {
size_t size = blockDataGetSize(pHandle->pDataBlock);
// Perform the in-memory sort and then flush data in the buffer into disk.
int64_t p = taosGetTimestampUs();
int64_t st = taosGetTimestampUs();
code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
if (code != 0) {
return code;
}
if (pHandle->pqMaxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->pqMaxRows);
int64_t el = taosGetTimestampUs() - p;
pHandle->sortElapsed += el;
pHandle->sortElapsed += (taosGetTimestampUs() - st);
// All sorted data can fit in memory, external memory sort is not needed. Return to directly
if (size <= sortBufSize && pHandle->pBuf == NULL) {
@ -2488,6 +2494,7 @@ static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) {
code = doAddToBuf(pHandle->pDataBlock, pHandle);
}
}
return code;
}
@ -2500,7 +2507,7 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
code = createBlocksMergeSortInitialSources(pHandle);
}
qDebug("%zu sources created", taosArrayGetSize(pHandle->pOrderedSource));
qDebug("%s %zu sources created", pHandle->idStr, taosArrayGetSize(pHandle->pOrderedSource));
return code;
}

View File

@ -6009,6 +6009,7 @@ int32_t modeFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
pInfo->buf = taosMemoryMalloc(pInfo->colBytes);
if (NULL == pInfo->buf) {
taosHashCleanup(pInfo->pHash);
pInfo->pHash = NULL;
return TSDB_CODE_OUT_OF_MEMORY;
}
@ -6017,6 +6018,7 @@ int32_t modeFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
static void modeFunctionCleanup(SModeInfo * pInfo) {
taosHashCleanup(pInfo->pHash);
pInfo->pHash = NULL;
taosMemoryFreeClear(pInfo->buf);
}

View File

@ -431,7 +431,9 @@ static int32_t createPartialFunction(const SFunctionNode* pSrcFunc, SFunctionNod
(*pPartialFunc)->originalFuncId = pSrcFunc->hasOriginalFunc ? pSrcFunc->originalFuncId : pSrcFunc->funcId;
char name[TSDB_FUNC_NAME_LEN + TSDB_NAME_DELIMITER_LEN + TSDB_POINTER_PRINT_BYTES + 1] = {0};
int32_t len = snprintf(name, sizeof(name) - 1, "%s.%p", (*pPartialFunc)->functionName, pSrcFunc);
(void)taosHashBinary(name, len);
if (taosHashBinary(name, len) < 0) {
return TSDB_CODE_FAILED;
}
(void)strncpy((*pPartialFunc)->node.aliasName, name, TSDB_COL_NAME_LEN - 1);
(*pPartialFunc)->hasPk = pSrcFunc->hasPk;
(*pPartialFunc)->pkBytes = pSrcFunc->pkBytes;

View File

@ -524,7 +524,12 @@ void udfdDeinitScriptPlugins() {
void udfdProcessRequest(uv_work_t *req) {
SUvUdfWork *uvUdf = (SUvUdfWork *)(req->data);
SUdfRequest request = {0};
if(decodeUdfRequest(uvUdf->input.base, &request) == NULL) return;
if(decodeUdfRequest(uvUdf->input.base, &request) == NULL)
{
taosMemoryFree(uvUdf->input.base);
fnError("udf request decode failed");
return;
}
switch (request.type) {
case UDF_TASK_SETUP: {

View File

@ -2254,10 +2254,16 @@ static int32_t parseDataFromFileImpl(SInsertParseContext* pCxt, SVnodeModifyOpSt
static int32_t parseDataFromFile(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SToken* pFilePath,
SRowsDataContext rowsDataCxt) {
char filePathStr[PATH_MAX] = {0};
char filePathStr[PATH_MAX + 16] = {0};
if (TK_NK_STRING == pFilePath->type) {
(void)trimString(pFilePath->z, pFilePath->n, filePathStr, sizeof(filePathStr));
if (strlen(filePathStr) >= PATH_MAX) {
return buildSyntaxErrMsg(&pCxt->msg, "file path is too long, max length is 4096", pFilePath->z);
}
} else {
if (pFilePath->n >= PATH_MAX) {
return buildSyntaxErrMsg(&pCxt->msg, "file path is too long, max length is 4096", pFilePath->z);
}
strncpy(filePathStr, pFilePath->z, pFilePath->n);
}
pStmt->fp = taosOpenFile(filePathStr, TD_FILE_READ | TD_FILE_STREAM);

View File

@ -1274,6 +1274,16 @@ int32_t filterAddUnitToGroup(SFilterGroup *group, uint32_t unitIdx) {
return TSDB_CODE_SUCCESS;
}
static void filterFreeGroup(void *pItem) {
if (pItem == NULL) {
return;
}
SFilterGroup *p = (SFilterGroup *)pItem;
taosMemoryFreeClear(p->unitIdxs);
taosMemoryFreeClear(p->unitFlags);
}
int32_t fltAddGroupUnitFromNode(SFilterInfo *info, SNode *tree, SArray *group) {
SOperatorNode *node = (SOperatorNode *)tree;
int32_t ret = TSDB_CODE_SUCCESS;
@ -1336,9 +1346,11 @@ int32_t fltAddGroupUnitFromNode(SFilterInfo *info, SNode *tree, SArray *group) {
SFilterGroup fgroup = {0};
code = filterAddUnitToGroup(&fgroup, uidx);
if (TSDB_CODE_SUCCESS != code) {
filterFreeGroup((void*)&fgroup);
break;
}
if (NULL == taosArrayPush(group, &fgroup)) {
filterFreeGroup((void*)&fgroup);
code = TSDB_CODE_OUT_OF_MEMORY;
break;
}
@ -1658,16 +1670,6 @@ int32_t filterAddGroupUnitFromCtx(SFilterInfo *dst, SFilterInfo *src, SFilterRan
return TSDB_CODE_SUCCESS;
}
static void filterFreeGroup(void *pItem) {
if (pItem == NULL) {
return;
}
SFilterGroup *p = (SFilterGroup *)pItem;
taosMemoryFreeClear(p->unitIdxs);
taosMemoryFreeClear(p->unitFlags);
}
EDealRes fltTreeToGroup(SNode *pNode, void *pContext) {
int32_t code = TSDB_CODE_SUCCESS;
SArray *preGroup = NULL;
@ -2247,7 +2249,7 @@ int32_t fltInitValFieldData(SFilterInfo *info) {
return TSDB_CODE_SCALAR_CONVERT_ERROR;
}
varDataSetLen(newValData, len);
(void)varDataCopy(fi->data, newValData);
varDataCopy(fi->data, newValData);
}
}
@ -2944,25 +2946,44 @@ int32_t filterRewrite(SFilterInfo *info, SFilterGroupCtx **gRes, int32_t gResNum
for (int32_t n = 0; n < usize; ++n) {
SFilterUnit *u = (SFilterUnit *)taosArrayGetP((SArray *)colInfo->info, n);
if (NULL == u) {
FLT_ERR_JRET(TSDB_CODE_OUT_OF_RANGE);
code = TSDB_CODE_OUT_OF_RANGE;
break;
}
FLT_ERR_JRET(filterAddUnitFromUnit(info, &oinfo, u, &uidx));
FLT_ERR_JRET(filterAddUnitToGroup(&ng, uidx));
code = filterAddUnitFromUnit(info, &oinfo, u, &uidx);
if (TSDB_CODE_SUCCESS != code) {
break;
}
code = filterAddUnitToGroup(&ng, uidx);
if (TSDB_CODE_SUCCESS != code) {
break;
}
}
if (TSDB_CODE_SUCCESS != code) {
break;
}
continue;
}
if (TSDB_CODE_SUCCESS != code) {
filterFreeGroup((void*)&ng);
FLT_ERR_JRET(code);
}
if (colInfo->type != RANGE_TYPE_MR_CTX) {
fltError("filterRewrite get invalid col type : %d", colInfo->type);
FLT_ERR_JRET(TSDB_CODE_QRY_FILTER_INVALID_TYPE);
}
FLT_ERR_JRET(filterAddGroupUnitFromCtx(info, &oinfo, colInfo->info, res->colIdx[m], &ng, optr, group));
code = filterAddGroupUnitFromCtx(info, &oinfo, colInfo->info, res->colIdx[m], &ng, optr, group);
if (TSDB_CODE_SUCCESS != code) {
filterFreeGroup((void*)&ng);
FLT_ERR_JRET(code);
}
}
if (ng.unitNum > 0) {
if (NULL == taosArrayPush(group, &ng)) {
filterFreeGroup((void*)&ng);
FLT_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
}
@ -4571,7 +4592,7 @@ int32_t filterConverNcharColumns(SFilterInfo *info, int32_t rows, bool *gotNchar
if (k == varSrcLen) {
/* NULL */
varDataLen(dst) = (VarDataLenT)varSrcLen;
(void)varDataCopy(dst, src);
varDataCopy(dst, src);
continue;
}
bool ret = taosMbsToUcs4(varDataVal(src), varDataLen(src), (TdUcs4 *)varDataVal(dst), bufSize, &len);

View File

@ -1585,7 +1585,10 @@ EDealRes sclWalkTarget(SNode *pNode, SScalarCtx *ctx) {
block->info.rows = res->numOfRows;
sclFreeParam(res);
(void)taosHashRemove(ctx->pRes, (void *)&target->pExpr, POINTER_BYTES);
ctx->code = taosHashRemove(ctx->pRes, (void *)&target->pExpr, POINTER_BYTES);
if (TSDB_CODE_SUCCESS != ctx->code) {
return DEAL_RES_ERROR;
}
return DEAL_RES_CONTINUE;
}
@ -1811,7 +1814,7 @@ int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) {
}
sclFreeParam(res);
(void)taosHashRemove(ctx.pRes, (void *)&pNode, POINTER_BYTES);
SCL_ERR_JRET(taosHashRemove(ctx.pRes, (void *)&pNode, POINTER_BYTES));
}
_return:

View File

@ -76,7 +76,7 @@ int32_t flttMakeValueNode(SNode **pNode, int32_t dataType, void *value) {
if (NULL == vnode->datum.p) {
FLT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
(void)varDataCopy(vnode->datum.p, value);
varDataCopy(vnode->datum.p, value);
vnode->node.resType.bytes = varDataLen(value);
} else {
vnode->node.resType.bytes = tDataTypes[dataType].bytes;

View File

@ -120,7 +120,9 @@ int32_t scltAppendReservedSlot(SArray *pBlockList, int16_t *dataBlockId, int16_t
SCL_ERR_RET(TSDB_CODE_APP_ERROR);
}
(void)taosArrayPush(pBlockList, &res);
if (NULL == taosArrayPush(pBlockList, &res)) {
SCL_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
*dataBlockId = taosArrayGetSize(pBlockList) - 1;
res->info.id.blockId = *dataBlockId;
*slotId = 0;
@ -161,7 +163,7 @@ int32_t scltMakeValueNode(SNode **pNode, int32_t dataType, void *value) {
if (NULL == vnode->datum.p) {
SCL_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
(void)varDataCopy(vnode->datum.p, value);
varDataCopy(vnode->datum.p, value);
vnode->node.resType.bytes = varDataTLen(value);
} else {
vnode->node.resType.bytes = tDataTypes[dataType].bytes;
@ -1382,7 +1384,9 @@ int32_t makeCalculate(void *json, void *key, int32_t rightType, void *rightData,
SNode *opNode = NULL;
SCL_ERR_RET(makeJsonArrow(&src, &opNode, json, (char *)key));
(void)taosArrayPush(blockList, &src);
if (NULL == taosArrayPush(blockList, &src)) {
SCL_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
SCL_ERR_RET(makeOperator(&opNode, blockList, opType, rightType, rightData, isReverse));
@ -1909,7 +1913,7 @@ TEST(columnTest, bigint_column_multi_binary_column) {
SArray *blockList = taosArrayInit(1, POINTER_BYTES);
ASSERT_NE(blockList, nullptr);
(void)taosArrayPush(blockList, &src);
ASSERT_NE(taosArrayPush(blockList, &src), nullptr);
SColumnInfo colInfo = createColumnInfo(1, TSDB_DATA_TYPE_DOUBLE, sizeof(double));
int16_t dataBlockId = 0, slotId = 0;

View File

@ -1224,7 +1224,7 @@ void streamMetaWUnLock(SStreamMeta* pMeta) {
}
int32_t streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta, SArray** pList) {
QRY_OPTR_CHECK(pList);
QRY_PARAM_CHECK(pList);
int32_t code = 0;
SArray* pTaskList = taosArrayDup(pMeta->pTaskList, NULL);

View File

@ -231,7 +231,7 @@ _end:
static int32_t getSBf(SUpdateInfo* pInfo, TSKEY ts, SScalableBf** ppSBf) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
if (ts <= 0) {
if (ts < 0) {
code = TSDB_CODE_FAILED;
QUERY_CHECK_CODE(code, lino, _end);
}

View File

@ -1421,12 +1421,12 @@ int32_t syncNodeRestore(SSyncNode* pSyncNode) {
if (lastVer != -1 && endIndex != lastVer + 1) {
code = TSDB_CODE_WAL_LOG_INCOMPLETE;
sError("vgId:%d, failed to restore sync node since %s. expected lastLogIndex:%" PRId64 ", lastVer:%" PRId64 "",
pSyncNode->vgId, terrstr(), endIndex - 1, lastVer);
TAOS_RETURN(code);
sWarn("vgId:%d, failed to restore sync node since %s. expected lastLogIndex:%" PRId64 ", lastVer:%" PRId64 "",
pSyncNode->vgId, terrstr(), endIndex - 1, lastVer);
// TAOS_RETURN(code);
}
if (endIndex != lastVer + 1) return TSDB_CODE_SYN_INTERNAL_ERROR;
// if (endIndex != lastVer + 1) return TSDB_CODE_SYN_INTERNAL_ERROR;
pSyncNode->commitIndex = TMAX(pSyncNode->commitIndex, commitIndex);
sInfo("vgId:%d, restore sync until commitIndex:%" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);

View File

@ -1227,7 +1227,11 @@ static void checkRegexCache(void* param, void* tmrId) {
if(sRegexCache.exit) {
goto _exit;
}
(void)taosTmrReset(checkRegexCache, REGEX_CACHE_CLEAR_TIME * 1000, param, sRegexCache.regexCacheTmr, &tmrId);
bool ret = taosTmrReset(checkRegexCache, REGEX_CACHE_CLEAR_TIME * 1000, param, sRegexCache.regexCacheTmr, &tmrId);
if (!ret) {
uError("failed to reset regex cache timer");
goto _exit;
}
if (taosHashGetSize(sRegexCache.regexHash) < MAX_REGEX_CACHE_SIZE) {
goto _exit;
}
@ -1238,7 +1242,10 @@ static void checkRegexCache(void* param, void* tmrId) {
if (taosGetTimestampSec() - (*ppUsingRegex)->lastUsedTime > REGEX_CACHE_CLEAR_TIME) {
size_t len = 0;
char* key = (char*)taosHashGetKey(ppUsingRegex, &len);
(void)taosHashRemove(sRegexCache.regexHash, key, len);
if (TSDB_CODE_SUCCESS != taosHashRemove(sRegexCache.regexHash, key, len)) {
uError("failed to remove regex pattern %s from cache", key);
goto _exit;
}
}
ppUsingRegex = taosHashIterate(sRegexCache.regexHash, ppUsingRegex);
}
@ -1285,7 +1292,10 @@ void DestroyRegexCache(){
#endif
int32_t code = 0;
uInfo("[regex cache] destory regex cache");
(void)taosTmrStopA(&sRegexCache.timer);
bool ret = taosTmrStopA(&sRegexCache.timer);
if (!ret) {
uError("failed to stop regex cache timer");
}
taosWLockLatch(&sRegexCache.mutex);
sRegexCache.exit = true;
taosHashCleanup(sRegexCache.regexHash);

View File

@ -24,7 +24,7 @@
,,y,army,./pytest.sh python3 ./test.py -f query/fill/fill_desc.py -N 3 -L 3 -D 2
,,y,army,./pytest.sh python3 ./test.py -f query/fill/fill_null.py
,,y,army,./pytest.sh python3 ./test.py -f cluster/incSnapshot.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f cluster/clusterBasic.py -N 3
#,,y,army,./pytest.sh python3 ./test.py -f cluster/clusterBasic.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f query/query_basic.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f query/accuracy/test_query_accuracy.py
,,y,army,./pytest.sh python3 ./test.py -f insert/insert_basic.py -N 3
@ -351,7 +351,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/splitVGroupWal.py -N 3 -n 3
,,n,system-test,python3 ./test.py -f 0-others/timeRangeWise.py -N 3
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/delete_check.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/test_hot_refresh_configurations.py
#,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/test_hot_refresh_configurations.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/subscribe_stream_privilege.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/empty_identifier.py
@ -1270,7 +1270,7 @@
,,y,script,./test.sh -f tsim/snode/basic1.sim
,,y,script,./test.sh -f tsim/mnode/basic1.sim
,,y,script,./test.sh -f tsim/mnode/basic2.sim
,,y,script,./test.sh -f tsim/mnode/basic3.sim
#,,y,script,./test.sh -f tsim/mnode/basic3.sim
,,y,script,./test.sh -f tsim/mnode/basic4.sim
,,y,script,./test.sh -f tsim/mnode/basic5.sim
,,y,script,./test.sh -f tsim/show/basic.sim

View File

@ -50,12 +50,14 @@ int main(int argc, char *argv[]) {
shell.args.local = false;
#endif
#if 0
#if !defined(WINDOWS)
taosSetSignal(SIGBUS, shellCrashHandler);
#endif
taosSetSignal(SIGABRT, shellCrashHandler);
taosSetSignal(SIGFPE, shellCrashHandler);
taosSetSignal(SIGSEGV, shellCrashHandler);
#endif
if (shellCheckIntSize() != 0) {
return -1;