From d8b8ece9c13b9ed904b97f1f4db39efd0eef3f7c Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Thu, 29 Dec 2022 18:12:29 +0800 Subject: [PATCH 01/10] fix(query): fix floating type handle sma error --- source/libs/function/src/builtinsimpl.c | 2 +- source/libs/function/src/detail/tminmax.c | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index a7b12bfcc4..0019669428 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -784,7 +784,7 @@ int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { pEntryInfo->isNullRes = (pEntryInfo->numOfRes == 0) ? 1 : 0; if (pCol->info.type == TSDB_DATA_TYPE_FLOAT) { - float v = *(float*)&pRes->v; + float v = GET_DOUBLE_VAL(&pRes->v); colDataAppend(pCol, currentRow, (const char*)&v, pEntryInfo->isNullRes); } else { colDataAppend(pCol, currentRow, (const char*)&pRes->v, pEntryInfo->isNullRes); diff --git a/source/libs/function/src/detail/tminmax.c b/source/libs/function/src/detail/tminmax.c index cb5cea3cc8..b919dc1123 100644 --- a/source/libs/function/src/detail/tminmax.c +++ b/source/libs/function/src/detail/tminmax.c @@ -737,6 +737,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if (!pBuf->assign) { pBuf->v = *(int64_t*)tval; + if (pCtx->subsidiaries.num > 0) { index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); if (index >= 0) { @@ -788,11 +789,11 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { } } else if (type == TSDB_DATA_TYPE_FLOAT) { float prev = 0; - GET_TYPED_DATA(prev, float, type, &pBuf->v); + GET_TYPED_DATA(prev, float, TSDB_DATA_TYPE_DOUBLE, &pBuf->v); float val = GET_DOUBLE_VAL(tval); if ((prev < val) ^ isMinFunc) { - *(float*)&pBuf->v = val; + *(double*)&pBuf->v = GET_DOUBLE_VAL(tval); } if (pCtx->subsidiaries.num > 0) { @@ -888,4 +889,4 @@ _over: } return numOfElems; -} \ No 
newline at end of file +} From e105f9c0d29aa9c0065f7413f928bb4fbff94bc0 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Fri, 30 Dec 2022 09:16:30 +0800 Subject: [PATCH 02/10] store sma result as float in buf --- source/libs/function/src/builtinsimpl.c | 2 +- source/libs/function/src/detail/tminmax.c | 10 +++++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 0019669428..16068eb504 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -784,7 +784,7 @@ int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { pEntryInfo->isNullRes = (pEntryInfo->numOfRes == 0) ? 1 : 0; if (pCol->info.type == TSDB_DATA_TYPE_FLOAT) { - float v = GET_DOUBLE_VAL(&pRes->v); + float v = GET_FLOAT_VAL(&pRes->v); colDataAppend(pCol, currentRow, (const char*)&v, pEntryInfo->isNullRes); } else { colDataAppend(pCol, currentRow, (const char*)&pRes->v, pEntryInfo->isNullRes); diff --git a/source/libs/function/src/detail/tminmax.c b/source/libs/function/src/detail/tminmax.c index b919dc1123..ced491ac5e 100644 --- a/source/libs/function/src/detail/tminmax.c +++ b/source/libs/function/src/detail/tminmax.c @@ -736,7 +736,11 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { } if (!pBuf->assign) { - pBuf->v = *(int64_t*)tval; + if (type == TSDB_DATA_TYPE_FLOAT) { + GET_FLOAT_VAL(&pBuf->v) = GET_DOUBLE_VAL(tval); + } else { + pBuf->v = *(int64_t*)tval; + } if (pCtx->subsidiaries.num > 0) { index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); @@ -789,11 +793,11 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { } } else if (type == TSDB_DATA_TYPE_FLOAT) { float prev = 0; - GET_TYPED_DATA(prev, float, TSDB_DATA_TYPE_DOUBLE, &pBuf->v); + GET_TYPED_DATA(prev, float, type, &pBuf->v); float val = GET_DOUBLE_VAL(tval); if ((prev < val) ^ isMinFunc) { - *(double*)&pBuf->v = 
GET_DOUBLE_VAL(tval); + *(float*)&pBuf->v = val; } if (pCtx->subsidiaries.num > 0) { From 73a95d3cdaa7c4e67496d4948463cc84b3d641e8 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Fri, 30 Dec 2022 09:32:18 +0800 Subject: [PATCH 03/10] fix some format --- source/libs/function/src/detail/tminmax.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/source/libs/function/src/detail/tminmax.c b/source/libs/function/src/detail/tminmax.c index ced491ac5e..7fb5e2bebe 100644 --- a/source/libs/function/src/detail/tminmax.c +++ b/source/libs/function/src/detail/tminmax.c @@ -358,7 +358,7 @@ static double doubleVectorCmpAVX(const double* pData, int32_t numOfRows, bool is static int32_t findFirstValPosition(const SColumnInfoData* pCol, int32_t start, int32_t numOfRows) { int32_t i = start; - + while (i < (start + numOfRows) && (colDataIsNull_f(pCol->nullbitmap, i) == true)) { i += 1; } @@ -739,7 +739,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if (type == TSDB_DATA_TYPE_FLOAT) { GET_FLOAT_VAL(&pBuf->v) = GET_DOUBLE_VAL(tval); } else { - pBuf->v = *(int64_t*)tval; + pBuf->v = GET_INT64_VAL(tval); } if (pCtx->subsidiaries.num > 0) { @@ -755,7 +755,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { int64_t val = GET_INT64_VAL(tval); if ((prev < val) ^ isMinFunc) { - *(int64_t*)&pBuf->v = val; + GET_INT64_VAL(&pBuf->v) = val; if (pCtx->subsidiaries.num > 0) { index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); if (index >= 0) { @@ -769,7 +769,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { uint64_t val = GET_UINT64_VAL(tval); if ((prev < val) ^ isMinFunc) { - *(uint64_t*)&pBuf->v = val; + GET_UINT64_VAL(&pBuf->v) = val; if (pCtx->subsidiaries.num > 0) { index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); if (index >= 0) { @@ -783,7 +783,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { double val = 
GET_DOUBLE_VAL(tval); if ((prev < val) ^ isMinFunc) { - *(double*)&pBuf->v = val; + GET_DOUBLE_VAL(&pBuf->v) = val; if (pCtx->subsidiaries.num > 0) { index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); if (index >= 0) { @@ -797,7 +797,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { float val = GET_DOUBLE_VAL(tval); if ((prev < val) ^ isMinFunc) { - *(float*)&pBuf->v = val; + GET_FLOAT_VAL(&pBuf->v) = val; } if (pCtx->subsidiaries.num > 0) { From 321d2d678701fcd5d6fa45172aa766791440781a Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Tue, 3 Jan 2023 11:10:42 +0800 Subject: [PATCH 04/10] add sma test cases --- tests/system-test/2-query/blockSMA.py | 146 ++++++++++++++++++++++++++ 1 file changed, 146 insertions(+) create mode 100644 tests/system-test/2-query/blockSMA.py diff --git a/tests/system-test/2-query/blockSMA.py b/tests/system-test/2-query/blockSMA.py new file mode 100644 index 0000000000..85c0189e27 --- /dev/null +++ b/tests/system-test/2-query/blockSMA.py @@ -0,0 +1,146 @@ +from wsgiref.headers import tspecials +from util.log import * +from util.cases import * +from util.sql import * +import numpy as np + + +class TDTestCase: + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor()) + + self.rowNum = 10000 + self.ts = 1537146000000 + + def run(self): + dbname = "db" + tdSql.prepare() + + tdSql.execute(f'''create table {dbname}.ntb(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 float, col6 double, + col7 bool, col8 binary(20), col9 nchar(20), col11 tinyint unsigned, col12 smallint unsigned, col13 int unsigned, col14 bigint unsigned)''') + for i in range(self.rowNum): + tdSql.execute(f"insert into {dbname}.ntb values(%d, %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d', %d, %d, %d, %d)" + % (self.ts + i, i % 127 + 1, i + 1, i + 1, i + 1, i + 0.1, i + 0.1, i % 2, i + 1, i + 1, i % 255 + 1, 
i + 1, i + 1, i + 1)) + + + tdSql.execute('flush database db') + + # test functions using sma result + tdSql.query(f"select count(col1),min(col1),max(col1),avg(col1),sum(col1),spread(col1),percentile(col1, 0),first(col1),last(col1) from {dbname}.ntb") + tdSql.checkData(0, 0, 10000) + tdSql.checkData(0, 1, 1) + tdSql.checkData(0, 2, 127) + tdSql.checkData(0, 3, 63.8449) + tdSql.checkData(0, 4, 638449) + tdSql.checkData(0, 5, 126.0) + tdSql.checkData(0, 6, 1.0) + tdSql.checkData(0, 7, 1) + tdSql.checkData(0, 8, 94) + + tdSql.query(f"select count(col2),min(col2),max(col2),avg(col2),sum(col2),spread(col2),percentile(col2, 0),first(col2),last(col2) from {dbname}.ntb") + tdSql.checkData(0, 0, 10000) + tdSql.checkData(0, 1, 1) + tdSql.checkData(0, 2, 10000) + tdSql.checkData(0, 3, 5000.5) + tdSql.checkData(0, 4, 50005000) + tdSql.checkData(0, 5, 9999.0) + tdSql.checkData(0, 6, 1.0) + tdSql.checkData(0, 7, 1) + tdSql.checkData(0, 8, 10000) + + tdSql.query(f"select count(col3),min(col3),max(col3),avg(col3),sum(col3),spread(col3),percentile(col3, 0),first(col3),last(col3) from {dbname}.ntb") + tdSql.checkData(0, 0, 10000) + tdSql.checkData(0, 1, 1) + tdSql.checkData(0, 2, 10000) + tdSql.checkData(0, 3, 5000.5) + tdSql.checkData(0, 4, 50005000) + tdSql.checkData(0, 5, 9999.0) + tdSql.checkData(0, 6, 1.0) + tdSql.checkData(0, 7, 1) + tdSql.checkData(0, 8, 10000) + + tdSql.query(f"select count(col4),min(col4),max(col4),avg(col4),sum(col4),spread(col4),percentile(col4, 0),first(col4),last(col4) from {dbname}.ntb") + tdSql.checkData(0, 0, 10000) + tdSql.checkData(0, 1, 1) + tdSql.checkData(0, 2, 10000) + tdSql.checkData(0, 3, 5000.5) + tdSql.checkData(0, 4, 50005000) + tdSql.checkData(0, 5, 9999.0) + tdSql.checkData(0, 6, 1.0) + tdSql.checkData(0, 7, 1) + tdSql.checkData(0, 8, 10000) + + tdSql.query(f"select count(col5),min(col5),max(col5),avg(col5),sum(col5),spread(col5),percentile(col5, 0),first(col5),last(col5) from {dbname}.ntb") + tdSql.checkData(0, 0, 10000) + 
tdSql.checkData(0, 1, 0.1) + tdSql.checkData(0, 2, 9999.09961) + tdSql.checkData(0, 3, 4999.599985846) + tdSql.checkData(0, 4, 49995999.858455874) + tdSql.checkData(0, 5, 9998.999609374) + tdSql.checkData(0, 6, 0.100000001) + tdSql.checkData(0, 7, 0.1) + tdSql.checkData(0, 8, 9999.09961) + + tdSql.query(f"select count(col6),min(col6),max(col6),avg(col6),sum(col6),spread(col6),percentile(col6, 0),first(col6),last(col6) from {dbname}.ntb") + tdSql.checkData(0, 0, 10000) + tdSql.checkData(0, 1, 0.1) + tdSql.checkData(0, 2, 9999.100000000) + tdSql.checkData(0, 3, 4999.600000001) + tdSql.checkData(0, 4, 49996000.000005305) + tdSql.checkData(0, 5, 9999.000000000) + tdSql.checkData(0, 6, 0.1) + tdSql.checkData(0, 7, 0.1) + tdSql.checkData(0, 8, 9999.1) + + tdSql.query(f"select count(col11),min(col11),max(col11),avg(col11),sum(col11),spread(col11),percentile(col11, 0),first(col11),last(col11) from {dbname}.ntb") + tdSql.checkData(0, 0, 10000) + tdSql.checkData(0, 1, 1) + tdSql.checkData(0, 2, 255) + tdSql.checkData(0, 3, 127.45) + tdSql.checkData(0, 4, 1274500) + tdSql.checkData(0, 5, 254.000000000) + tdSql.checkData(0, 6, 1.0) + tdSql.checkData(0, 7, 1) + tdSql.checkData(0, 8, 55) + + tdSql.query(f"select count(col12),min(col12),max(col12),avg(col12),sum(col12),spread(col12),percentile(col12, 0),first(col12),last(col12) from {dbname}.ntb") + tdSql.checkData(0, 0, 10000) + tdSql.checkData(0, 1, 1) + tdSql.checkData(0, 2, 10000) + tdSql.checkData(0, 3, 5000.5) + tdSql.checkData(0, 4, 50005000) + tdSql.checkData(0, 5, 9999.0) + tdSql.checkData(0, 6, 1.0) + tdSql.checkData(0, 7, 1) + tdSql.checkData(0, 8, 10000) + + tdSql.query(f"select count(col13),min(col13),max(col13),avg(col13),sum(col13),spread(col13),percentile(col13, 0),first(col13),last(col13) from {dbname}.ntb") + tdSql.checkData(0, 0, 10000) + tdSql.checkData(0, 1, 1) + tdSql.checkData(0, 2, 10000) + tdSql.checkData(0, 3, 5000.5) + tdSql.checkData(0, 4, 50005000) + tdSql.checkData(0, 5, 9999.0) + tdSql.checkData(0, 
6, 1.0) + tdSql.checkData(0, 7, 1) + tdSql.checkData(0, 8, 10000) + + tdSql.query(f"select count(col14),min(col14),max(col14),avg(col14),sum(col14),spread(col14),percentile(col14, 0),first(col14),last(col14) from {dbname}.ntb") + tdSql.checkData(0, 0, 10000) + tdSql.checkData(0, 1, 1) + tdSql.checkData(0, 2, 10000) + tdSql.checkData(0, 3, 5000.5) + tdSql.checkData(0, 4, 50005000) + tdSql.checkData(0, 5, 9999.0) + tdSql.checkData(0, 6, 1.0) + tdSql.checkData(0, 7, 1) + tdSql.checkData(0, 8, 10000) + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) From 0a0bb2e697adad03fa78ba2da3bd85130e0d2fd8 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Tue, 3 Jan 2023 11:10:53 +0800 Subject: [PATCH 05/10] add test cases --- tests/parallel_test/cases.task | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index a912e925ec..771e09c05e 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -616,6 +616,8 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/varchar.py -R ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/case_when.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/case_when.py -R +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/blockSMA.py +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/blockSMA.py -R ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/update_data.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/tb_100w_data_order.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/delete_stable.py @@ -831,6 +833,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tsbsQuery.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sml.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/case_when.py -Q 2 +,,y,system-test,./pytest.sh python3 ./test.py 
-f 2-query/blockSMA.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-21561.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/between.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/distinct.py -Q 3 @@ -927,6 +930,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sml.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/interp.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/case_when.py -Q 3 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/blockSMA.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-21561.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/between.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/distinct.py -Q 4 @@ -1033,6 +1037,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/insert_null_none.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/insert_null_none.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/insert_null_none.py -Q 4 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/blockSMA.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-21561.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-20582.py From f93abddc88589d187e89fb26542fc75b2bb573ca Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 3 Jan 2023 11:16:49 +0800 Subject: [PATCH 06/10] fix(utility): fix the bug in creating auto delete files. 
--- source/os/src/osFile.c | 68 +++++++++++------------------------------- 1 file changed, 17 insertions(+), 51 deletions(-) diff --git a/source/os/src/osFile.c b/source/os/src/osFile.c index d8cccc83ed..fce276bc35 100644 --- a/source/os/src/osFile.c +++ b/source/os/src/osFile.c @@ -39,14 +39,6 @@ #define _SEND_FILE_STEP_ 1000 #endif -#if defined(WINDOWS) -typedef int32_t FileFd; -typedef int32_t SocketFd; -#else -typedef int32_t FileFd; -typedef int32_t SocketFd; -#endif - typedef int32_t FileFd; typedef struct TdFile { @@ -54,19 +46,10 @@ typedef struct TdFile { int refId; FileFd fd; FILE *fp; -} * TdFilePtr, TdFile; +} TdFile; #define FILE_WITH_LOCK 1 -typedef struct AutoDelFile *AutoDelFilePtr; -typedef struct AutoDelFile { - char *name; - AutoDelFilePtr lastAutoDelFilePtr; -} AutoDelFile; -static TdThreadMutex autoDelFileLock; -static AutoDelFilePtr nowAutoDelFilePtr = NULL; -static TdThreadOnce autoDelFileInit = PTHREAD_ONCE_INIT; - void taosGetTmpfilePath(const char *inputTmpDir, const char *fileNamePrefix, char *dstPath) { #ifdef WINDOWS const char *tdengineTmpFileNamePrefix = "tdengine-"; @@ -268,34 +251,6 @@ int32_t taosDevInoFile(TdFilePtr pFile, int64_t *stDev, int64_t *stIno) { return 0; } -void autoDelFileList() { - taosThreadMutexLock(&autoDelFileLock); - while (nowAutoDelFilePtr != NULL) { - taosRemoveFile(nowAutoDelFilePtr->name); - AutoDelFilePtr tmp = nowAutoDelFilePtr->lastAutoDelFilePtr; - taosMemoryFree(nowAutoDelFilePtr->name); - taosMemoryFree(nowAutoDelFilePtr); - nowAutoDelFilePtr = tmp; - } - taosThreadMutexUnlock(&autoDelFileLock); - taosThreadMutexDestroy(&autoDelFileLock); -} - -void autoDelFileListInit() { - taosThreadMutexInit(&autoDelFileLock, NULL); - atexit(autoDelFileList); -} - -void autoDelFileListAdd(const char *path) { - taosThreadOnce(&autoDelFileInit, autoDelFileListInit); - taosThreadMutexLock(&autoDelFileLock); - AutoDelFilePtr tmp = taosMemoryMalloc(sizeof(AutoDelFile)); - tmp->lastAutoDelFilePtr = nowAutoDelFilePtr; - 
tmp->name = taosMemoryStrDup(path); - nowAutoDelFilePtr = tmp; - taosThreadMutexUnlock(&autoDelFileLock); -} - TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) { int fd = -1; FILE *fp = NULL; @@ -313,7 +268,6 @@ TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) { assert(!(tdFileOptions & TD_FILE_EXCL)); fp = fopen(path, mode); if (fp == NULL) { - // terrno = TAOS_SYSTEM_ERROR(errno); return NULL; } } else { @@ -331,32 +285,44 @@ TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) { access |= (tdFileOptions & TD_FILE_TEXT) ? O_TEXT : 0; access |= (tdFileOptions & TD_FILE_EXCL) ? O_EXCL : 0; #ifdef WINDOWS - fd = _open(path, access, _S_IREAD | _S_IWRITE); + int32_t pmode = _S_IREAD | _S_IWRITE; + if (tdFileOptions & TD_FILE_AUTO_DEL) { + pmode |= _O_TEMPORARY; + } + fd = _open(path, access, pmode); #else fd = open(path, access, S_IRWXU | S_IRWXG | S_IRWXO); #endif if (fd == -1) { - // terrno = TAOS_SYSTEM_ERROR(errno); return NULL; } } TdFilePtr pFile = (TdFilePtr)taosMemoryMalloc(sizeof(TdFile)); if (pFile == NULL) { - // terrno = TSDB_CODE_OUT_OF_MEMORY; if (fd >= 0) close(fd); if (fp != NULL) fclose(fp); return NULL; } + #if FILE_WITH_LOCK taosThreadRwlockInit(&(pFile->rwlock), NULL); #endif pFile->fd = fd; pFile->fp = fp; pFile->refId = 0; + if (tdFileOptions & TD_FILE_AUTO_DEL) { - autoDelFileListAdd(path); +#ifdef WINDOWS + // do nothing, since the property of pmode is set with _O_TEMPORARY; the OS will recycle + // the file handle, as well as the space on disk. +#else + // Remove it instantly, so when the program exits normally/abnormally, the file + // will be automatically removed by the OS. 
+ unlink(path); +#endif } + return pFile; } From b8aa4fae8a3c78e0717f116e8a4ba001f7eb4d17 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 3 Jan 2023 14:23:27 +0800 Subject: [PATCH 07/10] fix(udf): disable the auto remove for *.so --- source/libs/function/src/udfd.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 6c88e4d5c8..9fcbbddf38 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -461,13 +461,14 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { #else snprintf(path, sizeof(path), "%s/lib%s.so", tsTempDir, pFuncInfo->name); #endif - TdFilePtr file = - taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL); + + TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC); if (file == NULL) { fnError("udfd write udf shared library: %s failed, error: %d %s", path, errno, strerror(errno)); msgInfo->code = TSDB_CODE_FILE_CORRUPTED; goto _return; } + int64_t count = taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize); if (count != pFuncInfo->codeSize) { fnError("udfd write udf shared library failed"); From 47e885da07306575b5b18af158a942ddf99d9346 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 3 Jan 2023 15:40:40 +0800 Subject: [PATCH 08/10] fix: evac page failed issue cause of disk full --- source/libs/executor/inc/executil.h | 4 +++ source/libs/executor/src/executil.c | 6 ++-- source/libs/executor/src/executorimpl.c | 36 ++++++++++++++++++- source/libs/executor/src/groupoperator.c | 19 ++++++++-- source/libs/executor/src/scanoperator.c | 4 +++ source/libs/executor/src/timewindowoperator.c | 14 ++++++++ source/libs/executor/src/tsort.c | 9 +++++ 7 files changed, 86 insertions(+), 6 deletions(-) diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 4229e8808d..e0d2276e6f 100644 
--- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -115,6 +115,10 @@ struct SResultRowEntryInfo* getResultEntryInfo(const SResultRow* pRow, int32_t i static FORCE_INLINE SResultRow* getResultRowByPos(SDiskbasedBuf* pBuf, SResultRowPosition* pos, bool forUpdate) { SFilePage* bufPage = (SFilePage*)getBufPage(pBuf, pos->pageId); + if (NULL == bufPage) { + return NULL; + } + if (forUpdate) { setBufPageDirty(bufPage, true); } diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index fc3cfbd0f6..a5468008aa 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1726,8 +1726,10 @@ STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowI return w; } - w = getResultRowByPos(pBuf, &pResultRowInfo->cur, false)->win; - + SResultRow* pRow = getResultRowByPos(pBuf, &pResultRowInfo->cur, false); + if (pRow) { + w = pRow->win; + } // in case of typical time window, we can calculate time window directly. if (w.skey > ts || w.ekey < ts) { w = doCalculateTimeWindow(ts, pInterval); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 57e6a63137..72419d8fe4 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -150,6 +150,11 @@ SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, i pData->num = sizeof(SFilePage); } else { pData = getBufPage(pResultBuf, *currentPageId); + if (pData == NULL) { + qError("failed to get buffer, code:%s", tstrerror(terrno)); + return NULL; + } + pageId = *currentPageId; if (pData->num + interBufSize > getBufPageSize(pResultBuf)) { @@ -200,6 +205,10 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR if (isIntervalQuery) { if (p1 != NULL) { // the *p1 may be NULL in case of sliding+offset exists. 
pResult = getResultRowByPos(pResultBuf, p1, true); + if (NULL == pResult) { + T_LONG_JMP(pTaskInfo->env, terrno); + } + ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset); } } else { @@ -208,6 +217,10 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR if (p1 != NULL) { // todo pResult = getResultRowByPos(pResultBuf, p1, true); + if (NULL == pResult) { + T_LONG_JMP(pTaskInfo->env, terrno); + } + ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset); } } @@ -216,6 +229,10 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR if (pResultRowInfo->cur.pageId != -1 && ((pResult == NULL) || (pResult->pageId != pResultRowInfo->cur.pageId))) { SResultRowPosition pos = pResultRowInfo->cur; SFilePage* pPage = getBufPage(pResultBuf, pos.pageId); + if (pPage == NULL) { + qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo)); + T_LONG_JMP(pTaskInfo->env, terrno); + } releaseBufPage(pResultBuf, pPage); } @@ -223,6 +240,9 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR if (pResult == NULL) { ASSERT(pSup->resultRowSize > 0); pResult = getNewResultRow(pResultBuf, &pSup->currentPageId, pSup->resultRowSize); + if (pResult == NULL) { + T_LONG_JMP(pTaskInfo->env, terrno); + } // add a new result set for a new group SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset}; @@ -260,6 +280,11 @@ static int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pRes } else { SPageInfo* pi = getLastPageInfo(list); pData = getBufPage(pResultBuf, getPageId(pi)); + if (pData == NULL) { + qError("failed to get buffer, code:%s", tstrerror(terrno)); + return terrno; + } + pageId = getPageId(pi); if (pData->num + size > getBufPageSize(pResultBuf)) { @@ -912,7 +937,7 @@ void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uin if (pResultRow->pageId == -1) { int32_t ret = 
addNewWindowResultBuf(pResultRow, pAggInfo->aggSup.pResultBuf, pAggInfo->binfo.pRes->info.rowSize); if (ret != TSDB_CODE_SUCCESS) { - return; + T_LONG_JMP(pTaskInfo->env, terrno); } } @@ -993,6 +1018,11 @@ static void doCopyResultToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SR int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) { SFilePage* page = getBufPage(pBuf, resultRowPosition->pageId); + if (page == NULL) { + qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo)); + T_LONG_JMP(pTaskInfo->env, terrno); + } + SResultRow* pRow = (SResultRow*)((char*)page + resultRowPosition->offset); SqlFunctionCtx* pCtx = pSup->pCtx; @@ -1036,6 +1066,10 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) { SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i); SFilePage* page = getBufPage(pBuf, pPos->pos.pageId); + if (page == NULL) { + qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo)); + T_LONG_JMP(pTaskInfo->env, terrno); + } SResultRow* pRow = (SResultRow*)((char*)page + pPos->pos.offset); diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 21ec5afdd6..5676e19cdf 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -492,13 +492,17 @@ _error: static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) { SPartitionOperatorInfo* pInfo = pOperator->info; - + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + for (int32_t j = 0; j < pBlock->info.rows; ++j) { recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j); int32_t len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals); SDataGroupInfo* pGroupInfo = NULL; void* pPage = getCurrentDataGroupInfo(pInfo, &pGroupInfo, len); + if 
(pPage == NULL) { + T_LONG_JMP(pTaskInfo->env, terrno); + } pGroupInfo->numOfRows += 1; @@ -595,6 +599,10 @@ void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInf } else { int32_t* curId = taosArrayGetLast(p->pPageList); pPage = getBufPage(pInfo->pBuf, *curId); + if (pPage == NULL) { + qError("failed to get buffer, code:%s", tstrerror(terrno)); + return pPage; + } int32_t* rows = (int32_t*)pPage; if (*rows >= pInfo->rowCapacity) { @@ -674,7 +682,8 @@ static int compareDataGroupInfo(const void* group1, const void* group2) { static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) { SPartitionOperatorInfo* pInfo = pOperator->info; - + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SDataGroupInfo* pGroupInfo = (pInfo->groupIndex != -1) ? taosArrayGet(pInfo->sortedGroupArray, pInfo->groupIndex) : NULL; if (pInfo->groupIndex == -1 || pInfo->pageIndex >= taosArrayGetSize(pGroupInfo->pPageList)) { @@ -692,7 +701,11 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) { int32_t* pageId = taosArrayGet(pGroupInfo->pPageList, pInfo->pageIndex); void* page = getBufPage(pInfo->pBuf, *pageId); - + if (page == NULL) { + qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo)); + T_LONG_JMP(pTaskInfo->env, terrno); + } + blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->rowCapacity); blockDataFromBuf1(pInfo->binfo.pRes, page, pInfo->rowCapacity); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 0e22195afa..1d7f27d0cf 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -170,6 +170,10 @@ static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t gro } *pPage = getBufPage(pTableScanInfo->base.pdInfo.pAggSup->pResultBuf, p1->pageId); + if (NULL == *pPage) { + return NULL; + } + return (SResultRow*)((char*)(*pPage) + p1->offset); } diff --git 
a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 58fadc60b0..d78e9c4edf 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -636,6 +636,10 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num } SResultRow* pr = getResultRowByPos(pInfo->aggSup.pResultBuf, p1, false); + if (NULL == pr) { + T_LONG_JMP(pTaskInfo->env, terrno); + } + ASSERT(pr->offset == p1->offset && pr->pageId == p1->pageId); if (pr->closed) { @@ -1315,6 +1319,10 @@ static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type static void doClearWindowImpl(SResultRowPosition* p1, SDiskbasedBuf* pResultBuf, SExprSupp* pSup, int32_t numOfOutput) { SResultRow* pResult = getResultRowByPos(pResultBuf, p1, false); + if (NULL == pResult) { + return; + } + SqlFunctionCtx* pCtx = pSup->pCtx; for (int32_t i = 0; i < numOfOutput; ++i) { pCtx[i].resultInfo = getResultEntryInfo(pResult, i, pSup->rowEntryInfoOffset); @@ -1328,6 +1336,9 @@ static void doClearWindowImpl(SResultRowPosition* p1, SDiskbasedBuf* pResultBuf, } } SFilePage* bufPage = getBufPage(pResultBuf, p1->pageId); + if (NULL == bufPage) { + return; + } setBufPageDirty(bufPage, true); releaseBufPage(pResultBuf, bufPage); } @@ -4114,6 +4125,9 @@ void destroyMAIOperatorInfo(void* param) { static SResultRow* doSetSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, SAggSupporter* pSup) { SResultRow* pResult = getNewResultRow(pSup->pResultBuf, &pSup->currentPageId, pSup->resultRowSize); + if (NULL == pResult) { + return pResult; + } pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset}; return pResult; } diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index fa0cdb3943..06ef36664a 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -270,6 +270,10 @@ static int32_t 
sortComparInit(SMsortComparParam* pParam, SArray* pSources, int32 int32_t* pPgId = taosArrayGet(pSource->pageIdList, pSource->pageIndex); void* pPage = getBufPage(pHandle->pBuf, *pPgId); + if (NULL == pPage) { + return terrno; + } + code = blockDataFromBuf(pSource->src.pBlock, pPage); if (code != TSDB_CODE_SUCCESS) { return code; @@ -337,6 +341,11 @@ static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeT int32_t* pPgId = taosArrayGet(pSource->pageIdList, pSource->pageIndex); void* pPage = getBufPage(pHandle->pBuf, *pPgId); + if (pPage == NULL) { + qError("failed to get buffer, code:%s", tstrerror(terrno)); + return terrno; + } + int32_t code = blockDataFromBuf(pSource->src.pBlock, pPage); if (code != TSDB_CODE_SUCCESS) { return code; From 9e71a58675345e2f74160c3dfda15b051bf929fa Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 3 Jan 2023 16:45:45 +0800 Subject: [PATCH 09/10] fix: load wal ref when init --- source/dnode/vnode/src/tq/tqOffset.c | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/source/dnode/vnode/src/tq/tqOffset.c b/source/dnode/vnode/src/tq/tqOffset.c index dd56c165fd..be645469d4 100644 --- a/source/dnode/vnode/src/tq/tqOffset.c +++ b/source/dnode/vnode/src/tq/tqOffset.c @@ -61,6 +61,17 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname) { ASSERT(0); // TODO } + + if (offset.val.type == TMQ_OFFSET__LOG) { + STqHandle* pHandle = taosHashGet(pStore->pTq->pHandle, offset.subKey, strlen(offset.subKey)); + if (pHandle) { + if (walRefVer(pHandle->pRef, offset.val.version) < 0) { + tqError("vgId: %d, tq handle %s ref ver %" PRId64 "error", pStore->pTq->pVnode->config.vgId, + pHandle->subKey, offset.val.version); + } + } + } + taosMemoryFree(memBuf); } From 1567fe2f675f17f4e63445f519e5dffcb77276f9 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 3 Jan 2023 17:19:19 +0800 Subject: [PATCH 10/10] fix: skiplist concurrent access --- source/dnode/vnode/src/inc/tsdb.h | 4 +- 
source/dnode/vnode/src/tsdb/tsdbMemTable.c | 97 +++++++++++----------- 2 files changed, 50 insertions(+), 51 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 5a63af41af..5a2e462c8c 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -772,8 +772,8 @@ static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) { return 0; } -#define SL_NODE_FORWARD(n, l) ((n)->forwards[l]) -#define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)]) +// #define SL_NODE_FORWARD(n, l) ((n)->forwards[l]) +// #define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)]) static FORCE_INLINE TSDBROW *tsdbTbDataIterGet(STbDataIter *pIter) { if (pIter == NULL) return NULL; diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 0a7f59e429..5b2cab38bb 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -19,9 +19,13 @@ #define SL_MAX_LEVEL 5 // sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l) * 2 -#define SL_NODE_SIZE(l) (sizeof(SMemSkipListNode) + ((l) << 4)) -#define SL_NODE_FORWARD(n, l) ((n)->forwards[l]) -#define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)]) +#define SL_NODE_SIZE(l) (sizeof(SMemSkipListNode) + ((l) << 4)) +#define SL_NODE_FORWARD(n, l) ((n)->forwards[l]) +#define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)]) +#define SL_GET_NODE_FORWARD(n, l) ((SMemSkipListNode *)atomic_load_64((int64_t *)&SL_NODE_FORWARD(n, l))) +#define SL_GET_NODE_BACKWARD(n, l) ((SMemSkipListNode *)atomic_load_64((int64_t *)&SL_NODE_BACKWARD(n, l))) +#define SL_SET_NODE_FORWARD(n, l, p) atomic_store_64((int64_t *)&SL_NODE_FORWARD(n, l), (int64_t)(p)) +#define SL_SET_NODE_BACKWARD(n, l, p) atomic_store_64((int64_t *)&SL_NODE_BACKWARD(n, l), (int64_t)(p)) #define SL_MOVE_BACKWARD 0x1 #define SL_MOVE_FROM_POS 0x2 @@ -246,18 +250,18 @@ void 
tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDa if (pFrom == NULL) { // create from head or tail if (backward) { - pIter->pNode = SL_NODE_BACKWARD(pTbData->sl.pTail, 0); + pIter->pNode = SL_GET_NODE_BACKWARD(pTbData->sl.pTail, 0); } else { - pIter->pNode = SL_NODE_FORWARD(pTbData->sl.pHead, 0); + pIter->pNode = SL_GET_NODE_FORWARD(pTbData->sl.pHead, 0); } } else { // create from a key if (backward) { tbDataMovePosTo(pTbData, pos, pFrom, SL_MOVE_BACKWARD); - pIter->pNode = SL_NODE_BACKWARD(pos[0], 0); + pIter->pNode = SL_GET_NODE_BACKWARD(pos[0], 0); } else { tbDataMovePosTo(pTbData, pos, pFrom, 0); - pIter->pNode = SL_NODE_FORWARD(pos[0], 0); + pIter->pNode = SL_GET_NODE_FORWARD(pos[0], 0); } } } @@ -271,7 +275,7 @@ bool tsdbTbDataIterNext(STbDataIter *pIter) { return false; } - pIter->pNode = SL_NODE_BACKWARD(pIter->pNode, 0); + pIter->pNode = SL_GET_NODE_BACKWARD(pIter->pNode, 0); if (pIter->pNode == pIter->pTbData->sl.pHead) { return false; } @@ -282,7 +286,7 @@ bool tsdbTbDataIterNext(STbDataIter *pIter) { return false; } - pIter->pNode = SL_NODE_FORWARD(pIter->pNode, 0); + pIter->pNode = SL_GET_NODE_FORWARD(pIter->pNode, 0); if (pIter->pNode == pIter->pTbData->sl.pTail) { return false; } @@ -335,7 +339,7 @@ static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid int8_t maxLevel = pMemTable->pTsdb->pVnode->config.tsdbCfg.slLevel; ASSERT(pPool != NULL); - pTbData = vnodeBufPoolMalloc(pPool, sizeof(*pTbData) + SL_NODE_SIZE(maxLevel) * 2); + pTbData = vnodeBufPoolMallocAligned(pPool, sizeof(*pTbData) + SL_NODE_SIZE(maxLevel) * 2); if (pTbData == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; @@ -408,7 +412,7 @@ static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *p if (fromPos) px = pos[pTbData->sl.level - 1]; for (int8_t iLevel = pTbData->sl.level - 1; iLevel >= 0; iLevel--) { - pn = SL_NODE_BACKWARD(px, iLevel); + pn = SL_GET_NODE_BACKWARD(px, iLevel); while (pn != 
pTbData->sl.pHead) { tKey.version = pn->version; tKey.ts = pn->pTSRow->ts; @@ -418,7 +422,7 @@ static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *p break; } else { px = pn; - pn = SL_NODE_BACKWARD(px, iLevel); + pn = SL_GET_NODE_BACKWARD(px, iLevel); } } @@ -438,7 +442,7 @@ static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *p if (fromPos) px = pos[pTbData->sl.level - 1]; for (int8_t iLevel = pTbData->sl.level - 1; iLevel >= 0; iLevel--) { - pn = SL_NODE_FORWARD(px, iLevel); + pn = SL_GET_NODE_FORWARD(px, iLevel); while (pn != pTbData->sl.pTail) { tKey.version = pn->version; tKey.ts = pn->pTSRow->ts; @@ -448,7 +452,7 @@ static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *p break; } else { px = pn; - pn = SL_NODE_FORWARD(px, iLevel); + pn = SL_GET_NODE_FORWARD(px, iLevel); } } @@ -474,58 +478,53 @@ static int32_t tbDataDoPut(SMemTable *pMemTable, STbData *pTbData, SMemSkipListN int8_t level; SMemSkipListNode *pNode; SVBufPool *pPool = pMemTable->pTsdb->pVnode->inUse; + int64_t nSize; - // node + // create node level = tsdbMemSkipListRandLevel(&pTbData->sl); - ASSERT(pPool != NULL); - pNode = (SMemSkipListNode *)vnodeBufPoolMalloc(pPool, SL_NODE_SIZE(level)); + nSize = SL_NODE_SIZE(level); + pNode = (SMemSkipListNode *)vnodeBufPoolMallocAligned(pPool, nSize + pRow->len); if (pNode == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } pNode->level = level; pNode->version = version; - pNode->pTSRow = vnodeBufPoolMalloc(pPool, pRow->len); - if (NULL == pNode->pTSRow) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } + pNode->pTSRow = (STSRow *)((char *)pNode + nSize); memcpy(pNode->pTSRow, pRow, pRow->len); - for (int8_t iLevel = level - 1; iLevel >= 0; iLevel--) { - SMemSkipListNode *pn = pos[iLevel]; - SMemSkipListNode *px; - - if (forward) { - px = SL_NODE_FORWARD(pn, iLevel); - - SL_NODE_BACKWARD(pNode, iLevel) = pn; - SL_NODE_FORWARD(pNode, iLevel) = px; - } else { - px = 
SL_NODE_BACKWARD(pn, iLevel); - - SL_NODE_BACKWARD(pNode, iLevel) = px; - SL_NODE_FORWARD(pNode, iLevel) = pn; + // set node + if (forward) { + for (int8_t iLevel = 0; iLevel < level; iLevel++) { + SL_NODE_FORWARD(pNode, iLevel) = SL_NODE_FORWARD(pos[iLevel], iLevel); + SL_NODE_BACKWARD(pNode, iLevel) = pos[iLevel]; + } + } else { + for (int8_t iLevel = 0; iLevel < level; iLevel++) { + SL_NODE_FORWARD(pNode, iLevel) = pos[iLevel]; + SL_NODE_BACKWARD(pNode, iLevel) = SL_NODE_BACKWARD(pos[iLevel], iLevel); } } - for (int8_t iLevel = level - 1; iLevel >= 0; iLevel--) { - SMemSkipListNode *pn = pos[iLevel]; - SMemSkipListNode *px; + // set forward and backward + if (forward) { + for (int8_t iLevel = level - 1; iLevel >= 0; iLevel--) { + SMemSkipListNode *pNext = pos[iLevel]->forwards[iLevel]; - if (forward) { - px = SL_NODE_FORWARD(pn, iLevel); + SL_SET_NODE_FORWARD(pos[iLevel], iLevel, pNode); + SL_SET_NODE_BACKWARD(pNext, iLevel, pNode); - SL_NODE_FORWARD(pn, iLevel) = pNode; - SL_NODE_BACKWARD(px, iLevel) = pNode; - } else { - px = SL_NODE_BACKWARD(pn, iLevel); - - SL_NODE_FORWARD(px, iLevel) = pNode; - SL_NODE_BACKWARD(pn, iLevel) = pNode; + pos[iLevel] = pNode; } + } else { + for (int8_t iLevel = level - 1; iLevel >= 0; iLevel--) { + SMemSkipListNode *pPrev = pos[iLevel]->forwards[pos[iLevel]->level + iLevel]; - pos[iLevel] = pNode; + SL_SET_NODE_FORWARD(pPrev, iLevel, pNode); + SL_SET_NODE_BACKWARD(pos[iLevel], iLevel, pNode); + + pos[iLevel] = pNode; + } } pTbData->sl.size++;