Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/TS-4229
This commit is contained in:
commit
082ddbddf6
|
@ -170,7 +170,7 @@ ELSE ()
|
|||
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mavx2")
|
||||
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mavx2")
|
||||
ENDIF()
|
||||
MESSAGE(STATUS "SIMD instructions (FMA/AVX/AVX2/AVX512) is ACTIVATED")
|
||||
MESSAGE(STATUS "SIMD instructions (FMA/AVX/AVX2) is ACTIVATED")
|
||||
|
||||
IF (COMPILER_SUPPORT_AVX512F AND COMPILER_SUPPORT_AVX512BMI)
|
||||
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mavx512f -mavx512vbmi")
|
||||
|
|
|
@ -118,6 +118,13 @@ int32_t taosChar2Ts(const char* format, SArray** formats, const char* tsStr, int
|
|||
void TEST_ts2char(const char* format, int64_t ts, int32_t precision, char* out, int32_t outLen);
|
||||
int32_t TEST_char2ts(const char* format, int64_t* ts, int32_t precision, const char* tsStr);
|
||||
|
||||
/// @brief get offset seconds from zero timezone to input timezone
|
||||
/// for +XX timezone, the offset to zero is negative value
|
||||
/// @param tzStr timezonestr, eg: +0800, -0830, -08
|
||||
/// @param offset seconds, eg: +08 offset -28800, -01 offset 3600
|
||||
/// @return 0 success, other fail
|
||||
int32_t offsetOfTimezone(char* tzStr, int64_t* offset);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -825,6 +825,7 @@ int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta);
|
|||
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
|
||||
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
|
||||
int32_t streamMetaReopen(SStreamMeta* pMeta);
|
||||
void streamMetaInitBackend(SStreamMeta* pMeta);
|
||||
int32_t streamMetaCommit(SStreamMeta* pMeta);
|
||||
int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta);
|
||||
int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta);
|
||||
|
|
|
@ -640,6 +640,7 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_SCH_IGNORE_ERROR TAOS_DEF_ERROR_CODE(0, 0x2503)
|
||||
#define TSDB_CODE_SCH_TIMEOUT_ERROR TAOS_DEF_ERROR_CODE(0, 0x2504)
|
||||
#define TSDB_CODE_SCH_JOB_IS_DROPPING TAOS_DEF_ERROR_CODE(0, 0x2505)
|
||||
#define TSDB_CODE_SCH_JOB_NOT_EXISTS TAOS_DEF_ERROR_CODE(0, 0x2506)
|
||||
|
||||
//parser
|
||||
#define TSDB_CODE_PAR_SYNTAX_ERROR TAOS_DEF_ERROR_CODE(0, 0x2600)
|
||||
|
|
|
@ -30,6 +30,10 @@ extern "C" {
|
|||
#define INT64MASK(_x) ((((uint64_t)1) << _x) - 1)
|
||||
#define INT32MASK(_x) (((uint32_t)1 << _x) - 1)
|
||||
#define INT8MASK(_x) (((uint8_t)1 << _x) - 1)
|
||||
|
||||
#define ZIGZAG_ENCODE(T, v) (((u##T)((v) >> (sizeof(T) * 8 - 1))) ^ (((u##T)(v)) << 1)) // zigzag encode
|
||||
#define ZIGZAG_DECODE(T, v) (((v) >> 1) ^ -((T)((v)&1))) // zigzag decode
|
||||
|
||||
// Compression algorithm
|
||||
#define NO_COMPRESSION 0
|
||||
#define ONE_STAGE_COMP 1
|
||||
|
@ -129,6 +133,12 @@ int32_t tsCompressBigint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32
|
|||
int32_t nBuf);
|
||||
int32_t tsDecompressBigint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
|
||||
int32_t nBuf);
|
||||
// for internal usage
|
||||
int32_t getWordLength(char type);
|
||||
|
||||
int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements, char *const output, const char type);
|
||||
int32_t tsDecompressFloatImplAvx512(const char *const input, const int32_t nelements, char *const output);
|
||||
int32_t tsDecompressFloatImplAvx2(const char *const input, const int32_t nelements, char *const output);
|
||||
|
||||
/*************************************************************************
|
||||
* STREAM COMPRESSION
|
||||
|
|
|
@ -34,7 +34,6 @@ extern "C" {
|
|||
// Bytes for each type.
|
||||
extern const int32_t TYPE_BYTES[21];
|
||||
|
||||
// TODO: replace and remove code below
|
||||
#define CHAR_BYTES sizeof(char)
|
||||
#define SHORT_BYTES sizeof(int16_t)
|
||||
#define INT_BYTES sizeof(int32_t)
|
||||
|
|
|
@ -520,7 +520,7 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) {
|
|||
|
||||
// div round up
|
||||
int seq;
|
||||
uint64_t chunk_size = MULTIPART_CHUNK_SIZE >> 8;
|
||||
uint64_t chunk_size = MULTIPART_CHUNK_SIZE >> 7;
|
||||
int totalSeq = ((contentLength + chunk_size - 1) / chunk_size);
|
||||
|
||||
MultipartPartData partData;
|
||||
|
@ -717,6 +717,7 @@ static SArray *getListByPrefix(const char *prefix) {
|
|||
return data.objectArray;
|
||||
}
|
||||
} else {
|
||||
taosArrayDestroyEx(data.objectArray, s3FreeObjectKey);
|
||||
s3PrintError(__func__, data.status, data.err_msg);
|
||||
}
|
||||
return NULL;
|
||||
|
|
|
@ -1478,6 +1478,8 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, char *name) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
bool matched = false;
|
||||
|
||||
int32_t len = strlen(name);
|
||||
char lowcaseName[CFG_NAME_MAX_LEN + 1] = {0};
|
||||
strntolower(lowcaseName, name, TMIN(CFG_NAME_MAX_LEN, len));
|
||||
|
@ -1486,6 +1488,7 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, char *name) {
|
|||
if (strcasecmp("debugFlag", name) == 0) {
|
||||
int32_t flag = pItem->i32;
|
||||
taosSetAllDebugFlag(flag, true);
|
||||
matched = true;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
@ -1494,6 +1497,7 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, char *name) {
|
|||
bool enableCore = pItem->bval;
|
||||
taosSetCoreDump(enableCore);
|
||||
uInfo("%s set to %d", name, enableCore);
|
||||
matched = true;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
@ -1512,6 +1516,7 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, char *name) {
|
|||
snprintf(tsFirst, sizeof(tsFirst), "%s:%u", firstEp.fqdn, firstEp.port);
|
||||
cfgSetItem(pCfg, "firstEp", tsFirst, pFirstEpItem->stype);
|
||||
uInfo("localEp set to '%s', tsFirst set to '%s'", tsLocalEp, tsFirst);
|
||||
matched = true;
|
||||
} else if (strcasecmp("firstEp", name) == 0) {
|
||||
tstrncpy(tsLocalFqdn, cfgGetItem(pCfg, "fqdn")->str, TSDB_FQDN_LEN);
|
||||
tsServerPort = (uint16_t)cfgGetItem(pCfg, "serverPort")->i32;
|
||||
|
@ -1526,6 +1531,7 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, char *name) {
|
|||
snprintf(tsFirst, sizeof(tsFirst), "%s:%u", firstEp.fqdn, firstEp.port);
|
||||
cfgSetItem(pCfg, "firstEp", tsFirst, pFirstEpItem->stype);
|
||||
uInfo("localEp set to '%s', tsFirst set to '%s'", tsLocalEp, tsFirst);
|
||||
matched = true;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
@ -1536,10 +1542,12 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, char *name) {
|
|||
taosSetSystemLocale(locale, charset);
|
||||
osSetSystemLocale(locale, charset);
|
||||
uInfo("locale set to '%s', charset set to '%s'", locale, charset);
|
||||
matched = true;
|
||||
} else if (strcasecmp("logDir", name) == 0) {
|
||||
uInfo("%s set from '%s' to '%s'", name, tsLogDir, pItem->str);
|
||||
tstrncpy(tsLogDir, pItem->str, PATH_MAX);
|
||||
taosExpandDir(tsLogDir, tsLogDir, PATH_MAX);
|
||||
matched = true;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
@ -1547,15 +1555,19 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, char *name) {
|
|||
if (strcasecmp("metaCacheMaxSize", name) == 0) {
|
||||
atomic_store_32(&tsMetaCacheMaxSize, pItem->i32);
|
||||
uInfo("%s set to %d", name, atomic_load_32(&tsMetaCacheMaxSize));
|
||||
matched = true;
|
||||
} else if (strcasecmp("minimalTmpDirGB", name) == 0) {
|
||||
tsTempSpace.reserved = (int64_t)(((double)pItem->fval) * 1024 * 1024 * 1024);
|
||||
uInfo("%s set to %"PRId64, name, tsTempSpace.reserved);
|
||||
matched = true;
|
||||
} else if (strcasecmp("minimalDataDirGB", name) == 0) {
|
||||
tsDataSpace.reserved = (int64_t)(((double)pItem->fval) * 1024 * 1024 * 1024);
|
||||
uInfo("%s set to %"PRId64, name, tsDataSpace.reserved);
|
||||
matched = true;
|
||||
} else if (strcasecmp("minimalLogDirGB", name) == 0) {
|
||||
tsLogSpace.reserved = (int64_t)(((double)pItem->fval) * 1024 * 1024 * 1024);
|
||||
uInfo("%s set to %"PRId64, name, tsLogSpace.reserved);
|
||||
matched = true;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
@ -1566,18 +1578,23 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, char *name) {
|
|||
snprintf(tsSecond, sizeof(tsSecond), "%s:%u", secondEp.fqdn, secondEp.port);
|
||||
cfgSetItem(pCfg, "secondEp", tsSecond, pItem->stype);
|
||||
uInfo("%s set to %s", name, tsSecond);
|
||||
matched = true;
|
||||
} else if (strcasecmp("smlChildTableName", name) == 0) {
|
||||
uInfo("%s set from %s to %s", name, tsSmlChildTableName, pItem->str);
|
||||
tstrncpy(tsSmlChildTableName, pItem->str, TSDB_TABLE_NAME_LEN);
|
||||
matched = true;
|
||||
} else if (strcasecmp("smlAutoChildTableNameDelimiter", name) == 0) {
|
||||
uInfo("%s set from %s to %s", name, tsSmlAutoChildTableNameDelimiter, pItem->str);
|
||||
tstrncpy(tsSmlAutoChildTableNameDelimiter, pItem->str, TSDB_TABLE_NAME_LEN);
|
||||
matched = true;
|
||||
} else if (strcasecmp("smlTagName", name) == 0) {
|
||||
uInfo("%s set from %s to %s", name, tsSmlTagName, pItem->str);
|
||||
tstrncpy(tsSmlTagName, pItem->str, TSDB_COL_NAME_LEN);
|
||||
matched = true;
|
||||
} else if (strcasecmp("smlTsDefaultName", name) == 0) {
|
||||
uInfo("%s set from %s to %s", name, tsSmlTsDefaultName, pItem->str);
|
||||
tstrncpy(tsSmlTsDefaultName, pItem->str, TSDB_COL_NAME_LEN);
|
||||
matched = true;
|
||||
} else if (strcasecmp("serverPort", name) == 0) {
|
||||
tstrncpy(tsLocalFqdn, cfgGetItem(pCfg, "fqdn")->str, TSDB_FQDN_LEN);
|
||||
tsServerPort = (uint16_t)cfgGetItem(pCfg, "serverPort")->i32;
|
||||
|
@ -1592,11 +1609,13 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, char *name) {
|
|||
snprintf(tsFirst, sizeof(tsFirst), "%s:%u", firstEp.fqdn, firstEp.port);
|
||||
cfgSetItem(pCfg, "firstEp", tsFirst, pFirstEpItem->stype);
|
||||
uInfo("localEp set to '%s', tsFirst set to '%s'", tsLocalEp, tsFirst);
|
||||
matched = true;
|
||||
} else if (strcasecmp("slowLogScope", name) == 0) {
|
||||
if (taosSetSlowLogScope(pItem->str)) {
|
||||
return -1;
|
||||
}
|
||||
uInfo("%s set to %s", name, pItem->str);
|
||||
matched = true;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
@ -1605,6 +1624,7 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, char *name) {
|
|||
osSetTimezone(pItem->str);
|
||||
uInfo("%s set from %s to %s", name, tsTimezoneStr, pItem->str);
|
||||
cfgSetItem(pCfg, "timezone", tsTimezoneStr, pItem->stype);
|
||||
matched = true;
|
||||
} else if (strcasecmp("tempDir", name) == 0) {
|
||||
uInfo("%s set from %s to %s", name, tsTempDir, pItem->str);
|
||||
tstrncpy(tsTempDir, pItem->str, PATH_MAX);
|
||||
|
@ -1613,9 +1633,11 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, char *name) {
|
|||
uError("failed to create tempDir:%s since %s", tsTempDir, terrstr());
|
||||
return -1;
|
||||
}
|
||||
matched = true;
|
||||
} else if (strcasecmp("telemetryServer", name) == 0) {
|
||||
uInfo("%s set from %s to %s", name, pItem->str, tsTelemServer);
|
||||
tstrncpy(tsTelemServer, pItem->str, TSDB_FQDN_LEN);
|
||||
matched = true;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
@ -1624,6 +1646,8 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, char *name) {
|
|||
break;
|
||||
}
|
||||
|
||||
if (matched) goto _out;
|
||||
|
||||
{ // 'bool/int32_t/int64_t/float/double' variables with general modification function
|
||||
static OptionNameAndVar debugOptions[] = {
|
||||
{"cDebugFlag", &cDebugFlag}, {"dDebugFlag", &dDebugFlag}, {"fsDebugFlag", &fsDebugFlag},
|
||||
|
@ -1664,6 +1688,7 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, char *name) {
|
|||
}
|
||||
}
|
||||
|
||||
_out:
|
||||
return terrno == TSDB_CODE_SUCCESS ? 0 : -1;
|
||||
}
|
||||
|
||||
|
|
|
@ -194,6 +194,14 @@ int32_t parseTimezone(char* str, int64_t* tzOffset) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t offsetOfTimezone(char* tzStr, int64_t* offset) {
|
||||
if (tzStr && (tzStr[0] == 'z' || tzStr[0] == 'Z')) {
|
||||
*offset = 0;
|
||||
return 0;
|
||||
}
|
||||
return parseTimezone(tzStr, offset);
|
||||
}
|
||||
|
||||
/*
|
||||
* rfc3339 format:
|
||||
* 2013-04-12T15:52:01+08:00
|
||||
|
|
|
@ -678,7 +678,7 @@ _OVER:
|
|||
return -1;
|
||||
}
|
||||
|
||||
static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) {
|
||||
static int32_t mndPersistTaskDropReq(SMnode* pMnode, STrans *pTrans, SStreamTask *pTask) {
|
||||
SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq));
|
||||
if (pReq == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -690,7 +690,12 @@ static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) {
|
|||
pReq->streamId = pTask->id.streamId;
|
||||
|
||||
STransAction action = {0};
|
||||
initTransAction(&action, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &pTask->info.epSet, 0);
|
||||
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId);
|
||||
SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj);
|
||||
mndReleaseVgroup(pMnode, pVgObj);
|
||||
|
||||
// The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode.
|
||||
initTransAction(&action, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0);
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
taosMemoryFree(pReq);
|
||||
return -1;
|
||||
|
@ -706,7 +711,7 @@ int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream)
|
|||
int32_t sz = taosArrayGetSize(pTasks);
|
||||
for (int32_t j = 0; j < sz; j++) {
|
||||
SStreamTask *pTask = taosArrayGetP(pTasks, j);
|
||||
if (mndPersistTaskDropReq(pTrans, pTask) < 0) {
|
||||
if (mndPersistTaskDropReq(pMnode, pTrans, pTask) < 0) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
@ -1085,9 +1090,10 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream
|
|||
|
||||
STransAction action = {0};
|
||||
SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj);
|
||||
mndReleaseVgroup(pMnode, pVgObj);
|
||||
|
||||
initTransAction(&action, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset,
|
||||
TSDB_CODE_SYN_PROPOSE_NOT_READY);
|
||||
mndReleaseVgroup(pMnode, pVgObj);
|
||||
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
taosMemoryFree(buf);
|
||||
|
@ -1283,12 +1289,13 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
|
|||
|
||||
if (pStream == NULL) {
|
||||
if (dropReq.igNotExists) {
|
||||
mInfo("stream:%s, not exist, ignore not exist is set", dropReq.name);
|
||||
mInfo("stream:%s not exist, ignore not exist is set", dropReq.name);
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
tFreeSMDropStreamReq(&dropReq);
|
||||
return 0;
|
||||
} else {
|
||||
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
|
||||
mError("stream:%s not exist failed to drop", dropReq.name);
|
||||
tFreeSMDropStreamReq(&dropReq);
|
||||
return -1;
|
||||
}
|
||||
|
@ -1660,7 +1667,7 @@ static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter) {
|
|||
sdbCancelFetch(pSdb, pIter);
|
||||
}
|
||||
|
||||
static int32_t mndPauseStreamTask(STrans *pTrans, SStreamTask *pTask) {
|
||||
static int32_t mndPauseStreamTask(SMnode* pMnode, STrans *pTrans, SStreamTask *pTask) {
|
||||
SVPauseStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVPauseStreamTaskReq));
|
||||
if (pReq == NULL) {
|
||||
mError("failed to malloc in pause stream, size:%" PRIzu ", code:%s", sizeof(SVPauseStreamTaskReq),
|
||||
|
@ -1673,8 +1680,12 @@ static int32_t mndPauseStreamTask(STrans *pTrans, SStreamTask *pTask) {
|
|||
pReq->taskId = pTask->id.taskId;
|
||||
pReq->streamId = pTask->id.streamId;
|
||||
|
||||
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId);
|
||||
SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj);
|
||||
mndReleaseVgroup(pMnode, pVgObj);
|
||||
|
||||
STransAction action = {0};
|
||||
initTransAction(&action, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &pTask->info.epSet, 0);
|
||||
initTransAction(&action, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0);
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
taosMemoryFree(pReq);
|
||||
return -1;
|
||||
|
@ -1682,7 +1693,7 @@ static int32_t mndPauseStreamTask(STrans *pTrans, SStreamTask *pTask) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t mndPauseAllStreamTasks(STrans *pTrans, SStreamObj *pStream) {
|
||||
int32_t mndPauseAllStreamTasks(SMnode* pMnode, STrans *pTrans, SStreamObj *pStream) {
|
||||
SArray *tasks = pStream->tasks;
|
||||
|
||||
int32_t size = taosArrayGetSize(tasks);
|
||||
|
@ -1691,7 +1702,7 @@ int32_t mndPauseAllStreamTasks(STrans *pTrans, SStreamObj *pStream) {
|
|||
int32_t sz = taosArrayGetSize(pTasks);
|
||||
for (int32_t j = 0; j < sz; j++) {
|
||||
SStreamTask *pTask = taosArrayGetP(pTasks, j);
|
||||
if (mndPauseStreamTask(pTrans, pTask) < 0) {
|
||||
if (mndPauseStreamTask(pMnode, pTrans, pTask) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -1768,7 +1779,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
|
|||
}
|
||||
|
||||
// pause all tasks
|
||||
if (mndPauseAllStreamTasks(pTrans, pStream) < 0) {
|
||||
if (mndPauseAllStreamTasks(pMnode, pTrans, pStream) < 0) {
|
||||
mError("stream:%s, failed to pause task since %s", pauseReq.name, terrstr());
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
mndTransDrop(pTrans);
|
||||
|
@ -1795,7 +1806,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
|
|||
return TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask, int8_t igUntreated) {
|
||||
static int32_t mndResumeStreamTask(STrans *pTrans, SMnode* pMnode, SStreamTask *pTask, int8_t igUntreated) {
|
||||
SVResumeStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResumeStreamTaskReq));
|
||||
if (pReq == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -1806,8 +1817,12 @@ static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask, int8_t ig
|
|||
pReq->streamId = pTask->id.streamId;
|
||||
pReq->igUntreated = igUntreated;
|
||||
|
||||
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId);
|
||||
SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj);
|
||||
mndReleaseVgroup(pMnode, pVgObj);
|
||||
|
||||
STransAction action = {0};
|
||||
initTransAction(&action, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &pTask->info.epSet, 0);
|
||||
initTransAction(&action, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &epset, 0);
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
taosMemoryFree(pReq);
|
||||
return -1;
|
||||
|
@ -1815,14 +1830,14 @@ static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask, int8_t ig
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t mndResumeAllStreamTasks(STrans *pTrans, SStreamObj *pStream, int8_t igUntreated) {
|
||||
int32_t mndResumeAllStreamTasks(STrans *pTrans, SMnode* pMnode, SStreamObj *pStream, int8_t igUntreated) {
|
||||
int32_t size = taosArrayGetSize(pStream->tasks);
|
||||
for (int32_t i = 0; i < size; i++) {
|
||||
SArray *pTasks = taosArrayGetP(pStream->tasks, i);
|
||||
int32_t sz = taosArrayGetSize(pTasks);
|
||||
for (int32_t j = 0; j < sz; j++) {
|
||||
SStreamTask *pTask = taosArrayGetP(pTasks, j);
|
||||
if (mndResumeStreamTask(pTrans, pTask, igUntreated) < 0) {
|
||||
if (mndResumeStreamTask(pTrans, pMnode, pTask, igUntreated) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -1831,7 +1846,6 @@ int32_t mndResumeAllStreamTasks(STrans *pTrans, SStreamObj *pStream, int8_t igUn
|
|||
}
|
||||
}
|
||||
}
|
||||
// pStream->pHTasksList is null
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -1884,7 +1898,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
|
|||
}
|
||||
|
||||
// resume all tasks
|
||||
if (mndResumeAllStreamTasks(pTrans, pStream, pauseReq.igUntreated) < 0) {
|
||||
if (mndResumeAllStreamTasks(pTrans, pMnode, pStream, pauseReq.igUntreated) < 0) {
|
||||
mError("stream:%s, failed to drop task since %s", pauseReq.name, terrstr());
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
mndTransDrop(pTrans);
|
||||
|
@ -2534,8 +2548,12 @@ int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
|
|||
pReq->taskId = pTask->id.taskId;
|
||||
pReq->streamId = pTask->id.streamId;
|
||||
|
||||
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId);
|
||||
SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj);
|
||||
mndReleaseVgroup(pMnode, pVgObj);
|
||||
|
||||
STransAction action = {0};
|
||||
initTransAction(&action, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &pTask->info.epSet, 0);
|
||||
initTransAction(&action, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &epset, 0);
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
taosMemoryFree(pReq);
|
||||
taosWUnLockLatch(&pStream->lock);
|
||||
|
|
|
@ -1974,6 +1974,8 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
streamMetaInitBackend(pMeta);
|
||||
|
||||
if (streamMetaLoadAllTasks(pTq->pStreamMeta) < 0) {
|
||||
tqError("vgId:%d failed to load stream tasks", vgId);
|
||||
streamMetaWUnLock(pMeta);
|
||||
|
|
|
@ -169,10 +169,15 @@ int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback)
|
|||
}
|
||||
int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, int64_t chkpId) {
|
||||
tqDebug("vgId:%d, vnode %s start to rebuild stream-state", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER);
|
||||
|
||||
streamMetaWLock(pWriter->pTq->pStreamMeta);
|
||||
int32_t code = streamMetaReopen(pWriter->pTq->pStreamMeta);
|
||||
if (code == 0) {
|
||||
streamMetaInitBackend(pWriter->pTq->pStreamMeta);
|
||||
code = streamStateLoadTasks(pWriter);
|
||||
}
|
||||
|
||||
streamMetaWUnLock(pWriter->pTq->pStreamMeta);
|
||||
tqDebug("vgId:%d, vnode %s succ to rebuild stream-state", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER);
|
||||
taosMemoryFree(pWriter);
|
||||
return code;
|
||||
|
|
|
@ -144,7 +144,6 @@ int32_t tqRestartStreamTasks(STQ* pTq) {
|
|||
}
|
||||
|
||||
streamMetaWLock(pMeta);
|
||||
|
||||
code = streamMetaReopen(pMeta);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tqError("vgId:%d failed to reopen stream meta", vgId);
|
||||
|
@ -153,6 +152,7 @@ int32_t tqRestartStreamTasks(STQ* pTq) {
|
|||
return code;
|
||||
}
|
||||
|
||||
streamMetaInitBackend(pMeta);
|
||||
int64_t el = taosGetTimestampMs() - st;
|
||||
|
||||
tqInfo("vgId:%d close&reload state elapsed time:%.3fms", vgId, el/1000.);
|
||||
|
|
|
@ -409,7 +409,6 @@ static int32_t tsdbCommitFileSetBegin(SCommitter2 *committer) {
|
|||
extern int32_t tsS3UploadDelaySec;
|
||||
long s3Size(const char *object_name);
|
||||
int32_t nlevel = tfsGetLevel(committer->tsdb->pVnode->pTfs);
|
||||
committer->ctx->skipTsRow = false;
|
||||
if (tsS3Enabled && nlevel > 1 && committer->ctx->fset) {
|
||||
STFileObj *fobj = committer->ctx->fset->farr[TSDB_FTYPE_DATA];
|
||||
if (fobj && fobj->f->did.level == nlevel - 1) {
|
||||
|
@ -670,4 +669,4 @@ _exit:
|
|||
tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -882,8 +882,38 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) {
|
|||
int32_t numFile = TARRAY2_SIZE(lvl->fobjArr);
|
||||
if (numFile >= sttTrigger) {
|
||||
// launch merge
|
||||
code = tsdbSchedMerge(fs->tsdb, fset->fid);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
bool skipMerge = false;
|
||||
{
|
||||
int32_t now = taosGetTimestampSec();
|
||||
|
||||
extern int8_t tsS3Enabled;
|
||||
extern int32_t tsS3UploadDelaySec;
|
||||
long s3Size(const char *object_name);
|
||||
int32_t nlevel = tfsGetLevel(fs->tsdb->pVnode->pTfs);
|
||||
if (tsS3Enabled && nlevel > 1) {
|
||||
STFileObj *fobj = fset->farr[TSDB_FTYPE_DATA];
|
||||
if (fobj && fobj->f->did.level == nlevel - 1) {
|
||||
// if exists on s3 or local mtime < committer->ctx->now - tsS3UploadDelay
|
||||
const char *object_name = taosDirEntryBaseName((char *)fobj->fname);
|
||||
|
||||
if (taosCheckExistFile(fobj->fname)) {
|
||||
int32_t mtime = 0;
|
||||
taosStatFile(fobj->fname, NULL, &mtime, NULL);
|
||||
if (mtime < now - tsS3UploadDelaySec) {
|
||||
skipMerge = true;
|
||||
}
|
||||
} else if (s3Size(object_name) > 0) {
|
||||
skipMerge = true;
|
||||
}
|
||||
}
|
||||
// new fset can be written with ts data
|
||||
}
|
||||
}
|
||||
|
||||
if (!skipMerge) {
|
||||
code = tsdbSchedMerge(fs->tsdb, fset->fid);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
}
|
||||
|
||||
if (numFile >= sttTrigger * BLOCK_COMMIT_FACTOR) {
|
||||
|
|
|
@ -50,6 +50,7 @@ void remove_file(const char *fname, bool last_level) {
|
|||
long s3_size = tsS3Enabled ? s3Size(object_name) : 0;
|
||||
if (!strncmp(fname + strlen(fname) - 5, ".data", 5) && s3_size > 0) {
|
||||
s3DeleteObjects(&object_name, 1);
|
||||
tsdbInfo("file:%s is removed from s3", fname);
|
||||
} else {
|
||||
tsdbError("file:%s remove failed", fname);
|
||||
}
|
||||
|
|
|
@ -103,7 +103,7 @@ static int32_t tsdbCopyFileS3(SRTNer *rtner, const STFileObj *from, const STFile
|
|||
|
||||
char fname[TSDB_FILENAME_LEN];
|
||||
TdFilePtr fdFrom = NULL;
|
||||
TdFilePtr fdTo = NULL;
|
||||
// TdFilePtr fdTo = NULL;
|
||||
|
||||
tsdbTFileName(rtner->tsdb, to, fname);
|
||||
|
||||
|
@ -359,7 +359,11 @@ static int32_t tsdbDoRetentionOnFileSet(SRTNer *rtner, STFileSet *fset) {
|
|||
s3EvictCache(fobj->fname, fsize * 2);
|
||||
}
|
||||
*/
|
||||
tsdbInfo("file:%s size: %" PRId64 " do migrate", fobj->fname, fobj->f->size);
|
||||
if (fobj->f->did.level > did.level) {
|
||||
continue;
|
||||
}
|
||||
tsdbInfo("file:%s size: %" PRId64 " do migrate from %d to %d", fobj->fname, fobj->f->size, fobj->f->did.level,
|
||||
did.level);
|
||||
|
||||
code = tsdbDoMigrateFileObj(rtner, fobj, &did);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
|
|
@ -1005,7 +1005,9 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq,
|
|||
size_t len = 0;
|
||||
char *keyJoined = taosStringBuilderGetResult(&sb, &len);
|
||||
|
||||
auditRecord(pOriginRpc, clusterId, "createTable", name.dbname, "", keyJoined, len);
|
||||
if(pOriginRpc->info.conn.user != NULL && strlen(pOriginRpc->info.conn.user) > 0){
|
||||
auditRecord(pOriginRpc, clusterId, "createTable", name.dbname, "", keyJoined, len);
|
||||
}
|
||||
|
||||
taosStringBuilderDestroy(&sb);
|
||||
}
|
||||
|
@ -1227,7 +1229,9 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t ver, void *pReq, in
|
|||
size_t len = 0;
|
||||
char *keyJoined = taosStringBuilderGetResult(&sb, &len);
|
||||
|
||||
auditRecord(pOriginRpc, clusterId, "dropTable", name.dbname, "", keyJoined, len);
|
||||
if(pOriginRpc->info.conn.user != NULL && strlen(pOriginRpc->info.conn.user) > 0){
|
||||
auditRecord(pOriginRpc, clusterId, "dropTable", name.dbname, "", keyJoined, len);
|
||||
}
|
||||
|
||||
taosStringBuilderDestroy(&sb);
|
||||
}
|
||||
|
|
|
@ -29,7 +29,7 @@
|
|||
|
||||
#define IS_FINAL_INTERVAL_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL)
|
||||
#define IS_FINAL_SESSION_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION)
|
||||
#define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL * 24LL * 365LL * 10LL);
|
||||
#define DEAULT_DELETE_MARK INT64_MAX
|
||||
#define STREAM_INTERVAL_OP_STATE_NAME "StreamIntervalHistoryState"
|
||||
#define STREAM_SESSION_OP_STATE_NAME "StreamSessionHistoryState"
|
||||
#define STREAM_STATE_OP_STATE_NAME "StreamStateHistoryState"
|
||||
|
|
|
@ -2784,11 +2784,44 @@ static int32_t splitCacheLastFuncOptCreateAggLogicNode(SAggLogicNode** pNewAgg,
|
|||
pNew->hasGroup = pAgg->hasGroup;
|
||||
pNew->node.pChildren = nodesCloneList(pAgg->node.pChildren);
|
||||
|
||||
SNode* pNode = NULL;
|
||||
FOREACH(pNode, pNew->node.pChildren) {
|
||||
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
|
||||
OPTIMIZE_FLAG_CLEAR_MASK(((SScanLogicNode*)pNode)->node.optimizedFlag, OPTIMIZE_FLAG_SCAN_PATH);
|
||||
int32_t code = 0;
|
||||
SNode* pNode = nodesListGetNode(pNew->node.pChildren, 0);
|
||||
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
|
||||
SScanLogicNode* pScan = (SScanLogicNode*)pNode;
|
||||
SNodeList* pOldScanCols = NULL;
|
||||
TSWAP(pScan->pScanCols, pOldScanCols);
|
||||
nodesDestroyList(pScan->pScanPseudoCols);
|
||||
pScan->pScanPseudoCols = NULL;
|
||||
nodesDestroyList(pScan->node.pTargets);
|
||||
pScan->node.pTargets = NULL;
|
||||
SNodeListNode* list = (SNodeListNode*)nodesMakeNode(QUERY_NODE_NODE_LIST);
|
||||
list->pNodeList = pFunc;
|
||||
code = nodesCollectColumnsFromNode((SNode*)list, NULL, COLLECT_COL_TYPE_COL, &pScan->pScanCols);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
nodesFree(list);
|
||||
bool found = false;
|
||||
FOREACH(pNode, pScan->pScanCols) {
|
||||
if (PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pNode)->colId) {
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!found) {
|
||||
FOREACH(pNode, pOldScanCols) {
|
||||
if (PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pNode)->colId) {
|
||||
nodesListMakeStrictAppend(&pScan->pScanCols, nodesCloneNode(pNode));
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
nodesDestroyList(pOldScanCols);
|
||||
code = createColumnByRewriteExprs(pScan->pScanCols, &pScan->node.pTargets);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
OPTIMIZE_FLAG_CLEAR_MASK(pScan->node.optimizedFlag, OPTIMIZE_FLAG_SCAN_PATH);
|
||||
}
|
||||
|
||||
*pNewAgg = pNew;
|
||||
|
|
|
@ -1083,6 +1083,15 @@ int32_t toISO8601Function(SScalarParam *pInput, int32_t inputNum, SScalarParam *
|
|||
memmove(fraction, fraction + TSDB_TIME_PRECISION_SEC_DIGITS, TSDB_TIME_PRECISION_SEC_DIGITS);
|
||||
}
|
||||
|
||||
// trans current timezone's unix ts to dest timezone
|
||||
// offset = delta from dest timezone to zero
|
||||
// delta from zero to current timezone = 3600 * (cur)tsTimezone
|
||||
int64_t offset = 0;
|
||||
if (0 != offsetOfTimezone(tz, &offset)) {
|
||||
goto _end;
|
||||
}
|
||||
timeVal -= offset + 3600 * ((int64_t)tsTimezone);
|
||||
|
||||
struct tm tmInfo;
|
||||
int32_t len = 0;
|
||||
|
||||
|
|
|
@ -68,7 +68,7 @@ int32_t schHandleOpBeginEvent(int64_t jobId, SSchJob** job, SCH_OP_TYPE type, SS
|
|||
SSchJob* pJob = schAcquireJob(jobId);
|
||||
if (NULL == pJob) {
|
||||
qDebug("Acquire sch job failed, may be dropped, jobId:0x%" PRIx64, jobId);
|
||||
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
|
||||
SCH_ERR_RET(TSDB_CODE_SCH_JOB_NOT_EXISTS);
|
||||
}
|
||||
|
||||
*job = pJob;
|
||||
|
|
|
@ -262,18 +262,33 @@ int32_t streamMetaReopen(SStreamMeta* pMeta) {
|
|||
}
|
||||
}
|
||||
|
||||
// todo: not wait in a critical region
|
||||
while ((pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId)) == NULL) {
|
||||
stInfo("vgId:%d failed to init stream backend, retry in 100ms", pMeta->vgId);
|
||||
taosMsleep(100);
|
||||
taosMemoryFree(defaultPath);
|
||||
taosMemoryFree(newPath);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
// todo refactor: the lock shoud be restricted in one function
|
||||
void streamMetaInitBackend(SStreamMeta* pMeta) {
|
||||
pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId);
|
||||
if (pMeta->streamBackend == NULL) {
|
||||
streamMetaWUnLock(pMeta);
|
||||
|
||||
while (1) {
|
||||
streamMetaWLock(pMeta);
|
||||
pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId);
|
||||
if (pMeta->streamBackend != NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
streamMetaWUnLock(pMeta);
|
||||
stInfo("vgId:%d failed to init stream backend, retry in 100ms", pMeta->vgId);
|
||||
taosMsleep(100);
|
||||
}
|
||||
}
|
||||
|
||||
pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
|
||||
streamBackendLoadCheckpointInfo(pMeta);
|
||||
|
||||
taosMemoryFree(defaultPath);
|
||||
taosMemoryFree(newPath);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void streamMetaClear(SStreamMeta* pMeta) {
|
||||
|
|
|
@ -195,7 +195,7 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chk
|
|||
}
|
||||
}
|
||||
if (qDebugFlag & DEBUG_TRACE) {
|
||||
char* buf = taosMemoryCalloc(1, 128 + taosArrayGetSize(pFile->pSst) * 16);
|
||||
char* buf = taosMemoryCalloc(1, 128 + taosArrayGetSize(pFile->pSst) * 64);
|
||||
sprintf(buf, "[current: %s,", pFile->pCurrent);
|
||||
sprintf(buf + strlen(buf), "MANIFEST: %s,", pFile->pMainfest);
|
||||
sprintf(buf + strlen(buf), "options: %s,", pFile->pOptions);
|
||||
|
@ -483,7 +483,7 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa
|
|||
int32_t streamSnapWriterClose(SStreamSnapWriter* pWriter, int8_t rollback) {
|
||||
SStreamSnapHandle* handle = &pWriter->handle;
|
||||
if (qDebugFlag & DEBUG_TRACE) {
|
||||
char* buf = (char*)taosMemoryMalloc(128 + taosArrayGetSize(handle->pFileList) * 16);
|
||||
char* buf = (char*)taosMemoryMalloc(128 + taosArrayGetSize(handle->pFileList) * 64);
|
||||
int n = sprintf(buf, "[");
|
||||
for (int i = 0; i < taosArrayGetSize(handle->pFileList); i++) {
|
||||
SBackendFileItem* item = taosArrayGet(handle->pFileList, i);
|
||||
|
|
|
@ -52,6 +52,7 @@
|
|||
#include "lz4.h"
|
||||
#include "tRealloc.h"
|
||||
#include "tlog.h"
|
||||
#include "ttypes.h"
|
||||
|
||||
#ifdef TD_TSZ
|
||||
#include "td_sz.h"
|
||||
|
@ -62,8 +63,6 @@ static const int32_t TEST_NUMBER = 1;
|
|||
#define SIMPLE8B_MAX_INT64 ((uint64_t)1152921504606846974LL)
|
||||
|
||||
#define safeInt64Add(a, b) (((a >= 0) && (b <= INT64_MAX - a)) || ((a < 0) && (b >= INT64_MIN - a)))
|
||||
#define ZIGZAG_ENCODE(T, v) (((u##T)((v) >> (sizeof(T) * 8 - 1))) ^ (((u##T)(v)) << 1)) // zigzag encode
|
||||
#define ZIGZAG_DECODE(T, v) (((v) >> 1) ^ -((T)((v)&1))) // zigzag decode
|
||||
|
||||
#ifdef TD_TSZ
|
||||
bool lossyFloat = false;
|
||||
|
@ -99,24 +98,7 @@ int32_t tsCompressINTImp(const char *const input, const int32_t nelements, char
|
|||
15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15, 15};
|
||||
|
||||
// get the byte limit.
|
||||
int32_t word_length = 0;
|
||||
switch (type) {
|
||||
case TSDB_DATA_TYPE_BIGINT:
|
||||
word_length = LONG_BYTES;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_INT:
|
||||
word_length = INT_BYTES;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_SMALLINT:
|
||||
word_length = SHORT_BYTES;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_TINYINT:
|
||||
word_length = CHAR_BYTES;
|
||||
break;
|
||||
default:
|
||||
uError("Invalid compress integer type:%d", type);
|
||||
return -1;
|
||||
}
|
||||
int32_t word_length = getWordLength(type);
|
||||
|
||||
int32_t byte_limit = nelements * word_length + 1;
|
||||
int32_t opos = 1;
|
||||
|
@ -221,24 +203,9 @@ int32_t tsCompressINTImp(const char *const input, const int32_t nelements, char
|
|||
}
|
||||
|
||||
int32_t tsDecompressINTImp(const char *const input, const int32_t nelements, char *const output, const char type) {
|
||||
|
||||
int32_t word_length = 0;
|
||||
switch (type) {
|
||||
case TSDB_DATA_TYPE_BIGINT:
|
||||
word_length = LONG_BYTES;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_INT:
|
||||
word_length = INT_BYTES;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_SMALLINT:
|
||||
word_length = SHORT_BYTES;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_TINYINT:
|
||||
word_length = CHAR_BYTES;
|
||||
break;
|
||||
default:
|
||||
uError("Invalid decompress integer type:%d", type);
|
||||
return -1;
|
||||
int32_t word_length = getWordLength(type);
|
||||
if (word_length == -1) {
|
||||
return word_length;
|
||||
}
|
||||
|
||||
// If not compressed.
|
||||
|
@ -247,8 +214,11 @@ int32_t tsDecompressINTImp(const char *const input, const int32_t nelements, cha
|
|||
return nelements * word_length;
|
||||
}
|
||||
|
||||
// Selector value: 0 1 2 3 4 5 6 7 8 9 10 11
|
||||
// 12 13 14 15
|
||||
#if __AVX2__
|
||||
tsDecompressIntImpl_Hw(input, nelements, output, type);
|
||||
return nelements * word_length;
|
||||
#else
|
||||
// Selector value: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
|
||||
char bit_per_integer[] = {0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 12, 15, 20, 30, 60};
|
||||
int32_t selector_to_elems[] = {240, 120, 60, 30, 20, 15, 12, 10, 8, 7, 6, 5, 4, 3, 2, 1};
|
||||
|
||||
|
@ -257,185 +227,6 @@ int32_t tsDecompressINTImp(const char *const input, const int32_t nelements, cha
|
|||
int32_t _pos = 0;
|
||||
int64_t prev_value = 0;
|
||||
|
||||
#if __AVX2__
|
||||
while (1) {
|
||||
if (_pos == nelements) break;
|
||||
|
||||
uint64_t w = 0;
|
||||
memcpy(&w, ip, LONG_BYTES);
|
||||
|
||||
char selector = (char)(w & INT64MASK(4)); // selector = 4
|
||||
char bit = bit_per_integer[(int32_t)selector]; // bit = 3
|
||||
int32_t elems = selector_to_elems[(int32_t)selector];
|
||||
|
||||
// Optimize the performance, by remove the constantly switch operation.
|
||||
int32_t v = 4;
|
||||
uint64_t zigzag_value = 0;
|
||||
uint64_t mask = INT64MASK(bit);
|
||||
|
||||
switch (type) {
|
||||
case TSDB_DATA_TYPE_BIGINT: {
|
||||
int64_t* p = (int64_t*) output;
|
||||
|
||||
int32_t gRemainder = (nelements - _pos);
|
||||
int32_t num = (gRemainder > elems)? elems:gRemainder;
|
||||
|
||||
int32_t batch = num >> 2;
|
||||
int32_t remain = num & 0x03;
|
||||
if (selector == 0 || selector == 1) {
|
||||
if (tsAVX2Enable && tsSIMDEnable) {
|
||||
for (int32_t i = 0; i < batch; ++i) {
|
||||
__m256i prev = _mm256_set1_epi64x(prev_value);
|
||||
_mm256_storeu_si256((__m256i *)&p[_pos], prev);
|
||||
_pos += 4;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < remain; ++i) {
|
||||
p[_pos++] = prev_value;
|
||||
}
|
||||
} else {
|
||||
for (int32_t i = 0; i < elems && count < nelements; i++, count++) {
|
||||
p[_pos++] = prev_value;
|
||||
v += bit;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if (tsAVX2Enable && tsSIMDEnable) {
|
||||
__m256i base = _mm256_set1_epi64x(w);
|
||||
__m256i maskVal = _mm256_set1_epi64x(mask);
|
||||
|
||||
__m256i shiftBits = _mm256_set_epi64x(bit * 3 + 4, bit * 2 + 4, bit + 4, 4);
|
||||
__m256i inc = _mm256_set1_epi64x(bit << 2);
|
||||
|
||||
for (int32_t i = 0; i < batch; ++i) {
|
||||
__m256i after = _mm256_srlv_epi64(base, shiftBits);
|
||||
__m256i zigzagVal = _mm256_and_si256(after, maskVal);
|
||||
|
||||
// ZIGZAG_DECODE(T, v) (((v) >> 1) ^ -((T)((v)&1)))
|
||||
__m256i signmask = _mm256_and_si256(_mm256_set1_epi64x(1), zigzagVal);
|
||||
signmask = _mm256_sub_epi64(_mm256_setzero_si256(), signmask);
|
||||
// get the four zigzag values here
|
||||
__m256i delta = _mm256_xor_si256(_mm256_srli_epi64(zigzagVal, 1), signmask);
|
||||
|
||||
// calculate the cumulative sum (prefix sum) for each number
|
||||
// decode[0] = prev_value + final[0]
|
||||
// decode[1] = decode[0] + final[1] -----> prev_value + final[0] + final[1]
|
||||
// decode[2] = decode[1] + final[2] -----> prev_value + final[0] + final[1] + final[2]
|
||||
// decode[3] = decode[2] + final[3] -----> prev_value + final[0] + final[1] + final[2] + final[3]
|
||||
|
||||
// 1, 2, 3, 4
|
||||
//+ 0, 1, 0, 3
|
||||
// 1, 3, 3, 7
|
||||
// shift and add for the first round
|
||||
__m128i prev = _mm_set1_epi64x(prev_value);
|
||||
__m256i x = _mm256_slli_si256(delta, 8);
|
||||
|
||||
delta = _mm256_add_epi64(delta, x);
|
||||
_mm256_storeu_si256((__m256i *)&p[_pos], delta);
|
||||
|
||||
// 1, 3, 3, 7
|
||||
//+ 0, 0, 3, 3
|
||||
// 1, 3, 6, 10
|
||||
// shift and add operation for the second round
|
||||
__m128i firstPart = _mm_loadu_si128((__m128i *)&p[_pos]);
|
||||
__m128i secondItem = _mm_set1_epi64x(p[_pos + 1]);
|
||||
__m128i secPart = _mm_add_epi64(_mm_loadu_si128((__m128i *)&p[_pos + 2]), secondItem);
|
||||
firstPart = _mm_add_epi64(firstPart, prev);
|
||||
secPart = _mm_add_epi64(secPart, prev);
|
||||
|
||||
// save it in the memory
|
||||
_mm_storeu_si128((__m128i *)&p[_pos], firstPart);
|
||||
_mm_storeu_si128((__m128i *)&p[_pos + 2], secPart);
|
||||
|
||||
shiftBits = _mm256_add_epi64(shiftBits, inc);
|
||||
prev_value = p[_pos + 3];
|
||||
// uDebug("_pos:%d %"PRId64", %"PRId64", %"PRId64", %"PRId64, _pos, p[_pos], p[_pos+1], p[_pos+2], p[_pos+3]);
|
||||
_pos += 4;
|
||||
}
|
||||
|
||||
// handle the remain value
|
||||
for (int32_t i = 0; i < remain; i++) {
|
||||
zigzag_value = ((w >> (v + (batch * bit * 4))) & mask);
|
||||
prev_value += ZIGZAG_DECODE(int64_t, zigzag_value);
|
||||
|
||||
p[_pos++] = prev_value;
|
||||
// uDebug("_pos:%d %"PRId64, _pos-1, p[_pos-1]);
|
||||
|
||||
v += bit;
|
||||
}
|
||||
} else {
|
||||
for (int32_t i = 0; i < elems && count < nelements; i++, count++) {
|
||||
zigzag_value = ((w >> v) & mask);
|
||||
prev_value += ZIGZAG_DECODE(int64_t, zigzag_value);
|
||||
|
||||
p[_pos++] = prev_value;
|
||||
// uDebug("_pos:%d %"PRId64, _pos-1, p[_pos-1]);
|
||||
|
||||
v += bit;
|
||||
}
|
||||
}
|
||||
}
|
||||
} break;
|
||||
case TSDB_DATA_TYPE_INT: {
|
||||
int32_t* p = (int32_t*) output;
|
||||
|
||||
if (selector == 0 || selector == 1) {
|
||||
for (int32_t i = 0; i < elems && count < nelements; i++, count++) {
|
||||
p[_pos++] = (int32_t)prev_value;
|
||||
}
|
||||
} else {
|
||||
for (int32_t i = 0; i < elems && count < nelements; i++, count++) {
|
||||
zigzag_value = ((w >> v) & mask);
|
||||
prev_value += ZIGZAG_DECODE(int64_t, zigzag_value);
|
||||
|
||||
p[_pos++] = (int32_t)prev_value;
|
||||
v += bit;
|
||||
}
|
||||
}
|
||||
} break;
|
||||
case TSDB_DATA_TYPE_SMALLINT: {
|
||||
int16_t* p = (int16_t*) output;
|
||||
|
||||
if (selector == 0 || selector == 1) {
|
||||
for (int32_t i = 0; i < elems && count < nelements; i++, count++) {
|
||||
p[_pos++] = (int16_t)prev_value;
|
||||
}
|
||||
} else {
|
||||
for (int32_t i = 0; i < elems && count < nelements; i++, count++) {
|
||||
zigzag_value = ((w >> v) & mask);
|
||||
prev_value += ZIGZAG_DECODE(int64_t, zigzag_value);
|
||||
|
||||
p[_pos++] = (int16_t)prev_value;
|
||||
v += bit;
|
||||
}
|
||||
}
|
||||
} break;
|
||||
|
||||
case TSDB_DATA_TYPE_TINYINT: {
|
||||
int8_t *p = (int8_t *)output;
|
||||
|
||||
if (selector == 0 || selector == 1) {
|
||||
for (int32_t i = 0; i < elems && count < nelements; i++, count++) {
|
||||
p[_pos++] = (int8_t)prev_value;
|
||||
}
|
||||
} else {
|
||||
for (int32_t i = 0; i < elems && count < nelements; i++, count++) {
|
||||
zigzag_value = ((w >> v) & mask);
|
||||
prev_value += ZIGZAG_DECODE(int64_t, zigzag_value);
|
||||
|
||||
p[_pos++] = (int8_t)prev_value;
|
||||
v += bit;
|
||||
}
|
||||
}
|
||||
} break;
|
||||
}
|
||||
|
||||
ip += LONG_BYTES;
|
||||
}
|
||||
|
||||
return nelements * word_length;
|
||||
#else
|
||||
|
||||
while (1) {
|
||||
if (count == nelements) break;
|
||||
|
||||
|
@ -644,6 +435,8 @@ int32_t tsDecompressStringImp(const char *const input, int32_t compressedSize, c
|
|||
// TODO: Take care here, we assumes little endian encoding.
|
||||
int32_t tsCompressTimestampImp(const char *const input, const int32_t nelements, char *const output) {
|
||||
int32_t _pos = 1;
|
||||
int32_t longBytes = LONG_BYTES;
|
||||
|
||||
ASSERTS(nelements >= 0, "nelements is negative");
|
||||
|
||||
if (nelements == 0) return 0;
|
||||
|
@ -684,25 +477,25 @@ int32_t tsCompressTimestampImp(const char *const input, const int32_t nelements,
|
|||
}
|
||||
flags = flag1 | (flag2 << 4);
|
||||
// Encode the flag.
|
||||
if ((_pos + CHAR_BYTES - 1) >= nelements * LONG_BYTES) goto _exit_over;
|
||||
if ((_pos + CHAR_BYTES - 1) >= nelements * longBytes) goto _exit_over;
|
||||
memcpy(output + _pos, &flags, CHAR_BYTES);
|
||||
_pos += CHAR_BYTES;
|
||||
/* Here, we assume it is little endian encoding method. */
|
||||
// Encode dd1
|
||||
if (is_bigendian()) {
|
||||
if ((_pos + flag1 - 1) >= nelements * LONG_BYTES) goto _exit_over;
|
||||
memcpy(output + _pos, (char *)(&dd1) + LONG_BYTES - flag1, flag1);
|
||||
if ((_pos + flag1 - 1) >= nelements * longBytes) goto _exit_over;
|
||||
memcpy(output + _pos, (char *)(&dd1) + longBytes - flag1, flag1);
|
||||
} else {
|
||||
if ((_pos + flag1 - 1) >= nelements * LONG_BYTES) goto _exit_over;
|
||||
if ((_pos + flag1 - 1) >= nelements * longBytes) goto _exit_over;
|
||||
memcpy(output + _pos, (char *)(&dd1), flag1);
|
||||
}
|
||||
_pos += flag1;
|
||||
// Encode dd2;
|
||||
if (is_bigendian()) {
|
||||
if ((_pos + flag2 - 1) >= nelements * LONG_BYTES) goto _exit_over;
|
||||
memcpy(output + _pos, (char *)(&dd2) + LONG_BYTES - flag2, flag2);
|
||||
if ((_pos + flag2 - 1) >= nelements * longBytes) goto _exit_over;
|
||||
memcpy(output + _pos, (char *)(&dd2) + longBytes - flag2, flag2);
|
||||
} else {
|
||||
if ((_pos + flag2 - 1) >= nelements * LONG_BYTES) goto _exit_over;
|
||||
if ((_pos + flag2 - 1) >= nelements * longBytes) goto _exit_over;
|
||||
memcpy(output + _pos, (char *)(&dd2), flag2);
|
||||
}
|
||||
_pos += flag2;
|
||||
|
@ -715,15 +508,15 @@ int32_t tsCompressTimestampImp(const char *const input, const int32_t nelements,
|
|||
flag2 = 0;
|
||||
flags = flag1 | (flag2 << 4);
|
||||
// Encode the flag.
|
||||
if ((_pos + CHAR_BYTES - 1) >= nelements * LONG_BYTES) goto _exit_over;
|
||||
if ((_pos + CHAR_BYTES - 1) >= nelements * longBytes) goto _exit_over;
|
||||
memcpy(output + _pos, &flags, CHAR_BYTES);
|
||||
_pos += CHAR_BYTES;
|
||||
// Encode dd1;
|
||||
if (is_bigendian()) {
|
||||
if ((_pos + flag1 - 1) >= nelements * LONG_BYTES) goto _exit_over;
|
||||
memcpy(output + _pos, (char *)(&dd1) + LONG_BYTES - flag1, flag1);
|
||||
if ((_pos + flag1 - 1) >= nelements * longBytes) goto _exit_over;
|
||||
memcpy(output + _pos, (char *)(&dd1) + longBytes - flag1, flag1);
|
||||
} else {
|
||||
if ((_pos + flag1 - 1) >= nelements * LONG_BYTES) goto _exit_over;
|
||||
if ((_pos + flag1 - 1) >= nelements * longBytes) goto _exit_over;
|
||||
memcpy(output + _pos, (char *)(&dd1), flag1);
|
||||
}
|
||||
_pos += flag1;
|
||||
|
@ -734,17 +527,19 @@ int32_t tsCompressTimestampImp(const char *const input, const int32_t nelements,
|
|||
|
||||
_exit_over:
|
||||
output[0] = 0; // Means the string is not compressed
|
||||
memcpy(output + 1, input, nelements * LONG_BYTES);
|
||||
return nelements * LONG_BYTES + 1;
|
||||
memcpy(output + 1, input, nelements * longBytes);
|
||||
return nelements * longBytes + 1;
|
||||
}
|
||||
|
||||
int32_t tsDecompressTimestampImp(const char *const input, const int32_t nelements, char *const output) {
|
||||
int64_t longBytes = LONG_BYTES;
|
||||
|
||||
ASSERTS(nelements >= 0, "nelements is negative");
|
||||
if (nelements == 0) return 0;
|
||||
|
||||
if (input[0] == 0) {
|
||||
memcpy(output, input + 1, nelements * LONG_BYTES);
|
||||
return nelements * LONG_BYTES;
|
||||
memcpy(output, input + 1, nelements * longBytes);
|
||||
return nelements * longBytes;
|
||||
} else if (input[0] == 1) { // Decompress
|
||||
int64_t *ostream = (int64_t *)output;
|
||||
|
||||
|
@ -763,7 +558,7 @@ int32_t tsDecompressTimestampImp(const char *const input, const int32_t nelement
|
|||
delta_of_delta = 0;
|
||||
} else {
|
||||
if (is_bigendian()) {
|
||||
memcpy(((char *)(&dd1)) + LONG_BYTES - nbytes, input + ipos, nbytes);
|
||||
memcpy(((char *)(&dd1)) + longBytes - nbytes, input + ipos, nbytes);
|
||||
} else {
|
||||
memcpy(&dd1, input + ipos, nbytes);
|
||||
}
|
||||
|
@ -779,7 +574,7 @@ int32_t tsDecompressTimestampImp(const char *const input, const int32_t nelement
|
|||
prev_value = prev_value + prev_delta;
|
||||
ostream[opos++] = prev_value;
|
||||
}
|
||||
if (opos == nelements) return nelements * LONG_BYTES;
|
||||
if (opos == nelements) return nelements * longBytes;
|
||||
|
||||
// Decode dd2
|
||||
uint64_t dd2 = 0;
|
||||
|
@ -788,7 +583,7 @@ int32_t tsDecompressTimestampImp(const char *const input, const int32_t nelement
|
|||
delta_of_delta = 0;
|
||||
} else {
|
||||
if (is_bigendian()) {
|
||||
memcpy(((char *)(&dd2)) + LONG_BYTES - nbytes, input + ipos, nbytes);
|
||||
memcpy(((char *)(&dd2)) + longBytes - nbytes, input + ipos, nbytes);
|
||||
} else {
|
||||
memcpy(&dd2, input + ipos, nbytes);
|
||||
}
|
||||
|
@ -799,7 +594,7 @@ int32_t tsDecompressTimestampImp(const char *const input, const int32_t nelement
|
|||
prev_delta = delta_of_delta + prev_delta;
|
||||
prev_value = prev_value + prev_delta;
|
||||
ostream[opos++] = prev_value;
|
||||
if (opos == nelements) return nelements * LONG_BYTES;
|
||||
if (opos == nelements) return nelements * longBytes;
|
||||
}
|
||||
|
||||
} else {
|
||||
|
@ -807,11 +602,13 @@ int32_t tsDecompressTimestampImp(const char *const input, const int32_t nelement
|
|||
return -1;
|
||||
}
|
||||
}
|
||||
/* --------------------------------------------Double Compression
|
||||
* ---------------------------------------------- */
|
||||
|
||||
/* --------------------------------------------Double Compression ---------------------------------------------- */
|
||||
void encodeDoubleValue(uint64_t diff, uint8_t flag, char *const output, int32_t *const pos) {
|
||||
int32_t longBytes = LONG_BYTES;
|
||||
|
||||
uint8_t nbytes = (flag & INT8MASK(3)) + 1;
|
||||
int32_t nshift = (LONG_BYTES * BITS_PER_BYTE - nbytes * BITS_PER_BYTE) * (flag >> 3);
|
||||
int32_t nshift = (longBytes * BITS_PER_BYTE - nbytes * BITS_PER_BYTE) * (flag >> 3);
|
||||
diff >>= nshift;
|
||||
|
||||
while (nbytes) {
|
||||
|
@ -906,12 +703,14 @@ int32_t tsCompressDoubleImp(const char *const input, const int32_t nelements, ch
|
|||
}
|
||||
|
||||
FORCE_INLINE uint64_t decodeDoubleValue(const char *const input, int32_t *const ipos, uint8_t flag) {
|
||||
int32_t longBytes = LONG_BYTES;
|
||||
|
||||
uint64_t diff = 0ul;
|
||||
int32_t nbytes = (flag & 0x7) + 1;
|
||||
for (int32_t i = 0; i < nbytes; i++) {
|
||||
diff |= (((uint64_t)0xff & input[(*ipos)++]) << BITS_PER_BYTE * i);
|
||||
}
|
||||
int32_t shift_width = (LONG_BYTES * BITS_PER_BYTE - nbytes * BITS_PER_BYTE) * (flag >> 3);
|
||||
int32_t shift_width = (longBytes * BITS_PER_BYTE - nbytes * BITS_PER_BYTE) * (flag >> 3);
|
||||
diff <<= shift_width;
|
||||
|
||||
return diff;
|
||||
|
@ -1061,14 +860,7 @@ uint32_t decodeFloatValue(const char *const input, int32_t *const ipos, uint8_t
|
|||
return diff;
|
||||
}
|
||||
|
||||
int32_t tsDecompressFloatImp(const char *const input, const int32_t nelements, char *const output) {
|
||||
float *ostream = (float *)output;
|
||||
|
||||
if (input[0] == 1) {
|
||||
memcpy(output, input + 1, nelements * FLOAT_BYTES);
|
||||
return nelements * FLOAT_BYTES;
|
||||
}
|
||||
|
||||
static void tsDecompressFloatHelper(const char *const input, const int32_t nelements, float* ostream) {
|
||||
uint8_t flags = 0;
|
||||
int32_t ipos = 1;
|
||||
int32_t opos = 0;
|
||||
|
@ -1094,6 +886,21 @@ int32_t tsDecompressFloatImp(const char *const input, const int32_t nelements, c
|
|||
|
||||
ostream[opos++] = curr.real;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t tsDecompressFloatImp(const char *const input, const int32_t nelements, char *const output) {
|
||||
if (input[0] == 1) {
|
||||
memcpy(output, input + 1, nelements * FLOAT_BYTES);
|
||||
return nelements * FLOAT_BYTES;
|
||||
}
|
||||
|
||||
if (tsSIMDEnable && tsAVX2Enable) {
|
||||
tsDecompressFloatImplAvx2(input, nelements, output);
|
||||
} else if (tsSIMDEnable && tsAVX512Enable) {
|
||||
tsDecompressFloatImplAvx512(input, nelements, output);
|
||||
} else { // alternative implementation without SIMD instructions.
|
||||
tsDecompressFloatHelper(input, nelements, (float*)output);
|
||||
}
|
||||
|
||||
return nelements * FLOAT_BYTES;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,251 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "os.h"
|
||||
#include "ttypes.h"
|
||||
#include "tcompression.h"
|
||||
|
||||
int32_t getWordLength(char type) {
|
||||
int32_t wordLength = 0;
|
||||
switch (type) {
|
||||
case TSDB_DATA_TYPE_BIGINT:
|
||||
wordLength = LONG_BYTES;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_INT:
|
||||
wordLength = INT_BYTES;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_SMALLINT:
|
||||
wordLength = SHORT_BYTES;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_TINYINT:
|
||||
wordLength = CHAR_BYTES;
|
||||
break;
|
||||
default:
|
||||
uError("Invalid decompress integer type:%d", type);
|
||||
return -1;
|
||||
}
|
||||
|
||||
return wordLength;
|
||||
}
|
||||
|
||||
int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements, char *const output, const char type) {
|
||||
int32_t word_length = getWordLength(type);
|
||||
|
||||
// Selector value: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
|
||||
char bit_per_integer[] = {0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 12, 15, 20, 30, 60};
|
||||
int32_t selector_to_elems[] = {240, 120, 60, 30, 20, 15, 12, 10, 8, 7, 6, 5, 4, 3, 2, 1};
|
||||
|
||||
const char *ip = input + 1;
|
||||
int32_t count = 0;
|
||||
int32_t _pos = 0;
|
||||
int64_t prev_value = 0;
|
||||
|
||||
#if __AVX2__
|
||||
while (1) {
|
||||
if (_pos == nelements) break;
|
||||
|
||||
uint64_t w = 0;
|
||||
memcpy(&w, ip, LONG_BYTES);
|
||||
|
||||
char selector = (char)(w & INT64MASK(4)); // selector = 4
|
||||
char bit = bit_per_integer[(int32_t)selector]; // bit = 3
|
||||
int32_t elems = selector_to_elems[(int32_t)selector];
|
||||
|
||||
// Optimize the performance, by remove the constantly switch operation.
|
||||
int32_t v = 4;
|
||||
uint64_t zigzag_value = 0;
|
||||
uint64_t mask = INT64MASK(bit);
|
||||
|
||||
switch (type) {
|
||||
case TSDB_DATA_TYPE_BIGINT: {
|
||||
int64_t* p = (int64_t*) output;
|
||||
|
||||
int32_t gRemainder = (nelements - _pos);
|
||||
int32_t num = (gRemainder > elems)? elems:gRemainder;
|
||||
|
||||
int32_t batch = num >> 2;
|
||||
int32_t remain = num & 0x03;
|
||||
if (selector == 0 || selector == 1) {
|
||||
if (tsSIMDEnable && tsAVX2Enable) {
|
||||
for (int32_t i = 0; i < batch; ++i) {
|
||||
__m256i prev = _mm256_set1_epi64x(prev_value);
|
||||
_mm256_storeu_si256((__m256i *)&p[_pos], prev);
|
||||
_pos += 4;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < remain; ++i) {
|
||||
p[_pos++] = prev_value;
|
||||
}
|
||||
} else if (tsSIMDEnable && tsAVX512Enable) {
|
||||
#if __AVX512F__
|
||||
// todo add avx512 impl
|
||||
#endif
|
||||
} else { // alternative implementation without SIMD instructions.
|
||||
for (int32_t i = 0; i < elems && count < nelements; i++, count++) {
|
||||
p[_pos++] = prev_value;
|
||||
v += bit;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if (tsSIMDEnable && tsAVX2Enable) {
|
||||
__m256i base = _mm256_set1_epi64x(w);
|
||||
__m256i maskVal = _mm256_set1_epi64x(mask);
|
||||
|
||||
__m256i shiftBits = _mm256_set_epi64x(bit * 3 + 4, bit * 2 + 4, bit + 4, 4);
|
||||
__m256i inc = _mm256_set1_epi64x(bit << 2);
|
||||
|
||||
for (int32_t i = 0; i < batch; ++i) {
|
||||
__m256i after = _mm256_srlv_epi64(base, shiftBits);
|
||||
__m256i zigzagVal = _mm256_and_si256(after, maskVal);
|
||||
|
||||
// ZIGZAG_DECODE(T, v) (((v) >> 1) ^ -((T)((v)&1)))
|
||||
__m256i signmask = _mm256_and_si256(_mm256_set1_epi64x(1), zigzagVal);
|
||||
signmask = _mm256_sub_epi64(_mm256_setzero_si256(), signmask);
|
||||
|
||||
// get the four zigzag values here
|
||||
__m256i delta = _mm256_xor_si256(_mm256_srli_epi64(zigzagVal, 1), signmask);
|
||||
|
||||
// calculate the cumulative sum (prefix sum) for each number
|
||||
// decode[0] = prev_value + final[0]
|
||||
// decode[1] = decode[0] + final[1] -----> prev_value + final[0] + final[1]
|
||||
// decode[2] = decode[1] + final[2] -----> prev_value + final[0] + final[1] + final[2]
|
||||
// decode[3] = decode[2] + final[3] -----> prev_value + final[0] + final[1] + final[2] + final[3]
|
||||
|
||||
// 1, 2, 3, 4
|
||||
//+ 0, 1, 0, 3
|
||||
// 1, 3, 3, 7
|
||||
// shift and add for the first round
|
||||
__m128i prev = _mm_set1_epi64x(prev_value);
|
||||
__m256i x = _mm256_slli_si256(delta, 8);
|
||||
|
||||
delta = _mm256_add_epi64(delta, x);
|
||||
_mm256_storeu_si256((__m256i *)&p[_pos], delta);
|
||||
|
||||
// 1, 3, 3, 7
|
||||
//+ 0, 0, 3, 3
|
||||
// 1, 3, 6, 10
|
||||
// shift and add operation for the second round
|
||||
__m128i firstPart = _mm_loadu_si128((__m128i *)&p[_pos]);
|
||||
__m128i secondItem = _mm_set1_epi64x(p[_pos + 1]);
|
||||
__m128i secPart = _mm_add_epi64(_mm_loadu_si128((__m128i *)&p[_pos + 2]), secondItem);
|
||||
firstPart = _mm_add_epi64(firstPart, prev);
|
||||
secPart = _mm_add_epi64(secPart, prev);
|
||||
|
||||
// save it in the memory
|
||||
_mm_storeu_si128((__m128i *)&p[_pos], firstPart);
|
||||
_mm_storeu_si128((__m128i *)&p[_pos + 2], secPart);
|
||||
|
||||
shiftBits = _mm256_add_epi64(shiftBits, inc);
|
||||
prev_value = p[_pos + 3];
|
||||
_pos += 4;
|
||||
}
|
||||
|
||||
// handle the remain value
|
||||
for (int32_t i = 0; i < remain; i++) {
|
||||
zigzag_value = ((w >> (v + (batch * bit * 4))) & mask);
|
||||
prev_value += ZIGZAG_DECODE(int64_t, zigzag_value);
|
||||
|
||||
p[_pos++] = prev_value;
|
||||
v += bit;
|
||||
}
|
||||
} else if (tsSIMDEnable && tsAVX512Enable) {
|
||||
#if __AVX512F__
|
||||
// todo add avx512 impl
|
||||
#endif
|
||||
} else { // alternative implementation without SIMD instructions.
|
||||
for (int32_t i = 0; i < elems && count < nelements; i++, count++) {
|
||||
zigzag_value = ((w >> v) & mask);
|
||||
prev_value += ZIGZAG_DECODE(int64_t, zigzag_value);
|
||||
|
||||
p[_pos++] = prev_value;
|
||||
v += bit;
|
||||
}
|
||||
}
|
||||
}
|
||||
} break;
|
||||
case TSDB_DATA_TYPE_INT: {
|
||||
int32_t* p = (int32_t*) output;
|
||||
|
||||
if (selector == 0 || selector == 1) {
|
||||
for (int32_t i = 0; i < elems && count < nelements; i++, count++) {
|
||||
p[_pos++] = (int32_t)prev_value;
|
||||
}
|
||||
} else {
|
||||
for (int32_t i = 0; i < elems && count < nelements; i++, count++) {
|
||||
zigzag_value = ((w >> v) & mask);
|
||||
prev_value += ZIGZAG_DECODE(int64_t, zigzag_value);
|
||||
|
||||
p[_pos++] = (int32_t)prev_value;
|
||||
v += bit;
|
||||
}
|
||||
}
|
||||
} break;
|
||||
case TSDB_DATA_TYPE_SMALLINT: {
|
||||
int16_t* p = (int16_t*) output;
|
||||
|
||||
if (selector == 0 || selector == 1) {
|
||||
for (int32_t i = 0; i < elems && count < nelements; i++, count++) {
|
||||
p[_pos++] = (int16_t)prev_value;
|
||||
}
|
||||
} else {
|
||||
for (int32_t i = 0; i < elems && count < nelements; i++, count++) {
|
||||
zigzag_value = ((w >> v) & mask);
|
||||
prev_value += ZIGZAG_DECODE(int64_t, zigzag_value);
|
||||
|
||||
p[_pos++] = (int16_t)prev_value;
|
||||
v += bit;
|
||||
}
|
||||
}
|
||||
} break;
|
||||
|
||||
case TSDB_DATA_TYPE_TINYINT: {
|
||||
int8_t *p = (int8_t *)output;
|
||||
|
||||
if (selector == 0 || selector == 1) {
|
||||
for (int32_t i = 0; i < elems && count < nelements; i++, count++) {
|
||||
p[_pos++] = (int8_t)prev_value;
|
||||
}
|
||||
} else {
|
||||
for (int32_t i = 0; i < elems && count < nelements; i++, count++) {
|
||||
zigzag_value = ((w >> v) & mask);
|
||||
prev_value += ZIGZAG_DECODE(int64_t, zigzag_value);
|
||||
|
||||
p[_pos++] = (int8_t)prev_value;
|
||||
v += bit;
|
||||
}
|
||||
}
|
||||
} break;
|
||||
}
|
||||
|
||||
ip += LONG_BYTES;
|
||||
}
|
||||
|
||||
#endif
|
||||
return nelements * word_length;
|
||||
}
|
||||
|
||||
int32_t tsDecompressFloatImplAvx512(const char *const input, const int32_t nelements, char *const output) {
|
||||
#if __AVX512F__
|
||||
// todo add it
|
||||
#endif
|
||||
return 0;
|
||||
}
|
||||
|
||||
// todo add later
|
||||
int32_t tsDecompressFloatImplAvx2(const char *const input, const int32_t nelements, char *const output) {
|
||||
#if __AVX2__
|
||||
#endif
|
||||
return 0;
|
||||
}
|
|
@ -504,6 +504,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SCH_STATUS_ERROR, "scheduler status erro
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_SCH_INTERNAL_ERROR, "scheduler internal error")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_SCH_TIMEOUT_ERROR, "Task timeout")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_SCH_JOB_IS_DROPPING, "Job is dropping")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_SCH_JOB_NOT_EXISTS, "Job no longer exist")
|
||||
|
||||
// parser
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_SYNTAX_ERROR, "syntax error near")
|
||||
|
|
|
@ -35,27 +35,27 @@ int64_t taosStrHumanToInt64(const char* str) {
|
|||
strNoUnit = taosMemoryCalloc(sLen, 1);
|
||||
memcpy(strNoUnit, str, sLen - 1);
|
||||
|
||||
val = atoll(strNoUnit) * UNIT_ONE_PEBIBYTE;
|
||||
val = atof(strNoUnit) * UNIT_ONE_PEBIBYTE;
|
||||
} else if ((unit == 'T') || (unit == 't')) {
|
||||
strNoUnit = taosMemoryCalloc(sLen, 1);
|
||||
memcpy(strNoUnit, str, sLen - 1);
|
||||
|
||||
val = atoll(strNoUnit) * UNIT_ONE_TEBIBYTE;
|
||||
val = atof(strNoUnit) * UNIT_ONE_TEBIBYTE;
|
||||
} else if ((unit == 'G') || (unit == 'g')) {
|
||||
strNoUnit = taosMemoryCalloc(sLen, 1);
|
||||
memcpy(strNoUnit, str, sLen - 1);
|
||||
|
||||
val = atoll(strNoUnit) * UNIT_ONE_GIBIBYTE;
|
||||
val = atof(strNoUnit) * UNIT_ONE_GIBIBYTE;
|
||||
} else if ((unit == 'M') || (unit == 'm')) {
|
||||
strNoUnit = taosMemoryCalloc(sLen, 1);
|
||||
memcpy(strNoUnit, str, sLen - 1);
|
||||
|
||||
val = atoll(strNoUnit) * UNIT_ONE_MEBIBYTE;
|
||||
val = atof(strNoUnit) * UNIT_ONE_MEBIBYTE;
|
||||
} else if ((unit == 'K') || (unit == 'k')) {
|
||||
strNoUnit = taosMemoryCalloc(sLen, 1);
|
||||
memcpy(strNoUnit, str, sLen - 1);
|
||||
|
||||
val = atoll(strNoUnit) * UNIT_ONE_KIBIBYTE;
|
||||
val = atof(strNoUnit) * UNIT_ONE_KIBIBYTE;
|
||||
} else {
|
||||
val = atoll(str);
|
||||
}
|
||||
|
@ -93,17 +93,17 @@ int32_t taosStrHumanToInt32(const char* str) {
|
|||
strNoUnit = taosMemoryCalloc(sLen, 1);
|
||||
memcpy(strNoUnit, str, sLen - 1);
|
||||
|
||||
val = atoll(strNoUnit) * UNIT_ONE_GIBIBYTE;
|
||||
val = atof(strNoUnit) * UNIT_ONE_GIBIBYTE;
|
||||
} else if ((unit == 'M') || (unit == 'm')) {
|
||||
strNoUnit = taosMemoryCalloc(sLen, 1);
|
||||
memcpy(strNoUnit, str, sLen - 1);
|
||||
|
||||
val = atoll(strNoUnit) * UNIT_ONE_MEBIBYTE;
|
||||
val = atof(strNoUnit) * UNIT_ONE_MEBIBYTE;
|
||||
} else if ((unit == 'K') || (unit == 'k')) {
|
||||
strNoUnit = taosMemoryCalloc(sLen, 1);
|
||||
memcpy(strNoUnit, str, sLen - 1);
|
||||
|
||||
val = atoll(strNoUnit) * UNIT_ONE_KIBIBYTE;
|
||||
val = atof(strNoUnit) * UNIT_ONE_KIBIBYTE;
|
||||
} else {
|
||||
val = atoll(str);
|
||||
}
|
||||
|
|
|
@ -120,6 +120,10 @@
|
|||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/slimit.py -Q 2
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/slimit.py -Q 3
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/slimit.py -Q 4
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/ts-4233.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/ts-4233.py -Q 2
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/ts-4233.py -Q 3
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/ts-4233.py -Q 4
|
||||
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 3-enterprise/restore/restoreDnode.py -N 5 -M 3 -i False
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 3-enterprise/restore/restoreVnode.py -N 5 -M 3 -i False
|
||||
|
|
|
@ -30,17 +30,16 @@ class TDTestCase:
|
|||
tdSql.query(f'select to_iso8601(ts) from {self.ntbname}')
|
||||
for i in range(self.rowNum):
|
||||
tdSql.checkEqual(tdSql.queryResult[i][0],f'2022-01-01T00:00:00.00{i}{time_zone}')
|
||||
timezone_list = ['+0000','+0100','+0200','+0300','+0330','+0400','+0500','+0530','+0600','+0700','+0800','+0900','+1000','+1100','+1200',\
|
||||
'+00','+01','+02','+03','+04','+05','+06','+07','+08','+09','+10','+11','+12',\
|
||||
'+00:00','+01:00','+02:00','+03:00','+03:30','+04:00','+05:00','+05:30','+06:00','+07:00','+08:00','+09:00','+10:00','+11:00','+12:00',\
|
||||
'-0000','-0100','-0200','-0300','-0400','-0500','-0600','-0700','-0800','-0900','-1000','-1100','-1200',\
|
||||
'-00','-01','-02','-03','-04','-05','-06','-07','-08','-09','-10','-11','-12',\
|
||||
'-00:00','-01:00','-02:00','-03:00','-04:00','-05:00','-06:00','-07:00','-08:00','-09:00','-10:00','-11:00','-12:00',\
|
||||
'z','Z']
|
||||
for j in timezone_list:
|
||||
tdSql.query(f'select to_iso8601(ts,"{j}") from {self.ntbname}')
|
||||
for i in range(self.rowNum):
|
||||
tdSql.checkEqual(tdSql.queryResult[i][0],f'2022-01-01T00:00:00.00{i}{j}')
|
||||
|
||||
tz_list = ['+0000','+0530', '+00', '+06', '+00:00', '+12:00', '-0000', '-0900', '-00', '-05', '-00:00', '-03:00','z', 'Z']
|
||||
res_list = ['2021-12-31T16:00:00.000+0000', '2021-12-31T21:30:00.000+0530', '2021-12-31T16:00:00.000+00', '2021-12-31T22:00:00.000+06',\
|
||||
'2021-12-31T16:00:00.000+00:00', '2022-01-01T04:00:00.000+12:00','2021-12-31T16:00:00.000-0000','2021-12-31T07:00:00.000-0900',\
|
||||
'2021-12-31T16:00:00.000-00', '2021-12-31T11:00:00.000-05','2021-12-31T16:00:00.000-00:00','2021-12-31T13:00:00.000-03:00',\
|
||||
'2021-12-31T16:00:00.000z', '2021-12-31T16:00:00.000Z']
|
||||
for i in range(len(tz_list)):
|
||||
tdSql.query(f'select to_iso8601(ts,"{tz_list[i]}") from {self.ntbname} where c1==1')
|
||||
tdSql.checkEqual(tdSql.queryResult[0][0],res_list[i])
|
||||
|
||||
error_param_list = [0,100.5,'a','!']
|
||||
for i in error_param_list:
|
||||
tdSql.error(f'select to_iso8601(ts,"{i}") from {self.ntbname}')
|
||||
|
|
|
@ -0,0 +1,46 @@
|
|||
|
||||
import taos
|
||||
|
||||
from util.log import *
|
||||
from util.sql import *
|
||||
from util.cases import *
|
||||
from util.dnodes import *
|
||||
from util.common import *
|
||||
|
||||
class TDTestCase:
|
||||
def init(self, conn, logSql, replicaVar=1):
|
||||
self.replicaVar = int(replicaVar)
|
||||
tdLog.debug(f"start to excute {__file__}")
|
||||
tdSql.init(conn.cursor(), True)
|
||||
|
||||
def checksql(self, sql):
|
||||
result = os.popen("taos -s '%s'" %sql)
|
||||
res = result.read()
|
||||
print(res)
|
||||
if ("Query OK" in res):
|
||||
tdLog.info(f"checkEqual success")
|
||||
else :
|
||||
tdLog.exit(f"checkEqual error")
|
||||
|
||||
def check(self):
|
||||
conn = taos.connect()
|
||||
sql = "select 'a;b' as x"
|
||||
tdSql.query(f"%s" %sql)
|
||||
tdSql.checkRows(1)
|
||||
|
||||
self.checksql('select "a;b" as x\G')
|
||||
self.checksql('select "a;b" as x >> /tmp/res.txt')
|
||||
return
|
||||
|
||||
def run(self):
|
||||
tdSql.prepare()
|
||||
self.check()
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success(f"{__file__} successfully executed")
|
||||
|
||||
|
||||
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
|
@ -62,6 +62,8 @@ static void shellCleanup(void *arg);
|
|||
static void *shellCancelHandler(void *arg);
|
||||
static void *shellThreadLoop(void *arg);
|
||||
|
||||
static bool shellCmdkilled = false;
|
||||
|
||||
bool shellIsEmptyCommand(const char *cmd) {
|
||||
for (char c = *cmd++; c != 0; c = *cmd++) {
|
||||
if (c != ' ' && c != '\t' && c != ';') {
|
||||
|
@ -72,6 +74,8 @@ bool shellIsEmptyCommand(const char *cmd) {
|
|||
}
|
||||
|
||||
int32_t shellRunSingleCommand(char *command) {
|
||||
shellCmdkilled = false;
|
||||
|
||||
if (shellIsEmptyCommand(command)) {
|
||||
return 0;
|
||||
}
|
||||
|
@ -199,22 +203,17 @@ void shellRunSingleCommandImp(char *command) {
|
|||
bool printMode = false;
|
||||
|
||||
if ((sptr = strstr(command, ">>")) != NULL) {
|
||||
cptr = strstr(command, ";");
|
||||
if (cptr != NULL) {
|
||||
*cptr = '\0';
|
||||
}
|
||||
|
||||
fname = sptr + 2;
|
||||
while (*fname == ' ') fname++;
|
||||
*sptr = '\0';
|
||||
}
|
||||
|
||||
if ((sptr = strstr(command, "\\G")) != NULL) {
|
||||
cptr = strstr(command, ";");
|
||||
cptr = strstr(fname, ";");
|
||||
if (cptr != NULL) {
|
||||
*cptr = '\0';
|
||||
}
|
||||
}
|
||||
|
||||
if ((sptr = strstr(command, "\\G")) != NULL) {
|
||||
*sptr = '\0';
|
||||
printMode = true; // When output to a file, the switch does not work.
|
||||
}
|
||||
|
@ -262,7 +261,8 @@ void shellRunSingleCommandImp(char *command) {
|
|||
if (error_no == 0) {
|
||||
printf("Query OK, %"PRId64 " row(s) in set (%.6fs)\r\n", numOfRows, (et - st) / 1E6);
|
||||
} else {
|
||||
printf("Query interrupted (%s), %"PRId64 " row(s) in set (%.6fs)\r\n", taos_errstr(pSql), numOfRows, (et - st) / 1E6);
|
||||
terrno = error_no;
|
||||
printf("Query interrupted (%s), %"PRId64 " row(s) in set (%.6fs)\r\n", taos_errstr(NULL), numOfRows, (et - st) / 1E6);
|
||||
}
|
||||
taos_free_result(pSql);
|
||||
} else {
|
||||
|
@ -957,7 +957,11 @@ void shellDumpResultCallback(void *param, TAOS_RES *tres, int num_of_rows) {
|
|||
}
|
||||
}
|
||||
dump_info->numOfAllRows += num_of_rows;
|
||||
taos_fetch_rows_a(tres, shellDumpResultCallback, param);
|
||||
if (!shellCmdkilled) {
|
||||
taos_fetch_rows_a(tres, shellDumpResultCallback, param);
|
||||
} else {
|
||||
tsem_post(&dump_info->sem);
|
||||
}
|
||||
} else {
|
||||
if (num_of_rows < 0) {
|
||||
printf("\033[31masync retrieve failed, code: %d\033[0m\n", num_of_rows);
|
||||
|
@ -972,13 +976,15 @@ int64_t shellDumpResult(TAOS_RES *tres, char *fname, int32_t *error_no, bool ver
|
|||
num_of_rows = shellDumpResultToFile(fname, tres);
|
||||
} else {
|
||||
tsDumpInfo dump_info;
|
||||
init_dump_info(&dump_info, tres, sql, vertical);
|
||||
taos_fetch_rows_a(tres, shellDumpResultCallback, &dump_info);
|
||||
tsem_wait(&dump_info.sem);
|
||||
num_of_rows = dump_info.numOfAllRows;
|
||||
if (!shellCmdkilled) {
|
||||
init_dump_info(&dump_info, tres, sql, vertical);
|
||||
taos_fetch_rows_a(tres, shellDumpResultCallback, &dump_info);
|
||||
tsem_wait(&dump_info.sem);
|
||||
num_of_rows = dump_info.numOfAllRows;
|
||||
}
|
||||
}
|
||||
|
||||
*error_no = taos_errno(tres);
|
||||
*error_no = shellCmdkilled ? TSDB_CODE_TSC_QUERY_KILLED : taos_errno(tres);
|
||||
return num_of_rows;
|
||||
}
|
||||
|
||||
|
@ -1202,6 +1208,7 @@ void *shellCancelHandler(void *arg) {
|
|||
} else {
|
||||
#endif
|
||||
if (shell.conn) {
|
||||
shellCmdkilled = true;
|
||||
taos_kill_query(shell.conn);
|
||||
}
|
||||
#ifdef WEBSOCKET
|
||||
|
|
|
@ -236,22 +236,17 @@ void shellRunSingleCommandWebsocketImp(char *command) {
|
|||
bool printMode = false;
|
||||
|
||||
if ((sptr = strstr(command, ">>")) != NULL) {
|
||||
cptr = strstr(command, ";");
|
||||
if (cptr != NULL) {
|
||||
*cptr = '\0';
|
||||
}
|
||||
|
||||
fname = sptr + 2;
|
||||
while (*fname == ' ') fname++;
|
||||
*sptr = '\0';
|
||||
}
|
||||
|
||||
if ((sptr = strstr(command, "\\G")) != NULL) {
|
||||
cptr = strstr(command, ";");
|
||||
cptr = strstr(fname, ";");
|
||||
if (cptr != NULL) {
|
||||
*cptr = '\0';
|
||||
}
|
||||
}
|
||||
|
||||
if ((sptr = strstr(command, "\\G")) != NULL) {
|
||||
*sptr = '\0';
|
||||
printMode = true; // When output to a file, the switch does not work.
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue