Merge pull request #15661 from taosdata/feature/3_liaohj

refactor: optimize the build block perf.
This commit is contained in:
Haojun Liao 2022-08-02 13:04:00 +08:00 committed by GitHub
commit bb869800f6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 110 additions and 37 deletions

View File

@ -184,6 +184,7 @@ static FORCE_INLINE void colDataAppendDouble(SColumnInfoData* pColumnInfoData, u
int32_t getJsonValueLen(const char* data);
int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull);
int32_t colDataAppendNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, uint32_t numOfRows);
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, int32_t* capacity,
const SColumnInfoData* pSource, int32_t numOfRow2);
int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows,

View File

@ -118,6 +118,76 @@ int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, con
return 0;
}
int32_t colDataReserve(SColumnInfoData* pColumnInfoData, size_t newSize) {
if (!IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
return TSDB_CODE_SUCCESS;
}
if (pColumnInfoData->varmeta.allocLen >= newSize) {
return TSDB_CODE_SUCCESS;
}
if (pColumnInfoData->varmeta.allocLen < newSize) {
char* buf = taosMemoryRealloc(pColumnInfoData->pData, newSize);
if (buf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pColumnInfoData->pData = buf;
pColumnInfoData->varmeta.allocLen = newSize;
}
return TSDB_CODE_SUCCESS;
}
static void doCopyNItems(struct SColumnInfoData* pColumnInfoData, int32_t currentRow, const char* pData, int32_t itemLen, int32_t numOfRows) {
ASSERT(pColumnInfoData->info.bytes >= itemLen);
size_t start = 1;
// the first item
memcpy(pColumnInfoData->pData, pData, itemLen);
int32_t t = 0;
int32_t count = log(numOfRows)/log(2);
while(t < count) {
int32_t xlen = 1 << t;
memcpy(pColumnInfoData->pData + start * itemLen + pColumnInfoData->varmeta.length, pColumnInfoData->pData, xlen * itemLen);
t += 1;
start += xlen;
}
// the tail part
if (numOfRows > start) {
memcpy(pColumnInfoData->pData + start * itemLen + currentRow * itemLen, pColumnInfoData->pData, (numOfRows - start) * itemLen);
}
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
for(int32_t i = 0; i < numOfRows; ++i) {
pColumnInfoData->varmeta.offset[i + currentRow] = pColumnInfoData->varmeta.length + i * itemLen;
}
pColumnInfoData->varmeta.length += numOfRows * itemLen;
}
}
int32_t colDataAppendNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, uint32_t numOfRows) {
ASSERT(pData != NULL && pColumnInfoData != NULL);
int32_t len = pColumnInfoData->info.bytes;
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
len = varDataTLen(pData);
if (pColumnInfoData->varmeta.allocLen < (numOfRows + currentRow) * len) {
int32_t code = colDataReserve(pColumnInfoData, (numOfRows + currentRow) * len);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
}
doCopyNItems(pColumnInfoData, currentRow, pData, len, numOfRows);
return TSDB_CODE_SUCCESS;
}
static void doBitmapMerge(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, const SColumnInfoData* pSource,
int32_t numOfRow2) {
if (numOfRow2 <= 0) return;
@ -1113,15 +1183,12 @@ static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo*
void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows) {
if (IS_VAR_DATA_TYPE(pColumn->info.type)) {
pColumn->varmeta.length = 0;
if (pColumn->varmeta.offset > 0) {
if (pColumn->varmeta.offset != NULL) {
memset(pColumn->varmeta.offset, 0, sizeof(int32_t) * numOfRows);
}
} else {
if (pColumn->nullbitmap != NULL) {
memset(pColumn->nullbitmap, 0, BitmapLen(numOfRows));
if (pColumn->pData != NULL) {
memset(pColumn->pData, 0, pColumn->info.bytes * numOfRows);
}
}
}
}

View File

@ -758,7 +758,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
pReader->cost.blockLoadTime += elapsedTime;
int32_t unDumpedRows = asc ? pBlock->nRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1;
tsdbDebug("%p load file block into buffer, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, remain, unDumpedRows,
pBlock->minVersion, pBlock->maxVersion, elapsedTime, pReader->idStr);

View File

@ -917,9 +917,8 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp,
SNode* pCondition, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition,
SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo);

View File

@ -571,6 +571,10 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
}
*numOfExprs = numOfFuncs + numOfGroupKeys;
if (*numOfExprs == 0) {
return NULL;
}
SExprInfo* pExprs = taosMemoryCalloc(*numOfExprs, sizeof(SExprInfo));
for (int32_t i = 0; i < (*numOfExprs); ++i) {

View File

@ -4171,16 +4171,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pOptr = createMultiwayMergeOperatorInfo(ops, size, pMergePhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION == type) {
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
STimeWindowAggSupp as = {.waterMark = pSessionNode->window.watermark,
.calTrigger = pSessionNode->window.triggerType};
SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &num);
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
int32_t tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
pOptr = createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as,
pPhyNode->pConditions, pTaskInfo);
pOptr = createSessionAggOperatorInfo(ops[0], pSessionNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION == type) {
pOptr = createStreamSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION == type) {

View File

@ -405,9 +405,15 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int
data = (char*)p;
}
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
colDataAppend(pColInfoData, i, data,
(data == NULL) || (pColInfoData->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data)));
bool isNullVal = (data == NULL) || (pColInfoData->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data));
if (isNullVal) {
colDataAppendNNULL(pColInfoData, 0, pBlock->info.rows);
} else if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) {
colDataAppendNItems(pColInfoData, 0, data, pBlock->info.rows);
} else { // todo opt for json tag
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
colDataAppend(pColInfoData, i, data, false);
}
}
if (data && (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) && p != NULL &&

View File

@ -152,7 +152,7 @@ SSDataBlock* loadNextDataBlock(void* param) {
void applyScalarFunction(SSDataBlock* pBlock, void* param) {
SOperatorInfo* pOperator = param;
SSortOperatorInfo* pSort = pOperator->info;
if (pOperator->exprSupp.pExprInfo != NULL) {
if (pOperator->exprSupp.pExprInfo != NULL && pOperator->exprSupp.numOfExprs > 0) {
int32_t code = projectApplyFunctions(pOperator->exprSupp.pExprInfo, pBlock, pBlock, pOperator->exprSupp.pCtx,
pOperator->exprSupp.numOfExprs, NULL);
if (code != TSDB_CODE_SUCCESS) {

View File

@ -2439,12 +2439,14 @@ void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) {
SSessionAggOperatorInfo* pInfo = (SSessionAggOperatorInfo*)param;
cleanupBasicInfo(&pInfo->binfo);
colDataDestroy(&pInfo->twAggSup.timeWindowData);
cleanupAggSup(&pInfo->aggSup);
cleanupGroupResInfo(&pInfo->groupResInfo);
taosMemoryFreeClear(param);
}
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId,
STimeWindowAggSupp* pTwAggSupp, SNode* pCondition,
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode,
SExecTaskInfo* pTaskInfo) {
SSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSessionAggOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
@ -2455,6 +2457,10 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(&pOperator->resultInfo, 4096);
int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols);
SSDataBlock* pResBlock = createResDataBlock(pSessionNode->window.node.pOutputDataBlockDesc);
int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
@ -2462,16 +2468,19 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo
initBasicInfo(&pInfo->binfo, pResBlock);
pInfo->twAggSup = *pTwAggSupp;
pInfo->twAggSup.waterMark = pSessionNode->window.watermark;
pInfo->twAggSup.calTrigger = pSessionNode->window.triggerType;
pInfo->gap = pSessionNode->gap;
initResultRowInfo(&pInfo->binfo.resultRowInfo);
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
pInfo->tsSlotId = tsSlotId;
pInfo->gap = gap;
pInfo->binfo.pRes = pResBlock;
pInfo->tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
pInfo->binfo.pRes = pResBlock;
pInfo->winSup.prevTs = INT64_MIN;
pInfo->reptScan = false;
pInfo->pCondition = pCondition;
pInfo->reptScan = false;
pInfo->pCondition = pSessionNode->window.node.pConditions;
pOperator->name = "SessionWindowAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION;
pOperator->blocking = true;

View File

@ -1727,16 +1727,11 @@ int32_t qTbnameFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pO
char str[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
metaGetTableNameByUid(pInput->param, uid, str);
for(int32_t i = 0; i < pInput->numOfRows; ++i) {
colDataAppend(pOutput->columnData, pOutput->numOfRows + i, str, false);
}
colDataAppendNItems(pOutput->columnData, pOutput->numOfRows, str, pInput->numOfRows);
pOutput->numOfRows += pInput->numOfRows;
return TSDB_CODE_SUCCESS;
}
/** Aggregation functions **/
int32_t countScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
SColumnInfoData *pInputData = pInput->columnData;

View File

@ -31,6 +31,7 @@ void swapStr(char* j, char* J, int width) {
}
#endif
// todo refactor: 1) move away; 2) use merge sort instead; 3) qsort is not a stable sort actually.
void taosSort(void* arr, int64_t sz, int64_t width, __compar_fn_t compar) {
#ifdef WINDOWS
int64_t i, j;