diff --git a/source/common/src/ttime.c b/source/common/src/ttime.c index 10ba58af29..0b59e9b6cc 100644 --- a/source/common/src/ttime.c +++ b/source/common/src/ttime.c @@ -76,22 +76,22 @@ void deltaToUtcInitOnce() { static int64_t parseFraction(char* str, char** end, int32_t timePrec); static int32_t parseTimeWithTz(const char* timestr, int64_t* time, int32_t timePrec, char delim); -static int32_t parseLocaltime(char* timestr, int64_t* time, int32_t timePrec); -static int32_t parseLocaltimeDst(char* timestr, int64_t* time, int32_t timePrec); +static int32_t parseLocaltime(char* timestr, int32_t len, int64_t* utime, int32_t timePrec); +static int32_t parseLocaltimeDst(char* timestr, int32_t len, int64_t* utime, int32_t timePrec); static char* forwardToTimeStringEnd(char* str); static bool checkTzPresent(const char* str, int32_t len); -static int32_t (*parseLocaltimeFp[])(char* timestr, int64_t* time, int32_t timePrec) = {parseLocaltime, +static int32_t (*parseLocaltimeFp[])(char* timestr, int32_t len, int64_t* utime, int32_t timePrec) = {parseLocaltime, parseLocaltimeDst}; -int32_t taosParseTime(const char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t day_light) { +int32_t taosParseTime(const char* timestr, int64_t* utime, int32_t len, int32_t timePrec, int8_t day_light) { /* parse datatime string in with tz */ if (strnchr(timestr, 'T', len, false) != NULL) { - return parseTimeWithTz(timestr, time, timePrec, 'T'); + return parseTimeWithTz(timestr, utime, timePrec, 'T'); } else if (checkTzPresent(timestr, len)) { - return parseTimeWithTz(timestr, time, timePrec, 0); + return parseTimeWithTz(timestr, utime, timePrec, 0); } else { - return (*parseLocaltimeFp[day_light])((char*)timestr, time, timePrec); + return (*parseLocaltimeFp[day_light])((char*)timestr, len, utime, timePrec); } } @@ -309,12 +309,36 @@ int32_t parseTimeWithTz(const char* timestr, int64_t* time, int32_t timePrec, ch return 0; } -int32_t parseLocaltime(char* timestr, int64_t* time, int32_t timePrec) { +static FORCE_INLINE bool validateTm(struct tm* pTm) { + if (pTm == NULL) { + return false; + } + + int32_t dayOfMonth[12] = {31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31}; + + int32_t leapYearMonthDay = 29; + int32_t year = pTm->tm_year + 1900; + bool isLeapYear = ((year % 100) == 0)? ((year % 400) == 0):((year % 4) == 0); + + if (isLeapYear && (pTm->tm_mon == 1)) { + if (pTm->tm_mday > leapYearMonthDay) { + return false; + } + } else { + if (pTm->tm_mday > dayOfMonth[pTm->tm_mon]) { + return false; + } + } + + return true; +} + +int32_t parseLocaltime(char* timestr, int32_t len, int64_t* time, int32_t timePrec) { *time = 0; struct tm tm = {0}; char* str = taosStrpTime(timestr, "%Y-%m-%d %H:%M:%S", &tm); - if (str == NULL) { + if (str == NULL || (((str - timestr) < len) && (*str != '.')) || !validateTm(&tm)) { return -1; } @@ -343,13 +367,13 @@ int32_t parseLocaltime(char* timestr, int64_t* time, int32_t timePrec) { return 0; } -int32_t parseLocaltimeDst(char* timestr, int64_t* time, int32_t timePrec) { +int32_t parseLocaltimeDst(char* timestr, int32_t len, int64_t* time, int32_t timePrec) { *time = 0; struct tm tm = {0}; tm.tm_isdst = -1; char* str = taosStrpTime(timestr, "%Y-%m-%d %H:%M:%S", &tm); - if (str == NULL) { + if (str == NULL || (((str - timestr) < len) && (*str != '.')) || !validateTm(&tm)) { return -1; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index c07aed86b0..3086f662d3 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -3665,7 +3665,7 @@ static void* doFreeColumnInfoData(SArray* pColumnInfoData) { size_t cols = taosArrayGetSize(pColumnInfoData); for (int32_t i = 0; i < cols; ++i) { SColumnInfoData* pColInfo = taosArrayGet(pColumnInfoData, i); - taosMemoryFreeClear(pColInfo->pData); + colDataDestroy(pColInfo); } taosArrayDestroy(pColumnInfoData); @@ -3697,6 +3697,7 @@ void tsdbCleanupReadHandle(tsdbReaderT queryHandle) { taosMemoryFreeClear(pTsdbReadHandle->pDataBlockInfo); taosMemoryFreeClear(pTsdbReadHandle->suppInfo.pstatis); taosMemoryFreeClear(pTsdbReadHandle->suppInfo.plist); + taosMemoryFree(pTsdbReadHandle->suppInfo.slotIds); if (!emptyQueryTimewindow(pTsdbReadHandle)) { // tsdbMayUnTakeMemSnapshot(pTsdbReadHandle); @@ -3723,5 +3724,7 @@ void tsdbCleanupReadHandle(tsdbReaderT queryHandle) { pTsdbReadHandle, pCost->headFileLoad, pCost->headFileLoadTime, pCost->statisInfoLoadTime, pCost->blockLoadTime, pCost->checkForNextTime, pTsdbReadHandle->idStr); + taosMemoryFree(pTsdbReadHandle->idStr); + taosMemoryFree(pTsdbReadHandle->pSchema); taosMemoryFreeClear(pTsdbReadHandle); } diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 265f968d24..94c6512e77 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -781,6 +781,7 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* numOfExprs); SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode); int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode); +void clearupQueryTableDataCond(SQueryTableDataCond* pCond); SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData, int16_t bytes, bool masterscan, uint64_t groupId, diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 5a68a9d7e1..c68b94d9aa 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2562,7 +2562,8 @@ void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray } if (p->info.colId == pmInfo->colId) { - taosArraySet(pBlock->pDataBlock, pmInfo->targetSlotId, p); + SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, pmInfo->targetSlotId); + colDataAssign(pDst, p, pBlock->info.rows); i++; j++; } else if (p->info.colId < pmInfo->colId) { @@ -4906,6 +4907,11 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi return TSDB_CODE_SUCCESS; } +void clearupQueryTableDataCond(SQueryTableDataCond* pCond) { + taosMemoryFree(pCond->twindows); + taosMemoryFree(pCond->colList); +} + SColumn extractColumnFromColumnNode(SColumnNode* pColNode) { SColumn c = {0}; c.slotId = pColNode->slotId; @@ -5111,7 +5117,10 @@ tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* goto _error; } - return tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo, queryId, taskId); + tsdbReaderT* pReader = tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo, queryId, taskId); + clearupQueryTableDataCond(&cond); + + return pReader; _error: terrno = code; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index b5f8ad0ae2..8fb4878cd9 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -525,7 +525,9 @@ static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptr static void destroyTableScanOperatorInfo(void* param, int32_t numOfOutput) { STableScanInfo* pTableScanInfo = (STableScanInfo*)param; - taosMemoryFree(pTableScanInfo->pResBlock); + blockDataDestroy(pTableScanInfo->pResBlock); + clearupQueryTableDataCond(&pTableScanInfo->cond); + tsdbCleanupReadHandle(pTableScanInfo->dataReader); taosArrayDestroy(pTableScanInfo->pGroupCols); @@ -1439,16 +1441,18 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { pRsp->numOfRows, pInfo->loadInfo.totalRows); if (pRsp->numOfRows == 0) { + + taosMemoryFree(pRsp); return NULL; } } - SRetrieveMetaTableRsp* pTableRsp = pInfo->pRsp; - setDataBlockFromFetchRsp(pInfo->pRes, &pInfo->loadInfo, pTableRsp->numOfRows, pTableRsp->data, pTableRsp->compLen, + setDataBlockFromFetchRsp(pInfo->pRes, &pInfo->loadInfo, pRsp->numOfRows, pRsp->data, pRsp->compLen, pOperator->numOfExprs, startTs, NULL, pInfo->scanCols); // todo log the filter info doFilterResult(pInfo); + taosMemoryFree(pRsp); if (pInfo->pRes->info.rows > 0) { return pInfo->pRes; } diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index 8274a33eb1..9fe6584a90 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -155,6 +155,7 @@ int32_t mavgFunction(SqlFunctionCtx* pCtx); bool getSampleFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool sampleFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); int32_t sampleFunction(SqlFunctionCtx* pCtx); +int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); bool getTailFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool tailFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index a1aa1a775c..ba54d1bb39 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1754,12 +1754,12 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "sample", .type = FUNCTION_TYPE_SAMPLE, - .classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC, .translateFunc = translateSample, .getEnvFunc = getSampleFuncEnv, .initFunc = sampleFunctionSetup, .processFunc = sampleFunction, - .finalizeFunc = NULL + .finalizeFunc = sampleFinalize }, { .name = "tail", diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index a92126b1a1..085824b29d 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -198,7 +198,7 @@ typedef struct SSampleInfo { int32_t numSampled; uint8_t colType; int16_t colBytes; - char *data; + char *data; int64_t *timestamp; } SSampleInfo; @@ -4183,26 +4183,38 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) { SSampleInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo); SInputColumnInfoData* pInput = &pCtx->input; - TSKEY* tsList = (int64_t*)pInput->pPTS->pData; + + TSKEY* tsList = NULL; + if (pInput->pPTS != NULL) { + tsList = (int64_t*)pInput->pPTS->pData; + } SColumnInfoData* pInputCol = pInput->pData[0]; - SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput; - - int32_t startOffset = pCtx->offset; for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { if (colDataIsNull_s(pInputCol, i)) { - //colDataAppendNULL(pOutput, i); continue; } char* data = colDataGetData(pInputCol, i); - doReservoirSample(pInfo, data, tsList[i], i); + doReservoirSample(pInfo, data, /*tsList[i]*/0, i); } + SET_VAL(pResInfo, pInfo->numSampled, pInfo->numSampled); + return TSDB_CODE_SUCCESS; +} + +int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { + SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); + + SSampleInfo* pInfo = GET_ROWCELL_INTERBUF(pEntryInfo); + pEntryInfo->complete = true; + + int32_t slotId = pCtx->pExpr->base.resSchema.slotId; + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); + + int32_t currentRow = pBlock->info.rows; for (int32_t i = 0; i < pInfo->numSampled; ++i) { - int32_t pos = startOffset + i; - colDataAppend(pOutput, pos, pInfo->data + i * pInfo->colBytes, false); - //TODO: handle ts output + colDataAppend(pCol, currentRow + i, pInfo->data + i * pInfo->colBytes, false); } return pInfo->numSampled;