diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 9ea327636d..88f4bdbd3d 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -156,13 +156,11 @@ typedef struct STaskAttr { } STaskAttr; struct SOperatorInfo; -struct SAggSupporter; -struct SOptrBasicInfo; +//struct SAggSupporter; +//struct SOptrBasicInfo; -typedef void (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, struct SAggSupporter* pSup, - struct SOptrBasicInfo* pInfo, char** result, int32_t* length); -typedef bool (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, struct SAggSupporter* pSup, - struct SOptrBasicInfo* pInfo, char* result, int32_t length); +typedef int32_t (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, char** result, int32_t* length); +typedef int32_t (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, char* result, int32_t length); typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* pOptr); typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* pOptr); @@ -445,14 +443,16 @@ typedef struct STimeWindowSupp { } STimeWindowAggSupp; typedef struct SIntervalAggOperatorInfo { + // SOptrBasicInfo should be first, SAggSupporter should be second for stream encode SOptrBasicInfo binfo; // basic info + SAggSupporter aggSup; // aggregate supporter + SGroupResInfo groupResInfo; // multiple results build supporter SInterval interval; // interval info int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator. STimeWindow win; // query time range bool timeWindowInterpo; // interpolation needed or not char** pRow; // previous row/tuple of already processed datablock - SAggSupporter aggSup; // aggregate supporter STableQueryInfo* pCurrent; // current tableQueryInfo struct int32_t order; // current SSDataBlock scan order EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] @@ -463,19 +463,23 @@ typedef struct SIntervalAggOperatorInfo { } SIntervalAggOperatorInfo; typedef struct SStreamFinalIntervalOperatorInfo { + // SOptrBasicInfo should be first, SAggSupporter should be second for stream encode SOptrBasicInfo binfo; // basic info + SAggSupporter aggSup; // aggregate supporter + SGroupResInfo groupResInfo; // multiple results build supporter SInterval interval; // interval info int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator. - SAggSupporter aggSup; // aggregate supporter int32_t order; // current SSDataBlock scan order STimeWindowAggSupp twAggSup; SArray* pChildren; } SStreamFinalIntervalOperatorInfo; typedef struct SAggOperatorInfo { + // SOptrBasicInfo should be first, SAggSupporter should be second for stream encode SOptrBasicInfo binfo; SAggSupporter aggSup; + STableQueryInfo *current; uint64_t groupId; SGroupResInfo groupResInfo; @@ -488,8 +492,10 @@ typedef struct SAggOperatorInfo { } SAggOperatorInfo; typedef struct SProjectOperatorInfo { + // SOptrBasicInfo should be first, SAggSupporter should be second for stream encode SOptrBasicInfo binfo; SAggSupporter aggSup; + SSDataBlock* existDataBlock; SArray* pPseudoColInfo; SLimit limit; @@ -513,7 +519,10 @@ typedef struct SFillOperatorInfo { } SFillOperatorInfo; typedef struct SGroupbyOperatorInfo { + // SOptrBasicInfo should be first, SAggSupporter should be second for stream encode SOptrBasicInfo binfo; + SAggSupporter aggSup; + SArray* pGroupCols; // group by columns, SArray SArray* pGroupColVals; // current group column values, SArray SNode* pCondition; @@ -521,7 +530,6 @@ typedef struct SGroupbyOperatorInfo { char* keyBuf; // group by keys for hash int32_t groupKeyLen; // total group by column width SGroupResInfo groupResInfo; - SAggSupporter aggSup; SExprInfo* pScalarExprInfo; int32_t numOfScalarExpr; // the number of scalar expression in group operator SqlFunctionCtx* pScalarFuncCtx; @@ -558,8 +566,10 @@ typedef struct SWindowRowsSup { } SWindowRowsSup; typedef struct SSessionAggOperatorInfo { + // SOptrBasicInfo should be first, SAggSupporter should be second for stream encode SOptrBasicInfo binfo; SAggSupporter aggSup; + SGroupResInfo groupResInfo; SWindowRowsSup winSup; bool reptScan; // next round scan @@ -598,8 +608,10 @@ typedef struct STimeSliceOperatorInfo { } STimeSliceOperatorInfo; typedef struct SStateWindowOperatorInfo { + // SOptrBasicInfo should be first, SAggSupporter should be second for stream encode SOptrBasicInfo binfo; SAggSupporter aggSup; + SGroupResInfo groupResInfo; SWindowRowsSup winSup; SColumn stateCol; // start row index @@ -611,8 +623,10 @@ typedef struct SStateWindowOperatorInfo { } SStateWindowOperatorInfo; typedef struct SSortedMergeOperatorInfo { - + // SOptrBasicInfo should be first, SAggSupporter should be second for stream encode SOptrBasicInfo binfo; + SAggSupporter aggSup; + SArray* pSortInfo; int32_t numOfSources; SSortHandle *pSortHandle; @@ -624,7 +638,6 @@ typedef struct SSortedMergeOperatorInfo { int32_t numOfResPerPage; char** groupVal; SArray *groupInfo; - SAggSupporter aggSup; } SSortedMergeOperatorInfo; typedef struct SSortOperatorInfo { @@ -786,16 +799,31 @@ void queryCostStatis(SExecTaskInfo* pTaskInfo); void doDestroyTask(SExecTaskInfo* pTaskInfo); int32_t getMaximumIdleDurationSec(); +/* + * ops: root operator + * data: *data save the result of encode, need to be freed by caller + * length: *length save the length of *data + * return: result code, 0 means success + */ +int32_t encodeOperator(SOperatorInfo* ops, char** data, int32_t *length); + +/* + * ops: root operator, created by caller + * data: save the result of decode + * length: the length of data + * return: result code, 0 means success + */ +int32_t decodeOperator(SOperatorInfo* ops, char* data, int32_t length); + void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status); int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, EOPTR_EXEC_MODEL model); int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo** pRes, int32_t* capacity, int32_t* resNum); -bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasicInfo* pInfo, char* result, - int32_t length); -void aggEncodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasicInfo* pInfo, char** result, - int32_t* length); +int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result, int32_t length); +int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* length); + STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval, int32_t precision, STimeWindow* win); int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 339f1f261c..ce46573830 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3498,17 +3498,24 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) { return (rows == 0) ? NULL : pInfo->pRes; } -void aggEncodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasicInfo* pInfo, char** result, - int32_t* length) { +int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* length) { + if(result == NULL || length == NULL){ + return TSDB_CODE_TSC_INVALID_INPUT; + } + SOptrBasicInfo* pInfo = (SOptrBasicInfo*)(pOperator->info); + SAggSupporter* pSup = (SAggSupporter*)POINTER_SHIFT(pOperator->info, sizeof(SOptrBasicInfo)); int32_t size = taosHashGetSize(pSup->pResultRowHashTable); size_t keyLen = sizeof(uint64_t) * 2; // estimate the key length - int32_t totalSize = sizeof(int32_t) + size * (sizeof(int32_t) + keyLen + sizeof(int32_t) + pSup->resultRowSize); - *result = taosMemoryCalloc(1, totalSize); + int32_t totalSize = sizeof(int32_t) + sizeof(int32_t) + size * (sizeof(int32_t) + keyLen + sizeof(int32_t) + pSup->resultRowSize); + + *result = (char*)taosMemoryCalloc(1, totalSize); if (*result == NULL) { - longjmp(pOperator->pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY); + return TSDB_CODE_OUT_OF_MEMORY; } - *(int32_t*)(*result) = size; + int32_t offset = sizeof(int32_t); + *(int32_t*)(*result + offset) = size; + offset += sizeof(int32_t); // prepare memory SResultRowPosition* pos = &pInfo->resultRowInfo.cur; @@ -3530,12 +3537,11 @@ void aggEncodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasi // recalculate the result size int32_t realTotalSize = offset + sizeof(int32_t) + keyLen + sizeof(int32_t) + pSup->resultRowSize; if (realTotalSize > totalSize) { - char* tmp = taosMemoryRealloc(*result, realTotalSize); + char* tmp = (char*)taosMemoryRealloc(*result, realTotalSize); if (tmp == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; taosMemoryFree(*result); *result = NULL; - longjmp(pOperator->pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY); + return TSDB_CODE_OUT_OF_MEMORY; } else { *result = tmp; } @@ -3555,17 +3561,18 @@ void aggEncodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasi pIter = taosHashIterate(pSup->pResultRowHashTable, pIter); } - if (length) { - *length = offset; - } - return; + *(int32_t*)(*result) = offset; + *length = offset; + + return TDB_CODE_SUCCESS; } -bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasicInfo* pInfo, char* result, - int32_t length) { - if (!result || length <= 0) { - return false; +int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result, int32_t length) { + if(result == NULL || length <= 0){ + return TSDB_CODE_TSC_INVALID_INPUT; } + SOptrBasicInfo* pInfo = (SOptrBasicInfo*)(pOperator->info); + SAggSupporter* pSup = (SAggSupporter*)POINTER_SHIFT(pOperator->info, sizeof(SOptrBasicInfo)); // int32_t size = taosHashGetSize(pSup->pResultRowHashTable); int32_t count = *(int32_t*)(result); @@ -3578,7 +3585,7 @@ bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasi uint64_t tableGroupId = *(uint64_t*)(result + offset); SResultRow* resultRow = getNewResultRow_rv(pSup->pResultBuf, tableGroupId, pSup->resultRowSize); if (!resultRow) { - longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_INVALID_INPUT); + return TSDB_CODE_TSC_INVALID_INPUT; } // add a new result set for a new group @@ -3588,7 +3595,7 @@ bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasi offset += keyLen; int32_t valueLen = *(int32_t*)(result + offset); if (valueLen != pSup->resultRowSize) { - longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_INVALID_INPUT); + return TSDB_CODE_TSC_INVALID_INPUT; } offset += sizeof(int32_t); int32_t pageId = resultRow->pageId; @@ -3607,9 +3614,9 @@ bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasi } if (offset != length) { - longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_INVALID_INPUT); + return TSDB_CODE_TSC_INVALID_INPUT; } - return true; + return TDB_CODE_SUCCESS; } enum { @@ -4986,6 +4993,91 @@ _error: return NULL; } +int32_t encodeOperator(SOperatorInfo* ops, char** result, int32_t *length){ + int32_t code = TDB_CODE_SUCCESS; + char *pCurrent = NULL; + int32_t currLength = 0; + if(ops->fpSet.encodeResultRow){ + if(result == NULL || length == NULL){ + return TSDB_CODE_TSC_INVALID_INPUT; + } + code = ops->fpSet.encodeResultRow(ops, &pCurrent, &currLength); + + if(code != TDB_CODE_SUCCESS){ + if(*result != NULL){ + taosMemoryFree(*result); + *result = NULL; + } + return code; + } + + if(*result == NULL){ + *result = (char*)taosMemoryCalloc(1, currLength + sizeof(int32_t)); + if (*result == NULL) { + taosMemoryFree(pCurrent); + return TSDB_CODE_OUT_OF_MEMORY; + } + memcpy(*result + sizeof(int32_t), pCurrent, currLength); + *(int32_t*)(*result) = currLength + sizeof(int32_t); + }else{ + int32_t sizePre = *(int32_t*)(*result); + char* tmp = (char*)taosMemoryRealloc(*result, sizePre + currLength); + if (tmp == NULL) { + taosMemoryFree(pCurrent); + taosMemoryFree(*result); + *result = NULL; + return TSDB_CODE_OUT_OF_MEMORY; + } + *result = tmp; + memcpy(*result + sizePre, pCurrent, currLength); + *(int32_t*)(*result) += currLength; + } + taosMemoryFree(pCurrent); + *length = *(int32_t*)(*result); + } + + for (int32_t i = 0; i < ops->numOfDownstream; ++i) { + code = encodeOperator(ops->pDownstream[i], result, length); + if(code != TDB_CODE_SUCCESS){ + return code; + } + } + return TDB_CODE_SUCCESS; +} + +int32_t decodeOperator(SOperatorInfo* ops, char* result, int32_t length){ + int32_t code = TDB_CODE_SUCCESS; + if(ops->fpSet.decodeResultRow){ + if(result == NULL || length <= 0){ + return TSDB_CODE_TSC_INVALID_INPUT; + } + char* data = result + 2 * sizeof(int32_t); + int32_t dataLength = *(int32_t*)(result + sizeof(int32_t)); + code = ops->fpSet.decodeResultRow(ops, data, dataLength - sizeof(int32_t)); + if(code != TDB_CODE_SUCCESS){ + return code; + } + + int32_t totalLength = *(int32_t*)result; + if(totalLength == dataLength + sizeof(int32_t)) { // the last data + result = NULL; + length = 0; + }else{ + result += dataLength; + *(int32_t*)(result) = totalLength - dataLength; + length = totalLength - dataLength; + } + } + + for (int32_t i = 0; i < ops->numOfDownstream; ++i) { + code = decodeOperator(ops->pDownstream[i], result, length); + if(code != TDB_CODE_SUCCESS){ + return code; + } + } + return TDB_CODE_SUCCESS; +} + int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, EOPTR_EXEC_MODEL model) { uint64_t queryId = pPlan->id.queryId;