feat: optimize encode/decode resultRow
This commit is contained in:
parent
8e42ce1d27
commit
4961b6ab8e
|
@ -232,9 +232,11 @@ typedef struct STaskAttr {
|
|||
} STaskAttr;
|
||||
|
||||
struct SOperatorInfo;
|
||||
struct SAggSupporter;
|
||||
struct SOptrBasicInfo;
|
||||
|
||||
typedef void (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, char **result, int32_t *length);
|
||||
typedef bool (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, char *result, int32_t length);
|
||||
typedef void (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, struct SAggSupporter *pSup, struct SOptrBasicInfo *pInfo, char **result, int32_t *length);
|
||||
typedef bool (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, struct SAggSupporter *pSup, struct SOptrBasicInfo *pInfo, char *result, int32_t length);
|
||||
|
||||
typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* pOptr);
|
||||
typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* pOptr, bool* newgroup);
|
||||
|
@ -753,6 +755,9 @@ void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
|
|||
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, EOPTR_EXEC_MODEL model);
|
||||
int32_t getOperatorExplainExecInfo(SOperatorInfo *operatorInfo, SExplainExecInfo **pRes, int32_t *capacity, int32_t *resNum);
|
||||
|
||||
bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter *pSup, SOptrBasicInfo *pInfo, char* result, int32_t length);
|
||||
void aggEncodeResultRow(SOperatorInfo* pOperator, SAggSupporter *pSup, SOptrBasicInfo *pInfo, char **result, int32_t *length);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -4974,15 +4974,19 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
|
|||
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order);
|
||||
doAggregateImpl(pOperator, 0, pInfo->pCtx);
|
||||
|
||||
char *result = NULL;
|
||||
int32_t length = 0;
|
||||
pOperator->encodeResultRow(pOperator, &result, &length);
|
||||
SAggSupporter *pSup = &pAggInfo->aggSup;
|
||||
taosHashClear(pSup->pResultRowHashTable);
|
||||
pOperator->decodeResultRow(pOperator, result, length);
|
||||
if(result){
|
||||
taosMemoryFree(result);
|
||||
if(pOperator->encodeResultRow){
|
||||
char *result = NULL;
|
||||
int32_t length = 0;
|
||||
SAggSupporter *pSup = &pAggInfo->aggSup;
|
||||
pOperator->encodeResultRow(pOperator, pSup, pInfo, &result, &length);
|
||||
taosHashClear(pSup->pResultRowHashTable);
|
||||
pInfo->resultRowInfo.size = 0;
|
||||
pOperator->decodeResultRow(pOperator, pSup, pInfo, result, length);
|
||||
if(result){
|
||||
taosMemoryFree(result);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
finalizeQueryResult(pInfo->pCtx, pOperator->numOfOutput);
|
||||
|
@ -5011,43 +5015,33 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator, bool* newgroup)
|
|||
return (blockDataGetNumOfRows(pInfo->pRes) != 0) ? pInfo->pRes : NULL;
|
||||
}
|
||||
|
||||
static void aggEncodeResultRow(SOperatorInfo* pOperator, char **result, int32_t *length) {
|
||||
SAggSupporter *pSup = NULL;
|
||||
switch(pOperator->operatorType){
|
||||
case QUERY_NODE_PHYSICAL_PLAN_AGG:{
|
||||
SAggOperatorInfo *pAggInfo = pOperator->info;
|
||||
pSup = &pAggInfo->aggSup;
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_PHYSICAL_PLAN_GROUPBY:{
|
||||
SGroupbyOperatorInfo *pAggInfo = pOperator->info;
|
||||
pSup = &pAggInfo->aggSup;
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_PHYSICAL_PLAN_INTERVAL:{
|
||||
STableIntervalOperatorInfo *pAggInfo = pOperator->info;
|
||||
pSup = &pAggInfo->aggSup;
|
||||
break;
|
||||
}
|
||||
default:{
|
||||
qDebug("invalid operatorType: %d", pOperator->operatorType);
|
||||
}
|
||||
}
|
||||
|
||||
void aggEncodeResultRow(SOperatorInfo* pOperator, SAggSupporter *pSup, SOptrBasicInfo *pInfo, char **result, int32_t *length) {
|
||||
int32_t size = taosHashGetSize(pSup->pResultRowHashTable);
|
||||
size_t keyLen = POINTER_BYTES; // estimate the key length
|
||||
size_t keyLen = sizeof(uint64_t) * 2; // estimate the key length
|
||||
int32_t totalSize = sizeof(int32_t) + size * (sizeof(int32_t) + keyLen + sizeof(int32_t) + pSup->resultRowSize);
|
||||
*result = taosMemoryCalloc(1, totalSize);
|
||||
if (*result == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return;
|
||||
longjmp(pOperator->pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
*(int32_t*)(*result) = size;
|
||||
int32_t offset = sizeof(int32_t);
|
||||
|
||||
// prepare memory
|
||||
SResultRowPosition* pos = &pInfo->resultRowInfo.pPosition[pInfo->resultRowInfo.curPos];
|
||||
void* pPage = getBufPage(pSup->pResultBuf, pos->pageId);
|
||||
SResultRow* pRow = (SResultRow*)((char*)pPage + pos->offset);
|
||||
setBufPageDirty(pPage, true);
|
||||
releaseBufPage(pSup->pResultBuf, pPage);
|
||||
|
||||
void* pIter = taosHashIterate(pSup->pResultRowHashTable, NULL);
|
||||
while (pIter) {
|
||||
void* key = taosHashGetKey(pIter, &keyLen);
|
||||
SResultRow** p1 = (SResultRow**)pIter;
|
||||
SResultRowPosition** p1 = (SResultRowPosition**)pIter;
|
||||
|
||||
pPage = (SFilePage*) getBufPage(pSup->pResultBuf, (*p1)->pageId);
|
||||
pRow = (SResultRow*)((char*)pPage + (*p1)->offset);
|
||||
setBufPageDirty(pPage, true);
|
||||
releaseBufPage(pSup->pResultBuf, pPage);
|
||||
|
||||
// recalculate the result size
|
||||
int32_t realTotalSize = offset + sizeof(int32_t) + keyLen + sizeof(int32_t) + pSup->resultRowSize;
|
||||
|
@ -5057,7 +5051,7 @@ static void aggEncodeResultRow(SOperatorInfo* pOperator, char **result, int32_t
|
|||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
taosMemoryFree(*result);
|
||||
*result = NULL;
|
||||
return;
|
||||
longjmp(pOperator->pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
|
||||
} else {
|
||||
*result = tmp;
|
||||
}
|
||||
|
@ -5071,7 +5065,7 @@ static void aggEncodeResultRow(SOperatorInfo* pOperator, char **result, int32_t
|
|||
// save value
|
||||
*(int32_t*)(*result + offset) = pSup->resultRowSize;
|
||||
offset += sizeof(int32_t);
|
||||
memcpy(*result + offset, *p1, pSup->resultRowSize);
|
||||
memcpy(*result + offset, pRow, pSup->resultRowSize);
|
||||
offset += pSup->resultRowSize;
|
||||
|
||||
pIter = taosHashIterate(pSup->pResultRowHashTable, pIter);
|
||||
|
@ -5083,34 +5077,11 @@ static void aggEncodeResultRow(SOperatorInfo* pOperator, char **result, int32_t
|
|||
return;
|
||||
}
|
||||
|
||||
static bool aggDecodeResultRow(SOperatorInfo* pOperator, char* result, int32_t length) {
|
||||
bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter *pSup, SOptrBasicInfo *pInfo, char* result, int32_t length) {
|
||||
if (!result || length <= 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
SAggSupporter *pSup = NULL;
|
||||
switch(pOperator->operatorType){
|
||||
case QUERY_NODE_PHYSICAL_PLAN_AGG:{
|
||||
SAggOperatorInfo *pAggInfo = pOperator->info;
|
||||
//SOptrBasicInfo *pInfo = &pAggInfo->binfo;
|
||||
pSup = &pAggInfo->aggSup;
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_PHYSICAL_PLAN_GROUPBY:{
|
||||
SGroupbyOperatorInfo *pAggInfo = pOperator->info;
|
||||
pSup = &pAggInfo->aggSup;
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_PHYSICAL_PLAN_INTERVAL:{
|
||||
STableIntervalOperatorInfo *pAggInfo = pOperator->info;
|
||||
pSup = &pAggInfo->aggSup;
|
||||
break;
|
||||
}
|
||||
default:{
|
||||
qDebug("invalid operatorType: %d", pOperator->operatorType);
|
||||
}
|
||||
}
|
||||
|
||||
// int32_t size = taosHashGetSize(pSup->pResultRowHashTable);
|
||||
int32_t count = *(int32_t*)(result);
|
||||
|
||||
|
@ -5122,17 +5093,16 @@ static bool aggDecodeResultRow(SOperatorInfo* pOperator, char* result, int32_t l
|
|||
uint64_t tableGroupId = *(uint64_t*)(result + offset);
|
||||
SResultRow* resultRow = getNewResultRow_rv(pSup->pResultBuf, tableGroupId, pSup->resultRowSize);
|
||||
if (!resultRow) {
|
||||
terrno = TSDB_CODE_TSC_INVALID_INPUT;
|
||||
return false;
|
||||
longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_INVALID_INPUT);
|
||||
}
|
||||
// add a new result set for a new group
|
||||
taosHashPut(pSup->pResultRowHashTable, result + offset, keyLen, &resultRow, POINTER_BYTES);
|
||||
SResultRowPosition pos = {.pageId = resultRow->pageId, .offset = resultRow->offset};
|
||||
taosHashPut(pSup->pResultRowHashTable, result + offset, keyLen, &pos, sizeof(SResultRowPosition));
|
||||
|
||||
offset += keyLen;
|
||||
int32_t valueLen = *(int32_t*)(result + offset);
|
||||
if (valueLen != pSup->resultRowSize) {
|
||||
terrno = TSDB_CODE_TSC_INVALID_INPUT;
|
||||
return false;
|
||||
longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_INVALID_INPUT);
|
||||
}
|
||||
offset += sizeof(int32_t);
|
||||
int32_t pageId = resultRow->pageId;
|
||||
|
@ -5143,12 +5113,13 @@ static bool aggDecodeResultRow(SOperatorInfo* pOperator, char* result, int32_t l
|
|||
offset += valueLen;
|
||||
|
||||
initResultRow(resultRow);
|
||||
//pInfo->resultRowInfo.pPosition[pInfo->resultRowInfo.size++] = (SResultRowPosition) {.pageId = resultRow->pageId, .offset = resultRow->offset};
|
||||
prepareResultListBuffer(&pInfo->resultRowInfo, pOperator->pTaskInfo->env);
|
||||
pInfo->resultRowInfo.curPos = pInfo->resultRowInfo.size;
|
||||
pInfo->resultRowInfo.pPosition[pInfo->resultRowInfo.size++] = (SResultRowPosition) {.pageId = resultRow->pageId, .offset = resultRow->offset};
|
||||
}
|
||||
|
||||
if (offset != length) {
|
||||
terrno = TSDB_CODE_TSC_INVALID_INPUT;
|
||||
return false;
|
||||
longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_INVALID_INPUT);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
@ -5375,16 +5346,21 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
|
|||
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order);
|
||||
hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0);
|
||||
|
||||
char *result = NULL;
|
||||
int32_t length = 0;
|
||||
pOperator->encodeResultRow(pOperator, &result, &length);
|
||||
SAggSupporter *pSup = &pInfo->aggSup;
|
||||
taosHashClear(pSup->pResultRowHashTable);
|
||||
pOperator->decodeResultRow(pOperator, result, length);
|
||||
if(result){
|
||||
taosMemoryFree(result);
|
||||
#if 0
|
||||
if(pOperator->encodeResultRow){
|
||||
char *result = NULL;
|
||||
int32_t length = 0;
|
||||
SAggSupporter *pSup = &pInfo->aggSup;
|
||||
pOperator->encodeResultRow(pOperator, pSup, &pInfo->binfo, &result, &length);
|
||||
taosHashClear(pSup->pResultRowHashTable);
|
||||
pInfo->binfo.resultRowInfo.size = 0;
|
||||
pOperator->decodeResultRow(pOperator, pSup, &pInfo->binfo, result, length);
|
||||
if(result){
|
||||
taosMemoryFree(result);
|
||||
}
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
closeAllResultRows(&pInfo->binfo.resultRowInfo);
|
||||
finalizeMultiTupleQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf,
|
||||
|
@ -6335,6 +6311,8 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf
|
|||
pOperator->info = pInfo;
|
||||
pOperator->getNextFn = doStateWindowAgg;
|
||||
pOperator->closeFn = destroyStateWindowOperatorInfo;
|
||||
pOperator->encodeResultRow = aggEncodeResultRow;
|
||||
pOperator->decodeResultRow = aggDecodeResultRow;
|
||||
|
||||
int32_t code = appendDownstream(pOperator, &downstream, 1);
|
||||
return pOperator;
|
||||
|
@ -6374,6 +6352,8 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo
|
|||
pOperator->info = pInfo;
|
||||
pOperator->getNextFn = doSessionWindowAgg;
|
||||
pOperator->closeFn = destroySWindowOperatorInfo;
|
||||
pOperator->encodeResultRow = aggEncodeResultRow;
|
||||
pOperator->decodeResultRow = aggDecodeResultRow;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
|
|
|
@ -165,7 +165,7 @@ static int32_t buildGroupKeys(void* pKey, const SArray* pGroupColVals) {
|
|||
// assign the group keys or user input constant values if required
|
||||
static void doAssignGroupKeys(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t totalRows, int32_t rowIndex) {
|
||||
for (int32_t i = 0; i < numOfOutput; ++i) {
|
||||
if (pCtx[i].functionId == -1) {
|
||||
if (pCtx[i].functionId == -1) { // select count(*),key from t group by key.
|
||||
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pCtx[i]);
|
||||
|
||||
SColumnInfoData* pColInfoData = pCtx[i].input.pData[0];
|
||||
|
@ -337,7 +337,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
|
|||
pOperator->name = "GroupbyAggOperator";
|
||||
pOperator->blockingOptr = true;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
// pOperator->operatorType = OP_Groupby;
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_GROUPBY;
|
||||
pOperator->pExpr = pExprInfo;
|
||||
pOperator->numOfOutput = numOfCols;
|
||||
pOperator->info = pInfo;
|
||||
|
@ -345,6 +345,8 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
|
|||
pOperator->_openFn = operatorDummyOpenFn;
|
||||
pOperator->getNextFn = hashGroupbyAggregate;
|
||||
pOperator->closeFn = destroyGroupOperatorInfo;
|
||||
pOperator->encodeResultRow = aggEncodeResultRow;
|
||||
pOperator->decodeResultRow = aggDecodeResultRow;
|
||||
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
return pOperator;
|
||||
|
|
Loading…
Reference in New Issue