Merge pull request #11285 from taosdata/feature/3.0_liaohj
[td-14493] add new api.
This commit is contained in:
commit
628096b3dd
|
@ -188,6 +188,8 @@ DLL_EXPORT void taos_stop_query(TAOS_RES *res);
|
||||||
DLL_EXPORT bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col);
|
DLL_EXPORT bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col);
|
||||||
DLL_EXPORT bool taos_is_update_query(TAOS_RES *res);
|
DLL_EXPORT bool taos_is_update_query(TAOS_RES *res);
|
||||||
DLL_EXPORT int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows);
|
DLL_EXPORT int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows);
|
||||||
|
DLL_EXPORT int taos_fetch_block_s(TAOS_RES *res, int* numOfRows, TAOS_ROW *rows);
|
||||||
|
DLL_EXPORT int taos_fetch_raw_block(TAOS_RES *res, int* numOfRows, void** pData);
|
||||||
DLL_EXPORT int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex);
|
DLL_EXPORT int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex);
|
||||||
DLL_EXPORT int taos_validate_sql(TAOS *taos, const char *sql);
|
DLL_EXPORT int taos_validate_sql(TAOS *taos, const char *sql);
|
||||||
DLL_EXPORT void taos_reset_current_db(TAOS *taos);
|
DLL_EXPORT void taos_reset_current_db(TAOS *taos);
|
||||||
|
|
|
@ -385,6 +385,37 @@ bool taos_is_update_query(TAOS_RES *res) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
|
int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
|
||||||
|
int32_t numOfRows = 0;
|
||||||
|
/*int32_t code = */taos_fetch_block_s(res, &numOfRows, rows);
|
||||||
|
return numOfRows;
|
||||||
|
}
|
||||||
|
|
||||||
|
int taos_fetch_block_s(TAOS_RES *res, int* numOfRows, TAOS_ROW *rows) {
|
||||||
|
SRequestObj *pRequest = (SRequestObj *)res;
|
||||||
|
if (pRequest == NULL) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
(*rows) = NULL;
|
||||||
|
(*numOfRows) = 0;
|
||||||
|
|
||||||
|
if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pRequest->type == TSDB_SQL_INSERT ||
|
||||||
|
pRequest->code != TSDB_CODE_SUCCESS || taos_num_fields(res) == 0) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
doFetchRow(pRequest, false);
|
||||||
|
|
||||||
|
// TODO refactor
|
||||||
|
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
|
||||||
|
pResultInfo->current = pResultInfo->numOfRows;
|
||||||
|
|
||||||
|
(*rows) = pResultInfo->row;
|
||||||
|
(*numOfRows) = pResultInfo->numOfRows;
|
||||||
|
return pRequest->code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int taos_fetch_raw_block(TAOS_RES *res, int* numOfRows, void** pData) {
|
||||||
SRequestObj *pRequest = (SRequestObj *)res;
|
SRequestObj *pRequest = (SRequestObj *)res;
|
||||||
if (pRequest == NULL) {
|
if (pRequest == NULL) {
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -397,12 +428,13 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
|
||||||
|
|
||||||
doFetchRow(pRequest, false);
|
doFetchRow(pRequest, false);
|
||||||
|
|
||||||
// TODO refactor
|
|
||||||
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
|
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
|
||||||
pResultInfo->current = pResultInfo->numOfRows;
|
|
||||||
*rows = pResultInfo->row;
|
|
||||||
|
|
||||||
return pResultInfo->numOfRows;
|
pResultInfo->current = pResultInfo->numOfRows;
|
||||||
|
(*numOfRows) = pResultInfo->numOfRows;
|
||||||
|
(*pData) = (void*) pResultInfo->pData;
|
||||||
|
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) {
|
int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) {
|
||||||
|
|
|
@ -641,6 +641,8 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI
|
||||||
uint64_t* total, SArray* pColList);
|
uint64_t* total, SArray* pColList);
|
||||||
void doSetOperatorCompleted(SOperatorInfo* pOperator);
|
void doSetOperatorCompleted(SOperatorInfo* pOperator);
|
||||||
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock);
|
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock);
|
||||||
|
SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity);
|
||||||
|
SSDataBlock* loadNextDataBlock(void* param);
|
||||||
|
|
||||||
SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
|
||||||
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfCols, int32_t repeatTime,
|
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfCols, int32_t repeatTime,
|
||||||
|
@ -665,8 +667,8 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp
|
||||||
int32_t fillType, char* fillVal, bool multigroupResult, SExecTaskInfo* pTaskInfo);
|
int32_t fillType, char* fillVal, bool multigroupResult, SExecTaskInfo* pTaskInfo);
|
||||||
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo);
|
||||||
|
|
||||||
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock,
|
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResultBlock, SArray* pSortInfo, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
|
||||||
SArray* pGroupColList, SNode* pCondition, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
|
|
||||||
#if 0
|
#if 0
|
||||||
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
|
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
|
||||||
SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
|
SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
|
||||||
|
|
|
@ -4756,8 +4756,7 @@ static void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHan
|
||||||
pBlock->info.rows += 1;
|
pBlock->info.rows += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, bool hasVarCol,
|
SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity) {
|
||||||
int32_t capacity) {
|
|
||||||
blockDataCleanup(pDataBlock);
|
blockDataCleanup(pDataBlock);
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
|
@ -4778,7 +4777,6 @@ static SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataB
|
||||||
SSDataBlock* loadNextDataBlock(void* param) {
|
SSDataBlock* loadNextDataBlock(void* param) {
|
||||||
SOperatorInfo* pOperator = (SOperatorInfo*)param;
|
SOperatorInfo* pOperator = (SOperatorInfo*)param;
|
||||||
bool newgroup = false;
|
bool newgroup = false;
|
||||||
|
|
||||||
return pOperator->getNextFn(pOperator, &newgroup);
|
return pOperator->getNextFn(pOperator, &newgroup);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4958,7 +4956,7 @@ static SSDataBlock* doSortedMerge(SOperatorInfo* pOperator, bool* newgroup) {
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
SSortedMergeOperatorInfo* pInfo = pOperator->info;
|
SSortedMergeOperatorInfo* pInfo = pOperator->info;
|
||||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||||
return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pInfo->hasVarCol, pInfo->binfo.capacity);
|
return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pInfo->binfo.capacity);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
|
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
|
||||||
|
@ -5103,15 +5101,14 @@ static SSDataBlock* doSort(SOperatorInfo* pOperator, bool* newgroup) {
|
||||||
|
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
SSortOperatorInfo* pInfo = pOperator->info;
|
SSortOperatorInfo* pInfo = pOperator->info;
|
||||||
bool hasVarCol = pInfo->pDataBlock->info.hasVarCol;
|
|
||||||
|
|
||||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||||
return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, hasVarCol, pInfo->numOfRowsInRes);
|
return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->numOfRowsInRes);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
|
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
|
||||||
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, pInfo->bufPageSize, numOfBufPage,
|
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, pInfo->bufPageSize, numOfBufPage,
|
||||||
pInfo->pDataBlock, "GET_TASKID(pTaskInfo)");
|
pInfo->pDataBlock, pTaskInfo->id.str);
|
||||||
|
|
||||||
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock);
|
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock);
|
||||||
|
|
||||||
|
@ -5125,27 +5122,23 @@ static SSDataBlock* doSort(SOperatorInfo* pOperator, bool* newgroup) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pOperator->status = OP_RES_TO_RETURN;
|
pOperator->status = OP_RES_TO_RETURN;
|
||||||
return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, hasVarCol, pInfo->numOfRowsInRes);
|
return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->numOfRowsInRes);
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo,
|
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExecTaskInfo* pTaskInfo) {
|
||||||
SExecTaskInfo* pTaskInfo) {
|
|
||||||
SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo));
|
SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo));
|
||||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
if (pInfo == NULL || pOperator == NULL) {
|
if (pInfo == NULL || pOperator == NULL) {
|
||||||
taosMemoryFreeClear(pInfo);
|
goto _error;
|
||||||
taosMemoryFreeClear(pOperator);
|
|
||||||
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
|
||||||
return NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->sortBufSize = 1024 * 16; // 1MB, TODO dynamic set the available sort buffer
|
pInfo->sortBufSize = 1024 * 16; // TODO dynamic set the available sort buffer
|
||||||
pInfo->bufPageSize = 1024;
|
pInfo->bufPageSize = 1024;
|
||||||
pInfo->numOfRowsInRes = 1024;
|
pInfo->numOfRowsInRes = 1024;
|
||||||
pInfo->pDataBlock = pResBlock;
|
pInfo->pDataBlock = pResBlock;
|
||||||
pInfo->pSortInfo = pSortInfo;
|
pInfo->pSortInfo = pSortInfo;
|
||||||
|
|
||||||
pOperator->name = "Sort";
|
pOperator->name = "SortOperator";
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT;
|
||||||
pOperator->blockingOptr = true;
|
pOperator->blockingOptr = true;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
|
@ -5157,6 +5150,12 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pR
|
||||||
|
|
||||||
int32_t code = appendDownstream(pOperator, &downstream, 1);
|
int32_t code = appendDownstream(pOperator, &downstream, 1);
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
|
||||||
|
_error:
|
||||||
|
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
taosMemoryFree(pInfo);
|
||||||
|
taosMemoryFree(pOperator);
|
||||||
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t getTableScanOrder(STableScanInfo* pTableScanInfo) { return pTableScanInfo->order; }
|
static int32_t getTableScanOrder(STableScanInfo* pTableScanInfo) { return pTableScanInfo->order; }
|
||||||
|
|
|
@ -345,45 +345,73 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
|
||||||
_error:
|
_error:
|
||||||
|
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
taosMemoryFreeClear(pInfo);
|
taosMemoryFreeClear(pInfo);
|
||||||
taosMemoryFreeClear(pOperator);
|
taosMemoryFreeClear(pOperator);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock,
|
static SSDataBlock* doPartitionData(SOperatorInfo* pOperator, bool* newgroup) {
|
||||||
SArray* pGroupColList, SNode* pCondition, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) {
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
SGroupbyOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupbyOperatorInfo));
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
SSortOperatorInfo* pInfo = pOperator->info;
|
||||||
|
bool hasVarCol = pInfo->pDataBlock->info.hasVarCol;
|
||||||
|
|
||||||
|
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||||
|
return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->numOfRowsInRes);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
|
||||||
|
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, pInfo->bufPageSize, numOfBufPage,
|
||||||
|
pInfo->pDataBlock, pTaskInfo->id.str);
|
||||||
|
|
||||||
|
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock);
|
||||||
|
|
||||||
|
SGenericSource* ps = taosMemoryCalloc(1, sizeof(SGenericSource));
|
||||||
|
ps->param = pOperator->pDownstream[0];
|
||||||
|
tsortAddSource(pInfo->pSortHandle, ps);
|
||||||
|
|
||||||
|
int32_t code = tsortOpen(pInfo->pSortHandle);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
longjmp(pTaskInfo->env, terrno);
|
||||||
|
}
|
||||||
|
|
||||||
|
pOperator->status = OP_RES_TO_RETURN;
|
||||||
|
return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->numOfRowsInRes);
|
||||||
|
}
|
||||||
|
|
||||||
|
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResultBlock, SArray* pSortInfo, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) {
|
||||||
|
SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo));
|
||||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
if (pInfo == NULL || pOperator == NULL) {
|
if (pInfo == NULL || pOperator == NULL) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->pGroupCols = pGroupColList;
|
pInfo->sortBufSize = 1024 * 16; // TODO dynamic set the available sort buffer
|
||||||
pInfo->pCondition = pCondition;
|
pInfo->bufPageSize = 1024;
|
||||||
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, 4096, pResultBlock, pTaskInfo->id.str);
|
pInfo->numOfRowsInRes = 1024;
|
||||||
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
|
pInfo->pDataBlock = pResultBlock;
|
||||||
|
pInfo->pSortInfo = pSortInfo;
|
||||||
|
|
||||||
int32_t code = initGroupOptrInfo(pInfo, pGroupColList);
|
pOperator->name = "PartitionOperator";
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION;
|
||||||
goto _error;
|
|
||||||
}
|
|
||||||
|
|
||||||
pOperator->name = "PartitionByOperator";
|
|
||||||
pOperator->blockingOptr = true;
|
pOperator->blockingOptr = true;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION;
|
|
||||||
pOperator->pExpr = pExprInfo;
|
|
||||||
pOperator->numOfOutput = numOfCols;
|
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->_openFn = operatorDummyOpenFn;
|
|
||||||
pOperator->getNextFn = hashGroupbyAggregate;
|
|
||||||
pOperator->closeFn = destroyGroupbyOperatorInfo;
|
|
||||||
|
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
pOperator->getNextFn = doPartitionData;
|
||||||
|
// pOperator->closeFn = destroyOrderOperatorInfo;
|
||||||
|
|
||||||
|
int32_t code = appendDownstream(pOperator, &downstream, 1);
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
|
||||||
_error:
|
_error:
|
||||||
taosMemoryFreeClear(pInfo);
|
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
taosMemoryFreeClear(pOperator);
|
taosMemoryFree(pInfo);
|
||||||
|
taosMemoryFree(pOperator);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
Loading…
Reference in New Issue