diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index 494d1577fc..2dc7622f46 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -331,9 +331,11 @@ endif(${BUILD_WITH_TRAFT}) # LIBUV if(${BUILD_WITH_UV}) - if (NOT ${CMAKE_SYSTEM_NAME} MATCHES "Windows") - MESSAGE("Windows need set no-sign-compare") - add_compile_options(-Wno-sign-compare) + if (TD_WINDOWS) + # There is no GetHostNameW function on win7. + file(READ "libuv/src/win/util.c" LIBUV_WIN_UTIL_CONTENT) + string(REPLACE "if (GetHostNameW(buf, UV_MAXHOSTNAMESIZE" "DWORD nSize = UV_MAXHOSTNAMESIZE;\n if (GetComputerNameW(buf, &nSize" LIBUV_WIN_UTIL_CONTENT "${LIBUV_WIN_UTIL_CONTENT}") + file(WRITE "libuv/src/win/util.c" "${LIBUV_WIN_UTIL_CONTENT}") endif () add_subdirectory(libuv EXCLUDE_FROM_ALL) endif(${BUILD_WITH_UV}) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 74810d4d07..5e339eb113 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -924,9 +924,6 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild); -SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, - SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, - STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index baeb972e05..bf969bf2e4 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -429,7 +429,9 @@ static SColumnInfoData* getColInfoResult(void* metaHandle, uint64_t suid, SArray for (int32_t i = 0; i < rows; i++) { int64_t* uid = taosArrayGet(uidList, i); void* tag = taosHashGet(tags, uid, sizeof(int64_t)); - ASSERT(tag); + if (suid != 0) { + ASSERT(tag); + } for(int32_t j = 0; j < taosArrayGetSize(pResBlock->pDataBlock); j++){ SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, j); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index a7e93a1906..9b9a38c7ea 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -871,7 +871,6 @@ static int32_t saveWinResult(int64_t ts, int32_t pageId, int32_t offset, uint64_ static int32_t saveWinResultRow(SResultRow* result, uint64_t groupId, SHashObj* pUpdatedMap) { return saveWinResult(result->win.skey, result->pageId, result->offset, groupId, pUpdatedMap); - ; } static int32_t saveResultRow(SResultRow* result, uint64_t groupId, SArray* pUpdated) { @@ -1595,7 +1594,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { SOperatorInfo* downstream = pOperator->pDownstream[0]; SArray* pUpdated = taosArrayInit(4, POINTER_BYTES); // SResKeyPos - _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP); + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK); while (1) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); @@ -1886,62 +1885,6 @@ _error: return NULL; } -SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, - SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, - STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo) { - SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo)); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - if (pInfo == NULL || pOperator == NULL) { - goto _error; - } - - pOperator->pTaskInfo = pTaskInfo; - pInfo->inputOrder = TSDB_ORDER_ASC; - pInfo->interval = *pInterval; - pInfo->execModel = OPTR_EXEC_MODEL_STREAM; - pInfo->win = pTaskInfo->window; - pInfo->twAggSup = *pTwAggSupp; - pInfo->primaryTsIndex = primaryTsSlotId; - - int32_t numOfRows = 4096; - size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; - - initResultSizeInfo(&pOperator->resultInfo, numOfRows); - int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); - initBasicInfo(&pInfo->binfo, pResBlock); - initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win); - - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } - - initResultRowInfo(&pInfo->binfo.resultRowInfo); - - pOperator->name = "StreamTimeIntervalAggOperator"; - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL; - pOperator->blocking = true; - pOperator->status = OP_NOT_OPENED; - pOperator->exprSupp.pExprInfo = pExprInfo; - pOperator->exprSupp.numOfExprs = numOfCols; - pOperator->info = pInfo; - - pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doStreamIntervalAgg, doStreamIntervalAgg, NULL, - destroyIntervalOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL); - - code = appendDownstream(pOperator, &downstream, 1); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } - - return pOperator; - -_error: - destroyIntervalOperatorInfo(pInfo, numOfCols); - taosMemoryFreeClear(pOperator); - pTaskInfo->code = code; - return NULL; -} - // todo handle multiple timeline cases. assume no timeline interweaving static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperatorInfo* pInfo, SSDataBlock* pBlock) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -3109,11 +3052,12 @@ void processPullOver(SSDataBlock* pBlock, SHashObj* pMap) { static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { SStreamFinalIntervalOperatorInfo* pInfo = pOperator->info; - SOperatorInfo* downstream = pOperator->pDownstream[0]; - SArray* pUpdated = taosArrayInit(4, POINTER_BYTES); - _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP); - SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK); - TSKEY maxTs = INT64_MIN; + + SOperatorInfo* downstream = pOperator->pDownstream[0]; + SArray* pUpdated = taosArrayInit(4, POINTER_BYTES); + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); + SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK); + TSKEY maxTs = INT64_MIN; SExprSupp* pSup = &pOperator->exprSupp; diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 1c7446ad6f..3c0d9a5f63 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -2822,6 +2822,29 @@ static int32_t createDefaultFillNode(STranslateContext* pCxt, SNode** pOutput) { return TSDB_CODE_SUCCESS; } +static int32_t checkEvery(STranslateContext* pCxt, SValueNode* pInterval) { + int32_t len = strlen(pInterval->literal); + + char *unit = &pInterval->literal[len - 1]; + if (*unit == 'n' || *unit == 'y') { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_WRONG_VALUE_TYPE, + "Unsupported time unit in EVERY clause"); + } + + return TSDB_CODE_SUCCESS; +} + +static int32_t translateInterpEvery(STranslateContext* pCxt, SNode** pEvery) { + int32_t code = TSDB_CODE_SUCCESS; + + code = checkEvery(pCxt, (SValueNode *)(*pEvery)); + if (TSDB_CODE_SUCCESS == code) { + code = translateExpr(pCxt, pEvery); + } + + return code; +} + static int32_t translateInterpFill(STranslateContext* pCxt, SSelectStmt* pSelect) { int32_t code = TSDB_CODE_SUCCESS; @@ -2856,7 +2879,7 @@ static int32_t translateInterp(STranslateContext* pCxt, SSelectStmt* pSelect) { int32_t code = translateExpr(pCxt, &pSelect->pRange); if (TSDB_CODE_SUCCESS == code) { - code = translateExpr(pCxt, &pSelect->pEvery); + code = translateInterpEvery(pCxt, &pSelect->pEvery); } if (TSDB_CODE_SUCCESS == code) { code = translateInterpFill(pCxt, pSelect); diff --git a/source/libs/tdb/src/db/tdbPCache.c b/source/libs/tdb/src/db/tdbPCache.c index 22229ea0e8..ab9b21dc3f 100644 --- a/source/libs/tdb/src/db/tdbPCache.c +++ b/source/libs/tdb/src/db/tdbPCache.c @@ -199,10 +199,20 @@ static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, TXN *pTxn) if (pPageH) { // copy the page content memcpy(&(pPage->pgid), pPgid, sizeof(*pPgid)); + + for (int nLoops = 0;;) { + if (pPageH->pPager) break; + if (++nLoops > 1000) { + sched_yield(); + nLoops = 0; + } + } + pPage->pLruNext = NULL; pPage->pPager = pPageH->pPager; memcpy(pPage->pData, pPageH->pData, pPage->pageSize); + tdbDebug("pcache/pPageH: %p %d %p %p", pPageH, pPageH->pPageHdr - pPageH->pData, pPageH->xCellSize, pPage); tdbPageInit(pPage, pPageH->pPageHdr - pPageH->pData, pPageH->xCellSize); pPage->kLen = pPageH->kLen; pPage->vLen = pPageH->vLen; diff --git a/source/util/src/tconfig.c b/source/util/src/tconfig.c index fdb397561d..2a28ec66d2 100644 --- a/source/util/src/tconfig.c +++ b/source/util/src/tconfig.c @@ -33,6 +33,8 @@ int32_t cfgLoadFromEnvCmd(SConfig *pConfig, const char **envCmd); int32_t cfgLoadFromApollUrl(SConfig *pConfig, const char *url); int32_t cfgSetItem(SConfig *pConfig, const char *name, const char *value, ECfgSrcType stype); +extern char **environ; + SConfig *cfgInit() { SConfig *pCfg = taosMemoryCalloc(1, sizeof(SConfig)); if (pCfg == NULL) { @@ -627,24 +629,17 @@ void cfgDumpCfg(SConfig *pCfg, bool tsc, bool dump) { } int32_t cfgLoadFromEnvVar(SConfig *pConfig) { - char *line = NULL, *name, *value, *value2, *value3; + char line[1024], *name, *value, *value2, *value3; int32_t olen, vlen, vlen2, vlen3; int32_t code = 0; - ssize_t _bytes = 0; - TdCmdPtr pCmd = taosOpenCmd("set"); - if (pCmd == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - while (!taosEOFCmd(pCmd)) { + char **pEnv = environ; + line[1023] = 0; + while(*pEnv != NULL) { name = value = value2 = value3 = NULL; olen = vlen = vlen2 = vlen3 = 0; - _bytes = taosGetLineCmd(pCmd, &line); - if (_bytes < 0) { - break; - } - if(line[_bytes - 1] == '\n') line[_bytes - 1] = 0; + strncpy(line, *pEnv, sizeof(line)-1); + pEnv++; taosEnvToCfg(line, line); paGetToken(line, &name, &olen); @@ -671,9 +666,6 @@ int32_t cfgLoadFromEnvVar(SConfig *pConfig) { } } - taosCloseCmd(&pCmd); - if (line != NULL) taosMemoryFreeClear(line); - uInfo("load from env variables cfg success"); return 0; } @@ -1040,34 +1032,25 @@ int32_t cfgGetApollUrl(const char **envCmd, const char *envFile, char* apolloUrl index++; } - char *line = NULL; - ssize_t _bytes = 0; - TdCmdPtr pCmd = taosOpenCmd("set"); - if (pCmd != NULL) { - while (!taosEOFCmd(pCmd)) { - _bytes = taosGetLineCmd(pCmd, &line); - if (_bytes < 0) { - break; - } - if(line[_bytes - 1] == '\n') line[_bytes - 1] = 0; - if (strncmp(line, "TAOS_APOLLO_URL", 14) == 0) { - char *p = strchr(line, '='); - if (p != NULL) { + char line[1024]; + char **pEnv = environ; + line[1023] = 0; + while(*pEnv != NULL) { + strncpy(line, *pEnv, sizeof(line)-1); + pEnv++; + if (strncmp(line, "TAOS_APOLLO_URL", 14) == 0) { + char *p = strchr(line, '='); + if (p != NULL) { + p++; + if (*p == '\'') { p++; - if (*p == '\'') { - p++; - p[strlen(p)-1] = '\0'; - } - memcpy(apolloUrl, p, TMIN(strlen(p)+1,PATH_MAX)); - uInfo("get apollo url from env variables success, apolloUrl=%s",apolloUrl); - taosCloseCmd(&pCmd); - if (line != NULL) taosMemoryFreeClear(line); - return 0; + p[strlen(p)-1] = '\0'; } + memcpy(apolloUrl, p, TMIN(strlen(p)+1,PATH_MAX)); + uInfo("get apollo url from env variables success, apolloUrl=%s",apolloUrl); + return 0; } } - taosCloseCmd(&pCmd); - if (line != NULL) taosMemoryFreeClear(line); } const char *filepath = ".env"; @@ -1083,10 +1066,11 @@ int32_t cfgGetApollUrl(const char **envCmd, const char *envFile, char* apolloUrl return 0; } } + int64_t _bytes; TdFilePtr pFile = taosOpenFile(filepath, TD_FILE_READ | TD_FILE_STREAM); if (pFile != NULL) { while (!taosEOFFile(pFile)) { - _bytes = taosGetLineFile(pFile, &line); + _bytes = taosGetsFile(pFile, sizeof(line) - 1, line); if (_bytes <= 0) { break; } @@ -1101,14 +1085,12 @@ int32_t cfgGetApollUrl(const char **envCmd, const char *envFile, char* apolloUrl } memcpy(apolloUrl, p, TMIN(strlen(p)+1,PATH_MAX)); taosCloseFile(&pFile); - if (line != NULL) taosMemoryFreeClear(line); uInfo("get apollo url from env file success"); return 0; } } } taosCloseFile(&pFile); - if (line != NULL) taosMemoryFreeClear(line); } uInfo("fail get apollo url from cmd env file");