diff --git a/source/libs/executor/test/index_executor_tests.cpp b/source/libs/executor/test/index_executor_tests.cpp index 5f1bff45a3..5b03da034e 100644 --- a/source/libs/executor/test/index_executor_tests.cpp +++ b/source/libs/executor/test/index_executor_tests.cpp @@ -200,11 +200,37 @@ TEST(testCase, index_filter) { doFilterTag(opNode, result); EXPECT_EQ(1, taosArrayGetSize(result)); + taosArrayDestroy(result); + nodesDestroyNode(res); + } + { + SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL, *res = NULL; + sifMakeColumnNode(&pLeft, "test", "col", COLUMN_TYPE_TAG, TSDB_DATA_TYPE_INT); + sifMakeValueNode(&pRight, TSDB_DATA_TYPE_INT, &sifRightV); + sifMakeOpNode(&opNode, OP_TYPE_GREATER_THAN, TSDB_DATA_TYPE_INT, pLeft, pRight); + SArray *result = taosArrayInit(4, sizeof(uint64_t)); + doFilterTag(opNode, result); + EXPECT_EQ(0, taosArrayGetSize(result)); + taosArrayDestroy(result); + nodesDestroyNode(res); + } + { + SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL, *res = NULL; + sifMakeColumnNode(&pLeft, "test", "col", COLUMN_TYPE_TAG, TSDB_DATA_TYPE_INT); + sifMakeValueNode(&pRight, TSDB_DATA_TYPE_INT, &sifRightV); + sifMakeOpNode(&opNode, OP_TYPE_GREATER_EQUAL, TSDB_DATA_TYPE_DOUBLE, pLeft, pRight); + + SArray *result = taosArrayInit(4, sizeof(uint64_t)); + doFilterTag(opNode, result); + EXPECT_EQ(0, taosArrayGetSize(result)); + taosArrayDestroy(result); nodesDestroyNode(res); } } +// add other greater/lower/equal/in compare func test + TEST(testCase, index_filter_varify) { { SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL, *res = NULL; diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 638a070192..9fa20d3556 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -310,28 +310,28 @@ enum { }; int64_t gUdfTaskSeqNum = 0; -typedef struct SUdfdProxy { +typedef struct SUdfcProxy { char udfdPipeName[PATH_MAX + UDF_LISTEN_PIPE_NAME_LEN + 2]; - uv_barrier_t gUdfInitBarrier; + uv_barrier_t initBarrier; - uv_loop_t gUdfdLoop; - uv_thread_t gUdfLoopThread; - uv_async_t gUdfLoopTaskAync; + uv_loop_t uvLoop; + uv_thread_t loopThread; + uv_async_t loopTaskAync; - uv_async_t gUdfLoopStopAsync; + uv_async_t loopStopAsync; - uv_mutex_t gUdfTaskQueueMutex; - int8_t gUdfcState; - QUEUE gUdfTaskQueue; - QUEUE gUvProcTaskQueue; + uv_mutex_t taskQueueMutex; + int8_t udfcState; + QUEUE taskQueue; + QUEUE uvProcTaskQueue; int8_t initialized; -} SUdfdProxy; +} SUdfcProxy; -SUdfdProxy gUdfdProxy = {0}; +SUdfcProxy gUdfdProxy = {0}; typedef struct SClientUdfUvSession { - SUdfdProxy *udfc; + SUdfcProxy *udfc; int64_t severHandle; uv_pipe_t *udfUvPipe; @@ -341,7 +341,7 @@ typedef struct SClientUdfUvSession { } SClientUdfUvSession; typedef struct SClientUvTaskNode { - SUdfdProxy *udfc; + SUdfcProxy *udfc; int8_t type; int errCode; @@ -1055,11 +1055,11 @@ int32_t udfcCreateUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskN int32_t udfcQueueUvTask(SClientUvTaskNode *uvTask) { fnTrace("queue uv task to event loop, task: %d, %p", uvTask->type, uvTask); - SUdfdProxy *udfc = uvTask->udfc; - uv_mutex_lock(&udfc->gUdfTaskQueueMutex); - QUEUE_INSERT_TAIL(&udfc->gUdfTaskQueue, &uvTask->recvTaskQueue); - uv_mutex_unlock(&udfc->gUdfTaskQueueMutex); - uv_async_send(&udfc->gUdfLoopTaskAync); + SUdfcProxy *udfc = uvTask->udfc; + uv_mutex_lock(&udfc->taskQueueMutex); + QUEUE_INSERT_TAIL(&udfc->taskQueue, &uvTask->recvTaskQueue); + uv_mutex_unlock(&udfc->taskQueueMutex); + uv_async_send(&udfc->loopTaskAync); uv_sem_wait(&uvTask->taskSem); fnInfo("udfc uv task finished. task: %d, %p", uvTask->type, uvTask); @@ -1073,7 +1073,7 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) { switch (uvTask->type) { case UV_TASK_CONNECT: { uv_pipe_t *pipe = taosMemoryMalloc(sizeof(uv_pipe_t)); - uv_pipe_init(&uvTask->udfc->gUdfdLoop, pipe, 0); + uv_pipe_init(&uvTask->udfc->uvLoop, pipe, 0); uvTask->pipe = pipe; SClientUvConn *conn = taosMemoryCalloc(1, sizeof(SClientUvConn)); @@ -1113,46 +1113,46 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) { } void udfClientAsyncCb(uv_async_t *async) { - SUdfdProxy *udfc = async->data; + SUdfcProxy *udfc = async->data; QUEUE wq; - uv_mutex_lock(&udfc->gUdfTaskQueueMutex); - QUEUE_MOVE(&udfc->gUdfTaskQueue, &wq); - uv_mutex_unlock(&udfc->gUdfTaskQueueMutex); + uv_mutex_lock(&udfc->taskQueueMutex); + QUEUE_MOVE(&udfc->taskQueue, &wq); + uv_mutex_unlock(&udfc->taskQueueMutex); while (!QUEUE_EMPTY(&wq)) { QUEUE* h = QUEUE_HEAD(&wq); QUEUE_REMOVE(h); SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, recvTaskQueue); udfcStartUvTask(task); - QUEUE_INSERT_TAIL(&udfc->gUvProcTaskQueue, &task->procTaskQueue); + QUEUE_INSERT_TAIL(&udfc->uvProcTaskQueue, &task->procTaskQueue); } } -void cleanUpUvTasks(SUdfdProxy *udfc) { +void cleanUpUvTasks(SUdfcProxy *udfc) { fnDebug("clean up uv tasks") QUEUE wq; - uv_mutex_lock(&udfc->gUdfTaskQueueMutex); - QUEUE_MOVE(&udfc->gUdfTaskQueue, &wq); - uv_mutex_unlock(&udfc->gUdfTaskQueueMutex); + uv_mutex_lock(&udfc->taskQueueMutex); + QUEUE_MOVE(&udfc->taskQueue, &wq); + uv_mutex_unlock(&udfc->taskQueueMutex); while (!QUEUE_EMPTY(&wq)) { QUEUE* h = QUEUE_HEAD(&wq); QUEUE_REMOVE(h); SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, recvTaskQueue); - if (udfc->gUdfcState == UDFC_STATE_STOPPING) { + if (udfc->udfcState == UDFC_STATE_STOPPING) { task->errCode = TSDB_CODE_UDF_STOPPING; } uv_sem_post(&task->taskSem); } - while (!QUEUE_EMPTY(&udfc->gUvProcTaskQueue)) { - QUEUE* h = QUEUE_HEAD(&udfc->gUvProcTaskQueue); + while (!QUEUE_EMPTY(&udfc->uvProcTaskQueue)) { + QUEUE* h = QUEUE_HEAD(&udfc->uvProcTaskQueue); QUEUE_REMOVE(h); SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, procTaskQueue); - if (udfc->gUdfcState == UDFC_STATE_STOPPING) { + if (udfc->udfcState == UDFC_STATE_STOPPING) { task->errCode = TSDB_CODE_UDF_STOPPING; } uv_sem_post(&task->taskSem); @@ -1160,28 +1160,28 @@ void cleanUpUvTasks(SUdfdProxy *udfc) { } void udfStopAsyncCb(uv_async_t *async) { - SUdfdProxy *udfc = async->data; + SUdfcProxy *udfc = async->data; cleanUpUvTasks(udfc); - if (udfc->gUdfcState == UDFC_STATE_STOPPING) { - uv_stop(&udfc->gUdfdLoop); + if (udfc->udfcState == UDFC_STATE_STOPPING) { + uv_stop(&udfc->uvLoop); } } void constructUdfService(void *argsThread) { - SUdfdProxy *udfc = (SUdfdProxy*)argsThread; - uv_loop_init(&udfc->gUdfdLoop); + SUdfcProxy *udfc = (SUdfcProxy *)argsThread; + uv_loop_init(&udfc->uvLoop); - uv_async_init(&udfc->gUdfdLoop, &udfc->gUdfLoopTaskAync, udfClientAsyncCb); - udfc->gUdfLoopTaskAync.data = udfc; - uv_async_init(&udfc->gUdfdLoop, &udfc->gUdfLoopStopAsync, udfStopAsyncCb); - udfc->gUdfLoopStopAsync.data = udfc; - uv_mutex_init(&udfc->gUdfTaskQueueMutex); - QUEUE_INIT(&udfc->gUdfTaskQueue); - QUEUE_INIT(&udfc->gUvProcTaskQueue); - uv_barrier_wait(&udfc->gUdfInitBarrier); + uv_async_init(&udfc->uvLoop, &udfc->loopTaskAync, udfClientAsyncCb); + udfc->loopTaskAync.data = udfc; + uv_async_init(&udfc->uvLoop, &udfc->loopStopAsync, udfStopAsyncCb); + udfc->loopStopAsync.data = udfc; + uv_mutex_init(&udfc->taskQueueMutex); + QUEUE_INIT(&udfc->taskQueue); + QUEUE_INIT(&udfc->uvProcTaskQueue); + uv_barrier_wait(&udfc->initBarrier); //TODO return value of uv_run - uv_run(&udfc->gUdfdLoop, UV_RUN_DEFAULT); - uv_loop_close(&udfc->gUdfdLoop); + uv_run(&udfc->uvLoop, UV_RUN_DEFAULT); + uv_loop_close(&udfc->uvLoop); } int32_t udfcOpen() { @@ -1189,14 +1189,14 @@ int32_t udfcOpen() { if (old == 1) { return 0; } - SUdfdProxy *proxy = &gUdfdProxy; + SUdfcProxy *proxy = &gUdfdProxy; getUdfdPipeName(proxy->udfdPipeName, sizeof(proxy->udfdPipeName)); - proxy->gUdfcState = UDFC_STATE_STARTNG; - uv_barrier_init(&proxy->gUdfInitBarrier, 2); - uv_thread_create(&proxy->gUdfLoopThread, constructUdfService, proxy); - atomic_store_8(&proxy->gUdfcState, UDFC_STATE_READY); - proxy->gUdfcState = UDFC_STATE_READY; - uv_barrier_wait(&proxy->gUdfInitBarrier); + proxy->udfcState = UDFC_STATE_STARTNG; + uv_barrier_init(&proxy->initBarrier, 2); + uv_thread_create(&proxy->loopThread, constructUdfService, proxy); + atomic_store_8(&proxy->udfcState, UDFC_STATE_READY); + proxy->udfcState = UDFC_STATE_READY; + uv_barrier_wait(&proxy->initBarrier); fnInfo("udfc initialized") return 0; } @@ -1207,13 +1207,13 @@ int32_t udfcClose() { return 0; } - SUdfdProxy *udfc = &gUdfdProxy; - udfc->gUdfcState = UDFC_STATE_STOPPING; - uv_async_send(&udfc->gUdfLoopStopAsync); - uv_thread_join(&udfc->gUdfLoopThread); - uv_mutex_destroy(&udfc->gUdfTaskQueueMutex); - uv_barrier_destroy(&udfc->gUdfInitBarrier); - udfc->gUdfcState = UDFC_STATE_INITAL; + SUdfcProxy *udfc = &gUdfdProxy; + udfc->udfcState = UDFC_STATE_STOPPING; + uv_async_send(&udfc->loopStopAsync); + uv_thread_join(&udfc->loopThread); + uv_mutex_destroy(&udfc->taskQueueMutex); + uv_barrier_destroy(&udfc->initBarrier); + udfc->udfcState = UDFC_STATE_INITAL; fnInfo("udfc cleaned up"); return 0; } @@ -1236,7 +1236,7 @@ int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType) { int32_t setupUdf(char udfName[], UdfcFuncHandle *funcHandle) { fnInfo("udfc setup udf. udfName: %s", udfName); - if (gUdfdProxy.gUdfcState != UDFC_STATE_READY) { + if (gUdfdProxy.udfcState != UDFC_STATE_READY) { return TSDB_CODE_UDF_INVALID_STATE; } SClientUdfTask *task = taosMemoryCalloc(1,sizeof(SClientUdfTask)); @@ -1484,7 +1484,7 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { SSDataBlock tempBlock = {0}; tempBlock.info.numOfCols = numOfCols; - tempBlock.info.rows = numOfRows; + tempBlock.info.rows = pInput->totalRows; tempBlock.info.uid = pInput->uid; bool hasVarCol = false; tempBlock.pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index e0c24ac3bd..d56413f840 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -27,10 +27,33 @@ #endif #define INDEX_NUM_OF_THREADS 4 -#define INDEX_QUEUE_SIZE 200 +#define INDEX_QUEUE_SIZE 200 void* indexQhandle = NULL; +#define INDEX_DATA_BOOL_NULL 0x02 +#define INDEX_DATA_TINYINT_NULL 0x80 +#define INDEX_DATA_SMALLINT_NULL 0x8000 +#define INDEX_DATA_INT_NULL 0x80000000L +#define INDEX_DATA_BIGINT_NULL 0x8000000000000000L +#define INDEX_DATA_TIMESTAMP_NULL TSDB_DATA_BIGINT_NULL + +#define INDEX_DATA_FLOAT_NULL 0x7FF00000 // it is an NAN +#define INDEX_DATA_DOUBLE_NULL 0x7FFFFF0000000000L // an NAN +#define INDEX_DATA_NCHAR_NULL 0xFFFFFFFF +#define INDEX_DATA_BINARY_NULL 0xFF +#define INDEX_DATA_JSON_NULL 0xFFFFFFFF +#define INDEX_DATA_JSON_null 0xFFFFFFFE +#define INDEX_DATA_JSON_NOT_NULL 0x01 + +#define INDEX_DATA_UTINYINT_NULL 0xFF +#define INDEX_DATA_USMALLINT_NULL 0xFFFF +#define INDEX_DATA_UINT_NULL 0xFFFFFFFF +#define INDEX_DATA_UBIGINT_NULL 0xFFFFFFFFFFFFFFFFL + +#define INDEX_DATA_NULL_STR "NULL" +#define INDEX_DATA_NULL_STR_L "null" + void indexInit() { // refactor later indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index"); @@ -67,12 +90,6 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) { return -1; } -#ifdef USE_LUCENE - index_t* index = index_open(path); - sIdx->index = index; -#endif - -#ifdef USE_INVERTED_INDEX // sIdx->cache = (void*)indexCacheCreate(sIdx); sIdx->tindex = indexTFileCreate(path); if (sIdx->tindex == NULL) { @@ -85,7 +102,6 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) { taosThreadMutexInit(&sIdx->mtx, NULL); *index = sIdx; return 0; -#endif END: if (sIdx != NULL) { @@ -97,12 +113,6 @@ END: } void indexClose(SIndex* sIdx) { -#ifdef USE_LUCENE - index_close(sIdex->index); - sIdx->index = NULL; -#endif - -#ifdef USE_INVERTED_INDEX void* iter = taosHashIterate(sIdx->colObj, NULL); while (iter) { IndexCache** pCache = iter; @@ -114,31 +124,12 @@ void indexClose(SIndex* sIdx) { taosHashCleanup(sIdx->colObj); taosThreadMutexDestroy(&sIdx->mtx); indexTFileDestroy(sIdx->tindex); -#endif taosMemoryFree(sIdx->path); taosMemoryFree(sIdx); return; } int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) { -#ifdef USE_LUCENE - index_document_t* doc = index_document_create(); - - char buf[16] = {0}; - sprintf(buf, "%d", uid); - - for (int i = 0; i < taosArrayGetSize(fVals); i++) { - SIndexTerm* p = taosArrayGetP(fVals, i); - index_document_add(doc, (const char*)(p->key), p->nKey, (const char*)(p->val), p->nVal, 1); - } - index_document_add(doc, NULL, 0, buf, strlen(buf), 0); - - index_put(index->index, doc); - index_document_destroy(doc); -#endif - -#ifdef USE_INVERTED_INDEX - // TODO(yihao): reduce the lock range taosThreadMutexLock(&index->mtx); for (int i = 0; i < taosArrayGetSize(fVals); i++) { @@ -170,12 +161,9 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) { return ret; } } - -#endif return 0; } int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result) { -#ifdef USE_INVERTED_INDEX EIndexOperatorType opera = multiQuerys->opera; // relation of querys SArray* iRslts = taosArrayInit(4, POINTER_BYTES); @@ -188,35 +176,14 @@ int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result } indexMergeFinalResults(iRslts, opera, result); indexInterResultsDestroy(iRslts); - -#endif return 0; } -int indexDelete(SIndex* index, SIndexMultiTermQuery* query) { -#ifdef USE_INVERTED_INDEX +int indexDelete(SIndex* index, SIndexMultiTermQuery* query) { return 1; } +int indexRebuild(SIndex* index, SIndexOpts* opts) { return 0; } -#endif - - return 1; -} -int indexRebuild(SIndex* index, SIndexOpts* opts) { -#ifdef USE_INVERTED_INDEX -#endif - - return 0; -} - -SIndexOpts* indexOptsCreate() { -#ifdef USE_LUCENE -#endif - return NULL; -} -void indexOptsDestroy(SIndexOpts* opts) { -#ifdef USE_LUCENE -#endif - return; -} +SIndexOpts* indexOptsCreate() { return NULL; } +void indexOptsDestroy(SIndexOpts* opts) { return; } /* * @param: oper * diff --git a/source/libs/index/test/jsonUT.cc b/source/libs/index/test/jsonUT.cc index 5f471dba65..08d58da07f 100644 --- a/source/libs/index/test/jsonUT.cc +++ b/source/libs/index/test/jsonUT.cc @@ -403,6 +403,19 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache) { EXPECT_EQ(1000, taosArrayGetSize(result)); indexMultiTermQueryDestroy(mq); } + { + std::string colName("other_column"); + std::string colVal("100"); + SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), + colVal.c_str(), colVal.size()); + + SIndexMultiTerm* terms = indexMultiTermCreate(); + indexMultiTermAdd(terms, term); + for (size_t i = 0; i < 1000; i++) { + tIndexJsonPut(index, terms, i); + } + indexMultiTermDestroy(terms); + } { std::string colName("test1"); std::string colVal("10"); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index cd7e3beb13..26f3f689aa 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -145,9 +145,9 @@ static void cliWalkCb(uv_handle_t* handle, void* arg); } while (0) #define CONN_HOST_THREAD_INDEX(conn) (conn ? ((SCliConn*)conn)->hThrdIdx : -1) -#define CONN_PERSIST_TIME(para) (para * 1000 * 10) -#define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL) -#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrdObj*)(conn)->hostThrd)->pTransInst))->label) +#define CONN_PERSIST_TIME(para) (para * 1000 * 10) +#define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL) +#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrdObj*)(conn)->hostThrd)->pTransInst))->label) #define CONN_SHOULD_RELEASE(conn, head) \ do { \ if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \ @@ -223,11 +223,11 @@ static void cliWalkCb(uv_handle_t* handle, void* arg); #define CONN_RELEASE_BY_SERVER(conn) \ (((conn)->status == ConnRelease || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1) -#define REQUEST_NO_RESP(msg) ((msg)->noResp == 1) -#define REQUEST_PERSIS_HANDLE(msg) ((msg)->persistHandle == 1) +#define REQUEST_NO_RESP(msg) ((msg)->noResp == 1) +#define REQUEST_PERSIS_HANDLE(msg) ((msg)->persistHandle == 1) #define REQUEST_RELEASE_HANDLE(cmsg) ((cmsg)->type == Release) -#define EPSET_GET_INUSE_IP(epSet) ((epSet)->eps[(epSet)->inUse].fqdn) +#define EPSET_GET_INUSE_IP(epSet) ((epSet)->eps[(epSet)->inUse].fqdn) #define EPSET_GET_INUSE_PORT(epSet) ((epSet)->eps[(epSet)->inUse].port) static void* cliWorkThread(void* arg); diff --git a/tests/script/tsim/query/udf.sim b/tests/script/tsim/query/udf.sim index 00dbcf0f6d..b02ca79ed4 100644 --- a/tests/script/tsim/query/udf.sim +++ b/tests/script/tsim/query/udf.sim @@ -109,6 +109,7 @@ if $data01 != 18.547236991 then endi sql select udf2(udf1(f2-f1)), udf2(udf1(f2/f1)) from t2; +print $rows , $data00 , $data01 if $rows != 1 then return -1 endi @@ -118,7 +119,19 @@ endi if $data01 != 152.420471066 then return -1 endi -print $rows , $data00 , $data01 + +sql select udf2(f2) from udf.t2 group by 1-udf1(f1); +print $rows , $data00 , $data10 +if $rows != 2 then + return -1 +endi +if $data00 != 2.000000000 then + return -1 +endi +if $data10 != 12.083045974 then + return -1 +endi + sql drop function udf1; sql show functions; if $rows != 1 then