Merge remote-tracking branch 'origin/3.0' into enh/TD-23926-3.0

This commit is contained in:
xjzhou 2024-06-27 17:02:54 +08:00
commit 12c64a1c45
27 changed files with 563 additions and 425 deletions

View File

@ -398,7 +398,7 @@ Conversion functions change the data type of a value.
CAST(expr AS type_name)
```
**Description**: Convert the input data `expr` into the type specified by `type_name`. This function can be used only in SELECT statements.
**Description**: Convert the input data `expr` into the type specified by `type_name`.
**Return value type**: The type specified by parameter `type_name`
@ -435,8 +435,7 @@ TO_ISO8601(expr [, timezone])
**More explanations**:
- You can specify a time zone in the following format: [z/Z, +/-hhmm, +/-hh, +/-hh:mm]. For example, TO_ISO8601(1, "+00:00").
- If the input is a UNIX timestamp, the precision of the returned value is determined by the digits of the input timestamp
- If the input is a column of TIMESTAMP type, the precision of the returned value is same as the precision set for the current data base in use
- The precision of the input timestamp will be recognized automatically according to the precision of the table used, milliseconds will be used if no table is specified.
#### TO_JSON
@ -650,6 +649,7 @@ use_current_timezone: {
- Time unit specified by `time_unit` can be:
1b (nanoseconds), 1u (microseconds), 1a (milliseconds), 1s (seconds), 1m (minutes), 1h (hours), 1d (days), or 1w (weeks)
- The precision of the returned timestamp is same as the precision set for the current data base in use
- The precision of the input timestamp will be recognized automatically according to the precision of the table used, milliseconds will be used if no table is specified.
- If the input data is not formatted as a timestamp, the returned value is null.
- When using 1d/1w as the time unit to truncate timestamp, you can specify whether to truncate based on the current time zone by setting the use_current_timezone parameter.
Value 0 indicates truncation using the UTC time zone, value 1 indicates truncation using the current time zone.

View File

@ -398,7 +398,7 @@ UPPER(expr)
CAST(expr AS type_name)
```
**功能说明**:数据类型转换函数,返回 expr 转换为 type_name 指定的类型后的结果。只适用于 select 子句中。
**功能说明**:数据类型转换函数,返回 expr 转换为 type_name 指定的类型后的结果。
**返回结果类型**CAST 中指定的类型type_name)。
@ -435,8 +435,7 @@ TO_ISO8601(expr [, timezone])
**使用说明**
- timezone 参数允许输入的时区格式为: [z/Z, +/-hhmm, +/-hh, +/-hh:mm]。例如TO_ISO8601(1, "+00:00")。
- 如果输入是表示 UNIX 时间戳的整形,返回格式精度由时间戳的位数决定;
- 如果输入是 TIMESTAMP 类型的列,返回格式的时间戳精度与当前 DATABASE 设置的时间精度一致。
- 输入时间戳的精度由所查询表的精度确定, 若未指定表, 则精度为毫秒.
#### TO_JSON
@ -650,6 +649,7 @@ use_current_timezone: {
- 支持的时间单位 time_unit 如下:
1b(纳秒), 1u(微秒)1a(毫秒)1s(秒)1m(分)1h(小时)1d(天), 1w(周)。
- 返回的时间戳精度与当前 DATABASE 设置的时间精度一致。
- 输入时间戳的精度由所查询表的精度确定, 若未指定表, 则精度为毫秒.
- 输入包含不符合时间日期格式的字符串则返回 NULL。
- 当使用 1d/1w 作为时间单位对时间戳进行截断时, 可通过设置 use_current_timezone 参数指定是否根据当前时区进行截断处理。
值 0 表示使用 UTC 时区进行截断,值 1 表示使用当前时区进行截断。

View File

@ -277,6 +277,7 @@ void taosSetGlobalDebugFlag(int32_t flag);
void taosSetDebugFlag(int32_t *pFlagPtr, const char *flagName, int32_t flagVal);
void taosLocalCfgForbiddenToChange(char *name, bool *forbidden);
int8_t taosGranted(int8_t type);
int32_t taosSetSlowLogScope(char *pScope);
#ifdef __cplusplus
}

View File

@ -31,9 +31,7 @@ typedef enum SQL_RESULT_CODE {
SQL_RESULT_CANCEL = 2,
} SQL_RESULT_CODE;
#define SLOW_LOG_SEND_SIZE 1024*1024
extern tsem2_t monitorSem;
extern STaosQueue* monitorQueue;
#define SLOW_LOG_SEND_SIZE 32*1024
typedef struct {
int64_t clusterId;
@ -55,18 +53,14 @@ typedef struct {
void monitorClose();
void monitorInit();
void monitorSendAllSlowLogFromTempDir(void* pInst);
void monitorClientSQLReqInit(int64_t clusterKey);
void monitorClientSlowQueryInit(int64_t clusterId);
void monitorCreateClient(int64_t clusterId);
void monitorCreateClientCounter(int64_t clusterId, const char* name, const char* help, size_t label_key_count, const char** label_keys);
void monitorCounterInc(int64_t clusterId, const char* counterName, const char** label_values);
void* monitorThreadFunc(void *param);
void monitorFreeSlowLogData(MonitorSlowLogData* pData);
const char* monitorResultStr(SQL_RESULT_CODE code);
void monitorReadSendSlowLog(TdFilePtr pFile, void* pTransporter, SEpSet *epSet);
int32_t monitorPutData2MonitorQueue(int64_t clusterId, char* value);
#ifdef __cplusplus
}
#endif

View File

@ -107,17 +107,17 @@ static void generateWriteSlowLog(STscObj *pTscObj, SRequestObj *pRequest, int32_
}
char clusterId[32] = {0};
if (snprintf(clusterId, sizeof(clusterId), "%" PRId64, pTscObj->pAppInfo->clusterId) < 0){
uError("failed to generate clusterId:%" PRId64, pTscObj->pAppInfo->clusterId);
tscError("failed to generate clusterId:%" PRId64, pTscObj->pAppInfo->clusterId);
}
char startTs[32] = {0};
if (snprintf(startTs, sizeof(startTs), "%" PRId64, pRequest->metric.start/1000) < 0){
uError("failed to generate startTs:%" PRId64, pRequest->metric.start/1000);
tscError("failed to generate startTs:%" PRId64, pRequest->metric.start/1000);
}
char requestId[32] = {0};
if (snprintf(requestId, sizeof(requestId), "%" PRIu64, pRequest->requestId) < 0){
uError("failed to generate requestId:%" PRIu64, pRequest->requestId);
tscError("failed to generate requestId:%" PRIu64, pRequest->requestId);
}
cJSON_AddItemToObject(json, "cluster_id", cJSON_CreateString(clusterId));
cJSON_AddItemToObject(json, "start_ts", cJSON_CreateString(startTs));
@ -126,7 +126,7 @@ static void generateWriteSlowLog(STscObj *pTscObj, SRequestObj *pRequest, int32_
cJSON_AddItemToObject(json, "code", cJSON_CreateNumber(pRequest->code));
cJSON_AddItemToObject(json, "error_info", cJSON_CreateString(tstrerror(pRequest->code)));
cJSON_AddItemToObject(json, "type", cJSON_CreateNumber(reqType));
cJSON_AddItemToObject(json, "rows_num", cJSON_CreateNumber(pRequest->body.resInfo.totalRows));
cJSON_AddItemToObject(json, "rows_num", cJSON_CreateNumber(pRequest->body.resInfo.numOfRows + pRequest->body.resInfo.totalRows));
if(strlen(pRequest->sqlstr) > pTscObj->pAppInfo->monitorParas.tsSlowLogMaxLen){
char tmp = pRequest->sqlstr[pTscObj->pAppInfo->monitorParas.tsSlowLogMaxLen];
pRequest->sqlstr[pTscObj->pAppInfo->monitorParas.tsSlowLogMaxLen] = '\0';
@ -142,7 +142,7 @@ static void generateWriteSlowLog(STscObj *pTscObj, SRequestObj *pRequest, int32_
char pid[32] = {0};
if (snprintf(pid, sizeof(pid), "%d", appInfo.pid) < 0){
uError("failed to generate pid:%d", appInfo.pid);
tscError("failed to generate pid:%d", appInfo.pid);
}
cJSON_AddItemToObject(json, "process_id", cJSON_CreateString(pid));
@ -153,25 +153,14 @@ static void generateWriteSlowLog(STscObj *pTscObj, SRequestObj *pRequest, int32_
}else if(pRequest->pDb != NULL){
cJSON_AddItemToObject(json, "db", cJSON_CreateString(pRequest->pDb));
}else{
cJSON_AddItemToObject(json, "db", cJSON_CreateString("unknown"));
cJSON_AddItemToObject(json, "db", cJSON_CreateString(""));
}
char* value = cJSON_PrintUnformatted(json);
if(monitorPutData2MonitorQueue(pTscObj->pAppInfo->clusterId, value) < 0){
taosMemoryFree(value);
}
MonitorSlowLogData* slowLogData = taosAllocateQitem(sizeof(MonitorSlowLogData), DEF_QITEM, 0);
if (slowLogData == NULL) {
cJSON_Delete(json);
tscError("[monitor] failed to allocate slow log data");
return;
}
slowLogData->clusterId = pTscObj->pAppInfo->clusterId;
slowLogData->value = cJSON_PrintUnformatted(json);
tscDebug("[monitor] write slow log to queue, clusterId:%"PRIx64 " value:%s", slowLogData->clusterId, slowLogData->value);
if (taosWriteQitem(monitorQueue, slowLogData) == 0){
tsem2_post(&monitorSem);
}else{
monitorFreeSlowLogData(slowLogData);
taosFreeQitem(slowLogData);
}
cJSON_Delete(json);
}

View File

@ -85,31 +85,6 @@ static SAppInstInfo* getAppInstByClusterId(int64_t clusterId) {
return *(SAppInstInfo**)p;
}
static int32_t tscMonitortInit() {
TdThreadAttr thAttr;
taosThreadAttrInit(&thAttr);
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
TdThread monitorThread;
if (taosThreadCreate(&monitorThread, &thAttr, monitorThreadFunc, NULL) != 0) {
uError("failed to create monitor thread since %s", strerror(errno));
return -1;
}
taosThreadAttrDestroy(&thAttr);
return 0;
}
static void tscMonitorStop() {
if (atomic_val_compare_exchange_32(&slowLogFlag, 0, 1)) {
uDebug("monitor thread already stopped");
return;
}
while (atomic_load_32(&slowLogFlag) > 0) {
taosMsleep(100);
}
}
static int32_t monitorReportAsyncCB(void* param, SDataBuf* pMsg, int32_t code) {
if (TSDB_CODE_SUCCESS != code) {
uError("found error in monitorReport send callback, code:%d, please check the network.", code);
@ -161,7 +136,7 @@ static int32_t sendReport(void* pTransporter, SEpSet *epSet, char* pCont, MONITO
return code;
}
void monitorReadSendSlowLog(TdFilePtr pFile, void* pTransporter, SEpSet *epSet){
static void monitorReadSendSlowLog(TdFilePtr pFile, void* pTransporter, SEpSet *epSet){
char buf[SLOW_LOG_SEND_SIZE + 1] = {0}; // +1 for \0, for print log
char pCont[SLOW_LOG_SEND_SIZE + 1] = {0}; // +1 for \0, for print log
int32_t offset = 0;
@ -268,8 +243,9 @@ static void sendAllSlowLog(){
uDebug("[monitor] sendAllSlowLog when client close");
}
void monitorSendAllSlowLogFromTempDir(void* inst){
SAppInstInfo* pInst = (SAppInstInfo*)inst;
static void monitorSendAllSlowLogFromTempDir(int64_t clusterId){
SAppInstInfo* pInst = getAppInstByClusterId((int64_t)clusterId);
if(pInst == NULL || !pInst->monitorParas.tsEnableMonitor){
uInfo("[monitor] monitor is disabled, skip send slow log");
return;
@ -350,45 +326,6 @@ static void sendAllCounter(){
}
}
void monitorInit() {
uInfo("[monitor] tscMonitor init");
monitorCounterHash = (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
if (monitorCounterHash == NULL) {
uError("failed to create monitorCounterHash");
}
taosHashSetFreeFp(monitorCounterHash, destroyMonitorClient);
monitorSlowLogHash = (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
if (monitorSlowLogHash == NULL) {
uError("failed to create monitorSlowLogHash");
}
taosHashSetFreeFp(monitorSlowLogHash, destroySlowLogClient);
monitorTimer = taosTmrInit(0, 0, 0, "MONITOR");
if (monitorTimer == NULL) {
uError("failed to create monitor timer");
}
taosInitRWLatch(&monitorLock);
tscMonitortInit();
}
void monitorClose() {
uInfo("[monitor] tscMonitor close");
taosRLockLatch(&monitorLock);
if (atomic_val_compare_exchange_32(&monitorFlag, 0, 1)) {
uDebug("[monitor] monitorFlag is not 0");
}
tscMonitorStop();
sendAllSlowLog();
sendAllCounter();
taosHashCleanup(monitorCounterHash);
taosHashCleanup(monitorSlowLogHash);
taosTmrCleanUp(monitorTimer);
taosRUnLockLatch(&monitorLock);
}
void monitorCreateClient(int64_t clusterId) {
MonitorClient* pMonitor = NULL;
taosWLockLatch(&monitorLock);
@ -477,7 +414,7 @@ end:
}
void monitorCounterInc(int64_t clusterId, const char* counterName, const char** label_values) {
taosRLockLatch(&monitorLock);
taosWLockLatch(&monitorLock);
MonitorClient** ppMonitor = (MonitorClient**)taosHashGet(monitorCounterHash, &clusterId, LONG_BYTES);
if (ppMonitor == NULL || *ppMonitor == NULL) {
uError("monitorCounterInc not found pMonitor %"PRId64, clusterId);
@ -494,7 +431,7 @@ void monitorCounterInc(int64_t clusterId, const char* counterName, const char**
uInfo("[monitor] monitorCounterInc %"PRIx64"(%p):%s", pMonitor->clusterId, pMonitor, counterName);
end:
taosRUnLockLatch(&monitorLock);
taosWUnLockLatch(&monitorLock);
}
const char* monitorResultStr(SQL_RESULT_CODE code) {
@ -502,16 +439,16 @@ const char* monitorResultStr(SQL_RESULT_CODE code) {
return result_state[code];
}
void monitorFreeSlowLogData(MonitorSlowLogData* pData) {
static void monitorFreeSlowLogData(MonitorSlowLogData* pData) {
if (pData == NULL) {
return;
}
taosMemoryFree(pData->value);
}
void monitorThreadFuncUnexpectedStopped(void) { atomic_store_32(&slowLogFlag, -1); }
static void monitorThreadFuncUnexpectedStopped(void) { atomic_store_32(&slowLogFlag, -1); }
void reportSlowLog(void* param, void* tmrId) {
static void reportSlowLog(void* param, void* tmrId) {
taosRLockLatch(&monitorLock);
if (atomic_load_32(&monitorFlag) == 1) {
taosRUnLockLatch(&monitorLock);
@ -538,8 +475,8 @@ void reportSlowLog(void* param, void* tmrId) {
taosTmrReset(reportSlowLog, pInst->monitorParas.tsMonitorInterval * 1000, param, monitorTimer, &tmrId);
}
void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char *tmpPath){
taosRLockLatch(&monitorLock);
static void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char *tmpPath){
taosWLockLatch(&monitorLock);
TdFilePtr pFile = NULL;
void* tmp = taosHashGet(monitorSlowLogHash, &slowLogData->clusterId, LONG_BYTES);
if (tmp == NULL){
@ -594,10 +531,10 @@ void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char *tmpPath){
uDebug("[monitor] write slow log to file:%p, clusterId:%"PRIx64, pFile, slowLogData->clusterId);
FAILED:
taosRUnLockLatch(&monitorLock);
taosWUnLockLatch(&monitorLock);
}
void* monitorThreadFunc(void *param){
static void* monitorThreadFunc(void *param){
setThreadName("client-monitor-slowlog");
#ifdef WINDOWS
@ -631,6 +568,7 @@ void* monitorThreadFunc(void *param){
uError("open queue error since %s", terrstr());
return NULL;
}
uDebug("monitorThreadFunc start");
while (1) {
if (slowLogFlag > 0) break;
@ -638,7 +576,11 @@ void* monitorThreadFunc(void *param){
taosReadQitem(monitorQueue, (void**)&slowLogData);
if (slowLogData != NULL) {
uDebug("[monitor] read slow log data from queue, clusterId:%" PRIx64 " value:%s", slowLogData->clusterId, slowLogData->value);
monitorWriteSlowLog2File(slowLogData, tmpPath);
if (slowLogData->value == NULL){
monitorSendAllSlowLogFromTempDir(slowLogData->clusterId);
}else{
monitorWriteSlowLog2File(slowLogData, tmpPath);
}
}
monitorFreeSlowLogData(slowLogData);
taosFreeQitem(slowLogData);
@ -650,3 +592,88 @@ void* monitorThreadFunc(void *param){
slowLogFlag = -2;
return NULL;
}
static int32_t tscMonitortInit() {
TdThreadAttr thAttr;
taosThreadAttrInit(&thAttr);
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
TdThread monitorThread;
if (taosThreadCreate(&monitorThread, &thAttr, monitorThreadFunc, NULL) != 0) {
uError("failed to create monitor thread since %s", strerror(errno));
return -1;
}
taosThreadAttrDestroy(&thAttr);
return 0;
}
static void tscMonitorStop() {
if (atomic_val_compare_exchange_32(&slowLogFlag, 0, 1)) {
uDebug("monitor thread already stopped");
return;
}
while (atomic_load_32(&slowLogFlag) > 0) {
taosMsleep(100);
}
}
void monitorInit() {
uInfo("[monitor] tscMonitor init");
monitorCounterHash = (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
if (monitorCounterHash == NULL) {
uError("failed to create monitorCounterHash");
}
taosHashSetFreeFp(monitorCounterHash, destroyMonitorClient);
monitorSlowLogHash = (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
if (monitorSlowLogHash == NULL) {
uError("failed to create monitorSlowLogHash");
}
taosHashSetFreeFp(monitorSlowLogHash, destroySlowLogClient);
monitorTimer = taosTmrInit(0, 0, 0, "MONITOR");
if (monitorTimer == NULL) {
uError("failed to create monitor timer");
}
taosInitRWLatch(&monitorLock);
tscMonitortInit();
}
void monitorClose() {
uInfo("[monitor] tscMonitor close");
taosWLockLatch(&monitorLock);
if (atomic_val_compare_exchange_32(&monitorFlag, 0, 1)) {
uDebug("[monitor] monitorFlag is not 0");
}
tscMonitorStop();
sendAllSlowLog();
sendAllCounter();
taosHashCleanup(monitorCounterHash);
taosHashCleanup(monitorSlowLogHash);
taosTmrCleanUp(monitorTimer);
taosWUnLockLatch(&monitorLock);
}
int32_t monitorPutData2MonitorQueue(int64_t clusterId, char* value){
MonitorSlowLogData* slowLogData = taosAllocateQitem(sizeof(MonitorSlowLogData), DEF_QITEM, 0);
if (slowLogData == NULL) {
uError("[monitor] failed to allocate slow log data");
return -1;
}
slowLogData->clusterId = clusterId;
slowLogData->value = value;
uDebug("[monitor] write slow log to queue, clusterId:%"PRIx64 " value:%s", slowLogData->clusterId, slowLogData->value);
while (monitorQueue == NULL) {
taosMsleep(100);
}
if (taosWriteQitem(monitorQueue, slowLogData) == 0){
tsem2_post(&monitorSem);
}else{
monitorFreeSlowLogData(slowLogData);
taosFreeQitem(slowLogData);
}
return 0;
}

View File

@ -155,7 +155,7 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
if(taosHashPut(appInfo.pInstMapByClusterId, &connectRsp.clusterId, LONG_BYTES, &pTscObj->pAppInfo, POINTER_BYTES) != 0){
tscError("failed to put appInfo into appInfo.pInstMapByClusterId");
}
monitorSendAllSlowLogFromTempDir(pTscObj->pAppInfo);
monitorPutData2MonitorQueue(pTscObj->pAppInfo->clusterId, NULL);
monitorClientSlowQueryInit(connectRsp.clusterId);
monitorClientSQLReqInit(connectRsp.clusterId);
}

View File

@ -769,34 +769,35 @@ void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
}
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
if (pMsg) {
SMqHbRsp rsp = {0};
tDeserializeSMqHbRsp(pMsg->pData, pMsg->len, &rsp);
if (pMsg == NULL) {
return code;
}
SMqHbRsp rsp = {0};
tDeserializeSMqHbRsp(pMsg->pData, pMsg->len, &rsp);
int64_t refId = *(int64_t*)param;
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
if (tmq != NULL) {
taosWLockLatch(&tmq->lock);
for (int32_t i = 0; i < taosArrayGetSize(rsp.topicPrivileges); i++) {
STopicPrivilege* privilege = taosArrayGet(rsp.topicPrivileges, i);
if (privilege->noPrivilege == 1) {
int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
for (int32_t j = 0; j < topicNumCur; j++) {
SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, j);
if (strcmp(pTopicCur->topicName, privilege->topic) == 0) {
tscInfo("consumer:0x%" PRIx64 ", has no privilege, topic:%s", tmq->consumerId, privilege->topic);
pTopicCur->noPrivilege = 1;
}
int64_t refId = *(int64_t*)param;
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
if (tmq != NULL) {
taosWLockLatch(&tmq->lock);
for (int32_t i = 0; i < taosArrayGetSize(rsp.topicPrivileges); i++) {
STopicPrivilege* privilege = taosArrayGet(rsp.topicPrivileges, i);
if (privilege->noPrivilege == 1) {
int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
for (int32_t j = 0; j < topicNumCur; j++) {
SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, j);
if (strcmp(pTopicCur->topicName, privilege->topic) == 0) {
tscInfo("consumer:0x%" PRIx64 ", has no privilege, topic:%s", tmq->consumerId, privilege->topic);
pTopicCur->noPrivilege = 1;
}
}
}
taosWUnLockLatch(&tmq->lock);
taosReleaseRef(tmqMgmt.rsetId, refId);
}
tDestroySMqHbRsp(&rsp);
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
taosWUnLockLatch(&tmq->lock);
taosReleaseRef(tmqMgmt.rsetId, refId);
}
tDestroySMqHbRsp(&rsp);
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
return 0;
}
@ -984,10 +985,16 @@ void tmqClearUnhandleMsg(tmq_t* tmq) {
}
int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) {
if(param == NULL) {
return code;
}
SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
pParam->rspErr = code;
taosMemoryFree(pMsg->pEpSet);
if(pMsg){
taosMemoryFree(pMsg->pEpSet);
}
tsem_post(&pParam->rspSem);
return 0;
}
@ -2565,6 +2572,7 @@ end:
}
int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) {
if(param == NULL) return code;
SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId);
if (tmq == NULL) {
@ -2577,6 +2585,9 @@ int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) {
goto END;
}
if (pMsg == NULL) {
goto END;
}
SMqRspHead* head = pMsg->pData;
int32_t epoch = atomic_load_32(&tmq->epoch);
tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch);
@ -2606,18 +2617,24 @@ END:
FAIL:
if (pParam->sync) {
SAskEpInfo* pInfo = pParam->pParam;
pInfo->code = code;
tsem_post(&pInfo->sem);
if(pInfo) {
pInfo->code = code;
tsem_post(&pInfo->sem);
}
}
if(pMsg){
taosMemoryFree(pMsg->pEpSet);
taosMemoryFree(pMsg->pData);
}
taosMemoryFree(pMsg->pEpSet);
taosMemoryFree(pMsg->pData);
taosMemoryFree(pParam);
return code;
}
int32_t syncAskEp(tmq_t* pTmq) {
SAskEpInfo* pInfo = taosMemoryMalloc(sizeof(SAskEpInfo));
if(pInfo == NULL) return TSDB_CODE_OUT_OF_MEMORY;
tsem_init(&pInfo->sem, 0, 0);
askEp(pTmq, pInfo, true, false);
@ -2769,6 +2786,9 @@ SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4) {
}
static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) {
if(param == NULL) {
return code;
}
SMqVgWalInfoParam* pParam = param;
SMqVgCommon* pCommon = pParam->pCommon;
@ -2800,8 +2820,11 @@ static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) {
tsem_post(&pCommon->rsp);
}
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
if(pMsg){
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
}
return 0;
}
@ -3233,6 +3256,9 @@ static int32_t tmqSeekCb(void* param, SDataBuf* pMsg, int32_t code) {
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
}
if(param == NULL) {
return code;
}
SMqSeekParam* pParam = param;
pParam->code = code;
tsem_post(&pParam->sem);

View File

@ -107,7 +107,7 @@ TEST(clientMonitorTest, ReadOneFile) {
SEpSet* epSet = NULL;
// Call the function to be tested
monitorReadSendSlowLog(pFile, pTransporter, epSet);
// monitorReadSendSlowLog(pFile, (int64_t)pTransporter, epSet);
char value[size] = {0};
memset(value, '0', size - 1);
@ -115,7 +115,7 @@ TEST(clientMonitorTest, ReadOneFile) {
uError("failed to write len to file:%p since %s", pFile, terrstr());
}
monitorReadSendSlowLog(pFile, pTransporter, epSet);
// monitorReadSendSlowLog(pFile, (int64_t)pTransporter, epSet);
// Clean up any resources created for testing
taosCloseFile(&pFile);
@ -164,6 +164,6 @@ TEST(clientMonitorTest, ReadTwoFile) {
pAppInfo.clusterId = 2;
pAppInfo.monitorParas.tsEnableMonitor = 1;
strcpy(tsTempDir,"/tmp");
monitorSendAllSlowLogFromTempDir(&pAppInfo);
// monitorSendAllSlowLogFromTempDir(&pAppInfo);
}

View File

@ -180,7 +180,7 @@ int32_t tsMaxRetryWaitTime = 10000;
bool tsUseAdapter = false;
int32_t tsMetaCacheMaxSize = -1; // MB
int32_t tsSlowLogThreshold = 10; // seconds
int32_t tsSlowLogThresholdTest = 10; // seconds
int32_t tsSlowLogThresholdTest = INT32_MAX; // seconds
char tsSlowLogExceptDb[TSDB_DB_NAME_LEN] = ""; // seconds
int32_t tsSlowLogScope = SLOW_LOG_TYPE_QUERY;
char* tsSlowLogScopeString = "query";
@ -971,7 +971,7 @@ static void taosSetServerLogCfg(SConfig *pCfg) {
sndDebugFlag = cfgGetItem(pCfg, "sndDebugFlag")->i32;
}
static int32_t taosSetSlowLogScope(char *pScope) {
int32_t taosSetSlowLogScope(char *pScope) {
if (NULL == pScope || 0 == strlen(pScope)) {
return SLOW_LOG_TYPE_QUERY;
}
@ -982,7 +982,7 @@ static int32_t taosSetSlowLogScope(char *pScope) {
char *tmp = NULL;
while((scope = strsep(&pScope, "|")) != NULL){
taosMemoryFreeClear(tmp);
tmp = strdup(scope);
tmp = taosStrdup(scope);
strtrim(tmp);
if (0 == strcasecmp(tmp, "all")) {
slowScope |= SLOW_LOG_TYPE_ALL;

View File

@ -743,7 +743,7 @@ static int32_t taosSetSlowLogScope(char *pScope) {
char *tmp = NULL;
while((scope = strsep(&pScope, "|")) != NULL){
taosMemoryFreeClear(tmp);
tmp = strdup(scope);
tmp = taosStrdup(scope);
strtrim(tmp);
if (0 == strcasecmp(tmp, "all")) {
slowScope |= SLOW_LOG_TYPE_ALL;

View File

@ -144,6 +144,11 @@ typedef enum {
DND_REASON_ENABLE_WHITELIST_NOT_MATCH,
DND_REASON_ENCRYPTION_KEY_NOT_MATCH,
DND_REASON_STATUS_MONITOR_NOT_MATCH,
DND_REASON_STATUS_MONITOR_SWITCH_NOT_MATCH,
DND_REASON_STATUS_MONITOR_INTERVAL_NOT_MATCH,
DND_REASON_STATUS_MONITOR_SLOW_LOG_THRESHOLD_NOT_MATCH,
DND_REASON_STATUS_MONITOR_SLOW_LOG_SQL_MAX_LEN_NOT_MATCH,
DND_REASON_STATUS_MONITOR_SLOW_LOG_SCOPE_NOT_MATCH,
DND_REASON_OTHERS
} EDndReason;

View File

@ -49,6 +49,12 @@ static const char *offlineReason[] = {
"ttlChangeOnWrite not match",
"enableWhiteList not match",
"encryptionKey not match",
"monitor not match",
"monitor switch not match",
"monitor interval not match",
"monitor slow log threshold not match",
"monitor slow log sql max len not match",
"monitor slow log scope not match",
"unknown",
};
@ -438,20 +444,20 @@ void mndGetDnodeData(SMnode *pMnode, SArray *pDnodeInfo) {
}
}
#define CHECK_MONITOR_PARA(para) \
#define CHECK_MONITOR_PARA(para,err) \
if (pCfg->monitorParas.para != para) { \
mError("dnode:%d, para:%d inconsistent with cluster:%d", pDnode->id, pCfg->monitorParas.para, para); \
terrno = TSDB_CODE_DNODE_INVALID_MONITOR_PARAS; \
return DND_REASON_STATUS_MONITOR_NOT_MATCH;\
terrno = err; \
return err;\
}
static int32_t mndCheckClusterCfgPara(SMnode *pMnode, SDnodeObj *pDnode, const SClusterCfg *pCfg) {
CHECK_MONITOR_PARA(tsEnableMonitor);
CHECK_MONITOR_PARA(tsMonitorInterval);
CHECK_MONITOR_PARA(tsSlowLogThreshold);
CHECK_MONITOR_PARA(tsSlowLogThresholdTest);
CHECK_MONITOR_PARA(tsSlowLogMaxLen);
CHECK_MONITOR_PARA(tsSlowLogScope);
CHECK_MONITOR_PARA(tsEnableMonitor, DND_REASON_STATUS_MONITOR_SWITCH_NOT_MATCH);
CHECK_MONITOR_PARA(tsMonitorInterval, DND_REASON_STATUS_MONITOR_INTERVAL_NOT_MATCH);
CHECK_MONITOR_PARA(tsSlowLogThreshold, DND_REASON_STATUS_MONITOR_SLOW_LOG_THRESHOLD_NOT_MATCH);
CHECK_MONITOR_PARA(tsSlowLogThresholdTest, DND_REASON_STATUS_MONITOR_NOT_MATCH);
CHECK_MONITOR_PARA(tsSlowLogMaxLen, DND_REASON_STATUS_MONITOR_SLOW_LOG_SQL_MAX_LEN_NOT_MATCH);
CHECK_MONITOR_PARA(tsSlowLogScope, DND_REASON_STATUS_MONITOR_SLOW_LOG_SCOPE_NOT_MATCH);
if (0 != strcasecmp(pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb)) {
mError("dnode:%d, tsSlowLogExceptDb:%s inconsistent with cluster:%s", pDnode->id, pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb);
@ -557,9 +563,6 @@ static int32_t mndProcessStatisReq(SRpcMsg *pReq) {
SStatisReq statisReq = {0};
int32_t code = -1;
char strClusterId[TSDB_CLUSTER_ID_LEN] = {0};
sprintf(strClusterId, "%" PRId64, pMnode->clusterId);
if (tDeserializeSStatisReq(pReq->pCont, pReq->contLen, &statisReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return code;

View File

@ -116,7 +116,7 @@ int32_t hJoinLaunchPrimExpr(SSDataBlock* pBlock, SHJoinTableCtx* pTable, int32_t
SColumnInfoData* pPrimOut = taosArrayGet(pBlock->pDataBlock, pTable->primCtx.targetSlotId);
if (0 != pCtx->timezoneUnit) {
for (int32_t i = startIdx; i <= endIdx; ++i) {
((int64_t*)pPrimOut->pData)[i] = ((int64_t*)pPrimIn->pData)[i] / pCtx->truncateUnit * pCtx->truncateUnit - pCtx->timezoneUnit;
((int64_t*)pPrimOut->pData)[i] = ((int64_t*)pPrimIn->pData)[i] - (((int64_t*)pPrimIn->pData)[i] - pCtx->timezoneUnit) % pCtx->truncateUnit;
}
} else {
for (int32_t i = startIdx; i <= endIdx; ++i) {

View File

@ -948,7 +948,7 @@ int32_t mJoinLaunchPrimExpr(SSDataBlock* pBlock, SMJoinTableCtx* pTable) {
SColumnInfoData* pPrimOut = taosArrayGet(pBlock->pDataBlock, pTable->primCtx.targetSlotId);
if (0 != pCtx->timezoneUnit) {
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
((int64_t*)pPrimOut->pData)[i] = ((int64_t*)pPrimIn->pData)[i] / pCtx->truncateUnit * pCtx->truncateUnit - pCtx->timezoneUnit;
((int64_t*)pPrimOut->pData)[i] = ((int64_t*)pPrimIn->pData)[i] - (((int64_t*)pPrimIn->pData)[i] + pCtx->timezoneUnit) % pCtx->truncateUnit;
}
} else {
for (int32_t i = 0; i < pBlock->info.rows; ++i) {

View File

@ -2166,15 +2166,6 @@ static int32_t translateToIso8601(SFunctionNode* pFunc, char* pErrBuf, int32_t l
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
if (QUERY_NODE_VALUE == nodeType(nodesListGetNode(pFunc->pParameterList, 0))) {
SValueNode* pValue = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 0);
if (!validateTimestampDigits(pValue)) {
pFunc->node.resType = (SDataType){.bytes = 0, .type = TSDB_DATA_TYPE_BINARY};
return TSDB_CODE_SUCCESS;
}
}
// param1
if (numOfParams == 2) {
SValueNode* pValue = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1);

View File

@ -624,6 +624,23 @@ static int32_t sifSetFltParam(SIFParam *left, SIFParam *right, SDataTypeBuf *typ
}
return 0;
}
static int8_t sifShouldUseIndexBasedOnType(SIFParam *left, SIFParam *right) {
// not compress
if (left->colValType == TSDB_DATA_TYPE_FLOAT) return 0;
if (left->colValType == TSDB_DATA_TYPE_GEOMETRY || right->colValType == TSDB_DATA_TYPE_GEOMETRY ||
left->colValType == TSDB_DATA_TYPE_JSON || right->colValType == TSDB_DATA_TYPE_JSON) {
return 0;
}
if (IS_VAR_DATA_TYPE(left->colValType)) {
if (!IS_VAR_DATA_TYPE(right->colValType)) return 0;
} else if (IS_NUMERIC_TYPE(left->colValType)) {
if (left->colValType != right->colValType) return 0;
}
return 1;
}
static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFParam *output) {
int ret = 0;
SIndexMetaArg *arg = &output->arg;
@ -641,8 +658,10 @@ static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFP
ret = indexJsonSearch(arg->ivtIdx, mtm, output->result);
indexMultiTermQueryDestroy(mtm);
} else {
if (left->colValType == TSDB_DATA_TYPE_GEOMETRY || right->colValType == TSDB_DATA_TYPE_GEOMETRY) {
return TSDB_CODE_QRY_GEO_NOT_SUPPORT_ERROR;
int8_t useIndex = sifShouldUseIndexBasedOnType(left, right);
if (!useIndex) {
output->status = SFLT_NOT_INDEX;
return -1;
}
bool reverse = false, equal = false;
@ -660,15 +679,12 @@ static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFP
SDataTypeBuf typedata;
memset(&typedata, 0, sizeof(typedata));
if (IS_VAR_DATA_TYPE(left->colValType)) {
if (!IS_VAR_DATA_TYPE(right->colValType)) {
NUM_TO_STRING(right->colValType, right->condValue, sizeof(buf) - 2, buf + VARSTR_HEADER_SIZE);
varDataSetLen(buf, strlen(buf + VARSTR_HEADER_SIZE));
param.val = buf;
}
} else {
if (sifSetFltParam(left, right, &typedata, &param) != 0) return -1;
if (sifSetFltParam(left, right, &typedata, &param) != 0) {
output->status = SFLT_NOT_INDEX;
return -1;
}
ret = left->api.metaFilterTableIds(arg->metaEx, &param, output->result);
if (ret == 0) {
taosArraySort(output->result, uidCompare);

View File

@ -1095,24 +1095,45 @@ int32_t toISO8601Function(SScalarParam *pInput, int32_t inputNum, SScalarParam *
char fraction[20] = {0};
bool hasFraction = false;
NUM_TO_STRING(type, input, sizeof(fraction), fraction);
int32_t tsDigits = (int32_t)strlen(fraction);
int32_t fractionLen;
char buf[64] = {0};
int64_t timeVal;
char* format = NULL;
int64_t quot = 0;
long mod = 0;
GET_TYPED_DATA(timeVal, int64_t, type, input);
if (tsDigits > TSDB_TIME_PRECISION_SEC_DIGITS) {
if (tsDigits == TSDB_TIME_PRECISION_MILLI_DIGITS) {
timeVal = timeVal / 1000;
} else if (tsDigits == TSDB_TIME_PRECISION_MICRO_DIGITS) {
timeVal = timeVal / ((int64_t)(1000 * 1000));
} else if (tsDigits == TSDB_TIME_PRECISION_NANO_DIGITS) {
timeVal = timeVal / ((int64_t)(1000 * 1000 * 1000));
} else {
switch (pInput->columnData[0].info.precision) {
case TSDB_TIME_PRECISION_MILLI: {
quot = timeVal / 1000;
fractionLen = 5;
format = ".%03" PRId64;
mod = timeVal % 1000;
break;
}
case TSDB_TIME_PRECISION_MICRO: {
quot = timeVal / 1000000;
fractionLen = 8;
format = ".%06" PRId64;
mod = timeVal % 1000000;
break;
}
case TSDB_TIME_PRECISION_NANO: {
quot = timeVal / 1000000000;
fractionLen = 11;
format = ".%09" PRId64;
mod = timeVal % 1000000000;
break;
}
default: {
colDataSetNULL(pOutput->columnData, i);
continue;
}
hasFraction = true;
memmove(fraction, fraction + TSDB_TIME_PRECISION_SEC_DIGITS, TSDB_TIME_PRECISION_SEC_DIGITS);
}
// trans current timezone's unix ts to dest timezone
@ -1122,18 +1143,19 @@ int32_t toISO8601Function(SScalarParam *pInput, int32_t inputNum, SScalarParam *
if (0 != offsetOfTimezone(tz, &offset)) {
goto _end;
}
timeVal -= offset + 3600 * ((int64_t)tsTimezone);
quot -= offset + 3600 * ((int64_t)tsTimezone);
struct tm tmInfo;
int32_t len = 0;
if (taosLocalTime((const time_t *)&timeVal, &tmInfo, buf) == NULL) {
if (taosLocalTime((const time_t *)&quot, &tmInfo, buf) == NULL) {
len = (int32_t)strlen(buf);
goto _end;
}
strftime(buf, sizeof(buf), "%Y-%m-%dT%H:%M:%S", &tmInfo);
len = (int32_t)strlen(buf);
len = (int32_t)strftime(buf, sizeof(buf), "%Y-%m-%dT%H:%M:%S", &tmInfo);
len += snprintf(buf + len, fractionLen, format, mod);
// add timezone string
if (tzLen > 0) {
@ -1141,32 +1163,6 @@ int32_t toISO8601Function(SScalarParam *pInput, int32_t inputNum, SScalarParam *
len += tzLen;
}
if (hasFraction) {
int32_t fracLen = (int32_t)strlen(fraction) + 1;
char *tzInfo;
if (buf[len - 1] == 'z' || buf[len - 1] == 'Z') {
tzInfo = &buf[len - 1];
memmove(tzInfo + fracLen, tzInfo, strlen(tzInfo));
} else {
tzInfo = strchr(buf, '+');
if (tzInfo) {
memmove(tzInfo + fracLen, tzInfo, strlen(tzInfo));
} else {
// search '-' backwards
tzInfo = strrchr(buf, '-');
if (tzInfo) {
memmove(tzInfo + fracLen, tzInfo, strlen(tzInfo));
}
}
}
char tmp[32] = {0};
sprintf(tmp, ".%s", fraction);
memcpy(tzInfo, tmp, fracLen);
len += fracLen;
}
_end:
memmove(buf + VARSTR_HEADER_SIZE, buf, len);
varDataSetLen(buf, len);
@ -1347,9 +1343,6 @@ int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara
GET_TYPED_DATA(timePrec, int64_t, GET_PARAM_TYPE(&pInput[timePrecIdx]), pInput[timePrecIdx].columnData->pData);
memcpy(timezone, varDataVal(pInput[timeZoneIdx].columnData->pData), varDataLen(pInput[timeZoneIdx].columnData->pData));
int64_t factor = TSDB_TICK_PER_SECOND(timePrec);
int64_t unit = timeUnit * 1000 / factor;
for (int32_t i = 0; i < pInput[0].numOfRows; ++i) {
if (colDataIsNull_s(pInput[0].columnData, i)) {
colDataSetNULL(pOutput->columnData, i);
@ -1359,201 +1352,27 @@ int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara
char *input = colDataGetData(pInput[0].columnData, i);
if (IS_VAR_DATA_TYPE(type)) { /* datetime format strings */
int32_t ret = convertStringToTimestamp(type, input, TSDB_TIME_PRECISION_NANO, &timeVal);
int32_t ret = convertStringToTimestamp(type, input, timePrec, &timeVal);
if (ret != TSDB_CODE_SUCCESS) {
colDataSetNULL(pOutput->columnData, i);
continue;
}
// If converted value is less than 10digits in second, use value in second instead
int64_t timeValSec = timeVal / 1000000000;
if (timeValSec < 1000000000) {
timeVal = timeValSec;
}
} else if (type == TSDB_DATA_TYPE_BIGINT) { /* unix timestamp */
GET_TYPED_DATA(timeVal, int64_t, type, input);
} else if (type == TSDB_DATA_TYPE_TIMESTAMP) { /* timestamp column*/
GET_TYPED_DATA(timeVal, int64_t, type, input);
int64_t timeValSec = timeVal / factor;
if (timeValSec < 1000000000) {
timeVal = timeValSec;
}
}
char buf[20] = {0};
NUM_TO_STRING(TSDB_DATA_TYPE_BIGINT, &timeVal, sizeof(buf), buf);
int32_t tsDigits = (int32_t)strlen(buf);
switch (unit) {
case 0: { /* 1u or 1b */
if (tsDigits == TSDB_TIME_PRECISION_NANO_DIGITS) {
if (timePrec == TSDB_TIME_PRECISION_NANO && timeUnit == 1) {
timeVal = timeVal * 1;
} else {
timeVal = timeVal / 1000 * 1000;
}
} else if (tsDigits <= TSDB_TIME_PRECISION_SEC_DIGITS) {
timeVal = timeVal * factor;
} else {
timeVal = timeVal * 1;
}
break;
}
case 1: { /* 1a */
if (tsDigits == TSDB_TIME_PRECISION_MILLI_DIGITS) {
timeVal = timeVal * 1;
} else if (tsDigits == TSDB_TIME_PRECISION_MICRO_DIGITS) {
timeVal = timeVal / 1000 * 1000;
} else if (tsDigits == TSDB_TIME_PRECISION_NANO_DIGITS) {
timeVal = timeVal / 1000000 * 1000000;
} else if (tsDigits <= TSDB_TIME_PRECISION_SEC_DIGITS) {
timeVal = timeVal * factor;
} else {
colDataSetNULL(pOutput->columnData, i);
continue;
}
break;
}
case 1000: { /* 1s */
if (tsDigits == TSDB_TIME_PRECISION_MILLI_DIGITS) {
timeVal = timeVal / 1000 * 1000;
} else if (tsDigits == TSDB_TIME_PRECISION_MICRO_DIGITS) {
timeVal = timeVal / 1000000 * 1000000;
} else if (tsDigits == TSDB_TIME_PRECISION_NANO_DIGITS) {
timeVal = timeVal / 1000000000 * 1000000000;
} else if (tsDigits <= TSDB_TIME_PRECISION_SEC_DIGITS) {
timeVal = timeVal * factor;
} else {
colDataSetNULL(pOutput->columnData, i);
continue;
}
break;
}
case 60000: { /* 1m */
if (tsDigits == TSDB_TIME_PRECISION_MILLI_DIGITS) {
timeVal = timeVal / 1000 / 60 * 60 * 1000;
} else if (tsDigits == TSDB_TIME_PRECISION_MICRO_DIGITS) {
timeVal = timeVal / 1000000 / 60 * 60 * 1000000;
} else if (tsDigits == TSDB_TIME_PRECISION_NANO_DIGITS) {
timeVal = timeVal / 1000000000 / 60 * 60 * 1000000000;
} else if (tsDigits <= TSDB_TIME_PRECISION_SEC_DIGITS) {
timeVal = timeVal * factor / factor / 60 * 60 * factor;
} else {
colDataSetNULL(pOutput->columnData, i);
continue;
}
break;
}
case 3600000: { /* 1h */
if (tsDigits == TSDB_TIME_PRECISION_MILLI_DIGITS) {
timeVal = timeVal / 1000 / 3600 * 3600 * 1000;
} else if (tsDigits == TSDB_TIME_PRECISION_MICRO_DIGITS) {
timeVal = timeVal / 1000000 / 3600 * 3600 * 1000000;
} else if (tsDigits == TSDB_TIME_PRECISION_NANO_DIGITS) {
timeVal = timeVal / 1000000000 / 3600 * 3600 * 1000000000;
} else if (tsDigits <= TSDB_TIME_PRECISION_SEC_DIGITS) {
timeVal = timeVal * factor / factor / 3600 * 3600 * factor;
} else {
colDataSetNULL(pOutput->columnData, i);
continue;
}
break;
}
case 86400000: { /* 1d */
if (tsDigits == TSDB_TIME_PRECISION_MILLI_DIGITS) {
if (ignoreTz) {
timeVal = timeVal - (timeVal + offsetFromTz(timezone, 1000)) % (((int64_t)86400) * 1000);
} else {
timeVal = timeVal / 1000 / 86400 * 86400 * 1000;
}
} else if (tsDigits == TSDB_TIME_PRECISION_MICRO_DIGITS) {
if (ignoreTz) {
timeVal = timeVal - (timeVal + offsetFromTz(timezone, 1000000)) % (((int64_t)86400) * 1000000);
} else {
timeVal = timeVal / 1000000 / 86400 * 86400 * 1000000;
}
} else if (tsDigits == TSDB_TIME_PRECISION_NANO_DIGITS) {
if (ignoreTz) {
timeVal = timeVal - (timeVal + offsetFromTz(timezone, 1000000000)) % (((int64_t)86400) * 1000000000);
} else {
timeVal = timeVal / 1000000000 / 86400 * 86400 * 1000000000;
}
} else if (tsDigits <= TSDB_TIME_PRECISION_SEC_DIGITS) {
if (ignoreTz) {
timeVal = (timeVal - (timeVal + offsetFromTz(timezone, 1)) % (86400L)) * factor;
} else {
timeVal = timeVal * factor / factor / 86400 * 86400 * factor;
}
} else {
colDataSetNULL(pOutput->columnData, i);
continue;
}
break;
}
case 604800000: { /* 1w */
if (tsDigits == TSDB_TIME_PRECISION_MILLI_DIGITS) {
if (ignoreTz) {
timeVal = timeVal - (timeVal + offsetFromTz(timezone, 1000)) % (((int64_t)604800) * 1000);
} else {
timeVal = timeVal / 1000 / 604800 * 604800 * 1000;
}
} else if (tsDigits == TSDB_TIME_PRECISION_MICRO_DIGITS) {
if (ignoreTz) {
timeVal = timeVal - (timeVal + offsetFromTz(timezone, 1000000)) % (((int64_t)604800) * 1000000);
} else {
timeVal = timeVal / 1000000 / 604800 * 604800 * 1000000;
}
} else if (tsDigits == TSDB_TIME_PRECISION_NANO_DIGITS) {
if (ignoreTz) {
timeVal = timeVal - (timeVal + offsetFromTz(timezone, 1000000000)) % (((int64_t)604800) * 1000000000);
} else {
timeVal = timeVal / 1000000000 / 604800 * 604800 * 1000000000;
}
} else if (tsDigits <= TSDB_TIME_PRECISION_SEC_DIGITS) {
if (ignoreTz) {
timeVal = timeVal - (timeVal + offsetFromTz(timezone, 1)) % (((int64_t)604800L) * factor);
} else {
timeVal = timeVal * factor / factor / 604800 * 604800 * factor;
}
} else {
colDataSetNULL(pOutput->columnData, i);
continue;
}
break;
}
default: {
timeVal = timeVal * 1;
break;
}
// truncate the timestamp to time_unit precision
int64_t seconds = timeUnit / TSDB_TICK_PER_SECOND(timePrec);
if (ignoreTz && (seconds == 604800 || seconds == 86400)) {
timeVal = timeVal - (timeVal + offsetFromTz(timezone, TSDB_TICK_PER_SECOND(timePrec))) % timeUnit;
} else {
timeVal = timeVal / timeUnit * timeUnit;
}
// truncate the timestamp to db precision
switch (timePrec) {
case TSDB_TIME_PRECISION_MILLI: {
if (tsDigits == TSDB_TIME_PRECISION_MICRO_DIGITS) {
timeVal = timeVal / 1000;
} else if (tsDigits == TSDB_TIME_PRECISION_NANO_DIGITS) {
timeVal = timeVal / 1000000;
}
break;
}
case TSDB_TIME_PRECISION_MICRO: {
if (tsDigits == TSDB_TIME_PRECISION_NANO_DIGITS) {
timeVal = timeVal / 1000;
} else if (tsDigits == TSDB_TIME_PRECISION_MILLI_DIGITS) {
timeVal = timeVal * 1000;
}
break;
}
case TSDB_TIME_PRECISION_NANO: {
if (tsDigits == TSDB_TIME_PRECISION_MICRO_DIGITS) {
timeVal = timeVal * 1000;
} else if (tsDigits == TSDB_TIME_PRECISION_MILLI_DIGITS) {
timeVal = timeVal * 1000000;
}
break;
}
}
colDataSetVal(pOutput->columnData, i, (char *)&timeVal, false);
}

View File

@ -958,6 +958,8 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
pMeta->numOfStreamTasks, pMeta->numOfPausedTasks);
taosArrayDestroy(pRecycleList);
(void)streamMetaCommit(pMeta);
}
static bool waitForEnoughDuration(SMetaHbInfo* pInfo) {

View File

@ -1222,20 +1222,24 @@ int32_t taosUmaskFile(int32_t maskVal) {
int32_t taosGetErrorFile(TdFilePtr pFile) { return errno; }
int64_t taosGetLineFile(TdFilePtr pFile, char **__restrict ptrBuf) {
int64_t ret = -1;
#if FILE_WITH_LOCK
taosThreadRwlockRdlock(&(pFile->rwlock));
#endif
if (pFile == NULL || ptrBuf == NULL) {
return -1;
goto END;
}
if (*ptrBuf != NULL) {
taosMemoryFreeClear(*ptrBuf);
}
ASSERT(pFile->fp != NULL);
if (pFile->fp == NULL) {
return -1;
goto END;
}
#ifdef WINDOWS
size_t bufferSize = 512;
*ptrBuf = taosMemoryMalloc(bufferSize);
if (*ptrBuf == NULL) return -1;
if (*ptrBuf == NULL) goto END;
size_t bytesRead = 0;
size_t totalBytesRead = 0;
@ -1244,7 +1248,7 @@ int64_t taosGetLineFile(TdFilePtr pFile, char **__restrict ptrBuf) {
char *result = fgets(*ptrBuf + totalBytesRead, bufferSize - totalBytesRead, pFile->fp);
if (result == NULL) {
taosMemoryFreeClear(*ptrBuf);
return -1;
goto END;
}
bytesRead = strlen(*ptrBuf + totalBytesRead);
totalBytesRead += bytesRead;
@ -1257,18 +1261,24 @@ int64_t taosGetLineFile(TdFilePtr pFile, char **__restrict ptrBuf) {
void *newBuf = taosMemoryRealloc(*ptrBuf, bufferSize);
if (newBuf == NULL) {
taosMemoryFreeClear(*ptrBuf);
return -1;
goto END;
}
*ptrBuf = newBuf;
}
(*ptrBuf)[totalBytesRead] = '\0';
return totalBytesRead;
ret = totalBytesRead;
#else
size_t len = 0;
return getline(ptrBuf, &len, pFile->fp);
ret = getline(ptrBuf, &len, pFile->fp);
#endif
END:
#if FILE_WITH_LOCK
taosThreadRwlockUnlock(&(pFile->rwlock));
#endif
return ret;
}
int64_t taosGetsFile(TdFilePtr pFile, int32_t maxSize, char *__restrict buf) {

View File

@ -23,6 +23,7 @@
#include "tlog.h"
#include "tunit.h"
#include "tutil.h"
#include "tglobal.h"
#define CFG_NAME_PRINT_LEN 24
#define CFG_SRC_PRINT_LEN 12
@ -432,6 +433,18 @@ int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *p
}
switch (pItem->dtype) {
case CFG_DTYPE_STRING:{
if(strcasecmp(name, "slowLogScope") == 0){
char* tmp = taosStrdup(pVal);
if(taosSetSlowLogScope(tmp) < 0){
terrno = TSDB_CODE_INVALID_CFG;
cfgUnLock(pCfg);
taosMemoryFree(tmp);
return -1;
}
taosMemoryFree(tmp);
}
} break;
case CFG_DTYPE_BOOL: {
int32_t ival = (int32_t)atoi(pVal);
if (ival != 0 && ival != 1) {

View File

@ -1488,12 +1488,14 @@
,,y,script,./test.sh -f tmp/monitor.sim
,,y,script,./test.sh -f tsim/tagindex/add_index.sim
,,n,script,./test.sh -f tsim/tagindex/sma_and_tag_index.sim
,,y,script,./test.sh -f tsim/tagindex/indexOverflow.sim
,,y,script,./test.sh -f tsim/view/view.sim
,,y,script,./test.sh -f tsim/query/cache_last.sim
,,y,script,./test.sh -f tsim/query/const.sim
,,y,script,./test.sh -f tsim/query/nestedJoinView.sim
#develop test
,,n,develop-test,python3 ./test.py -f 2-query/table_count_scan.py
,,n,develop-test,python3 ./test.py -f 2-query/pseudo_column.py

View File

@ -0,0 +1,82 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sql connect
print ======== step0
$dbPrefix = ta_3_db
$tbPrefix = ta_3_tb
$mtPrefix = ta_3_mt
$lastRowNum = 0
$tbNum = 100000
$rowNum = 20
$totalNum = 200
print =============== create database
sql create database $dbPrefix
sql use $dbPrefix
sql create table if not exists $mtPrefix (ts timestamp, c1 int) tags (t1 tinyint, t1c tinyint)
$i = 0
$tinyLimit = 127
$tinyTable = tinyTable
while $i < $tinyLimit
$tb = $tinyTable . $i
sql insert into $tb using $mtPrefix tags( $i , $i ) values( now , $i )
$i = $i + 1
endw
$i = 0
$maxTinyLimit = 200
# 1. compress index and no-index to verify resultset
# 2. compress resultset of index filter and scalar filter
while $i < $maxTinyLimit
sql select * from $mtPrefix where t1 <= $i
$lastRowNum = $rows
sql select * from $mtPrefix where t1c <= $i
if $lastRowNum != $rows then
return -1
endi
$i = $i + 1
endw
$tbPrefix = ta_3_tb_c
$mtPrefix = ta_3_mt_c
$colPrefix = 'col'
sql create table if not exists $mtPrefix (ts timestamp, c1 int) tags (t1 nchar(18), t1c nchar(18))
$i = 0
$tinyLimit = 127
while $i < $tinyLimit
$tb = $tbPrefix . $i
sql insert into $tb using $mtPrefix tags( $colPrefix , $colPrefix ) values( now , $i )
$i = $i + 1
endw
$i = 0
$maxTinyLimit = 200
# 1. compress index and no-index to verify resultset
# 2. compress resultset of index filter and scalar filter
while $i < $maxTinyLimit
sql select * from $mtPrefix where t1 <= $i
$lastRowNum = $rows
sql select * from $mtPrefix where t1c <= $i
if $lastRowNum != $rows then
return -1
endi
$i = $i + 1
endw
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -401,6 +401,7 @@
./test.sh -f tsim/tag/tbNameIn.sim
./test.sh -f tmp/monitor.sim
./test.sh -f tsim/tagindex/add_index.sim
./test.sh -f tsim/tagindex/indexOverflow.sim
./test.sh -f tsim/tagindex/sma_and_tag_index.sim
./test.sh -f tsim/view/view.sim
./test.sh -f tsim/query/cache_last.sim

View File

@ -19,6 +19,61 @@ class TDTestCase:
self.dbname = 'db'
self.stbname = f'{self.dbname}.stb'
self.ntbname = f'{self.dbname}.ntb'
def check_timestamp_precision(self):
time_zone = time.strftime('%z')
tdSql.execute(f'drop database if exists {self.dbname}')
tdSql.execute(f'create database {self.dbname} precision "us"')
tdSql.execute(f'use {self.dbname}')
tdSql.execute(f'create table if not exists {self.ntbname}(ts timestamp, c1 int, c2 timestamp)')
tdSql.execute(f'insert into {self.ntbname} values(now,1,today())')
ts_list = ['1', '11', '111', '1111', '11111', '111111', '1111111', '11111111', '111111111', '1111111111',
'11111111111','111111111111','1111111111111','11111111111111','111111111111111','1111111111111111',
'11111111111111111','111111111111111111','1111111111111111111']
res_list_ms = ['1970-01-01T08:00:00.001+0800', '1970-01-01T08:00:00.011+0800', '1970-01-01T08:00:00.111+0800',
'1970-01-01T08:00:01.111+0800', '1970-01-01T08:00:11.111+0800', '1970-01-01T08:01:51.111+0800',
'1970-01-01T08:18:31.111+0800', '1970-01-01T11:05:11.111+0800', '1970-01-02T14:51:51.111+0800',
'1970-01-14T04:38:31.111+0800', '1970-05-09T22:25:11.111+0800', '1973-07-10T08:11:51.111+0800',
'2005-03-18T09:58:31.111+0800', '2322-02-06T03:45:11.111+0800', '5490-12-21T13:31:51.111+0800',
'37179-09-17T15:18:31.111+0800', '354067-02-04T09:05:11.111+0800',
'3522940-12-11T18:51:51.111+0800', '35211679-06-14T20:38:31.111+0800']
res_list_us = ['1970-01-01T08:00:00.000001+0800', '1970-01-01T08:00:00.000011+0800',
'1970-01-01T08:00:00.000111+0800', '1970-01-01T08:00:00.001111+0800',
'1970-01-01T08:00:00.011111+0800', '1970-01-01T08:00:00.111111+0800',
'1970-01-01T08:00:01.111111+0800', '1970-01-01T08:00:11.111111+0800',
'1970-01-01T08:01:51.111111+0800', '1970-01-01T08:18:31.111111+0800',
'1970-01-01T11:05:11.111111+0800', '1970-01-02T14:51:51.111111+0800',
'1970-01-14T04:38:31.111111+0800', '1970-05-09T22:25:11.111111+0800',
'1973-07-10T08:11:51.111111+0800', '2005-03-18T09:58:31.111111+0800',
'2322-02-06T03:45:11.111111+0800', '5490-12-21T13:31:51.111111+0800',
'37179-09-17T15:18:31.111111+0800']
res_list_ns = ['1970-01-01T08:00:00.000000001+0800', '1970-01-01T08:00:00.000000011+0800',
'1970-01-01T08:00:00.000000111+0800', '1970-01-01T08:00:00.000001111+0800',
'1970-01-01T08:00:00.000011111+0800', '1970-01-01T08:00:00.000111111+0800',
'1970-01-01T08:00:00.001111111+0800', '1970-01-01T08:00:00.011111111+0800',
'1970-01-01T08:00:00.111111111+0800', '1970-01-01T08:00:01.111111111+0800',
'1970-01-01T08:00:11.111111111+0800', '1970-01-01T08:01:51.111111111+0800',
'1970-01-01T08:18:31.111111111+0800', '1970-01-01T11:05:11.111111111+0800',
'1970-01-02T14:51:51.111111111+0800', '1970-01-14T04:38:31.111111111+0800',
'1970-05-09T22:25:11.111111111+0800', '1973-07-10T08:11:51.111111111+0800',
'2005-03-18T09:58:31.111111111+0800']
# test to_iso8601's precision with default precision 'ms'
for i in range(len(ts_list)):
tdSql.query(f'select to_iso8601({ts_list[i]})')
tdSql.checkEqual(tdSql.queryResult[0][0],res_list_ms[i])
# test to_iso8601's precision with table's precision 'us'
for i in range(len(ts_list)):
tdSql.query(f'select to_iso8601({ts_list[i]}) from {self.ntbname}')
tdSql.checkEqual(tdSql.queryResult[0][0],res_list_us[i])
tdSql.execute(f'drop database if exists {self.dbname}')
tdSql.execute(f'create database {self.dbname} precision "ns"')
tdSql.execute(f'use {self.dbname}')
tdSql.execute(f'create table if not exists {self.ntbname}(ts timestamp, c1 int, c2 timestamp)')
tdSql.execute(f'insert into {self.ntbname} values(now,1,today())')
# test to_iso8601's precision with table's precision 'ns'
for i in range(len(ts_list)):
tdSql.query(f'select to_iso8601({ts_list[i]}) from {self.ntbname}')
tdSql.checkEqual(tdSql.queryResult[0][0],res_list_ns[i])
def check_customize_param_ms(self):
time_zone = time.strftime('%z')
tdSql.execute(f'drop database if exists {self.dbname}')
@ -65,7 +120,7 @@ class TDTestCase:
tdSql.checkRows(1)
for i in range(0,3):
tdSql.query("select to_iso8601(1) from db.ntb")
tdSql.checkData(i,0,"1970-01-01T08:00:01+0800")
tdSql.checkData(i,0,"1970-01-01T08:00:00.001+0800")
tdSql.checkRows(3)
tdSql.query("select to_iso8601(ts) from db.ntb")
tdSql.checkRows(3)
@ -97,7 +152,7 @@ class TDTestCase:
tdSql.checkRows(3)
tdSql.query("select to_iso8601(1) from db.stb")
for i in range(0,3):
tdSql.checkData(i,0,"1970-01-01T08:00:01+0800")
tdSql.checkData(i,0,"1970-01-01T08:00:00.001+0800")
tdSql.checkRows(3)
tdSql.query("select to_iso8601(ts) from db.stb")
tdSql.checkRows(3)
@ -113,6 +168,7 @@ class TDTestCase:
def run(self): # sourcery skip: extract-duplicate-method
self.check_base_function()
self.check_customize_param_ms()
self.check_timestamp_precision()
def stop(self):
tdSql.close()

View File

@ -22,6 +22,7 @@ class TDTestCase:
'2020-4-1 00:00:00.001002',
'2020-5-1 00:00:00.001002001'
]
self.unix_ts = ['1','1111','1111111','1111111111','1111111111111']
self.db_param_precision = ['ms','us','ns']
self.time_unit = ['1w','1d','1h','1m','1s','1a','1u','1b']
self.error_unit = ['2w','2d','2h','2m','2s','2a','2u','1c','#1']
@ -134,7 +135,7 @@ class TDTestCase:
tdSql.checkEqual(tdSql.queryResult[i][0],int(date_time[i]*1000/1000/1000/1000/1000/60/60/24)*24*60*60*1000*1000*1000 )
else:
# assuming the client timezone is UTC+0800
tdSql.checkEqual(tdSql.queryResult[i][0],int(date_time[i] - (date_time[i] + 8 * 3600 * 1000000) % (86400 * 1000000)))
tdSql.checkEqual(tdSql.queryResult[i][0],int(date_time[i] - (date_time[i] + 8 * 3600 * 1000000000) % (86400 * 1000000000)))
elif unit.lower() == '1w':
for i in range(len(self.ts_str)):
if self.rest_tag != 'rest':
@ -167,16 +168,49 @@ class TDTestCase:
self.check_tb_type(unit,tb_type,ignore_tz)
tdSql.checkRows(len(self.ts_str))
self.check_ms_timestamp(unit,date_time,ignore_tz)
for uts in self.unix_ts:
ans_time = []
if tb_type.lower() == 'ntb':
tdSql.query(f'select timetruncate({uts},{unit},{ignore_tz}) from {self.ntbname}')
elif tb_type.lower() == 'ctb':
tdSql.query(f'select timetruncate({uts},{unit},{ignore_tz}) from {self.ctbname}')
elif tb_type.lower() == 'stb':
tdSql.query(f'select timetruncate({uts},{unit},{ignore_tz}) from {self.stbname}')
for i in range(len(self.ts_str)):
ans_time.append(int(uts))
self.check_ms_timestamp(unit, ans_time, ignore_tz)
elif precision.lower() == 'us':
for ignore_tz in tz_options:
self.check_tb_type(unit,tb_type,ignore_tz)
tdSql.checkRows(len(self.ts_str))
self.check_us_timestamp(unit,date_time,ignore_tz)
for uts in self.unix_ts:
ans_time = []
if tb_type.lower() == 'ntb':
tdSql.query(f'select timetruncate({uts},{unit},{ignore_tz}) from {self.ntbname}')
elif tb_type.lower() == 'ctb':
tdSql.query(f'select timetruncate({uts},{unit},{ignore_tz}) from {self.ctbname}')
elif tb_type.lower() == 'stb':
tdSql.query(f'select timetruncate({uts},{unit},{ignore_tz}) from {self.stbname}')
for i in range(len(self.ts_str)):
ans_time.append(int(uts))
self.check_us_timestamp(unit, ans_time, ignore_tz)
elif precision.lower() == 'ns':
for ignore_tz in tz_options:
self.check_tb_type(unit,tb_type, ignore_tz)
tdSql.checkRows(len(self.ts_str))
self.check_ns_timestamp(unit,date_time,ignore_tz)
for uts in self.unix_ts:
ans_time = []
if tb_type.lower() == 'ntb':
tdSql.query(f'select timetruncate({uts},{unit},{ignore_tz}) from {self.ntbname}')
elif tb_type.lower() == 'ctb':
tdSql.query(f'select timetruncate({uts},{unit},{ignore_tz}) from {self.ctbname}')
elif tb_type.lower() == 'stb':
tdSql.query(f'select timetruncate({uts},{unit},{ignore_tz}) from {self.stbname}')
for i in range(len(self.ts_str)):
ans_time.append(int(uts))
self.check_ns_timestamp(unit, ans_time, ignore_tz)
for unit in self.error_unit:
if tb_type.lower() == 'ntb':
tdSql.error(f'select timetruncate(ts,{unit}) from {self.ntbname}')

View File

@ -516,17 +516,17 @@ class TDTestCase:
"td.connect.pass": "taosdata",
"auto.offset.reset": "earliest",
}
consumer = Consumer(consumer_dict)
consumer1 = Consumer(consumer_dict)
try:
consumer.subscribe(["t1"])
consumer1.subscribe(["t1"])
except TmqError:
tdLog.exit(f"subscribe error")
index = 0
try:
while True:
res = consumer.poll(1)
res = consumer1.poll(1)
if not res:
if index != 1:
tdLog.exit("consume error")
@ -543,18 +543,85 @@ class TDTestCase:
index += 1
finally:
consumer.close()
consumer1.close()
consumer1 = Consumer(consumer_dict)
consumer2 = Consumer(consumer_dict)
try:
consumer1.subscribe(["t2"])
consumer2.subscribe(["t2"])
except TmqError:
tdLog.exit(f"subscribe error")
tdSql.query(f'show subscriptions')
tdSql.checkRows(2)
tdSql.checkData(0, 0, "t2")
tdSql.checkData(0, 1, 'g1')
tdSql.checkData(1, 0, 't1')
tdSql.checkData(1, 1, 'g1')
tdSql.query(f'show consumers')
tdSql.checkRows(1)
tdSql.checkData(0, 1, 'g1')
tdSql.checkData(0, 4, 't2')
tdSql.execute(f'drop consumer group g1 on t1')
tdSql.query(f'show consumers')
tdSql.checkRows(1)
consumer1.close()
tdSql.checkData(0, 1, 'g1')
tdSql.checkData(0, 4, 't2')
tdSql.query(f'show subscriptions')
tdSql.checkRows(1)
tdSql.checkData(0, 0, "t2")
tdSql.checkData(0, 1, 'g1')
index = 0
try:
while True:
res = consumer2.poll(1)
if not res:
if index != 1:
tdLog.exit("consume error")
break
val = res.value()
if val is None:
continue
cnt = 0;
for block in val:
cnt += len(block.fetchall())
if cnt != 8:
tdLog.exit("consume error")
index += 1
finally:
consumer2.close()
consumer3 = Consumer(consumer_dict)
try:
consumer3.subscribe(["t2"])
except TmqError:
tdLog.exit(f"subscribe error")
tdSql.query(f'show consumers')
tdSql.checkRows(1)
tdSql.checkData(0, 1, 'g1')
tdSql.checkData(0, 4, 't2')
tdSql.execute(f'insert into t4 using st tags(3) values(now, 1)')
try:
res = consumer3.poll(1)
if not res:
tdLog.exit("consume1 error")
finally:
consumer3.close()
tdSql.query(f'show consumers')
tdSql.checkRows(0)
tdSql.query(f'show subscriptions')
tdSql.checkRows(1)
tdSql.checkData(0, 0, "t2")
tdSql.checkData(0, 1, 'g1')
tdSql.execute(f'drop topic t1')
tdSql.execute(f'drop topic t2')
tdSql.execute(f'drop database d1')