diff --git a/include/common/tname.h b/include/common/tname.h
index ffa4f8f253..ae2dc32335 100644
--- a/include/common/tname.h
+++ b/include/common/tname.h
@@ -17,6 +17,7 @@
#define _TD_COMMON_NAME_H_
#include "tdef.h"
+#include "tarray.h"
#ifdef __cplusplus
extern "C" {
@@ -62,6 +63,18 @@ int32_t tNameSetAcctId(SName* dst, int32_t acctId);
bool tNameDBNameEqual(SName* left, SName* right);
+typedef struct {
+ // input
+ SArray *tags; // element is SSmlKV
+ const char *sTableName; // super table name
+ uint8_t sTableNameLen; // the length of super table name
+
+ // output
+ char *childTableName; // must have size of TSDB_TABLE_NAME_LEN;
+ uint64_t uid; // child table uid, may be useful
+} RandTableName;
+
+void buildChildTableName(RandTableName *rName);
#ifdef __cplusplus
}
diff --git a/include/libs/function/function.h b/include/libs/function/function.h
index 347303a051..88ac1532c2 100644
--- a/include/libs/function/function.h
+++ b/include/libs/function/function.h
@@ -336,6 +336,17 @@ int32_t udfcOpen();
*/
int32_t udfcClose();
+/**
+ * start udfd that serves udf function invocation under dnode startDnodeId
+ * @param startDnodeId
+ * @return
+ */
+int32_t udfStartUdfd(int32_t startDnodeId);
+/**
+ * stop udfd
+ * @return
+ */
+int32_t udfStopUdfd();
#ifdef __cplusplus
}
#endif
diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c
index 2a0e85092b..15d1500860 100644
--- a/source/client/src/clientSml.c
+++ b/source/client/src/clientSml.c
@@ -15,6 +15,7 @@
#include "tcommon.h"
#include "catalog.h"
#include "clientInt.h"
+#include "tname.h"
//=================================================================================================
#define SPACE ' '
@@ -97,6 +98,21 @@ typedef struct {
char *buf;
} SSmlMsgBuf;
+typedef struct {
+ int32_t code;
+ int32_t lineNum;
+
+ int32_t numOfSTables;
+ int32_t numOfCTables;
+ int32_t numOfCreateSTables;
+
+ int64_t parseTime;
+ int64_t schemaTime;
+ int64_t insertBindTime;
+ int64_t insertRpcTime;
+ int64_t endTime;
+} SSmlCostInfo;
+
typedef struct {
uint64_t id;
@@ -114,6 +130,7 @@ typedef struct {
SRequestObj *pRequest;
SQuery *pQuery;
+ SSmlCostInfo cost;
int32_t affectedRows;
SSmlMsgBuf msgBuf;
SHashObj *dumplicateKey; // for dumplicate key
@@ -147,45 +164,6 @@ static int32_t smlBuildInvalidDataMsg(SSmlMsgBuf* pBuf, const char *msg1, const
return TSDB_CODE_SML_INVALID_DATA;
}
-static int smlCompareKv(const void* p1, const void* p2) {
- SSmlKv* kv1 = *(SSmlKv**)p1;
- SSmlKv* kv2 = *(SSmlKv**)p2;
- int32_t kvLen1 = kv1->keyLen;
- int32_t kvLen2 = kv2->keyLen;
- int32_t res = strncasecmp(kv1->key, kv2->key, TMIN(kvLen1, kvLen2));
- if (res != 0) {
- return res;
- } else {
- return kvLen1-kvLen2;
- }
-}
-
-static void smlBuildChildTableName(SSmlTableInfo *tags) {
- int32_t size = taosArrayGetSize(tags->tags);
- ASSERT(size > 0);
- taosArraySort(tags->tags, smlCompareKv);
-
- SStringBuilder sb = {0};
- taosStringBuilderAppendStringLen(&sb, tags->sTableName, tags->sTableNameLen);
- for (int j = 0; j < size; ++j) {
- SSmlKv *tagKv = taosArrayGetP(tags->tags, j);
- taosStringBuilderAppendStringLen(&sb, tagKv->key, tagKv->keyLen);
- taosStringBuilderAppendStringLen(&sb, tagKv->value, tagKv->valueLen);
- }
- size_t len = 0;
- char* keyJoined = taosStringBuilderGetResult(&sb, &len);
- T_MD5_CTX context;
- tMD5Init(&context);
- tMD5Update(&context, (uint8_t *)keyJoined, (uint32_t)len);
- tMD5Final(&context);
- uint64_t digest1 = *(uint64_t*)(context.digest);
- //uint64_t digest2 = *(uint64_t*)(context.digest + 8);
- //snprintf(tags->childTableName, TSDB_TABLE_NAME_LEN, "t_%016"PRIx64"%016"PRIx64, digest1, digest2);
- snprintf(tags->childTableName, TSDB_TABLE_NAME_LEN, "t_%016"PRIx64, digest1);
- taosStringBuilderDestroy(&sb);
- tags->uid = digest1;
-}
-
static int32_t smlGenerateSchemaAction(SSchema* pointColField, SHashObj* dbAttrHash, SArray* dbAttrArray, bool isTag, char sTableName[],
SSchemaAction* action, bool* actionNeeded, SSmlHandle* info) {
// char fieldName[TSDB_COL_NAME_LEN] = {0};
@@ -444,6 +422,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) {
uError("SML:0x%"PRIx64" catalogGetSTableMeta failed. super table name %s", info->id, schemaAction.createSTable.sTableName);
return code;
}
+ info->cost.numOfCreateSTables++;
}else if (code == TSDB_CODE_SUCCESS) {
} else {
uError("SML:0x%"PRIx64" load table meta error: %s", info->id, tstrerror(code));
@@ -926,20 +905,6 @@ static bool smlParseValue(SSmlKv *pVal, SSmlMsgBuf *msg) {
return false;
}
-static bool checkDuplicateKey(char *key, SHashObj *pHash, SSmlHandle* info) {
- char *val = NULL;
- val = taosHashGet(pHash, key, strlen(key));
- if (val) {
- uError("SML:0x%"PRIx64" Duplicate key detected:%s", info->id, key);
- return true;
- }
-
- uint8_t dummy_val = 0;
- taosHashPut(pHash, key, strlen(key), &dummy_val, sizeof(uint8_t));
-
- return false;
-}
-
static int32_t smlParseString(const char* sql, SSmlLineInfo *elements, SSmlMsgBuf *msg){
if(!sql) return TSDB_CODE_SML_INVALID_DATA;
while (*sql != '\0') { // jump the space at the begining
@@ -1546,8 +1511,10 @@ static int32_t smlParseLine(SSmlHandle* info, const char* sql) {
tinfo->sTableName = elements.measure;
tinfo->sTableNameLen = elements.measureLen;
- smlBuildChildTableName(tinfo);
- uDebug("SML:0x%"PRIx64" child table name: %s", info->id, tinfo->childTableName);
+ RandTableName rName = {.tags=tinfo->tags, .sTableName=tinfo->sTableName, .sTableNameLen=tinfo->sTableNameLen,
+ .childTableName=tinfo->childTableName};
+ buildChildTableName(&rName);
+ tinfo->uid = rName.uid;
SSmlSTableMeta** tableMeta = taosHashGet(info->superTables, elements.measure, elements.measureLen);
if(tableMeta){ // update meta
@@ -1604,7 +1571,7 @@ static void smlDestroyInfo(SSmlHandle* info){
static SSmlHandle* smlBuildSmlInfo(TAOS* taos, SRequestObj* request, SMLProtocolType protocol, int8_t precision, bool dataFormat){
int32_t code = TSDB_CODE_SUCCESS;
- SSmlHandle* info = taosMemoryMalloc(sizeof(SSmlHandle));
+ SSmlHandle* info = taosMemoryCalloc(1, sizeof(SSmlHandle));
if (NULL == info) {
return NULL;
}
@@ -1699,12 +1666,23 @@ static int32_t smlInsertData(SSmlHandle* info) {
}
smlBuildOutput(info->exec, info->pVgHash);
+ info->cost.insertRpcTime = taosGetTimestampUs();
+
launchQueryImpl(info->pRequest, info->pQuery, TSDB_CODE_SUCCESS, true);
info->affectedRows = taos_affected_rows(info->pRequest);
return info->pRequest->code;
}
+static void smlPrintStatisticInfo(SSmlHandle *info){
+ uError("SML:0x%"PRIx64" smlInsertLines result, code:%d,lineNum:%d,stable num:%d,ctable num:%d,create stable num:%d \
+ parse cost:%"PRId64",schema cost:%"PRId64",bind cost:%"PRId64",rpc cost:%"PRId64",total cost:%"PRId64"", info->id, info->cost.code,
+ info->cost.lineNum, info->cost.numOfSTables, info->cost.numOfCTables, info->cost.numOfCreateSTables,
+ info->cost.schemaTime-info->cost.parseTime, info->cost.insertBindTime-info->cost.schemaTime,
+ info->cost.insertRpcTime-info->cost.insertBindTime, info->cost.endTime-info->cost.insertRpcTime,
+ info->cost.endTime-info->cost.parseTime);
+}
+
static int smlInsertLines(SSmlHandle *info, char* lines[], int numLines) {
int32_t code = TSDB_CODE_SUCCESS;
@@ -1714,6 +1692,7 @@ static int smlInsertLines(SSmlHandle *info, char* lines[], int numLines) {
goto cleanup;
}
+ info->cost.parseTime = taosGetTimestampUs();
for (int32_t i = 0; i < numLines; ++i) {
code = smlParseLine(info, lines[i]);
if (code != TSDB_CODE_SUCCESS) {
@@ -1721,24 +1700,29 @@ static int smlInsertLines(SSmlHandle *info, char* lines[], int numLines) {
goto cleanup;
}
}
- uDebug("SML:0x%"PRIx64" smlInsertLines parse success. tables %d", info->id, taosHashGetSize(info->childTables));
- uDebug("SML:0x%"PRIx64" smlInsertLines parse success. super tables %d", info->id, taosHashGetSize(info->superTables));
+ info->cost.lineNum = numLines;
+ info->cost.numOfSTables = taosHashGetSize(info->superTables);
+ info->cost.numOfCTables = taosHashGetSize(info->childTables);
+
+ info->cost.schemaTime = taosGetTimestampUs();
code = smlModifyDBSchemas(info);
if (code != 0) {
uError("SML:0x%"PRIx64" smlModifyDBSchemas error : %s", info->id, tstrerror(code));
goto cleanup;
}
+ info->cost.insertBindTime = taosGetTimestampUs();
code = smlInsertData(info);
if (code != 0) {
uError("SML:0x%"PRIx64" smlInsertData error : %s", info->id, tstrerror(code));
goto cleanup;
}
-
- uDebug("SML:0x%"PRIx64" smlInsertLines finish inserting %d lines.", info->id, numLines);
+ info->cost.endTime = taosGetTimestampUs();
cleanup:
+ info->cost.code = code;
+ smlPrintStatisticInfo(info);
return code;
}
@@ -1790,7 +1774,6 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr
}
smlDestroyInfo(info);
-end:
return (TAOS_RES*)request;
}
diff --git a/source/common/src/tname.c b/source/common/src/tname.c
index f4755f5b5e..62ba4bfb79 100644
--- a/source/common/src/tname.c
+++ b/source/common/src/tname.c
@@ -15,6 +15,8 @@
#define _DEFAULT_SOURCE
#include "tname.h"
+#include "tcommon.h"
+#include "tstrbuild.h"
#define VALID_NAME_TYPE(x) ((x) == TSDB_DB_NAME_T || (x) == TSDB_TABLE_NAME_T)
@@ -294,4 +296,43 @@ int32_t tNameFromString(SName* dst, const char* str, uint32_t type) {
return 0;
}
+static int compareKv(const void* p1, const void* p2) {
+ SSmlKv* kv1 = *(SSmlKv**)p1;
+ SSmlKv* kv2 = *(SSmlKv**)p2;
+ int32_t kvLen1 = kv1->keyLen;
+ int32_t kvLen2 = kv2->keyLen;
+ int32_t res = strncasecmp(kv1->key, kv2->key, TMIN(kvLen1, kvLen2));
+ if (res != 0) {
+ return res;
+ } else {
+ return kvLen1-kvLen2;
+ }
+}
+/*
+ * use stable name and tags to grearate child table name
+ */
+void buildChildTableName(RandTableName *rName) {
+ int32_t size = taosArrayGetSize(rName->tags);
+ ASSERT(size > 0);
+ taosArraySort(rName->tags, compareKv);
+
+ SStringBuilder sb = {0};
+ taosStringBuilderAppendStringLen(&sb, rName->sTableName, rName->sTableNameLen);
+ for (int j = 0; j < size; ++j) {
+ SSmlKv *tagKv = taosArrayGetP(rName->tags, j);
+ taosStringBuilderAppendStringLen(&sb, tagKv->key, tagKv->keyLen);
+ taosStringBuilderAppendStringLen(&sb, tagKv->value, tagKv->valueLen);
+ }
+ size_t len = 0;
+ char* keyJoined = taosStringBuilderGetResult(&sb, &len);
+ T_MD5_CTX context;
+ tMD5Init(&context);
+ tMD5Update(&context, (uint8_t *)keyJoined, (uint32_t)len);
+ tMD5Final(&context);
+ uint64_t digest1 = *(uint64_t*)(context.digest);
+ uint64_t digest2 = *(uint64_t*)(context.digest + 8);
+ snprintf(rName->childTableName, TSDB_TABLE_NAME_LEN, "t_%016"PRIx64"%016"PRIx64, digest1, digest2);
+ taosStringBuilderDestroy(&sb);
+ rName->uid = digest1;
+}
diff --git a/source/dnode/mgmt/implement/src/dmHandle.c b/source/dnode/mgmt/implement/src/dmHandle.c
index 32205b337c..129d41061e 100644
--- a/source/dnode/mgmt/implement/src/dmHandle.c
+++ b/source/dnode/mgmt/implement/src/dmHandle.c
@@ -217,145 +217,6 @@ static void dmStopMgmt(SMgmtWrapper *pWrapper) {
dmStopStatusThread(pWrapper->pDnode);
}
-static int32_t dmSpawnUdfd(SDnode *pDnode);
-
-void dmUdfdExit(uv_process_t *process, int64_t exitStatus, int termSignal) {
- dInfo("udfd process exited with status %" PRId64 ", signal %d", exitStatus, termSignal);
- SDnode *pDnode = process->data;
- if (exitStatus == 0 && termSignal == 0 || atomic_load_32(&pDnode->udfdData.stopCalled)) {
- dInfo("udfd process exit due to SIGINT or dnode-mgmt called stop");
- } else {
- dInfo("udfd process restart");
- dmSpawnUdfd(pDnode);
- }
-}
-
-static int32_t dmSpawnUdfd(SDnode *pDnode) {
- dInfo("dnode start spawning udfd");
- uv_process_options_t options = {0};
-
- char path[PATH_MAX] = {0};
- if (tsProcPath == NULL) {
- path[0] = '.';
- } else {
- strncpy(path, tsProcPath, strlen(tsProcPath));
- taosDirName(path);
- }
-#ifdef WINDOWS
- strcat(path, "udfd.exe");
-#else
- strcat(path, "/udfd");
-#endif
- char* argsUdfd[] = {path, "-c", configDir, NULL};
- options.args = argsUdfd;
- options.file = path;
-
- options.exit_cb = dmUdfdExit;
-
- SUdfdData *pData = &pDnode->udfdData;
- uv_pipe_init(&pData->loop, &pData->ctrlPipe, 1);
-
- uv_stdio_container_t child_stdio[3];
- child_stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE;
- child_stdio[0].data.stream = (uv_stream_t*) &pData->ctrlPipe;
- child_stdio[1].flags = UV_IGNORE;
- child_stdio[2].flags = UV_INHERIT_FD;
- child_stdio[2].data.fd = 2;
- options.stdio_count = 3;
- options.stdio = child_stdio;
-
- options.flags = UV_PROCESS_DETACHED;
-
- char dnodeIdEnvItem[32] = {0};
- char thrdPoolSizeEnvItem[32] = {0};
- snprintf(dnodeIdEnvItem, 32, "%s=%d", "DNODE_ID", pDnode->data.dnodeId);
- float numCpuCores = 4;
- taosGetCpuCores(&numCpuCores);
- snprintf(thrdPoolSizeEnvItem,32, "%s=%d", "UV_THREADPOOL_SIZE", (int)numCpuCores*2);
- char* envUdfd[] = {dnodeIdEnvItem, thrdPoolSizeEnvItem, NULL};
- options.env = envUdfd;
-
- int err = uv_spawn(&pData->loop, &pData->process, &options);
- pData->process.data = (void*)pDnode;
-
- if (err != 0) {
- dError("can not spawn udfd. path: %s, error: %s", path, uv_strerror(err));
- }
- return err;
-}
-
-static void dmUdfdCloseWalkCb(uv_handle_t* handle, void* arg) {
- if (!uv_is_closing(handle)) {
- uv_close(handle, NULL);
- }
-}
-
-static void dmUdfdStopAsyncCb(uv_async_t *async) {
- SDnode *pDnode = async->data;
- SUdfdData *pData = &pDnode->udfdData;
- uv_stop(&pData->loop);
-}
-
-static void dmWatchUdfd(void *args) {
- SDnode *pDnode = args;
- SUdfdData *pData = &pDnode->udfdData;
- uv_loop_init(&pData->loop);
- uv_async_init(&pData->loop, &pData->stopAsync, dmUdfdStopAsyncCb);
- pData->stopAsync.data = pDnode;
- int32_t err = dmSpawnUdfd(pDnode);
- atomic_store_32(&pData->spawnErr, err);
- uv_barrier_wait(&pData->barrier);
- uv_run(&pData->loop, UV_RUN_DEFAULT);
- uv_loop_close(&pData->loop);
-
- uv_walk(&pData->loop, dmUdfdCloseWalkCb, NULL);
- uv_run(&pData->loop, UV_RUN_DEFAULT);
- uv_loop_close(&pData->loop);
- return;
-}
-
-static int32_t dmStartUdfd(SDnode *pDnode) {
- char dnodeId[8] = {0};
- snprintf(dnodeId, sizeof(dnodeId), "%d", pDnode->data.dnodeId);
- uv_os_setenv("DNODE_ID", dnodeId);
- SUdfdData *pData = &pDnode->udfdData;
- if (pData->startCalled) {
- dInfo("dnode-mgmt start udfd already called");
- return 0;
- }
- pData->startCalled = true;
- uv_barrier_init(&pData->barrier, 2);
- uv_thread_create(&pData->thread, dmWatchUdfd, pDnode);
- uv_barrier_wait(&pData->barrier);
- int32_t err = atomic_load_32(&pData->spawnErr);
- if (err != 0) {
- uv_barrier_destroy(&pData->barrier);
- uv_async_send(&pData->stopAsync);
- uv_thread_join(&pData->thread);
- pData->needCleanUp = false;
- dInfo("dnode-mgmt udfd cleaned up after spawn err");
- } else {
- pData->needCleanUp = true;
- }
- return err;
-}
-
-static int32_t dmStopUdfd(SDnode *pDnode) {
- dInfo("dnode-mgmt to stop udfd. need cleanup: %d, spawn err: %d",
- pDnode->udfdData.needCleanUp, pDnode->udfdData.spawnErr);
- SUdfdData *pData = &pDnode->udfdData;
- if (!pData->needCleanUp || atomic_load_32(&pData->stopCalled)) {
- return 0;
- }
- atomic_store_32(&pData->stopCalled, 1);
- pData->needCleanUp = false;
- uv_barrier_destroy(&pData->barrier);
- uv_async_send(&pData->stopAsync);
- uv_thread_join(&pData->thread);
- dInfo("dnode-mgmt udfd cleaned up");
- return 0;
-}
-
static int32_t dmInitMgmt(SMgmtWrapper *pWrapper) {
dInfo("dnode-mgmt start to init");
SDnode *pDnode = pWrapper->pDnode;
@@ -387,7 +248,7 @@ static int32_t dmInitMgmt(SMgmtWrapper *pWrapper) {
}
dmReportStartup(pDnode, "dnode-transport", "initialized");
- if (dmStartUdfd(pDnode) != 0) {
+ if (udfStartUdfd(pDnode->data.dnodeId) != 0) {
dError("failed to start udfd");
}
@@ -398,7 +259,9 @@ static int32_t dmInitMgmt(SMgmtWrapper *pWrapper) {
static void dmCleanupMgmt(SMgmtWrapper *pWrapper) {
dInfo("dnode-mgmt start to clean up");
SDnode *pDnode = pWrapper->pDnode;
- dmStopUdfd(pDnode);
+
+ udfStopUdfd();
+
dmStopWorker(pDnode);
taosWLockLatch(&pDnode->data.latch);
diff --git a/source/dnode/mgmt/interface/inc/dmDef.h b/source/dnode/mgmt/interface/inc/dmDef.h
index 2e8ad982d8..445e1d42f5 100644
--- a/source/dnode/mgmt/interface/inc/dmDef.h
+++ b/source/dnode/mgmt/interface/inc/dmDef.h
@@ -156,6 +156,8 @@ typedef struct SUdfdData {
uv_pipe_t ctrlPipe;
uv_async_t stopAsync;
int32_t stopCalled;
+
+ int32_t dnodeId;
} SUdfdData;
typedef struct SDnode {
diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c
index 1327ddb48e..2166de9fb2 100644
--- a/source/libs/executor/src/executorimpl.c
+++ b/source/libs/executor/src/executorimpl.c
@@ -13,6 +13,7 @@
* along with this program. If not, see .
*/
+#include
#include "filter.h"
#include "function.h"
#include "functionMgt.h"
@@ -803,7 +804,8 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunction
for (int32_t k = 0; k < pOperator->numOfExprs; ++k) {
if (functionNeedToExecute(&pCtx[k])) {
pCtx[k].startTs = startTs; // this can be set during create the struct
- pCtx[k].fpSet.process(&pCtx[k]);
+ if (pCtx[k].fpSet.process != NULL)
+ pCtx[k].fpSet.process(&pCtx[k]);
}
}
}
@@ -1074,35 +1076,36 @@ void setBlockStatisInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock*
// set the output buffer for the selectivity + tag query
static int32_t setCtxTagColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
int32_t num = 0;
- int16_t tagLen = 0;
SqlFunctionCtx* p = NULL;
- SqlFunctionCtx** pTagCtx = taosMemoryCalloc(numOfOutput, POINTER_BYTES);
- if (pTagCtx == NULL) {
+ SqlFunctionCtx** pValCtx = taosMemoryCalloc(numOfOutput, POINTER_BYTES);
+ if (pValCtx == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
for (int32_t i = 0; i < numOfOutput; ++i) {
- int32_t functionId = pCtx[i].functionId;
-
- if (functionId == FUNCTION_TAG_DUMMY || functionId == FUNCTION_TS_DUMMY) {
- tagLen += pCtx[i].resDataInfo.bytes;
- pTagCtx[num++] = &pCtx[i];
- } else if (1 /*(aAggs[functionId].status & FUNCSTATE_SELECTIVITY) != 0*/) {
- p = &pCtx[i];
- } else if (functionId == FUNCTION_TS || functionId == FUNCTION_TAG) {
- // tag function may be the group by tag column
- // ts may be the required primary timestamp column
- continue;
+ if (strcmp(pCtx[i].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
+ pValCtx[num++] = &pCtx[i];
} else {
- // the column may be the normal column, group by normal_column, the functionId is FUNCTION_PRJ
+ p = &pCtx[i];
}
+// if (functionId == FUNCTION_TAG_DUMMY || functionId == FUNCTION_TS_DUMMY) {
+// tagLen += pCtx[i].resDataInfo.bytes;
+// pTagCtx[num++] = &pCtx[i];
+// } else if (functionId == FUNCTION_TS || functionId == FUNCTION_TAG) {
+// // tag function may be the group by tag column
+// // ts may be the required primary timestamp column
+// continue;
+// } else {
+// // the column may be the normal column, group by normal_column, the functionId is FUNCTION_PRJ
+// }
}
+
if (p != NULL) {
- p->subsidiaries.pCtx = pTagCtx;
+ p->subsidiaries.pCtx = pValCtx;
p->subsidiaries.num = num;
} else {
- taosMemoryFreeClear(pTagCtx);
+ taosMemoryFreeClear(pValCtx);
}
return TSDB_CODE_SUCCESS;
@@ -2219,6 +2222,8 @@ int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbased
pCtx[j].resultInfo = getResultCell(pRow, j, rowCellOffset);
if (pCtx[j].fpSet.process) {
pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
+ } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
+ // do nothing, todo refactor
} else {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
@@ -3974,6 +3979,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
SSDataBlock* pRes = pInfo->pRes;
blockDataCleanup(pRes);
+ SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
@@ -4037,9 +4043,13 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, false);
blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
- projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfExprs,
+ pTaskInfo->code = projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfExprs,
pProjectInfo->pPseudoColInfo);
+ if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
+ longjmp(pTaskInfo->env, pTaskInfo->code);
+ }
+
int32_t status = handleLimitOffset(pOperator, pBlock);
if (status == PROJECT_RETRIEVE_CONTINUE) {
continue;
diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c
index 183cb9dbe6..2ba3e257b2 100644
--- a/source/libs/executor/src/groupoperator.c
+++ b/source/libs/executor/src/groupoperator.c
@@ -262,6 +262,8 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
return NULL;
}
+ SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
+
SGroupbyOperatorInfo* pInfo = pOperator->info;
SSDataBlock* pRes = pInfo->binfo.pRes;
@@ -289,7 +291,10 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
if (pInfo->pScalarExprInfo != NULL) {
- projectApplyFunctions(pInfo->pScalarExprInfo, pBlock, pBlock, pInfo->pScalarFuncCtx, pInfo->numOfScalarExpr, NULL);
+ pTaskInfo->code = projectApplyFunctions(pInfo->pScalarExprInfo, pBlock, pBlock, pInfo->pScalarFuncCtx, pInfo->numOfScalarExpr, NULL);
+ if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
+ longjmp(pTaskInfo->env, pTaskInfo->code);
+ }
}
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfExprs);
diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c
index a654876513..990dc0f200 100644
--- a/source/libs/executor/src/sortoperator.c
+++ b/source/libs/executor/src/sortoperator.c
@@ -114,7 +114,10 @@ void applyScalarFunction(SSDataBlock* pBlock, void* param) {
SOperatorInfo* pOperator = param;
SSortOperatorInfo* pSort = pOperator->info;
if (pOperator->pExpr != NULL) {
- projectApplyFunctions(pOperator->pExpr, pBlock, pBlock, pSort->binfo.pCtx, pOperator->numOfExprs, NULL);
+ int32_t code = projectApplyFunctions(pOperator->pExpr, pBlock, pBlock, pSort->binfo.pCtx, pOperator->numOfExprs, NULL);
+ if (code != TSDB_CODE_SUCCESS) {
+ longjmp(pOperator->pTaskInfo->env, code);
+ }
}
}
diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h
index 1f2ad0797d..ce2f0b0651 100644
--- a/source/libs/function/inc/builtinsimpl.h
+++ b/source/libs/function/inc/builtinsimpl.h
@@ -37,11 +37,11 @@ bool getSumFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
int32_t sumFunction(SqlFunctionCtx *pCtx);
int32_t sumInvertFunction(SqlFunctionCtx *pCtx);
-bool minFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
-bool maxFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
+bool minmaxFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
bool getMinmaxFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
int32_t minFunction(SqlFunctionCtx* pCtx);
int32_t maxFunction(SqlFunctionCtx *pCtx);
+int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
bool getAvgFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool avgFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
@@ -70,6 +70,7 @@ int32_t lastFunction(SqlFunctionCtx *pCtx);
bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv);
int32_t topFunction(SqlFunctionCtx *pCtx);
+int32_t bottomFunction(SqlFunctionCtx *pCtx);
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
bool getSpreadFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
@@ -82,6 +83,8 @@ bool histogramFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultIn
int32_t histogramFunction(SqlFunctionCtx* pCtx);
int32_t histogramFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
+bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
+
#ifdef __cplusplus
}
#endif
diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c
index 5aa1b63c79..2be2682bc9 100644
--- a/source/libs/function/src/builtins.c
+++ b/source/libs/function/src/builtins.c
@@ -207,7 +207,8 @@ static int32_t translateTop(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
}
static int32_t translateBottom(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
- // todo
+ SDataType* pType = &((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType;
+ pFunc->node.resType = (SDataType){.bytes = pType->bytes, .type = pType->type};
return TSDB_CODE_SUCCESS;
}
@@ -509,9 +510,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.translateFunc = translateInOutNum,
.dataRequiredFunc = statisDataRequired,
.getEnvFunc = getMinmaxFuncEnv,
- .initFunc = minFunctionSetup,
+ .initFunc = minmaxFunctionSetup,
.processFunc = minFunction,
- .finalizeFunc = functionFinalize
+ .finalizeFunc = minmaxFunctionFinalize
},
{
.name = "max",
@@ -520,9 +521,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.translateFunc = translateInOutNum,
.dataRequiredFunc = statisDataRequired,
.getEnvFunc = getMinmaxFuncEnv,
- .initFunc = maxFunctionSetup,
+ .initFunc = minmaxFunctionSetup,
.processFunc = maxFunction,
- .finalizeFunc = functionFinalize
+ .finalizeFunc = minmaxFunctionFinalize
},
{
.name = "stddev",
@@ -562,14 +563,14 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.classification = FUNC_MGT_AGG_FUNC,
.translateFunc = translateApercentile,
.getEnvFunc = getMinmaxFuncEnv,
- .initFunc = maxFunctionSetup,
+ .initFunc = minmaxFunctionSetup,
.processFunc = maxFunction,
.finalizeFunc = functionFinalize
},
{
.name = "top",
.type = FUNCTION_TYPE_TOP,
- .classification = FUNC_MGT_AGG_FUNC,
+ .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC,
.translateFunc = translateTop,
.getEnvFunc = getTopBotFuncEnv,
.initFunc = functionSetup,
@@ -579,12 +580,12 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "bottom",
.type = FUNCTION_TYPE_BOTTOM,
- .classification = FUNC_MGT_AGG_FUNC,
+ .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC,
.translateFunc = translateBottom,
- .getEnvFunc = getMinmaxFuncEnv,
- .initFunc = maxFunctionSetup,
- .processFunc = maxFunction,
- .finalizeFunc = functionFinalize
+ .getEnvFunc = getTopBotFuncEnv,
+ .initFunc = functionSetup,
+ .processFunc = bottomFunction,
+ .finalizeFunc = topBotFinalize
},
{
.name = "spread",
@@ -603,7 +604,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC,
.translateFunc = translateLastRow,
.getEnvFunc = getMinmaxFuncEnv,
- .initFunc = maxFunctionSetup,
+ .initFunc = minmaxFunctionSetup,
.processFunc = maxFunction,
.finalizeFunc = functionFinalize
},
@@ -1032,8 +1033,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.type = FUNCTION_TYPE_SELECT_VALUE,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC,
.translateFunc = translateSelectValue,
- .getEnvFunc = NULL,
- .initFunc = NULL,
+ .getEnvFunc = getSelectivityFuncEnv, // todo remove this function later.
+ .initFunc = functionSetup,
.sprocessFunc = NULL,
.finalizeFunc = NULL
}
diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c
index 9c1601b61a..1cb47a0bf1 100644
--- a/source/libs/function/src/builtinsimpl.c
+++ b/source/libs/function/src/builtinsimpl.c
@@ -37,13 +37,15 @@ typedef struct SAvgRes {
int64_t count;
} SAvgRes;
+typedef struct STuplePos {
+ int32_t pageId;
+ int32_t offset;
+} STuplePos;
+
typedef struct STopBotResItem {
- SVariant v;
- uint64_t uid; // it is a table uid, used to extract tag data during building of the final result for the tag data
- struct {
- int32_t pageId;
- int32_t offset;
- } tuplePos; // tuple data of this chosen row
+ SVariant v;
+ uint64_t uid; // it is a table uid, used to extract tag data during building of the final result for the tag data
+ STuplePos tuplePos; // tuple data of this chosen row
} STopBotResItem;
typedef struct STopBotRes {
@@ -616,101 +618,25 @@ EFuncDataRequired statisDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWin
return FUNC_DATA_REQUIRED_STATIS_LOAD;
}
-bool maxFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
- if (!functionSetup(pCtx, pResultInfo)) {
- return false;
- }
+typedef struct SMinmaxResInfo {
+ bool assign; // assign the first value or not
+ int64_t v;
+ STuplePos tuplePos;
+} SMinmaxResInfo;
- char* buf = GET_ROWCELL_INTERBUF(pResultInfo);
- switch (pCtx->resDataInfo.type) {
- case TSDB_DATA_TYPE_INT:
- *((int32_t*)buf) = INT32_MIN;
- break;
- case TSDB_DATA_TYPE_UINT:
- *((uint32_t*)buf) = 0;
- break;
- case TSDB_DATA_TYPE_FLOAT:
- *((float*)buf) = -FLT_MAX;
- break;
- case TSDB_DATA_TYPE_DOUBLE:
- SET_DOUBLE_VAL(((double*)buf), -DBL_MAX);
- break;
- case TSDB_DATA_TYPE_BIGINT:
- *((int64_t*)buf) = INT64_MIN;
- break;
- case TSDB_DATA_TYPE_UBIGINT:
- *((uint64_t*)buf) = 0;
- break;
- case TSDB_DATA_TYPE_SMALLINT:
- *((int16_t*)buf) = INT16_MIN;
- break;
- case TSDB_DATA_TYPE_USMALLINT:
- *((uint16_t*)buf) = 0;
- break;
- case TSDB_DATA_TYPE_TINYINT:
- *((int8_t*)buf) = INT8_MIN;
- break;
- case TSDB_DATA_TYPE_UTINYINT:
- *((uint8_t*)buf) = 0;
- break;
- case TSDB_DATA_TYPE_BOOL:
- *((int8_t*)buf) = 0;
- break;
- default:
- assert(0);
- }
- return true;
-}
-
-bool minFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
+bool minmaxFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
if (!functionSetup(pCtx, pResultInfo)) {
return false; // not initialized since it has been initialized
}
- char* buf = GET_ROWCELL_INTERBUF(pResultInfo);
- switch (pCtx->resDataInfo.type) {
- case TSDB_DATA_TYPE_TINYINT:
- *((int8_t*)buf) = INT8_MAX;
- break;
- case TSDB_DATA_TYPE_UTINYINT:
- *(uint8_t*)buf = UINT8_MAX;
- break;
- case TSDB_DATA_TYPE_SMALLINT:
- *((int16_t*)buf) = INT16_MAX;
- break;
- case TSDB_DATA_TYPE_USMALLINT:
- *((uint16_t*)buf) = UINT16_MAX;
- break;
- case TSDB_DATA_TYPE_INT:
- *((int32_t*)buf) = INT32_MAX;
- break;
- case TSDB_DATA_TYPE_UINT:
- *((uint32_t*)buf) = UINT32_MAX;
- break;
- case TSDB_DATA_TYPE_BIGINT:
- *((int64_t*)buf) = INT64_MAX;
- break;
- case TSDB_DATA_TYPE_UBIGINT:
- *((uint64_t*)buf) = UINT64_MAX;
- break;
- case TSDB_DATA_TYPE_FLOAT:
- *((float*)buf) = FLT_MAX;
- break;
- case TSDB_DATA_TYPE_DOUBLE:
- SET_DOUBLE_VAL(((double*)buf), DBL_MAX);
- break;
- case TSDB_DATA_TYPE_BOOL:
- *((int8_t*)buf) = 1;
- break;
- default:
- assert(0);
- }
-
+ SMinmaxResInfo* buf = GET_ROWCELL_INTERBUF(pResultInfo);
+ buf->assign = false;
+ buf->tuplePos.pageId = -1;
return true;
}
bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
- pEnv->calcMemSize = sizeof(int64_t);
+ pEnv->calcMemSize = sizeof(SMinmaxResInfo);
return true;
}
@@ -758,6 +684,9 @@ bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
} \
} while (0)
+static void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos);
+static void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos);
+
int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
int32_t numOfElems = 0;
@@ -768,13 +697,12 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
int32_t type = pCol->info.type;
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
- char* buf = GET_ROWCELL_INTERBUF(pResInfo);
+ SMinmaxResInfo *pBuf = GET_ROWCELL_INTERBUF(pResInfo);
// data in current data block are qualified to the query
if (pInput->colDataAggIsSet) {
numOfElems = pInput->numOfRows - pAgg->numOfNull;
ASSERT(pInput->numOfRows == pInput->totalRows && numOfElems >= 0);
-
if (numOfElems == 0) {
return numOfElems;
}
@@ -793,48 +721,82 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
// the index is the original position, not the relative position
TSKEY key = (pCtx->ptsList != NULL) ? pCtx->ptsList[index] : TSKEY_INITIAL_VAL;
- if (IS_SIGNED_NUMERIC_TYPE(type)) {
- int64_t prev = 0;
- GET_TYPED_DATA(prev, int64_t, type, buf);
+ if (!pBuf->assign) {
+ pBuf->v = *(int64_t*)tval;
+ if (pCtx->subsidiaries.num > 0) {
+ saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
+ }
+ } else {
+ if (IS_SIGNED_NUMERIC_TYPE(type)) {
+ int64_t prev = 0;
+ GET_TYPED_DATA(prev, int64_t, type, &pBuf->v);
- int64_t val = GET_INT64_VAL(tval);
- if ((prev < val) ^ isMinFunc) {
- *(int64_t*)buf = val;
- for (int32_t i = 0; i < (pCtx)->subsidiaries.num; ++i) {
- SqlFunctionCtx* __ctx = pCtx->subsidiaries.pCtx[i];
- if (__ctx->functionId == FUNCTION_TS_DUMMY) { // TODO refactor
- __ctx->tag.i = key;
- __ctx->tag.nType = TSDB_DATA_TYPE_BIGINT;
+ int64_t val = GET_INT64_VAL(tval);
+ if ((prev < val) ^ isMinFunc) {
+ pBuf->v = val;
+ // for (int32_t i = 0; i < (pCtx)->subsidiaries.num; ++i) {
+ // SqlFunctionCtx* __ctx = pCtx->subsidiaries.pCtx[i];
+ // if (__ctx->functionId == FUNCTION_TS_DUMMY) { // TODO refactor
+ // __ctx->tag.i = key;
+ // __ctx->tag.nType = TSDB_DATA_TYPE_BIGINT;
+ // }
+ //
+ // __ctx->fpSet.process(__ctx);
+ // }
+
+ if (pCtx->subsidiaries.num > 0) {
+ saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
}
+ }
- __ctx->fpSet.process(__ctx);
+ } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
+ uint64_t prev = 0;
+ GET_TYPED_DATA(prev, uint64_t, type, &pBuf->v);
+
+ uint64_t val = GET_UINT64_VAL(tval);
+ if ((prev < val) ^ isMinFunc) {
+ pBuf->v = val;
+ // for (int32_t i = 0; i < (pCtx)->subsidiaries.num; ++i) {
+ // SqlFunctionCtx* __ctx = pCtx->subsidiaries.pCtx[i];
+ // if (__ctx->functionId == FUNCTION_TS_DUMMY) { // TODO refactor
+ // __ctx->tag.i = key;
+ // __ctx->tag.nType = TSDB_DATA_TYPE_BIGINT;
+ // }
+ //
+ // __ctx->fpSet.process(__ctx);
+ // }
+ if (pCtx->subsidiaries.num > 0) {
+ saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
+ }
+ }
+ } else if (type == TSDB_DATA_TYPE_DOUBLE) {
+ double prev = 0;
+ GET_TYPED_DATA(prev, int64_t, type, &pBuf->v);
+
+ double val = GET_DOUBLE_VAL(tval);
+ if ((prev < val) ^ isMinFunc) {
+ pBuf->v = val;
+
+ if (pCtx->subsidiaries.num > 0) {
+ saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
+ }
+ }
+ } else if (type == TSDB_DATA_TYPE_FLOAT) {
+ double prev = 0;
+ GET_TYPED_DATA(prev, int64_t, type, &pBuf->v);
+
+ double val = GET_DOUBLE_VAL(tval);
+ if ((prev < val) ^ isMinFunc) {
+ pBuf->v = val;
+ }
+
+ if (pCtx->subsidiaries.num > 0) {
+ saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
}
}
- } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
- uint64_t prev = 0;
- GET_TYPED_DATA(prev, uint64_t, type, buf);
-
- uint64_t val = GET_UINT64_VAL(tval);
- if ((prev < val) ^ isMinFunc) {
- *(uint64_t*)buf = val;
- for (int32_t i = 0; i < (pCtx)->subsidiaries.num; ++i) {
- SqlFunctionCtx* __ctx = pCtx->subsidiaries.pCtx[i];
- if (__ctx->functionId == FUNCTION_TS_DUMMY) { // TODO refactor
- __ctx->tag.i = key;
- __ctx->tag.nType = TSDB_DATA_TYPE_BIGINT;
- }
-
- __ctx->fpSet.process(__ctx);
- }
- }
- } else if (type == TSDB_DATA_TYPE_DOUBLE) {
- double val = GET_DOUBLE_VAL(tval);
- UPDATE_DATA(pCtx, *(double*)buf, val, numOfElems, isMinFunc, key);
- } else if (type == TSDB_DATA_TYPE_FLOAT) {
- double val = GET_DOUBLE_VAL(tval);
- UPDATE_DATA(pCtx, *(float*)buf, val, numOfElems, isMinFunc, key);
}
+ pBuf->assign = true;
return numOfElems;
}
@@ -843,47 +805,318 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if (IS_SIGNED_NUMERIC_TYPE(type) || type == TSDB_DATA_TYPE_BOOL) {
if (type == TSDB_DATA_TYPE_TINYINT || type == TSDB_DATA_TYPE_BOOL) {
- LOOPCHECK_N(*(int8_t*)buf, pCol, pCtx, int8_t, numOfRows, start, isMinFunc, numOfElems);
- } else if (type == TSDB_DATA_TYPE_SMALLINT) {
- LOOPCHECK_N(*(int16_t*)buf, pCol, pCtx, int16_t, numOfRows, start, isMinFunc, numOfElems);
- } else if (type == TSDB_DATA_TYPE_INT) {
- int32_t* pData = (int32_t*)pCol->pData;
- int32_t* val = (int32_t*)buf;
+ int8_t* pData = (int8_t*)pCol->pData;
+ int8_t* val = (int8_t*)&pBuf->v;
for (int32_t i = start; i < start + numOfRows; ++i) {
if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) {
continue;
}
- if ((*val < pData[i]) ^ isMinFunc) {
+ if (!pBuf->assign) {
*val = pData[i];
- TSKEY ts = (pCtx->ptsList != NULL) ? GET_TS_DATA(pCtx, i) : 0;
- DO_UPDATE_SUBSID_RES(pCtx, ts);
+ if (pCtx->subsidiaries.num > 0) {
+ saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
+ }
+ pBuf->assign = true;
+ } else {
+ // ignore the equivalent data value
+ if ((*val) == pData[i]) {
+ continue;
+ }
+
+ if ((*val < pData[i]) ^ isMinFunc) {
+ *val = pData[i];
+ if (pCtx->subsidiaries.num > 0) {
+ copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
+ }
+ }
}
numOfElems += 1;
}
+ } else if (type == TSDB_DATA_TYPE_SMALLINT) {
+ int16_t* pData = (int16_t*)pCol->pData;
+ int16_t* val = (int16_t*)&pBuf->v;
-#if defined(_DEBUG_VIEW)
- qDebug("max value updated:%d", *retVal);
-#endif
+ for (int32_t i = start; i < start + numOfRows; ++i) {
+ if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) {
+ continue;
+ }
+
+ if (!pBuf->assign) {
+ *val = pData[i];
+ if (pCtx->subsidiaries.num > 0) {
+ saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
+ }
+ pBuf->assign = true;
+ } else {
+ // ignore the equivalent data value
+ if ((*val) == pData[i]) {
+ continue;
+ }
+
+ if ((*val < pData[i]) ^ isMinFunc) {
+ *val = pData[i];
+ if (pCtx->subsidiaries.num > 0) {
+ copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
+ }
+ }
+ }
+
+ numOfElems += 1;
+ }
+ } else if (type == TSDB_DATA_TYPE_INT) {
+ int32_t* pData = (int32_t*)pCol->pData;
+ int32_t* val = (int32_t*)&pBuf->v;
+
+ for (int32_t i = start; i < start + numOfRows; ++i) {
+ if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) {
+ continue;
+ }
+
+ if (!pBuf->assign) {
+ *val = pData[i];
+ if (pCtx->subsidiaries.num > 0) {
+ saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
+ }
+ pBuf->assign = true;
+ } else {
+ // ignore the equivalent data value
+ if ((*val) == pData[i]) {
+ continue;
+ }
+
+ if ((*val < pData[i]) ^ isMinFunc) {
+ *val = pData[i];
+ if (pCtx->subsidiaries.num > 0) {
+ copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
+ }
+ }
+ }
+
+ numOfElems += 1;
+ }
} else if (type == TSDB_DATA_TYPE_BIGINT) {
- LOOPCHECK_N(*(int64_t*)buf, pCol, pCtx, int64_t, numOfRows, start, isMinFunc, numOfElems);
+ int64_t* pData = (int64_t*)pCol->pData;
+ int64_t* val = (int64_t*)&pBuf->v;
+
+ for (int32_t i = start; i < start + numOfRows; ++i) {
+ if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) {
+ continue;
+ }
+
+ if (!pBuf->assign) {
+ *val = pData[i];
+ if (pCtx->subsidiaries.num > 0) {
+ saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
+ }
+ pBuf->assign = true;
+ } else {
+ // ignore the equivalent data value
+ if ((*val) == pData[i]) {
+ continue;
+ }
+
+ if ((*val < pData[i]) ^ isMinFunc) {
+ *val = pData[i];
+ if (pCtx->subsidiaries.num > 0) {
+ copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
+ }
+ }
+ }
+
+ numOfElems += 1;
+ }
}
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
if (type == TSDB_DATA_TYPE_UTINYINT) {
- LOOPCHECK_N(*(uint8_t*)buf, pCol, pCtx, uint8_t, numOfRows, start, isMinFunc, numOfElems);
+ uint8_t* pData = (uint8_t*)pCol->pData;
+ uint8_t* val = (uint8_t*)&pBuf->v;
+
+ for (int32_t i = start; i < start + numOfRows; ++i) {
+ if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) {
+ continue;
+ }
+
+ if (!pBuf->assign) {
+ *val = pData[i];
+ if (pCtx->subsidiaries.num > 0) {
+ saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
+ }
+ pBuf->assign = true;
+ } else {
+ // ignore the equivalent data value
+ if ((*val) == pData[i]) {
+ continue;
+ }
+
+ if ((*val < pData[i]) ^ isMinFunc) {
+ *val = pData[i];
+ if (pCtx->subsidiaries.num > 0) {
+ copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
+ }
+ }
+ }
+
+ numOfElems += 1;
+ }
} else if (type == TSDB_DATA_TYPE_USMALLINT) {
- LOOPCHECK_N(*(uint16_t*)buf, pCol, pCtx, uint16_t, numOfRows, start, isMinFunc, numOfElems);
+ uint16_t* pData = (uint16_t*)pCol->pData;
+ uint16_t* val = (uint16_t*)&pBuf->v;
+
+ for (int32_t i = start; i < start + numOfRows; ++i) {
+ if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) {
+ continue;
+ }
+
+ if (!pBuf->assign) {
+ *val = pData[i];
+ if (pCtx->subsidiaries.num > 0) {
+ saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
+ }
+ pBuf->assign = true;
+ } else {
+ // ignore the equivalent data value
+ if ((*val) == pData[i]) {
+ continue;
+ }
+
+ if ((*val < pData[i]) ^ isMinFunc) {
+ *val = pData[i];
+ if (pCtx->subsidiaries.num > 0) {
+ copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
+ }
+ }
+ }
+
+ numOfElems += 1;
+ }
} else if (type == TSDB_DATA_TYPE_UINT) {
- LOOPCHECK_N(*(uint32_t*)buf, pCol, pCtx, uint32_t, numOfRows, start, isMinFunc, numOfElems);
+ uint32_t* pData = (uint32_t*)pCol->pData;
+ uint32_t* val = (uint32_t*)&pBuf->v;
+
+ for (int32_t i = start; i < start + numOfRows; ++i) {
+ if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) {
+ continue;
+ }
+
+ if (!pBuf->assign) {
+ *val = pData[i];
+ if (pCtx->subsidiaries.num > 0) {
+ saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
+ }
+ pBuf->assign = true;
+ } else {
+ // ignore the equivalent data value
+ if ((*val) == pData[i]) {
+ continue;
+ }
+
+ if ((*val < pData[i]) ^ isMinFunc) {
+ *val = pData[i];
+ if (pCtx->subsidiaries.num > 0) {
+ copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
+ }
+ }
+ }
+
+ numOfElems += 1;
+ }
} else if (type == TSDB_DATA_TYPE_UBIGINT) {
- LOOPCHECK_N(*(uint64_t*)buf, pCol, pCtx, uint64_t, numOfRows, start, isMinFunc, numOfElems);
+ uint64_t* pData = (uint64_t*)pCol->pData;
+ uint64_t* val = (uint64_t*)&pBuf->v;
+
+ for (int32_t i = start; i < start + numOfRows; ++i) {
+ if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) {
+ continue;
+ }
+
+ if (!pBuf->assign) {
+ *val = pData[i];
+ if (pCtx->subsidiaries.num > 0) {
+ saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
+ }
+ pBuf->assign = true;
+ } else {
+ // ignore the equivalent data value
+ if ((*val) == pData[i]) {
+ continue;
+ }
+
+ if ((*val < pData[i]) ^ isMinFunc) {
+ *val = pData[i];
+ if (pCtx->subsidiaries.num > 0) {
+ copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
+ }
+ }
+ }
+
+ numOfElems += 1;
+ }
}
} else if (type == TSDB_DATA_TYPE_DOUBLE) {
- LOOPCHECK_N(*(double*)buf, pCol, pCtx, double, numOfRows, start, isMinFunc, numOfElems);
+ double* pData = (double*)pCol->pData;
+ double* val = (double*)&pBuf->v;
+
+ for (int32_t i = start; i < start + numOfRows; ++i) {
+ if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) {
+ continue;
+ }
+
+ if (!pBuf->assign) {
+ *val = pData[i];
+ if (pCtx->subsidiaries.num > 0) {
+ saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
+ }
+ pBuf->assign = true;
+ } else {
+ // ignore the equivalent data value
+ if ((*val) == pData[i]) {
+ continue;
+ }
+
+ if ((*val < pData[i]) ^ isMinFunc) {
+ *val = pData[i];
+ if (pCtx->subsidiaries.num > 0) {
+ copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
+ }
+ }
+ }
+
+ numOfElems += 1;
+ }
} else if (type == TSDB_DATA_TYPE_FLOAT) {
- LOOPCHECK_N(*(float*)buf, pCol, pCtx, float, numOfRows, start, isMinFunc, numOfElems);
+ float* pData = (float*)pCol->pData;
+ double* val = (double*)&pBuf->v;
+
+ for (int32_t i = start; i < start + numOfRows; ++i) {
+ if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) {
+ continue;
+ }
+
+ if (!pBuf->assign) {
+ *val = pData[i];
+ if (pCtx->subsidiaries.num > 0) {
+ saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
+ }
+ pBuf->assign = true;
+ } else {
+ // ignore the equivalent data value
+ if ((*val) == pData[i]) {
+ continue;
+ }
+
+ if ((*val < pData[i]) ^ isMinFunc) {
+ *val = pData[i];
+ if (pCtx->subsidiaries.num > 0) {
+ copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
+ }
+ }
+ }
+
+ numOfElems += 1;
+ }
}
return numOfElems;
@@ -901,6 +1134,65 @@ int32_t maxFunction(SqlFunctionCtx* pCtx) {
return TSDB_CODE_SUCCESS;
}
+static void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuplePos *pTuplePos, int32_t rowIndex);
+
+int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
+ SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
+
+ SMinmaxResInfo* pRes = GET_ROWCELL_INTERBUF(pEntryInfo);
+
+ int32_t type = pCtx->input.pData[0]->info.type;
+ int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
+
+ SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
+
+ // todo assign the tag value
+ int32_t currentRow = pBlock->info.rows;
+
+ if (pCol->info.type == TSDB_DATA_TYPE_FLOAT) {
+ float v = *(double*) &pRes->v;
+ colDataAppend(pCol, currentRow, (const char*)&v, false);
+ } else {
+ colDataAppend(pCol, currentRow, (const char*)&pRes->v, false);
+ }
+
+ setSelectivityValue(pCtx, pBlock, &pRes->tuplePos, currentRow);
+ return pEntryInfo->numOfRes;
+}
+
+void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuplePos *pTuplePos, int32_t rowIndex) {
+ int32_t pageId = pTuplePos->pageId;
+ int32_t offset = pTuplePos->offset;
+ if (pTuplePos->pageId != -1) {
+ SFilePage* pPage = getBufPage(pCtx->pBuf, pageId);
+
+ bool* nullList = (bool*)((char*)pPage + offset);
+ char* pStart = (char*)(nullList + pCtx->pSrcBlock->info.numOfCols * sizeof(bool));
+
+ // todo set the offset value to optimize the performance.
+ for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) {
+ SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j];
+
+ SFunctParam* pFuncParam = &pc->pExpr->base.pParam[0];
+ int32_t srcSlotId = pFuncParam->pCol->slotId;
+ int32_t dstSlotId = pc->pExpr->base.resSchema.slotId;
+
+ int32_t ps = 0;
+ for (int32_t k = 0; k < srcSlotId; ++k) {
+ SColumnInfoData* pSrcCol = taosArrayGet(pCtx->pSrcBlock->pDataBlock, k);
+ ps += pSrcCol->info.bytes;
+ }
+
+ SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId);
+ if (nullList[srcSlotId]) {
+ colDataAppendNULL(pDstCol, rowIndex);
+ } else {
+ colDataAppend(pDstCol, rowIndex, (pStart + ps), false);
+ }
+ }
+ }
+}
+
bool getStddevFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(SStddevRes);
return true;
@@ -1244,6 +1536,14 @@ bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
return true;
}
+bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
+ SColumnNode* pNode = nodesListGetNode(pFunc->pParameterList, 0);
+ pEnv->calcMemSize = pNode->node.resType.bytes;
+ return true;
+}
+
+
+
static FORCE_INLINE TSKEY getRowPTs(SColumnInfoData* pTsColInfo, int32_t rowIndex) {
if (pTsColInfo == NULL) {
return 0;
@@ -1622,35 +1922,49 @@ static STopBotRes* getTopBotOutputInfo(SqlFunctionCtx* pCtx) {
}
static void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type,
- uint64_t uid, SResultRowEntryInfo* pEntryInfo);
-
-static void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STopBotResItem* pItem);
-static void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STopBotResItem* pItem);
+ uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery);
int32_t topFunction(SqlFunctionCtx* pCtx) {
int32_t numOfElems = 0;
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
- // if ((void *)pRes->res[0] != (void *)((char *)pRes + sizeof(STopBotRes) + POINTER_BYTES * pCtx->param[0].i)) {
- // buildTopBotStruct(pRes, pCtx);
- // }
-
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pCol = pInput->pData[0];
int32_t type = pInput->pData[0]->info.type;
int32_t start = pInput->startRowIndex;
- int32_t numOfRows = pInput->numOfRows;
-
- for (int32_t i = start; i < numOfRows + start; ++i) {
+ for (int32_t i = start; i < pInput->numOfRows + start; ++i) {
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
continue;
}
- numOfElems++;
+ numOfElems++;
char* data = colDataGetData(pCol, i);
- doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, type, pInput->uid, pResInfo);
+ doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, type, pInput->uid, pResInfo, true);
+ }
+
+ return TSDB_CODE_SUCCESS;
+}
+
+int32_t bottomFunction(SqlFunctionCtx* pCtx) {
+ int32_t numOfElems = 0;
+ SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
+
+ SInputColumnInfoData* pInput = &pCtx->input;
+ SColumnInfoData* pCol = pInput->pData[0];
+
+ int32_t type = pInput->pData[0]->info.type;
+
+ int32_t start = pInput->startRowIndex;
+ for (int32_t i = start; i < pInput->numOfRows + start; ++i) {
+ if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
+ continue;
+ }
+
+ numOfElems++;
+ char* data = colDataGetData(pCol, i);
+ doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, type, pInput->uid, pResInfo, false);
}
return TSDB_CODE_SUCCESS;
@@ -1684,7 +1998,7 @@ static int32_t topBotResComparFn(const void* p1, const void* p2, const void* par
}
void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type,
- uint64_t uid, SResultRowEntryInfo* pEntryInfo) {
+ uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery) {
STopBotRes* pRes = getTopBotOutputInfo(pCtx);
int32_t maxSize = pCtx->param[1].param.i;
@@ -1701,30 +2015,36 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData
pItem->uid = uid;
// save the data of this tuple
- saveTupleData(pCtx, rowIndex, pSrcBlock, pItem);
+ saveTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos);
// allocate the buffer and keep the data of this row into the new allocated buffer
pEntryInfo->numOfRes++;
taosheapsort((void*)pItems, sizeof(STopBotResItem), pEntryInfo->numOfRes, (const void*)&type, topBotResComparFn,
- false);
+ !isTopQuery);
} else { // replace the minimum value in the result
- if ((IS_SIGNED_NUMERIC_TYPE(type) && val.i > pItems[0].v.i) ||
- (IS_UNSIGNED_NUMERIC_TYPE(type) && val.u > pItems[0].v.u) || (IS_FLOAT_TYPE(type) && val.d > pItems[0].v.d)) {
+ if ((isTopQuery && (
+ (IS_SIGNED_NUMERIC_TYPE(type) && val.i > pItems[0].v.i) ||
+ (IS_UNSIGNED_NUMERIC_TYPE(type) && val.u > pItems[0].v.u) ||
+ (IS_FLOAT_TYPE(type) && val.d > pItems[0].v.d)))
+ || (!isTopQuery && (
+ (IS_SIGNED_NUMERIC_TYPE(type) && val.i < pItems[0].v.i) ||
+ (IS_UNSIGNED_NUMERIC_TYPE(type) && val.u < pItems[0].v.u) ||
+ (IS_FLOAT_TYPE(type) && val.d < pItems[0].v.d))
+ )) {
// replace the old data and the coresponding tuple data
STopBotResItem* pItem = &pItems[0];
pItem->v = val;
pItem->uid = uid;
// save the data of this tuple by over writing the old data
- copyTupleData(pCtx, rowIndex, pSrcBlock, pItem);
-
+ copyTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos);
taosheapadjust((void*)pItems, sizeof(STopBotResItem), 0, pEntryInfo->numOfRes - 1, (const void*)&type,
- topBotResComparFn, NULL, false);
+ topBotResComparFn, NULL, !isTopQuery);
}
}
}
-void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STopBotResItem* pItem) {
+void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) {
SFilePage* pPage = NULL;
int32_t completeRowSize = pSrcBlock->info.rowSize + pSrcBlock->info.numOfCols * sizeof(bool);
@@ -1740,7 +2060,7 @@ void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS
}
}
- pItem->tuplePos.pageId = pCtx->curBufPage;
+ pPos->pageId = pCtx->curBufPage;
// keep the current row data, extract method
int32_t offset = 0;
@@ -1751,6 +2071,7 @@ void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS
bool isNull = colDataIsNull_s(pCol, rowIndex);
if (isNull) {
nullList[i] = true;
+ offset += pCol->info.bytes;
continue;
}
@@ -1764,17 +2085,17 @@ void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS
offset += pCol->info.bytes;
}
- pItem->tuplePos.offset = pPage->num;
+ pPos->offset = pPage->num;
pPage->num += completeRowSize;
setBufPageDirty(pPage, true);
releaseBufPage(pCtx->pBuf, pPage);
}
-void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STopBotResItem* pItem) {
- SFilePage* pPage = getBufPage(pCtx->pBuf, pItem->tuplePos.pageId);
+void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) {
+ SFilePage* pPage = getBufPage(pCtx->pBuf, pPos->pageId);
- bool* nullList = (bool*)((char*)pPage + pItem->tuplePos.offset);
+ bool* nullList = (bool*)((char*)pPage + pPos->offset);
char* pStart = (char*)(nullList + pSrcBlock->info.numOfCols * sizeof(bool));
int32_t offset = 0;
@@ -1803,54 +2124,24 @@ int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
STopBotRes* pRes = GET_ROWCELL_INTERBUF(pEntryInfo);
pEntryInfo->complete = true;
- int32_t type = pCtx->input.pData[0]->info.type;
- int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
+ int32_t type = pCtx->input.pData[0]->info.type;
+ int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
+
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
// todo assign the tag value and the corresponding row data
int32_t currentRow = pBlock->info.rows;
- switch (type) {
- case TSDB_DATA_TYPE_INT: {
- for (int32_t i = 0; i < pEntryInfo->numOfRes; ++i) {
- STopBotResItem* pItem = &pRes->pItems[i];
- colDataAppendInt32(pCol, currentRow, (int32_t*)&pItem->v.i);
-
- int32_t pageId = pItem->tuplePos.pageId;
- int32_t offset = pItem->tuplePos.offset;
- if (pItem->tuplePos.pageId != -1) {
- SFilePage* pPage = getBufPage(pCtx->pBuf, pageId);
-
- bool* nullList = (bool*)((char*)pPage + offset);
- char* pStart = (char*)(nullList + pCtx->pSrcBlock->info.numOfCols * sizeof(bool));
-
- // todo set the offset value to optimize the performance.
- for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) {
- SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j];
-
- SFunctParam* pFuncParam = &pc->pExpr->base.pParam[0];
- int32_t srcSlotId = pFuncParam->pCol->slotId;
- int32_t dstSlotId = pCtx->pExpr->base.resSchema.slotId;
-
- int32_t ps = 0;
- for (int32_t k = 0; k < srcSlotId; ++k) {
- SColumnInfoData* pSrcCol = taosArrayGet(pCtx->pSrcBlock->pDataBlock, k);
- ps += pSrcCol->info.bytes;
- }
-
- SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId);
- if (nullList[srcSlotId]) {
- colDataAppendNULL(pDstCol, currentRow);
- } else {
- colDataAppend(pDstCol, currentRow, (pStart + ps), false);
- }
- }
- }
-
- currentRow += 1;
- }
-
- break;
+ for (int32_t i = 0; i < pEntryInfo->numOfRes; ++i) {
+ STopBotResItem* pItem = &pRes->pItems[i];
+ if (type == TSDB_DATA_TYPE_FLOAT) {
+ float v = pItem->v.d;
+ colDataAppend(pCol, currentRow, (const char*)&v, false);
+ } else {
+ colDataAppend(pCol, currentRow, (const char*)&pItem->v.i, false);
}
+
+ setSelectivityValue(pCtx, pBlock, &pRes->pItems[i].tuplePos, currentRow);
+ currentRow += 1;
}
return pEntryInfo->numOfRes;
diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c
index 75b6aeaae9..11502b4c47 100644
--- a/source/libs/function/src/tudf.c
+++ b/source/libs/function/src/tudf.c
@@ -23,10 +23,165 @@
#include "builtinsimpl.h"
#include "functionMgt.h"
-//TODO: network error processing.
//TODO: add unit test
//TODO: include all global variable under context struct
+typedef struct SUdfdData {
+ bool startCalled;
+ bool needCleanUp;
+ uv_loop_t loop;
+ uv_thread_t thread;
+ uv_barrier_t barrier;
+ uv_process_t process;
+ int spawnErr;
+ uv_pipe_t ctrlPipe;
+ uv_async_t stopAsync;
+ int32_t stopCalled;
+
+ int32_t dnodeId;
+} SUdfdData;
+
+SUdfdData udfdGlobal = {0};
+
+static int32_t udfSpawnUdfd(SUdfdData *pData);
+
+void udfUdfdExit(uv_process_t *process, int64_t exitStatus, int termSignal) {
+ fnInfo("udfd process exited with status %" PRId64 ", signal %d", exitStatus, termSignal);
+ SUdfdData *pData = process->data;
+ if (exitStatus == 0 && termSignal == 0 || atomic_load_32(&pData->stopCalled)) {
+ fnInfo("udfd process exit due to SIGINT or dnode-mgmt called stop");
+ } else {
+ fnInfo("udfd process restart");
+ udfSpawnUdfd(pData);
+ }
+}
+
+static int32_t udfSpawnUdfd(SUdfdData* pData) {
+ fnInfo("dnode start spawning udfd");
+ uv_process_options_t options = {0};
+
+ char path[PATH_MAX] = {0};
+ if (tsProcPath == NULL) {
+ path[0] = '.';
+ } else {
+ strncpy(path, tsProcPath, strlen(tsProcPath));
+ taosDirName(path);
+ }
+#ifdef WINDOWS
+ strcat(path, "udfd.exe");
+#else
+ strcat(path, "/udfd");
+#endif
+ char* argsUdfd[] = {path, "-c", configDir, NULL};
+ options.args = argsUdfd;
+ options.file = path;
+
+ options.exit_cb = udfUdfdExit;
+
+ uv_pipe_init(&pData->loop, &pData->ctrlPipe, 1);
+
+ uv_stdio_container_t child_stdio[3];
+ child_stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE;
+ child_stdio[0].data.stream = (uv_stream_t*) &pData->ctrlPipe;
+ child_stdio[1].flags = UV_IGNORE;
+ child_stdio[2].flags = UV_INHERIT_FD;
+ child_stdio[2].data.fd = 2;
+ options.stdio_count = 3;
+ options.stdio = child_stdio;
+
+ options.flags = UV_PROCESS_DETACHED;
+
+ char dnodeIdEnvItem[32] = {0};
+ char thrdPoolSizeEnvItem[32] = {0};
+ snprintf(dnodeIdEnvItem, 32, "%s=%d", "DNODE_ID", pData->dnodeId);
+ float numCpuCores = 4;
+ taosGetCpuCores(&numCpuCores);
+ snprintf(thrdPoolSizeEnvItem,32, "%s=%d", "UV_THREADPOOL_SIZE", (int)numCpuCores*2);
+ char* envUdfd[] = {dnodeIdEnvItem, thrdPoolSizeEnvItem, NULL};
+ options.env = envUdfd;
+
+ int err = uv_spawn(&pData->loop, &pData->process, &options);
+ pData->process.data = (void*)pData;
+
+ if (err != 0) {
+ fnError("can not spawn udfd. path: %s, error: %s", path, uv_strerror(err));
+ }
+ return err;
+}
+
+static void udfUdfdCloseWalkCb(uv_handle_t* handle, void* arg) {
+ if (!uv_is_closing(handle)) {
+ uv_close(handle, NULL);
+ }
+}
+
+static void udfUdfdStopAsyncCb(uv_async_t *async) {
+ SUdfdData *pData = async->data;
+ uv_stop(&pData->loop);
+}
+
+static void udfWatchUdfd(void *args) {
+ SUdfdData *pData = args;
+ uv_loop_init(&pData->loop);
+ uv_async_init(&pData->loop, &pData->stopAsync, udfUdfdStopAsyncCb);
+ pData->stopAsync.data = pData;
+ int32_t err = udfSpawnUdfd(pData);
+ atomic_store_32(&pData->spawnErr, err);
+ uv_barrier_wait(&pData->barrier);
+ uv_run(&pData->loop, UV_RUN_DEFAULT);
+ uv_loop_close(&pData->loop);
+
+ uv_walk(&pData->loop, udfUdfdCloseWalkCb, NULL);
+ uv_run(&pData->loop, UV_RUN_DEFAULT);
+ uv_loop_close(&pData->loop);
+ return;
+}
+
+int32_t udfStartUdfd(int32_t startDnodeId) {
+ SUdfdData *pData = &udfdGlobal;
+ if (pData->startCalled) {
+ fnInfo("dnode-mgmt start udfd already called");
+ return 0;
+ }
+ pData->startCalled = true;
+ char dnodeId[8] = {0};
+ snprintf(dnodeId, sizeof(dnodeId), "%d", startDnodeId);
+ uv_os_setenv("DNODE_ID", dnodeId);
+ pData->dnodeId = startDnodeId;
+
+ uv_barrier_init(&pData->barrier, 2);
+ uv_thread_create(&pData->thread, udfWatchUdfd, pData);
+ uv_barrier_wait(&pData->barrier);
+ int32_t err = atomic_load_32(&pData->spawnErr);
+ if (err != 0) {
+ uv_barrier_destroy(&pData->barrier);
+ uv_async_send(&pData->stopAsync);
+ uv_thread_join(&pData->thread);
+ pData->needCleanUp = false;
+ fnInfo("dnode-mgmt udfd cleaned up after spawn err");
+ } else {
+ pData->needCleanUp = true;
+ }
+ return err;
+}
+
+int32_t udfStopUdfd() {
+ SUdfdData *pData = &udfdGlobal;
+ fnInfo("dnode-mgmt to stop udfd. need cleanup: %d, spawn err: %d",
+ pData->needCleanUp, pData->spawnErr);
+ if (!pData->needCleanUp || atomic_load_32(&pData->stopCalled)) {
+ return 0;
+ }
+ atomic_store_32(&pData->stopCalled, 1);
+ pData->needCleanUp = false;
+ uv_barrier_destroy(&pData->barrier);
+ uv_async_send(&pData->stopAsync);
+ uv_thread_join(&pData->thread);
+ fnInfo("dnode-mgmt udfd cleaned up");
+ return 0;
+}
+
+//==============================================================================================
/* Copyright (c) 2013, Ben Noordhuis
* The QUEUE is copied from queue.h under libuv
* */
diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c
index 9a8dcc57d9..911d8384f0 100644
--- a/source/libs/sync/src/syncMain.c
+++ b/source/libs/sync/src/syncMain.c
@@ -522,6 +522,7 @@ void syncNodeClose(SSyncNode* pSyncNode) {
ret = raftStoreClose(pSyncNode->pRaftStore);
assert(ret == 0);
+ syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
voteGrantedDestroy(pSyncNode->pVotesGranted);
votesRespondDestory(pSyncNode->pVotesRespond);
syncIndexMgrDestroy(pSyncNode->pNextIndex);
@@ -1138,6 +1139,7 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) {
syncNodeReplicate(ths);
}
+ syncEntryDestory(pEntry);
return ret;
}
diff --git a/tests/script/tsim/insert/basic0.sim b/tests/script/tsim/insert/basic0.sim
index 94bd0f1ecf..d8dde20e4e 100644
--- a/tests/script/tsim/insert/basic0.sim
+++ b/tests/script/tsim/insert/basic0.sim
@@ -140,7 +140,8 @@ endi
if $data00 != -13 then
return -1
endi
-if $data01 != -2.30000 then
+if $data01 != -2.30000 then
+ print expect -2.30000, actual: $data01
return -1
endi
if $data02 != -3.300000000 then
diff --git a/tests/script/tsim/query/udf.sim b/tests/script/tsim/query/udf.sim
index cabb88ea09..b3cbda5090 100644
--- a/tests/script/tsim/query/udf.sim
+++ b/tests/script/tsim/query/udf.sim
@@ -64,35 +64,35 @@ if $data00 != 1.414213562 then
return -1
endi
-sql insert into t2 values(now+2s, 1, null)(now+3s, null, 2);
-sql select udf1(f1, f2) from t2;
-print $rows , $data00 , $data10 , $data20 , $data30
-if $rows != 4 then
- return -1
-endi
-if $data00 != 88 then
- return -1
-endi
-if $data10 != 88 then
- return -1
-endi
-
-if $data20 != NULL then
- return -1
-endi
-
-if $data30 != NULL then
- return -1
-endi
-
-sql select udf2(f1, f2) from t2;
-print $rows, $data00
-if $rows != 1 then
- return -1
-endi
-if $data00 != 2.645751311 then
- return -1
-endi
+#sql insert into t2 values(now+2s, 1, null)(now+3s, null, 2);
+#sql select udf1(f1, f2) from t2;
+#print $rows , $data00 , $data10 , $data20 , $data30
+#if $rows != 4 then
+# return -1
+#endi
+#if $data00 != 88 then
+# return -1
+#endi
+#if $data10 != 88 then
+# return -1
+#endi
+#
+#if $data20 != NULL then
+# return -1
+#endi
+#
+#if $data30 != NULL then
+# return -1
+#endi
+#
+#sql select udf2(f1, f2) from t2;
+#print $rows, $data00
+#if $rows != 1 then
+# return -1
+#endi
+#if $data00 != 2.645751311 then
+# return -1
+#endi
sql drop function udf1;
sql show functions;
if $rows != 1 then
diff --git a/tools/taos-tools b/tools/taos-tools
index 2f3dfddd4d..0ae9f872c2 160000
--- a/tools/taos-tools
+++ b/tools/taos-tools
@@ -1 +1 @@
-Subproject commit 2f3dfddd4d9a869e706ba3cf98fb6d769404cd7c
+Subproject commit 0ae9f872c26d5da8cb61aa9eb00b5c7aeba10ec4