diff --git a/cmake/cmake.define b/cmake/cmake.define index 56b6b7e1de..44b36d0efa 100644 --- a/cmake/cmake.define +++ b/cmake/cmake.define @@ -170,7 +170,7 @@ ELSE () SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mavx2") SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mavx2") ENDIF() - MESSAGE(STATUS "SIMD instructions (FMA/AVX/AVX2/AVX512) is ACTIVATED") + MESSAGE(STATUS "SIMD instructions (FMA/AVX/AVX2) is ACTIVATED") IF (COMPILER_SUPPORT_AVX512F AND COMPILER_SUPPORT_AVX512BMI) SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mavx512f -mavx512vbmi") diff --git a/include/common/ttime.h b/include/common/ttime.h index 306b5105d0..1dfa609064 100644 --- a/include/common/ttime.h +++ b/include/common/ttime.h @@ -118,6 +118,13 @@ int32_t taosChar2Ts(const char* format, SArray** formats, const char* tsStr, int void TEST_ts2char(const char* format, int64_t ts, int32_t precision, char* out, int32_t outLen); int32_t TEST_char2ts(const char* format, int64_t* ts, int32_t precision, const char* tsStr); +/// @brief get offset seconds from zero timezone to input timezone +/// for +XX timezone, the offset to zero is negative value +/// @param tzStr timezonestr, eg: +0800, -0830, -08 +/// @param offset seconds, eg: +08 offset -28800, -01 offset 3600 +/// @return 0 success, other fail +int32_t offsetOfTimezone(char* tzStr, int64_t* offset); + #ifdef __cplusplus } #endif diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 654a0b6abc..f407290c00 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -825,6 +825,7 @@ int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta); SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); int32_t streamMetaReopen(SStreamMeta* pMeta); +void streamMetaInitBackend(SStreamMeta* pMeta); int32_t streamMetaCommit(SStreamMeta* pMeta); int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta); int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 04c6c0cbaf..a3ee294338 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -640,6 +640,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_SCH_IGNORE_ERROR TAOS_DEF_ERROR_CODE(0, 0x2503) #define TSDB_CODE_SCH_TIMEOUT_ERROR TAOS_DEF_ERROR_CODE(0, 0x2504) #define TSDB_CODE_SCH_JOB_IS_DROPPING TAOS_DEF_ERROR_CODE(0, 0x2505) +#define TSDB_CODE_SCH_JOB_NOT_EXISTS TAOS_DEF_ERROR_CODE(0, 0x2506) //parser #define TSDB_CODE_PAR_SYNTAX_ERROR TAOS_DEF_ERROR_CODE(0, 0x2600) diff --git a/include/util/tcompression.h b/include/util/tcompression.h index 3a3d13117e..ab0c22fc9b 100644 --- a/include/util/tcompression.h +++ b/include/util/tcompression.h @@ -30,6 +30,10 @@ extern "C" { #define INT64MASK(_x) ((((uint64_t)1) << _x) - 1) #define INT32MASK(_x) (((uint32_t)1 << _x) - 1) #define INT8MASK(_x) (((uint8_t)1 << _x) - 1) + +#define ZIGZAG_ENCODE(T, v) (((u##T)((v) >> (sizeof(T) * 8 - 1))) ^ (((u##T)(v)) << 1)) // zigzag encode +#define ZIGZAG_DECODE(T, v) (((v) >> 1) ^ -((T)((v)&1))) // zigzag decode + // Compression algorithm #define NO_COMPRESSION 0 #define ONE_STAGE_COMP 1 @@ -129,6 +133,12 @@ int32_t tsCompressBigint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32 int32_t nBuf); int32_t tsDecompressBigint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf, int32_t nBuf); +// for internal usage +int32_t getWordLength(char type); + +int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements, char *const output, const char type); +int32_t tsDecompressFloatImplAvx512(const char *const input, const int32_t nelements, char *const output); +int32_t tsDecompressFloatImplAvx2(const char *const input, const int32_t nelements, char *const output); /************************************************************************* * STREAM COMPRESSION diff --git a/include/util/tdef.h b/include/util/tdef.h index 1b56b5b623..69d0c1126d 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -34,7 +34,6 @@ extern "C" { // Bytes for each type. extern const int32_t TYPE_BYTES[21]; -// TODO: replace and remove code below #define CHAR_BYTES sizeof(char) #define SHORT_BYTES sizeof(int16_t) #define INT_BYTES sizeof(int32_t) diff --git a/source/common/src/cos.c b/source/common/src/cos.c index ea41afd8fb..71f0df5ebe 100644 --- a/source/common/src/cos.c +++ b/source/common/src/cos.c @@ -520,7 +520,7 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) { // div round up int seq; - uint64_t chunk_size = MULTIPART_CHUNK_SIZE >> 8; + uint64_t chunk_size = MULTIPART_CHUNK_SIZE >> 7; int totalSeq = ((contentLength + chunk_size - 1) / chunk_size); MultipartPartData partData; @@ -717,6 +717,7 @@ static SArray *getListByPrefix(const char *prefix) { return data.objectArray; } } else { + taosArrayDestroyEx(data.objectArray, s3FreeObjectKey); s3PrintError(__func__, data.status, data.err_msg); } return NULL; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 37e0e97575..7a2ae90cda 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -1478,6 +1478,8 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, char *name) { return -1; } + bool matched = false; + int32_t len = strlen(name); char lowcaseName[CFG_NAME_MAX_LEN + 1] = {0}; strntolower(lowcaseName, name, TMIN(CFG_NAME_MAX_LEN, len)); @@ -1486,6 +1488,7 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, char *name) { if (strcasecmp("debugFlag", name) == 0) { int32_t flag = pItem->i32; taosSetAllDebugFlag(flag, true); + matched = true; } break; } @@ -1494,6 +1497,7 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, char *name) { bool enableCore = pItem->bval; taosSetCoreDump(enableCore); uInfo("%s set to %d", name, enableCore); + matched = true; } break; } @@ -1512,6 +1516,7 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, char *name) { snprintf(tsFirst, sizeof(tsFirst), "%s:%u", firstEp.fqdn, firstEp.port); cfgSetItem(pCfg, "firstEp", tsFirst, pFirstEpItem->stype); uInfo("localEp set to '%s', tsFirst set to '%s'", tsLocalEp, tsFirst); + matched = true; } else if (strcasecmp("firstEp", name) == 0) { tstrncpy(tsLocalFqdn, cfgGetItem(pCfg, "fqdn")->str, TSDB_FQDN_LEN); tsServerPort = (uint16_t)cfgGetItem(pCfg, "serverPort")->i32; @@ -1526,6 +1531,7 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, char *name) { snprintf(tsFirst, sizeof(tsFirst), "%s:%u", firstEp.fqdn, firstEp.port); cfgSetItem(pCfg, "firstEp", tsFirst, pFirstEpItem->stype); uInfo("localEp set to '%s', tsFirst set to '%s'", tsLocalEp, tsFirst); + matched = true; } break; } @@ -1536,10 +1542,12 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, char *name) { taosSetSystemLocale(locale, charset); osSetSystemLocale(locale, charset); uInfo("locale set to '%s', charset set to '%s'", locale, charset); + matched = true; } else if (strcasecmp("logDir", name) == 0) { uInfo("%s set from '%s' to '%s'", name, tsLogDir, pItem->str); tstrncpy(tsLogDir, pItem->str, PATH_MAX); taosExpandDir(tsLogDir, tsLogDir, PATH_MAX); + matched = true; } break; } @@ -1547,15 +1555,19 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, char *name) { if (strcasecmp("metaCacheMaxSize", name) == 0) { atomic_store_32(&tsMetaCacheMaxSize, pItem->i32); uInfo("%s set to %d", name, atomic_load_32(&tsMetaCacheMaxSize)); + matched = true; } else if (strcasecmp("minimalTmpDirGB", name) == 0) { tsTempSpace.reserved = (int64_t)(((double)pItem->fval) * 1024 * 1024 * 1024); uInfo("%s set to %"PRId64, name, tsTempSpace.reserved); + matched = true; } else if (strcasecmp("minimalDataDirGB", name) == 0) { tsDataSpace.reserved = (int64_t)(((double)pItem->fval) * 1024 * 1024 * 1024); uInfo("%s set to %"PRId64, name, tsDataSpace.reserved); + matched = true; } else if (strcasecmp("minimalLogDirGB", name) == 0) { tsLogSpace.reserved = (int64_t)(((double)pItem->fval) * 1024 * 1024 * 1024); uInfo("%s set to %"PRId64, name, tsLogSpace.reserved); + matched = true; } break; } @@ -1566,18 +1578,23 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, char *name) { snprintf(tsSecond, sizeof(tsSecond), "%s:%u", secondEp.fqdn, secondEp.port); cfgSetItem(pCfg, "secondEp", tsSecond, pItem->stype); uInfo("%s set to %s", name, tsSecond); + matched = true; } else if (strcasecmp("smlChildTableName", name) == 0) { uInfo("%s set from %s to %s", name, tsSmlChildTableName, pItem->str); tstrncpy(tsSmlChildTableName, pItem->str, TSDB_TABLE_NAME_LEN); + matched = true; } else if (strcasecmp("smlAutoChildTableNameDelimiter", name) == 0) { uInfo("%s set from %s to %s", name, tsSmlAutoChildTableNameDelimiter, pItem->str); tstrncpy(tsSmlAutoChildTableNameDelimiter, pItem->str, TSDB_TABLE_NAME_LEN); + matched = true; } else if (strcasecmp("smlTagName", name) == 0) { uInfo("%s set from %s to %s", name, tsSmlTagName, pItem->str); tstrncpy(tsSmlTagName, pItem->str, TSDB_COL_NAME_LEN); + matched = true; } else if (strcasecmp("smlTsDefaultName", name) == 0) { uInfo("%s set from %s to %s", name, tsSmlTsDefaultName, pItem->str); tstrncpy(tsSmlTsDefaultName, pItem->str, TSDB_COL_NAME_LEN); + matched = true; } else if (strcasecmp("serverPort", name) == 0) { tstrncpy(tsLocalFqdn, cfgGetItem(pCfg, "fqdn")->str, TSDB_FQDN_LEN); tsServerPort = (uint16_t)cfgGetItem(pCfg, "serverPort")->i32; @@ -1592,11 +1609,13 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, char *name) { snprintf(tsFirst, sizeof(tsFirst), "%s:%u", firstEp.fqdn, firstEp.port); cfgSetItem(pCfg, "firstEp", tsFirst, pFirstEpItem->stype); uInfo("localEp set to '%s', tsFirst set to '%s'", tsLocalEp, tsFirst); + matched = true; } else if (strcasecmp("slowLogScope", name) == 0) { if (taosSetSlowLogScope(pItem->str)) { return -1; } uInfo("%s set to %s", name, pItem->str); + matched = true; } break; } @@ -1605,6 +1624,7 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, char *name) { osSetTimezone(pItem->str); uInfo("%s set from %s to %s", name, tsTimezoneStr, pItem->str); cfgSetItem(pCfg, "timezone", tsTimezoneStr, pItem->stype); + matched = true; } else if (strcasecmp("tempDir", name) == 0) { uInfo("%s set from %s to %s", name, tsTempDir, pItem->str); tstrncpy(tsTempDir, pItem->str, PATH_MAX); @@ -1613,9 +1633,11 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, char *name) { uError("failed to create tempDir:%s since %s", tsTempDir, terrstr()); return -1; } + matched = true; } else if (strcasecmp("telemetryServer", name) == 0) { uInfo("%s set from %s to %s", name, pItem->str, tsTelemServer); tstrncpy(tsTelemServer, pItem->str, TSDB_FQDN_LEN); + matched = true; } break; } @@ -1624,6 +1646,8 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, char *name) { break; } + if (matched) goto _out; + { // 'bool/int32_t/int64_t/float/double' variables with general modification function static OptionNameAndVar debugOptions[] = { {"cDebugFlag", &cDebugFlag}, {"dDebugFlag", &dDebugFlag}, {"fsDebugFlag", &fsDebugFlag}, @@ -1664,6 +1688,7 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, char *name) { } } +_out: return terrno == TSDB_CODE_SUCCESS ? 0 : -1; } diff --git a/source/common/src/ttime.c b/source/common/src/ttime.c index a701c88a24..227de7f5fc 100644 --- a/source/common/src/ttime.c +++ b/source/common/src/ttime.c @@ -194,6 +194,14 @@ int32_t parseTimezone(char* str, int64_t* tzOffset) { return 0; } +int32_t offsetOfTimezone(char* tzStr, int64_t* offset) { + if (tzStr && (tzStr[0] == 'z' || tzStr[0] == 'Z')) { + *offset = 0; + return 0; + } + return parseTimezone(tzStr, offset); +} + /* * rfc3339 format: * 2013-04-12T15:52:01+08:00 diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 7c3c54537d..6899a08602 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -678,7 +678,7 @@ _OVER: return -1; } -static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) { +static int32_t mndPersistTaskDropReq(SMnode* pMnode, STrans *pTrans, SStreamTask *pTask) { SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq)); if (pReq == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -690,7 +690,12 @@ static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) { pReq->streamId = pTask->id.streamId; STransAction action = {0}; - initTransAction(&action, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &pTask->info.epSet, 0); + SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId); + SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj); + mndReleaseVgroup(pMnode, pVgObj); + + // The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode. + initTransAction(&action, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0); if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(pReq); return -1; @@ -706,7 +711,7 @@ int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) int32_t sz = taosArrayGetSize(pTasks); for (int32_t j = 0; j < sz; j++) { SStreamTask *pTask = taosArrayGetP(pTasks, j); - if (mndPersistTaskDropReq(pTrans, pTask) < 0) { + if (mndPersistTaskDropReq(pMnode, pTrans, pTask) < 0) { return -1; } } @@ -1085,9 +1090,10 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream STransAction action = {0}; SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj); + mndReleaseVgroup(pMnode, pVgObj); + initTransAction(&action, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset, TSDB_CODE_SYN_PROPOSE_NOT_READY); - mndReleaseVgroup(pMnode, pVgObj); if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(buf); @@ -1283,12 +1289,13 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { if (pStream == NULL) { if (dropReq.igNotExists) { - mInfo("stream:%s, not exist, ignore not exist is set", dropReq.name); + mInfo("stream:%s not exist, ignore not exist is set", dropReq.name); sdbRelease(pMnode->pSdb, pStream); tFreeSMDropStreamReq(&dropReq); return 0; } else { terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; + mError("stream:%s not exist failed to drop", dropReq.name); tFreeSMDropStreamReq(&dropReq); return -1; } @@ -1660,7 +1667,7 @@ static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter) { sdbCancelFetch(pSdb, pIter); } -static int32_t mndPauseStreamTask(STrans *pTrans, SStreamTask *pTask) { +static int32_t mndPauseStreamTask(SMnode* pMnode, STrans *pTrans, SStreamTask *pTask) { SVPauseStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVPauseStreamTaskReq)); if (pReq == NULL) { mError("failed to malloc in pause stream, size:%" PRIzu ", code:%s", sizeof(SVPauseStreamTaskReq), @@ -1673,8 +1680,12 @@ static int32_t mndPauseStreamTask(STrans *pTrans, SStreamTask *pTask) { pReq->taskId = pTask->id.taskId; pReq->streamId = pTask->id.streamId; + SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId); + SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj); + mndReleaseVgroup(pMnode, pVgObj); + STransAction action = {0}; - initTransAction(&action, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &pTask->info.epSet, 0); + initTransAction(&action, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0); if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(pReq); return -1; @@ -1682,7 +1693,7 @@ static int32_t mndPauseStreamTask(STrans *pTrans, SStreamTask *pTask) { return 0; } -int32_t mndPauseAllStreamTasks(STrans *pTrans, SStreamObj *pStream) { +int32_t mndPauseAllStreamTasks(SMnode* pMnode, STrans *pTrans, SStreamObj *pStream) { SArray *tasks = pStream->tasks; int32_t size = taosArrayGetSize(tasks); @@ -1691,7 +1702,7 @@ int32_t mndPauseAllStreamTasks(STrans *pTrans, SStreamObj *pStream) { int32_t sz = taosArrayGetSize(pTasks); for (int32_t j = 0; j < sz; j++) { SStreamTask *pTask = taosArrayGetP(pTasks, j); - if (mndPauseStreamTask(pTrans, pTask) < 0) { + if (mndPauseStreamTask(pMnode, pTrans, pTask) < 0) { return -1; } @@ -1768,7 +1779,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { } // pause all tasks - if (mndPauseAllStreamTasks(pTrans, pStream) < 0) { + if (mndPauseAllStreamTasks(pMnode, pTrans, pStream) < 0) { mError("stream:%s, failed to pause task since %s", pauseReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); @@ -1795,7 +1806,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { return TSDB_CODE_ACTION_IN_PROGRESS; } -static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask, int8_t igUntreated) { +static int32_t mndResumeStreamTask(STrans *pTrans, SMnode* pMnode, SStreamTask *pTask, int8_t igUntreated) { SVResumeStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResumeStreamTaskReq)); if (pReq == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -1806,8 +1817,12 @@ static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask, int8_t ig pReq->streamId = pTask->id.streamId; pReq->igUntreated = igUntreated; + SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId); + SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj); + mndReleaseVgroup(pMnode, pVgObj); + STransAction action = {0}; - initTransAction(&action, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &pTask->info.epSet, 0); + initTransAction(&action, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &epset, 0); if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(pReq); return -1; @@ -1815,14 +1830,14 @@ static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask, int8_t ig return 0; } -int32_t mndResumeAllStreamTasks(STrans *pTrans, SStreamObj *pStream, int8_t igUntreated) { +int32_t mndResumeAllStreamTasks(STrans *pTrans, SMnode* pMnode, SStreamObj *pStream, int8_t igUntreated) { int32_t size = taosArrayGetSize(pStream->tasks); for (int32_t i = 0; i < size; i++) { SArray *pTasks = taosArrayGetP(pStream->tasks, i); int32_t sz = taosArrayGetSize(pTasks); for (int32_t j = 0; j < sz; j++) { SStreamTask *pTask = taosArrayGetP(pTasks, j); - if (mndResumeStreamTask(pTrans, pTask, igUntreated) < 0) { + if (mndResumeStreamTask(pTrans, pMnode, pTask, igUntreated) < 0) { return -1; } @@ -1831,7 +1846,6 @@ int32_t mndResumeAllStreamTasks(STrans *pTrans, SStreamObj *pStream, int8_t igUn } } } - // pStream->pHTasksList is null return 0; } @@ -1884,7 +1898,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { } // resume all tasks - if (mndResumeAllStreamTasks(pTrans, pStream, pauseReq.igUntreated) < 0) { + if (mndResumeAllStreamTasks(pTrans, pMnode, pStream, pauseReq.igUntreated) < 0) { mError("stream:%s, failed to drop task since %s", pauseReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); @@ -2534,8 +2548,12 @@ int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) { pReq->taskId = pTask->id.taskId; pReq->streamId = pTask->id.streamId; + SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId); + SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj); + mndReleaseVgroup(pMnode, pVgObj); + STransAction action = {0}; - initTransAction(&action, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &pTask->info.epSet, 0); + initTransAction(&action, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &epset, 0); if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(pReq); taosWUnLockLatch(&pStream->lock); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 3ae0eb1ddf..2717f1b78c 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1974,6 +1974,8 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { return -1; } + streamMetaInitBackend(pMeta); + if (streamMetaLoadAllTasks(pTq->pStreamMeta) < 0) { tqError("vgId:%d failed to load stream tasks", vgId); streamMetaWUnLock(pMeta); diff --git a/source/dnode/vnode/src/tq/tqStreamStateSnap.c b/source/dnode/vnode/src/tq/tqStreamStateSnap.c index 41392ba27b..7a8147f83b 100644 --- a/source/dnode/vnode/src/tq/tqStreamStateSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamStateSnap.c @@ -169,10 +169,15 @@ int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback) } int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, int64_t chkpId) { tqDebug("vgId:%d, vnode %s start to rebuild stream-state", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER); + + streamMetaWLock(pWriter->pTq->pStreamMeta); int32_t code = streamMetaReopen(pWriter->pTq->pStreamMeta); if (code == 0) { + streamMetaInitBackend(pWriter->pTq->pStreamMeta); code = streamStateLoadTasks(pWriter); } + + streamMetaWUnLock(pWriter->pTq->pStreamMeta); tqDebug("vgId:%d, vnode %s succ to rebuild stream-state", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER); taosMemoryFree(pWriter); return code; diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index e578638e9d..7ec0490e3f 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -144,7 +144,6 @@ int32_t tqRestartStreamTasks(STQ* pTq) { } streamMetaWLock(pMeta); - code = streamMetaReopen(pMeta); if (code != TSDB_CODE_SUCCESS) { tqError("vgId:%d failed to reopen stream meta", vgId); @@ -153,6 +152,7 @@ int32_t tqRestartStreamTasks(STQ* pTq) { return code; } + streamMetaInitBackend(pMeta); int64_t el = taosGetTimestampMs() - st; tqInfo("vgId:%d close&reload state elapsed time:%.3fms", vgId, el/1000.); diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit2.c b/source/dnode/vnode/src/tsdb/tsdbCommit2.c index 22fb3b84ec..48b622e324 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit2.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit2.c @@ -409,7 +409,6 @@ static int32_t tsdbCommitFileSetBegin(SCommitter2 *committer) { extern int32_t tsS3UploadDelaySec; long s3Size(const char *object_name); int32_t nlevel = tfsGetLevel(committer->tsdb->pVnode->pTfs); - committer->ctx->skipTsRow = false; if (tsS3Enabled && nlevel > 1 && committer->ctx->fset) { STFileObj *fobj = committer->ctx->fset->farr[TSDB_FTYPE_DATA]; if (fobj && fobj->f->did.level == nlevel - 1) { @@ -670,4 +669,4 @@ _exit: tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__); } return code; -} \ No newline at end of file +} diff --git a/source/dnode/vnode/src/tsdb/tsdbFS2.c b/source/dnode/vnode/src/tsdb/tsdbFS2.c index bd73b63c91..66ee88d607 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS2.c @@ -882,8 +882,38 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) { int32_t numFile = TARRAY2_SIZE(lvl->fobjArr); if (numFile >= sttTrigger) { // launch merge - code = tsdbSchedMerge(fs->tsdb, fset->fid); - TSDB_CHECK_CODE(code, lino, _exit); + bool skipMerge = false; + { + int32_t now = taosGetTimestampSec(); + + extern int8_t tsS3Enabled; + extern int32_t tsS3UploadDelaySec; + long s3Size(const char *object_name); + int32_t nlevel = tfsGetLevel(fs->tsdb->pVnode->pTfs); + if (tsS3Enabled && nlevel > 1) { + STFileObj *fobj = fset->farr[TSDB_FTYPE_DATA]; + if (fobj && fobj->f->did.level == nlevel - 1) { + // if exists on s3 or local mtime < committer->ctx->now - tsS3UploadDelay + const char *object_name = taosDirEntryBaseName((char *)fobj->fname); + + if (taosCheckExistFile(fobj->fname)) { + int32_t mtime = 0; + taosStatFile(fobj->fname, NULL, &mtime, NULL); + if (mtime < now - tsS3UploadDelaySec) { + skipMerge = true; + } + } else if (s3Size(object_name) > 0) { + skipMerge = true; + } + } + // new fset can be written with ts data + } + } + + if (!skipMerge) { + code = tsdbSchedMerge(fs->tsdb, fset->fid); + TSDB_CHECK_CODE(code, lino, _exit); + } } if (numFile >= sttTrigger * BLOCK_COMMIT_FACTOR) { diff --git a/source/dnode/vnode/src/tsdb/tsdbFile2.c b/source/dnode/vnode/src/tsdb/tsdbFile2.c index cc05b8ee18..8cd9304188 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFile2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFile2.c @@ -50,6 +50,7 @@ void remove_file(const char *fname, bool last_level) { long s3_size = tsS3Enabled ? s3Size(object_name) : 0; if (!strncmp(fname + strlen(fname) - 5, ".data", 5) && s3_size > 0) { s3DeleteObjects(&object_name, 1); + tsdbInfo("file:%s is removed from s3", fname); } else { tsdbError("file:%s remove failed", fname); } diff --git a/source/dnode/vnode/src/tsdb/tsdbRetention.c b/source/dnode/vnode/src/tsdb/tsdbRetention.c index b61f17a52a..f6888ba9cb 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRetention.c +++ b/source/dnode/vnode/src/tsdb/tsdbRetention.c @@ -103,7 +103,7 @@ static int32_t tsdbCopyFileS3(SRTNer *rtner, const STFileObj *from, const STFile char fname[TSDB_FILENAME_LEN]; TdFilePtr fdFrom = NULL; - TdFilePtr fdTo = NULL; + // TdFilePtr fdTo = NULL; tsdbTFileName(rtner->tsdb, to, fname); @@ -359,7 +359,11 @@ static int32_t tsdbDoRetentionOnFileSet(SRTNer *rtner, STFileSet *fset) { s3EvictCache(fobj->fname, fsize * 2); } */ - tsdbInfo("file:%s size: %" PRId64 " do migrate", fobj->fname, fobj->f->size); + if (fobj->f->did.level > did.level) { + continue; + } + tsdbInfo("file:%s size: %" PRId64 " do migrate from %d to %d", fobj->fname, fobj->f->size, fobj->f->did.level, + did.level); code = tsdbDoMigrateFileObj(rtner, fobj, &did); TSDB_CHECK_CODE(code, lino, _exit); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index ecf9953194..e6879345ec 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -1005,7 +1005,9 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq, size_t len = 0; char *keyJoined = taosStringBuilderGetResult(&sb, &len); - auditRecord(pOriginRpc, clusterId, "createTable", name.dbname, "", keyJoined, len); + if(pOriginRpc->info.conn.user != NULL && strlen(pOriginRpc->info.conn.user) > 0){ + auditRecord(pOriginRpc, clusterId, "createTable", name.dbname, "", keyJoined, len); + } taosStringBuilderDestroy(&sb); } @@ -1227,7 +1229,9 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t ver, void *pReq, in size_t len = 0; char *keyJoined = taosStringBuilderGetResult(&sb, &len); - auditRecord(pOriginRpc, clusterId, "dropTable", name.dbname, "", keyJoined, len); + if(pOriginRpc->info.conn.user != NULL && strlen(pOriginRpc->info.conn.user) > 0){ + auditRecord(pOriginRpc, clusterId, "dropTable", name.dbname, "", keyJoined, len); + } taosStringBuilderDestroy(&sb); } diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index e58e6a8055..099870db5c 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -29,7 +29,7 @@ #define IS_FINAL_INTERVAL_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) #define IS_FINAL_SESSION_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) -#define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL * 24LL * 365LL * 10LL); +#define DEAULT_DELETE_MARK INT64_MAX #define STREAM_INTERVAL_OP_STATE_NAME "StreamIntervalHistoryState" #define STREAM_SESSION_OP_STATE_NAME "StreamSessionHistoryState" #define STREAM_STATE_OP_STATE_NAME "StreamStateHistoryState" diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 4eda11a6a4..e71e18d37d 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -2784,11 +2784,44 @@ static int32_t splitCacheLastFuncOptCreateAggLogicNode(SAggLogicNode** pNewAgg, pNew->hasGroup = pAgg->hasGroup; pNew->node.pChildren = nodesCloneList(pAgg->node.pChildren); - SNode* pNode = NULL; - FOREACH(pNode, pNew->node.pChildren) { - if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) { - OPTIMIZE_FLAG_CLEAR_MASK(((SScanLogicNode*)pNode)->node.optimizedFlag, OPTIMIZE_FLAG_SCAN_PATH); + int32_t code = 0; + SNode* pNode = nodesListGetNode(pNew->node.pChildren, 0); + if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) { + SScanLogicNode* pScan = (SScanLogicNode*)pNode; + SNodeList* pOldScanCols = NULL; + TSWAP(pScan->pScanCols, pOldScanCols); + nodesDestroyList(pScan->pScanPseudoCols); + pScan->pScanPseudoCols = NULL; + nodesDestroyList(pScan->node.pTargets); + pScan->node.pTargets = NULL; + SNodeListNode* list = (SNodeListNode*)nodesMakeNode(QUERY_NODE_NODE_LIST); + list->pNodeList = pFunc; + code = nodesCollectColumnsFromNode((SNode*)list, NULL, COLLECT_COL_TYPE_COL, &pScan->pScanCols); + if (TSDB_CODE_SUCCESS != code) { + return code; } + nodesFree(list); + bool found = false; + FOREACH(pNode, pScan->pScanCols) { + if (PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pNode)->colId) { + found = true; + break; + } + } + if (!found) { + FOREACH(pNode, pOldScanCols) { + if (PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pNode)->colId) { + nodesListMakeStrictAppend(&pScan->pScanCols, nodesCloneNode(pNode)); + break; + } + } + } + nodesDestroyList(pOldScanCols); + code = createColumnByRewriteExprs(pScan->pScanCols, &pScan->node.pTargets); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + OPTIMIZE_FLAG_CLEAR_MASK(pScan->node.optimizedFlag, OPTIMIZE_FLAG_SCAN_PATH); } *pNewAgg = pNew; diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index e7c6297f44..6144ebd340 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -1083,6 +1083,15 @@ int32_t toISO8601Function(SScalarParam *pInput, int32_t inputNum, SScalarParam * memmove(fraction, fraction + TSDB_TIME_PRECISION_SEC_DIGITS, TSDB_TIME_PRECISION_SEC_DIGITS); } + // trans current timezone's unix ts to dest timezone + // offset = delta from dest timezone to zero + // delta from zero to current timezone = 3600 * (cur)tsTimezone + int64_t offset = 0; + if (0 != offsetOfTimezone(tz, &offset)) { + goto _end; + } + timeVal -= offset + 3600 * ((int64_t)tsTimezone); + struct tm tmInfo; int32_t len = 0; diff --git a/source/libs/scheduler/src/schStatus.c b/source/libs/scheduler/src/schStatus.c index 9d0ad30e2a..d37393137f 100644 --- a/source/libs/scheduler/src/schStatus.c +++ b/source/libs/scheduler/src/schStatus.c @@ -68,7 +68,7 @@ int32_t schHandleOpBeginEvent(int64_t jobId, SSchJob** job, SCH_OP_TYPE type, SS SSchJob* pJob = schAcquireJob(jobId); if (NULL == pJob) { qDebug("Acquire sch job failed, may be dropped, jobId:0x%" PRIx64, jobId); - SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); + SCH_ERR_RET(TSDB_CODE_SCH_JOB_NOT_EXISTS); } *job = pJob; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 7013b43a6f..228da65021 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -262,18 +262,33 @@ int32_t streamMetaReopen(SStreamMeta* pMeta) { } } - // todo: not wait in a critical region - while ((pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId)) == NULL) { - stInfo("vgId:%d failed to init stream backend, retry in 100ms", pMeta->vgId); - taosMsleep(100); + taosMemoryFree(defaultPath); + taosMemoryFree(newPath); + + return 0; +} + +// todo refactor: the lock shoud be restricted in one function +void streamMetaInitBackend(SStreamMeta* pMeta) { + pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId); + if (pMeta->streamBackend == NULL) { + streamMetaWUnLock(pMeta); + + while (1) { + streamMetaWLock(pMeta); + pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId); + if (pMeta->streamBackend != NULL) { + break; + } + + streamMetaWUnLock(pMeta); + stInfo("vgId:%d failed to init stream backend, retry in 100ms", pMeta->vgId); + taosMsleep(100); + } } pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend); streamBackendLoadCheckpointInfo(pMeta); - - taosMemoryFree(defaultPath); - taosMemoryFree(newPath); - return 0; } void streamMetaClear(SStreamMeta* pMeta) { diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c index 103fd9e087..5893bc14f1 100644 --- a/source/libs/stream/src/streamSnapshot.c +++ b/source/libs/stream/src/streamSnapshot.c @@ -195,7 +195,7 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chk } } if (qDebugFlag & DEBUG_TRACE) { - char* buf = taosMemoryCalloc(1, 128 + taosArrayGetSize(pFile->pSst) * 16); + char* buf = taosMemoryCalloc(1, 128 + taosArrayGetSize(pFile->pSst) * 64); sprintf(buf, "[current: %s,", pFile->pCurrent); sprintf(buf + strlen(buf), "MANIFEST: %s,", pFile->pMainfest); sprintf(buf + strlen(buf), "options: %s,", pFile->pOptions); @@ -483,7 +483,7 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa int32_t streamSnapWriterClose(SStreamSnapWriter* pWriter, int8_t rollback) { SStreamSnapHandle* handle = &pWriter->handle; if (qDebugFlag & DEBUG_TRACE) { - char* buf = (char*)taosMemoryMalloc(128 + taosArrayGetSize(handle->pFileList) * 16); + char* buf = (char*)taosMemoryMalloc(128 + taosArrayGetSize(handle->pFileList) * 64); int n = sprintf(buf, "["); for (int i = 0; i < taosArrayGetSize(handle->pFileList); i++) { SBackendFileItem* item = taosArrayGet(handle->pFileList, i); diff --git a/source/util/src/tcompression.c b/source/util/src/tcompression.c index dc89a24180..06b82f3ba1 100644 --- a/source/util/src/tcompression.c +++ b/source/util/src/tcompression.c @@ -52,6 +52,7 @@ #include "lz4.h" #include "tRealloc.h" #include "tlog.h" +#include "ttypes.h" #ifdef TD_TSZ #include "td_sz.h" @@ -62,8 +63,6 @@ static const int32_t TEST_NUMBER = 1; #define SIMPLE8B_MAX_INT64 ((uint64_t)1152921504606846974LL) #define safeInt64Add(a, b) (((a >= 0) && (b <= INT64_MAX - a)) || ((a < 0) && (b >= INT64_MIN - a))) -#define ZIGZAG_ENCODE(T, v) (((u##T)((v) >> (sizeof(T) * 8 - 1))) ^ (((u##T)(v)) << 1)) // zigzag encode -#define ZIGZAG_DECODE(T, v) (((v) >> 1) ^ -((T)((v)&1))) // zigzag decode #ifdef TD_TSZ bool lossyFloat = false; @@ -99,24 +98,7 @@ int32_t tsCompressINTImp(const char *const input, const int32_t nelements, char 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15}; // get the byte limit. - int32_t word_length = 0; - switch (type) { - case TSDB_DATA_TYPE_BIGINT: - word_length = LONG_BYTES; - break; - case TSDB_DATA_TYPE_INT: - word_length = INT_BYTES; - break; - case TSDB_DATA_TYPE_SMALLINT: - word_length = SHORT_BYTES; - break; - case TSDB_DATA_TYPE_TINYINT: - word_length = CHAR_BYTES; - break; - default: - uError("Invalid compress integer type:%d", type); - return -1; - } + int32_t word_length = getWordLength(type); int32_t byte_limit = nelements * word_length + 1; int32_t opos = 1; @@ -221,24 +203,9 @@ int32_t tsCompressINTImp(const char *const input, const int32_t nelements, char } int32_t tsDecompressINTImp(const char *const input, const int32_t nelements, char *const output, const char type) { - - int32_t word_length = 0; - switch (type) { - case TSDB_DATA_TYPE_BIGINT: - word_length = LONG_BYTES; - break; - case TSDB_DATA_TYPE_INT: - word_length = INT_BYTES; - break; - case TSDB_DATA_TYPE_SMALLINT: - word_length = SHORT_BYTES; - break; - case TSDB_DATA_TYPE_TINYINT: - word_length = CHAR_BYTES; - break; - default: - uError("Invalid decompress integer type:%d", type); - return -1; + int32_t word_length = getWordLength(type); + if (word_length == -1) { + return word_length; } // If not compressed. @@ -247,8 +214,11 @@ int32_t tsDecompressINTImp(const char *const input, const int32_t nelements, cha return nelements * word_length; } - // Selector value: 0 1 2 3 4 5 6 7 8 9 10 11 - // 12 13 14 15 +#if __AVX2__ + tsDecompressIntImpl_Hw(input, nelements, output, type); + return nelements * word_length; +#else + // Selector value: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 char bit_per_integer[] = {0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 12, 15, 20, 30, 60}; int32_t selector_to_elems[] = {240, 120, 60, 30, 20, 15, 12, 10, 8, 7, 6, 5, 4, 3, 2, 1}; @@ -257,185 +227,6 @@ int32_t tsDecompressINTImp(const char *const input, const int32_t nelements, cha int32_t _pos = 0; int64_t prev_value = 0; -#if __AVX2__ - while (1) { - if (_pos == nelements) break; - - uint64_t w = 0; - memcpy(&w, ip, LONG_BYTES); - - char selector = (char)(w & INT64MASK(4)); // selector = 4 - char bit = bit_per_integer[(int32_t)selector]; // bit = 3 - int32_t elems = selector_to_elems[(int32_t)selector]; - - // Optimize the performance, by remove the constantly switch operation. - int32_t v = 4; - uint64_t zigzag_value = 0; - uint64_t mask = INT64MASK(bit); - - switch (type) { - case TSDB_DATA_TYPE_BIGINT: { - int64_t* p = (int64_t*) output; - - int32_t gRemainder = (nelements - _pos); - int32_t num = (gRemainder > elems)? elems:gRemainder; - - int32_t batch = num >> 2; - int32_t remain = num & 0x03; - if (selector == 0 || selector == 1) { - if (tsAVX2Enable && tsSIMDEnable) { - for (int32_t i = 0; i < batch; ++i) { - __m256i prev = _mm256_set1_epi64x(prev_value); - _mm256_storeu_si256((__m256i *)&p[_pos], prev); - _pos += 4; - } - - for (int32_t i = 0; i < remain; ++i) { - p[_pos++] = prev_value; - } - } else { - for (int32_t i = 0; i < elems && count < nelements; i++, count++) { - p[_pos++] = prev_value; - v += bit; - } - } - } else { - if (tsAVX2Enable && tsSIMDEnable) { - __m256i base = _mm256_set1_epi64x(w); - __m256i maskVal = _mm256_set1_epi64x(mask); - - __m256i shiftBits = _mm256_set_epi64x(bit * 3 + 4, bit * 2 + 4, bit + 4, 4); - __m256i inc = _mm256_set1_epi64x(bit << 2); - - for (int32_t i = 0; i < batch; ++i) { - __m256i after = _mm256_srlv_epi64(base, shiftBits); - __m256i zigzagVal = _mm256_and_si256(after, maskVal); - - // ZIGZAG_DECODE(T, v) (((v) >> 1) ^ -((T)((v)&1))) - __m256i signmask = _mm256_and_si256(_mm256_set1_epi64x(1), zigzagVal); - signmask = _mm256_sub_epi64(_mm256_setzero_si256(), signmask); - // get the four zigzag values here - __m256i delta = _mm256_xor_si256(_mm256_srli_epi64(zigzagVal, 1), signmask); - - // calculate the cumulative sum (prefix sum) for each number - // decode[0] = prev_value + final[0] - // decode[1] = decode[0] + final[1] -----> prev_value + final[0] + final[1] - // decode[2] = decode[1] + final[2] -----> prev_value + final[0] + final[1] + final[2] - // decode[3] = decode[2] + final[3] -----> prev_value + final[0] + final[1] + final[2] + final[3] - - // 1, 2, 3, 4 - //+ 0, 1, 0, 3 - // 1, 3, 3, 7 - // shift and add for the first round - __m128i prev = _mm_set1_epi64x(prev_value); - __m256i x = _mm256_slli_si256(delta, 8); - - delta = _mm256_add_epi64(delta, x); - _mm256_storeu_si256((__m256i *)&p[_pos], delta); - - // 1, 3, 3, 7 - //+ 0, 0, 3, 3 - // 1, 3, 6, 10 - // shift and add operation for the second round - __m128i firstPart = _mm_loadu_si128((__m128i *)&p[_pos]); - __m128i secondItem = _mm_set1_epi64x(p[_pos + 1]); - __m128i secPart = _mm_add_epi64(_mm_loadu_si128((__m128i *)&p[_pos + 2]), secondItem); - firstPart = _mm_add_epi64(firstPart, prev); - secPart = _mm_add_epi64(secPart, prev); - - // save it in the memory - _mm_storeu_si128((__m128i *)&p[_pos], firstPart); - _mm_storeu_si128((__m128i *)&p[_pos + 2], secPart); - - shiftBits = _mm256_add_epi64(shiftBits, inc); - prev_value = p[_pos + 3]; -// uDebug("_pos:%d %"PRId64", %"PRId64", %"PRId64", %"PRId64, _pos, p[_pos], p[_pos+1], p[_pos+2], p[_pos+3]); - _pos += 4; - } - - // handle the remain value - for (int32_t i = 0; i < remain; i++) { - zigzag_value = ((w >> (v + (batch * bit * 4))) & mask); - prev_value += ZIGZAG_DECODE(int64_t, zigzag_value); - - p[_pos++] = prev_value; -// uDebug("_pos:%d %"PRId64, _pos-1, p[_pos-1]); - - v += bit; - } - } else { - for (int32_t i = 0; i < elems && count < nelements; i++, count++) { - zigzag_value = ((w >> v) & mask); - prev_value += ZIGZAG_DECODE(int64_t, zigzag_value); - - p[_pos++] = prev_value; -// uDebug("_pos:%d %"PRId64, _pos-1, p[_pos-1]); - - v += bit; - } - } - } - } break; - case TSDB_DATA_TYPE_INT: { - int32_t* p = (int32_t*) output; - - if (selector == 0 || selector == 1) { - for (int32_t i = 0; i < elems && count < nelements; i++, count++) { - p[_pos++] = (int32_t)prev_value; - } - } else { - for (int32_t i = 0; i < elems && count < nelements; i++, count++) { - zigzag_value = ((w >> v) & mask); - prev_value += ZIGZAG_DECODE(int64_t, zigzag_value); - - p[_pos++] = (int32_t)prev_value; - v += bit; - } - } - } break; - case TSDB_DATA_TYPE_SMALLINT: { - int16_t* p = (int16_t*) output; - - if (selector == 0 || selector == 1) { - for (int32_t i = 0; i < elems && count < nelements; i++, count++) { - p[_pos++] = (int16_t)prev_value; - } - } else { - for (int32_t i = 0; i < elems && count < nelements; i++, count++) { - zigzag_value = ((w >> v) & mask); - prev_value += ZIGZAG_DECODE(int64_t, zigzag_value); - - p[_pos++] = (int16_t)prev_value; - v += bit; - } - } - } break; - - case TSDB_DATA_TYPE_TINYINT: { - int8_t *p = (int8_t *)output; - - if (selector == 0 || selector == 1) { - for (int32_t i = 0; i < elems && count < nelements; i++, count++) { - p[_pos++] = (int8_t)prev_value; - } - } else { - for (int32_t i = 0; i < elems && count < nelements; i++, count++) { - zigzag_value = ((w >> v) & mask); - prev_value += ZIGZAG_DECODE(int64_t, zigzag_value); - - p[_pos++] = (int8_t)prev_value; - v += bit; - } - } - } break; - } - - ip += LONG_BYTES; - } - - return nelements * word_length; -#else - while (1) { if (count == nelements) break; @@ -644,6 +435,8 @@ int32_t tsDecompressStringImp(const char *const input, int32_t compressedSize, c // TODO: Take care here, we assumes little endian encoding. int32_t tsCompressTimestampImp(const char *const input, const int32_t nelements, char *const output) { int32_t _pos = 1; + int32_t longBytes = LONG_BYTES; + ASSERTS(nelements >= 0, "nelements is negative"); if (nelements == 0) return 0; @@ -684,25 +477,25 @@ int32_t tsCompressTimestampImp(const char *const input, const int32_t nelements, } flags = flag1 | (flag2 << 4); // Encode the flag. - if ((_pos + CHAR_BYTES - 1) >= nelements * LONG_BYTES) goto _exit_over; + if ((_pos + CHAR_BYTES - 1) >= nelements * longBytes) goto _exit_over; memcpy(output + _pos, &flags, CHAR_BYTES); _pos += CHAR_BYTES; /* Here, we assume it is little endian encoding method. */ // Encode dd1 if (is_bigendian()) { - if ((_pos + flag1 - 1) >= nelements * LONG_BYTES) goto _exit_over; - memcpy(output + _pos, (char *)(&dd1) + LONG_BYTES - flag1, flag1); + if ((_pos + flag1 - 1) >= nelements * longBytes) goto _exit_over; + memcpy(output + _pos, (char *)(&dd1) + longBytes - flag1, flag1); } else { - if ((_pos + flag1 - 1) >= nelements * LONG_BYTES) goto _exit_over; + if ((_pos + flag1 - 1) >= nelements * longBytes) goto _exit_over; memcpy(output + _pos, (char *)(&dd1), flag1); } _pos += flag1; // Encode dd2; if (is_bigendian()) { - if ((_pos + flag2 - 1) >= nelements * LONG_BYTES) goto _exit_over; - memcpy(output + _pos, (char *)(&dd2) + LONG_BYTES - flag2, flag2); + if ((_pos + flag2 - 1) >= nelements * longBytes) goto _exit_over; + memcpy(output + _pos, (char *)(&dd2) + longBytes - flag2, flag2); } else { - if ((_pos + flag2 - 1) >= nelements * LONG_BYTES) goto _exit_over; + if ((_pos + flag2 - 1) >= nelements * longBytes) goto _exit_over; memcpy(output + _pos, (char *)(&dd2), flag2); } _pos += flag2; @@ -715,15 +508,15 @@ int32_t tsCompressTimestampImp(const char *const input, const int32_t nelements, flag2 = 0; flags = flag1 | (flag2 << 4); // Encode the flag. - if ((_pos + CHAR_BYTES - 1) >= nelements * LONG_BYTES) goto _exit_over; + if ((_pos + CHAR_BYTES - 1) >= nelements * longBytes) goto _exit_over; memcpy(output + _pos, &flags, CHAR_BYTES); _pos += CHAR_BYTES; // Encode dd1; if (is_bigendian()) { - if ((_pos + flag1 - 1) >= nelements * LONG_BYTES) goto _exit_over; - memcpy(output + _pos, (char *)(&dd1) + LONG_BYTES - flag1, flag1); + if ((_pos + flag1 - 1) >= nelements * longBytes) goto _exit_over; + memcpy(output + _pos, (char *)(&dd1) + longBytes - flag1, flag1); } else { - if ((_pos + flag1 - 1) >= nelements * LONG_BYTES) goto _exit_over; + if ((_pos + flag1 - 1) >= nelements * longBytes) goto _exit_over; memcpy(output + _pos, (char *)(&dd1), flag1); } _pos += flag1; @@ -734,17 +527,19 @@ int32_t tsCompressTimestampImp(const char *const input, const int32_t nelements, _exit_over: output[0] = 0; // Means the string is not compressed - memcpy(output + 1, input, nelements * LONG_BYTES); - return nelements * LONG_BYTES + 1; + memcpy(output + 1, input, nelements * longBytes); + return nelements * longBytes + 1; } int32_t tsDecompressTimestampImp(const char *const input, const int32_t nelements, char *const output) { + int64_t longBytes = LONG_BYTES; + ASSERTS(nelements >= 0, "nelements is negative"); if (nelements == 0) return 0; if (input[0] == 0) { - memcpy(output, input + 1, nelements * LONG_BYTES); - return nelements * LONG_BYTES; + memcpy(output, input + 1, nelements * longBytes); + return nelements * longBytes; } else if (input[0] == 1) { // Decompress int64_t *ostream = (int64_t *)output; @@ -763,7 +558,7 @@ int32_t tsDecompressTimestampImp(const char *const input, const int32_t nelement delta_of_delta = 0; } else { if (is_bigendian()) { - memcpy(((char *)(&dd1)) + LONG_BYTES - nbytes, input + ipos, nbytes); + memcpy(((char *)(&dd1)) + longBytes - nbytes, input + ipos, nbytes); } else { memcpy(&dd1, input + ipos, nbytes); } @@ -779,7 +574,7 @@ int32_t tsDecompressTimestampImp(const char *const input, const int32_t nelement prev_value = prev_value + prev_delta; ostream[opos++] = prev_value; } - if (opos == nelements) return nelements * LONG_BYTES; + if (opos == nelements) return nelements * longBytes; // Decode dd2 uint64_t dd2 = 0; @@ -788,7 +583,7 @@ int32_t tsDecompressTimestampImp(const char *const input, const int32_t nelement delta_of_delta = 0; } else { if (is_bigendian()) { - memcpy(((char *)(&dd2)) + LONG_BYTES - nbytes, input + ipos, nbytes); + memcpy(((char *)(&dd2)) + longBytes - nbytes, input + ipos, nbytes); } else { memcpy(&dd2, input + ipos, nbytes); } @@ -799,7 +594,7 @@ int32_t tsDecompressTimestampImp(const char *const input, const int32_t nelement prev_delta = delta_of_delta + prev_delta; prev_value = prev_value + prev_delta; ostream[opos++] = prev_value; - if (opos == nelements) return nelements * LONG_BYTES; + if (opos == nelements) return nelements * longBytes; } } else { @@ -807,11 +602,13 @@ int32_t tsDecompressTimestampImp(const char *const input, const int32_t nelement return -1; } } -/* --------------------------------------------Double Compression - * ---------------------------------------------- */ + +/* --------------------------------------------Double Compression ---------------------------------------------- */ void encodeDoubleValue(uint64_t diff, uint8_t flag, char *const output, int32_t *const pos) { + int32_t longBytes = LONG_BYTES; + uint8_t nbytes = (flag & INT8MASK(3)) + 1; - int32_t nshift = (LONG_BYTES * BITS_PER_BYTE - nbytes * BITS_PER_BYTE) * (flag >> 3); + int32_t nshift = (longBytes * BITS_PER_BYTE - nbytes * BITS_PER_BYTE) * (flag >> 3); diff >>= nshift; while (nbytes) { @@ -906,12 +703,14 @@ int32_t tsCompressDoubleImp(const char *const input, const int32_t nelements, ch } FORCE_INLINE uint64_t decodeDoubleValue(const char *const input, int32_t *const ipos, uint8_t flag) { + int32_t longBytes = LONG_BYTES; + uint64_t diff = 0ul; int32_t nbytes = (flag & 0x7) + 1; for (int32_t i = 0; i < nbytes; i++) { diff |= (((uint64_t)0xff & input[(*ipos)++]) << BITS_PER_BYTE * i); } - int32_t shift_width = (LONG_BYTES * BITS_PER_BYTE - nbytes * BITS_PER_BYTE) * (flag >> 3); + int32_t shift_width = (longBytes * BITS_PER_BYTE - nbytes * BITS_PER_BYTE) * (flag >> 3); diff <<= shift_width; return diff; @@ -1061,14 +860,7 @@ uint32_t decodeFloatValue(const char *const input, int32_t *const ipos, uint8_t return diff; } -int32_t tsDecompressFloatImp(const char *const input, const int32_t nelements, char *const output) { - float *ostream = (float *)output; - - if (input[0] == 1) { - memcpy(output, input + 1, nelements * FLOAT_BYTES); - return nelements * FLOAT_BYTES; - } - +static void tsDecompressFloatHelper(const char *const input, const int32_t nelements, float* ostream) { uint8_t flags = 0; int32_t ipos = 1; int32_t opos = 0; @@ -1094,6 +886,21 @@ int32_t tsDecompressFloatImp(const char *const input, const int32_t nelements, c ostream[opos++] = curr.real; } +} + +int32_t tsDecompressFloatImp(const char *const input, const int32_t nelements, char *const output) { + if (input[0] == 1) { + memcpy(output, input + 1, nelements * FLOAT_BYTES); + return nelements * FLOAT_BYTES; + } + + if (tsSIMDEnable && tsAVX2Enable) { + tsDecompressFloatImplAvx2(input, nelements, output); + } else if (tsSIMDEnable && tsAVX512Enable) { + tsDecompressFloatImplAvx512(input, nelements, output); + } else { // alternative implementation without SIMD instructions. + tsDecompressFloatHelper(input, nelements, (float*)output); + } return nelements * FLOAT_BYTES; } diff --git a/source/util/src/tdecompress.c b/source/util/src/tdecompress.c new file mode 100644 index 0000000000..f32a4014d6 --- /dev/null +++ b/source/util/src/tdecompress.c @@ -0,0 +1,251 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "os.h" +#include "ttypes.h" +#include "tcompression.h" + +int32_t getWordLength(char type) { + int32_t wordLength = 0; + switch (type) { + case TSDB_DATA_TYPE_BIGINT: + wordLength = LONG_BYTES; + break; + case TSDB_DATA_TYPE_INT: + wordLength = INT_BYTES; + break; + case TSDB_DATA_TYPE_SMALLINT: + wordLength = SHORT_BYTES; + break; + case TSDB_DATA_TYPE_TINYINT: + wordLength = CHAR_BYTES; + break; + default: + uError("Invalid decompress integer type:%d", type); + return -1; + } + + return wordLength; +} + +int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements, char *const output, const char type) { + int32_t word_length = getWordLength(type); + + // Selector value: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 + char bit_per_integer[] = {0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 12, 15, 20, 30, 60}; + int32_t selector_to_elems[] = {240, 120, 60, 30, 20, 15, 12, 10, 8, 7, 6, 5, 4, 3, 2, 1}; + + const char *ip = input + 1; + int32_t count = 0; + int32_t _pos = 0; + int64_t prev_value = 0; + +#if __AVX2__ + while (1) { + if (_pos == nelements) break; + + uint64_t w = 0; + memcpy(&w, ip, LONG_BYTES); + + char selector = (char)(w & INT64MASK(4)); // selector = 4 + char bit = bit_per_integer[(int32_t)selector]; // bit = 3 + int32_t elems = selector_to_elems[(int32_t)selector]; + + // Optimize the performance, by remove the constantly switch operation. + int32_t v = 4; + uint64_t zigzag_value = 0; + uint64_t mask = INT64MASK(bit); + + switch (type) { + case TSDB_DATA_TYPE_BIGINT: { + int64_t* p = (int64_t*) output; + + int32_t gRemainder = (nelements - _pos); + int32_t num = (gRemainder > elems)? elems:gRemainder; + + int32_t batch = num >> 2; + int32_t remain = num & 0x03; + if (selector == 0 || selector == 1) { + if (tsSIMDEnable && tsAVX2Enable) { + for (int32_t i = 0; i < batch; ++i) { + __m256i prev = _mm256_set1_epi64x(prev_value); + _mm256_storeu_si256((__m256i *)&p[_pos], prev); + _pos += 4; + } + + for (int32_t i = 0; i < remain; ++i) { + p[_pos++] = prev_value; + } + } else if (tsSIMDEnable && tsAVX512Enable) { +#if __AVX512F__ + // todo add avx512 impl +#endif + } else { // alternative implementation without SIMD instructions. + for (int32_t i = 0; i < elems && count < nelements; i++, count++) { + p[_pos++] = prev_value; + v += bit; + } + } + } else { + if (tsSIMDEnable && tsAVX2Enable) { + __m256i base = _mm256_set1_epi64x(w); + __m256i maskVal = _mm256_set1_epi64x(mask); + + __m256i shiftBits = _mm256_set_epi64x(bit * 3 + 4, bit * 2 + 4, bit + 4, 4); + __m256i inc = _mm256_set1_epi64x(bit << 2); + + for (int32_t i = 0; i < batch; ++i) { + __m256i after = _mm256_srlv_epi64(base, shiftBits); + __m256i zigzagVal = _mm256_and_si256(after, maskVal); + + // ZIGZAG_DECODE(T, v) (((v) >> 1) ^ -((T)((v)&1))) + __m256i signmask = _mm256_and_si256(_mm256_set1_epi64x(1), zigzagVal); + signmask = _mm256_sub_epi64(_mm256_setzero_si256(), signmask); + + // get the four zigzag values here + __m256i delta = _mm256_xor_si256(_mm256_srli_epi64(zigzagVal, 1), signmask); + + // calculate the cumulative sum (prefix sum) for each number + // decode[0] = prev_value + final[0] + // decode[1] = decode[0] + final[1] -----> prev_value + final[0] + final[1] + // decode[2] = decode[1] + final[2] -----> prev_value + final[0] + final[1] + final[2] + // decode[3] = decode[2] + final[3] -----> prev_value + final[0] + final[1] + final[2] + final[3] + + // 1, 2, 3, 4 + //+ 0, 1, 0, 3 + // 1, 3, 3, 7 + // shift and add for the first round + __m128i prev = _mm_set1_epi64x(prev_value); + __m256i x = _mm256_slli_si256(delta, 8); + + delta = _mm256_add_epi64(delta, x); + _mm256_storeu_si256((__m256i *)&p[_pos], delta); + + // 1, 3, 3, 7 + //+ 0, 0, 3, 3 + // 1, 3, 6, 10 + // shift and add operation for the second round + __m128i firstPart = _mm_loadu_si128((__m128i *)&p[_pos]); + __m128i secondItem = _mm_set1_epi64x(p[_pos + 1]); + __m128i secPart = _mm_add_epi64(_mm_loadu_si128((__m128i *)&p[_pos + 2]), secondItem); + firstPart = _mm_add_epi64(firstPart, prev); + secPart = _mm_add_epi64(secPart, prev); + + // save it in the memory + _mm_storeu_si128((__m128i *)&p[_pos], firstPart); + _mm_storeu_si128((__m128i *)&p[_pos + 2], secPart); + + shiftBits = _mm256_add_epi64(shiftBits, inc); + prev_value = p[_pos + 3]; + _pos += 4; + } + + // handle the remain value + for (int32_t i = 0; i < remain; i++) { + zigzag_value = ((w >> (v + (batch * bit * 4))) & mask); + prev_value += ZIGZAG_DECODE(int64_t, zigzag_value); + + p[_pos++] = prev_value; + v += bit; + } + } else if (tsSIMDEnable && tsAVX512Enable) { +#if __AVX512F__ + // todo add avx512 impl +#endif + } else { // alternative implementation without SIMD instructions. + for (int32_t i = 0; i < elems && count < nelements; i++, count++) { + zigzag_value = ((w >> v) & mask); + prev_value += ZIGZAG_DECODE(int64_t, zigzag_value); + + p[_pos++] = prev_value; + v += bit; + } + } + } + } break; + case TSDB_DATA_TYPE_INT: { + int32_t* p = (int32_t*) output; + + if (selector == 0 || selector == 1) { + for (int32_t i = 0; i < elems && count < nelements; i++, count++) { + p[_pos++] = (int32_t)prev_value; + } + } else { + for (int32_t i = 0; i < elems && count < nelements; i++, count++) { + zigzag_value = ((w >> v) & mask); + prev_value += ZIGZAG_DECODE(int64_t, zigzag_value); + + p[_pos++] = (int32_t)prev_value; + v += bit; + } + } + } break; + case TSDB_DATA_TYPE_SMALLINT: { + int16_t* p = (int16_t*) output; + + if (selector == 0 || selector == 1) { + for (int32_t i = 0; i < elems && count < nelements; i++, count++) { + p[_pos++] = (int16_t)prev_value; + } + } else { + for (int32_t i = 0; i < elems && count < nelements; i++, count++) { + zigzag_value = ((w >> v) & mask); + prev_value += ZIGZAG_DECODE(int64_t, zigzag_value); + + p[_pos++] = (int16_t)prev_value; + v += bit; + } + } + } break; + + case TSDB_DATA_TYPE_TINYINT: { + int8_t *p = (int8_t *)output; + + if (selector == 0 || selector == 1) { + for (int32_t i = 0; i < elems && count < nelements; i++, count++) { + p[_pos++] = (int8_t)prev_value; + } + } else { + for (int32_t i = 0; i < elems && count < nelements; i++, count++) { + zigzag_value = ((w >> v) & mask); + prev_value += ZIGZAG_DECODE(int64_t, zigzag_value); + + p[_pos++] = (int8_t)prev_value; + v += bit; + } + } + } break; + } + + ip += LONG_BYTES; + } + +#endif + return nelements * word_length; +} + +int32_t tsDecompressFloatImplAvx512(const char *const input, const int32_t nelements, char *const output) { +#if __AVX512F__ + // todo add it +#endif + return 0; +} + +// todo add later +int32_t tsDecompressFloatImplAvx2(const char *const input, const int32_t nelements, char *const output) { +#if __AVX2__ +#endif + return 0; +} \ No newline at end of file diff --git a/source/util/src/terror.c b/source/util/src/terror.c index b4448c36c2..21022f2016 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -504,6 +504,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SCH_STATUS_ERROR, "scheduler status erro TAOS_DEFINE_ERROR(TSDB_CODE_SCH_INTERNAL_ERROR, "scheduler internal error") TAOS_DEFINE_ERROR(TSDB_CODE_SCH_TIMEOUT_ERROR, "Task timeout") TAOS_DEFINE_ERROR(TSDB_CODE_SCH_JOB_IS_DROPPING, "Job is dropping") +TAOS_DEFINE_ERROR(TSDB_CODE_SCH_JOB_NOT_EXISTS, "Job no longer exist") // parser TAOS_DEFINE_ERROR(TSDB_CODE_PAR_SYNTAX_ERROR, "syntax error near") diff --git a/source/util/src/tunit.c b/source/util/src/tunit.c index b0630ec8ca..d3447294ea 100644 --- a/source/util/src/tunit.c +++ b/source/util/src/tunit.c @@ -35,27 +35,27 @@ int64_t taosStrHumanToInt64(const char* str) { strNoUnit = taosMemoryCalloc(sLen, 1); memcpy(strNoUnit, str, sLen - 1); - val = atoll(strNoUnit) * UNIT_ONE_PEBIBYTE; + val = atof(strNoUnit) * UNIT_ONE_PEBIBYTE; } else if ((unit == 'T') || (unit == 't')) { strNoUnit = taosMemoryCalloc(sLen, 1); memcpy(strNoUnit, str, sLen - 1); - val = atoll(strNoUnit) * UNIT_ONE_TEBIBYTE; + val = atof(strNoUnit) * UNIT_ONE_TEBIBYTE; } else if ((unit == 'G') || (unit == 'g')) { strNoUnit = taosMemoryCalloc(sLen, 1); memcpy(strNoUnit, str, sLen - 1); - val = atoll(strNoUnit) * UNIT_ONE_GIBIBYTE; + val = atof(strNoUnit) * UNIT_ONE_GIBIBYTE; } else if ((unit == 'M') || (unit == 'm')) { strNoUnit = taosMemoryCalloc(sLen, 1); memcpy(strNoUnit, str, sLen - 1); - val = atoll(strNoUnit) * UNIT_ONE_MEBIBYTE; + val = atof(strNoUnit) * UNIT_ONE_MEBIBYTE; } else if ((unit == 'K') || (unit == 'k')) { strNoUnit = taosMemoryCalloc(sLen, 1); memcpy(strNoUnit, str, sLen - 1); - val = atoll(strNoUnit) * UNIT_ONE_KIBIBYTE; + val = atof(strNoUnit) * UNIT_ONE_KIBIBYTE; } else { val = atoll(str); } @@ -93,17 +93,17 @@ int32_t taosStrHumanToInt32(const char* str) { strNoUnit = taosMemoryCalloc(sLen, 1); memcpy(strNoUnit, str, sLen - 1); - val = atoll(strNoUnit) * UNIT_ONE_GIBIBYTE; + val = atof(strNoUnit) * UNIT_ONE_GIBIBYTE; } else if ((unit == 'M') || (unit == 'm')) { strNoUnit = taosMemoryCalloc(sLen, 1); memcpy(strNoUnit, str, sLen - 1); - val = atoll(strNoUnit) * UNIT_ONE_MEBIBYTE; + val = atof(strNoUnit) * UNIT_ONE_MEBIBYTE; } else if ((unit == 'K') || (unit == 'k')) { strNoUnit = taosMemoryCalloc(sLen, 1); memcpy(strNoUnit, str, sLen - 1); - val = atoll(strNoUnit) * UNIT_ONE_KIBIBYTE; + val = atof(strNoUnit) * UNIT_ONE_KIBIBYTE; } else { val = atoll(str); } diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 6a7c0b47ec..4540d28074 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -120,6 +120,10 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/slimit.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/slimit.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/slimit.py -Q 4 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/ts-4233.py +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/ts-4233.py -Q 2 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/ts-4233.py -Q 3 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/ts-4233.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 3-enterprise/restore/restoreDnode.py -N 5 -M 3 -i False ,,y,system-test,./pytest.sh python3 ./test.py -f 3-enterprise/restore/restoreVnode.py -N 5 -M 3 -i False diff --git a/tests/system-test/2-query/To_iso8601.py b/tests/system-test/2-query/To_iso8601.py index 92aacbb350..160473ffce 100644 --- a/tests/system-test/2-query/To_iso8601.py +++ b/tests/system-test/2-query/To_iso8601.py @@ -30,17 +30,16 @@ class TDTestCase: tdSql.query(f'select to_iso8601(ts) from {self.ntbname}') for i in range(self.rowNum): tdSql.checkEqual(tdSql.queryResult[i][0],f'2022-01-01T00:00:00.00{i}{time_zone}') - timezone_list = ['+0000','+0100','+0200','+0300','+0330','+0400','+0500','+0530','+0600','+0700','+0800','+0900','+1000','+1100','+1200',\ - '+00','+01','+02','+03','+04','+05','+06','+07','+08','+09','+10','+11','+12',\ - '+00:00','+01:00','+02:00','+03:00','+03:30','+04:00','+05:00','+05:30','+06:00','+07:00','+08:00','+09:00','+10:00','+11:00','+12:00',\ - '-0000','-0100','-0200','-0300','-0400','-0500','-0600','-0700','-0800','-0900','-1000','-1100','-1200',\ - '-00','-01','-02','-03','-04','-05','-06','-07','-08','-09','-10','-11','-12',\ - '-00:00','-01:00','-02:00','-03:00','-04:00','-05:00','-06:00','-07:00','-08:00','-09:00','-10:00','-11:00','-12:00',\ - 'z','Z'] - for j in timezone_list: - tdSql.query(f'select to_iso8601(ts,"{j}") from {self.ntbname}') - for i in range(self.rowNum): - tdSql.checkEqual(tdSql.queryResult[i][0],f'2022-01-01T00:00:00.00{i}{j}') + + tz_list = ['+0000','+0530', '+00', '+06', '+00:00', '+12:00', '-0000', '-0900', '-00', '-05', '-00:00', '-03:00','z', 'Z'] + res_list = ['2021-12-31T16:00:00.000+0000', '2021-12-31T21:30:00.000+0530', '2021-12-31T16:00:00.000+00', '2021-12-31T22:00:00.000+06',\ + '2021-12-31T16:00:00.000+00:00', '2022-01-01T04:00:00.000+12:00','2021-12-31T16:00:00.000-0000','2021-12-31T07:00:00.000-0900',\ + '2021-12-31T16:00:00.000-00', '2021-12-31T11:00:00.000-05','2021-12-31T16:00:00.000-00:00','2021-12-31T13:00:00.000-03:00',\ + '2021-12-31T16:00:00.000z', '2021-12-31T16:00:00.000Z'] + for i in range(len(tz_list)): + tdSql.query(f'select to_iso8601(ts,"{tz_list[i]}") from {self.ntbname} where c1==1') + tdSql.checkEqual(tdSql.queryResult[0][0],res_list[i]) + error_param_list = [0,100.5,'a','!'] for i in error_param_list: tdSql.error(f'select to_iso8601(ts,"{i}") from {self.ntbname}') diff --git a/tests/system-test/2-query/ts-4233.py b/tests/system-test/2-query/ts-4233.py new file mode 100644 index 0000000000..9b0a2f175c --- /dev/null +++ b/tests/system-test/2-query/ts-4233.py @@ -0,0 +1,46 @@ + +import taos + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * + +class TDTestCase: + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor(), True) + + def checksql(self, sql): + result = os.popen("taos -s '%s'" %sql) + res = result.read() + print(res) + if ("Query OK" in res): + tdLog.info(f"checkEqual success") + else : + tdLog.exit(f"checkEqual error") + + def check(self): + conn = taos.connect() + sql = "select 'a;b' as x" + tdSql.query(f"%s" %sql) + tdSql.checkRows(1) + + self.checksql('select "a;b" as x\G') + self.checksql('select "a;b" as x >> /tmp/res.txt') + return + + def run(self): + tdSql.prepare() + self.check() + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + + + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index 3b150230e7..115abdcd36 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -62,6 +62,8 @@ static void shellCleanup(void *arg); static void *shellCancelHandler(void *arg); static void *shellThreadLoop(void *arg); +static bool shellCmdkilled = false; + bool shellIsEmptyCommand(const char *cmd) { for (char c = *cmd++; c != 0; c = *cmd++) { if (c != ' ' && c != '\t' && c != ';') { @@ -72,6 +74,8 @@ bool shellIsEmptyCommand(const char *cmd) { } int32_t shellRunSingleCommand(char *command) { + shellCmdkilled = false; + if (shellIsEmptyCommand(command)) { return 0; } @@ -199,22 +203,17 @@ void shellRunSingleCommandImp(char *command) { bool printMode = false; if ((sptr = strstr(command, ">>")) != NULL) { - cptr = strstr(command, ";"); - if (cptr != NULL) { - *cptr = '\0'; - } - fname = sptr + 2; while (*fname == ' ') fname++; *sptr = '\0'; - } - if ((sptr = strstr(command, "\\G")) != NULL) { - cptr = strstr(command, ";"); + cptr = strstr(fname, ";"); if (cptr != NULL) { *cptr = '\0'; } + } + if ((sptr = strstr(command, "\\G")) != NULL) { *sptr = '\0'; printMode = true; // When output to a file, the switch does not work. } @@ -262,7 +261,8 @@ void shellRunSingleCommandImp(char *command) { if (error_no == 0) { printf("Query OK, %"PRId64 " row(s) in set (%.6fs)\r\n", numOfRows, (et - st) / 1E6); } else { - printf("Query interrupted (%s), %"PRId64 " row(s) in set (%.6fs)\r\n", taos_errstr(pSql), numOfRows, (et - st) / 1E6); + terrno = error_no; + printf("Query interrupted (%s), %"PRId64 " row(s) in set (%.6fs)\r\n", taos_errstr(NULL), numOfRows, (et - st) / 1E6); } taos_free_result(pSql); } else { @@ -957,7 +957,11 @@ void shellDumpResultCallback(void *param, TAOS_RES *tres, int num_of_rows) { } } dump_info->numOfAllRows += num_of_rows; - taos_fetch_rows_a(tres, shellDumpResultCallback, param); + if (!shellCmdkilled) { + taos_fetch_rows_a(tres, shellDumpResultCallback, param); + } else { + tsem_post(&dump_info->sem); + } } else { if (num_of_rows < 0) { printf("\033[31masync retrieve failed, code: %d\033[0m\n", num_of_rows); @@ -972,13 +976,15 @@ int64_t shellDumpResult(TAOS_RES *tres, char *fname, int32_t *error_no, bool ver num_of_rows = shellDumpResultToFile(fname, tres); } else { tsDumpInfo dump_info; - init_dump_info(&dump_info, tres, sql, vertical); - taos_fetch_rows_a(tres, shellDumpResultCallback, &dump_info); - tsem_wait(&dump_info.sem); - num_of_rows = dump_info.numOfAllRows; + if (!shellCmdkilled) { + init_dump_info(&dump_info, tres, sql, vertical); + taos_fetch_rows_a(tres, shellDumpResultCallback, &dump_info); + tsem_wait(&dump_info.sem); + num_of_rows = dump_info.numOfAllRows; + } } - *error_no = taos_errno(tres); + *error_no = shellCmdkilled ? TSDB_CODE_TSC_QUERY_KILLED : taos_errno(tres); return num_of_rows; } @@ -1202,6 +1208,7 @@ void *shellCancelHandler(void *arg) { } else { #endif if (shell.conn) { + shellCmdkilled = true; taos_kill_query(shell.conn); } #ifdef WEBSOCKET diff --git a/tools/shell/src/shellWebsocket.c b/tools/shell/src/shellWebsocket.c index e83ceff099..fceec37a64 100644 --- a/tools/shell/src/shellWebsocket.c +++ b/tools/shell/src/shellWebsocket.c @@ -236,22 +236,17 @@ void shellRunSingleCommandWebsocketImp(char *command) { bool printMode = false; if ((sptr = strstr(command, ">>")) != NULL) { - cptr = strstr(command, ";"); - if (cptr != NULL) { - *cptr = '\0'; - } - fname = sptr + 2; while (*fname == ' ') fname++; *sptr = '\0'; - } - if ((sptr = strstr(command, "\\G")) != NULL) { - cptr = strstr(command, ";"); + cptr = strstr(fname, ";"); if (cptr != NULL) { *cptr = '\0'; } + } + if ((sptr = strstr(command, "\\G")) != NULL) { *sptr = '\0'; printMode = true; // When output to a file, the switch does not work. }