From 0649e662d3b16684575337d5ccc428522550318b Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 24 Mar 2022 17:24:01 +0800 Subject: [PATCH] add encode/decode resultRow function --- source/libs/executor/inc/executorimpl.h | 4 + source/libs/executor/src/executorimpl.c | 107 ++++++++++++++++++++++++ 2 files changed, 111 insertions(+) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 04e24dcc79..2d7e2eedc6 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -243,6 +243,8 @@ typedef struct STaskAttr { struct SOperatorInfo; +typedef void (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, char **result, int32_t *length); +typedef bool (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, char *result, int32_t length); typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* param); typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* param, bool* newgroup); typedef void (*__optr_close_fn_t)(void* param, int32_t num); @@ -332,6 +334,8 @@ typedef struct SOperatorInfo { __optr_fn_t cleanupFn; __optr_close_fn_t closeFn; __optr_open_fn_t _openFn; // DO NOT invoke this function directly + __optr_encode_fn_t encodeResultRow; // + __optr_decode_fn_t decodeResultRow; } SOperatorInfo; typedef struct { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 61440ce803..2417bd352a 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -6391,6 +6391,111 @@ static SSDataBlock* getAggregateResult(SOperatorInfo *pOperator, bool* newgroup) return (blockDataGetNumOfRows(pInfo->pRes) != 0)? pInfo->pRes:NULL; } +static void aggEncodeResultRow(SOperatorInfo* pOperator, char **result, int32_t *length) { + SAggOperatorInfo *pAggInfo = pOperator->info; + SAggSupporter *pSup = &pAggInfo->aggSup; + + int32_t size = taosHashGetSize(pSup->pResultRowHashTable); + size_t keyLen = POINTER_BYTES; // estimate the key length + int32_t totalSize = sizeof(int32_t) + size * (sizeof(int32_t) + keyLen + sizeof(int32_t) + pSup->resultRowSize); + *result = calloc(1, totalSize); + if(*result == NULL){ + terrno = TSDB_CODE_OUT_OF_MEMORY; + return; + } + *(int32_t*)(*result) = size; + int32_t offset = sizeof(int32_t); + void *pIter = taosHashIterate(pSup->pResultRowHashTable, NULL); + while (pIter) { + void *key = taosHashGetKey(pIter, &keyLen); + SResultRow **p1 = (SResultRow **)pIter; + + // recalculate the result size + int32_t realTotalSize = offset + sizeof(int32_t) + keyLen + sizeof(int32_t) + pSup->resultRowSize; + if (realTotalSize > totalSize){ + char *tmp = realloc(*result, realTotalSize); + if (tmp == NULL){ + terrno = TSDB_CODE_OUT_OF_MEMORY; + free(*result); + *result = NULL; + return; + }else{ + *result = tmp; + } + } + // save key + *(int32_t*)(*result + offset) = keyLen; + offset += sizeof(int32_t); + memcpy(*result + offset, key, keyLen); + offset += keyLen; + + // save value + *(int32_t*)(*result + offset) = pSup->resultRowSize; + offset += sizeof(int32_t); + memcpy(*result + offset, *p1, pSup->resultRowSize); + offset += pSup->resultRowSize; + + pIter = taosHashIterate(pSup->pResultRowHashTable, pIter); + } + + if(length) { + *length = offset; + } + return; +} + +static bool aggDecodeResultRow(SOperatorInfo* pOperator, char *result, int32_t length) { + if (!result || length <= 0){ + return false; + } + + SAggOperatorInfo *pAggInfo = pOperator->info; + SAggSupporter *pSup = &pAggInfo->aggSup; + SOptrBasicInfo *pInfo = &pAggInfo->binfo; + + // int32_t size = taosHashGetSize(pSup->pResultRowHashTable); + int32_t count = *(int32_t*)(result); + + int32_t offset = sizeof(int32_t); + while(count-- > 0 && length > offset){ + int32_t keyLen = *(int32_t*)(result + offset); + offset += sizeof(int32_t); + + uint64_t tableGroupId = *(uint64_t *)(result + offset); + SResultRow *resultRow = getNewResultRow_rv(pSup->pResultBuf, tableGroupId, pSup->resultRowSize); + if (!resultRow){ + terrno = TSDB_CODE_TSC_INVALID_INPUT; + return false; + } + // add a new result set for a new group + taosHashPut(pSup->pResultRowHashTable, result + offset, keyLen, &resultRow, POINTER_BYTES); + + offset += keyLen; + int32_t valueLen = *(int32_t*)(result + offset); + if (valueLen != pSup->resultRowSize){ + terrno = TSDB_CODE_TSC_INVALID_INPUT; + return false; + } + offset += sizeof(int32_t); + int32_t pageId = resultRow->pageId; + int32_t pOffset = resultRow->offset; + memcpy(resultRow, result + offset, valueLen); + resultRow->pageId = pageId; + resultRow->offset = pOffset; + offset += valueLen; + + initResultRow(resultRow); + + pInfo->resultRowInfo.pPosition[pInfo->resultRowInfo.size++] = (SResultRowPosition) {.pageId = resultRow->pageId, .offset = resultRow->offset}; + } + + if (offset != length){ + terrno = TSDB_CODE_TSC_INVALID_INPUT; + return false; + } + return true; +} + static SSDataBlock* doMultiTableAggregate(SOperatorInfo *pOperator, bool* newgroup) { if (pOperator->status == OP_EXEC_DONE) { return NULL; @@ -7312,6 +7417,8 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pOperator->_openFn = doOpenAggregateOptr; pOperator->getNextFn = getAggregateResult; pOperator->closeFn = destroyAggOperatorInfo; + pOperator->encodeResultRow = aggEncodeResultRow; + pOperator->decodeResultRow = aggDecodeResultRow; code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) {