[TD-225]refactor codes.

This commit is contained in:
Haojun Liao 2021-01-17 22:50:23 +08:00
parent 795690cd0c
commit 6402c2b053
10 changed files with 196 additions and 202 deletions

View File

@ -38,7 +38,7 @@ typedef struct SLocalDataSource {
tFilePage filePage;
} SLocalDataSource;
typedef struct SLocalReducer {
typedef struct SLocalMerger {
SLocalDataSource ** pLocalDataSrc;
int32_t numOfBuffer;
int32_t numOfCompleted;
@ -62,7 +62,7 @@ typedef struct SLocalReducer {
bool discard;
int32_t offset; // limit offset value
bool orderPrjOnSTable; // projection query on stable
} SLocalReducer;
} SLocalMerger;
typedef struct SRetrieveSupport {
tExtMemBuffer ** pExtMemBuffer; // for build loser tree
@ -89,10 +89,10 @@ int32_t tscFlushTmpBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tF
/*
* create local reducer to launch the second-stage reduce process at client site
*/
void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrderDescriptor *pDesc,
void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrderDescriptor *pDesc,
SColumnModel *finalModel, SColumnModel *pFFModel, SSqlObj* pSql);
void tscDestroyLocalReducer(SSqlObj *pSql);
void tscDestroyLocalMerger(SSqlObj *pSql);
int32_t tscDoLocalMerge(SSqlObj *pSql);

View File

@ -225,7 +225,7 @@ void tscInitQueryInfo(SQueryInfo* pQueryInfo);
void tscClearSubqueryInfo(SSqlCmd* pCmd);
void tscFreeVgroupTableInfo(SArray* pVgroupTables);
SArray* tscVgroupTableInfoClone(SArray* pVgroupTables);
SArray* tscVgroupTableInfoDup(SArray* pVgroupTables);
void tscRemoveVgroupTableGroup(SArray* pVgroupTable, int32_t index);
void tscVgroupTableCopy(SVgroupTableInfo* info, SVgroupTableInfo* pInfo);
@ -292,7 +292,7 @@ uint32_t tscGetTableMetaSize(STableMeta* pTableMeta);
CChildTableMeta* tscCreateChildMeta(STableMeta* pTableMeta);
uint32_t tscGetTableMetaMaxSize();
int32_t tscCreateTableMetaFromCChildMeta(STableMeta* pChild, const char* name);
STableMeta* tscTableMetaClone(STableMeta* pTableMeta);
STableMeta* tscTableMetaDup(STableMeta* pTableMeta);
void* malloc_throw(size_t size);

View File

@ -39,7 +39,7 @@ extern "C" {
// forward declaration
struct SSqlInfo;
struct SLocalReducer;
struct SLocalMerger;
// data source from sql string or from file
enum {
@ -292,7 +292,7 @@ typedef struct {
SColumnIndex* pColumnIndex;
SArithmeticSupport *pArithSup; // support the arithmetic expression calculation on agg functions
struct SLocalReducer *pLocalReducer;
struct SLocalMerger *pLocalMerger;
} SSqlRes;
typedef struct STscObj {

View File

@ -56,7 +56,7 @@ int32_t treeComparator(const void *pLeft, const void *pRight, void *param) {
}
}
static void tscInitSqlContext(SSqlCmd *pCmd, SLocalReducer *pReducer, tOrderDescriptor *pDesc) {
static void tscInitSqlContext(SSqlCmd *pCmd, SLocalMerger *pReducer, tOrderDescriptor *pDesc) {
/*
* the fields and offset attributes in pCmd and pModel may be different due to
* merge requirement. So, the final result in pRes structure is formatted in accordance with the pCmd object.
@ -166,7 +166,7 @@ static SFillColInfo* createFillColInfo(SQueryInfo* pQueryInfo) {
return pFillCol;
}
void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrderDescriptor *pDesc,
void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrderDescriptor *pDesc,
SColumnModel *finalmodel, SColumnModel *pFFModel, SSqlObj* pSql) {
SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res;
@ -212,9 +212,9 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
return;
}
size_t size = sizeof(SLocalReducer) + POINTER_BYTES * numOfFlush;
size_t size = sizeof(SLocalMerger) + POINTER_BYTES * numOfFlush;
SLocalReducer *pReducer = (SLocalReducer *) calloc(1, size);
SLocalMerger *pReducer = (SLocalMerger *) calloc(1, size);
if (pReducer == NULL) {
tscError("%p failed to create local merge structure, out of memory", pSql);
@ -372,7 +372,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
pReducer->offset = (int32_t)pQueryInfo->limit.offset;
pRes->pLocalReducer = pReducer;
pRes->pLocalMerger = pReducer;
pRes->numOfGroups = 0;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
@ -477,13 +477,13 @@ int32_t saveToBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tFilePa
return 0;
}
void tscDestroyLocalReducer(SSqlObj *pSql) {
void tscDestroyLocalMerger(SSqlObj *pSql) {
if (pSql == NULL) {
return;
}
SSqlRes *pRes = &(pSql->res);
if (pRes->pLocalReducer == NULL) {
if (pRes->pLocalMerger == NULL) {
return;
}
@ -491,14 +491,14 @@ void tscDestroyLocalReducer(SSqlObj *pSql) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
// there is no more result, so we release all allocated resource
SLocalReducer *pLocalReducer = (SLocalReducer *)atomic_exchange_ptr(&pRes->pLocalReducer, NULL);
if (pLocalReducer != NULL) {
pLocalReducer->pFillInfo = taosDestroyFillInfo(pLocalReducer->pFillInfo);
SLocalMerger *pLocalMerge = (SLocalMerger *)atomic_exchange_ptr(&pRes->pLocalMerger, NULL);
if (pLocalMerge != NULL) {
pLocalMerge->pFillInfo = taosDestroyFillInfo(pLocalMerge->pFillInfo);
if (pLocalReducer->pCtx != NULL) {
if (pLocalMerge->pCtx != NULL) {
int32_t numOfExprs = (int32_t) tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < numOfExprs; ++i) {
SQLFunctionCtx *pCtx = &pLocalReducer->pCtx[i];
SQLFunctionCtx *pCtx = &pLocalMerge->pCtx[i];
tVariantDestroy(&pCtx->tag);
tfree(pCtx->resultInfo);
@ -508,31 +508,31 @@ void tscDestroyLocalReducer(SSqlObj *pSql) {
}
}
tfree(pLocalReducer->pCtx);
tfree(pLocalMerge->pCtx);
}
tfree(pLocalReducer->prevRowOfInput);
tfree(pLocalMerge->prevRowOfInput);
tfree(pLocalReducer->pTempBuffer);
tfree(pLocalReducer->pResultBuf);
tfree(pLocalMerge->pTempBuffer);
tfree(pLocalMerge->pResultBuf);
if (pLocalReducer->pLoserTree) {
tfree(pLocalReducer->pLoserTree->param);
tfree(pLocalReducer->pLoserTree);
if (pLocalMerge->pLoserTree) {
tfree(pLocalMerge->pLoserTree->param);
tfree(pLocalMerge->pLoserTree);
}
tfree(pLocalReducer->pFinalRes);
tfree(pLocalReducer->discardData);
tfree(pLocalMerge->pFinalRes);
tfree(pLocalMerge->discardData);
tscLocalReducerEnvDestroy(pLocalReducer->pExtMemBuffer, pLocalReducer->pDesc, pLocalReducer->resColModel, pLocalReducer->finalModel,
pLocalReducer->numOfVnode);
for (int32_t i = 0; i < pLocalReducer->numOfBuffer; ++i) {
tfree(pLocalReducer->pLocalDataSrc[i]);
tscLocalReducerEnvDestroy(pLocalMerge->pExtMemBuffer, pLocalMerge->pDesc, pLocalMerge->resColModel, pLocalMerge->finalModel,
pLocalMerge->numOfVnode);
for (int32_t i = 0; i < pLocalMerge->numOfBuffer; ++i) {
tfree(pLocalMerge->pLocalDataSrc[i]);
}
pLocalReducer->numOfBuffer = 0;
pLocalReducer->numOfCompleted = 0;
free(pLocalReducer);
pLocalMerge->numOfBuffer = 0;
pLocalMerge->numOfCompleted = 0;
free(pLocalMerge);
} else {
tscDebug("%p already freed or another free function is invoked", pSql);
}
@ -604,7 +604,7 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SSqlCmd *pCm
}
}
bool isSameGroup(SSqlCmd *pCmd, SLocalReducer *pReducer, char *pPrev, tFilePage *tmpBuffer) {
bool isSameGroup(SSqlCmd *pCmd, SLocalMerger *pReducer, char *pPrev, tFilePage *tmpBuffer) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
// disable merge procedure for column projection query
@ -795,12 +795,12 @@ void tscLocalReducerEnvDestroy(tExtMemBuffer **pMemBuffer, tOrderDescriptor *pDe
/**
*
* @param pLocalReducer
* @param pLocalMerge
* @param pOneInterDataSrc
* @param treeList
* @return the number of remain input source. if ret == 0, all data has been handled
*/
int32_t loadNewDataFromDiskFor(SLocalReducer *pLocalReducer, SLocalDataSource *pOneInterDataSrc,
int32_t loadNewDataFromDiskFor(SLocalMerger *pLocalMerge, SLocalDataSource *pOneInterDataSrc,
bool *needAdjustLoserTree) {
pOneInterDataSrc->rowIdx = 0;
pOneInterDataSrc->pageId += 1;
@ -817,17 +817,17 @@ int32_t loadNewDataFromDiskFor(SLocalReducer *pLocalReducer, SLocalDataSource *p
#endif
*needAdjustLoserTree = true;
} else {
pLocalReducer->numOfCompleted += 1;
pLocalMerge->numOfCompleted += 1;
pOneInterDataSrc->rowIdx = -1;
pOneInterDataSrc->pageId = -1;
*needAdjustLoserTree = true;
}
return pLocalReducer->numOfBuffer;
return pLocalMerge->numOfBuffer;
}
void adjustLoserTreeFromNewData(SLocalReducer *pLocalReducer, SLocalDataSource *pOneInterDataSrc,
void adjustLoserTreeFromNewData(SLocalMerger *pLocalMerge, SLocalDataSource *pOneInterDataSrc,
SLoserTreeInfo *pTree) {
/*
* load a new data page into memory for intermediate dataset source,
@ -835,7 +835,7 @@ void adjustLoserTreeFromNewData(SLocalReducer *pLocalReducer, SLocalDataSource *
*/
bool needToAdjust = true;
if (pOneInterDataSrc->filePage.num <= pOneInterDataSrc->rowIdx) {
loadNewDataFromDiskFor(pLocalReducer, pOneInterDataSrc, &needToAdjust);
loadNewDataFromDiskFor(pLocalMerge, pOneInterDataSrc, &needToAdjust);
}
/*
@ -843,7 +843,7 @@ void adjustLoserTreeFromNewData(SLocalReducer *pLocalReducer, SLocalDataSource *
* if the loser tree is rebuild completed, we do not need to adjust
*/
if (needToAdjust) {
int32_t leafNodeIdx = pTree->pNode[0].index + pLocalReducer->numOfBuffer;
int32_t leafNodeIdx = pTree->pNode[0].index + pLocalMerge->numOfBuffer;
#ifdef _DEBUG_VIEW
printf("before adjust:\t");
@ -860,7 +860,7 @@ void adjustLoserTreeFromNewData(SLocalReducer *pLocalReducer, SLocalDataSource *
}
}
void savePrevRecordAndSetupFillInfo(SLocalReducer *pLocalReducer, SQueryInfo *pQueryInfo, SFillInfo *pFillInfo) {
void savePrevRecordAndSetupFillInfo(SLocalMerger *pLocalMerge, SQueryInfo *pQueryInfo, SFillInfo *pFillInfo) {
// discard following dataset in the same group and reset the interpolation information
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
@ -873,28 +873,28 @@ void savePrevRecordAndSetupFillInfo(SLocalReducer *pLocalReducer, SQueryInfo *pQ
taosResetFillInfo(pFillInfo, revisedSTime);
}
pLocalReducer->discard = true;
pLocalReducer->discardData->num = 0;
pLocalMerge->discard = true;
pLocalMerge->discardData->num = 0;
SColumnModel *pModel = pLocalReducer->pDesc->pColumnModel;
tColModelAppend(pModel, pLocalReducer->discardData, pLocalReducer->prevRowOfInput, 0, 1, 1);
SColumnModel *pModel = pLocalMerge->pDesc->pColumnModel;
tColModelAppend(pModel, pLocalMerge->discardData, pLocalMerge->prevRowOfInput, 0, 1, 1);
}
static void genFinalResWithoutFill(SSqlRes* pRes, SLocalReducer *pLocalReducer, SQueryInfo* pQueryInfo) {
static void genFinalResWithoutFill(SSqlRes* pRes, SLocalMerger *pLocalMerge, SQueryInfo* pQueryInfo) {
assert(pQueryInfo->interval.interval == 0 || pQueryInfo->fillType == TSDB_FILL_NONE);
tFilePage * pBeforeFillData = pLocalReducer->pResultBuf;
tFilePage * pBeforeFillData = pLocalMerge->pResultBuf;
pRes->data = pLocalReducer->pFinalRes;
pRes->data = pLocalMerge->pFinalRes;
pRes->numOfRows = (int32_t) pBeforeFillData->num;
if (pQueryInfo->limit.offset > 0) {
if (pQueryInfo->limit.offset < pRes->numOfRows) {
int32_t prevSize = (int32_t) pBeforeFillData->num;
tColModelErase(pLocalReducer->finalModel, pBeforeFillData, prevSize, 0, (int32_t)pQueryInfo->limit.offset - 1);
tColModelErase(pLocalMerge->finalModel, pBeforeFillData, prevSize, 0, (int32_t)pQueryInfo->limit.offset - 1);
/* remove the hole in column model */
tColModelCompact(pLocalReducer->finalModel, pBeforeFillData, prevSize);
tColModelCompact(pLocalMerge->finalModel, pBeforeFillData, prevSize);
pRes->numOfRows -= (int32_t) pQueryInfo->limit.offset;
pQueryInfo->limit.offset = 0;
@ -907,7 +907,7 @@ static void genFinalResWithoutFill(SSqlRes* pRes, SLocalReducer *pLocalReducer,
if (pRes->numOfRowsGroup >= pQueryInfo->limit.limit && pQueryInfo->limit.limit > 0) {
pRes->numOfRows = 0;
pBeforeFillData->num = 0;
pLocalReducer->discard = true;
pLocalMerge->discard = true;
return;
}
@ -923,29 +923,29 @@ static void genFinalResWithoutFill(SSqlRes* pRes, SLocalReducer *pLocalReducer,
pRes->numOfRows -= overflow;
pBeforeFillData->num -= overflow;
tColModelCompact(pLocalReducer->finalModel, pBeforeFillData, prevSize);
tColModelCompact(pLocalMerge->finalModel, pBeforeFillData, prevSize);
// set remain data to be discarded, and reset the interpolation information
savePrevRecordAndSetupFillInfo(pLocalReducer, pQueryInfo, pLocalReducer->pFillInfo);
savePrevRecordAndSetupFillInfo(pLocalMerge, pQueryInfo, pLocalMerge->pFillInfo);
}
memcpy(pRes->data, pBeforeFillData->data, (size_t)(pRes->numOfRows * pLocalReducer->finalModel->rowSize));
memcpy(pRes->data, pBeforeFillData->data, (size_t)(pRes->numOfRows * pLocalMerge->finalModel->rowSize));
pRes->numOfClauseTotal += pRes->numOfRows;
pBeforeFillData->num = 0;
}
/*
* Note: pRes->pLocalReducer may be null, due to the fact that "tscDestroyLocalReducer" is called
* Note: pRes->pLocalMerge may be null, due to the fact that "tscDestroyLocalMerger" is called
* by "interuptHandler" function in shell
*/
static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneOutput) {
static void doFillResult(SSqlObj *pSql, SLocalMerger *pLocalMerge, bool doneOutput) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
tFilePage *pBeforeFillData = pLocalReducer->pResultBuf;
tFilePage *pBeforeFillData = pLocalMerge->pResultBuf;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SFillInfo *pFillInfo = pLocalReducer->pFillInfo;
SFillInfo *pFillInfo = pLocalMerge->pFillInfo;
// todo extract function
int64_t actualETime = (pQueryInfo->order.order == TSDB_ORDER_ASC)? pQueryInfo->window.ekey: pQueryInfo->window.skey;
@ -953,11 +953,11 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO
tFilePage **pResPages = malloc(POINTER_BYTES * pQueryInfo->fieldsInfo.numOfOutput);
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
pResPages[i] = calloc(1, sizeof(tFilePage) + pField->bytes * pLocalReducer->resColModel->capacity);
pResPages[i] = calloc(1, sizeof(tFilePage) + pField->bytes * pLocalMerge->resColModel->capacity);
}
while (1) {
int64_t newRows = taosFillResultDataBlock(pFillInfo, pResPages, pLocalReducer->resColModel->capacity);
int64_t newRows = taosFillResultDataBlock(pFillInfo, pResPages, pLocalMerge->resColModel->capacity);
if (pQueryInfo->limit.offset < newRows) {
newRows -= pQueryInfo->limit.offset;
@ -970,7 +970,7 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO
}
}
pRes->data = pLocalReducer->pFinalRes;
pRes->data = pLocalMerge->pFinalRes;
pRes->numOfRows = (int32_t) newRows;
pQueryInfo->limit.offset = 0;
@ -985,7 +985,7 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO
}
// all output in current group are completed
int32_t totalRemainRows = (int32_t)getNumOfResultsAfterFillGap(pFillInfo, actualETime, pLocalReducer->resColModel->capacity);
int32_t totalRemainRows = (int32_t)getNumOfResultsAfterFillGap(pFillInfo, actualETime, pLocalMerge->resColModel->capacity);
if (totalRemainRows <= 0) {
break;
}
@ -1003,7 +1003,7 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO
assert(pRes->numOfRows >= 0);
/* set remain data to be discarded, and reset the interpolation information */
savePrevRecordAndSetupFillInfo(pLocalReducer, pQueryInfo, pFillInfo);
savePrevRecordAndSetupFillInfo(pLocalMerge, pQueryInfo, pFillInfo);
}
int32_t offset = 0;
@ -1025,8 +1025,8 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO
tfree(pResPages);
}
static void savePreviousRow(SLocalReducer *pLocalReducer, tFilePage *tmpBuffer) {
SColumnModel *pColumnModel = pLocalReducer->pDesc->pColumnModel;
static void savePreviousRow(SLocalMerger *pLocalMerge, tFilePage *tmpBuffer) {
SColumnModel *pColumnModel = pLocalMerge->pDesc->pColumnModel;
assert(pColumnModel->capacity == 1 && tmpBuffer->num == 1);
// copy to previous temp buffer
@ -1034,20 +1034,20 @@ static void savePreviousRow(SLocalReducer *pLocalReducer, tFilePage *tmpBuffer)
SSchema *pSchema = getColumnModelSchema(pColumnModel, i);
int16_t offset = getColumnModelOffset(pColumnModel, i);
memcpy(pLocalReducer->prevRowOfInput + offset, tmpBuffer->data + offset, pSchema->bytes);
memcpy(pLocalMerge->prevRowOfInput + offset, tmpBuffer->data + offset, pSchema->bytes);
}
tmpBuffer->num = 0;
pLocalReducer->hasPrevRow = true;
pLocalMerge->hasPrevRow = true;
}
static void doExecuteSecondaryMerge(SSqlCmd *pCmd, SLocalReducer *pLocalReducer, bool needInit) {
static void doExecuteSecondaryMerge(SSqlCmd *pCmd, SLocalMerger *pLocalMerge, bool needInit) {
// the tag columns need to be set before all functions execution
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
size_t size = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t j = 0; j < size; ++j) {
SQLFunctionCtx *pCtx = &pLocalReducer->pCtx[j];
SQLFunctionCtx *pCtx = &pLocalMerge->pCtx[j];
// tags/tags_dummy function, the tag field of SQLFunctionCtx is from the input buffer
int32_t functionId = pCtx->functionId;
@ -1074,20 +1074,20 @@ static void doExecuteSecondaryMerge(SSqlCmd *pCmd, SLocalReducer *pLocalReducer,
}
for (int32_t j = 0; j < size; ++j) {
int32_t functionId = pLocalReducer->pCtx[j].functionId;
int32_t functionId = pLocalMerge->pCtx[j].functionId;
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
continue;
}
aAggs[functionId].mergeFunc(&pLocalReducer->pCtx[j]);
aAggs[functionId].mergeFunc(&pLocalMerge->pCtx[j]);
}
}
static void handleUnprocessedRow(SSqlCmd *pCmd, SLocalReducer *pLocalReducer, tFilePage *tmpBuffer) {
if (pLocalReducer->hasUnprocessedRow) {
pLocalReducer->hasUnprocessedRow = false;
doExecuteSecondaryMerge(pCmd, pLocalReducer, true);
savePreviousRow(pLocalReducer, tmpBuffer);
static void handleUnprocessedRow(SSqlCmd *pCmd, SLocalMerger *pLocalMerge, tFilePage *tmpBuffer) {
if (pLocalMerge->hasUnprocessedRow) {
pLocalMerge->hasUnprocessedRow = false;
doExecuteSecondaryMerge(pCmd, pLocalMerge, true);
savePreviousRow(pLocalMerge, tmpBuffer);
}
}
@ -1120,7 +1120,7 @@ static int64_t getNumOfResultLocal(SQueryInfo *pQueryInfo, SQLFunctionCtx *pCtx)
* filled with the same result, which is the tags, specified in group by clause
*
*/
static void fillMultiRowsOfTagsVal(SQueryInfo *pQueryInfo, int32_t numOfRes, SLocalReducer *pLocalReducer) {
static void fillMultiRowsOfTagsVal(SQueryInfo *pQueryInfo, int32_t numOfRes, SLocalMerger *pLocalMerge) {
int32_t maxBufSize = 0; // find the max tags column length to prepare the buffer
size_t size = tscSqlExprNumOfExprs(pQueryInfo);
@ -1135,7 +1135,7 @@ static void fillMultiRowsOfTagsVal(SQueryInfo *pQueryInfo, int32_t numOfRes, SLo
char *buf = malloc((size_t)maxBufSize);
for (int32_t k = 0; k < size; ++k) {
SQLFunctionCtx *pCtx = &pLocalReducer->pCtx[k];
SQLFunctionCtx *pCtx = &pLocalMerge->pCtx[k];
if (pCtx->functionId != TSDB_FUNC_TAG) {
continue;
}
@ -1153,20 +1153,20 @@ static void fillMultiRowsOfTagsVal(SQueryInfo *pQueryInfo, int32_t numOfRes, SLo
free(buf);
}
int32_t finalizeRes(SQueryInfo *pQueryInfo, SLocalReducer *pLocalReducer) {
int32_t finalizeRes(SQueryInfo *pQueryInfo, SLocalMerger *pLocalMerge) {
size_t size = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t k = 0; k < size; ++k) {
SQLFunctionCtx* pCtx = &pLocalReducer->pCtx[k];
SQLFunctionCtx* pCtx = &pLocalMerge->pCtx[k];
aAggs[pCtx->functionId].xFinalize(pCtx);
}
pLocalReducer->hasPrevRow = false;
pLocalMerge->hasPrevRow = false;
int32_t numOfRes = (int32_t)getNumOfResultLocal(pQueryInfo, pLocalReducer->pCtx);
pLocalReducer->pResultBuf->num += numOfRes;
int32_t numOfRes = (int32_t)getNumOfResultLocal(pQueryInfo, pLocalMerge->pCtx);
pLocalMerge->pResultBuf->num += numOfRes;
fillMultiRowsOfTagsVal(pQueryInfo, numOfRes, pLocalReducer);
fillMultiRowsOfTagsVal(pQueryInfo, numOfRes, pLocalMerge);
return numOfRes;
}
@ -1177,22 +1177,22 @@ int32_t finalizeRes(SQueryInfo *pQueryInfo, SLocalReducer *pLocalReducer) {
* results generated by simple aggregation function, we merge them all into one points
* *Exception*: column projection query, required no merge procedure
*/
bool needToMerge(SQueryInfo *pQueryInfo, SLocalReducer *pLocalReducer, tFilePage *tmpBuffer) {
bool needToMerge(SQueryInfo *pQueryInfo, SLocalMerger *pLocalMerge, tFilePage *tmpBuffer) {
int32_t ret = 0; // merge all result by default
int16_t functionId = pLocalReducer->pCtx[0].functionId;
int16_t functionId = pLocalMerge->pCtx[0].functionId;
// todo opt performance
if ((/*functionId == TSDB_FUNC_PRJ || */functionId == TSDB_FUNC_ARITHM) || (tscIsProjectionQueryOnSTable(pQueryInfo, 0))) { // column projection query
ret = 1; // disable merge procedure
} else {
tOrderDescriptor *pDesc = pLocalReducer->pDesc;
tOrderDescriptor *pDesc = pLocalMerge->pDesc;
if (pDesc->orderInfo.numOfCols > 0) {
if (pDesc->tsOrder == TSDB_ORDER_ASC) { // asc
// todo refactor comparator
ret = compare_a(pLocalReducer->pDesc, 1, 0, pLocalReducer->prevRowOfInput, 1, 0, tmpBuffer->data);
ret = compare_a(pLocalMerge->pDesc, 1, 0, pLocalMerge->prevRowOfInput, 1, 0, tmpBuffer->data);
} else { // desc
ret = compare_d(pLocalReducer->pDesc, 1, 0, pLocalReducer->prevRowOfInput, 1, 0, tmpBuffer->data);
ret = compare_d(pLocalMerge->pDesc, 1, 0, pLocalMerge->prevRowOfInput, 1, 0, tmpBuffer->data);
}
}
}
@ -1230,17 +1230,17 @@ static bool saveGroupResultInfo(SSqlObj *pSql) {
/**
*
* @param pSql
* @param pLocalReducer
* @param pLocalMerge
* @param noMoreCurrentGroupRes
* @return if current group is skipped, return false, and do NOT record it into pRes->numOfGroups
*/
bool genFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool noMoreCurrentGroupRes) {
bool genFinalResults(SSqlObj *pSql, SLocalMerger *pLocalMerge, bool noMoreCurrentGroupRes) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
tFilePage * pResBuf = pLocalReducer->pResultBuf;
SColumnModel *pModel = pLocalReducer->resColModel;
tFilePage * pResBuf = pLocalMerge->pResultBuf;
SColumnModel *pModel = pLocalMerge->resColModel;
pRes->code = TSDB_CODE_SUCCESS;
@ -1251,11 +1251,11 @@ bool genFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool noMoreCur
if (pQueryInfo->slimit.offset > 0) {
pRes->numOfRows = 0;
pQueryInfo->slimit.offset -= 1;
pLocalReducer->discard = !noMoreCurrentGroupRes;
pLocalMerge->discard = !noMoreCurrentGroupRes;
if (pLocalReducer->discard) {
SColumnModel *pInternModel = pLocalReducer->pDesc->pColumnModel;
tColModelAppend(pInternModel, pLocalReducer->discardData, pLocalReducer->pTempBuffer->data, 0, 1, 1);
if (pLocalMerge->discard) {
SColumnModel *pInternModel = pLocalMerge->pDesc->pColumnModel;
tColModelAppend(pInternModel, pLocalMerge->discardData, pLocalMerge->pTempBuffer->data, 0, 1, 1);
}
return false;
@ -1264,19 +1264,14 @@ bool genFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool noMoreCur
tColModelCompact(pModel, pResBuf, pModel->capacity);
if (tscIsSecondStageQuery(pQueryInfo)) {
doArithmeticCalculate(pQueryInfo, pResBuf, pModel->rowSize, pLocalReducer->finalModel->rowSize);
doArithmeticCalculate(pQueryInfo, pResBuf, pModel->rowSize, pLocalMerge->finalModel->rowSize);
}
#ifdef _DEBUG_VIEW
printf("final result before interpo:\n");
// tColModelDisplay(pLocalReducer->resColModel, pLocalReducer->pBufForInterpo, pResBuf->num, pResBuf->num);
#endif
// no interval query, no fill operation
if (pQueryInfo->interval.interval == 0 || pQueryInfo->fillType == TSDB_FILL_NONE) {
genFinalResWithoutFill(pRes, pLocalReducer, pQueryInfo);
genFinalResWithoutFill(pRes, pLocalMerge, pQueryInfo);
} else {
SFillInfo* pFillInfo = pLocalReducer->pFillInfo;
SFillInfo* pFillInfo = pLocalMerge->pFillInfo;
if (pFillInfo != NULL) {
TSKEY ekey = (pQueryInfo->order.order == TSDB_ORDER_ASC)? pQueryInfo->window.ekey: pQueryInfo->window.skey;
@ -1284,34 +1279,34 @@ bool genFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool noMoreCur
taosFillCopyInputDataFromOneFilePage(pFillInfo, pResBuf);
}
doFillResult(pSql, pLocalReducer, noMoreCurrentGroupRes);
doFillResult(pSql, pLocalMerge, noMoreCurrentGroupRes);
}
return true;
}
void resetOutputBuf(SQueryInfo *pQueryInfo, SLocalReducer *pLocalReducer) {// reset output buffer to the beginning
void resetOutputBuf(SQueryInfo *pQueryInfo, SLocalMerger *pLocalMerge) {// reset output buffer to the beginning
size_t t = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < t; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
pLocalReducer->pCtx[i].aOutputBuf = pLocalReducer->pResultBuf->data + pExpr->offset * pLocalReducer->resColModel->capacity;
pLocalMerge->pCtx[i].aOutputBuf = pLocalMerge->pResultBuf->data + pExpr->offset * pLocalMerge->resColModel->capacity;
if (pExpr->functionId == TSDB_FUNC_TOP || pExpr->functionId == TSDB_FUNC_BOTTOM || pExpr->functionId == TSDB_FUNC_DIFF) {
pLocalReducer->pCtx[i].ptsOutputBuf = pLocalReducer->pCtx[0].aOutputBuf;
pLocalMerge->pCtx[i].ptsOutputBuf = pLocalMerge->pCtx[0].aOutputBuf;
}
}
memset(pLocalReducer->pResultBuf, 0, pLocalReducer->nResultBufSize + sizeof(tFilePage));
memset(pLocalMerge->pResultBuf, 0, pLocalMerge->nResultBufSize + sizeof(tFilePage));
}
static void resetEnvForNewResultset(SSqlRes *pRes, SSqlCmd *pCmd, SLocalReducer *pLocalReducer) {
static void resetEnvForNewResultset(SSqlRes *pRes, SSqlCmd *pCmd, SLocalMerger *pLocalMerge) {
// In handling data in other groups, we need to reset the interpolation information for a new group data
pRes->numOfRows = 0;
pRes->numOfRowsGroup = 0;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
pQueryInfo->limit.offset = pLocalReducer->offset;
pQueryInfo->limit.offset = pLocalMerge->offset;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
@ -1320,12 +1315,12 @@ static void resetEnvForNewResultset(SSqlRes *pRes, SSqlCmd *pCmd, SLocalReducer
if (pQueryInfo->fillType != TSDB_FILL_NONE) {
TSKEY skey = (pQueryInfo->order.order == TSDB_ORDER_ASC)? pQueryInfo->window.skey:pQueryInfo->window.ekey;//MIN(pQueryInfo->window.skey, pQueryInfo->window.ekey);
int64_t newTime = taosTimeTruncate(skey, &pQueryInfo->interval, tinfo.precision);
taosResetFillInfo(pLocalReducer->pFillInfo, newTime);
taosResetFillInfo(pLocalMerge->pFillInfo, newTime);
}
}
static bool isAllSourcesCompleted(SLocalReducer *pLocalReducer) {
return (pLocalReducer->numOfBuffer == pLocalReducer->numOfCompleted);
static bool isAllSourcesCompleted(SLocalMerger *pLocalMerge) {
return (pLocalMerge->numOfBuffer == pLocalMerge->numOfCompleted);
}
static bool doBuildFilledResultForGroup(SSqlObj *pSql) {
@ -1333,19 +1328,19 @@ static bool doBuildFilledResultForGroup(SSqlObj *pSql) {
SSqlRes *pRes = &pSql->res;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SLocalReducer *pLocalReducer = pRes->pLocalReducer;
SFillInfo *pFillInfo = pLocalReducer->pFillInfo;
SLocalMerger *pLocalMerge = pRes->pLocalMerger;
SFillInfo *pFillInfo = pLocalMerge->pFillInfo;
if (pFillInfo != NULL && taosFillHasMoreResults(pFillInfo)) {
assert(pQueryInfo->fillType != TSDB_FILL_NONE);
tFilePage *pFinalDataBuf = pLocalReducer->pResultBuf;
tFilePage *pFinalDataBuf = pLocalMerge->pResultBuf;
int64_t etime = *(int64_t *)(pFinalDataBuf->data + TSDB_KEYSIZE * (pFillInfo->numOfRows - 1));
// the first column must be the timestamp column
int32_t rows = (int32_t) getNumOfResultsAfterFillGap(pFillInfo, etime, pLocalReducer->resColModel->capacity);
int32_t rows = (int32_t) getNumOfResultsAfterFillGap(pFillInfo, etime, pLocalMerge->resColModel->capacity);
if (rows > 0) { // do fill gap
doFillResult(pSql, pLocalReducer, false);
doFillResult(pSql, pLocalMerge, false);
}
return true;
@ -1358,23 +1353,23 @@ static bool doHandleLastRemainData(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
SLocalReducer *pLocalReducer = pRes->pLocalReducer;
SFillInfo *pFillInfo = pLocalReducer->pFillInfo;
SLocalMerger *pLocalMerge = pRes->pLocalMerger;
SFillInfo *pFillInfo = pLocalMerge->pFillInfo;
bool prevGroupCompleted = (!pLocalReducer->discard) && pLocalReducer->hasUnprocessedRow;
bool prevGroupCompleted = (!pLocalMerge->discard) && pLocalMerge->hasUnprocessedRow;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
if ((isAllSourcesCompleted(pLocalReducer) && !pLocalReducer->hasPrevRow) || pLocalReducer->pLocalDataSrc[0] == NULL ||
if ((isAllSourcesCompleted(pLocalMerge) && !pLocalMerge->hasPrevRow) || pLocalMerge->pLocalDataSrc[0] == NULL ||
prevGroupCompleted) {
// if fillType == TSDB_FILL_NONE, return directly
if (pQueryInfo->fillType != TSDB_FILL_NONE &&
((pRes->numOfRowsGroup < pQueryInfo->limit.limit && pQueryInfo->limit.limit > 0) || (pQueryInfo->limit.limit < 0))) {
int64_t etime = (pQueryInfo->order.order == TSDB_ORDER_ASC)? pQueryInfo->window.ekey : pQueryInfo->window.skey;
int32_t rows = (int32_t)getNumOfResultsAfterFillGap(pFillInfo, etime, pLocalReducer->resColModel->capacity);
int32_t rows = (int32_t)getNumOfResultsAfterFillGap(pFillInfo, etime, pLocalMerge->resColModel->capacity);
if (rows > 0) {
doFillResult(pSql, pLocalReducer, true);
doFillResult(pSql, pLocalMerge, true);
}
}
@ -1384,7 +1379,7 @@ static bool doHandleLastRemainData(SSqlObj *pSql) {
*
* No results will be generated and query completed.
*/
if (pRes->numOfRows > 0 || (isAllSourcesCompleted(pLocalReducer) && (!pLocalReducer->hasUnprocessedRow))) {
if (pRes->numOfRows > 0 || (isAllSourcesCompleted(pLocalMerge) && (!pLocalMerge->hasUnprocessedRow))) {
return true;
}
@ -1393,7 +1388,7 @@ static bool doHandleLastRemainData(SSqlObj *pSql) {
return true;
}
resetEnvForNewResultset(pRes, pCmd, pLocalReducer);
resetEnvForNewResultset(pRes, pCmd, pLocalMerge);
}
return false;
@ -1403,12 +1398,12 @@ static void doProcessResultInNextWindow(SSqlObj *pSql, int32_t numOfRes) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
SLocalReducer *pLocalReducer = pRes->pLocalReducer;
SLocalMerger *pLocalMerge = pRes->pLocalMerger;
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
size_t size = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t k = 0; k < size; ++k) {
SQLFunctionCtx *pCtx = &pLocalReducer->pCtx[k];
SQLFunctionCtx *pCtx = &pLocalMerge->pCtx[k];
pCtx->aOutputBuf += pCtx->outputBytes * numOfRes;
// set the correct output timestamp column position
@ -1417,7 +1412,7 @@ static void doProcessResultInNextWindow(SSqlObj *pSql, int32_t numOfRes) {
}
}
doExecuteSecondaryMerge(pCmd, pLocalReducer, true);
doExecuteSecondaryMerge(pCmd, pLocalMerge, true);
}
int32_t tscDoLocalMerge(SSqlObj *pSql) {
@ -1426,14 +1421,14 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
tscResetForNextRetrieve(pRes);
if (pSql->signature != pSql || pRes == NULL || pRes->pLocalReducer == NULL) { // all data has been processed
if (pSql->signature != pSql || pRes == NULL || pRes->pLocalMerger == NULL) { // all data has been processed
tscError("%p local merge abort due to error occurs, code:%s", pSql, tstrerror(pRes->code));
return pRes->code;
}
SLocalReducer *pLocalReducer = pRes->pLocalReducer;
SLocalMerger *pLocalMerge = pRes->pLocalMerger;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
tFilePage *tmpBuffer = pLocalReducer->pTempBuffer;
tFilePage *tmpBuffer = pLocalMerge->pTempBuffer;
if (doHandleLastRemainData(pSql)) {
return TSDB_CODE_SUCCESS;
@ -1443,24 +1438,24 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
return TSDB_CODE_SUCCESS;
}
SLoserTreeInfo *pTree = pLocalReducer->pLoserTree;
SLoserTreeInfo *pTree = pLocalMerge->pLoserTree;
// clear buffer
handleUnprocessedRow(pCmd, pLocalReducer, tmpBuffer);
SColumnModel *pModel = pLocalReducer->pDesc->pColumnModel;
handleUnprocessedRow(pCmd, pLocalMerge, tmpBuffer);
SColumnModel *pModel = pLocalMerge->pDesc->pColumnModel;
while (1) {
if (isAllSourcesCompleted(pLocalReducer)) {
if (isAllSourcesCompleted(pLocalMerge)) {
break;
}
#ifdef _DEBUG_VIEW
printf("chosen data in pTree[0] = %d\n", pTree->pNode[0].index);
#endif
assert((pTree->pNode[0].index < pLocalReducer->numOfBuffer) && (pTree->pNode[0].index >= 0) && tmpBuffer->num == 0);
assert((pTree->pNode[0].index < pLocalMerge->numOfBuffer) && (pTree->pNode[0].index >= 0) && tmpBuffer->num == 0);
// chosen from loser tree
SLocalDataSource *pOneDataSrc = pLocalReducer->pLocalDataSrc[pTree->pNode[0].index];
SLocalDataSource *pOneDataSrc = pLocalMerge->pLocalDataSrc[pTree->pNode[0].index];
tColModelAppend(pModel, tmpBuffer, pOneDataSrc->filePage.data, pOneDataSrc->rowIdx, 1,
pOneDataSrc->pMemBuffer->pColumnModel->capacity);
@ -1473,76 +1468,76 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
tColModelDisplayEx(pModel, tmpBuffer->data, tmpBuffer->num, pModel->capacity, colInfo);
#endif
if (pLocalReducer->discard) {
assert(pLocalReducer->hasUnprocessedRow == false);
if (pLocalMerge->discard) {
assert(pLocalMerge->hasUnprocessedRow == false);
/* current record belongs to the same group of previous record, need to discard it */
if (isSameGroup(pCmd, pLocalReducer, pLocalReducer->discardData->data, tmpBuffer)) {
if (isSameGroup(pCmd, pLocalMerge, pLocalMerge->discardData->data, tmpBuffer)) {
tmpBuffer->num = 0;
pOneDataSrc->rowIdx += 1;
adjustLoserTreeFromNewData(pLocalReducer, pOneDataSrc, pTree);
adjustLoserTreeFromNewData(pLocalMerge, pOneDataSrc, pTree);
// all inputs are exhausted, abort current process
if (isAllSourcesCompleted(pLocalReducer)) {
if (isAllSourcesCompleted(pLocalMerge)) {
break;
}
// data belongs to the same group needs to be discarded
continue;
} else {
pLocalReducer->discard = false;
pLocalReducer->discardData->num = 0;
pLocalMerge->discard = false;
pLocalMerge->discardData->num = 0;
if (saveGroupResultInfo(pSql)) {
return TSDB_CODE_SUCCESS;
}
resetEnvForNewResultset(pRes, pCmd, pLocalReducer);
resetEnvForNewResultset(pRes, pCmd, pLocalMerge);
}
}
if (pLocalReducer->hasPrevRow) {
if (needToMerge(pQueryInfo, pLocalReducer, tmpBuffer)) {
if (pLocalMerge->hasPrevRow) {
if (needToMerge(pQueryInfo, pLocalMerge, tmpBuffer)) {
// belong to the group of the previous row, continue process it
doExecuteSecondaryMerge(pCmd, pLocalReducer, false);
doExecuteSecondaryMerge(pCmd, pLocalMerge, false);
// copy to buffer
savePreviousRow(pLocalReducer, tmpBuffer);
savePreviousRow(pLocalMerge, tmpBuffer);
} else {
/*
* current row does not belong to the group of previous row.
* so the processing of previous group is completed.
*/
int32_t numOfRes = finalizeRes(pQueryInfo, pLocalReducer);
bool sameGroup = isSameGroup(pCmd, pLocalReducer, pLocalReducer->prevRowOfInput, tmpBuffer);
int32_t numOfRes = finalizeRes(pQueryInfo, pLocalMerge);
bool sameGroup = isSameGroup(pCmd, pLocalMerge, pLocalMerge->prevRowOfInput, tmpBuffer);
tFilePage *pResBuf = pLocalReducer->pResultBuf;
tFilePage *pResBuf = pLocalMerge->pResultBuf;
/*
* if the previous group does NOT generate any result (pResBuf->num == 0),
* continue to process results instead of return results.
*/
if ((!sameGroup && pResBuf->num > 0) || (pResBuf->num == pLocalReducer->resColModel->capacity)) {
if ((!sameGroup && pResBuf->num > 0) || (pResBuf->num == pLocalMerge->resColModel->capacity)) {
// does not belong to the same group
bool notSkipped = genFinalResults(pSql, pLocalReducer, !sameGroup);
bool notSkipped = genFinalResults(pSql, pLocalMerge, !sameGroup);
// this row needs to discard, since it belongs to the group of previous
if (pLocalReducer->discard && sameGroup) {
pLocalReducer->hasUnprocessedRow = false;
if (pLocalMerge->discard && sameGroup) {
pLocalMerge->hasUnprocessedRow = false;
tmpBuffer->num = 0;
} else { // current row does not belongs to the previous group, so it is not be handled yet.
pLocalReducer->hasUnprocessedRow = true;
pLocalMerge->hasUnprocessedRow = true;
}
resetOutputBuf(pQueryInfo, pLocalReducer);
resetOutputBuf(pQueryInfo, pLocalMerge);
pOneDataSrc->rowIdx += 1;
// here we do not check the return value
adjustLoserTreeFromNewData(pLocalReducer, pOneDataSrc, pTree);
adjustLoserTreeFromNewData(pLocalMerge, pOneDataSrc, pTree);
if (pRes->numOfRows == 0) {
handleUnprocessedRow(pCmd, pLocalReducer, tmpBuffer);
handleUnprocessedRow(pCmd, pLocalMerge, tmpBuffer);
if (!sameGroup) {
/*
@ -1553,7 +1548,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
return TSDB_CODE_SUCCESS;
}
resetEnvForNewResultset(pRes, pCmd, pLocalReducer);
resetEnvForNewResultset(pRes, pCmd, pLocalMerge);
}
} else {
/*
@ -1561,7 +1556,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
* We start the process in a new round.
*/
if (sameGroup) {
handleUnprocessedRow(pCmd, pLocalReducer, tmpBuffer);
handleUnprocessedRow(pCmd, pLocalMerge, tmpBuffer);
}
}
@ -1573,24 +1568,24 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
}
} else { // result buffer is not full
doProcessResultInNextWindow(pSql, numOfRes);
savePreviousRow(pLocalReducer, tmpBuffer);
savePreviousRow(pLocalMerge, tmpBuffer);
}
}
} else {
doExecuteSecondaryMerge(pCmd, pLocalReducer, true);
savePreviousRow(pLocalReducer, tmpBuffer); // copy the processed row to buffer
doExecuteSecondaryMerge(pCmd, pLocalMerge, true);
savePreviousRow(pLocalMerge, tmpBuffer); // copy the processed row to buffer
}
pOneDataSrc->rowIdx += 1;
adjustLoserTreeFromNewData(pLocalReducer, pOneDataSrc, pTree);
adjustLoserTreeFromNewData(pLocalMerge, pOneDataSrc, pTree);
}
if (pLocalReducer->hasPrevRow) {
finalizeRes(pQueryInfo, pLocalReducer);
if (pLocalMerge->hasPrevRow) {
finalizeRes(pQueryInfo, pLocalMerge);
}
if (pLocalReducer->pResultBuf->num) {
genFinalResults(pSql, pLocalReducer, true);
if (pLocalMerge->pResultBuf->num) {
genFinalResults(pSql, pLocalMerge, true);
}
return TSDB_CODE_SUCCESS;
@ -1598,8 +1593,8 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen) {
SSqlRes *pRes = &pObj->res;
if (pRes->pLocalReducer != NULL) {
tscDestroyLocalReducer(pObj);
if (pRes->pLocalMerger != NULL) {
tscDestroyLocalMerger(pObj);
}
pRes->qhandle = 1; // hack to pass the safety check in fetch_row function
@ -1607,17 +1602,17 @@ void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen)
pRes->row = 0;
pRes->rspType = 0; // used as a flag to denote if taos_retrieved() has been called yet
pRes->pLocalReducer = (SLocalReducer *)calloc(1, sizeof(SLocalReducer));
pRes->pLocalMerger = (SLocalMerger *)calloc(1, sizeof(SLocalMerger));
/*
* we need one additional byte space
* the sprintf function needs one additional space to put '\0' at the end of string
*/
size_t allocSize = numOfRes * rowLen + sizeof(tFilePage) + 1;
pRes->pLocalReducer->pResultBuf = (tFilePage *)calloc(1, allocSize);
pRes->pLocalMerger->pResultBuf = (tFilePage *)calloc(1, allocSize);
pRes->pLocalReducer->pResultBuf->num = numOfRes;
pRes->data = pRes->pLocalReducer->pResultBuf->data;
pRes->pLocalMerger->pResultBuf->num = numOfRes;
pRes->data = pRes->pLocalMerger->pResultBuf->data;
}
int32_t doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_t rowSize, int32_t finalRowSize) {

View File

@ -2444,7 +2444,7 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
STableMetaInfo *pMInfo = tscGetMetaInfo(pQueryInfo, i);
STableMeta* pTableMeta = tscTableMetaClone(pMInfo->pTableMeta);
STableMeta* pTableMeta = tscTableMetaDup(pMInfo->pTableMeta);
tscAddTableMetaInfo(pNewQueryInfo, pMInfo->name, pTableMeta, NULL, pMInfo->tagColList, pMInfo->pVgroupTables);
}

View File

@ -885,10 +885,10 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
tscBuildVgroupTableInfo(pParentSql, pTableMetaInfo2, s2);
SSqlObj* psub1 = pParentSql->pSubs[0];
((SJoinSupporter*)psub1->param)->pVgroupTables = tscVgroupTableInfoClone(pTableMetaInfo1->pVgroupTables);
((SJoinSupporter*)psub1->param)->pVgroupTables = tscVgroupTableInfoDup(pTableMetaInfo1->pVgroupTables);
SSqlObj* psub2 = pParentSql->pSubs[1];
((SJoinSupporter*)psub2->param)->pVgroupTables = tscVgroupTableInfoClone(pTableMetaInfo2->pVgroupTables);
((SJoinSupporter*)psub2->param)->pVgroupTables = tscVgroupTableInfoDup(pTableMetaInfo2->pVgroupTables);
pParentSql->subState.numOfSub = 2;
pParentSql->subState.numOfRemain = pParentSql->subState.numOfSub;
@ -1998,7 +1998,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
SQueryInfo *pPQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, 0);
tscClearInterpInfo(pPQueryInfo);
tscCreateLocalReducer(trsupport->pExtMemBuffer, pState->numOfSub, pDesc, trsupport->pFinalColModel, trsupport->pFFColModel, pParentSql);
tscCreateLocalMerger(trsupport->pExtMemBuffer, pState->numOfSub, pDesc, trsupport->pFinalColModel, trsupport->pFFColModel, pParentSql);
tscDebug("%p build loser tree completed", pParentSql);
pParentSql->res.precision = pSql->res.precision;

View File

@ -420,7 +420,7 @@ void tscResetSqlCmdObj(SSqlCmd* pCmd) {
}
void tscFreeSqlResult(SSqlObj* pSql) {
tscDestroyLocalReducer(pSql);
tscDestroyLocalMerger(pSql);
SSqlRes* pRes = &pSql->res;
tscDestroyResPointerInfo(pRes);
@ -612,7 +612,7 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) {
tfree(pTableMetaInfo->pTableMeta);
}
pTableMetaInfo->pTableMeta = tscTableMetaClone(pDataBlock->pTableMeta);
pTableMetaInfo->pTableMeta = tscTableMetaDup(pDataBlock->pTableMeta);
} else {
assert(strncmp(pTableMetaInfo->name, pDataBlock->tableName, tListLen(pDataBlock->tableName)) == 0);
}
@ -680,7 +680,7 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff
tstrncpy(dataBuf->tableName, name, sizeof(dataBuf->tableName));
//Here we keep the tableMeta to avoid it to be remove by other threads.
dataBuf->pTableMeta = tscTableMetaClone(pTableMeta);
dataBuf->pTableMeta = tscTableMetaDup(pTableMeta);
assert(initialSize > 0 && pTableMeta != NULL && dataBuf->pTableMeta != NULL);
*dataBlocks = dataBuf;
@ -1810,10 +1810,10 @@ void tscVgroupTableCopy(SVgroupTableInfo* info, SVgroupTableInfo* pInfo) {
info->vgInfo.epAddr[j].fqdn = strdup(pInfo->vgInfo.epAddr[j].fqdn);
}
info->itemList = taosArrayClone(pInfo->itemList);
info->itemList = taosArrayDup(pInfo->itemList);
}
SArray* tscVgroupTableInfoClone(SArray* pVgroupTables) {
SArray* tscVgroupTableInfoDup(SArray* pVgroupTables) {
if (pVgroupTables == NULL) {
return NULL;
}
@ -1881,7 +1881,7 @@ STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, ST
tscColumnListCopy(pTableMetaInfo->tagColList, pTagCols, -1);
}
pTableMetaInfo->pVgroupTables = tscVgroupTableInfoClone(pVgroupTables);
pTableMetaInfo->pVgroupTables = tscVgroupTableInfoDup(pVgroupTables);
pQueryInfo->numOfTables += 1;
return pTableMetaInfo;
@ -2064,7 +2064,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
pNewQueryInfo->groupbyExpr = pQueryInfo->groupbyExpr;
if (pQueryInfo->groupbyExpr.columnInfo != NULL) {
pNewQueryInfo->groupbyExpr.columnInfo = taosArrayClone(pQueryInfo->groupbyExpr.columnInfo);
pNewQueryInfo->groupbyExpr.columnInfo = taosArrayDup(pQueryInfo->groupbyExpr.columnInfo);
if (pNewQueryInfo->groupbyExpr.columnInfo == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error;
@ -2119,7 +2119,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
STableMetaInfo* pFinalInfo = NULL;
if (pPrevSql == NULL) {
STableMeta* pTableMeta = tscTableMetaClone(pTableMetaInfo->pTableMeta);
STableMeta* pTableMeta = tscTableMetaDup(pTableMetaInfo->pTableMeta);
assert(pTableMeta != NULL);
pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, name, pTableMeta, pTableMetaInfo->vgroupList,
@ -2127,7 +2127,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
} else { // transfer the ownership of pTableMeta to the newly create sql object.
STableMetaInfo* pPrevInfo = tscGetTableMetaInfoFromCmd(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex, 0);
STableMeta* pPrevTableMeta = tscTableMetaClone(pPrevInfo->pTableMeta);
STableMeta* pPrevTableMeta = tscTableMetaDup(pPrevInfo->pTableMeta);
SVgroupsInfo* pVgroupsInfo = pPrevInfo->vgroupList;
pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, name, pPrevTableMeta, pVgroupsInfo, pTableMetaInfo->tagColList,
pTableMetaInfo->pVgroupTables);
@ -2297,7 +2297,6 @@ int32_t tscSQLSyntaxErrMsg(char* msg, const char* additionalInfo, const char* s
}
return TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
}
int32_t tscInvalidSQLErrMsg(char* msg, const char* additionalInfo, const char* sql) {
@ -2695,7 +2694,7 @@ uint32_t tscGetTableMetaMaxSize() {
return sizeof(STableMeta) + TSDB_MAX_COLUMNS * sizeof(SSchema);
}
STableMeta* tscTableMetaClone(STableMeta* pTableMeta) {
STableMeta* tscTableMetaDup(STableMeta* pTableMeta) {
assert(pTableMeta != NULL);
uint32_t size = tscGetTableMetaSize(pTableMeta);
STableMeta* p = calloc(1, size);

View File

@ -5043,7 +5043,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
STsdbQueryCond cond = createTsdbQueryCond(pQuery, &pQuery->window);
SArray *g1 = taosArrayInit(1, POINTER_BYTES);
SArray *tx = taosArrayClone(group);
SArray *tx = taosArrayDup(group);
taosArrayPush(g1, &tx);
STableGroupInfo gp = {.numOfTables = taosArrayGetSize(tx), .pGroupList = g1};
@ -5103,7 +5103,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
STsdbQueryCond cond = createTsdbQueryCond(pQuery, &pQuery->window);
SArray *g1 = taosArrayInit(1, POINTER_BYTES);
SArray *tx = taosArrayClone(group);
SArray *tx = taosArrayDup(group);
taosArrayPush(g1, &tx);
STableGroupInfo gp = {.numOfTables = taosArrayGetSize(tx), .pGroupList = g1};

View File

@ -110,7 +110,7 @@ void taosArrayCopy(SArray* pDst, const SArray* pSrc);
* clone a new array
* @param pSrc
*/
SArray* taosArrayClone(const SArray* pSrc);
SArray* taosArrayDup(const SArray* pSrc);
/**
* clear the array (remove all element)

View File

@ -165,7 +165,7 @@ void taosArrayCopy(SArray* pDst, const SArray* pSrc) {
pDst->size = pSrc->size;
}
SArray* taosArrayClone(const SArray* pSrc) {
SArray* taosArrayDup(const SArray* pSrc) {
assert(pSrc != NULL);
if (pSrc->size == 0) { // empty array list