From 90bdf06a9ab3f97cfdd08e17ac09d468d3c3ac86 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 11 Jun 2022 20:07:17 +0800 Subject: [PATCH 1/6] fix(query): add day of month check for the input timestamp. --- source/common/src/ttime.c | 28 ++++++++++++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/source/common/src/ttime.c b/source/common/src/ttime.c index 10ba58af29..d2ee289cda 100644 --- a/source/common/src/ttime.c +++ b/source/common/src/ttime.c @@ -309,12 +309,36 @@ int32_t parseTimeWithTz(const char* timestr, int64_t* time, int32_t timePrec, ch return 0; } +static FORCE_INLINE bool validateTm(struct tm* pTm) { + if (pTm == NULL) { + return false; + } + + int32_t dayOfMonth[12] = {31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31}; + + int32_t leapYearMonthDay = 29; + int32_t year = pTm->tm_year + 1900; + bool isLeapYear = ((year % 100) == 0)? ((year % 400) == 0):((year % 4) == 0); + + if (isLeapYear && (pTm->tm_mon == 1)) { + if (pTm->tm_mday > leapYearMonthDay) { + return false; + } + } else { + if (pTm->tm_mday > dayOfMonth[pTm->tm_mon]) { + return false; + } + } + + return true; +} + int32_t parseLocaltime(char* timestr, int64_t* time, int32_t timePrec) { *time = 0; struct tm tm = {0}; char* str = taosStrpTime(timestr, "%Y-%m-%d %H:%M:%S", &tm); - if (str == NULL) { + if (str == NULL || !validateTm(&tm)) { return -1; } @@ -349,7 +373,7 @@ int32_t parseLocaltimeDst(char* timestr, int64_t* time, int32_t timePrec) { tm.tm_isdst = -1; char* str = taosStrpTime(timestr, "%Y-%m-%d %H:%M:%S", &tm); - if (str == NULL) { + if (str == NULL || !validateTm(&tm)) { return -1; } From f5dacec38d9e3fb2fafeef28b531c6a9341b3f47 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 12 Jun 2022 00:08:01 +0800 Subject: [PATCH 2/6] fix(query): add more check for timestamp in converting. --- source/common/src/ttime.c | 22 +++++++++++----------- source/libs/function/src/builtinsimpl.c | 6 ++++-- 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/source/common/src/ttime.c b/source/common/src/ttime.c index d2ee289cda..afae6b860c 100644 --- a/source/common/src/ttime.c +++ b/source/common/src/ttime.c @@ -76,22 +76,22 @@ void deltaToUtcInitOnce() { static int64_t parseFraction(char* str, char** end, int32_t timePrec); static int32_t parseTimeWithTz(const char* timestr, int64_t* time, int32_t timePrec, char delim); -static int32_t parseLocaltime(char* timestr, int64_t* time, int32_t timePrec); -static int32_t parseLocaltimeDst(char* timestr, int64_t* time, int32_t timePrec); +static int32_t parseLocaltime(char* timestr, int32_t len, int64_t* utime, int32_t timePrec); +static int32_t parseLocaltimeDst(char* timestr, int32_t len, int64_t* utime, int32_t timePrec); static char* forwardToTimeStringEnd(char* str); static bool checkTzPresent(const char* str, int32_t len); -static int32_t (*parseLocaltimeFp[])(char* timestr, int64_t* time, int32_t timePrec) = {parseLocaltime, +static int32_t (*parseLocaltimeFp[])(char* timestr, int32_t len, int64_t* utime, int32_t timePrec) = {parseLocaltime, parseLocaltimeDst}; -int32_t taosParseTime(const char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t day_light) { +int32_t taosParseTime(const char* timestr, int64_t* utime, int32_t len, int32_t timePrec, int8_t day_light) { /* parse datatime string in with tz */ if (strnchr(timestr, 'T', len, false) != NULL) { - return parseTimeWithTz(timestr, time, timePrec, 'T'); + return parseTimeWithTz(timestr, utime, timePrec, 'T'); } else if (checkTzPresent(timestr, len)) { - return parseTimeWithTz(timestr, time, timePrec, 0); + return parseTimeWithTz(timestr, utime, timePrec, 0); } else { - return (*parseLocaltimeFp[day_light])((char*)timestr, time, timePrec); + return (*parseLocaltimeFp[day_light])((char*)timestr, len, utime, timePrec); } } @@ -333,12 +333,12 @@ static FORCE_INLINE bool validateTm(struct tm* pTm) { return true; } -int32_t parseLocaltime(char* timestr, int64_t* time, int32_t timePrec) { +int32_t parseLocaltime(char* timestr, int32_t len, int64_t* time, int32_t timePrec) { *time = 0; struct tm tm = {0}; char* str = taosStrpTime(timestr, "%Y-%m-%d %H:%M:%S", &tm); - if (str == NULL || !validateTm(&tm)) { + if (str == NULL || ((str - timestr) < len) || !validateTm(&tm)) { return -1; } @@ -367,13 +367,13 @@ int32_t parseLocaltime(char* timestr, int64_t* time, int32_t timePrec) { return 0; } -int32_t parseLocaltimeDst(char* timestr, int64_t* time, int32_t timePrec) { +int32_t parseLocaltimeDst(char* timestr, int32_t len, int64_t* time, int32_t timePrec) { *time = 0; struct tm tm = {0}; tm.tm_isdst = -1; char* str = taosStrpTime(timestr, "%Y-%m-%d %H:%M:%S", &tm); - if (str == NULL || !validateTm(&tm)) { + if (str == NULL || ((str - timestr) < len) || !validateTm(&tm)) { return -1; } diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index a92126b1a1..c7e2e3b9b5 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -4188,6 +4188,8 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) { SColumnInfoData* pInputCol = pInput->pData[0]; SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput; + int32_t alreadySampled = pInfo->numSampled; + int32_t startOffset = pCtx->offset; for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { if (colDataIsNull_s(pInputCol, i)) { @@ -4199,13 +4201,13 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) { doReservoirSample(pInfo, data, tsList[i], i); } - for (int32_t i = 0; i < pInfo->numSampled; ++i) { + for (int32_t i = alreadySampled; i < pInfo->numSampled; ++i) { int32_t pos = startOffset + i; colDataAppend(pOutput, pos, pInfo->data + i * pInfo->colBytes, false); //TODO: handle ts output } - return pInfo->numSampled; + return pInfo->numSampled - alreadySampled; } bool getTailFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { From c2e8aa9659e5e113251c637da44fd97baba2508a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 12 Jun 2022 00:31:34 +0800 Subject: [PATCH 3/6] refactor: do some internal refactor. --- source/common/src/ttime.c | 4 ++-- source/libs/function/src/builtinsimpl.c | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/source/common/src/ttime.c b/source/common/src/ttime.c index afae6b860c..0b59e9b6cc 100644 --- a/source/common/src/ttime.c +++ b/source/common/src/ttime.c @@ -338,7 +338,7 @@ int32_t parseLocaltime(char* timestr, int32_t len, int64_t* time, int32_t timePr struct tm tm = {0}; char* str = taosStrpTime(timestr, "%Y-%m-%d %H:%M:%S", &tm); - if (str == NULL || ((str - timestr) < len) || !validateTm(&tm)) { + if (str == NULL || (((str - timestr) < len) && (*str != '.')) || !validateTm(&tm)) { return -1; } @@ -373,7 +373,7 @@ int32_t parseLocaltimeDst(char* timestr, int32_t len, int64_t* time, int32_t tim tm.tm_isdst = -1; char* str = taosStrpTime(timestr, "%Y-%m-%d %H:%M:%S", &tm); - if (str == NULL || ((str - timestr) < len) || !validateTm(&tm)) { + if (str == NULL || (((str - timestr) < len) && (*str != '.')) || !validateTm(&tm)) { return -1; } diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index c7e2e3b9b5..b2b3714bec 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -4201,13 +4201,13 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) { doReservoirSample(pInfo, data, tsList[i], i); } - for (int32_t i = alreadySampled; i < pInfo->numSampled; ++i) { + for (int32_t i = 0; i < pInfo->numSampled; ++i) { int32_t pos = startOffset + i; colDataAppend(pOutput, pos, pInfo->data + i * pInfo->colBytes, false); //TODO: handle ts output } - return pInfo->numSampled - alreadySampled; + return pInfo->numSampled; } bool getTailFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { From d0dd451494fc1ae2fc963eee61063a957f994347 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 12 Jun 2022 15:25:37 +0800 Subject: [PATCH 4/6] refactor: do some internal refactor of sample function. --- source/libs/function/inc/builtinsimpl.h | 1 + source/libs/function/src/builtins.c | 4 +-- source/libs/function/src/builtinsimpl.c | 34 ++++++++++++++++--------- 3 files changed, 25 insertions(+), 14 deletions(-) diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index 8274a33eb1..9fe6584a90 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -155,6 +155,7 @@ int32_t mavgFunction(SqlFunctionCtx* pCtx); bool getSampleFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool sampleFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); int32_t sampleFunction(SqlFunctionCtx* pCtx); +int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); bool getTailFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool tailFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index ca586a79c9..607d14cd3c 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1751,12 +1751,12 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "sample", .type = FUNCTION_TYPE_SAMPLE, - .classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC, .translateFunc = translateSample, .getEnvFunc = getSampleFuncEnv, .initFunc = sampleFunctionSetup, .processFunc = sampleFunction, - .finalizeFunc = NULL + .finalizeFunc = sampleFinalize }, { .name = "tail", diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index b2b3714bec..085824b29d 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -198,7 +198,7 @@ typedef struct SSampleInfo { int32_t numSampled; uint8_t colType; int16_t colBytes; - char *data; + char *data; int64_t *timestamp; } SSampleInfo; @@ -4183,28 +4183,38 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) { SSampleInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo); SInputColumnInfoData* pInput = &pCtx->input; - TSKEY* tsList = (int64_t*)pInput->pPTS->pData; + + TSKEY* tsList = NULL; + if (pInput->pPTS != NULL) { + tsList = (int64_t*)pInput->pPTS->pData; + } SColumnInfoData* pInputCol = pInput->pData[0]; - SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput; - - int32_t alreadySampled = pInfo->numSampled; - - int32_t startOffset = pCtx->offset; for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { if (colDataIsNull_s(pInputCol, i)) { - //colDataAppendNULL(pOutput, i); continue; } char* data = colDataGetData(pInputCol, i); - doReservoirSample(pInfo, data, tsList[i], i); + doReservoirSample(pInfo, data, /*tsList[i]*/0, i); } + SET_VAL(pResInfo, pInfo->numSampled, pInfo->numSampled); + return TSDB_CODE_SUCCESS; +} + +int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { + SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); + + SSampleInfo* pInfo = GET_ROWCELL_INTERBUF(pEntryInfo); + pEntryInfo->complete = true; + + int32_t slotId = pCtx->pExpr->base.resSchema.slotId; + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); + + int32_t currentRow = pBlock->info.rows; for (int32_t i = 0; i < pInfo->numSampled; ++i) { - int32_t pos = startOffset + i; - colDataAppend(pOutput, pos, pInfo->data + i * pInfo->colBytes, false); - //TODO: handle ts output + colDataAppend(pCol, currentRow + i, pInfo->data + i * pInfo->colBytes, false); } return pInfo->numSampled; From 1ad2b5c92b0afd6df3f06befa99f7586dfd57f09 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 12 Jun 2022 23:37:29 +0800 Subject: [PATCH 5/6] fix(query): fix memory leak. --- source/libs/executor/inc/executorimpl.h | 1 + source/libs/executor/src/executorimpl.c | 10 +++++++++- source/libs/executor/src/scanoperator.c | 8 ++++++-- 3 files changed, 16 insertions(+), 3 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 265f968d24..94c6512e77 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -781,6 +781,7 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* numOfExprs); SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode); int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode); +void clearupQueryTableDataCond(SQueryTableDataCond* pCond); SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData, int16_t bytes, bool masterscan, uint64_t groupId, diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 5a68a9d7e1..e331564c92 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4906,6 +4906,11 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi return TSDB_CODE_SUCCESS; } +void clearupQueryTableDataCond(SQueryTableDataCond* pCond) { + taosMemoryFree(pCond->twindows); + taosMemoryFree(pCond->colList); +} + SColumn extractColumnFromColumnNode(SColumnNode* pColNode) { SColumn c = {0}; c.slotId = pColNode->slotId; @@ -5111,7 +5116,10 @@ tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* goto _error; } - return tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo, queryId, taskId); + tsdbReaderT* pReader = tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo, queryId, taskId); + clearupQueryTableDataCond(&cond); + + return pReader; _error: terrno = code; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index b5f8ad0ae2..847ec32653 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -526,6 +526,8 @@ static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptr static void destroyTableScanOperatorInfo(void* param, int32_t numOfOutput) { STableScanInfo* pTableScanInfo = (STableScanInfo*)param; taosMemoryFree(pTableScanInfo->pResBlock); + clearupQueryTableDataCond(&pTableScanInfo->cond); + tsdbCleanupReadHandle(pTableScanInfo->dataReader); taosArrayDestroy(pTableScanInfo->pGroupCols); @@ -1439,16 +1441,18 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { pRsp->numOfRows, pInfo->loadInfo.totalRows); if (pRsp->numOfRows == 0) { + + taosMemoryFree(pRsp); return NULL; } } - SRetrieveMetaTableRsp* pTableRsp = pInfo->pRsp; - setDataBlockFromFetchRsp(pInfo->pRes, &pInfo->loadInfo, pTableRsp->numOfRows, pTableRsp->data, pTableRsp->compLen, + setDataBlockFromFetchRsp(pInfo->pRes, &pInfo->loadInfo, pRsp->numOfRows, pRsp->data, pRsp->compLen, pOperator->numOfExprs, startTs, NULL, pInfo->scanCols); // todo log the filter info doFilterResult(pInfo); + taosMemoryFree(pRsp); if (pInfo->pRes->info.rows > 0) { return pInfo->pRes; } From 0b8dd61f4139d6a974ac61fba3ca99784d146da9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 13 Jun 2022 09:33:08 +0800 Subject: [PATCH 6/6] fix(query): fix some memory leaks. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 5 ++++- source/libs/executor/src/executorimpl.c | 3 ++- source/libs/executor/src/scanoperator.c | 2 +- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index c07aed86b0..3086f662d3 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -3665,7 +3665,7 @@ static void* doFreeColumnInfoData(SArray* pColumnInfoData) { size_t cols = taosArrayGetSize(pColumnInfoData); for (int32_t i = 0; i < cols; ++i) { SColumnInfoData* pColInfo = taosArrayGet(pColumnInfoData, i); - taosMemoryFreeClear(pColInfo->pData); + colDataDestroy(pColInfo); } taosArrayDestroy(pColumnInfoData); @@ -3697,6 +3697,7 @@ void tsdbCleanupReadHandle(tsdbReaderT queryHandle) { taosMemoryFreeClear(pTsdbReadHandle->pDataBlockInfo); taosMemoryFreeClear(pTsdbReadHandle->suppInfo.pstatis); taosMemoryFreeClear(pTsdbReadHandle->suppInfo.plist); + taosMemoryFree(pTsdbReadHandle->suppInfo.slotIds); if (!emptyQueryTimewindow(pTsdbReadHandle)) { // tsdbMayUnTakeMemSnapshot(pTsdbReadHandle); @@ -3723,5 +3724,7 @@ void tsdbCleanupReadHandle(tsdbReaderT queryHandle) { pTsdbReadHandle, pCost->headFileLoad, pCost->headFileLoadTime, pCost->statisInfoLoadTime, pCost->blockLoadTime, pCost->checkForNextTime, pTsdbReadHandle->idStr); + taosMemoryFree(pTsdbReadHandle->idStr); + taosMemoryFree(pTsdbReadHandle->pSchema); taosMemoryFreeClear(pTsdbReadHandle); } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index e331564c92..c68b94d9aa 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2562,7 +2562,8 @@ void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray } if (p->info.colId == pmInfo->colId) { - taosArraySet(pBlock->pDataBlock, pmInfo->targetSlotId, p); + SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, pmInfo->targetSlotId); + colDataAssign(pDst, p, pBlock->info.rows); i++; j++; } else if (p->info.colId < pmInfo->colId) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 847ec32653..8fb4878cd9 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -525,7 +525,7 @@ static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptr static void destroyTableScanOperatorInfo(void* param, int32_t numOfOutput) { STableScanInfo* pTableScanInfo = (STableScanInfo*)param; - taosMemoryFree(pTableScanInfo->pResBlock); + blockDataDestroy(pTableScanInfo->pResBlock); clearupQueryTableDataCond(&pTableScanInfo->cond); tsdbCleanupReadHandle(pTableScanInfo->dataReader);