Merge branch '3.0' into feature/3.0_interval_hash_optimize
This commit is contained in:
commit
4dc30fb55a
|
@ -331,9 +331,11 @@ endif(${BUILD_WITH_TRAFT})
|
|||
|
||||
# LIBUV
|
||||
if(${BUILD_WITH_UV})
|
||||
if (NOT ${CMAKE_SYSTEM_NAME} MATCHES "Windows")
|
||||
MESSAGE("Windows need set no-sign-compare")
|
||||
add_compile_options(-Wno-sign-compare)
|
||||
if (TD_WINDOWS)
|
||||
# There is no GetHostNameW function on win7.
|
||||
file(READ "libuv/src/win/util.c" LIBUV_WIN_UTIL_CONTENT)
|
||||
string(REPLACE "if (GetHostNameW(buf, UV_MAXHOSTNAMESIZE" "DWORD nSize = UV_MAXHOSTNAMESIZE;\n if (GetComputerNameW(buf, &nSize" LIBUV_WIN_UTIL_CONTENT "${LIBUV_WIN_UTIL_CONTENT}")
|
||||
file(WRITE "libuv/src/win/util.c" "${LIBUV_WIN_UTIL_CONTENT}")
|
||||
endif ()
|
||||
add_subdirectory(libuv EXCLUDE_FROM_ALL)
|
||||
endif(${BUILD_WITH_UV})
|
||||
|
|
|
@ -924,9 +924,6 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
|
|||
|
||||
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
|
||||
SExecTaskInfo* pTaskInfo, int32_t numOfChild);
|
||||
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
||||
STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode,
|
||||
SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
|
|
|
@ -429,7 +429,9 @@ static SColumnInfoData* getColInfoResult(void* metaHandle, uint64_t suid, SArray
|
|||
for (int32_t i = 0; i < rows; i++) {
|
||||
int64_t* uid = taosArrayGet(uidList, i);
|
||||
void* tag = taosHashGet(tags, uid, sizeof(int64_t));
|
||||
if (suid != 0) {
|
||||
ASSERT(tag);
|
||||
}
|
||||
for(int32_t j = 0; j < taosArrayGetSize(pResBlock->pDataBlock); j++){
|
||||
SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, j);
|
||||
|
||||
|
|
|
@ -871,7 +871,6 @@ static int32_t saveWinResult(int64_t ts, int32_t pageId, int32_t offset, uint64_
|
|||
|
||||
static int32_t saveWinResultRow(SResultRow* result, uint64_t groupId, SHashObj* pUpdatedMap) {
|
||||
return saveWinResult(result->win.skey, result->pageId, result->offset, groupId, pUpdatedMap);
|
||||
;
|
||||
}
|
||||
|
||||
static int32_t saveResultRow(SResultRow* result, uint64_t groupId, SArray* pUpdated) {
|
||||
|
@ -1595,7 +1594,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
|||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||
|
||||
SArray* pUpdated = taosArrayInit(4, POINTER_BYTES); // SResKeyPos
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP);
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
|
||||
while (1) {
|
||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
||||
|
@ -1886,62 +1885,6 @@ _error:
|
|||
return NULL;
|
||||
}
|
||||
|
||||
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
||||
STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo) {
|
||||
SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pInfo->inputOrder = TSDB_ORDER_ASC;
|
||||
pInfo->interval = *pInterval;
|
||||
pInfo->execModel = OPTR_EXEC_MODEL_STREAM;
|
||||
pInfo->win = pTaskInfo->window;
|
||||
pInfo->twAggSup = *pTwAggSupp;
|
||||
pInfo->primaryTsIndex = primaryTsSlotId;
|
||||
|
||||
int32_t numOfRows = 4096;
|
||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||
|
||||
initResultSizeInfo(&pOperator->resultInfo, numOfRows);
|
||||
int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
|
||||
initBasicInfo(&pInfo->binfo, pResBlock);
|
||||
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win);
|
||||
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
initResultRowInfo(&pInfo->binfo.resultRowInfo);
|
||||
|
||||
pOperator->name = "StreamTimeIntervalAggOperator";
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL;
|
||||
pOperator->blocking = true;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->exprSupp.pExprInfo = pExprInfo;
|
||||
pOperator->exprSupp.numOfExprs = numOfCols;
|
||||
pOperator->info = pInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doStreamIntervalAgg, doStreamIntervalAgg, NULL,
|
||||
destroyIntervalOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
|
||||
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
return pOperator;
|
||||
|
||||
_error:
|
||||
destroyIntervalOperatorInfo(pInfo, numOfCols);
|
||||
taosMemoryFreeClear(pOperator);
|
||||
pTaskInfo->code = code;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// todo handle multiple timeline cases. assume no timeline interweaving
|
||||
static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperatorInfo* pInfo, SSDataBlock* pBlock) {
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
@ -3109,9 +3052,10 @@ void processPullOver(SSDataBlock* pBlock, SHashObj* pMap) {
|
|||
|
||||
static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
||||
SStreamFinalIntervalOperatorInfo* pInfo = pOperator->info;
|
||||
|
||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||
SArray* pUpdated = taosArrayInit(4, POINTER_BYTES);
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP);
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
|
||||
TSKEY maxTs = INT64_MIN;
|
||||
|
||||
|
|
|
@ -2822,6 +2822,29 @@ static int32_t createDefaultFillNode(STranslateContext* pCxt, SNode** pOutput) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t checkEvery(STranslateContext* pCxt, SValueNode* pInterval) {
|
||||
int32_t len = strlen(pInterval->literal);
|
||||
|
||||
char *unit = &pInterval->literal[len - 1];
|
||||
if (*unit == 'n' || *unit == 'y') {
|
||||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_WRONG_VALUE_TYPE,
|
||||
"Unsupported time unit in EVERY clause");
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t translateInterpEvery(STranslateContext* pCxt, SNode** pEvery) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
code = checkEvery(pCxt, (SValueNode *)(*pEvery));
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = translateExpr(pCxt, pEvery);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t translateInterpFill(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
|
@ -2856,7 +2879,7 @@ static int32_t translateInterp(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
|||
|
||||
int32_t code = translateExpr(pCxt, &pSelect->pRange);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = translateExpr(pCxt, &pSelect->pEvery);
|
||||
code = translateInterpEvery(pCxt, &pSelect->pEvery);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = translateInterpFill(pCxt, pSelect);
|
||||
|
|
|
@ -199,10 +199,20 @@ static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, TXN *pTxn)
|
|||
if (pPageH) {
|
||||
// copy the page content
|
||||
memcpy(&(pPage->pgid), pPgid, sizeof(*pPgid));
|
||||
|
||||
for (int nLoops = 0;;) {
|
||||
if (pPageH->pPager) break;
|
||||
if (++nLoops > 1000) {
|
||||
sched_yield();
|
||||
nLoops = 0;
|
||||
}
|
||||
}
|
||||
|
||||
pPage->pLruNext = NULL;
|
||||
pPage->pPager = pPageH->pPager;
|
||||
|
||||
memcpy(pPage->pData, pPageH->pData, pPage->pageSize);
|
||||
tdbDebug("pcache/pPageH: %p %d %p %p", pPageH, pPageH->pPageHdr - pPageH->pData, pPageH->xCellSize, pPage);
|
||||
tdbPageInit(pPage, pPageH->pPageHdr - pPageH->pData, pPageH->xCellSize);
|
||||
pPage->kLen = pPageH->kLen;
|
||||
pPage->vLen = pPageH->vLen;
|
||||
|
|
|
@ -33,6 +33,8 @@ int32_t cfgLoadFromEnvCmd(SConfig *pConfig, const char **envCmd);
|
|||
int32_t cfgLoadFromApollUrl(SConfig *pConfig, const char *url);
|
||||
int32_t cfgSetItem(SConfig *pConfig, const char *name, const char *value, ECfgSrcType stype);
|
||||
|
||||
extern char **environ;
|
||||
|
||||
SConfig *cfgInit() {
|
||||
SConfig *pCfg = taosMemoryCalloc(1, sizeof(SConfig));
|
||||
if (pCfg == NULL) {
|
||||
|
@ -627,24 +629,17 @@ void cfgDumpCfg(SConfig *pCfg, bool tsc, bool dump) {
|
|||
}
|
||||
|
||||
int32_t cfgLoadFromEnvVar(SConfig *pConfig) {
|
||||
char *line = NULL, *name, *value, *value2, *value3;
|
||||
char line[1024], *name, *value, *value2, *value3;
|
||||
int32_t olen, vlen, vlen2, vlen3;
|
||||
int32_t code = 0;
|
||||
ssize_t _bytes = 0;
|
||||
TdCmdPtr pCmd = taosOpenCmd("set");
|
||||
if (pCmd == NULL) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
while (!taosEOFCmd(pCmd)) {
|
||||
char **pEnv = environ;
|
||||
line[1023] = 0;
|
||||
while(*pEnv != NULL) {
|
||||
name = value = value2 = value3 = NULL;
|
||||
olen = vlen = vlen2 = vlen3 = 0;
|
||||
|
||||
_bytes = taosGetLineCmd(pCmd, &line);
|
||||
if (_bytes < 0) {
|
||||
break;
|
||||
}
|
||||
if(line[_bytes - 1] == '\n') line[_bytes - 1] = 0;
|
||||
strncpy(line, *pEnv, sizeof(line)-1);
|
||||
pEnv++;
|
||||
taosEnvToCfg(line, line);
|
||||
|
||||
paGetToken(line, &name, &olen);
|
||||
|
@ -671,9 +666,6 @@ int32_t cfgLoadFromEnvVar(SConfig *pConfig) {
|
|||
}
|
||||
}
|
||||
|
||||
taosCloseCmd(&pCmd);
|
||||
if (line != NULL) taosMemoryFreeClear(line);
|
||||
|
||||
uInfo("load from env variables cfg success");
|
||||
return 0;
|
||||
}
|
||||
|
@ -1040,16 +1032,12 @@ int32_t cfgGetApollUrl(const char **envCmd, const char *envFile, char* apolloUrl
|
|||
index++;
|
||||
}
|
||||
|
||||
char *line = NULL;
|
||||
ssize_t _bytes = 0;
|
||||
TdCmdPtr pCmd = taosOpenCmd("set");
|
||||
if (pCmd != NULL) {
|
||||
while (!taosEOFCmd(pCmd)) {
|
||||
_bytes = taosGetLineCmd(pCmd, &line);
|
||||
if (_bytes < 0) {
|
||||
break;
|
||||
}
|
||||
if(line[_bytes - 1] == '\n') line[_bytes - 1] = 0;
|
||||
char line[1024];
|
||||
char **pEnv = environ;
|
||||
line[1023] = 0;
|
||||
while(*pEnv != NULL) {
|
||||
strncpy(line, *pEnv, sizeof(line)-1);
|
||||
pEnv++;
|
||||
if (strncmp(line, "TAOS_APOLLO_URL", 14) == 0) {
|
||||
char *p = strchr(line, '=');
|
||||
if (p != NULL) {
|
||||
|
@ -1060,15 +1048,10 @@ int32_t cfgGetApollUrl(const char **envCmd, const char *envFile, char* apolloUrl
|
|||
}
|
||||
memcpy(apolloUrl, p, TMIN(strlen(p)+1,PATH_MAX));
|
||||
uInfo("get apollo url from env variables success, apolloUrl=%s",apolloUrl);
|
||||
taosCloseCmd(&pCmd);
|
||||
if (line != NULL) taosMemoryFreeClear(line);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
taosCloseCmd(&pCmd);
|
||||
if (line != NULL) taosMemoryFreeClear(line);
|
||||
}
|
||||
|
||||
const char *filepath = ".env";
|
||||
if (envFile != NULL && strlen(envFile)>0) {
|
||||
|
@ -1083,10 +1066,11 @@ int32_t cfgGetApollUrl(const char **envCmd, const char *envFile, char* apolloUrl
|
|||
return 0;
|
||||
}
|
||||
}
|
||||
int64_t _bytes;
|
||||
TdFilePtr pFile = taosOpenFile(filepath, TD_FILE_READ | TD_FILE_STREAM);
|
||||
if (pFile != NULL) {
|
||||
while (!taosEOFFile(pFile)) {
|
||||
_bytes = taosGetLineFile(pFile, &line);
|
||||
_bytes = taosGetsFile(pFile, sizeof(line) - 1, line);
|
||||
if (_bytes <= 0) {
|
||||
break;
|
||||
}
|
||||
|
@ -1101,14 +1085,12 @@ int32_t cfgGetApollUrl(const char **envCmd, const char *envFile, char* apolloUrl
|
|||
}
|
||||
memcpy(apolloUrl, p, TMIN(strlen(p)+1,PATH_MAX));
|
||||
taosCloseFile(&pFile);
|
||||
if (line != NULL) taosMemoryFreeClear(line);
|
||||
uInfo("get apollo url from env file success");
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
taosCloseFile(&pFile);
|
||||
if (line != NULL) taosMemoryFreeClear(line);
|
||||
}
|
||||
|
||||
uInfo("fail get apollo url from cmd env file");
|
||||
|
|
Loading…
Reference in New Issue